Dynamic execution for solving equations #6017
Hi all! I am exploring Dagster as a proof-of-concept for some scientific computing. I have some working example code below, and I am wondering whether it can be extended to meet my use case.

My use case: I am given a predetermined set of equations in several variables, and I want to solve for the unknown variables given particular values of the known ones, where any combination of variables may be the known ones.

For Dagster, I imagine this corresponds roughly to the question of whether it is possible to dynamically (and repeatedly/recursively) alter the execution flow. The first op to run would depend on (I imagine) the op execution context, and each op after that would depend on the outputs of the previous ops as well as the op execution context.

Assumptions:
My code below demonstrates, for a particular set of equations (in 4 variables; see the code comment below), how I would attempt this problem if the question of which variables are known had a predetermined answer (thus falling short of satisfying my use case).

Separately, given values for known variables, there is always some DAG consisting entirely of

As a side note, this feature is available in (and seemingly the hallmark feature of) a different Python package

```python
from math import log, exp
from typing import List

from dagster import op, job

# System of equations:
#   A / B = C
#   C = exp(D)


# First equation, solved for each of its three variables.
@op
def resolve_00(A: List[float], B: List[float]) -> List[float]:
    return [a / b for (a, b) in zip(A, B)]  # C


@op
def resolve_01(C: List[float], B: List[float]) -> List[float]:
    return [c * b for (c, b) in zip(C, B)]  # A


@op
def resolve_02(C: List[float], A: List[float]) -> List[float]:
    return [a / c for (c, a) in zip(C, A)]  # B


# Second equation, solved for each of its two variables.
@op
def resolve_10(D: List[float]) -> List[float]:
    return list(map(exp, D))  # C


@op
def resolve_11(C: List[float]) -> List[float]:
    return list(map(log, C))  # D


# Hard-coded known values for A and B.
@op
def start_A() -> List[float]:
    return [2., 4., 6.]


@op
def start_B() -> List[float]:
    return [1., 2., 3.]


@op
def display(context, l):
    context.log.info(f"Resolved to {l}.")


@job
def main():
    # The known variables (A and B) are fixed when the job is defined.
    A = start_A()
    B = start_B()
    C = resolve_00(A, B)
    D = resolve_11(C)  # D is computed, but only C is displayed
    display(C)


if __name__ == "__main__":
    result = main.execute_in_process(
        run_config={
            "loggers": {
                "console": {"config": {"log_level": "INFO"}}
            }
        }
    )
```
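To make the "which ops should run, and in what order" part of the question concrete, here is a minimal sketch in plain Python (no Dagster involved). It forward-chains over the input/output signatures of the `resolve_*` ops above until nothing new can be resolved. The `RULES` table and the `plan` function are hypothetical names introduced for illustration only; they are not part of Dagster or of the code above.

```python
from typing import Dict, FrozenSet, List, Set, Tuple

# Rule name -> (input variables, output variable), mirroring the
# signatures of the resolve_* ops. Hypothetical helper table.
RULES: Dict[str, Tuple[FrozenSet[str], str]] = {
    "resolve_00": (frozenset({"A", "B"}), "C"),
    "resolve_01": (frozenset({"C", "B"}), "A"),
    "resolve_02": (frozenset({"C", "A"}), "B"),
    "resolve_10": (frozenset({"D"}), "C"),
    "resolve_11": (frozenset({"C"}), "D"),
}


def plan(known: Set[str]) -> List[str]:
    """Return rule names in an order that resolves every reachable variable."""
    resolved = set(known)
    order: List[str] = []
    progress = True
    while progress:
        progress = False
        for name, (inputs, output) in RULES.items():
            # Apply a rule only if its output is still unknown and
            # all of its inputs are already available.
            if output not in resolved and inputs <= resolved:
                resolved.add(output)
                order.append(name)
                progress = True
    return order


if __name__ == "__main__":
    print(plan({"A", "B"}))  # same known variables as the job above
    print(plan({"B", "D"}))  # a different combination of known variables
```

With `A` and `B` known, the plan matches the hard-coded job above (`resolve_00`, then `resolve_11`). If the known variables do not determine the rest (for example only `A`), the plan is simply empty.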
Replies: 1 comment 3 replies
This should actually be possible (but potentially quite difficult) using conditional branching. Dagster requires that the structure of the DAG be static for the duration of a given run, but the subset of the DAG that gets executed can be determined dynamically at runtime. So conceptually, if you define a DAG that contains all necessary sub-DAGs, then you've sort of solved your problem (if you can determine which sub-DAG you need at runtime).

Your graph could start off with an output for each of your variables A, B, C, D. These `Out`s could be marked with `is_required=False`, and only fired if they are available for a given run. Downstream ops consuming these outputs would only execute if each of their inputs were fired.

However, in practice I think it would be quite difficult to write such a DAG in a way that guaranteed that you wouldn't redo work you had already done. The graph would need multiple copies of each op, and it would get pretty complex if you had an appreciable number of variables you wanted to handle.

Another option would be to move a bit more of the control flow logic into the body of the op rather than have Dagster manage all of it. The main benefit of doing it this way is that it would likely be significantly easier to write this code (at least from my perspective). At the extreme end would be putting all of the necessary logic inside a single op, but you could also make different tradeoffs. You could for example have a graph that was just a bunch of copies of this op chained together:
I think the right approach would just depend on what benefits you hope to get out of using multiple ops over a single function. Happy to discuss more though :)