diff --git a/app/log/command/command.go b/app/log/command/command.go index c6e4481ee..0c63d2bc3 100644 --- a/app/log/command/command.go +++ b/app/log/command/command.go @@ -4,14 +4,11 @@ package command import ( "context" - "time" - - grpc "google.golang.org/grpc" - core "github.com/v2fly/v2ray-core/v5" "github.com/v2fly/v2ray-core/v5/app/log" "github.com/v2fly/v2ray-core/v5/common" cmlog "github.com/v2fly/v2ray-core/v5/common/log" + grpc "google.golang.org/grpc" ) // LoggerServer is the implemention of LoggerService @@ -44,22 +41,19 @@ func (s *LoggerServer) FollowLog(_ *FollowLogRequest, stream LoggerService_Follo if !ok { return newError("logger not support following") } - var err error + done := make(chan struct{}) f := func(msg cmlog.Message) { - err = stream.Send(&FollowLogResponse{ + err := stream.Send(&FollowLogResponse{ Message: msg.String(), }) + if err != nil { + close(done) + } } follower.AddFollower(f) defer follower.RemoveFollower(f) - ticker := time.NewTicker(time.Second) - for { - <-ticker.C - if err != nil { - ticker.Stop() - return nil - } - } + <-done + return nil } func (s *LoggerServer) mustEmbedUnimplementedLoggerServiceServer() {} diff --git a/app/observatory/burst/healthping.go b/app/observatory/burst/healthping.go index 8e50e46e6..f4a0596e6 100644 --- a/app/observatory/burst/healthping.go +++ b/app/observatory/burst/healthping.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "strings" - sync "sync" + "sync" "time" "github.com/v2fly/v2ray-core/v5/common/dice" @@ -21,9 +21,10 @@ type HealthPingSettings struct { // HealthPing is the health checker for balancers type HealthPing struct { - ctx context.Context - access sync.Mutex - ticker *time.Ticker + ctx context.Context + access sync.Mutex + ticker *time.Ticker + tickerClose chan struct{} Settings *HealthPingSettings Results map[string]*HealthPingRTTS @@ -72,7 +73,9 @@ func (h *HealthPing) StartScheduler(selector func() ([]string, error)) { } interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount) ticker := time.NewTicker(interval) + tickerClose := make(chan struct{}) h.ticker = ticker + h.tickerClose = tickerClose go func() { for { go func() { @@ -84,9 +87,11 @@ func (h *HealthPing) StartScheduler(selector func() ([]string, error)) { h.doCheck(tags, interval, h.Settings.SamplingCount) h.Cleanup(tags) }() - _, ok := <-ticker.C - if !ok { - break + select { + case <-ticker.C: + continue + case <-tickerClose: + return } } }() @@ -94,8 +99,13 @@ func (h *HealthPing) StartScheduler(selector func() ([]string, error)) { // StopScheduler implements the HealthChecker func (h *HealthPing) StopScheduler() { + if h.ticker == nil { + return + } h.ticker.Stop() h.ticker = nil + close(h.tickerClose) + h.tickerClose = nil } // Check implements the HealthChecker