Skip to content

Commit

Permalink
Nexus callbacks, tasks, and incoming service registry (#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
bergundy authored Jan 31, 2024
1 parent 34e5d97 commit 822966d
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 1 deletion.
10 changes: 9 additions & 1 deletion api-linter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
- "**/operatorservice/v1/request_response.proto"
disabled_rules:
- "core::0122::name-suffix"
- "core::0131::request-name-behavior"
- "core::0131::request-name-reference"
- "core::0131::request-name-required"
- "core::0131::request-unknown-fields"
- "core::0132::request-parent-required"
- "core::0132::request-unknown-fields"
- "core::0132::request-parent-required"
- "core::0132::response-unknown-fields"
- "core::0133::request-parent-required"
- "core::0133::request-resource-behavior"
- "core::0133::request-resource-field"
- "core::0134::request-unknown-fields"
- "core::0135::request-resource-name"
- "core::0135::request-name-behavior"
- "core::0135::request-name-reference"
- "core::0158::request-page-size-field"
- "core::0158::request-page-token-field"
- "core::0158::response-next-page-token-field"
Expand Down
95 changes: 95 additions & 0 deletions google/protobuf/struct.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc. All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

syntax = "proto3";

package google.protobuf;

option cc_enable_arenas = true;
option go_package = "google.golang.org/protobuf/types/known/structpb";
option java_package = "com.google.protobuf";
option java_outer_classname = "StructProto";
option java_multiple_files = true;
option objc_class_prefix = "GPB";
option csharp_namespace = "Google.Protobuf.WellKnownTypes";

// `Struct` represents a structured data value, consisting of fields
// which map to dynamically typed values. In some languages, `Struct`
// might be supported by a native representation. For example, in
// scripting languages like JS a struct is represented as an
// object. The details of that representation are described together
// with the proto support for the language.
//
// The JSON representation for `Struct` is JSON object.
message Struct {
// Unordered map of dynamically typed values.
map<string, Value> fields = 1;
}

// `Value` represents a dynamically typed value which can be either
// null, a number, a string, a boolean, a recursive struct value, or a
// list of values. A producer of value is expected to set one of these
// variants. Absence of any variant indicates an error.
//
// The JSON representation for `Value` is JSON value.
message Value {
// The kind of value.
oneof kind {
// Represents a null value.
NullValue null_value = 1;
// Represents a double value.
double number_value = 2;
// Represents a string value.
string string_value = 3;
// Represents a boolean value.
bool bool_value = 4;
// Represents a structured value.
Struct struct_value = 5;
// Represents a repeated `Value`.
ListValue list_value = 6;
}
}

// `NullValue` is a singleton enumeration to represent the null value for the
// `Value` type union.
//
// The JSON representation for `NullValue` is JSON `null`.
enum NullValue {
// Null value.
NULL_VALUE = 0;
}

// `ListValue` is a wrapper around a repeated field of values.
//
// The JSON representation for `ListValue` is JSON array.
message ListValue {
// Repeated field of dynamically typed values.
repeated Value values = 1;
}
15 changes: 15 additions & 0 deletions temporal/api/common/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,18 @@ message ResetOptions {
// possibly others in the future.)
bool current_run_only = 11;
}

// Callback to attach to various events in the system, e.g. workflow run completion.
message Callback {
message Nexus {
// Callback URL.
// (-- api-linter: core::0140::uri=disabled
// aip.dev/not-precedent: Not following this guideline. --)
string url = 1;
}

reserved 1; // For a generic callback mechanism to be added later.
oneof variant {
Nexus nexus = 2;
}
}
16 changes: 16 additions & 0 deletions temporal/api/enums/v1/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,19 @@ enum Severity {
SEVERITY_MEDIUM = 2;
SEVERITY_LOW = 3;
}

// State of the callback.
enum CallbackState {
// Default value, unspecified state.
CALLBACK_STATE_UNSPECIFIED = 0;
// Callback is standing by, waiting to be triggered.
CALLBACK_STATE_STANDBY = 1;
// Callback is in the queue waiting to be executed or is currently executing.
CALLBACK_STATE_SCHEDULED = 2;
// Callback has failed with a retryable error and is backing off before the next attempt.
CALLBACK_STATE_BACKING_OFF = 3;
// Callback has failed.
CALLBACK_STATE_FAILED = 4;
// Callback has succeeded.
CALLBACK_STATE_SUCCEEDED = 5;
}
2 changes: 2 additions & 0 deletions temporal/api/enums/v1/task_queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ enum TaskQueueType {
TASK_QUEUE_TYPE_WORKFLOW = 1;
// Activity type of task queue.
TASK_QUEUE_TYPE_ACTIVITY = 2;
// Task queue type for dispatching Nexus requests.
TASK_QUEUE_TYPE_NEXUS = 3;
}

// Specifies which category of tasks may reach a worker on a versioned task queue.
Expand Down
3 changes: 3 additions & 0 deletions temporal/api/history/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ message WorkflowExecutionStartedEventAttributes {
// If this workflow intends to use anything other than the current overall default version for
// the queue, then we include it here.
temporal.api.common.v1.WorkerVersionStamp source_version_stamp = 29;

// Completion callbacks attached when this workflow was started.
repeated temporal.api.common.v1.Callback completion_callbacks = 30;
}

message WorkflowExecutionCompletedEventAttributes {
Expand Down
138 changes: 138 additions & 0 deletions temporal/api/nexus/v1/message.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// The MIT License
//
// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

syntax = "proto3";

package temporal.api.nexus.v1;

option go_package = "go.temporal.io/api/nexus/v1;nexus";
option java_package = "io.temporal.api.nexus.v1";
option java_multiple_files = true;
option java_outer_classname = "MessageProto";
option ruby_package = "Temporalio::Api::Nexus::V1";
option csharp_namespace = "Temporalio.Api.Nexus.V1";

import "google/protobuf/any.proto";
import "google/protobuf/struct.proto";
import "temporal/api/common/v1/message.proto";

// A general purpose failure message.
// See: https://github.com/nexus-rpc/api/blob/main/SPEC.md#failure
message Failure {
string message = 1;
map<string, string> metadata = 2;
google.protobuf.Value details = 3;
}

message HandlerError {
// See https://github.com/nexus-rpc/api/blob/main/SPEC.md#predefined-handler-errors.
string error_type = 1;
Failure failure = 2;
}

message UnsuccessfulOperationError {
// See https://github.com/nexus-rpc/api/blob/main/SPEC.md#operationinfo.
string operation_state = 1;
Failure failure = 2;
}

// A request to start an operation.
message StartOperationRequest {
// Type of operation to start.
string operation = 1;
// A request ID that can be used as an idempotentency key.
string request_id = 2;
// Callback URL to call upon completion if the started operation is async.
string callback = 3;
// Full request body from the incoming HTTP request.
temporal.api.common.v1.Payload payload = 4;
}

// A request to cancel an operation.
message CancelOperationRequest {
// Type of operation to cancel.
string operation = 1;
// Operation ID as originally generated by a Handler.
string operation_id = 2;
}

// A Nexus request.
message Request {
// Headers extracted from the original request in the Temporal frontend.
// When using Nexus over HTTP, this includes the request's HTTP headers ignoring multiple values.
map<string, string> header = 1;
oneof variant {
StartOperationRequest start_operation = 2;
CancelOperationRequest cancel_operation = 3;
}
}

// Response variant for StartOperationRequest.
message StartOperationResponse {
// An operation completed successfully.
message Sync {
temporal.api.common.v1.Payload payload = 1;
}

// The operation will complete asynchronously.
// The returned ID can be used to reference this operation.
message Async {
string operation_id = 1;
}

oneof variant {
Sync sync_success = 1;
Async async_success = 2;
// The operation completed unsuccessfully (failed or canceled).
UnsuccessfulOperationError operation_error = 3;
}
}

// Response variant for CancelOperationRequest.
message CancelOperationResponse {
}

// A response indicating that the handler has successfully processed a request.
message Response {
// Variant must correlate to the corresponding Request's variant.
oneof variant {
StartOperationResponse start_operation = 1;
CancelOperationResponse cancel_operation = 2;
}
}

// A binding from a service name to namespace, task queue, and metadata for dispatching incoming Nexus requests.
message IncomingService {
// Data version for this service. Must match current version on update or set to 0 to create a new service.
int64 version = 1;
// Service name, unique for this cluster.
// The service name is used to address this service.
// By default, when using Nexus over HTTP, the service name is matched against the base URL path.
// E.g. the URL /my-service would match a service named "my-service".
// The name can contain any characters and is escaped when matched against a URL.
string name = 2;
// Namespace to route requests to.
string namespace = 3;
// Task queue to route requests to.
string task_queue = 4;
// Generic service metadata that is available to the server's authorizer.
map<string, google.protobuf.Any> metadata = 5;
}

41 changes: 41 additions & 0 deletions temporal/api/operatorservice/v1/request_response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ option ruby_package = "Temporalio::Api::OperatorService::V1";
option csharp_namespace = "Temporalio.Api.OperatorService.V1";

import "temporal/api/enums/v1/common.proto";
import "temporal/api/nexus/v1/message.proto";
import "google/protobuf/duration.proto";

// (-- Search Attribute --)
Expand Down Expand Up @@ -128,3 +129,43 @@ message ClusterMetadata {
// A flag to indicate if a connection is active.
bool is_connection_enabled = 6;
}

message GetNexusIncomingServiceRequest {
// Name of service to retrieve.
string name = 1;
}

message GetNexusIncomingServiceResponse {
temporal.api.nexus.v1.IncomingService service = 1;
}

message CreateOrUpdateNexusIncomingServiceRequest {
temporal.api.nexus.v1.IncomingService service = 1;
}

message CreateOrUpdateNexusIncomingServiceResponse {
// Data post acceptance. Can be used to issue additional updates to this record.
temporal.api.nexus.v1.IncomingService service = 1;
}

message DeleteNexusIncomingServiceRequest {
// Name of service to delete.
string name = 1;
}

message DeleteNexusIncomingServiceResponse {
}

message ListNexusIncomingServicesRequest {
int32 page_size = 1;
// To get the next page, pass in `ListNexusIncomingServicesResponse.next_page_token` from the previous page's
// response, the token will be empty if there's no other page.
// Note: the last page may be empty if the total number of services registered is a multiple of the page size.
bytes next_page_token = 2;
}

message ListNexusIncomingServicesResponse {
// Token for getting the next page.
bytes next_page_token = 1;
repeated temporal.api.nexus.v1.IncomingService services = 2;
}
20 changes: 20 additions & 0 deletions temporal/api/operatorservice/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,24 @@ service OperatorService {
// ListClusters returns information about Temporal clusters.
rpc ListClusters(ListClustersRequest) returns (ListClustersResponse) {
}

// Get a registered incoming Nexus service by name. The returned version can be used for optimistic updates.
rpc GetNexusIncomingService(GetNexusIncomingServiceRequest) returns (GetNexusIncomingServiceResponse) {
}

// Optimistically create or update a Nexus service based on provided version.
// To update an existing service, get the current service record via the `GetNexusIncomingService` API, modify it
// and submit to this API.
// Set version to 0 to create a new service.
// Returns the updated service with the updated version, which can be used for subsequent updates.
rpc CreateOrUpdateNexusIncomingService(CreateOrUpdateNexusIncomingServiceRequest) returns (CreateOrUpdateNexusIncomingServiceResponse) {
}

// Delete an incoming Nexus service by name.
rpc DeleteNexusIncomingService(DeleteNexusIncomingServiceRequest) returns (DeleteNexusIncomingServiceResponse) {
}

// List all nexus incoming service names. Use next_page_token in the response for pagination.
rpc ListNexusIncomingServices(ListNexusIncomingServicesRequest) returns (ListNexusIncomingServicesResponse) {
}
}
Loading

0 comments on commit 822966d

Please sign in to comment.