Skip to content

Commit

Permalink
use async_connect() instead of run() in streaming data source
Browse files Browse the repository at this point in the history
  • Loading branch information
cwaldren-ld committed Sep 15, 2023
1 parent fe9e559 commit 61a3bef
Showing 1 changed file with 122 additions and 123 deletions.
245 changes: 122 additions & 123 deletions libs/server-sdk/src/data_sources/streaming_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,145 +12,144 @@

namespace launchdarkly::server_side::data_sources {

static char const* const kCouldNotParseEndpoint =
"Could not parse streaming endpoint URL";

static char const* DataSourceErrorToString(launchdarkly::sse::Error error) {
switch (error) {
case sse::Error::NoContent:
return "server responded 204 (No Content), will not attempt to "
"reconnect";
case sse::Error::InvalidRedirectLocation:
return "server responded with an invalid redirection";
case sse::Error::UnrecoverableClientError:
return "unrecoverable client-side error";
default:
return "unrecognized error";
}
}

StreamingDataSource::StreamingDataSource(
config::shared::built::ServiceEndpoints const& endpoints,
config::shared::built::DataSourceConfig<config::shared::ServerSDK> const&
data_source_config,
config::shared::built::HttpProperties http_properties,
boost::asio::any_io_executor ioc,
IDataSourceUpdateSink& handler,
DataSourceStatusManager& status_manager,
Logger const& logger)
: exec_(std::move(ioc)),
logger_(logger),
status_manager_(status_manager),
data_source_handler_(
DataSourceEventHandler(handler, logger, status_manager_)),
http_config_(std::move(http_properties)),
streaming_config_(
std::get<config::shared::built::StreamingConfig<
config::shared::ServerSDK>>(data_source_config.method)),
streaming_endpoint_(endpoints.StreamingBaseUrl()) {}

void StreamingDataSource::Start() {
status_manager_.SetState(DataSourceStatus::DataSourceState::kInitializing);

auto updated_url = network::AppendUrl(streaming_endpoint_,
streaming_config_.streaming_path);

// Bad URL, don't set the client. Start will then report the bad status.
if (!updated_url) {
LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint;
status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
kCouldNotParseEndpoint);
return;
static char const *const kCouldNotParseEndpoint =
"Could not parse streaming endpoint URL";

static char const *DataSourceErrorToString(launchdarkly::sse::Error error) {
switch (error) {
case sse::Error::NoContent:
return "server responded 204 (No Content), will not attempt to "
"reconnect";
case sse::Error::InvalidRedirectLocation:
return "server responded with an invalid redirection";
case sse::Error::UnrecoverableClientError:
return "unrecoverable client-side error";
default:
return "unrecognized error";
}
}

auto uri_components = boost::urls::parse_uri(*updated_url);

// Unlikely that it could be parsed earlier, and it cannot be parsed now.
if (!uri_components) {
LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint;
status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
kCouldNotParseEndpoint);
return;
}
StreamingDataSource::StreamingDataSource(
config::shared::built::ServiceEndpoints const &endpoints,
config::shared::built::DataSourceConfig<config::shared::ServerSDK> const &
data_source_config,
config::shared::built::HttpProperties http_properties,
boost::asio::any_io_executor ioc,
IDataSourceUpdateSink &handler,
DataSourceStatusManager &status_manager,
Logger const &logger)
: exec_(std::move(ioc)),
logger_(logger),
status_manager_(status_manager),
data_source_handler_(
DataSourceEventHandler(handler, logger, status_manager_)),
http_config_(std::move(http_properties)),
streaming_config_(
std::get<config::shared::built::StreamingConfig<
config::shared::ServerSDK>>(data_source_config.method)),
streaming_endpoint_(endpoints.StreamingBaseUrl()) {}

void StreamingDataSource::Start() {
status_manager_.SetState(DataSourceStatus::DataSourceState::kInitializing);

auto updated_url = network::AppendUrl(streaming_endpoint_,
streaming_config_.streaming_path);

// Bad URL, don't set the client. Start will then report the bad status.
if (!updated_url) {
LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint;
status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
kCouldNotParseEndpoint);
return;
}

boost::urls::url url = uri_components.value();
auto uri_components = boost::urls::parse_uri(*updated_url);

auto client_builder = launchdarkly::sse::Builder(exec_, url.buffer());
// Unlikely that it could be parsed earlier, and it cannot be parsed now.
if (!uri_components) {
LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint;
status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
kCouldNotParseEndpoint);
return;
}

client_builder.method(boost::beast::http::verb::get);
boost::urls::url url = uri_components.value();

// TODO: can the read timeout be shared with *all* http requests? Or should
// it have a default in defaults.hpp? This must be greater than the
// heartbeat interval of the streaming service.
client_builder.read_timeout(std::chrono::minutes(5));
auto client_builder = launchdarkly::sse::Builder(exec_, url.buffer());

client_builder.write_timeout(http_config_.WriteTimeout());
client_builder.method(boost::beast::http::verb::get);

client_builder.connect_timeout(http_config_.ConnectTimeout());
// TODO: can the read timeout be shared with *all* http requests? Or should
// it have a default in defaults.hpp? This must be greater than the
// heartbeat interval of the streaming service.
client_builder.read_timeout(std::chrono::minutes(5));

client_builder.initial_reconnect_delay(
streaming_config_.initial_reconnect_delay);
client_builder.write_timeout(http_config_.WriteTimeout());

for (auto const& header : http_config_.BaseHeaders()) {
client_builder.header(header.first, header.second);
}
client_builder.connect_timeout(http_config_.ConnectTimeout());

// TODO: Handle proxy support. sc-204386
client_builder.initial_reconnect_delay(
streaming_config_.initial_reconnect_delay);

auto weak_self = weak_from_this();
for (auto const &header: http_config_.BaseHeaders()) {
client_builder.header(header.first, header.second);
}

client_builder.receiver([weak_self](launchdarkly::sse::Event const& event) {
if (auto self = weak_self.lock()) {
self->data_source_handler_.HandleMessage(event.type(),
event.data());
// TODO: Use the result of handle message to restart the
// event source if we got bad data. sc-204387
// TODO: Handle proxy support. sc-204386

auto weak_self = weak_from_this();

client_builder.receiver([weak_self](launchdarkly::sse::Event const &event) {
if (auto self = weak_self.lock()) {
self->data_source_handler_.HandleMessage(event.type(),
event.data());
// TODO: Use the result of handle message to restart the
// event source if we got bad data. sc-204387
}
});

client_builder.logger([weak_self](auto msg) {
if (auto self = weak_self.lock()) {
LD_LOG(self->logger_, LogLevel::kDebug) << msg;
}
});

client_builder.errors([weak_self](auto error) {
if (auto self = weak_self.lock()) {
auto error_string = DataSourceErrorToString(error);
LD_LOG(self->logger_, LogLevel::kError) << error_string;
self->status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse,
error_string);
}
});

client_ = client_builder.build();

if (!client_) {
LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint;
status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
kCouldNotParseEndpoint);
return;
}
});
client_->async_connect();
}

client_builder.logger([weak_self](auto msg) {
if (auto self = weak_self.lock()) {
LD_LOG(self->logger_, LogLevel::kDebug) << msg;
void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {
if (client_) {
status_manager_.SetState(
DataSourceStatus::DataSourceState::kInitializing);
return client_->async_shutdown(std::move(completion));
}
});

client_builder.errors([weak_self](auto error) {
if (auto self = weak_self.lock()) {
auto error_string = DataSourceErrorToString(error);
LD_LOG(self->logger_, LogLevel::kError) << error_string;
self->status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse,
error_string);
if (completion) {
boost::asio::post(exec_, completion);
}
});

client_ = client_builder.build();

if (!client_) {
LD_LOG(logger_, LogLevel::kError) << kCouldNotParseEndpoint;
status_manager_.SetState(
DataSourceStatus::DataSourceState::kOff,
DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError,
kCouldNotParseEndpoint);
return;
}
client_->run();
}

void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {
if (client_) {
status_manager_.SetState(
DataSourceStatus::DataSourceState::kInitializing);
return client_->async_shutdown(std::move(completion));
}
if (completion) {
boost::asio::post(exec_, completion);
}
}

} // namespace launchdarkly::server_side::data_sources

0 comments on commit 61a3bef

Please sign in to comment.