Skip to content

Commit

Permalink
Apply proper annotations, fix tests, fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
corva-alex-s committed Dec 1, 2023
1 parent 2706455 commit 4ac38c9
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 31 deletions.
2 changes: 1 addition & 1 deletion docs/modules/ROOT/examples/merging/tutorial001.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
def app(event: StreamTimeEvent, api: Api, cache: Cache):
# since we passed merge_events=True all 3 incoming events
# and their records will be merged into a single event with 9 records
assert len(event.records) != 9 # this will not fail
assert len(event.records) == 9 # this will not fail
return event
34 changes: 20 additions & 14 deletions src/corva/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import pydantic
import redis
from typing_extensions import assert_never

from corva.api import Api
from corva.configuration import SETTINGS
Expand Down Expand Up @@ -89,7 +88,13 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]:
)
data_transformation_type = raw_custom_event_type or raw_event_type
if merge_events:
aws_event = _merge_events(aws_event, data_transformation_type)
aws_event = _merge_events(
aws_event,
cast(
Union[Type[RawScheduledEvent], Type[RawStreamEvent]],
data_transformation_type,
),
)
raw_events = data_transformation_type.from_raw_event(event=aws_event)

if (
Expand Down Expand Up @@ -567,7 +572,10 @@ def _get_custom_event_type_by_raw_aws_event(
return None, None


def _merge_events(aws_event: Any, data_transformation_type: Type[RawBaseEvent]) -> Any:
def _merge_events(
aws_event: Any,
data_transformation_type: Union[Type[RawScheduledEvent], Type[RawStreamEvent]],
) -> Any:
"""
Merges incoming aws_events into one.
Merge happens differently, depending on app type.
Expand All @@ -578,7 +586,10 @@ def _merge_events(aws_event: Any, data_transformation_type: Type[RawBaseEvent])
# scheduled event
if not isinstance(aws_event[0], dict):
aws_event = list(itertools.chain(*aws_event))
is_depth = aws_event[0]["scheduler_type"] == SchedulerType.data_depth_milestone
scheduler_type = aws_event[0]["scheduler_type"]
if isinstance(scheduler_type, SchedulerType):
scheduler_type = scheduler_type.value
is_depth = scheduler_type == SchedulerType.data_depth_milestone.value
event_start, event_end = (
("top_depth", "bottom_depth")
if is_depth
Expand All @@ -595,13 +606,8 @@ def _merge_events(aws_event: Any, data_transformation_type: Type[RawBaseEvent])
aws_event = aws_event[0]
return aws_event

elif data_transformation_type == RawStreamEvent:
# stream event
for event in aws_event[1:]:
aws_event[0]["records"].extend(event["records"])
aws_event = [aws_event[0]]
return aws_event

else:
# unexpected event type, raise an exception
assert_never(data_transformation_type) # type: ignore
# stream event
for event in aws_event[1:]:
aws_event[0]["records"].extend(event["records"])
aws_event = [aws_event[0]]
return aws_event
10 changes: 8 additions & 2 deletions tests/unit/test_docs/test_merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def test_tutorial001(context):
asset_id=1,
company_id=1,
),
RawTimeRecord(
collection=str(),
timestamp=timestamp + 2,
asset_id=1,
company_id=1,
),
],
metadata=RawMetadata(
app_stream_id=1,
Expand All @@ -55,10 +61,10 @@ def test_tutorial001(context):
).dict()
]
)
timestamp += 2
timestamp += 3

result_event: StreamTimeEvent = tutorial001.app(event, context)[0]
assert len(result_event.records) == 6, "records were not merged into a single event"
assert len(result_event.records) == 9, "records were not merged into a single event"


@pytest.mark.parametrize(
Expand Down
14 changes: 0 additions & 14 deletions tests/unit/test_merge_events.py

This file was deleted.

0 comments on commit 4ac38c9

Please sign in to comment.