diff --git a/doc/kernel/services/data_passing/pipes.rst b/doc/kernel/services/data_passing/pipes.rst index b3130687db5da2..abe9ab1800fc4d 100644 --- a/doc/kernel/services/data_passing/pipes.rst +++ b/doc/kernel/services/data_passing/pipes.rst @@ -4,8 +4,8 @@ Pipes ##### A :dfn:`pipe` is a kernel object that allows a thread to send a byte stream -to another thread. Pipes can be used to synchronously transfer chunks of data -in whole or in part. +to another thread. Pipes enable efficient inter-thread communication and can +be used to synchronously transfer chunks of data in whole or in part. .. contents:: :local: @@ -14,192 +14,171 @@ in whole or in part. Concepts ******** -The pipe can be configured with a ring buffer which holds data that has been -sent but not yet received; alternatively, the pipe may have no ring buffer. - -Any number of pipes can be defined (limited only by available RAM). Each pipe is -referenced by its memory address. +Any number of pipes can be defined, limited only by available RAM. Each pipe +is referenced by its memory address. A pipe has the following key property: -* A **size** that indicates the size of the pipe's ring buffer. Note that a - size of zero defines a pipe with no ring buffer. - -A pipe must be initialized before it can be used. The pipe is initially empty. - -Data is synchronously **sent** either in whole or in part to a pipe by a -thread. If the specified minimum number of bytes can not be immediately -satisfied, then the operation will either fail immediately or attempt to send -as many bytes as possible and then pend in the hope that the send can be -completed later. Accepted data is either copied to the pipe's ring buffer -or directly to the waiting reader(s). - -Data is synchronously **received** from a pipe by a thread. If the specified -minimum number of bytes can not be immediately satisfied, then the operation -will either fail immediately or attempt to receive as many bytes as possible -and then pend in the hope that the receive can be completed later. Accepted -data is either copied from the pipe's ring buffer or directly from the -waiting sender(s). - -Data may also be **flushed** from a pipe by a thread. Flushing can be performed -either on the entire pipe or on only its ring buffer. Flushing the entire pipe -is equivalent to reading all the information in the ring buffer **and** waiting -to be written into a giant temporary buffer which is then discarded. Flushing -the ring buffer is equivalent to reading **only** the data in the ring buffer -into a temporary buffer which is then discarded. Flushing the ring buffer does -not guarantee that the ring buffer will stay empty; flushing it may allow a -pended writer to fill the ring buffer. - -.. note:: - Flushing does not in practice allocate or use additional buffers. - -.. note:: - The kernel does allow for an ISR to flush a pipe from an ISR. It also - allows it to send/receive data to/from one provided it does not attempt - to wait for space/data. +* A **size** that indicates the capacity of the pipe's ring buffer. + +A pipe must be initialized before it can be used. When initialized, the pipe +is empty. + +Threads interact with the pipe as follows: + +- **Writing**: Data is synchronously written, either in whole or in part, to + a pipe by a thread. If the pipe's ring buffer is full, the operation blocks + until sufficient space becomes available or the specified timeout expires. + +- **Reading**: Data is synchronously read, either in whole or in part, from a + pipe by a thread. If the pipe's ring buffer is empty, the operation blocks + until data becomes available or the specified timeout expires. + +- **Resetting**: A thread can reset a pipe, which resets its internal state and + ends all pending read and write operations with an error code. + +Pipes are well-suited for scenarios like producer-consumer patterns or +streaming data between threads. Implementation ************** -A pipe is defined using a variable of type :c:struct:`k_pipe` and an -optional character buffer of type ``unsigned char``. It must then be -initialized by calling :c:func:`k_pipe_init`. - -The following code defines and initializes an empty pipe that has a ring -buffer capable of holding 100 bytes and is aligned to a 4-byte boundary. +A pipe is defined using a variable of type :c:struct:`k_pipe` and a +byte buffer. The pipe must then be initialized by calling :c:func:`k_pipe_init`. +The following code defines and initializes an empty pipe with a ring buffer +capable of holding 100 bytes, aligned to a 4-byte boundary: .. code-block:: c - unsigned char __aligned(4) my_ring_buffer[100]; + uint8_t __aligned(4) my_ring_buffer[100]; struct k_pipe my_pipe; k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer)); -Alternatively, a pipe can be defined and initialized at compile time by -calling :c:macro:`K_PIPE_DEFINE`. - -The following code has the same effect as the code segment above. Observe -that macro defines both the pipe and its ring buffer. +Alternatively, a pipe can be defined and initialized at compile time using +the :c:macro:`K_PIPE_DEFINE` macro, which defines both the pipe and its +ring buffer: .. code-block:: c K_PIPE_DEFINE(my_pipe, 100, 4); +This has the same effect as the code above. + Writing to a Pipe ================= -Data is added to a pipe by calling :c:func:`k_pipe_put`. +Data is added to a pipe by calling :c:func:`k_pipe_write`. -The following code builds on the example above, and uses the pipe to pass -data from a producing thread to one or more consuming threads. If the pipe's -ring buffer fills up because the consumers can't keep up, the producing thread -waits for a specified amount of time. +The following example demonstrates using a pipe to send data from a producer +thread to one or more consumer threads. If the pipe's ring buffer fills up, +the producer thread waits for a specified amount of time. .. code-block:: c - struct message_header { - ... - }; - - void producer_thread(void) - { - unsigned char *data; - size_t total_size; - size_t bytes_written; - int rc; - ... - - while (1) { - /* Craft message to send in the pipe */ - data = ...; - total_size = ...; - - /* send data to the consumers */ - rc = k_pipe_put(&my_pipe, data, total_size, &bytes_written, - sizeof(struct message_header), K_NO_WAIT); - - if (rc < 0) { - /* Incomplete message header sent */ - ... - } else if (bytes_written < total_size) { - /* Some of the data was sent */ - ... - } else { - /* All data sent */ - ... - } - } - } + struct message_header { + size_t num_data_bytes; /* Example field */ + ... + }; + + void producer_thread(void) + { + int rc; + uint8_t *data; + size_t total_size; + size_t bytes_written; + + while (1) { + /* Craft message to send in the pipe */ + make_message(data, &total_size); + bytes_written = 0; + + /* Write data to the pipe, handling partial writes */ + while (bytes_written < total_size) { + rc = k_pipe_write(&my_pipe, &data[bytes_written], total_size - bytes_written, K_NO_WAIT); + + if (rc < 0) { + /* Error occurred */ + ... + break; + } else { + /* Partial or full write succeeded; adjust for next iteration */ + bytes_written += rc; + } + } + + /* Reset bytes_written for the next message */ + bytes_written = 0; + ... + } + } Reading from a Pipe =================== -Data is read from the pipe by calling :c:func:`k_pipe_get`. - -The following code builds on the example above, and uses the pipe to -process data items generated by one or more producing threads. - -.. code-block:: c - - void consumer_thread(void) - { - unsigned char buffer[120]; - size_t bytes_read; - struct message_header *header = (struct message_header *)buffer; - - while (1) { - rc = k_pipe_get(&my_pipe, buffer, sizeof(buffer), &bytes_read, - sizeof(*header), K_MSEC(100)); - - if ((rc < 0) || (bytes_read < sizeof (*header))) { - /* Incomplete message header received */ - ... - } else if (header->num_data_bytes + sizeof(*header) > bytes_read) { - /* Only some data was received */ - ... - } else { - /* All data was received */ - ... - } - } - } - -Use a pipe to send streams of data between threads. - -.. note:: - A pipe can be used to transfer long streams of data if desired. However - it is often preferable to send pointers to large data items to avoid - copying the data. - -Flushing a Pipe's Buffer -======================== +Data is retrieved from the pipe by calling :c:func:`k_pipe_read`. -Data is flushed from the pipe's ring buffer by calling -:c:func:`k_pipe_buffer_flush`. - -The following code builds on the examples above, and flushes the pipe's -buffer. +The following example builds on the producer thread example above. It shows +a consumer thread that processes data generated by the producer. .. code-block:: c - void monitor_thread(void) - { - while (1) { - ... - /* Pipe buffer contains stale data. Flush it. */ - k_pipe_buffer_flush(&my_pipe); - ... - } - } - -Flushing a Pipe -=============== - -All data in the pipe is flushed by calling :c:func:`k_pipe_flush`. - -The following code builds on the examples above, and flushes all the -data in the pipe. + struct message_header { + size_t num_data_bytes; /* Example field */ + ... + }; + + void consumer_thread(void) + { + int rc; + uint8_t buffer[128]; + size_t bytes_read = 0; + struct message_header *header = (struct message_header *)buffer; + + while (1) { + /* Step 1: Read the message header */ + bytes_read = 0; + read_header: + while (bytes_read < sizeof(*header)) { + rc = k_pipe_read(&my_pipe, &buffer[bytes_read], sizeof(*header) - bytes_read, &bytes_read, K_NO_WAIT); + + if (rc < 0) { + /* Error occurred */ + ... + goto read_header; + } + + /* Adjust for partial reads */ + bytes_read += rc; + } + + /* Step 2: Read the message body */ + bytes_read = 0; + while (bytes_read < header->num_data_bytes) { + rc = k_pipe_read(&my_pipe, &buffer[sizeof(*header) + bytes_read], header->num_data_bytes - bytes_read, K_NO_WAIT); + + if (rc < 0) { + /* Error occurred */ + ... + goto read_header; + } + + /* Adjust for partial reads */ + bytes_read += rc; + } + /* Successfully received the complete message */ + } + } + +Resetting a Pipe +================ + +The pipe can be reset by calling :c:func:`k_pipe_reset`. Resetting a pipe +resets its internal state and ends all pending operations with an error code. + +The following example demonstrates resetting a pipe in response to a critical +error: .. code-block:: c @@ -207,31 +186,21 @@ data in the pipe. { while (1) { ... - /* Critical error detected. Flush the entire pipe to reset it. */ - k_pipe_flush(&my_pipe); + /* Critical error detected: reset the entire pipe to reset it. */ + k_pipe_reset(&my_pipe); ... } } - -Suggested uses +Suggested Uses ************** -Use a pipe to send streams of data between threads. - -.. note:: - A pipe can be used to transfer long streams of data if desired. However it - is often preferable to send pointers to large data items to avoid copying - the data. Copying large data items will negatively impact interrupt latency - as a spinlock is held while copying that data. - - -Configuration Options -********************* - -Related configuration options: +Pipes are useful for sending streams of data between threads. Typical +applications include: -* :kconfig:option:`CONFIG_PIPES` +- Implementing producer-consumer patterns. +- Streaming logs or packets between threads. +- Handling variable-length message passing in real-time systems. API Reference ************* diff --git a/drivers/ethernet/eth_sam_gmac.c b/drivers/ethernet/eth_sam_gmac.c index e3748a86108329..0c2740095c9bca 100644 --- a/drivers/ethernet/eth_sam_gmac.c +++ b/drivers/ethernet/eth_sam_gmac.c @@ -403,7 +403,7 @@ static inline void eth_sam_gmac_init_qav(Gmac *gmac) /* * Reset ring buffer */ -static void ring_buf_reset(struct ring_buf *rb) +static void ring_buffer_reset(struct ring_buffer *rb) { rb->head = 0U; rb->tail = 0U; @@ -412,7 +412,7 @@ static void ring_buf_reset(struct ring_buf *rb) /* * Get one 32 bit item from the ring buffer */ -static uint32_t ring_buf_get(struct ring_buf *rb) +static uint32_t ring_buffer_get(struct ring_buffer *rb) { uint32_t val; @@ -428,7 +428,7 @@ static uint32_t ring_buf_get(struct ring_buf *rb) /* * Put one 32 bit item into the ring buffer */ -static void ring_buf_put(struct ring_buf *rb, uint32_t val) +static void ring_buffer_put(struct ring_buffer *rb, uint32_t val) { rb->buf[rb->head] = val; MODULO_INC(rb->head, rb->len); @@ -528,9 +528,9 @@ static void tx_descriptors_init(Gmac *gmac, struct gmac_queue *queue) #if GMAC_MULTIPLE_TX_PACKETS == 1 /* Reset TX frame list */ - ring_buf_reset(&queue->tx_frag_list); + ring_buffer_reset(&queue->tx_frag_list); #if defined(CONFIG_PTP_CLOCK_SAM_GMAC) - ring_buf_reset(&queue->tx_frames); + ring_buffer_reset(&queue->tx_frames); #endif #endif } @@ -721,14 +721,14 @@ static void tx_completed(Gmac *gmac, struct gmac_queue *queue) k_sem_give(&queue->tx_desc_sem); /* Release net buffer to the buffer pool */ - frag = UINT_TO_POINTER(ring_buf_get(&queue->tx_frag_list)); + frag = UINT_TO_POINTER(ring_buffer_get(&queue->tx_frag_list)); net_pkt_frag_unref(frag); LOG_DBG("Dropping frag %p", frag); if (tx_desc->w1 & GMAC_TXW1_LASTBUFFER) { #if defined(CONFIG_PTP_CLOCK_SAM_GMAC) /* Release net packet to the packet pool */ - pkt = UINT_TO_POINTER(ring_buf_get(&queue->tx_frames)); + pkt = UINT_TO_POINTER(ring_buffer_get(&queue->tx_frames)); #if defined(CONFIG_NET_GPTP) hdr = check_gptp_msg(get_iface(dev_data), @@ -756,10 +756,10 @@ static void tx_error_handler(Gmac *gmac, struct gmac_queue *queue) { #if GMAC_MULTIPLE_TX_PACKETS == 1 struct net_buf *frag; - struct ring_buf *tx_frag_list = &queue->tx_frag_list; + struct ring_buffer *tx_frag_list = &queue->tx_frag_list; #if defined(CONFIG_PTP_CLOCK_SAM_GMAC) struct net_pkt *pkt; - struct ring_buf *tx_frames = &queue->tx_frames; + struct ring_buffer *tx_frames = &queue->tx_frames; #endif #endif @@ -1495,7 +1495,7 @@ static int eth_tx(const struct device *dev, struct net_pkt *pkt) "tx_desc_list overflow"); /* Account for a sent frag */ - ring_buf_put(&queue->tx_frag_list, POINTER_TO_UINT(frag)); + ring_buffer_put(&queue->tx_frag_list, POINTER_TO_UINT(frag)); /* frag is internally queued, so it requires to hold a reference */ net_pkt_frag_ref(frag); @@ -1533,7 +1533,7 @@ static int eth_tx(const struct device *dev, struct net_pkt *pkt) #if GMAC_MULTIPLE_TX_PACKETS == 1 #if defined(CONFIG_PTP_CLOCK_SAM_GMAC) /* Account for a sent frame */ - ring_buf_put(&queue->tx_frames, POINTER_TO_UINT(pkt)); + ring_buffer_put(&queue->tx_frames, POINTER_TO_UINT(pkt)); /* pkt is internally queued, so it requires to hold a reference */ net_pkt_ref(pkt); diff --git a/drivers/ethernet/eth_sam_gmac_priv.h b/drivers/ethernet/eth_sam_gmac_priv.h index 8fd5a2b257b57d..c2a94a1c3f11e0 100644 --- a/drivers/ethernet/eth_sam_gmac_priv.h +++ b/drivers/ethernet/eth_sam_gmac_priv.h @@ -208,7 +208,7 @@ enum queue_idx { #endif /** Minimal ring buffer implementation */ -struct ring_buf { +struct ring_buffer { uint32_t *buf; uint16_t len; uint16_t head; @@ -242,9 +242,9 @@ struct gmac_queue { struct net_buf **rx_frag_list; #if GMAC_MULTIPLE_TX_PACKETS == 1 - struct ring_buf tx_frag_list; + struct ring_buffer tx_frag_list; #if defined(CONFIG_PTP_CLOCK_SAM_GMAC) - struct ring_buf tx_frames; + struct ring_buffer tx_frames; #endif #endif diff --git a/drivers/i2s/i2s_litex.c b/drivers/i2s/i2s_litex.c index 06bc4d5bc1d5a4..4e759e6755fed9 100644 --- a/drivers/i2s/i2s_litex.c +++ b/drivers/i2s/i2s_litex.c @@ -260,7 +260,7 @@ static void i2s_copy_to_fifo(uint8_t *src, size_t size, int sample_width, /* * Get data from the queue */ -static int queue_get(struct ring_buf *rb, void **mem_block, size_t *size) +static int queue_get(struct ring_buffer *rb, void **mem_block, size_t *size) { unsigned int key; @@ -282,7 +282,7 @@ static int queue_get(struct ring_buf *rb, void **mem_block, size_t *size) /* * Put data in the queue */ -static int queue_put(struct ring_buf *rb, void *mem_block, size_t size) +static int queue_put(struct ring_buffer *rb, void *mem_block, size_t size) { uint16_t head_next; unsigned int key; diff --git a/drivers/i2s/i2s_litex.h b/drivers/i2s/i2s_litex.h index c373371cf9a20b..9fcae9f98c5983 100644 --- a/drivers/i2s/i2s_litex.h +++ b/drivers/i2s/i2s_litex.h @@ -70,7 +70,7 @@ struct queue_item { }; /* Minimal ring buffer implementation */ -struct ring_buf { +struct ring_buffer { struct queue_item *buf; uint16_t len; uint16_t head; @@ -81,7 +81,7 @@ struct stream { int32_t state; struct k_sem sem; struct i2s_config cfg; - struct ring_buf mem_block_queue; + struct ring_buffer mem_block_queue; void *mem_block; }; diff --git a/drivers/i2s/i2s_ll_stm32.c b/drivers/i2s/i2s_ll_stm32.c index 24df83639a46d4..072e751ea74df5 100644 --- a/drivers/i2s/i2s_ll_stm32.c +++ b/drivers/i2s/i2s_ll_stm32.c @@ -30,7 +30,7 @@ static unsigned int div_round_closest(uint32_t dividend, uint32_t divisor) return (dividend + (divisor / 2U)) / divisor; } -static bool queue_is_empty(struct ring_buf *rb) +static bool queue_is_empty(struct ring_buffer *rb) { unsigned int key; @@ -50,7 +50,7 @@ static bool queue_is_empty(struct ring_buf *rb) /* * Get data from the queue */ -static int queue_get(struct ring_buf *rb, void **mem_block, size_t *size) +static int queue_get(struct ring_buffer *rb, void **mem_block, size_t *size) { unsigned int key; @@ -73,7 +73,7 @@ static int queue_get(struct ring_buf *rb, void **mem_block, size_t *size) /* * Put data in the queue */ -static int queue_put(struct ring_buf *rb, void *mem_block, size_t size) +static int queue_put(struct ring_buffer *rb, void *mem_block, size_t size) { uint16_t head_next; unsigned int key; diff --git a/drivers/i2s/i2s_ll_stm32.h b/drivers/i2s/i2s_ll_stm32.h index 1f9d0eb0113dc7..5c4d06b68801f0 100644 --- a/drivers/i2s/i2s_ll_stm32.h +++ b/drivers/i2s/i2s_ll_stm32.h @@ -13,7 +13,7 @@ struct queue_item { }; /* Minimal ring buffer implementation */ -struct ring_buf { +struct ring_buffer { struct queue_item *buf; uint16_t len; uint16_t head; @@ -44,7 +44,7 @@ struct stream { bool tx_stop_for_drain; struct i2s_config cfg; - struct ring_buf mem_block_queue; + struct ring_buffer mem_block_queue; void *mem_block; bool last_block; bool master; diff --git a/drivers/i2s/i2s_sam_ssc.c b/drivers/i2s/i2s_sam_ssc.c index f96bb34d6d4216..cabf8ec64c1da3 100644 --- a/drivers/i2s/i2s_sam_ssc.c +++ b/drivers/i2s/i2s_sam_ssc.c @@ -58,7 +58,7 @@ struct queue_item { }; /* Minimal ring buffer implementation */ -struct ring_buf { +struct ring_buffer { struct queue_item *buf; uint16_t len; uint16_t head; @@ -83,7 +83,7 @@ struct stream { uint8_t word_size_bytes; bool last_block; struct i2s_config cfg; - struct ring_buf mem_block_queue; + struct ring_buffer mem_block_queue; void *mem_block; int (*stream_start)(struct stream *, Ssc *const, const struct device *); @@ -113,7 +113,7 @@ static void tx_stream_disable(struct stream *, Ssc *const, /* * Get data from the queue */ -static int queue_get(struct ring_buf *rb, void **mem_block, size_t *size) +static int queue_get(struct ring_buffer *rb, void **mem_block, size_t *size) { unsigned int key; @@ -137,7 +137,7 @@ static int queue_get(struct ring_buf *rb, void **mem_block, size_t *size) /* * Put data in the queue */ -static int queue_put(struct ring_buf *rb, void *mem_block, size_t size) +static int queue_put(struct ring_buffer *rb, void *mem_block, size_t size) { uint16_t head_next; unsigned int key; diff --git a/include/zephyr/kernel.h b/include/zephyr/kernel.h index a7a474081e27ea..d3d0074563a9cc 100644 --- a/include/zephyr/kernel.h +++ b/include/zephyr/kernel.h @@ -22,6 +22,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { @@ -4991,6 +4992,18 @@ void k_mbox_data_get(struct k_mbox_msg *rx_msg, void *buffer); * @{ */ +/** + * @brief initialize a pipe + * + * This routine initializes a pipe object, prior to its first use. + * + * @param pipe Address of the pipe. + * @param buffer Address of the pipe's buffer. + * @param buffer_size Size of the pipe's buffer. + */ +__syscall void k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size); + +#ifdef CONFIG_PIPES /** Pipe Structure */ struct k_pipe { unsigned char *buffer; /**< Pipe buffer: may be NULL */ @@ -5061,19 +5074,7 @@ struct k_pipe { Z_PIPE_INITIALIZER(name, _k_pipe_buf_##name, pipe_buffer_size) /** - * @brief Initialize a pipe. - * - * This routine initializes a pipe object, prior to its first use. - * - * @param pipe Address of the pipe. - * @param buffer Address of the pipe's ring buffer, or NULL if no ring buffer - * is used. - * @param size Size of the pipe's ring buffer (in bytes), or zero if no ring - * buffer is used. - */ -void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size); - -/** + * @deprecated Dynamic allocation of pipe buffers will be removed in the new k_pipe API. * @brief Release a pipe's allocated buffer * * If a pipe object was given a dynamically allocated buffer via @@ -5084,9 +5085,10 @@ void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size); * @retval 0 on success * @retval -EAGAIN nothing to cleanup */ -int k_pipe_cleanup(struct k_pipe *pipe); +__deprecated int k_pipe_cleanup(struct k_pipe *pipe); /** + * @deprecated Dynamic allocation of pipe buffers will be removed in the new k_pipe API. * @brief Initialize a pipe and allocate a buffer for it * * Storage for the buffer region will be allocated from the calling thread's @@ -5101,9 +5103,10 @@ int k_pipe_cleanup(struct k_pipe *pipe); * @retval 0 on success * @retval -ENOMEM if memory couldn't be allocated */ -__syscall int k_pipe_alloc_init(struct k_pipe *pipe, size_t size); +__deprecated __syscall int k_pipe_alloc_init(struct k_pipe *pipe, size_t size); /** + * @deprecated k_pipe_put() is replaced by k_pipe_write(...) in the new k_pipe API. * @brief Write data to a pipe. * * This routine writes up to @a bytes_to_write bytes of data to @a pipe. @@ -5121,11 +5124,12 @@ __syscall int k_pipe_alloc_init(struct k_pipe *pipe, size_t size); * @retval -EAGAIN Waiting period timed out; between zero and @a min_xfer * minus one data bytes were written. */ -__syscall int k_pipe_put(struct k_pipe *pipe, const void *data, +__deprecated __syscall int k_pipe_put(struct k_pipe *pipe, const void *data, size_t bytes_to_write, size_t *bytes_written, size_t min_xfer, k_timeout_t timeout); /** + * @deprecated k_pipe_get() is replaced by k_pipe_read(...) in the new k_pipe API. * @brief Read data from a pipe. * * This routine reads up to @a bytes_to_read bytes of data from @a pipe. @@ -5144,11 +5148,12 @@ __syscall int k_pipe_put(struct k_pipe *pipe, const void *data, * @retval -EAGAIN Waiting period timed out; between zero and @a min_xfer * minus one data bytes were read. */ -__syscall int k_pipe_get(struct k_pipe *pipe, void *data, +__deprecated __syscall int k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, size_t *bytes_read, size_t min_xfer, k_timeout_t timeout); /** + * @deprecated k_pipe_read_avail() will be removed in the new k_pipe API. * @brief Query the number of bytes that may be read from @a pipe. * * @param pipe Address of the pipe. @@ -5156,9 +5161,10 @@ __syscall int k_pipe_get(struct k_pipe *pipe, void *data, * @retval a number n such that 0 <= n <= @ref k_pipe.size; the * result is zero for unbuffered pipes. */ -__syscall size_t k_pipe_read_avail(struct k_pipe *pipe); +__deprecated __syscall size_t k_pipe_read_avail(struct k_pipe *pipe); /** + * @deprecated k_pipe_write_avail() will be removed in the new k_pipe API. * @brief Query the number of bytes that may be written to @a pipe * * @param pipe Address of the pipe. @@ -5166,9 +5172,10 @@ __syscall size_t k_pipe_read_avail(struct k_pipe *pipe); * @retval a number n such that 0 <= n <= @ref k_pipe.size; the * result is zero for unbuffered pipes. */ -__syscall size_t k_pipe_write_avail(struct k_pipe *pipe); +__deprecated __syscall size_t k_pipe_write_avail(struct k_pipe *pipe); /** + * @deprecated k_pipe_flush() will be removed in the new k_pipe API. * @brief Flush the pipe of write data * * This routine flushes the pipe. Flushing the pipe is equivalent to reading @@ -5178,9 +5185,10 @@ __syscall size_t k_pipe_write_avail(struct k_pipe *pipe); * * @param pipe Address of the pipe. */ -__syscall void k_pipe_flush(struct k_pipe *pipe); +__deprecated __syscall void k_pipe_flush(struct k_pipe *pipe); /** + * @deprecated k_pipe_buffer_flush will be removed in the new k_pipe API. * @brief Flush the pipe's internal buffer * * This routine flushes the pipe's internal buffer. This is equivalent to @@ -5191,14 +5199,128 @@ __syscall void k_pipe_flush(struct k_pipe *pipe); * * @param pipe Address of the pipe. */ -__syscall void k_pipe_buffer_flush(struct k_pipe *pipe); +__deprecated __syscall void k_pipe_buffer_flush(struct k_pipe *pipe); -/** @} */ +#else /* CONFIG_PIPES */ + +enum pipe_flags { + PIPE_FLAG_OPEN = BIT(0), + PIPE_FLAG_RESET = BIT(1), +}; + +struct k_pipe { + size_t waiting; + struct ring_buf buf; + struct k_spinlock lock; + _wait_q_t data; + _wait_q_t space; + uint8_t flags; + + Z_DECL_POLL_EVENT +#ifdef CONFIG_OBJ_CORE_PIPE + struct k_obj_core obj_core; +#endif + SYS_PORT_TRACING_TRACKING_FIELD(k_pipe) +}; /** * @cond INTERNAL_HIDDEN */ +#define Z_PIPE_INITIALIZER(obj, pipe_buffer, pipe_buffer_size) \ +{ \ + .buf = RING_BUF_INIT(pipe_buffer, pipe_buffer_size), \ + .data = Z_WAIT_Q_INIT(&obj.data), \ + .space = Z_WAIT_Q_INIT(&obj.space), \ + .flags = PIPE_FLAG_OPEN, \ + .waiting = 0, \ + Z_POLL_EVENT_OBJ_INIT(obj) \ +} +/** + * INTERNAL_HIDDEN @endcond + */ +/** + * @brief Statically define and initialize a pipe. + * + * The pipe can be accessed outside the module where it is defined using: + * + * @code extern struct k_pipe ; @endcode + * + * @param name Name of the pipe. + * @param pipe_buffer_size Size of the pipe's ring buffer (in bytes). + * @param pipe_align Alignment of the pipe's ring buffer (power of 2). + * + */ +#define K_PIPE_DEFINE(name, pipe_buffer_size, pipe_align) \ + static unsigned char __noinit __aligned(pipe_align) \ + _k_pipe_buf_##name[pipe_buffer_size]; \ + STRUCT_SECTION_ITERABLE(k_pipe, name) = \ + Z_PIPE_INITIALIZER(name, _k_pipe_buf_##name, pipe_buffer_size) + + +/** + * @brief Write data to a pipe + * + * This routine writes up to @a len bytes of data to @a pipe. + * If the pipe is full, the routine will block until the data can be written or the timeout expires. + * + * @param pipe Address of the pipe. + * @param data Address of data to write. + * @param len Size of data (in bytes). + * @param timeout Waiting period to wait for the data to be written. + * + * @retval number of bytes written on success + * @retval -EAGAIN if no data could be written before the timeout expired + * @retval -ECANCELED if the write was interrupted by k_pipe_reset(..) + * @retval -EPIPE if the pipe was closed + */ +__syscall int k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, + k_timeout_t timeout); + +/** + * @brief Read data from a pipe + * This routine reads up to @a len bytes of data from @a pipe. + * If the pipe is empty, the routine will block until the data can be read or the timeout expires. + * + * @param pipe Address of the pipe. + * @param data Address to place the data read from pipe. + * @param len Requested number of bytes to read. + * @param timeout Waiting period to wait for the data to be read. + * + * @retval number of bytes read on success + * @retval -EAGAIN if no data could be read before the timeout expired + * @retval -ECANCELED if the read was interrupted by k_pipe_reset(..) + * @retval -EPIPE if the pipe was closed + */ +__syscall int k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, + k_timeout_t timeout); + +/** + * @brief Reset a pipe + * This routine resets the pipe, discarding any unread data and unblocking any threads waiting to + * write or read, causing the waiting threads to return with -ECANCELED. Calling k_pipe_read(..) or + * k_pipe_write(..) when the pipe is resetting but not yet reset will return -ECANCELED. + * The pipe is left open after a reset and can be used as normal. + * + * @param pipe Address of the pipe. + */ +__syscall void k_pipe_reset(struct k_pipe *pipe); + +/** + * @brief Close a pipe + * + * This routine closes a pipe. Any threads that were blocked on the pipe + * will be unblocked and receive an error code. + * + * @param pipe Address of the pipe. + */ +__syscall void k_pipe_close(struct k_pipe *pipe); +#endif /* CONFIG_PIPES */ +/** @} */ + +/** + * @cond INTERNAL_HIDDEN + */ struct k_mem_slab_info { uint32_t num_blocks; size_t block_size; @@ -5895,9 +6017,7 @@ struct k_poll_event { struct k_fifo *fifo, *typed_K_POLL_TYPE_FIFO_DATA_AVAILABLE; struct k_queue *queue, *typed_K_POLL_TYPE_DATA_AVAILABLE; struct k_msgq *msgq, *typed_K_POLL_TYPE_MSGQ_DATA_AVAILABLE; -#ifdef CONFIG_PIPES struct k_pipe *pipe, *typed_K_POLL_TYPE_PIPE_DATA_AVAILABLE; -#endif }; }; diff --git a/include/zephyr/mgmt/mcumgr/transport/smp_shell.h b/include/zephyr/mgmt/mcumgr/transport/smp_shell.h index 705ec697ff7013..a7c4831f58f96b 100644 --- a/include/zephyr/mgmt/mcumgr/transport/smp_shell.h +++ b/include/zephyr/mgmt/mcumgr/transport/smp_shell.h @@ -11,6 +11,7 @@ #ifndef ZEPHYR_INCLUDE_MGMT_SMP_SHELL_H_ #define ZEPHYR_INCLUDE_MGMT_SMP_SHELL_H_ +#include #include #ifdef __cplusplus diff --git a/include/zephyr/sys/ring_buffer.h b/include/zephyr/sys/ring_buffer.h index 6bec33e9e3a904..ee63976d784792 100644 --- a/include/zephyr/sys/ring_buffer.h +++ b/include/zephyr/sys/ring_buffer.h @@ -7,7 +7,6 @@ #ifndef ZEPHYR_INCLUDE_SYS_RING_BUFFER_H_ #define ZEPHYR_INCLUDE_SYS_RING_BUFFER_H_ -#include #include #include @@ -62,6 +61,11 @@ static inline void ring_buf_internal_reset(struct ring_buf *buf, int32_t value) buf->get_head = buf->get_tail = buf->get_base = value; } +#define RING_BUF_INIT(buf, size8) \ +{ \ + .buffer = buf, \ + .size = size8, \ +} /** * @brief Define and initialize a ring buffer for byte data. * @@ -80,10 +84,7 @@ static inline void ring_buf_internal_reset(struct ring_buf *buf, int32_t value) BUILD_ASSERT(size8 < RING_BUFFER_MAX_SIZE,\ RING_BUFFER_SIZE_ASSERT_MSG); \ static uint8_t __noinit _ring_buffer_data_##name[size8]; \ - struct ring_buf name = { \ - .buffer = _ring_buffer_data_##name, \ - .size = size8 \ - } + struct ring_buf name = RING_BUF_INIT(_ring_buffer_data_##name, size8) /** * @brief Define and initialize an "item based" ring buffer. diff --git a/include/zephyr/tracing/tracing.h b/include/zephyr/tracing/tracing.h index fd4a0d634edb34..eecdcddc61f86a 100644 --- a/include/zephyr/tracing/tracing.h +++ b/include/zephyr/tracing/tracing.h @@ -1558,8 +1558,80 @@ /** * @brief Trace initialization of Pipe * @param pipe Pipe object + * @param buffer data buffer + * @param size data buffer size */ -#define sys_port_trace_k_pipe_init(pipe) +#define sys_port_trace_k_pipe_init(pipe, buffer, size) + +/** + * @brief Trace Pipe reset entry + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_reset_enter(pipe) + +/** + * @brief Trace Pipe reset exit + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_reset_exit(pipe) + +/** + * @brief Trace Pipe close entry + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_close_enter(pipe) + +/** + * @brief Trace Pipe close exit + * @param pipe Pipe object + */ +#define sys_port_trace_k_pipe_close_exit(pipe) + +/** + * @brief Trace Pipe write attempt entry + * @param pipe Pipe object + * @param data pointer to data + * @param len length of data + * @param timeout Timeout period + */ +#define sys_port_trace_k_pipe_write_enter(pipe, data, len, timeout) + +/** + * @brief Trace Pipe write attempt blocking + * @param pipe Pipe object + * @param timeout Timeout period + */ +#define sys_port_trace_k_pipe_write_blocking(pipe, timeout) + +/** + * @brief Trace Pipe write attempt outcome + * @param pipe Pipe object + * @param ret Return value + */ +#define sys_port_trace_k_pipe_write_exit(pipe, ret) + +/** + * @brief Trace Pipe read attempt entry + * @param pipe Pipe object + * @param data Pointer to data + * @param len Length of data + * @param timeout Timeout period + */ +#define sys_port_trace_k_pipe_read_enter(pipe, data, len, timeout) + +/** + * @brief Trace Pipe read attempt blocking + * @param pipe Pipe object + * @param timeout Timeout period + */ +#define sys_port_trace_k_pipe_read_blocking(pipe, timeout) + +/** + * @brief Trace Pipe read attempt outcome + * @param pipe Pipe object + * @param ret Return value + */ +#define sys_port_trace_k_pipe_read_exit(pipe, ret) /** * @brief Trace Pipe cleanup entry diff --git a/include/zephyr/tracing/tracking.h b/include/zephyr/tracing/tracking.h index 2b2429c83f7c07..d6e24ee46a8f00 100644 --- a/include/zephyr/tracing/tracking.h +++ b/include/zephyr/tracing/tracking.h @@ -73,8 +73,8 @@ extern struct k_event *_track_list_k_event; #define sys_port_track_k_queue_cancel_wait(queue) #define sys_port_track_k_queue_init(queue) \ sys_track_k_queue_init(queue) -#define sys_port_track_k_pipe_init(pipe) \ - sys_track_k_pipe_init(pipe) +#define sys_port_track_k_pipe_init(pipe, buffer, buffer_size) \ + sys_track_k_pipe_init(pipe, buffer, buffer_size) #define sys_port_track_k_condvar_init(condvar, ret) #define sys_port_track_k_stack_init(stack) \ sys_track_k_stack_init(stack) @@ -105,7 +105,7 @@ void sys_track_k_mutex_init(struct k_mutex *mutex); void sys_track_k_stack_init(struct k_stack *stack); void sys_track_k_msgq_init(struct k_msgq *msgq); void sys_track_k_mbox_init(struct k_mbox *mbox); -void sys_track_k_pipe_init(struct k_pipe *pipe); +void sys_track_k_pipe_init(struct k_pipe *pipe, void *buffer, size_t size); void sys_track_k_queue_init(struct k_queue *queue); void sys_track_k_event_init(struct k_event *event); void sys_track_socket_init(int sock, int family, int type, int proto); @@ -132,7 +132,7 @@ void sys_track_socket_init(int sock, int family, int type, int proto); #define sys_port_track_k_queue_peek_head(queue, ret) #define sys_port_track_k_queue_cancel_wait(queue) #define sys_port_track_k_queue_init(queue) -#define sys_port_track_k_pipe_init(pipe) +#define sys_port_track_k_pipe_init(pipe, buffer, buffer_size) #define sys_port_track_k_condvar_init(condvar, ret) #define sys_port_track_k_stack_init(stack) #define sys_port_track_k_thread_name_set(thread, ret) diff --git a/kernel/CMakeLists.txt b/kernel/CMakeLists.txt index e5db39713b6699..8ba95f6c5708cf 100644 --- a/kernel/CMakeLists.txt +++ b/kernel/CMakeLists.txt @@ -84,6 +84,10 @@ list(APPEND kernel_files thread.c sched.c ) +# FIXME: Once the prior pipe implementation is removed, this should be included in the above list +if(NOT CONFIG_PIPES) +list(APPEND kernel_files pipe.c) +endif() # NOT CONFIG_PIPES if(CONFIG_SMP) list(APPEND kernel_files smp.c diff --git a/kernel/Kconfig b/kernel/Kconfig index 36fcf1d821cc4a..eb69c9c3be54af 100644 --- a/kernel/Kconfig +++ b/kernel/Kconfig @@ -12,6 +12,7 @@ source "subsys/logging/Kconfig.template.log_config" config MULTITHREADING bool "Multi-threading" if ARCH_HAS_SINGLE_THREAD_SUPPORT default y + select RING_BUFFER help If disabled, only the main thread is available, so a main() function must be provided. Interrupts are available. Kernel objects will most @@ -713,6 +714,7 @@ config EVENTS config PIPES bool "Pipe objects" + select DEPRECATED help This option enables kernel pipes. A pipe is a kernel object that allows a thread to send a byte stream to another thread. Pipes can @@ -720,6 +722,9 @@ config PIPES Note that setting this option slightly increases the size of the thread structure. + This Kconfig is deprecated and will be removed, by disabling this + kconfig another implementation of k_pipe will be available when + CONFIG_MULTITHREADING is enabled. config KERNEL_MEM_POOL bool "Use Kernel Memory Pool" diff --git a/kernel/Kconfig.obj_core b/kernel/Kconfig.obj_core index 5c9a1418ffeea6..5e846b20ffaa76 100644 --- a/kernel/Kconfig.obj_core +++ b/kernel/Kconfig.obj_core @@ -77,7 +77,7 @@ config OBJ_CORE_SEM config OBJ_CORE_PIPE bool "Integrate pipe into object core framework" - default y if PIPES + default y help When enabled, this option integrates pipes into the object core framework. diff --git a/kernel/pipe.c b/kernel/pipe.c new file mode 100644 index 00000000000000..d707dc1d6d3df8 --- /dev/null +++ b/kernel/pipe.c @@ -0,0 +1,295 @@ +/* + * Copyright (c) 2024 Måns Ansgariusson + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include +#include +#include +#include +#include +#include + +#ifdef CONFIG_OBJ_CORE_PIPE +static struct k_obj_type obj_type_pipe; +#endif /* CONFIG_OBJ_CORE_PIPE */ + +static inline bool pipe_closed(struct k_pipe *pipe) +{ + return (pipe->flags & PIPE_FLAG_OPEN) == 0; +} + +static inline bool pipe_resetting(struct k_pipe *pipe) +{ + return (pipe->flags & PIPE_FLAG_RESET) != 0; +} + +static inline bool pipe_full(struct k_pipe *pipe) +{ + return ring_buf_space_get(&pipe->buf) == 0; +} + +static inline bool pipe_empty(struct k_pipe *pipe) +{ + return ring_buf_is_empty(&pipe->buf); +} + +static inline int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key, + k_timepoint_t time_limit) +{ + k_timeout_t timeout = sys_timepoint_timeout(time_limit); + + if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) { + return -EAGAIN; + } + + pipe->waiting++; + SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout); + z_pend_curr(&pipe->lock, *key, waitq, timeout); + *key = k_spin_lock(&pipe->lock); + pipe->waiting--; + if (unlikely(pipe_closed(pipe))) { + return -EPIPE; + } else if (unlikely(pipe_resetting(pipe))) { + if (pipe->waiting == 0) { + pipe->flags &= ~PIPE_FLAG_RESET; + } + return -ECANCELED; + } else if (sys_timepoint_expired(time_limit)) { + return -EAGAIN; + } + + return 0; +} + +static void notify_waiter(_wait_q_t *waitq) +{ + struct k_thread *thread_to_unblock = z_unpend_first_thread(waitq); + + if (likely(thread_to_unblock != NULL)) { + z_ready_thread(thread_to_unblock); + } +} + +void z_impl_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size) +{ + ring_buf_init(&pipe->buf, buffer_size, buffer); + pipe->flags = PIPE_FLAG_OPEN; + pipe->waiting = 0; + + pipe->lock = (struct k_spinlock){}; + z_waitq_init(&pipe->data); + z_waitq_init(&pipe->space); + k_object_init(pipe); + +#ifdef CONFIG_POLL + sys_dlist_init(&pipe->poll_events); +#endif /* CONFIG_POLL */ +#ifdef CONFIG_OBJ_CORE_PIPE + k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe); +#endif /* CONFIG_OBJ_CORE_PIPE */ + SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, buffer_size); +} + +int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout) +{ + int rc; + size_t written = 0; + k_spinlock_key_t key; + k_timepoint_t end = sys_timepoint_calc(timeout); + + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout); + while (written < len) { + key = k_spin_lock(&pipe->lock); + if (unlikely(pipe_closed(pipe))) { + k_spin_unlock(&pipe->lock, key); + rc = -EPIPE; + goto exit; + } else if (unlikely(pipe_resetting(pipe))) { + k_spin_unlock(&pipe->lock, key); + rc = -ECANCELED; + goto exit; + } else if (pipe_full(pipe)) { + rc = wait_for(&pipe->space, pipe, &key, end); + if (rc == -EAGAIN) { + /* the timeout expired */ + k_spin_unlock(&pipe->lock, key); + rc = written ? written : -EAGAIN; + goto exit; + } else if (unlikely(rc != 0)) { + /* The pipe was closed or reseted while waiting for space */ + k_spin_unlock(&pipe->lock, key); + goto exit; + } else if (unlikely(pipe_full(pipe))) { + /* Timeout has not elapsed, the pipe is open and not resetting, + * we've been notified of available space, but the notified space + * was consumed by another thread before the calling thread. + */ + k_spin_unlock(&pipe->lock, key); + continue; + } + /* The timeout has not elapsed, we've been notified of + * available space, and the pipe is not full. Continue writing. + */ + } + + rc = ring_buf_put(&pipe->buf, &data[written], len - written); + if (likely(rc != 0)) { + notify_waiter(&pipe->data); +#ifdef CONFIG_POLL + z_handle_obj_poll_events(&pipe->poll_events, + K_POLL_STATE_PIPE_DATA_AVAILABLE); +#endif /* CONFIG_POLL */ + } + k_spin_unlock(&pipe->lock, key); + written += rc; + } + rc = written; +exit: + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc); + return rc; +} + +int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout) +{ + int rc; + size_t read = 0; + k_spinlock_key_t key; + k_timepoint_t end = sys_timepoint_calc(timeout); + + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout); + while (read < len) { + key = k_spin_lock(&pipe->lock); + if (unlikely(pipe_resetting(pipe))) { + k_spin_unlock(&pipe->lock, key); + rc = -ECANCELED; + goto exit; + } else if (pipe_empty(pipe) && !pipe_closed(pipe)) { + rc = wait_for(&pipe->data, pipe, &key, end); + if (rc == -EAGAIN) { + /* The timeout elapsed */ + k_spin_unlock(&pipe->lock, key); + rc = read ? read : -EAGAIN; + goto exit; + } else if (unlikely(rc == -ECANCELED)) { + /* The pipe is being rested. */ + k_spin_unlock(&pipe->lock, key); + goto exit; + } else if (unlikely(rc == 0 && pipe_empty(pipe))) { + /* Timeout has not elapsed, we've been notified of available bytes + * but they have been consumed by another thread before the calling + * thread. + */ + k_spin_unlock(&pipe->lock, key); + continue; + } + /* The timeout has not elapsed, we've been notified of + * available bytes, and the pipe is not empty. Continue reading. + */ + } + + if (unlikely(pipe_closed(pipe) && pipe_empty(pipe))) { + k_spin_unlock(&pipe->lock, key); + rc = read ? read : -EPIPE; + goto exit; + } + + rc = ring_buf_get(&pipe->buf, &data[read], len - read); + if (likely(rc != 0)) { + notify_waiter(&pipe->space); + } + read += rc; + k_spin_unlock(&pipe->lock, key); + } + rc = read; +exit: + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc); + return rc; +} + +void z_impl_k_pipe_reset(struct k_pipe *pipe) +{ + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, reset, pipe); + K_SPINLOCK(&pipe->lock) { + ring_buf_reset(&pipe->buf); + if (likely(pipe->waiting != 0)) { + pipe->flags |= PIPE_FLAG_RESET; + z_unpend_all(&pipe->data); + z_unpend_all(&pipe->space); + } + } + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, reset, pipe); +} + +void z_impl_k_pipe_close(struct k_pipe *pipe) +{ + SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, close, pipe); + K_SPINLOCK(&pipe->lock) { + pipe->flags = 0; + z_unpend_all(&pipe->data); + z_unpend_all(&pipe->space); + } + SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, close, pipe); +} + +#ifdef CONFIG_USERSPACE +void z_vrfy_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size) +{ + K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE)); + K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, buffer_size)); + + z_impl_k_pipe_init(pipe, buffer, buffer_size); +} +#include + +int z_vrfy_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout) +{ + K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE)); + K_OOPS(K_SYSCALL_MEMORY_WRITE(data, len)); + + return z_impl_k_pipe_read(pipe, data, len, timeout); +} +#include + +int z_vrfy_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout) +{ + K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE)); + K_OOPS(K_SYSCALL_MEMORY_READ(data, len)); + + return z_impl_k_pipe_write(pipe, data, len, timeout); +} +#include + +void z_vrfy_k_pipe_reset(struct k_pipe *pipe) +{ + K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE)); + z_impl_k_pipe_reset(pipe); +} +#include + +void z_vrfy_k_pipe_close(struct k_pipe *pipe) +{ + K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE)); + z_impl_k_pipe_close(pipe); +} +#include +#endif /* CONFIG_USERSPACE */ + +#ifdef CONFIG_OBJ_CORE_PIPE +static int init_pipe_obj_core_list(void) +{ + /* Initialize pipe object type */ + z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID, + offsetof(struct k_pipe, obj_core)); + + /* Initialize and link statically defined pipes */ + STRUCT_SECTION_FOREACH(k_pipe, pipe) { + k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe); + } + + return 0; +} + +SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1, + CONFIG_KERNEL_INIT_PRIORITY_OBJECTS); +#endif /* CONFIG_OBJ_CORE_PIPE */ diff --git a/kernel/pipes.c b/kernel/pipes.c index a9eef5a4f368cb..59ada53bba2a55 100644 --- a/kernel/pipes.c +++ b/kernel/pipes.c @@ -36,7 +36,7 @@ static struct k_obj_type obj_type_pipe; #endif /* CONFIG_OBJ_CORE_PIPE */ -void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size) +void z_impl_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size) { pipe->buffer = buffer; pipe->size = size; @@ -46,7 +46,7 @@ void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size) pipe->lock = (struct k_spinlock){}; z_waitq_init(&pipe->wait_q.writers); z_waitq_init(&pipe->wait_q.readers); - SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe); + SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, size); pipe->flags = 0; @@ -87,6 +87,15 @@ int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size) } #ifdef CONFIG_USERSPACE +static inline void z_vrfy_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size) +{ + K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE)); + K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, size)); + + z_impl_k_pipe_init(pipe, buffer, size); +} +#include + static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size) { K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE)); diff --git a/kernel/poll.c b/kernel/poll.c index 05e9fe10c3e066..6d04891b3ebb96 100644 --- a/kernel/poll.c +++ b/kernel/poll.c @@ -87,13 +87,9 @@ static inline bool is_condition_met(struct k_poll_event *event, uint32_t *state) return true; } break; -#ifdef CONFIG_PIPES case K_POLL_TYPE_PIPE_DATA_AVAILABLE: - if (k_pipe_read_avail(event->pipe)) { - *state = K_POLL_STATE_PIPE_DATA_AVAILABLE; - return true; - } -#endif /* CONFIG_PIPES */ + *state = K_POLL_STATE_PIPE_DATA_AVAILABLE; + return true; case K_POLL_TYPE_IGNORE: break; default: @@ -154,12 +150,10 @@ static inline void register_event(struct k_poll_event *event, __ASSERT(event->msgq != NULL, "invalid message queue\n"); add_event(&event->msgq->poll_events, event, poller); break; -#ifdef CONFIG_PIPES case K_POLL_TYPE_PIPE_DATA_AVAILABLE: __ASSERT(event->pipe != NULL, "invalid pipe\n"); add_event(&event->pipe->poll_events, event, poller); break; -#endif /* CONFIG_PIPES */ case K_POLL_TYPE_IGNORE: /* nothing to do */ break; @@ -195,12 +189,10 @@ static inline void clear_event_registration(struct k_poll_event *event) __ASSERT(event->msgq != NULL, "invalid message queue\n"); remove_event = true; break; -#ifdef CONFIG_PIPES case K_POLL_TYPE_PIPE_DATA_AVAILABLE: __ASSERT(event->pipe != NULL, "invalid pipe\n"); remove_event = true; break; -#endif /* CONFIG_PIPES */ case K_POLL_TYPE_IGNORE: /* nothing to do */ break; @@ -413,11 +405,9 @@ static inline int z_vrfy_k_poll(struct k_poll_event *events, case K_POLL_TYPE_MSGQ_DATA_AVAILABLE: K_OOPS(K_SYSCALL_OBJ(e->msgq, K_OBJ_MSGQ)); break; -#ifdef CONFIG_PIPES case K_POLL_TYPE_PIPE_DATA_AVAILABLE: K_OOPS(K_SYSCALL_OBJ(e->pipe, K_OBJ_PIPE)); break; -#endif /* CONFIG_PIPES */ default: ret = -EINVAL; goto out_free; diff --git a/subsys/net/lib/sockets/Kconfig b/subsys/net/lib/sockets/Kconfig index 0669846a963c64..3f24cbc233f231 100644 --- a/subsys/net/lib/sockets/Kconfig +++ b/subsys/net/lib/sockets/Kconfig @@ -324,7 +324,6 @@ config NET_SOCKETS_CAN_RECEIVERS config NET_SOCKETPAIR bool "Support for socketpair" - select PIPES help Communicate over a pair of connected, unnamed UNIX domain sockets. diff --git a/subsys/net/lib/sockets/socketpair.c b/subsys/net/lib/sockets/socketpair.c index 8f2e1961e8a615..b89a292e1fd778 100644 --- a/subsys/net/lib/sockets/socketpair.c +++ b/subsys/net/lib/sockets/socketpair.c @@ -47,7 +47,7 @@ __net_socket struct spair { int remote; /**< the remote endpoint file descriptor */ uint32_t flags; /**< status and option bits */ struct k_sem sem; /**< semaphore for exclusive structure access */ - struct k_pipe recv_q; /**< receive queue of local endpoint */ + struct ring_buf recv_q; /** indicates local @a recv_q isn't empty */ struct k_poll_signal readable; /** indicates local @a recv_q isn't full */ @@ -106,7 +106,7 @@ static inline size_t spair_write_avail(struct spair *spair) return 0; } - return k_pipe_write_avail(&remote->recv_q); + return ring_buf_space_get(&remote->recv_q); } /** @@ -117,7 +117,7 @@ static inline size_t spair_write_avail(struct spair *spair) */ static inline size_t spair_read_avail(struct spair *spair) { - return k_pipe_read_avail(&spair->recv_q); + return ring_buf_size_get(&spair->recv_q); } /** Swap two 32-bit integers */ @@ -250,7 +250,7 @@ static struct spair *spair_new(void) spair->flags = SPAIR_FLAGS_DEFAULT; k_sem_init(&spair->sem, 1, 1); - k_pipe_init(&spair->recv_q, spair->buf, sizeof(spair->buf)); + ring_buf_init(&spair->recv_q, sizeof(spair->buf), spair->buf); k_poll_signal_init(&spair->readable); k_poll_signal_init(&spair->writeable); @@ -550,10 +550,7 @@ static ssize_t spair_write(void *obj, const void *buffer, size_t count) } } - res = k_pipe_put(&remote->recv_q, (void *)buffer, count, - &bytes_written, 1, K_NO_WAIT); - __ASSERT(res == 0, "k_pipe_put() failed: %d", res); - + bytes_written = ring_buf_put(&remote->recv_q, (void *)buffer, count); if (spair_write_avail(spair) == 0) { k_poll_signal_reset(&remote->writeable); } @@ -724,10 +721,7 @@ static ssize_t spair_read(void *obj, void *buffer, size_t count) } } - res = k_pipe_get(&spair->recv_q, (void *)buffer, count, &bytes_read, - 1, K_NO_WAIT); - __ASSERT(res == 0, "k_pipe_get() failed: %d", res); - + bytes_read = ring_buf_get(&spair->recv_q, (void *)buffer, count); if (spair_read_avail(spair) == 0 && !sock_is_eof(spair)) { k_poll_signal_reset(&spair->readable); } diff --git a/subsys/tracing/ctf/tracing_ctf.h b/subsys/tracing/ctf/tracing_ctf.h index 30ffb6ff950309..8380e51111686b 100644 --- a/subsys/tracing/ctf/tracing_ctf.h +++ b/subsys/tracing/ctf/tracing_ctf.h @@ -282,7 +282,18 @@ extern "C" { #define sys_port_trace_k_mbox_get_exit(mbox, timeout, ret) #define sys_port_trace_k_mbox_data_get(rx_msg) -#define sys_port_trace_k_pipe_init(pipe) +#define sys_port_trace_k_pipe_init(pipe, buffer, size) +#define sys_port_trace_k_pipe_reset_enter(pipe) +#define sys_port_trace_k_pipe_reset_exit(pipe) +#define sys_port_trace_k_pipe_close_enter(pipe) +#define sys_port_trace_k_pipe_close_exit(pipe) +#define sys_port_trace_k_pipe_write_enter(pipe, data, len, timeout) +#define sys_port_trace_k_pipe_write_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_write_exit(pipe, ret) +#define sys_port_trace_k_pipe_read_enter(pipe, data, len, timeout) +#define sys_port_trace_k_pipe_read_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_read_exit(pipe, ret) + #define sys_port_trace_k_pipe_cleanup_enter(pipe) #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) diff --git a/subsys/tracing/sysview/tracing_sysview.h b/subsys/tracing/sysview/tracing_sysview.h index 8b669fd22dd626..324db73a81db2b 100644 --- a/subsys/tracing/sysview/tracing_sysview.h +++ b/subsys/tracing/sysview/tracing_sysview.h @@ -532,7 +532,18 @@ void sys_trace_thread_info(struct k_thread *thread); #define sys_port_trace_k_mbox_get_exit(mbox, timeout, ret) #define sys_port_trace_k_mbox_data_get(rx_msg) -#define sys_port_trace_k_pipe_init(pipe) +#define sys_port_trace_k_pipe_init(pipe, buffer, size) +#define sys_port_trace_k_pipe_reset_enter(pipe) +#define sys_port_trace_k_pipe_reset_exit(pipe) +#define sys_port_trace_k_pipe_close_enter(pipe) +#define sys_port_trace_k_pipe_close_exit(pipe) +#define sys_port_trace_k_pipe_write_enter(pipe, data, len, timeout) +#define sys_port_trace_k_pipe_write_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_write_exit(pipe, ret) +#define sys_port_trace_k_pipe_read_enter(pipe, data, len, timeout) +#define sys_port_trace_k_pipe_read_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_read_exit(pipe, ret) + #define sys_port_trace_k_pipe_cleanup_enter(pipe) #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) diff --git a/subsys/tracing/test/tracing_test.h b/subsys/tracing/test/tracing_test.h index 7dddb50a947043..f2fcf3e4b38ebf 100644 --- a/subsys/tracing/test/tracing_test.h +++ b/subsys/tracing/test/tracing_test.h @@ -342,16 +342,31 @@ sys_trace_k_mbox_get_exit(mbox, rx_msg, buffer, timeout, ret) #define sys_port_trace_k_mbox_data_get(rx_msg) sys_trace_k_mbox_data_get(mbox, rx_msg, buffer) -#define sys_port_trace_k_pipe_init(pipe) sys_trace_k_pipe_init(pipe, buffer, size) +#define sys_port_trace_k_pipe_init(pipe, buffer, size) sys_trace_k_pipe_init(pipe, buffer, size) +#define sys_port_trace_k_pipe_reset_enter(pipe) sys_trace_k_pipe_reset_enter(pipe) +#define sys_port_trace_k_pipe_reset_exit(pipe) sys_trace_k_pipe_reset_exit(pipe) +#define sys_port_trace_k_pipe_close_enter(pipe) sys_trace_k_pipe_close_enter(pipe) +#define sys_port_trace_k_pipe_close_exit(pipe) sys_trace_k_pipe_close_exit(pipe) +#define sys_port_trace_k_pipe_write_enter(pipe, data, len, timeout) \ + sys_trace_k_pipe_write_enter(pipe, data, len, timeout) +#define sys_port_trace_k_pipe_write_blocking(pipe, timeout) \ + sys_trace_k_pipe_write_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_write_exit(pipe, ret) \ + sys_trace_k_pipe_write_exit(pipe, ret) +#define sys_port_trace_k_pipe_read_enter(pipe, data, size, timeout) \ + sys_trace_k_pipe_read_enter(pipe, data, size, timeout) +#define sys_port_trace_k_pipe_read_blocking(pipe, timeout) \ + sys_trace_k_pipe_read_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_read_exit(pipe, ret) \ + sys_trace_k_pipe_read_exit(pipe, ret) + #define sys_port_trace_k_pipe_cleanup_enter(pipe) sys_trace_k_pipe_cleanup_enter(pipe) #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) sys_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) sys_trace_k_pipe_alloc_init_enter(pipe, size) #define sys_port_trace_k_pipe_alloc_init_exit(pipe, ret) \ sys_trace_k_pipe_alloc_init_exit(pipe, size, ret) -#define sys_port_trace_k_pipe_flush_enter(pipe) \ - sys_trace_k_pipe_flush_enter(pipe) -#define sys_port_trace_k_pipe_flush_exit(pipe) \ - sys_trace_k_pipe_flush_exit(pipe) +#define sys_port_trace_k_pipe_flush_enter(pipe) sys_trace_k_pipe_flush_enter(pipe) +#define sys_port_trace_k_pipe_flush_exit(pipe) sys_trace_k_pipe_flush_exit(pipe) #define sys_port_trace_k_pipe_buffer_flush_enter(pipe) \ sys_trace_k_pipe_buffer_flush_enter(pipe) #define sys_port_trace_k_pipe_buffer_flush_exit(pipe) \ @@ -624,6 +639,19 @@ void sys_trace_k_mbox_get_exit(struct k_mbox *mbox, struct k_mbox_msg *rx_msg, v void sys_trace_k_mbox_data_get(struct k_mbox *mbox, struct k_mbox_msg *rx_msg, void *buffer); void sys_trace_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size); +void sys_trace_k_pipe_reset_enter(struct k_pipe *pipe); +void sys_trace_k_pipe_reset_exit(struct k_pipe *pipe); +void sys_trace_k_pipe_close_enter(struct k_pipe *pipe); +void sys_trace_k_pipe_close_exit(struct k_pipe *pipe); +void sys_trace_k_pipe_write_enter(struct k_pipe *pipe, const void *data, size_t len, + k_timeout_t timeout); +void sys_trace_k_pipe_write_blocking(struct k_pipe *pipe, k_timeout_t timeout); +void sys_trace_k_pipe_write_exit(struct k_pipe *pipe, int ret); +void sys_trace_k_pipe_read_enter(struct k_pipe *pipe, const void *data, size_t len, + k_timeout_t timeout); +void sys_trace_k_pipe_read_blocking(struct k_pipe *pipe, k_timeout_t timeout); +void sys_trace_k_pipe_read_exit(struct k_pipe *pipe, int ret); + void sys_trace_k_pipe_cleanup_enter(struct k_pipe *pipe); void sys_trace_k_pipe_cleanup_exit(struct k_pipe *pipe, int ret); void sys_trace_k_pipe_alloc_init_enter(struct k_pipe *pipe, size_t size); diff --git a/subsys/tracing/tracing_tracking.c b/subsys/tracing/tracing_tracking.c index ae115a5a0bc2fa..e257a7b76a2061 100644 --- a/subsys/tracing/tracing_tracking.c +++ b/subsys/tracing/tracing_tracking.c @@ -31,10 +31,8 @@ struct k_spinlock _track_list_k_msgq_lock; struct k_mbox *_track_list_k_mbox; struct k_spinlock _track_list_k_mbox_lock; -#ifdef CONFIG_PIPES struct k_pipe *_track_list_k_pipe; struct k_spinlock _track_list_k_pipe_lock; -#endif struct k_queue *_track_list_k_queue; struct k_spinlock _track_list_k_queue_lock; @@ -103,13 +101,14 @@ void sys_track_k_mbox_init(struct k_mbox *mbox) SYS_TRACK_LIST_PREPEND(_track_list_k_mbox, mbox)); } -#ifdef CONFIG_PIPES -void sys_track_k_pipe_init(struct k_pipe *pipe) +void sys_track_k_pipe_init(struct k_pipe *pipe, void *buffer, size_t size) { + ARG_UNUSED(buffer); + ARG_UNUSED(size); + SYS_PORT_TRACING_TYPE_MASK(k_pipe, SYS_TRACK_LIST_PREPEND(_track_list_k_pipe, pipe)); } -#endif void sys_track_k_queue_init(struct k_queue *queue) { @@ -159,10 +158,8 @@ static int sys_track_static_init(void) SYS_PORT_TRACING_TYPE_MASK(k_mbox, SYS_TRACK_STATIC_INIT(k_mbox)); -#ifdef CONFIG_PIPES SYS_PORT_TRACING_TYPE_MASK(k_pipe, - SYS_TRACK_STATIC_INIT(k_pipe)); -#endif + SYS_TRACK_STATIC_INIT(k_pipe, NULL, 0)); SYS_PORT_TRACING_TYPE_MASK(k_queue, SYS_TRACK_STATIC_INIT(k_queue)); diff --git a/subsys/tracing/user/tracing_user.h b/subsys/tracing/user/tracing_user.h index 942103e3bb46d4..f604d6b0423e31 100644 --- a/subsys/tracing/user/tracing_user.h +++ b/subsys/tracing/user/tracing_user.h @@ -322,7 +322,18 @@ void sys_trace_gpio_fire_callback_user(const struct device *port, struct gpio_ca #define sys_port_trace_k_mbox_get_exit(mbox, timeout, ret) #define sys_port_trace_k_mbox_data_get(rx_msg) -#define sys_port_trace_k_pipe_init(pipe) +#define sys_port_trace_k_pipe_init(pipe, buffer, size) +#define sys_port_trace_k_pipe_reset_enter(pipe) +#define sys_port_trace_k_pipe_reset_exit(pipe) +#define sys_port_trace_k_pipe_close_enter(pipe) +#define sys_port_trace_k_pipe_close_exit(pipe) +#define sys_port_trace_k_pipe_write_enter(pipe, data, len, timeout) +#define sys_port_trace_k_pipe_write_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_write_exit(pipe, ret) +#define sys_port_trace_k_pipe_read_enter(pipe, data, len, timeout) +#define sys_port_trace_k_pipe_read_blocking(pipe, timeout) +#define sys_port_trace_k_pipe_read_exit(pipe, ret) + #define sys_port_trace_k_pipe_cleanup_enter(pipe) #define sys_port_trace_k_pipe_cleanup_exit(pipe, ret) #define sys_port_trace_k_pipe_alloc_init_enter(pipe) diff --git a/tests/benchmarks/app_kernel/prj.conf b/tests/benchmarks/app_kernel/prj.conf index 57cd1f1544e65c..ac968ba0fe9dd8 100644 --- a/tests/benchmarks/app_kernel/prj.conf +++ b/tests/benchmarks/app_kernel/prj.conf @@ -16,9 +16,6 @@ CONFIG_CBPRINTF_FP_SUPPORT=y # Can only run under 1 CPU CONFIG_MP_MAX_NUM_CPUS=1 -# Enable pipes -CONFIG_PIPES=y - CONFIG_APPLICATION_DEFINED_SYSCALL=y CONFIG_TIMING_FUNCTIONS=y diff --git a/tests/benchmarks/app_kernel/prj_user.conf b/tests/benchmarks/app_kernel/prj_user.conf index fd222e6fb14409..d64c95ecd8fed7 100644 --- a/tests/benchmarks/app_kernel/prj_user.conf +++ b/tests/benchmarks/app_kernel/prj_user.conf @@ -16,9 +16,6 @@ CONFIG_CBPRINTF_FP_SUPPORT=y # Can only run under 1 CPU CONFIG_MP_MAX_NUM_CPUS=1 -# Enable pipes -CONFIG_PIPES=y - CONFIG_APPLICATION_DEFINED_SYSCALL=y CONFIG_TIMING_FUNCTIONS=y CONFIG_USERSPACE=y diff --git a/tests/benchmarks/app_kernel/src/master.c b/tests/benchmarks/app_kernel/src/master.c index 541be64040ccb3..a7ebee3cc9bfb9 100644 --- a/tests/benchmarks/app_kernel/src/master.c +++ b/tests/benchmarks/app_kernel/src/master.c @@ -23,7 +23,7 @@ BENCH_BMEM char msg[MAX_MSG]; BENCH_BMEM char data_bench[MESSAGE_SIZE]; -BENCH_DMEM struct k_pipe *test_pipes[] = {&PIPE_NOBUFF, &PIPE_SMALLBUFF, &PIPE_BIGBUFF}; +BENCH_DMEM struct k_pipe *test_pipes[] = {&PIPE_SMALLBUFF, &PIPE_BIGBUFF}; BENCH_BMEM char sline[SLINE_LEN + 1]; /* @@ -70,7 +70,6 @@ K_MBOX_DEFINE(MAILB1); K_MUTEX_DEFINE(DEMO_MUTEX); -K_PIPE_DEFINE(PIPE_NOBUFF, 0, 4); K_PIPE_DEFINE(PIPE_SMALLBUFF, 256, 4); K_PIPE_DEFINE(PIPE_BIGBUFF, 4096, 4); @@ -188,7 +187,7 @@ int main(void) k_thread_access_grant(&recv_thread, &DEMOQX1, &DEMOQX4, &DEMOQX192, &MB_COMM, &CH_COMM, &SEM0, &SEM1, &SEM2, &SEM3, &SEM4, &STARTRCV, &DEMO_MUTEX, - &PIPE_NOBUFF, &PIPE_SMALLBUFF, &PIPE_BIGBUFF); + &PIPE_SMALLBUFF, &PIPE_BIGBUFF); k_thread_start(&recv_thread); k_thread_start(&test_thread); @@ -212,7 +211,7 @@ int main(void) k_thread_access_grant(&test_thread, &DEMOQX1, &DEMOQX4, &DEMOQX192, &MB_COMM, &CH_COMM, &SEM0, &SEM1, &SEM2, &SEM3, &SEM4, &STARTRCV, &DEMO_MUTEX, - &PIPE_NOBUFF, &PIPE_SMALLBUFF, &PIPE_BIGBUFF); + &PIPE_SMALLBUFF, &PIPE_BIGBUFF); k_thread_start(&recv_thread); k_thread_start(&test_thread); @@ -236,11 +235,11 @@ int main(void) k_thread_access_grant(&test_thread, &DEMOQX1, &DEMOQX4, &DEMOQX192, &MB_COMM, &CH_COMM, &SEM0, &SEM1, &SEM2, &SEM3, &SEM4, &STARTRCV, &DEMO_MUTEX, - &PIPE_NOBUFF, &PIPE_SMALLBUFF, &PIPE_BIGBUFF); + &PIPE_SMALLBUFF, &PIPE_BIGBUFF); k_thread_access_grant(&recv_thread, &DEMOQX1, &DEMOQX4, &DEMOQX192, &MB_COMM, &CH_COMM, &SEM0, &SEM1, &SEM2, &SEM3, &SEM4, &STARTRCV, &DEMO_MUTEX, - &PIPE_NOBUFF, &PIPE_SMALLBUFF, &PIPE_BIGBUFF); + &PIPE_SMALLBUFF, &PIPE_BIGBUFF); k_thread_start(&recv_thread); k_thread_start(&test_thread); diff --git a/tests/benchmarks/app_kernel/src/pipe_b.c b/tests/benchmarks/app_kernel/src/pipe_b.c index 3589b73806af55..0ec99b3c535c4a 100644 --- a/tests/benchmarks/app_kernel/src/pipe_b.c +++ b/tests/benchmarks/app_kernel/src/pipe_b.c @@ -90,7 +90,7 @@ void pipe_test(void) PRINT_STRING(dashline); for (putsize = 8U; putsize <= MESSAGE_SIZE_PIPE; putsize <<= 1) { - for (pipe = 0; pipe < 3; pipe++) { + for (pipe = 0; pipe < 2; pipe++) { putcount = NR_OF_PIPE_RUNS; pipeput(test_pipes[pipe], _ALL_N, putsize, putcount, &puttime[pipe]); @@ -125,7 +125,7 @@ void pipe_test(void) for (putsize = 8U; putsize <= (MESSAGE_SIZE_PIPE); putsize <<= 1) { putcount = MESSAGE_SIZE_PIPE / putsize; - for (pipe = 0; pipe < 3; pipe++) { + for (pipe = 0; pipe < 2; pipe++) { pipeput(test_pipes[pipe], _1_TO_N, putsize, putcount, &puttime[pipe]); /* size*count == MESSAGE_SIZE_PIPE */ @@ -171,16 +171,10 @@ int pipeput(struct k_pipe *pipe, for (i = 0; option == _1_TO_N || (i < count); i++) { size_t sizexferd = 0; size_t size2xfer = MIN(size, size2xfer_total - sizexferd_total); - int ret; - size_t mim_num_of_bytes = 0; - if (option == _ALL_N) { - mim_num_of_bytes = size2xfer; - } - ret = k_pipe_put(pipe, data_bench, size2xfer, - &sizexferd, mim_num_of_bytes, K_FOREVER); + sizexferd = k_pipe_write(pipe, data_bench, size2xfer, K_FOREVER); - if (ret != 0) { + if (sizexferd < 0) { return 1; } if (option == _ALL_N && sizexferd != size2xfer) { diff --git a/tests/benchmarks/app_kernel/src/pipe_r.c b/tests/benchmarks/app_kernel/src/pipe_r.c index 653e66491fae05..5f6d93e3acd05c 100644 --- a/tests/benchmarks/app_kernel/src/pipe_r.c +++ b/tests/benchmarks/app_kernel/src/pipe_r.c @@ -36,7 +36,7 @@ void piperecvtask(void) /* matching (ALL_N) */ for (getsize = 8; getsize <= MESSAGE_SIZE_PIPE; getsize <<= 1) { - for (pipe = 0; pipe < 3; pipe++) { + for (pipe = 0; pipe < 2; pipe++) { getcount = NR_OF_PIPE_RUNS; pipeget(test_pipes[pipe], _ALL_N, getsize, getcount, &gettime); @@ -52,7 +52,7 @@ void piperecvtask(void) /* non-matching (1_TO_N) */ for (getsize = (MESSAGE_SIZE_PIPE); getsize >= 8; getsize >>= 1) { getcount = MESSAGE_SIZE_PIPE / getsize; - for (pipe = 0; pipe < 3; pipe++) { + for (pipe = 0; pipe < 2; pipe++) { /* size*count == MESSAGE_SIZE_PIPE */ pipeget(test_pipes[pipe], _1_TO_N, getsize, getcount, &gettime); @@ -95,12 +95,9 @@ int pipeget(struct k_pipe *pipe, enum pipe_options option, int size, int count, for (i = 0; option == _1_TO_N || (i < count); i++) { size_t sizexferd = 0; size_t size2xfer = MIN(size, size2xfer_total - sizexferd_total); - int ret; - ret = k_pipe_get(pipe, data_recv, size2xfer, - &sizexferd, option, K_FOREVER); - - if (ret != 0) { + sizexferd = k_pipe_read(pipe, data_recv, size2xfer, K_FOREVER); + if (sizexferd < 0) { return 1; } diff --git a/tests/kernel/mem_protect/mem_protect/prj.conf b/tests/kernel/mem_protect/mem_protect/prj.conf index ffb1d6279ef2b6..8d6da5a325aa87 100644 --- a/tests/kernel/mem_protect/mem_protect/prj.conf +++ b/tests/kernel/mem_protect/mem_protect/prj.conf @@ -4,4 +4,3 @@ CONFIG_ZTEST_STACK_SIZE=2048 CONFIG_MAX_THREAD_BYTES=4 CONFIG_TEST_USERSPACE=y CONFIG_APPLICATION_DEFINED_SYSCALL=y -CONFIG_PIPES=y diff --git a/tests/kernel/mem_protect/userspace/prj.conf b/tests/kernel/mem_protect/userspace/prj.conf index c963d1f7f94b7e..06b36cf8efd26a 100644 --- a/tests/kernel/mem_protect/userspace/prj.conf +++ b/tests/kernel/mem_protect/userspace/prj.conf @@ -3,4 +3,3 @@ CONFIG_ZTEST=y CONFIG_INIT_STACKS=y CONFIG_APPLICATION_DEFINED_SYSCALL=y CONFIG_TEST_USERSPACE=y -CONFIG_PIPES=y diff --git a/tests/kernel/mem_protect/userspace/src/main.c b/tests/kernel/mem_protect/userspace/src/main.c index 08b3932c7adfaf..918e6e7fe61b7c 100644 --- a/tests/kernel/mem_protect/userspace/src/main.c +++ b/tests/kernel/mem_protect/userspace/src/main.c @@ -664,8 +664,6 @@ ZTEST(userspace, test_user_mode_enter) /* Define and initialize pipe. */ K_PIPE_DEFINE(kpipe, PIPE_LEN, BYTES_TO_READ_WRITE); -K_APP_BMEM(default_part) static size_t bytes_written_read; - /** * @brief Test to write to kobject using pipe * @@ -679,8 +677,7 @@ ZTEST_USER(userspace, test_write_kobject_user_pipe) */ set_fault(K_ERR_KERNEL_OOPS); - k_pipe_get(&kpipe, &test_revoke_sem, BYTES_TO_READ_WRITE, - &bytes_written_read, 1, K_NO_WAIT); + k_pipe_read(&kpipe, (uint8_t *)&test_revoke_sem, BYTES_TO_READ_WRITE, K_NO_WAIT); zassert_unreachable("System call memory write validation " "did not fault"); @@ -699,8 +696,7 @@ ZTEST_USER(userspace, test_read_kobject_user_pipe) */ set_fault(K_ERR_KERNEL_OOPS); - k_pipe_put(&kpipe, &test_revoke_sem, BYTES_TO_READ_WRITE, - &bytes_written_read, 1, K_NO_WAIT); + k_pipe_write(&kpipe, (uint8_t *)&test_revoke_sem, BYTES_TO_READ_WRITE, K_NO_WAIT); zassert_unreachable("System call memory read validation " "did not fault"); diff --git a/tests/kernel/mutex/mutex_error_case/prj.conf b/tests/kernel/mutex/mutex_error_case/prj.conf index d144e826538b8c..3fc9377dc69007 100644 --- a/tests/kernel/mutex/mutex_error_case/prj.conf +++ b/tests/kernel/mutex/mutex_error_case/prj.conf @@ -3,4 +3,3 @@ CONFIG_IRQ_OFFLOAD=y CONFIG_TEST_USERSPACE=y CONFIG_MP_MAX_NUM_CPUS=1 CONFIG_ZTEST_FATAL_HOOK=y -CONFIG_PIPES=y diff --git a/tests/kernel/obj_core/obj_core/prj.conf b/tests/kernel/obj_core/obj_core/prj.conf index 6c006bcb8e9750..95b1c1624c4c98 100644 --- a/tests/kernel/obj_core/obj_core/prj.conf +++ b/tests/kernel/obj_core/obj_core/prj.conf @@ -1,5 +1,4 @@ CONFIG_ZTEST=y CONFIG_OBJ_CORE=y CONFIG_EVENTS=y -CONFIG_PIPES=y CONFIG_SYS_MEM_BLOCKS=y diff --git a/tests/kernel/obj_core/obj_core_stats_api/prj.conf b/tests/kernel/obj_core/obj_core_stats_api/prj.conf index d163a31e10510c..1012c240aeb898 100644 --- a/tests/kernel/obj_core/obj_core_stats_api/prj.conf +++ b/tests/kernel/obj_core/obj_core_stats_api/prj.conf @@ -1,6 +1,5 @@ CONFIG_ZTEST=y CONFIG_OBJ_CORE=y CONFIG_OBJ_CORE_STATS=y -CONFIG_PIPES=y CONFIG_SCHED_THREAD_USAGE=y CONFIG_SCHED_THREAD_USAGE_ANALYSIS=y diff --git a/tests/kernel/obj_tracking/prj.conf b/tests/kernel/obj_tracking/prj.conf index 38956d679530a0..04860b00cd2b95 100644 --- a/tests/kernel/obj_tracking/prj.conf +++ b/tests/kernel/obj_tracking/prj.conf @@ -3,5 +3,4 @@ CONFIG_IRQ_OFFLOAD=y CONFIG_TRACING=y CONFIG_TRACING_OBJECT_TRACKING=y CONFIG_TRACING_NONE=y -CONFIG_PIPES=y CONFIG_EVENTS=y diff --git a/tests/kernel/pipe/pipe/CMakeLists.txt b/tests/kernel/pipe/deprecated/pipe/CMakeLists.txt similarity index 100% rename from tests/kernel/pipe/pipe/CMakeLists.txt rename to tests/kernel/pipe/deprecated/pipe/CMakeLists.txt diff --git a/tests/kernel/pipe/pipe/prj.conf b/tests/kernel/pipe/deprecated/pipe/prj.conf similarity index 100% rename from tests/kernel/pipe/pipe/prj.conf rename to tests/kernel/pipe/deprecated/pipe/prj.conf diff --git a/tests/kernel/pipe/pipe/src/main.c b/tests/kernel/pipe/deprecated/pipe/src/main.c similarity index 100% rename from tests/kernel/pipe/pipe/src/main.c rename to tests/kernel/pipe/deprecated/pipe/src/main.c diff --git a/tests/kernel/pipe/pipe/src/test_pipe.c b/tests/kernel/pipe/deprecated/pipe/src/test_pipe.c similarity index 100% rename from tests/kernel/pipe/pipe/src/test_pipe.c rename to tests/kernel/pipe/deprecated/pipe/src/test_pipe.c diff --git a/tests/kernel/pipe/deprecated/pipe/testcase.yaml b/tests/kernel/pipe/deprecated/pipe/testcase.yaml new file mode 100644 index 00000000000000..2b83dfa3a319c5 --- /dev/null +++ b/tests/kernel/pipe/deprecated/pipe/testcase.yaml @@ -0,0 +1,7 @@ +tests: + kernel.deprecated.pipe: + tags: + - kernel + - userspace + ignore_faults: true + extra_args: CMAKE_C_FLAGS="-D__deprecated='' -D__DEPRECATED_MACRO=''" diff --git a/tests/kernel/pipe/deprecated/pipe_api/CMakeLists.txt b/tests/kernel/pipe/deprecated/pipe_api/CMakeLists.txt new file mode 100644 index 00000000000000..fbaf5f15e11807 --- /dev/null +++ b/tests/kernel/pipe/deprecated/pipe_api/CMakeLists.txt @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(pipe_api) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/kernel/pipe/deprecated/pipe_api/prj.conf b/tests/kernel/pipe/deprecated/pipe_api/prj.conf new file mode 100644 index 00000000000000..d080e2fbdbd829 --- /dev/null +++ b/tests/kernel/pipe/deprecated/pipe_api/prj.conf @@ -0,0 +1,7 @@ +CONFIG_ZTEST=y +CONFIG_IRQ_OFFLOAD=y +CONFIG_TEST_USERSPACE=y +CONFIG_DYNAMIC_OBJECTS=y +CONFIG_MP_MAX_NUM_CPUS=1 +CONFIG_ZTEST_FATAL_HOOK=y +CONFIG_PIPES=y diff --git a/tests/kernel/pipe/pipe_api/src/main.c b/tests/kernel/pipe/deprecated/pipe_api/src/main.c similarity index 100% rename from tests/kernel/pipe/pipe_api/src/main.c rename to tests/kernel/pipe/deprecated/pipe_api/src/main.c diff --git a/tests/kernel/pipe/pipe_api/src/test_pipe_avail.c b/tests/kernel/pipe/deprecated/pipe_api/src/test_pipe_avail.c similarity index 100% rename from tests/kernel/pipe/pipe_api/src/test_pipe_avail.c rename to tests/kernel/pipe/deprecated/pipe_api/src/test_pipe_avail.c diff --git a/tests/kernel/pipe/pipe_api/src/test_pipe_contexts.c b/tests/kernel/pipe/deprecated/pipe_api/src/test_pipe_contexts.c similarity index 100% rename from tests/kernel/pipe/pipe_api/src/test_pipe_contexts.c rename to tests/kernel/pipe/deprecated/pipe_api/src/test_pipe_contexts.c diff --git a/tests/kernel/pipe/pipe_api/src/test_pipe_fail.c b/tests/kernel/pipe/deprecated/pipe_api/src/test_pipe_fail.c similarity index 100% rename from tests/kernel/pipe/pipe_api/src/test_pipe_fail.c rename to tests/kernel/pipe/deprecated/pipe_api/src/test_pipe_fail.c diff --git a/tests/kernel/pipe/deprecated/pipe_api/testcase.yaml b/tests/kernel/pipe/deprecated/pipe_api/testcase.yaml new file mode 100644 index 00000000000000..c2eb7078ac2cf7 --- /dev/null +++ b/tests/kernel/pipe/deprecated/pipe_api/testcase.yaml @@ -0,0 +1,6 @@ +tests: + kernel.deprecated.pipe.api: + tags: + - kernel + - userspace + extra_args: CMAKE_C_FLAGS="-D__deprecated='' -D__DEPRECATED_MACRO=''" diff --git a/tests/kernel/pipe/pipe/testcase.yaml b/tests/kernel/pipe/pipe/testcase.yaml deleted file mode 100644 index 9d3382e44873dc..00000000000000 --- a/tests/kernel/pipe/pipe/testcase.yaml +++ /dev/null @@ -1,6 +0,0 @@ -tests: - kernel.pipe: - tags: - - kernel - - userspace - ignore_faults: true diff --git a/tests/kernel/pipe/pipe_api/CMakeLists.txt b/tests/kernel/pipe/pipe_api/CMakeLists.txt index fbaf5f15e11807..7f0e871daba43b 100644 --- a/tests/kernel/pipe/pipe_api/CMakeLists.txt +++ b/tests/kernel/pipe/pipe_api/CMakeLists.txt @@ -1,8 +1,12 @@ -# SPDX-License-Identifier: Apache-2.0 +cmake_minimum_required(VERSION 3.13.1) -cmake_minimum_required(VERSION 3.20.0) find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) -project(pipe_api) -FILE(GLOB app_sources src/*.c) -target_sources(app PRIVATE ${app_sources}) +project(app) + +target_sources(app + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/src/basic.c + ${CMAKE_CURRENT_SOURCE_DIR}/src/stress.c + ${CMAKE_CURRENT_SOURCE_DIR}/src/concurrency.c +) diff --git a/tests/kernel/pipe/pipe_api/prj.conf b/tests/kernel/pipe/pipe_api/prj.conf index d080e2fbdbd829..238fe5eacc5936 100644 --- a/tests/kernel/pipe/pipe_api/prj.conf +++ b/tests/kernel/pipe/pipe_api/prj.conf @@ -1,7 +1,9 @@ CONFIG_ZTEST=y -CONFIG_IRQ_OFFLOAD=y -CONFIG_TEST_USERSPACE=y -CONFIG_DYNAMIC_OBJECTS=y -CONFIG_MP_MAX_NUM_CPUS=1 -CONFIG_ZTEST_FATAL_HOOK=y -CONFIG_PIPES=y +CONFIG_ZTRESS=y + +CONFIG_ZTEST_STACK_SIZE=4096 +CONFIG_HEAP_MEM_POOL_SIZE=2048 +CONFIG_MAIN_STACK_SIZE=4096 + +CONFIG_ENTROPY_GENERATOR=y +CONFIG_TEST_RANDOM_GENERATOR=y diff --git a/tests/kernel/pipe/pipe_api/src/basic.c b/tests/kernel/pipe/pipe_api/src/basic.c new file mode 100644 index 00000000000000..f1eb114c21050c --- /dev/null +++ b/tests/kernel/pipe/pipe_api/src/basic.c @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2024 Måns Ansgariusson + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include +#include +#include +#include + +ZTEST_SUITE(k_pipe_basic, NULL, NULL, NULL, NULL, NULL); + +static void mkrandom(uint8_t *buffer, size_t size) +{ + sys_rand_get(buffer, size); +} + +K_PIPE_DEFINE(test_define, 256, 4); + +ZTEST(k_pipe_basic, test_init) +{ + struct k_pipe pipe; + uint8_t buffer[10]; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(pipe.flags == PIPE_FLAG_OPEN, "Unexpected pipe flags"); +} + +ZTEST(k_pipe_basic, test_write_read_one) +{ + struct k_pipe pipe; + uint8_t buffer[10]; + uint8_t data = 0x55; + uint8_t read_data; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(k_pipe_write(&pipe, &data, 1, K_NO_WAIT) == 1, + "Failed to write to pipe"); + zassert_true(k_pipe_read(&pipe, &read_data, 1, K_NO_WAIT) == 1, + "Failed to read from pipe"); + zassert_true(read_data == data, "Unexpected data received from pipe"); +} + +ZTEST(k_pipe_basic, test_write_read_multiple) +{ + struct k_pipe pipe; + uint8_t buffer[10]; + uint8_t data = 0x55; + uint8_t read_data; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(k_pipe_write(&pipe, &data, 1, K_NO_WAIT) == 1, "Failed to write to pipe"); + zassert_true(k_pipe_write(&pipe, &data, 1, K_NO_WAIT) == 1, "Failed to write to pipe"); + zassert_true(k_pipe_read(&pipe, &read_data, 1, K_NO_WAIT) == 1, "Failed to read from pipe"); + zassert_true(read_data == data, "Unexpected data received from pipe"); + zassert_true(k_pipe_read(&pipe, &read_data, 1, K_NO_WAIT) == 1, "Failed to read from pipe"); + zassert_true(read_data == data, "Unexpected data received from pipe"); +} + +ZTEST(k_pipe_basic, test_write_full) +{ + struct k_pipe pipe; + uint8_t buffer[10]; + uint8_t data[10]; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(k_pipe_write(&pipe, data, sizeof(data), K_NO_WAIT) == 10, + "Failed to write multiple bytes to pipe"); + zassert_true(k_pipe_write(&pipe, data, sizeof(data), K_MSEC(1000)) == -EAGAIN, + "Should not be able to write to full pipe"); +} + +ZTEST(k_pipe_basic, test_read_empty) +{ + struct k_pipe pipe; + uint8_t buffer[10]; + uint8_t read_data; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(k_pipe_read(&pipe, &read_data, 1, K_MSEC(1000)) == -EAGAIN, + "Should not be able to read from empty pipe"); +} + +ZTEST(k_pipe_basic, test_read_write_full) +{ + struct k_pipe pipe; + uint8_t buffer[10]; + uint8_t input[10]; + uint8_t res[10]; + + mkrandom(input, sizeof(input)); + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(k_pipe_write(&pipe, input, sizeof(input), K_NO_WAIT) == sizeof(input), + "Failed to write multiple bytes to pipe"); + zassert_true(k_pipe_read(&pipe, res, sizeof(res), K_NO_WAIT) == sizeof(res), + "Failed to read multiple bytes from pipe"); + zassert_true(memcmp(input, res, sizeof(input)) == 0, + "Unexpected data received from pipe"); +} + +ZTEST(k_pipe_basic, test_read_write_wrapp_around) +{ + struct k_pipe pipe; + uint8_t buffer[12]; + uint8_t input[8]; + uint8_t res[16]; + + mkrandom(input, sizeof(input)); + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(k_pipe_write(&pipe, input, sizeof(input), K_NO_WAIT) == sizeof(input), + "Failed to write bytes to pipe"); + zassert_true(k_pipe_read(&pipe, res, 5, K_NO_WAIT) == 5, + "Failed to read bytes from pipe"); + zassert_true(memcmp(input, res, 5) == 0, "Unexpected data received from pipe"); + + zassert_true(k_pipe_write(&pipe, input, sizeof(input), K_NO_WAIT) == sizeof(input), + "Failed to write bytes to pipe"); + zassert_true(k_pipe_read(&pipe, res, sizeof(input) * 2 - 5, K_NO_WAIT) == + sizeof(input) * 2 - 5, "Failed to read remaining bytes from pipe"); + + zassert_true(memcmp(&input[5], res, sizeof(input) - 5) == 0, + "Unexpected data received from pipe"); + zassert_true(memcmp(input, &res[sizeof(input) - 5], sizeof(input)) == 0, + "Unexpected data received from pipe"); +} + +ZTEST(k_pipe_basic, test_reset) +{ + struct k_pipe pipe; + uint8_t buffer[10]; + uint8_t data = 0x55; + uint8_t read_data; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + + /* reset an empty pipe, & no waiting should not produce any side-effects*/ + k_pipe_reset(&pipe); + zassert_true(k_pipe_write(&pipe, &data, 1, K_NO_WAIT) == 1, + "Failed to write to resetted pipe"); + zassert_true(k_pipe_read(&pipe, &read_data, 1, K_NO_WAIT) == 1, + "Failed to read from resetted pipe"); + zassert_true(read_data == data, "Unexpected data received from pipe"); +} + +ZTEST(k_pipe_basic, test_close) +{ + struct k_pipe pipe; + uint8_t buffer[12]; + uint8_t input[8]; + uint8_t res[16]; + + mkrandom(input, sizeof(input)); + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(k_pipe_write(&pipe, input, sizeof(input), K_NO_WAIT) == sizeof(input), + "Failed to write bytes to pipe"); + k_pipe_close(&pipe); + + zassert_true(k_pipe_write(&pipe, input, sizeof(input), K_NO_WAIT) == -EPIPE, + "should not be able to write to closed pipe"); + zassert_true(k_pipe_read(&pipe, res, 5, K_NO_WAIT) == 5, + "You should be able to read from closed pipe"); + zassert_true(memcmp(input, res, 5) == 0, "Sequence should be equal"); + + zassert_true(k_pipe_read(&pipe, res, 5, K_NO_WAIT) == 3, + "you should be able to read remaining bytes from closed pipe"); + zassert_true(memcmp(&input[5], res, 3) == 0, "Written and read bytes should be equal"); + zassert_true(k_pipe_read(&pipe, res, 5, K_NO_WAIT) == -EPIPE, + "Closed and empty pipe should return -EPIPE"); +} diff --git a/tests/kernel/pipe/pipe_api/src/concurrency.c b/tests/kernel/pipe/pipe_api/src/concurrency.c new file mode 100644 index 00000000000000..964da1baaf25fe --- /dev/null +++ b/tests/kernel/pipe/pipe_api/src/concurrency.c @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2024 Måns Ansgariusson + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include +#include +#include +#include +#include + +LOG_MODULE_REGISTER(k_pipe_concurrency, LOG_LEVEL_DBG); +ZTEST_SUITE(k_pipe_concurrency, NULL, NULL, NULL, NULL, NULL); + +static const int partial_wait_time = 2000; +static const int dummy_data_size = 16; +static struct k_thread thread; +static K_THREAD_STACK_DEFINE(stack, 1024); + +static void thread_close(void *arg1, void *arg2, void *arg3) +{ + k_pipe_close((struct k_pipe *)arg1); +} + +static void thread_reset(void *arg1, void *arg2, void *arg3) +{ + k_pipe_reset((struct k_pipe *)arg1); +} + +static void thread_write(void *arg1, void *arg2, void *arg3) +{ + uint8_t garbage[dummy_data_size]; + + zassert_true(k_pipe_write((struct k_pipe *)arg1, garbage, sizeof(garbage), + K_MSEC(partial_wait_time)) == sizeof(garbage), "Failed to write to pipe"); +} + +static void thread_read(void *arg1, void *arg2, void *arg3) +{ + uint8_t garbage[dummy_data_size]; + + zassert_true(k_pipe_read((struct k_pipe *)arg1, garbage, sizeof(garbage), + K_MSEC(partial_wait_time)) == sizeof(garbage), "Failed to read from pipe"); +} + +ZTEST(k_pipe_concurrency, test_close_on_read) +{ + k_tid_t tid; + struct k_pipe pipe; + uint8_t buffer[dummy_data_size]; + uint8_t res; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack), + thread_close, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100)); + zassert_true(tid, "k_thread_create failed"); + zassert_true(k_pipe_read(&pipe, &res, sizeof(res), K_MSEC(1000)) == -EPIPE, + "Read on closed pipe should return -EPIPE"); + k_thread_join(tid, K_FOREVER); + zassert_true((pipe.flags & PIPE_FLAG_OPEN) == 0, + "Pipe should continue to be closed after all waiters have been released"); +} + +ZTEST(k_pipe_concurrency, test_close_on_write) +{ + k_tid_t tid; + struct k_pipe pipe; + uint8_t buffer[dummy_data_size]; + uint8_t garbage[dummy_data_size]; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(sizeof(garbage) == k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)), + "Failed to write to pipe"); + + tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack), + thread_close, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100)); + zassert_true(tid, "k_thread_create failed"); + zassert_true(k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)) == -EPIPE, + "write should return -EPIPE, when pipe is closed"); + k_thread_join(tid, K_FOREVER); + zassert_true((pipe.flags & PIPE_FLAG_OPEN) == 0, + "pipe should continue to be closed after all waiters have been released"); +} + +ZTEST(k_pipe_concurrency, test_reset_on_read) +{ + k_tid_t tid; + struct k_pipe pipe; + uint8_t buffer[dummy_data_size]; + uint8_t res; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + + tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack), + thread_reset, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100)); + zassert_true(tid, "k_thread_create failed"); + zassert_true(k_pipe_read(&pipe, &res, sizeof(res), K_MSEC(1000)) == -ECANCELED, + "reset on read should return -ECANCELED"); + k_thread_join(tid, K_FOREVER); + zassert_true((pipe.flags & PIPE_FLAG_RESET) == 0, + "pipe should not have reset flag after all waiters are done"); + zassert_true((pipe.flags & PIPE_FLAG_OPEN) != 0, + "pipe should continue to be open after pipe is reseted"); +} + +ZTEST(k_pipe_concurrency, test_reset_on_write) +{ + k_tid_t tid; + struct k_pipe pipe; + uint8_t buffer[dummy_data_size]; + uint8_t garbage[dummy_data_size]; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + zassert_true(sizeof(garbage) == k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)), + "Failed to write to pipe"); + + tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack), + thread_reset, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100)); + zassert_true(tid, "k_thread_create failed"); + zassert_true(k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)) == -ECANCELED, + "reset on write should return -ECANCELED"); + k_thread_join(tid, K_FOREVER); + zassert_true((pipe.flags & PIPE_FLAG_RESET) == 0, + "pipe should not have reset flag after all waiters are done"); + zassert_true((pipe.flags & PIPE_FLAG_OPEN) != 0, + "pipe should continue to be open after pipe is reseted"); +} + +ZTEST(k_pipe_concurrency, test_partial_read) +{ + k_tid_t tid; + struct k_pipe pipe; + uint8_t buffer[dummy_data_size]; + uint8_t garbage[dummy_data_size]; + size_t write_size = sizeof(garbage)/2; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack), + thread_read, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_NO_WAIT); + + zassert_true(k_pipe_write(&pipe, garbage, write_size, K_NO_WAIT) == write_size, + "write to pipe failed"); + k_msleep(partial_wait_time/4); + zassert_true(k_pipe_write(&pipe, garbage, write_size, K_NO_WAIT) == write_size, + "k_k_pipe_write should return number of bytes written"); + k_thread_join(tid, K_FOREVER); +} + +ZTEST(k_pipe_concurrency, test_partial_write) +{ + k_tid_t tid; + struct k_pipe pipe; + uint8_t buffer[dummy_data_size]; + uint8_t garbage[dummy_data_size]; + size_t read_size = sizeof(garbage)/2; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + + zassert_true(k_pipe_write(&pipe, garbage, sizeof(garbage), K_NO_WAIT) == sizeof(garbage), + "Failed to write to pipe"); + tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack), + thread_write, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_NO_WAIT); + + zassert_true(k_pipe_read(&pipe, garbage, read_size, K_NO_WAIT) == read_size, + "Failed to read from pipe"); + k_msleep(partial_wait_time/2); + zassert_true(k_pipe_read(&pipe, garbage, read_size, K_NO_WAIT) == read_size, + "failed t read from pipe"); + k_thread_join(tid, K_FOREVER); +} diff --git a/tests/kernel/pipe/pipe_api/src/stress.c b/tests/kernel/pipe/pipe_api/src/stress.c new file mode 100644 index 00000000000000..3638c68a33acfd --- /dev/null +++ b/tests/kernel/pipe/pipe_api/src/stress.c @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024 Måns Ansgariusson + * + * SPDX-License-Identifier: Apache-2.0 + */ +#include +#include +#include +#include +#include +#include +#include + +LOG_MODULE_REGISTER(k_k_pipe_stress, LOG_LEVEL_INF); + +ZTEST_SUITE(k_pipe_stress, NULL, NULL, NULL, NULL, NULL); + +ZTEST(k_pipe_stress, test_write) +{ + int rc; + struct k_pipe pipe; + size_t len = 512; + uint8_t buffer[len]; + uint8_t buf[len]; + size_t sent; + uint32_t start_cycles, end_cycles; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + start_cycles = k_uptime_get_32(); + sent = 0; + while (sent < len) { + rc = k_pipe_write(&pipe, &buf[sent], len - sent, K_FOREVER); + zassert_true(rc > 0, "Failed to write to pipe"); + sent += rc; + } + end_cycles = k_uptime_get_32(); + LOG_INF("Elapsed cycles: %u\n", end_cycles - start_cycles); +} + +ZTEST(k_pipe_stress, test_read) +{ + int rc; + struct k_pipe pipe; + size_t len = 512; + uint8_t buffer[len]; + uint8_t buf[len]; + size_t sent, read; + uint32_t start_cycles, end_cycles; + + k_pipe_init(&pipe, buffer, sizeof(buffer)); + start_cycles = k_uptime_get_32(); + for (int i = 0; i < 100; i++) { + sent = 0; + while (sent < len) { + rc = k_pipe_write(&pipe, &buf[sent], len - sent, K_FOREVER); + zassert_true(rc > 0, "Failed to write to pipe"); + sent += rc; + } + read = 0; + while (read < len) { + rc = k_pipe_read(&pipe, &buf[read], len - read, K_FOREVER); + zassert_true(rc > 0, "Failed to read from pipe"); + read += rc; + } + } + end_cycles = k_uptime_get_32(); + LOG_INF("Elapsed cycles: %u\n", end_cycles - start_cycles); +} diff --git a/tests/kernel/semaphore/semaphore/prj.conf b/tests/kernel/semaphore/semaphore/prj.conf index e193f5f626dd2a..1ae0b78bdbff63 100644 --- a/tests/kernel/semaphore/semaphore/prj.conf +++ b/tests/kernel/semaphore/semaphore/prj.conf @@ -2,5 +2,4 @@ CONFIG_ZTEST=y CONFIG_IRQ_OFFLOAD=y CONFIG_TEST_USERSPACE=y CONFIG_ZTEST_FATAL_HOOK=y -CONFIG_PIPES=y CONFIG_MAX_THREAD_BYTES=3 diff --git a/tests/kernel/semaphore/semaphore/src/main.c b/tests/kernel/semaphore/semaphore/src/main.c index 97816a3a47af79..51007c3a306aba 100644 --- a/tests/kernel/semaphore/semaphore/src/main.c +++ b/tests/kernel/semaphore/semaphore/src/main.c @@ -980,7 +980,6 @@ void sem_multiple_take_and_timeouts_helper(void *p1, void *p2, void *p3) { int timeout = POINTER_TO_INT(p1); int64_t start_ticks, end_ticks, diff_ticks; - size_t bytes_written; start_ticks = k_uptime_get(); @@ -994,8 +993,7 @@ void sem_multiple_take_and_timeouts_helper(void *p1, void *p2, void *p3) "time mismatch - expected at least %d, got %lld", timeout, diff_ticks); - k_pipe_put(&timeout_info_pipe, &timeout, sizeof(int), - &bytes_written, sizeof(int), K_FOREVER); + k_pipe_write(&timeout_info_pipe, (uint8_t *)&timeout, sizeof(int), K_FOREVER); } @@ -1011,10 +1009,9 @@ ZTEST(semaphore_1cpu, test_sem_multiple_take_and_timeouts) } static uint32_t timeout; - size_t bytes_read; k_sem_reset(&simple_sem); - k_pipe_flush(&timeout_info_pipe); + k_pipe_reset(&timeout_info_pipe); /* Multiple threads timeout and the sequence in which it times out * is pushed into a pipe and checked later on. @@ -1028,8 +1025,7 @@ ZTEST(semaphore_1cpu, test_sem_multiple_take_and_timeouts) } for (int i = 0; i < TOTAL_THREADS_WAITING; i++) { - k_pipe_get(&timeout_info_pipe, &timeout, sizeof(int), - &bytes_read, sizeof(int), K_FOREVER); + k_pipe_read(&timeout_info_pipe, (uint8_t *)&timeout, sizeof(int), K_FOREVER); zassert_equal(timeout, QSEC2MS(i + 1), "timeout did not occur properly: %d != %d", timeout, QSEC2MS(i + 1)); @@ -1043,10 +1039,10 @@ ZTEST(semaphore_1cpu, test_sem_multiple_take_and_timeouts) void sem_multi_take_timeout_diff_sem_helper(void *p1, void *p2, void *p3) { + int rc; int timeout = POINTER_TO_INT(p1); struct k_sem *sema = p2; int64_t start_ticks, end_ticks, diff_ticks; - size_t bytes_written; struct timeout_info info = { .timeout = timeout, .sema = sema @@ -1064,8 +1060,10 @@ void sem_multi_take_timeout_diff_sem_helper(void *p1, void *p2, void *p3) "time mismatch - expected at least %d, got %lld", timeout, diff_ticks); - k_pipe_put(&timeout_info_pipe, &info, sizeof(struct timeout_info), - &bytes_written, sizeof(struct timeout_info), K_FOREVER); + rc = k_pipe_write(&timeout_info_pipe, (uint8_t *)&info, sizeof(struct timeout_info), + K_FOREVER); + zassert_true(rc == sizeof(struct timeout_info), + "k_pipe_write failed: %d", rc); } /** @@ -1075,11 +1073,11 @@ void sem_multi_take_timeout_diff_sem_helper(void *p1, void *p2, void *p3) */ ZTEST(semaphore, test_sem_multi_take_timeout_diff_sem) { + int rc; if (IS_ENABLED(CONFIG_KERNEL_COHERENCE)) { ztest_test_skip(); } - size_t bytes_read; struct timeout_info seq_info[] = { { SEC2MS(2), &simple_sem }, { SEC2MS(1), &multiple_thread_sem }, @@ -1092,7 +1090,7 @@ ZTEST(semaphore, test_sem_multi_take_timeout_diff_sem) k_sem_reset(&simple_sem); k_sem_reset(&multiple_thread_sem); - k_pipe_flush(&timeout_info_pipe); + k_pipe_reset(&timeout_info_pipe); memset(&retrieved_info, 0, sizeof(struct timeout_info)); /* Multiple threads timeout on different semaphores and the sequence @@ -1108,13 +1106,10 @@ ZTEST(semaphore, test_sem_multi_take_timeout_diff_sem) } for (int i = 0; i < TOTAL_THREADS_WAITING; i++) { - k_pipe_get(&timeout_info_pipe, - &retrieved_info, - sizeof(struct timeout_info), - &bytes_read, - sizeof(struct timeout_info), - K_FOREVER); - + rc = k_pipe_read(&timeout_info_pipe, (uint8_t *)&retrieved_info, + sizeof(struct timeout_info), K_FOREVER); + zassert_true(rc == sizeof(struct timeout_info), + "k_pipe_read failed: %d", rc); zassert_true(retrieved_info.timeout == SEC2MS(i + 1), "timeout did not occur properly"); diff --git a/west.yml b/west.yml index 2a04f40cb8c7cf..7aa6f6db2a7c93 100644 --- a/west.yml +++ b/west.yml @@ -328,7 +328,7 @@ manifest: path: modules/lib/openthread - name: percepio path: modules/debug/percepio - revision: 0d44033c744978ca2505a06640b4f6964c5411e6 + revision: 388c2937db94f76ecba90b3e24542b03b88f1837 groups: - debug - name: picolibc