diff --git a/bmf/engine/c_engine/include/split_module.h b/bmf/engine/c_engine/include/split_module.h index 2797432..b34f5fb 100644 --- a/bmf/engine/c_engine/include/split_module.h +++ b/bmf/engine/c_engine/include/split_module.h @@ -17,7 +17,6 @@ #include #include -#include "safe_queue.h" USE_BMF_SDK_NS class SplitModule : public Module { @@ -30,17 +29,13 @@ class SplitModule : public Module { int close(); - std::map in_eof_; + bool in_eof_; int last_input_num_; int last_output_num_; int stream_index_; - - int queue_index_; - - std::map>> queue_map_; }; REGISTER_MODULE_CLASS(SplitModule) diff --git a/bmf/engine/c_engine/src/assemble_module.cpp b/bmf/engine/c_engine/src/assemble_module.cpp index 177e12f..92b7293 100644 --- a/bmf/engine/c_engine/src/assemble_module.cpp +++ b/bmf/engine/c_engine/src/assemble_module.cpp @@ -26,7 +26,7 @@ AssembleModule::AssembleModule(int node_id, JsonParam json_param) int AssembleModule::process(Task &task) { if (task.get_inputs().size() != last_input_num_) { - BMFLOG_NODE(BMF_INFO, node_id_) + BMFLOG_NODE(BMF_DEBUG, node_id_) << "Input Queue size changed from " << last_input_num_ << " to " << task.get_inputs().size(); last_input_num_ = task.get_inputs().size(); @@ -40,7 +40,7 @@ int AssembleModule::process(Task &task) { } } if (task.get_outputs().size() != last_output_num_) { - BMFLOG_NODE(BMF_INFO, node_id_) + BMFLOG_NODE(BMF_DEBUG, node_id_) << "Output Queue size changed from " << last_output_num_ << " to " << task.get_outputs().size(); last_output_num_ = task.get_outputs().size(); diff --git a/bmf/engine/c_engine/src/optimizer.cpp b/bmf/engine/c_engine/src/optimizer.cpp index e281dc1..1c7c5a7 100644 --- a/bmf/engine/c_engine/src/optimizer.cpp +++ b/bmf/engine/c_engine/src/optimizer.cpp @@ -382,7 +382,8 @@ void process_distributed_node(std::vector &nodes) { } else if (upstream_node) { int dist_nums = node->get_dist_nums(); // preallocate space for new nodes - nodes.reserve(nodes.size() + dist_nums + 2); + // two internal nodes and dist_nums - 1 dist nodes + nodes.reserve(nodes.size() + dist_nums + 1); // repoint to memory address after allocation node = &nodes[nodes_index]; upstream_node = nullptr; @@ -401,7 +402,6 @@ void process_distributed_node(std::vector &nodes) { // creat and insert copies of the current node for (int i = 1; i < dist_nums; ++i) { - //node = &nodes[nodes_index + 1]; auto new_node = NodeConfig(*node); new_node.set_id(nodes.size()); new_node.change_input_stream_identifier(split_node.output_streams[i]. @@ -421,15 +421,17 @@ void process_distributed_node(std::vector &nodes) { nodes.push_back(assemble_node); // link downstream node's inputstream and assemble node's outputstream - for (auto &tem_node : nodes) - for (auto &input_stream : tem_node.input_streams) + for (auto &tem_node : nodes) { + for (auto &input_stream : tem_node.input_streams) { if (input_stream.get_identifier() == node->output_streams[0].get_identifier() && - tem_node.get_id() != assemble_node.get_id()) + tem_node.get_id() != assemble_node.get_id()) { tem_node.change_input_stream_identifier((assemble_node. - get_output_streams())[0]. - get_identifier()); - + get_output_streams())[0]. + get_identifier()); + } + } + } } nodes_index++; } diff --git a/bmf/engine/c_engine/src/split_module.cpp b/bmf/engine/c_engine/src/split_module.cpp index 977f855..702fea1 100644 --- a/bmf/engine/c_engine/src/split_module.cpp +++ b/bmf/engine/c_engine/src/split_module.cpp @@ -21,13 +21,13 @@ SplitModule::SplitModule(int node_id, JsonParam json_param) last_input_num_ = 0; last_output_num_ = 0; stream_index_ = 0; - queue_index_ = 0; + in_eof_ = false; return; } int SplitModule::process(Task &task) { if (task.get_inputs().size() != last_input_num_) { - BMFLOG_NODE(BMF_INFO, node_id_) + BMFLOG_NODE(BMF_DEBUG, node_id_) << "Input Queue size changed from " << last_input_num_ << " to " << task.get_inputs().size(); last_input_num_ = task.get_inputs().size(); @@ -38,79 +38,46 @@ int SplitModule::process(Task &task) { } } if (task.get_outputs().size() != last_output_num_) { - BMFLOG_NODE(BMF_INFO, node_id_) + BMFLOG_NODE(BMF_DEBUG, node_id_) << "Output Queue size changed from " << last_output_num_ << " to " << task.get_outputs().size(); last_output_num_ = task.get_outputs().size(); - // init queue_map_ - for (int i = 0; i < last_output_num_; i++) { - std::shared_ptr> tmp_queue = - std::make_shared>(); - queue_map_.insert( - std::pair>>( - i, tmp_queue)); - } - } - - if (in_eof_.size() != task.get_inputs().size()) { - in_eof_.clear(); - for (auto input_queue : task.get_inputs()) - in_eof_[input_queue.first] = false; } - // Data cache into queue_map - auto tem_queue = task.get_inputs(); - Packet pkt; - while (!tem_queue[0]->empty()) { - Packet pkt = tem_queue[0]->front(); - tem_queue[0]->pop(); - queue_map_[stream_index_]->push(pkt); - if (pkt.timestamp() == EOS or pkt.timestamp() == BMF_EOF) { - /* add EOF pkt for multi downstream node */ - for (size_t i = 1; i < task.get_outputs().size(); i++) { - queue_map_[i]->push(Packet::generate_eof_packet()); - } - } - stream_index_ = (stream_index_ + 1) % task.get_outputs().size(); - } + auto input_queue = task.get_inputs()[0]; // Data Splitting - while (!queue_map_[queue_index_]->empty()) { + Packet pkt; + + while (task.pop_packet_from_input_queue(0, pkt)) { - if (in_eof_[queue_index_] == true) + if (in_eof_ == true) continue; - - auto queue = queue_map_.find(queue_index_); - if (queue->second->pop(pkt)) { - // fill splitted pkt into multi output stream - task.fill_output_packet(queue_index_, pkt); - if (pkt.timestamp() == BMF_EOF) { - in_eof_[queue_index_] = true; + // fill splitted pkt into multi output stream + task.fill_output_packet(stream_index_, pkt); + if (pkt.timestamp() == BMF_EOF) { + // fill eof packet for extra distributed node + for (size_t i = 1; i < task.get_outputs().size(); i++) { + task.fill_output_packet(i, Packet::generate_eof_packet()); } - BMFLOG_NODE(BMF_DEBUG, node_id_) - << "get packet :" << pkt.timestamp() - << " data:" << pkt.type_info().name - << " in queue:" << queue_index_; - - queue_index_ = (queue_index_ + 1) % task.get_outputs().size(); + in_eof_ = true; } - } + BMFLOG_NODE(BMF_DEBUG, node_id_) + << "get packet :" << pkt.timestamp() + << " data:" << pkt.type_info().name + << " in queue:" << 0; - bool all_eof = true; - for (auto f_eof : in_eof_) { - if (f_eof.second == false) { - all_eof = false; - break; - } + stream_index_ = (stream_index_ + 1) % task.get_outputs().size(); } - if (all_eof) + + if (in_eof_) task.set_timestamp(DONE); return 0; } int SplitModule::reset() { - in_eof_.clear(); + in_eof_ = false; return 0; }