Josh 923 diff against working branch #473

Closed · wants to merge 20 commits
6 changes: 4 additions & 2 deletions docker-compose.yml
@@ -43,12 +43,14 @@ services:
- mq

mercury-service-api:
image: 258143015559.dkr.ecr.us-east-1.amazonaws.com/mercury/api
platform: linux/amd64
image: 258143015559.dkr.ecr.us-east-1.amazonaws.com/mercury/api:v1api-latest
restart: always
volumes:
- ./services/ge_cloud/:/code
ports:
- 5000:5000
# V1 runs on external port 7000
- 5000:7000
environment:
LOGGING_LEVEL: ${LOGGING_LEVEL}
ENVIRONMENT: ${ENVIRONMENT}
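With this change the mercury-service-api container listens on port 7000 internally while the host still exposes port 5000. A minimal local smoke check of that mapping, assuming the compose stack from this file is running on the same machine (a sketch, not part of the PR):

```python
# Local smoke check for the remapped port: host port 5000 should forward to
# the container's port 7000, where the v1 API listens. Assumes the compose
# stack from this file is up on localhost.
import socket

with socket.create_connection(("localhost", 5000), timeout=5):
    print("mercury-service-api reachable on host port 5000 (container port 7000)")
```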
1 change: 1 addition & 0 deletions great_expectations_cloud/agent/actions/__init__.py
@@ -16,3 +16,4 @@
from great_expectations_cloud.agent.actions.run_scheduled_checkpoint import (
RunScheduledCheckpointAction,
)
from great_expectations_cloud.agent.actions.run_window_checkpoint import RunWindowCheckpointAction
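The added line re-exports RunWindowCheckpointAction from the actions package, so it can be imported the same way as the existing actions; for example:

```python
# With the re-export above, the new action is importable from the package root,
# matching how the other checkpoint actions are exposed.
from great_expectations_cloud.agent.actions import RunWindowCheckpointAction
```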
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ def check_draft_datasource_config(
"fluent-style Data Source."
)
try:
datasource_cls = self._context.sources.type_lookup[datasource_type]
datasource_cls = self._context.data_sources.type_lookup[datasource_type]
except LookupError as exc:
raise TypeError( # noqa: TRY003 # one off error
"DraftDatasourceConfigAction received an unknown Data Source type."
@@ -75,10 +75,10 @@ def _get_table_names(self, datasource: Datasource) -> list[str]:

def _update_table_names_list(self, config_id: UUID, table_names: list[str]) -> None:
with create_session(access_token=self._auth_key) as session:
response = session.patch(
url=f"{self._base_url}/organizations/"
f"{self._organization_id}/datasources/drafts/{config_id}",
json={"table_names": table_names},
url = f"{self._base_url}/api/v1/organizations/{self._organization_id}/draft-table-names/{config_id}"
response = session.put(
url=url,
json={"data": {"table_names": table_names}},
)
if not response.ok:
raise RuntimeError( # noqa: TRY003 # one off error
@@ -90,8 +90,8 @@ def _update_table_names_list(self, config_id: UUID, table_names: list[str]) -> N

def get_draft_config(self, config_id: UUID) -> dict[str, Any]:
resource_url = (
f"{self._base_url}/organizations/"
f"{self._organization_id}/datasources/drafts/{config_id}"
f"{self._base_url}/api/v1/organizations/"
f"{self._organization_id}/draft-datasources/{config_id}"
)
with create_session(access_token=self._auth_key) as session:
response = session.get(resource_url)
@@ -102,11 +102,11 @@ def get_draft_config(self, config_id: UUID) -> dict[str, Any]:
)
data = response.json()
try:
return data["data"]["attributes"]["draft_config"] # type: ignore[no-any-return]
return data["data"]["config"] # type: ignore[no-any-return]
except KeyError as e:
raise RuntimeError( # noqa: TRY003 # one off error
"Malformed response received from GX Cloud"
) from e


register_event_action("0", DraftDatasourceConfigEvent, DraftDatasourceConfigAction)
register_event_action("1", DraftDatasourceConfigEvent, DraftDatasourceConfigAction)
31 changes: 22 additions & 9 deletions great_expectations_cloud/agent/actions/run_checkpoint.py
@@ -21,33 +21,46 @@


class RunCheckpointAction(AgentAction[RunCheckpointEvent]):
# TODO: New actions need to be created that are compatible with GX v1 and registered for v1.
# This action is registered for v0, see register_event_action()

@override
def run(self, event: RunCheckpointEvent, id: str) -> ActionResult:
return run_checkpoint(self._context, event, id)


class MissingCheckpointNameError(ValueError):
"""Property checkpoint_name is required but not present."""


def run_checkpoint(
context: CloudDataContext,
event: RunCheckpointEvent | RunScheduledCheckpointEvent | RunWindowCheckpointEvent,
id: str,
evaluation_parameters: dict | None = None,
) -> ActionResult:
"""Note: the logic for this action is broken out into this function so that
the same logic can be used for both RunCheckpointEvent and RunScheduledCheckpointEvent."""
# TODO: move connection testing into OSS; there isn't really a reason it can't be done there

# the checkpoint_name property on possible events is optional for backwards compatibility,
# but this action requires it in order to run:
if not event.checkpoint_name:
raise MissingCheckpointNameError

# test connection to data source and any assets used by checkpoint
for datasource_name, data_asset_names in event.datasource_names_to_asset_names.items():
datasource = context.get_datasource(datasource_name)
datasource = context.data_sources.get(name=datasource_name)
datasource.test_connection(test_assets=False) # raises `TestConnectionError` on failure
for (
data_asset_name
) in data_asset_names: # only test connection for assets that are validated in checkpoint
asset = datasource.get_asset(data_asset_name)
asset.test_connection() # raises `TestConnectionError` on failure
checkpoint_run_result = context.run_checkpoint(
ge_cloud_id=event.checkpoint_id,
batch_request={"options": event.splitter_options} if event.splitter_options else None,
if evaluation_parameters is None:
evaluation_parameters = {}

# run checkpoint
checkpoint = context.checkpoints.get(name=event.checkpoint_name)
checkpoint_run_result = checkpoint.run(
batch_parameters=event.splitter_options,
evaluation_parameters=evaluation_parameters,
)

validation_results = checkpoint_run_result.run_results
@@ -66,4 +79,4 @@ def run_checkpoint(
)


register_event_action("0", RunCheckpointEvent, RunCheckpointAction)
register_event_action("1", RunCheckpointEvent, RunCheckpointAction)
Original file line number Diff line number Diff line change
@@ -88,3 +88,4 @@ def _raise_on_any_metric_exception(self, metric_run: MetricRun) -> None:


register_event_action("0", RunMetricsListEvent, MetricListAction)
register_event_action("1", RunMetricsListEvent, MetricListAction)
Original file line number Diff line number Diff line change
@@ -6,20 +6,17 @@
ActionResult,
AgentAction,
)
from great_expectations_cloud.agent.actions.run_checkpoint import run_checkpoint
from great_expectations_cloud.agent.actions.run_checkpoint import run_checkpoint_v0
from great_expectations_cloud.agent.event_handler import register_event_action
from great_expectations_cloud.agent.models import (
RunScheduledCheckpointEvent,
)


class RunScheduledCheckpointAction(AgentAction[RunScheduledCheckpointEvent]):
# TODO: New actions need to be created that are compatible with GX v1 and registered for v1.
# This action is registered for v0, see register_event_action()

@override
def run(self, event: RunScheduledCheckpointEvent, id: str) -> ActionResult:
return run_checkpoint(self._context, event, id)
return run_checkpoint_v0(self._context, event, id)


register_event_action("0", RunScheduledCheckpointEvent, RunScheduledCheckpointAction)
register_event_action("1", RunScheduledCheckpointEvent, RunScheduledCheckpointAction)
36 changes: 28 additions & 8 deletions great_expectations_cloud/agent/actions/run_window_checkpoint.py
@@ -1,27 +1,47 @@
from __future__ import annotations

from great_expectations.core.http import create_session
from great_expectations.exceptions import GXCloudError
from typing_extensions import override

from great_expectations_cloud.agent.actions.agent_action import (
ActionResult,
AgentAction,
)
from great_expectations_cloud.agent.actions.run_checkpoint import run_checkpoint
from great_expectations_cloud.agent.actions.run_checkpoint import run_checkpoint_v0
from great_expectations_cloud.agent.event_handler import register_event_action
from great_expectations_cloud.agent.models import (
RunWindowCheckpointEvent,
)


class RunWindowCheckpointAction(AgentAction[RunWindowCheckpointEvent]):
# TODO: New actions need to be created that are compatible with GX v1 and registered for v1.
# This action is registered for v0, see register_event_action()

@override
def run(self, event: RunWindowCheckpointEvent, id: str) -> ActionResult:
# TODO: https://greatexpectations.atlassian.net/browse/ZELDA-922
# This currently only runs a normal checkpoint. Logic for window checkpoints needs to be added (e.g. call the backend to get the params and then construct the evaluation_parameters before passing them into context.run_checkpoint()) One way we can do this via a param in `run_checkpoint()` that takes a function to build the evaluation_parameters, defaulting to a noop for the other checkpoint action types.
return run_checkpoint(self._context, event, id)
with create_session(access_token=self._auth_key) as session:
expectation_parameters_for_checkpoint_url = (
f"{self._base_url}/api/v1/organizations/"
f"{self._organization_id}/checkpoints/{event.checkpoint_id}/expectation-parameters"
)
response = session.get(url=expectation_parameters_for_checkpoint_url)

if not response.ok:
raise GXCloudError(
message=f"RunWindowCheckpointAction encountered an error while connecting to GX Cloud. "
f"Unable to retrieve expectation_parameters for Checkpoint with ID={event.checkpoint_id}.",
response=response,
)
data = response.json()
try:
expectation_parameters = data["data"]["expectation_parameters"]
except KeyError as e:
raise GXCloudError(
message="Malformed response received from GX Cloud",
response=response,
) from e

# Note: In v0 expectation_parameters are called evaluation_parameters.
return run_checkpoint_v0(
self._context, event, id, evaluation_parameters=expectation_parameters
)


register_event_action("0", RunWindowCheckpointEvent, RunWindowCheckpointAction)
register_event_action("1", RunWindowCheckpointEvent, RunWindowCheckpointAction)
16 changes: 9 additions & 7 deletions great_expectations_cloud/agent/agent.py
@@ -48,6 +48,7 @@
JobStatus,
ScheduledEventBase,
UnknownEvent,
UpdateJobStatusRequest,
build_failed_job_completed_status,
)

@@ -74,7 +74,7 @@ class GXAgentConfig(AgentBaseExtraForbid):
queue: str
connection_string: AmqpDsn
# pydantic will coerce this string to AnyUrl type
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL # type: ignore[assignment] # pydantic will coerce
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL
gx_cloud_organization_id: str
gx_cloud_access_token: str

@@ -389,7 +390,7 @@ def _get_config(cls) -> GXAgentConfig:

# obtain the broker url and queue name from Cloud
agent_sessions_url = (
f"{env_vars.gx_cloud_base_url}/organizations/"
f"{env_vars.gx_cloud_base_url}/api/v1/organizations/"
f"{env_vars.gx_cloud_organization_id}/agent-sessions"
)

@@ -431,10 +432,11 @@ def _update_status(self, job_id: str, status: JobStatus, org_id: UUID) -> None:
extra={"job_id": job_id, "status": str(status), "organization_id": str(org_id)},
)
agent_sessions_url = (
f"{self._config.gx_cloud_base_url}/organizations/{org_id}" + f"/agent-jobs/{job_id}"
f"{self._config.gx_cloud_base_url}/api/v1/organizations/{org_id}"
+ f"/agent-jobs/{job_id}"
)
with create_session(access_token=self.get_auth_key()) as session:
data = status.json()
data = UpdateJobStatusRequest(data=status).json()
session.patch(agent_sessions_url, data=data)
LOGGER.info(
"Status updated",
@@ -454,16 +456,16 @@ def _create_scheduled_job_and_set_started(
event_context: event with related properties and actions.
"""
data = {
**event_context.event.dict(),
"correlation_id": event_context.correlation_id,
"event": event_context.event.dict(),
}
LOGGER.info(
"Creating scheduled job and setting started",
extra={**data, "organization_id": str(org_id)},
)

agent_sessions_url = (
f"{self._config.gx_cloud_base_url}/organizations/{org_id}" + "/agent-jobs"
f"{self._config.gx_cloud_base_url}/api/v1/organizations/{org_id}" + "/agent-jobs"
)
with create_session(access_token=self.get_auth_key()) as session:
payload = Payload(data=data)
@@ -493,7 +495,7 @@ def _set_http_session_headers(
Note: the Agent-Job-Id header value will be set for all GX Cloud request until this method is
called again.
"""
from great_expectations import __version__ # type: ignore[attr-defined] # TODO: fix this
from great_expectations import __version__
from great_expectations.core import http
from great_expectations.data_context.store.gx_cloud_store_backend import GXCloudStoreBackend

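Job status updates are now wrapped in UpdateJobStatusRequest before the PATCH, so the request body is a "data" envelope rather than the bare status, and the agent-jobs and agent-sessions URLs gain the /api/v1 prefix. A sketch of the resulting request, with illustrative status fields (the JobStarted/JobCompleted field sets are not shown in this hunk) and requests standing in for create_session:

```python
# Sketch of the v1 agent-jobs status update. The status fields are illustrative
# only (JobStarted/JobCompleted fields are not part of this hunk); base_url,
# org_id, job_id, and the token are placeholders.
import json

import requests

base_url = "https://api.greatexpectations.io"  # placeholder
org_id = "00000000-0000-0000-0000-000000000000"  # placeholder
job_id = "33333333-3333-3333-3333-333333333333"  # placeholder

# UpdateJobStatusRequest(data=status).json() produces a "data"-wrapped body.
body = json.dumps({"data": {"status": "completed", "success": True}})  # illustrative fields

requests.patch(
    f"{base_url}/api/v1/organizations/{org_id}/agent-jobs/{job_id}",
    data=body,
    headers={
        "Authorization": "Bearer <access-token>",  # placeholder
        "Content-Type": "application/json",
    },
)
```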
2 changes: 1 addition & 1 deletion great_expectations_cloud/agent/config.py
@@ -6,7 +6,7 @@

class GxAgentEnvVars(BaseSettings):
# pydantic will coerce this string to AnyUrl type
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL # type: ignore[assignment]
gx_cloud_base_url: AnyUrl = CLOUD_DEFAULT_BASE_URL
gx_cloud_organization_id: str
gx_cloud_access_token: str

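GxAgentEnvVars is a pydantic BaseSettings class, so its fields are populated from environment variables of the same name (case-insensitively), with gx_cloud_base_url falling back to CLOUD_DEFAULT_BASE_URL when unset. A minimal sketch of how the settings resolve, assuming the agent package is installed and using placeholder values:

```python
# Sketch of BaseSettings-style resolution for GxAgentEnvVars. Values are
# placeholders; assumes the great_expectations_cloud package is installed.
import os

os.environ["GX_CLOUD_ORGANIZATION_ID"] = "00000000-0000-0000-0000-000000000000"  # placeholder
os.environ["GX_CLOUD_ACCESS_TOKEN"] = "<access-token>"  # placeholder
# GX_CLOUD_BASE_URL is optional and defaults to CLOUD_DEFAULT_BASE_URL.

from great_expectations_cloud.agent.config import GxAgentEnvVars

env_vars = GxAgentEnvVars()
print(env_vars.gx_cloud_base_url, env_vars.gx_cloud_organization_id)
```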
2 changes: 1 addition & 1 deletion great_expectations_cloud/agent/event_handler.py
@@ -154,7 +154,7 @@ def _get_major_version(version: str) -> str:
return str(parsed.major)


version = gx.__version__ # type: ignore[attr-defined] # TODO: fix this
version = gx.__version__
_GX_MAJOR_VERSION = _get_major_version(str(version))


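_get_major_version keeps only the major component of the installed great_expectations version; that string is the key register_event_action uses to choose between the "0" and "1" registrations seen throughout this diff. A small sketch of the parsing, assuming packaging.version backs parsed.major (the helper's imports are not shown in this hunk):

```python
# Sketch of major-version extraction as used to pick "0" vs "1" registrations.
# Assumes packaging.version, since only parsed.major appears in the hunk above.
from packaging.version import Version


def get_major_version(version: str) -> str:
    return str(Version(version).major)


print(get_major_version("0.18.19"))  # "0"
print(get_major_version("1.3.0"))    # "1"
```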
4 changes: 4 additions & 0 deletions great_expectations_cloud/agent/models.py
@@ -146,6 +146,10 @@ class JobCompleted(AgentBaseExtraForbid):
JobStatus = Union[JobStarted, JobCompleted]


class UpdateJobStatusRequest(AgentBaseExtraForbid):
data: JobStatus


def build_failed_job_completed_status(error: BaseException) -> JobCompleted:
if isinstance(error, GXCoreError):
status = JobCompleted(