Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow disabling send size zerocopy for rdma #2267

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion example/rdma_performance/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ DEFINE_int32(expected_qps, 0, "The expected QPS");
DEFINE_int32(max_thread_num, 16, "The max number of threads are used");
DEFINE_int32(attachment_size, -1, "Attachment size is used (in Bytes)");
DEFINE_bool(echo_attachment, false, "Select whether attachment should be echo");
DEFINE_bool(attachment_as_userdata, false, "Append attachment as user_data");
DEFINE_string(connection_type, "single", "Connection type of the channel");
DEFINE_string(protocol, "baidu_std", "Protocol type.");
DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers");
Expand Down Expand Up @@ -86,7 +87,12 @@ class PerformanceTest {
if (attachment_size > 0) {
_addr = malloc(attachment_size);
butil::fast_rand_bytes(_addr, attachment_size);
_attachment.append(_addr, attachment_size);
if (FLAGS_attachment_as_userdata) {
brpc::rdma::RegisterMemoryForRdma(_addr, (size_t)attachment_size);
_attachment.append_user_data(_addr, attachment_size, NULL);
} else {
_attachment.append(_addr, attachment_size);
}
}
_echo_attachment = echo_attachment;
}
Expand Down
46 changes: 35 additions & 11 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ extern bool g_skip_rdma_init;

DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
DEFINE_bool(rdma_send_zerocopy, true, "Enable zerocopy for send side");
DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
Expand Down Expand Up @@ -801,29 +802,45 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
wr.sg_list = sglist;
wr.opcode = IBV_WR_SEND_WITH_IMM;

RdmaIOBuf* data = (RdmaIOBuf*)from[current];
size_t sge_index = 0;
while (sge_index < (uint32_t)max_sge &&
this_len < _remote_recv_block_size) {
if (data->size() == 0) {
if (from[current]->size() == 0) {
// The current IOBuf is empty, find next one
++current;
if (current == ndata) {
break;
}
data = (RdmaIOBuf*)from[current];
continue;
}

ssize_t len = data->cut_into_sglist_and_iobuf(
sglist, &sge_index, to, max_sge,
_remote_recv_block_size - this_len);
if (len < 0) {
return -1;
ssize_t len = 0;
if (FLAGS_rdma_send_zerocopy) {
ssize_t len = ((RdmaIOBuf*)from[current])->cut_into_sglist_and_iobuf(
sglist, &sge_index, to, max_sge,
_remote_recv_block_size - this_len);
if (len < 0) {
return -1;
}
this_len += len;
total_len += len;
} else {
len = _remote_recv_block_size - this_len;
void* buf = AllocBlock(len);
if (!buf) {
return -1;
}
len = from[current]->copy_to(buf, len);
from[current]->cutn(to, len);
sglist[sge_index].length = len;
sglist[sge_index].addr = (uint64_t)buf;
sglist[sge_index].lkey = GetLKey(buf);
++sge_index;
this_len += len;
total_len += len;
_sbuf_data[_sq_current] = buf;
break;
}
CHECK(len > 0);
this_len += len;
total_len += len;
}
if (this_len == 0) {
continue;
Expand Down Expand Up @@ -951,6 +968,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
uint32_t acks = butil::NetToHost32(wc.imm_data);
uint32_t num = acks;
while (num > 0) {
if (!FLAGS_rdma_send_zerocopy) {
DeallocBlock(_sbuf_data[_sq_sent]);
}
_sbuf[_sq_sent++].clear();
if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
_sq_sent = 0;
Expand Down Expand Up @@ -1139,6 +1159,10 @@ int RdmaEndpoint::AllocateResources() {
if (_rbuf.size() != _rq_size) {
return -1;
}
_sbuf_data.resize(_sq_size, NULL);
if (_sbuf_data.size() != _sq_size) {
return -1;
}
_rbuf_data.resize(_rq_size, NULL);
if (_rbuf_data.size() != _rq_size) {
return -1;
Expand Down
2 changes: 2 additions & 0 deletions src/brpc/rdma/rdma_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ friend class brpc::Socket;
// Act as sendbuf and recvbuf, but requires no memcpy
std::vector<butil::IOBuf> _sbuf;
std::vector<butil::IOBuf> _rbuf;
// Data address of _sbuf
std::vector<void*> _sbuf_data;
// Data address of _rbuf
std::vector<void*> _rbuf_data;
// Remote block size for receiving
Expand Down
13 changes: 8 additions & 5 deletions src/brpc/rdma/rdma_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,12 +643,15 @@ ibv_pd* GetRdmaPd() {
}

uint32_t GetLKey(void* buf) {
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
ibv_mr** mr_ptr = g_user_mrs->seek(buf);
if (mr_ptr) {
return (*mr_ptr)->lkey;
uint32_t lkey = GetRegionId(buf);
if (lkey == 0) {
BAIDU_SCOPED_LOCK(*g_user_mrs_lock);
ibv_mr** mr_ptr = g_user_mrs->seek(buf);
if (mr_ptr) {
return (*mr_ptr)->lkey;
}
}
return 0;
return lkey;
}

ibv_gid GetRdmaGid() {
Expand Down
7 changes: 6 additions & 1 deletion test/brpc_rdma_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ struct HelloMessage {
};

DECLARE_bool(rdma_trace_verbose);
DECLARE_bool(rdma_send_zerocopy);
DECLARE_int32(rdma_memory_pool_max_regions);
extern ibv_cq* (*IbvCreateCq)(ibv_context*, int, void*, ibv_comp_channel*, int);
extern int (*IbvDestroyCq)(ibv_cq*);
Expand Down Expand Up @@ -1873,7 +1874,11 @@ TEST_F(RdmaTest, send_rpcs_with_user_defined_iobuf) {
google::protobuf::Closure* done = DoNothing();
::test::EchoService::Stub(&channel).Echo(&cntl[0], &req[0], &res[0], done);
bthread_id_join(cntl[0].call_id());
ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
if (rdma::FLAGS_rdma_send_zerocopy) {
ASSERT_EQ(ERDMAMEM, cntl[0].ErrorCode());
} else {
ASSERT_EQ(0, cntl[0].ErrorCode());
}
attach.clear();
sleep(2); // wait for client recover from EHOSTDOWN
cntl[0].Reset();
Expand Down