From 34e8a24c86861495ab186af2099d664a3782e4d8 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 15 Nov 2023 10:11:55 +0100 Subject: [PATCH] Examples: use multipart in the region example --- .../fairmq-start-ex-region-advanced.sh.in | 92 +++++++++++++------ examples/region/processor.cxx | 22 +++-- examples/region/sampler.cxx | 29 ++++-- examples/region/sink.cxx | 8 +- 4 files changed, 98 insertions(+), 53 deletions(-) diff --git a/examples/region/fairmq-start-ex-region-advanced.sh.in b/examples/region/fairmq-start-ex-region-advanced.sh.in index 3bf4e0f6f..a0ad9dea9 100755 --- a/examples/region/fairmq-start-ex-region-advanced.sh.in +++ b/examples/region/fairmq-start-ex-region-advanced.sh.in @@ -22,32 +22,66 @@ SAMPLER+=" --transport $transport" SAMPLER+=" --shm-monitor true" SAMPLER+=" --chan-name data1" SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777" -xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & - -PROCESSOR="fairmq-ex-region-processor" -PROCESSOR+=" --id processor1" -PROCESSOR+=" --severity debug" -PROCESSOR+=" --transport $transport" -PROCESSOR+=" --shm-monitor true" -PROCESSOR+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777" -PROCESSOR+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778" -PROCESSOR+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779" -xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$PROCESSOR & - -SINK1="fairmq-ex-region-sink" -SINK1+=" --id sink1" -SINK1+=" --severity debug" -SINK1+=" --chan-name data2" -SINK1+=" --transport $transport" -SINK1+=" --shm-monitor true" -SINK1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778" -xterm -geometry 120x32+1500+0 -hold -e @EX_BIN_DIR@/$SINK1 & - -SINK2="fairmq-ex-region-sink" -SINK2+=" --id sink2" -SINK2+=" --severity debug" -SINK2+=" --chan-name data3" -SINK2+=" --transport $transport" -SINK2+=" --shm-monitor true" -SINK2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779" -xterm -geometry 120x32+1500+500 -hold -e @EX_BIN_DIR@/$SINK2 & +xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & + +PROCESSOR1="fairmq-ex-region-processor" +PROCESSOR1+=" --id processor1" +PROCESSOR1+=" --severity debug" +PROCESSOR1+=" --transport $transport" +PROCESSOR1+=" --shm-segment-id 1" +PROCESSOR1+=" --shm-monitor true" +PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777" +PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778" +PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779" +xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 & + +PROCESSOR2="fairmq-ex-region-processor" +PROCESSOR2+=" --id processor2" +PROCESSOR2+=" --severity debug" +PROCESSOR2+=" --transport $transport" +PROCESSOR2+=" --shm-segment-id 2" +PROCESSOR2+=" --shm-monitor true" +PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777" +PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788" +PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789" +xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 & + +SINK1_1="fairmq-ex-region-sink" +SINK1_1+=" --id sink1_1" +SINK1_1+=" --severity debug" +SINK1_1+=" --chan-name data2" +SINK1_1+=" --transport $transport" +SINK1_1+=" --shm-segment-id 1" +SINK1_1+=" --shm-monitor true" +SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778" +xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 & + +SINK1_2="fairmq-ex-region-sink" +SINK1_2+=" --id sink1_2" +SINK1_2+=" --severity debug" +SINK1_2+=" --chan-name data3" +SINK1_2+=" --transport $transport" +SINK1_2+=" --shm-segment-id 1" +SINK1_2+=" --shm-monitor true" +SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779" +xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 & + +SINK2_1="fairmq-ex-region-sink" +SINK2_1+=" --id sink2_1" +SINK2_1+=" --severity debug" +SINK2_1+=" --chan-name data2" +SINK2_1+=" --transport $transport" +SINK2_1+=" --shm-segment-id 2" +SINK2_1+=" --shm-monitor true" +SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788" +xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 & + +SINK2_2="fairmq-ex-region-sink" +SINK2_2+=" --id sink2_2" +SINK2_2+=" --severity debug" +SINK2_2+=" --chan-name data3" +SINK2_2+=" --transport $transport" +SINK2_2+=" --shm-segment-id 2" +SINK2_2+=" --shm-monitor true" +SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789" +xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 & diff --git a/examples/region/processor.cxx b/examples/region/processor.cxx index 4a8d4c6ca..2ed2f7209 100644 --- a/examples/region/processor.cxx +++ b/examples/region/processor.cxx @@ -36,16 +36,22 @@ struct Processor : Device Channel& dataOut2 = GetChannel("data3", 0); while (!NewStatePending()) { - auto msg(dataIn.Transport()->CreateMessage()); - dataIn.Receive(msg); + fair::mq::Parts inParts; + dataIn.Receive(inParts); - fair::mq::MessagePtr msgCopy1(NewMessage()); - msgCopy1->Copy(*msg); - fair::mq::MessagePtr msgCopy2(NewMessage()); - msgCopy2->Copy(*msg); + fair::mq::Parts outParts1; + fair::mq::Parts outParts2; - dataOut1.Send(msgCopy1); - dataOut2.Send(msgCopy2); + for (const auto& inPart : inParts) { + outParts1.AddPart(NewMessage()); + outParts1.fParts.back()->Copy(*inPart); + + outParts2.AddPart(NewMessage()); + outParts2.fParts.back()->Copy(*inPart); + } + + dataOut1.Send(outParts1); + dataOut2.Send(outParts2); if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state."; diff --git a/examples/region/sampler.cxx b/examples/region/sampler.cxx index 33d204c55..b4bb169ad 100644 --- a/examples/region/sampler.cxx +++ b/examples/region/sampler.cxx @@ -26,6 +26,7 @@ struct Sampler : fair::mq::Device fMaxIterations = fConfig->GetProperty("max-iterations"); fChanName = fConfig->GetProperty("chan-name"); fSamplingRate = fConfig->GetProperty("sampling-rate"); + fRCSegmentSize = fConfig->GetProperty("rc-segment-size"); GetChannel(fChanName, 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) { LOG(info) << "Region event: " << info.event << ": " @@ -45,6 +46,7 @@ struct Sampler : fair::mq::Device } regionCfg.lock = !fExternalRegion; // mlock region after creation regionCfg.zero = !fExternalRegion; // zero region content after creation + regionCfg.rcSegmentSize = fRCSegmentSize; // size of the corresponding reference count segment fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor( fChanName, // region is created using the transport of this channel... 0, // ... and this sub-channel @@ -66,17 +68,22 @@ struct Sampler : fair::mq::Device fair::mq::tools::RateLimiter rateLimiter(fSamplingRate); while (!NewStatePending()) { - fair::mq::MessagePtr msg(NewMessageFor(fChanName, // channel - 0, // sub-channel - fRegion, // region - fRegion->GetData(), // ptr within region - fMsgSize, // offset from ptr - nullptr // hint - )); + fair::mq::Parts parts; + // make 64 parts + for (int i = 0; i < 64; ++i) { + parts.AddPart(NewMessageFor( + fChanName, // channel + 0, // sub-channel + fRegion, // region + fRegion->GetData(), // ptr within region + fMsgSize, // offset from ptr + nullptr // hint + )); + } std::lock_guard lock(fMtx); - ++fNumUnackedMsgs; - if (Send(msg, fChanName, 0) > 0) { + fNumUnackedMsgs += parts.Size(); + if (Send(parts, fChanName, 0) > 0) { if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached. Stopping sending."; break; @@ -117,6 +124,7 @@ struct Sampler : fair::mq::Device uint32_t fLinger = 100; uint64_t fMaxIterations = 0; uint64_t fNumIterations = 0; + uint64_t fRCSegmentSize = 10000000; fair::mq::UnmanagedRegionPtr fRegion = nullptr; std::mutex fMtx; uint64_t fNumUnackedMsgs = 0; @@ -132,7 +140,8 @@ void addCustomOptions(bpo::options_description& options) ("sampling-rate", bpo::value()->default_value(0.), "Sampling rate (Hz).") ("region-linger", bpo::value()->default_value(100), "Linger period for regions") ("max-iterations", bpo::value()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)") - ("external-region", bpo::value()->default_value(false), "Use region created by another process"); + ("external-region", bpo::value()->default_value(false), "Use region created by another process") + ("rc-segment-size", bpo::value()->default_value(10000000), "Size of the reference count segment for Unamanged Region"); } std::unique_ptr getDevice(fair::mq::ProgOptions& /*config*/) diff --git a/examples/region/sink.cxx b/examples/region/sink.cxx index 3dd3837ef..2e8eab647 100644 --- a/examples/region/sink.cxx +++ b/examples/region/sink.cxx @@ -36,12 +36,8 @@ struct Sink : Device Channel& dataIn = GetChannel(fChanName, 0); while (!NewStatePending()) { - auto msg(dataIn.Transport()->CreateMessage()); - dataIn.Receive(msg); - - // void* ptr = msg->GetData(); - // char* cptr = static_cast(ptr); - // LOG(info) << "check: " << cptr[3]; + fair::mq::Parts parts; + dataIn.Receive(parts); if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured max number of iterations reached. Leaving RUNNING state.";