Skip to content

Commit

Permalink
Changes to support extracting the results from the event
Browse files Browse the repository at this point in the history
  • Loading branch information
rtuck99 committed Oct 15, 2024
1 parent 82b684b commit 65bf6a3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 34 deletions.
68 changes: 47 additions & 21 deletions src/dodal/devices/zocalo/zocalo_results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from collections.abc import Generator, Sequence
from enum import Enum
from inspect import get_annotations
from queue import Empty, Queue
from typing import Any, TypedDict

Expand Down Expand Up @@ -130,14 +131,14 @@ def __init__(
self.transport: CommonTransport | None = None
self.use_cpu_and_gpu = use_cpu_and_gpu

self.centres_of_mass, self._com_setter = soft_signal_r_and_setter(
NDArray[np.uint64], name="centres_of_mass"
self.centre_of_mass, self._com_setter = soft_signal_r_and_setter(
NDArray[np.uint64], name="centre_of_mass"
)
self.bbox_sizes, self._bbox_setter = soft_signal_r_and_setter(
NDArray[np.uint64], "bbox_sizes", self.name
self.bounding_box, self._bounding_box_setter = soft_signal_r_and_setter(
NDArray[np.uint64], name="bounding_box"
)
self.max_voxel, self._max_voxel_setter = soft_signal_r_and_setter(
NDArray[np.uint64], int, name="max_voxel"
NDArray[np.uint64], name="max_voxel"
)
self.max_count, self._max_count_setter = soft_signal_r_and_setter(
NDArray[np.uint64], name="max_count"
Expand All @@ -160,8 +161,8 @@ def __init__(
self.max_count,
self.n_voxels,
self.total_count,
self.centres_of_mass,
self.bbox_sizes,
self.centre_of_mass,
self.bounding_box,
self.ispyb_dcid,
self.ispyb_dcgid,
],
Expand All @@ -171,13 +172,12 @@ def __init__(

async def _put_results(self, results: Sequence[XrcResult], recipe_parameters):
centres_of_mass = np.array([r["centre_of_mass"] for r in results])
bbox_sizes = np.array([bbox_size(r) for r in results])
self._com_setter(centres_of_mass)
self._bbox_setter(bbox_sizes)
self._max_voxel_setter([r["max_voxel"] for r in results])
self._max_count_setter([r["max_count"] for r in results])
self._n_voxels_setter([r["n_voxels"] for r in results])
self._total_count_setter([r["total_count"] for r in results])
self._bounding_box_setter(np.array([r["bounding_box"] for r in results]))
self._max_voxel_setter(np.array([r["max_voxel"] for r in results]))
self._max_count_setter(np.array([r["max_count"] for r in results]))
self._n_voxels_setter(np.array([r["n_voxels"] for r in results]))
self._total_count_setter(np.array([r["total_count"] for r in results]))
self._ispyb_dcid_setter(recipe_parameters["dcid"])
self._ispyb_dcgid_setter(recipe_parameters["dcgid"])

Expand Down Expand Up @@ -348,22 +348,48 @@ def get_full_processing_results(
"""A plan that will return the raw zocalo results, ranked in descending order according to the sort key.
Returns empty list in the event no results found."""
LOGGER.info("Retrieving raw zocalo processing results")
com = yield from bps.rd(zocalo.centres_of_mass, default_value=[]) # type: ignore
com = yield from bps.rd(zocalo.centre_of_mass, default_value=[]) # type: ignore
max_voxel = yield from bps.rd(zocalo.max_voxel, default_value=[]) # type: ignore
max_count = yield from bps.rd(zocalo.max_count, default_value=[]) # type: ignore
n_voxels = yield from bps.rd(zocalo.n_voxels, default_value=[]) # type: ignore
total_count = yield from bps.rd(zocalo.total_count, default_value=[]) # type: ignore
bounding_box = yield from bps.rd(zocalo.bounding_box, default_value=[]) # type: ignore
return [
XrcResult(
centre_of_mass=com,
max_voxel=mv,
max_count=mc,
n_voxels=n,
total_count=tc,
bounding_box=bb,
_corrected_xrc_result(
XrcResult(
centre_of_mass=com.tolist(),
max_voxel=mv.tolist(),
max_count=int(mc),
n_voxels=int(n),
total_count=int(tc),
bounding_box=bb.tolist(),
)
)
for com, mv, mc, n, tc, bb in zip(
com, max_voxel, max_count, n_voxels, total_count, bounding_box, strict=True
)
]


def get_processing_results_from_event(
device_name: str, doc: dict
) -> Sequence[XrcResult]:
"""
Decode an event document into the corresponding x-ray centring results
Args:
doc A bluesky event document containing the signals read from the ZocaloResults
device_name The device name prefix to prepend to the document keys
Returns:
The list of XrcResults decoded from the event document
"""
results_keys = get_annotations(XrcResult).keys()
results_dict = {k: doc["data"][f"{device_name}-{k}"] for k in results_keys}
results_values = [results_dict[k].tolist() for k in results_keys]

def create_result(*argv):
kwargs = dict(zip(results_keys, argv, strict=False))
return XrcResult(**kwargs)

return list(map(create_result, *results_values))
35 changes: 22 additions & 13 deletions tests/devices/unit_tests/test_zocalo_results.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from asyncio import get_running_loop
from functools import partial
from queue import Empty
from unittest.mock import AsyncMock, MagicMock, call, patch

import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
import numpy as np
import pytest
from bluesky.run_engine import RunEngine
Expand All @@ -17,6 +19,7 @@
ZocaloResults,
ZocaloSource,
get_full_processing_results,
get_processing_results_from_event,
)

TEST_RESULTS: list[XrcResult] = [
Expand Down Expand Up @@ -116,19 +119,6 @@ def plan():
return device


async def test_put_result_read_results(
mocked_zocalo_device,
RE,
) -> None:
zocalo_device = await mocked_zocalo_device([], run_setup=True)
await zocalo_device._put_results(TEST_RESULTS, test_recipe_parameters)
reading = await zocalo_device.read()
centres: list[XrcResult] = reading["zocalo-centres_of_mass"]["value"]
bboxes: list[XrcResult] = reading["zocalo-bbox_sizes"]["value"]
assert np.all(centres == np.array([[1, 2, 3], [2, 3, 4], [4, 5, 6]]))
assert np.all(bboxes[0] == [2, 2, 1])


async def test_trigger_and_wait_puts_results(
mocked_zocalo_device,
RE,
Expand Down Expand Up @@ -169,6 +159,25 @@ def plan():
RE(plan())


async def test_get_processing_results_from_event(mocked_zocalo_device, RE) -> None:
zocalo_device: ZocaloResults = await mocked_zocalo_device(
TEST_RESULTS, run_setup=False
)

xrc_results_fut = get_running_loop().create_future()

def handle_zocalo_result(name: str, doc: dict):
xrc_results_fut.set_result(get_processing_results_from_event("zocalo", doc))

@bpp.subs_decorator({"event": handle_zocalo_result})
@bpp.run_decorator()
def plan():
yield from bps.trigger_and_read([zocalo_device])

RE(plan())
assert xrc_results_fut.result() == TEST_RESULTS


@patch(
"dodal.devices.zocalo.zocalo_results.workflows.recipe.wrap_subscribe", autospec=True
)
Expand Down

0 comments on commit 65bf6a3

Please sign in to comment.