From 291bf884451b4e5949f0f10fce8c3ef4f15904f9 Mon Sep 17 00:00:00 2001 From: reicheratwork <66302498+reicheratwork@users.noreply.github.com> Date: Fri, 12 May 2023 15:52:09 +0200 Subject: [PATCH] Writer batching qos (#406) * Added writer batching QoSPolicy To mirror the implementation of the writer batching QoSPolicy in CycloneDDS-C commit d6906eb Signed-off-by: Martijn Reicher * Added writer batching in throughput example This is handled through the WriterBatching QoSPolicy Signed-off-by: Martijn Reicher * Modified comments due to GitHub review Signed-off-by: Martijn Reicher --------- Signed-off-by: Martijn Reicher --- .../include/dds/core/policy/CorePolicy.hpp | 4 ++ .../include/dds/core/policy/TCorePolicy.hpp | 60 +++++++++++++++++++ .../dds/core/policy/detail/CorePolicy.hpp | 3 + .../core/policy/detail/TCorePolicyImpl.hpp | 39 ++++++++++++ .../dds/pub/qos/detail/DataWriterQos.hpp | 1 + .../cyclonedds/core/policy/PolicyDelegate.hpp | 23 +++++++ .../pub/qos/DataWriterQosDelegate.hpp | 14 +++++ src/ddscxx/src/dds/core/policy/CorePolicy.cpp | 1 + .../cyclonedds/core/policy/PolicyDelegate.cpp | 42 +++++++++++++ .../cyclonedds/pub/AnyDataWriterDelegate.cpp | 2 + .../pub/qos/DataWriterQosDelegate.cpp | 11 ++++ src/ddscxx/tests/Qos.cpp | 10 ++++ 12 files changed, 210 insertions(+) diff --git a/src/ddscxx/include/dds/core/policy/CorePolicy.hpp b/src/ddscxx/include/dds/core/policy/CorePolicy.hpp index 78935380..7d073032 100644 --- a/src/ddscxx/include/dds/core/policy/CorePolicy.hpp +++ b/src/ddscxx/include/dds/core/policy/CorePolicy.hpp @@ -152,6 +152,9 @@ UserData; typedef dds::core::policy::detail::WriterDataLifecycle WriterDataLifecycle; +typedef dds::core::policy::detail::WriterBatching +WriterBatching; + #ifdef OMG_DDS_PERSISTENCE_SUPPORT typedef ::dds::core::policy::detail::DurabilityService DurabilityService; @@ -192,6 +195,7 @@ OMG_DDS_POLICY_TRAITS(DurabilityService, 22) OMG_DDS_POLICY_TRAITS(DataRepresentation, 23) OMG_DDS_POLICY_TRAITS(TypeConsistencyEnforcement, 24) #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +OMG_DDS_POLICY_TRAITS(WriterBatching, 25) } } diff --git a/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp b/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp index 4faefc07..9644fe2c 100644 --- a/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp +++ b/src/ddscxx/include/dds/core/policy/TCorePolicy.hpp @@ -842,6 +842,66 @@ class TWriterDataLifecycle : public dds::core::Value */ static TWriterDataLifecycle ManuallyDisposeUnregisteredInstances(); +}; +//============================================================================== + +template +class TWriterBatching : public dds::core::Value +{ +public: + /** + * Creates a WriterBatching QoS instance + * + * @param batch_updates a boolean indicating if updates should be batched + * before being explicitly flushed + */ + explicit TWriterBatching(bool batch_updates = false); + + /** + * Copies a WriterBatching QoS instance + * + * @param other the WriterBatching QoS instance to copy + */ + TWriterBatching(const TWriterBatching& other); + + /** + * Copies a WriterBatching QoS instance + * + * @param other the WriterBatching QoS instance to copy + * + * @return reference to the WriterBatching QoS instance that was copied to + */ + TWriterBatching& operator=(const TWriterBatching& other) = default; + +public: + /** + * Gets a boolean indicating if updates should be batched + * + * @return a boolean indicating if updates should be batched + */ + bool batch_updates() const; + + /** + * Sets a boolean indicating if updates should be batched + * + * @param batch_updates a boolean indicating if updates should be batched + */ + TWriterBatching& batch_updates( + bool batch_updates); + +public: + /** + * @return a WriterBatching QoS instance with batch_updates + * set to true + */ + static TWriterBatching BatchUpdates(); + + /** + * @return a WriterBatching QoS instance with batch_updates + * set to false + */ + static TWriterBatching DoNotBatchUpdates(); + }; //============================================================================== diff --git a/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp b/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp index ba3eefe2..4b38c0ee 100644 --- a/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp +++ b/src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp @@ -105,6 +105,9 @@ namespace dds { namespace core { namespace policy { namespace detail { typedef dds::core::policy::TWriterDataLifecycle WriterDataLifecycle; + + typedef dds::core::policy::TWriterBatching + WriterBatching; } } } } // namespace dds::core::policy::detail diff --git a/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp b/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp index 00c67aa6..ab298732 100644 --- a/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp +++ b/src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp @@ -501,6 +501,45 @@ TWriterDataLifecycle TWriterDataLifecycle::ManuallyDisposeUnregisteredInst return TWriterDataLifecycle(false); } +//TWriterBatching +template +TWriterBatching::TWriterBatching(bool batch_updates): dds::core::Value(batch_updates) +{ +} + +template +TWriterBatching::TWriterBatching(const TWriterBatching& other): dds::core::Value(other.delegate()) +{ +} + +template +bool TWriterBatching::batch_updates() const +{ + return this->delegate().batch_updates(); +} + +template +TWriterBatching& TWriterBatching::batch_updates( + bool batch_updates) +{ + this->delegate().batch_updates(batch_updates); + return *this; +} + +template +TWriterBatching TWriterBatching::BatchUpdates() +{ + return TWriterBatching(true); +} + + +template +TWriterBatching TWriterBatching::DoNotBatchUpdates() +{ + return TWriterBatching(false); +} + + //TReaderDataLifecycle template TReaderDataLifecycle::TReaderDataLifecycle(const dds::core::Duration& autopurge_nowriter_samples_delay, const dds::core::Duration& autopurge_disposed_samples_delay) diff --git a/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp b/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp index 5034a423..537a2676 100644 --- a/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp +++ b/src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp @@ -53,6 +53,7 @@ * dds::core::policy::WriterDataLifecycle | Dispose with unregister or not (@ref DCPS_QoS_WriterDataLifecycle "info") | WriterDataLifecycle::AutoDisposeUnregisteredInstances() * dds::core::policy::DataRepresentation | Supported data representation kinds (@ref DCPS_QoS_DataRepresentation "info") | DataRepresentation::DataRepresentation(dds::core::policy::DataRepresentationId::XCDR1) * dds::core::policy::TypeConsistencyEnforcement | Type consistency enforcement policies (@ref DCPS_QoS_TypeConsistencyEnforcement "info") | dds::core::policy::TypeConsistencyKind::DISALLOW_TYPE_COERCION + * dds::core::policy::WriterBatching | Writer data batching | dds::core::policy::WriterBatching::DoNotBatchUpdates() * * A QosPolicy can be set when the DataWriter is created or modified with the set * qos operation. diff --git a/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp b/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp index 2e0756ec..38a5b3db 100644 --- a/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp +++ b/src/ddscxx/include/org/eclipse/cyclonedds/core/policy/PolicyDelegate.hpp @@ -898,6 +898,29 @@ class OMG_DDS_API WriterDataLifecycleDelegate bool autodispose_; }; +//============================================================================== + +class OMG_DDS_API WriterBatchingDelegate +{ +public: + WriterBatchingDelegate(const WriterBatchingDelegate& other); + explicit WriterBatchingDelegate(bool batch_updates); + + bool batch_updates() const; + void batch_updates(bool b); + + bool operator ==(const WriterBatchingDelegate& other) const; + + WriterBatchingDelegate& operator =(const WriterBatchingDelegate& other) = default; + + void check() const; + + void set_iso_policy(const dds_qos_t* qos); + void set_c_policy(dds_qos_t* qos) const; + +private: + bool batch_updates_; +}; #ifdef OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT diff --git a/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp b/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp index dc3d9183..62f1fa7a 100644 --- a/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp +++ b/src/ddscxx/include/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.hpp @@ -60,6 +60,7 @@ class OMG_DDS_API DataWriterQosDelegate void policy(const dds::core::policy::DataRepresentation& datarepresentation); void policy(const dds::core::policy::TypeConsistencyEnforcement& typeconsistencyenforcement); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + void policy(const dds::core::policy::WriterBatching& writerbatching); template const POLICY& policy() const; template POLICY& policy(); @@ -96,6 +97,7 @@ class OMG_DDS_API DataWriterQosDelegate dds::core::policy::DataRepresentation datarepresentation_; dds::core::policy::TypeConsistencyEnforcement typeconsistencyenforcement_; #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + dds::core::policy::WriterBatching writerbatching_; }; @@ -296,6 +298,18 @@ DataWriterQosDelegate::policy() } #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +template<> inline const dds::core::policy::WriterBatching& +DataWriterQosDelegate::policy() const +{ + return writerbatching_; +} + +template<> inline dds::core::policy::WriterBatching& +DataWriterQosDelegate::policy() +{ + return writerbatching_; +} + } } } diff --git a/src/ddscxx/src/dds/core/policy/CorePolicy.cpp b/src/ddscxx/src/dds/core/policy/CorePolicy.cpp index a6acb07e..dfb050e5 100644 --- a/src/ddscxx/src/dds/core/policy/CorePolicy.cpp +++ b/src/ddscxx/src/dds/core/policy/CorePolicy.cpp @@ -50,3 +50,4 @@ OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::TypeConsistencyEnforcement, Type #ifdef OMG_DDS_PERSISTENCE_SUPPORT OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::DurabilityService, DurabilityService) #endif // OMG_DDS_PERSISTENCE_SUPPORT +OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::WriterBatching, WriterBatching) diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp index 4b919374..6c9f660a 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/core/policy/PolicyDelegate.cpp @@ -1624,6 +1624,48 @@ void WriterDataLifecycleDelegate::set_c_policy(dds_qos_t* qos) const } +//============================================================================== + +WriterBatchingDelegate::WriterBatchingDelegate(const WriterBatchingDelegate& other) + : batch_updates_(other.batch_updates_) +{ +} + +WriterBatchingDelegate::WriterBatchingDelegate(bool batch_updates): batch_updates_(batch_updates) +{ +} + +bool WriterBatchingDelegate::batch_updates() const +{ + return batch_updates_; +} + +void WriterBatchingDelegate::batch_updates(bool b) +{ + batch_updates_ = b; +} + +bool WriterBatchingDelegate::operator ==(const WriterBatchingDelegate& other) const +{ + return other.batch_updates() == batch_updates_; +} + +void WriterBatchingDelegate::check() const +{ + /* The batch_updates is just a boolean: nothing to check. */ +} + +void WriterBatchingDelegate::set_iso_policy(const dds_qos_t* qos) +{ + (void)dds_qget_writer_batching(qos, &batch_updates_); +} + +void WriterBatchingDelegate::set_c_policy(dds_qos_t* qos) const +{ + dds_qset_writer_batching(qos, batch_updates_); +} + + //============================================================================== #ifdef OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp index 7c5b13ab..3219059e 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/pub/AnyDataWriterDelegate.cpp @@ -501,7 +501,9 @@ AnyDataWriterDelegate::write_flush() void AnyDataWriterDelegate::set_batch(bool enable) { +DDSRT_WARNING_DEPRECATED_OFF dds_write_set_batch (enable); +DDSRT_WARNING_DEPRECATED_ON } } diff --git a/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp b/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp index da2d258f..387b2709 100644 --- a/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp +++ b/src/ddscxx/src/org/eclipse/cyclonedds/pub/qos/DataWriterQosDelegate.cpp @@ -160,6 +160,13 @@ DataWriterQosDelegate::policy(const dds::core::policy::TypeConsistencyEnforcemen } #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +void +DataWriterQosDelegate::policy(const dds::core::policy::WriterBatching& writerbatching) +{ + writerbatching.delegate().check(); + writerbatching_ = writerbatching; +} + dds_qos_t* DataWriterQosDelegate::ddsc_qos() const { @@ -184,6 +191,7 @@ DataWriterQosDelegate::ddsc_qos() const datarepresentation_.delegate().set_c_policy(qos); typeconsistencyenforcement_.delegate().set_c_policy(qos); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + writerbatching_.delegate().set_c_policy(qos); return qos; } @@ -211,6 +219,7 @@ DataWriterQosDelegate::ddsc_qos(const dds_qos_t* qos) datarepresentation_.delegate().set_iso_policy(qos); typeconsistencyenforcement_.delegate().set_iso_policy(qos); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + writerbatching_.delegate().set_iso_policy(qos); } void @@ -242,6 +251,7 @@ DataWriterQosDelegate::named_qos(const struct _DDS_NamedDataWriterQos &qos) datarepresentation_.delegate().v_policy((v_writerDataRepresentationPolicy&)(q->writer_datarepresentation)); typeconsistencyenforcement_.delegate().v_policy((v_writerTypeConsistencyEnforcementPolicy&)(q->writer_typeconsistencyenforcement)); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + writerbatching_.delegate().v_policy((v_writerbatchingPolicy&)(q->writer_batching) ); #endif } @@ -276,6 +286,7 @@ DataWriterQosDelegate::operator ==(const DataWriterQosDelegate& other) const && other.datarepresentation_ == datarepresentation_ && other.typeconsistencyenforcement_ == typeconsistencyenforcement_ #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + && other.writerbatching_ == writerbatching_ ; } diff --git a/src/ddscxx/tests/Qos.cpp b/src/ddscxx/tests/Qos.cpp index 3ac1c47d..0ac7e7c2 100644 --- a/src/ddscxx/tests/Qos.cpp +++ b/src/ddscxx/tests/Qos.cpp @@ -71,6 +71,7 @@ DataRepresentation nonDefaultRepresentation({dds::core::policy::DataRepresen dds::core::policy::DataRepresentationId::XCDR2}); TypeConsistencyEnforcement nonDefaultTypeConsistencyEnforcement(dds::core::policy::TypeConsistencyKind::ALLOW_TYPE_COERCION, true, true, true, true, true); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +WriterBatching nonDefaultWriterBatching(true); @@ -107,6 +108,7 @@ ReaderDataLifecycle tmpRdLifecycle; DataRepresentation tmpRepresentation; TypeConsistencyEnforcement tmpEnforcement; #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT +WriterBatching tmpWriterBatching; TEST(Qos, DomainParticipant) { @@ -354,6 +356,7 @@ TEST(Qos, DataWriter) << nonDefaultRepresentation << nonDefaultTypeConsistencyEnforcement #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + << nonDefaultWriterBatching ; DataWriterQos dwQosWConstructed(dwQosShifted); DataWriterQos dwQosWAssigned1 = dwQosShifted; /* Actually calls copy constructor. */ @@ -363,6 +366,9 @@ TEST(Qos, DataWriter) DataWriterQos dwQosTAssigned2; dwQosWAssigned2 = dwQosShifted; dwQosTAssigned2 = tQosShifted; + dwQosTConstructed << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */ + dwQosTAssigned1 << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */ + dwQosTAssigned2 << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */ /* Compare the QoSses. */ ASSERT_NE(dwQosDefault, dwQosWConstructed); @@ -395,6 +401,7 @@ TEST(Qos, DataWriter) dwQosShifted >> tmpRepresentation; dwQosShifted >> tmpEnforcement; #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + dwQosShifted >> tmpWriterBatching; ASSERT_EQ(nonDefaultUserData, tmpUserData); ASSERT_EQ(nonDefaultDurability, tmpDurability); ASSERT_EQ(nonDefaultDeadline, tmpDeadline); @@ -412,6 +419,7 @@ TEST(Qos, DataWriter) ASSERT_EQ(nonDefaultRepresentation, tmpRepresentation); ASSERT_EQ(nonDefaultTypeConsistencyEnforcement, tmpEnforcement); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + ASSERT_EQ(nonDefaultWriterBatching, tmpWriterBatching); ASSERT_EQ(nonDefaultUserData, dwQosWConstructed.policy()); ASSERT_EQ(nonDefaultDurability, dwQosWConstructed.policy()); @@ -430,6 +438,7 @@ TEST(Qos, DataWriter) ASSERT_EQ(nonDefaultRepresentation, dwQosWConstructed.policy()); ASSERT_EQ(nonDefaultTypeConsistencyEnforcement, dwQosWConstructed.policy()); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + ASSERT_EQ(nonDefaultWriterBatching, dwQosWConstructed.policy()); #ifdef OMG_DDS_OWNERSHIP_SUPPORT dwQosShifted >> tmpStrength; @@ -646,4 +655,5 @@ TEST(Qos, policy_name) ASSERT_EQ(dds::core::policy::policy_name::name(), "DataRepresentation"); ASSERT_EQ(dds::core::policy::policy_name::name(), "TypeConsistencyEnforcement"); #endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT + ASSERT_EQ(dds::core::policy::policy_name::name(), "WriterBatching"); }