diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 150ff738c15..5ffe7fbc74c 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/keypath" + "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/syncutil" ) @@ -79,11 +80,12 @@ type RegionSyncer struct { clientCtx context.Context clientCancel context.CancelFunc } - server Server - wg sync.WaitGroup - history *historyBuffer - limit *ratelimit.RateLimiter - tlsConfig *grpcutil.TLSConfig + broadcastDone chan struct{} + server Server + wg sync.WaitGroup + history *historyBuffer + limit *ratelimit.RateLimiter + tlsConfig *grpcutil.TLSConfig // status when as client streamingRunning atomic.Bool } @@ -96,10 +98,11 @@ func NewRegionSyncer(s Server) *RegionSyncer { return nil } syncer := &RegionSyncer{ - server: s, - history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), - limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), - tlsConfig: s.GetTLSConfig(), + server: s, + history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), + limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), + tlsConfig: s.GetTLSConfig(), + broadcastDone: make(chan struct{}, 1), } syncer.mu.streams = make(map[string]ServerStream) return syncer @@ -160,13 +163,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor RegionLeaders: leaders, Buckets: buckets, } - s.broadcast(regions) + s.broadcast(ctx, regions) case <-ticker.C: alive := &pdpb.SyncRegionResponse{ Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()}, StartIndex: s.history.getNextIndex(), } - s.broadcast(alive) + s.broadcast(ctx, alive) } requests = requests[:0] stats = stats[:0] @@ -344,23 +347,23 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) { s.mu.streams[name] = stream } -func 
(s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
-	var failed []string
-	s.mu.RLock()
-	for name, sender := range s.mu.streams {
-		err := sender.Send(regions)
-		if err != nil {
-			log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
-			failed = append(failed, name)
-		}
-	}
-	s.mu.RUnlock()
-	if len(failed) > 0 {
+// broadcast sends regions to every stream in s.mu.streams and removes the
+// streams whose Send returns an error. The send pass runs in its own
+// goroutine; broadcast returns as soon as that pass finishes or ctx is
+// cancelled, whichever happens first. On cancellation the goroutine keeps
+// running in the background until its Send calls return.
+func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) {
+	// Use a per-call channel that is closed on exit instead of a shared
+	// buffered one: closing never blocks (so the goroutine cannot get stuck
+	// while holding s.mu), and a pass that outlives a cancelled call cannot
+	// leave a stale completion signal for a later call to consume.
+	done := make(chan struct{})
+	go func() {
+		defer logutil.LogPanic()
+		// Deferred before the unlock so that it runs after it (LIFO): the
+		// caller is only released once s.mu is free again, and it is
+		// released even if Send panics.
+		defer close(done)
 		s.mu.Lock()
-		for _, name := range failed {
-			delete(s.mu.streams, name)
-			log.Info("region syncer delete the stream", zap.String("stream", name))
+		defer s.mu.Unlock()
+		for name, sender := range s.mu.streams {
+			if err := sender.Send(regions); err != nil {
+				log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
+				delete(s.mu.streams, name)
+				log.Info("region syncer delete the stream", zap.String("stream", name))
+			}
 		}
-		s.mu.Unlock()
+	}()
+	select {
+	case <-done:
+	case <-ctx.Done():
 	}
 }