Skip to content

Commit

Permalink
Merge pull request #86 from corva-ai/feature/DEVC-627_make_optional_d…
Browse files Browse the repository at this point in the history
…ata_field

DEVC-627 | Handle optional data field
  • Loading branch information
vladbagmet authored Feb 7, 2024
2 parents c9c9e1f + 136c4f3 commit 34387dc
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- Filter out records with `None` data from stream time records.


## [1.11.2] - 2024-01-05
Expand Down
34 changes: 26 additions & 8 deletions src/corva/models/stream/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class RawMetadata(CorvaBaseEvent):
RecordsDepth = Sequence[RawDepthRecord]
else:
RecordsBase = pydantic.conlist(RawBaseRecord, min_items=1)
RecordsTime = pydantic.conlist(RawTimeRecord, min_items=1)
RecordsTime = pydantic.conlist(RawTimeRecord, min_items=0)
RecordsDepth = pydantic.conlist(RawDepthRecord, min_items=1)


Expand Down Expand Up @@ -105,7 +105,10 @@ def is_completed(self) -> bool:
There can only be 1 completed record always located at the end of the list.
"""

return self.records[-1].collection == 'wits.completed'
if not self.records:
return False

return self.records[-1].collection == "wits.completed"

@property
def max_record_value(self) -> Union[int, float]:
Expand Down Expand Up @@ -165,33 +168,48 @@ def filter_records(
def set_asset_id(cls, values: dict) -> dict:
"""Calculates asset_id field."""

records: List[RawBaseRecord] = values['records']
records: List[RawBaseRecord] = values["records"]

values["asset_id"] = int(records[0].asset_id)
if records:
values["asset_id"] = int(records[0].asset_id)

return values

@pydantic.root_validator(pre=False, skip_on_failure=True)
def set_company_id(cls, values: dict) -> dict:
"""Calculates company_id field."""

records: List[RawBaseRecord] = values['records']
records: List[RawBaseRecord] = values["records"]

values["company_id"] = int(records[0].company_id)
if records:
values["company_id"] = int(records[0].company_id)

return values

@pydantic.validator("records", pre=True)
def validate_records(cls, v):
if isinstance(v, List):
return [
record
for record in v
if (
(isinstance(record, dict) and record.get("data") is not None)
or (hasattr(record, "data") and record.data is not None)
)
]
return v


class RawStreamTimeEvent(RawStreamEvent):
records: RecordsTime
rerun: Optional[RerunTime] = None
_max_record_value_cache_key: ClassVar[str] = 'last_processed_timestamp'
_max_record_value_cache_key: ClassVar[str] = "last_processed_timestamp"


class RawStreamDepthEvent(RawStreamEvent):
records: RecordsDepth
rerun: Optional[RerunDepth] = None
_max_record_value_cache_key: ClassVar[str] = 'last_processed_depth'
_max_record_value_cache_key: ClassVar[str] = "last_processed_depth"
log_identifier: str = None # type: ignore

@pydantic.root_validator(pre=False, skip_on_failure=True)
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/test_stream_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,42 @@ def stream_app(event, api, cache):
result_event: StreamDepthEvent = stream_app(event, context)[0]

assert result_event.log_identifier == 'log_identifier'


def test_raw_stream_event_with_none_data_field_returns_expected_result(context):
"""Make sure that raw stream event with empty data field
can be handled without validation exception.
"""

@stream
def stream_app(event, api, cache):
pytest.fail(
"Stream app call should be skipped "
"because there is no data to build an event"
)

event = [
{
"metadata": {
"app_stream_id": 123,
"apps": {"test-provider.test-app-name": {"app_connection_id": 456}},
"log_type": "time",
"source_type": "drilling",
},
"records": [
{
"app": "corva.wits-historical-import",
"asset_id": 1,
"collection": "wits.completed",
"company_id": 80,
"data": None,
"provider": "corva",
"timestamp": 1688999883,
"version": 1,
} # DEVC-627. This record should be filtered out because data is None.
],
}
]

_ = stream_app(event, context)[0]
assert True, "App call should be skipped"

0 comments on commit 34387dc

Please sign in to comment.