Skip to content

Commit

Permalink
add queue_map_ for split_module to generate extra eof_packet
Browse files Browse the repository at this point in the history
  • Loading branch information
JackLau1222 committed Oct 29, 2024
1 parent 60ec753 commit 6fb4f1c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 10 deletions.
1 change: 1 addition & 0 deletions bmf/engine/c_engine/include/assemble_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <bmf/sdk/module.h>
#include <bmf/sdk/module_registry.h>
#include "safe_queue.h"

USE_BMF_SDK_NS
class AssembleModule : public Module {
public:
Expand Down
5 changes: 5 additions & 0 deletions bmf/engine/c_engine/include/split_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <bmf/sdk/module.h>
#include <bmf/sdk/module_registry.h>
#include "safe_queue.h"

USE_BMF_SDK_NS
class SplitModule : public Module {
Expand All @@ -36,6 +37,10 @@ class SplitModule : public Module {
int last_output_num_;

int stream_index_;

int queue_index_;

std::map<int, std::shared_ptr<bmf_engine::SafeQueue<Packet>>> queue_map_;
};

REGISTER_MODULE_CLASS(SplitModule)
Expand Down
46 changes: 36 additions & 10 deletions bmf/engine/c_engine/src/split_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ SplitModule::SplitModule(int node_id, JsonParam json_param)
last_input_num_ = 0;
last_output_num_ = 0;
stream_index_ = 0;
queue_index_ = 0;
return;
}

Expand All @@ -41,6 +42,14 @@ int SplitModule::process(Task &task) {
<< "Output Queue size changed from " << last_output_num_ << " to "
<< task.get_outputs().size();
last_output_num_ = task.get_outputs().size();
// init queue_map_
for (int i = 0; i < last_output_num_; i++) {
std::shared_ptr<bmf_engine::SafeQueue<Packet>> tmp_queue =
std::make_shared<bmf_engine::SafeQueue<Packet>>();
queue_map_.insert(
std::pair<int, std::shared_ptr<bmf_engine::SafeQueue<Packet>>>(
i, tmp_queue));
}
}

if (in_eof_.size() != task.get_inputs().size()) {
Expand All @@ -49,24 +58,41 @@ int SplitModule::process(Task &task) {
in_eof_[input_queue.first] = false;
}

// Data Splitting
// Data cache into queue_map
auto tem_queue = task.get_inputs();
Packet pkt;
for (auto input_queue : task.get_inputs()) {
while (task.pop_packet_from_input_queue(input_queue.first, pkt)) {

if (in_eof_[input_queue.first] == true)
continue;
while (!tem_queue[0]->empty()) {
Packet pkt = tem_queue[0]->front();
tem_queue[0]->pop();
queue_map_[stream_index_]->push(pkt);
if (pkt.timestamp() == EOS or pkt.timestamp() == BMF_EOF) {
/* add EOF pkt for multi downstream node */
for (size_t i = 1; i < task.get_outputs().size(); i++) {
queue_map_[i]->push(Packet::generate_eof_packet());
}
}
stream_index_ = (stream_index_ + 1) % task.get_outputs().size();
}

// Data Splitting
while (!queue_map_[queue_index_]->empty()) {

if (in_eof_[queue_index_] == true)
continue;

auto queue = queue_map_.find(queue_index_);
if (queue->second->pop(pkt)) {
// fill splitted pkt into multi output stream
task.fill_output_packet(stream_index_, pkt);
task.fill_output_packet(queue_index_, pkt);
if (pkt.timestamp() == BMF_EOF) {
in_eof_[input_queue.first] = true;
in_eof_[queue_index_] = true;
}
BMFLOG_NODE(BMF_DEBUG, node_id_)
<< "get packet :" << pkt.timestamp()
<< " data:" << pkt.type_info().name
<< " in queue:" << input_queue.first;
<< " in queue:" << queue_index_;

stream_index_ = (stream_index_ + 1) % task.get_outputs().size();
queue_index_ = (queue_index_ + 1) % task.get_outputs().size();
}
}

Expand Down

0 comments on commit 6fb4f1c

Please sign in to comment.