Skip to content

Commit

Permalink
Subscriptions: bi-directional subscription streaming. (#7747)
Browse files Browse the repository at this point in the history
* Subscriptions: bi-directional subscription & publish streaming.

Adds SubscribeTopicEvents proto API which dynamically subscribes to
pubsub topics based on dapr/proposals#52.

This is a basic gRPC implementation of the API whereby, like
Subscription hot-reloading today, subscribing to a topic will reload
_every_ active subscription for the current daprd. In a future PR,
reloading of Subscriptions will be granular to the specific pubsub
topic.

Stream subscriptions are also only active once daprd declares the
application as both present and ready. Dynamic stream subscriptions
should be active both whether a app is running or not, as well as
whether it is ready or not. This will be addressed in a future PR.

Signed-off-by: joshvanl <[email protected]>

* Updates go.mod go version to 1.22.3 in e2e apps

Signed-off-by: joshvanl <[email protected]>

* Remove small context timeout on httpserver int tests

Signed-off-by: joshvanl <[email protected]>

* Wait for daprd2 to be ready before calling meta endpoint in
hot-op-inf-comp test

Signed-off-by: joshvanl <[email protected]>

* Increase int test daprd wait until ready timeout to 30s

Signed-off-by: joshvanl <[email protected]>

* Assert httpendpoint int test resp body with eventually

Signed-off-by: joshvanl <[email protected]>

* Set subscription APIs Alpha1

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
  • Loading branch information
JoshVanL and yaron2 authored May 28, 2024
1 parent 8e65d49 commit 7093021
Show file tree
Hide file tree
Showing 32 changed files with 3,977 additions and 2,150 deletions.
46 changes: 46 additions & 0 deletions dapr/proto/runtime/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "dapr/proto/common/v1/common.proto";
import "dapr/proto/runtime/v1/appcallback.proto";

option csharp_namespace = "Dapr.Client.Autogen.Grpc.v1";
option java_outer_classname = "DaprProtos";
Expand Down Expand Up @@ -58,6 +59,10 @@ service Dapr {
// Bulk Publishes multiple events to the specified topic.
rpc BulkPublishEventAlpha1(BulkPublishRequest) returns (BulkPublishResponse) {}

// SubscribeTopicEventsAlpha1 subscribes to a PubSub topic and receives topic
// events from it.
rpc SubscribeTopicEventsAlpha1(stream SubscribeTopicEventsRequestAlpha1) returns (stream TopicEventRequest) {}

// Invokes binding data to specific output bindings
rpc InvokeBinding(InvokeBindingRequest) returns (InvokeBindingResponse) {}

Expand Down Expand Up @@ -411,6 +416,47 @@ message BulkPublishResponseFailedEntry {
string error = 2;
}

// SubscribeTopicEventsRequestAlpha1 is a message containing the details for
// subscribing to a topic via streaming.
// The first message must always be the initial request. All subsequent
// messages must be event responses.
message SubscribeTopicEventsRequestAlpha1 {
oneof subscribe_topic_events_request_type {
SubscribeTopicEventsInitialRequestAlpha1 initial_request = 1;
SubscribeTopicEventsResponseAlpha1 event_response = 2;
}
}

// SubscribeTopicEventsInitialRequestAlpha1 is the initial message containing the
// details for subscribing to a topic via streaming.
message SubscribeTopicEventsInitialRequestAlpha1 {
// The name of the pubsub component
string pubsub_name = 1;

// The pubsub topic
string topic = 2;

// The metadata passing to pub components
//
// metadata property:
// - key : the key of the message.
map<string, string> metadata = 3;

// dead_letter_topic is the topic to which messages that fail to be processed
// are sent.
optional string dead_letter_topic = 4;
}

// SubscribeTopicEventsResponseAlpha1 is a message containing the result of a
// subscription to a topic.
message SubscribeTopicEventsResponseAlpha1 {
// id is the unique identifier for the subscription request.
string id = 1;

// status is the result of the subscription request.
TopicEventResponse status = 2;
}

// InvokeBindingRequest is the message to send data to output bindings
message InvokeBindingRequest {
// The name of the output binding to invoke.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/dapr/dapr

go 1.22.2
go 1.22.3

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/grpc/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ var endpoints = map[string][]string{
"shutdown.v1": {
daprRuntimePrefix + "v1.Dapr/Shutdown",
},
"subscribe.v1alpha1": {
daprRuntimePrefix + "v1.Dapr/SubscribeTopicEventsAlpha1",
},
}

// Returns the middlewares (unary and stream) for supporting API allowlist
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/dapr/dapr/pkg/resiliency"
"github.com/dapr/dapr/pkg/resiliency/breaker"
"github.com/dapr/dapr/pkg/runtime/channels"
"github.com/dapr/dapr/pkg/runtime/processor"
runtimePubsub "github.com/dapr/dapr/pkg/runtime/pubsub"
"github.com/dapr/dapr/utils"
"github.com/dapr/kit/logger"
Expand Down Expand Up @@ -90,6 +91,7 @@ type api struct {
sendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
tracingSpec config.TracingSpec
accessControlList *config.AccessControlList
processor *processor.Processor
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
Expand All @@ -105,6 +107,7 @@ type APIOpts struct {
SendToOutputBindingFn func(ctx context.Context, name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error)
TracingSpec config.TracingSpec
AccessControlList *config.AccessControlList
Processor *processor.Processor
}

// NewAPI returns a new gRPC API.
Expand All @@ -119,6 +122,7 @@ func NewAPI(opts APIOpts) API {
sendToOutputBindingFn: opts.SendToOutputBindingFn,
tracingSpec: opts.TracingSpec,
accessControlList: opts.AccessControlList,
processor: opts.Processor,
closeCh: make(chan struct{}),
}
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/api/grpc/subscribe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpc

import (
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)

// SubscribeTopicEvents is called by the Dapr runtime to ad hoc stream
// subscribe to topics. If gRPC API server closes, returns func early with nil
// to close stream.
func (a *api) SubscribeTopicEventsAlpha1(stream runtimev1pb.Dapr_SubscribeTopicEventsAlpha1Server) error {
errCh := make(chan error, 2)
subDone := make(chan struct{})
a.wg.Add(2)
go func() {
errCh <- a.processor.PubSub().Streamer().Subscribe(stream)
close(subDone)
a.wg.Done()
}()
go func() {
select {
case <-a.closeCh:
case <-subDone:
}
errCh <- nil
a.wg.Done()
}()

return <-errCh
}
Loading

0 comments on commit 7093021

Please sign in to comment.