Skip to content

Commit

Permalink
flux-exec: fix credits list update race
Browse files Browse the repository at this point in the history
Problem: When a subprocess exits / fails, it is removed from the
"subprocess credits" list.  However, it is possible this exit / failure
is not seen yet before the subprocess credits is updated during a stdin
callback.  This can lead to the subprocess credits list calculating an
an invalid "min credits".  This can lead to the stdin callback being
started when it should not be.

Solution: If the number of "active" subprocesses does not equal the
number of subprocesses on the credits list, remove exited/failed credits
from the credits list in stdin_cb().

Fixes #6542
  • Loading branch information
chu11 committed Jan 13, 2025
1 parent 619cccf commit e38bb34
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,11 @@ void subprocess_update_credits (flux_subprocess_t *p, int bytes, bool reorder)
void subprocess_remove_credits (flux_subprocess_t *p)
{
struct subproc_credit *spcred = flux_subprocess_aux_get (p, "credits");
if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
log_err_exit ("zlistx_delete");
if (spcred->handle) {
if (zlistx_delete (subprocess_credits, spcred->handle) < 0)
log_err_exit ("zlistx_delete");

Check warning on line 195 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L195

Added line #L195 was not covered by tests
spcred->handle = NULL;
}
}

void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
Expand Down Expand Up @@ -306,23 +309,46 @@ static void stdin_cb (flux_reactor_t *r,
log_err_exit ("fbuf_read");

if (lenp) {
int write_count = 0;
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if ((len = flux_subprocess_write (p, "stdin", ptr, lenp)) < 0)
log_err_exit ("flux_subprocess_write");
if (stdin_enable_flow_control) {
/* N.B. since we are subtracting the same number
* of credits from all subprocesses, the sorted
* order in the credits list should not change
/* N.B. normally we are subtracting the same
* number of credits from all active subprocesses,
* so the sorted order in the credits list should
* not change. See possible corner case below.
*/
subprocess_update_credits (p, -1*len, false);
}
write_count++;
}
p = zlistx_next (subprocesses);
}
if (stdin_enable_flow_control) {
/* N.B. under a racy scenario stdin_cb() could be called
* before state_cb(), so that a subprocess that has exited
* has not been "recognized" as exited.
*
* This can lead to subprocesses in the subprocess_credits
* list not having their credits updated above and
* subprocess_min_credits() returning an invalid value.
*
* Correct this by removing exited/failed subprocesses
* from the credits list, just like in state_cb().
*/
if (write_count != zlistx_size (subprocess_credits)) {
p = zlistx_first (subprocesses);
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_EXITED
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_FAILED)
subprocess_remove_credits (p);
p = zlistx_next (subprocesses);

Check warning on line 349 in src/cmd/flux-exec.c

View check run for this annotation

Codecov / codecov/patch

src/cmd/flux-exec.c#L344-L349

Added lines #L344 - L349 were not covered by tests
}
}
min_credits = subprocess_min_credits ();
if (min_credits == 0)
flux_watcher_stop (stdin_w);
Expand Down

0 comments on commit e38bb34

Please sign in to comment.