Skip to content

Commit

Permalink
Subscriptions: bi-directional subscription & publish streaming.
Browse files Browse the repository at this point in the history
Adds SubscribeTopicEvents proto API which dynamically subscribes to
pubsub topics based on dapr/proposals#52.

This is a basic gRPC implementation of the API whereby, like
Subscription hot-reloading today, subscribing to a topic will reload
_every_ active subscription for the current daprd. In a future PR,
reloading of Subscriptions will be granular to the specific pubsub
topic.

Stream subscriptions are also only active once daprd declares the
application as both present and ready. Dynamic stream subscriptions
should be active both whether a app is running or not, as well as
whether it is ready or not. This will be addressed in a future PR.

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL committed May 14, 2024
1 parent bc77d50 commit 6d0043e
Show file tree
Hide file tree
Showing 28 changed files with 3,935 additions and 2,101 deletions.
46 changes: 46 additions & 0 deletions dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "dapr/proto/common/v1/common.proto";
import "dapr/proto/runtime/v1/appcallback.proto";

option csharp_namespace = "Dapr.Client.Autogen.Grpc.v1";
option java_outer_classname = "DaprProtos";
Expand Down Expand Up @@ -58,6 +59,10 @@ service Dapr {
// Bulk Publishes multiple events to the specified topic.
rpc BulkPublishEventAlpha1(BulkPublishRequest) returns (BulkPublishResponse) {}

// SubscribeTopicEvents subscribes to a PubSub topic and receives topic events
// from it.
rpc SubscribeTopicEvents(stream SubscribeTopicEventsRequest) returns (stream TopicEventRequest) {}

// Invokes binding data to specific output bindings
rpc InvokeBinding(InvokeBindingRequest) returns (InvokeBindingResponse) {}

Expand Down Expand Up @@ -411,6 +416,47 @@ message BulkPublishResponseFailedEntry {
string error = 2;
}

// SubscribeTopicEventsRequest is a message containing the details for
// subscribing to a topic via streaming.
// The first message must always be the initial request. All subsequent
// messages must be event responses.
message SubscribeTopicEventsRequest {
oneof subscribe_topic_events_request_type {
SubscribeTopicEventsInitialRequest initial_request = 1;
SubscribeTopicEventsResponse event_response = 2;
}
}

// SubscribeTopicEventsInitialRequest is the initial message containing the
// details for subscribing to a topic via streaming.
message SubscribeTopicEventsInitialRequest {
// The name of the pubsub component
string pubsub_name = 1;

// The pubsub topic
string topic = 2;

// The metadata passing to pub components
//
// metadata property:
// - key : the key of the message.
map<string, string> metadata = 3;

// dead_letter_topic is the topic to which messages that fail to be processed
// are sent.
optional string dead_letter_topic = 4;
}

// SubscribeTopicEventsResponse is a message containing the result of a
// subscription to a topic.
message SubscribeTopicEventsResponse {
// id is the unique identifier for the subscription request.
string id = 1;

// status is the result of the subscription request.
TopicEventResponse status = 2;
}

// InvokeBindingRequest is the message to send data to output bindings
message InvokeBindingRequest {
// The name of the output binding to invoke.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/dapr/dapr

go 1.22.2
go 1.22.3

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/grpc/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ var endpoints = map[string][]string{
"publish.v1alpha1": {
daprRuntimePrefix + "v1.Dapr/BulkPublishEventAlpha1",
},
"subscribe.v1": {
daprRuntimePrefix + "v1.Dapr/SubscribeTopicEvents",
},
"bindings.v1": {
daprRuntimePrefix + "v1.Dapr/InvokeBinding",
},
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/resiliency/breaker"
"github.com/dapr/dapr/pkg/runtime/channels"
"github.com/dapr/dapr/pkg/runtime/processor"
runtimePubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
"github.com/dapr/dapr/utils"
"github.com/dapr/kit/logger"
Expand Down Expand Up @@ -90,6 +91,7 @@ type api struct {
sendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
tracingSpec config.TracingSpec
accessControlList *config.AccessControlList
processor *processor.Processor
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
Expand All @@ -105,6 +107,7 @@ type APIOpts struct {
SendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
TracingSpec config.TracingSpec
AccessControlList *config.AccessControlList
Processor *processor.Processor
}

// NewAPI returns a new gRPC API.
Expand All @@ -119,6 +122,7 @@ func NewAPI(opts APIOpts) API {
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
accessControlList: opts.AccessControlList,
processor: opts.Processor,
closeCh: make(chan struct{}),
}
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/api/grpc/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpc

import (
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

// SubscribeTopicEvents is called by the Dapr runtime to ad hoc stream
// subscribe to topics. If gRPC API server closes, returns func early with nil
// to close stream.
func (a *api) SubscribeTopicEvents(stream runtimev1pb.Dapr_SubscribeTopicEventsServer) error {
errCh := make(chan error, 2)
subDone := make(chan struct{})
a.wg.Add(2)
go func() {
errCh <- a.processor.PubSub().Streamer().Subscribe(stream)
close(subDone)
a.wg.Done()
}()
go func() {
select {
case <-a.closeCh:
case <-subDone:
}
errCh <- nil
a.wg.Done()
}()

return <-errCh
}
Loading

0 comments on commit 6d0043e

Please sign in to comment.