Skip to content

Commit

Permalink
kernel/pipe: add missing calls to z_reschedule()
Browse files Browse the repository at this point in the history
We are waking up threads but failed to let them run if they are
higher priority. Add missing calls to z_reschedule().

Also wake up all pending writers as we don't know how many there might
be. It is more efficient to wake them all when the ring buffer is full
before reading from it rather than waking them one by one whenever there is
more room in it.

Thanks to Peter Mitsis for noticing those issues.

Signed-off-by: Nicolas Pitre <[email protected]>
  • Loading branch information
Nicolas Pitre authored and kartben committed Jan 21, 2025
1 parent 99c2057 commit 3da90f9
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions kernel/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ static inline bool pipe_empty(struct k_pipe *pipe)
}

static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key,
k_timepoint_t time_limit)
k_timepoint_t time_limit, bool *need_resched)
{
k_timeout_t timeout = sys_timepoint_timeout(time_limit);
int rc;
Expand All @@ -45,6 +45,7 @@ static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key
}

pipe->waiting++;
*need_resched = false;
SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout);
rc = z_pend_curr(&pipe->lock, *key, waitq, timeout);
*key = k_spin_lock(&pipe->lock);
Expand Down Expand Up @@ -85,7 +86,7 @@ struct pipe_buf_spec {
size_t used;
};

static size_t copy_to_pending_readers(struct k_pipe *pipe,
static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched,
const uint8_t *data, size_t len)
{
struct k_thread *reader;
Expand Down Expand Up @@ -132,6 +133,7 @@ static size_t copy_to_pending_readers(struct k_pipe *pipe,
/* rest of thread wake-up outside the scheduler lock */
z_thread_return_value_set_with_data(reader, 0, NULL);
z_ready_thread(reader);
*need_resched = true;
}
} while (reader != NULL && written < len);

Expand All @@ -144,6 +146,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
size_t written = 0;
k_timepoint_t end = sys_timepoint_calc(timeout);
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
bool need_resched = false;

SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout);

Expand All @@ -160,7 +163,8 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_

if (pipe_empty(pipe)) {
if (pipe->waiting != 0) {
written += copy_to_pending_readers(pipe, &data[written],
written += copy_to_pending_readers(pipe, &need_resched,
&data[written],
len - written);
if (written >= len) {
rc = written;
Expand All @@ -179,7 +183,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
break;
}

rc = wait_for(&pipe->space, pipe, &key, end);
rc = wait_for(&pipe->space, pipe, &key, end, &need_resched);
if (rc != 0) {
if (rc == -EAGAIN) {
rc = written ? written : -EAGAIN;
Expand All @@ -189,7 +193,11 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_
}
exit:
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc);
k_spin_unlock(&pipe->lock, key);
if (need_resched) {
z_reschedule(&pipe->lock, key);
} else {
k_spin_unlock(&pipe->lock, key);
}
return rc;
}

Expand All @@ -199,6 +207,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
int rc;
k_timepoint_t end = sys_timepoint_calc(timeout);
k_spinlock_key_t key = k_spin_lock(&pipe->lock);
bool need_resched = false;

SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout);

Expand All @@ -209,7 +218,8 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout

for (;;) {
if (pipe_full(pipe)) {
z_sched_wake(&pipe->space, 0, NULL);
/* One or more pending writers may exist. */
need_resched = z_sched_wake_all(&pipe->space, 0, NULL);
}

buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used);
Expand All @@ -226,7 +236,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
/* provide our "direct copy" info to potential writers */
_current->base.swap_data = &buf;

rc = wait_for(&pipe->data, pipe, &key, end);
rc = wait_for(&pipe->data, pipe, &key, end, &need_resched);
if (rc != 0) {
if (rc == -EAGAIN) {
rc = buf.used ? buf.used : -EAGAIN;
Expand All @@ -236,7 +246,11 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout
}
exit:
SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc);
k_spin_unlock(&pipe->lock, key);
if (need_resched) {
z_reschedule(&pipe->lock, key);
} else {
k_spin_unlock(&pipe->lock, key);
}
return rc;
}

Expand Down

0 comments on commit 3da90f9

Please sign in to comment.