diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/EntityTypeMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/EntityTypeMapper.java index b1a40cd5de1db..9a72d7dc2c77d 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/EntityTypeMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/EntityTypeMapper.java @@ -38,7 +38,6 @@ public class EntityTypeMapper { .put(EntityType.TEST, "test") .put(EntityType.DATAHUB_VIEW, Constants.DATAHUB_VIEW_ENTITY_NAME) .put(EntityType.DATA_PRODUCT, Constants.DATA_PRODUCT_ENTITY_NAME) - .put(EntityType.INGESTION_SOURCE, Constants.INGESTION_SOURCE_ENTITY_NAME) .build(); private static final Map ENTITY_NAME_TO_TYPE = diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java index fa56f15bcf8d4..d019473606e58 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java @@ -4,6 +4,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.datahub.graphql.QueryContext; import com.linkedin.datahub.graphql.exception.AuthorizationException; +import com.linkedin.datahub.graphql.generated.FacetFilterInput; import com.linkedin.datahub.graphql.generated.ListIngestionSourcesInput; import com.linkedin.datahub.graphql.generated.ListIngestionSourcesResult; import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils; @@ -20,6 +21,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; +import java.util.List; import java.util.Map; import 
java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -51,6 +53,7 @@ public CompletableFuture get(final DataFetchingEnvir final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart(); final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount(); final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery(); + final List filters = input.getFilters() == null ? Collections.emptyList() : input.getFilters(); return CompletableFuture.supplyAsync(() -> { try { @@ -58,7 +61,8 @@ public CompletableFuture get(final DataFetchingEnvir final SearchResult gmsResult = _entityClient.search( Constants.INGESTION_SOURCE_ENTITY_NAME, query, - Collections.emptyMap(), + buildFilter(filters, Collections.emptyList()), + null, start, count, context.getAuthentication(), diff --git a/datahub-graphql-core/src/main/resources/ingestion.graphql b/datahub-graphql-core/src/main/resources/ingestion.graphql index 256b94ccdc244..69c8aff124583 100644 --- a/datahub-graphql-core/src/main/resources/ingestion.graphql +++ b/datahub-graphql-core/src/main/resources/ingestion.graphql @@ -428,6 +428,11 @@ input ListIngestionSourcesInput { An optional search query """ query: String + + """ + Optional Facet filters to apply to the result set + """ + filters: [FacetFilterInput!] 
} """ diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java index 3fa19cca0623c..e7e981fb9ad0d 100644 --- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java +++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java @@ -30,7 +30,7 @@ public class ListIngestionSourceResolverTest { - private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null); + private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null, null); @Test public void testGetSuccess() throws Exception { diff --git a/docs/cli.md b/docs/cli.md index cbe6b79183e21..5f8ed0d9e1a5f 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -110,8 +110,10 @@ To schedule a recipe called "test", to run at 5am everyday, London time with the datahub ingest deploy --name "test" --schedule "5 * * * *" --time-zone "Europe/London" -c recipe.yaml ```` -The `name` parameter specifies the recipe name to be uploaded, or if the name already exists, updates an existing recipe. -**Note**: In case the name contains whitespaces the cli will convert them to underscores (`_`). +To update an existing recipe, please use the `--urn` parameter to specify the id of the recipe to update. + +**Note:** Updating a recipe will result in a replacement of the existing options with what was specified in the cli command. +I.e., not specifying a schedule in the cli update command will remove the schedule from the recipe to be updated. 
### init diff --git a/docs/ui-ingestion.md b/docs/ui-ingestion.md index 270ba22826635..4435f66e514f3 100644 --- a/docs/ui-ingestion.md +++ b/docs/ui-ingestion.md @@ -157,12 +157,16 @@ Once you're happy with your changes, simply click 'Done' to save. -Using the cli as mentioned in the [cli documentation for uploading ingestion recipes](./cli.md#ingest-deploy). +You can upload and even update recipes using the cli as mentioned in the [cli documentation for uploading ingestion recipes](./cli.md#ingest-deploy). An example execution would look something like: + ```bash datahub ingest deploy --name "My Test Ingestion Source" --schedule "5 * * * *" --time-zone "UTC" -c recipe.yaml ``` +This would create a new recipe with the name `My Test Ingestion Source`. Note that to update an existing recipe, its `urn` id must be passed as a parameter. +DataHub supports having multiple recipes with the same name, so to distinguish them we use the urn for unique identification. + diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index f72019586fc79..dd4f105ae4df4 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -6,9 +6,9 @@ import os import sys import tzlocal -import pytz +from string import Template from datetime import datetime -from typing import Optional, Union +from typing import Optional import click import click_spinner @@ -226,11 +226,18 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None: help="Config file in .toml or .yaml format.", required=True, ) +@click.option( + "--urn", + type=str, + help="Urn of recipe to update", + required=False, +) @click.option( "--executor-id", type=str, default="default", help="Executor id to route execution requests to. 
Do not use this unless you have configured a custom executor.", + required=False, ) @click.option( "--cli-version", @@ -256,6 +263,7 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None: def deploy( name: str, config: str, + urn: str, executor_id: str, cli_version: str, schedule: str, @@ -270,62 +278,75 @@ def deploy( datahub_graph = get_default_graph() - urns = list(datahub_graph.get_urns_by_filter( - entity_types=[IngestionSourceUrn.ENTITY_TYPE], - query=name - )) - - print(urns) - - if len(urns) > 1: - click.echo(f"Found multiple ingestion source urns for {name}: {urns}") - exit() - - endpoint = "" - input_obj = {} - graphql_query = f"mutation {endpoint}({input_obj})" + graphql_query_template = Template( + """ + mutation $endpoint ( $variable_types ) { + $endpoint ( $input ) + }""") pipeline_config = load_config_file( config, - squirrel_original_config=True, - squirrel_field="__raw_config", allow_stdin=True, resolve_env_vars=False, ) - print(pipeline_config) - - if len(urns) == 1: - logger.info(f"Found recipe urn {urns[0]} for {name}, will update this recipe.") - endpoint = "updateIngestionSource" - input_obj = { - "urn": urns[0], - "input": { - "name": name, - "type": pipeline_config["source"]["type"], - "schedule": { - "interval": schedule, - "timezone": time_zone, - }, - "config": json.dumps(pipeline_config) + variables: dict = { + "name": name, + "type": pipeline_config["source"]["type"], + "recipe": json.dumps(pipeline_config), + "executorId": executor_id, + } + graphql_variable_types: dict = { + "$name": "String!", + "$type": "String!", + "$recipe": "String!", + "$executorId": "String!", + } + input_template: dict = { + "input": { + "name": "$name", + "type": "$type", + "config": { + "recipe": "$recipe", + "executorId": "$executorId" } } + } + graphql_endpoint: str + + if urn: + if not datahub_graph.exists(urn): + logger.error(f"Could not find recipe for provided urn: {urn}") + exit() + logger.info(f"Found recipe URN, will update recipe.") + 
graphql_endpoint = "updateIngestionSource" + input_template["urn"] = "$urn" + graphql_variable_types["$urn"] = "String!" + variables["urn"] = urn else: - logger.info(f"No ingestion source urn found for {name}. Will create a new recipe.") - endpoint = "createIngestionSource" - input_obj = { - "types": types, - "query": query, - "orFilters": orFilters, - "batchSize": batch_size, - "scrollId": scroll_id, - } + logger.info(f"No URN specified recipe urn, will create a new recipe.") + graphql_endpoint = "createIngestionSource" + + if schedule: + input_template["input"]["schedule"] = "$schedule" + graphql_variable_types["$schedule"] = "UpdateIngestionSourceScheduleInput" + variables["schedule"] = {"interval": schedule, "timezone": time_zone} + + if cli_version: + input_template["input"]["config"]["version"] = "$version" + graphql_variable_types["$version"] = "String" + variables["version"] = cli_version + + graphql_query = graphql_query_template.substitute( + endpoint=graphql_endpoint, + variable_types=json.dumps(graphql_variable_types, sort_keys=True).replace("\"", "")[1:-1], + input=json.dumps(input_template, sort_keys=True).replace("\"", "")[1:-1] + ) - exit() - #datahub_graph.execute_graphql(graphql_query) + response = datahub_graph.execute_graphql(graphql_query, variables=variables) click.echo( - f"✅ Successfully wrote data ingestion source metadata for {ingestion_source_urn} to DataHub ({datahub_graph})" + f"✅ Successfully wrote data ingestion source metadata for recipe {name} with id: {response[graphql_endpoint]} to DataHub." ) def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None: