Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataSourceModelAccess.retrieve and related updates #169

Merged
merged 1 commit into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion basin3d/core/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def basin3d_plugin(cls):
return cls


def basin3d_plugin_access(plugin_class, synthesis_model_class, access_type):
def _basin3d_plugin_access(plugin_class, synthesis_model_class, access_type):
"""Decorator for registering model access"""

def _inner(func):
Expand Down
12 changes: 5 additions & 7 deletions basin3d/core/schema/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class QueryBase(BaseModel):

datasource: Optional[List[str]] = Field(title="Datasource Identifiers",
description="List of datasource identifiers to query by.")

id: Optional[str] = Field(title="Identifier", description="The unique identifier for the desired object")

is_valid_translated_query: Union[None, bool] = Field(default=None, title="Valid translated query",
description="Indicates whether the translated query is valid: None = is not translated")

Expand Down Expand Up @@ -68,14 +71,9 @@ class Config:
prefixed_fields: ClassVar[List[str]] = []


class QueryById(QueryBase):
"""Query for a single data object by identifier"""

id: str = Field(title="Identifier", description="The unique identifier for the desired data object")


class QueryMonitoringFeature(QueryBase):
"""Query :class:`basin3d.core.models.MonitoringFeature`"""
# id (declared on QueryBase) is optional in the schema but must be set when querying for a single named monitoring feature
feature_type: Optional[FeatureTypeEnum] = Field(title="Feature Type",
description="Filter results by the specified feature type.")
monitoring_feature: Optional[List[str]] = Field(title="Monitoring Features",
Expand All @@ -101,7 +99,7 @@ def __init__(self, **data):
data[field] = isinstance(data[field], str) and data[field].upper() or data[field]
super().__init__(**data)

prefixed_fields: ClassVar[List[str]] = ['monitoring_feature', 'parent_feature']
prefixed_fields: ClassVar[List[str]] = ['id', 'monitoring_feature', 'parent_feature']


class QueryMeasurementTimeseriesTVP(QueryBase):
Expand Down
140 changes: 98 additions & 42 deletions basin3d/core/synthesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from basin3d.core.models import Base, MeasurementTimeseriesTVPObservation, MonitoringFeature
from basin3d.core.plugin import DataSourcePluginAccess, DataSourcePluginPoint
from basin3d.core.schema.enum import MessageLevelEnum, AggregationDurationEnum
from basin3d.core.schema.query import QueryBase, QueryById, QueryMeasurementTimeseriesTVP, \
from basin3d.core.schema.query import QueryBase, QueryMeasurementTimeseriesTVP, \
QueryMonitoringFeature, SynthesisMessage, SynthesisResponse
from basin3d.core.translate import translate_query

Expand All @@ -33,7 +33,7 @@ class MonitorMixin(object):
def log(self,
message: str, level: Optional[MessageLevelEnum] = None, where: Optional[List] = None) -> Optional[SynthesisMessage]:
"""
Add a synthesis message to the synthesis respoonse
Add a synthesis message to the synthesis response
:param message: The message
:param level: The message level
:param where: Where the message is from
Expand Down Expand Up @@ -188,7 +188,7 @@ def __next__(self) -> Base:

def log(self, message: str, level: Optional[MessageLevelEnum] = None, where: Optional[List] = None): # type: ignore[override]
"""
Add a synthesis message to the synthesis respoonse
Add a synthesis message to the synthesis response
:param message: The message
:param level: The message level
:return: None
Expand Down Expand Up @@ -237,45 +237,74 @@ def list(self, query: QueryBase) -> DataSourceModelIterator:
return DataSourceModelIterator(query, self)

@monitor.ctx_synthesis
def retrieve(self, query: QueryById) -> SynthesisResponse:
def retrieve(self, query: QueryBase, **kwargs) -> SynthesisResponse:
"""
Retrieve a single synthesized value

:param query: The query for this request
:param kwargs: messages, list of messages to be returned in the SynthesisResponse
"""
messages = []
if query.id:

# split the datasource id prefix from the primary key
id_list = query.id.split("-")
datasource = None
try:
plugin = self.plugins[id_list[0]]
datasource = plugin.get_datasource()
except KeyError:
pass

if datasource:
datasource_pk = query.id.replace("{}-".format(id_list[0]),
"", 1) # The datasource id prefix needs to be removed

plugin_views = plugin.get_plugin_access()
monitor.set_ctx_basin3d_where([plugin.get_datasource().id, self.synthesis_model.__name__])
if self.synthesis_model in plugin_views:
synthesized_query: QueryById = query.copy()
synthesized_query.id = datasource_pk
synthesized_query.datasource = [datasource.id]
obj: Base = plugin_views[self.synthesis_model].get(query=synthesized_query)
return SynthesisResponse(query=query, data=obj) # type: ignore[call-arg]
datasource_id: Optional[str] = None
datasource = None
messages: List[Optional[SynthesisMessage]] = []

if 'messages' in kwargs and kwargs.get('messages'):
msg = kwargs.get('messages')
if isinstance(msg, List) and isinstance(msg[0], SynthesisMessage):
messages.extend(msg)
else:
messages.append(self.log('Message mis-configured.', MessageLevelEnum.WARN))

# if the datasource is in the query, utilize it straight up
if query.datasource:
datasource_id = query.datasource[0]

# otherwise, get from a prefixed field.
# Not tying specifically to id at this point to enable different retrieve approaches from future classes.
elif query.prefixed_fields:
value = None
for prefixed_field in query.prefixed_fields:
attr_value = getattr(query, prefixed_field)
if attr_value and isinstance(attr_value, str):
value = attr_value
break
elif attr_value and isinstance(attr_value, list):
value = attr_value[0]
break

if value and isinstance(value, str):
datasource_id = value.split("-")[0]

try:
plugin = self.plugins[datasource_id]
datasource = plugin.get_datasource()
except KeyError:
pass

if datasource:

plugin_views = plugin.get_plugin_access()
monitor.set_ctx_basin3d_where([plugin.get_datasource().id, self.synthesis_model.__name__])
if self.synthesis_model in plugin_views and hasattr(plugin_views[self.synthesis_model], 'get'):

# Now translate the query object
translated_query_params: QueryBase = self.synthesize_query(plugin_views[self.synthesis_model], query)
translated_query_params.datasource = [plugin.get_datasource().id]

# Get the single synthesized object from the plugin if the translated query is valid
if translated_query_params.is_valid_translated_query:
item: Optional[Base] = plugin_views[self.synthesis_model].get(query=translated_query_params)
else:
messages.append(self.log("Plugin view does not exist",
MessageLevelEnum.WARN,
[plugin.get_datasource().id, self.synthesis_model.__name__],
))
logger.warning(f'Translated query for datasource {plugin.get_datasource().id} is not valid.')

if item:
return SynthesisResponse(query=query, data=item, messages=messages) # type: ignore[call-arg]
else:
messages.append(self.log(f"DataSource not not found for id {query.id}",
MessageLevelEnum.ERROR))
messages.append(self.log("Plugin view does not exist", MessageLevelEnum.WARN,
[plugin.get_datasource().id, self.synthesis_model.__name__],))

else:
messages.append(self.log("DataSource not found for retrieve request", MessageLevelEnum.ERROR))

return SynthesisResponse(query=query, messages=messages) # type: ignore[call-arg]

Expand All @@ -299,11 +328,13 @@ class MonitoringFeatureAccess(DataSourceModelAccess):
* *utc_offset:* float, Coordinate Universal Time offset in hours (offset in hours), e.g., +9
* *url:* url, URL with details for the feature

**Filter** by the following attributes (/?attribute=parameter&attribute=parameter&...)
**Filter** by the following attributes

* *datasource (optional):* a single data source id prefix (e.g ?datasource=`datasource.id_prefix`)
* *datasource (optional):* str, a single data source id prefix
* *id (optional):* str, a single monitoring feature id. Cannot be combined with monitoring_feature
* *parent_feature (optional)*: list, a list of parent feature ids. Plugin must have this functionality enabled.
* *monitoring_feature (optional)*: list, a list of monitoring feature ids. Cannot be combined with id, which will take precedence.

**Restrict fields** with query parameter ‘fields’. (e.g. ?fields=id,name)
"""
synthesis_model = MonitoringFeature

Expand All @@ -319,6 +350,34 @@ def synthesize_query(self, plugin_access: DataSourcePluginAccess, query: QueryMo
"""
return translate_query(plugin_access, query)

def retrieve(self, query: QueryMonitoringFeature) -> SynthesisResponse:
    """
    Retrieve a single Monitoring Feature by its identifier

    :param query: :class:`basin3d.core.schema.query.QueryMonitoringFeature`; ``id`` must be set.
        If ``monitoring_feature`` is also set it is cleared on the query object, because ``id`` takes precedence.
    :return: The synthesis response produced by the parent class ``retrieve``, or a response carrying
        only an ERROR message when ``id`` is not set
    """
    if query.id:
        carried_messages = []

        # id wins over monitoring_feature: drop the latter and record a warning for the caller
        if query.monitoring_feature:
            dropped_ids = ', '.join(query.monitoring_feature)
            query.monitoring_feature = None
            warning = self.log(f'Monitoring Feature query has both id {query.id} and monitoring_feature {dropped_ids} '
                               f'specified. Removing monitoring_feature and using id.', MessageLevelEnum.WARN)
            carried_messages.append(warning)

        # Call chain: this method -> DataSourceModelAccess.retrieve -> the plugin access class's get.
        # Any warning collected above is handed to the parent through the messages keyword.
        return super().retrieve(query=query, messages=carried_messages)

    # Without an id there is nothing to look up; report the problem instead of delegating
    missing_id_message = self.log('query.id field is missing and is required for monitoring feature request by id.',
                                  MessageLevelEnum.ERROR)
    return SynthesisResponse(query=query, messages=[missing_id_message])  # type: ignore[call-arg]


class MeasurementTimeseriesTVPObservationAccess(DataSourceModelAccess):
"""
Expand All @@ -341,7 +400,7 @@ class MeasurementTimeseriesTVPObservationAccess(DataSourceModelAccess):
* *statistic:* list, statistical properties of the observation result (MEAN, MIN, MAX, TOTAL)
* *result_quality:* list, quality assessment values contained in the result (VALIDATED, UNVALIDATED, SUSPECTED, REJECTED, ESTIMATED)

**Filter** by the following attributes (?attribute=parameter&attribute=parameter&...):
**Filter** by the following attributes:

* *monitoring_feature (required):* List of monitoring_features ids
* *observed_property (required):* List of observed property variable ids
Expand All @@ -351,10 +410,7 @@ class MeasurementTimeseriesTVPObservationAccess(DataSourceModelAccess):
* *statistic (optional):* List of statistic options, enum (INSTANT|MEAN|MIN|MAX|TOTAL)
* *datasource (optional):* a single data source id prefix (e.g ?datasource=`datasource.id_prefix`)
* *result_quality (optional):* enum (VALIDATED|UNVALIDATED|SUSPECTED|REJECTED|ESTIMATED)
* *sampling_medium (optional):* ADD here -- probably should be enum

**Restrict fields** with query parameter ‘fields’. (e.g. ?fields=id,name)

* *sampling_medium (optional):* enum (SOLID_PHASE|WATER|GAS|OTHER)

"""
synthesis_model = MeasurementTimeseriesTVPObservation
Expand Down
64 changes: 37 additions & 27 deletions basin3d/core/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from basin3d.core import monitor
from basin3d.core.schema.enum import MAPPING_DELIMITER, NO_MAPPING_TEXT
from basin3d.core.schema.query import QueryBase, QueryById, QueryMeasurementTimeseriesTVP, QueryMonitoringFeature
from basin3d.core.schema.query import QueryBase, QueryMeasurementTimeseriesTVP, QueryMonitoringFeature


logger = monitor.get_logger(__name__)
Expand Down Expand Up @@ -114,26 +114,30 @@ def _is_translated_query_valid(datasource_id, query, translated_query) -> Option
:param translated_query: the translated query
:return: boolean (True = valid translated query, False = invalid translated query) or None (translated query could not be assessed)
"""
# loop thru kwargs
for attr in query.mapped_fields:
translated_attr_value = getattr(translated_query, attr)
b3d_attr_value = getattr(query, attr)
if isinstance(b3d_attr_value, list):
b3d_attr_value = ', '.join(b3d_attr_value)
if translated_attr_value and isinstance(translated_attr_value, list):
# if list and all of list == NOT_SUPPORTED, False
if all([x == NO_MAPPING_TEXT for x in translated_attr_value]):
logger.warning(f'Translated query for datasource {datasource_id} is invalid. No vocabulary found for attribute {attr} with values: {b3d_attr_value}.')
return False
elif translated_attr_value and isinstance(translated_attr_value, str):
# if single NOT_SUPPORTED, False
if translated_attr_value == NO_MAPPING_TEXT:
logger.warning(f'Translated query for datasource {datasource_id} is invalid. No vocabulary found for attribute {attr} with value: {b3d_attr_value}.')
return False
elif translated_attr_value:
logger.warning(
f'Translated query for datasource {datasource_id} cannot be assessed. Translated value for {attr} is not expected type.')
return None
for field_type, field_list in zip(['mapped', 'prefixed'], [query.mapped_fields, query.prefixed_fields]):
# loop thru kwargs
for attr in field_list:
translated_attr_value = getattr(translated_query, attr)
b3d_attr_value = getattr(query, attr)
msg_prefix = ''
if field_type == 'mapped':
msg_prefix = 'No vocabulary found for attribute {attr} with values: {b3d_attr_value}.'
if isinstance(b3d_attr_value, list):
b3d_attr_value = ', '.join(b3d_attr_value)
if translated_attr_value and isinstance(translated_attr_value, list):
# if list and all of list == NOT_SUPPORTED, False
if all([x == NO_MAPPING_TEXT for x in translated_attr_value]):
logger.warning(f'Translated query for datasource {datasource_id} is invalid.{msg_prefix}')
return False
elif translated_attr_value and isinstance(translated_attr_value, str):
# if single NOT_SUPPORTED, False
if translated_attr_value == NO_MAPPING_TEXT:
logger.warning(f'Translated query for datasource {datasource_id} is invalid.{msg_prefix}')
return False
elif translated_attr_value:
logger.warning(
f'Translated query for datasource {datasource_id} cannot be assessed. Translated value for {attr} is not expected type.')
return None
return True


Expand Down Expand Up @@ -175,7 +179,7 @@ def _order_mapped_fields(plugin_access, query_mapped_fields):
return query_mapped_fields_ordered


def _translate_mapped_query_attrs(plugin_access, query: Union[QueryMeasurementTimeseriesTVP, QueryMonitoringFeature, QueryById]) -> QueryBase:
def _translate_mapped_query_attrs(plugin_access, query: Union[QueryMeasurementTimeseriesTVP, QueryMonitoringFeature]) -> QueryBase:
"""
Translation functionality
"""
Expand Down Expand Up @@ -214,7 +218,7 @@ def _translate_mapped_query_attrs(plugin_access, query: Union[QueryMeasurementTi
return query


def _translate_prefixed_query_attrs(plugin_access, query: Union[QueryMeasurementTimeseriesTVP, QueryMonitoringFeature, QueryById]) -> QueryBase:
def _translate_prefixed_query_attrs(plugin_access, query: Union[QueryMeasurementTimeseriesTVP, QueryMonitoringFeature]) -> QueryBase:
"""

:param plugin_access:
Expand All @@ -237,12 +241,18 @@ def extract_id(identifer):
for attr in query.prefixed_fields:
attr_value = getattr(query, attr)
if attr_value:

# if the value is a string
if isinstance(attr_value, str):
attr_value = attr_value.split(",")
translated_value = extract_id(attr_value)
if translated_value == attr_value:
translated_value = NO_MAPPING_TEXT

translated_values = [extract_id(x) for x in attr_value if x.startswith("{}-".format(id_prefix))]
# otherwise assume it is a list
else:
translated_value = [extract_id(x) for x in attr_value if x.startswith("{}-".format(id_prefix))]

setattr(query, attr, translated_values)
setattr(query, attr, translated_value)

return query

Expand Down Expand Up @@ -385,7 +395,7 @@ def translate_attributes(plugin_access, mapped_attrs, **kwargs):
return kwargs


def translate_query(plugin_access, query: Union[QueryMeasurementTimeseriesTVP, QueryMonitoringFeature, QueryById]) -> QueryBase:
def translate_query(plugin_access, query: Union[QueryMeasurementTimeseriesTVP, QueryMonitoringFeature]) -> QueryBase:
"""
Translate BASIN-3D vocabulary specified in a query to the datasource vocabularies defined by :class:`basin3d.core.models.AttributeMapping` objects specified in the datasource plugin.

Expand Down
Loading