Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
add remote memory access example
Browse files Browse the repository at this point in the history
Signed-off-by: Haodong Tang <[email protected]>
  • Loading branch information
haodong committed Jul 16, 2019
1 parent 8eb947d commit 0b933de
Show file tree
Hide file tree
Showing 16 changed files with 940 additions and 458 deletions.
33 changes: 24 additions & 9 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,34 @@ set_target_properties(rdm_client
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/rdm")

set(RMA_TEST ${PROJECT_SOURCE_DIR}/examples/rma)

add_executable(rma_server ${RMA_TEST}/server.cc)
target_link_libraries(rma_server hpnl flatbuffers)
set_target_properties(rma_server
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/rma")

add_executable(rma_client ${RMA_TEST}/client.cc)
target_link_libraries(rma_client hpnl flatbuffers)
set_target_properties(rma_client
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/rma")


option(WITH_PMEM "enable PMoF code" OFF)
if(WITH_PMEM)
set(PMEM_TEST ${PROJECT_SOURCE_DIR}/examples/pmem)
set(PMEM_TEST ${PROJECT_SOURCE_DIR}/examples/pmof)

add_executable(pmem_server ${PMEM_TEST}/server.cc)
target_link_libraries(pmem_server hpnl)
set_target_properties(pmem_server
add_executable(pmof_server ${PMEM_TEST}/server.cc)
target_link_libraries(pmof_server pmemobj hpnl)
set_target_properties(pmof_server
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmem")
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmof")

add_executable(pmem_client ${PMEM_TEST}/client.cc)
target_link_libraries(pmem_client pmemobj hpnl)
set_target_properties(pmem_client
add_executable(pmof_client ${PMEM_TEST}/client.cc)
target_link_libraries(pmof_client pmemobj hpnl)
set_target_properties(pmof_client
PROPERTIES
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmem")
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/bin/pmof")
endif(WITH_PMEM)
54 changes: 28 additions & 26 deletions examples/connection/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

#include "HPNL/Connection.h"
#include "HPNL/Client.h"
#include "HPNL/ChunkMgr.h"
#include "HPNL/Callback.h"
#include "HPNL/ChunkMgr.h"
#include "HPNL/Client.h"
#include "HPNL/Connection.h"

#include <iostream>

Expand All @@ -30,39 +30,41 @@
uint64_t start, end = 0;

uint64_t timestamp_now() {
return std::chrono::high_resolution_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
return std::chrono::high_resolution_clock::now().time_since_epoch() /
std::chrono::milliseconds(1);
}

class ShutdownCallback : public Callback {
public:
explicit ShutdownCallback(Client *_clt) : clt(_clt) {}
~ShutdownCallback() override = default;
void operator()(void *param_1, void *param_2) override {
clt->shutdown();
}
private:
Client *clt;
public:
explicit ShutdownCallback(Client* _clt) : clt(_clt) {}
~ShutdownCallback() override = default;
void operator()(void* param_1, void* param_2) override { clt->shutdown(); }

private:
Client* clt;
};

class ConnectedCallback : public Callback {
public:
ConnectedCallback(Client *client_, ChunkMgr *bufMgr_) : client(client_), bufMgr(bufMgr_) {}
~ConnectedCallback() override = default;
void operator()(void *param_1, void *param_2) override {
std::cout << "connected." << std::endl;
auto *con = (Connection*)param_1;
//client->shutdown(con);
}
private:
Client *client;
ChunkMgr *bufMgr;
public:
ConnectedCallback(Client* client_, ChunkMgr* bufMgr_)
: client(client_), bufMgr(bufMgr_) {}
~ConnectedCallback() override = default;
void operator()(void* param_1, void* param_2) override {
std::cout << "connected." << std::endl;
auto* con = (Connection*)param_1;
// client->shutdown(con);
}

private:
Client* client;
ChunkMgr* bufMgr;
};

void connect() {
auto client = new Client(1, 16);
client->init();

ChunkMgr *bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10);
ChunkMgr* bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10);
client->set_buf_mgr(bufMgr);

auto connectedCallback = new ConnectedCallback(client, bufMgr);
Expand All @@ -84,7 +86,7 @@ void connect() {
delete bufMgr;
}

int main(int argc, char *argv[]) {
connect();
int main(int argc, char* argv[]) {
connect();
return 0;
}
20 changes: 10 additions & 10 deletions examples/connection/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

#include "HPNL/Callback.h"
#include "HPNL/ChunkMgr.h"
#include "HPNL/Connection.h"
#include "HPNL/Server.h"
#include "HPNL/ChunkMgr.h"
#include "HPNL/Callback.h"

#include <iostream>

Expand All @@ -28,19 +28,19 @@
#define MAX_WORKERS 10

class ShutdownCallback : public Callback {
public:
ShutdownCallback() = default;
~ShutdownCallback() override = default;
void operator()(void *param_1, void *param_2) override {
std::cout << "connection shutdown..." << std::endl;
}
public:
ShutdownCallback() = default;
~ShutdownCallback() override = default;
void operator()(void* param_1, void* param_2) override {
std::cout << "connection shutdown..." << std::endl;
}
};

int main(int argc, char *argv[]) {
int main(int argc, char* argv[]) {
auto server = new Server(1, 16);
server->init();

ChunkMgr *bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10);
ChunkMgr* bufMgr = new ChunkPool(server, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10);
server->set_buf_mgr(bufMgr);

auto shutdownCallback = new ShutdownCallback();
Expand Down
136 changes: 71 additions & 65 deletions examples/ping-pong/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,102 +15,108 @@
// specific language governing permissions and limitations
// under the License.

#include <string.h>
#include <cstring>

#include "HPNL/Connection.h"
#include "HPNL/Client.h"
#include "HPNL/ChunkMgr.h"
#include "HPNL/Callback.h"
#include "HPNL/ChunkMgr.h"
#include "HPNL/Client.h"
#include "HPNL/Connection.h"

#include <iostream>

#define MSG_SIZE 4096
#define BUFFER_SIZE (65536*2)
#define BUFFER_SIZE (65536 * 2)
#define BUFFER_NUM 128

int count = 0;
uint64_t start, end = 0;
std::mutex mtx;

uint64_t timestamp_now() {
return std::chrono::high_resolution_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
return std::chrono::high_resolution_clock::now().time_since_epoch() /
std::chrono::milliseconds(1);
}

class ShutdownCallback : public Callback {
public:
explicit ShutdownCallback(Client *_clt) : clt(_clt) {}
~ShutdownCallback() override = default;
void operator()(void *param_1, void *param_2) override {
std::cout << "connection shutdown..." << std::endl;
clt->shutdown();
}
private:
Client *clt;
public:
explicit ShutdownCallback(Client* _clt) : clt(_clt) {}
~ShutdownCallback() override = default;
void operator()(void* param_1, void* param_2) override {
std::cout << "connection shutdown..." << std::endl;
clt->shutdown();
}

private:
Client* clt;
};

class ConnectedCallback : public Callback {
public:
explicit ConnectedCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {}
~ConnectedCallback() override = default;
void operator()(void *param_1, void *param_2) override {
auto con = (Connection*)param_1;
Chunk *ck = bufMgr->get(con);
ck->size = MSG_SIZE;
memset(ck->buffer, '0', MSG_SIZE);
con->send(ck);
}
private:
ChunkMgr *bufMgr;
public:
explicit ConnectedCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {}
~ConnectedCallback() override = default;
void operator()(void* param_1, void* param_2) override {
auto con = static_cast<Connection*>(param_1);
Chunk* ck = bufMgr->get(con);
ck->size = MSG_SIZE;
memset(ck->buffer, '0', MSG_SIZE);
con->send(ck);
}

private:
ChunkMgr* bufMgr;
};

class RecvCallback : public Callback {
public:
RecvCallback(Client *client_, ChunkMgr *bufMgr_) : client(client_), bufMgr(bufMgr_) {}
~RecvCallback() override = default;
void operator()(void *param_1, void *param_2) override {
std::lock_guard<std::mutex> lk(mtx);
count++;
int mid = *(int*)param_1;
Chunk *ck = bufMgr->get(mid);
auto con = (Connection*)ck->con;
if (count >= 1000000) {
end = timestamp_now();
printf("finished, totally consumes %f s, message round trip time is %f us.\n", (end-start)/1000.0, (end-start)*1000/1000000.0);
return;
}
if (count == 1) {
printf("start ping-pong.\n");
}
if (count == 1) {
start = timestamp_now();
}
ck->size = MSG_SIZE;
con->send(ck);
public:
RecvCallback(Client* client_, ChunkMgr* bufMgr_) : client(client_), bufMgr(bufMgr_) {}
~RecvCallback() override = default;
void operator()(void* param_1, void* param_2) override {
std::lock_guard<std::mutex> lk(mtx);
count++;
int mid = *static_cast<int*>(param_1);
Chunk* ck = bufMgr->get(mid);
auto con = static_cast<Connection*>(ck->con);
if (count >= 1000000) {
end = timestamp_now();
printf("finished, totally consumes %f s, message round trip time is %f us.\n",
(end - start) / 1000.0, (end - start) * 1000 / 1000000.0);
return;
}
if (count == 1) {
printf("start ping-pong.\n");
}
private:
Client *client;
ChunkMgr *bufMgr;
if (count == 1) {
start = timestamp_now();
}
ck->size = MSG_SIZE;
con->send(ck);
}

private:
Client* client;
ChunkMgr* bufMgr;
};

class SendCallback : public Callback {
public:
explicit SendCallback(ChunkMgr *bufMgr_) : bufMgr(bufMgr_) {}
~SendCallback() override = default;
void operator()(void *param_1, void *param_2) override {
int mid = *(int*)param_1;
Chunk *ck = bufMgr->get(mid);
auto con = (Connection*)ck->con;
bufMgr->reclaim(ck, con);
}
private:
ChunkMgr *bufMgr;
public:
explicit SendCallback(ChunkMgr* bufMgr_) : bufMgr(bufMgr_) {}
~SendCallback() override = default;
void operator()(void* param_1, void* param_2) override {
int mid = *static_cast<int*>(param_1);
Chunk* ck = bufMgr->get(mid);
auto con = static_cast<Connection*>(ck->con);
bufMgr->reclaim(ck, con);
}

private:
ChunkMgr* bufMgr;
};

int main(int argc, char *argv[]) {
int main(int argc, char* argv[]) {
auto client = new Client(1, 16);
client->init();

ChunkMgr *bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM*10);
ChunkMgr* bufMgr = new ChunkPool(client, BUFFER_SIZE, BUFFER_NUM, BUFFER_NUM * 10);
client->set_buf_mgr(bufMgr);

auto recvCallback = new RecvCallback(client, bufMgr);
Expand Down
Loading

0 comments on commit 0b933de

Please sign in to comment.