From 13888b2c98ea534455eda0a4c68f8352fab4c91b Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 22 Oct 2024 13:08:14 +0100 Subject: [PATCH 1/7] CORE-20867 add a flow retry topic to handle retries of transient RPC calls --- .../src/main/java/net/corda/schema/Schemas.java | 2 ++ .../src/main/resources/net/corda/schema/Flow.yaml | 8 ++++++++ gradle.properties | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/data/topic-schema/src/main/java/net/corda/schema/Schemas.java b/data/topic-schema/src/main/java/net/corda/schema/Schemas.java index df9b7e2b8..1bc33ed4b 100644 --- a/data/topic-schema/src/main/java/net/corda/schema/Schemas.java +++ b/data/topic-schema/src/main/java/net/corda/schema/Schemas.java @@ -113,6 +113,8 @@ private Flow() { public static final String FLOW_MAPPER_SESSION_IN = "flow.mapper.session.in"; public static final String FLOW_START = "flow.start"; public static final String FLOW_SESSION = "flow.session"; + + public static final String FLOW_RETRY = "flow.retry"; public static final String EXTERNAL_MESSAGE_SAMPLE = "external.message.sample"; } diff --git a/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml b/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml index fc47fff16..2a6df8d34 100644 --- a/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml +++ b/data/topic-schema/src/main/resources/net/corda/schema/Flow.yaml @@ -89,4 +89,12 @@ topics: producers: - flow config: + FlowRetry: + name: flow.retry + consumers: + - flow + producers: + - flow + config: + diff --git a/gradle.properties b/gradle.properties index 2f92d360f..5adb08edc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ cordaProductVersion = 5.2.1 # NOTE: update this each time this module contains a breaking change ## NOTE: currently this is a top level revision, so all API versions will line up, but this could be moved to ## a per module property in which case module versions can change independently. -cordaApiRevision = 53 +cordaApiRevision = 54 # Main kotlin.stdlib.default.dependency = false From f7d81297adb36ade3ee978c01f15810e04a9888a Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 22 Oct 2024 13:31:27 +0100 Subject: [PATCH 2/7] CORE-20867 add max poll records constant --- .../java/net/corda/schema/configuration/MessagingConfig.java | 1 + 1 file changed, 1 insertion(+) diff --git a/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java b/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java index c850ec22c..17158ea39 100644 --- a/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java +++ b/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java @@ -25,6 +25,7 @@ private Bus() { public static final String KAFKA_BOOTSTRAP_SERVERS = KAFKA_PROPERTIES_COMMON + ".bootstrap.servers"; public static final String KAFKA_PROPERTIES_CONSUMER = KAFKA_PROPERTIES + ".consumer"; public static final String KAFKA_CONSUMER_MAX_POLL_INTERVAL = KAFKA_PROPERTIES_CONSUMER + ".max.poll.interval.ms"; + public static final String KAFKA_CONSUMER_MAX_POLL_RECORDS = KAFKA_PROPERTIES_CONSUMER + ".max.poll.records"; public static final String KAFKA_PROPERTIES_PRODUCER = KAFKA_PROPERTIES + ".producer"; public static final String KAFKA_PRODUCER_CLIENT_ID = KAFKA_PROPERTIES_PRODUCER + ".client.id"; From 58e9c5e4dce5de41b9806a45f62008310fccca55 Mon Sep 17 00:00:00 2001 From: LWogan Date: Tue, 22 Oct 2024 13:58:38 +0100 Subject: [PATCH 3/7] CORE-20867 add transient error retry limit timeout --- .../net/corda/schema/configuration/MessagingConfig.java | 1 + .../schema/configuration/messaging/1.0/subscription.json | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java b/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java index 17158ea39..449a14506 100644 --- a/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java +++ b/data/config-schema/src/main/java/net/corda/schema/configuration/MessagingConfig.java @@ -62,6 +62,7 @@ private Subscription() { public static final String MEDIATOR_PROCESSING_MIN_POOL_RECORD_COUNT = SUBSCRIPTION + ".mediator.minPoolRecordCount"; public static final String MEDIATOR_PROCESSING_POLL_TIMEOUT = SUBSCRIPTION + ".mediator.pollTimeout"; public static final String MEDIATOR_PROCESSING_PROCESSOR_TIMEOUT = SUBSCRIPTION + ".mediator.processorTimeout"; + public static final String MEDIATOR_PROCESSING_TRANSIENT_ERROR_TIMEOUT = SUBSCRIPTION + ".mediator.transientErrorRetryTimeout"; } /** diff --git a/data/config-schema/src/main/resources/net/corda/schema/configuration/messaging/1.0/subscription.json b/data/config-schema/src/main/resources/net/corda/schema/configuration/messaging/1.0/subscription.json index 9e611c427..6d9963ae2 100644 --- a/data/config-schema/src/main/resources/net/corda/schema/configuration/messaging/1.0/subscription.json +++ b/data/config-schema/src/main/resources/net/corda/schema/configuration/messaging/1.0/subscription.json @@ -79,6 +79,13 @@ "minimum": 1000, "maximum": 2147483647, "default": 120000 + }, + "transientErrorRetryTimeout": { + "description": "The length of time in milliseconds the mediator will allow an inter-worker RPC call to return transient errors before marking the state as failed. A retry topic is used to persist retries across multiple consumer polls.", + "type": "integer", + "minimum": 1000, + "maximum": 2147483647, + "default": 1200000 } } } From e84724ebd3ace0414d3e8c0dc01286c92dd8df32 Mon Sep 17 00:00:00 2001 From: LWogan Date: Thu, 24 Oct 2024 14:17:59 +0100 Subject: [PATCH 4/7] CORE-20867 add avro object to track sync request headers --- .../avro/net/corda/data/RequestHeader.avsc | 16 +++++++++++ .../avro/net/corda/data/RequestHeaders.avsc | 16 +++++++++++ .../net/corda/data/flow/event/FlowEvent.avsc | 3 ++- .../external/ExternalEventRetryRequest.avsc | 27 +++++++++++++++++++ 4 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc create mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc create mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc new file mode 100644 index 000000000..ef4e6c22d --- /dev/null +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc @@ -0,0 +1,16 @@ +{ + "type": "record", + "name": "RequestHeader", + "namespace": "net.corda.data", + "doc": "Key-value pair of request header. Key is type String and value can be bytes or String.", + "fields": [ + { + "name": "key", + "type": "string" + }, + { + "name": "value", + "type": ["null", "string", "bytes"] + } + ] +} \ No newline at end of file diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc new file mode 100644 index 000000000..bd966247b --- /dev/null +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc @@ -0,0 +1,16 @@ +{ + "type": "record", + "name": "RequestHeaders", + "namespace": "net.corda.data", + "doc": "Avro representation of the RPC Sync Request Headers", + "fields": [ + { + "name": "items", + "doc": "List of the headers.", + "type": { + "type": "array", + "items": "RequestHeader" + } + } + ] +} \ No newline at end of file diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/FlowEvent.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/FlowEvent.avsc index 53b7fcb9b..3ccff118e 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/FlowEvent.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/FlowEvent.avsc @@ -13,7 +13,8 @@ "net.corda.data.flow.event.StartFlow", "net.corda.data.flow.output.FlowStatus", "net.corda.data.flow.event.SessionEvent", - "net.corda.data.flow.event.external.ExternalEventResponse" + "net.corda.data.flow.event.external.ExternalEventResponse", + "net.corda.data.flow.event.external.ExternalEventRetryRequest" ] } ] diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc new file mode 100644 index 000000000..9e94361c3 --- /dev/null +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc @@ -0,0 +1,27 @@ +{ + "type": "record", + "name": "ExternalEventRetryRequest", + "namespace": "net.corda.data.flow.event.external", + "doc": "This event captures the details of the external event request to allow it to be polled from the bus and retried.", + "fields": [ + + { + "name": "payload", + "type": ["null", "bytes"], + "doc": "Avro serialized representation of the external event payload" + }, + { + "name": "requestHeaders", + "type": "net.corda.data.RequestHeaders", + "doc": "The key/value headers for the sync request." + }, + { + "name": "timestamp", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + }, + "doc": "Time ([Instant]) in milliseconds when the retry request was (re)created." + } + ] +} From 45eead8aa67f36fd3302a098822599f3d1bd666c Mon Sep 17 00:00:00 2001 From: LWogan Date: Fri, 25 Oct 2024 10:11:14 +0100 Subject: [PATCH 5/7] CORE-20867 simplify avro record --- .../external/ExternalEventRetryRequest.avsc | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc index 9e94361c3..2b097cc52 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc @@ -4,24 +4,10 @@ "namespace": "net.corda.data.flow.event.external", "doc": "This event captures the details of the external event request to allow it to be polled from the bus and retried.", "fields": [ - { - "name": "payload", - "type": ["null", "bytes"], - "doc": "Avro serialized representation of the external event payload" - }, - { - "name": "requestHeaders", - "type": "net.corda.data.RequestHeaders", - "doc": "The key/value headers for the sync request." - }, - { - "name": "timestamp", - "type": { - "type": "long", - "logicalType": "timestamp-millis" - }, - "doc": "Time ([Instant]) in milliseconds when the retry request was (re)created." + "name": "requestId", + "type": ["string"], + "doc": "The requestId of the external event request to retry." } ] } From e7fd64ea6def825fee032e8cbb28872fdf61e06e Mon Sep 17 00:00:00 2001 From: LWogan Date: Fri, 25 Oct 2024 10:39:08 +0100 Subject: [PATCH 6/7] CORE-20867 turn avro object into string --- .../data/flow/event/external/ExternalEventRetryRequest.avsc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc index 2b097cc52..53c345183 100644 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc +++ b/data/avro-schema/src/main/resources/avro/net/corda/data/flow/event/external/ExternalEventRetryRequest.avsc @@ -6,7 +6,7 @@ "fields": [ { "name": "requestId", - "type": ["string"], + "type": "string", "doc": "The requestId of the external event request to retry." } ] From 1453c995e799864129748a9f7da468a3375f3304 Mon Sep 17 00:00:00 2001 From: LWogan Date: Fri, 25 Oct 2024 13:18:06 +0100 Subject: [PATCH 7/7] CORE-20867 remove object not needed --- .../avro/net/corda/data/RequestHeader.avsc | 16 ---------------- .../avro/net/corda/data/RequestHeaders.avsc | 16 ---------------- 2 files changed, 32 deletions(-) delete mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc delete mode 100644 data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc deleted file mode 100644 index ef4e6c22d..000000000 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeader.avsc +++ /dev/null @@ -1,16 +0,0 @@ -{ - "type": "record", - "name": "RequestHeader", - "namespace": "net.corda.data", - "doc": "Key-value pair of request header. Key is type String and value can be bytes or String.", - "fields": [ - { - "name": "key", - "type": "string" - }, - { - "name": "value", - "type": ["null", "string", "bytes"] - } - ] -} \ No newline at end of file diff --git a/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc b/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc deleted file mode 100644 index bd966247b..000000000 --- a/data/avro-schema/src/main/resources/avro/net/corda/data/RequestHeaders.avsc +++ /dev/null @@ -1,16 +0,0 @@ -{ - "type": "record", - "name": "RequestHeaders", - "namespace": "net.corda.data", - "doc": "Avro representation of the RPC Sync Request Headers", - "fields": [ - { - "name": "items", - "doc": "List of the headers.", - "type": { - "type": "array", - "items": "RequestHeader" - } - } - ] -} \ No newline at end of file