-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_pipeline.py
83 lines (62 loc) · 2.23 KB
/
run_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import pandas as pd
import sqlalchemy as sa
from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext
from pydiverse.pipedag.util import setup_structlog
@materialize(lazy=True)
def lazy_task_1():
    """Return a SELECT of two literal columns: ``x`` = 1 and ``y`` = 2.

    Takes no inputs; the query object itself is handed back to the
    ``materialize`` decorator (declared with ``lazy=True``).
    """
    # NOTE(review): passing a list to ``sa.select`` is the legacy calling
    # style (deprecated in SQLAlchemy 1.4, removed in 2.0) -- confirm the
    # SQLAlchemy version this project pins before changing it.
    literal_columns = [
        sa.literal(1).label("x"),
        sa.literal(2).label("y"),
    ]
    return sa.select(literal_columns)
@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.Table, input2: sa.Table):
    """Outer-join ``input1`` to ``input2`` on ``x`` and select two columns.

    Args:
        input1: Table providing column ``x``.
        input2: Table providing columns ``x`` and ``a``.

    Returns:
        A ``Table`` named ``"task_2_out"`` (primary key ``a``) wrapping a
        query that yields ``input1.x * 5`` labelled ``x5`` and ``input2.a``.
    """
    selected_columns = [(input1.c.x * 5).label("x5"), input2.c.a]
    # input1 is the left side of the outer join.
    joined = input1.outerjoin(input2, input2.c.x == input1.c.x)
    query = sa.select(selected_columns).select_from(joined)
    return Table(query, name="task_2_out", primary_key=["a"])
@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.Table, my_stage: Stage):
    """Return a raw-SQL ``SELECT *`` over ``input1``.

    The table name is qualified with ``my_stage.transaction_name``.

    Args:
        input1: Table whose ``name`` is used in the FROM clause.
        my_stage: Stage whose ``transaction_name`` is used as the qualifier.
    """
    # NOTE(review): presumably ``transaction_name`` is the schema the stage
    # writes to while it is still running -- confirm against pipedag docs.
    statement = f"SELECT * FROM {my_stage.transaction_name}.{input1.name}"
    return sa.text(statement)
@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.Table, prev_stage: Stage):
    """Return a raw-SQL ``SELECT *`` over ``input1``.

    The table name is qualified with ``prev_stage.name``.

    Args:
        input1: Table whose ``name`` is used in the FROM clause.
        prev_stage: Stage whose ``name`` is used as the qualifier.
    """
    statement = f"SELECT * FROM {prev_stage.name}.{input1.name}"
    return sa.text(statement)
@materialize(nout=2, version="1.0.0")
def eager_inputs():
    """Build two small in-memory DataFrames and return them as tables.

    Returns:
        A pair of ``Table`` objects: the first (named ``"dfA"``) has columns
        ``a`` and ``b``; the second (named ``"dfB_%%"``) has columns ``a``
        and ``x``.
    """
    frame_a = pd.DataFrame({"a": [0, 1, 2, 4], "b": [9, 8, 7, 6]})
    frame_b = pd.DataFrame({"a": [2, 1, 0, 1], "x": [1, 1, 2, 2]})

    table_a = Table(frame_a, "dfA")
    # NOTE(review): the "%%" in the name presumably exercises special
    # characters in table names -- confirm; the literal must stay as is.
    table_b = Table(frame_b, "dfB_%%")
    return table_a, table_b
@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
    """Merge ``tbl1`` with ``tbl2`` on their shared column ``x``.

    Args:
        tbl1: Left frame; must contain column ``x``.
        tbl2: Right frame; must contain column ``x``.

    Returns:
        The result of ``tbl1.merge(tbl2, on="x")``.
    """
    merged = tbl1.merge(tbl2, on="x")
    return merged
def main():
    """Declare the three-stage example flow, run it twice, and check results.

    The flow wires the tasks of this module into ``stage_1`` .. ``stage_3``.
    It is first run plainly, then run again inside a ``StageLockContext``,
    where the output of ``lazy_task_1`` is fetched as a DataFrame and its
    first ``x`` value is checked to be 1.
    """
    with Flow() as flow:
        with Stage("stage_1"):
            literals = lazy_task_1()
            # Only the second output of eager_inputs is consumed below.
            _table_a, table_b = eager_inputs()

        with Stage("stage_2") as second_stage:
            joined = lazy_task_2(literals, table_b)
            same_stage_select = lazy_task_3(joined, second_stage)
            merged = eager_task(literals, table_b)

        with Stage("stage_3"):
            prev_stage_select = lazy_task_4(joined, second_stage)

        # unused terminal output tables
        _ = same_stage_select, prev_stage_select, merged

    # Run flow
    outcome = flow.run()
    assert outcome.successful

    # Run in a different way for testing
    # NOTE(review): the result is read inside the StageLockContext --
    # presumably so stage locks are still held during ``get``; confirm.
    with StageLockContext():
        outcome = flow.run()
        assert outcome.successful
        assert outcome.get(literals, as_type=pd.DataFrame)["x"][0] == 1
if __name__ == "__main__":
    # Script entry point: set up logging first, then run the example flow.
    setup_structlog()
    main()