forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhot_restarting_base.h
120 lines (104 loc) · 5.57 KB
/
hot_restarting_base.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>
#include <array>
#include <atomic>
#include <cstdint>
#include <string>
#include "envoy/common/platform.h"
#include "envoy/server/hot_restart.h"
#include "envoy/server/options.h"
#include "envoy/stats/scope.h"
#include "source/common/common/assert.h"
#include "source/server/hot_restart.pb.h"
namespace Envoy {
namespace Server {
class RpcStream : public Logger::Loggable<Logger::Id::main> {
public:
enum class Blocking { Yes, No };
explicit RpcStream(uint64_t base_id) : base_id_(base_id) {}
~RpcStream();
void initDomainSocketAddress(sockaddr_un* address);
sockaddr_un createDomainSocketAddress(uint64_t id, const std::string& role,
const std::string& socket_path, mode_t socket_mode);
void bindDomainSocket(uint64_t id, const std::string& role, const std::string& socket_path,
mode_t socket_mode);
// Protocol description:
//
// In each direction between parent<-->child, a series of pairs of:
// A uint64 'length' (bytes in network order),
// followed by 'length' bytes of a serialized HotRestartMessage.
// Each new message must start in a new sendmsg datagram, i.e. 'length' must always start at
// byte 0. Each sendmsg datagram can be up to 4096 bytes (including 'length' if present). When
// the serialized protobuf is longer than 4096-8 bytes, and so cannot fit in just one datagram,
// it is delivered by a series of datagrams. In each of these continuation datagrams, the
// protobuf data starts at byte 0.
//
// There is no mechanism to explicitly pair responses to requests. However, the child initiates
// all exchanges, and blocks until a reply is received, so there is implicit pairing.
bool sendHotRestartMessage(sockaddr_un& address, const envoy::HotRestartMessage& proto,
bool allow_failure = false);
// Receive data, possibly enough to build one of our protocol messages.
// If block is true, blocks until a full protocol message is available.
// If block is false, returns nullptr if we run out of data to receive before a full protocol
// message is available. In either case, the HotRestartingBase may end up buffering some data
// for the next protocol message, even if the function returns a protobuf.
std::unique_ptr<envoy::HotRestartMessage> receiveHotRestartMessage(Blocking block);
bool replyIsExpectedType(const envoy::HotRestartMessage* proto,
envoy::HotRestartMessage::Reply::ReplyCase oneof_type) const;
int domain_socket_{-1};
private:
void getPassedFdIfPresent(envoy::HotRestartMessage* out, msghdr* message);
std::unique_ptr<envoy::HotRestartMessage> parseProtoAndResetState();
void initRecvBufIfNewMessage();
// An int in [0, MaxConcurrentProcesses). As hot restarts happen, each next process gets the
// next of 0,1,2,0,1,...
// A HotRestartingBase's domain socket's name contains its base_id_ value, and so we can use
// this value to determine which domain socket name to treat as our parent, and which to treat
// as our child. (E.g. if we are 2, 1 is parent and 0 is child).
const uint64_t base_id_;
// State for the receiving half of the protocol.
//
// When filled, the size in bytes that the in-flight HotRestartMessage should be.
// When empty, we're ready to start receiving a new message (starting with a uint64 'length').
absl::optional<uint64_t> expected_proto_length_;
// How much of the current in-flight message (including both the uint64 'length', plus the proto
// itself) we have received. Once this equals expected_proto_length_ + sizeof(uint64_t), we're
// ready to parse the HotRestartMessage. Should be set to 0 in between messages, to indicate
// readiness for a new message.
uint64_t cur_msg_recvd_bytes_{};
// The first 8 bytes will always be the raw net-order bytes of the current value of
// expected_proto_length_. The protobuf partial data starts at byte 8.
// Should be resized to 0 in between messages, to indicate readiness for a new message.
std::vector<uint8_t> recv_buf_;
};
/**
* Logic shared by the implementations of both sides of the child<-->parent hot restart protocol:
* domain socket communication, and our ad hoc RPC protocol.
*/
class HotRestartingBase : public Logger::Loggable<Logger::Id::main> {
protected:
HotRestartingBase(uint64_t base_id)
: main_rpc_stream_(base_id), udp_forwarding_rpc_stream_(base_id) {}
// Returns a Gauge that tracks hot-restart generation, where every successive
// child increments this number.
static Stats::Gauge& hotRestartGeneration(Stats::Scope& scope);
// A stream over a unix socket between the parent and child instances, used
// for the child instance to request socket information and control draining
// and shutdown of the parent.
RpcStream main_rpc_stream_;
// A separate channel is used for udp forwarding because udp forwarding can
// begin while communication on the main channel is still occurring. The hot
// restarter is single-threaded, so we don't have to worry about packets coming
// in a jumbled order, but there are two instances of the hot restarter, the
// parent and the child; it is possible for the child to send a udp packet
// while the parent is sending a request on the main channel, for which it will
// expect to receive a response (and not an unrelated udp packet). Therefore, a
// separate channel is used to deliver udp packets, ensuring no interference
// between the two data sources.
RpcStream udp_forwarding_rpc_stream_;
};
} // namespace Server
} // namespace Envoy