diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d283644b..576abdd5 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -57,19 +57,34 @@ set_target_properties(rdm_client PROPERTIES RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/rdm") +set(RMA_TEST ${PROJECT_SOURCE_DIR}/examples/rma) + +add_executable(rma_server ${RMA_TEST}/server.cc) +target_link_libraries(rma_server hpnl flatbuffers) +set_target_properties(rma_server + PROPERTIES + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/rma") + +add_executable(rma_client ${RMA_TEST}/client.cc) +target_link_libraries(rma_client hpnl flatbuffers) +set_target_properties(rma_client + PROPERTIES + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/rma") + + option(WITH_PMEM "enable PMoF code" OFF) if(WITH_PMEM) - set(PMEM_TEST ${PROJECT_SOURCE_DIR}/examples/pmem) + set(PMEM_TEST ${PROJECT_SOURCE_DIR}/examples/pmof) - add_executable(pmem_server ${PMEM_TEST}/server.cc) - target_link_libraries(pmem_server hpnl) - set_target_properties(pmem_server + add_executable(pmof_server ${PMEM_TEST}/server.cc) + target_link_libraries(pmof_server pmemobj hpnl) + set_target_properties(pmof_server PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmem") + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmof") - add_executable(pmem_client ${PMEM_TEST}/client.cc) - target_link_libraries(pmem_client pmemobj hpnl) - set_target_properties(pmem_client + add_executable(pmof_client ${PMEM_TEST}/client.cc) + target_link_libraries(pmof_client pmemobj hpnl) + set_target_properties(pmof_client PROPERTIES - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmem") + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmof") endif(WITH_PMEM) diff --git a/examples/connection/client.cc b/examples/connection/client.cc index 0242c240..38c77f75 100644 --- a/examples/connection/client.cc +++ b/examples/connection/client.cc @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "HPNL/Connection.h" -#include "HPNL/Client.h" -#include "HPNL/ChunkMgr.h" #include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" +#include "HPNL/Client.h" +#include "HPNL/Connection.h" #include @@ -30,39 +30,41 @@ uint64_t start, end = 0; uint64_t timestamp_now() { - return std::chrono::high_resolution_clock::now().time_since_epoch() / std::chrono::milliseconds(1); + return std::chrono::high_resolution_clock::now().time_since_epoch() / + std::chrono::milliseconds(1); } class ShutdownCallback : public Callback { - public: - explicit ShutdownCallback(Client *_clt) : clt(_clt) {} - ~ShutdownCallback() override = default; - void operator()(void *param_1, void *param_2) override { - clt->shutdown(); - } - private: - Client *clt; + public: + explicit ShutdownCallback(Client* _clt) : clt(_clt) {} + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { clt->shutdown(); } + + private: + Client* clt; }; class ConnectedCallback : public Callback { - public: - ConnectedCallback(Client *client_, ChunkMgr *bufMgr_) : client(client_), bufMgr(bufMgr_) {} - ~ConnectedCallback() override = default; - void operator()(void *param_1, void *param_2) override { - std::cout << "connected." << std::endl; - auto *con = (Connection*)param_1; - //client->shutdown(con); - } - private: - Client *client; - ChunkMgr *bufMgr; + public: + ConnectedCallback(Client* client_, ChunkMgr* bufMgr_) + : client(client_), bufMgr(bufMgr_) {} + ~ConnectedCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connected." << std::endl; + auto* con = (Connection*)param_1; + // client->shutdown(con); + } + + private: + Client* client; + ChunkMgr* bufMgr; }; void connect() { auto client = new Client(1, 16); client->init(); - ChunkMgr *bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10); + ChunkMgr* bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); client->set_buf_mgr(bufMgr); auto connectedCallback = new ConnectedCallback(client, bufMgr); @@ -84,7 +86,7 @@ void connect() { delete bufMgr; } -int main(int argc, char *argv[]) { - connect(); +int main(int argc, char* argv[]) { + connect(); return 0; } diff --git a/examples/connection/server.cc b/examples/connection/server.cc index 84b97853..f63170ce 100644 --- a/examples/connection/server.cc +++ b/examples/connection/server.cc @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" #include "HPNL/Connection.h" #include "HPNL/Server.h" -#include "HPNL/ChunkMgr.h" -#include "HPNL/Callback.h" #include @@ -28,19 +28,19 @@ #define MAX_WORKERS 10 class ShutdownCallback : public Callback { - public: - ShutdownCallback() = default; - ~ShutdownCallback() override = default; - void operator()(void *param_1, void *param_2) override { - std::cout << "connection shutdown..." << std::endl; - } + public: + ShutdownCallback() = default; + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connection shutdown..." << std::endl; + } }; -int main(int argc, char *argv[]) { +int main(int argc, char* argv[]) { auto server = new Server(1, 16); server->init(); - ChunkMgr *bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10); + ChunkMgr* bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); server->set_buf_mgr(bufMgr); auto shutdownCallback = new ShutdownCallback(); diff --git a/examples/ping-pong/client.cc b/examples/ping-pong/client.cc index 7cdcc804..62f77062 100644 --- a/examples/ping-pong/client.cc +++ b/examples/ping-pong/client.cc @@ -15,17 +15,17 @@ // specific language governing permissions and limitations // under the License. -#include +#include -#include "HPNL/Connection.h" -#include "HPNL/Client.h" -#include "HPNL/ChunkMgr.h" #include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" +#include "HPNL/Client.h" +#include "HPNL/Connection.h" #include #define MSG_SIZE 4096 -#define BUFFER_SIZE (65536*2) +#define BUFFER_SIZE (65536 * 2) #define BUFFER_NUM 128 int count = 0; @@ -33,84 +33,90 @@ uint64_t start, end = 0; std::mutex mtx; uint64_t timestamp_now() { - return std::chrono::high_resolution_clock::now().time_since_epoch() / std::chrono::milliseconds(1); + return std::chrono::high_resolution_clock::now().time_since_epoch() / + std::chrono::milliseconds(1); } class ShutdownCallback : public Callback { - public: - explicit ShutdownCallback(Client *_clt) : clt(_clt) {} - ~ShutdownCallback() override = default; - void operator()(void *param_1, void *param_2) override { - std::cout << "connection shutdown..." << std::endl; - clt->shutdown(); - } - private: - Client *clt; + public: + explicit ShutdownCallback(Client* _clt) : clt(_clt) {} + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connection shutdown..." << std::endl; + clt->shutdown(); + } + + private: + Client* clt; }; class ConnectedCallback : public Callback { - public: - explicit ConnectedCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {} - ~ConnectedCallback() override = default; - void operator()(void *param_1, void *param_2) override { - auto con = (Connection*)param_1; - Chunk *ck = bufMgr->get(con); - ck->size = MSG_SIZE; - memset(ck->buffer, '0', MSG_SIZE); - con->send(ck); - } - private: - ChunkMgr *bufMgr; + public: + explicit ConnectedCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~ConnectedCallback() override = default; + void operator()(void* param_1, void* param_2) override { + auto con = static_cast(param_1); + Chunk* ck = bufMgr->get(con); + ck->size = MSG_SIZE; + memset(ck->buffer, '0', MSG_SIZE); + con->send(ck); + } + + private: + ChunkMgr* bufMgr; }; class RecvCallback : public Callback { - public: - RecvCallback(Client *client_, ChunkMgr *bufMgr_) : client(client_), bufMgr(bufMgr_) {} - ~RecvCallback() override = default; - void operator()(void *param_1, void *param_2) override { - std::lock_guard lk(mtx); - count++; - int mid = *(int*)param_1; - Chunk *ck = bufMgr->get(mid); - auto con = (Connection*)ck->con; - if (count >= 1000000) { - end = timestamp_now(); - printf("finished, totally consumes %f s, message round trip time is %f us.\n", (end-start)/1000.0, (end-start)*1000/1000000.0); - return; - } - if (count == 1) { - printf("start ping-pong.\n"); - } - if (count == 1) { - start = timestamp_now(); - } - ck->size = MSG_SIZE; - con->send(ck); + public: + RecvCallback(Client* client_, ChunkMgr* bufMgr_) : client(client_), bufMgr(bufMgr_) {} + ~RecvCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::lock_guard lk(mtx); + count++; + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + if (count >= 1000000) { + end = timestamp_now(); + printf("finished, totally consumes %f s, message round trip time is %f us.\n", + (end - start) / 1000.0, (end - start) * 1000 / 1000000.0); + return; + } + if (count == 1) { + printf("start ping-pong.\n"); } - private: - Client *client; - ChunkMgr *bufMgr; + if (count == 1) { + start = timestamp_now(); + } + ck->size = MSG_SIZE; + con->send(ck); + } + + private: + Client* client; + ChunkMgr* bufMgr; }; class SendCallback : public Callback { - public: - explicit SendCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {} - ~SendCallback() override = default; - void operator()(void *param_1, void *param_2) override { - int mid = *(int*)param_1; - Chunk *ck = bufMgr->get(mid); - auto con = (Connection*)ck->con; - bufMgr->reclaim(ck, con); - } - private: - ChunkMgr *bufMgr; + public: + explicit SendCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~SendCallback() override = default; + void operator()(void* param_1, void* param_2) override { + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + bufMgr->reclaim(ck, con); + } + + private: + ChunkMgr* bufMgr; }; -int main(int argc, char *argv[]) { +int main(int argc, char* argv[]) { auto client = new Client(1, 16); client->init(); - ChunkMgr *bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10); + ChunkMgr* bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); client->set_buf_mgr(bufMgr); auto recvCallback = new RecvCallback(client, bufMgr); diff --git a/examples/ping-pong/server.cc b/examples/ping-pong/server.cc index 017d9e41..a09680fa 100644 --- a/examples/ping-pong/server.cc +++ b/examples/ping-pong/server.cc @@ -15,61 +15,62 @@ // specific language governing permissions and limitations // under the License. +#include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" #include "HPNL/Connection.h" #include "HPNL/Server.h" -#include "HPNL/ChunkMgr.h" -#include "HPNL/Callback.h" #include #define MSG_SIZE 4096 -#define BUFFER_SIZE (65536*2) +#define BUFFER_SIZE (65536 * 2) #define BUFFER_NUM 128 class ShutdownCallback : public Callback { - public: - ShutdownCallback() = default; - ~ShutdownCallback() override = default; - void operator()(void *param_1, void *param_2) override { - std::cout << "connection shutdown..." << std::endl; - } + public: + ShutdownCallback() = default; + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connection shutdown..." << std::endl; + } }; class RecvCallback : public Callback { - public: - explicit RecvCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {} - ~RecvCallback() override = default; - void operator()(void *param_1, void *param_2) override { - int mid = *(int*)param_1; - auto ck = bufMgr->get(mid); - ck->size = MSG_SIZE; - auto con = (Connection*)ck->con; - con->send(ck); - } - private: - ChunkMgr *bufMgr; + public: + explicit RecvCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~RecvCallback() override = default; + void operator()(void* param_1, void* param_2) override { + int mid = *static_cast(param_1); + auto ck = bufMgr->get(mid); + ck->size = MSG_SIZE; + auto con = static_cast(ck->con); + con->send(ck); + } + + private: + ChunkMgr* bufMgr; }; class SendCallback : public Callback { - public: - explicit SendCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {} - ~SendCallback() override = default; - void operator()(void *param_1, void *param_2) override { - int mid = *(int*)param_1; - Chunk *ck = bufMgr->get(mid); - auto con = (Connection*)ck->con; - bufMgr->reclaim(ck, con); - } - private: - ChunkMgr *bufMgr; -}; + public: + explicit SendCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~SendCallback() override = default; + void operator()(void* param_1, void* param_2) override { + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + bufMgr->reclaim(ck, con); + } -int main(int argc, char *argv[]) { + private: + ChunkMgr* bufMgr; +}; +int main(int argc, char* argv[]) { auto server = new Server(1, 16); server->init(); - ChunkMgr *bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10); + ChunkMgr* bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); server->set_buf_mgr(bufMgr); auto recvCallback = new RecvCallback(bufMgr); diff --git a/examples/pmem/client.cc b/examples/pmem/client.cc deleted file mode 100644 index 59bdca64..00000000 --- a/examples/pmem/client.cc +++ /dev/null @@ -1,155 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include - -#include "HPNL/Connection.h" -#include "HPNL/Client.h" -#include "HPNL/ChunkMgr.h" -#include "HPNL/Callback.h" - -#include - -#define SIZE 4096 -#define BUFFER_SIZE 65536 -#define BUFFER_NUM 65536 - -int count = 0; -long addr, rkey, len; -char rma_buf[4096]; - -uint64_t timestamp_now() { - return std::chrono::high_resolution_clock::now().time_since_epoch() / std::chrono::milliseconds(1); -} - -class ShutdownCallback : public Callback { - public: - ShutdownCallback(Client *_clt) : clt(_clt) {} - virtual ~ShutdownCallback() {} - virtual void operator()(void *param_1, void *param_2) override { - std::cout << "connection shutdown..." << std::endl; - clt->shutdown(); - } - private: - Client *clt; -}; - -class ConnectedCallback : public Callback { - public: - ConnectedCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {} - virtual ~ConnectedCallback() {} - virtual void operator()(void *param_1, void *param_2) override { - Connection *con = (Connection*)param_1; - char* buffer = (char*)std::malloc(SIZE); - memset(buffer, '0', SIZE); - con->sendBuf(buffer, SIZE); - std::free(buffer); - } - private: - ChunkMgr *bufMgr; -}; - -class RecvCallback : public Callback { - public: - RecvCallback(ChunkMgr *bufMgr_, Client *client_) : bufMgr(bufMgr_), client(client_) {} - virtual ~RecvCallback() {} - virtual void operator()(void *param_1, void *param_2) override { - int mid = *(int*)param_1; - Chunk *ck = bufMgr->get(mid); - - char* buffer = (char*)std::malloc(SIZE); - memset(buffer, '0', SIZE); - Connection *con = (Connection*)ck->con; - - if (count == 0) { - char* buf = (char*)ck->buffer; - addr = atol(buf); - con->sendBuf(buffer, SIZE); - } else if (count == 1){ - char* buf = (char*)ck->buffer; - rkey = atol(buf); - con->sendBuf(buffer, SIZE); - } else { - char* buf = (char*)ck->buffer; - len = atol(buf); - client->reg_rma_buffer(rma_buf, 4096, 0); - con->read(0, 0, len, addr, rkey); - } - count++; - } - private: - ChunkMgr *bufMgr; - Client *client; -}; - -class SendCallback : public Callback { - public: - SendCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {} - virtual ~SendCallback() {} - virtual void operator()(void *param_1, void *param_2) override { - int mid = *(int*)param_1; - Chunk *ck = bufMgr->get(mid); - Connection *con = (Connection*)ck->con; - con->activate_send_chunk(ck); - } - private: - ChunkMgr *bufMgr; -}; - -class ReadCallback : public Callback { - public: - ReadCallback() {} - virtual ~ReadCallback() {} - virtual void operator()(void *param_1, void *param_2) override { - std::cout << "rma buffer " << rma_buf << std::endl; - } -}; - -int main(int argc, char *argv[]) { - Client *client = new Client(1, 16); - ChunkMgr *bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10); - - client->init(); - client->set_buf_mgr(bufMgr); - - RecvCallback *recvCallback = new RecvCallback(bufMgr, client); - SendCallback *sendCallback = new SendCallback(bufMgr); - ReadCallback *readCallback = new ReadCallback(); - ConnectedCallback *connectedCallback = new ConnectedCallback(bufMgr); - ShutdownCallback *shutdownCallback = new ShutdownCallback(client); - - client->set_recv_callback(recvCallback); - client->set_send_callback(sendCallback); - client->set_read_callback(readCallback); - client->set_connected_callback(connectedCallback); - client->set_shutdown_callback(shutdownCallback); - - client->start(); - client->connect("172.168.0.40", "123456"); - - client->wait(); - - delete shutdownCallback; - delete connectedCallback; - delete sendCallback; - delete recvCallback; - delete client; - delete bufMgr; - - return 0; -} diff --git a/examples/pmem/server.cc b/examples/pmem/server.cc deleted file mode 100644 index 94c6b33b..00000000 --- a/examples/pmem/server.cc +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "HPNL/Connection.h" -#include "HPNL/Server.h" -#include "HPNL/ChunkMgr.h" -#include "HPNL/Callback.h" -#include -#include -#include -#include -#ifndef _WIN32 -#include -#endif -#include -#include -#include - -#include - -/* size of the pmemobj pool -- 1 GB */ -#define POOL_SIZE (1024*1024*1024L) -#define MAX_BUF_LEN 20 - -/* name of our layout in the pool */ -#define LAYOUT_NAME "pmem_spark_shuffle" - -#define BUFFER_SIZE 65536 -#define BUFFER_NUM 65536 - -int count = 0; -std::string local_addr, local_rkey, local_len; - -struct my_root { - size_t len; - char buf[MAX_BUF_LEN]; -}; - -class ShutdownCallback : public Callback { - public: - ShutdownCallback() {} - virtual ~ShutdownCallback() {} - virtual void operator()(void *param_1, void *param_2) override { - std::cout << "connection shutdown..." << std::endl; - } -}; - -class RecvCallback : public Callback { - public: - RecvCallback(ChunkMgr *bufMgr_, Server *server_) : bufMgr(bufMgr_), server(server_) { - const char path[] = "/dev/dax0.0"; - /* create the pmemobj pool or open it if it already exists */ - pop = pmemobj_open(path, LAYOUT_NAME); - if (pop == NULL) { - pop = pmemobj_create(path, LAYOUT_NAME, POOL_SIZE, S_IRUSR | S_IWUSR); - } - - if (pop == NULL) { - exit(1); - } - - PMEMoid root = pmemobj_root(pop, sizeof(struct my_root)); - rootp = (struct my_root*)pmemobj_direct(root); - - struct my_root *data_tmp = (struct my_root*)((uintptr_t)pop+root.off); - - char buf[MAX_BUF_LEN] = "hello world"; - rootp->len = strlen("hello world"); - - pmemobj_persist(pop, &rootp->len, sizeof(rootp->len)); - pmemobj_memcpy_persist(pop, rootp->buf, buf, rootp->len); - } - virtual ~RecvCallback() { - pmemobj_close(pop); - } - virtual void operator()(void *param_1, void *param_2) override { - int mid = *(int*)param_1; - Chunk *ck = bufMgr->get(mid); - Connection *con = (Connection*)ck->con; - - if (count == 0) { - local_addr = std::to_string((long)rootp->buf); - con->sendBuf(local_addr.c_str(), local_addr.length()); - } else if (count == 1){ - rkey = server->reg_rma_buffer((char*)pop, POOL_SIZE, 0); - local_rkey = std::to_string(rkey); - con->sendBuf(local_rkey.c_str(), local_rkey.length()); - } else { - local_len = std::to_string(rootp->len); - con->sendBuf(local_len.c_str(), local_len.length()); - } - count++; - } - private: - ChunkMgr *bufMgr; - Server *server; - PMEMobjpool *pop; - struct my_root *rootp; - uint64_t rkey; -}; - -class SendCallback : public Callback { - public: - SendCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {} - virtual ~SendCallback() {} - virtual void operator()(void *param_1, void *param_2) override { - int mid = *(int*)param_1; - Chunk *ck = bufMgr->get(mid); - Connection *con = (Connection*)ck->con; - con->activate_send_chunk(ck); - } - private: - ChunkMgr *bufMgr; -}; - -int main(int argc, char *argv[]) { - Server *server = new Server(1, 16); - ChunkMgr *bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10); - - server->init(); - server->set_buf_mgr(bufMgr); - - RecvCallback *recvCallback = new RecvCallback(bufMgr, server); - SendCallback *sendCallback = new SendCallback(bufMgr); - ShutdownCallback *shutdownCallback = new ShutdownCallback(); - - server->set_recv_callback(recvCallback); - server->set_send_callback(sendCallback); - server->set_connected_callback(NULL); - server->set_shutdown_callback(shutdownCallback); - - server->start(); - server->listen("172.168.0.40", "123456"); - - server->wait(); - - delete sendCallback; - delete recvCallback; - delete server; - delete bufMgr; - - return 0; -} diff --git a/examples/pmof/client.cc b/examples/pmof/client.cc new file mode 100644 index 00000000..eaf38a93 --- /dev/null +++ b/examples/pmof/client.cc @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" +#include "HPNL/Client.h" +#include "HPNL/Connection.h" + +#include + +#include "flatbuffers/flatbuffers.h" +#include "format_generated.h" + +#define MSG_SIZE 4096 +#define BUFFER_SIZE (65536 * 2) +#define BUFFER_NUM 128 + +int count = 0; +uint64_t start, end = 0; +std::mutex mtx; + +char* rma_buffer = nullptr; +uint64_t rkey = 0; +uint64_t remote_buffer = 0; +uint64_t remote_rkey = 0; + +uint64_t timestamp_now() { + return std::chrono::high_resolution_clock::now().time_since_epoch() / + std::chrono::milliseconds(1); +} + +class ShutdownCallback : public Callback { + public: + explicit ShutdownCallback(Client* _clt) : clt(_clt) {} + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connection shutdown..." << std::endl; + clt->shutdown(); + } + + private: + Client* clt; +}; + +class ConnectedCallback : public Callback { + public: + explicit ConnectedCallback(Client* client_, ChunkMgr* bufMgr_) + : client(client_), bufMgr(bufMgr_) {} + ~ConnectedCallback() override = default; + void operator()(void* param_1, void* param_2) override { + rma_buffer = static_cast(std::malloc(4096)); + memset(rma_buffer, '0', 4096); + rkey = client->reg_rma_buffer(rma_buffer, 4096, 0); + + auto con = static_cast(param_1); + Chunk* ck = bufMgr->get(con); + ck->size = MSG_SIZE; + memset(ck->buffer, '0', MSG_SIZE); + con->send(ck); + } + + private: + Client* client; + ChunkMgr* bufMgr; +}; + +class RecvCallback : public Callback { + public: + RecvCallback(Client* client_, ChunkMgr* bufMgr_) : client(client_), bufMgr(bufMgr_) {} + ~RecvCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::lock_guard lk(mtx); + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + + flatbuffers::FlatBufferBuilder builder; + builder.PushFlatBuffer(static_cast(ck->buffer), ck->size); + auto msg = flatbuffers::GetRoot(builder.GetBufferPointer()); + + auto con = static_cast(ck->con); + remote_buffer = msg->buffer(); + remote_rkey = msg->rkey(); + con->read(0, 0, 4096, msg->buffer(), msg->rkey()); + } + + private: + Client* client; + ChunkMgr* bufMgr; +}; + +class SendCallback : public Callback { + public: + explicit SendCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~SendCallback() override = default; + void operator()(void* param_1, void* param_2) override { + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + bufMgr->reclaim(ck, con); + } + + private: + ChunkMgr* bufMgr; +}; + +int main(int argc, char* argv[]) { + auto client = new Client(1, 16); + client->init(); + + ChunkMgr* bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); + client->set_buf_mgr(bufMgr); + + auto recvCallback = new RecvCallback(client, bufMgr); + auto sendCallback = new SendCallback(bufMgr); + auto connectedCallback = new ConnectedCallback(client, bufMgr); + auto shutdownCallback = new ShutdownCallback(client); + + client->set_recv_callback(recvCallback); + client->set_send_callback(sendCallback); + client->set_connected_callback(connectedCallback); + client->set_shutdown_callback(shutdownCallback); + + client->start(); + client->connect("172.168.2.106", "12345"); + + client->wait(); + + delete shutdownCallback; + delete connectedCallback; + delete sendCallback; + delete recvCallback; + delete client; + delete bufMgr; + return 0; +} diff --git a/examples/pmof/format.fbs b/examples/pmof/format.fbs new file mode 100644 index 00000000..61caa5e0 --- /dev/null +++ b/examples/pmof/format.fbs @@ -0,0 +1,7 @@ +table rma_msg { + buffer: ulong; + size: short; + id: short; + rkey: ulong; +} + diff --git a/examples/pmof/format_generated.h b/examples/pmof/format_generated.h new file mode 100644 index 00000000..256f8e1e --- /dev/null +++ b/examples/pmof/format_generated.h @@ -0,0 +1,81 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#ifndef FLATBUFFERS_GENERATED_FORMAT_H_ +#define FLATBUFFERS_GENERATED_FORMAT_H_ + +#include "flatbuffers/flatbuffers.h" + +struct rma_msg; + +struct rma_msg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_BUFFER = 4, + VT_SIZE = 6, + VT_ID = 8, + VT_RKEY = 10 + }; + uint64_t buffer() const { + return GetField(VT_BUFFER, 0); + } + int16_t size() const { + return GetField(VT_SIZE, 0); + } + int16_t id() const { + return GetField(VT_ID, 0); + } + uint64_t rkey() const { + return GetField(VT_RKEY, 0); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_BUFFER) && + VerifyField(verifier, VT_SIZE) && + VerifyField(verifier, VT_ID) && + VerifyField(verifier, VT_RKEY) && + verifier.EndTable(); + } +}; + +struct rma_msgBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_buffer(uint64_t buffer) { + fbb_.AddElement(rma_msg::VT_BUFFER, buffer, 0); + } + void add_size(int16_t size) { + fbb_.AddElement(rma_msg::VT_SIZE, size, 0); + } + void add_id(int16_t id) { + fbb_.AddElement(rma_msg::VT_ID, id, 0); + } + void add_rkey(uint64_t rkey) { + fbb_.AddElement(rma_msg::VT_RKEY, rkey, 0); + } + explicit rma_msgBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + rma_msgBuilder &operator=(const rma_msgBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset Createrma_msg( + flatbuffers::FlatBufferBuilder &_fbb, + uint64_t buffer = 0, + int16_t size = 0, + int16_t id = 0, + uint64_t rkey = 0) { + rma_msgBuilder builder_(_fbb); + builder_.add_rkey(rkey); + builder_.add_buffer(buffer); + builder_.add_id(id); + builder_.add_size(size); + return builder_.Finish(); +} + +#endif // FLATBUFFERS_GENERATED_FORMAT_H_ diff --git a/examples/pmof/server.cc b/examples/pmof/server.cc new file mode 100644 index 00000000..cb2c2245 --- /dev/null +++ b/examples/pmof/server.cc @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" +#include "HPNL/Connection.h" +#include "HPNL/Server.h" + +#include +#include +#include +#include + +#include "format_generated.h" + +/* size of the pmemobj pool -- 1 GB */ +#define POOL_SIZE (1024 * 1024 * 1024L) +#define MAX_BUF_LEN 20 + +/* name of our layout in the pool */ +#define LAYOUT_NAME "pmem_spark_shuffle" + +#define MSG_SIZE 4096 +#define BUFFER_SIZE (65536 * 2) +#define BUFFER_NUM 128 + +struct my_root { + size_t len; + char buf[MAX_BUF_LEN]; +}; + +class ShutdownCallback : public Callback { + public: + ShutdownCallback() = default; + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connection shutdown..." << std::endl; + } +}; + +class RecvCallback : public Callback { + public: + explicit RecvCallback(Server* server_, ChunkMgr* bufMgr_) + : server(server_), bufMgr(bufMgr_) {} + ~RecvCallback() override = default; + void operator()(void* param_1, void* param_2) override { + const char path[] = "/dev/dax0.0"; + /* create the pmemobj pool or open it if it already exists */ + pop = pmemobj_open(path, LAYOUT_NAME); + if (pop == nullptr) { + pop = pmemobj_create(path, LAYOUT_NAME, POOL_SIZE, S_IRUSR | S_IWUSR); + } + + if (pop == nullptr) { + exit(1); + } + + PMEMoid root = pmemobj_root(pop, sizeof(struct my_root)); + rootp = static_cast(pmemobj_direct(root)); + + char buf[MAX_BUF_LEN] = "hello world"; + rootp->len = strlen("hello world"); + + pmemobj_persist(pop, &rootp->len, sizeof(rootp->len)); + pmemobj_memcpy_persist(pop, rootp->buf, buf, rootp->len); + + rkey = server->reg_rma_buffer(reinterpret_cast(pop), POOL_SIZE, 0); + + int mid = *static_cast(param_1); + auto ck = bufMgr->get(mid); + ck->size = MSG_SIZE; + auto con = static_cast(ck->con); + + flatbuffers::FlatBufferBuilder builder; + auto msg = + Createrma_msg(builder, reinterpret_cast(pop), rootp->len, 0, rkey); + builder.Finish(msg); + uint8_t* pmof_buf = builder.GetBufferPointer(); + + memcpy(ck->buffer, pmof_buf, builder.GetSize()); + ck->size = builder.GetSize(); + con->send(ck); + } + + private: + Server* server; + ChunkMgr* bufMgr; + PMEMobjpool* pop; + struct my_root* rootp; + uint64_t rkey; +}; + +class SendCallback : public Callback { + public: + explicit SendCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~SendCallback() override = default; + void operator()(void* param_1, void* param_2) override { + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + bufMgr->reclaim(ck, con); + } + + private: + ChunkMgr* bufMgr; +}; + +int main(int argc, char* argv[]) { + auto server = new Server(1, 16); + server->init(); + + ChunkMgr* bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); + server->set_buf_mgr(bufMgr); + + auto recvCallback = new RecvCallback(server, bufMgr); + auto sendCallback = new SendCallback(bufMgr); + auto shutdownCallback = new ShutdownCallback(); + server->set_recv_callback(recvCallback); + server->set_send_callback(sendCallback); + server->set_connected_callback(nullptr); + server->set_shutdown_callback(shutdownCallback); + + server->start(); + server->listen("172.168.2.106", "12345"); + + server->wait(); + + delete recvCallback; + delete sendCallback; + delete shutdownCallback; + delete server; + delete bufMgr; + return 0; +} diff --git a/examples/rma/client.cc b/examples/rma/client.cc new file mode 100644 index 00000000..ff869f07 --- /dev/null +++ b/examples/rma/client.cc @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" +#include "HPNL/Client.h" +#include "HPNL/Connection.h" + +#include + +#include "flatbuffers/flatbuffers.h" +#include "format_generated.h" + +#define MSG_SIZE 4096 +#define BUFFER_SIZE (65536 * 2) +#define BUFFER_NUM 128 + +int count = 0; +uint64_t start, end = 0; +std::mutex mtx; + +char* rma_buffer = nullptr; +uint64_t rkey = 0; +uint64_t remote_buffer = 0; +uint64_t remote_rkey = 0; + +uint64_t timestamp_now() { + return std::chrono::high_resolution_clock::now().time_since_epoch() / + std::chrono::milliseconds(1); +} + +class ShutdownCallback : public Callback { + public: + explicit ShutdownCallback(Client* _clt) : clt(_clt) {} + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connection shutdown..." << std::endl; + clt->shutdown(); + } + + private: + Client* clt; +}; + +class ConnectedCallback : public Callback { + public: + explicit ConnectedCallback(Client* client_, ChunkMgr* bufMgr_) + : client(client_), bufMgr(bufMgr_) {} + ~ConnectedCallback() override = default; + void operator()(void* param_1, void* param_2) override { + rma_buffer = static_cast(std::malloc(4096)); + memset(rma_buffer, '0', 4096); + rkey = client->reg_rma_buffer(rma_buffer, 4096, 0); + + auto con = static_cast(param_1); + Chunk* ck = bufMgr->get(con); + ck->size = MSG_SIZE; + memset(ck->buffer, '0', MSG_SIZE); + con->send(ck); + } + + private: + Client* client; + ChunkMgr* bufMgr; +}; + +class RecvCallback : public Callback { + public: + RecvCallback(Client* client_, ChunkMgr* bufMgr_) : client(client_), bufMgr(bufMgr_) {} + ~RecvCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::lock_guard lk(mtx); + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + + flatbuffers::FlatBufferBuilder builder; + builder.PushFlatBuffer(static_cast(ck->buffer), ck->size); + auto msg = flatbuffers::GetRoot(builder.GetBufferPointer()); + + auto con = static_cast(ck->con); + remote_buffer = msg->buffer(); + remote_rkey = msg->rkey(); + con->read(0, 0, 4096, msg->buffer(), msg->rkey()); + } + + private: + Client* client; + ChunkMgr* bufMgr; +}; + +class SendCallback : public Callback { + public: + explicit SendCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~SendCallback() override = default; + void operator()(void* param_1, void* param_2) override { + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + bufMgr->reclaim(ck, con); + } + + private: + ChunkMgr* bufMgr; +}; + +class ReadCallback : public Callback { + public: + explicit ReadCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + void operator()(void* param_1, void* param_2) override { + count++; + if (count >= 10000000) { + end = timestamp_now(); + printf("finished, totally consumes %f s, message round trip time is %f us.\n", + (end - start) / 1000.0, (end - start) * 1000 / 1000000.0); + return; + } + if (count == 1) { + printf("start ping-pong.\n"); + } + if (count == 1) { + start = timestamp_now(); + } + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + con->read(0, 0, 4096, remote_buffer, remote_rkey); + } + + private: + ChunkMgr* bufMgr; +}; + +int main(int argc, char* argv[]) { + auto client = new Client(1, 16); + client->init(); + + ChunkMgr* bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); + client->set_buf_mgr(bufMgr); + + auto recvCallback = new RecvCallback(client, bufMgr); + auto sendCallback = new SendCallback(bufMgr); + auto readCallback = new ReadCallback(bufMgr); + auto connectedCallback = new ConnectedCallback(client, bufMgr); + auto shutdownCallback = new ShutdownCallback(client); + + client->set_recv_callback(recvCallback); + client->set_send_callback(sendCallback); + client->set_read_callback(readCallback); + client->set_connected_callback(connectedCallback); + client->set_shutdown_callback(shutdownCallback); + + client->start(); + client->connect("172.168.2.106", "12345"); + + client->wait(); + + delete shutdownCallback; + delete connectedCallback; + delete sendCallback; + delete recvCallback; + delete client; + delete bufMgr; + return 0; +} diff --git a/examples/rma/format.fbs b/examples/rma/format.fbs new file mode 100644 index 00000000..61caa5e0 --- /dev/null +++ b/examples/rma/format.fbs @@ -0,0 +1,7 @@ +table rma_msg { + buffer: ulong; + size: short; + id: short; + rkey: ulong; +} + diff --git a/examples/rma/format_generated.h b/examples/rma/format_generated.h new file mode 100644 index 00000000..256f8e1e --- /dev/null +++ b/examples/rma/format_generated.h @@ -0,0 +1,81 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#ifndef FLATBUFFERS_GENERATED_FORMAT_H_ +#define FLATBUFFERS_GENERATED_FORMAT_H_ + +#include "flatbuffers/flatbuffers.h" + +struct rma_msg; + +struct rma_msg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_BUFFER = 4, + VT_SIZE = 6, + VT_ID = 8, + VT_RKEY = 10 + }; + uint64_t buffer() const { + return GetField(VT_BUFFER, 0); + } + int16_t size() const { + return GetField(VT_SIZE, 0); + } + int16_t id() const { + return GetField(VT_ID, 0); + } + uint64_t rkey() const { + return GetField(VT_RKEY, 0); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_BUFFER) && + VerifyField(verifier, VT_SIZE) && + VerifyField(verifier, VT_ID) && + VerifyField(verifier, VT_RKEY) && + verifier.EndTable(); + } +}; + +struct rma_msgBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_buffer(uint64_t buffer) { + fbb_.AddElement(rma_msg::VT_BUFFER, buffer, 0); + } + void add_size(int16_t size) { + fbb_.AddElement(rma_msg::VT_SIZE, size, 0); + } + void add_id(int16_t id) { + fbb_.AddElement(rma_msg::VT_ID, id, 0); + } + void add_rkey(uint64_t rkey) { + fbb_.AddElement(rma_msg::VT_RKEY, rkey, 0); + } + explicit rma_msgBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + rma_msgBuilder &operator=(const rma_msgBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset Createrma_msg( + flatbuffers::FlatBufferBuilder &_fbb, + uint64_t buffer = 0, + int16_t size = 0, + int16_t id = 0, + uint64_t rkey = 0) { + rma_msgBuilder builder_(_fbb); + builder_.add_rkey(rkey); + builder_.add_buffer(buffer); + builder_.add_id(id); + builder_.add_size(size); + return builder_.Finish(); +} + +#endif // FLATBUFFERS_GENERATED_FORMAT_H_ diff --git a/examples/rma/server.cc b/examples/rma/server.cc new file mode 100644 index 00000000..834f85d2 --- /dev/null +++ b/examples/rma/server.cc @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "HPNL/Callback.h" +#include "HPNL/ChunkMgr.h" +#include "HPNL/Connection.h" +#include "HPNL/Server.h" + +#include +#include + +#include "format_generated.h" + +#define MSG_SIZE 4096 +#define BUFFER_SIZE (65536 * 2) +#define BUFFER_NUM 128 + +char rma_buffer[4096]; +uint64_t rkey = 0; + +class ShutdownCallback : public Callback { + public: + ShutdownCallback() = default; + ~ShutdownCallback() override = default; + void operator()(void* param_1, void* param_2) override { + std::cout << "connection shutdown..." << std::endl; + } +}; + +class RecvCallback : public Callback { + public: + explicit RecvCallback(Server* server_, ChunkMgr* bufMgr_) + : server(server_), bufMgr(bufMgr_) {} + ~RecvCallback() override = default; + void operator()(void* param_1, void* param_2) override { + memset(rma_buffer, '0', 4096); + rkey = server->reg_rma_buffer(rma_buffer, 4096, 0); + + int mid = *static_cast(param_1); + auto ck = bufMgr->get(mid); + ck->size = MSG_SIZE; + auto con = static_cast(ck->con); + + flatbuffers::FlatBufferBuilder builder; + auto msg = + Createrma_msg(builder, reinterpret_cast(rma_buffer), 4096, 0, rkey); + builder.Finish(msg); + uint8_t* buf = builder.GetBufferPointer(); + + memcpy(ck->buffer, buf, builder.GetSize()); + ck->size = builder.GetSize(); + con->send(ck); + } + + private: + Server* server; + ChunkMgr* bufMgr; +}; + +class SendCallback : public Callback { + public: + explicit SendCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {} + ~SendCallback() override = default; + void operator()(void* param_1, void* param_2) override { + int mid = *static_cast(param_1); + Chunk* ck = bufMgr->get(mid); + auto con = static_cast(ck->con); + bufMgr->reclaim(ck, con); + } + + private: + ChunkMgr* bufMgr; +}; + +int main(int argc, char* argv[]) { + auto server = new Server(1, 16); + server->init(); + + ChunkMgr* bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10); + server->set_buf_mgr(bufMgr); + + auto recvCallback = new RecvCallback(server, bufMgr); + auto sendCallback = new SendCallback(bufMgr); + auto shutdownCallback = new ShutdownCallback(); + server->set_recv_callback(recvCallback); + server->set_send_callback(sendCallback); + server->set_connected_callback(nullptr); + server->set_shutdown_callback(shutdownCallback); + + server->start(); + server->listen("172.168.2.106", "12345"); + + server->wait(); + + delete recvCallback; + delete sendCallback; + delete shutdownCallback; + delete server; + delete bufMgr; + return 0; +} diff --git a/src/demultiplexer/RdmCqDemultiplexer.cc b/src/demultiplexer/RdmCqDemultiplexer.cc index 4dbff874..882a939b 100644 --- a/src/demultiplexer/RdmCqDemultiplexer.cc +++ b/src/demultiplexer/RdmCqDemultiplexer.cc @@ -95,7 +95,7 @@ int RdmCqDemultiplexer::wait_event() { if (con->get_recv_callback()) { (*con->get_recv_callback())(&ck->buffer_id, &entry.len); } - con->activate_recv_chunk(); + con->activate_recv_chunk(ck); } else if (entry.flags & FI_SEND) { fi_context2* ctx = (fi_context2*)entry.op_context; Chunk* ck = (Chunk*)ctx->internal[4];