Skip to content

Commit

Permalink
Merge pull request #343 from bluesky/337-add-basemodel-support-1
Browse files Browse the repository at this point in the history
Convert from `BaseModel` to `jsonschema` + `TypedDict`
  • Loading branch information
evalott100 authored Jan 15, 2025
2 parents acd40a9 + 895f19d commit a8d4575
Show file tree
Hide file tree
Showing 43 changed files with 2,564 additions and 1,494 deletions.
2 changes: 1 addition & 1 deletion docs/explanations/schema-generation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ After changing any of the documents it's necessary to regenerate the schemas. Th

.. code-block:: bash
regenerate-schema
python -m event_model.generate
which is a python environment script in a dev install of event-model.

Expand Down
6 changes: 2 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@ dev = [
"numpydoc",

# For schema generation.
"pydantic>=2.6",
"pydantic<3",
"datamodel-code-generator",
]

[project.scripts]
regenerate-schema = "event_model.documents.generate.__main__:main"

[project.urls]
GitHub = "https://github.com/bluesky/event-model"

Expand Down
40 changes: 24 additions & 16 deletions src/event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,16 @@
)
from .documents.event_page import EventPage, PartialEventPage
from .documents.resource import PartialResource, Resource
from .documents.run_start import Calculation, Hints, Projection, Projections, RunStart
from .documents.run_start import (
CalculatedEventProjection,
Calculation,
ConfigurationProjection,
Hints,
LinkedEventProjection,
Projections,
RunStart,
StaticProjection,
)
from .documents.run_stop import RunStop
from .documents.stream_datum import StreamDatum, StreamRange
from .documents.stream_resource import StreamResource
Expand Down Expand Up @@ -78,7 +87,10 @@
"Resource",
"Calculation",
"Hints",
"Projection",
"LinkedEventProjection",
"StaticProjection",
"CalculatedEventProjection",
"ConfigurationProjection",
"Projections",
"RunStart",
"RunStop",
Expand Down Expand Up @@ -350,8 +362,8 @@ def __call__(
else:
raise EventModelValueError(
"SingleRunDocumentRouter associated with start document "
f'{self._start_doc["uid"]} '
f'received a second start document with uid {doc["uid"]}'
f"{self._start_doc['uid']} "
f"received a second start document with uid {doc['uid']}"
)
elif name == "descriptor":
assert isinstance(self._start_doc, dict)
Expand All @@ -360,9 +372,9 @@ def __call__(
else:
raise EventModelValueError(
"SingleRunDocumentRouter associated with start document "
f'{self._start_doc["uid"]} '
f'received a descriptor {doc["uid"]} associated with '
f'start document {doc["run_start"]}'
f"{self._start_doc['uid']} "
f"received a descriptor {doc['uid']} associated with "
f"start document {doc['run_start']}"
)
# Defer to superclass for dispatch/processing.
return super().__call__(name, doc, validate=validate)
Expand Down Expand Up @@ -403,7 +415,7 @@ def get_descriptor(self, doc: dict) -> EventDescriptor:
elif doc["descriptor"] not in self._descriptors:
raise EventModelValueError(
"SingleRunDocumentRouter has not processed a descriptor with "
f'uid {doc["descriptor"]}'
f"uid {doc['descriptor']}"
)

return self._descriptors[doc["descriptor"]]
Expand Down Expand Up @@ -1066,9 +1078,7 @@ def get_handler(self, resource: Resource) -> Any:
f"mapped from {original_root} to {root} by root_map."
)
else:
msg += (
f"Its 'root' field {original_root} was " f"*not* modified by root_map."
)
msg += f"Its 'root' field {original_root} was *not* modified by root_map."
error_to_raise = EventModelError(msg)
handler = _attempt_with_retries(
func=handler_class,
Expand Down Expand Up @@ -1554,8 +1564,7 @@ def start(self, start_doc: RunStart) -> None:
if uid in self._start_to_start_doc:
if self._start_to_start_doc[uid] == start_doc:
raise ValueError(
"RunRouter received the same 'start' document twice:\n"
"{start_doc!r}"
"RunRouter received the same 'start' document twice:\n{start_doc!r}"
)
else:
raise ValueError(
Expand Down Expand Up @@ -1821,9 +1830,8 @@ class MismatchedDataKeys(InvalidData):
DocumentNames.resource: "schemas/resource.json",
DocumentNames.stream_datum: "schemas/stream_datum.json",
DocumentNames.stream_resource: "schemas/stream_resource.json",
# DEPRECATED:
DocumentNames.bulk_events: "schemas/bulk_events.json",
DocumentNames.bulk_datum: "schemas/bulk_datum.json",
DocumentNames.bulk_events: "schemas/bulk_events.json",
}
schemas = {}
for name, filename in SCHEMA_NAMES.items():
Expand Down Expand Up @@ -2155,7 +2163,7 @@ def __call__(
) -> RunStop:
if self.poison_pill:
raise EventModelError(
"Already composed a RunStop document for run " "{!r}.".format(
"Already composed a RunStop document for run {!r}.".format(
self.start["uid"]
)
)
Expand Down
61 changes: 61 additions & 0 deletions src/event_model/basemodels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from typing import Tuple, Type, Union

from event_model.basemodels.datum import Datum
from event_model.basemodels.datum_page import DatumPage
from event_model.basemodels.event import Event
from event_model.basemodels.event_descriptor import (
Dtype,
EventDescriptor,
Limits,
LimitsRange,
)
from event_model.basemodels.event_page import EventPage
from event_model.basemodels.resource import Resource
from event_model.basemodels.run_start import RunStart
from event_model.basemodels.run_stop import RunStop
from event_model.basemodels.stream_datum import StreamDatum
from event_model.basemodels.stream_resource import StreamResource

DocumentType = Union[
Type[Datum],
Type[DatumPage],
Type[Event],
Type[EventDescriptor],
Type[EventPage],
Type[Resource],
Type[RunStart],
Type[RunStop],
Type[StreamDatum],
Type[StreamResource],
]

ALL_BASEMODELS: Tuple[DocumentType, ...] = (
Datum,
DatumPage,
Event,
EventDescriptor,
EventPage,
Resource,
RunStart,
RunStop,
StreamDatum,
StreamResource,
)


__all__ = [
"Datum",
"DatumPage",
"Dtype",
"Event",
"EventDescriptor",
"EventPage",
"Limits",
"LimitsRange",
"Resource",
"RunStart",
"RunStop",
"StreamDatum",
"StreamResource",
"DocumentType",
]
32 changes: 32 additions & 0 deletions src/event_model/basemodels/datum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Any, Dict

from pydantic import (
BaseModel,
ConfigDict,
Field,
)
from typing_extensions import Annotated


class Datum(BaseModel):
"""Document to reference a quanta of externally-stored data"""

model_config = ConfigDict(extra="forbid")

datum_id: Annotated[
str,
Field(
description="Globally unique identifier for this Datum (akin to 'uid' "
"for other Document types), typically formatted as '<resource>/<integer>'"
),
]
datum_kwargs: Annotated[
Dict[str, Any],
Field(
description="Arguments to pass to the Handler to "
"retrieve one quanta of data",
),
]
resource: Annotated[
str, Field(description="The UID of the Resource to which this Datum belongs")
]
35 changes: 35 additions & 0 deletions src/event_model/basemodels/datum_page.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Any, Dict, List

from pydantic import BaseModel, ConfigDict, Field, RootModel
from typing_extensions import Annotated


class DataFrameForDatumPage(RootModel):
root: List[str] = Field(alias="Dataframe")


class DatumPage(BaseModel):
"""Page of documents to reference a quanta of externally-stored data"""

model_config = ConfigDict(extra="forbid")

datum_id: Annotated[
DataFrameForDatumPage,
Field(
description="Array unique identifiers for each Datum (akin to 'uid' for "
"other Document types), typically formatted as '<resource>/<integer>'"
),
]
datum_kwargs: Annotated[
Dict[str, List[Any]],
Field(
description="Array of arguments to pass to the Handler to "
"retrieve one quanta of data"
),
]
resource: Annotated[
str,
Field(
description="The UID of the Resource to which all Datums in the page belong"
),
]
48 changes: 48 additions & 0 deletions src/event_model/basemodels/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Any, Dict, Union

from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Annotated


class PartialEvent(BaseModel):
model_config = ConfigDict(extra="forbid")

data: Annotated[Dict[str, Any], Field(description="The actual measurement data")]
filled: Annotated[
Dict[str, Union[bool, str]],
Field(
default_factory=dict,
description="Mapping each of the keys of externally-stored data to the "
"boolean False, indicating that the data has not been loaded, or to "
"foreign keys (moved here from 'data' when the data was loaded)",
),
]
time: Annotated[
float,
Field(
description="The event time. This maybe different than the timestamps on "
"each of the data entries.",
),
]
timestamps: Annotated[
Dict[str, Any],
Field(description="The timestamps of the individual measurement data"),
]


class Event(PartialEvent):
"""Document to record a quanta of collected data"""

model_config = ConfigDict(extra="forbid")

descriptor: Annotated[
str, Field(description="UID of the EventDescriptor to which this Event belongs")
]
seq_num: Annotated[
int,
Field(
description="Sequence number to identify the location of this Event in the "
"Event stream",
),
]
uid: Annotated[str, Field(description="Globally unique identifier for this Event")]
Loading

0 comments on commit a8d4575

Please sign in to comment.