Only skip optional filters on certain exceptions.

We currently skip filters that are not required when they raise an
exception during instantiation. This sometimes masks underlying issues
with the configuration or data, so this changes the behavior to skip
optional filters only when specific exceptions are raised.
matz-e committed Aug 12, 2024
1 parent cd45fc5 commit f568e6a
Showing 9 changed files with 96 additions and 19 deletions.
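In short: a filter that is not required is now skipped only when its constructor raises the new FilterInitializationError; any other exception propagates, even for optional filters. A minimal sketch of that dispatch, simplified from functionalizer.filters.definitions (the loop, the `_required` handling, and the logging shown here are illustrative, not the exact implementation):

class FilterInitializationError(RuntimeError):
    """Raised by a filter's __init__ when an optional precondition is not met."""


def instantiate_filters(filter_classes, *args):
    filters = []
    for fcls in filter_classes:
        try:
            filters.append(fcls(*args))
        except FilterInitializationError:
            # Only this exception marks a filter as safely skippable.
            if fcls._required:
                raise
            print(f"skipping optional filter {fcls.__name__}")
        # Any other exception (KeyError, AttributeError, ...) is no longer
        # swallowed, so configuration and data problems surface immediately.
    return filters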
2 changes: 1 addition & 1 deletion src/functionalizer/filters/__init__.py
@@ -1,6 +1,6 @@
"""Module with filters to process edge data."""

from .definitions import DatasetOperation, load # NOQA
from .definitions import DatasetOperation, FilterInitializationError, load # NOQA
from .helpers import enable_debug # NOQA

from . import helpers # NOQA
6 changes: 5 additions & 1 deletion src/functionalizer/filters/definitions.py
@@ -38,6 +38,10 @@ def load(*dirnames: str) -> None:
importlib.import_module(modulename)


class FilterInitializationError(RuntimeError):
pass


# ---------------------------------------------------
# Dataset operations
# ---------------------------------------------------
@@ -108,7 +112,7 @@ def initialize(mcs, names, *args):
)
try:
filters.append(fcls(*args))
except Exception as e:
except FilterInitializationError as e:
if fcls._required:
logger.fatal("Could not instantiate %s", fcls.__name__)
raise
7 changes: 5 additions & 2 deletions src/functionalizer/filters/implementations/spine_length.py
@@ -6,7 +6,7 @@
import sparkmanager as sm
from pyspark.sql import functions as F

from functionalizer.filters import DatasetOperation
from functionalizer.filters import DatasetOperation, FilterInitializationError
from functionalizer.utils import get_logger

from . import add_bin_column, add_random_column
@@ -29,9 +29,12 @@ def __init__(self, recipe, source, target):
recipe to obtain the desired distribution of spine lengths to match.
"""
super().__init__(recipe, source, target)
self.seed = recipe.seeds.synapseSeed
self.seed = recipe.get("seed")
logger.info("Using seed %d for spine length adjustment", self.seed)

if not recipe.get("spine_lengths"):
raise FilterInitializationError("'spine_lengths' not in recipe")

self.binnings = sorted(recipe.spine_lengths, key=attrgetter("length"))

def apply(self, circuit):
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
import pandas as pd
from pyspark.sql import functions as F

from functionalizer.filters import DatasetOperation
from functionalizer.filters import DatasetOperation, FilterInitializationError


class SpineMorphologies(DatasetOperation):
@@ -53,6 +53,10 @@ class SpineMorphologies(DatasetOperation):
def __init__(self, recipe, source, target):
"""Initializes the filter using the morphology database."""
super().__init__(recipe, source, target)

if not target.spine_morphology_path:
raise FilterInitializationError("target nodes do not define 'spine_morphologies_dir'")

self._morphologies, self._filter = _create_spine_morphology_udf(
target.spine_morphology_path
)
@@ -105,6 +109,7 @@ def _read_spine_morphology_attributes(spine_morpho_path: Path):
Returns a dataframe with spine morphology properties.
"""
files = sorted(spine_morpho_path.glob("*.h5"))
assert len(files) > 0, "no spine morphologies present"
ids = np.ndarray((0,), dtype=int)
lengths = np.ndarray((0,), dtype=float)
morphologies = np.ndarray((0,), dtype=int)
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
import pandas as pd
import sparkmanager as sm

from functionalizer.filters import DatasetOperation
from functionalizer.filters import DatasetOperation, FilterInitializationError


class SynapseReposition(DatasetOperation):
@@ -20,6 +20,8 @@ class SynapseReposition(DatasetOperation):
def __init__(self, recipe, source, target):
"""Initialize the filter, extracting the reposition part of the recipe."""
super().__init__(recipe, source, target)
if not recipe.get("synapse_reposition"):
raise FilterInitializationError("'synapse_reposition' not in recipe")
self.columns, self.reposition = recipe.as_matrix("synapse_reposition")
self.unset_value = len(recipe.get("synapse_reposition"))

31 changes: 31 additions & 0 deletions tests/circuit_1000n/circuit_config_invalid.json
@@ -0,0 +1,31 @@
{
"manifest": {
"$BASE_DIR": "./",
"$COMPONENT_DIR": "$BASE_DIR",
"$NETWORK_DIR": "$BASE_DIR"
},
"components": {
"biophysical_neuron_models_dir": "no comprendo",
"provenance": {
"bioname_dir": "$COMPONENT_DIR/bioname"
}
},
"networks": {
"nodes": [
{
"nodes_file": "$NETWORK_DIR/nodes.h5",
"nodes_types_file": null,
"populations": {
"All": {
"morphologies_dir": null,
"alternate_morphologies": {
"h5v1": "$BASE_DIR/morphologies/h5"
},
"spine_morphologies_dir": "no comprendo"
}
}
}
],
"edges": []
}
}
17 changes: 14 additions & 3 deletions tests/conftest.py
@@ -29,6 +29,16 @@
[str(DATADIR / "touches" / "*.parquet")],
)

DEFAULT_ARGS = {
"recipe_file": DATADIR / "recipe.json",
"circuit_config": CIRCUIT_CONFIG,
"source": None,
"source_nodeset": None,
"target": None,
"target_nodeset": None,
"edges": [str(DATADIR / "touches" / "*.parquet")],
}

filters.load()


@@ -49,10 +59,11 @@ def circuit_config():
return CIRCUIT_CONFIG


@pytest.fixture(scope="session", name="fz")
def fz_fixture(tmp_path_factory):
@pytest.fixture(scope="class", name="fz", params=[{}])
def fz_fixture(request, tmp_path_factory):
tmpdir = tmp_path_factory.mktemp("filters")
return create_functionalizer(tmpdir, RM.FUNCTIONAL.value).init_data(*ARGS)
kwargs = DEFAULT_ARGS | request.param
return create_functionalizer(tmpdir, RM.FUNCTIONAL.value).init_data(**kwargs)


@pytest.fixture(scope="session", name="gj")
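With the fz fixture now class-scoped and parametrized, a test class can override any of the default init_data arguments through indirect parametrization: request.param (which defaults to {}) is merged over DEFAULT_ARGS with the dict union operator. A hedged usage sketch of that mechanism (the test class name is illustrative; the diff's own TestBogusFilterInitialization below uses the same pattern):

import pytest

from conftest import DATADIR


@pytest.mark.parametrize(
    "fz",
    [{"circuit_config": DATADIR / "circuit_config_invalid.json"}],
    indirect=True,  # the dict is passed to fz_fixture as request.param
)
class TestWithInvalidConfig:
    def test_spine_morphologies(self, fz):
        # The bogus spine_morphologies_dir trips the new assertion instead
        # of the filter being silently skipped.
        with pytest.raises(AssertionError):
            fz.process_filters(filters=["SpineMorphologies"])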
7 changes: 3 additions & 4 deletions tests/test_data_input_sonata.py
@@ -6,7 +6,7 @@
import numpy
import pytest
import sparkmanager as sm
from conftest import ARGS, DATADIR, create_functionalizer
from conftest import DEFAULT_ARGS, DATADIR, create_functionalizer
from functionalizer.io.circuit import BRANCH_COLUMNS, EdgeData
from functionalizer.utils.conf import Configuration

@@ -54,9 +54,8 @@ def test_branch_shift(edges_w_branch_type):
@pytest.mark.slow
def test_sonata_properties(tmp_path_factory):
tmpdir = tmp_path_factory.mktemp("sonata_properties")
fz = create_functionalizer(tmpdir, ["SynapseProperties"]).init_data(
*ARGS[:-1], edges=(os.path.join(DATADIR, "edges.h5"), "default")
)
kwargs = DEFAULT_ARGS | {"edges": (os.path.join(DATADIR, "edges.h5"), "default")}
fz = create_functionalizer(tmpdir, ["SynapseProperties"]).init_data(**kwargs)
fz.process_filters()

assert "delay" in fz.circuit.df.columns
34 changes: 28 additions & 6 deletions tests/test_filters.py
@@ -4,7 +4,7 @@
import pyspark.sql.functions as F
import pytest
import sparkmanager as sm
from conftest import ARGS, DATADIR, create_functionalizer
from conftest import DEFAULT_ARGS, DATADIR, create_functionalizer
from functionalizer.utils.spark import cache_broadcast_single_part

NUM_AFTER_DISTANCE = 226301
@@ -33,9 +33,8 @@ def layer_counts(circuit):
return dict(zip(res["mtype"], res["count"]))

tmpdir = tmp_path_factory.mktemp("fixed_probabilities")
fz = create_functionalizer(tmpdir, ["ReduceAndCut"]).init_data(
DATADIR / "recipe_fixed.json", *ARGS[1:]
)
kwargs = DEFAULT_ARGS | {"recipe_file": DATADIR / "recipe_fixed.json"}
fz = create_functionalizer(tmpdir, ["ReduceAndCut"]).init_data(**kwargs)

before = layer_counts(fz.circuit)
fz.process_filters()
@@ -48,6 +47,29 @@ def layer_counts(circuit):
assert "L6" not in after


class TestFilterInitialization:
"""Test initialization of optional filters"""

def test_spine_morphos(self, fz):
fz.process_filters(filters=["SpineMorphologies"])


class TestBogusFilterInitialization:
"""Test initialization of optional filters"""

@pytest.mark.parametrize("fz", [{"circuit_config": DATADIR / "circuit_config_invalid.json"}], indirect=True)
def test_spine_morphos(self, fz):
with pytest.raises(AssertionError):
fz.process_filters(filters=["SpineMorphologies"])

class TestFilterInitialization:
"""Test initialization of optional filters"""

def test_spine_morphos(self, fz):
fz.process_filters(filters=["SpineMorphologies"])
fz.process_filters(filters=["SpineMorphologies"])


@pytest.mark.slow
class TestFilters(object):
"""Sequential tests of filters."""
@@ -71,7 +93,7 @@ def test_reduce_and_cut(self, fz):
def test_resume(self, fz, tmp_path_factory):
"""Make sure that resuming "works" """
tmpdir = tmp_path_factory.mktemp("filters")
fz2 = create_functionalizer(tmpdir).init_data(*ARGS)
fz2 = create_functionalizer(tmpdir).init_data(**DEFAULT_ARGS)
fz2.process_filters()
original = fz.circuit.df.count()
count = fz2.circuit.df.count()
@@ -80,7 +102,7 @@ def test_overwrite(self, fz, tmp_path_factory):
def test_overwrite(self, fz, tmp_path_factory):
"""Test that overwriting checkpointed data works"""
tmpdir = tmp_path_factory.mktemp("filters")
fz2 = create_functionalizer(tmpdir).init_data(*ARGS)
fz2 = create_functionalizer(tmpdir).init_data(**DEFAULT_ARGS)
fz2.process_filters(overwrite=True)
original = fz.circuit.df.count()
count = fz2.circuit.df.count()