-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
113 lines (87 loc) · 3.36 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# SPDX-FileCopyrightText: 2023-2024 DeepLime <[email protected]>
# SPDX-License-Identifier: MIT
#####################################
# !! DO NOT EDIT THIS FILE !! #
#####################################
import argparse
import json
import logging
import os
from importlib import import_module
from typing import Dict, List
from onecode import (
ConfigOption,
Env,
Logger,
Mode,
Project,
register_ext_module
)
def main(
    data: Dict = None,
    flow_name: str = None,
    logger: logging.Handler = None
):
    """
    Starts the OneCode project with the given options.

    Args:
        data: Initialize the Project with data before execution (typically
            `Mode.LOAD_THEN_EXECUTE`).
        flow_name: Execute only the flow specified by its ID. If None provided, all flows will run.
        logger: Add a logging handler to the OneCode Logger.

    Returns:
        The output manifest path of the executed flow when exactly one flow ran; otherwise the
        list of output manifest paths, one per executed flow (empty when no flow ran).

    Raises:
        FileNotFoundError: if the OneCode project configuration file is not found.

    """
    cur_dir = os.path.dirname(__file__)
    config_file = os.path.join(cur_dir, Env.ONECODE_CONFIG_FILE)
    if not os.path.exists(config_file):
        raise FileNotFoundError(
            "\n"
            "Missing OneCode config files, it looks like someone tampered with the wrong file...\n"
            "You may recreate the files according to the documentation specs.\n"
        )

    # register elements from OneCode inline extensions if any
    globals()['onecode_ext'] = register_ext_module()

    Project().data = data
    Logger().reset()
    Logger().add_handler(logger)

    # JSON is UTF-8 by specification: do not depend on the platform locale encoding,
    # otherwise non-ASCII flow labels may fail to decode on some systems.
    with open(config_file, encoding="utf-8") as f:
        workflows = json.load(f)

    all_manifests = []
    for wfl in workflows:
        flow_file = wfl['file']

        # when a specific flow is requested, ignore every other registered flow
        if flow_name is not None and flow_name != flow_file:
            continue

        # a flow registered in the config file but without its Python file is skipped, not fatal
        if not os.path.exists(os.path.join(cur_dir, 'flows', f'{flow_file}.py')):
            print(f"Registered flow {wfl['label']} ({flow_file}.py) doesn't exist => skipping")
            continue

        Project().current_flow = flow_file

        # clear any previous MANIFEST.txt output
        manifest = Project().get_output_manifest()
        if os.path.exists(manifest):
            os.remove(manifest)

        flow = import_module(f"flows.{flow_file}")
        flow.run()

        all_manifests.append(manifest)

    # a single executed flow yields its manifest directly rather than a one-element list
    return all_manifests[0] if len(all_manifests) == 1 else all_manifests
def _main(raw_args: List[str] = None):
    """
    Command-line entry point: parses the arguments, configures the Project and calls `main()`.

    Args:
        raw_args: Command-line arguments to parse. If None provided, argparse falls back to
            the arguments of the current process.

    Raises:
        FileNotFoundError: if the input parameters file given on the command line does not exist.

    """
    parser = argparse.ArgumentParser(description='Use optional JSON parameters file')
    parser.add_argument('--flow', default=None, help='Specify the flow to run')
    parser.add_argument('--flush', action="store_true", help='Flush the logs immediately')
    parser.add_argument('file', nargs='?', help='Path to the input JSON file')
    args = parser.parse_args(raw_args)

    data = None
    if args.file is not None:
        if not os.path.exists(args.file):
            raise FileNotFoundError(f'Input parameters file {args.file} does not exist')

        print(f'Using provided parameter file: {args.file}')

        # JSON is UTF-8 by specification: do not depend on the platform locale encoding
        with open(args.file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # parameters were provided: load them into the Project before executing
        Project().mode = Mode.LOAD_THEN_EXECUTE
    else:
        Project().mode = Mode.EXECUTE

    if args.flush:
        Project().set_config(ConfigOption.FLUSH_STDOUT, True)

    main(data, args.flow)
# Run the command-line entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    _main()