From d144df1906f54d53453c872d6caab34e1dc2e005 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Tue, 28 Nov 2023 18:46:57 +0100 Subject: [PATCH 1/3] Fix raw sample/cdr confusion in loans There were various issues causing confusion in the contents and size of loans: * dds_write_impl performed a look-ahead and could decide to allocate memory for a PSMX loan, only taking into account the type to decide whether to allocate based on the raw sample size or the serialized size, but failing to account for the possibility of a serialized key value. * What used to be the "force serialization" flag was interpreted by serdata_default as (also) requiring that the data in a loan would be in serialized form, even though that wasn't the case. It will handle serialized data just fine even if it is not necessary, but there is no reason for serializing it into shared memory if the data can just be memcpy'd around. * This resulted in serdata_default using slightly different rules for deciding whether or not to serialize and so could (attempt to) put serialized data in a loan allocated for a raw sample. The metadata would be set correctly, and the amount of memory copied was that of the destination buffer, so as long as the CDR was no longer than the raw sample (unlikely) and the serdata buffer happened to be large enough, it was ok. But those conditions are not always satisfied. Taken together, this meant: out-of-bounds reads and inefficiencies. This commit refactors dds_write_impl, taking the burden of doing things with the contents of the loans from the serdata implementation (it leaves the interface unchanged), instead doing it in dds_write_impl where all the relevant information is available. There was also some confusion in the handling of loans of complex types, where the application would be given a heap loan but the serdata implementation did not handle this correctly throughout. The code now supports it properly. Note that returning a heap loan even for a PSMX writer is deliberate: this way, the types, memory allocation routines and lifetimes to be used for any reference in the type is independent of the use of PSMX plugins. It also moves the publishing via PSMX to *before* the network part. This ensures that the latency via shared memory is not affected by the speed with which the network can absorb the data. In this refactored version, supporting zero-copy within the process (and without PSMX) was missing. That has also been added. Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds__write.h | 9 +- src/core/ddsc/src/dds_heap_loan.c | 5 +- src/core/ddsc/src/dds_serdata_default.c | 68 ++- src/core/ddsc/src/dds_write.c | 415 +++++++++++------- src/core/ddsc/tests/psmx.c | 29 +- src/core/ddsi/include/dds/ddsi/ddsi_serdata.h | 8 +- src/core/ddsi/src/ddsi_serdata.c | 2 +- src/core/xtests/symbol_export/symbol_export.c | 3 + 8 files changed, 320 insertions(+), 219 deletions(-) diff --git a/src/core/ddsc/src/dds__write.h b/src/core/ddsc/src/dds__write.h index 429fc7e66e..25dc1dabbc 100644 --- a/src/core/ddsc/src/dds__write.h +++ b/src/core/ddsc/src/dds__write.h @@ -32,13 +32,16 @@ typedef enum { /** @component write_data */ DDS_EXPORT_INTERNAL_FUNCTION -dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp, dds_write_action action); +dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t timestamp, dds_write_action action) + ddsrt_attribute_warn_unused_result ddsrt_nonnull_all; /** @component write_data */ -dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_xpack *xp, struct ddsi_serdata *d, bool flush); +dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_xpack *xp, struct ddsi_serdata *d, bool flush) + ddsrt_attribute_warn_unused_result ddsrt_nonnull_all; /** @component write_data */ -dds_return_t dds_writecdr_local_orphan_impl (struct ddsi_local_orphan_writer *lowr, struct ddsi_serdata *d); +dds_return_t dds_writecdr_local_orphan_impl (struct ddsi_local_orphan_writer *lowr, struct ddsi_serdata *d) + ddsrt_nonnull_all; /** @component write_data */ void dds_write_flush_impl (dds_writer *wr); diff --git a/src/core/ddsc/src/dds_heap_loan.c b/src/core/ddsc/src/dds_heap_loan.c index 819a7b43f5..03ad5b6ba7 100644 --- a/src/core/ddsc/src/dds_heap_loan.c +++ b/src/core/ddsc/src/dds_heap_loan.c @@ -47,7 +47,7 @@ const dds_loaned_sample_ops_t dds_loan_heap_ops = { dds_return_t dds_heap_loan (const struct ddsi_sertype *type, dds_loaned_sample_state_t sample_state, struct dds_loaned_sample **loaned_sample) { - assert (sample_state == DDS_LOANED_SAMPLE_STATE_RAW_KEY || sample_state == DDS_LOANED_SAMPLE_STATE_RAW_DATA); + assert (sample_state == DDS_LOANED_SAMPLE_STATE_UNITIALIZED || sample_state == DDS_LOANED_SAMPLE_STATE_RAW_KEY || sample_state == DDS_LOANED_SAMPLE_STATE_RAW_DATA); dds_heap_loan_t *s = ddsrt_malloc (sizeof (*s)); if (s == NULL) @@ -65,6 +65,9 @@ dds_return_t dds_heap_loan (const struct ddsi_sertype *type, dds_loaned_sample_s s->c.metadata->sample_state = sample_state; s->c.metadata->cdr_identifier = DDSI_RTPS_SAMPLE_NATIVE; s->c.metadata->cdr_options = 0; + s->c.metadata->sample_size = type->sizeof_type; + s->c.metadata->instance_id = 0; + s->c.metadata->data_type = 0; s->c.loan_origin.origin_kind = DDS_LOAN_ORIGIN_KIND_HEAP; s->c.loan_origin.psmx_endpoint = NULL; ddsrt_atomic_st32 (&s->c.refc, 1); diff --git a/src/core/ddsc/src/dds_serdata_default.c b/src/core/ddsc/src/dds_serdata_default.c index 161305907c..a812721158 100644 --- a/src/core/ddsc/src/dds_serdata_default.c +++ b/src/core/ddsc/src/dds_serdata_default.c @@ -513,9 +513,10 @@ static struct ddsi_serdata *serdata_default_from_keyhash_cdr_nokey (const struct static void istream_from_serdata_default (dds_istream_t * __restrict s, const struct dds_serdata_default * __restrict d) { - if (d->c.loan != NULL) + if (d->c.loan != NULL && + (d->c.loan->metadata->sample_state == DDS_LOANED_SAMPLE_STATE_SERIALIZED_KEY || + d->c.loan->metadata->sample_state == DDS_LOANED_SAMPLE_STATE_SERIALIZED_DATA)) { - assert (d->c.loan->metadata->sample_state == DDS_LOANED_SAMPLE_STATE_SERIALIZED_KEY || d->c.loan->metadata->sample_state == DDS_LOANED_SAMPLE_STATE_SERIALIZED_DATA); s->m_buffer = d->c.loan->sample_ptr; s->m_index = 0; s->m_size = d->c.loan->metadata->sample_size; @@ -715,6 +716,7 @@ static bool serdata_default_to_sample_cdr (const struct ddsi_serdata *serdata_co dds_istream_t is; if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */ if (d->c.loan != NULL && + tp->c.is_memcpy_safe && (d->c.loan->metadata->sample_state == DDS_LOANED_SAMPLE_STATE_RAW_DATA || d->c.loan->metadata->sample_state == DDS_LOANED_SAMPLE_STATE_RAW_KEY)) { @@ -844,68 +846,50 @@ static bool loaned_sample_state_to_serdata_kind (dds_loaned_sample_state_t lss, return false; } -static struct ddsi_serdata *serdata_default_from_loaned_sample (const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, dds_loaned_sample_t *loaned_sample, bool force_serialization) +static struct ddsi_serdata *serdata_default_from_loaned_sample (const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, dds_loaned_sample_t *loaned_sample, bool will_require_cdr) { /* type = the type of data being serialized kind = the kind of data contained (key or normal) sample = the raw sample made into the serdata loaned_sample = the loaned buffer in use - force_serialization = whether the contents of the loaned sample should be serialized + will_require_cdr = whether we will need the CDR (or a highly likely to need it) */ const struct dds_sertype_default *tp = (const struct dds_sertype_default *) type; - assert (loaned_sample->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX); - bool serialize_data = force_serialization || !type->is_memcpy_safe; + assert (sample == loaned_sample->sample_ptr); + assert (loaned_sample->metadata->sample_state == (kind == SDK_KEY ? DDS_LOANED_SAMPLE_STATE_RAW_KEY : DDS_LOANED_SAMPLE_STATE_RAW_DATA)); + assert (loaned_sample->metadata->cdr_identifier == DDSI_RTPS_SAMPLE_NATIVE); + assert (loaned_sample->metadata->cdr_options == 0); struct dds_serdata_default *d; - if (serialize_data) + if (will_require_cdr) { - // maybe if there is a loan and that loan is not the sample, use the loan block as the serialization buffer? + // If serialization is/will be required, construct the serdata the normal way d = (struct dds_serdata_default *) type->serdata_ops->from_sample (type, kind, sample); + if (d == NULL) + return NULL; } else { + // If we know there is no neeed for the serialized representation (so only PSMX and "memcpy safe"), + // construct an empty serdata and stay away from the serializers d = serdata_default_new (tp, kind, tp->write_encoding_version); - if (d == NULL || !gen_serdata_key_from_sample (tp, &d->key, sample)) + if (d == NULL) return NULL; - } - - if (d != NULL) - { - // now owner of loan - d->c.loan = loaned_sample; - if (d->c.loan->sample_ptr != sample) //if the sample we are serializing is itself not loaned - { - assert (d->c.loan->metadata->sample_state == DDS_LOANED_SAMPLE_STATE_UNITIALIZED); - if (serialize_data) - { - d->c.loan->metadata->sample_state = (kind == SDK_KEY ? DDS_LOANED_SAMPLE_STATE_SERIALIZED_KEY : DDS_LOANED_SAMPLE_STATE_SERIALIZED_DATA); - d->c.loan->metadata->cdr_identifier = d->hdr.identifier; - d->c.loan->metadata->cdr_options = d->hdr.options; - memcpy (d->c.loan->sample_ptr, d->data, d->c.loan->metadata->sample_size); - } - else - { - d->c.loan->metadata->sample_state = (kind == SDK_KEY ? DDS_LOANED_SAMPLE_STATE_RAW_KEY : DDS_LOANED_SAMPLE_STATE_RAW_DATA); - d->c.loan->metadata->cdr_identifier = DDSI_RTPS_SAMPLE_NATIVE; - d->c.loan->metadata->cdr_options = 0; - memcpy (d->c.loan->sample_ptr, sample, d->c.loan->metadata->sample_size); - } - } - else + if (!gen_serdata_key_from_sample (tp, &d->key, sample)) { - d->c.loan->metadata->sample_state = (kind == SDK_KEY ? DDS_LOANED_SAMPLE_STATE_RAW_KEY : DDS_LOANED_SAMPLE_STATE_RAW_DATA); - d->c.loan->metadata->cdr_identifier = DDSI_RTPS_SAMPLE_NATIVE; - d->c.loan->metadata->cdr_options = 0; + ddsi_serdata_unref (&d->c); + return NULL; } - - if (tp->c.has_key) - (void) fix_serdata_default (d, tp->c.serdata_basehash); - else - (void) fix_serdata_default_nokey (d, tp->c.serdata_basehash); } + // now owner of loan + d->c.loan = loaned_sample; + if (tp->c.has_key) + (void) fix_serdata_default (d, tp->c.serdata_basehash); + else + (void) fix_serdata_default_nokey (d, tp->c.serdata_basehash); return (struct ddsi_serdata *) d; } diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index 6c608d37d6..34e4b2e458 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -397,10 +397,10 @@ static dds_return_t dds_writecdr_impl_common (struct dds_writer *wr, struct ddsi return ret; } -static bool evaluate_topic_filter (const dds_writer *wr, const void *data, bool writekey) +static bool evaluate_topic_filter (const dds_writer *wr, const void *data, enum ddsi_serdata_kind sdkind) { // false if data rejected by filter - if (wr->m_topic->m_filter.mode == DDS_TOPIC_FILTER_NONE || writekey) + if (wr->m_topic->m_filter.mode == DDS_TOPIC_FILTER_NONE || sdkind == SDK_KEY) return true; const struct dds_topic_filter *f = &wr->m_topic->m_filter; @@ -428,39 +428,8 @@ static bool evaluate_topic_filter (const dds_writer *wr, const void *data, bool return true; } -static bool requires_serialization(struct dds_topic *topic) -{ - return !topic->m_stype->is_memcpy_safe; -} - -static bool allows_serialization_into_buffer(struct dds_topic *topic) -{ - return topic->m_stype->ops->serialize_into != NULL && - topic->m_stype->ops->get_serialized_size != NULL; -} - -static bool get_required_buffer_size(struct dds_topic *topic, const void *sample, uint32_t *sz32) -{ - size_t sz; - assert (topic && sz32 && sample); - - if (!requires_serialization(topic)) - sz = topic->m_stype->sizeof_type; - else if (allows_serialization_into_buffer(topic)) - sz = ddsi_sertype_get_serialized_size(topic->m_stype, (void*) sample); - else - return false; - - if (sz == SIZE_MAX || sz > UINT32_MAX) - return false; // SIZE_MAX: error value (FIXME) or oversize - *sz32 = (uint32_t) sz; - return true; -} - -static dds_return_t dds_write_basic_impl (struct ddsi_thread_state * const ts, dds_writer *wr, struct ddsi_serdata *d) - ddsrt_nonnull_all; - -static dds_return_t dds_write_basic_impl (struct ddsi_thread_state * const ts, dds_writer *wr, struct ddsi_serdata *d) +ddsrt_nonnull_all +static dds_return_t dds_write_impl_deliver_via_ddsi (struct ddsi_thread_state * const ts, dds_writer *wr, struct ddsi_serdata *d) { struct ddsi_writer *ddsi_wr = wr->m_wr; dds_return_t ret = DDS_RETCODE_OK; @@ -516,7 +485,7 @@ dds_return_t dds_request_writer_loan (dds_writer *wr, enum dds_writer_loan_type ret = DDS_RETCODE_OK; } else - ret = dds_heap_loan (wr->m_topic->m_stype, DDS_LOANED_SAMPLE_STATE_RAW_DATA, &loan); + ret = dds_heap_loan (wr->m_topic->m_stype, DDS_LOANED_SAMPLE_STATE_UNITIALIZED, &loan); break; } @@ -562,97 +531,14 @@ dds_return_t dds_return_writer_loan (dds_writer *wr, void **samples_ptr, int32_t return ret; } -static dds_loaned_sample_t *get_loan_to_use (dds_writer *wr, const void *data, dds_loaned_sample_t **loan_to_free) -{ - // 3. Check whether data is loaned - dds_loaned_sample_t *supplied_loan = dds_loan_pool_find_and_remove_loan (wr->m_loans, data); - assert (supplied_loan == NULL || ddsrt_atomic_ld32 (&supplied_loan->refc) == 1); - - assert ((supplied_loan == NULL) || - (supplied_loan != NULL && ddsrt_atomic_ld32 (&supplied_loan->refc) == 1 && supplied_loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_HEAP) || - (supplied_loan != NULL && ddsrt_atomic_ld32 (&supplied_loan->refc) == 1 && supplied_loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX)); - if (supplied_loan && supplied_loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX) - { - // a PSMX loan, use it - *loan_to_free = NULL; - return supplied_loan; - } - - dds_loaned_sample_t *loan = NULL; - - // 4. If it is a heap loan, attempt to get a PSMX loan - // FIXME: the condition is actually: if not a PSMX loan, try to get one - // FIXME: should this not be required to succeed? We're assuming the PSMX bit will work later on - // FIXME: what about: supplied_loan is a heap loan and no PSMX involved? why not use it? - if (wr->m_endpoint.psmx_endpoints.length > 0) - { - uint32_t required_size = 0; - assert (wr->m_endpoint.psmx_endpoints.length == 1); // FIXME: support multiple PSMX instances - if (get_required_buffer_size (wr->m_topic, data, &required_size) && required_size) - { - // attempt to get a loan from a PSMX - loan = dds_psmx_endpoint_request_loan (wr->m_endpoint.psmx_endpoints.endpoints[0], required_size); - } - } - - // too many cases ... - assert ((supplied_loan == NULL && loan == NULL) || - (supplied_loan == NULL && loan != NULL && ddsrt_atomic_ld32 (&loan->refc) == 1 && loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX) || - (supplied_loan != NULL && loan == NULL && ddsrt_atomic_ld32 (&supplied_loan->refc) == 1 && supplied_loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_HEAP) || - (supplied_loan != NULL && loan == supplied_loan && ddsrt_atomic_ld32 (&loan->refc) == 1 && loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX) || - (supplied_loan != NULL && loan != supplied_loan - && ddsrt_atomic_ld32 (&supplied_loan->refc) == 1 && supplied_loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX - && ddsrt_atomic_ld32 (&loan->refc) == 1 && loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX)); - - // by definition different from loan - // not to be freed yet: freeing it invalidates data - *loan_to_free = supplied_loan; - return loan; -} - -static struct ddsi_serdata *make_serdata (struct ddsi_writer * const ddsi_wr, const void *data, dds_loaned_sample_t *loan, bool writekey, bool use_only_psmx) +ddsrt_nonnull_all +static bool dds_write_impl_use_only_psmx (dds_writer *wr) { - struct ddsi_serdata *d; - if (loan == NULL) - d = ddsi_serdata_from_sample (ddsi_wr->type, writekey ? SDK_KEY : SDK_DATA, data); - else - { - assert (ddsrt_atomic_ld32 (&loan->refc) == 1); - d = ddsi_serdata_from_loaned_sample (ddsi_wr->type, writekey ? SDK_KEY : SDK_DATA, data, loan, !use_only_psmx); - } - if (d == NULL) - { - if (loan != NULL) - dds_loaned_sample_unref (loan); - } - return d; -} - -// has to support two cases: -// 1) data is in an external buffer allocated on the stack or dynamically -// 2) data is in an zerocopy buffer obtained by dds_loan_sample -dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp, dds_write_action action) -{ - // 1. Input validation - struct ddsi_thread_state * const thrst = ddsi_lookup_thread_state (); - const bool writekey = action & DDS_WR_KEY_BIT; - struct ddsi_writer *ddsi_wr = wr->m_wr; - int ret = DDS_RETCODE_OK; - - if (data == NULL) - return DDS_RETCODE_BAD_PARAMETER; - - // 2. Topic filter - if (!evaluate_topic_filter (wr, data, writekey)) - return DDS_RETCODE_OK; - - ddsi_thread_state_awake (thrst, &wr->m_entity.m_domain->gv); - - // 3. Check whether data is loaned - dds_loaned_sample_t *loan_to_free; - dds_loaned_sample_t * const loan = get_loan_to_use (wr, data, &loan_to_free); - assert (loan == NULL || loan != loan_to_free); + // Return false if PSMX is not involved + if (wr->m_endpoint.psmx_endpoints.length == 0) + return false; + struct ddsi_writer * const ddsi_wr = wr->m_wr; // ddsi_wr->as can be changed by the matching/unmatching of proxy readers if we don't hold the lock // it is rather unfortunate that this then means we have to lock here to check, then lock again to // actually distribute the data, so some further refactoring is needed. @@ -678,49 +564,262 @@ dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp // and hence they are not considered for data transfer. // The alternative is to block new fast path connections entirely (by holding // the mutex) until data delivery is complete. - const bool use_only_psmx = ddsi_wr->xqos->durability.kind == DDS_DURABILITY_VOLATILE && no_network_readers && no_fast_path_readers; + return ddsi_wr->xqos->durability.kind == DDS_DURABILITY_VOLATILE && no_network_readers && no_fast_path_readers; +} + +ddsrt_attribute_warn_unused_result ddsrt_nonnull_all +static dds_return_t dds_write_impl_deliver_via_psmx (struct dds_loaned_sample *loan) +{ + assert (loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX); + assert (loan->metadata->sample_state != DDS_LOANED_SAMPLE_STATE_UNITIALIZED); + struct dds_psmx_endpoint * const endpoint = loan->loan_origin.psmx_endpoint; + const dds_return_t ret = endpoint->ops.write (endpoint, loan); + dds_loaned_sample_unref (loan); + return ret; +} - // create a correct serdata - struct ddsi_serdata * const d = make_serdata (ddsi_wr, data, loan, writekey, use_only_psmx); +ddsrt_nonnull_all +static void dds_write_impl_set_loan_writeinfo (const struct dds_writer *wr, struct dds_loaned_sample *loan, dds_time_t timestamp, uint32_t statusinfo) +{ + assert (loan->metadata->sample_state != DDS_LOANED_SAMPLE_STATE_UNITIALIZED); + struct dds_psmx_metadata *md = loan->metadata; + memcpy (&md->guid, &wr->m_entity.m_guid, sizeof (md->guid)); + md->timestamp = timestamp; + md->statusinfo = statusinfo; +} - // data, loan_to_free no longer needed (all paths) - if (loan_to_free) - dds_loaned_sample_unref (loan_to_free); +ddsrt_attribute_warn_unused_result ddsrt_nonnull_all +static struct dds_loaned_sample *dds_write_impl_make_raw_loan (const struct dds_writer *wr, const void *data, enum ddsi_serdata_kind sdkind, dds_time_t timestamp, uint32_t statusinfo) +{ + struct ddsi_sertype const * const sertype = wr->m_wr->type; + assert (sertype->is_memcpy_safe); + assert (wr->m_endpoint.psmx_endpoints.length == 1); // FIXME: support multiple PSMX instances + struct dds_loaned_sample * const loan = dds_psmx_endpoint_request_loan (wr->m_endpoint.psmx_endpoints.endpoints[0], sertype->sizeof_type); + if (loan == NULL) + return NULL; + struct dds_psmx_metadata * const md = loan->metadata; + md->sample_state = (sdkind == SDK_KEY) ? DDS_LOANED_SAMPLE_STATE_RAW_KEY : DDS_LOANED_SAMPLE_STATE_RAW_DATA; + md->cdr_identifier = DDSI_RTPS_SAMPLE_NATIVE; + md->cdr_options = 0; + if (sdkind == SDK_DATA || sertype->has_key) + memcpy (loan->sample_ptr, data, sertype->sizeof_type); + dds_write_impl_set_loan_writeinfo (wr, loan, timestamp, statusinfo); + return loan; +} - // bail out if serdata creation failed - if (d == NULL) +ddsrt_attribute_warn_unused_result ddsrt_nonnull_all +static struct dds_loaned_sample *dds_write_impl_make_serialized_loan (const struct dds_writer *wr, const struct ddsi_serdata *sd) +{ + assert (wr->m_endpoint.psmx_endpoints.length == 1); // FIXME: support multiple PSMX instances + assert (ddsi_serdata_size (sd) >= 4); + const uint32_t loan_size = ddsi_serdata_size (sd) - 4; + struct dds_loaned_sample * const loan = dds_psmx_endpoint_request_loan (wr->m_endpoint.psmx_endpoints.endpoints[0], loan_size); + if (loan == NULL) + return NULL; + struct dds_psmx_metadata * const md = loan->metadata; + md->sample_state = (sd->kind == SDK_KEY) ? DDS_LOANED_SAMPLE_STATE_SERIALIZED_KEY : DDS_LOANED_SAMPLE_STATE_SERIALIZED_DATA; + struct { uint16_t identifier, options; } header; + ddsi_serdata_to_ser (sd, 0, 4, &header); + md->cdr_identifier = header.identifier; + md->cdr_options = header.options; + if (loan_size > 0) + ddsi_serdata_to_ser (sd, 4, loan_size, loan->sample_ptr); + dds_write_impl_set_loan_writeinfo (wr, loan, sd->timestamp.v, sd->statusinfo); + return loan; +} + +ddsrt_attribute_warn_unused_result ddsrt_nonnull ((1, 3)) +static struct ddsi_serdata *dds_write_impl_make_serdata (const struct ddsi_sertype *sertype, enum ddsi_serdata_kind sdkind, const void *data, struct dds_loaned_sample *heap_loan, dds_time_t timestamp, uint32_t statusinfo) +{ + assert (heap_loan == NULL || heap_loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_HEAP); + struct ddsi_serdata *serdata; + if (heap_loan == NULL) + serdata = ddsi_serdata_from_sample (sertype, sdkind, data); + else // claim "cdr required" to keep things simple + serdata = ddsi_serdata_from_loaned_sample (sertype, sdkind, data, heap_loan, true); + if (serdata == NULL) + return NULL; + serdata->statusinfo = statusinfo; + serdata->timestamp.v = timestamp; + return serdata; +} + +ddsrt_attribute_warn_unused_result ddsrt_nonnull_all +static dds_return_t dds_write_impl_psmxloan_serdata (struct dds_writer *wr, const void *data, enum ddsi_serdata_kind sdkind, dds_time_t timestamp, uint32_t statusinfo, struct dds_loaned_sample **psmx_loan, struct ddsi_serdata **serdata) +{ + const bool use_only_psmx = dds_write_impl_use_only_psmx (wr); + struct ddsi_sertype const * const sertype = wr->m_wr->type; + struct dds_loaned_sample *loan = dds_loan_pool_find_and_remove_loan (wr->m_loans, data); + if (loan) + { + // If we have a loan: + // - if there is only PSMX involved, we can short-circuit delivery and get out quickly + // - if anything is around, construct a regular serdata and derive a PSMX copy of it afterward if needed + assert (ddsrt_atomic_ld32 (&loan->refc) == 1); + struct dds_psmx_metadata * const md = loan->metadata; + assert (md->sample_state == DDS_LOANED_SAMPLE_STATE_UNITIALIZED); + md->sample_state = (sdkind == SDK_KEY) ? DDS_LOANED_SAMPLE_STATE_RAW_KEY : DDS_LOANED_SAMPLE_STATE_RAW_DATA; + md->cdr_identifier = DDSI_RTPS_SAMPLE_NATIVE; + md->cdr_options = 0; + dds_write_impl_set_loan_writeinfo (wr, loan, timestamp, statusinfo); + switch (loan->loan_origin.origin_kind) + { + case DDS_LOAN_ORIGIN_KIND_PSMX: + // we never do PSMX loans for complex types + assert (sertype->is_memcpy_safe); + *psmx_loan = loan; + if (use_only_psmx) + { + // short-circuit possible without requiring a serdata + *serdata = NULL; + } + else if ((*serdata = dds_write_impl_make_serdata (sertype, sdkind, data, NULL, timestamp, statusinfo)) == NULL) + { + // It is either no memory or invalid data, we've historically gambled on it being invalid + // data because being out of memory is exceedingly unlikely on decent platforms + dds_loaned_sample_unref (loan); + return DDS_RETCODE_BAD_PARAMETER; + } + break; + case DDS_LOAN_ORIGIN_KIND_HEAP: + if (wr->m_endpoint.psmx_endpoints.length == 0) + { + // no PSMX, so local readers and/or network; keeping the loan makes sense for local readers + *psmx_loan = NULL; + // claim "cdr required" - it may not be strictly required for volatile data if there are only + // local readers, but let's not complicate it too much now + if ((*serdata = dds_write_impl_make_serdata (sertype, sdkind, data, loan, timestamp, statusinfo)) == NULL) + { + // It is either no memory or invalid data, we've historically gambled on it being invalid + // data because being out of memory is exceedingly unlikely on decent platforms + dds_loaned_sample_unref (loan); + return DDS_RETCODE_BAD_PARAMETER; + } + } + else + { + // PSMX, so CDR in memory. The loan is (probably) useless to us, because of PSMX loopback to + // local readers. It seems not so likely that there will be enough local readers not using + // PSMX to make it worth the retaining the loan + assert (!sertype->is_memcpy_safe); + // Make a "standard" serdata and then use that to make a PSMX loan + *serdata = dds_write_impl_make_serdata (sertype, sdkind, data, NULL, timestamp, statusinfo); + dds_loaned_sample_unref (loan); + if (*serdata == NULL) + { + // It is either no memory or invalid data, we've historically gambled on it being invalid + // data because being out of memory is exceedingly unlikely on decent platforms + return DDS_RETCODE_BAD_PARAMETER; + } + *psmx_loan = dds_write_impl_make_serialized_loan (wr, *serdata); + if (*psmx_loan == NULL) + { + ddsi_serdata_unref (*serdata); + return DDS_RETCODE_OUT_OF_RESOURCES; + } + } + break; + } + return DDS_RETCODE_OK; + } + else if (use_only_psmx && sertype->is_memcpy_safe) + { + assert (wr->m_endpoint.psmx_endpoints.length == 1); // FIXME: support multiple PSMX instances + *serdata = NULL; + *psmx_loan = dds_write_impl_make_raw_loan (wr, data, sdkind, timestamp, statusinfo); + return (*psmx_loan != NULL) ? DDS_RETCODE_OK : DDS_RETCODE_OUT_OF_RESOURCES; + } + else { - ret = DDS_RETCODE_BAD_PARAMETER; - goto fail_serdata; + // not much room for optimization, so keep things simple: construct a serdata, then take it from + // there + if ((*serdata = dds_write_impl_make_serdata (sertype, sdkind, data, NULL, timestamp, statusinfo)) == NULL) + { + // It is either no memory or invalid data, we've historically gambled on it being invalid + // data because being out of memory is exceedingly unlikely on decent platforms + return DDS_RETCODE_BAD_PARAMETER; + } + + // If PSMX and no loan, make one using the data/serdata we do have + if (wr->m_endpoint.psmx_endpoints.length == 0) + *psmx_loan = NULL; + else + { + assert (wr->m_endpoint.psmx_endpoints.length == 1); // FIXME: support multiple PSMX instances + if (sertype->is_memcpy_safe) + *psmx_loan = dds_write_impl_make_raw_loan (wr, data, sdkind, timestamp, statusinfo); + else + *psmx_loan = dds_write_impl_make_serialized_loan (wr, *serdata); + if (*psmx_loan == NULL) + { + ddsi_serdata_unref (*serdata); + return DDS_RETCODE_OUT_OF_RESOURCES; + } + } + return DDS_RETCODE_OK; } +} - // refc(d) = 1 after successful construction - d->statusinfo = (((action & DDS_WR_DISPOSE_BIT) ? DDSI_STATUSINFO_DISPOSE : 0) | - ((action & DDS_WR_UNREGISTER_BIT) ? DDSI_STATUSINFO_UNREGISTER : 0)); - d->timestamp.v = tstamp; +dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t timestamp, dds_write_action action) +{ + struct ddsi_thread_state * const thrst = ddsi_lookup_thread_state (); + const enum ddsi_serdata_kind sdkind = (action & DDS_WR_KEY_BIT) ? SDK_KEY : SDK_DATA; + const uint32_t statusinfo = + (((action & DDS_WR_DISPOSE_BIT) ? DDSI_STATUSINFO_DISPOSE : 0) | + ((action & DDS_WR_UNREGISTER_BIT) ? DDSI_STATUSINFO_UNREGISTER : 0)); + int ret = DDS_RETCODE_OK; - // 6. Deliver the data - // 6.a ... via network - if ((ret = dds_write_basic_impl (thrst, wr, d)) != DDS_RETCODE_OK) - goto unref_serdata; + if (!evaluate_topic_filter (wr, data, sdkind)) + return DDS_RETCODE_OK; - // 6.b ... through PSMX - if (loan) + // I. psmx loan => assert (psmx && is_memcpy_safe) + // a. psmx only + // - no need for a serdata, so skip everything and deliver loan via PSMX + // b. psmx && others + // - ddsi_serdata_from_sample because we need a "normal one" anyway + // - deliver loan via PSMX (has to wait for from_sample because publishing via PSMX invalidates the data) + // - deliver serdata + // + // II. heap loan => (note: not necessarily is_memcpy_safe) + // a. psmx only - assert (!is_memcpy_safe); as-if no loan + // b. psmx && others - assert (!is_memcpy_safe); as-if no loan: we typically have PSMX loopback and that makes the loan useless + // c. no psmx + // - ddsi_serdata_from_loaned_sample, deliver serdata + // + // III. not loan + // a. psmx only + // 1. is_memcpy_safe + // - allocate PSMX loan, memcpy, deliver loan via PSMX + // 2. not is_memcpy_safe + // - get_serialized_size (key/sample), allocate PSMX loan, serialize_into, free serdata + // - deliver loan via PSMX + // b. psmx && others + // 1. is_memcpy_safe + // - allocate PSMX loan, memcpy, deliver loan via PSMX + // - ddsi_serdata_from_sample, deliver serdata + // 2. not is_memcpy_safe + // - ddsi_serdata_from_sample + // - allocate PSMX loan based on size of CDR in serdata, memcpy, deliver loan via PSMX + // - deliver serdata + // c. no psmx + // - ddsi_serdata_from_sample, deliver serdata + ddsi_thread_state_awake (thrst, &wr->m_entity.m_domain->gv); + struct ddsi_serdata *serdata; + struct dds_loaned_sample *psmx_loan; + if ((ret = dds_write_impl_psmxloan_serdata (wr, data, sdkind, timestamp, statusinfo, &psmx_loan, &serdata)) == DDS_RETCODE_OK) { - assert (loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX); - struct dds_psmx_endpoint *endpoint = loan->loan_origin.psmx_endpoint; - - // populate metadata fields - struct dds_psmx_metadata *md = loan->metadata; - memcpy (&md->guid, &ddsi_wr->e.guid, sizeof (md->guid)); - md->timestamp = d->timestamp.v; - md->statusinfo = d->statusinfo; - ret = endpoint->ops.write (endpoint, loan); + assert (psmx_loan != NULL || serdata != NULL); + assert ((psmx_loan == NULL) == (wr->m_endpoint.psmx_endpoints.length == 0)); + assert (psmx_loan == NULL || psmx_loan->loan_origin.origin_kind == DDS_LOAN_ORIGIN_KIND_PSMX); + if (psmx_loan != NULL) + ret = dds_write_impl_deliver_via_psmx (psmx_loan); // "consumes" the loan + if (serdata != NULL) + { + if (ret == DDS_RETCODE_OK) + ret = dds_write_impl_deliver_via_ddsi (thrst, wr, serdata); + ddsi_serdata_unref (serdata); + } } - -unref_serdata: - ddsi_serdata_unref (d); // refc(d) = 0 -fail_serdata: ddsi_thread_state_asleep (thrst); return ret; } diff --git a/src/core/ddsc/tests/psmx.c b/src/core/ddsc/tests/psmx.c index 532236430e..cbccc7f50b 100644 --- a/src/core/ddsc/tests/psmx.c +++ b/src/core/ddsc/tests/psmx.c @@ -1320,21 +1320,30 @@ CU_Test (ddsc_psmx, zero_copy) printf (" | rddata[%d] %p", (int) i, rddata[i]); fflush (stdout); } - // If not using _wl, private copies. Otherwise it gets very implementation-specific - // whether they are actually the same, but it is a test, so let's check. Changes to - // the implementation may well require changing this test. - if (!cases[k].wrloan_ok || !dds_is_shared_memory_available (wr)) - { - for (size_t i = 0; i < sizeof (rds) / sizeof (rds[0]) - 1; i++) - for (size_t j = i + 1; j < sizeof (rds) / sizeof (rds[0]); j++) - CU_ASSERT (rddata[i] != rddata[j]); - } - else + // For self-contained types: if somewhere in the process a loan gets + // involved the readers will get the same address. + // + // Iceoryx -> shared memory -> whenever the writer uses a loan or Iceoryx is used + // CDDS-based plugin -> many copies but it works if the writer uses a loan and + // PSMX is avoided (because the plugin always produces separate copies) + // + // This is obviously very implementation-specific, but as this is a test, + // let's check. Changes to the implementation will likely require changing + // this test. + if (cases[k].wrloan_ok && + (( dds_is_shared_memory_available (wr) && (wrloan || psmx_enabled)) || + (!dds_is_shared_memory_available (wr) && (wrloan && !psmx_enabled)))) { for (size_t i = 1; i < sizeof (rds) / sizeof (rds[0]); i++) CU_ASSERT (rddata[i] == rddata[0]); CU_ASSERT ((wrloan && wrdata == rddata[0]) || (!wrloan && wrdata != rddata[0])); } + else + { + for (size_t i = 0; i < sizeof (rds) / sizeof (rds[0]) - 1; i++) + for (size_t j = i + 1; j < sizeof (rds) / sizeof (rds[0]); j++) + CU_ASSERT (rddata[i] != rddata[j]); + } printf ("\n"); fflush (stdout); rc = dds_delete (wr); diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h index 9fb1279ca2..dfd9f431b1 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h @@ -146,7 +146,7 @@ typedef void (*ddsi_serdata_get_keyhash_t) (const struct ddsi_serdata *d, struct // Used for taking a loaned sample and constructing a serdata around this // takes over ownership of loan on success (leaves it unchanged on failure) -typedef struct ddsi_serdata* (*ddsi_serdata_from_loan_t) (const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loaned_sample, bool force_serialization); +typedef struct ddsi_serdata* (*ddsi_serdata_from_loan_t) (const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loaned_sample, bool will_require_cdr); // Used for constructing a serdata from data received on a PSMX typedef struct ddsi_serdata* (*ddsi_serdata_from_psmx_t) (const struct ddsi_sertype *type, struct dds_loaned_sample *loaned_sample); @@ -327,12 +327,12 @@ DDS_INLINE_EXPORT inline void ddsi_serdata_get_keyhash (const struct ddsi_serdat d->ops->get_keyhash (d, buf, force_md5); } -DDS_INLINE_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_loaned_sample(const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loan, bool force_serialization) ddsrt_nonnull_all; +DDS_INLINE_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_loaned_sample(const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loan, bool will_require_cdr) ddsrt_nonnull_all; /** @component typesupport_if */ -DDS_INLINE_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_loaned_sample(const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loan, bool force_serialization) +DDS_INLINE_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_loaned_sample(const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loan, bool will_require_cdr) { - return type->serdata_ops->from_loaned_sample(type, kind, sample, loan, force_serialization); + return type->serdata_ops->from_loaned_sample(type, kind, sample, loan, will_require_cdr); } DDS_INLINE_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_psmx(const struct ddsi_sertype *type, struct dds_loaned_sample *data) ddsrt_nonnull_all; diff --git a/src/core/ddsi/src/ddsi_serdata.c b/src/core/ddsi/src/ddsi_serdata.c index 7761c8fd7a..e46b4d9608 100644 --- a/src/core/ddsi/src/ddsi_serdata.c +++ b/src/core/ddsi/src/ddsi_serdata.c @@ -80,5 +80,5 @@ DDS_EXPORT extern inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, DDS_EXPORT extern inline bool ddsi_serdata_print (const struct ddsi_serdata *d, char *buf, size_t size); DDS_EXPORT extern inline bool ddsi_serdata_print_untyped (const struct ddsi_sertype *type, const struct ddsi_serdata *d, char *buf, size_t size); DDS_EXPORT extern inline void ddsi_serdata_get_keyhash (const struct ddsi_serdata *d, struct ddsi_keyhash *buf, bool force_md5); -DDS_EXPORT extern inline struct ddsi_serdata *ddsi_serdata_from_loaned_sample (const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loan, bool force_serialization); +DDS_EXPORT extern inline struct ddsi_serdata *ddsi_serdata_from_loaned_sample (const struct ddsi_sertype *type, enum ddsi_serdata_kind kind, const char *sample, struct dds_loaned_sample *loan, bool will_require_cdr); DDS_EXPORT extern inline struct ddsi_serdata *ddsi_serdata_from_psmx (const struct ddsi_sertype *type, struct dds_loaned_sample *data); diff --git a/src/core/xtests/symbol_export/symbol_export.c b/src/core/xtests/symbol_export/symbol_export.c index 1a70c8c00c..ccbc766e49 100644 --- a/src/core/xtests/symbol_export/symbol_export.c +++ b/src/core/xtests/symbol_export/symbol_export.c @@ -75,6 +75,9 @@ DDSRT_WARNING_DEPRECATED_OFF +DDSRT_WARNING_CLANG_OFF(unused-result) +DDSRT_WARNING_GNUC_OFF(unused-result) + #ifdef DDS_HAS_SECURITY static void test_DDS_Security_Exception_vset (void *ptr, const char *msg, ...) { From 6cdf599518e3c2358a45a41513505b117b37c2cf Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Tue, 28 Nov 2023 18:47:43 +0100 Subject: [PATCH 2/3] Tighten PSMX tests a bit further also attempt to detect a failure to match because of an event loop that got stuck without sending out discovery data Signed-off-by: Erik Boasson --- src/core/ddsc/tests/PsmxDataModels.idl | 20 +++++++-- src/core/ddsc/tests/psmx.c | 62 ++++++++++++++++++++------ 2 files changed, 65 insertions(+), 17 deletions(-) diff --git a/src/core/ddsc/tests/PsmxDataModels.idl b/src/core/ddsc/tests/PsmxDataModels.idl index 2013b2a34d..5f30b988b3 100644 --- a/src/core/ddsc/tests/PsmxDataModels.idl +++ b/src/core/ddsc/tests/PsmxDataModels.idl @@ -16,10 +16,24 @@ struct NSC_Model { string a,b,c; }; +@nested +struct PsmxType1xy { + long x; + octet y; +}; + +// A memcpy-able type where fields are laid differently in C +// and CDR: +// offset +// C CDR +// x.x 0 0 +// x.y 4 4 +// x.z 8 5 +// +// size 12 6 struct PsmxType1 { - long long_1; - long long_2; - long long_3; + PsmxType1xy xy; + octet z; }; struct PsmxKeySupportCheck { diff --git a/src/core/ddsc/tests/psmx.c b/src/core/ddsc/tests/psmx.c index cbccc7f50b..10c66e5b37 100644 --- a/src/core/ddsc/tests/psmx.c +++ b/src/core/ddsc/tests/psmx.c @@ -23,6 +23,7 @@ #include "dds/ddsi/ddsi_entity_index.h" #include "ddsi__addrset.h" #include "ddsi__entity.h" +#include "ddsi__xevent.h" #include "dds__entity.h" #include "dds__serdata_default.h" @@ -236,11 +237,12 @@ static bool check_writer_addrset_once (struct tracebuf *tb, dds_entity_t wrhandl static bool check_writer_addrset (struct tracebuf *tb, dds_entity_t wrhandle, int nports, const uint32_t *ports) { const dds_time_t abstimeout = dds_time () + DDS_SECS (10); + bool exclaimed = false; do { if (check_writer_addrset_once (NULL, wrhandle, nports, ports)) return true; // this is such a crazily unlikely scenario a sleep is fine - print (tb, "!"); + if (!exclaimed) { print (tb, "!"); exclaimed = true; } dds_sleepfor (DDS_MSECS (10)); } while (dds_time () < abstimeout); return check_writer_addrset_once (tb, wrhandle, nports, ports); @@ -571,6 +573,33 @@ static void print_time (struct tracebuf *tb) print (tb, "%d.%06d", (int32_t) (t / DDS_NSECS_IN_SEC), (int32_t) (t % DDS_NSECS_IN_SEC) / 1000); } +static void check_transmit_queue_empty_helper (void *vflag) +{ + ddsrt_atomic_or32 (vflag, 1); +} + +static void check_transmit_queue_empty (struct tracebuf *tb, struct ddsi_domaingv *gvs[MAX_DOMAINS]) +{ + dds_time_t tstop = dds_time () + DDS_SECS (5); + ddsrt_atomic_uint32_t flag; + print (tb, "; check xmit queue ("); + print_time (tb); + print (tb, ") dom "); + for (int i = 0; i < MAX_DOMAINS; i++) + { + ddsrt_atomic_st32 (&flag, 0); + print (tb, "%d:", i); + ddsi_qxev_nt_callback (gvs[i]->xevents, check_transmit_queue_empty_helper, &flag); + while (!ddsrt_atomic_ld32 (&flag) && dds_time () < tstop) + dds_sleepfor (DDS_MSECS (10)); + print_time (tb); + if (ddsrt_atomic_ld32 (&flag)) + print (tb, "(ok)"); + else + print (tb, "(dead)"); + } +} + enum local_delivery_mode { // Avoid Cyclone's internal local delivery path, to guarantee that PSMX is actually used // (done by overwriting the fast path table) @@ -717,6 +746,11 @@ static void dotest (const dds_topic_descriptor_t *tpdesc, const void *sample, bo }; if (!allmatched (ws, wr, nrds_active, rds)) { + // sanity check for a once-observed problem where SEDP fails because a + // scheduling problem with a pre-emptive ACKNACK event causes event thread + // to not get around to sending queued messages, which resulted in the SEDP + // data never making it out + check_transmit_queue_empty (&tb, gvs); fail_match (); fail_one = true; goto next; @@ -900,9 +934,9 @@ static bool eq_PsmxType1 (const void *vsent, const void *vrecvd, bool valid_data const PsmxType1 *sent = vsent; const PsmxType1 *recvd = vrecvd; if (valid_data) - return sent->long_1 == recvd->long_1 && sent->long_2 == recvd->long_2 && sent->long_3 == recvd->long_3; + return sent->xy.x == recvd->xy.x && sent->xy.y == recvd->xy.y && sent->z == recvd->z; else - return recvd->long_1 == 0 && recvd->long_2 == 0 && recvd->long_3 == 0; + return recvd->xy.x == 0 && recvd->xy.y == 0 && recvd->z == 0; } static bool eq_DynamicData_Msg (const void *vsent, const void *vrecvd, bool valid_data) @@ -948,7 +982,7 @@ static bool eq_DynamicData_KMsg (const void *vsent, const void *vrecvd, bool val CU_Test(ddsc_psmx, one_writer, .timeout = 240) { failed = false; - dotest (&PsmxType1_desc, &(const PsmxType1){ 5, 3, 53 }, eq_PsmxType1, LDM_NONE, WM_NORMAL, false); + dotest (&PsmxType1_desc, &(const PsmxType1){ {5, 3}, 53 }, eq_PsmxType1, LDM_NONE, WM_NORMAL, false); CU_ASSERT (!failed); } @@ -983,42 +1017,42 @@ CU_Test(ddsc_psmx, one_writer_dynsize_strkey, .timeout = 240) CU_Test(ddsc_psmx, one_writer_fastpath, .timeout = 240) { failed = false; - dotest (&PsmxType1_desc, &(const PsmxType1){ 7, 3, 73 }, eq_PsmxType1, LDM_FASTPATH, WM_NORMAL, false); + dotest (&PsmxType1_desc, &(const PsmxType1){ {7, 3}, 73 }, eq_PsmxType1, LDM_FASTPATH, WM_NORMAL, false); CU_ASSERT (!failed); } CU_Test(ddsc_psmx, one_writer_slowpath, .timeout = 240) { failed = false; - dotest (&Space_Type3_desc, &(const PsmxType1){ 2, 3, 23 }, eq_PsmxType1, LDM_SLOWPATH, WM_NORMAL, false); + dotest (&Space_Type3_desc, &(const PsmxType1){ {2, 3}, 23 }, eq_PsmxType1, LDM_SLOWPATH, WM_NORMAL, false); CU_ASSERT (!failed); } CU_Test(ddsc_psmx, one_writer_wloan, .timeout = 240) { failed = false; - dotest (&PsmxType1_desc, &(const PsmxType1){ 3, 7, 37 }, eq_PsmxType1, LDM_NONE, WM_LOAN, false); + dotest (&PsmxType1_desc, &(const PsmxType1){ {3, 7}, 37 }, eq_PsmxType1, LDM_NONE, WM_LOAN, false); CU_ASSERT (!failed); } CU_Test(ddsc_psmx, one_writer_rloan, .timeout = 240) { failed = false; - dotest (&PsmxType1_desc, &(const PsmxType1){ 11, 17, 1117 }, eq_PsmxType1, LDM_NONE, WM_NORMAL, true); + dotest (&PsmxType1_desc, &(const PsmxType1){ {11, 17}, 251 }, eq_PsmxType1, LDM_NONE, WM_NORMAL, true); CU_ASSERT (!failed); } CU_Test(ddsc_psmx, one_writer_wrloan, .timeout = 240) { failed = false; - dotest (&PsmxType1_desc, &(const PsmxType1){ 5113, 51, 13 }, eq_PsmxType1, LDM_NONE, WM_LOAN, true); + dotest (&PsmxType1_desc, &(const PsmxType1){ {5113, 51}, 13 }, eq_PsmxType1, LDM_NONE, WM_LOAN, true); CU_ASSERT (!failed); } CU_Test(ddsc_psmx, one_writer_forwardcdr, .timeout = 240) { failed = false; - dotest (&PsmxType1_desc, &(const PsmxType1){ 5, 3, 53 }, eq_PsmxType1, LDM_NONE, WM_FORWARDCDR, false); + dotest (&PsmxType1_desc, &(const PsmxType1){ {5, 3}, 53 }, eq_PsmxType1, LDM_NONE, WM_FORWARDCDR, false); CU_ASSERT (!failed); } @@ -1137,11 +1171,11 @@ CU_Test(ddsc_psmx, partition_xtalk) CU_ASSERT_FATAL (checkwr > 0); } - rc = dds_write (wr, &(PsmxType1){ 1, 2, 3 }); + rc = dds_write (wr, &(PsmxType1){ {1, 2}, 3 }); CU_ASSERT_FATAL (rc == 0); if (checkwr) { - rc = dds_write (checkwr, &(PsmxType1){ 4, 5, 6 }); + rc = dds_write (checkwr, &(PsmxType1){ {4, 5}, 6 }); CU_ASSERT_FATAL (rc == 0); } @@ -1152,9 +1186,9 @@ CU_Test(ddsc_psmx, partition_xtalk) dds_sleepfor (DDS_MSECS (10)); CU_ASSERT_FATAL (rc == 1); if (checkwr == 0) { - CU_ASSERT_FATAL (t.long_1 == 1 && t.long_2 == 2 && t.long_3 == 3); + CU_ASSERT_FATAL (t.xy.x == 1 && t.xy.y == 2 && t.z == 3); } else { - CU_ASSERT_FATAL (t.long_1 == 4 && t.long_2 == 5 && t.long_3 == 6); + CU_ASSERT_FATAL (t.xy.x == 4 && t.xy.y == 5 && t.z == 6); } rc = dds_delete (wr); From 2c459a2346c7d5e9a3873e1971307bd0c545f55e Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 4 Dec 2023 13:56:40 +0100 Subject: [PATCH 3/3] Full test coverage for loans/types/PSMX This runs over: - memcpy-safe types and non-memcpy-safe types - use of writer loans - use of reader loans - use of PSMX - writing and disposing (sample vs key) That should provide basic test coverage for all combinations that the code distinguishes between. The type definitions are chosen so that raw samples and CDR have different memory layouts and one cannot be confused for the other. Signed-off-by: Erik Boasson --- src/core/ddsc/tests/PsmxDataModels.idl | 17 +++ src/core/ddsc/tests/psmx.c | 175 +++++++++++++++++++++++++ 2 files changed, 192 insertions(+) diff --git a/src/core/ddsc/tests/PsmxDataModels.idl b/src/core/ddsc/tests/PsmxDataModels.idl index 5f30b988b3..9bc79b78fa 100644 --- a/src/core/ddsc/tests/PsmxDataModels.idl +++ b/src/core/ddsc/tests/PsmxDataModels.idl @@ -39,3 +39,20 @@ struct PsmxType1 { struct PsmxKeySupportCheck { @key long k; }; + +struct PsmxLoanTest0 { + PsmxType1xy xy; + @key octet z; +}; + +struct PsmxLoanTest1 { + PsmxType1xy xy; + @key octet z; + string str; +}; + +struct PsmxLoanTest2 { + PsmxType1xy xy; + @key octet z; + @key string str; +}; diff --git a/src/core/ddsc/tests/psmx.c b/src/core/ddsc/tests/psmx.c index 10c66e5b37..e54cd30db5 100644 --- a/src/core/ddsc/tests/psmx.c +++ b/src/core/ddsc/tests/psmx.c @@ -1396,3 +1396,178 @@ CU_Test (ddsc_psmx, zero_copy) } dds_delete (dds_get_parent (pp)); } + +static void deepcopy_sample_contents (const dds_topic_descriptor_t *tpdesc, void *output, const void *input) +{ + struct dds_cdrstream_desc desc; + dds_cdrstream_desc_from_topic_desc (&desc, tpdesc); + struct dds_ostream os; + dds_ostream_init (&os, &dds_cdrstream_default_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2); + dds_stream_write_sample (&os, &dds_cdrstream_default_allocator, input, &desc); + struct dds_istream is; + dds_istream_init (&is, os.m_index, os.m_buffer, os.m_xcdr_version); + memset (output, 0, desc.size); + dds_stream_read_sample (&is, output, &dds_cdrstream_default_allocator, &desc); + dds_istream_fini (&is); + dds_ostream_fini (&os, &dds_cdrstream_default_allocator); + dds_cdrstream_desc_fini (&desc, &dds_cdrstream_default_allocator); +} + +static const void *get_write_datapointer (dds_entity_t wr, const dds_topic_descriptor_t *tpdesc, const void *template, bool wrloan) +{ + if (!wrloan) + return template; + else + { + void *tmp; + dds_return_t rc = dds_request_loan (wr, &tmp); + CU_ASSERT_FATAL (rc == 0); + deepcopy_sample_contents (tpdesc, tmp, template); + return tmp; + } +} + +static bool data_equal (const dds_topic_descriptor_t *tpdesc, const void *a, const void *b, bool justkey) +{ + struct dds_cdrstream_desc desc; + dds_cdrstream_desc_from_topic_desc (&desc, tpdesc); + struct dds_ostream osa, osb; + dds_ostream_init (&osa, &dds_cdrstream_default_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2); + dds_ostream_init (&osb, &dds_cdrstream_default_allocator, 0, DDSI_RTPS_CDR_ENC_VERSION_2); + if (justkey) + { + dds_stream_write_key (&osa, DDS_CDR_KEY_SERIALIZATION_SAMPLE, &dds_cdrstream_default_allocator, a, &desc); + dds_stream_write_key (&osb, DDS_CDR_KEY_SERIALIZATION_SAMPLE, &dds_cdrstream_default_allocator, b, &desc); + } + else + { + dds_stream_write_sample (&osa, &dds_cdrstream_default_allocator, a, &desc); + dds_stream_write_sample (&osb, &dds_cdrstream_default_allocator, b, &desc); + } + const bool eq = (osa.m_index == osb.m_index) && memcmp (osa.m_buffer, osb.m_buffer, osa.m_index) == 0; + dds_ostream_fini (&osa, &dds_cdrstream_default_allocator); + dds_ostream_fini (&osb, &dds_cdrstream_default_allocator); + dds_cdrstream_desc_fini (&desc, &dds_cdrstream_default_allocator); + return eq; +} + +CU_Test (ddsc_psmx, writer_loan) +{ + const struct { + const dds_topic_descriptor_t *desc; + const struct { + dds_return_t (*op) (dds_entity_t wr, const void *data); + const void *data; + } step[2]; + } cases[] = { + { &PsmxLoanTest0_desc, { + { dds_write, &(PsmxLoanTest0){ { 0x12345678, 0x55 }, 0x22 } }, + { dds_dispose, &(PsmxLoanTest0){ { 0, 0 }, 0x22 } } } }, + { &PsmxLoanTest1_desc, { + { dds_write, &(PsmxLoanTest1){ { 0x12345678, 0x55 }, 0x22, "aap" } }, + { dds_dispose, &(PsmxLoanTest1){ { 0, 0, }, 0x22, "" } } } }, + { &PsmxLoanTest2_desc, { + { dds_write, &(PsmxLoanTest2){ { 0x12345678, 0x55 }, 0x22, "noot" } }, + { dds_dispose, &(PsmxLoanTest2){ { 0, 0, }, 0x22, "noot" } } } }, + }; + dds_return_t rc; + + const dds_entity_t pp = create_participant (0); + CU_ASSERT_FATAL (pp > 0); + for (size_t k = 0; k < sizeof (cases) / sizeof (cases[0]); k++) + { + char topicname[100]; + create_unique_topic_name ("writer_loan", topicname, sizeof (topicname)); + const dds_entity_t tp = dds_create_topic (pp, cases[k].desc, topicname, NULL, NULL); + CU_ASSERT_FATAL (tp > 0); + for (int wr_psmx_enabled_i = 0; wr_psmx_enabled_i <= 1; wr_psmx_enabled_i++) + { + const bool wr_psmx_enabled = wr_psmx_enabled_i; + for (int rd_psmx_enabled_i = 0; rd_psmx_enabled_i <= 1; rd_psmx_enabled_i++) + { + const bool rd_psmx_enabled = rd_psmx_enabled_i; + for (int wrloan_i = 0; wrloan_i <= 1; wrloan_i++) + { + const bool wrloan = wrloan_i; + for (int rdloan_i = 0; rdloan_i <= 1; rdloan_i++) + { + const bool rdloan = rdloan_i; + dds_qos_t *qos; + printf ("ddsc_psmx writer_loan: %s psmx %d %d wrloan %d rdloan %d", cases[k].desc->m_typename, wr_psmx_enabled, rd_psmx_enabled, wrloan, rdloan); + fflush (stdout); + + qos = dds_create_qos (); + if (!wr_psmx_enabled) + dds_qset_psmx_instances (qos, 0, NULL); + const dds_entity_t wr = dds_create_writer (pp, tp, qos, NULL); + CU_ASSERT_FATAL (wr > 0); + CU_ASSERT_FATAL (endpoint_has_psmx_enabled (wr) == wr_psmx_enabled); + dds_delete_qos (qos); + + qos = dds_create_qos (); + if (!rd_psmx_enabled) + dds_qset_psmx_instances (qos, 0, NULL); + dds_entity_t rd; + const dds_entity_t ws = dds_create_waitset (pp); + rd = dds_create_reader (pp, tp, qos, NULL); + CU_ASSERT_FATAL (rd > 0); + CU_ASSERT_FATAL (endpoint_has_psmx_enabled (rd) == rd_psmx_enabled); + dds_delete_qos (qos); + + rc = dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_FATAL (rc == 0); + rc = dds_waitset_attach (ws, rd, 0); + CU_ASSERT_FATAL (rc == 0); + + for (int step_i = 0; step_i < (int) (sizeof (cases[k].step) / sizeof (cases[k].step[0])); step_i++) + { + const void *wrdata = cases[k].step[step_i].data; + dds_return_t (* const op) (dds_entity_t wr, const void *data) = cases[k].step[step_i].op; + + // dispose + loan is not supported (triggers use-after-free; this is documented, but bad) + if (op == dds_dispose && wrloan) + continue; + + const bool justkey = (op != dds_write); + printf (" | %s %p", (op == dds_write) ? "write" : "dispose", wrdata); fflush (stdout); + rc = op (wr, get_write_datapointer (wr, cases[k].desc, wrdata, wrloan)); + CU_ASSERT_FATAL (rc == 0); + (void) dds_waitset_wait (ws, NULL, 0, DDS_INFINITY); + dds_sample_info_t si; + void *rddata; + if (rdloan) + rddata = NULL; + else + { + rddata = ddsrt_malloc (cases[k].desc->m_size); + memset (rddata, 0, cases[k].desc->m_size); + } + rc = dds_take (rd, &rddata, &si, 1, 1); + CU_ASSERT_FATAL (rc == 1); + printf (" | rddata %p", rddata); fflush (stdout); + CU_ASSERT_FATAL (data_equal (cases[k].desc, wrdata, rddata, justkey)); + if (!rdloan) + dds_sample_free (rddata, cases[k].desc, DDS_FREE_ALL); + else + { + rc = dds_return_loan (rd, &rddata, 1); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + } + } + printf ("\n"); fflush (stdout); + + rc = dds_delete (wr); + CU_ASSERT_FATAL (rc == 0); + rc = dds_delete (rd); + CU_ASSERT_FATAL (rc == 0); + rc = dds_delete (ws); + CU_ASSERT_FATAL (rc == 0); + } + } + } + } + rc = dds_delete (tp); + CU_ASSERT_FATAL (rc == 0); + } + dds_delete (dds_get_parent (pp)); +}