Skip to content

Commit

Permalink
Merge pull request #23 from JarvusInnovations/deploy-dagster
Browse files Browse the repository at this point in the history
Deploy dagster and parse
  • Loading branch information
atvaccaro authored Aug 29, 2023
2 parents 8453244 + 26a68a6 commit a365924
Show file tree
Hide file tree
Showing 30 changed files with 5,254 additions and 609 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/build-dags.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
name: build-dags

on:
  pull_request:
  push:
    branches: [main]

jobs:
  check_and_build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: 3.11
      - uses: abatilo/actions-poetry@v2
      - name: run mypy and pytest
        working-directory: dags
        run: |
          poetry export --with=dev --without-hashes --format=requirements.txt > requirements.txt
          poetry run pip install -r requirements.txt
          poetry run mypy .
          # un-comment and move up once we have tests; pytest exits non-zero when no tests are found
          # poetry run pytest
        env:
          RAW_BUCKET: gs://this-does-not-exist-raw
          PARSED_BUCKET: gs://this-does-not-exist-parsed
      - uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v4
        with:
          context: dags
          # Push only on merges to the default branch. The push trigger above
          # fires only for 'main', so the previous check against
          # 'refs/heads/master' could never be true and images were never pushed.
          push: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }}
          tags: 'ghcr.io/jarvusinnovations/transit-data-analytics-demo/dags:latest'
6 changes: 4 additions & 2 deletions .github/workflows/build-fetcher-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ jobs:
python-version: 3.11
- uses: abatilo/actions-poetry@v2
- name: run mypy and pytest
id: test
working-directory: fetcher
run: |
poetry export --with=dev --without-hashes --format=requirements.txt > requirements.txt
poetry run pip install -r requirements.txt
poetry run mypy .
poetry run pytest
echo "VERSION=$(poetry version --short)" >> "$GITHUB_OUTPUT"
env:
RAW_BUCKET: gs://this-does-not-exist-raw
PARSED_BUCKET: gs://this-does-not-exist-parsed
Expand All @@ -32,5 +34,5 @@ jobs:
- uses: docker/build-push-action@v4
with:
context: fetcher
push: false
# tags: user/app:latest
push: ${{ github.event_name == 'push' && github.ref == 'refs/heads/master' }}
tags: 'ghcr.io/jarvusinnovations/transit-data-analytics-demo/fetcher:${{ steps.test.outputs.VERSION }}'
2 changes: 2 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ jobs:
# sqlfluff needs dbt to be set up and authenticated
- uses: pre-commit/[email protected]
env:
# skip sqlfluff for now; it's not authenticating to bigquery properly
SKIP: sqlfluff-lint
BIGQUERY_SERVICE_ACCOUNT: /tmp/keyfile
DBT_PROFILES_DIR: warehouse
DBT_TARGET: prod_service_account
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# hologit mapping: project files from the "dagster" holosource (declared in
# .holo/sources/dagster.toml) into this repository.
[holomapping]
holosource = "dagster"
# subtree of the source repo to project — presumably the upstream Helm chart; confirm against hologit docs
root = "helm/dagster"
# include every file under root
files = "**"
3 changes: 3 additions & 0 deletions .holo/sources/dagster.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# hologit source: the upstream dagster repository, pinned to the 1.4.3 release
# tag so chart updates are explicit rather than tracking a moving branch.
[holosource]
url = "https://github.com/dagster-io/dagster"
ref = "refs/tags/1.4.3"
7 changes: 6 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
hooks:
# - id: check-yaml
- id: check-toml
- id: check-yaml
args: ["--allow-multiple-documents"]
- id: end-of-file-fixer
- id: mixed-line-ending
- id: pretty-format-json
args: ["--autofix"]
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 23.3.0
Expand Down
6 changes: 6 additions & 0 deletions ci/prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ jarvus:
chart: kubernetes/charts/metabase
values:
- kubernetes/values/prod-metabase.yml
- name: dagster
namespace: dagster
driver: helm
chart: kubernetes/charts/dagster
values:
- kubernetes/values/prod-dagster.yml

# base services
- name: cert-manager
Expand Down
14 changes: 10 additions & 4 deletions ci/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,22 @@ class JarvusConfig(BaseModel):


@task
def helm_plugins(c):
def helm_reqs(c):
c.run("helm plugin install https://github.com/databus23/helm-diff", warn=True)

# https://github.com/dagster-io/dagster/blob/master/.buildkite/dagster-buildkite/dagster_buildkite/steps/helm.py#L75-L80
# https://github.com/dagster-io/dagster/issues/8167
c.run(
"helm repo add bitnami-pre-2022 https://raw.githubusercontent.com/bitnami/charts/eb5f9a9513d987b519f0ecd732e7031241c50328/bitnami"
)


@task
def parse_jarvus_config(c: Context):
c.update({"jarvus_config": JarvusConfig(**c.config["jarvus"]._config)})


@task(helm_plugins, parse_jarvus_config)
@task(helm_reqs, parse_jarvus_config)
def diff(c, name=None):
for deployment in c.config.jarvus_config.deployments:
if not name or name == deployment.name:
Expand All @@ -80,7 +86,7 @@ def diff(c, name=None):
f"../{deployment.chart}",
deployment.namespace_cli,
deployment.values_cli,
"-allow-unreleased",
"--allow-unreleased",
]
c.run(" ".join(args))
elif deployment.driver == Driver.kustomize:
Expand All @@ -89,7 +95,7 @@ def diff(c, name=None):
raise _assert_never(deployment.driver)


@task(helm_plugins, parse_jarvus_config)
@task(helm_reqs, parse_jarvus_config)
def apply(c, name=None):
deployment: Deployment
for deployment in c.config.jarvus_config.deployments:
Expand Down
1 change: 1 addition & 0 deletions dags/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.env
2 changes: 2 additions & 0 deletions dags/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
data/
tmp*/
20 changes: 20 additions & 0 deletions dags/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM python:3.11-bullseye

LABEL org.opencontainers.image.source=https://github.com/JarvusInnovations/transit-data-analytics-demo

# curl is needed only to bootstrap the poetry installer below.
# Use apt-get (script-stable CLI) rather than apt, skip recommended packages,
# and remove the package lists to keep the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

RUN mkdir /app
WORKDIR /app

# Official poetry installer puts the binary in /root/.local/bin
RUN curl -sSL https://install.python-poetry.org | python3 -
ENV PATH="$PATH:/root/.local/bin"

# Copy only the dependency manifests first so the pip install layer is cached
# independently of application source changes.
COPY ./pyproject.toml ./poetry.lock /app/
ENV POETRY_VIRTUALENVS_CREATE=false
# Export the lock to requirements format and install into the system
# interpreter (no virtualenv inside the container).
RUN poetry export --without-hashes --format=requirements.txt | pip install -r /dev/stdin

COPY . /app

CMD ["python"]
29 changes: 29 additions & 0 deletions dags/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# dags

This is a [Dagster](https://dagster.io/) project scaffolded with [`dagster project scaffold`](https://docs.dagster.io/getting-started/create-new-project).

## Getting started

First, ensure that poetry is installed and then install the dependencies.
```bash
curl -sSL https://install.python-poetry.org | python3 -
poetry install
```

Then, start the Dagster UI web server (optionally specifying a port):
```bash
poetry run dagster dev <--port 1234>
```

Open http://localhost:3000 (or whichever port you specified) in your browser to see the project.

### Unit testing

Tests are in the `dags_tests` directory and you can run tests using `pytest`:

```bash
poetry run pytest dags_tests
```

### Deployment
Dagster itself is deployed via hologit and Helm; the [values file](../kubernetes/values/prod-dagster.yml) contains any Kubernetes overrides. The dags/source code in this folder are deployed by pushing a Docker image (currently `ghcr.io/jarvusinnovations/transit-data-analytics-demo/dags:latest` built from [this folder](./Dockerfile)) that is then referenced by a user code deployment in the values.
95 changes: 95 additions & 0 deletions dags/dags/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import os
from typing import Union, Any

import pendulum
from dagster import (
AssetSelection,
Definitions,
define_asset_job,
load_assets_from_modules,
OutputContext,
InputContext,
build_schedule_from_partitioned_job,
)
from dagster._utils.backoff import backoff
from dagster_gcp import GCSResource, ConfigurablePickledObjectGCSIOManager # type: ignore[import]
from dagster_gcp.gcs import PickledObjectGCSIOManager # type: ignore[import]
from google.api_core.exceptions import Forbidden, ServiceUnavailable, TooManyRequests
from pydantic import BaseModel
from upath import UPath

from . import assets
from .common import SERIALIZERS


class HivePartitionedPydanticGCSIOManager(PickledObjectGCSIOManager):
    """Write-only IO manager that stores lists of pydantic models in GCS as
    hive-partitioned JSONL files.

    Partition keys of the form ``<feed_type>/<YYYY-MM-DD-HH:mm>`` are mapped to
    ``feed_type=<feed_type>/dt=<date>/<datetime>.jsonl`` object paths (the
    date/datetime components are rendered via the shared ``SERIALIZERS``).
    Loading is intentionally unsupported.
    """

    def get_path_for_partition(
        self, context: Union[InputContext, OutputContext], path: UPath, partition: str
    ) -> "UPath":
        """
        (Docs taken from parent class)
        Override this method if you want to use a different partitioning scheme
        (for example, if the saving function handles partitioning instead).
        The extension will be added later.
        Args:
            context (Union[InputContext, OutputContext]): The context for the I/O operation.
            path (UPath): The path to the file or object.
            partition (str): Formatted partition/multipartition key
        Returns:
            UPath: The path to the file with the partition key appended.
        """
        # multipartition key is rendered as "<feed_type>/<hour>"; exactly two parts expected
        feed_type, hour = partition.split("/")
        parsed_hour = pendulum.from_format(hour, "YYYY-MM-DD-HH:mm")
        return path / "/".join(
            [
                f"feed_type={feed_type}",
                f"dt={SERIALIZERS[pendulum.Date](parsed_hour.date())}",
                f"{SERIALIZERS[pendulum.DateTime](parsed_hour)}.jsonl",
            ]
        )

    def load_from_path(self, context: InputContext, path: UPath) -> Any:
        # This manager only writes; downstream reads happen elsewhere (e.g. BigQuery).
        # NOTE: the original message named the wrong class
        # ("HivePartitionedGCSIOManager"); use the runtime class name instead.
        raise NotImplementedError(
            f"{type(self).__name__} cannot be used to load data"
        )

    def dump_to_path(self, context: OutputContext, obj: Any, path: UPath) -> None:
        """Serialize a list of pydantic models to newline-delimited JSON and upload it."""
        assert isinstance(obj, list)

        # overwrite semantics: drop any existing object at this partition path first
        if self.path_exists(path):
            context.log.warning(f"Removing existing GCS key: {path}")
            self.unlink(path)

        # empty lists produce no output object at all
        if obj:
            assert isinstance(obj[0], BaseModel)

            jsonl_str = "\n".join(item.json() for item in obj)

            # retry transient GCS failures with exponential backoff
            backoff(
                self.bucket_obj.blob(str(path)).upload_from_string,
                args=[jsonl_str],
                retry_on=(TooManyRequests, Forbidden, ServiceUnavailable),
            )


# Dagster entrypoint: assembles all assets, a partitioned schedule, and the
# GCS-backed IO managers. NOTE: PARSED_BUCKET is read from the environment at
# import time, so the env var must be set wherever this module is loaded.
defs = Definitions(
    assets=load_assets_from_modules([assets]),
    schedules=[
        build_schedule_from_partitioned_job(
            # one job covering every asset; cadence comes from the assets' partitions
            define_asset_job("parse_job", selection=AssetSelection.all()),
        ),
    ],
    resources={
        # default pickle-based IO manager writing into the parsed bucket
        "gcs_io_manager": ConfigurablePickledObjectGCSIOManager(
            gcs=GCSResource(project="transit-data-analytics-demo"),
            gcs_bucket=os.environ["PARSED_BUCKET"].removeprefix("gs://"),
            gcs_prefix="",
        ),
        # write-only hive-partitioned JSONL manager defined above; takes a raw
        # client rather than a GCSResource because it predates the configurable API
        "pydantic_gcs_io_manager": HivePartitionedPydanticGCSIOManager(
            client=GCSResource(project="transit-data-analytics-demo").get_client(),
            bucket=os.environ["PARSED_BUCKET"].removeprefix("gs://"),
            prefix="",  # no prefix; tables are the first partition right now
        ),
    },
)
Loading

0 comments on commit a365924

Please sign in to comment.