From 81f674a33fb9af3c1ed8b423925d9f25cc6d22ff Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Fri, 26 Jan 2024 00:34:28 +0000 Subject: [PATCH] Fix Transcribe streaming "A complete signal was sent without the preceding empty frame" --- .../transcribestreaming/model/AudioStream.h | 11 +++--- .../aws/core/utils/event/EventStreamEncoder.h | 5 +-- .../core/utils/stream/ConcurrentStreamBuf.h | 2 +- .../signer/AWSAuthEventStreamV4Signer.cpp | 34 +++++++++++-------- .../source/utils/event/EventStreamBuf.cpp | 2 +- .../source/utils/event/EventStreamEncoder.cpp | 32 ++++++++++++----- .../velocity/cpp/json/EventStreamHeader.vm | 15 +++++--- 7 files changed, 65 insertions(+), 36 deletions(-) diff --git a/generated/src/aws-cpp-sdk-transcribestreaming/include/aws/transcribestreaming/model/AudioStream.h b/generated/src/aws-cpp-sdk-transcribestreaming/include/aws/transcribestreaming/model/AudioStream.h index b17b8966813..e2c5782695b 100644 --- a/generated/src/aws-cpp-sdk-transcribestreaming/include/aws/transcribestreaming/model/AudioStream.h +++ b/generated/src/aws-cpp-sdk-transcribestreaming/include/aws/transcribestreaming/model/AudioStream.h @@ -31,10 +31,13 @@ namespace Model AudioStream& WriteAudioEvent(const AudioEvent& value) { Aws::Utils::Event::Message msg; - msg.InsertEventHeader(":message-type", Aws::String("event")); - msg.InsertEventHeader(":event-type", Aws::String("AudioEvent")); - msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream")); - msg.WriteEventPayload(value.GetAudioChunk()); + if(!value.GetAudioChunk().empty()) + { + msg.InsertEventHeader(":message-type", Aws::String("event")); + msg.InsertEventHeader(":event-type", Aws::String("AudioEvent")); + msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream")); + msg.WriteEventPayload(value.GetAudioChunk()); + } WriteEvent(msg); return *this; } diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/event/EventStreamEncoder.h b/src/aws-cpp-sdk-core/include/aws/core/utils/event/EventStreamEncoder.h index 8b9c123241c..656388b8e87 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/event/EventStreamEncoder.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/event/EventStreamEncoder.h @@ -49,11 +49,12 @@ namespace Aws bool InitEncodedStruct(const Aws::Utils::Event::Message& msg, aws_event_stream_message* encoded); /** - * Initialize signed C struct based on unsigned C struct. + * Initialize signed C struct with a content of a payload message. * Returns true if successful. + * signedmsg will contain an AWS Stream Event with a payload in a H2 frame payload section. * A successfully initialized struct must be cleaned up when you're done with it. */ - bool InitSignedStruct(const aws_event_stream_message* msg, aws_event_stream_message* signedmsg); + bool InitSignedStruct(const aws_event_stream_message* payload, aws_event_stream_message* signedmsg); Aws::Client::AWSAuthSigner* m_signer; Aws::String m_signatureSeed; diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h b/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h index 9ef7d9763e1..a7b57a7554a 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/stream/ConcurrentStreamBuf.h @@ -30,7 +30,7 @@ namespace Aws { public: - explicit ConcurrentStreamBuf(size_t bufferLength = 4 * 1024); + explicit ConcurrentStreamBuf(size_t bufferLength = 8 * 1024); void SetEof(); diff --git a/src/aws-cpp-sdk-core/source/auth/signer/AWSAuthEventStreamV4Signer.cpp b/src/aws-cpp-sdk-core/source/auth/signer/AWSAuthEventStreamV4Signer.cpp index 195e83a751a..45a8bd4338d 100644 --- a/src/aws-cpp-sdk-core/source/auth/signer/AWSAuthEventStreamV4Signer.cpp +++ b/src/aws-cpp-sdk-core/source/auth/signer/AWSAuthEventStreamV4Signer.cpp @@ -188,12 +188,7 @@ bool AWSAuthEventStreamV4Signer::SignEventMessage(Event::Message& message, Aws:: const auto nonSignatureHeadersHash = hashOutcome.GetResult(); stringToSign << HashingUtils::HexEncode(nonSignatureHeadersHash) << Aws::Auth::AWSAuthHelper::NEWLINE; - if (message.GetEventPayload().empty()) - { - AWS_LOGSTREAM_WARN(v4StreamingLogTag, "Attempting to sign an empty message (no payload and no headers). " - "It is unlikely that this is the intended behavior."); - } - else + if (!message.GetEventPayload().empty()) { // use a preallocatedStreamBuf to avoid making a copy. // The Hashing API requires either Aws::String or IStream as input. @@ -201,18 +196,27 @@ bool AWSAuthEventStreamV4Signer::SignEventMessage(Event::Message& message, Aws:: Utils::Stream::PreallocatedStreamBuf streamBuf(message.GetEventPayload().data(), message.GetEventPayload().size()); Aws::IOStream payload(&streamBuf); hashOutcome = m_hash.Calculate(payload); + } + else + { + // only a signature and a date will be in a frame + AWS_LOGSTREAM_INFO(v4StreamingLogTag, "Signing an event with an empty payload"); - if (!hashOutcome.IsSuccess()) - { - AWS_LOGSTREAM_ERROR(v4StreamingLogTag, "Failed to hash (sha256) non-signature headers."); - return false; - } - const auto payloadHash = hashOutcome.GetResult(); - stringToSign << HashingUtils::HexEncode(payloadHash); - AWS_LOGSTREAM_DEBUG(v4StreamingLogTag, "Payload hash - " << HashingUtils::HexEncode(payloadHash)); + hashOutcome = m_hash.Calculate(""); // SHA256 of an empty buffer + } + + if (!hashOutcome.IsSuccess()) + { + AWS_LOGSTREAM_ERROR(v4StreamingLogTag, "Failed to hash (sha256) non-signature headers."); + return false; } + const auto payloadHash = hashOutcome.GetResult(); + stringToSign << HashingUtils::HexEncode(payloadHash); + AWS_LOGSTREAM_DEBUG(v4StreamingLogTag, "Payload hash - " << HashingUtils::HexEncode(payloadHash)); - Aws::Utils::ByteBuffer finalSignatureDigest = GenerateSignature(m_credentialsProvider->GetAWSCredentials(), stringToSign.str(), simpleDate, m_region, m_serviceName); + Aws::String canonicalRequestString = stringToSign.str(); + AWS_LOGSTREAM_TRACE(v4StreamingLogTag, "EventStream Event Canonical Request String: " << canonicalRequestString); + Aws::Utils::ByteBuffer finalSignatureDigest = GenerateSignature(m_credentialsProvider->GetAWSCredentials(), canonicalRequestString, simpleDate, m_region, m_serviceName); const auto finalSignature = HashingUtils::HexEncode(finalSignatureDigest); AWS_LOGSTREAM_DEBUG(v4StreamingLogTag, "Final computed signing hash: " << finalSignature); priorSignature = finalSignature; diff --git a/src/aws-cpp-sdk-core/source/utils/event/EventStreamBuf.cpp b/src/aws-cpp-sdk-core/source/utils/event/EventStreamBuf.cpp index a6e6259985c..611153add44 100644 --- a/src/aws-cpp-sdk-core/source/utils/event/EventStreamBuf.cpp +++ b/src/aws-cpp-sdk-core/source/utils/event/EventStreamBuf.cpp @@ -11,7 +11,7 @@ namespace Aws { namespace Event { - const size_t DEFAULT_BUF_SIZE = 1024; + const size_t DEFAULT_BUF_SIZE = 8096; EventStreamBuf::EventStreamBuf(EventStreamDecoder& decoder, size_t bufferLength) : m_byteBuffer(bufferLength), diff --git a/src/aws-cpp-sdk-core/source/utils/event/EventStreamEncoder.cpp b/src/aws-cpp-sdk-core/source/utils/event/EventStreamEncoder.cpp index 750bf9e1e6d..97a48ac40b9 100644 --- a/src/aws-cpp-sdk-core/source/utils/event/EventStreamEncoder.cpp +++ b/src/aws-cpp-sdk-core/source/utils/event/EventStreamEncoder.cpp @@ -83,10 +83,18 @@ namespace Aws Aws::Vector outputBits; aws_event_stream_message encoded; - if (InitEncodedStruct(msg, &encoded)) + aws_event_stream_message* encodedPayload = nullptr; + bool msgEncodeSuccess = true; // empty message "successes" to encode + if (!msg.GetEventHeaders().empty() || !msg.GetEventPayload().empty()) + { + InitEncodedStruct(msg, &encoded); + encodedPayload = &encoded; + } + + if (msgEncodeSuccess) { aws_event_stream_message signedMessage; - if (InitSignedStruct(&encoded, &signedMessage)) + if (InitSignedStruct(encodedPayload, &signedMessage)) { // success! const auto signedMessageBuffer = aws_event_stream_message_buffer(&signedMessage); @@ -96,7 +104,10 @@ namespace Aws aws_event_stream_message_clean_up(&signedMessage); } - aws_event_stream_message_clean_up(&encoded); + if (encodedPayload) + { + aws_event_stream_message_clean_up(encodedPayload); + } } return outputBits; @@ -124,14 +135,17 @@ namespace Aws return success; } - bool EventStreamEncoder::InitSignedStruct(const aws_event_stream_message* msg, aws_event_stream_message* signedmsg) + bool EventStreamEncoder::InitSignedStruct(const aws_event_stream_message* payload, aws_event_stream_message* signedmsg) { bool success = false; - const auto msgbuf = aws_event_stream_message_buffer(msg); - const auto msglen = aws_event_stream_message_total_length(msg); Event::Message signedMessage; - signedMessage.WriteEventPayload(msgbuf, msglen); + if (payload) + { + const auto msgbuf = aws_event_stream_message_buffer(payload); + const auto msglen = aws_event_stream_message_total_length(payload); + signedMessage.WriteEventPayload(msgbuf, msglen); + } assert(m_signer); if (m_signer->SignEventMessage(signedMessage, m_signatureSeed)) @@ -139,9 +153,9 @@ namespace Aws aws_array_list headers; EncodeHeaders(signedMessage, &headers); - aws_byte_buf payload = aws_byte_buf_from_array(signedMessage.GetEventPayload().data(), signedMessage.GetEventPayload().size()); + aws_byte_buf signedPayload = aws_byte_buf_from_array(signedMessage.GetEventPayload().data(), signedMessage.GetEventPayload().size()); - if(aws_event_stream_message_init(signedmsg, get_aws_allocator(), &headers, &payload) == AWS_OP_SUCCESS) + if(aws_event_stream_message_init(signedmsg, get_aws_allocator(), &headers, &signedPayload) == AWS_OP_SUCCESS) { success = true; } diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/EventStreamHeader.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/EventStreamHeader.vm index fd83f42e3cb..66c1d2cac57 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/EventStreamHeader.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/json/EventStreamHeader.vm @@ -48,17 +48,24 @@ namespace Model ${typeInfo.className}& Write${entry.value.shape.name}(const ${entry.value.shape.name}& value) { Aws::Utils::Event::Message msg; +#if(!$entry.value.shape.eventPayloadType.equals("blob")) msg.InsertEventHeader(":message-type", Aws::String("event")); msg.InsertEventHeader(":event-type", Aws::String("${entry.value.shape.name}")); -#if($entry.value.shape.eventPayloadType.equals("blob")) - msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream")); - msg.WriteEventPayload(value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})()); -#elseif($entry.value.shape.eventPayloadType.equals("string")) +#if($entry.value.shape.eventPayloadType.equals("string")) msg.InsertEventHeader(":content-type", Aws::String("text/plain")); msg.WriteEventPayload(value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})())); #elseif($entry.value.shape.eventPayloadType.equals("structure") || $entry.value.shape.eventPayloadType.equals("list")) msg.InsertEventHeader(":content-type", Aws::String("application/json")); msg.WriteEventPayload(value.Jsonize().View().WriteCompact()); +#end +#else##if($entry.value.shape.eventPayloadType.equals("blob")) + if(!value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})().empty()) + { + msg.InsertEventHeader(":message-type", Aws::String("event")); + msg.InsertEventHeader(":event-type", Aws::String("${entry.value.shape.name}")); + msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream")); + msg.WriteEventPayload(value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})()); + } #end WriteEvent(msg); return *this;