From 8aa9e965490950c75a6c13011956c6e4e5e00af6 Mon Sep 17 00:00:00 2001 From: Arnaud de Turckheim Date: Tue, 27 Aug 2024 10:40:40 +0200 Subject: [PATCH] upipe-ts: add AIT support --- include/upipe-ts/Makefile.am | 2 + include/upipe-ts/upipe_ts_ait_decoder.h | 47 +++ include/upipe-ts/upipe_ts_ait_generator.h | 47 +++ include/upipe-ts/uref_ts_flow.h | 5 + lib/upipe-ts/Makefile.am | 2 + lib/upipe-ts/upipe_ts_ait_decoder.c | 382 +++++++++++++++++++++ lib/upipe-ts/upipe_ts_ait_generator.c | 392 ++++++++++++++++++++++ lib/upipe-ts/upipe_ts_demux.c | 28 ++ lib/upipe-ts/upipe_ts_mux.c | 54 ++- lib/upipe-ts/upipe_ts_pmt_decoder.c | 12 + lib/upipe-ts/upipe_ts_psi_generator.c | 7 +- 11 files changed, 975 insertions(+), 3 deletions(-) create mode 100644 include/upipe-ts/upipe_ts_ait_decoder.h create mode 100644 include/upipe-ts/upipe_ts_ait_generator.h create mode 100644 lib/upipe-ts/upipe_ts_ait_decoder.c create mode 100644 lib/upipe-ts/upipe_ts_ait_generator.c diff --git a/include/upipe-ts/Makefile.am b/include/upipe-ts/Makefile.am index 882333a93..58b122af3 100644 --- a/include/upipe-ts/Makefile.am +++ b/include/upipe-ts/Makefile.am @@ -9,6 +9,8 @@ myinclude_HEADERS = \ upipe_ts_encaps.h \ upipe_ts_pcr_interpolator.h \ upipe_ts_mux.h \ + upipe_ts_ait_decoder.h \ + upipe_ts_ait_generator.h \ upipe_ts_eit_decoder.h \ upipe_ts_nit_decoder.h \ upipe_ts_cat_decoder.h \ diff --git a/include/upipe-ts/upipe_ts_ait_decoder.h b/include/upipe-ts/upipe_ts_ait_decoder.h new file mode 100644 index 000000000..f37759650 --- /dev/null +++ b/include/upipe-ts/upipe_ts_ait_decoder.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2024 EasyTools S.A.S. + * + * Authors: Arnaud de Turckheim + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 2.1 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + */ + +/** @file + * @short Upipe module decoding the application information table of DVB + * streams + */ + +#ifndef _UPIPE_TS_UPIPE_TS_AIT_DECODER_H_ +/** @hidden */ +#define _UPIPE_TS_UPIPE_TS_AIT_DECODER_H_ +#ifdef __cplusplus +extern "C" { +#endif + +#include "upipe/upipe.h" +#include "upipe-ts/upipe_ts_demux.h" + +#define UPIPE_TS_AITD_SIGNATURE UBASE_FOURCC('t','s',0x74,'d') + +/** @This returns the management structure for all ts_aitd pipes. + * + * @return pointer to manager + */ +struct upipe_mgr *upipe_ts_aitd_mgr_alloc(void); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/include/upipe-ts/upipe_ts_ait_generator.h b/include/upipe-ts/upipe_ts_ait_generator.h new file mode 100644 index 000000000..1560d1bd2 --- /dev/null +++ b/include/upipe-ts/upipe_ts_ait_generator.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2024 EasyTools S.A.S. + * + * Authors: Arnaud de Turckheim + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 2.1 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + */ + +/** @file + * @short Upipe module generating the application information table of DVB + * streams + */ + +#ifndef _UPIPE_TS_UPIPE_TS_AIT_GENRATOR_H_ +/** @hidden */ +#define _UPIPE_TS_UPIPE_TS_AIT_GENRATOR_H_ +#ifdef __cplusplus +extern "C" { +#endif + +#include "upipe/upipe.h" +#include "upipe-ts/upipe_ts_demux.h" + +#define UPIPE_TS_AITG_SIGNATURE UBASE_FOURCC('t','s',0x74,'g') + +/** @This returns the management structure for all ts_aitg pipes. + * + * @return pointer to manager + */ +struct upipe_mgr *upipe_ts_aitg_mgr_alloc(void); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/include/upipe-ts/uref_ts_flow.h b/include/upipe-ts/uref_ts_flow.h index d618eb1e6..46000cf62 100644 --- a/include/upipe-ts/uref_ts_flow.h +++ b/include/upipe-ts/uref_ts_flow.h @@ -91,6 +91,11 @@ UREF_ATTR_UNSIGNED(ts_flow, sdt_descriptors, "t.sdt.descs", UREF_ATTR_OPAQUE_VA(ts_flow, sdt_descriptor, "t.sdt.desc[%" PRIu64"]", SDT descriptor, uint64_t nb, nb) UREF_TS_ATTR_DESCRIPTOR(ts_flow, sdt_descriptor) +UREF_ATTR_UNSIGNED(ts_flow, ait_descriptors, "t.ait.descs", + number of SDT descriptors) +UREF_ATTR_OPAQUE_VA(ts_flow, ait_descriptor, "t.ait.desc[%" PRIu64"]", + SDT descriptor, uint64_t nb, nb) +UREF_TS_ATTR_DESCRIPTOR(ts_flow, ait_descriptor) /* EIT */ UREF_ATTR_SMALL_UNSIGNED(ts_flow, last_table_id, "t.lasttid", diff --git a/lib/upipe-ts/Makefile.am b/lib/upipe-ts/Makefile.am index 8886d4bb4..1ca899e18 100644 --- a/lib/upipe-ts/Makefile.am +++ b/lib/upipe-ts/Makefile.am @@ -5,6 +5,8 @@ noinst_HEADERS = upipe_ts_psi_decoder.h libupipe_ts_la_SOURCES = \ upipe_ts_check.c \ upipe_ts_decaps.c \ + upipe_ts_ait_decoder.c \ + upipe_ts_ait_generator.c \ upipe_ts_eit_decoder.c \ upipe_ts_nit_decoder.c \ upipe_ts_cat_decoder.c \ diff --git a/lib/upipe-ts/upipe_ts_ait_decoder.c b/lib/upipe-ts/upipe_ts_ait_decoder.c new file mode 100644 index 000000000..ee88361c9 --- /dev/null +++ b/lib/upipe-ts/upipe_ts_ait_decoder.c @@ -0,0 +1,382 @@ +/* + * Copyright (C) 2024 EasyTools S.A.S. + * + * Authors: Arnaud de Turckheim + * + * This service is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 2.1 of the License, or + * (at your option) any later version. + * + * This service is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this service; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + */ + +/** @file + * @short Upipe module decoding the application information table of DVB + * streams + * Normative references: + * - ETSI EN 300 468 V1.13.1 (2012-08) (SI in DVB systems) + * - ETSI TR 101 211 V1.9.1 (2009-06) (Guidelines of SI in DVB systems) + * - ETSI TS 102 809 V1.1.1 (2010-01) (Signalling and carriage of interactive + * applications and services) + */ + +#include "upipe/ubase.h" +#include "upipe/uclock.h" +#include "upipe/ulist.h" +#include "upipe/uref.h" +#include "upipe/uref_clock.h" +#include "upipe/uref_flow.h" +#include "upipe/uref_block.h" +#include "upipe/upipe.h" +#include "upipe/upipe_helper_upipe.h" +#include "upipe/upipe_helper_urefcount.h" +#include "upipe/upipe_helper_void.h" +#include "upipe/upipe_helper_output.h" +#include "upipe/upipe_helper_ubuf_mgr.h" +#include "upipe/upipe_helper_uref_mgr.h" +#include "upipe/upipe_helper_flow_def.h" +#include "upipe-ts/upipe_ts_ait_decoder.h" +#include "upipe-ts/uref_ts_flow.h" +#include "upipe_ts_psi_decoder.h" + +#include +#include +#include +#include +#include + +#include +#include + +/** we only accept TS packets */ +#define EXPECTED_FLOW_DEF "block.mpegtspsi.mpegtsait." +#define OUTPUT_FLOW_DEF "block.mpegtspsi.mpegtsait." + +/** @hidden */ +static int upipe_ts_aitd_check(struct upipe *upipe, struct uref *flow_format); + +/** @internal @This is the private context of a ts_aitd pipe. */ +struct upipe_ts_aitd { + /** refcount management structure */ + struct urefcount urefcount; + + /** ubuf manager */ + struct ubuf_mgr *ubuf_mgr; + /** flow format packet */ + struct uref *flow_format; + /** ubuf manager request */ + struct urequest ubuf_mgr_request; + + /** pipe acting as output */ + struct upipe *output; + /** output flow definition */ + struct uref *flow_def; + /** output state */ + enum upipe_helper_output_state output_state; + /** list of output requests */ + struct uchain request_list; + /** input flow definition */ + struct uref *flow_def_input; + /** attributes in the sequence header */ + struct uref *flow_def_attr; + + /** uref manager */ + struct uref_mgr *uref_mgr; + /** uref manager request */ + struct urequest uref_mgr_request; + + /** currently in effect AIT table */ + UPIPE_TS_PSID_TABLE_DECLARE(ait); + /** AIT table being gathered */ + UPIPE_TS_PSID_TABLE_DECLARE(next_ait); + + /** public upipe structure */ + struct upipe upipe; +}; + +UPIPE_HELPER_UPIPE(upipe_ts_aitd, upipe, UPIPE_TS_AITD_SIGNATURE) +UPIPE_HELPER_UREFCOUNT(upipe_ts_aitd, urefcount, upipe_ts_aitd_free) +UPIPE_HELPER_VOID(upipe_ts_aitd) +UPIPE_HELPER_OUTPUT(upipe_ts_aitd, output, flow_def, output_state, request_list) +UPIPE_HELPER_UBUF_MGR(upipe_ts_aitd, ubuf_mgr, flow_format, ubuf_mgr_request, + upipe_ts_aitd_check, + upipe_ts_aitd_register_output_request, + upipe_ts_aitd_unregister_output_request) +UPIPE_HELPER_UREF_MGR(upipe_ts_aitd, uref_mgr, uref_mgr_request, NULL, + upipe_ts_aitd_register_output_request, + upipe_ts_aitd_unregister_output_request) +UPIPE_HELPER_FLOW_DEF(upipe_ts_aitd, flow_def_input, flow_def_attr) + +/** @internal @This allocates a ts_aitd pipe. + * + * @param mgr common management structure + * @param uprobe structure used to raise events + * @param signature signature of the pipe allocator + * @param args optional arguments + * @return pointer to upipe or NULL in case of allocation error + */ +static struct upipe *upipe_ts_aitd_alloc(struct upipe_mgr *mgr, + struct uprobe *uprobe, + uint32_t signature, va_list args) +{ + struct upipe *upipe = upipe_ts_aitd_alloc_void(mgr, uprobe, signature, + args); + if (unlikely(upipe == NULL)) + return NULL; + + struct upipe_ts_aitd *upipe_ts_aitd = upipe_ts_aitd_from_upipe(upipe); + upipe_ts_aitd_init_urefcount(upipe); + upipe_ts_aitd_init_output(upipe); + upipe_ts_aitd_init_ubuf_mgr(upipe); + upipe_ts_aitd_init_uref_mgr(upipe); + upipe_ts_aitd_init_flow_def(upipe); + upipe_ts_psid_table_init(upipe_ts_aitd->ait); + upipe_ts_psid_table_init(upipe_ts_aitd->next_ait); + + upipe_throw_ready(upipe); + return upipe; +} + +/** @internal @This validates the next AIT. + * + * @param upipe description structure of the pipe + * @return false if the AIT is invalid + */ +static bool upipe_ts_aitd_table_validate(struct upipe *upipe) +{ + struct upipe_ts_aitd *upipe_ts_aitd = upipe_ts_aitd_from_upipe(upipe); + upipe_ts_psid_table_foreach (upipe_ts_aitd->next_ait, section_uref) { + const uint8_t *section; + int size = -1; + if (unlikely(!ubase_check(uref_block_read(section_uref, 0, &size, + §ion)))) + return false; + + uref_block_unmap(section_uref, 0); + } + return true; +} + +/** @internal @This outputs the AIT. + * + * @param upipe description structure of the pipe + */ +static void upipe_ts_aitd_send(struct upipe *upipe) +{ + struct upipe_ts_aitd *upipe_ts_aitd = upipe_ts_aitd_from_upipe(upipe); + struct uref *uref = NULL; + bool first = true; + + upipe_verbose(upipe, "send AIT"); + + if (unlikely(upipe_ts_aitd->uref_mgr == NULL)) + return; + + upipe_use(upipe); + + upipe_ts_psid_table_foreach(upipe_ts_aitd->ait, section) { + struct ubuf *ubuf = ubuf_dup(section->ubuf); + if (uref) + /* send previous section */ + upipe_ts_aitd_output(upipe, uref, NULL); + uref = uref_alloc(upipe_ts_aitd->uref_mgr); + + if (unlikely(uref == NULL || ubuf == NULL)) { + ubuf_free(ubuf); + uref_free(uref); + uref = NULL; + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + continue; + } + + uref_attach_ubuf(uref, ubuf); + if (first) + uref_block_set_start(uref); + first = false; + } + + if (uref) { + uref_block_set_end(uref); + upipe_ts_aitd_output(upipe, uref, NULL); + } + + upipe_release(upipe); +} + +/** @internal @This parses a new PSI section. + * + * @param upipe description structure of the pipe + * @param uref uref structure + * @param upump_p reference to pump that generated the buffer + */ +static void upipe_ts_aitd_input(struct upipe *upipe, struct uref *uref, + struct upump **upump_p) +{ + struct upipe_ts_aitd *upipe_ts_aitd = upipe_ts_aitd_from_upipe(upipe); + assert(upipe_ts_aitd->flow_def_input != NULL); + + if (!upipe_ts_psid_table_section(upipe_ts_aitd->next_ait, uref)) + return; + + if (upipe_ts_psid_table_validate(upipe_ts_aitd->ait) && + upipe_ts_psid_table_compare(upipe_ts_aitd->ait, + upipe_ts_aitd->next_ait)) { + /* Identical AIT. */ + upipe_ts_psid_table_clean(upipe_ts_aitd->next_ait); + upipe_ts_psid_table_init(upipe_ts_aitd->next_ait); + upipe_ts_aitd_send(upipe); + return; + } + + if (!ubase_check(upipe_ts_psid_table_merge(upipe_ts_aitd->next_ait, + upipe_ts_aitd->ubuf_mgr)) || + !upipe_ts_aitd_table_validate(upipe)) { + upipe_warn(upipe, "invalid AIT section received"); + upipe_ts_psid_table_clean(upipe_ts_aitd->next_ait); + upipe_ts_psid_table_init(upipe_ts_aitd->next_ait); + return; + } + + struct uref *flow_def = upipe_ts_aitd_alloc_flow_def_attr(upipe); + if (flow_def) + flow_def = upipe_ts_aitd_store_flow_def_attr(upipe, flow_def); + if (unlikely(flow_def == NULL)) { + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + return; + } + upipe_ts_aitd_store_flow_def(upipe, flow_def); + + /* Switch tables. */ + if (upipe_ts_psid_table_validate(upipe_ts_aitd->ait)) + upipe_ts_psid_table_clean(upipe_ts_aitd->ait); + upipe_ts_psid_table_copy(upipe_ts_aitd->ait, upipe_ts_aitd->next_ait); + upipe_ts_psid_table_init(upipe_ts_aitd->next_ait); + + upipe_ts_aitd_send(upipe); +} + +/** @internal @This receives an ubuf manager. + * + * @param upipe description structure of the pipe + * @param flow_format amended flow format + * @return an error code + */ +static int upipe_ts_aitd_check(struct upipe *upipe, struct uref *flow_format) +{ + struct upipe_ts_aitd *upipe_ts_aitd = upipe_ts_aitd_from_upipe(upipe); + + if (flow_format != NULL) { + flow_format = upipe_ts_aitd_store_flow_def_input(upipe, flow_format); + if (flow_format != NULL) + upipe_ts_aitd_store_flow_def(upipe, flow_format); + } + + if (unlikely(upipe_ts_aitd->uref_mgr == NULL)) + upipe_ts_aitd_demand_uref_mgr(upipe); + + return UBASE_ERR_NONE; +} + +/** @internal @This sets the input flow definition. + * + * @param upipe description structure of the pipe + * @param flow_def flow definition packet + * @return an error code + */ +static int upipe_ts_aitd_set_flow_def(struct upipe *upipe, + struct uref *flow_def) +{ + if (flow_def == NULL) + return UBASE_ERR_INVALID; + UBASE_RETURN(uref_flow_match_def(flow_def, EXPECTED_FLOW_DEF)) + struct uref *flow_def_dup; + if (unlikely((flow_def_dup = uref_dup(flow_def)) == NULL)) { + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + return UBASE_ERR_ALLOC; + } + upipe_ts_aitd_demand_ubuf_mgr(upipe, flow_def_dup); + return UBASE_ERR_NONE; +} + +/** @internal @This processes control commands. + * + * @param upipe description structure of the pipe + * @param command type of command to process + * @param args arguments of the command + * @return an error code + */ +static int upipe_ts_aitd_control_real(struct upipe *upipe, int command, + va_list args) +{ + UBASE_HANDLED_RETURN(upipe_ts_aitd_control_output(upipe, command, args)); + switch (command) { + case UPIPE_SET_FLOW_DEF: { + struct uref *flow_def = va_arg(args, struct uref *); + return upipe_ts_aitd_set_flow_def(upipe, flow_def); + } + + default: + return UBASE_ERR_UNHANDLED; + } +} + +/** @internal @This processes control commands and checks the internal state. + * + * @param upipe description structure of the pipe + * @param command type of command to process + * @param args arguments of the command + * @return an error code + */ +static int upipe_ts_aitd_control(struct upipe *upipe, int command, va_list args) +{ + UBASE_RETURN(upipe_ts_aitd_control_real(upipe, command, args)); + return upipe_ts_aitd_check(upipe, NULL); +} + +/** @This frees a upipe. + * + * @param upipe description structure of the pipe + */ +static void upipe_ts_aitd_free(struct upipe *upipe) +{ + upipe_throw_dead(upipe); + + struct upipe_ts_aitd *upipe_ts_aitd = upipe_ts_aitd_from_upipe(upipe); + upipe_ts_psid_table_clean(upipe_ts_aitd->ait); + upipe_ts_psid_table_clean(upipe_ts_aitd->next_ait); + upipe_ts_aitd_clean_output(upipe); + upipe_ts_aitd_clean_uref_mgr(upipe); + upipe_ts_aitd_clean_ubuf_mgr(upipe); + upipe_ts_aitd_clean_flow_def(upipe); + upipe_ts_aitd_clean_urefcount(upipe); + upipe_ts_aitd_free_void(upipe); +} + +/** module manager static descriptor */ +static struct upipe_mgr upipe_ts_aitd_mgr = { + .refcount = NULL, + .signature = UPIPE_TS_AITD_SIGNATURE, + + .upipe_alloc = upipe_ts_aitd_alloc, + .upipe_input = upipe_ts_aitd_input, + .upipe_control = upipe_ts_aitd_control, + + .upipe_mgr_control = NULL +}; + +/** @This returns the management structure for all ts_aitd pipes. + * + * @return pointer to manager + */ +struct upipe_mgr *upipe_ts_aitd_mgr_alloc(void) +{ + return &upipe_ts_aitd_mgr; +} diff --git a/lib/upipe-ts/upipe_ts_ait_generator.c b/lib/upipe-ts/upipe_ts_ait_generator.c new file mode 100644 index 000000000..b356f5782 --- /dev/null +++ b/lib/upipe-ts/upipe_ts_ait_generator.c @@ -0,0 +1,392 @@ +/* + * Copyright (C) 2024 EasyTools S.A.S. + * + * Authors: Arnaud de Turckheim + * + * This service is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 2.1 of the License, or + * (at your option) any later version. + * + * This service is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this service; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA. + */ + +/** @file + * @short Upipe module generating the application information table of DVB + * streams + * Normative references: + * - ETSI EN 300 468 V1.13.1 (2012-08) (SI in DVB systems) + * - ETSI TR 101 211 V1.9.1 (2009-06) (Guidelines of SI in DVB systems) + * - ETSI TS 102 809 V1.1.1 (2010-01) (Signalling and carriage of interactive + * applications and services) + */ + +#include "upipe/ubase.h" +#include "upipe/uclock.h" +#include "upipe/ulist.h" +#include "upipe/uref.h" +#include "upipe/uref_block_flow.h" +#include "upipe/uref_clock.h" +#include "upipe/uref_flow.h" +#include "upipe/uref_block.h" +#include "upipe/upipe.h" +#include "upipe/upipe_helper_upipe.h" +#include "upipe/upipe_helper_urefcount.h" +#include "upipe/upipe_helper_void.h" +#include "upipe/upipe_helper_output.h" +#include "upipe/upipe_helper_uclock.h" +#include "upipe/upipe_helper_upump_mgr.h" +#include "upipe/upipe_helper_upump.h" +#include "upipe-ts/upipe_ts_ait_generator.h" +#include "upipe-ts/uref_ts_flow.h" +#include "upipe_ts_psi_decoder.h" + +#include +#include +#include +#include +#include + +#include +#include + +/** we only accept TS packets */ +#define EXPECTED_FLOW_DEF "block.mpegtspsi.mpegtsait." +#define OUTPUT_FLOW_DEF "block.mpegtspsi.mpegtsait." + +/** @hidden */ +static int upipe_ts_aitg_check(struct upipe *upipe, struct uref *flow_format); + +/** @hidden */ +static void upipe_ts_aitg_schedule(struct upipe *upipe); + +/** @internal @This is the private context of a ts_aitg pipe. */ +struct upipe_ts_aitg { + /** refcount management structure */ + struct urefcount urefcount; + + /** uclock */ + struct uclock *uclock; + /** uclock request */ + struct urequest uclock_request; + + /** upump manager */ + struct upump_mgr *upump_mgr; + /** upump */ + struct upump *upump; + + /** pipe acting as output */ + struct upipe *output; + /** output flow definition */ + struct uref *flow_def; + /** output state */ + enum upipe_helper_output_state output_state; + /** list of output requests */ + struct uchain request_list; + + /** AIT output interval */ + uint64_t interval; + /** list of urefs composing the current AIT */ + struct uchain ait; + /** AIT size */ + uint64_t size; + + /** last output date */ + uint64_t last_cr_sys; + /** compute octetrate */ + uint64_t octetrate; + + /** public upipe structure */ + struct upipe upipe; +}; + +UPIPE_HELPER_UPIPE(upipe_ts_aitg, upipe, UPIPE_TS_AITG_SIGNATURE) +UPIPE_HELPER_UREFCOUNT(upipe_ts_aitg, urefcount, upipe_ts_aitg_free) +UPIPE_HELPER_VOID(upipe_ts_aitg) +UPIPE_HELPER_UPUMP_MGR(upipe_ts_aitg, upump_mgr) +UPIPE_HELPER_UPUMP(upipe_ts_aitg, upump, upump_mgr) +UPIPE_HELPER_OUTPUT(upipe_ts_aitg, output, flow_def, output_state, request_list) +UPIPE_HELPER_UCLOCK(upipe_ts_aitg, uclock, uclock_request, NULL, + upipe_ts_aitg_register_output_request, + upipe_ts_aitg_unregister_output_request); + +/** @internal @This allocates a ts_aitg pipe. + * + * @param mgr common management structure + * @param uprobe structure used to raise events + * @param signature signature of the pipe allocator + * @param args optional arguments + * @return pointer to upipe or NULL in case of allocation error + */ +static struct upipe *upipe_ts_aitg_alloc(struct upipe_mgr *mgr, + struct uprobe *uprobe, + uint32_t signature, va_list args) +{ + struct upipe *upipe = upipe_ts_aitg_alloc_void(mgr, uprobe, signature, + args); + if (unlikely(upipe == NULL)) + return NULL; + + struct upipe_ts_aitg *upipe_ts_aitg = upipe_ts_aitg_from_upipe(upipe); + upipe_ts_aitg_init_urefcount(upipe); + upipe_ts_aitg_init_uclock(upipe); + upipe_ts_aitg_init_upump_mgr(upipe); + upipe_ts_aitg_init_upump(upipe); + upipe_ts_aitg_init_output(upipe); + ulist_init(&upipe_ts_aitg->ait); + upipe_ts_aitg->size = 0; + upipe_ts_aitg->interval = UCLOCK_FREQ; + upipe_ts_aitg->last_cr_sys = 0; + upipe_ts_aitg->octetrate = 1; + + upipe_throw_ready(upipe); + return upipe; +} + +/** @internal @This flushes the current AIT if any. + * + * @param upipe description structure of the pipe + */ +static void upipe_ts_aitg_flush(struct upipe *upipe) +{ + struct upipe_ts_aitg *upipe_ts_aitg = upipe_ts_aitg_from_upipe(upipe); + struct uchain *uchain; + + while ((uchain = ulist_pop(&upipe_ts_aitg->ait))) + uref_free(uref_from_uchain(uchain)); + upipe_ts_aitg->size = 0; +} + +/** @This frees a upipe. + * + * @param upipe description structure of the pipe + */ +static void upipe_ts_aitg_free(struct upipe *upipe) +{ + upipe_throw_dead(upipe); + + upipe_ts_aitg_flush(upipe); + upipe_ts_aitg_clean_output(upipe); + upipe_ts_aitg_clean_upump(upipe); + upipe_ts_aitg_clean_upump_mgr(upipe); + upipe_ts_aitg_clean_uclock(upipe); + upipe_ts_aitg_clean_urefcount(upipe); + upipe_ts_aitg_free_void(upipe); +} + +/** @internal @This outputs the AIT. + * + * @param upipe description structure of the pipe + */ +static void upipe_ts_aitg_send(struct upipe *upipe) +{ + struct upipe_ts_aitg *upipe_ts_aitg = upipe_ts_aitg_from_upipe(upipe); + uint64_t now = upipe_ts_aitg_now(upipe); + struct uchain *uchain; + + /* duplicate AIT */ + struct uchain urefs; + ulist_init(&urefs); + ulist_foreach(&upipe_ts_aitg->ait, uchain) { + struct uref *uref = uref_dup(uref_from_uchain(uchain)); + if (unlikely(!uref)) { + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + while ((uchain = ulist_pop(&urefs))) + uref_free(uref_from_uchain(uchain)); + return; + } + ulist_add(&urefs, uref_to_uchain(uref)); + } + + /* send duplicated AIT */ + upipe_use(upipe); + upipe_ts_aitg->last_cr_sys = now; + while ((uchain = ulist_pop(&urefs))) { + struct uref *uref = uref_from_uchain(uchain); + uref_clock_set_cr_sys(uref, now); + upipe_ts_aitg_output(upipe, uref, &upipe_ts_aitg->upump); + } + if (!upipe_single(upipe)) + upipe_ts_aitg_schedule(upipe); + upipe_release(upipe); +} + +/** @internal @This is called when the timer timeout. + * + * @param upump description structure of the timer + */ +static void upipe_ts_aitg_upump_cb(struct upump *upump) +{ + struct upipe *upipe = upump_get_opaque(upump, struct upipe *); + upipe_ts_aitg_send(upipe); +} + +/** @internal @This schedules the next output of the AIT. + * + * @param upipe description structure of the pipe + */ +static void upipe_ts_aitg_schedule(struct upipe *upipe) +{ + struct upipe_ts_aitg *upipe_ts_aitg = upipe_ts_aitg_from_upipe(upipe); + uint64_t now = upipe_ts_aitg_now(upipe); + if (now == UINT64_MAX) + return; + + uint64_t next_cr_sys = upipe_ts_aitg->last_cr_sys + upipe_ts_aitg->interval; + if (next_cr_sys <= now) + upipe_ts_aitg_send(upipe); + else + upipe_ts_aitg_wait_upump(upipe, next_cr_sys - now, + upipe_ts_aitg_upump_cb); +} + +/** @internal @This is called when a new AIT is ready. + * + * @param upipe description structure of the pipe + */ +static void upipe_ts_aitg_work(struct upipe *upipe, struct uref *uref) +{ + struct upipe_ts_aitg *upipe_ts_aitg = upipe_ts_aitg_from_upipe(upipe); + uint64_t size = 0; + + uref_block_size(uref, &size); + upipe_ts_aitg->size += size; + ulist_add(&upipe_ts_aitg->ait, uref_to_uchain(uref)); + + if (!ubase_check(uref_block_get_end(uref))) + return; + + uint64_t octetrate = upipe_ts_aitg->size * UCLOCK_FREQ / + upipe_ts_aitg->interval; + if (octetrate > upipe_ts_aitg->octetrate) { + struct uref *flow_def = uref_dup(upipe_ts_aitg->flow_def); + uref_block_flow_set_octetrate(flow_def, octetrate); + upipe_ts_aitg_store_flow_def(upipe, flow_def); + upipe_ts_aitg->octetrate = octetrate; + } + + upipe_ts_aitg_schedule(upipe); +} + +/** @internal @This parses a new PSI section. + * + * @param upipe description structure of the pipe + * @param uref uref structure + * @param upump_p reference to pump that generated the buffer + */ +static void upipe_ts_aitg_input(struct upipe *upipe, struct uref *uref, + struct upump **upump_p) +{ + upipe_ts_aitg_set_upump(upipe, NULL); + + if (ubase_check(uref_block_get_start(uref))) + upipe_ts_aitg_flush(upipe); + + upipe_ts_aitg_work(upipe, uref); +} + +/** @internal @This sets the input flow definition. + * + * @param upipe description structure of the pipe + * @param flow_def flow definition packet + * @return an error code + */ +static int upipe_ts_aitg_set_flow_def(struct upipe *upipe, + struct uref *flow_def) +{ + UBASE_RETURN(uref_flow_match_def(flow_def, EXPECTED_FLOW_DEF)) + struct uref *flow_def_dup; + if (unlikely((flow_def_dup = uref_dup(flow_def)) == NULL)) { + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + return UBASE_ERR_ALLOC; + } + upipe_ts_aitg_store_flow_def(upipe, flow_def_dup); + return UBASE_ERR_NONE; +} + +/** @internal @This processes control commands. + * + * @param upipe description structure of the pipe + * @param command type of command to process + * @param args arguments of the command + * @return an error code + */ +static int upipe_ts_aitg_control_real(struct upipe *upipe, int command, + va_list args) +{ + UBASE_HANDLED_RETURN(upipe_ts_aitg_control_output(upipe, command, args)); + switch (command) { + case UPIPE_ATTACH_UCLOCK: + upipe_ts_aitg_set_upump(upipe, NULL); + upipe_ts_aitg_require_uclock(upipe); + return UBASE_ERR_NONE; + case UPIPE_ATTACH_UPUMP_MGR: + upipe_ts_aitg_set_upump(upipe, NULL); + return upipe_ts_aitg_attach_upump_mgr(upipe); + + case UPIPE_SET_FLOW_DEF: { + struct uref *flow_def = va_arg(args, struct uref *); + return upipe_ts_aitg_set_flow_def(upipe, flow_def); + } + + default: + return UBASE_ERR_UNHANDLED; + } +} + +/** @internal @This checks the internal state of the pipe. + * + * @param upipe description structure of the pipe + * @param flow_format amended flow format + * @return an error code + */ +static int upipe_ts_aitg_check(struct upipe *upipe, struct uref *flow_format) +{ + struct upipe_ts_aitg *upipe_ts_aitg = upipe_ts_aitg_from_upipe(upipe); + + upipe_ts_aitg_check_upump_mgr(upipe); + if (unlikely(!upipe_ts_aitg->uclock)) + upipe_ts_aitg_require_uclock(upipe); + return UBASE_ERR_NONE; +} + +/** @internal @This processes control commands and checks the internal state. + * + * @param upipe description structure of the pipe + * @param command type of command to process + * @param args arguments of the command + * @return an error code + */ +static int upipe_ts_aitg_control(struct upipe *upipe, int command, va_list args) +{ + UBASE_RETURN(upipe_ts_aitg_control_real(upipe, command, args)); + return upipe_ts_aitg_check(upipe, NULL); +} + +/** module manager static descriptor */ +static struct upipe_mgr upipe_ts_aitg_mgr = { + .refcount = NULL, + .signature = UPIPE_TS_AITG_SIGNATURE, + + .upipe_alloc = upipe_ts_aitg_alloc, + .upipe_input = upipe_ts_aitg_input, + .upipe_control = upipe_ts_aitg_control, + + .upipe_mgr_control = NULL +}; + +/** @This returns the management structure for all ts_aitg pipes. + * + * @return pointer to manager + */ +struct upipe_mgr *upipe_ts_aitg_mgr_alloc(void) +{ + return &upipe_ts_aitg_mgr; +} diff --git a/lib/upipe-ts/upipe_ts_demux.c b/lib/upipe-ts/upipe_ts_demux.c index 4c204ea01..afb753efe 100644 --- a/lib/upipe-ts/upipe_ts_demux.c +++ b/lib/upipe-ts/upipe_ts_demux.c @@ -40,6 +40,7 @@ */ #include "upipe/config.h" +#include "upipe/ubase.h" #include "upipe/ulist.h" #include "upipe/uprobe.h" #include "upipe/uprobe_prefix.h" @@ -73,6 +74,7 @@ #include "upipe-ts/upipe_ts_sync.h" #include "upipe-ts/upipe_ts_check.h" #include "upipe-ts/upipe_ts_decaps.h" +#include "upipe-ts/upipe_ts_ait_decoder.h" #include "upipe-ts/upipe_ts_eit_decoder.h" #include "upipe-ts/upipe_ts_nit_decoder.h" #include "upipe-ts/upipe_ts_psi_merge.h" @@ -173,6 +175,8 @@ struct upipe_ts_demux_mgr { struct upipe_mgr *ts_eitd_mgr; /** pointer to ts_scte35d manager */ struct upipe_mgr *ts_scte35d_mgr; + /** pointer to ts_aitd manager */ + struct upipe_mgr *ts_aitd_mgr; /* ES */ /** pointer to ts_pesd manager */ @@ -846,6 +850,28 @@ static int upipe_ts_demux_output_plumber(struct upipe *upipe, return UBASE_ERR_NONE; } + if (!ubase_ncmp(def, "block.mpegtspsi.mpegtsait.")) { + /* allocate ts_psim inner */ + struct upipe *output = + upipe_void_alloc_output(inner, ts_demux_mgr->ts_psim_mgr, + uprobe_pfx_alloc( + uprobe_use(&upipe_ts_demux_output->probe), + UPROBE_LOG_VERBOSE, "psim")); + if (unlikely(output == NULL)) + return UBASE_ERR_ALLOC; + + /** allocate AIT decoder */ + output = upipe_void_chain_output(output, ts_demux_mgr->ts_aitd_mgr, + uprobe_pfx_alloc( + uprobe_use(&upipe_ts_demux_output->last_inner_probe), + UPROBE_LOG_VERBOSE, "aitd")); + if (unlikely(output == NULL)) + return UBASE_ERR_ALLOC; + + upipe_ts_demux_output_store_bin_output(upipe, output); + return UBASE_ERR_NONE; + } + if (!ubase_ncmp(def, "block.dvb_teletext.") && ts_demux_mgr->probe_uref_mgr != NULL) { /* allocate probe_uref for teletext without PTS */ @@ -3873,6 +3899,7 @@ static void upipe_ts_demux_mgr_free(struct urefcount *urefcount) upipe_mgr_release(ts_demux_mgr->ts_eitd_mgr); upipe_mgr_release(ts_demux_mgr->ts_pesd_mgr); upipe_mgr_release(ts_demux_mgr->ts_scte35d_mgr); + upipe_mgr_release(ts_demux_mgr->ts_aitd_mgr); upipe_mgr_release(ts_demux_mgr->autof_mgr); urefcount_clean(urefcount); @@ -3975,6 +4002,7 @@ struct upipe_mgr *upipe_ts_demux_mgr_alloc(void) ts_demux_mgr->ts_eitd_mgr = upipe_ts_eitd_mgr_alloc(); ts_demux_mgr->ts_pesd_mgr = upipe_ts_pesd_mgr_alloc(); ts_demux_mgr->ts_scte35d_mgr = upipe_ts_scte35d_mgr_alloc(); + ts_demux_mgr->ts_aitd_mgr = upipe_ts_aitd_mgr_alloc(); ts_demux_mgr->autof_mgr = NULL; diff --git a/lib/upipe-ts/upipe_ts_mux.c b/lib/upipe-ts/upipe_ts_mux.c index 4de96d5ac..0783e4fd4 100644 --- a/lib/upipe-ts/upipe_ts_mux.c +++ b/lib/upipe-ts/upipe_ts_mux.c @@ -37,6 +37,7 @@ * - ETSI TS 103 197 V1.3.1 (2003-01) (DVB SimulCrypt) */ +#include "upipe/ubase.h" #include "upipe/ulist.h" #include "upipe/uprobe.h" #include "upipe/uprobe_prefix.h" @@ -73,6 +74,7 @@ #include "upipe-ts/upipe_ts_psi_generator.h" #include "upipe-ts/upipe_ts_si_generator.h" #include "upipe-ts/upipe_ts_scte35_generator.h" +#include "upipe-ts/upipe_ts_ait_generator.h" #include "upipe-ts/upipe_ts_tstd.h" #include @@ -200,6 +202,8 @@ struct upipe_ts_mux_mgr { struct upipe_mgr *ts_sig_mgr; /** pointer to ts_scte35g manager */ struct upipe_mgr *ts_scte35g_mgr; + /** pointer to ts_aitg manager */ + struct upipe_mgr *ts_aitg_mgr; /* ES */ /** pointer to ts_tstd manager */ @@ -516,6 +520,8 @@ enum upipe_ts_mux_input_type { UPIPE_TS_MUX_INPUT_SCTE35, /** metadata */ UPIPE_TS_MUX_INPUT_METADATA, + /** PSI */ + UPIPE_TS_MUX_INPUT_PSI, }; @@ -1119,8 +1125,9 @@ static int upipe_ts_mux_input_set_flow_def(struct upipe *upipe, const char *sub_def = def; if (!ubase_ncmp(sub_def, "void.")) { sub_def += strlen("void"); - } - else if (!ubase_ncmp(sub_def, "block.")) { + } else if (!ubase_ncmp(sub_def, "block.mpegtspsi.")) { + sub_def += strlen("block"); + } else if (!ubase_ncmp(sub_def, "block.")) { sub_def += strlen("block"); uref_block_flow_get_octetrate(flow_def, &octetrate); @@ -1186,6 +1193,8 @@ static int upipe_ts_mux_input_set_flow_def(struct upipe *upipe, flow_def_dup, PES_STREAM_ID_PRIVATE_1)); } else if (!ubase_ncmp(def, "void.scte35.")) { input_type = UPIPE_TS_MUX_INPUT_SCTE35; + } else if (!ubase_ncmp(def, "mpegtspsi.")) { + input_type = UPIPE_TS_MUX_INPUT_PSI; } uint64_t pes_overhead = 0; @@ -1419,6 +1428,45 @@ static int upipe_ts_mux_input_set_flow_def(struct upipe *upipe, } else if (!ubase_ncmp(def, "void.")) { /* virtual PES */ upipe_ts_mux_input_store_bin_input(upipe, upipe_use(input->psig_flow)); + } else if (!ubase_ncmp(def, "block.mpegtspsi.")) { /* PSI */ + struct upipe_ts_mux_mgr *ts_mux_mgr = + upipe_ts_mux_mgr_from_upipe_mgr(upipe_ts_mux_to_upipe(upipe_ts_mux)->mgr); + struct upipe *aitg; + if (unlikely((aitg = upipe_void_alloc( + ts_mux_mgr->ts_aitg_mgr, + uprobe_pfx_alloc_va( + uprobe_use(&input->probe), + UPROBE_LOG_VERBOSE, "aitg"))) == NULL) || + !ubase_check(upipe_set_flow_def(aitg, flow_def))) { + upipe_release(aitg); + return UBASE_ERR_ALLOC; + } + //upipe_ts_aitg_set_interval(aitg, input->aitg_interval); + upipe_ts_mux_input_store_bin_input(upipe, aitg); + + if (input->psi_pid != NULL && input->psi_pid->pid != pid) { + upipe_ts_mux_psi_pid_release(input->psi_pid); + input->psi_pid = NULL; + } + if (input->psi_pid == NULL) + input->psi_pid = + upipe_ts_mux_psi_pid_use(upipe_ts_mux_to_upipe(upipe_ts_mux), + pid); + + if (unlikely(!ubase_check(upipe_void_spawn_output_sub(aitg, + input->psi_pid->psi_join, + uprobe_pfx_alloc( + uprobe_use(&input->probe), + UPROBE_LOG_VERBOSE, "aitg psi_join"))))) + upipe_throw_fatal(upipe, UBASE_ERR_ALLOC); + + UBASE_FATAL(upipe, uref_ts_flow_set_tb_rate(flow_def_dup, TB_RATE_PSI)); + + if (!ubase_check(upipe_set_flow_def(input->psig_flow, flow_def_dup))) { + uref_free(flow_def_dup); + return UBASE_ERR_ALLOC; + } + } else { /* standard PES */ upipe_ts_mux_input_store_bin_input(upipe, upipe_use(input->tstd)); upipe_ts_mux_set_max_delay(input->tstd, input->max_delay); @@ -5050,6 +5098,7 @@ static void upipe_ts_mux_mgr_free(struct urefcount *urefcount) upipe_mgr_release(ts_mux_mgr->ts_psig_mgr); upipe_mgr_release(ts_mux_mgr->ts_sig_mgr); upipe_mgr_release(ts_mux_mgr->ts_scte35g_mgr); + upipe_mgr_release(ts_mux_mgr->ts_aitg_mgr); urefcount_clean(urefcount); free(ts_mux_mgr); @@ -5183,6 +5232,7 @@ struct upipe_mgr *upipe_ts_mux_mgr_alloc(void) ts_mux_mgr->ts_psig_mgr = upipe_ts_psig_mgr_alloc(); ts_mux_mgr->ts_sig_mgr = upipe_ts_sig_mgr_alloc(); ts_mux_mgr->ts_scte35g_mgr = upipe_ts_scte35g_mgr_alloc(); + ts_mux_mgr->ts_aitg_mgr = upipe_ts_aitg_mgr_alloc(); urefcount_init(upipe_ts_mux_mgr_to_urefcount(ts_mux_mgr), upipe_ts_mux_mgr_free); diff --git a/lib/upipe-ts/upipe_ts_pmt_decoder.c b/lib/upipe-ts/upipe_ts_pmt_decoder.c index 639bf65ed..b51e69418 100644 --- a/lib/upipe-ts/upipe_ts_pmt_decoder.c +++ b/lib/upipe-ts/upipe_ts_pmt_decoder.c @@ -516,6 +516,18 @@ static void upipe_ts_pmtd_parse_es_descs(struct upipe *upipe, break; case 0x6b: /* Ancillary data descriptor */ + break; + + case 0x6f: /* Application signalling descriptor */ + if ((valid = desc6f_validate(desc))) { + UBASE_FATAL(upipe, uref_flow_set_def(flow_def, + "block,mpegts.mpegtspsi.mpegtsait.")) + UBASE_FATAL(upipe, uref_flow_set_raw_def(flow_def, + "block.mpegts.mpegtspsi.mpegtsait.")) + copy = true; + } + break; + case 0x70: /* Adaptation field data descriptor */ break; diff --git a/lib/upipe-ts/upipe_ts_psi_generator.c b/lib/upipe-ts/upipe_ts_psi_generator.c index 280198f57..52432810d 100644 --- a/lib/upipe-ts/upipe_ts_psi_generator.c +++ b/lib/upipe-ts/upipe_ts_psi_generator.c @@ -388,6 +388,7 @@ static int upipe_ts_psig_flow_check_inner(struct upipe *upipe, } } else if (ubase_ncmp(raw_def, "void.scte35.") && + ubase_ncmp(sub_def, ".mpegtspsi.mpegtsait.") && ubase_ncmp(sub_def, ".mpeg1video.") && ubase_ncmp(sub_def, ".mpeg2video.") && ubase_ncmp(sub_def, ".mpeg4.") && @@ -523,6 +524,8 @@ static int upipe_ts_psig_flow_build_inner(struct upipe *upipe, uint8_t *es, stream_type = PMT_STREAMTYPE_ATSC_A52E; } else if (!ubase_ncmp(sub_def, ".id3.")) { stream_type = PMT_STREAMTYPE_META_PES; + } else if (!ubase_ncmp(sub_def, ".mpegtspsi.")) { + stream_type = PMT_STREAMTYPE_PRIVATE_PSI; } upipe_notice_va(upipe, @@ -942,7 +945,9 @@ static int upipe_ts_psig_flow_set_flow_def(struct upipe *upipe, return UBASE_ERR_INVALID; uint64_t pid; const char *raw_def; - UBASE_RETURN(uref_flow_match_def(flow_def, "void.")) + if (unlikely(!ubase_check(uref_flow_match_def(flow_def, "void.")) && + !ubase_check(uref_flow_match_def(flow_def, "block.")))) + return UBASE_ERR_INVALID; UBASE_RETURN(uref_ts_flow_get_pid(flow_def, &pid)) UBASE_RETURN(uref_flow_get_raw_def(flow_def, &raw_def))