when_all/split: operation state might get prematurely destroyed when child completes synchronously inside its stop callback #300

Open
jiixyj opened this issue Oct 23, 2024 · 8 comments

Comments

@jiixyj

jiixyj commented Oct 23, 2024

I may have stumbled across a nasty lifetime issue in the handling of stop callbacks in the when_all/split algorithms. It would apply to any algorithm that keeps an inplace_stop_source inside its operation state.

when_all has an inplace_stop_source inside its operation state, and then a stop callback, like this:

struct when_all_opstate {
    // ...
    inplace_stop_source stop_src{};
    // ...
    optional<stop_callback> on_stop{nullopt};
    // ...
};

In on_stop, a stop callback is registered, which calls stop_src.request_stop() when there is a stop request on the receiver's stop token. All child senders of the when_all are registered on the stop_src. This propagates the stop request from the receiver to all children of the when_all sender.

Now, what I observed is the following chain of events:

  • receiver's stop token triggers
  • on_stop callback calls stop_src.request_stop()
  • the stop_src now iterates over its list of registered stop callbacks (those are the ones from the children) (*)
  • the last child calls set_stopped synchronously from inside its stop callback. In my case it's from deregistering a sleep timer from a self-written epoll-based "io context", something like this:
                struct stop_cb {
                    opstate* op;
                    void operator()() noexcept
                    {
                        if (op->loop_->deregister_deadline(op)) {
                            op->set_stopped();
                        } else {
                            // lost the race, main loop will call `set_value()`
                        }
                    }
                };
  • now, arrive() of the when_all opstate gets called
  • complete() of the when_all opstate gets called
  • ex::set_stopped is called, satisfying the receiver contract of the when_all sender
  • the when_all opstate is synchronously (!) destroyed from inside the set_stopped callback by some follow up work.
    -> UB, since we are still iterating over the list of registered stop callbacks of stop_src! (the line marked with "*" above)
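
The chain of events above can be traced in a toy model without triggering real undefined behaviour. This is only a sketch under stated assumptions: `toy_stop_source`, `unguarded_when_all` and `trace_hazard` are hypothetical names, not part of any library, and the "destruction" of the operation state is merely recorded in a flag so that the late access can be observed safely.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for inplace_stop_source: request_stop() walks a
// plain list of callbacks in order.
struct toy_stop_source {
    std::vector<std::function<void()>> callbacks;
    std::vector<std::string>* log = nullptr;
    const bool* owner_completed = nullptr;

    void request_stop()
    {
        assert(log != nullptr && owner_completed != nullptr);
        for (std::size_t i = 0; i < callbacks.size(); ++i) {
            callbacks[i]();
            // A real stop source continues its list traversal here and
            // touches memory that lives inside the operation state.
            if (*owner_completed) {
                log->push_back("stop_src touched after completion");
            }
        }
    }
};

// Hypothetical when_all-like state without any guard.
struct unguarded_when_all {
    std::vector<std::string> log;
    bool completed = false;
    int pending;
    toy_stop_source stop_src;

    explicit unguarded_when_all(int children) : pending(children)
    {
        stop_src.log = &log;
        stop_src.owner_completed = &completed;
        // Every child completes synchronously from its stop callback.
        for (int i = 0; i < children; ++i) {
            stop_src.callbacks.push_back([this] { arrive(); });
        }
    }

    void arrive()
    {
        assert(pending > 0);
        if (--pending == 0) {
            log.push_back("set_stopped");
            completed = true; // a real receiver may destroy the opstate here
        }
    }
};

std::vector<std::string> trace_hazard()
{
    unguarded_when_all op{2};
    op.stop_src.request_stop();
    return op.log;
}
```

Calling `trace_hazard()` records the completion first and the stop source's access second, which is the ordering that becomes a use-after-free once the receiver really destroys the operation state.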

I've managed to "hack around" it by checking thread IDs and deferring the completion to the stop callback if I detect that the completion is called synchronously from inside a stop callback. So something like this:

template <typename Opstate>
struct on_stop_request_with_thread_id {
public:
    void operator()() const noexcept
    {
        id_->store(std::this_thread::get_id(), std::memory_order::relaxed);
        op_->request_stop();
        if (id_->load(std::memory_order::relaxed) == std::thread::id{}) {
            op_->deferred_complete();
            return;
        }
        id_->store(std::thread::id{}, std::memory_order::relaxed);
    }

public: // NOLINT enable aggregate initialization
    Opstate* op_;
    std::atomic<std::thread::id>* id_;
};

export template <typename Opstate, typename Token>
struct stop_callback_with_thread_id {
public:
    bool should_defer_completion()
    {
        if (id_.load(std::memory_order::relaxed) != std::this_thread::get_id()) {
            return false;
        }

        id_.store(std::thread::id{}, std::memory_order::relaxed);
        return true;
    }

    void reset() { on_stop_.reset(); }

    void emplace(Token token, Opstate* op)
    {
        on_stop_.emplace(std::move(token), on_stop_request_with_thread_id<Opstate>{op, &id_});
    }

private:
    using stop_callback_t = ex::stop_callback_for_t<Token, on_stop_request_with_thread_id<Opstate>>;

    std::atomic<std::thread::id> id_{};
    std::optional<stop_callback_t> on_stop_ = {};
};

...and then using stop_callback_with_thread_id instead of optional<stop_callback> inside when_all's opstate, and having a complete() like this:

            void complete(Rcvr& rcvr) noexcept
            {
                // If we are completing synchronously from inside a stop
                // callback, defer completion.
                if (on_stop.should_defer_completion()) {
                    this->deferred_rcvr = &rcvr;
                    return;
                }

                complete_impl(rcvr);
            }

This all feels very hacky to me, though.
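
To see the deferral at work in isolation, here is a simplified, self-contained variant. It is a sketch, not the code from this comment: `deferring_op`, `on_stop_requested` and `trace_deferred_completion` are hypothetical names, a plain `bool` replaces the trick of resetting the stored thread id, and the child stop callbacks are modelled by a single `std::function`.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Hypothetical operation state that defers its completion while its own
// stop callback is running on the current thread.
struct deferring_op {
    std::vector<std::string>* log = nullptr;
    // Id of the thread currently inside on_stop_requested(), if any.
    std::atomic<std::thread::id> stopping_thread{std::thread::id{}};
    bool deferred = false; // only touched by the stopping thread in this toy
    std::function<void()> child_stop_cb;

    void complete()
    {
        if (stopping_thread.load(std::memory_order_relaxed) == std::this_thread::get_id()) {
            // Called synchronously from inside the stop callback: postpone.
            deferred = true;
            log->push_back("completion deferred");
            return;
        }
        complete_impl();
    }

    void complete_impl() { log->push_back("set_stopped"); }

    void on_stop_requested()
    {
        assert(log != nullptr);
        stopping_thread.store(std::this_thread::get_id(), std::memory_order_relaxed);
        child_stop_cb(); // stands in for stop_src.request_stop()
        log->push_back("callback returned");
        stopping_thread.store(std::thread::id{}, std::memory_order_relaxed);
        if (deferred) {
            complete_impl(); // last action: nothing touches *this afterwards
        }
    }
};

std::vector<std::string> trace_deferred_completion()
{
    std::vector<std::string> log;
    deferring_op op;
    op.log = &log;
    op.child_stop_cb = [&op] { op.complete(); }; // child completes synchronously
    op.on_stop_requested();
    return log;
}
```

In this model the completion is delivered only after the stop callback has finished its own work, so destroying the operation state from the receiver would no longer pull memory out from under the iteration.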

I haven't deeply investigated split yet, but I think the solution could be a bit simpler there, by using a stop callback like this instead of on-stop-request:

template <typename SharedState>
struct split_on_stop_request {
    void operator()() const noexcept
    {
        shared_state_->inc_ref();
        shared_state_->stop_src.request_stop();
        shared_state_->dec_ref();
    }

    SharedState* shared_state_;
};

...i.e. just wrapping the request_stop() between inc_ref/dec_ref to ensure the opstate object stays alive long enough.
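
A minimal sketch of why the extra reference helps, assuming an intrusively ref-counted shared state that deletes itself when the count reaches zero. `toy_shared_state`, `run_stop_callbacks` and `trace_guarded_stop` are hypothetical names used only for illustration; the real split shared state is more involved.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical intrusively ref-counted shared state.
struct toy_shared_state {
    int ref_count = 1;                   // one reference held by the consumer
    std::vector<std::string>* log;       // outlives the state, used for tracing
    std::function<void()> child_stop_cb; // stands in for the callback list

    explicit toy_shared_state(std::vector<std::string>* l) : log(l) {}
    ~toy_shared_state() { log->push_back("destroyed"); }

    void inc_ref() { ++ref_count; }
    void dec_ref()
    {
        assert(ref_count > 0);
        if (--ref_count == 0) {
            delete this;
        }
    }

    // Stand-in for stop_src.request_stop(): run the callback, then do
    // book-keeping that still needs *this to be alive.
    void run_stop_callbacks()
    {
        child_stop_cb();
        log->push_back("book-keeping done");
    }

    void request_stop()
    {
        inc_ref();            // keep *this alive across the iteration
        run_stop_callbacks();
        dec_ref();            // may destroy *this; nothing touches it afterwards
    }
};

std::vector<std::string> trace_guarded_stop()
{
    std::vector<std::string> log;
    auto* state = new toy_shared_state{&log};
    // The child completes synchronously and the consumer drops its reference.
    state->child_stop_cb = [state] {
        state->log->push_back("set_stopped");
        state->dec_ref();
    };
    state->request_stop();
    return log;
}
```

With the guard in place, the destructor runs only after the book-keeping step, even though the consumer released its reference from inside the callback.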

I do wonder if there is a more elegant way to solve this issue. I don't think synchronous completions from stop callbacks should be outlawed -- it seems "natural" to me to do the set_stopped right inside the stop callback if possible. Or maybe synchronous destruction from inside the set_stopped completion of when_all is the problem? I've thought that you have to assume the lifetime of the opstate may end when calling the completion, though.

@dietmarkuehl
Collaborator

The scenario seems plausible. I think a nice way to work around the early destruction could be to increment the number of expected completions before iterating: the iteration over the children is conceptually an outstanding task. Once that is done the count is decremented and the appropriate completion is triggered if all outstanding work is completed.
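
The counting idea can be sketched in a toy model as follows. Everything here is an assumption made for illustration: `counted_when_all` and `trace_counted_stop` are hypothetical names, the child stop callbacks are a plain vector instead of a real stop source, and the early return on a zero count simply treats "nothing outstanding" as "already completed".

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical when_all-like state that treats the stop-callback iteration
// as one more outstanding task.
struct counted_when_all {
    std::vector<std::string>* log;
    std::atomic<int> count;                            // outstanding completions
    std::vector<std::function<void()>> child_stop_cbs; // stands in for stop_src

    counted_when_all(std::vector<std::string>* l, int children) : log(l), count(children) {}

    void arrive()
    {
        assert(count.load() > 0);
        if (--count == 0) {
            // From here on the receiver may destroy the operation state.
            log->push_back("set_stopped");
        }
    }

    void request_stop()
    {
        // Nothing outstanding means everything has already completed.
        if (count++ == 0) {
            return;
        }
        for (auto& cb : child_stop_cbs) {
            cb();
            log->push_back("callback returned");
        }
        arrive(); // the iteration itself is done: release its count
    }
};

std::vector<std::string> trace_counted_stop()
{
    std::vector<std::string> log;
    counted_when_all op{&log, 2};
    for (int i = 0; i < 2; ++i) {
        // Each child completes synchronously from its stop callback.
        op.child_stop_cbs.push_back([&op] { op.arrive(); });
    }
    op.request_stop();
    return log;
}
```

Because the iteration holds one count of its own, the final `arrive()` in `request_stop()` is the one that delivers the completion, after the last callback has returned.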

@lewissbaker
Collaborator

This seems eerily similar to an issue reported in libunifex some time back:
facebookexperimental/libunifex#445

@dietmarkuehl
Collaborator

The standard doesn't have that problem [yet?]: it sets up a callback using on-stop-request (see [exec.when.all p12]) which isn't defined/described. However, only when_all's stop source is passed in, i.e., there is no option to play tricks with the count.

@jiixyj
Author

jiixyj commented Oct 23, 2024

> The scenario seems plausible. I think a nice way to work around the early destruction could be to increment the number of expected completions before iterating: the iteration over the children is conceptually an outstanding task. Once that is done the count is decremented and the appropriate completion is triggered if all outstanding work is completed.

This is an excellent suggestion, thanks! Now, looking at libunifex's implementation, this is how they fixed it as well.

> This seems eerily similar to an issue reported in libunifex some time back: facebookexperimental/libunifex#445

Yep, seems to be the exact same issue. Thanks for the pointer! In the comments they mention stop_when as well, which is interesting, because this is where I originally stumbled across this (trying to implement the exposition-only stop_when sender algorithm from P3149R6). I was implementing it in a naive way, without stashing the original sender's result in a result_variant in the opstate. But I guess there's no way around it because of this lifetime issue.

Reading P3409R0 earlier, I had hopes that maybe single_inplace_stop_source could fix it, because it wouldn't need to loop around the list of stop callbacks as there is just a single one. But it needs to do some book-keeping after calling the stop callback, so this is sadly not a correctness fix:

  inline bool single_inplace_stop_source::request_stop() noexcept {
...
      callback->execute(callback);

      state_.store(stop_requested_callback_done_state(), memory_order_release); // <<< this might access the opstate after its lifetime ended
      state_.notify_one();
    }

    return true;
  }

@jiixyj
Author

jiixyj commented Oct 23, 2024

> The standard doesn't have that problem [yet?]: it sets up a callback using on-stop-request (see [exec.when.all p12]) which isn't defined/described. However, only when_all's stop source is passed in, i.e., there is no option to play tricks with the count.

It is mentioned in [exec.snd.expos]: https://eel.is/c++draft/exec#snd.expos-16

@dietmarkuehl
Collaborator

> It is mentioned in [exec.snd.expos]: https://eel.is/c++draft/exec#snd.expos-16

I didn't look there! Thanks for pointing that out. The implication is, of course, that the standard does have the problem. It may be reasonable to factor out the counting behavior and the stop callback handling into a separate entity used in relevant places: on-stop-request is also used for split (and other similar algorithms would use the same approach).

@jiixyj
Author

jiixyj commented Oct 24, 2024

Here is one possible outline of a fix for the standard. It is a bit fiddly because of the "basic sender" infrastructure, but not too bad I feel.

  • Make on-stop-request generic over "operation state like" types:
template <class State>
struct on-stop-request {
    State& state;
    void operator()() noexcept { state.request_stop(); }
};

The idea is that every operation state can customize how it wants stop requests to be handled.

  • Add a request_stop() member function to basic-state:
  template<class Sndr, class Rcvr>
  struct basic-state {                                          // exposition only
    basic-state(Sndr&& sndr, Rcvr&& rcvr) noexcept(see below)
      : rcvr(std::move(rcvr))
      , state(impls-for<tag_of_t<Sndr>>::get-state(std::forward<Sndr>(sndr), rcvr)) { }

    Rcvr rcvr;                                                  // exposition only
    state-type<Sndr, Rcvr> state;                               // exposition only

    void request_stop() noexcept
        requires requires { state.request_stop(rcvr); }
    {
        state.request_stop(rcvr);
    }
  };

...so basic-state handles stop requests by delegating to a possible request_stop() function of the "local" state that takes the receiver as an argument (needed by when_all).

  • In basic-operation, give the basic-state object to start instead of the local state and the receiver separately:
    void start() & noexcept
    {
        inner_ops.apply(
            [this](auto&... ops) { impls_for<tag_t>::start(*static_cast<basic_state<Sndr, Rcvr>*>(this), ops...); });
    }
  • Change all start functions to take the basic-state instead of local state and receiver separately. Now, the sender can register a stop callback by using ex::on_stop_request{basic_state}. To get the previous rcvr and state arguments back it can destructure the basic_state argument.
    static constexpr auto start = []<class... Ops>(auto& basic_state, Ops&... ops) noexcept -> void {
        auto& [rcvr, state] = basic_state;

        state.on_stop.emplace(ex::get_stop_token(ex::get_env(rcvr)), ex::on_stop_request{basic_state});
  • For when_all, add a request_stop to its state-type:
            void request_stop(Rcvr& rcvr) noexcept
            {
                if (this->count++ == 0) {
                    return;
                }
                this->stop_src.request_stop();
                this->arrive(rcvr);
            }
  • For split add a request_stop() to its local-state:
    void request_stop(Rcvr&) noexcept { sh_state->request_stop(); }

...and add a request_stop() to its shared-state:

    void request_stop() noexcept
    {
        this->inc_ref();
        this->stop_src.request_stop();
        this->dec_ref();
    }

There is a simpler way that only involves the "local" state class, but then when_all would need to save a pointer/reference to the receiver in its local state, which is not needed in the approach above.
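
The delegation chain in the outline above (stop callback, then basic-state, then local state with the receiver) can be sketched with toy types. This is an illustration under assumptions, not the exposition-only code: `toy_receiver`, `toy_local_state`, `toy_basic_state` and `trace_delegation` are hypothetical names, and the `requires` clause from the outline is omitted so that the sketch also compiles as C++17.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Generic callback: forwards the stop request to whatever state it is given.
template <class State>
struct on_stop_request {
    State& state;
    void operator()() noexcept { state.request_stop(); }
};

// Hypothetical receiver that only records what happened.
struct toy_receiver {
    std::vector<std::string> log;
};

// Hypothetical "local" state: it needs the receiver to handle a stop request.
struct toy_local_state {
    int stop_requests = 0;
    void request_stop(toy_receiver& rcvr) noexcept
    {
        ++stop_requests;
        rcvr.log.push_back("local state saw stop request");
    }
};

// Hypothetical basic-state: owns receiver and local state, and supplies the
// receiver when delegating.
template <class Rcvr, class State>
struct toy_basic_state {
    Rcvr rcvr;
    State state;
    void request_stop() noexcept { state.request_stop(rcvr); }
};

std::vector<std::string> trace_delegation()
{
    toy_basic_state<toy_receiver, toy_local_state> bs{};
    on_stop_request<decltype(bs)> cb{bs};
    cb();                     // what a stop source would do on request_stop()
    auto& [rcvr, state] = bs; // start() can still recover both parts
    assert(state.stop_requests == 1);
    return rcvr.log;
}
```

The point of the design is that the local state never has to store a pointer to the receiver: the basic-state already holds both and passes the receiver along when a stop request arrives.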

Here is a standalone example/test demonstrating the problem for when_all (it also works for split if you replace when_all with it):

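// Standard headers used by the example. `ex` is assumed to alias a namespace
// that implements std::execution (P2300); its definition is not shown here.
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <optional>
#include <type_traits>
#include <utility>

namespace tst {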

template <class State>
class on_stop_request {
public:
    void operator()() const noexcept { state_->request_stop(); }

public: // NOLINT enable aggregate initialization
    State* state_;
};

class wait_forever_sender {
public:
    using sender_concept = ex::sender_t;

    using completion_signatures = ex::completion_signatures<ex::set_stopped_t()>;

    template <typename Rcvr>
    ex::operation_state auto connect(Rcvr&& rcvr) noexcept(
        std::is_nothrow_constructible_v<std::remove_cvref_t<Rcvr>, Rcvr>)
    {
        return opstate{std::forward<Rcvr>(rcvr)};
    }

private:
    template <typename Rcvr>
    class opstate {
    public:
        using operation_state_concept = ex::operation_state_t;

        void start() & noexcept
        {
            on_stop_.emplace(ex::get_stop_token(ex::get_env(rcvr_)), tst::on_stop_request{this});
        }

        void request_stop() noexcept
        {
            on_stop_.reset();
            ex::set_stopped(std::move(rcvr_));
        }

    private:
        using stop_callback =
            ex::stop_callback_for_t<ex::stop_token_of_t<ex::env_of_t<Rcvr>>, tst::on_stop_request<opstate>>;

    public: // NOLINT enable aggregate initialization
        Rcvr rcvr_;
        std::optional<stop_callback> on_stop_ = {};
    };
};

ex::sender auto wait_forever()
{
    return wait_forever_sender{};
}

struct call_function_on_stop_receiver {
public:
    using receiver_concept = ex::receiver_t;

    explicit call_function_on_stop_receiver(std::function<void()>* f, ex::inplace_stop_token stoken)
        : f_{f}, stoken_{stoken} { }

    call_function_on_stop_receiver(call_function_on_stop_receiver&& other) noexcept
        : f_{std::move(other.f_)}, stoken_{std::move(other.stoken_)}, called_{other.called_.exchange(true)} { }
    call_function_on_stop_receiver& operator=(call_function_on_stop_receiver&& other) = delete;
    ~call_function_on_stop_receiver() { assert(called_); }

    void set_value() && noexcept { std::terminate(); }
    void set_error(std::exception_ptr) && noexcept { std::terminate(); }
    void set_stopped() && noexcept
    {
        called_ = true;
        (*f_)();
    }

    auto get_env() const noexcept { return ex::prop{ex::get_stop_token, stoken_}; }

private:
    std::function<void()>* f_;
    ex::inplace_stop_token stoken_;
    std::atomic<bool> called_ = false;
};

} // namespace tst

int main()
{
    ex::inplace_stop_source ssource{};

    std::function<void()> f;

    auto* op = new auto(
        ex::connect(ex::when_all(tst::wait_forever()), tst::call_function_on_stop_receiver{&f, ssource.get_token()}));

    f = [&] { delete op; }; // NOLINT(cppcoreguidelines-owning-memory)
    ex::start(*op);

    ssource.request_stop();
}

@dietmarkuehl
Collaborator

I have implemented something akin to the proposed fix here. I plan to write that up as LWG issue together with a corresponding proposed fix. I'll post the issue here once I have created it.
