diff --git a/common/rpc-headers.cpp b/common/rpc-headers.cpp index 9e28d740b3..9548dd3297 100644 --- a/common/rpc-headers.cpp +++ b/common/rpc-headers.cpp @@ -2,28 +2,91 @@ // Copyright (c) 2020 LLC «V Kontakte» // Distributed under the GPL v3 License, see LICENSE.notice.txt -#include "common/algorithms/find.h" #include "common/rpc-headers.h" +#include "common/algorithms/find.h" #include "common/tl/constants/common.h" -size_t fill_extra_headers_if_needed(RpcExtraHeaders &extra_headers, uint32_t function_magic, int actor_id, bool ignore_answer) { - size_t extra_headers_size = 0; - bool need_actor = actor_id != 0 && vk::none_of_equal(function_magic, TL_RPC_DEST_ACTOR, TL_RPC_DEST_ACTOR_FLAGS); - bool need_flags = ignore_answer && vk::none_of_equal(function_magic, TL_RPC_DEST_FLAGS, TL_RPC_DEST_ACTOR_FLAGS); - - if (need_actor && need_flags) { - extra_headers.rpc_dest_actor_flags.op = TL_RPC_DEST_ACTOR_FLAGS; - extra_headers.rpc_dest_actor_flags.actor_id = actor_id; - extra_headers.rpc_dest_actor_flags.flags = vk::tl::common::rpc_invoke_req_extra_flags::no_result; - extra_headers_size = sizeof(extra_headers.rpc_dest_actor_flags); - } else if (need_actor) { - extra_headers.rpc_dest_actor.op = TL_RPC_DEST_ACTOR; - extra_headers.rpc_dest_actor.actor_id = actor_id; - extra_headers_size = sizeof(extra_headers.rpc_dest_actor); - } else if (need_flags) { - extra_headers.rpc_dest_flags.op = TL_RPC_DEST_FLAGS; - extra_headers.rpc_dest_flags.flags = vk::tl::common::rpc_invoke_req_extra_flags::no_result; - extra_headers_size = sizeof(extra_headers.rpc_dest_flags); + +RegularizeWrappersReturnT regularize_wrappers(const char *rpc_payload, std::int32_t actor_id, bool ignore_result) { + static_assert(sizeof(RpcDestActorFlagsHeaders) >= sizeof(RpcDestActorHeaders)); + static_assert(sizeof(RpcDestActorFlagsHeaders) >= sizeof(RpcDestFlagsHeaders)); + + const auto cur_wrapper{*reinterpret_cast(rpc_payload)}; + const auto function_magic{*reinterpret_cast(rpc_payload)}; + + if (actor_id == 0 && !ignore_result && vk::none_of_equal(function_magic, TL_RPC_DEST_ACTOR, TL_RPC_DEST_FLAGS, TL_RPC_DEST_ACTOR_FLAGS)) { + return {std::nullopt, 0, std::nullopt, nullptr}; + } + + RpcExtraHeaders extra_headers{}; + const std::size_t new_wrapper_size{sizeof(RpcDestActorFlagsHeaders)}; + std::size_t cur_wrapper_size{0}; + std::int32_t cur_wrapper_actor_id{0}; + bool cur_wrapper_ignore_result{false}; + + switch (function_magic) { + case TL_RPC_DEST_ACTOR_FLAGS: + cur_wrapper_size = sizeof(RpcDestActorFlagsHeaders); + cur_wrapper_actor_id = cur_wrapper.rpc_dest_actor_flags.actor_id; + cur_wrapper_ignore_result = static_cast(cur_wrapper.rpc_dest_actor_flags.flags & vk::tl::common::rpc_invoke_req_extra_flags::no_result); + + extra_headers.rpc_dest_actor_flags.op = TL_RPC_DEST_ACTOR_FLAGS; + extra_headers.rpc_dest_actor_flags.actor_id = actor_id != 0 ? actor_id : cur_wrapper.rpc_dest_actor_flags.actor_id; + if (ignore_result) { + extra_headers.rpc_dest_actor_flags.flags = cur_wrapper.rpc_dest_actor_flags.flags | vk::tl::common::rpc_invoke_req_extra_flags::no_result; + } else { + extra_headers.rpc_dest_actor_flags.flags = cur_wrapper.rpc_dest_actor_flags.flags & ~vk::tl::common::rpc_invoke_req_extra_flags::no_result; + } + + break; + case TL_RPC_DEST_ACTOR: + cur_wrapper_size = sizeof(RpcDestActorHeaders); + cur_wrapper_actor_id = cur_wrapper.rpc_dest_actor.actor_id; + + extra_headers.rpc_dest_actor_flags.op = TL_RPC_DEST_ACTOR_FLAGS; + extra_headers.rpc_dest_actor_flags.actor_id = actor_id != 0 ? actor_id : cur_wrapper.rpc_dest_actor.actor_id; + extra_headers.rpc_dest_actor_flags.flags = ignore_result ? vk::tl::common::rpc_invoke_req_extra_flags::no_result : 0x0; + + break; + case TL_RPC_DEST_FLAGS: + cur_wrapper_size = sizeof(RpcDestFlagsHeaders); + cur_wrapper_ignore_result = static_cast(cur_wrapper.rpc_dest_flags.flags & vk::tl::common::rpc_invoke_req_extra_flags::no_result); + + extra_headers.rpc_dest_actor_flags.op = TL_RPC_DEST_ACTOR_FLAGS; + extra_headers.rpc_dest_actor_flags.actor_id = actor_id; + if (ignore_result) { + extra_headers.rpc_dest_actor_flags.flags = cur_wrapper.rpc_dest_flags.flags | vk::tl::common::rpc_invoke_req_extra_flags::no_result; + } else { + extra_headers.rpc_dest_actor_flags.flags = cur_wrapper.rpc_dest_flags.flags & ~vk::tl::common::rpc_invoke_req_extra_flags::no_result; + } + + break; + default: + // we don't have a cur_wrapper, but we do have 'actor_id' or 'ignore_result' set + extra_headers.rpc_dest_actor_flags.op = TL_RPC_DEST_ACTOR_FLAGS; + extra_headers.rpc_dest_actor_flags.actor_id = actor_id; + extra_headers.rpc_dest_actor_flags.flags = ignore_result ? vk::tl::common::rpc_invoke_req_extra_flags::no_result : 0x0; + + break; } - return extra_headers_size; + + decltype(RegularizeWrappersReturnT{}.opt_actor_id_warning_info) opt_actor_id_warning{}; + if (actor_id != 0 && cur_wrapper_actor_id != 0) { + opt_actor_id_warning.emplace("inaccurate use of 'actor_id': '%d' was passed into RPC connection constructor, " + "but '%d' was already set in RpcDestActor or RpcDestActorFlags\n", + actor_id, cur_wrapper_actor_id); + } + + const char *opt_ignore_result_warning_msg{nullptr}; + if (!ignore_result && cur_wrapper_ignore_result) { + opt_ignore_result_warning_msg = "inaccurate use of 'ignore_answer': 'false' was passed into TL query function (e.g., rpc_tl_query), " + "but 'true' was already set in RpcDestFlags or RpcDestActorFlags\n"; + } + + return { + std::pair{extra_headers, new_wrapper_size}, + cur_wrapper_size, + std::move(opt_actor_id_warning), + opt_ignore_result_warning_msg, + }; } diff --git a/common/rpc-headers.h b/common/rpc-headers.h index 355ae2ed99..5dceecc233 100644 --- a/common/rpc-headers.h +++ b/common/rpc-headers.h @@ -4,8 +4,10 @@ #pragma once -#include #include +#include +#include +#include #pragma pack(push, 1) @@ -44,4 +46,17 @@ struct RpcHeaders { #pragma pack(pop) -size_t fill_extra_headers_if_needed(RpcExtraHeaders &extra_headers, uint32_t function_magic, int actor_id, bool ignore_answer); +struct RegularizeWrappersReturnT { + /// Optionally contains a new wrapper and its size + std::optional> opt_new_wrapper; + /// The size of a wrapper found in rpc payload (0 if there is no one) + std::size_t cur_wrapper_size; + /// Optionally contains a tuple of . + /// If not std::nullopt, can be used to warn about actor_id redefinition, for example, + /// 'php_warning(format_str, current_wrapper_actor_id, new_actor_id)' + std::optional> opt_actor_id_warning_info; + /// Optionally contains a string. If not nullptr, can be used to warn about inaccurate usage of 'ignore_result'. + const char *opt_ignore_result_warning_msg; +}; + +RegularizeWrappersReturnT regularize_wrappers(const char *rpc_payload, std::int32_t actor_id, bool ignore_result); diff --git a/runtime/rpc.cpp b/runtime/rpc.cpp index 27e7188865..d476cc54e0 100644 --- a/runtime/rpc.cpp +++ b/runtime/rpc.cpp @@ -5,6 +5,7 @@ #include "runtime/rpc.h" #include +#include #include #include "common/rpc-error-codes.h" @@ -679,22 +680,40 @@ int64_t rpc_send_impl(const class_instance &conn, double timeou store_int(-1); // reserve for crc32 php_assert (data_buf.size() % sizeof(int) == 0); - const char *rpc_payload_start = data_buf.c_str() + sizeof(RpcHeaders); - size_t rpc_payload_size = data_buf.size() - sizeof(RpcHeaders); - uint32_t function_magic = CurrentProcessingQuery::get().get_last_stored_tl_function_magic(); - RpcExtraHeaders extra_headers{}; - size_t extra_headers_size = fill_extra_headers_if_needed(extra_headers, function_magic, conn.get()->actor_id, ignore_answer); + const auto [opt_new_wrapper, cur_wrapper_size, opt_actor_id_warning_info, opt_ignore_result_warning_msg]{ + regularize_wrappers(data_buf.c_str() + sizeof(RpcHeaders), conn.get()->actor_id, ignore_answer)}; - const auto request_size = static_cast(data_buf.size() + extra_headers_size); - char *p = static_cast(dl::allocate(request_size)); + if (opt_actor_id_warning_info.has_value()) { + const auto [msg, cur_wrapper_actor_id, new_wrapper_actor_id]{opt_actor_id_warning_info.value()}; + php_warning(msg, cur_wrapper_actor_id, new_wrapper_actor_id); + } + if (opt_ignore_result_warning_msg != nullptr) { + php_warning("%s", opt_ignore_result_warning_msg); + } + + char *request_buf{nullptr}; + std::size_t request_size{0}; - // Memory will look like this: + // 'request_buf' will look like this: // [ RpcHeaders (reserved in f$rpc_clean) ] [ RpcExtraHeaders (optional) ] [ payload ] - memcpy(p, data_buf.c_str(), sizeof(RpcHeaders)); - memcpy(p + sizeof(RpcHeaders), &extra_headers, extra_headers_size); - memcpy(p + sizeof(RpcHeaders) + extra_headers_size, rpc_payload_start, rpc_payload_size); + if (opt_new_wrapper.has_value()) { + const auto [new_wrapper, new_wrapper_size]{opt_new_wrapper.value()}; + request_size = data_buf.size() - cur_wrapper_size + new_wrapper_size; + request_buf = static_cast(dl::allocate(request_size)); + + std::memcpy(request_buf, data_buf.c_str(), sizeof(RpcHeaders)); + std::memcpy(request_buf + sizeof(RpcHeaders), &new_wrapper, new_wrapper_size); + std::memcpy(request_buf + sizeof(RpcHeaders) + new_wrapper_size, + data_buf.c_str() + sizeof(RpcHeaders) + cur_wrapper_size, + data_buf.size() - sizeof(RpcHeaders) - cur_wrapper_size); + } else { + request_size = data_buf.size(); + request_buf = static_cast(dl::allocate(request_size)); + + std::memcpy(request_buf, data_buf.c_str(), request_size); + } - slot_id_t q_id = rpc_send_query(conn.get()->host_num, p, static_cast(request_size), timeout_convert_to_ms(timeout)); + slot_id_t q_id = rpc_send_query(conn.get()->host_num, request_buf, static_cast(request_size), timeout_convert_to_ms(timeout)); // request's statistics req_extra_info = rpc_request_extra_info_t{request_size}; @@ -739,7 +758,7 @@ int64_t rpc_send_impl(const class_instance &conn, double timeou double send_timestamp = std::chrono::duration{std::chrono::system_clock::now().time_since_epoch()}.count(); cur->resumable_id = register_forked_resumable(new rpc_resumable(q_id)); - cur->function_magic = function_magic; + cur->function_magic = CurrentProcessingQuery::get().get_last_stored_tl_function_magic(); cur->actor_or_port = conn.get()->actor_id > 0 ? conn.get()->actor_id : -conn.get()->port; cur->timer = nullptr; diff --git a/runtime/typed_rpc.h b/runtime/typed_rpc.h index f756084a51..6b57fdf5a1 100644 --- a/runtime/typed_rpc.h +++ b/runtime/typed_rpc.h @@ -49,8 +49,7 @@ array f$typed_rpc_tl_query(const class_instance &conne static_assert(std::is_same_v, "Unexpected type"); if (ignore_answer && need_responses_extra_info) { - php_warning( - "Both $ignore_answer and $need_responses_extra_info are 'true'. Can't collect metrics for ignored answers"); + php_warning("Both $ignore_answer and $need_responses_extra_info are 'true'. Can't collect metrics for ignored answers"); } size_t bytes_sent = 0; diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActor.php b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActor.php new file mode 100644 index 0000000000..9b89786da3 --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActor.php @@ -0,0 +1,77 @@ +actor_id = $actor_id; + $this->query = $query; + } + + /** + * @param TL\RpcFunctionReturnResult $function_return_result + * @return TL\RpcFunctionReturnResult + */ + public static function functionReturnValue($function_return_result) { + if ($function_return_result instanceof rpcDestActor_result) { + return $function_return_result->value; + } + warning('Unexpected result type in functionReturnValue: ' . ($function_return_result ? get_class($function_return_result) : 'null')); + return (new rpcDestActor_result())->value; + } + + /** + * @kphp-inline + * + * @param TL\RpcResponse $response + * @return TL\RpcFunctionReturnResult + */ + public static function result(TL\RpcResponse $response) { + return self::functionReturnValue($response->getResult()); + } + + /** + * @kphp-inline + * + * @return string + */ + public function getTLFunctionName() { + return 'rpcDestActor'; + } + +} + +/** + * @kphp-tl-class + */ +class rpcDestActor_result implements TL\RpcFunctionReturnResult { + + /** @var TL\RpcFunctionReturnResult */ + public $value = null; + +} diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActorFlags.php b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActorFlags.php new file mode 100644 index 0000000000..9a1b39cec8 --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestActorFlags.php @@ -0,0 +1,87 @@ +actor_id = $actor_id; + $this->flags = $flags; + $this->extra = $extra; + $this->query = $query; + } + + /** + * @param TL\RpcFunctionReturnResult $function_return_result + * @return TL\RpcFunctionReturnResult + */ + public static function functionReturnValue($function_return_result) { + if ($function_return_result instanceof rpcDestActorFlags_result) { + return $function_return_result->value; + } + warning('Unexpected result type in functionReturnValue: ' . ($function_return_result ? get_class($function_return_result) : 'null')); + return (new rpcDestActorFlags_result())->value; + } + + /** + * @kphp-inline + * + * @param TL\RpcResponse $response + * @return TL\RpcFunctionReturnResult + */ + public static function result(TL\RpcResponse $response) { + return self::functionReturnValue($response->getResult()); + } + + /** + * @kphp-inline + * + * @return string + */ + public function getTLFunctionName() { + return 'rpcDestActorFlags'; + } + +} + +/** + * @kphp-tl-class + */ +class rpcDestActorFlags_result implements TL\RpcFunctionReturnResult { + + /** @var TL\RpcFunctionReturnResult */ + public $value = null; + +} diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestFlags.php b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestFlags.php new file mode 100644 index 0000000000..c0cdcdd3c3 --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/_common/Functions/rpcDestFlags.php @@ -0,0 +1,82 @@ +flags = $flags; + $this->extra = $extra; + $this->query = $query; + } + + /** + * @param TL\RpcFunctionReturnResult $function_return_result + * @return TL\RpcFunctionReturnResult + */ + public static function functionReturnValue($function_return_result) { + if ($function_return_result instanceof rpcDestFlags_result) { + return $function_return_result->value; + } + warning('Unexpected result type in functionReturnValue: ' . ($function_return_result ? get_class($function_return_result) : 'null')); + return (new rpcDestFlags_result())->value; + } + + /** + * @kphp-inline + * + * @param TL\RpcResponse $response + * @return TL\RpcFunctionReturnResult + */ + public static function result(TL\RpcResponse $response) { + return self::functionReturnValue($response->getResult()); + } + + /** + * @kphp-inline + * + * @return string + */ + public function getTLFunctionName() { + return 'rpcDestFlags'; + } + +} + +/** + * @kphp-tl-class + */ +class rpcDestFlags_result implements TL\RpcFunctionReturnResult { + + /** @var TL\RpcFunctionReturnResult */ + public $value = null; + +} diff --git a/tests/python/tests/rpc/php/VK/TL/_common/Types/rpcInvokeReqExtra.php b/tests/python/tests/rpc/php/VK/TL/_common/Types/rpcInvokeReqExtra.php new file mode 100644 index 0000000000..018cdfa4d3 --- /dev/null +++ b/tests/python/tests/rpc/php/VK/TL/_common/Types/rpcInvokeReqExtra.php @@ -0,0 +1,191 @@ +return_binlog_pos) { + $mask |= self::BIT_RETURN_BINLOG_POS_0; + } + + if ($this->return_binlog_time) { + $mask |= self::BIT_RETURN_BINLOG_TIME_1; + } + + if ($this->return_pid) { + $mask |= self::BIT_RETURN_PID_2; + } + + if ($this->return_request_sizes) { + $mask |= self::BIT_RETURN_REQUEST_SIZES_3; + } + + if ($this->return_failed_subqueries) { + $mask |= self::BIT_RETURN_FAILED_SUBQUERIES_4; + } + + if ($this->return_query_stats) { + $mask |= self::BIT_RETURN_QUERY_STATS_6; + } + + if ($this->no_result) { + $mask |= self::BIT_NO_RESULT_7; + } + + if ($this->wait_binlog_pos !== null) { + $mask |= self::BIT_WAIT_BINLOG_POS_16; + } + + if ($this->string_forward_keys !== null) { + $mask |= self::BIT_STRING_FORWARD_KEYS_18; + } + + if ($this->int_forward_keys !== null) { + $mask |= self::BIT_INT_FORWARD_KEYS_19; + } + + if ($this->string_forward !== null) { + $mask |= self::BIT_STRING_FORWARD_20; + } + + if ($this->int_forward !== null) { + $mask |= self::BIT_INT_FORWARD_21; + } + + if ($this->custom_timeout_ms !== null) { + $mask |= self::BIT_CUSTOM_TIMEOUT_MS_23; + } + + if ($this->supported_compression_version !== null) { + $mask |= self::BIT_SUPPORTED_COMPRESSION_VERSION_25; + } + + if ($this->random_delay !== null) { + $mask |= self::BIT_RANDOM_DELAY_26; + } + + if ($this->return_view_number) { + $mask |= self::BIT_RETURN_VIEW_NUMBER_27; + } + + return $mask; + } + +} diff --git a/tests/python/tests/rpc/php/index.php b/tests/python/tests/rpc/php/index.php index 5bde09d781..c3004102f0 100644 --- a/tests/python/tests/rpc/php/index.php +++ b/tests/python/tests/rpc/php/index.php @@ -1,83 +1,57 @@ "engine.stat"], ['_' => "engine.pid"], ['_' => "engine.version"], ["_" => "engine.sleep", "time_ms" => (int)(200)]]; - $conn = new_rpc_connection("localhost", (int)$_GET["master-port"]); - $requests_extra_info = new \KphpRpcRequestsExtraInfo; - - $query_ids = rpc_tl_query($conn, $queries, -1.0, false, $requests_extra_info, true); +require_once "rpc_extra_info.php"; +require_once "rpc_wrappers.php"; - $res = rpc_tl_query_result($query_ids); - - $responses_extra_info = []; - foreach ($query_ids as $q_id) { - $extra_info = extract_kphp_rpc_response_extra_info($q_id); - if (is_null($extra_info)) { - critical_error("got null rpc response extra info after processing an rpc response!"); +function do_http_worker() { + switch($_SERVER["PHP_SELF"]) { + case "/test_kphp_untyped_rpc_extra_info": { + test_kphp_untyped_rpc_extra_info(); + return; } - array_push($responses_extra_info, $extra_info); - } - - echo json_encode([ - "result" => $res["result"], - "requests_extra_info" => array_map(fn($req_tup): array => [$req_tup[0]], $requests_extra_info->get()), - "responses_extra_info" => array_map(fn($resp_tup): array => [$resp_tup[0], $resp_tup[1]], $responses_extra_info), - ]); -} - -function get_kphp_typed_rpc_extra_info() { - $queries = [new engine_stat(), new engine_pid(), new engine_version(), new engine_sleep(200)]; - $conn = new_rpc_connection("localhost", (int)$_GET["master-port"]); - $requests_extra_info = new \KphpRpcRequestsExtraInfo; - - $query_ids = typed_rpc_tl_query($conn, $queries, -1.0, false, $requests_extra_info, true); - - $res = typed_rpc_tl_query_result($query_ids); - - $responses_extra_info = []; - foreach ($query_ids as $q_id) { - $extra_info = extract_kphp_rpc_response_extra_info($q_id); - if (is_null($extra_info)) { - critical_error("got null rpc response extra info after processing an rpc response!"); + case "/test_kphp_typed_rpc_extra_info": { + test_kphp_typed_rpc_extra_info(); + return; } - array_push($responses_extra_info, $extra_info); - } - - foreach ($res as $resp) { - if ($resp->isError()) { - critical_error("bad rpc response"); + case "/test_rpc_no_wrappers_with_actor_id": { + test_rpc_no_wrappers_with_actor_id(); + return; } - } - - echo json_encode([ - "requests_extra_info" => array_map(fn($req_tup): array => [$req_tup[0]], $requests_extra_info->get()), - "responses_extra_info" => array_map(fn($resp_tup): array => [$resp_tup[0], $resp_tup[1]], $responses_extra_info), - ]); -} - -function do_http_worker() { - switch($_SERVER["PHP_SELF"]) { - case "/get_kphp_untyped_rpc_extra_info": { - get_kphp_untyped_rpc_extra_info(); + case "/test_rpc_no_wrappers_with_ignore_answer": { + test_rpc_no_wrappers_with_ignore_answer(); + return; + } + case "/test_rpc_dest_actor_with_actor_id": { + test_rpc_dest_actor_with_actor_id(); + return; + } + case "/test_rpc_dest_actor_with_ignore_answer": { + test_rpc_dest_actor_with_ignore_answer(); + return; + } + case "/test_rpc_dest_flags_with_actor_id": { + test_rpc_dest_flags_with_actor_id(); + return; + } + case "/test_rpc_dest_flags_with_ignore_answer": { + test_rpc_dest_flags_with_ignore_answer(); + return; + } + case "/test_rpc_dest_flags_with_ignore_answer_1": { + test_rpc_dest_flags_with_ignore_answer_1(); + return; + } + case "/test_rpc_dest_actor_flags_with_actor_id": { + test_rpc_dest_actor_flags_with_actor_id(); + return; + } + case "/test_rpc_dest_actor_flags_with_ignore_answer": { + test_rpc_dest_actor_flags_with_ignore_answer(); return; } - case "/get_kphp_typed_rpc_extra_info": { - get_kphp_typed_rpc_extra_info(); + case "/test_rpc_dest_actor_flags_with_ignore_answer_1": { + test_rpc_dest_actor_flags_with_ignore_answer_1(); return; } } diff --git a/tests/python/tests/rpc/php/rpc_extra_info.php b/tests/python/tests/rpc/php/rpc_extra_info.php new file mode 100644 index 0000000000..f671c3538e --- /dev/null +++ b/tests/python/tests/rpc/php/rpc_extra_info.php @@ -0,0 +1,71 @@ + "engine.stat"], ['_' => "engine.pid"], ['_' => "engine.version"], ["_" => "engine.sleep", "time_ms" => (int)(200)]]; + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"]); + $requests_extra_info = new \KphpRpcRequestsExtraInfo; + + $query_ids = rpc_tl_query($conn, $queries, -1.0, false, $requests_extra_info, true); + + $res = rpc_tl_query_result($query_ids); + + $responses_extra_info = []; + foreach ($query_ids as $q_id) { + $extra_info = extract_kphp_rpc_response_extra_info($q_id); + if (is_null($extra_info)) { + critical_error("got null rpc response extra info after processing an rpc response!"); + } + array_push($responses_extra_info, $extra_info); + } + + echo json_encode([ + "result" => $res["result"], + "requests_extra_info" => array_map(fn($req_tup): array => [$req_tup[0]], $requests_extra_info->get()), + "responses_extra_info" => array_map(fn($resp_tup): array => [$resp_tup[0], $resp_tup[1]], $responses_extra_info), + ]); +} + +function test_kphp_typed_rpc_extra_info() { + $queries = [new engine_stat(), new engine_pid(), new engine_version(), new engine_sleep(200)]; + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"]); + $requests_extra_info = new \KphpRpcRequestsExtraInfo; + + $query_ids = typed_rpc_tl_query($conn, $queries, -1.0, false, $requests_extra_info, true); + + $res = typed_rpc_tl_query_result($query_ids); + + $responses_extra_info = []; + foreach ($query_ids as $q_id) { + $extra_info = extract_kphp_rpc_response_extra_info($q_id); + if (is_null($extra_info)) { + critical_error("got null rpc response extra info after processing an rpc response!"); + } + array_push($responses_extra_info, $extra_info); + } + + foreach ($res as $resp) { + if ($resp->isError()) { + critical_error("bad rpc response"); + } + } + + echo json_encode([ + "requests_extra_info" => array_map(fn($req_tup): array => [$req_tup[0]], $requests_extra_info->get()), + "responses_extra_info" => array_map(fn($resp_tup): array => [$resp_tup[0], $resp_tup[1]], $responses_extra_info), + ]); +} diff --git a/tests/python/tests/rpc/php/rpc_wrappers.php b/tests/python/tests/rpc/php/rpc_wrappers.php new file mode 100644 index 0000000000..ff81854a37 --- /dev/null +++ b/tests/python/tests/rpc/php/rpc_wrappers.php @@ -0,0 +1,161 @@ + $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_no_wrappers_with_ignore_answer() { + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"]); + + $query_ids = typed_rpc_tl_query($conn, [new engine_stat()], -1.0, true); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_actor_with_actor_id() { + $actor_id = 1997; + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], 0); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestActor($actor_id, new engine_stat())]); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_actor_with_ignore_answer() { + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], 0); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestActor(0, new engine_stat())], -1.0, true); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_flags_with_actor_id() { + $actor_id = 1997; + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], $actor_id); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestFlags(0, new rpcInvokeReqExtra(), new engine_stat())]); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_flags_with_ignore_answer() { + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], 0); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestFlags(0, new rpcInvokeReqExtra(), new engine_stat())], -1.0, true); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_flags_with_ignore_answer_1() { + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], 0); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestFlags(rpcInvokeReqExtra::BIT_NO_RESULT_7, new rpcInvokeReqExtra(), new engine_stat())], -1.0, true); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_actor_flags_with_actor_id() { + $actor_id = 1997; + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], $actor_id); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestActorFlags(0, 0, new rpcInvokeReqExtra(), new engine_stat())]); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_actor_flags_with_ignore_answer() { + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], 0); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestActorFlags(0, 0, new rpcInvokeReqExtra(), new engine_stat())], -1.0, true); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} + +function test_rpc_dest_actor_flags_with_ignore_answer_1() { + $conn = new_rpc_connection("localhost", (int)$_GET["master-port"], 0); + + $query_ids = typed_rpc_tl_query($conn, [new rpcDestActorFlags(0, rpcInvokeReqExtra::BIT_NO_RESULT_7, new rpcInvokeReqExtra(), new engine_stat())], -1.0, true); + $resp = typed_rpc_tl_query_result($query_ids)[0]; + + if ($resp instanceof rpcResponseError) { + echo json_encode(["error" => $resp->error_code]); + return; + } + + critical_error("unreachable"); +} diff --git a/tests/python/tests/rpc/test_rpc_extra_info.py b/tests/python/tests/rpc/test_rpc_extra_info.py index d2ddb27611..c0d0ab03ec 100644 --- a/tests/python/tests/rpc/test_rpc_extra_info.py +++ b/tests/python/tests/rpc/test_rpc_extra_info.py @@ -6,7 +6,7 @@ class TestRpcExtraInfo(KphpServerAutoTestCase): def test_untyped_rpc_extra_info(self): rpc_extra_info = self.kphp_server.http_get( - "/get_kphp_untyped_rpc_extra_info?master-port={}".format(self.kphp_server.master_port)) + "/test_kphp_untyped_rpc_extra_info?master-port={}".format(self.kphp_server.master_port)) self.assertEqual(rpc_extra_info.status_code, 200) self.assertNotEqual(rpc_extra_info.text, "") @@ -33,7 +33,7 @@ def test_untyped_rpc_extra_info(self): def test_typed_rpc_extra_info(self): rpc_extra_info = self.kphp_server.http_get( - "/get_kphp_typed_rpc_extra_info?master-port={}".format(self.kphp_server.master_port)) + "/test_kphp_typed_rpc_extra_info?master-port={}".format(self.kphp_server.master_port)) self.assertEqual(rpc_extra_info.status_code, 200) self.assertNotEqual(rpc_extra_info.text, "") diff --git a/tests/python/tests/rpc/test_rpc_wrappers.py b/tests/python/tests/rpc/test_rpc_wrappers.py new file mode 100644 index 0000000000..ebd126f820 --- /dev/null +++ b/tests/python/tests/rpc/test_rpc_wrappers.py @@ -0,0 +1,109 @@ +import json + +from python.lib.testcase import KphpServerAutoTestCase + +BAD_ACTOR_ID_ERROR_CODE = -2002 +WRONG_QUERY_ID_ERROR_CODE = -1003 + + +class TestRpcWrappers(KphpServerAutoTestCase): + def test_rpc_no_wrappers_with_actor_id(self): + rpc_response = self.kphp_server.http_get("/test_rpc_no_wrappers_with_actor_id?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], BAD_ACTOR_ID_ERROR_CODE) + + def test_rpc_no_wrappers_with_ignore_answer(self): + rpc_response = self.kphp_server.http_get("/test_rpc_no_wrappers_with_ignore_answer?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], WRONG_QUERY_ID_ERROR_CODE) + + def test_rpc_dest_actor_with_actor_id(self): + rpc_response = self.kphp_server.http_get("/test_rpc_dest_actor_with_actor_id?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], BAD_ACTOR_ID_ERROR_CODE) + + def test_rpc_dest_actor_with_ignore_answer(self): + rpc_response = self.kphp_server.http_get("/test_rpc_dest_actor_with_ignore_answer?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], WRONG_QUERY_ID_ERROR_CODE) + + def test_rpc_dest_flags_with_actor_id(self): + bad_actor_id_error_code = -2002 + rpc_response = self.kphp_server.http_get("/test_rpc_dest_flags_with_actor_id?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], BAD_ACTOR_ID_ERROR_CODE) + + def test_rpc_dest_flags_with_ignore_answer(self): + rpc_response = self.kphp_server.http_get("/test_rpc_dest_flags_with_ignore_answer?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], WRONG_QUERY_ID_ERROR_CODE) + + def test_rpc_dest_flags_with_ignore_answer_1(self): + rpc_response = self.kphp_server.http_get("/test_rpc_dest_flags_with_ignore_answer_1?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], WRONG_QUERY_ID_ERROR_CODE) + + def test_rpc_dest_actor_flags_with_actor_id(self): + rpc_response = self.kphp_server.http_get("/test_rpc_dest_actor_flags_with_actor_id?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], BAD_ACTOR_ID_ERROR_CODE) + + def test_rpc_dest_actor_flags_with_ignore_answer(self): + rpc_response = self.kphp_server.http_get("/test_rpc_dest_actor_flags_with_ignore_answer?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], WRONG_QUERY_ID_ERROR_CODE) + + def test_rpc_dest_actor_flags_with_ignore_answer_1(self): + rpc_response = self.kphp_server.http_get("/test_rpc_dest_actor_flags_with_ignore_answer_1?master-port={}".format(self.kphp_server.master_port)) + + self.assertEqual(rpc_response.status_code, 200) + self.assertNotEqual(rpc_response.text, "") + + output = json.loads(rpc_response.text) + + self.assertEqual(output["error"], WRONG_QUERY_ID_ERROR_CODE) diff --git a/vkext/vkext-rpc.cpp b/vkext/vkext-rpc.cpp index 71a5a1d3d0..6f5512327c 100644 --- a/vkext/vkext-rpc.cpp +++ b/vkext/vkext-rpc.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "common/rpc-headers.h" #include "common/crc32.h" @@ -1353,11 +1354,22 @@ static int rpc_write(struct rpc_connection *c, long long qid, double timeout, bo return -1; } - RpcExtraHeaders extra_headers{}; - size_t extra_headers_size = fill_extra_headers_if_needed(extra_headers, *reinterpret_cast(outbuf->rptr), c->default_actor_id, ignore_answer); + const auto [opt_new_wrapper, cur_wrapper_size, opt_actor_id_warning_info, opt_ignore_result_warning_msg]{ + regularize_wrappers(outbuf->rptr, c->default_actor_id, ignore_answer)}; - outbuf->rptr -= extra_headers_size; - memcpy(outbuf->rptr, &extra_headers, extra_headers_size); + if (opt_actor_id_warning_info.has_value()) { + const auto [msg, cur_wrapper_actor_id, new_wrapper_actor_id]{opt_actor_id_warning_info.value()}; + php_error_docref(nullptr, E_WARNING, msg, cur_wrapper_actor_id, new_wrapper_actor_id); + } + if (opt_ignore_result_warning_msg != nullptr) { + php_error_docref(nullptr, E_WARNING, opt_ignore_result_warning_msg); + } + + if (opt_new_wrapper.has_value()) { + const auto [new_wrapper, new_wrapper_size]{opt_new_wrapper.value()}; + outbuf->rptr -= new_wrapper_size - cur_wrapper_size; + std::memcpy(outbuf->rptr, &new_wrapper, new_wrapper_size); + } unsigned crc32 = 0; int len = sizeof(RpcHeaders) + sizeof(crc32) + (outbuf->wptr - outbuf->rptr);