Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Transcribe streaming "A complete signal was sent without ending frame" #2827

Merged
merged 1 commit into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ namespace Model
AudioStream& WriteAudioEvent(const AudioEvent& value)
{
Aws::Utils::Event::Message msg;
msg.InsertEventHeader(":message-type", Aws::String("event"));
msg.InsertEventHeader(":event-type", Aws::String("AudioEvent"));
msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream"));
msg.WriteEventPayload(value.GetAudioChunk());
if(!value.GetAudioChunk().empty())
{
msg.InsertEventHeader(":message-type", Aws::String("event"));
msg.InsertEventHeader(":event-type", Aws::String("AudioEvent"));
msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream"));
msg.WriteEventPayload(value.GetAudioChunk());
}
WriteEvent(msg);
return *this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ namespace Aws
bool InitEncodedStruct(const Aws::Utils::Event::Message& msg, aws_event_stream_message* encoded);

/**
* Initialize signed C struct based on unsigned C struct.
* Initialize signed C struct with a content of a payload message.
* Returns true if successful.
* signedmsg will contain an AWS Stream Event with a payload in a H2 frame payload section.
* A successfully initialized struct must be cleaned up when you're done with it.
*/
bool InitSignedStruct(const aws_event_stream_message* msg, aws_event_stream_message* signedmsg);
bool InitSignedStruct(const aws_event_stream_message* payload, aws_event_stream_message* signedmsg);

Aws::Client::AWSAuthSigner* m_signer;
Aws::String m_signatureSeed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace Aws
{
public:

explicit ConcurrentStreamBuf(size_t bufferLength = 4 * 1024);
explicit ConcurrentStreamBuf(size_t bufferLength = 8 * 1024);

void SetEof();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,31 +188,35 @@ bool AWSAuthEventStreamV4Signer::SignEventMessage(Event::Message& message, Aws::
const auto nonSignatureHeadersHash = hashOutcome.GetResult();
stringToSign << HashingUtils::HexEncode(nonSignatureHeadersHash) << Aws::Auth::AWSAuthHelper::NEWLINE;

if (message.GetEventPayload().empty())
{
AWS_LOGSTREAM_WARN(v4StreamingLogTag, "Attempting to sign an empty message (no payload and no headers). "
"It is unlikely that this is the intended behavior.");
}
else
if (!message.GetEventPayload().empty())
{
// use a preallocatedStreamBuf to avoid making a copy.
// The Hashing API requires either Aws::String or IStream as input.
// TODO: the hashing API should be accept 'unsigned char*' as input.
Utils::Stream::PreallocatedStreamBuf streamBuf(message.GetEventPayload().data(), message.GetEventPayload().size());
Aws::IOStream payload(&streamBuf);
hashOutcome = m_hash.Calculate(payload);
}
else
{
// only a signature and a date will be in a frame
AWS_LOGSTREAM_INFO(v4StreamingLogTag, "Signing an event with an empty payload");

if (!hashOutcome.IsSuccess())
{
AWS_LOGSTREAM_ERROR(v4StreamingLogTag, "Failed to hash (sha256) non-signature headers.");
return false;
}
const auto payloadHash = hashOutcome.GetResult();
stringToSign << HashingUtils::HexEncode(payloadHash);
AWS_LOGSTREAM_DEBUG(v4StreamingLogTag, "Payload hash - " << HashingUtils::HexEncode(payloadHash));
hashOutcome = m_hash.Calculate(""); // SHA256 of an empty buffer
}

if (!hashOutcome.IsSuccess())
{
AWS_LOGSTREAM_ERROR(v4StreamingLogTag, "Failed to hash (sha256) non-signature headers.");
return false;
}
const auto payloadHash = hashOutcome.GetResult();
stringToSign << HashingUtils::HexEncode(payloadHash);
AWS_LOGSTREAM_DEBUG(v4StreamingLogTag, "Payload hash - " << HashingUtils::HexEncode(payloadHash));

Aws::Utils::ByteBuffer finalSignatureDigest = GenerateSignature(m_credentialsProvider->GetAWSCredentials(), stringToSign.str(), simpleDate, m_region, m_serviceName);
Aws::String canonicalRequestString = stringToSign.str();
AWS_LOGSTREAM_TRACE(v4StreamingLogTag, "EventStream Event Canonical Request String: " << canonicalRequestString);
Aws::Utils::ByteBuffer finalSignatureDigest = GenerateSignature(m_credentialsProvider->GetAWSCredentials(), canonicalRequestString, simpleDate, m_region, m_serviceName);
const auto finalSignature = HashingUtils::HexEncode(finalSignatureDigest);
AWS_LOGSTREAM_DEBUG(v4StreamingLogTag, "Final computed signing hash: " << finalSignature);
priorSignature = finalSignature;
Expand Down
2 changes: 1 addition & 1 deletion src/aws-cpp-sdk-core/source/utils/event/EventStreamBuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Aws
{
namespace Event
{
const size_t DEFAULT_BUF_SIZE = 1024;
const size_t DEFAULT_BUF_SIZE = 8096;

EventStreamBuf::EventStreamBuf(EventStreamDecoder& decoder, size_t bufferLength) :
m_byteBuffer(bufferLength),
Expand Down
32 changes: 23 additions & 9 deletions src/aws-cpp-sdk-core/source/utils/event/EventStreamEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,18 @@ namespace Aws
Aws::Vector<unsigned char> outputBits;

aws_event_stream_message encoded;
if (InitEncodedStruct(msg, &encoded))
aws_event_stream_message* encodedPayload = nullptr;
bool msgEncodeSuccess = true; // empty message "successes" to encode
if (!msg.GetEventHeaders().empty() || !msg.GetEventPayload().empty())
{
InitEncodedStruct(msg, &encoded);
encodedPayload = &encoded;
}

if (msgEncodeSuccess)
{
aws_event_stream_message signedMessage;
if (InitSignedStruct(&encoded, &signedMessage))
if (InitSignedStruct(encodedPayload, &signedMessage))
{
// success!
const auto signedMessageBuffer = aws_event_stream_message_buffer(&signedMessage);
Expand All @@ -96,7 +104,10 @@ namespace Aws

aws_event_stream_message_clean_up(&signedMessage);
}
aws_event_stream_message_clean_up(&encoded);
if (encodedPayload)
{
aws_event_stream_message_clean_up(encodedPayload);
}
}

return outputBits;
Expand Down Expand Up @@ -124,24 +135,27 @@ namespace Aws
return success;
}

bool EventStreamEncoder::InitSignedStruct(const aws_event_stream_message* msg, aws_event_stream_message* signedmsg)
bool EventStreamEncoder::InitSignedStruct(const aws_event_stream_message* payload, aws_event_stream_message* signedmsg)
{
bool success = false;

const auto msgbuf = aws_event_stream_message_buffer(msg);
const auto msglen = aws_event_stream_message_total_length(msg);
Event::Message signedMessage;
signedMessage.WriteEventPayload(msgbuf, msglen);
if (payload)
{
const auto msgbuf = aws_event_stream_message_buffer(payload);
const auto msglen = aws_event_stream_message_total_length(payload);
signedMessage.WriteEventPayload(msgbuf, msglen);
}

assert(m_signer);
if (m_signer->SignEventMessage(signedMessage, m_signatureSeed))
{
aws_array_list headers;
EncodeHeaders(signedMessage, &headers);

aws_byte_buf payload = aws_byte_buf_from_array(signedMessage.GetEventPayload().data(), signedMessage.GetEventPayload().size());
aws_byte_buf signedPayload = aws_byte_buf_from_array(signedMessage.GetEventPayload().data(), signedMessage.GetEventPayload().size());

if(aws_event_stream_message_init(signedmsg, get_aws_allocator(), &headers, &payload) == AWS_OP_SUCCESS)
if(aws_event_stream_message_init(signedmsg, get_aws_allocator(), &headers, &signedPayload) == AWS_OP_SUCCESS)
{
success = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,24 @@ namespace Model
${typeInfo.className}& Write${entry.value.shape.name}(const ${entry.value.shape.name}& value)
{
Aws::Utils::Event::Message msg;
#if(!$entry.value.shape.eventPayloadType.equals("blob"))
msg.InsertEventHeader(":message-type", Aws::String("event"));
msg.InsertEventHeader(":event-type", Aws::String("${entry.value.shape.name}"));
#if($entry.value.shape.eventPayloadType.equals("blob"))
msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream"));
msg.WriteEventPayload(value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})());
#elseif($entry.value.shape.eventPayloadType.equals("string"))
#if($entry.value.shape.eventPayloadType.equals("string"))
msg.InsertEventHeader(":content-type", Aws::String("text/plain"));
msg.WriteEventPayload(value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})()));
#elseif($entry.value.shape.eventPayloadType.equals("structure") || $entry.value.shape.eventPayloadType.equals("list"))
msg.InsertEventHeader(":content-type", Aws::String("application/json"));
msg.WriteEventPayload(value.Jsonize().View().WriteCompact());
#end
#else##if($entry.value.shape.eventPayloadType.equals("blob"))
if(!value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})().empty())
{
msg.InsertEventHeader(":message-type", Aws::String("event"));
msg.InsertEventHeader(":event-type", Aws::String("${entry.value.shape.name}"));
msg.InsertEventHeader(":content-type", Aws::String("application/octet-stream"));
msg.WriteEventPayload(value.Get$CppViewHelper.capitalizeFirstChar(${entry.value.shape.eventPayloadMemberName})());
}
#end
WriteEvent(msg);
return *this;
Expand Down
Loading