[PYG-256]⏱ CogniteTimeSeries (#365)
* refactor: added datapoints_api

* refactor: generate datapoints api

* refactor: mark cognite timeseries data classes

* tests: added generation test

* refactor: finished example

* tests: added integration test

* feat: generate with Datapoints API

* tests: removed irrelevant test

* tests: regen

* style

* docs: updated docs

* refactor: deprecation warning

* refactor: handle empty case
doctrino authored Nov 16, 2024
1 parent c1d66f2 commit d3ac024
Showing 42 changed files with 1,101 additions and 1,324 deletions.
2 changes: 2 additions & 0 deletions cognite/pygen/_constants.py
@@ -9,6 +9,8 @@
dm.ContainerId("cdf_cdm", "CogniteFile"): frozenset({"isUploaded", "uploadedTime"}),
}

COGNITE_TIMESERIES = dm.ContainerId("cdf_cdm", "CogniteTimeSeries")


def is_readonly_property(container: dm.ContainerId, identifier: str) -> bool:
return container in _READONLY_PROPERTIES and identifier in _READONLY_PROPERTIES[container]
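
A quick, hedged illustration of the helper above (the `CogniteFile`/`isUploaded` pair comes from the `_READONLY_PROPERTIES` entry shown; the negative case assumes `CogniteTimeSeries` has no read-only entry, consistent with this diff):

from cognite.client import data_modeling as dm

# "isUploaded" is listed as read-only for CogniteFile in _READONLY_PROPERTIES above.
assert is_readonly_property(dm.ContainerId("cdf_cdm", "CogniteFile"), "isUploaded")
# CogniteTimeSeries has no read-only entry, so any property is writable here.
assert not is_readonly_property(COGNITE_TIMESERIES, "name")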
12 changes: 12 additions & 0 deletions cognite/pygen/_core/generators.py
@@ -429,6 +429,7 @@ def generate_apis(self, client_dir: Path) -> dict[Path, str]:
sdk[data_classes_dir / "_core" / "__init__.py"] = self.generate_data_class_core_init_file()
sdk[data_classes_dir / "_core" / "query.py"] = self.generate_data_class_core_query_file()
sdk[data_classes_dir / "_core" / "cdf_external.py"] = self.generate_data_class_core_cdf_external_file()
sdk[data_classes_dir / "_core" / "datapoints_api.py"] = self.generate_data_class_core_datapoints_api_file()
return sdk

def generate_api_core_file(self) -> str:
@@ -535,6 +536,17 @@ def generate_data_class_core_query_file(self) -> str:
+ "\n"
)

def generate_data_class_core_datapoints_api_file(self) -> str:
"""Generate the core data classes file for the SDK."""
data_class_core = self.env.get_template("data_classes_core_datapoints_api.py.jinja")

return (
data_class_core.render(
top_level_package=self.top_level_package,
)
+ "\n"
)

def generate_data_class_core_cdf_external_file(self) -> str:
"""Generate the core data classes file for the SDK."""
data_class_core = self.env.get_template("data_classes_core_cdf_external.py.jinja")
10 changes: 10 additions & 0 deletions cognite/pygen/_core/models/data_classes.py
@@ -11,6 +11,7 @@
from cognite.client.data_classes.data_modeling.views import ViewProperty

from cognite.pygen import config as pygen_config
from cognite.pygen._constants import COGNITE_TIMESERIES
from cognite.pygen.config.reserved_words import is_reserved_word
from cognite.pygen.utils.cdf import _find_first_node_type
from cognite.pygen.utils.text import create_name, to_pascal, to_words
@@ -192,6 +193,15 @@ def update_direct_children(self, children: list[DataClass]):
self.direct_children.extend(children)
self.initialization.add("children")

@property
def is_cognite_timeseries(self) -> bool:
    """Whether any field in this data class is stored in the CogniteTimeSeries container."""
    return any(
        isinstance(field, BaseConnectionField | BasePrimitiveField)
        and field.container is not None
        and field.container.source == COGNITE_TIMESERIES
        for field in self
    )

@property
def read_base_class(self) -> str:
"""Parent read classes."""
7 changes: 7 additions & 0 deletions cognite/pygen/_core/templates/api_class_timeseries.py.jinja
@@ -1,6 +1,7 @@
from __future__ import annotations

import datetime
import warnings
from collections.abc import Sequence
from typing import Literal, cast

@@ -386,6 +387,12 @@ class {{ timeseries_api.name }}:
>>> {{ data_class.variable_list }} = client.{{ api_class.parent_attribute }}.{{ timeseries_api.parent_attribute }}(limit=5).retrieve()

"""
warnings.warn(
"This method is deprecated and will soon be removed. "
"Use the .select()...data.retrieve_dataframe() method instead.",
UserWarning,
stacklevel=2,
)
filter_ = {{ data_class.filter_name }}(
self._view_id,{% for parm in list_method.parameters %}
{{ parm.name }},{% endfor %}
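For context, the replacement the warning recommends chains the new `data` attribute onto a `.select()` query. A minimal sketch, assuming a generated client for a wind-turbine model (the `WindTurbineClient`, `rotor`, and `rotor_speed_controller` names are illustrative, matching the changelog example below):

from wind_turbine import WindTurbineClient  # hypothetical generated SDK package

pygen = WindTurbineClient.from_toml("config.toml")  # assumed config helper; adjust to your setup

# Deprecated: list the time series and retrieve them directly (now emits a UserWarning).
ts_list = pygen.rotor.rotor_speed_controller(limit=5).retrieve()

# Recommended: select the nodes, then fetch datapoints through the new .data attribute.
df = pygen.rotor.select().rotor_speed_controller.data.retrieve_dataframe()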
6 changes: 4 additions & 2 deletions cognite/pygen/_core/templates/data_class_node.py.jinja
@@ -19,7 +19,8 @@ from pydantic import field_validator, model_validator

from {{ top_level_package }}.data_classes._core import ({% if has_default_instance_space %}
DEFAULT_INSTANCE_SPACE,{% endif %}
DEFAULT_QUERY_LIMIT,
DEFAULT_QUERY_LIMIT,{% if data_class.is_cognite_timeseries %}
DataPointsAPI,{% endif %}
DataRecord,
DataRecordGraphQL,
DataRecordWrite,
@@ -585,7 +586,8 @@ class _{{ data_class.query_cls_name }}(NodeQueryCore[T_DomainModelList, {{ data_
self.space,
self.external_id,{% for field in data_class.filtering_fields %}
self.{{ field.name }},{% endfor %}
]){% endif %}
]){% endif %}{% if data_class.is_cognite_timeseries %}
self.data = DataPointsAPI(client, lambda limit: self._list(limit=limit).as_node_ids()){% endif %}

def list_{{ data_class.variable }}(self, limit: int = DEFAULT_QUERY_LIMIT) -> {{ data_class.read_list_name }}:
return self._list(limit=limit)
96 changes: 96 additions & 0 deletions cognite/pygen/_core/templates/data_classes_core_datapoints_api.py.jinja
@@ -0,0 +1,96 @@
import datetime
from collections.abc import Callable

import pandas as pd
from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling.ids import NodeId
from cognite.client.data_classes.datapoints import Aggregate
from cognite.client.utils._time import ZoneInfo

from {{ top_level_package }}.data_classes._core.constants import DEFAULT_QUERY_LIMIT


class DataPointsAPI:
def __init__(self, client: CogniteClient, get_node_ids: Callable[[int], list[NodeId]]) -> None:
self._client = client
self._get_node_ids = get_node_ids

def retrieve_dataframe(
self,
start: int | str | datetime.datetime | None = None,
end: int | str | datetime.datetime | None = None,
aggregates: Aggregate | str | list[Aggregate | str] | None = None,
granularity: str | None = None,
timezone: str | datetime.timezone | ZoneInfo | None = None,
target_unit: str | None = None,
target_unit_system: str | None = None,
limit: int | None = None,
timeseries_limit: int = DEFAULT_QUERY_LIMIT,
include_outside_points: bool = False,
ignore_unknown_ids: bool = False,
include_status: bool = False,
ignore_bad_datapoints: bool = True,
treat_uncertain_as_bad: bool = True,
uniform_index: bool = False,
include_aggregate_name: bool = True,
include_granularity_name: bool = False,
) -> pd.DataFrame:
"""Get datapoints directly in a pandas dataframe.

Time series support status codes like Good, Uncertain and Bad. You can read more in the Cognite Data Fusion developer documentation on
`status codes <https://developer.cognite.com/dev/concepts/reference/quality_codes/>`_.

Note:
For many more usage examples, check out the :py:meth:`~DatapointsAPI.retrieve` method which accepts exactly the same arguments.

Args:
start (int | str | datetime.datetime | None): Inclusive start. Default: 1970-01-01 UTC.
end (int | str | datetime.datetime | None): Exclusive end. Default: "now"
aggregates (Aggregate | str | list[Aggregate | str] | None): Single aggregate or list of aggregates to retrieve. Available options: ``average``, ``continuous_variance``, ``count``, ``count_bad``, ``count_good``, ``count_uncertain``, ``discrete_variance``, ``duration_bad``, ``duration_good``, ``duration_uncertain``, ``interpolation``, ``max``, ``min``, ``step_interpolation``, ``sum`` and ``total_variation``. Default: None (raw datapoints returned)
granularity (str | None): The granularity to fetch aggregates at. Can be given as an abbreviation or spelled out for clarity: ``s/second(s)``, ``m/minute(s)``, ``h/hour(s)``, ``d/day(s)``, ``w/week(s)``, ``mo/month(s)``, ``q/quarter(s)``, or ``y/year(s)``. Examples: ``30s``, ``5m``, ``1day``, ``2weeks``. Default: None.
timezone (str | datetime.timezone | ZoneInfo | None): For raw datapoints, which timezone to use when displaying (will not affect what is retrieved). For aggregates, which timezone to align to for granularity 'hour' and longer. Align to the start of the hour, -day or -month. For timezones of type Region/Location, like 'Europe/Oslo', pass a string or ``ZoneInfo`` instance. The aggregate duration will then vary, typically due to daylight saving time. You can also use a fixed offset from UTC by passing a string like '+04:00', 'UTC-7' or 'UTC-02:30' or an instance of ``datetime.timezone``. Note: Historical timezones with second offset are not supported, and timezones with minute offsets (e.g. UTC+05:30 or Asia/Kolkata) may take longer to execute.
target_unit (str | None): The unit_external_id of the datapoints returned. If the time series does not have a unit_external_id that can be converted to the target_unit, an error will be returned. Cannot be used with target_unit_system.
target_unit_system (str | None): The unit system of the datapoints returned. Cannot be used with target_unit.
limit (int | None): Maximum number of datapoints to return for each time series. Default: None (no limit)
timeseries_limit (int): Maximum number of timeseries to fetch (columns in the dataframe). Default: 5
include_outside_points (bool): Whether to include outside points. Not allowed when fetching aggregates. Default: False
ignore_unknown_ids (bool): Whether to ignore missing time series rather than raising an exception. Default: False
include_status (bool): Also return the status code, an integer, for each datapoint in the response. Only relevant for raw datapoint queries, not aggregates.
ignore_bad_datapoints (bool): Treat datapoints with a bad status code as if they do not exist. If set to false, raw queries will include bad datapoints in the response, and aggregates will in general omit the time period between a bad datapoint and the next good datapoint. Also, the period between a bad datapoint and the previous good datapoint will be considered constant. Default: True.
treat_uncertain_as_bad (bool): Treat datapoints with uncertain status codes as bad. If false, treat datapoints with uncertain status codes as good. Used for both raw queries and aggregates. Default: True.
uniform_index (bool): If only querying aggregates AND a single granularity is used AND no limit is used, specifying `uniform_index=True` will return a dataframe with an equidistant datetime index from the earliest `start` to the latest `end` (missing values will be NaNs). If these requirements are not met, a ValueError is raised. Default: False
include_aggregate_name (bool): Include 'aggregate' in the column name, e.g. `my-ts|average`. Ignored for raw time series. Default: True
include_granularity_name (bool): Include 'granularity' in the column name, e.g. `my-ts|12h`. Added after 'aggregate' when present. Ignored for raw time series. Default: False

Returns:
pd.DataFrame: A pandas DataFrame containing the requested time series. The ordering of columns is ids first, then external_ids. For time series with multiple aggregates, they will be sorted in alphabetical order ("average" before "max").

Warning:
If you have duplicated time series in your query, the dataframe columns will also contain duplicates.

When retrieving raw datapoints with ``ignore_bad_datapoints=False``, bad datapoints with the value NaN cannot be distinguished from those
missing a value (due to being stored in a numpy array); all will become NaNs in the dataframe.
"""
node_ids = self._get_node_ids(timeseries_limit)
if not node_ids:
return pd.DataFrame()
return self._client.time_series.data.retrieve_dataframe(
instance_id=node_ids,
start=start,
end=end,
aggregates=aggregates,
granularity=granularity,
timezone=timezone,
target_unit=target_unit,
target_unit_system=target_unit_system,
limit=limit,
include_outside_points=include_outside_points,
ignore_unknown_ids=ignore_unknown_ids,
include_status=include_status,
ignore_bad_datapoints=ignore_bad_datapoints,
treat_uncertain_as_bad=treat_uncertain_as_bad,
uniform_index=uniform_index,
include_aggregate_name=include_aggregate_name,
include_granularity_name=include_granularity_name,
column_names="instance_id",
)
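
A hedged usage sketch for the class above, as used outside the generated query flow (assumes configured credentials; in generated SDKs the resolver callable is supplied by the query core, so the fixed node id below is purely illustrative):

import datetime

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling.ids import NodeId

# DataPointsAPI is rendered into <top_level_package>.data_classes._core by this template.
client = CogniteClient()  # assumes credentials are configured in the environment

# The generated query classes pass a callable that resolves matching nodes;
# a fixed list stands in here for illustration.
node_ids = [NodeId("my_space", "my_time_series")]
api = DataPointsAPI(client, lambda limit: node_ids[:limit])

# Hourly averages over October 2024, one column per instance id.
df = api.retrieve_dataframe(
    start=datetime.datetime(2024, 10, 1, tzinfo=datetime.timezone.utc),
    end=datetime.datetime(2024, 11, 1, tzinfo=datetime.timezone.utc),
    aggregates="average",
    granularity="1h",
)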
@@ -1,5 +1,6 @@
from {{ top_level_package }}.data_classes._core.constants import * # noqa
from {{ top_level_package }}.data_classes._core.base import * # noqa
from {{ top_level_package }}.data_classes._core.cdf_external import * # noqa
from {{ top_level_package }}.data_classes._core.datapoints_api import * # noqa
from {{ top_level_package }}.data_classes._core.helpers import * # noqa
from {{ top_level_package }}.data_classes._core.query import * # noqa
1 change: 1 addition & 0 deletions cognite/pygen/config/reserved_words.py
@@ -23,6 +23,7 @@
"replace",
"type",
"list_full",
"data",
}
| {f for f in dir(BaseModel)}
| {
9 changes: 8 additions & 1 deletion docs/CHANGELOG.md
@@ -14,8 +14,15 @@ Changes are grouped as follows
- `Security` in case of vulnerabilities.

## TBD
### Added
- Any view that extends `CogniteTimeSeries` now has a `data` property that you can use to retrieve datapoints.
  For example, `pygen.rotor.select().rotor_speed_controller.data.retrieve_dataframe(...)` will retrieve the datapoints
  for the `rotor_speed_controller` time series, as sketched below.
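  A minimal sketch, assuming a generated client instance named `pygen` for a model with a `rotor` view (names are illustrative):

  ```python
  # Hourly averages for the selected rotor speed controller time series.
  df = pygen.rotor.select().rotor_speed_controller.data.retrieve_dataframe(
      aggregates="average",
      granularity="1h",
  )
  ```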

### Changed
- The `.query()` method has been renamed to `.select()`. The `.query()` method is still available, but will
  be removed shortly.

### Fixed
- Doing `.select()` over an edge with properties without filtering no longer raises a `ValueError`.
- When using the `CogniteCore` model, either directly or an extension of it, the generated SDK now
respects the read-only properties in `CogniteAsset` and `CogniteFile`.

1,311 changes: 172 additions & 1,139 deletions docs/usage/timeseries.ipynb

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions examples/cognite_core/data_classes/_cognite_time_series.py
@@ -12,6 +12,7 @@
from cognite_core.data_classes._core import (
DEFAULT_INSTANCE_SPACE,
DEFAULT_QUERY_LIMIT,
DataPointsAPI,
DataRecord,
DataRecordGraphQL,
DataRecordWrite,
@@ -967,6 +968,7 @@ def __init__(
self.source_updated_user,
]
)
self.data = DataPointsAPI(client, lambda limit: self._list(limit=limit).as_node_ids())

def list_cognite_time_series(self, limit: int = DEFAULT_QUERY_LIMIT) -> CogniteTimeSeriesList:
return self._list(limit=limit)
1 change: 1 addition & 0 deletions examples/cognite_core/data_classes/_core/__init__.py
@@ -1,5 +1,6 @@
from cognite_core.data_classes._core.constants import * # noqa
from cognite_core.data_classes._core.base import * # noqa
from cognite_core.data_classes._core.cdf_external import * # noqa
from cognite_core.data_classes._core.datapoints_api import * # noqa
from cognite_core.data_classes._core.helpers import * # noqa
from cognite_core.data_classes._core.query import * # noqa