Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add compute block config validation #5

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ __pycache__/
dist/
build/
venv/
.venv/
213 changes: 210 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,37 @@ You can install the package via pip once it's published:
pip install scystream-sdk
```

## Usage
## Introduction

One of the central concepts of scystream are the so-called **Compute Blocks**.

A Compute Block describes an independent programm, that acts as some kind of worker
which will be scheduled using the scystream-core application.
This worker executes a task (e.g. a NLP task, a crawling task).

This SDK aims to provide helper functions and all other requirements you need to implement
a custom Compute Block on your own.

Each worker can have multiple entrypoints, each aiming to solve one task.
These entrypoints can be configured from the outside using the **Settings**.
These are basically ENV-Variables, which will be parsed & validated using pydantic.

You can either set "global" Settings (for the entrypoint), by using the `envs` block.
Or you can set "input/output-related" Settings by using the `config` block in each input/output.

## Basic Usage of the SDK

```python3
from scystream.sdk.core import entrypoint
from scystream.sdk.scheduler import Scheduler


@entrypoint
@entrypoint()
def example_task():
print("Executing example_task...")


@entrypoint
@entrypoint()
def another_task(task_name):
print(f"Executing another_task with task name: {task_name}")

Expand All @@ -35,3 +53,192 @@ if __name__ == "__main__":
main()

```

## Defining Settings and Using them.

Earlier, we already wrote about **Settings**.
Each Input & Output can be configured using these settings.
There are also Global Settings, refered to as `envs` in the `cbc.yaml`

Below you can find a simple example of how we define & validate these settings.
Therefore you should use the `EnvSettings` class.

```python3
from scystream.sdk.core import entrypoint
from scystream.sdk.env.settings import EnvSettings

class TextDataInputSettings(EnvSettings):
TXT_SRC_PATH: str # no default provided, manual setting is a MUST

class DBDataInputSettings(EnvSettings):
DATA_TABLE_NAME: str = "nlp_information"
DB_HOST: str = "time.rwth-aachen.de"
DB_PORT: str = 1234

class TopicModellingEntrypointSettings(EnvSettings):
LANGUAGE: str = "de"

text_data: TextDataInputSettings
db_data: DBDataInputSettings

@entrypoint(TopicModellingEntrypointSettings) # Pass it to the Entrypoint
def topic_modelling(settings): # The settings param is automatically injected to your function, you can use it
print(f"Running topic modelling, using file: {settings.text_data.TXT_SRC_PATH}")

@entrypoint()
def test_entrypint():
print("This entrypoint does not have any configs.")
```

Of course, you will also be able to use your settings in other files/directories.
For that, just import your desired setting and use the `get_settings()` function.
It will load the configurations correctly.

## Compute Block Config

We expect every repository which will be used within the scystream application
to contain a **Compute Block Config File**, the `cbc.yaml`, within the root directory.
This `cbc.yaml` will be used to define the entrypoints, the inputs & outputs each
Compute Block offers, necessary for the scystream-frontend to understand.

This is an example `cbc.yaml`:

```yaml
name: "NLP toolbox"
description: "Contains NLP algorithms..."
author: "John Doe"
docker_image: "https://ghcr.io/nlp-toolbox"

entrypoints:
topic_modelling:
description: "Run topic modelling"
envs:
LANGUAGE: "de"
inputs:
text_data:
description: "Text file. Can be uploaded by the user."
type: "file"
config:
TXT_SRC_PATH: null
db_data:
description: "Information in a database"
type: "db_table"
config:
DATA_TABLE_NAME: "nlp_information"
DB_HOST: "time.rwth-aachen.de"
DB_PORT: 1234
outputs:
topic_model:
type: "file"
description: "Topic model file"
config:
OUTPUT_PATH_TOPIC_MODEL: null
run_durations:
type: "db_table"
description: "Table that contains the run durations per day."
config:
RUN_DURATIONS_TABLE_NAME: "run_durations_nlp"

analyze_runtime:
description: "Analyze the runtimes"
inputs:
run_durations:
description: "Table that contains all runtimes and dates"
type: "db_table"
config:
RUN_DURATIONS_TABLE_NAME: "run_durations_nlp"
outputs:
csv_output:
type: "file"
description: "A csv containing statistical information"
config:
CSV_OUTPUT_PATH: "outputs/statistics.csv"
```

### Generating a config

After writing the functionality of your ComputeBlock (see more below) you can generate
the corresponding `cbc.yaml` by using the following function:

```python3
from scystream.sdk.config import generate_config_from_compute_block, get_compute_block
from pathlib import Path

@entrypoint()
def example_entrypoint():
print("Example...")

if __name__ == "__main__":
compute_block = get_compute_block()
generate_config_from_compute_block(cb, Path("cbc.yaml"))
```

This will take all the entrypoints, their defined settings, and generate a config from them.

> [!NOTE]
> Make sure to edit the generated config by your user-defined metadata
> (e.g. author, description, docker_image, ...)

### Validating a config

If you want your `cbc.yaml` to be located in a different directory or have a different name, you
have to configure that accordingly:

```python3
from scystream.sdk.config import global_config

if __name__ == "__main__":
# Set the config_path
global_config.set_config_path("custom_dir/custom_name.yaml")
```

Of course, you can also write the config completely on your own.

> [!NOTE]
> When using `Scheduler.execute_function("entrypoint")` the Settings for the
> entrypoint and the config will be validated.
> If the Settings do not correspond to the definition in the yaml, execution will not be possible.

To validate the config, you can also use a helper function like this:

```python3
from scystream.sdk.config import validate_config_with_code

@entrypoint()
def example_entrypoint():
print("Example...")

if __name__ == "__main__":
validate_config_with_code()
```

## Development of the SDK

### Installation

1. Create a venv and use it

```bash
python3 -m venv .venv
source .venv/bin/activate
```

2. Install the package within the venv

> [!NOTE]
> This will also install all the install_requirements from the setup.py

```bash
pip install -e .
```

3. Develop!

### Tests

To run all the tests run the following command:

```bash
python3 -m unittest discover -s tests
```

6 changes: 6 additions & 0 deletions scystream/sdk/config/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .config_loader import global_config, \
validate_config_with_code, load_config
from .compute_block_utils import get_compute_block

__all__ = ["global_config", "validate_config_with_code",
"load_config", "EnvSettings", "get_compute_block"]
72 changes: 72 additions & 0 deletions scystream/sdk/config/compute_block_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import Union
from pydantic_core import PydanticUndefinedType
from scystream.sdk.config.models import ComputeBlock, Entrypoint, \
InputOutputModel
from scystream.sdk.env.settings import InputSettings, \
OutputSettings
from scystream.sdk.config.entrypoints import get_registered_functions


def _get_pydantic_default_value_or_none(value):
if type(value.default) is PydanticUndefinedType:
return None
return value.default


def _build_input_output_dict_from_class(
subject: Union[InputSettings, OutputSettings]
):
config_dict = {}
for key, value in subject.model_fields.items():
config_dict[key] = _get_pydantic_default_value_or_none(value)
return InputOutputModel(
type="TODO: SetType",
description="<to-be-set>",
config=config_dict
)


def get_compute_block() -> ComputeBlock:
"""
Converts Entrypoints & Settings to a ComputeBlock
"""
entrypoints = {}
for entrypoint, func in get_registered_functions().items():
envs = {}
inputs = {}
outputs = {}

if func["settings"]:
entrypoint_settings_class = func["settings"]
for key, value in entrypoint_settings_class.model_fields.items():
if (
isinstance(value.default_factory, type) and
issubclass(value.default_factory, InputSettings)
):
inputs[key] = _build_input_output_dict_from_class(
value.default_factory
)
elif (
isinstance(value.default_factory, type) and
issubclass(value.default_factory, OutputSettings)
):
outputs[key] = _build_input_output_dict_from_class(
value.default_factory
)
else:
envs[key] = _get_pydantic_default_value_or_none(value)

entrypoints[entrypoint] = Entrypoint(
description="<tbd>",
envs=envs if envs != {} else None,
inputs=inputs if inputs != {} else None,
outputs=outputs if outputs != {} else None
)

return ComputeBlock(
name="<tbs>",
description="<tbs>",
author="<tbs>",
entrypoints=entrypoints,
docker_image="<tbs>"
)
Loading
Loading