Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort keys and fold resubmit steps by virtue of dflow #269

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 40 additions & 133 deletions dpgen2/entrypoint/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,136 +808,43 @@ def print_list_steps(
return "\n".join(ret)


def successful_step_keys(wf):
all_step_keys = []
steps = wf.query_step()
# For reused steps whose startedAt are identical, sort them by key
steps.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
for step in steps:
if step.key is not None and step.phase == "Succeeded":
all_step_keys.append(step.key)
return all_step_keys


def get_superop(key):
if "prep-train" in key:
return key.replace("prep-train", "prep-run-train")
elif "run-train-" in key:
return re.sub("run-train-[0-9]*", "prep-run-train", key)
elif "prep-lmp" in key:
return key.replace("prep-lmp", "prep-run-explore")
elif "run-lmp-" in key:
return re.sub("run-lmp-[0-9]*", "prep-run-explore", key)
elif "prep-fp" in key:
return key.replace("prep-fp", "prep-run-fp")
elif "run-fp-" in key:
return re.sub("run-fp-[0-9]*", "prep-run-fp", key)
elif "prep-caly-input" in key:
return key.replace("prep-caly-input", "prep-run-explore")
elif "collect-run-calypso-" in key:
return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-dp-optim-" in key:
return re.sub("prep-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "run-dp-optim-" in key:
return re.sub("run-dp-optim-[0-9]*-[0-9]*-[0-9]*", "prep-run-explore", key)
elif "prep-caly-model-devi" in key:
return key.replace("prep-caly-model-devi", "prep-run-explore")
elif "run-caly-model-devi" in key:
return re.sub("run-caly-model-devi-[0-9]*", "prep-run-explore", key)
elif "caly-evo-step" in key:
return re.sub("caly-evo-step-[0-9]*", "prep-run-explore", key)
elif "diffcsp-gen-" in key:
return re.sub("diffcsp-gen-[0-9]*", "prep-run-explore", key)
elif "prep-relax" in key:
return re.sub("prep-relax", "prep-run-explore", key)
elif "run-relax-" in key:
return re.sub("run-relax-[0-9]*", "prep-run-explore", key)
return None


def fold_keys(all_step_keys):
folded_keys = {}
for key in all_step_keys:
is_superop = False
for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]:
if superop in key:
if key not in folded_keys:
folded_keys[key] = []
is_superop = True
break
if is_superop:
continue
superop = get_superop(key)
# if its super OP is succeeded, fold it into its super OP
if superop is not None and superop in all_step_keys:
if superop not in folded_keys:
folded_keys[superop] = []
folded_keys[superop].append(key)
else:
folded_keys[key] = [key]
for k, v in folded_keys.items():
if v == []:
folded_keys[k] = [k]
return folded_keys


def get_resubmit_keys(
wf,
):
all_step_keys = successful_step_keys(wf)
step_keys = [
"prep-run-train",
"prep-train",
"run-train",
"prep-caly-input",
"prep-caly-model-devi",
"run-caly-model-devi",
"prep-run-explore",
"prep-lmp",
"run-lmp",
"diffcsp-gen",
"prep-relax",
"run-relax",
wf_info = wf.query()
all_steps = [
step
for step in wf_info.get_step(sort_by_generation=True)
if step.key is not None
]
super_keys = ["prep-run-train", "prep-run-explore", "prep-run-fp"]
other_keys = [
"select-confs",
"prep-run-fp",
"prep-fp",
"run-fp",
"collect-data",
"scheduler",
"id",
]
if (
len(
matched_step_key(
all_step_keys,
[
"collect-run-calypso",
"prep-dp-optim",
"run-dp-optim",
],
)
)
> 0
):
# calypso default mode
step_keys += [
"collect-run-calypso",
"prep-dp-optim",
"run-dp-optim",
]
else:
# calypso merge mode
step_keys.append("caly-evo-step")

all_step_keys = matched_step_key(
all_step_keys,
step_keys,
)
all_step_keys = sort_slice_ops(
all_step_keys,
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
)
folded_keys = fold_keys(all_step_keys)
folded_keys = {}
for step in all_steps:
if len(matched_step_key([step.key], super_keys)) > 0:
sub_steps = wf_info.get_step(parent_id=step.id, sort_by_generation=True)
sub_keys = [
step.key
for step in sub_steps
if step.key is not None and step.phase == "Succeeded"
]
sub_keys = sort_slice_ops(
sub_keys,
["run-train", "run-lmp", "run-fp", "diffcsp-gen", "run-relax"],
)
if step.phase == "Succeeded":
folded_keys[step.key] = sub_keys
else:
for key in sub_keys:
folded_keys[key] = [key]
elif len(matched_step_key([step.key], other_keys)) > 0:
Comment on lines +830 to +846
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential empty sub_steps to prevent errors.

When retrieving sub_steps, there is a possibility that it could be empty. Attempting to process an empty list without checking could lead to unexpected behavior or errors.

Consider adding a check to ensure sub_steps is not empty before proceeding:

if not sub_steps:
    continue

Comment on lines +841 to +846
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor conditional logic to reduce duplication.

The conditional statements within the loop assign values to folded_keys in both branches. This can be refactored to reduce code duplication and enhance readability.

Refactor the code as follows:

-            if step.phase == "Succeeded":
-                folded_keys[step.key] = sub_keys
-            else:
-                for key in sub_keys:
-                    folded_keys[key] = [key]
+            if step.phase == "Succeeded":
+                keys_to_add = {step.key: sub_keys}
+            else:
+                keys_to_add = {key: [key] for key in sub_keys}
+            folded_keys.update(keys_to_add)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if step.phase == "Succeeded":
folded_keys[step.key] = sub_keys
else:
for key in sub_keys:
folded_keys[key] = [key]
elif len(matched_step_key([step.key], other_keys)) > 0:
if step.phase == "Succeeded":
keys_to_add = {step.key: sub_keys}
else:
keys_to_add = {key: [key] for key in sub_keys}
folded_keys.update(keys_to_add)
elif len(matched_step_key([step.key], other_keys)) > 0:

folded_keys[step.key] = [step.key]
return folded_keys


Expand All @@ -955,7 +862,12 @@ def resubmit_concurrent_learning(

old_wf = Workflow(id=wfid)
folded_keys = get_resubmit_keys(old_wf)
all_step_keys = sum(folded_keys.values(), [])
all_step_keys = []
super_keys = {}
for super_key, keys in folded_keys.items():
all_step_keys += keys
for key in keys:
super_keys[key] = super_key
Comment on lines +865 to +870
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure indices are within valid range when accessing all_step_keys.

When accessing all_step_keys[ii] using indices from reuse_idx, there's a risk of an IndexError if an index is out of range. This can happen if reuse contains invalid indices.

Add a validation check to ensure indices are within the valid range:

max_index = len(all_step_keys) - 1
for idx in reuse_idx:
    if idx < 0 or idx > max_index:
        raise IndexError(f"Index {idx} is out of range for all_step_keys.")


if list_steps:
prt_str = print_keys_in_nice_format(
Expand All @@ -971,21 +883,16 @@ def resubmit_concurrent_learning(
if fold:
reused_folded_keys = {}
for key in reused_keys:
superop = get_superop(key)
if superop is not None:
if superop not in reused_folded_keys:
reused_folded_keys[superop] = []
reused_folded_keys[superop].append(key)
else:
reused_folded_keys[key] = [key]
super_key = super_keys[key]
if super_key not in reused_folded_keys:
reused_folded_keys[super_key] = []
reused_folded_keys[super_key].append(key)
Comment on lines +886 to +889
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential KeyError when accessing super_keys.

In the loop, super_key = super_keys[key] assumes that key exists in super_keys. If key is not found, a KeyError will be raised.

Modify the code to safely access super_keys:

-            super_key = super_keys[key]
+            super_key = super_keys.get(key)
+            if super_key is None:
+                continue  # or handle the missing key appropriately
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
super_key = super_keys[key]
if super_key not in reused_folded_keys:
reused_folded_keys[super_key] = []
reused_folded_keys[super_key].append(key)
super_key = super_keys.get(key)
if super_key is None:
continue # or handle the missing key appropriately
if super_key not in reused_folded_keys:
reused_folded_keys[super_key] = []
reused_folded_keys[super_key].append(key)

for k, v in reused_folded_keys.items():
# reuse the super OP iif all steps within it are reused
if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]):
if set(v) == set(folded_keys[k]):
reused_folded_keys[k] = [k]
reused_keys = sum(reused_folded_keys.values(), [])
reuse_step = old_wf.query_step(key=reused_keys)
# For reused steps whose startedAt are identical, sort them by key
reuse_step.sort(key=lambda x: "%s-%s" % (x.startedAt, x.key))
reuse_step = old_wf.query_step(key=reused_keys, sort_by_generation=True)

wf = submit_concurrent_learning(
wf_config,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ classifiers = [
dependencies = [
'numpy',
'dpdata>=0.2.20',
'pydflow>=1.8.95',
'pydflow>=1.8.97',
'dargs>=0.3.1',
'scipy',
'lbg',
Expand Down
Loading