This repository has been archived by the owner on Jan 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathsimulator_communication.h
282 lines (252 loc) · 8.23 KB
/
simulator_communication.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
// Copyright (c) 2017 Baidu Inc. All Rights Reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <boost/asio.hpp>
#include <stdio.h>
#include "data_packet.h"
namespace simulator {
namespace communication {
/******************************** MessageHeader *******************************/
/**
 * Fixed-size header that is placed in front of every message sent through a
 * socket.
 *
 * The header is a fixed-size byte array carrying the information needed to
 * process the message that follows it. Currently that is only the size (in
 * bytes) of the message body. To put more content in the header, one only
 * needs to change the member function `make_header`.
 *
 * NOTE(review): the header is transferred as the in-memory bytes of a
 * `size_t`, so both endpoints presumably need the same `size_t` width and
 * byte order -- confirm against the deployment targets.
 */
class MessageHeader {
private:
    using Socket = boost::asio::ip::tcp::socket;
    using BinaryBuffer = simulator::util::BinaryBuffer;
    using BinaryBufferPtr = std::unique_ptr<BinaryBuffer>;

public:
    MessageHeader() {}

    /**
     * Read message header from socket.
     *
     * Reads `header_size` bytes from `s` directly into the stored message
     * size. This function should be called before reading the message
     * content.
     *
     * A read that returns fewer than `header_size` bytes fails the CHECK. A
     * `boost::system::system_error` raised by the read is reported through
     * LOG(FATAL) together with the error description.
     *
     * @param[in] s socket to read the header from
     */
    void read_from_socket(Socket& s) {
        try {
            // boost::asio::buffer accepts a void*, so no cast is needed.
            size_t bytes_read = boost::asio::read(
                s, boost::asio::buffer(&msg_size_, header_size));
            CHECK_EQ(bytes_read, header_size);
        } catch (boost::system::system_error const& e) {
            // Keep the underlying cause in the log instead of dropping it.
            LOG(FATAL) << "asio error occured when reading message header"
                       << ": " << e.what();
        }
    }

    /**
     * Add message header content in front of the message.
     *
     * @param[in,out] msg_body message buffer; the stored message size is
     *                         inserted at position 0
     */
    void insert_into_msg(BinaryBufferPtr& msg_body) {
        msg_body->insert(0, msg_size_);
    }

    /**
     * Construct message header.
     *
     * @param[in] msg_size size (in bytes) of the message body, not including
     *                     the header itself
     */
    void make_header(size_t msg_size) {
        msg_size_ = msg_size;
    }

    /**
     * Return the size (in terms of bytes) of message (not including message
     * head).
     */
    size_t msg_size() const { return msg_size_; }

private:
    // Zero until `make_header` or `read_from_socket` assigns a real value, so
    // the accessor never returns an indeterminate value.
    size_t msg_size_ = 0;
    // Number of bytes the header occupies on the wire.
    static size_t const header_size = sizeof(size_t);
};
/******************************** Communicators *******************************/
/**
 * Base class offering the interface to build and parse messages and to send
 * and receive them through a socket.
 *
 * Communicator is tailored to our game simulation: it assumes that both
 * endpoints (e.g., server and client) agree on the message format.
 */
class Communicator {
protected:
    using Socket = boost::asio::ip::tcp::socket;
    using IOService = boost::asio::io_service;
    using BinaryBuffer = simulator::util::BinaryBuffer;
    using BinaryBufferPtr = std::unique_ptr<BinaryBuffer>;

public:
    Communicator();

    virtual ~Communicator() {}

protected:
    /**
     * Set up the TCP/IP connection with the remote endpoint.
     *
     * To be implemented by both the server side and the client side.
     *
     * @return true if the connection is established, false otherwise
     */
    virtual bool establish_connection() = 0;

    /**
     * Shut down and close the socket.
     */
    void close_connection();

    /**
     * Send the current message through an established socket.
     *
     * The call blocks until the message is transferred.
     */
    void deliver_msg();

    /**
     * Receive a message from an established socket.
     *
     * The message is not interpreted here; use `read_msg` to extract its
     * content.
     */
    void receive_msg();

    /**
     * Base case of the `read_msg` recursion: does nothing.
     */
    template <typename... Rest>
    void read_msg(Rest&... /*args*/) {}

    /**
     * Extract values from the message.
     *
     * Reads `t` from the message body, then continues with the remaining
     * arguments.
     *
     * @param[out] t    first value to read from the message
     * @param[out] rets rest of data to get from message
     */
    template <typename Head, typename... Rest>
    void read_msg(Head& t, Rest&... rets) {
        msg_body_->read(t);
        read_msg(rets...);
    }

    /**
     * Extract values from a message that carries simulation data.
     *
     * The remaining arguments are read first; `sim_data` is decoded from the
     * message body afterwards.
     *
     * @param[out] sim_data simulation data
     * @param[out] rets     rest of data to get from message
     */
    template <typename... Rest>
    void read_msg(StatePacket& sim_data, Rest&... rets) {
        read_msg(rets...);
        sim_data.decode(*msg_body_);
    }

    /**
     * Build a new message from the given inputs.
     *
     * The message body is cleared, then every argument is appended in order.
     *
     * @param[in] args data to put into message
     */
    template <typename... Rest>
    void compose_msg(const Rest&... args) {
        msg_body_->clear();
        append_msg(args...);
    }

    /**
     * Build a new message that carries simulation data.
     *
     * The message body is cleared, `args` are appended in order, and
     * `sim_data` is encoded into the message body last.
     *
     * @param[in] sim_data simulation data
     * @param[in] args     rest of data to put into message
     */
    template <typename... Rest>
    void compose_msg(const StatePacket& sim_data, const Rest&... args) {
        msg_body_->clear();
        append_msg(args...);
        sim_data.encode(*msg_body_);
    }

    IOService io_service_;
    Socket socket_;

private:
    /**
     * Base case of the `append_msg` recursion: does nothing.
     */
    template <typename... Rest>
    void append_msg(const Rest&... /*args*/) {}

    /**
     * Helper of `compose_msg`: appends `t` to the back of the message, then
     * continues with the remaining arguments.
     */
    template <typename Head, typename... Rest>
    void append_msg(const Head& t, const Rest&... args) {
        msg_body_->append(t);
        append_msg(args...);
    }

    MessageHeader msg_header_;
    BinaryBufferPtr msg_body_;
};
/**
* Communicator on the server (main process) side.
*/
class CommServer : public Communicator {
using Acceptor = boost::asio::ip::tcp::acceptor;
using Endpoint = boost::asio::ip::tcp::endpoint;
public:
CommServer();
int port() const { return port_; }
~CommServer() {}
protected:
/**
* Set up listening port and wait for connection from client side.
*/
virtual bool establish_connection() override;
/**
* Initiate a remote function call.
*
* A remote function call is accomplished by following steps:
* 1. Caller (CommServer) sends a message with the function name and
* arguments to Callee (CommClient), and waits for Callee's reply.
* 2. Callee executes the function request by Caller, and send back the
* function name and returns to Caller.
* 3. Caller receives the reply from Callee, check the function name that
* comes with it. Then Caller processes the rest of of the reply.
*
* The return of the function is sent back in the form of message.
*
* @param[in] func_name function name
* @param[in] sim_data pointer to simulation data. NULL if the function
* to call does not use simulation data.
* @param[in] args input arguments
*/
template <typename... Args>
void call_remote_func(const std::string& func_name,
const StatePacket* sim_data,
Args... args) {
if (sim_data) {
compose_msg(*sim_data, func_name, args...);
} else {
compose_msg(func_name, args...);
}
deliver_msg();
receive_msg(); // wait for return
// make sure that we did get the return we want
std::string reply;
read_msg(reply);
CHECK_EQ(reply, func_name);
}
protected:
int port_; // listening port
private:
Acceptor acceptor_;
};
/**
* Communicator on the client (child processes) side.
*/
class CommClient : public Communicator {
using Resolver = boost::asio::ip::tcp::resolver;
using TCP_Query = boost::asio::ip::tcp::resolver::query;
using TCP_Iterator = boost::asio::ip::tcp::resolver::iterator;
public:
CommClient(const std::string& host, const int port_no);
~CommClient() {}
protected:
/**
* Try to connect to server for at most MAX_ATTEMPTS times.
*/
virtual bool establish_connection() override;
std::string host_; // Address of server
int port_; // port of server
private:
Resolver resolver_;
static const int MAX_ATTEMPTS;
};
}} // simulator::communication