diff --git a/.changeset/hungry-boats-kneel.md b/.changeset/hungry-boats-kneel.md new file mode 100644 index 00000000000..f1514cfc4f2 --- /dev/null +++ b/.changeset/hungry-boats-kneel.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add telemetry for LLO plugin #added diff --git a/.tool-versions b/.tool-versions index bdf11a7ed21..7c999437710 100644 --- a/.tool-versions +++ b/.tool-versions @@ -5,6 +5,6 @@ pnpm 9.4.0 postgres 15.1 helm 3.10.3 golangci-lint 1.62.2 -protoc 25.1 +protoc 29.3 python 3.10.5 act 0.2.30 diff --git a/GNUmakefile b/GNUmakefile index 8e91e8df448..e41a3240073 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -168,7 +168,7 @@ codecgen: $(codecgen) ## Install codecgen .PHONY: protoc protoc: ## Install protoc - core/scripts/install-protoc.sh 25.1 / + core/scripts/install-protoc.sh 29.3 / go install google.golang.org/protobuf/cmd/protoc-gen-go@`go list -m -json google.golang.org/protobuf | jq -r .Version` go install github.com/smartcontractkit/wsrpc/cmd/protoc-gen-go-wsrpc@`go list -m -json github.com/smartcontractkit/wsrpc | jq -r .Version` diff --git a/core/capabilities/remote/types/messages.pb.go b/core/capabilities/remote/types/messages.pb.go index 0675bcc0f2a..dcbca5d6398 100644 --- a/core/capabilities/remote/types/messages.pb.go +++ b/core/capabilities/remote/types/messages.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 -// protoc v4.25.1 +// protoc-gen-go v1.36.4 +// protoc v5.29.3 // source: core/capabilities/remote/types/messages.proto package types @@ -11,6 +11,7 @@ import ( protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -79,12 +80,11 @@ func (Error) EnumDescriptor() ([]byte, []int) { } type Message struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Signature []byte `protobuf:"bytes,1,opt,name=signature,proto3" json:"signature,omitempty"` + Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` // proto-encoded MessageBody to sign unknownFields protoimpl.UnknownFields - - Signature []byte `protobuf:"bytes,1,opt,name=signature,proto3" json:"signature,omitempty"` - Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` // proto-encoded MessageBody to sign + sizeCache protoimpl.SizeCache } func (x *Message) Reset() { @@ -132,28 +132,27 @@ func (x *Message) GetBody() []byte { } type MessageBody struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` - Sender []byte `protobuf:"bytes,2,opt,name=sender,proto3" json:"sender,omitempty"` - Receiver []byte `protobuf:"bytes,3,opt,name=receiver,proto3" json:"receiver,omitempty"` - Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - MessageId []byte `protobuf:"bytes,5,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` // scoped to sender - CapabilityId string `protobuf:"bytes,6,opt,name=capability_id,json=capabilityId,proto3" json:"capability_id,omitempty"` - Method string `protobuf:"bytes,9,opt,name=method,proto3" json:"method,omitempty"` - Error Error `protobuf:"varint,10,opt,name=error,proto3,enum=remote.Error" json:"error,omitempty"` - ErrorMsg string `protobuf:"bytes,11,opt,name=errorMsg,proto3" json:"errorMsg,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Version uint32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` + Sender []byte `protobuf:"bytes,2,opt,name=sender,proto3" json:"sender,omitempty"` + Receiver []byte `protobuf:"bytes,3,opt,name=receiver,proto3" json:"receiver,omitempty"` + Timestamp int64 `protobuf:"varint,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + MessageId []byte `protobuf:"bytes,5,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` // scoped to sender + CapabilityId string `protobuf:"bytes,6,opt,name=capability_id,json=capabilityId,proto3" json:"capability_id,omitempty"` + Method string `protobuf:"bytes,9,opt,name=method,proto3" json:"method,omitempty"` + Error Error `protobuf:"varint,10,opt,name=error,proto3,enum=remote.Error" json:"error,omitempty"` + ErrorMsg string `protobuf:"bytes,11,opt,name=errorMsg,proto3" json:"errorMsg,omitempty"` // payload contains a CapabilityRequest or CapabilityResponse Payload []byte `protobuf:"bytes,12,opt,name=payload,proto3" json:"payload,omitempty"` - // Types that are assignable to Metadata: + // Types that are valid to be assigned to Metadata: // // *MessageBody_TriggerRegistrationMetadata // *MessageBody_TriggerEventMetadata Metadata isMessageBody_Metadata `protobuf_oneof:"metadata"` CapabilityDonId uint32 `protobuf:"varint,15,opt,name=capability_don_id,json=capabilityDonId,proto3" json:"capability_don_id,omitempty"` CallerDonId uint32 `protobuf:"varint,16,opt,name=caller_don_id,json=callerDonId,proto3" json:"caller_don_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *MessageBody) Reset() { @@ -256,23 +255,27 @@ func (x *MessageBody) GetPayload() []byte { return nil } -func (m *MessageBody) GetMetadata() isMessageBody_Metadata { - if m != nil { - return m.Metadata +func (x *MessageBody) GetMetadata() isMessageBody_Metadata { + if x != nil { + return x.Metadata } return nil } func (x *MessageBody) GetTriggerRegistrationMetadata() *TriggerRegistrationMetadata { - if x, ok := x.GetMetadata().(*MessageBody_TriggerRegistrationMetadata); ok { - return x.TriggerRegistrationMetadata + if x != nil { + if x, ok := x.Metadata.(*MessageBody_TriggerRegistrationMetadata); ok { + return x.TriggerRegistrationMetadata + } } return nil } func (x *MessageBody) GetTriggerEventMetadata() *TriggerEventMetadata { - if x, ok := x.GetMetadata().(*MessageBody_TriggerEventMetadata); ok { - return x.TriggerEventMetadata + if x != nil { + if x, ok := x.Metadata.(*MessageBody_TriggerEventMetadata); ok { + return x.TriggerEventMetadata + } } return nil } @@ -308,11 +311,10 @@ func (*MessageBody_TriggerRegistrationMetadata) isMessageBody_Metadata() {} func (*MessageBody_TriggerEventMetadata) isMessageBody_Metadata() {} type TriggerRegistrationMetadata struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - LastReceivedEventId string `protobuf:"bytes,1,opt,name=last_received_event_id,json=lastReceivedEventId,proto3" json:"last_received_event_id,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + LastReceivedEventId string `protobuf:"bytes,1,opt,name=last_received_event_id,json=lastReceivedEventId,proto3" json:"last_received_event_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *TriggerRegistrationMetadata) Reset() { @@ -353,12 +355,11 @@ func (x *TriggerRegistrationMetadata) GetLastReceivedEventId() string { } type TriggerEventMetadata struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - TriggerEventId string `protobuf:"bytes,1,opt,name=trigger_event_id,json=triggerEventId,proto3" json:"trigger_event_id,omitempty"` - WorkflowIds []string `protobuf:"bytes,2,rep,name=workflow_ids,json=workflowIds,proto3" json:"workflow_ids,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + TriggerEventId string `protobuf:"bytes,1,opt,name=trigger_event_id,json=triggerEventId,proto3" json:"trigger_event_id,omitempty"` + WorkflowIds []string `protobuf:"bytes,2,rep,name=workflow_ids,json=workflowIds,proto3" json:"workflow_ids,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *TriggerEventMetadata) Reset() { @@ -407,7 +408,7 @@ func (x *TriggerEventMetadata) GetWorkflowIds() []string { var File_core_capabilities_remote_types_messages_proto protoreflect.FileDescriptor -var file_core_capabilities_remote_types_messages_proto_rawDesc = []byte{ +var file_core_capabilities_remote_types_messages_proto_rawDesc = string([]byte{ 0x0a, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, @@ -475,16 +476,16 @@ var file_core_capabilities_remote_types_messages_proto_rawDesc = []byte{ 0x05, 0x42, 0x20, 0x5a, 0x1e, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x63, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_core_capabilities_remote_types_messages_proto_rawDescOnce sync.Once - file_core_capabilities_remote_types_messages_proto_rawDescData = file_core_capabilities_remote_types_messages_proto_rawDesc + file_core_capabilities_remote_types_messages_proto_rawDescData []byte ) func file_core_capabilities_remote_types_messages_proto_rawDescGZIP() []byte { file_core_capabilities_remote_types_messages_proto_rawDescOnce.Do(func() { - file_core_capabilities_remote_types_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_core_capabilities_remote_types_messages_proto_rawDescData) + file_core_capabilities_remote_types_messages_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_core_capabilities_remote_types_messages_proto_rawDesc), len(file_core_capabilities_remote_types_messages_proto_rawDesc))) }) return file_core_capabilities_remote_types_messages_proto_rawDescData } @@ -522,7 +523,7 @@ func file_core_capabilities_remote_types_messages_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_core_capabilities_remote_types_messages_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_core_capabilities_remote_types_messages_proto_rawDesc), len(file_core_capabilities_remote_types_messages_proto_rawDesc)), NumEnums: 1, NumMessages: 4, NumExtensions: 0, @@ -534,7 +535,6 @@ func file_core_capabilities_remote_types_messages_proto_init() { MessageInfos: file_core_capabilities_remote_types_messages_proto_msgTypes, }.Build() File_core_capabilities_remote_types_messages_proto = out.File - file_core_capabilities_remote_types_messages_proto_rawDesc = nil file_core_capabilities_remote_types_messages_proto_goTypes = nil file_core_capabilities_remote_types_messages_proto_depIdxs = nil } diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 28d772587a5..aa72e0f8298 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -35,7 +35,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36 - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13 github.com/smartcontractkit/libocr v0.0.0-20241223215956-e5b78d8e3919 github.com/spf13/cobra v1.8.1 @@ -45,7 +45,7 @@ require ( github.com/umbracle/fastrlp v0.0.0-20220527094140-59d5dd30e722 github.com/urfave/cli v1.22.14 go.uber.org/zap v1.27.0 - google.golang.org/protobuf v1.35.1 + google.golang.org/protobuf v1.36.4 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index f60027da571..29c5cc91a8e 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1176,8 +1176,8 @@ github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f3 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36/go.mod h1:Z2e1ynSJ4pg83b4Qldbmryc5lmnrI3ojOdg1FUloa68= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 h1:w7w42ml8MOxdoyAZ9+og0342UkiH3deRM1V0Pj5JR5g= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4/go.mod h1:wtdAmAUMooLavbrTA7PgHg40lyDlKesxI/RR+5Xcz18= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 h1:GcPYNVFYjB065CNq0h8nK/VeU08nUkHgBX0cJIEpuHY= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 h1:CvDfgWoLoYPapOumE/UZCplfCu5oNmy9BuH+6V6+fJ8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 h1:lJi0dWfgNJl4Um5KzeZZPVBi//CPDfzzeVmv4Z2OGNY= @@ -1901,8 +1901,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/core/services/llo/attested_retirement_report.pb.go b/core/services/llo/attested_retirement_report.pb.go index b59f623e984..d093a8eda25 100644 --- a/core/services/llo/attested_retirement_report.pb.go +++ b/core/services/llo/attested_retirement_report.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 -// protoc v4.25.1 +// protoc-gen-go v1.36.4 +// protoc v5.29.3 // source: attested_retirement_report.proto package llo @@ -11,6 +11,7 @@ import ( protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -21,13 +22,12 @@ const ( ) type AttestedRetirementReport struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` RetirementReport []byte `protobuf:"bytes,1,opt,name=retirementReport,proto3" json:"retirementReport,omitempty"` SeqNr uint64 `protobuf:"varint,2,opt,name=seqNr,proto3" json:"seqNr,omitempty"` Sigs []*AttributedOnchainSignature `protobuf:"bytes,3,rep,name=sigs,proto3" json:"sigs,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *AttestedRetirementReport) Reset() { @@ -82,12 +82,11 @@ func (x *AttestedRetirementReport) GetSigs() []*AttributedOnchainSignature { } type AttributedOnchainSignature struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Signature []byte `protobuf:"bytes,1,opt,name=signature,proto3" json:"signature,omitempty"` + Signer uint32 `protobuf:"varint,2,opt,name=signer,proto3" json:"signer,omitempty"` unknownFields protoimpl.UnknownFields - - Signature []byte `protobuf:"bytes,1,opt,name=signature,proto3" json:"signature,omitempty"` - Signer uint32 `protobuf:"varint,2,opt,name=signer,proto3" json:"signer,omitempty"` + sizeCache protoimpl.SizeCache } func (x *AttributedOnchainSignature) Reset() { @@ -136,7 +135,7 @@ func (x *AttributedOnchainSignature) GetSigner() uint32 { var File_attested_retirement_report_proto protoreflect.FileDescriptor -var file_attested_retirement_report_proto_rawDesc = []byte{ +var file_attested_retirement_report_proto_rawDesc = string([]byte{ 0x0a, 0x20, 0x61, 0x74, 0x74, 0x65, 0x73, 0x74, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x74, 0x69, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6c, 0x6c, 0x6f, 0x22, 0x91, 0x01, 0x0a, 0x18, 0x41, 0x74, 0x74, 0x65, @@ -159,16 +158,16 @@ var file_attested_retirement_report_proto_rawDesc = []byte{ 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x6c, 0x6c, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_attested_retirement_report_proto_rawDescOnce sync.Once - file_attested_retirement_report_proto_rawDescData = file_attested_retirement_report_proto_rawDesc + file_attested_retirement_report_proto_rawDescData []byte ) func file_attested_retirement_report_proto_rawDescGZIP() []byte { file_attested_retirement_report_proto_rawDescOnce.Do(func() { - file_attested_retirement_report_proto_rawDescData = protoimpl.X.CompressGZIP(file_attested_retirement_report_proto_rawDescData) + file_attested_retirement_report_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_attested_retirement_report_proto_rawDesc), len(file_attested_retirement_report_proto_rawDesc))) }) return file_attested_retirement_report_proto_rawDescData } @@ -196,7 +195,7 @@ func file_attested_retirement_report_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_attested_retirement_report_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_attested_retirement_report_proto_rawDesc), len(file_attested_retirement_report_proto_rawDesc)), NumEnums: 0, NumMessages: 2, NumExtensions: 0, @@ -207,7 +206,6 @@ func file_attested_retirement_report_proto_init() { MessageInfos: file_attested_retirement_report_proto_msgTypes, }.Build() File_attested_retirement_report_proto = out.File - file_attested_retirement_report_proto_rawDesc = nil file_attested_retirement_report_proto_goTypes = nil file_attested_retirement_report_proto_depIdxs = nil } diff --git a/core/services/llo/bm/dummy_transmitter.go b/core/services/llo/bm/dummy_transmitter.go index f62635a7953..af07423b9a5 100644 --- a/core/services/llo/bm/dummy_transmitter.go +++ b/core/services/llo/bm/dummy_transmitter.go @@ -8,7 +8,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" - "github.com/smartcontractkit/libocr/offchainreporting2plus/types" ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink/v2/core/services/llo/evm" @@ -58,10 +57,10 @@ func (t *transmitter) Close() error { func (t *transmitter) Transmit( ctx context.Context, - digest types.ConfigDigest, + digest ocr2types.ConfigDigest, seqNr uint64, report ocr3types.ReportWithInfo[llotypes.ReportInfo], - sigs []types.AttributedOnchainSignature, + sigs []ocr2types.AttributedOnchainSignature, ) error { lggr := t.lggr { diff --git a/core/services/llo/data_source.go b/core/services/llo/data_source.go index 855ac7d9940..895dbdd3819 100644 --- a/core/services/llo/data_source.go +++ b/core/services/llo/data_source.go @@ -92,6 +92,7 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour // Observe looks up all streams in the registry and populates a map of stream ID => value func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error { now := time.Now() + lggr := logger.With(d.lggr, "observationTimestamp", opts.ObservationTimestamp(), "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) if opts.VerboseLogging() { streamIDs := make([]streams.StreamID, 0, len(streamValues)) @@ -99,7 +100,8 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, streamIDs = append(streamIDs, streamID) } sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] }) - d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + lggr = logger.With(lggr, "streamIDs", streamIDs) + lggr.Debugw("Observing streams") } var wg sync.WaitGroup @@ -110,8 +112,23 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, var errs []ErrObservationFailed // oc only lives for the duration of this Observe call - oc := NewObservationContext(d.registry, d.t) + oc := NewObservationContext(lggr, d.registry, d.t) + + // Telemetry + { + // Size needs to accommodate the max number of telemetry events that could be generated + // Standard case might be about 3 bridge requests per spec and one stream<=>spec + // Overallocate for safety (to avoid dropping packets) + telemCh := d.t.MakeTelemChannel(opts, 10*len(streamValues)) + if telemCh != nil { + ctx = pipeline.WithTelemetryCh(ctx, telemCh) + // After all Observations have returned, nothing else will be sent to the + // telemetry channel, so it can safely be closed + defer close(telemCh) + } + } + // Observe all streams concurrently for _, streamID := range maps.Keys(streamValues) { go func(streamID llotypes.StreamID) { defer wg.Done() @@ -138,11 +155,13 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, }(streamID) } + // Wait for all Observations to complete wg.Wait() - elapsed := time.Since(now) // Only log on errors or if VerboseLogging is turned on if len(errs) > 0 || opts.VerboseLogging() { + elapsed := time.Since(now) + slices.Sort(successfulStreamIDs) sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) @@ -153,7 +172,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, failedStreamIDs[i] = e.streamID } - lggr := logger.With(d.lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + lggr = logger.With(lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs) if opts.VerboseLogging() { lggr = logger.With(lggr, "streamValues", streamValues) diff --git a/core/services/llo/data_source_test.go b/core/services/llo/data_source_test.go index 349ec70007d..df352de1efd 100644 --- a/core/services/llo/data_source_test.go +++ b/core/services/llo/data_source_test.go @@ -2,12 +2,15 @@ package llo import ( "context" + "encoding/hex" "errors" "fmt" "math" "math/big" + "sort" "sync" "testing" + "time" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" @@ -26,6 +29,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/llo/telem" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" ) @@ -86,10 +90,14 @@ func (m *mockOpts) OutCtx() ocr3types.OutcomeContext { func (m *mockOpts) ConfigDigest() ocr2types.ConfigDigest { return ocr2types.ConfigDigest{6, 5, 4} } +func (m *mockOpts) ObservationTimestamp() time.Time { + return time.Unix(1737936858, 0) +} type mockTelemeter struct { mu sync.Mutex v3PremiumLegacyPackets []v3PremiumLegacyPacket + ch chan interface{} } type v3PremiumLegacyPacket struct { @@ -109,6 +117,11 @@ func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline. m.v3PremiumLegacyPackets = append(m.v3PremiumLegacyPackets, v3PremiumLegacyPacket{run, trrs, streamID, opts, val, err}) } +func (m *mockTelemeter) MakeTelemChannel(opts llo.DSOpts, size int) (ch chan<- interface{}) { + m.ch = make(chan interface{}, size) + return m.ch +} + func Test_DataSource(t *testing.T) { lggr := logger.TestLogger(t) reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} @@ -165,7 +178,7 @@ func Test_DataSource(t *testing.T) { vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, llo.StreamValues{ 2: llo.ToDecimal(decimal.NewFromInt(40602)), @@ -184,7 +197,28 @@ func Test_DataSource(t *testing.T) { assert.Equal(t, 1, int(pkt.streamID)) assert.Equal(t, opts, pkt.opts) assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String()) - assert.Nil(t, pkt.err) + require.NoError(t, pkt.err) + + telems := []interface{}{} + for p := range tm.ch { + telems = append(telems, p) + } + require.Len(t, telems, 3) + sort.Slice(telems, func(i, j int) bool { + return telems[i].(*telem.LLOObservationTelemetry).StreamId < telems[j].(*telem.LLOObservationTelemetry).StreamId + }) + require.IsType(t, &telem.LLOObservationTelemetry{}, telems[0]) + obsTelem := telems[0].(*telem.LLOObservationTelemetry) + assert.Equal(t, uint32(1), obsTelem.StreamId) + assert.Equal(t, int32(llo.LLOStreamValue_Decimal), obsTelem.StreamValueType) + assert.Equal(t, "00000000020885", hex.EncodeToString(obsTelem.StreamValueBinary)) + assert.Equal(t, "2181", obsTelem.StreamValueText) + assert.Nil(t, obsTelem.ObservationError) + assert.Equal(t, int64(1737936858000000000), obsTelem.ObservationTimestamp) + assert.Greater(t, obsTelem.ObservationFinishedAt, int64(1737936858000000000)) + assert.Equal(t, uint32(0), obsTelem.DonId) + assert.Equal(t, opts.SeqNr(), obsTelem.SeqNr) + assert.Equal(t, opts.ConfigDigest().Hex(), hex.EncodeToString(obsTelem.ConfigDigest)) }) t.Run("records telemetry for errors", func(t *testing.T) { diff --git a/core/services/llo/observation_context.go b/core/services/llo/observation_context.go index 5bf82fa5a79..ef6cf6429cd 100644 --- a/core/services/llo/observation_context.go +++ b/core/services/llo/observation_context.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "sync" + "time" "github.com/shopspring/decimal" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/services/llo/telem" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -33,6 +36,7 @@ type execution struct { } type observationContext struct { + l logger.Logger r Registry t Telemeter @@ -41,16 +45,17 @@ type observationContext struct { executions map[streams.Pipeline]*execution } -func NewObservationContext(r Registry, t Telemeter) ObservationContext { - return newObservationContext(r, t) +func NewObservationContext(l logger.Logger, r Registry, t Telemeter) ObservationContext { + return newObservationContext(l, r, t) } -func newObservationContext(r Registry, t Telemeter) *observationContext { - return &observationContext{r, t, sync.Mutex{}, make(map[streams.Pipeline]*execution)} +func newObservationContext(l logger.Logger, r Registry, t Telemeter) *observationContext { + return &observationContext{l, r, t, sync.Mutex{}, make(map[streams.Pipeline]*execution)} } func (oc *observationContext) Observe(ctx context.Context, streamID streams.StreamID, opts llo.DSOpts) (val llo.StreamValue, err error) { run, trrs, err := oc.run(ctx, streamID) + observationFinishedAt := time.Now() if err != nil { // FIXME: This is a hack specific for V3 telemetry, future schemas should // use a generic stream value telemetry instead @@ -59,25 +64,65 @@ func (oc *observationContext) Observe(ctx context.Context, streamID streams.Stre return nil, err } // Extract stream value based on streamID attribute + found := false for _, trr := range trrs { if trr.Task.TaskStreamID() != nil && *trr.Task.TaskStreamID() == streamID { val, err = resultToStreamValue(trr.Result.Value) if err != nil { return nil, fmt.Errorf("failed to convert result to StreamValue for streamID %d: %w", streamID, err) } - return val, nil + if trr.FinishedAt.Valid { + observationFinishedAt = trr.FinishedAt.Time + } + found = true + break } } // If no streamID attribute is found in the task results, then assume the // final output is the stream ID and return that. This is safe to do since // the registry will never return a spec that doesn't match either by tag // or by spec streamID. - - val, err = extractFinalResultAsStreamValue(trrs) - // FIXME: This is a hack specific for V3 telemetry, future schemas should - // use a generic stream value telemetry instead - // https://smartcontract-it.atlassian.net/browse/MERC-6290 - oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err) + if !found { + // FIXME: This is a hack specific for V3 telemetry, future schemas should + // use the generic stream value telemetry instead + // https://smartcontract-it.atlassian.net/browse/MERC-6290 + val, err = extractFinalResultAsStreamValue(trrs) + oc.t.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, err) + } + if ch := pipeline.GetTelemetryCh(ctx); ch != nil { + cd := opts.ConfigDigest() + ot := &telem.LLOObservationTelemetry{ + StreamId: streamID, + ObservationTimestamp: opts.ObservationTimestamp().UnixNano(), + ObservationFinishedAt: observationFinishedAt.UnixNano(), + SeqNr: opts.SeqNr(), + ConfigDigest: cd[:], + } + if err != nil { + ot.ObservationError = new(string) + *ot.ObservationError = err.Error() + } + if val != nil { + ot.StreamValueType = int32(val.Type()) + b, err := val.MarshalBinary() + if err != nil { + oc.l.Errorw("failed to MarshalBinary on stream value", "error", err) + } else { + ot.StreamValueBinary = b + } + s, err := val.MarshalText() + if err != nil { + oc.l.Errorw("failed to MarshalText on stream value", "error", err) + } else { + ot.StreamValueText = string(s) + } + } + select { + case ch <- ot: + default: + oc.l.Error("telemetry channel is full, dropping observation telemetry") + } + } return } diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index fe626815603..1ad1521393c 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -60,7 +60,8 @@ func TestObservationContext_Observe(t *testing.T) { ctx := tests.Context(t) r := &mockRegistry{} telem := &mockTelemeter{} - oc := newObservationContext(r, telem) + lggr := logger.TestLogger(t) + oc := newObservationContext(lggr, r, telem) opts := llo.DSOpts(nil) missingStreamID := streams.StreamID(0) @@ -128,7 +129,8 @@ func TestObservationContext_Observe_concurrencyStressTest(t *testing.T) { ctx := tests.Context(t) r := &mockRegistry{} telem := &mockTelemeter{} - oc := newObservationContext(r, telem) + lggr := logger.TestLogger(t) + oc := newObservationContext(lggr, r, telem) opts := llo.DSOpts(nil) streamID := streams.StreamID(1) @@ -252,7 +254,7 @@ result3 -> result3_parse -> multiply3; require.NoError(t, err) telem := &mockTelemeter{} - oc := newObservationContext(r, telem) + oc := newObservationContext(lggr, r, telem) opts := llo.DSOpts(nil) val, err := oc.Observe(ctx, streams.StreamID(1), opts) @@ -337,7 +339,7 @@ result3 -> result3_parse -> multiply3; } telem := &mockTelemeter{} - oc := newObservationContext(r, telem) + oc := newObservationContext(lggr, r, telem) opts := llo.DSOpts(nil) // concurrency stress test diff --git a/core/services/llo/telem/generate.go b/core/services/llo/telem/generate.go new file mode 100644 index 00000000000..b3f5649b498 --- /dev/null +++ b/core/services/llo/telem/generate.go @@ -0,0 +1,3 @@ +package telem + +//go:generate protoc --go_out=. --go_opt=paths=source_relative telem_streams.proto diff --git a/core/services/llo/telem/telem_streams.pb.go b/core/services/llo/telem/telem_streams.pb.go new file mode 100644 index 00000000000..1ea10cf35ab --- /dev/null +++ b/core/services/llo/telem/telem_streams.pb.go @@ -0,0 +1,429 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.4 +// protoc v5.29.3 +// source: telem_streams.proto + +package telem + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type LLOBridgeTelemetry struct { + state protoimpl.MessageState `protogen:"open.v1"` + BridgeAdapterName string `protobuf:"bytes,1,opt,name=bridge_adapter_name,json=bridgeAdapterName,proto3" json:"bridge_adapter_name,omitempty"` + BridgeRequestData []byte `protobuf:"bytes,2,opt,name=bridge_request_data,json=bridgeRequestData,proto3" json:"bridge_request_data,omitempty"` + BridgeResponseData []byte `protobuf:"bytes,3,opt,name=bridge_response_data,json=bridgeResponseData,proto3" json:"bridge_response_data,omitempty"` + BridgeResponseError *string `protobuf:"bytes,4,opt,name=bridge_response_error,json=bridgeResponseError,proto3,oneof" json:"bridge_response_error,omitempty"` + BridgeResponseStatusCode int32 `protobuf:"varint,5,opt,name=bridge_response_status_code,json=bridgeResponseStatusCode,proto3" json:"bridge_response_status_code,omitempty"` + RequestStartTimestamp int64 `protobuf:"varint,6,opt,name=request_start_timestamp,json=requestStartTimestamp,proto3" json:"request_start_timestamp,omitempty"` + RequestFinishTimestamp int64 `protobuf:"varint,7,opt,name=request_finish_timestamp,json=requestFinishTimestamp,proto3" json:"request_finish_timestamp,omitempty"` + LocalCacheHit bool `protobuf:"varint,8,opt,name=local_cache_hit,json=localCacheHit,proto3" json:"local_cache_hit,omitempty"` + SpecId int32 `protobuf:"varint,9,opt,name=spec_id,json=specId,proto3" json:"spec_id,omitempty"` + StreamId *uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3,oneof" json:"stream_id,omitempty"` + DotId string `protobuf:"bytes,11,opt,name=dot_id,json=dotId,proto3" json:"dot_id,omitempty"` + DonId uint32 `protobuf:"varint,12,opt,name=don_id,json=donId,proto3" json:"don_id,omitempty"` + SeqNr uint64 `protobuf:"varint,13,opt,name=seq_nr,json=seqNr,proto3" json:"seq_nr,omitempty"` + ConfigDigest []byte `protobuf:"bytes,14,opt,name=config_digest,json=configDigest,proto3" json:"config_digest,omitempty"` + ObservationTimestamp int64 `protobuf:"varint,15,opt,name=observation_timestamp,json=observationTimestamp,proto3" json:"observation_timestamp,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LLOBridgeTelemetry) Reset() { + *x = LLOBridgeTelemetry{} + mi := &file_telem_streams_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LLOBridgeTelemetry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOBridgeTelemetry) ProtoMessage() {} + +func (x *LLOBridgeTelemetry) ProtoReflect() protoreflect.Message { + mi := &file_telem_streams_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOBridgeTelemetry.ProtoReflect.Descriptor instead. +func (*LLOBridgeTelemetry) Descriptor() ([]byte, []int) { + return file_telem_streams_proto_rawDescGZIP(), []int{0} +} + +func (x *LLOBridgeTelemetry) GetBridgeAdapterName() string { + if x != nil { + return x.BridgeAdapterName + } + return "" +} + +func (x *LLOBridgeTelemetry) GetBridgeRequestData() []byte { + if x != nil { + return x.BridgeRequestData + } + return nil +} + +func (x *LLOBridgeTelemetry) GetBridgeResponseData() []byte { + if x != nil { + return x.BridgeResponseData + } + return nil +} + +func (x *LLOBridgeTelemetry) GetBridgeResponseError() string { + if x != nil && x.BridgeResponseError != nil { + return *x.BridgeResponseError + } + return "" +} + +func (x *LLOBridgeTelemetry) GetBridgeResponseStatusCode() int32 { + if x != nil { + return x.BridgeResponseStatusCode + } + return 0 +} + +func (x *LLOBridgeTelemetry) GetRequestStartTimestamp() int64 { + if x != nil { + return x.RequestStartTimestamp + } + return 0 +} + +func (x *LLOBridgeTelemetry) GetRequestFinishTimestamp() int64 { + if x != nil { + return x.RequestFinishTimestamp + } + return 0 +} + +func (x *LLOBridgeTelemetry) GetLocalCacheHit() bool { + if x != nil { + return x.LocalCacheHit + } + return false +} + +func (x *LLOBridgeTelemetry) GetSpecId() int32 { + if x != nil { + return x.SpecId + } + return 0 +} + +func (x *LLOBridgeTelemetry) GetStreamId() uint32 { + if x != nil && x.StreamId != nil { + return *x.StreamId + } + return 0 +} + +func (x *LLOBridgeTelemetry) GetDotId() string { + if x != nil { + return x.DotId + } + return "" +} + +func (x *LLOBridgeTelemetry) GetDonId() uint32 { + if x != nil { + return x.DonId + } + return 0 +} + +func (x *LLOBridgeTelemetry) GetSeqNr() uint64 { + if x != nil { + return x.SeqNr + } + return 0 +} + +func (x *LLOBridgeTelemetry) GetConfigDigest() []byte { + if x != nil { + return x.ConfigDigest + } + return nil +} + +func (x *LLOBridgeTelemetry) GetObservationTimestamp() int64 { + if x != nil { + return x.ObservationTimestamp + } + return 0 +} + +type LLOObservationTelemetry struct { + state protoimpl.MessageState `protogen:"open.v1"` + StreamId uint32 `protobuf:"varint,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` + StreamValueType int32 `protobuf:"varint,2,opt,name=stream_value_type,json=streamValueType,proto3" json:"stream_value_type,omitempty"` + StreamValueBinary []byte `protobuf:"bytes,3,opt,name=stream_value_binary,json=streamValueBinary,proto3" json:"stream_value_binary,omitempty"` + StreamValueText string `protobuf:"bytes,4,opt,name=stream_value_text,json=streamValueText,proto3" json:"stream_value_text,omitempty"` + ObservationError *string `protobuf:"bytes,5,opt,name=observation_error,json=observationError,proto3,oneof" json:"observation_error,omitempty"` + ObservationTimestamp int64 `protobuf:"varint,6,opt,name=observation_timestamp,json=observationTimestamp,proto3" json:"observation_timestamp,omitempty"` + ObservationFinishedAt int64 `protobuf:"varint,7,opt,name=observation_finished_at,json=observationFinishedAt,proto3" json:"observation_finished_at,omitempty"` + DonId uint32 `protobuf:"varint,8,opt,name=don_id,json=donId,proto3" json:"don_id,omitempty"` + SeqNr uint64 `protobuf:"varint,9,opt,name=seq_nr,json=seqNr,proto3" json:"seq_nr,omitempty"` + ConfigDigest []byte `protobuf:"bytes,10,opt,name=config_digest,json=configDigest,proto3" json:"config_digest,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LLOObservationTelemetry) Reset() { + *x = LLOObservationTelemetry{} + mi := &file_telem_streams_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LLOObservationTelemetry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LLOObservationTelemetry) ProtoMessage() {} + +func (x *LLOObservationTelemetry) ProtoReflect() protoreflect.Message { + mi := &file_telem_streams_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LLOObservationTelemetry.ProtoReflect.Descriptor instead. +func (*LLOObservationTelemetry) Descriptor() ([]byte, []int) { + return file_telem_streams_proto_rawDescGZIP(), []int{1} +} + +func (x *LLOObservationTelemetry) GetStreamId() uint32 { + if x != nil { + return x.StreamId + } + return 0 +} + +func (x *LLOObservationTelemetry) GetStreamValueType() int32 { + if x != nil { + return x.StreamValueType + } + return 0 +} + +func (x *LLOObservationTelemetry) GetStreamValueBinary() []byte { + if x != nil { + return x.StreamValueBinary + } + return nil +} + +func (x *LLOObservationTelemetry) GetStreamValueText() string { + if x != nil { + return x.StreamValueText + } + return "" +} + +func (x *LLOObservationTelemetry) GetObservationError() string { + if x != nil && x.ObservationError != nil { + return *x.ObservationError + } + return "" +} + +func (x *LLOObservationTelemetry) GetObservationTimestamp() int64 { + if x != nil { + return x.ObservationTimestamp + } + return 0 +} + +func (x *LLOObservationTelemetry) GetObservationFinishedAt() int64 { + if x != nil { + return x.ObservationFinishedAt + } + return 0 +} + +func (x *LLOObservationTelemetry) GetDonId() uint32 { + if x != nil { + return x.DonId + } + return 0 +} + +func (x *LLOObservationTelemetry) GetSeqNr() uint64 { + if x != nil { + return x.SeqNr + } + return 0 +} + +func (x *LLOObservationTelemetry) GetConfigDigest() []byte { + if x != nil { + return x.ConfigDigest + } + return nil +} + +var File_telem_streams_proto protoreflect.FileDescriptor + +var file_telem_streams_proto_rawDesc = string([]byte{ + 0x0a, 0x13, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x22, 0xba, 0x05, 0x0a, + 0x12, 0x4c, 0x4c, 0x4f, 0x42, 0x72, 0x69, 0x64, 0x67, 0x65, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, + 0x74, 0x72, 0x79, 0x12, 0x2e, 0x0a, 0x13, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x61, 0x64, + 0x61, 0x70, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x11, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x41, 0x64, 0x61, 0x70, 0x74, 0x65, 0x72, 0x4e, + 0x61, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x11, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44, + 0x61, 0x74, 0x61, 0x12, 0x30, 0x0a, 0x14, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x72, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x12, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x37, 0x0a, 0x15, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, + 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x13, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x12, 0x3d, + 0x0a, 0x1b, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x18, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x36, 0x0a, + 0x17, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x38, 0x0a, 0x18, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x5f, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x16, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x46, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, + 0x26, 0x0a, 0x0f, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x63, 0x61, 0x63, 0x68, 0x65, 0x5f, 0x68, + 0x69, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x43, + 0x61, 0x63, 0x68, 0x65, 0x48, 0x69, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x70, 0x65, 0x63, 0x5f, + 0x69, 0x64, 0x18, 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x70, 0x65, 0x63, 0x49, 0x64, + 0x12, 0x20, 0x0a, 0x09, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x0a, 0x20, + 0x01, 0x28, 0x0d, 0x48, 0x01, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x64, 0x88, + 0x01, 0x01, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x6f, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x0b, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x64, 0x6f, 0x74, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x6f, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x64, 0x6f, 0x6e, 0x49, 0x64, + 0x12, 0x15, 0x0a, 0x06, 0x73, 0x65, 0x71, 0x5f, 0x6e, 0x72, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x05, 0x73, 0x65, 0x71, 0x4e, 0x72, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x5f, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, 0x65, 0x73, 0x74, 0x12, 0x33, 0x0a, 0x15, + 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x03, 0x52, 0x14, 0x6f, 0x62, 0x73, + 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x42, 0x18, 0x0a, 0x16, 0x5f, 0x62, 0x72, 0x69, 0x64, 0x67, 0x65, 0x5f, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x69, 0x64, 0x22, 0xc6, 0x03, 0x0a, 0x17, 0x4c, 0x4c, + 0x4f, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x65, 0x6c, 0x65, + 0x6d, 0x65, 0x74, 0x72, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x49, 0x64, 0x12, 0x2a, 0x0a, 0x11, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0f, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x2e, + 0x0a, 0x13, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x62, + 0x69, 0x6e, 0x61, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x11, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x12, 0x2a, + 0x0a, 0x11, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x5f, 0x74, + 0x65, 0x78, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x54, 0x65, 0x78, 0x74, 0x12, 0x30, 0x0a, 0x11, 0x6f, 0x62, + 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x10, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x12, 0x33, 0x0a, 0x15, + 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x14, 0x6f, 0x62, 0x73, + 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x12, 0x36, 0x0a, 0x17, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x5f, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x15, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, + 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x41, 0x74, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x6f, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x64, 0x6f, 0x6e, 0x49, 0x64, + 0x12, 0x15, 0x0a, 0x06, 0x73, 0x65, 0x71, 0x5f, 0x6e, 0x72, 0x18, 0x09, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x05, 0x73, 0x65, 0x71, 0x4e, 0x72, 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x5f, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0c, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x44, 0x69, 0x67, 0x65, 0x73, 0x74, 0x42, 0x14, 0x0a, 0x12, + 0x5f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x42, 0x42, 0x5a, 0x40, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, + 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x32, 0x2f, 0x63, + 0x6f, 0x72, 0x65, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x6c, 0x6c, 0x6f, + 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +}) + +var ( + file_telem_streams_proto_rawDescOnce sync.Once + file_telem_streams_proto_rawDescData []byte +) + +func file_telem_streams_proto_rawDescGZIP() []byte { + file_telem_streams_proto_rawDescOnce.Do(func() { + file_telem_streams_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_telem_streams_proto_rawDesc), len(file_telem_streams_proto_rawDesc))) + }) + return file_telem_streams_proto_rawDescData +} + +var file_telem_streams_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_telem_streams_proto_goTypes = []any{ + (*LLOBridgeTelemetry)(nil), // 0: telem.LLOBridgeTelemetry + (*LLOObservationTelemetry)(nil), // 1: telem.LLOObservationTelemetry +} +var file_telem_streams_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_telem_streams_proto_init() } +func file_telem_streams_proto_init() { + if File_telem_streams_proto != nil { + return + } + file_telem_streams_proto_msgTypes[0].OneofWrappers = []any{} + file_telem_streams_proto_msgTypes[1].OneofWrappers = []any{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_telem_streams_proto_rawDesc), len(file_telem_streams_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_telem_streams_proto_goTypes, + DependencyIndexes: file_telem_streams_proto_depIdxs, + MessageInfos: file_telem_streams_proto_msgTypes, + }.Build() + File_telem_streams_proto = out.File + file_telem_streams_proto_goTypes = nil + file_telem_streams_proto_depIdxs = nil +} diff --git a/core/services/llo/telem/telem_streams.proto b/core/services/llo/telem/telem_streams.proto new file mode 100644 index 00000000000..1063a20ec34 --- /dev/null +++ b/core/services/llo/telem/telem_streams.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink/v2/core/services/llo/telem"; + +package telem; + +message LLOBridgeTelemetry { + string bridge_adapter_name = 1; + bytes bridge_request_data = 2; + bytes bridge_response_data = 3; + optional string bridge_response_error = 4; + int32 bridge_response_status_code = 5; + int64 request_start_timestamp=6; + int64 request_finish_timestamp=7; + bool local_cache_hit = 8; + + int32 spec_id = 9; + optional uint32 stream_id = 10; + string dot_id = 11; + uint32 don_id = 12; + uint64 seq_nr = 13; + bytes config_digest = 14; + + int64 observation_timestamp = 15; +} + +message LLOObservationTelemetry { + uint32 stream_id = 1; + int32 stream_value_type = 2; + bytes stream_value_binary = 3; + string stream_value_text = 4; + optional string observation_error = 5; + int64 observation_timestamp = 6; + int64 observation_finished_at = 7; + uint32 don_id = 8; + uint64 seq_nr = 9; + bytes config_digest = 10; +} diff --git a/core/services/llo/telemetry.go b/core/services/llo/telemetry.go index 0b315d78d2b..e604d4027c1 100644 --- a/core/services/llo/telemetry.go +++ b/core/services/llo/telemetry.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "github.com/smartcontractkit/libocr/commontypes" "google.golang.org/protobuf/proto" @@ -13,17 +14,19 @@ import ( "github.com/smartcontractkit/chainlink-data-streams/llo" "github.com/smartcontractkit/chainlink/v2/core/services/llo/evm" + "github.com/smartcontractkit/chainlink/v2/core/services/llo/telem" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" - "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" + legacytelem "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" ) const adapterLWBAErrorName = "AdapterLWBAError" type Telemeter interface { EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) + MakeTelemChannel(opts llo.DSOpts, size int) (ch chan<- interface{}) } type TelemeterService interface { @@ -43,11 +46,12 @@ func newTelemeter(lggr logger.Logger, monitoringEndpoint commontypes.MonitoringE // feed) so we need to make sure the buffer is large enough. // // 2000 feeds * 5s/250ms = 40_000 should hold ~5s of buffer in the worst case. - chTelemetryObservation := make(chan TelemetryObservation, 40_000) + chTelemetryPipeline := make(chan *TelemetryPipeline, 40_000) t := &telemeter{ - chTelemetryObservation: chTelemetryObservation, - monitoringEndpoint: monitoringEndpoint, - donID: donID, + chTelemetryPipeline: chTelemetryPipeline, + monitoringEndpoint: monitoringEndpoint, + donID: donID, + chch: make(chan telemetryCollectionContext, 1000), // chch should be consumed from very quickly so we don't need a large buffer, but it also won't hurt } t.Service, t.eng = services.Config{ Name: "LLOTelemeterService", @@ -61,16 +65,17 @@ type telemeter struct { services.Service eng *services.Engine - monitoringEndpoint commontypes.MonitoringEndpoint - chTelemetryObservation chan TelemetryObservation - donID uint32 + monitoringEndpoint commontypes.MonitoringEndpoint + chTelemetryPipeline chan *TelemetryPipeline + donID uint32 + chch chan telemetryCollectionContext } func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { if t.Service.Ready() != nil { // This should never happen, telemeter should always be started BEFORE // the oracle and closed AFTER it - t.eng.SugaredLogger.Errorw("Telemeter not ready, dropping observation", "run", run, "streamID", streamID, "opts", opts, "val", val, "err", err) + t.eng.Errorw("Telemeter not ready, dropping observation", "run", run, "streamID", streamID, "opts", opts, "val", val, "err", err) return } var adapterError *eautils.AdapterError @@ -81,20 +86,70 @@ func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.Task // ignore errors return } - tObs := TelemetryObservation{run, trrs, streamID, opts, val, dpInvariantViolationDetected} + tp := &TelemetryPipeline{run, trrs, streamID, opts, val, dpInvariantViolationDetected} select { - case t.chTelemetryObservation <- tObs: + case t.chTelemetryPipeline <- tp: default: } } +type telemetryCollectionContext struct { + in <-chan interface{} + opts llo.DSOpts +} + +// CollectTelemetryObserve reads telem packets from the returned channel and +// sends them to the monitoring endpoint. Stops reading when channel closed or +// when telemeter is stopped +// +// CALLER IS RESPONSIBLE FOR CLOSING THE RETURNED CHANNEL TO AVOID MEMORY +// LEAKS. +func (t *telemeter) MakeTelemChannel(opts llo.DSOpts, size int) chan<- interface{} { + ch := make(chan interface{}, size) + tcc := telemetryCollectionContext{ + in: ch, + opts: opts, + } + + select { + case t.chch <- tcc: + default: + // This should be performant enough with buffer of t.chch large enough + // that we never hit this case, however, we should NEVER block + // observations on telemetry even if something pathological happens. + t.eng.Errorw("Telemeter chch full, will not record telemetry", "seqNr", opts.SeqNr()) + return nil + } + return ch +} + func (t *telemeter) start(_ context.Context) error { t.eng.Go(func(ctx context.Context) { + wg := sync.WaitGroup{} for { select { - case tObs := <-t.chTelemetryObservation: - t.collectV3PremiumLegacyTelemetry(tObs) + case tcc := <-t.chch: + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case p, ok := <-tcc.in: + if !ok { + // channel closed by producer + return + } + t.collectTelemetry(p, tcc.opts) + case <-ctx.Done(): + return + } + } + }() + + case p := <-t.chTelemetryPipeline: + t.collectV3PremiumLegacyTelemetry(p) case <-ctx.Done(): + wg.Wait() return } } @@ -102,7 +157,45 @@ func (t *telemeter) start(_ context.Context) error { return nil } -func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) { +func (t *telemeter) collectTelemetry(p interface{}, opts llo.DSOpts) { + var msg proto.Message + switch v := p.(type) { + case *pipeline.BridgeTelemetry: + cd := opts.ConfigDigest() + msg = &telem.LLOBridgeTelemetry{ + BridgeAdapterName: v.Name, + BridgeRequestData: v.RequestData, + BridgeResponseData: v.ResponseData, + BridgeResponseError: v.ResponseError, + BridgeResponseStatusCode: int32(v.ResponseStatusCode), //nolint:gosec // G115 // even if overflow does happen, its harmless + RequestStartTimestamp: v.RequestStartTimestamp.UnixNano(), + RequestFinishTimestamp: v.RequestFinishTimestamp.UnixNano(), + LocalCacheHit: v.LocalCacheHit, + SpecId: v.SpecID, + StreamId: v.StreamID, + DotId: v.DotID, + DonId: t.donID, + SeqNr: opts.SeqNr(), + ConfigDigest: cd[:], + ObservationTimestamp: opts.ObservationTimestamp().UnixNano(), + } + case *telem.LLOObservationTelemetry: + v.DonId = t.donID + msg = v + default: + t.eng.Warnw("Unknown telemetry type", "type", fmt.Sprintf("%T", p)) + return + } + bytes, err := proto.Marshal(msg) + if err != nil { + t.eng.Warnf("protobuf marshal failed %v", err.Error()) + return + } + + t.monitoringEndpoint.SendLog(bytes) +} + +func (t *telemeter) collectV3PremiumLegacyTelemetry(d *TelemetryPipeline) { eaTelemetryValues := ocrcommon.ParseMercuryEATelemetry(t.eng.SugaredLogger, d.trrs, mercuryutils.REPORT_V3) for _, eaTelem := range eaTelemetryValues { var benchmarkPrice, bidPrice, askPrice int64 @@ -119,7 +212,7 @@ func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) { askPrice = v.Ask.IntPart() ask = v.Ask.String() } - tea := &telem.EnhancedEAMercury{ + tea := &legacytelem.EnhancedEAMercury{ DataSource: eaTelem.DataSource, DpBenchmarkPrice: eaTelem.DpBenchmarkPrice, DpBid: eaTelem.DpBid, @@ -147,7 +240,7 @@ func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) { } epoch, round, err := evm.SeqNrToEpochAndRound(d.opts.OutCtx().SeqNr) if err != nil { - t.eng.SugaredLogger.Warnw("Failed to convert sequence number to epoch and round", "err", err) + t.eng.Warnw("Failed to convert sequence number to epoch and round", "err", err) } else { tea.Round = int64(round) tea.Epoch = int64(epoch) @@ -155,7 +248,7 @@ func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) { bytes, err := proto.Marshal(tea) if err != nil { - t.eng.SugaredLogger.Warnf("protobuf marshal failed %v", err.Error()) + t.eng.Warnf("protobuf marshal failed %v", err.Error()) continue } @@ -163,7 +256,12 @@ func (t *telemeter) collectV3PremiumLegacyTelemetry(d TelemetryObservation) { } } -type TelemetryObservation struct { +type TelemetryObserve struct { + Opts llo.DSOpts + Telemetry interface{} +} + +type TelemetryPipeline struct { run *pipeline.Run trrs pipeline.TaskRunResults streamID uint32 @@ -178,6 +276,9 @@ type nullTelemeter struct{} func (t *nullTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { } +func (t *nullTelemeter) MakeTelemChannel(opts llo.DSOpts, size int) (ch chan<- interface{}) { + return nil +} func (t *nullTelemeter) Start(context.Context) error { return nil } diff --git a/core/services/llo/telemetry_test.go b/core/services/llo/telemetry_test.go index ec650bedb83..1ae78a2e0fa 100644 --- a/core/services/llo/telemetry_test.go +++ b/core/services/llo/telemetry_test.go @@ -1,6 +1,7 @@ package llo import ( + "encoding/hex" "errors" "testing" "time" @@ -9,15 +10,18 @@ import ( "github.com/smartcontractkit/libocr/commontypes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/proto" "gopkg.in/guregu/null.v4" "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/llo/telem" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils" - "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" + legacytelem "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem" - "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" ) @@ -106,8 +110,8 @@ var trrs = pipeline.TaskRunResults{ }, } -func Test_Telemeter(t *testing.T) { - lggr := logger.Test(t) +func Test_Telemeter_v3PremiumLegacy(t *testing.T) { + lggr := logger.TestLogger(t) m := &mockMonitoringEndpoint{} run := &pipeline.Run{ID: 42} @@ -132,7 +136,7 @@ func Test_Telemeter(t *testing.T) { var i int for log := range m.chLogs { - decoded := &telem.EnhancedEAMercury{} + decoded := &legacytelem.EnhancedEAMercury{} require.NoError(t, proto.Unmarshal(log, decoded)) assert.True(t, decoded.DpInvariantViolationDetected) if i == 2 { @@ -150,10 +154,10 @@ func Test_Telemeter(t *testing.T) { var i int for log := range m.chLogs { - decoded := &telem.EnhancedEAMercury{} + decoded := &legacytelem.EnhancedEAMercury{} require.NoError(t, proto.Unmarshal(log, decoded)) assert.Equal(t, int(1003), int(decoded.Version)) - assert.Equal(t, float64(123456.123456789), decoded.DpBenchmarkPrice) + assert.InDelta(t, float64(123456.123456789), decoded.DpBenchmarkPrice, 0.0000000001) assert.Zero(t, decoded.DpBid) assert.Zero(t, decoded.DpAsk) assert.False(t, decoded.DpInvariantViolationDetected) @@ -200,7 +204,7 @@ func Test_Telemeter(t *testing.T) { var i int for log := range m.chLogs { - decoded := &telem.EnhancedEAMercury{} + decoded := &legacytelem.EnhancedEAMercury{} require.NoError(t, proto.Unmarshal(log, decoded)) assert.Equal(t, int64(103), decoded.ObservationBenchmarkPrice) assert.Equal(t, "103.32", decoded.ObservationBenchmarkPriceString) @@ -216,3 +220,107 @@ func Test_Telemeter(t *testing.T) { } }) } + +func Test_Telemeter_observationTelemetry(t *testing.T) { + lggr := logger.TestLogger(t) + + donID := uint32(1) + + opts := &mockOpts{} + + t.Run("transmits *pipeline.BridgeTelemetry", func(t *testing.T) { + t.Parallel() + m := &mockMonitoringEndpoint{chLogs: make(chan []byte, 100)} + tm := newTelemeter(lggr, m, donID) + servicetest.Run(t, tm) + ch := tm.MakeTelemChannel(opts, 100) + + ch <- &pipeline.BridgeTelemetry{ + Name: "test-bridge-1", + RequestData: []byte(`foo`), + ResponseData: []byte(`bar`), + ResponseError: ptr("test error"), + ResponseStatusCode: 200, + RequestStartTimestamp: time.Unix(1, 1), + RequestFinishTimestamp: time.Unix(2, 1), + LocalCacheHit: true, + SpecID: 3, + StreamID: ptr(uint32(135)), + DotID: "ds1", + } + + log := <-m.chLogs + decoded := &telem.LLOBridgeTelemetry{} + require.NoError(t, proto.Unmarshal(log, decoded)) + assert.Equal(t, "test-bridge-1", decoded.BridgeAdapterName) + assert.Equal(t, []byte(`foo`), decoded.BridgeRequestData) + assert.Equal(t, []byte(`bar`), decoded.BridgeResponseData) + require.NotNil(t, decoded.BridgeResponseError) + assert.Equal(t, "test error", *decoded.BridgeResponseError) + assert.Equal(t, int32(200), decoded.BridgeResponseStatusCode) + assert.Equal(t, int64(1000000001), decoded.RequestStartTimestamp) + assert.Equal(t, int64(2000000001), decoded.RequestFinishTimestamp) + assert.True(t, decoded.LocalCacheHit) + assert.Equal(t, int32(3), decoded.SpecId) + require.NotNil(t, decoded.StreamId) + assert.Equal(t, uint32(135), *decoded.StreamId) + assert.Equal(t, "ds1", decoded.DotId) + + // added by telemeter + assert.Equal(t, donID, decoded.DonId) + assert.Equal(t, opts.SeqNr(), decoded.SeqNr) + assert.Equal(t, opts.ConfigDigest().Hex(), hex.EncodeToString(decoded.ConfigDigest)) + assert.Equal(t, opts.ObservationTimestamp().UnixNano(), decoded.ObservationTimestamp) + }) + t.Run("transmits *telem.LLOObservationTelemetry", func(t *testing.T) { + t.Parallel() + m := &mockMonitoringEndpoint{chLogs: make(chan []byte, 100)} + tm := newTelemeter(lggr, m, donID) + servicetest.Run(t, tm) + ch := tm.MakeTelemChannel(opts, 100) + + ch <- &telem.LLOObservationTelemetry{ + StreamId: 135, + StreamValueType: 1, + StreamValueBinary: []byte{0x01, 0x02, 0x03}, + StreamValueText: "stream value text", + ObservationError: ptr("test error"), + ObservationTimestamp: time.Unix(1, 1).UnixNano(), + ObservationFinishedAt: time.Unix(2, 1).UnixNano(), + SeqNr: 42, + ConfigDigest: []byte{0x01, 0x02, 0x03}, + } + + log := <-m.chLogs + decoded := &telem.LLOObservationTelemetry{} + require.NoError(t, proto.Unmarshal(log, decoded)) + assert.Equal(t, uint32(135), decoded.StreamId) + assert.Equal(t, int32(1), decoded.StreamValueType) + assert.Equal(t, []byte{0x01, 0x02, 0x03}, decoded.StreamValueBinary) + assert.Equal(t, "stream value text", decoded.StreamValueText) + require.NotNil(t, decoded.ObservationError) + assert.Equal(t, "test error", *decoded.ObservationError) + assert.Equal(t, int64(1000000001), decoded.ObservationTimestamp) + assert.Equal(t, int64(2000000001), decoded.ObservationFinishedAt) + assert.Equal(t, uint64(42), decoded.SeqNr) + assert.Equal(t, []byte{0x01, 0x02, 0x03}, decoded.ConfigDigest) + + // telemeter adds don ID + assert.Equal(t, donID, decoded.DonId) + }) + + t.Run("ignores unknown telemetry type", func(t *testing.T) { + t.Parallel() + m := &mockMonitoringEndpoint{chLogs: make(chan []byte, 100)} + obsLggr, observedLogs := logger.TestLoggerObserved(t, zapcore.WarnLevel) + tm := newTelemeter(obsLggr, m, donID) + servicetest.Run(t, tm) + ch := tm.MakeTelemChannel(opts, 100) + + ch <- struct{}{} + + testutils.WaitForLogMessage(t, observedLogs, "Unknown telemetry type") + }) +} + +func ptr[T any](t T) *T { return &t } diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index 65c47dd1cc9..a1b66ba8931 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -539,3 +539,26 @@ func selectBlock(block string) (string, error) { } return "", pkgerrors.Errorf("unsupported block param: %s", block) } + +// WithTelemetry adds an optional telemetry channel to the context. If ch is +// non-nil, certain tasks MAY choose to send arbitrary telemetry data on this +// channel. The provided channel SHOULD be buffered, but if it blocks, the task +// SHOULD NOT block. +type contextKey string + +const ctxTelemetryKey contextKey = "telemetry" + +func WithTelemetryCh(ctx context.Context, ch chan<- interface{}) context.Context { + if ch == nil { + return ctx + } + return context.WithValue(ctx, ctxTelemetryKey, ch) +} + +func GetTelemetryCh(ctx context.Context) chan<- interface{} { + ch, ok := ctx.Value(ctxTelemetryKey).(chan<- interface{}) + if !ok { + return nil + } + return ch +} diff --git a/core/services/pipeline/common_http.go b/core/services/pipeline/common_http.go index 83fa21382ea..04e20fc8568 100644 --- a/core/services/pipeline/common_http.go +++ b/core/services/pipeline/common_http.go @@ -24,19 +24,23 @@ func makeHTTPRequest( requestData MapParam, client *http.Client, httpLimit int64, -) ([]byte, int, http.Header, time.Duration, error) { +) (responseBytes []byte, statusCode int, respHeaders http.Header, start, finish time.Time, err error) { var bodyReader io.Reader if requestData != nil { - bodyBytes, err := json.Marshal(requestData) + var bodyBytes []byte + bodyBytes, err = json.Marshal(requestData) if err != nil { - return nil, 0, nil, 0, errors.Wrap(err, "failed to encode request body as JSON") + err = errors.Wrap(err, "failed to encode request body as JSON") + return } bodyReader = bytes.NewReader(bodyBytes) } - request, err := http.NewRequestWithContext(ctx, string(method), url.String(), bodyReader) + var request *http.Request + request, err = http.NewRequestWithContext(ctx, string(method), url.String(), bodyReader) if err != nil { - return nil, 0, nil, 0, errors.Wrap(err, "failed to create http.Request") + err = errors.Wrap(err, "failed to create http.Request") + return } request.Header.Set("Content-Type", "application/json") if len(reqHeaders)%2 != 0 { @@ -53,21 +57,22 @@ func makeHTTPRequest( Logger: logger.Sugared(lggr).Named("HTTPRequest"), } - start := time.Now() - responseBytes, statusCode, respHeaders, err := httpRequest.SendRequest() + start = time.Now() + responseBytes, statusCode, respHeaders, err = httpRequest.SendRequest() + finish = time.Now() if ctx.Err() != nil { - return nil, 0, nil, 0, errors.New("http request timed out or interrupted") + err = errors.New("http request timed out or interrupted") + return } if err != nil { - return nil, 0, nil, 0, errors.Wrapf(err, "error making http request") + err = errors.Wrapf(err, "error making http request") + return } - elapsed := time.Since(start) // TODO: return elapsed from utils/http if statusCode >= 400 { - maybeErr := bestEffortExtractError(responseBytes) - return responseBytes, statusCode, respHeaders, 0, errors.Errorf("got error from %s: (status code %v) %s", url.String(), statusCode, maybeErr) + err = errors.Errorf("got error from %s: (status code %v) %s", url.String(), statusCode, bestEffortExtractError(responseBytes)) } - return responseBytes, statusCode, respHeaders, elapsed, nil + return } type PossibleErrorResponses struct { diff --git a/core/services/pipeline/task.bridge.go b/core/services/pipeline/task.bridge.go index a885bee8df0..4ab6b0ac967 100644 --- a/core/services/pipeline/task.bridge.go +++ b/core/services/pipeline/task.bridge.go @@ -72,6 +72,20 @@ type BridgeTask struct { httpClient *http.Client } +type BridgeTelemetry struct { + RequestStartTimestamp time.Time `json:"requestStartTimestamp"` + RequestFinishTimestamp time.Time `json:"requestFinishTimestamp"` + RequestData []byte `json:"requestData"` + ResponseData []byte `json:"responseData"` + Name string `json:"name"` + DotID string `json:"dotID"` + ResponseError *string `json:"responseError"` + StreamID *uint32 `json:"streamID"` + SpecID int32 `json:"specID"` + ResponseStatusCode int `json:"responseStatusCode"` + LocalCacheHit bool `json:"localCacheHit"` +} + var _ Task = (*BridgeTask)(nil) var zeroURL = new(url.URL) @@ -171,7 +185,37 @@ func (t *BridgeTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inp } var cachedResponse bool - responseBytes, statusCode, headers, elapsed, err := makeHTTPRequest(requestCtx, lggr, "POST", url, reqHeaders, requestData, t.httpClient, t.config.DefaultHTTPLimit()) + responseBytes, statusCode, headers, start, finish, err := makeHTTPRequest(requestCtx, lggr, "POST", url, reqHeaders, requestData, t.httpClient, t.config.DefaultHTTPLimit()) + elapsed := finish.Sub(start) + + defer func() { + telemetryCh := GetTelemetryCh(ctx) + if telemetryCh != nil { + bt := &BridgeTelemetry{ + Name: t.Name, + RequestData: requestDataJSON, + ResponseData: responseBytes, + ResponseStatusCode: statusCode, + RequestStartTimestamp: start, + RequestFinishTimestamp: finish, + LocalCacheHit: cachedResponse, + SpecID: t.specId, + DotID: t.DotID(), + } + if err != nil { + bt.ResponseError = new(string) + *bt.ResponseError = err.Error() + } + if t.StreamID.Valid { + bt.StreamID = &t.StreamID.Uint32 + } + select { + case telemetryCh <- bt: + default: + lggr.Warn("bridge task: telemetry channel is full, dropping telemetry") + } + } + }() // check for external adapter response object status if code, ok := eautils.BestEffortExtractEAStatus(responseBytes); ok { diff --git a/core/services/pipeline/task.bridge_test.go b/core/services/pipeline/task.bridge_test.go index cd81f8656fd..2ad0616fdd5 100644 --- a/core/services/pipeline/task.bridge_test.go +++ b/core/services/pipeline/task.bridge_test.go @@ -214,6 +214,8 @@ func TestBridgeTask_Happy(t *testing.T) { db := pgtest.NewSqlxDB(t) cfg := configtest.NewTestGeneralConfig(t) + telemCh := make(chan interface{}, 1) + ctx := pipeline.WithTelemetryCh(testutils.Context(t), telemCh) s1 := httptest.NewServer(fakePriceResponder(t, utils.MustUnmarshalToMap(btcUSDPairing), decimal.NewFromInt(9700), "", nil)) defer s1.Close() @@ -231,11 +233,11 @@ func TestBridgeTask_Happy(t *testing.T) { } c := clhttptest.NewTestLocalOnlyHTTPClient() trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.JobPipeline().MaxSuccessfulRuns()) - specID, err := trORM.CreateSpec(testutils.Context(t), pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute)) + specID, err := trORM.CreateSpec(ctx, pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute)) require.NoError(t, err) task.HelperSetDependencies(cfg.JobPipeline(), cfg.WebServer(), orm, specID, uuid.UUID{}, c) - result, runInfo := task.Run(testutils.Context(t), logger.TestLogger(t), pipeline.NewVarsFrom(nil), nil) + result, runInfo := task.Run(ctx, logger.TestLogger(t), pipeline.NewVarsFrom(nil), nil) assert.False(t, runInfo.IsPending) assert.False(t, runInfo.IsRetryable) require.NoError(t, result.Error) @@ -248,6 +250,22 @@ func TestBridgeTask_Happy(t *testing.T) { err = json.Unmarshal([]byte(result.Value.(string)), &x) require.NoError(t, err) require.Equal(t, decimal.NewFromInt(9700), x.Data.Result) + + telem := <-telemCh + require.IsType(t, &pipeline.BridgeTelemetry{}, telem) + btelem := telem.(*pipeline.BridgeTelemetry) + assert.Equal(t, string(bridge.Name), btelem.Name) + assert.Equal(t, btcUSDPairing, string(btelem.RequestData)) + assert.Equal(t, `{"errorMessage":null,"error":null,"statusCode":null,"providerStatusCode":null,"data":{"result":"9700"}} +`, string(btelem.ResponseData)) + assert.Nil(t, btelem.ResponseError) + assert.NotZero(t, btelem.RequestStartTimestamp) + assert.NotZero(t, btelem.RequestFinishTimestamp) + assert.Equal(t, 200, btelem.ResponseStatusCode) + assert.False(t, btelem.LocalCacheHit) + assert.Equal(t, specID, btelem.SpecID) + assert.NotEqual(t, uuid.Nil, btelem.StreamID) + assert.NotEqual(t, uuid.Nil, btelem.DotID) } func TestBridgeTask_HandlesIntermittentFailure(t *testing.T) { diff --git a/core/services/pipeline/task.http.go b/core/services/pipeline/task.http.go index c23b1e8fa99..7f39fd7006a 100644 --- a/core/services/pipeline/task.http.go +++ b/core/services/pipeline/task.http.go @@ -104,7 +104,8 @@ func (t *HTTPTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, input } else { client = t.httpClient } - responseBytes, statusCode, respHeaders, elapsed, err := makeHTTPRequest(requestCtx, lggr, method, url, reqHeaders, requestData, client, t.config.DefaultHTTPLimit()) + responseBytes, statusCode, respHeaders, start, finish, err := makeHTTPRequest(requestCtx, lggr, method, url, reqHeaders, requestData, client, t.config.DefaultHTTPLimit()) + elapsed := finish.Sub(start).Milliseconds() if err != nil { if errors.Is(errors.Cause(err), clhttp.ErrDisallowedIP) { err = errors.Wrap(err, `connections to local resources are disabled by default, if you are sure this is safe, you can enable on a per-task basis by setting allowUnrestrictedNetworkAccess="true" in the pipeline task spec, e.g. fetch [type="http" method=GET url="$(decode_cbor.url)" allowUnrestrictedNetworkAccess="true"]`) diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go index c3755b36809..260a8c25a24 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 -// protoc v4.25.1 +// protoc-gen-go v1.36.4 +// protoc v5.29.3 // source: mercury.proto package pb @@ -11,6 +11,7 @@ import ( protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -21,12 +22,11 @@ const ( ) type TransmitRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + ReportFormat uint32 `protobuf:"varint,2,opt,name=reportFormat,proto3" json:"reportFormat,omitempty"` unknownFields protoimpl.UnknownFields - - Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` - ReportFormat uint32 `protobuf:"varint,2,opt,name=reportFormat,proto3" json:"reportFormat,omitempty"` + sizeCache protoimpl.SizeCache } func (x *TransmitRequest) Reset() { @@ -74,12 +74,11 @@ func (x *TransmitRequest) GetReportFormat() uint32 { } type TransmitResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` unknownFields protoimpl.UnknownFields - - Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` - Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + sizeCache protoimpl.SizeCache } func (x *TransmitResponse) Reset() { @@ -127,11 +126,10 @@ func (x *TransmitResponse) GetError() string { } type LatestReportRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + FeedId []byte `protobuf:"bytes,1,opt,name=feedId,proto3" json:"feedId,omitempty"` unknownFields protoimpl.UnknownFields - - FeedId []byte `protobuf:"bytes,1,opt,name=feedId,proto3" json:"feedId,omitempty"` + sizeCache protoimpl.SizeCache } func (x *LatestReportRequest) Reset() { @@ -172,12 +170,11 @@ func (x *LatestReportRequest) GetFeedId() []byte { } type LatestReportResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + Report *Report `protobuf:"bytes,2,opt,name=report,proto3" json:"report,omitempty"` unknownFields protoimpl.UnknownFields - - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - Report *Report `protobuf:"bytes,2,opt,name=report,proto3" json:"report,omitempty"` + sizeCache protoimpl.SizeCache } func (x *LatestReportResponse) Reset() { @@ -225,24 +222,23 @@ func (x *LatestReportResponse) GetReport() *Report { } type Report struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - FeedId []byte `protobuf:"bytes,1,opt,name=feedId,proto3" json:"feedId,omitempty"` - Price []byte `protobuf:"bytes,2,opt,name=price,proto3" json:"price,omitempty"` - Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` - ValidFromBlockNumber int64 `protobuf:"varint,4,opt,name=validFromBlockNumber,proto3" json:"validFromBlockNumber,omitempty"` - CurrentBlockNumber int64 `protobuf:"varint,5,opt,name=currentBlockNumber,proto3" json:"currentBlockNumber,omitempty"` - CurrentBlockHash []byte `protobuf:"bytes,6,opt,name=currentBlockHash,proto3" json:"currentBlockHash,omitempty"` - CurrentBlockTimestamp uint64 `protobuf:"varint,7,opt,name=currentBlockTimestamp,proto3" json:"currentBlockTimestamp,omitempty"` - ObservationsTimestamp int64 `protobuf:"varint,8,opt,name=observationsTimestamp,proto3" json:"observationsTimestamp,omitempty"` - ConfigDigest []byte `protobuf:"bytes,9,opt,name=configDigest,proto3" json:"configDigest,omitempty"` - Epoch uint32 `protobuf:"varint,10,opt,name=epoch,proto3" json:"epoch,omitempty"` - Round uint32 `protobuf:"varint,11,opt,name=round,proto3" json:"round,omitempty"` - OperatorName string `protobuf:"bytes,12,opt,name=operatorName,proto3" json:"operatorName,omitempty"` - TransmittingOperator []byte `protobuf:"bytes,13,opt,name=transmittingOperator,proto3" json:"transmittingOperator,omitempty"` - CreatedAt *Timestamp `protobuf:"bytes,14,opt,name=createdAt,proto3" json:"createdAt,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + FeedId []byte `protobuf:"bytes,1,opt,name=feedId,proto3" json:"feedId,omitempty"` + Price []byte `protobuf:"bytes,2,opt,name=price,proto3" json:"price,omitempty"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` + ValidFromBlockNumber int64 `protobuf:"varint,4,opt,name=validFromBlockNumber,proto3" json:"validFromBlockNumber,omitempty"` + CurrentBlockNumber int64 `protobuf:"varint,5,opt,name=currentBlockNumber,proto3" json:"currentBlockNumber,omitempty"` + CurrentBlockHash []byte `protobuf:"bytes,6,opt,name=currentBlockHash,proto3" json:"currentBlockHash,omitempty"` + CurrentBlockTimestamp uint64 `protobuf:"varint,7,opt,name=currentBlockTimestamp,proto3" json:"currentBlockTimestamp,omitempty"` + ObservationsTimestamp int64 `protobuf:"varint,8,opt,name=observationsTimestamp,proto3" json:"observationsTimestamp,omitempty"` + ConfigDigest []byte `protobuf:"bytes,9,opt,name=configDigest,proto3" json:"configDigest,omitempty"` + Epoch uint32 `protobuf:"varint,10,opt,name=epoch,proto3" json:"epoch,omitempty"` + Round uint32 `protobuf:"varint,11,opt,name=round,proto3" json:"round,omitempty"` + OperatorName string `protobuf:"bytes,12,opt,name=operatorName,proto3" json:"operatorName,omitempty"` + TransmittingOperator []byte `protobuf:"bytes,13,opt,name=transmittingOperator,proto3" json:"transmittingOperator,omitempty"` + CreatedAt *Timestamp `protobuf:"bytes,14,opt,name=createdAt,proto3" json:"createdAt,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Report) Reset() { @@ -375,10 +371,7 @@ func (x *Report) GetCreatedAt() *Timestamp { // Taken from: https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/timestamp.proto type Timestamp struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Represents seconds of UTC time since Unix epoch // 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to // 9999-12-31T23:59:59Z inclusive. @@ -387,7 +380,9 @@ type Timestamp struct { // second values with fractions must still have non-negative nanos values // that count forward in time. Must be from 0 to 999,999,999 // inclusive. - Nanos int32 `protobuf:"varint,2,opt,name=nanos,proto3" json:"nanos,omitempty"` + Nanos int32 `protobuf:"varint,2,opt,name=nanos,proto3" json:"nanos,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Timestamp) Reset() { @@ -436,7 +431,7 @@ func (x *Timestamp) GetNanos() int32 { var File_mercury_proto protoreflect.FileDescriptor -var file_mercury_proto_rawDesc = []byte{ +var file_mercury_proto_rawDesc = string([]byte{ 0x0a, 0x0d, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x4f, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, @@ -508,16 +503,16 @@ var file_mercury_proto_rawDesc = []byte{ 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2f, 0x65, 0x76, 0x6d, 0x2f, 0x6d, 0x65, 0x72, 0x63, 0x75, 0x72, 0x79, 0x2f, 0x77, 0x73, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +}) var ( file_mercury_proto_rawDescOnce sync.Once - file_mercury_proto_rawDescData = file_mercury_proto_rawDesc + file_mercury_proto_rawDescData []byte ) func file_mercury_proto_rawDescGZIP() []byte { file_mercury_proto_rawDescOnce.Do(func() { - file_mercury_proto_rawDescData = protoimpl.X.CompressGZIP(file_mercury_proto_rawDescData) + file_mercury_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mercury_proto_rawDesc), len(file_mercury_proto_rawDesc))) }) return file_mercury_proto_rawDescData } @@ -554,7 +549,7 @@ func file_mercury_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_mercury_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mercury_proto_rawDesc), len(file_mercury_proto_rawDesc)), NumEnums: 0, NumMessages: 6, NumExtensions: 0, @@ -565,7 +560,6 @@ func file_mercury_proto_init() { MessageInfos: file_mercury_proto_msgTypes, }.Build() File_mercury_proto = out.File - file_mercury_proto_rawDesc = nil file_mercury_proto_goTypes = nil file_mercury_proto_depIdxs = nil } diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go index 4d05db4380f..1e0a862f487 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-wsrpc. DO NOT EDIT. // versions: // - protoc-gen-go-wsrpc v0.0.1 -// - protoc v4.25.1 +// - protoc v5.29.3 package pb diff --git a/deployment/go.mod b/deployment/go.mod index b174304860c..11703932290 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -48,7 +48,7 @@ require ( golang.org/x/oauth2 v0.23.0 golang.org/x/sync v0.10.0 google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + google.golang.org/protobuf v1.36.4 gopkg.in/guregu/null.v4 v4.0.0 ) @@ -418,7 +418,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 // indirect github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect diff --git a/deployment/go.sum b/deployment/go.sum index 0f46413e89a..f03cc9a704b 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1402,8 +1402,8 @@ github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f3 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36/go.mod h1:Z2e1ynSJ4pg83b4Qldbmryc5lmnrI3ojOdg1FUloa68= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 h1:w7w42ml8MOxdoyAZ9+og0342UkiH3deRM1V0Pj5JR5g= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4/go.mod h1:wtdAmAUMooLavbrTA7PgHg40lyDlKesxI/RR+5Xcz18= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 h1:GcPYNVFYjB065CNq0h8nK/VeU08nUkHgBX0cJIEpuHY= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 h1:CvDfgWoLoYPapOumE/UZCplfCu5oNmy9BuH+6V6+fJ8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 h1:lJi0dWfgNJl4Um5KzeZZPVBi//CPDfzzeVmv4Z2OGNY= @@ -2180,8 +2180,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/go.mod b/go.mod index db5dc1fbf51..fe792fec2a5 100644 --- a/go.mod +++ b/go.mod @@ -81,7 +81,7 @@ require ( github.com/smartcontractkit/chainlink-ccip v0.0.0-20250129104727-56a4f7e9e8dc github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36 github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 github.com/smartcontractkit/chainlink-feeds v0.1.1 github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250121205514-f73e2f86c23b @@ -122,7 +122,7 @@ require ( golang.org/x/tools v0.26.0 gonum.org/v1/gonum v0.15.1 google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 + google.golang.org/protobuf v1.36.4 gopkg.in/guregu/null.v4 v4.0.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 @@ -411,4 +411,5 @@ replace ( github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 github.com/sourcegraph/sourcegraph/lib => github.com/sourcegraph/sourcegraph-public-snapshot/lib v0.0.0-20240822153003-c864f15af264 + ) diff --git a/go.sum b/go.sum index ce9e7e870e2..d00f4406a92 100644 --- a/go.sum +++ b/go.sum @@ -1160,8 +1160,8 @@ github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f3 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36/go.mod h1:Z2e1ynSJ4pg83b4Qldbmryc5lmnrI3ojOdg1FUloa68= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 h1:w7w42ml8MOxdoyAZ9+og0342UkiH3deRM1V0Pj5JR5g= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4/go.mod h1:wtdAmAUMooLavbrTA7PgHg40lyDlKesxI/RR+5Xcz18= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 h1:GcPYNVFYjB065CNq0h8nK/VeU08nUkHgBX0cJIEpuHY= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 h1:CvDfgWoLoYPapOumE/UZCplfCu5oNmy9BuH+6V6+fJ8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 h1:lJi0dWfgNJl4Um5KzeZZPVBi//CPDfzzeVmv4Z2OGNY= @@ -1914,8 +1914,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index fdb5ed65746..d376f8b41b3 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -437,7 +437,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.0.0-salt-fix // indirect github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250103152858-8973fd0c912b // indirect github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 // indirect github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250121205514-f73e2f86c23b // indirect @@ -534,7 +534,7 @@ require ( google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect - google.golang.org/protobuf v1.35.1 // indirect + google.golang.org/protobuf v1.36.4 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 5c61d7d0f1f..87031208b92 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1430,8 +1430,8 @@ github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f3 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36/go.mod h1:Z2e1ynSJ4pg83b4Qldbmryc5lmnrI3ojOdg1FUloa68= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 h1:w7w42ml8MOxdoyAZ9+og0342UkiH3deRM1V0Pj5JR5g= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4/go.mod h1:wtdAmAUMooLavbrTA7PgHg40lyDlKesxI/RR+5Xcz18= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 h1:GcPYNVFYjB065CNq0h8nK/VeU08nUkHgBX0cJIEpuHY= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 h1:CvDfgWoLoYPapOumE/UZCplfCu5oNmy9BuH+6V6+fJ8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 h1:lJi0dWfgNJl4Um5KzeZZPVBi//CPDfzzeVmv4Z2OGNY= @@ -2214,8 +2214,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index fa2acb5256f..26a726de172 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -421,7 +421,7 @@ require ( github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250103152858-8973fd0c912b // indirect github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.1 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 // indirect github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250121205514-f73e2f86c23b // indirect @@ -531,7 +531,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/grpc v1.67.1 // indirect - google.golang.org/protobuf v1.35.1 // indirect + google.golang.org/protobuf v1.36.4 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/guregu/null.v4 v4.0.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 53d72b48248..f583a85c1a9 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1417,8 +1417,8 @@ github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f3 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250127125541-a8fa42cc0f36/go.mod h1:Z2e1ynSJ4pg83b4Qldbmryc5lmnrI3ojOdg1FUloa68= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4 h1:w7w42ml8MOxdoyAZ9+og0342UkiH3deRM1V0Pj5JR5g= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20250121210000-2a9675d7a1b4/go.mod h1:wtdAmAUMooLavbrTA7PgHg40lyDlKesxI/RR+5Xcz18= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3 h1:GcPYNVFYjB065CNq0h8nK/VeU08nUkHgBX0cJIEpuHY= -github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250115135646-ac859d85e7e3/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5 h1:CvDfgWoLoYPapOumE/UZCplfCu5oNmy9BuH+6V6+fJ8= +github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250128203428-08031923fbe5/go.mod h1:pDZagSGjs9U+l4YIFhveDznMHqxuuz+5vRxvVgpbdr8= github.com/smartcontractkit/chainlink-feeds v0.1.1 h1:JzvUOM/OgGQA1sOqTXXl52R6AnNt+Wg64sVG+XSA49c= github.com/smartcontractkit/chainlink-feeds v0.1.1/go.mod h1:55EZ94HlKCfAsUiKUTNI7QlE/3d3IwTlsU3YNa/nBb4= github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250121195549-294ec6a40b92 h1:lJi0dWfgNJl4Um5KzeZZPVBi//CPDfzzeVmv4Z2OGNY= @@ -2199,8 +2199,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=