From 9870d2f555cb4aab4d96454c8698392db060aa46 Mon Sep 17 00:00:00 2001
From: Reuven Gonzales
Date: Wed, 17 Apr 2024 23:05:14 -0700
Subject: [PATCH] Fixed meltano setup (#1249)

---
 .github/workflows/meltano/action.yml          |  38 +++
 .github/workflows/test-meltano-pipeline.yml   |  67 -----
 .../warehouse-meltano-ecosystems-ost.yml      |  47 ++++
 warehouse/meltano-setup/meltano.yml           |  63 ++++-
 .../loaders/target-bigquery--z3z1ma.lock      | 256 ++++++++++++++++++
 warehouse/meltano-setup/pyproject.toml        |   2 +-
 6 files changed, 394 insertions(+), 79 deletions(-)
 create mode 100644 .github/workflows/meltano/action.yml
 delete mode 100644 .github/workflows/test-meltano-pipeline.yml
 create mode 100644 .github/workflows/warehouse-meltano-ecosystems-ost.yml
 create mode 100644 warehouse/meltano-setup/plugins/loaders/target-bigquery--z3z1ma.lock

diff --git a/.github/workflows/meltano/action.yml b/.github/workflows/meltano/action.yml
new file mode 100644
index 000000000..a5857fb19
--- /dev/null
+++ b/.github/workflows/meltano/action.yml
@@ -0,0 +1,38 @@
+# Generic meltano sync executor. This composite action is callable from
+# scheduled (cron) workflows and runs a given tap into a given target.
+
+name: warehouse-meltano-sync
+
+inputs:
+  tap:
+    description: The name of the tap
+    required: true
+  target:
+    description: The name of the target
+    required: true
+  meltano_path:
+    description: The path to the meltano project
+    required: true
+
+runs:
+  using: "composite"
+  steps:
+    - uses: actions/setup-python@v5
+      with:
+        python-version: '3.10.13'
+
+    - uses: snok/install-poetry@v1
+      with:
+        version: '1.7.1'
+
+    - name: Install poetry deps for meltano
+      shell: bash
+      run: |
+        cd ${{ inputs.meltano_path }} && poetry install && poetry run meltano install
+
+    - name: Run tap:${{ inputs.tap }} into target:${{ inputs.target }}
+      shell: bash
+      # Placeholder while the pipeline is tested: dump the environment
+      # instead of invoking meltano.
+      run: |
+        env
\ No newline at end of file
diff --git a/.github/workflows/test-meltano-pipeline.yml b/.github/workflows/test-meltano-pipeline.yml
deleted file mode 100644
index 609589e17..000000000
--- a/.github/workflows/test-meltano-pipeline.yml
+++ /dev/null
@@ -1,67 +0,0 @@
-# This is a test meltano pipeline
-name: test-meltano-pipeline
-env:
-  X_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-  GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-  TAP_POSTGRES_AIRBYTE_CONFIG_PASSWORD: ${{ secrets.DB_PASSWORD }}
-  TAP_POSTGRES_AIRBYTE_CONFIG_HOST: ${{ secrets.DB_HOST }}
-  TAP_POSTGRES_AIRBYTE_CONFIG_DATABASE: ${{ vars.DB_NAME }}
-  TAP_POSTGRES_AIRBYTE_CONFIG_PORT: ${{ vars.DB_PORT }}
-  TAP_POSTGRES_AIRBYTE_CONFIG_USERNAME: ${{ vars.DB_USERNAME }}
-
-on:
-  workflow_dispatch:
-
-jobs:
-  test-meltano-pipeline:
-    name: test-meltano-pipeline
-    environment: temporary-2024-03-13
-    runs-on: ubuntu-latest
-
-    permissions:
-      contents: 'read'
-      id-token: 'write'
-
-    steps:
-      - name: Checkout code
-        uses: actions/checkout@v3
-        with:
-          fetch-depth: 1
-
-      - name: 'Login to GitHub Container Registry'
-        uses: docker/login-action@v3
-        with:
-          registry: ghcr.io
-          username: ${{ github.actor }}
-          password: ${{ secrets.GITHUB_TOKEN }}
-
-      - uses: actions/setup-python@v5
-        with:
-          python-version: '3.10.13'
-
-      - uses: snok/install-poetry@v1
-        with:
-          version: '1.7.1'
-
-      - name: Set up WARP
-        uses: fscarmen/warp-on-actions@v1.1
-        with:
-          stack: dual
-
-      - name: Run poetry install
-        run: |
-          cd warehouse/meltano-setup && poetry install
-
-      - name: Run meltano
-        run: |
-          cd warehouse/meltano-setup &&
-          poetry run meltano install &&
-          poetry run meltano run tap-postgres target-jsonl
-
-      # At this time this auth isn't working for dbt
-      # - uses: 'google-github-actions/auth@v2'
-      #   with:
-      #     service_account: oso-github-actions@oso-production.iam.gserviceaccount.com
-      #     workload_identity_provider: projects/1054148520225/locations/global/workloadIdentityPools/github/providers/oso-github-actions
-      #     create_credentials_file: true
-      #     access_token_lifetime: 3600s
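Reviewer note: the last step of the composite action above is still a placeholder that only dumps the environment. Once it is wired up, it would presumably mirror the command the deleted test workflow ran, parameterized on the action's inputs. A minimal sketch, assuming the poetry-managed environment from the install step (this step is not part of the patch):

    - name: Run tap:${{ inputs.tap }} into target:${{ inputs.target }}
      shell: bash
      run: |
        cd ${{ inputs.meltano_path }} && poetry run meltano run ${{ inputs.tap }} ${{ inputs.target }}
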
diff --git a/.github/workflows/warehouse-meltano-ecosystems-ost.yml b/.github/workflows/warehouse-meltano-ecosystems-ost.yml
new file mode 100644
index 000000000..b4ffce193
--- /dev/null
+++ b/.github/workflows/warehouse-meltano-ecosystems-ost.yml
@@ -0,0 +1,47 @@
+# Meltano pipeline that syncs the ecosyste.ms OST dataset into BigQuery
+name: meltano-ecosystem-ost
+env:
+  X_GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+  GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+  TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_PASSWORD: ${{ secrets.TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_PASSWORD }}
+  TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_HOST: ${{ secrets.TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_HOST }}
+  TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_DATABASE: ${{ secrets.TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_DATABASE }}
+  TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_PORT: ${{ secrets.TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_PORT }}
+  TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_USERNAME: ${{ secrets.TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_USERNAME }}
+  MELTANO_STATE_BACKEND_URI: ${{ secrets.MELTANO_STATE_BACKEND_URI }}
+
+on:
+  workflow_dispatch:
+
+jobs:
+  meltano-ecosystem-ost:
+    name: meltano-ecosystem-ost
+    environment: indexer
+    runs-on: ubuntu-latest
+
+    permissions:
+      contents: 'read'
+      id-token: 'write'
+
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 1
+
+      - uses: 'google-github-actions/auth@v2'
+        with:
+          credentials_json: '${{ secrets.GOOGLE_CREDENTIALS_JSON }}'
+          create_credentials_file: true
+
+      - name: Run meltano
+        uses: ./.github/workflows/meltano
+        with:
+          tap: tap-ecosystems-ost
+          target: target-bigquery
+          meltano_path: warehouse/meltano-setup
+        env:
+          TARGET_BIGQUERY_CREDENTIALS_PATH: ${{ env.GOOGLE_APPLICATION_CREDENTIALS }}
+          TARGET_BIGQUERY_BUCKET: oso-dataset-transfer-bucket
+          TARGET_BIGQUERY_PROJECT: opensource-observer
+          TARGET_BIGQUERY_DATASET: ecosystems-ost
\ No newline at end of file
diff --git a/warehouse/meltano-setup/meltano.yml b/warehouse/meltano-setup/meltano.yml
index 8c589da97..a8f8cf05f 100644
--- a/warehouse/meltano-setup/meltano.yml
+++ b/warehouse/meltano-setup/meltano.yml
@@ -5,29 +5,70 @@ environments:
   - name: dev
   - name: staging
   - name: prod
+# state_backend:
+#   uri: gs://oso-playground-dataset-transfer-bucket/meltano/state.db
 plugins:
   extractors:
-  - name: tap-postgres
+  - name: tap-ecosystems-ost
+    inherit_from: tap-postgres
     variant: airbyte
-    pip_url: git+https://github.com/MeltanoLabs/tap-airbyte-wrapper.git
+    pip_url: git+https://github.com/opensource-observer/tap-airbyte-wrapper.git
     config:
+      nullable_generated_fields:
+        - "*._ab_cdc_deleted_at"
+      airbyte_spec:
+        image: airbyte/source-postgres
+        tag: 3.3.26
       airbyte_config:
-        jdbc_url_params: "replication=postgres"
+        jdbc_url_params: replication=postgres
        ssl_mode:
          mode: disable
        schemas:
          - public
        replication_method:
          plugin: pgoutput
          method: CDC
-          publication: publication_name
+          publication: oso_publication
          replication_slot: oso_slot
-          initial_waiting_seconds: 5
+          initial_waiting_seconds: 10
    select:
-      - test_table.id
-      - test_table.data
-      - test_table.created_at
+      - projects.*
+      - issues.*
+      - collections.*
+      - votes.*
+
+  - name: tap-local-test
+    inherit_from: tap-postgres
+    variant: airbyte
+    pip_url: git+https://github.com/opensource-observer/tap-airbyte-wrapper.git
+    config:
+      nullable_generated_fields:
+        - "*._ab_cdc_deleted_at"
+      airbyte_spec:
+        image: airbyte/source-postgres
+        tag: 3.3.26
+      airbyte_config:
+        jdbc_url_params: replication=postgres
+        ssl_mode:
+          mode: disable
+        schemas:
+          - public
+        replication_method:
+          plugin: pgoutput
+          method: CDC
+          publication: oso_publication
+          replication_slot: oso_replication_slot
+          initial_waiting_seconds: 10
+    select:
+      - foo.*
+      - bar.*
   loaders:
-  - name: target-jsonl
-    variant: andyh1203
-    pip_url: target-jsonl
+  - name: target-bigquery
+    variant: z3z1ma
+    pip_url: git+https://github.com/z3z1ma/target-bigquery.git
+    config:
+      denormalized: true
+      method: gcs_stage
+
+elt:
+  buffer_size: 1073741824
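A note on how the workflow's env block reaches this configuration: Meltano resolves plugin settings from environment variables named after the upper-snake-cased plugin name plus the setting path, which is why the secrets above are spelled TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_*. A sketch of the equivalent inline configuration; the values are illustrative placeholders, not part of this patch, and in CI they are injected via the workflow env block instead:

    # meltano.yml fragment -- each key maps to one env var set by the workflow
    - name: tap-ecosystems-ost
      config:
        airbyte_config:
          host: db.example.com     # TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_HOST
          port: 5432               # TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_PORT
          database: ecosystems     # TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_DATABASE
          username: oso_reader     # TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_USERNAME
          password: "<secret>"     # TAP_ECOSYSTEMS_OST_AIRBYTE_CONFIG_PASSWORD
    - name: target-bigquery
      config:
        project: opensource-observer          # TARGET_BIGQUERY_PROJECT
        dataset: ecosystems-ost               # TARGET_BIGQUERY_DATASET
        bucket: oso-dataset-transfer-bucket   # TARGET_BIGQUERY_BUCKET
        credentials_path: /tmp/creds.json     # TARGET_BIGQUERY_CREDENTIALS_PATH

Per the lock file below, `bucket` is only consumed when `method` is `gcs_stage`, which is the method this project configures, so the TARGET_BIGQUERY_BUCKET variable in the workflow is required here.
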
diff --git a/warehouse/meltano-setup/plugins/loaders/target-bigquery--z3z1ma.lock b/warehouse/meltano-setup/plugins/loaders/target-bigquery--z3z1ma.lock
new file mode 100644
index 000000000..e103a8b93
--- /dev/null
+++ b/warehouse/meltano-setup/plugins/loaders/target-bigquery--z3z1ma.lock
@@ -0,0 +1,256 @@
+{
+  "plugin_type": "loaders",
+  "name": "target-bigquery",
+  "namespace": "target_bigquery",
+  "variant": "z3z1ma",
+  "label": "Google BigQuery",
+  "docs": "https://hub.meltano.com/loaders/target-bigquery--z3z1ma",
+  "repo": "https://github.com/z3z1ma/target-bigquery",
+  "pip_url": "git+https://github.com/z3z1ma/target-bigquery.git",
+  "executable": "target-bigquery",
+  "description": "BigQuery loader",
+  "logo_url": "https://hub.meltano.com/assets/logos/loaders/bigquery.png",
+  "capabilities": [
+    "about",
+    "schema-flattening",
+    "stream-maps"
+  ],
+  "settings_group_validation": [
+    [
+      "dataset",
+      "method",
+      "project"
+    ]
+  ],
+  "settings": [
+    {
+      "name": "batch_size",
+      "kind": "integer",
+      "value": 500,
+      "label": "Batch Size",
+      "description": "The maximum number of rows to send in a single batch or commit."
+    },
+    {
+      "name": "bucket",
+      "kind": "string",
+      "label": "Bucket",
+      "description": "The GCS bucket to use for staging data. Only used if method is gcs_stage."
+    },
+    {
+      "name": "cluster_on_key_properties",
+      "kind": "boolean",
+      "value": false,
+      "label": "Cluster On Key Properties",
+      "description": "Determines whether to cluster on the key properties from the tap. Defaults to false. When false, clustering will be based on _sdc_batched_at instead."
+    },
+    {
+      "name": "column_name_transforms.add_underscore_when_invalid",
+      "kind": "boolean",
+      "value": false,
+      "label": "Column Name Transforms Add Underscore When Invalid",
+      "description": "Add an underscore when a column starts with a digit"
+    },
+    {
+      "name": "column_name_transforms.lower",
+      "kind": "boolean",
+      "value": false,
+      "label": "Column Name Transforms Lower",
+      "description": "Lowercase column names"
+    },
+    {
+      "name": "column_name_transforms.quote",
+      "kind": "boolean",
+      "value": false,
+      "label": "Column Name Transforms Quote",
+      "description": "Quote columns during DDL generation"
+    },
+    {
+      "name": "column_name_transforms.snake_case",
+      "kind": "boolean",
+      "value": false,
+      "label": "Column Name Transforms Snake Case",
+      "description": "Convert columns to snake case"
+    },
+    {
+      "name": "credentials_json",
+      "kind": "string",
+      "label": "Credentials Json",
+      "description": "A JSON string of your service account JSON file."
+    },
+    {
+      "name": "credentials_path",
+      "kind": "string",
+      "label": "Credentials Path",
+      "description": "The path to a gcp credentials json file."
+    },
+    {
+      "name": "dataset",
+      "kind": "string",
+      "label": "Dataset",
+      "description": "The target dataset to materialize data into."
+ }, + { + "name": "dedupe_before_upsert", + "kind": "string", + "value": false, + "label": "Dedupe Before Upsert", + "description": "This option is only used if `upsert` is enabled for a stream. The selection criteria for the stream's candidacy is the same as upsert. If the stream is marked for deduping before upsert, we will create a _session scoped temporary table during the merge transaction to dedupe the ingested records. This is useful for streams that are not unique on the key properties during an ingest but are unique in the source system. Data lake ingestion is often a good example of this where the same unique record may exist in the lake at different points in time from different extracts." + }, + { + "name": "denormalized", + "kind": "boolean", + "value": false, + "label": "Denormalized", + "description": "Determines whether to denormalize the data before writing to BigQuery. A false value will write data using a fixed JSON column based schema, while a true value will write data using a dynamic schema derived from the tap." + }, + { + "name": "fail_fast", + "kind": "boolean", + "value": true, + "label": "Fail Fast", + "description": "Fail the entire load job if any row fails to insert." + }, + { + "name": "flattening_enabled", + "kind": "boolean", + "label": "Flattening Enabled", + "description": "'True' to enable schema flattening and automatically expand nested properties." + }, + { + "name": "flattening_max_depth", + "kind": "integer", + "label": "Flattening Max Depth", + "description": "The max depth to flatten schemas." + }, + { + "name": "generate_view", + "kind": "boolean", + "value": false, + "label": "Generate View", + "description": "Determines whether to generate a view based on the SCHEMA message parsed from the tap. Only valid if denormalized=false meaning you are using the fixed JSON column based schema." + }, + { + "name": "location", + "kind": "string", + "value": "US", + "label": "Location", + "description": "The target dataset/bucket location to materialize data into." + }, + { + "name": "method", + "kind": "options", + "value": "storage_write_api", + "label": "Method", + "description": "The method to use for writing to BigQuery.", + "options": [ + { + "label": "Storage Write API", + "value": "storage_write_api" + }, + { + "label": "Batch Job", + "value": "batch_job" + }, + { + "label": "Gcs Stage", + "value": "gcs_stage" + }, + { + "label": "Streaming Insert", + "value": "streaming_insert" + } + ] + }, + { + "name": "options.max_workers", + "kind": "integer", + "label": "Options Max Workers", + "description": "By default, each sink type has a preconfigured max worker pool limit. This sets an override for maximum number of workers in the pool." + }, + { + "name": "options.process_pool", + "kind": "boolean", + "value": false, + "label": "Options Process Pool", + "description": "By default we use an autoscaling threadpool to write to BigQuery. If set to true, we will use a process pool." + }, + { + "name": "options.storage_write_batch_mode", + "kind": "boolean", + "value": false, + "label": "Options Storage Write Batch Mode", + "description": "By default, we use the default stream (Committed mode) in the storage_write_api load method which results in streaming records which are immediately available and is generally fastest. If this is set to true, we will use the application created streams (Committed mode) to transactionally batch data on STATE messages and at end of pipe." 
+ }, + { + "name": "overwrite", + "kind": "string", + "value": false, + "label": "Overwrite", + "description": "Determines if the target table should be overwritten on load. Defaults to false. A value of true will write to a temporary table and then overwrite the target table inside a transaction (so it is safe). A value of false will write to the target table directly (append). A value of an array of strings will evaluate the strings in order using fnmatch. At the end of the array, the value of the last match will be used. If not matched, the default value is false. This is mutually exclusive with the `upsert` option. If both are set, `upsert` will take precedence." + }, + { + "name": "partition_granularity", + "kind": "options", + "value": "month", + "label": "Partition Granularity", + "description": "The granularity of the partitioning strategy. Defaults to month.", + "options": [ + { + "label": "Year", + "value": "year" + }, + { + "label": "Month", + "value": "month" + }, + { + "label": "Day", + "value": "day" + }, + { + "label": "Hour", + "value": "hour" + } + ] + }, + { + "name": "project", + "kind": "string", + "label": "Project", + "description": "The target GCP project to materialize data into." + }, + { + "name": "schema_resolver_version", + "kind": "integer", + "value": 1, + "label": "Schema Resolver Version", + "description": "The version of the schema resolver to use. Defaults to 1. Version 2 uses JSON as a fallback during denormalization. This only has an effect if denormalized=true" + }, + { + "name": "stream_map_config", + "kind": "object", + "label": "Stream Map Config", + "description": "User-defined config values to be used within map expressions." + }, + { + "name": "stream_maps", + "kind": "object", + "label": "Stream Maps", + "description": "Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html)." + }, + { + "name": "timeout", + "kind": "integer", + "value": 600, + "label": "Timeout", + "description": "Default timeout for batch_job and gcs_stage derived LoadJobs." + }, + { + "name": "upsert", + "kind": "string", + "value": false, + "label": "Upsert", + "description": "Determines if we should upsert. Defaults to false. A value of true will write to a temporary table and then merge into the target table (upsert). This requires the target table to be unique on the key properties. A value of false will write to the target table directly (append). A value of an array of strings will evaluate the strings in order using fnmatch. At the end of the array, the value of the last match will be used. If not matched, the default value is false (append)." + } + ] +} \ No newline at end of file diff --git a/warehouse/meltano-setup/pyproject.toml b/warehouse/meltano-setup/pyproject.toml index 5e56f684b..d17fb12ce 100644 --- a/warehouse/meltano-setup/pyproject.toml +++ b/warehouse/meltano-setup/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.10,<3.13" -meltano = "^3.3.2" +meltano = {extras = ["gcs"], version = "^3.3.2"} psycopg = "^3.1.18"
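A closing note on the pyproject change above: the ["gcs"] extra pulls in the Google Cloud Storage dependencies Meltano needs for a GCS-backed state store, which pairs with the MELTANO_STATE_BACKEND_URI secret in the workflow and the commented-out state_backend block in meltano.yml. A sketch of what enabling it there would look like; the URI mirrors the commented-out example earlier in this patch and whether it stays commented is an open question in this repo:

    # meltano.yml -- persist pipeline state (e.g. CDC bookmarks) between CI runs.
    # In CI the URI is supplied via MELTANO_STATE_BACKEND_URI rather than
    # committed here.
    state_backend:
      uri: gs://oso-playground-dataset-transfer-bucket/meltano/state.db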