Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kernel: k_pipe: Rewrite k_pipe API #83283

Merged
merged 11 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 140 additions & 171 deletions doc/kernel/services/data_passing/pipes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ Pipes
#####

A :dfn:`pipe` is a kernel object that allows a thread to send a byte stream
to another thread. Pipes can be used to synchronously transfer chunks of data
in whole or in part.
to another thread. Pipes enable efficient inter-thread communication and can
be used to synchronously transfer chunks of data in whole or in part.

.. contents::
:local:
Expand All @@ -14,224 +14,193 @@ in whole or in part.
Concepts
********

The pipe can be configured with a ring buffer which holds data that has been
sent but not yet received; alternatively, the pipe may have no ring buffer.

Any number of pipes can be defined (limited only by available RAM). Each pipe is
referenced by its memory address.
Any number of pipes can be defined, limited only by available RAM. Each pipe
is referenced by its memory address.

A pipe has the following key property:

* A **size** that indicates the size of the pipe's ring buffer. Note that a
size of zero defines a pipe with no ring buffer.

A pipe must be initialized before it can be used. The pipe is initially empty.

Data is synchronously **sent** either in whole or in part to a pipe by a
thread. If the specified minimum number of bytes can not be immediately
satisfied, then the operation will either fail immediately or attempt to send
as many bytes as possible and then pend in the hope that the send can be
completed later. Accepted data is either copied to the pipe's ring buffer
or directly to the waiting reader(s).

Data is synchronously **received** from a pipe by a thread. If the specified
minimum number of bytes can not be immediately satisfied, then the operation
will either fail immediately or attempt to receive as many bytes as possible
and then pend in the hope that the receive can be completed later. Accepted
data is either copied from the pipe's ring buffer or directly from the
waiting sender(s).

Data may also be **flushed** from a pipe by a thread. Flushing can be performed
either on the entire pipe or on only its ring buffer. Flushing the entire pipe
is equivalent to reading all the information in the ring buffer **and** waiting
to be written into a giant temporary buffer which is then discarded. Flushing
the ring buffer is equivalent to reading **only** the data in the ring buffer
into a temporary buffer which is then discarded. Flushing the ring buffer does
not guarantee that the ring buffer will stay empty; flushing it may allow a
pended writer to fill the ring buffer.

.. note::
Flushing does not in practice allocate or use additional buffers.

.. note::
The kernel does allow for an ISR to flush a pipe from an ISR. It also
allows it to send/receive data to/from one provided it does not attempt
to wait for space/data.
* A **size** that indicates the capacity of the pipe's ring buffer.

A pipe must be initialized before it can be used. When initialized, the pipe
is empty.

Threads interact with the pipe as follows:

- **Writing**: Data is synchronously written, either in whole or in part, to
a pipe by a thread. If the pipe's ring buffer is full, the operation blocks
until sufficient space becomes available or the specified timeout expires.

- **Reading**: Data is synchronously read, either in whole or in part, from a
pipe by a thread. If the pipe's ring buffer is empty, the operation blocks
until data becomes available or the specified timeout expires.

- **Resetting**: A thread can reset a pipe, which resets its internal state and
ends all pending read and write operations with an error code.

Pipes are well-suited for scenarios like producer-consumer patterns or
streaming data between threads.

Implementation
**************

A pipe is defined using a variable of type :c:struct:`k_pipe` and an
optional character buffer of type ``unsigned char``. It must then be
initialized by calling :c:func:`k_pipe_init`.

The following code defines and initializes an empty pipe that has a ring
buffer capable of holding 100 bytes and is aligned to a 4-byte boundary.
A pipe is defined using a variable of type :c:struct:`k_pipe` and a
byte buffer. The pipe must then be initialized by calling :c:func:`k_pipe_init`.

The following code defines and initializes an empty pipe with a ring buffer
capable of holding 100 bytes, aligned to a 4-byte boundary:

.. code-block:: c

unsigned char __aligned(4) my_ring_buffer[100];
uint8_t __aligned(4) my_ring_buffer[100];
struct k_pipe my_pipe;

k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer));

Alternatively, a pipe can be defined and initialized at compile time by
calling :c:macro:`K_PIPE_DEFINE`.

The following code has the same effect as the code segment above. Observe
that macro defines both the pipe and its ring buffer.
Alternatively, a pipe can be defined and initialized at compile time using
the :c:macro:`K_PIPE_DEFINE` macro, which defines both the pipe and its
ring buffer:

.. code-block:: c

K_PIPE_DEFINE(my_pipe, 100, 4);

This has the same effect as the code above.

Writing to a Pipe
=================

Data is added to a pipe by calling :c:func:`k_pipe_put`.
Data is added to a pipe by calling :c:func:`k_pipe_write`.

The following code builds on the example above, and uses the pipe to pass
data from a producing thread to one or more consuming threads. If the pipe's
ring buffer fills up because the consumers can't keep up, the producing thread
waits for a specified amount of time.
The following example demonstrates using a pipe to send data from a producer
thread to one or more consumer threads. If the pipe's ring buffer fills up,
the producer thread waits for a specified amount of time.

.. code-block:: c

struct message_header {
...
};

void producer_thread(void)
{
unsigned char *data;
size_t total_size;
size_t bytes_written;
int rc;
...

while (1) {
/* Craft message to send in the pipe */
data = ...;
total_size = ...;

/* send data to the consumers */
rc = k_pipe_put(&my_pipe, data, total_size, &bytes_written,
sizeof(struct message_header), K_NO_WAIT);

if (rc < 0) {
/* Incomplete message header sent */
...
} else if (bytes_written < total_size) {
/* Some of the data was sent */
...
} else {
/* All data sent */
...
}
}
}
struct message_header {
size_t num_data_bytes; /* Example field */
...
};

void producer_thread(void)
{
int rc;
uint8_t *data;
size_t total_size;
size_t bytes_written;

while (1) {
/* Craft message to send in the pipe */
make_message(data, &total_size);
bytes_written = 0;

/* Write data to the pipe, handling partial writes */
while (bytes_written < total_size) {
rc = k_pipe_write(&my_pipe, &data[bytes_written], total_size - bytes_written, K_NO_WAIT);

if (rc < 0) {
/* Error occurred */
...
break;
} else {
/* Partial or full write succeeded; adjust for next iteration */
bytes_written += rc;
}
}

/* Reset bytes_written for the next message */
bytes_written = 0;
...
}
}

Reading from a Pipe
===================

Data is read from the pipe by calling :c:func:`k_pipe_get`.

The following code builds on the example above, and uses the pipe to
process data items generated by one or more producing threads.

.. code-block:: c

void consumer_thread(void)
{
unsigned char buffer[120];
size_t bytes_read;
struct message_header *header = (struct message_header *)buffer;

while (1) {
rc = k_pipe_get(&my_pipe, buffer, sizeof(buffer), &bytes_read,
sizeof(*header), K_MSEC(100));

if ((rc < 0) || (bytes_read < sizeof (*header))) {
/* Incomplete message header received */
...
} else if (header->num_data_bytes + sizeof(*header) > bytes_read) {
/* Only some data was received */
...
} else {
/* All data was received */
...
}
}
}

Use a pipe to send streams of data between threads.

.. note::
A pipe can be used to transfer long streams of data if desired. However
it is often preferable to send pointers to large data items to avoid
copying the data.

Flushing a Pipe's Buffer
========================
Data is retrieved from the pipe by calling :c:func:`k_pipe_read`.

Data is flushed from the pipe's ring buffer by calling
:c:func:`k_pipe_buffer_flush`.

The following code builds on the examples above, and flushes the pipe's
buffer.
The following example builds on the producer thread example above. It shows
a consumer thread that processes data generated by the producer.

.. code-block:: c

void monitor_thread(void)
{
while (1) {
...
/* Pipe buffer contains stale data. Flush it. */
k_pipe_buffer_flush(&my_pipe);
...
}
}

Flushing a Pipe
===============

All data in the pipe is flushed by calling :c:func:`k_pipe_flush`.

The following code builds on the examples above, and flushes all the
data in the pipe.
struct message_header {
size_t num_data_bytes; /* Example field */
...
};

void consumer_thread(void)
{
int rc;
uint8_t buffer[128];
size_t bytes_read = 0;
struct message_header *header = (struct message_header *)buffer;

while (1) {
/* Step 1: Read the message header */
bytes_read = 0;
read_header:
while (bytes_read < sizeof(*header)) {
rc = k_pipe_read(&my_pipe, &buffer[bytes_read], sizeof(*header) - bytes_read, &bytes_read, K_NO_WAIT);

if (rc < 0) {
/* Error occurred */
...
goto read_header;
}

/* Adjust for partial reads */
bytes_read += rc;
}

/* Step 2: Read the message body */
bytes_read = 0;
while (bytes_read < header->num_data_bytes) {
rc = k_pipe_read(&my_pipe, &buffer[sizeof(*header) + bytes_read], header->num_data_bytes - bytes_read, K_NO_WAIT);

if (rc < 0) {
/* Error occurred */
...
goto read_header;
}

/* Adjust for partial reads */
bytes_read += rc;
}
/* Successfully received the complete message */
}
}

Resetting a Pipe
================

The pipe can be reset by calling :c:func:`k_pipe_reset`. Resetting a pipe
resets its internal state and ends all pending operations with an error code.

The following example demonstrates resetting a pipe in response to a critical
error:

.. code-block:: c

void monitor_thread(void)
{
while (1) {
...
/* Critical error detected. Flush the entire pipe to reset it. */
k_pipe_flush(&my_pipe);
/* Critical error detected: reset the entire pipe to reset it. */
k_pipe_reset(&my_pipe);
...
}
}


Suggested uses
Suggested Uses
**************

Use a pipe to send streams of data between threads.

.. note::
A pipe can be used to transfer long streams of data if desired. However it
is often preferable to send pointers to large data items to avoid copying
the data. Copying large data items will negatively impact interrupt latency
as a spinlock is held while copying that data.


Configuration Options
*********************

Related configuration options:
Pipes are useful for sending streams of data between threads. Typical
applications include:

* :kconfig:option:`CONFIG_PIPES`
- Implementing producer-consumer patterns.
- Streaming logs or packets between threads.
- Handling variable-length message passing in real-time systems.

API Reference
*************
Expand Down
Loading
Loading