From e3156b902d45eb2ed4cf00dfed64f4fe247afe82 Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Thu, 20 Jun 2024 21:10:14 +0100 Subject: [PATCH] Subscriptions: bi-di index on per subscription (#7770) * Subscriptions: bi-directional subscription & publish streaming. Adds SubscribeTopicEvents proto API which dynamically subscribes to pubsub topics based on https://github.com/dapr/proposals/pull/52. This is a basic gRPC implementation of the API whereby, like Subscription hot-reloading today, subscribing to a topic will reload _every_ active subscription for the current daprd. In a future PR, reloading of Subscriptions will be granular to the specific pubsub topic. Stream subscriptions are also only active once daprd declares the application as both present and ready. Dynamic stream subscriptions should be active both whether a app is running or not, as well as whether it is ready or not. This will be addressed in a future PR. Signed-off-by: joshvanl * Subscriptions: bi-directional subscription streaming- subscribe on no healthz. Refactors pubsub machinery to allow for gRPC bi-directional subcription streaming when there is no application, or the application in unhealhty. dapr/proposals#52 Signed-off-by: joshvanl * Fix unit tests Signed-off-by: joshvanl * Fix subscription allowed Signed-off-by: joshvanl * Subscriptions: bi-di index on per subscription Index on per subscription so that streams or Subscription hot reloading events will only reload that specific subscription, rather than reloading _every_ subscription for that PubSub component. This dramatically reduces disruption to topic subscriptions for a given PubSub component. Signed-off-by: joshvanl * Lock streamer when sending topic mesgae to stream connection Signed-off-by: joshvanl * Log Info when a streaming subscription unsubscribes Signed-off-by: joshvanl --------- Signed-off-by: joshvanl Signed-off-by: Jake Engelberg --- pkg/api/grpc/subscribe.go | 12 +- pkg/runtime/compstore/subscriptions.go | 78 +++++-- .../hotreload/reconciler/subscriptions.go | 2 +- pkg/runtime/processor/manager.go | 7 +- .../processor/subscriber/subscriber.go | 200 +++++++++++++----- pkg/runtime/processor/subscriptions.go | 19 +- pkg/runtime/pubsub/streamer/conn.go | 25 +-- pkg/runtime/pubsub/streamer/streamer.go | 13 +- .../daprd/subscriptions/stream/errors.go | 2 +- 9 files changed, 240 insertions(+), 118 deletions(-) diff --git a/pkg/api/grpc/subscribe.go b/pkg/api/grpc/subscribe.go index 219b264319e..8ab3ca16747 100644 --- a/pkg/api/grpc/subscribe.go +++ b/pkg/api/grpc/subscribe.go @@ -70,9 +70,8 @@ func (a *api) streamSubscribe(stream runtimev1pb.Dapr_SubscribeTopicEventsAlpha1 return errors.New("topic is required") } - // TODO: @joshvanl: handle duplicate key names. key := a.pubsubAdapterStreamer.StreamerKey(req.GetPubsubName(), req.GetTopic()) - a.Universal.CompStore().AddStreamSubscription(&subapi.Subscription{ + err = a.Universal.CompStore().AddStreamSubscription(&subapi.Subscription{ ObjectMeta: metav1.ObjectMeta{Name: key}, Spec: subapi.SubscriptionSpec{ Pubsubname: req.GetPubsubName(), @@ -82,17 +81,18 @@ func (a *api) streamSubscribe(stream runtimev1pb.Dapr_SubscribeTopicEventsAlpha1 Routes: subapi.Routes{Default: "/"}, }, }) + if err != nil { + return err + } - if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil { + if err := a.processor.Subscriber().StartStreamerSubscription(key); err != nil { a.Universal.CompStore().DeleteStreamSubscription(key) return err } defer func() { + a.processor.Subscriber().StopStreamerSubscription(req.GetPubsubName(), key) a.Universal.CompStore().DeleteStreamSubscription(key) - if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil { - a.logger.Errorf("Error reloading subscriptions after gRPC Subscribe shutdown: %s", err) - } }() return a.pubsubAdapterStreamer.Subscribe(stream, req) diff --git a/pkg/runtime/compstore/subscriptions.go b/pkg/runtime/compstore/subscriptions.go index ebe21ff11ca..ef364568435 100644 --- a/pkg/runtime/compstore/subscriptions.go +++ b/pkg/runtime/compstore/subscriptions.go @@ -14,13 +14,23 @@ limitations under the License. package compstore import ( + "fmt" + subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub" + "github.com/dapr/kit/ptr" ) type DeclarativeSubscription struct { - Comp *subapi.Subscription - Subscription rtpubsub.Subscription + Comp *subapi.Subscription + *NamedSubscription +} + +type NamedSubscription struct { + // Name is the optional name of the subscription. If not set, the name of the + // component is used. + Name *string + rtpubsub.Subscription } type subscriptions struct { @@ -49,25 +59,37 @@ func (c *ComponentStore) AddDeclarativeSubscription(comp *subapi.Subscription, s } c.subscriptions.declaratives[comp.Name] = &DeclarativeSubscription{ - Comp: comp, - Subscription: sub, + Comp: comp, + NamedSubscription: &NamedSubscription{ + Name: ptr.Of(comp.Name), + Subscription: sub, + }, } c.subscriptions.declarativesList = append(c.subscriptions.declarativesList, comp.Name) } -func (c *ComponentStore) AddStreamSubscription(comp *subapi.Subscription) { +func (c *ComponentStore) AddStreamSubscription(comp *subapi.Subscription) error { c.lock.Lock() defer c.lock.Unlock() + if _, ok := c.subscriptions.streams[comp.Name]; ok { + return fmt.Errorf("streamer already subscribed to pubsub %q topic %q", comp.Spec.Pubsubname, comp.Spec.Topic) + } + c.subscriptions.streams[comp.Name] = &DeclarativeSubscription{ Comp: comp, - Subscription: rtpubsub.Subscription{ - PubsubName: comp.Spec.Pubsubname, - Topic: comp.Spec.Topic, - DeadLetterTopic: comp.Spec.DeadLetterTopic, - Metadata: comp.Spec.Metadata, - Rules: []*rtpubsub.Rule{{Path: "/"}}, + NamedSubscription: &NamedSubscription{ + Name: ptr.Of(comp.Name), + Subscription: rtpubsub.Subscription{ + PubsubName: comp.Spec.Pubsubname, + Topic: comp.Spec.Topic, + DeadLetterTopic: comp.Spec.DeadLetterTopic, + Metadata: comp.Spec.Metadata, + Rules: []*rtpubsub.Rule{{Path: "/"}}, + }, }, } + + return nil } func (c *ComponentStore) DeleteStreamSubscription(names ...string) { @@ -132,11 +154,11 @@ func (c *ComponentStore) ListSubscriptions() []rtpubsub.Subscription { return subs } -func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []rtpubsub.Subscription { +func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []*NamedSubscription { c.lock.RLock() defer c.lock.RUnlock() - var subs []rtpubsub.Subscription + var subs []*NamedSubscription taken := make(map[string]int) for _, subName := range c.subscriptions.declarativesList { sub := c.subscriptions.declaratives[subName] @@ -146,7 +168,10 @@ func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []rtpubsub.Su if _, ok := taken[sub.Subscription.Topic]; !ok { taken[sub.Subscription.Topic] = len(subs) - subs = append(subs, sub.Subscription) + subs = append(subs, &NamedSubscription{ + Name: ptr.Of(subName), + Subscription: sub.Subscription, + }) } } for i := range c.subscriptions.programmatics { @@ -156,36 +181,47 @@ func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []rtpubsub.Su } if j, ok := taken[sub.Topic]; ok { - subs[j] = sub + subs[j] = &NamedSubscription{Subscription: sub} } else { taken[sub.Topic] = len(subs) - subs = append(subs, sub) + subs = append(subs, &NamedSubscription{Subscription: sub}) } } return subs } -func (c *ComponentStore) ListSubscriptionsStreamByPubSub(name string) []rtpubsub.Subscription { +func (c *ComponentStore) ListSubscriptionsStreamByPubSub(name string) []*NamedSubscription { c.lock.RLock() defer c.lock.RUnlock() - var subs []rtpubsub.Subscription + var subs []*NamedSubscription for _, sub := range c.subscriptions.streams { if sub.Subscription.PubsubName == name { - subs = append(subs, sub.Subscription) + subs = append(subs, sub.NamedSubscription) } } return subs } -func (c *ComponentStore) GetDeclarativeSubscription(name string) (*subapi.Subscription, bool) { +func (c *ComponentStore) GetDeclarativeSubscription(name string) (*DeclarativeSubscription, bool) { c.lock.RLock() defer c.lock.RUnlock() for i, sub := range c.subscriptions.declaratives { if sub.Comp.Name == name { - return c.subscriptions.declaratives[i].Comp, true + return c.subscriptions.declaratives[i], true + } + } + return nil, false +} + +func (c *ComponentStore) GetStreamSubscription(key string) (*NamedSubscription, bool) { + c.lock.RLock() + defer c.lock.RUnlock() + for i, sub := range c.subscriptions.streams { + if sub.Comp.Name == key { + return c.subscriptions.streams[i].NamedSubscription, true } } return nil, false diff --git a/pkg/runtime/hotreload/reconciler/subscriptions.go b/pkg/runtime/hotreload/reconciler/subscriptions.go index a7adf62556c..2634e04169a 100644 --- a/pkg/runtime/hotreload/reconciler/subscriptions.go +++ b/pkg/runtime/hotreload/reconciler/subscriptions.go @@ -37,7 +37,7 @@ func (s *subscriptions) update(ctx context.Context, sub subapi.Subscription) { if exists { log.Infof("Closing existing Subscription to reload: %s", oldSub.Name) - if err := s.proc.CloseSubscription(ctx, oldSub); err != nil { + if err := s.proc.CloseSubscription(ctx, oldSub.Comp); err != nil { log.Errorf("Failed to close existing Subscription: %s", err) return } diff --git a/pkg/runtime/processor/manager.go b/pkg/runtime/processor/manager.go index f23d2f30559..3c28d12632a 100644 --- a/pkg/runtime/processor/manager.go +++ b/pkg/runtime/processor/manager.go @@ -41,12 +41,15 @@ type SecretManager interface { } type SubscribeManager interface { - ReloadPubSub(string) error - StopPubSub(string) InitProgramaticSubscriptions(context.Context) error StartAppSubscriptions() error StopAppSubscriptions() StopAllSubscriptionsForever() + ReloadDeclaredAppSubscription(name, pubsubName string) error + StartStreamerSubscription(key string) error + StopStreamerSubscription(pubsubName, key string) + ReloadPubSub(string) error + StopPubSub(string) } type BindingManager interface { diff --git a/pkg/runtime/processor/subscriber/subscriber.go b/pkg/runtime/processor/subscriber/subscriber.go index ce32ebe2ae8..12c4da0c05c 100644 --- a/pkg/runtime/processor/subscriber/subscriber.go +++ b/pkg/runtime/processor/subscriber/subscriber.go @@ -58,8 +58,8 @@ type Subscriber struct { adapter rtpubsub.Adapter adapterStreamer rtpubsub.AdapterStreamer - appSubs map[string][]*subscription.Subscription - streamSubs map[string][]*subscription.Subscription + appSubs map[string][]*namedSubscription + streamSubs map[string][]*namedSubscription appSubActive bool hasInitProg bool lock sync.RWMutex @@ -67,6 +67,11 @@ type Subscriber struct { closed bool } +type namedSubscription struct { + name *string + *subscription.Subscription +} + var log = logger.NewLogger("dapr.runtime.processor.subscription") func New(opts Options) *Subscriber { @@ -81,8 +86,8 @@ func New(opts Options) *Subscriber { compStore: opts.CompStore, adapter: opts.Adapter, adapterStreamer: opts.AdapterStreamer, - appSubs: make(map[string][]*subscription.Subscription), - streamSubs: make(map[string][]*subscription.Subscription), + appSubs: make(map[string][]*namedSubscription), + streamSubs: make(map[string][]*namedSubscription), } } @@ -120,6 +125,97 @@ func (s *Subscriber) ReloadPubSub(name string) error { return errors.Join(errs...) } +func (s *Subscriber) StartStreamerSubscription(key string) error { + s.lock.Lock() + defer s.lock.Unlock() + + if s.closed { + return nil + } + + sub, ok := s.compStore.GetStreamSubscription(key) + if !ok { + return nil + } + + pubsub, ok := s.compStore.GetPubSub(sub.PubsubName) + if !ok { + return nil + } + + ss, err := s.startSubscription(pubsub, sub, true) + if err != nil { + return fmt.Errorf("failed to create subscription for %s: %s", sub.PubsubName, err) + } + + s.streamSubs[sub.PubsubName] = append(s.streamSubs[sub.PubsubName], &namedSubscription{ + name: &key, + Subscription: ss, + }) + + return nil +} + +func (s *Subscriber) StopStreamerSubscription(pubsubName, key string) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.closed { + return + } + + for i, sub := range s.streamSubs[pubsubName] { + if sub.name != nil && *sub.name == key { + sub.Stop() + s.streamSubs[pubsubName] = append(s.streamSubs[pubsubName][:i], s.streamSubs[pubsubName][i+1:]...) + return + } + } +} + +func (s *Subscriber) ReloadDeclaredAppSubscription(name, pubsubName string) error { + s.lock.Lock() + defer s.lock.Unlock() + + if s.closed { + return nil + } + + for i, appsub := range s.appSubs[pubsubName] { + if appsub.name != nil && name == *appsub.name { + appsub.Stop() + s.appSubs[pubsubName] = append(s.appSubs[pubsubName][:i], s.appSubs[pubsubName][i+1:]...) + break + } + } + + ps, ok := s.compStore.GetPubSub(pubsubName) + if !ok { + return nil + } + + sub, ok := s.compStore.GetDeclarativeSubscription(name) + if !ok { + return nil + } + + if !rtpubsub.IsOperationAllowed(sub.Topic, ps, ps.ScopedSubscriptions) { + return nil + } + + ss, err := s.startSubscription(ps, sub.NamedSubscription, false) + if err != nil { + return fmt.Errorf("failed to create subscription for %s: %s", sub.PubsubName, err) + } + + s.appSubs[sub.PubsubName] = append(s.appSubs[sub.PubsubName], &namedSubscription{ + name: &name, + Subscription: ss, + }) + + return nil +} + func (s *Subscriber) StopPubSub(name string) { s.lock.Lock() defer s.lock.Unlock() @@ -154,33 +250,22 @@ func (s *Subscriber) StartAppSubscriptions() error { sub.Stop() } } - s.appSubs = make(map[string][]*subscription.Subscription) + s.appSubs = make(map[string][]*namedSubscription) var errs []error for name, ps := range s.compStore.ListPubSubs() { ps := ps for _, sub := range s.compStore.ListSubscriptionsAppByPubSub(name) { - sub := sub - ss, err := subscription.New(subscription.Options{ - AppID: s.appID, - Namespace: s.namespace, - PubSubName: sub.PubsubName, - Topic: sub.Topic, - IsHTTP: s.isHTTP, - PubSub: ps, - Resiliency: s.resiliency, - TraceSpec: s.tracingSpec, - Route: sub, - Channels: s.channels, - GRPC: s.grpc, - Adapter: s.adapter, - }) + ss, err := s.startSubscription(ps, sub, false) if err != nil { errs = append(errs, err) continue } - s.appSubs[name] = append(s.appSubs[name], ss) + s.appSubs[name] = append(s.appSubs[name], &namedSubscription{ + name: sub.Name, + Subscription: ss, + }) } } @@ -203,7 +288,7 @@ func (s *Subscriber) StopAppSubscriptions() { } } - s.appSubs = make(map[string][]*subscription.Subscription) + s.appSubs = make(map[string][]*namedSubscription) } func (s *Subscriber) StopAllSubscriptionsForever() { @@ -223,8 +308,8 @@ func (s *Subscriber) StopAllSubscriptionsForever() { } } - s.appSubs = make(map[string][]*subscription.Subscription) - s.streamSubs = make(map[string][]*subscription.Subscription) + s.appSubs = make(map[string][]*namedSubscription) + s.streamSubs = make(map[string][]*namedSubscription) } func (s *Subscriber) InitProgramaticSubscriptions(ctx context.Context) error { @@ -243,30 +328,19 @@ func (s *Subscriber) reloadPubSubStream(name string, pubsub *rtpubsub.PubsubItem return nil } - subs := make([]*subscription.Subscription, 0, len(s.compStore.ListSubscriptionsStreamByPubSub(name))) + subs := make([]*namedSubscription, 0, len(s.compStore.ListSubscriptionsStreamByPubSub(name))) var errs []error for _, sub := range s.compStore.ListSubscriptionsStreamByPubSub(name) { - ss, err := subscription.New(subscription.Options{ - AppID: s.appID, - Namespace: s.namespace, - PubSubName: name, - Topic: sub.Topic, - IsHTTP: s.isHTTP, - PubSub: pubsub, - Resiliency: s.resiliency, - TraceSpec: s.tracingSpec, - Route: sub, - Channels: s.channels, - GRPC: s.grpc, - Adapter: s.adapter, - AdapterStreamer: s.adapterStreamer, - }) + ss, err := s.startSubscription(pubsub, sub, true) if err != nil { errs = append(errs, fmt.Errorf("failed to create subscription for %s: %s", name, err)) continue } - subs = append(subs, ss) + subs = append(subs, &namedSubscription{ + name: sub.Name, + Subscription: ss, + }) } s.streamSubs[name] = subs @@ -290,28 +364,18 @@ func (s *Subscriber) reloadPubSubApp(name string, pubsub *rtpubsub.PubsubItem) e } var errs []error - subs := make([]*subscription.Subscription, 0, len(s.compStore.ListSubscriptionsAppByPubSub(name))) + subs := make([]*namedSubscription, 0, len(s.compStore.ListSubscriptionsAppByPubSub(name))) for _, sub := range s.compStore.ListSubscriptionsAppByPubSub(name) { - ss, err := subscription.New(subscription.Options{ - AppID: s.appID, - Namespace: s.namespace, - PubSubName: name, - Topic: sub.Topic, - IsHTTP: s.isHTTP, - PubSub: pubsub, - Resiliency: s.resiliency, - TraceSpec: s.tracingSpec, - Route: sub, - Channels: s.channels, - GRPC: s.grpc, - Adapter: s.adapter, - }) + ss, err := s.startSubscription(pubsub, sub, false) if err != nil { errs = append(errs, fmt.Errorf("failed to create subscription for %s: %s", name, err)) continue } - subs = append(subs, ss) + subs = append(subs, &namedSubscription{ + name: sub.Name, + Subscription: ss, + }) } s.appSubs[name] = subs @@ -370,3 +434,25 @@ func (s *Subscriber) initProgramaticSubscriptions(ctx context.Context) error { return nil } + +func (s *Subscriber) startSubscription(pubsub *rtpubsub.PubsubItem, comp *compstore.NamedSubscription, isStreamer bool) (*subscription.Subscription, error) { + var streamer rtpubsub.AdapterStreamer + if isStreamer { + streamer = s.adapterStreamer + } + return subscription.New(subscription.Options{ + AppID: s.appID, + Namespace: s.namespace, + PubSubName: comp.PubsubName, + Topic: comp.Topic, + IsHTTP: s.isHTTP, + PubSub: pubsub, + Resiliency: s.resiliency, + TraceSpec: s.tracingSpec, + Route: comp.Subscription, + Channels: s.channels, + GRPC: s.grpc, + Adapter: s.adapter, + AdapterStreamer: streamer, + }) +} diff --git a/pkg/runtime/processor/subscriptions.go b/pkg/runtime/processor/subscriptions.go index b32a1e64bd3..b76358acd44 100644 --- a/pkg/runtime/processor/subscriptions.go +++ b/pkg/runtime/processor/subscriptions.go @@ -33,9 +33,6 @@ func (p *Processor) AddPendingSubscription(ctx context.Context, subscriptions .. return true } - // TODO: @joshvanl: also index by subscription so entire pubsub doesn't need - // to be reloaded. - pubsubsToReload := make(map[string]struct{}) for i := range scopedSubs { comp := scopedSubs[i] sub := rtpubsub.Subscription{ @@ -65,16 +62,8 @@ func (p *Processor) AddPendingSubscription(ctx context.Context, subscriptions .. } p.compStore.AddDeclarativeSubscription(&comp, sub) - pubsubsToReload[comp.Spec.Pubsubname] = struct{}{} - } - - for pubsubName := range pubsubsToReload { - if err := p.subscriber.ReloadPubSub(pubsubName); err != nil { - names := make([]string, len(scopedSubs)) - for i, sub := range scopedSubs { - names[i] = sub.Name - } - p.compStore.DeleteDeclarativeSubscription(names...) + if err := p.subscriber.ReloadDeclaredAppSubscription(comp.Name, comp.Spec.Pubsubname); err != nil { + p.compStore.DeleteDeclarativeSubscription(comp.Name) p.errorSubscriptions(ctx, err) return false } @@ -101,9 +90,7 @@ func (p *Processor) CloseSubscription(ctx context.Context, sub *subapi.Subscript return nil } p.compStore.DeleteDeclarativeSubscription(sub.Name) - // TODO: @joshvanl: also index by subscription so entire pubsub doesn't need - // to be reloaded. - if err := p.subscriber.ReloadPubSub(sub.Spec.Pubsubname); err != nil { + if err := p.subscriber.ReloadDeclaredAppSubscription(sub.Name, sub.Spec.Pubsubname); err != nil { return err } return nil diff --git a/pkg/runtime/pubsub/streamer/conn.go b/pkg/runtime/pubsub/streamer/conn.go index 46e34d249bf..97f57fe56e7 100644 --- a/pkg/runtime/pubsub/streamer/conn.go +++ b/pkg/runtime/pubsub/streamer/conn.go @@ -22,10 +22,23 @@ import ( type conn struct { lock sync.RWMutex + streamLock sync.Mutex stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server publishResponses map[string]chan *rtv1pb.SubscribeTopicEventsResponseAlpha1 } +func (c *conn) registerPublishResponse(id string) (chan *rtv1pb.SubscribeTopicEventsResponseAlpha1, func()) { + ch := make(chan *rtv1pb.SubscribeTopicEventsResponseAlpha1) + c.lock.Lock() + c.publishResponses[id] = ch + c.lock.Unlock() + return ch, func() { + c.lock.Lock() + delete(c.publishResponses, id) + c.lock.Unlock() + } +} + func (c *conn) notifyPublishResponse(ctx context.Context, resp *rtv1pb.SubscribeTopicEventsResponseAlpha1) { c.lock.RLock() ch, ok := c.publishResponses[resp.GetId()] @@ -41,15 +54,3 @@ func (c *conn) notifyPublishResponse(ctx context.Context, resp *rtv1pb.Subscribe case ch <- resp: } } - -func (c *conn) registerPublishResponse(id string) (chan *rtv1pb.SubscribeTopicEventsResponseAlpha1, func()) { - ch := make(chan *rtv1pb.SubscribeTopicEventsResponseAlpha1) - c.lock.Lock() - c.publishResponses[id] = ch - c.lock.Unlock() - return ch, func() { - c.lock.Lock() - delete(c.publishResponses, id) - c.lock.Unlock() - } -} diff --git a/pkg/runtime/pubsub/streamer/streamer.go b/pkg/runtime/pubsub/streamer/streamer.go index 41f7f19a681..33672e63b6a 100644 --- a/pkg/runtime/pubsub/streamer/streamer.go +++ b/pkg/runtime/pubsub/streamer/streamer.go @@ -22,6 +22,9 @@ import ( "sync" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + contribpubsub "github.com/dapr/components-contrib/pubsub" "github.com/dapr/dapr/pkg/config" diag "github.com/dapr/dapr/pkg/diagnostics" @@ -79,7 +82,12 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server for { resp, err := stream.Recv() - if errors.Is(err, context.Canceled) || errors.Is(err, io.EOF) { + s, ok := status.FromError(err) + + if (ok && s.Code() == codes.Canceled) || + errors.Is(err, context.Canceled) || + errors.Is(err, io.EOF) { + log.Infof("Unsubscribed from pubsub '%s' topic '%s'", req.GetPubsubName(), req.GetTopic()) return err } @@ -92,7 +100,6 @@ func (s *streamer) Subscribe(stream rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server if eventResp == nil { return errors.New("duplicate initial request received") } - wg.Add(1) go func() { defer wg.Done() @@ -119,7 +126,9 @@ func (s *streamer) Publish(ctx context.Context, msg *rtpubsub.SubscribedMessage) defer defFn() start := time.Now() + conn.streamLock.Lock() err = conn.stream.Send(envelope) + conn.streamLock.Unlock() elapsed := diag.ElapsedSince(start) if span != nil { diff --git a/tests/integration/suite/daprd/subscriptions/stream/errors.go b/tests/integration/suite/daprd/subscriptions/stream/errors.go index 7caa455e270..21066ec25e7 100644 --- a/tests/integration/suite/daprd/subscriptions/stream/errors.go +++ b/tests/integration/suite/daprd/subscriptions/stream/errors.go @@ -118,7 +118,7 @@ func (e *errors) Run(t *testing.T, ctx context.Context) { _, err = streamDupe.Recv() s, ok = status.FromError(err) require.True(t, ok) - assert.Contains(t, s.Message(), `already subscribed to pubsub "mypub" topic "a"`) + assert.Contains(t, s.Message(), `streamer already subscribed to pubsub "mypub" topic "a"`) streamDoubleInit, err := client.SubscribeTopicEventsAlpha1(ctx) require.NoError(t, err)