Skip to content

Commit

Permalink
Subscriptions: bi-di index on per subscription (dapr#7770)
Browse files Browse the repository at this point in the history
* Subscriptions: bi-directional subscription & publish streaming.

Adds SubscribeTopicEvents proto API which dynamically subscribes to
pubsub topics based on dapr/proposals#52.

This is a basic gRPC implementation of the API whereby, like
Subscription hot-reloading today, subscribing to a topic will reload
_every_ active subscription for the current daprd. In a future PR,
reloading of Subscriptions will be granular to the specific pubsub
topic.

Stream subscriptions are also only active once daprd declares the
application as both present and ready. Dynamic stream subscriptions
should be active both whether a app is running or not, as well as
whether it is ready or not. This will be addressed in a future PR.

Signed-off-by: joshvanl <[email protected]>

* Subscriptions: bi-directional subscription streaming- subscribe on no
healthz.

Refactors pubsub machinery to allow for gRPC bi-directional subcription
streaming when there is no application, or the application in unhealhty.

dapr/proposals#52

Signed-off-by: joshvanl <[email protected]>

* Fix unit tests

Signed-off-by: joshvanl <[email protected]>

* Fix subscription allowed

Signed-off-by: joshvanl <[email protected]>

* Subscriptions: bi-di index on per subscription

Index on per subscription so that streams or Subscription hot reloading
events will only reload that specific subscription, rather than
reloading _every_ subscription for that PubSub component. This
dramatically reduces disruption to topic subscriptions for a given
PubSub component.

Signed-off-by: joshvanl <[email protected]>

* Lock streamer when sending topic mesgae to stream connection

Signed-off-by: joshvanl <[email protected]>

* Log Info when a streaming subscription unsubscribes

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Signed-off-by: Jake Engelberg <[email protected]>
  • Loading branch information
JoshVanL authored and jake-engelberg committed Sep 20, 2024
1 parent 9e7b5f3 commit e3156b9
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 118 deletions.
12 changes: 6 additions & 6 deletions pkg/api/grpc/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ func (a *api) streamSubscribe(stream runtimev1pb.Dapr_SubscribeTopicEventsAlpha1
return errors.New("topic is required")
}

// TODO: @joshvanl: handle duplicate key names.
key := a.pubsubAdapterStreamer.StreamerKey(req.GetPubsubName(), req.GetTopic())
a.Universal.CompStore().AddStreamSubscription(&subapi.Subscription{
err = a.Universal.CompStore().AddStreamSubscription(&subapi.Subscription{
ObjectMeta: metav1.ObjectMeta{Name: key},
Spec: subapi.SubscriptionSpec{
Pubsubname: req.GetPubsubName(),
Expand All @@ -82,17 +81,18 @@ func (a *api) streamSubscribe(stream runtimev1pb.Dapr_SubscribeTopicEventsAlpha1
Routes: subapi.Routes{Default: "/"},
},
})
if err != nil {
return err
}

if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil {
if err := a.processor.Subscriber().StartStreamerSubscription(key); err != nil {
a.Universal.CompStore().DeleteStreamSubscription(key)
return err
}

defer func() {
a.processor.Subscriber().StopStreamerSubscription(req.GetPubsubName(), key)
a.Universal.CompStore().DeleteStreamSubscription(key)
if err := a.processor.Subscriber().ReloadPubSub(req.GetPubsubName()); err != nil {
a.logger.Errorf("Error reloading subscriptions after gRPC Subscribe shutdown: %s", err)
}
}()

return a.pubsubAdapterStreamer.Subscribe(stream, req)
Expand Down
78 changes: 57 additions & 21 deletions pkg/runtime/compstore/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,23 @@ limitations under the License.
package compstore

import (
"fmt"

subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1"
rtpubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
"github.com/dapr/kit/ptr"
)

type DeclarativeSubscription struct {
Comp *subapi.Subscription
Subscription rtpubsub.Subscription
Comp *subapi.Subscription
*NamedSubscription
}

type NamedSubscription struct {
// Name is the optional name of the subscription. If not set, the name of the
// component is used.
Name *string
rtpubsub.Subscription
}

type subscriptions struct {
Expand Down Expand Up @@ -49,25 +59,37 @@ func (c *ComponentStore) AddDeclarativeSubscription(comp *subapi.Subscription, s
}

c.subscriptions.declaratives[comp.Name] = &DeclarativeSubscription{
Comp: comp,
Subscription: sub,
Comp: comp,
NamedSubscription: &NamedSubscription{
Name: ptr.Of(comp.Name),
Subscription: sub,
},
}
c.subscriptions.declarativesList = append(c.subscriptions.declarativesList, comp.Name)
}

func (c *ComponentStore) AddStreamSubscription(comp *subapi.Subscription) {
func (c *ComponentStore) AddStreamSubscription(comp *subapi.Subscription) error {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.subscriptions.streams[comp.Name]; ok {
return fmt.Errorf("streamer already subscribed to pubsub %q topic %q", comp.Spec.Pubsubname, comp.Spec.Topic)
}

c.subscriptions.streams[comp.Name] = &DeclarativeSubscription{
Comp: comp,
Subscription: rtpubsub.Subscription{
PubsubName: comp.Spec.Pubsubname,
Topic: comp.Spec.Topic,
DeadLetterTopic: comp.Spec.DeadLetterTopic,
Metadata: comp.Spec.Metadata,
Rules: []*rtpubsub.Rule{{Path: "/"}},
NamedSubscription: &NamedSubscription{
Name: ptr.Of(comp.Name),
Subscription: rtpubsub.Subscription{
PubsubName: comp.Spec.Pubsubname,
Topic: comp.Spec.Topic,
DeadLetterTopic: comp.Spec.DeadLetterTopic,
Metadata: comp.Spec.Metadata,
Rules: []*rtpubsub.Rule{{Path: "/"}},
},
},
}

return nil
}

func (c *ComponentStore) DeleteStreamSubscription(names ...string) {
Expand Down Expand Up @@ -132,11 +154,11 @@ func (c *ComponentStore) ListSubscriptions() []rtpubsub.Subscription {
return subs
}

func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []rtpubsub.Subscription {
func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []*NamedSubscription {
c.lock.RLock()
defer c.lock.RUnlock()

var subs []rtpubsub.Subscription
var subs []*NamedSubscription
taken := make(map[string]int)
for _, subName := range c.subscriptions.declarativesList {
sub := c.subscriptions.declaratives[subName]
Expand All @@ -146,7 +168,10 @@ func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []rtpubsub.Su

if _, ok := taken[sub.Subscription.Topic]; !ok {
taken[sub.Subscription.Topic] = len(subs)
subs = append(subs, sub.Subscription)
subs = append(subs, &NamedSubscription{
Name: ptr.Of(subName),
Subscription: sub.Subscription,
})
}
}
for i := range c.subscriptions.programmatics {
Expand All @@ -156,36 +181,47 @@ func (c *ComponentStore) ListSubscriptionsAppByPubSub(name string) []rtpubsub.Su
}

if j, ok := taken[sub.Topic]; ok {
subs[j] = sub
subs[j] = &NamedSubscription{Subscription: sub}
} else {
taken[sub.Topic] = len(subs)
subs = append(subs, sub)
subs = append(subs, &NamedSubscription{Subscription: sub})
}
}

return subs
}

func (c *ComponentStore) ListSubscriptionsStreamByPubSub(name string) []rtpubsub.Subscription {
func (c *ComponentStore) ListSubscriptionsStreamByPubSub(name string) []*NamedSubscription {
c.lock.RLock()
defer c.lock.RUnlock()

var subs []rtpubsub.Subscription
var subs []*NamedSubscription
for _, sub := range c.subscriptions.streams {
if sub.Subscription.PubsubName == name {
subs = append(subs, sub.Subscription)
subs = append(subs, sub.NamedSubscription)
}
}

return subs
}

func (c *ComponentStore) GetDeclarativeSubscription(name string) (*subapi.Subscription, bool) {
func (c *ComponentStore) GetDeclarativeSubscription(name string) (*DeclarativeSubscription, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
for i, sub := range c.subscriptions.declaratives {
if sub.Comp.Name == name {
return c.subscriptions.declaratives[i].Comp, true
return c.subscriptions.declaratives[i], true
}
}
return nil, false
}

func (c *ComponentStore) GetStreamSubscription(key string) (*NamedSubscription, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
for i, sub := range c.subscriptions.streams {
if sub.Comp.Name == key {
return c.subscriptions.streams[i].NamedSubscription, true
}
}
return nil, false
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/hotreload/reconciler/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *subscriptions) update(ctx context.Context, sub subapi.Subscription) {

if exists {
log.Infof("Closing existing Subscription to reload: %s", oldSub.Name)
if err := s.proc.CloseSubscription(ctx, oldSub); err != nil {
if err := s.proc.CloseSubscription(ctx, oldSub.Comp); err != nil {
log.Errorf("Failed to close existing Subscription: %s", err)
return
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/runtime/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ type SecretManager interface {
}

type SubscribeManager interface {
ReloadPubSub(string) error
StopPubSub(string)
InitProgramaticSubscriptions(context.Context) error
StartAppSubscriptions() error
StopAppSubscriptions()
StopAllSubscriptionsForever()
ReloadDeclaredAppSubscription(name, pubsubName string) error
StartStreamerSubscription(key string) error
StopStreamerSubscription(pubsubName, key string)
ReloadPubSub(string) error
StopPubSub(string)
}

type BindingManager interface {
Expand Down
Loading

0 comments on commit e3156b9

Please sign in to comment.