Skip to content

Commit

Permalink
dispatcher: migrate to SubConn.Shutdown and deprecate Addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Nov 21, 2023
1 parent 20100d9 commit 514fc3b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
21 changes: 12 additions & 9 deletions broker/protocol/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func WithDispatchItemRoute(ctx context.Context, dr DispatchRouter, item string,
id = rt.Members[rt.Primary]
}
return context.WithValue(ctx, dispatchRouteCtxKey{},
dispatchRoute{route: rt, id: id, item: item, DispatchRouter: dr})
dispatchRoute{route: rt, id: id, item: item, DispatchRouter: dr})
}

// DispatchRouter routes item to Routes, and observes item Routes.
Expand Down Expand Up @@ -152,8 +152,11 @@ func (d *dispatcher) updateSubConnState(sc balancer.SubConn, state balancer.SubC
})
}

// This method has been deprecated but may still be in use. See https://github.com/grpc/grpc-go/pull/6481
// For updates to the logic, apply them to `updateSubConnState` instead which has been integrated with the new
// StateListener interface
func (d *dispatcher) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
d.updateSubConnState(sc, state)
d.updateSubConnState(sc, state)
}

// markedSubConn tracks the last mark associated with a SubConn.
Expand Down Expand Up @@ -192,10 +195,10 @@ func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
Addr: d.idToAddr(dr.route, dispatchID),
}},
balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
d.updateSubConnState(msc.subConn, state)
},
},
StateListener: func(state balancer.SubConnState) {
d.updateSubConnState(msc.subConn, state)
},
},
); err != nil {
return balancer.PickResult{}, err
}
Expand All @@ -218,7 +221,7 @@ func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) {

if tr, ok := trace.FromContext(info.Ctx); ok {
tr.LazyPrintf("Pick(Route: %s, ID: %s) => %s (%s)",
&dr.route, &dr.id, &dispatchID, state)
&dr.route, &dr.id, &dispatchID, state)
}
switch state {
case connectivity.Idle, connectivity.Connecting:
Expand Down Expand Up @@ -312,10 +315,10 @@ func (d *dispatcher) sweep() {
d.mu.Unlock()

for _, sc := range toSweep {
// RemoveSubConn begins SubConn shutdown. We expect to see a
// SubConn.Shutdown begins a shutdown. We expect to see a
// HandleSubConnStateChange with connectivity.Shutdown, at which
// point we'll de-index it.
d.cc.RemoveSubConn(sc)
sc.Shutdown()
}
}

Expand Down
12 changes: 6 additions & 6 deletions broker/protocol/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {

// Case: Specific remote peer is dispatched to.
ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrNoSubConnAvailable)
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
mockSubConn{Name: "remote.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})

ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

_, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrTransientFailure)
Expand Down Expand Up @@ -253,7 +253,7 @@ func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") }
func (s mockSubConn) UpdateState(state balancer.SubConnState) { s.disp.updateSubConnState(s, state) }
func (s mockSubConn) Connect() {}
func (s mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, func() {}
return nil, func() {}
}
func (s mockSubConn) Shutdown() {
var c = s.disp.cc.(*mockClientConn)
Expand All @@ -266,12 +266,12 @@ func (c *mockClientConn) NewSubConn(a []resolver.Address, _ balancer.NewSubConnO
return sc, c.err
}

func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) {}
func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) { panic("deprecated") }
func (c *mockClientConn) UpdateState(balancer.State) {}
func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {}
func (c *mockClientConn) Target() string { return "default.addr" }
func (c *mockClientConn) RemoveSubConn(balancer.SubConn) {
panic("deprecated")
func (c *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
}

type mockRouter struct{ invalidated string }
Expand Down

0 comments on commit 514fc3b

Please sign in to comment.