diff --git a/burr/core/application.py b/burr/core/application.py index 1b57afc2..9cbbcb1b 100644 --- a/burr/core/application.py +++ b/burr/core/application.py @@ -180,17 +180,11 @@ async def _arun_function( def _state_update(state_to_modify: State, modified_state: State) -> State: - """This is a hack to apply state updates and ensure that we are respecting deletions. Specifically, the process is: - - 1. We subset the state to what we want to read - 2. We perform a set of state-specific writes to it - 3. We measure which ones were deleted - 4. We then merge the whole state back in - 5. We then delete the keys that were deleted + """Performs a state update for an action -- allowing for deletes. This is suboptimal -- we should not be observing the state, we should be using the state commands and layering in deltas. That said, we currently eagerly evaluate the state at all operations, which means we have to do it this way. See - https://github.com/DAGWorks-Inc/burr/issues/33 for a more details plan. + https://github.com/DAGWorks-Inc/burr/issues/33 for a more detailed plan. This function was written to solve this issue: https://github.com/DAGWorks-Inc/burr/issues/28. @@ -200,13 +194,23 @@ def _state_update(state_to_modify: State, modified_state: State) -> State: :param state_to_modify: The state to modify-- this is the original :return: """ - old_state_keys = set(state_to_modify.keys()) - new_state_keys = set(modified_state.keys()) - deleted_keys = list(old_state_keys - new_state_keys) + # We want to wipe these, as these should never be changed by the action + # an action that modifies these is going to incur "undefined behavior" -- we can effectively + # do anything we want -- in this case we're dropping it, but we may error out in the future + # This is really an issue in the case of parallelism -- E.G. when an action reduces multiple states + # a common failure mode is to modify. # TODO -- unify the logic of choosing whether a key is internal or not - # Right now this is __sequence_id and __prior_step, but it could be more - deleted_keys_filtered = [item for item in deleted_keys if not item.startswith("__")] - return state_to_modify.merge(modified_state).wipe(delete=deleted_keys_filtered) + private_fields = [item for item in modified_state.keys() if item.startswith("__")] + modified_state_without_private_fields = modified_state.wipe(delete=private_fields) + # we filter here as we don't want to + deleted_keys = [ + item + for item in ( + set(state_to_modify.keys()) - set(modified_state_without_private_fields.keys()) + ) + if not item.startswith("__") + ] + return state_to_modify.merge(modified_state_without_private_fields).wipe(delete=deleted_keys) def _validate_reducer_writes(reducer: Reducer, state: State, name: str) -> None: diff --git a/docs/concepts/state.rst b/docs/concepts/state.rst index a710af71..b5e87413 100644 --- a/docs/concepts/state.rst +++ b/docs/concepts/state.rst @@ -27,6 +27,11 @@ State manipulation is done through calling methods on the ``State`` class. The m The ``State`` object can only be treated immutably! Calling ``state.update(foo=bar)`` will do nothing if you don't use the value returned by the call. +.. warning:: + + State contains a set of "private" variables that start with `__`. -- E.G. `__SEQUENCE_ID`. These are internal to Burr, used by the application to track state. + Any modifications to them outside of the framework is considered undefined behavior -- it could be dropped, or error out (we reserve the right to alter this later). + The read operations extend from those in the `Mapping `_ interface, but there are a few extra: