Skip to content

Commit

Permalink
Archngv (#40)
Browse files Browse the repository at this point in the history
Merge ArchNGV branch.
- 'load_graph_archngv.py' loads archngv graphs and saves them in pickle binary format
- replace m2r2 with sphinx_mdinclude
- install joblib and archngv in the venv
- setup.sh and sbatch files can be run from any directory

---------

Co-authored-by: Christos Kotsalos <[email protected]>
Co-authored-by: StephLisa <[email protected]>
  • Loading branch information
3 people authored Sep 16, 2024
1 parent 2dd34a4 commit 546b801
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*py~
*~
*.egg-info
examples/data/graphs_folder/dumped_graph.bin

# Mac related stuff
.DS_Store
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ The code can be run using

python3 compute_static_flow_pressure.py

### Load Archngv graph

An ArchNGV graph can be loaded and saved in pickle binary format using the script `load_graph_archngv.py` in the `examples` folder.
Run the script as follows:

python3 load_graph_archngv.py --filename_ngv "path_to_ngv_circuit" --output_graph "output_graph_name.bin"

### Sonata reports

Structure of the reports:
Expand Down
2 changes: 1 addition & 1 deletion astrovascpy/bloodflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

# PETSc is compiled with complex number support
# -> many warnings from/to PETSc to/from NumPy/SciPy
warnings.filterwarnings(action="ignore", category=np.ComplexWarning)
warnings.filterwarnings(action="ignore", category=np.exceptions.ComplexWarning)

print = partial(print, flush=True)

Expand Down
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"sphinx.ext.napoleon",
"sphinx.ext.todo",
"sphinx_click",
"m2r2",
"sphinx_mdinclude",
]

todo_include_todos = True
Expand Down
17 changes: 12 additions & 5 deletions examples/job_script.sbatch
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
#SBATCH --mem=0
#SBATCH --output="%x-%j.log"

pushd ..
source setup.sh
popd
# Locate this batch script through the job record reported by scontrol, then
# derive its directory so that the paths below do not depend on the directory
# the job was submitted from.
JOB_SCRIPT=$(scontrol show job "${SLURM_JOB_ID}" | awk -F= '/Command=/{print $2}')
JOB_SCRIPT_DIR=$(dirname "${JOB_SCRIPT}")

# setup.sh lives one level above the directory holding this script.
SETUP_SCRIPT="${JOB_SCRIPT_DIR}/../setup.sh"
if [[ ! -f "${SETUP_SCRIPT}" ]]; then
    >&2 echo "[ERROR] The 'setup.sh' script could not be found!"
    # Exit statuses must lie in 0-255; use the same code as load_graph_archngv.sbatch.
    exit 2
fi

source "${SETUP_SCRIPT}"

echo
echo "### Simulation Start"
echo
# time srun dplace python compute_static_flow_pressure.py
time srun dplace python simulate_OU_process.py
# time srun dplace python "${JOB_SCRIPT_DIR}/compute_static_flow_pressure.py"
time srun dplace python "${JOB_SCRIPT_DIR}/simulate_OU_process.py"
133 changes: 133 additions & 0 deletions examples/load_graph_archngv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import argparse
import multiprocessing
import pickle
from functools import partial
from pathlib import Path

import numpy as np
import pandas as pd
import psutil
from archngv import NGVCircuit
from joblib import Parallel, delayed, parallel_config
from tqdm import tqdm

from astrovascpy import bloodflow
from astrovascpy.exceptions import BloodFlowError
from astrovascpy.utils import Graph


def _iter_endfoot_args(gv_conn, endfoot_ids):
    """Yield the worker input tuple for each endfoot id, in order.

    Each tuple holds the first vasculature section id, the first vasculature segment id and
    the first ``endfoot_compartment_length`` value that the connectome reports for the endfoot.
    """
    for endfoot_id in endfoot_ids:
        # Query the connectome once and read both columns from the same result.
        sections_segments = gv_conn.vasculature_sections_segments(endfoot_id)
        yield (
            sections_segments.vasculature_section_id.values[0],
            sections_segments.vasculature_segment_id.values[0],
            gv_conn.get(endfoot_id, ["endfoot_compartment_length"]).values[0],
        )


def _assign_endfeet(graph, results, endfoot_ids):
    """Write each endfoot id onto the edges that the matching worker result selected.

    ``results`` and ``endfoot_ids`` are consumed in lockstep, so ``results`` must yield in the
    same order as ``endfoot_ids``.
    """
    for result_ids, endfoot_id in zip(results, endfoot_ids):
        # Only the main process runs this, as soon as a result computed in parallel arrives.
        # NOTE(review): result_ids is presumably an array of (section_id, segment_id) rows;
        # confirm against bloodflow.get_closest_edges.
        edge_index = pd.MultiIndex.from_arrays(result_ids.T)
        graph.edge_properties.loc[edge_index, "endfeet_id"] = endfoot_id


def load_graph_archngv_parallel(
    filename, n_workers, n_astro=None, parallelization_backend="multiprocessing"
):
    """Load a vasculature from an NGV circuit and tag its edges with endfeet ids.

    Args:
        filename (str): path to the NGV circuit.
        n_workers (int): number of processes to set endfeet on edges.
        n_astro (int): for testing, if not None (or 0), it will reduce the number of
            astrocytes used.
        parallelization_backend (str): Either multiprocessing or joblib.

    Returns:
        Graph: graph built from the circuit's point vasculature, whose ``endfeet_id`` edge
        property is set on the edges returned by ``bloodflow.get_closest_edges``.

    Raises:
        BloodFlowError: if ``filename`` does not exist, or if ``parallelization_backend`` is
        neither 'multiprocessing' nor 'joblib'.
    """
    if not Path(filename).exists():
        raise BloodFlowError("File provided does not exist")
    # Reject an unknown backend before the (expensive) circuit loading starts.
    if parallelization_backend not in ("multiprocessing", "joblib"):
        raise BloodFlowError(
            f"parallelization_backend={parallelization_backend} invalid option. Use 'joblib' or 'multiprocessing'."
        )

    circuit = NGVCircuit(filename)
    graph = Graph.from_point_vasculature(circuit.vasculature.point_graph)
    # Index the edges by (section_id, segment_id) so worker results can address them directly.
    graph.edge_properties.index = pd.MultiIndex.from_frame(
        graph.edge_properties.loc[:, ["section_id", "segment_id"]]
    )
    gv_conn = circuit.gliovascular_connectome
    worker = partial(bloodflow.get_closest_edges, graph=graph)

    # Walk the astrocytes once; the worker arguments are derived lazily from this list.
    endfoot_ids = [
        endfoot_id
        for astro_id in np.arange(n_astro or circuit.astrocytes.size)
        for endfoot_id in gv_conn.astrocyte_endfeet(astro_id)
    ]
    n_endfeet = len(endfoot_ids)
    args = _iter_endfoot_args(gv_conn, endfoot_ids)

    if parallelization_backend == "multiprocessing":
        with multiprocessing.Pool(n_workers) as pool:
            # imap preserves input order, which _assign_endfeet relies on.
            results = tqdm(
                pool.imap(worker, args, chunksize=max(1, n_endfeet // n_workers)),
                total=n_endfeet,
            )
            _assign_endfeet(graph, results, endfoot_ids)
    else:
        with parallel_config(
            backend="loky", prefer="processes", n_jobs=n_workers, inner_max_num_threads=1
        ):
            parallel = Parallel(return_as="generator", batch_size="auto")
            results = parallel(delayed(worker)(arg) for arg in tqdm(args, total=n_endfeet))
            _assign_endfeet(graph, results, endfoot_ids)

    return graph


def main():
    """Load an NGV circuit graph and dump it to a pickle file.

    Command-line arguments:
        --filename_ngv: path to the NGV circuit to load (required).
        --output_graph: path of the pickle file to write (required).
    """
    # Rebind the module-level ``print`` so every progress message is flushed immediately.
    global print
    print = partial(print, flush=True)

    parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.")
    parser.add_argument(
        "--filename_ngv", type=str, required=True, help="Path to the NGV circuits file"
    )
    parser.add_argument(
        "--output_graph", type=str, required=True, help="Path to the output graph file"
    )
    args = parser.parse_args()

    filename_ngv = args.filename_ngv
    output_graph = args.output_graph

    n_cores = psutil.cpu_count(logical=False)
    print(f"number of physical CPU cores = {n_cores}")
    # psutil reports None when the count cannot be determined; fall back to a single worker
    # instead of failing later in the chunk-size computation.
    n_workers = n_cores or 1

    print(f"NGV Circuits file: {filename_ngv}")
    print("loading circuit : start")
    graph = load_graph_archngv_parallel(
        filename_ngv, n_workers=n_workers
    )  # n_astro=50 for debugging (smaller processing needs)
    print("loading circuit : finish")

    print("pickle graph : start")
    # The context manager guarantees the file is flushed and closed, even if pickling fails.
    with open(output_graph, "wb") as filehandler:
        pickle.dump(graph, filehandler)
    print("pickle graph : finish")
    print(f"Graph file: {output_graph}")

if __name__ == "__main__":
main()
36 changes: 36 additions & 0 deletions examples/load_graph_archngv.sbatch
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/bin/bash

#SBATCH --job-name="archngv"
#SBATCH --nodes=1

#SBATCH --account=proj16
#SBATCH --partition=prod
#SBATCH --constraint=cpu
#SBATCH --time=00:30:00

#SBATCH --cpus-per-task=2
#SBATCH --exclusive
#SBATCH --mem=0
#SBATCH --output="%x-%j.log"

# Locate this batch script through the job record reported by scontrol, then
# derive its directory so that the paths below do not depend on the directory
# the job was submitted from.
JOB_SCRIPT=$(scontrol show job "${SLURM_JOB_ID}" | awk -F= '/Command=/{print $2}')
JOB_SCRIPT_DIR=$(dirname "${JOB_SCRIPT}")

# setup.sh lives one level above the directory holding this script.
SETUP_SCRIPT="${JOB_SCRIPT_DIR}/../setup.sh"
if [[ ! -f "${SETUP_SCRIPT}" ]]; then
    >&2 echo "[ERROR] The 'setup.sh' script could not be found!"
    exit 2
fi

source "${SETUP_SCRIPT}"

FILENAME_NGV="/gpfs/bbp.cscs.ch/project/proj137/NGVCircuits/rat_O1"

# Anchor the output to the script directory (not the submission directory) and
# make sure the target folder exists before the Python script opens the file.
GRAPH_PATH="${JOB_SCRIPT_DIR}/data/graphs_folder/dumped_graph.bin"
mkdir -p "$(dirname "${GRAPH_PATH}")"

echo
echo "### Loading graph"
echo
# It is imperative to use srun and dplace, otherwise the Python processes
# do not work properly (possible deadlocks and/or performance degradation)
time srun -n 1 --mpi=none dplace python "${JOB_SCRIPT_DIR}/load_graph_archngv.py" --filename_ngv "${FILENAME_NGV}" --output_graph "${GRAPH_PATH}"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
]

doc_reqs = [
"m2r2",
"sphinx-mdinclude",
"sphinx",
"sphinx-bluebrain-theme",
"sphinx-click",
Expand Down
14 changes: 8 additions & 6 deletions setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ echo
echo "### setup/set env started"
echo

SETUP_DIR=$(dirname ${BASH_SOURCE[0]})

if command -v module &> /dev/null
then
module purge
Expand All @@ -26,7 +28,7 @@ else
conda install -y pip

conda install -y -c conda-forge mpi mpi4py petsc petsc4py
"$CONDA_PREFIX/bin/pip" install tox
"$CONDA_PREFIX/bin/pip" install tox joblib archngv
# If complex number support is needed
#conda install -y -c conda-forge mpi mpi4py "petsc=*=*complex*" "petsc4py=*=*complex*"
fi
Expand Down Expand Up @@ -58,15 +60,15 @@ then
echo "python-venv already set"
source python-venv/bin/activate
else
python3 -m venv --prompt astrovascpy python-venv
source python-venv/bin/activate
python3 -m venv --prompt astrovascpy ${SETUP_DIR}/python-venv
source ${SETUP_DIR}/python-venv/bin/activate
python3 -m pip install --upgrade pip
fi
pip3 install -e .
pip3 install tox
pip3 install -e ${SETUP_DIR}
pip3 install tox joblib archngv
else
conda_bin=`conda info | grep "active env location" | grep -o "/.*"`/bin
$conda_bin/pip install -e .
$conda_bin/pip install -e ${SETUP_DIR}
fi

# Backend solver/library for the linear systems
Expand Down

0 comments on commit 546b801

Please sign in to comment.