Skip to content

Commit

Permalink
Optionally collect metrics from rpc requests and responses (#982)
Browse files Browse the repository at this point in the history
This commit adds two flags to both typed and untyped RPC TL query builtins.

'requests_extra_info' parameter collects extra info about sent RPC requests:
'request_size'.

'need_responses_extra_info' tells the runtime to collect extra info about RPC
response associated with the RPC request. This info may later be extracted
with new 'extract_kphp_rpc_response_extra_info' builtin.
  • Loading branch information
apolyakov authored Apr 26, 2024
1 parent 30dc9f9 commit e56cfd3
Show file tree
Hide file tree
Showing 26 changed files with 1,072 additions and 55 deletions.
25 changes: 23 additions & 2 deletions builtin-functions/_functions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,27 @@ final class RpcConnection {
private function __construct();
}

/**
* 'KphpRpcRequestsExtraInfo' is a builtin KPHP class. It may accumulate extra information
* about RPC requests sent in both typed and untyped versions of rpc_tl_query builtins.
*/
final class KphpRpcRequestsExtraInfo {
/**
* 'get' returns an array of extra information (request size) about sent RPC requests.
*
* @return tuple(int)[]
*/
public function get ();
}

/**
* 'extract_kphp_rpc_response_extra_info' function takes request ID and returns:
* 1. 'null' in case there is no extra information about the request;
* 2. 'tuple(response size, response time)' in case we got a response associated with
* the request ID.
*/
function extract_kphp_rpc_response_extra_info ($resumable_id ::: int) ::: ?tuple(int, float);

/** rpc store **/
function new_rpc_connection ($str ::: string, $port ::: int, $actor_id ::: mixed = 0, $timeout ::: float = 0.3, $connect_timeout ::: float = 0.3, $reconnect_timeout ::: float = 17.0) ::: \RpcConnection; // TODO: make actor_id int
function store_gzip_pack_threshold ($pack_threshold_bytes ::: int) ::: void;
Expand Down Expand Up @@ -956,7 +977,7 @@ function rpc_wait_concurrently ($request_id ::: int) ::: bool;
function rpc_mc_parse_raw_wildcard_with_flags_to_array ($raw_result ::: string, &$result ::: array) ::: bool;

function rpc_tl_query_one (\RpcConnection $rpc_conn, $arr ::: mixed, $timeout ::: float = -1.0) ::: int;
function rpc_tl_query (\RpcConnection $rpc_conn, $arr ::: array, $timeout ::: float = -1.0, $ignore_answer ::: bool = false) ::: int[];
function rpc_tl_query (\RpcConnection $rpc_conn, $arr ::: array, $timeout ::: float = -1.0, $ignore_answer ::: bool = false, \KphpRpcRequestsExtraInfo $requests_extra_info = null, $need_responses_extra_info ::: bool = false) ::: int[];
/** @kphp-extern-func-info resumable */
function rpc_tl_query_result_one ($query_id ::: int) ::: mixed[];
/** @kphp-extern-func-info resumable */
Expand Down Expand Up @@ -984,7 +1005,7 @@ interface RpcResponse {
/** @kphp-extern-func-info tl_common_h_dep */
function typed_rpc_tl_query_one (\RpcConnection $connection, @tl\RpcFunction $query_function, $timeout ::: float = -1.0) ::: int;
/** @kphp-extern-func-info tl_common_h_dep */
function typed_rpc_tl_query (\RpcConnection $connection, @tl\RpcFunction[] $query_functions, $timeout ::: float = -1.0, $ignore_answer ::: bool = false) ::: int[];
function typed_rpc_tl_query (\RpcConnection $connection, @tl\RpcFunction[] $query_functions, $timeout ::: float = -1.0, $ignore_answer ::: bool = false, \KphpRpcRequestsExtraInfo $requests_extra_info = null, $need_responses_extra_info ::: bool = false) ::: int[];
/** @kphp-extern-func-info tl_common_h_dep resumable */
function typed_rpc_tl_query_result_one (int $query_id) ::: @tl\RpcResponse;
/** @kphp-extern-func-info tl_common_h_dep resumable */
Expand Down
79 changes: 66 additions & 13 deletions runtime/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ static void process_rpc_timeout(kphp_event_timer *timer) {
return process_rpc_timeout(timer->wakeup_extra, false);
}

int64_t rpc_send(const class_instance<C$RpcConnection> &conn, double timeout, bool ignore_answer) {
int64_t rpc_send_impl(const class_instance<C$RpcConnection> &conn, double timeout, rpc_request_extra_info_t &req_extra_info, bool collect_resp_extra_info,
bool ignore_answer) {
if (unlikely (conn.is_null() || conn.get()->host_num < 0)) {
php_warning("Wrong RpcConnection specified");
return -1;
Expand Down Expand Up @@ -694,6 +695,10 @@ int64_t rpc_send(const class_instance<C$RpcConnection> &conn, double timeout, bo
memcpy(p + sizeof(RpcHeaders) + extra_headers_size, rpc_payload_start, rpc_payload_size);

slot_id_t q_id = rpc_send_query(conn.get()->host_num, p, static_cast<int>(request_size), timeout_convert_to_ms(timeout));

// request's statistics
req_extra_info = rpc_request_extra_info_t{request_size};

if (q_id <= 0) {
return -1;
}
Expand Down Expand Up @@ -722,7 +727,10 @@ int64_t rpc_send(const class_instance<C$RpcConnection> &conn, double timeout, bo
sizeof(rpc_request) * (rpc_requests_size - (rpc_first_unfinished_request_id - rpc_first_array_request_id)));
rpc_first_array_request_id = rpc_first_unfinished_request_id;
} else {
rpc_requests = static_cast <rpc_request *> (dl::reallocate(rpc_requests, sizeof(rpc_request) * 2 * rpc_requests_size, sizeof(rpc_request) * rpc_requests_size));
rpc_requests = static_cast<rpc_request *>(dl::reallocate(
rpc_requests,
sizeof(rpc_request) * 2 * rpc_requests_size,
sizeof(rpc_request) * rpc_requests_size));
rpc_requests_size *= 2;
}
}
Expand All @@ -739,6 +747,11 @@ int64_t rpc_send(const class_instance<C$RpcConnection> &conn, double timeout, bo
kphp_tracing::on_rpc_query_send(q_id, cur->actor_or_port, cur->function_magic, static_cast<int>(request_size), send_timestamp, ignore_answer);
}

// response's metrics
if (collect_resp_extra_info) {
rpc_responses_extra_info_map.emplace_value(cur->resumable_id, rpc_response_extra_info_status_t::NOT_READY, rpc_response_extra_info_t{0, send_timestamp});
}

if (ignore_answer) {
int64_t resumable_id = cur->resumable_id;
process_rpc_timeout(q_id, true);
Expand Down Expand Up @@ -766,7 +779,8 @@ void f$rpc_flush() {
}

int64_t f$rpc_send(const class_instance<C$RpcConnection> &conn, double timeout) {
int64_t request_id = rpc_send(conn, timeout);
rpc_request_extra_info_t _{};
int64_t request_id = rpc_send_impl(conn, timeout, _, false);
if (request_id <= 0) {
return 0;
}
Expand All @@ -776,14 +790,17 @@ int64_t f$rpc_send(const class_instance<C$RpcConnection> &conn, double timeout)
}

int64_t f$rpc_send_noflush(const class_instance<C$RpcConnection> &conn, double timeout) {
int64_t request_id = rpc_send(conn, timeout);
rpc_request_extra_info_t _{};
int64_t request_id = rpc_send_impl(conn, timeout, _, false);
if (request_id <= 0) {
return 0;
}

return request_id;
}



void process_rpc_answer(int32_t request_id, char *result, int32_t result_len) {
rpc_request *request = get_rpc_request(request_id);

Expand All @@ -801,6 +818,15 @@ void process_rpc_answer(int32_t request_id, char *result, int32_t result_len) {
int64_t resumable_id = request->resumable_id;
request->resumable_id = -1;

{ // response's metrics
const auto resp_timestamp = std::chrono::duration<double>{std::chrono::system_clock::now().time_since_epoch()}.count();
if (rpc_responses_extra_info_map.isset(resumable_id)) {
auto &resp_extra_info = rpc_responses_extra_info_map[resumable_id];
resp_extra_info.second = {result_len, resp_timestamp - std::get<1>(resp_extra_info.second)};
resp_extra_info.first = rpc_response_extra_info_status_t::READY;
}
}

if (request->timer) {
remove_event_timer(request->timer);
}
Expand Down Expand Up @@ -1123,46 +1149,55 @@ array<mixed> fetch_function(const class_instance<RpcTlQuery> &rpc_query) {
return new_tl_object;
}

int64_t rpc_tl_query_impl(const class_instance<C$RpcConnection> &c, const mixed &tl_object, double timeout, bool ignore_answer, bool bytes_estimating, size_t &bytes_sent, bool flush) {
int64_t rpc_tl_query_impl(const class_instance<C$RpcConnection> &c, const mixed &tl_object, double timeout, rpc_request_extra_info_t &req_extra_info,
bool collect_resp_extra_info, bool ignore_answer, bool bytes_estimating, size_t &bytes_sent, bool flush) {
f$rpc_clean();

class_instance<RpcTlQuery> rpc_query = store_function(tl_object);
if (!CurException.is_null()) {
rpc_query.destroy();
CurException = Optional<bool>{};
}

if (rpc_query.is_null()) {
return 0;
}

if (bytes_estimating) {
estimate_and_flush_overflow(bytes_sent);
}
int64_t query_id = rpc_send(c, timeout, ignore_answer);

int64_t query_id = rpc_send_impl(c, timeout, req_extra_info, collect_resp_extra_info, ignore_answer);
if (query_id <= 0) {
return 0;
}

if (unlikely(kphp_tracing::cur_trace_level >= 2)) {
kphp_tracing::on_rpc_query_provide_details_after_send({}, tl_object);
}

if (flush) {
f$rpc_flush();
}

if (ignore_answer) {
return -1;
}

if (dl::query_num != rpc_tl_results_last_query_num) {
rpc_tl_results_last_query_num = dl::query_num;
}

rpc_query.get()->query_id = query_id;
RpcPendingQueries::get().save(rpc_query);

return query_id;
}

int64_t f$rpc_tl_query_one(const class_instance<C$RpcConnection> &c, const mixed &tl_object, double timeout) {
size_t bytes_sent = 0;
return rpc_tl_query_impl(c, tl_object, timeout, false, false, bytes_sent, true);
rpc_request_extra_info_t _{};
return rpc_tl_query_impl(c, tl_object, timeout, _, false, false, false, bytes_sent, true);
}

int64_t f$rpc_tl_pending_queries_count() {
Expand Down Expand Up @@ -1208,18 +1243,35 @@ bool f$rpc_mc_parse_raw_wildcard_with_flags_to_array(const string &raw_result, a
return true;
}

array<int64_t> f$rpc_tl_query(const class_instance<C$RpcConnection> &c, const array<mixed> &tl_objects, double timeout, bool ignore_answer) {
array<int64_t> result(tl_objects.size());
array<int64_t> f$rpc_tl_query(const class_instance<C$RpcConnection> &c, const array<mixed> &tl_objects, double timeout, bool ignore_answer,
class_instance<C$KphpRpcRequestsExtraInfo> requests_extra_info, bool need_responses_extra_info) {
if (ignore_answer && need_responses_extra_info) {
php_warning("Both $ignore_answer and $need_responses_extra_info are 'true'. Can't collect metrics for ignored answers");
}

size_t bytes_sent = 0;
bool collect_resp_extra_info = !ignore_answer && need_responses_extra_info;
array<int64_t> queries{tl_objects.size()};
array<rpc_request_extra_info_t> req_extra_info_arr{tl_objects.size()};

for (auto it = tl_objects.begin(); it != tl_objects.end(); ++it) {
int64_t query_id = rpc_tl_query_impl(c, it.get_value(), timeout, ignore_answer, true, bytes_sent, false);
result.set_value(it.get_key(), query_id);
rpc_request_extra_info_t req_ei{};

int64_t query_id = rpc_tl_query_impl(c, it.get_value(), timeout, req_ei, collect_resp_extra_info, ignore_answer, true, bytes_sent, false);

queries.set_value(it.get_key(), query_id);
req_extra_info_arr.set_value(it.get_key(), std::move(req_ei));
}

if (bytes_sent > 0) {
f$rpc_flush();
}

return result;
if (!requests_extra_info.is_null()) {
requests_extra_info->extra_info_arr_ = std::move(req_extra_info_arr);
}

return queries;
}


Expand Down Expand Up @@ -1402,6 +1454,7 @@ static void reset_rpc_global_vars() {
hard_reset_var(rpc_data_copy_backup);
hard_reset_var(rpc_request_need_timer);
fail_rpc_on_int32_overflow = false;
hard_reset_var(rpc_responses_extra_info_map);
}

void init_rpc_lib() {
Expand Down
11 changes: 8 additions & 3 deletions runtime/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "runtime/net_events.h"
#include "runtime/resumable.h"
#include "runtime/to-array-processor.h"
#include "runtime/rpc_extra_info.h"

DECLARE_VERBOSITY(rpc);

Expand Down Expand Up @@ -81,6 +82,7 @@ int32_t rpc_fetch_int();
int64_t f$fetch_int();

int64_t f$fetch_lookup_int();

string f$fetch_lookup_data(int64_t x4_bytes_length);

int64_t f$fetch_long();
Expand Down Expand Up @@ -197,7 +199,9 @@ bool f$rpc_clean(bool is_error = false);
bool rpc_store(bool is_error = false);

int64_t f$rpc_send(const class_instance<C$RpcConnection> &conn, double timeout = -1.0);
int64_t rpc_send(const class_instance<C$RpcConnection> &conn, double timeout, bool ignore_answer = false);

int64_t rpc_send_impl(const class_instance<C$RpcConnection> &conn, double timeout, rpc_request_extra_info_t &req_extra_info, bool collect_resp_extra_info,
bool ignore_answer = false);

int64_t f$rpc_send_noflush(const class_instance<C$RpcConnection> &conn, double timeout = -1.0);

Expand All @@ -208,8 +212,8 @@ Optional<string> f$rpc_get(int64_t request_id, double timeout = -1.0);
Optional<string> f$rpc_get_synchronously(int64_t request_id);

bool rpc_get_and_parse(int64_t request_id, double timeout);
bool f$rpc_get_and_parse(int64_t request_id, double timeout = -1.0);

bool f$rpc_get_and_parse(int64_t request_id, double timeout = -1.0);

int64_t f$rpc_queue_create();

Expand Down Expand Up @@ -248,7 +252,8 @@ int64_t f$rpc_tl_query_one(const class_instance<C$RpcConnection> &c, const mixed
int64_t f$rpc_tl_pending_queries_count();
bool f$rpc_mc_parse_raw_wildcard_with_flags_to_array(const string &raw_result, array<mixed> &result);

array<int64_t> f$rpc_tl_query(const class_instance<C$RpcConnection> &c, const array<mixed> &tl_objects, double timeout = -1.0, bool ignore_answer = false);
array<int64_t> f$rpc_tl_query(const class_instance<C$RpcConnection> &c, const array<mixed> &tl_objects, double timeout = -1.0, bool ignore_answer = false,
class_instance<C$KphpRpcRequestsExtraInfo> requests_extra_info = {}, bool need_responses_extra_info = false);

array<mixed> f$rpc_tl_query_result_one(int64_t query_id);

Expand Down
24 changes: 24 additions & 0 deletions runtime/rpc_extra_info.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#include "runtime/rpc_extra_info.h"


array<std::pair<rpc_response_extra_info_status_t, rpc_response_extra_info_t>> rpc_responses_extra_info_map;

array<rpc_request_extra_info_t> f$KphpRpcRequestsExtraInfo$$get(class_instance<C$KphpRpcRequestsExtraInfo> v$this) {
return v$this->extra_info_arr_;
}

Optional<rpc_response_extra_info_t> f$extract_kphp_rpc_response_extra_info(std::int64_t resumable_id) {
const auto *resp_extra_info_ptr = rpc_responses_extra_info_map.find_value(resumable_id);

if (resp_extra_info_ptr == nullptr || resp_extra_info_ptr->first == rpc_response_extra_info_status_t::NOT_READY) {
return {};
}

const auto res = resp_extra_info_ptr->second;
rpc_responses_extra_info_map.unset(resumable_id);
return res;
}
41 changes: 41 additions & 0 deletions runtime/rpc_extra_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Compiler for PHP (aka KPHP)
// Copyright (c) 2024 LLC «V Kontakte»
// Distributed under the GPL v3 License, see LICENSE.notice.txt

#pragma once

#include <tuple>
#include <cstdint>

#include "runtime/kphp_core.h"
#include "runtime/dummy-visitor-methods.h"
#include "runtime/refcountable_php_classes.h"
#include "common/algorithms/hashes.h"
#include "common/wrappers/string_view.h"


using rpc_request_extra_info_t = std::tuple<std::int64_t>; // tuple(request_size)
using rpc_response_extra_info_t = std::tuple<std::int64_t, double>; // tuple(response_size, response_time)
enum class rpc_response_extra_info_status_t : std::uint8_t { NOT_READY, READY };

extern array<std::pair<rpc_response_extra_info_status_t, rpc_response_extra_info_t>> rpc_responses_extra_info_map;

struct C$KphpRpcRequestsExtraInfo final : public refcountable_php_classes<C$KphpRpcRequestsExtraInfo>, private DummyVisitorMethods {
using DummyVisitorMethods::accept;

array<rpc_request_extra_info_t> extra_info_arr_;

C$KphpRpcRequestsExtraInfo() = default;

const char *get_class() const noexcept {
return R"(KphpRpcRequestsExtraInfo)";
}

int get_hash() const noexcept {
return static_cast<std::int32_t>(vk::std_hash(vk::string_view(C$KphpRpcRequestsExtraInfo::get_class())));
}
};

array<rpc_request_extra_info_t> f$KphpRpcRequestsExtraInfo$$get(class_instance<C$KphpRpcRequestsExtraInfo> v$this);

Optional<rpc_response_extra_info_t> f$extract_kphp_rpc_response_extra_info(std::int64_t resumable_id);
1 change: 1 addition & 0 deletions runtime/runtime.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ prepend(KPHP_RUNTIME_SOURCES ${BASE_DIR}/runtime/
regexp.cpp
resumable.cpp
rpc.cpp
rpc_extra_info.cpp
serialize-functions.cpp
storage.cpp
streams.cpp
Expand Down
14 changes: 6 additions & 8 deletions runtime/typed_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,9 @@ class typed_rpc_tl_query_result_resumable : public Resumable {
};
} // namespace

int64_t typed_rpc_tl_query_impl(const class_instance<C$RpcConnection> &connection,
const RpcRequest &req,
double timeout,
bool ignore_answer,
bool bytes_estimating,
size_t &bytes_sent,
bool flush) {
int64_t
typed_rpc_tl_query_impl(const class_instance<C$RpcConnection> &connection, const RpcRequest &req, double timeout, rpc_request_extra_info_t &req_extra_info,
bool collect_resp_extra_info, bool ignore_answer, bool bytes_estimating, size_t &bytes_sent, bool flush) {
f$rpc_clean();
if (req.empty()) {
php_warning("Writing rpc query error: query function is null");
Expand All @@ -158,7 +154,9 @@ int64_t typed_rpc_tl_query_impl(const class_instance<C$RpcConnection> &connectio
if (bytes_estimating) {
estimate_and_flush_overflow(bytes_sent);
}
const int64_t query_id = rpc_send(connection, timeout, ignore_answer);

const int64_t query_id = rpc_send_impl(connection, timeout, req_extra_info, collect_resp_extra_info, ignore_answer);

if (query_id <= 0) {
return 0;
}
Expand Down
Loading

0 comments on commit e56cfd3

Please sign in to comment.