Skip to content

Commit

Permalink
Use partial functions in flows to increase user flexibility in settin…
Browse files Browse the repository at this point in the history
…g job attributes (e.g. executor) (#1359)

Continuation of #1322.

Challenges:
- Mainly Dask, which does not play nicely when `functools.partial()` is
applied to a `Delayed` object. See
dask/dask#10707. There is now a workaround.
- Also there is a minor issue with dynamic switching of Covalent
executors, but there is a workaround. See
AgnostiqHQ/covalent#1889

Remaining issue:

Need to be able to parallelize the following code on Dask.

```python
from dask.distributed import Client
from ase.build import bulk
from quacc.recipes.emt.slabs import bulk_to_slabs_flow
from functools import partial

client = Client()

atoms = bulk("Cu")
delayed = bulk_to_slabs_flow(atoms)
result = client.gather(client.compute(delayed))
```

Hint: To monitor, set `"logfile": ""`.

---------

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
  • Loading branch information
Andrew-S-Rosen and deepsource-autofix[bot] authored Dec 18, 2023
1 parent 284f954 commit 9bcb488
Show file tree
Hide file tree
Showing 32 changed files with 316 additions and 230 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Added

- The `WORKFLOW_ENGINE` quacc setting now accepts `None`
- The `WORKFLOW_ENGINE` quacc setting now accepts `None`.
- A `DEBUG` quacc setting as been added.

### Changed

- The way to run complex, dynamic flows has been modified to rely on `functools.partial()` instead of kwargs. See the updated documentation.
- Refactored test suite

## [0.4.5]
Expand Down
2 changes: 1 addition & 1 deletion docs/user/basics/wflow_syntax.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ To help enable interoperability between workflow engines, quacc offers a unified
<center>

| Quacc | Covalent |
| ------------------- | ---------------------------------|
| ------------------- | --------------------------------- |
| `#!Python @job` | `#!Python @delayed` |
| `#!Python @flow` | No effect |
| `#!Python @subflow` | `#!Python delayed(...).compute()` |
Expand Down
43 changes: 42 additions & 1 deletion docs/user/recipes/recipes_intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ print(result)
'volume': 11.761470249999999}
```

### A Simple Mixed-Code Workflow
### A Mixed-Code Workflow

```mermaid
graph LR
Expand Down Expand Up @@ -272,6 +272,47 @@ print(result2)
'volume': 11.761470249999999}
```

### A More Complex Workflow

```mermaid
graph LR
A[Input] --> B(Make Slabs)
B --> C(Slab Relax) --> G(Slab Static) --> K[Output]
B --> D(Slab Relax) --> H(Slab Static) --> K[Output]
B --> E(Slab Relax) --> I(Slab Static) --> K[Output]
B --> F(Slab Relax) --> J(Slab Static) --> K[Output];
```

In this example, we will run a pre-made workflow that generates a set of slabs from a bulk structure and then runs a structure relaxation and static calculation on each slab. We will specifically highlight an example where we want to override the default parameters of one step in the recipe, in this case to tighten the force tolerance for the slab relaxation.

!!! Tip

Unsure what arguments a given function takes? Check out the [API documentation](https://quantum-accelerators.github.io/quacc/reference/quacc/recipes/emt/slabs.html).

```python
from functools import partial
from ase.build import bulk
from quacc.recipes.emt.core import relax_job
from quacc.recipes.emt.slabs import bulk_to_slabs_flow

# Define the Atoms object
atoms = bulk("Cu")

# Define the workflow
custom_relax_job = partial(relax_job, opt_params={"fmax": 1e-4}) # (1)!
result = bulk_to_slabs_flow(atoms, custom_relax_job=custom_relax_job)

# Print the result
print(result)
```

1. We have used a [partial function](https://www.learnpython.org/en/Partial_functions) here, which is a way to create a new function with specific arguments already applied. In other words, `#!Python opt_params={"fmax": 1e-4}` will be set as a keyword argument in the `relax_job` function by default. The same could be achieved, albeit more verbosely, as follows:

```python
def custom_relax_job(*args, **kwargs):
return relax_job(*args, opt_params={"fmax": 1e-4}, **kwargs)
```

## Concluding Comments

At this point, you now have the basic idea of how quacc recipes work!
Expand Down
11 changes: 10 additions & 1 deletion src/quacc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from typing import TYPE_CHECKING

from ase.atoms import Atoms
from ase.io.jsonio import decode, encode

from quacc.settings import QuaccSettings
from quacc.wflow_tools.decorators import Flow, Job, Subflow, flow, job, subflow
Expand All @@ -17,6 +16,8 @@


def atoms_as_dict(s: Atoms) -> dict[str, Any]:
from ase.io.jsonio import encode

# Uses Monty's MSONable spec
# Normally, we would want to this to be a wrapper around atoms.todict() with @module and
# @class key-value pairs inserted. However, atoms.todict()/atoms.fromdict() does not currently
Expand All @@ -25,6 +26,8 @@ def atoms_as_dict(s: Atoms) -> dict[str, Any]:


def atoms_from_dict(d: dict[str, Any]) -> Atoms:
from ase.io.jsonio import decode

# Uses Monty's MSONable spec
# Normally, we would want to have this be a wrapper around atoms.fromdict()
# that just ignores the @module/@class key-value pairs. However, atoms.todict()/atoms.fromdict()
Expand All @@ -41,3 +44,9 @@ def atoms_from_dict(d: dict[str, Any]) -> Atoms:

# Load the settings
SETTINGS = QuaccSettings()

if SETTINGS.DEBUG:
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
12 changes: 6 additions & 6 deletions src/quacc/recipes/common/defects.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from quacc.atoms.defects import make_defects_from_bulk

if TYPE_CHECKING:
from typing import Callable
from typing import Any

from ase.atoms import Atoms

Expand All @@ -19,7 +19,7 @@ def bulk_to_defects_subflow(
atoms: Atoms,
relax_job: Job,
static_job: Job | None = None,
make_defects_fn: Callable = make_defects_from_bulk,
make_defects_kwargs: dict[str, Any] | None = None,
) -> list[dict]:
"""
Workflow consisting of:
Expand All @@ -38,22 +38,22 @@ def bulk_to_defects_subflow(
The relaxation function.
static_job
The static function.
make_defects_fn
The function for generating defects.
make_defects_kwargs
Keyword arguments for [quacc.atoms.defects.make_defects_from_bulk][]
Returns
-------
list[dict]
List of dictionary of results
"""

defects = make_defects_fn(atoms)
defects = make_defects_from_bulk(atoms, **make_defects_kwargs)

results = []
for defect in defects:
result = relax_job(defect)

if static_job:
if static_job is not None:
result = static_job(result["atoms"])

results.append(result)
Expand Down
24 changes: 14 additions & 10 deletions src/quacc/recipes/common/slabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from quacc.atoms.slabs import make_adsorbate_structures, make_slabs_from_bulk

if TYPE_CHECKING:
from typing import Callable
from typing import Any

from ase.atoms import Atoms

Expand All @@ -19,7 +19,7 @@ def bulk_to_slabs_subflow(
atoms: Atoms,
relax_job: Job,
static_job: Job | None = None,
make_slabs_fn: Callable = make_slabs_from_bulk,
make_slabs_kwargs: dict[str, Any] | None = None,
) -> list[dict]:
"""
Workflow consisting of:
Expand All @@ -38,16 +38,18 @@ def bulk_to_slabs_subflow(
The relaxation function.
static_job
The static function.
make_slabs_fn
The function for generating slabs.
make_slabs_kwargs
Additional keyword arguments to pass to
[quacc.atoms.slabs.make_slabs_from_bulk][]
Returns
-------
list[dict]
List of schemas.
"""
make_slabs_kwargs = make_slabs_kwargs or {}

slabs = make_slabs_fn(atoms)
slabs = make_slabs_from_bulk(atoms, **make_slabs_kwargs)

results = []
for slab in slabs:
Expand All @@ -67,7 +69,7 @@ def slab_to_ads_subflow(
adsorbate: Atoms,
relax_job: Job,
static_job: Job | None,
make_ads_fn: Callable = make_adsorbate_structures,
make_ads_kwargs: dict[str, Any] | None = None,
) -> list[dict]:
"""
Workflow consisting of:
Expand All @@ -88,22 +90,24 @@ def slab_to_ads_subflow(
The slab releaxation job.
static_job
The slab static job.
make_ads_fn
The function to generate slab-adsorbate structures.
make_ads_kwargs
Additional keyword arguments to pass to
[quacc.atoms.slabs.make_adsorbate_structures][]
Returns
-------
list[dict]
List of schemas.
"""
make_ads_kwargs = make_ads_kwargs or {}

slabs = make_ads_fn(atoms, adsorbate)
slabs = make_adsorbate_structures(atoms, adsorbate, **make_ads_kwargs)

results = []
for slab in slabs:
result = relax_job(slab)

if static_job:
if static_job is not None:
result = static_job(result["atoms"])

results.append(result)
Expand Down
39 changes: 17 additions & 22 deletions src/quacc/recipes/emt/defects.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"""Defect recipes for EMT."""
from __future__ import annotations

from functools import partial
from typing import TYPE_CHECKING

from pymatgen.analysis.defects.generators import VacancyGenerator

from quacc import flow
from quacc.atoms.defects import make_defects_from_bulk
from quacc.recipes.common.defects import bulk_to_defects_subflow
from quacc.recipes.emt.core import relax_job, static_job
from quacc.utils.dicts import merge_dicts

if TYPE_CHECKING:
from typing import Any
Expand All @@ -23,6 +22,7 @@
VoronoiInterstitialGenerator,
)

from quacc import Job
from quacc.schemas._aliases.ase import OptSchema, RunSchema


Expand All @@ -38,10 +38,10 @@ def bulk_to_defects_flow(
| VoronoiInterstitialGenerator
) = VacancyGenerator,
defect_charge: int = 0,
make_defects_kwargs: dict[str, Any] | None = None,
custom_relax_job: Job | None = None,
custom_static_job: Job | None = None,
run_static: bool = True,
defect_relax_kwargs: dict[str, Any] | None = None,
defect_static_kwargs: dict[str, Any] | None = None,
make_defects_kwargs: dict[str, Any] | None = None,
) -> list[RunSchema | OptSchema]:
"""
Workflow consisting of:
Expand All @@ -60,34 +60,29 @@ def bulk_to_defects_flow(
Defect generator
defect_charge
Charge state of the defect
custom_relax_job
Relaxation job, which defaults to [quacc.recipes.emt.core.relax_job][].
custom_static_job
Static job, which defaults to [quacc.recipes.emt.core.static_job][].
make_defects_kwargs
Keyword arguments to pass to
[quacc.atoms.defects.make_defects_from_bulk][]
run_static
Whether to run the static calculation.
defect_relax_kwargs
Additional keyword arguments to pass to [quacc.recipes.emt.core.relax_job][].
defect_static_kwargs
Additional keyword arguments to pass to [quacc.recipes.emt.core.static_job][].
Returns
-------
list[RunSchema | OptSchema]
List of dictionary of results from [quacc.schemas.ase.summarize_run][]
or [quacc.schemas.ase.summarize_opt_run][]
"""
make_defects_kwargs = make_defects_kwargs or {}
defect_relax_kwargs = defect_relax_kwargs or {}
defect_static_kwargs = defect_static_kwargs or {}
make_defects_kwargs = merge_dicts(
make_defects_kwargs, {"defect_gen": defect_gen, "defect_charge": defect_charge}
)

return bulk_to_defects_subflow(
atoms,
partial(relax_job, **defect_relax_kwargs),
static_job=partial(static_job, **defect_static_kwargs) if run_static else None,
make_defects_fn=partial(
make_defects_from_bulk,
defect_gen=defect_gen,
defect_charge=defect_charge,
**make_defects_kwargs,
),
relax_job if custom_relax_job is None else custom_relax_job,
static_job=(static_job if custom_static_job is None else custom_static_job)
if run_static
else None,
make_defects_kwargs=make_defects_kwargs,
)
15 changes: 6 additions & 9 deletions src/quacc/recipes/emt/phonons.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
"""Phonon recipes for EMT"""
from __future__ import annotations

from functools import partial
from typing import TYPE_CHECKING

from quacc import flow
from quacc.recipes.common.phonons import phonon_flow as phonon_flow_
from quacc.recipes.emt.core import static_job

if TYPE_CHECKING:
from typing import Any

from ase.atoms import Atoms
from numpy.typing import ArrayLike

from quacc import Job
from quacc.schemas._aliases.phonons import PhononSchema


Expand All @@ -26,7 +24,7 @@ def phonon_flow(
t_step: float = 10,
t_min: float = 0,
t_max: float = 1000,
static_job_kwargs: dict[str, Any] | None = None,
custom_static_job: Job | None = None,
) -> PhononSchema:
"""
Carry out a phonon calculation.
Expand All @@ -47,20 +45,19 @@ def phonon_flow(
Min temperature (K).
t_max
Max temperature (K).
static_job_kwargs
Additional keyword arguments for [quacc.recipes.emt.core.static_job][]
for the force calculations.
static_job
The static job for the force calculations, which defaults
to [quacc.recipes.emt.core.static_job][]
Returns
-------
PhononSchema
Dictionary of results from [quacc.schemas.phonons.summarize_phonopy][]
"""
static_job_kwargs = static_job_kwargs or {}

return phonon_flow_(
atoms,
partial(static_job, **static_job_kwargs),
static_job if custom_static_job is None else custom_static_job,
supercell_matrix=supercell_matrix,
atom_disp=atom_disp,
t_step=t_step,
Expand Down
Loading

0 comments on commit 9bcb488

Please sign in to comment.