Skip to content

Commit

Permalink
exit broadcast as soon as possible
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 22, 2025
1 parent 5a5c07e commit fae0a07
Showing 1 changed file with 30 additions and 27 deletions.
57 changes: 30 additions & 27 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
)

Expand Down Expand Up @@ -79,11 +80,12 @@ type RegionSyncer struct {
clientCtx context.Context
clientCancel context.CancelFunc
}
server Server
wg sync.WaitGroup
history *historyBuffer
limit *ratelimit.RateLimiter
tlsConfig *grpcutil.TLSConfig
broadcastDone chan struct{}
server Server
wg sync.WaitGroup
history *historyBuffer
limit *ratelimit.RateLimiter
tlsConfig *grpcutil.TLSConfig
// status when as client
streamingRunning atomic.Bool
}
Expand All @@ -96,10 +98,11 @@ func NewRegionSyncer(s Server) *RegionSyncer {
return nil
}
syncer := &RegionSyncer{
server: s,
history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)),
limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity),
tlsConfig: s.GetTLSConfig(),
server: s,
history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)),
limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity),
tlsConfig: s.GetTLSConfig(),
broadcastDone: make(chan struct{}, 1),
}
syncer.mu.streams = make(map[string]ServerStream)
return syncer
Expand Down Expand Up @@ -160,13 +163,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
RegionLeaders: leaders,
Buckets: buckets,
}
s.broadcast(regions)
s.broadcast(ctx, regions)
case <-ticker.C:
alive := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
StartIndex: s.history.getNextIndex(),
}
s.broadcast(alive)
s.broadcast(ctx, alive)
}
requests = requests[:0]
stats = stats[:0]
Expand Down Expand Up @@ -344,23 +347,23 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) {
s.mu.streams[name] = stream
}

func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
var failed []string
s.mu.RLock()
for name, sender := range s.mu.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
failed = append(failed, name)
}
}
s.mu.RUnlock()
if len(failed) > 0 {
func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) {
go func() {
defer logutil.LogPanic()
s.mu.Lock()
for _, name := range failed {
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
defer s.mu.Unlock()
for name, sender := range s.mu.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
}
}
s.mu.Unlock()
s.broadcastDone <- struct{}{}
}()
select {
case <-s.broadcastDone:
case <-ctx.Done():
}
}

0 comments on commit fae0a07

Please sign in to comment.