Skip to content

Commit

Permalink
Writer batching qos (#406)
Browse files Browse the repository at this point in the history
* Added writer batching QoSPolicy

To mirror the implementation of the writer batching QoSPolicy in
CycloneDDS-C commit d6906eb

Signed-off-by: Martijn Reicher <[email protected]>

* Added writer batching in throughput example

This is handled through the WriterBatching QoSPolicy

Signed-off-by: Martijn Reicher <[email protected]>

* Modified comments due to GitHub review

Signed-off-by: Martijn Reicher <[email protected]>

---------

Signed-off-by: Martijn Reicher <[email protected]>
  • Loading branch information
reicheratwork authored and eboasson committed Aug 28, 2023
1 parent 2b99e1c commit 291bf88
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/ddscxx/include/dds/core/policy/CorePolicy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ UserData;
typedef dds::core::policy::detail::WriterDataLifecycle
WriterDataLifecycle;

typedef dds::core::policy::detail::WriterBatching
WriterBatching;

#ifdef OMG_DDS_PERSISTENCE_SUPPORT
typedef ::dds::core::policy::detail::DurabilityService
DurabilityService;
Expand Down Expand Up @@ -192,6 +195,7 @@ OMG_DDS_POLICY_TRAITS(DurabilityService, 22)
OMG_DDS_POLICY_TRAITS(DataRepresentation, 23)
OMG_DDS_POLICY_TRAITS(TypeConsistencyEnforcement, 24)
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
OMG_DDS_POLICY_TRAITS(WriterBatching, 25)

}
}
Expand Down
60 changes: 60 additions & 0 deletions src/ddscxx/include/dds/core/policy/TCorePolicy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,66 @@ class TWriterDataLifecycle : public dds::core::Value<D>
*/
static TWriterDataLifecycle ManuallyDisposeUnregisteredInstances();

};
//==============================================================================

template <typename D>
class TWriterBatching : public dds::core::Value<D>
{
public:
/**
* Creates a WriterBatching QoS instance
*
* @param batch_updates a boolean indicating if updates should be batched
* before being explicitly flushed
*/
explicit TWriterBatching(bool batch_updates = false);

/**
* Copies a WriterBatching QoS instance
*
* @param other the WriterBatching QoS instance to copy
*/
TWriterBatching(const TWriterBatching& other);

/**
* Copies a WriterBatching QoS instance
*
* @param other the WriterBatching QoS instance to copy
*
* @return reference to the WriterBatching QoS instance that was copied to
*/
TWriterBatching& operator=(const TWriterBatching& other) = default;

public:
/**
* Gets a boolean indicating if updates should be batched
*
* @return a boolean indicating if updates should be batched
*/
bool batch_updates() const;

/**
* Sets a boolean indicating if updates should be batched
*
* @param batch_updates a boolean indicating if updates should be batched
*/
TWriterBatching& batch_updates(
bool batch_updates);

public:
/**
* @return a WriterBatching QoS instance with batch_updates
* set to true
*/
static TWriterBatching BatchUpdates();

/**
* @return a WriterBatching QoS instance with batch_updates
* set to false
*/
static TWriterBatching DoNotBatchUpdates();

};

//==============================================================================
Expand Down
3 changes: 3 additions & 0 deletions src/ddscxx/include/dds/core/policy/detail/CorePolicy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ namespace dds { namespace core { namespace policy { namespace detail {

typedef dds::core::policy::TWriterDataLifecycle<org::eclipse::cyclonedds::core::policy::WriterDataLifecycleDelegate>
WriterDataLifecycle;

typedef dds::core::policy::TWriterBatching<org::eclipse::cyclonedds::core::policy::WriterBatchingDelegate>
WriterBatching;
} } } } // namespace dds::core::policy::detail


Expand Down
39 changes: 39 additions & 0 deletions src/ddscxx/include/dds/core/policy/detail/TCorePolicyImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,45 @@ TWriterDataLifecycle<D> TWriterDataLifecycle<D>::ManuallyDisposeUnregisteredInst
return TWriterDataLifecycle(false);
}

//TWriterBatching
template <typename D>
TWriterBatching<D>::TWriterBatching(bool batch_updates): dds::core::Value<D>(batch_updates)
{
}

template <typename D>
TWriterBatching<D>::TWriterBatching(const TWriterBatching& other): dds::core::Value<D>(other.delegate())
{
}

template <typename D>
bool TWriterBatching<D>::batch_updates() const
{
return this->delegate().batch_updates();
}

template <typename D>
TWriterBatching<D>& TWriterBatching<D>::batch_updates(
bool batch_updates)
{
this->delegate().batch_updates(batch_updates);
return *this;
}

template <typename D>
TWriterBatching<D> TWriterBatching<D>::BatchUpdates()
{
return TWriterBatching(true);
}


template <typename D>
TWriterBatching<D> TWriterBatching<D>::DoNotBatchUpdates()
{
return TWriterBatching(false);
}


//TReaderDataLifecycle
template <typename D>
TReaderDataLifecycle<D>::TReaderDataLifecycle(const dds::core::Duration& autopurge_nowriter_samples_delay, const dds::core::Duration& autopurge_disposed_samples_delay)
Expand Down
1 change: 1 addition & 0 deletions src/ddscxx/include/dds/pub/qos/detail/DataWriterQos.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* dds::core::policy::WriterDataLifecycle | Dispose with unregister or not (@ref DCPS_QoS_WriterDataLifecycle "info") | WriterDataLifecycle::AutoDisposeUnregisteredInstances()
* dds::core::policy::DataRepresentation | Supported data representation kinds (@ref DCPS_QoS_DataRepresentation "info") | DataRepresentation::DataRepresentation(dds::core::policy::DataRepresentationId::XCDR1)
* dds::core::policy::TypeConsistencyEnforcement | Type consistency enforcement policies (@ref DCPS_QoS_TypeConsistencyEnforcement "info") | dds::core::policy::TypeConsistencyKind::DISALLOW_TYPE_COERCION
* dds::core::policy::WriterBatching | Writer data batching | dds::core::policy::WriterBatching::DoNotBatchUpdates()
*
* A QosPolicy can be set when the DataWriter is created or modified with the set
* qos operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,29 @@ class OMG_DDS_API WriterDataLifecycleDelegate
bool autodispose_;
};

//==============================================================================

class OMG_DDS_API WriterBatchingDelegate
{
public:
WriterBatchingDelegate(const WriterBatchingDelegate& other);
explicit WriterBatchingDelegate(bool batch_updates);

bool batch_updates() const;
void batch_updates(bool b);

bool operator ==(const WriterBatchingDelegate& other) const;

WriterBatchingDelegate& operator =(const WriterBatchingDelegate& other) = default;

void check() const;

void set_iso_policy(const dds_qos_t* qos);
void set_c_policy(dds_qos_t* qos) const;

private:
bool batch_updates_;
};


#ifdef OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class OMG_DDS_API DataWriterQosDelegate
void policy(const dds::core::policy::DataRepresentation& datarepresentation);
void policy(const dds::core::policy::TypeConsistencyEnforcement& typeconsistencyenforcement);
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
void policy(const dds::core::policy::WriterBatching& writerbatching);

template <typename POLICY> const POLICY& policy() const;
template <typename POLICY> POLICY& policy();
Expand Down Expand Up @@ -96,6 +97,7 @@ class OMG_DDS_API DataWriterQosDelegate
dds::core::policy::DataRepresentation datarepresentation_;
dds::core::policy::TypeConsistencyEnforcement typeconsistencyenforcement_;
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
dds::core::policy::WriterBatching writerbatching_;
};


Expand Down Expand Up @@ -296,6 +298,18 @@ DataWriterQosDelegate::policy<dds::core::policy::TypeConsistencyEnforcement>()
}
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT

template<> inline const dds::core::policy::WriterBatching&
DataWriterQosDelegate::policy<dds::core::policy::WriterBatching>() const
{
return writerbatching_;
}

template<> inline dds::core::policy::WriterBatching&
DataWriterQosDelegate::policy<dds::core::policy::WriterBatching>()
{
return writerbatching_;
}

}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/ddscxx/src/dds/core/policy/CorePolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::TypeConsistencyEnforcement, Type
#ifdef OMG_DDS_PERSISTENCE_SUPPORT
OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::DurabilityService, DurabilityService)
#endif // OMG_DDS_PERSISTENCE_SUPPORT
OMG_DDS_DEFINE_POLICY_TRAITS(dds::core::policy::WriterBatching, WriterBatching)
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,48 @@ void WriterDataLifecycleDelegate::set_c_policy(dds_qos_t* qos) const
}


//==============================================================================

WriterBatchingDelegate::WriterBatchingDelegate(const WriterBatchingDelegate& other)
: batch_updates_(other.batch_updates_)
{
}

WriterBatchingDelegate::WriterBatchingDelegate(bool batch_updates): batch_updates_(batch_updates)
{
}

bool WriterBatchingDelegate::batch_updates() const
{
return batch_updates_;
}

void WriterBatchingDelegate::batch_updates(bool b)
{
batch_updates_ = b;
}

bool WriterBatchingDelegate::operator ==(const WriterBatchingDelegate& other) const
{
return other.batch_updates() == batch_updates_;
}

void WriterBatchingDelegate::check() const
{
/* The batch_updates is just a boolean: nothing to check. */
}

void WriterBatchingDelegate::set_iso_policy(const dds_qos_t* qos)
{
(void)dds_qget_writer_batching(qos, &batch_updates_);
}

void WriterBatchingDelegate::set_c_policy(dds_qos_t* qos) const
{
dds_qset_writer_batching(qos, batch_updates_);
}


//==============================================================================

#ifdef OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,9 @@ AnyDataWriterDelegate::write_flush()
void
AnyDataWriterDelegate::set_batch(bool enable)
{
DDSRT_WARNING_DEPRECATED_OFF
dds_write_set_batch (enable);
DDSRT_WARNING_DEPRECATED_ON
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ DataWriterQosDelegate::policy(const dds::core::policy::TypeConsistencyEnforcemen
}
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT

void
DataWriterQosDelegate::policy(const dds::core::policy::WriterBatching& writerbatching)
{
writerbatching.delegate().check();
writerbatching_ = writerbatching;
}

dds_qos_t*
DataWriterQosDelegate::ddsc_qos() const
{
Expand All @@ -184,6 +191,7 @@ DataWriterQosDelegate::ddsc_qos() const
datarepresentation_.delegate().set_c_policy(qos);
typeconsistencyenforcement_.delegate().set_c_policy(qos);
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
writerbatching_.delegate().set_c_policy(qos);
return qos;
}

Expand Down Expand Up @@ -211,6 +219,7 @@ DataWriterQosDelegate::ddsc_qos(const dds_qos_t* qos)
datarepresentation_.delegate().set_iso_policy(qos);
typeconsistencyenforcement_.delegate().set_iso_policy(qos);
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
writerbatching_.delegate().set_iso_policy(qos);
}

void
Expand Down Expand Up @@ -242,6 +251,7 @@ DataWriterQosDelegate::named_qos(const struct _DDS_NamedDataWriterQos &qos)
datarepresentation_.delegate().v_policy((v_writerDataRepresentationPolicy&)(q->writer_datarepresentation));
typeconsistencyenforcement_.delegate().v_policy((v_writerTypeConsistencyEnforcementPolicy&)(q->writer_typeconsistencyenforcement));
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
writerbatching_.delegate().v_policy((v_writerbatchingPolicy&)(q->writer_batching) );
#endif
}

Expand Down Expand Up @@ -276,6 +286,7 @@ DataWriterQosDelegate::operator ==(const DataWriterQosDelegate& other) const
&& other.datarepresentation_ == datarepresentation_
&& other.typeconsistencyenforcement_ == typeconsistencyenforcement_
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
&& other.writerbatching_ == writerbatching_
;
}

Expand Down
10 changes: 10 additions & 0 deletions src/ddscxx/tests/Qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ DataRepresentation nonDefaultRepresentation({dds::core::policy::DataRepresen
dds::core::policy::DataRepresentationId::XCDR2});
TypeConsistencyEnforcement nonDefaultTypeConsistencyEnforcement(dds::core::policy::TypeConsistencyKind::ALLOW_TYPE_COERCION, true, true, true, true, true);
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
WriterBatching nonDefaultWriterBatching(true);



Expand Down Expand Up @@ -107,6 +108,7 @@ ReaderDataLifecycle tmpRdLifecycle;
DataRepresentation tmpRepresentation;
TypeConsistencyEnforcement tmpEnforcement;
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
WriterBatching tmpWriterBatching;

TEST(Qos, DomainParticipant)
{
Expand Down Expand Up @@ -354,6 +356,7 @@ TEST(Qos, DataWriter)
<< nonDefaultRepresentation
<< nonDefaultTypeConsistencyEnforcement
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
<< nonDefaultWriterBatching
;
DataWriterQos dwQosWConstructed(dwQosShifted);
DataWriterQos dwQosWAssigned1 = dwQosShifted; /* Actually calls copy constructor. */
Expand All @@ -363,6 +366,9 @@ TEST(Qos, DataWriter)
DataWriterQos dwQosTAssigned2;
dwQosWAssigned2 = dwQosShifted;
dwQosTAssigned2 = tQosShifted;
dwQosTConstructed << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */
dwQosTAssigned1 << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */
dwQosTAssigned2 << nonDefaultWriterBatching; /* Necessary, since the TopicQos does not have the WriterBatching QoSPolicy. */

/* Compare the QoSses. */
ASSERT_NE(dwQosDefault, dwQosWConstructed);
Expand Down Expand Up @@ -395,6 +401,7 @@ TEST(Qos, DataWriter)
dwQosShifted >> tmpRepresentation;
dwQosShifted >> tmpEnforcement;
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
dwQosShifted >> tmpWriterBatching;
ASSERT_EQ(nonDefaultUserData, tmpUserData);
ASSERT_EQ(nonDefaultDurability, tmpDurability);
ASSERT_EQ(nonDefaultDeadline, tmpDeadline);
Expand All @@ -412,6 +419,7 @@ TEST(Qos, DataWriter)
ASSERT_EQ(nonDefaultRepresentation, tmpRepresentation);
ASSERT_EQ(nonDefaultTypeConsistencyEnforcement, tmpEnforcement);
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
ASSERT_EQ(nonDefaultWriterBatching, tmpWriterBatching);

ASSERT_EQ(nonDefaultUserData, dwQosWConstructed.policy<UserData>());
ASSERT_EQ(nonDefaultDurability, dwQosWConstructed.policy<Durability>());
Expand All @@ -430,6 +438,7 @@ TEST(Qos, DataWriter)
ASSERT_EQ(nonDefaultRepresentation, dwQosWConstructed.policy<DataRepresentation>());
ASSERT_EQ(nonDefaultTypeConsistencyEnforcement, dwQosWConstructed.policy<TypeConsistencyEnforcement>());
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
ASSERT_EQ(nonDefaultWriterBatching, dwQosWConstructed.policy<WriterBatching>());

#ifdef OMG_DDS_OWNERSHIP_SUPPORT
dwQosShifted >> tmpStrength;
Expand Down Expand Up @@ -646,4 +655,5 @@ TEST(Qos, policy_name)
ASSERT_EQ(dds::core::policy::policy_name<DataRepresentation>::name(), "DataRepresentation");
ASSERT_EQ(dds::core::policy::policy_name<TypeConsistencyEnforcement>::name(), "TypeConsistencyEnforcement");
#endif // OMG_DDS_EXTENSIBLE_AND_DYNAMIC_TOPIC_TYPE_SUPPORT
ASSERT_EQ(dds::core::policy::policy_name<WriterBatching>::name(), "WriterBatching");
}

0 comments on commit 291bf88

Please sign in to comment.