From 3da90f9d49c78475205b917751a78717737938bc Mon Sep 17 00:00:00 2001 From: Nicolas Pitre Date: Fri, 17 Jan 2025 17:47:06 -0500 Subject: [PATCH] kernel/pipe: add missing calls to z_reschedule() We are waking up threads but failed to let them run if they are higher priority. Add missing calls to z_reschedule(). Also wake up all pending writers as we don't know how many there might be. It is more efficient to wake them all when the ring buffer is full before reading from it rather than waking them one by one whenever there is more room in it. Thanks to Peter Mitsis for noticing those issues. Signed-off-by: Nicolas Pitre --- kernel/pipe.c | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/kernel/pipe.c b/kernel/pipe.c index c4e19e1a4506..9adfd270368e 100644 --- a/kernel/pipe.c +++ b/kernel/pipe.c @@ -35,7 +35,7 @@ static inline bool pipe_empty(struct k_pipe *pipe) } static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key, - k_timepoint_t time_limit) + k_timepoint_t time_limit, bool *need_resched) { k_timeout_t timeout = sys_timepoint_timeout(time_limit); int rc; @@ -45,6 +45,7 @@ static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key } pipe->waiting++; + *need_resched = false; SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout); rc = z_pend_curr(&pipe->lock, *key, waitq, timeout); *key = k_spin_lock(&pipe->lock); @@ -85,7 +86,7 @@ struct pipe_buf_spec { size_t used; }; -static size_t copy_to_pending_readers(struct k_pipe *pipe, +static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched, const uint8_t *data, size_t len) { struct k_thread *reader; @@ -132,6 +133,7 @@ static size_t copy_to_pending_readers(struct k_pipe *pipe, /* rest of thread wake-up outside the scheduler lock */ z_thread_return_value_set_with_data(reader, 0, NULL); z_ready_thread(reader); + *need_resched = true; } } while (reader != NULL && written < len); @@ -144,6 +146,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ size_t written = 0; k_timepoint_t end = sys_timepoint_calc(timeout); k_spinlock_key_t key = k_spin_lock(&pipe->lock); + bool need_resched = false; SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout); @@ -160,7 +163,8 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ if (pipe_empty(pipe)) { if (pipe->waiting != 0) { - written += copy_to_pending_readers(pipe, &data[written], + written += copy_to_pending_readers(pipe, &need_resched, + &data[written], len - written); if (written >= len) { rc = written; @@ -179,7 +183,7 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ break; } - rc = wait_for(&pipe->space, pipe, &key, end); + rc = wait_for(&pipe->space, pipe, &key, end, &need_resched); if (rc != 0) { if (rc == -EAGAIN) { rc = written ? written : -EAGAIN; @@ -189,7 +193,11 @@ int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_ } exit: SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc); - k_spin_unlock(&pipe->lock, key); + if (need_resched) { + z_reschedule(&pipe->lock, key); + } else { + k_spin_unlock(&pipe->lock, key); + } return rc; } @@ -199,6 +207,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout int rc; k_timepoint_t end = sys_timepoint_calc(timeout); k_spinlock_key_t key = k_spin_lock(&pipe->lock); + bool need_resched = false; SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout); @@ -209,7 +218,8 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout for (;;) { if (pipe_full(pipe)) { - z_sched_wake(&pipe->space, 0, NULL); + /* One or more pending writers may exist. */ + need_resched = z_sched_wake_all(&pipe->space, 0, NULL); } buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used); @@ -226,7 +236,7 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout /* provide our "direct copy" info to potential writers */ _current->base.swap_data = &buf; - rc = wait_for(&pipe->data, pipe, &key, end); + rc = wait_for(&pipe->data, pipe, &key, end, &need_resched); if (rc != 0) { if (rc == -EAGAIN) { rc = buf.used ? buf.used : -EAGAIN; @@ -236,7 +246,11 @@ int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout } exit: SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc); - k_spin_unlock(&pipe->lock, key); + if (need_resched) { + z_reschedule(&pipe->lock, key); + } else { + k_spin_unlock(&pipe->lock, key); + } return rc; }