From 47d2311d873d44d93a9472f116bf2e0cf0c40291 Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Wed, 15 Nov 2023 12:36:22 +0000 Subject: [PATCH] dispatcher: migrate to SubConn.Shutdown and deprecate Addresses See https://github.com/grpc/grpc-go/pull/6471/files and https://github.com/grpc/grpc-go/pull/6493/files --- broker/protocol/dispatcher.go | 21 ++++++++++++--------- broker/protocol/dispatcher_test.go | 12 ++++++------ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/broker/protocol/dispatcher.go b/broker/protocol/dispatcher.go index 80f3b5d2..d078e73f 100644 --- a/broker/protocol/dispatcher.go +++ b/broker/protocol/dispatcher.go @@ -55,7 +55,7 @@ func WithDispatchItemRoute(ctx context.Context, dr DispatchRouter, item string, id = rt.Members[rt.Primary] } return context.WithValue(ctx, dispatchRouteCtxKey{}, - dispatchRoute{route: rt, id: id, item: item, DispatchRouter: dr}) + dispatchRoute{route: rt, id: id, item: item, DispatchRouter: dr}) } // DispatchRouter routes item to Routes, and observes item Routes. @@ -141,8 +141,11 @@ func (d *dispatcher) updateSubConnState(sc balancer.SubConn, state balancer.SubC }) } +// This method has been deprecated but may still be in use. See https://github.com/grpc/grpc-go/pull/6481 +// For updates to the logic, apply them to `updateSubConnState` instead which has been integrated with the new +// StateListener interface func (d *dispatcher) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { - d.updateSubConnState(sc, state) + d.updateSubConnState(sc, state) } // markedSubConn tracks the last mark associated with a SubConn. @@ -181,10 +184,10 @@ func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) { Addr: d.idToAddr(dr.route, dispatchID), }}, balancer.NewSubConnOptions{ - StateListener: func(state balancer.SubConnState) { - d.updateSubConnState(msc.subConn, state) - }, - }, + StateListener: func(state balancer.SubConnState) { + d.updateSubConnState(msc.subConn, state) + }, + }, ); err != nil { return balancer.PickResult{}, err } @@ -207,7 +210,7 @@ func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) { if tr, ok := trace.FromContext(info.Ctx); ok { tr.LazyPrintf("Pick(Route: %s, ID: %s) => %s (%s)", - &dr.route, &dr.id, &dispatchID, state) + &dr.route, &dr.id, &dispatchID, state) } switch state { case connectivity.Idle, connectivity.Connecting: @@ -301,10 +304,10 @@ func (d *dispatcher) sweep() { d.mu.Unlock() for _, sc := range toSweep { - // RemoveSubConn begins SubConn shutdown. We expect to see a + // SubConn.Shutdown begins a shutdown. We expect to see a // HandleSubConnStateChange with connectivity.Shutdown, at which // point we'll de-index it. - d.cc.RemoveSubConn(sc) + sc.Shutdown() } } diff --git a/broker/protocol/dispatcher_test.go b/broker/protocol/dispatcher_test.go index 4c96baed..db3db801 100644 --- a/broker/protocol/dispatcher_test.go +++ b/broker/protocol/dispatcher_test.go @@ -72,7 +72,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) { // Case: Specific remote peer is dispatched to. ctx = WithDispatchRoute(context.Background(), - buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"}) + buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"}) result, err = disp.Pick(balancer.PickInfo{Ctx: ctx}) c.Check(err, gc.Equals, balancer.ErrNoSubConnAvailable) @@ -130,7 +130,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) { mockSubConn{Name: "remote.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) ctx = WithDispatchRoute(context.Background(), - buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"}) + buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"}) _, err = disp.Pick(balancer.PickInfo{Ctx: ctx}) c.Check(err, gc.Equals, balancer.ErrTransientFailure) @@ -253,7 +253,7 @@ func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") } func (s mockSubConn) UpdateState(state balancer.SubConnState) { s.disp.updateSubConnState(s, state) } func (s mockSubConn) Connect() {} func (s mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) { - return nil, func() {} + return nil, func() {} } func (s mockSubConn) Shutdown() { var c = s.disp.cc.(*mockClientConn) @@ -266,12 +266,12 @@ func (c *mockClientConn) NewSubConn(a []resolver.Address, _ balancer.NewSubConnO return sc, c.err } -func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) {} +func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) { panic("deprecated") } func (c *mockClientConn) UpdateState(balancer.State) {} func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {} func (c *mockClientConn) Target() string { return "default.addr" } -func (c *mockClientConn) RemoveSubConn(balancer.SubConn) { - panic("deprecated") +func (c *mockClientConn) RemoveSubConn(sc balancer.SubConn) { + sc.Shutdown() } type mockRouter struct{ invalidated string }