Skip to content

Commit

Permalink
* modify the implementation of split_module
Browse files Browse the repository at this point in the history
* improve the code of optimizer
* change the BMFLOG_NODE type of split and assemble module
  • Loading branch information
JackLau1222 committed Oct 30, 2024
1 parent 6fb4f1c commit 608fdb0
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 72 deletions.
7 changes: 1 addition & 6 deletions bmf/engine/c_engine/include/split_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#include <bmf/sdk/module.h>
#include <bmf/sdk/module_registry.h>
#include "safe_queue.h"

USE_BMF_SDK_NS
class SplitModule : public Module {
Expand All @@ -30,17 +29,13 @@ class SplitModule : public Module {

int close();

std::map<int, bool> in_eof_;
bool in_eof_;

int last_input_num_;

int last_output_num_;

int stream_index_;

int queue_index_;

std::map<int, std::shared_ptr<bmf_engine::SafeQueue<Packet>>> queue_map_;
};

REGISTER_MODULE_CLASS(SplitModule)
Expand Down
4 changes: 2 additions & 2 deletions bmf/engine/c_engine/src/assemble_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ AssembleModule::AssembleModule(int node_id, JsonParam json_param)

int AssembleModule::process(Task &task) {
if (task.get_inputs().size() != last_input_num_) {
BMFLOG_NODE(BMF_INFO, node_id_)
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "Input Queue size changed from " << last_input_num_ << " to "
<< task.get_inputs().size();
last_input_num_ = task.get_inputs().size();
Expand All @@ -40,7 +40,7 @@ int AssembleModule::process(Task &task) {
}
}
if (task.get_outputs().size() != last_output_num_) {
BMFLOG_NODE(BMF_INFO, node_id_)
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "Output Queue size changed from " << last_output_num_ << " to "
<< task.get_outputs().size();
last_output_num_ = task.get_outputs().size();
Expand Down
18 changes: 10 additions & 8 deletions bmf/engine/c_engine/src/optimizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ void process_distributed_node(std::vector<bmf_engine::NodeConfig> &nodes) {
} else if (upstream_node) {
int dist_nums = node->get_dist_nums();
// preallocate space for new nodes
nodes.reserve(nodes.size() + dist_nums + 2);
// two internal nodes and dist_nums - 1 dist nodes
nodes.reserve(nodes.size() + dist_nums + 1);
// repoint to memory address after allocation
node = &nodes[nodes_index];
upstream_node = nullptr;
Expand All @@ -401,7 +402,6 @@ void process_distributed_node(std::vector<bmf_engine::NodeConfig> &nodes) {

// creat and insert copies of the current node
for (int i = 1; i < dist_nums; ++i) {
//node = &nodes[nodes_index + 1];
auto new_node = NodeConfig(*node);
new_node.set_id(nodes.size());
new_node.change_input_stream_identifier(split_node.output_streams[i].
Expand All @@ -421,15 +421,17 @@ void process_distributed_node(std::vector<bmf_engine::NodeConfig> &nodes) {
nodes.push_back(assemble_node);

// link downstream node's inputstream and assemble node's outputstream
for (auto &tem_node : nodes)
for (auto &input_stream : tem_node.input_streams)
for (auto &tem_node : nodes) {
for (auto &input_stream : tem_node.input_streams) {
if (input_stream.get_identifier() ==
node->output_streams[0].get_identifier() &&
tem_node.get_id() != assemble_node.get_id())
tem_node.get_id() != assemble_node.get_id()) {
tem_node.change_input_stream_identifier((assemble_node.
get_output_streams())[0].
get_identifier());

get_output_streams())[0].
get_identifier());
}
}
}
}
nodes_index++;
}
Expand Down
79 changes: 23 additions & 56 deletions bmf/engine/c_engine/src/split_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ SplitModule::SplitModule(int node_id, JsonParam json_param)
last_input_num_ = 0;
last_output_num_ = 0;
stream_index_ = 0;
queue_index_ = 0;
in_eof_ = false;
return;
}

int SplitModule::process(Task &task) {
if (task.get_inputs().size() != last_input_num_) {
BMFLOG_NODE(BMF_INFO, node_id_)
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "Input Queue size changed from " << last_input_num_ << " to "
<< task.get_inputs().size();
last_input_num_ = task.get_inputs().size();
Expand All @@ -38,79 +38,46 @@ int SplitModule::process(Task &task) {
}
}
if (task.get_outputs().size() != last_output_num_) {
BMFLOG_NODE(BMF_INFO, node_id_)
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "Output Queue size changed from " << last_output_num_ << " to "
<< task.get_outputs().size();
last_output_num_ = task.get_outputs().size();
// init queue_map_
for (int i = 0; i < last_output_num_; i++) {
std::shared_ptr<bmf_engine::SafeQueue<Packet>> tmp_queue =
std::make_shared<bmf_engine::SafeQueue<Packet>>();
queue_map_.insert(
std::pair<int, std::shared_ptr<bmf_engine::SafeQueue<Packet>>>(
i, tmp_queue));
}
}

if (in_eof_.size() != task.get_inputs().size()) {
in_eof_.clear();
for (auto input_queue : task.get_inputs())
in_eof_[input_queue.first] = false;
}

// Data cache into queue_map
auto tem_queue = task.get_inputs();
Packet pkt;
while (!tem_queue[0]->empty()) {
Packet pkt = tem_queue[0]->front();
tem_queue[0]->pop();
queue_map_[stream_index_]->push(pkt);
if (pkt.timestamp() == EOS or pkt.timestamp() == BMF_EOF) {
/* add EOF pkt for multi downstream node */
for (size_t i = 1; i < task.get_outputs().size(); i++) {
queue_map_[i]->push(Packet::generate_eof_packet());
}
}
stream_index_ = (stream_index_ + 1) % task.get_outputs().size();
}
auto input_queue = task.get_inputs()[0];

// Data Splitting
while (!queue_map_[queue_index_]->empty()) {
Packet pkt;

while (task.pop_packet_from_input_queue(0, pkt)) {

if (in_eof_[queue_index_] == true)
if (in_eof_ == true)
continue;

auto queue = queue_map_.find(queue_index_);
if (queue->second->pop(pkt)) {
// fill splitted pkt into multi output stream
task.fill_output_packet(queue_index_, pkt);
if (pkt.timestamp() == BMF_EOF) {
in_eof_[queue_index_] = true;
// fill splitted pkt into multi output stream
task.fill_output_packet(stream_index_, pkt);
if (pkt.timestamp() == BMF_EOF) {
// fill eof packet for extra distributed node
for (size_t i = 1; i < task.get_outputs().size(); i++) {
task.fill_output_packet(i, Packet::generate_eof_packet());
}
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "get packet :" << pkt.timestamp()
<< " data:" << pkt.type_info().name
<< " in queue:" << queue_index_;

queue_index_ = (queue_index_ + 1) % task.get_outputs().size();
in_eof_ = true;
}
}
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "get packet :" << pkt.timestamp()
<< " data:" << pkt.type_info().name
<< " in queue:" << 0;

bool all_eof = true;
for (auto f_eof : in_eof_) {
if (f_eof.second == false) {
all_eof = false;
break;
}
stream_index_ = (stream_index_ + 1) % task.get_outputs().size();
}
if (all_eof)

if (in_eof_)
task.set_timestamp(DONE);

return 0;
}

int SplitModule::reset() {
in_eof_.clear();
in_eof_ = false;
return 0;
}

Expand Down

0 comments on commit 608fdb0

Please sign in to comment.