Skip to content

Commit

Permalink
Allow ConcurrentStreamBuf to return nothing without closing the strea…
Browse files Browse the repository at this point in the history
…m (or setting
  • Loading branch information
SergeyRyabinin committed Jan 29, 2024
1 parent 5687dd6 commit 669654e
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 25 deletions.
5 changes: 5 additions & 0 deletions src/aws-cpp-sdk-core/include/aws/core/utils/DateTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ namespace Aws
*/
Aws::String ToGmtString(const char* formatStr) const;

/**
* Convert dateTime to a GMT time string using the predefined format "%Y-%m-%dT%H:%M:%S" followed by ".mmm" (zero-padded milliseconds).
*/
Aws::String ToGmtStringWithMs() const;

/**
* Get the representation of this datetime as seconds.milliseconds since epoch
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace Aws
std::streampos seekpos(std::streampos pos, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override;

int underflow() override;
int uflow() override;
int overflow(int ch) override;
int sync() override;
std::streamsize showmanyc() override;
Expand Down
23 changes: 11 additions & 12 deletions src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <aws/core/monitoring/HttpClientMetrics.h>
#include <cassert>
#include <algorithm>
#include <thread>


using namespace Aws::Client;
Expand Down Expand Up @@ -299,8 +300,10 @@ static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, boo
size_t amountRead = 0;
if (isStreaming)
{
ioStream->peek();
amountRead = ioStream->readsome(ptr, amountToRead);
if (!ioStream->eof() && ioStream->peek() != EOF)
{
amountRead = ioStream->readsome(ptr, amountToRead);
}
if (amountRead == 0 && !ioStream->eof())
{
return CURL_READFUNC_PAUSE;
Expand Down Expand Up @@ -428,18 +431,14 @@ static int CurlProgressCallback(void *userdata, double, double, double, double)
return 0;
}
// forcing "underflow" on the IOStream with ConcurrentStreamBuf to move data from back buffer to get area
ioStream->peek();
// char output[1];
// if (ioStream->readsome(output, 1) > 0)
// {
// ioStream->unget();
// if (!ioStream->good())
// {
// AWS_LOGSTREAM_WARN(CURL_HTTP_CLIENT_TAG, "Input stream failed to perform unget().");
// }
// }
int peekVal = ioStream->peek();
AWS_UNREFERENCED_PARAM(peekVal);

// forcing curl to try to ReadBody again (~to poll body IOStream for HTTP2)
// This is a spin pause-unpause in case no data is provided by a customer callback,
// but otherwise curl will slow down the transfer and start calling us at a frequency of 1s,
// see https://curl.se/mail/lib-2020-07/0046.html
// We should use a multi handle or another HTTP client in the future to avoid this.
curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT);

return 0;
Expand Down
17 changes: 17 additions & 0 deletions src/aws-cpp-sdk-core/source/utils/DateTimeCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <cassert>
#include <iostream>
#include <cstring>
#include <iomanip>

static const char* CLASS_TAG = "DateTime";
static const char* RFC822_DATE_FORMAT_STR_MINUS_Z = "%a, %d %b %Y %H:%M:%S";
Expand Down Expand Up @@ -1264,6 +1265,22 @@ Aws::String DateTime::ToGmtString(const char* formatStr) const
return formattedString;
}

/**
 * Formats this timestamp as a GMT string of the form "YYYY-MM-DDTHH:MM:SS.mmm",
 * where ".mmm" is the zero-padded millisecond part of the stored time point.
 *
 * @return the formatted string; no time-zone designator is appended.
 */
Aws::String DateTime::ToGmtStringWithMs() const
{
    struct tm gmtTimeStamp = ConvertTimestampToGmtStruct();

    // Zero-initialised so the buffer holds an empty, terminated string even if strftime writes nothing.
    char formattedString[100] = {};
    std::strftime(formattedString, sizeof(formattedString), "%Y-%m-%dT%H:%M:%S", &gmtTimeStamp);
    Aws::String formattedStringStr = formattedString;

    // operator% keeps the sign of the dividend, so a time point before the epoch would give a
    // negative remainder and be printed as e.g. ".-500". Fold the remainder into [0, 999].
    // NOTE(review): the seconds field comes from ConvertTimestampToGmtStruct(); its rounding for
    // pre-epoch values is not visible here - confirm it floors so both parts agree.
    const auto msSinceEpoch =
        std::chrono::duration_cast<std::chrono::milliseconds>(m_time.time_since_epoch()).count();
    const auto msPart = ((msSinceEpoch % 1000) + 1000) % 1000;

    Aws::StringStream msSs;
    msSs << "." << std::setfill('0') << std::setw(3) << msPart;

    formattedStringStr += msSs.str();
    return formattedStringStr;
}

double DateTime::SecondsWithMSPrecision() const
{
std::chrono::duration<double, std::chrono::seconds::period> timestamp(m_time.time_since_epoch());
Expand Down
27 changes: 21 additions & 6 deletions src/aws-cpp-sdk-core/source/utils/stream/ConcurrentStreamBuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ namespace Aws
{
bool closeStream = false;
{
std::unique_lock<std::mutex> lock(m_lock);

if (!m_eofInput)
std::unique_lock<std::mutex> lock(m_lock, std::defer_lock);
if (!lock.try_lock())
{
m_signal.wait_for(lock, std::chrono::milliseconds(20), [this] { return m_backbuf.empty() == false || m_eofInput; });
// don't block consumer, it will retry asking later
return 'z'; // just returning some valid value other than EOF
}

if (m_eofInput && m_backbuf.empty())
Expand All @@ -176,17 +176,32 @@ namespace Aws
std::copy(m_backbuf.begin(), m_backbuf.end(), std::back_inserter(m_getArea));
m_backbuf.clear();
}
m_signal.notify_one();
}
if (closeStream)
{
CloseStream();
return std::char_traits<char>::eof();
}

m_signal.notify_one();
char* gbegin = reinterpret_cast<char*>(&m_getArea[0]);
setg(gbegin, gbegin, gbegin + m_getArea.size());
return std::char_traits<char>::to_int_type(*gptr());

if (!m_getArea.empty())
return std::char_traits<char>::to_int_type(*gptr());
else
return 'a'; // just returning some valid value other than EOF
}

int ConcurrentStreamBuf::uflow()
{
/* Make clang happy with our "great" streambuf */
if (underflow() == std::char_traits<char>::eof() || m_getArea.empty())
return std::char_traits<char>::eof();

auto ret = traits_type::to_int_type(*this->gptr());
this->gbump(1);
return ret;
}

std::streamsize ConcurrentStreamBuf::showmanyc()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ Aws::String TranscribeStreamingTests::RunTestLikeSample(size_t timeoutMs, const
Aws::Vector<char> buf(bufferSize);

int64_t lastAudioEventSentAt = Aws::Utils::DateTime::Now().Millis();
while (file) {
while (file && file.peek() != -1) {
if(Aws::Utils::DateTime::Now().Millis() > testMustEndBeforeMs)
{
FAIL() << "Test is taking too long, aborting.";
Expand All @@ -350,25 +350,32 @@ Aws::String TranscribeStreamingTests::RunTestLikeSample(size_t timeoutMs, const
FAIL() << "Provided file is empty: " << fileName;
}

Aws::Vector<unsigned char> bits{buf.begin(), buf.end()};
Aws::Vector<unsigned char> bits{buf.begin(), buf.begin() + file.gcount()};
AudioEvent event(std::move(bits));
if (!stream) {
FAIL() << "Failed to create a stream" << std::endl;
// break; // Unreachable due to FAIL macro above
}

// The std::basic_istream::gcount() is used to count the characters in the given string. It returns
// the number of characters extracted by the last read() operation.
// the number of characters extracted by the last read() operation.
if (file.gcount() > 0) {
int64_t now = Aws::Utils::DateTime::Now().Millis();
int64_t sleepForMs = std::max((int64_t) 0l, (int64_t) ((lastAudioEventSentAt - now) + chunkLengthToUseMs));
std::this_thread::sleep_for(std::chrono::milliseconds(sleepForMs));
lastAudioEventSentAt = Aws::Utils::DateTime::Now().Millis();

int64_t sleepForMs = std::max((int64_t) 0l, (int64_t) (chunkLengthToUseMs - (now - lastAudioEventSentAt)));
int64_t sleepUntil = now + sleepForMs - 3;
while(Aws::Utils::DateTime::Now().Millis() <= sleepUntil)
{
// just trying to perform a high-precision sleep to simulate real-time streaming;
// one of our CI hosts actually oversleeps when an ordinary (single) sleep is used
std::this_thread::sleep_for(std::chrono::nanoseconds(10));
}

if (!stream.WriteAudioEvent(event)) {
FAIL() << "Failed to write an audio event";
// break; // Unreachable due to FAIL macro above
}
lastAudioEventSentAt = Aws::Utils::DateTime::Now().Millis();
if(Aws::Utils::DateTime::Now().Millis() > testMustEndBeforeMs)
{
FAIL() << "Test is taking too long, aborting.";
Expand Down Expand Up @@ -404,7 +411,6 @@ Aws::String TranscribeStreamingTests::RunTestLikeSample(size_t timeoutMs, const
}
stream.flush();
stream.WaitForDrain();
std::this_thread::sleep_for(std::chrono::milliseconds(1500)); /* We are investigating why we need this */
stream.Close();
};

Expand All @@ -423,13 +429,21 @@ Aws::String TranscribeStreamingTests::RunTestLikeSample(size_t timeoutMs, const
signaling.Release();
};

// abort operation taking too long by using this callback
std::atomic<bool> shouldContinue;
shouldContinue = true;
request.SetContinueRequestHandler([&shouldContinue](const Aws::Http::HttpRequest*) { return shouldContinue.load(); });

m_client->StartStreamTranscriptionAsync(request, OnStreamReady, OnResponseCallback,
nullptr /*context*/);

EXPECT_TRUE(
signaling.WaitOneFor(timeoutMs)
) << "Did not get a response after " << Aws::Utils::StringUtils::to_string(timeoutMs) << " ms";

request.GetAudioStream()->Close();
shouldContinue = false;

if (failedToKeepUpCount)
std::cout << "Failed to keep up count: " << failedToKeepUpCount << " \n";

Expand Down

0 comments on commit 669654e

Please sign in to comment.