Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add data connectors #478

Merged
merged 18 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.PHONY: schemas tests test_setup main_tests schemathesis_tests collect_coverage style_checks pre_commit_checks run download_avro check_avro avro_models update_avro k3d_cluster install_amaltheas all

AMALTHEA_JS_VERSION ?= 0.12.2
AMALTHEA_SESSIONS_VERSION ?= 0.0.10-new-operator-chart
AMALTHEA_JS_VERSION ?= 0.13.0
AMALTHEA_SESSIONS_VERSION ?= 0.13.0
codegen_params = --input-file-type openapi --output-model-type pydantic_v2.BaseModel --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --set-default-enum-member --openapi-scopes schemas paths parameters --set-default-enum-member --use-one-literal-as-default --use-default

define test_apispec_up_to_date
Expand Down Expand Up @@ -166,5 +166,5 @@ install_amaltheas: ## Installs both version of amalthea in the. NOTE: It uses t

# TODO: Add the version variables from the top of the file here when the charts are fully published
amalthea_schema: ## Updates generates pydantic classes from CRDs
curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/feat-add-cloud-storage/config/crd/bases/amalthea.dev_amaltheasessions.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_amalthea_session.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg
curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/main/config/crd/bases/amalthea.dev_amaltheasessions.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_amalthea_session.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg
curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/main/controller/crds/jupyter_server.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_jupyter_server.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg
3 changes: 3 additions & 0 deletions bases/renku_data_services/data_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
session_repo=config.session_repo,
storage_repo=config.storage_repo,
rp_repo=config.rp_repo,
data_connector_repo=config.data_connector_repo,
data_connector_project_link_repo=config.data_connector_to_project_link_repo,
data_connector_secret_repo=config.data_connector_secret_repo,
internal_gitlab_authenticator=config.gitlab_authenticator,
)
platform_config = PlatformConfigBP(
Expand Down
1 change: 1 addition & 0 deletions components/renku_data_services/app_config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ def data_connector_secret_repo(self) -> DataConnectorSecretRepository:
data_connector_repo=self.data_connector_repo,
user_repo=self.kc_user_repo,
secret_service_public_key=self.secrets_service_public_key,
authz=self.authz,
)
return self._data_connector_secret_repo

Expand Down
2 changes: 1 addition & 1 deletion components/renku_data_services/authn/keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def authenticate(
user = base_models.AuthenticatedAPIUser(
is_admin=is_admin,
id=id,
access_token=access_token,
access_token=token,
full_name=parsed.get("name"),
first_name=parsed.get("given_name"),
last_name=parsed.get("family_name"),
Expand Down
24 changes: 0 additions & 24 deletions components/renku_data_services/base_api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,6 @@ async def decorated_function(request: Request, *args: _P.args, **kwargs: _P.kwar
return decorator


def validate_path_project_id(
f: Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]],
) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]:
"""Decorator for a Sanic handler that validates the project_id path parameter."""
_path_project_id_regex = re.compile(r"^[A-Za-z0-9]{26}$")

@wraps(f)
async def decorated_function(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T:
project_id = cast(str | None, kwargs.get("project_id"))
if not project_id:
raise errors.ProgrammingError(
message="Could not find 'project_id' in the keyword arguments for the handler in order to validate it."
)
if not _path_project_id_regex.match(project_id):
raise errors.ValidationError(
message=f"The 'project_id' path parameter {project_id} does not match the required "
f"regex {_path_project_id_regex}"
)

return await f(request, *args, **kwargs)

return decorated_function


def validate_path_user_id(
f: Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]],
) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]:
Expand Down
4 changes: 2 additions & 2 deletions components/renku_data_services/data_connectors/api.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -619,10 +619,10 @@ components:
exclusive:
type: boolean
description: if true, only values from 'examples' can be used
datatype:
type:
type: string
description: data type of option value. RClone has more options but they map to the ones listed here.
enum: ["int", "bool", "string", "Time"]
enum: ["int", "bool", "string", "Time", "Duration", "MultiEncoder", "SizeSuffix", "SpaceSepList", "CommaSepList", "Tristate"]
Ulid:
description: ULID identifier
type: string
Expand Down
12 changes: 9 additions & 3 deletions components/renku_data_services/data_connectors/apispec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: api.spec.yaml
# timestamp: 2024-10-22T07:46:54+00:00
# timestamp: 2024-10-28T20:03:14+00:00

from __future__ import annotations

Expand All @@ -23,11 +23,17 @@ class Example(BaseAPISpec):
)


class Datatype(Enum):
class Type(Enum):
int = "int"
bool = "bool"
string = "string"
Time = "Time"
Duration = "Duration"
MultiEncoder = "MultiEncoder"
SizeSuffix = "SizeSuffix"
SpaceSepList = "SpaceSepList"
CommaSepList = "CommaSepList"
Tristate = "Tristate"


class RCloneOption(BaseAPISpec):
Expand Down Expand Up @@ -65,7 +71,7 @@ class RCloneOption(BaseAPISpec):
exclusive: Optional[bool] = Field(
None, description="if true, only values from 'examples' can be used"
)
datatype: Optional[Datatype] = Field(
type: Optional[Type] = Field(
None,
description="data type of option value. RClone has more options but they map to the ones listed here.",
)
Expand Down
40 changes: 38 additions & 2 deletions components/renku_data_services/data_connectors/db.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""Adapters for data connectors database classes."""

from collections.abc import Callable
from collections.abc import AsyncIterator, Callable
from typing import TypeVar

from cryptography.hazmat.primitives.asymmetric import rsa
from sqlalchemy import Select, delete, func, select
from sqlalchemy import Select, delete, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from ulid import ULID

Expand Down Expand Up @@ -477,11 +477,47 @@ def __init__(
data_connector_repo: DataConnectorRepository,
user_repo: UserRepo,
secret_service_public_key: rsa.RSAPublicKey,
authz: Authz,
) -> None:
self.session_maker = session_maker
self.data_connector_repo = data_connector_repo
self.user_repo = user_repo
self.secret_service_public_key = secret_service_public_key
self.authz = authz

async def get_data_connectors_with_secrets(
self,
user: base_models.APIUser,
project_id: ULID,
) -> AsyncIterator[models.DataConnectorWithSecrets]:
"""Get all data connectors and their secrets for a project."""
if user.id is None:
raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.")

can_read_project = await self.authz.has_permission(user, ResourceType.project, project_id, Scope.READ)
if not can_read_project:
raise errors.MissingResourceError(
message=f"The project ID with {project_id} does not exist or you dont have permission to access it"
)

async with self.session_maker() as session:
stmt = select(schemas.DataConnectorORM).where(
schemas.DataConnectorORM.project_links.any(
schemas.DataConnectorToProjectLinkORM.project_id == project_id
),
or_(
# Data connectors with secrets for the specific user
schemas.DataConnectorORM.secrets.any(
schemas.DataConnectorSecretORM.user_id == user.id,
),
# Data connectors without any secrets
# See: https://docs.sqlalchemy.org/en/20/orm/queryguide/select.html#exists-forms-has-any
~schemas.DataConnectorORM.secrets.any(),
olevski marked this conversation as resolved.
Show resolved Hide resolved
),
)
results = await session.stream_scalars(stmt)
async for dc in results:
yield models.DataConnectorWithSecrets(dc.dump(), [secret.dump() for secret in dc.secrets])

async def get_data_connector_secrets(
self,
Expand Down
8 changes: 8 additions & 0 deletions components/renku_data_services/data_connectors/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,11 @@ class DataConnectorPermissions:
write: bool
delete: bool
change_membership: bool


@dataclass
class DataConnectorWithSecrets:
"""A data connector with its secrets."""

data_connector: DataConnector
secrets: list[DataConnectorSecret] = field(default_factory=list)
2 changes: 2 additions & 0 deletions components/renku_data_services/data_connectors/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class DataConnectorORM(BaseORM):
onupdate=func.now(),
nullable=False,
)
secrets: Mapped[list["DataConnectorSecretORM"]] = relationship(init=False, viewonly=True, lazy="selectin")
project_links: Mapped[list["DataConnectorToProjectLinkORM"]] = relationship(init=False, viewonly=True)

def dump(self) -> models.DataConnector:
"""Create a data connector model from the DataConnectorORM."""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""expand and separate environments from session launchers

Revision ID: 584598f3b769
Revises: 726d5d0e1f28
Revises: cefb45b5d71e
Create Date: 2024-08-12 14:25:24.292285

"""
Expand All @@ -12,7 +12,7 @@

# revision identifiers, used by Alembic.
revision = "584598f3b769"
down_revision = "726d5d0e1f28"
down_revision = "cefb45b5d71e"
branch_labels = None
depends_on = None

Expand Down
6 changes: 1 addition & 5 deletions components/renku_data_services/notebooks/api.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1015,19 +1015,15 @@ components:
additionalProperties: true
readonly:
type: boolean
default: true
source_path:
type: string
target_path:
type: string
storage_id:
allOf:
- "$ref": "#/components/schemas/Ulid"
- description: The storage ID is used to know which storage config from the DB should be overriden
- description: If the storage_id is provided then this config must replace an existing storage config in the session
required:
- configuration
- source_path
- target_path
- storage_id
olevski marked this conversation as resolved.
Show resolved Hide resolved
ServerName:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ async def get_storage_by_id(
# TODO: remove project_id once authz on the data service works properly
request_url = self.storage_url + f"/storage/{storage_id}?project_id={project_id}"
logger.info(f"getting storage info by id: {request_url}")
async with httpx.AsyncClient() as client:
res = await client.get(request_url, headers=headers, timeout=10)
async with httpx.AsyncClient(timeout=10) as client:
res = await client.get(request_url, headers=headers)
if res.status_code == 404:
raise MissingResourceError(message=f"Couldn't find cloud storage with id {storage_id}")
if res.status_code == 401:
Expand All @@ -79,8 +79,8 @@ async def get_storage_by_id(

async def validate_storage_configuration(self, configuration: dict[str, Any], source_path: str) -> None:
"""Validate the cloud storage configuration."""
async with httpx.AsyncClient() as client:
res = await client.post(self.storage_url + "/storage_schema/validate", json=configuration, timeout=10)
async with httpx.AsyncClient(timeout=10) as client:
res = await client.post(self.storage_url + "/storage_schema/validate", json=configuration)
if res.status_code == 422:
raise InvalidCloudStorageConfiguration(
message=f"The provided cloud storage configuration isn't valid: {res.json()}",
Expand All @@ -92,8 +92,8 @@ async def validate_storage_configuration(self, configuration: dict[str, Any], so

async def obscure_password_fields_for_storage(self, configuration: dict[str, Any]) -> dict[str, Any]:
"""Obscures password fields for use with rclone."""
async with httpx.AsyncClient() as client:
res = await client.post(self.storage_url + "/storage_schema/obscure", json=configuration, timeout=10)
async with httpx.AsyncClient(timeout=10) as client:
res = await client.post(self.storage_url + "/storage_schema/obscure", json=configuration)

if res.status_code != 200:
raise InvalidCloudStorageConfiguration(
Expand Down Expand Up @@ -300,8 +300,8 @@ async def get_oauth2_connections(self, user: APIUser | None = None) -> list[OAut
return []
request_url = f"{self.service_url}/oauth2/connections"
headers = {"Authorization": f"bearer {user.access_token}"}
async with httpx.AsyncClient() as client:
res = await client.get(request_url, headers=headers, timeout=10)
async with httpx.AsyncClient(timeout=10) as client:
res = await client.get(request_url, headers=headers)
if res.status_code != 200:
raise IntermittentError(message="The data service sent an unexpected response, please try again later")
connections = res.json()
Expand All @@ -311,8 +311,8 @@ async def get_oauth2_connections(self, user: APIUser | None = None) -> list[OAut
async def get_oauth2_provider(self, provider_id: str) -> OAuth2Provider:
"""Get a specific provider."""
request_url = f"{self.service_url}/oauth2/providers/{provider_id}"
async with httpx.AsyncClient() as client:
res = await client.get(request_url, timeout=10)
async with httpx.AsyncClient(timeout=10) as client:
res = await client.get(request_url)
if res.status_code != 200:
raise IntermittentError(message="The data service sent an unexpected response, please try again later")
provider = res.json()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
PatchServerError,
)
from renku_data_services.notebooks.errors.programming import ProgrammingError
from renku_data_services.notebooks.errors.user import MissingResourceError
from renku_data_services.notebooks.util.kubernetes_ import find_env_var
from renku_data_services.notebooks.util.retries import (
retry_with_exponential_backoff_async,
Expand Down Expand Up @@ -351,7 +350,7 @@ class ServerCache(Generic[_SessionType]):

def __init__(self, url: str, server_type: type[_SessionType]):
self.url = url
self.client = httpx.AsyncClient()
self.client = httpx.AsyncClient(timeout=10)
self.server_type: type[_SessionType] = server_type
self.url_path_name = "servers"
if server_type == AmaltheaSessionV1Alpha1:
Expand Down Expand Up @@ -449,7 +448,7 @@ async def get_server_logs(
server = await self.get_server(server_name, safe_username)
if not server:
raise errors.MissingResourceError(
message=f"Cannot find server {server_name} for user " f"{safe_username} to retrieve logs."
message=f"Cannot find server {server_name} for user {safe_username} to retrieve logs."
)
pod_name = f"{server_name}-0"
return await self.renku_ns_client.get_pod_logs(pod_name, max_log_lines)
Expand All @@ -474,8 +473,8 @@ async def patch_server(
"""Patch a server."""
server = await self.get_server(server_name, safe_username)
if not server:
raise MissingResourceError(
f"Cannot find server {server_name} for user " f"{safe_username} in order to patch it."
raise errors.MissingResourceError(
message=f"Cannot find server {server_name} for user {safe_username} in order to patch it."
)
return await self.renku_ns_client.patch_server(server_name=server_name, patch=patch)

Expand All @@ -491,7 +490,7 @@ async def delete_server(self, server_name: str, safe_username: str) -> None:
server = await self.get_server(server_name, safe_username)
if not server:
raise errors.MissingResourceError(
message=f"Cannot find server {server_name} for user " f"{safe_username} in order to delete it."
message=f"Cannot find server {server_name} for user {safe_username} in order to delete it."
)
return await self.renku_ns_client.delete_server(server_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from configparser import ConfigParser
from io import StringIO
from pathlib import PurePosixPath
from typing import Any, Final, Optional, Self
from typing import Any, Final, Optional, Protocol, Self

from kubernetes import client
from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema
Expand Down Expand Up @@ -36,6 +36,15 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None:
raise ValidationError("'storage_id' cannot be used together with 'source_path' or 'target_path'")


class RCloneStorageRequestOverride(Protocol):
"""A small dataclass for handling overrides to the data connector requests."""

source_path: str | None = None
target_path: str | None = None
configuration: dict[str, Any] | None = None
readonly: bool | None = None


class RCloneStorage(ICloudStorageRequest):
"""RClone based storage."""

Expand Down Expand Up @@ -221,6 +230,17 @@ def _stringify(value: Any) -> str:
parser.write(stringio)
return stringio.getvalue()

def with_override(self, override: RCloneStorageRequestOverride) -> "RCloneStorage":
"""Override certain fields on the storage."""
return RCloneStorage(
source_path=override.source_path if override.source_path else self.source_path,
mount_folder=override.target_path if override.target_path else self.mount_folder,
readonly=override.readonly if override.readonly is not None else self.readonly,
configuration=override.configuration if override.configuration else self.configuration,
name=self.name,
config=self.config,
)


class LaunchNotebookResponseCloudStorage(RCloneStorageRequest):
"""Notebook launch response with cloud storage attached."""
Expand Down
Loading
Loading