Skip to content

Commit

Permalink
K2: add TL types and functions for HTTP queries (#1121)
Browse files Browse the repository at this point in the history
  • Loading branch information
apolyakov authored Oct 22, 2024
1 parent c0e6fdb commit c5b72aa
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 50 deletions.
11 changes: 8 additions & 3 deletions runtime-light/component/init-functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ void process_k2_invoke_job_worker(tl::TLBuffer &tlb) noexcept {
get_component_context()->php_script_mutable_globals_singleton.get_superglobals().v$_SERVER.set_value(string{"JOB_ID"}, invoke_jw.job_id);
}

void process_k2_invoke_http([[maybe_unused]] tl::TLBuffer &tlb) noexcept {}
void process_k2_invoke_http(tl::TLBuffer &tlb) noexcept {
tl::K2InvokeHttp invoke_http{};
if (!invoke_http.fetch(tlb)) {
php_error("erroneous http request");
}
}

} // namespace

Expand All @@ -44,10 +49,10 @@ task_t<uint64_t> init_kphp_server_component() noexcept {
const auto [buffer, size]{co_await read_all_from_stream(stream_d)};
php_assert(size >= sizeof(uint32_t)); // check that we can fetch at least magic
tl::TLBuffer tlb{};
tlb.store_bytes(buffer, static_cast<size_t>(size));
tlb.store_bytes({buffer, static_cast<size_t>(size)});
get_platform_context()->allocator.free(buffer);

switch (const auto magic{*reinterpret_cast<const uint32_t *>(tlb.data())}) { // lookup magic
switch (const auto magic{*tlb.lookup_trivial<uint32_t>()}) { // lookup magic
case tl::K2_INVOKE_HTTP_MAGIC: {
process_k2_invoke_http(tlb);
break;
Expand Down
4 changes: 2 additions & 2 deletions runtime-light/stdlib/confdata/confdata-functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ task_t<mixed> f$confdata_get_value(string key) noexcept {
const auto response{co_await f$component_client_fetch_response(std::move(query))};

tlb.clean();
tlb.store_bytes(response.c_str(), static_cast<size_t>(response.size()));
tlb.store_bytes({response.c_str(), static_cast<size_t>(response.size())});
tl::Maybe<tl::confdataValue> maybe_confdata_value{};
if (!maybe_confdata_value.fetch(tlb)) {
php_warning("couldn't fetch response");
Expand All @@ -83,7 +83,7 @@ task_t<array<mixed>> f$confdata_get_values_by_any_wildcard(string wildcard) noex
const auto response{co_await f$component_client_fetch_response(std::move(query))};

tlb.clean();
tlb.store_bytes(response.c_str(), static_cast<size_t>(response.size()));
tlb.store_bytes({response.c_str(), static_cast<size_t>(response.size())});
tl::Dictionary<tl::confdataValue> dict_confdata_value{};
if (!dict_confdata_value.fetch(tlb)) {
php_warning("couldn't fetch response");
Expand Down
10 changes: 6 additions & 4 deletions runtime-light/stdlib/crypto/crypto-functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "runtime-light/stdlib/crypto/crypto-functions.h"

#include <cstddef>

#include "common/tl/constants/common.h"
#include "runtime-light/stdlib/component/component-api.h"
#include "runtime-light/tl/tl-core.h"
Expand Down Expand Up @@ -35,7 +37,7 @@ task_t<Optional<string>> f$openssl_random_pseudo_bytes(int64_t length) noexcept
string resp = co_await f$component_client_fetch_response(co_await query);

buffer.clean();
buffer.store_bytes(resp.c_str(), resp.size());
buffer.store_bytes({resp.c_str(), static_cast<size_t>(resp.size())});

// Maybe better to do this in some structure, but there's not much work to do with TL here
std::optional<uint32_t> magic = buffer.fetch_trivial<uint32_t>();
Expand All @@ -59,7 +61,7 @@ task_t<Optional<array<mixed>>> f$openssl_x509_parse(const string &data, bool sho
string resp_from_platform = co_await f$component_client_fetch_response(co_await query);

buffer.clean();
buffer.store_bytes(resp_from_platform.c_str(), resp_from_platform.size());
buffer.store_bytes({resp_from_platform.c_str(), static_cast<size_t>(resp_from_platform.size())});

tl::GetPemCertInfoResponse response;
if (!response.fetch(buffer)) {
Expand All @@ -79,7 +81,7 @@ task_t<bool> f$openssl_sign(const string &data, string &signature, const string
string resp_from_platform = co_await f$component_client_fetch_response(co_await query);

buffer.clean();
buffer.store_bytes(resp_from_platform.c_str(), resp_from_platform.size());
buffer.store_bytes({resp_from_platform.c_str(), static_cast<size_t>(resp_from_platform.size())});

std::optional<uint32_t> magic = buffer.fetch_trivial<uint32_t>();
if (!magic.has_value() || *magic != TL_MAYBE_TRUE) {
Expand All @@ -102,7 +104,7 @@ task_t<int64_t> f$openssl_verify(const string &data, const string &signature, co
string resp_from_platform = co_await f$component_client_fetch_response(co_await query);

buffer.clean();
buffer.store_bytes(resp_from_platform.c_str(), resp_from_platform.size());
buffer.store_bytes({resp_from_platform.c_str(), static_cast<size_t>(resp_from_platform.size())});

// For now returns only 1 or 0, -1 is never returned
// Because it's currently impossible to distiguish error from negative verification
Expand Down
2 changes: 1 addition & 1 deletion runtime-light/stdlib/job-worker/job-worker-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ task_t<int64_t> kphp_job_worker_start_impl(string request, double timeout, bool
const string response{(co_await wait_with_timeout_t{task_t<string>::awaiter_t{std::addressof(fetch_task)}, timeout}).value_or(string{})};

tl::TLBuffer tlb{};
tlb.store_bytes(response.c_str(), static_cast<size_t>(response.size()));
tlb.store_bytes({response.c_str(), static_cast<size_t>(response.size())});
tl::K2JobWorkerResponse jw_response{};
if (!jw_response.fetch(tlb)) {
co_return string{};
Expand Down
8 changes: 5 additions & 3 deletions runtime-light/stdlib/rpc/rpc-api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string_view>
#include <utility>

#include "common/algorithms/find.h"
Expand Down Expand Up @@ -249,7 +250,7 @@ task_t<array<mixed>> rpc_tl_query_result_one_impl(int64_t query_id) noexcept {
co_return make_fetch_error(string{"rpc response timeout"}, TL_ERROR_QUERY_TIMEOUT);
}
rpc_ctx.rpc_buffer.clean();
rpc_ctx.rpc_buffer.store_bytes(data.c_str(), data.size());
rpc_ctx.rpc_buffer.store_bytes({data.c_str(), static_cast<size_t>(data.size())});
co_return fetch_function_untyped(rpc_query);
}

Expand Down Expand Up @@ -296,7 +297,7 @@ task_t<class_instance<C$VK$TL$RpcResponse>> typed_rpc_tl_query_result_one_impl(i
co_return error_factory.make_error(string{"rpc response timeout"}, TL_ERROR_QUERY_TIMEOUT);
}
rpc_ctx.rpc_buffer.clean();
rpc_ctx.rpc_buffer.store_bytes(data.c_str(), data.size());
rpc_ctx.rpc_buffer.store_bytes({data.c_str(), static_cast<size_t>(data.size())});
co_return fetch_function_typed(rpc_query, error_factory);
}

Expand Down Expand Up @@ -404,7 +405,8 @@ bool is_int32_overflow(int64_t v) noexcept {
}

void store_raw_vector_double(const array<double> &vector) noexcept { // TODO: didn't we forget vector's length?
RpcComponentContext::get().rpc_buffer.store_bytes(reinterpret_cast<const char *>(vector.get_const_vector_pointer()), sizeof(double) * vector.count());
const std::string_view vector_view{reinterpret_cast<const char *>(vector.get_const_vector_pointer()), sizeof(double) * vector.count()};
RpcComponentContext::get().rpc_buffer.store_bytes(vector_view);
}

void fetch_raw_vector_double(array<double> &vector, int64_t num_elems) noexcept {
Expand Down
8 changes: 4 additions & 4 deletions runtime-light/tl/tl-core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ void TLBuffer::store_string(std::string_view str) noexcept {
str_len = 0;
store_trivial<uint8_t>(str_len);
}
store_bytes(str_buf, str_len);
store_bytes({str_buf, str_len});

const auto total_len{size_len + str_len};
const auto total_len_with_padding{(total_len + 3) & ~static_cast<string::size_type>(3)};
const auto padding{total_len_with_padding - total_len};

std::array padding_array{'\0', '\0', '\0', '\0'};
store_bytes(padding_array.data(), padding);
store_bytes({padding_array.data(), padding});
}

std::string_view TLBuffer::fetch_string() noexcept {
Expand Down Expand Up @@ -87,11 +87,11 @@ std::string_view TLBuffer::fetch_string() noexcept {
}
}
const auto total_len_with_padding{(size_len + string_len + 3) & ~static_cast<uint64_t>(3)};
if (m_remaining < total_len_with_padding - size_len) {
if (remaining() < total_len_with_padding - size_len) {
return {}; // TODO: error handling
}

std::string_view response{data() + m_pos, static_cast<size_t>(string_len)};
std::string_view response{data() + pos(), static_cast<size_t>(string_len)};
adjust(total_len_with_padding - size_len);
return response;
}
Expand Down
38 changes: 27 additions & 11 deletions runtime-light/tl/tl-core.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,33 +69,49 @@ class TLBuffer final : private vk::not_copyable {
m_remaining -= len;
}

void store_bytes(const char *src, size_t len) noexcept {
m_buffer.append(src, len);
m_remaining += len;
void store_bytes(std::string_view bytes_view) noexcept {
m_buffer.append(bytes_view.data(), bytes_view.size());
m_remaining += bytes_view.size();
}

std::string_view fetch_bytes(size_t len) noexcept {
if (len > remaining()) {
return {};
}
std::string_view bytes_view{data() + pos(), len};
adjust(len);
return bytes_view;
}

void store_string(std::string_view s) noexcept;

std::string_view fetch_string() noexcept;

template<standard_layout T, standard_layout U>
requires std::convertible_to<U, T> void store_trivial(const U &t) noexcept {
// Here we rely on that endianness of architecture is Little Endian
store_bytes(reinterpret_cast<const char *>(std::addressof(t)), sizeof(T));
store_bytes({reinterpret_cast<const char *>(std::addressof(t)), sizeof(T)});
}

void store_string(std::string_view s) noexcept;

template<standard_layout T>
std::optional<T> fetch_trivial() noexcept {
if (m_remaining < sizeof(T)) {
if (remaining() < sizeof(T)) {
return std::nullopt;
}

// Here we rely on that endianness of architecture is Little Endian
const auto t{*reinterpret_cast<const T *>(m_buffer.c_str() + m_pos)};
m_pos += sizeof(T);
m_remaining -= sizeof(T);
const auto t{*reinterpret_cast<const T *>(data() + pos())};
adjust(sizeof(T));
return t;
}

std::string_view fetch_string() noexcept;
template<standard_layout T>
std::optional<T> lookup_trivial() const noexcept {
if (remaining() < sizeof(T)) {
return std::nullopt;
}
return *reinterpret_cast<const T *>(data() + pos());
}
};

template<typename T>
Expand Down
38 changes: 30 additions & 8 deletions runtime-light/tl/tl-functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,16 @@

#include <cstddef>
#include <cstdint>
#include <optional>
#include <string_view>

#include "common/tl/constants/common.h"
#include "runtime-light/tl/tl-core.h"

namespace {

constexpr auto K2_JOB_WORKER_IGNORE_ANSWER_FLAG = static_cast<uint32_t>(1U << 0U);

} // namespace

namespace tl {

// ===== JOB WORKERS =====

bool K2InvokeJobWorker::fetch(TLBuffer &tlb) noexcept {
if (tlb.fetch_trivial<uint32_t>().value_or(TL_ZERO) != K2_INVOKE_JOB_WORKER_MAGIC) {
return false;
Expand All @@ -33,7 +30,7 @@ bool K2InvokeJobWorker::fetch(TLBuffer &tlb) noexcept {
}
const auto body_view{tlb.fetch_string()};

ignore_answer = static_cast<bool>(*opt_flags & K2_JOB_WORKER_IGNORE_ANSWER_FLAG);
ignore_answer = static_cast<bool>(*opt_flags & IGNORE_ANSWER_FLAG);
image_id = *opt_image_id;
job_id = *opt_job_id;
timeout_ns = *opt_timeout_ns;
Expand All @@ -42,7 +39,7 @@ bool K2InvokeJobWorker::fetch(TLBuffer &tlb) noexcept {
}

void K2InvokeJobWorker::store(TLBuffer &tlb) const noexcept {
const uint32_t flags{ignore_answer ? K2_JOB_WORKER_IGNORE_ANSWER_FLAG : 0x0};
const uint32_t flags{ignore_answer ? IGNORE_ANSWER_FLAG : 0x0};
tlb.store_trivial<uint32_t>(K2_INVOKE_JOB_WORKER_MAGIC);
tlb.store_trivial<uint32_t>(flags);
tlb.store_trivial<uint64_t>(image_id);
Expand All @@ -51,6 +48,8 @@ void K2InvokeJobWorker::store(TLBuffer &tlb) const noexcept {
tlb.store_string({body.c_str(), body.size()});
}

// ===== CRYPTO =====

void GetCryptosecurePseudorandomBytes::store(TLBuffer &tlb) const noexcept {
tlb.store_trivial<uint32_t>(GET_CRYPTOSECURE_PSEUDORANDOM_BYTES_MAGIC);
tlb.store_trivial<int32_t>(size);
Expand All @@ -77,6 +76,8 @@ void DigestVerify::store(TLBuffer &tlb) const noexcept {
tlb.store_string(std::string_view{signature.c_str(), signature.size()});
}

// ===== CONFDATA =====

void ConfdataGet::store(TLBuffer &tlb) const noexcept {
tlb.store_trivial<uint32_t>(CONFDATA_GET_MAGIC);
tlb.store_string({key.c_str(), static_cast<size_t>(key.size())});
Expand All @@ -87,4 +88,25 @@ void ConfdataGetWildcard::store(TLBuffer &tlb) const noexcept {
tlb.store_string({wildcard.c_str(), static_cast<size_t>(wildcard.size())});
}

// ===== HTTP =====

bool K2InvokeHttp::fetch(TLBuffer &tlb) noexcept {
if (tlb.fetch_trivial<uint32_t>().value_or(TL_ZERO) != K2_INVOKE_HTTP_MAGIC) {
return false;
}
if (/* flags */ !tlb.fetch_trivial<uint32_t>().has_value() || !connection.fetch(tlb) || !version.fetch(tlb)) {
return false;
}

const auto method_view{tlb.fetch_string()};
method = {method_view.data(), static_cast<string::size_type>(method_view.size())};
if (!uri.fetch(tlb) || !headers.fetch(tlb)) {
return false;
}
const auto body_view{tlb.fetch_bytes(tlb.remaining())};
body = {body_view.data(), static_cast<string::size_type>(body_view.size())};

return true;
}

} // namespace tl
26 changes: 24 additions & 2 deletions runtime-light/tl/tl-functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ namespace tl {

// ===== JOB WORKERS =====

inline constexpr uint32_t K2_INVOKE_HTTP_MAGIC = 0xd909'efe8;
inline constexpr uint32_t K2_INVOKE_JOB_WORKER_MAGIC = 0x437d'7312;

struct K2InvokeJobWorker final {
class K2InvokeJobWorker final {
static constexpr auto IGNORE_ANSWER_FLAG = static_cast<uint32_t>(1U << 0U);

public:
uint64_t image_id{};
int64_t job_id{};
bool ignore_answer{};
Expand Down Expand Up @@ -83,4 +85,24 @@ struct ConfdataGetWildcard final {
void store(TLBuffer &tlb) const noexcept;
};

// ===== HTTP =====

inline constexpr uint32_t K2_INVOKE_HTTP_MAGIC = 0xd909'efe8;

class K2InvokeHttp final {
static constexpr auto SCHEME_FLAG = static_cast<uint32_t>(1U << 0U);
static constexpr auto HOST_FLAG = static_cast<uint32_t>(1U << 1U);
static constexpr auto QUERY_FLAG = static_cast<uint32_t>(1U << 2U);

public:
httpConnection connection{};
HttpVersion version{};
string method;
httpUri uri{};
dictionary<httpHeaderValue> headers{};
string body;

bool fetch(TLBuffer &tlb) noexcept;
};

} // namespace tl
12 changes: 2 additions & 10 deletions runtime-light/tl/tl-types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,10 @@
#include "common/tl/constants/common.h"
#include "runtime-light/tl/tl-core.h"

namespace {

enum CertInfoItem : uint32_t { LONG_MAGIC = 0x533f'f89f, STR_MAGIC = 0xc427'feef, DICT_MAGIC = 0x1ea8'a774 };

constexpr uint32_t K2_JOB_WORKER_RESPONSE_MAGIC = 0x3afb'3a08;

} // namespace

namespace tl {

bool K2JobWorkerResponse::fetch(TLBuffer &tlb) noexcept {
if (tlb.fetch_trivial<uint32_t>().value_or(TL_ZERO) != K2_JOB_WORKER_RESPONSE_MAGIC) {
if (tlb.fetch_trivial<uint32_t>().value_or(TL_ZERO) != MAGIC) {
return false;
}

Expand All @@ -40,7 +32,7 @@ bool K2JobWorkerResponse::fetch(TLBuffer &tlb) noexcept {
}

void K2JobWorkerResponse::store(TLBuffer &tlb) const noexcept {
tlb.store_trivial<uint32_t>(K2_JOB_WORKER_RESPONSE_MAGIC);
tlb.store_trivial<uint32_t>(MAGIC);
tlb.store_trivial<uint32_t>(0x0); // flags
tlb.store_trivial<int64_t>(job_id);
tlb.store_string({body.c_str(), body.size()});
Expand Down
Loading

0 comments on commit c5b72aa

Please sign in to comment.