Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro93 committed Jun 29, 2023
1 parent f0f67bc commit a4ff7e6
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class EntityTypeMapper {
.put(EntityType.TEST, "test")
.put(EntityType.DATAHUB_VIEW, Constants.DATAHUB_VIEW_ENTITY_NAME)
.put(EntityType.DATA_PRODUCT, Constants.DATA_PRODUCT_ENTITY_NAME)
.put(EntityType.INGESTION_SOURCE, Constants.INGESTION_SOURCE_ENTITY_NAME)
.build();

private static final Map<String, EntityType> ENTITY_NAME_TO_TYPE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.ListIngestionSourcesInput;
import com.linkedin.datahub.graphql.generated.ListIngestionSourcesResult;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
Expand All @@ -20,6 +21,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -51,14 +53,16 @@ public CompletableFuture<ListIngestionSourcesResult> get(final DataFetchingEnvir
final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
final List<FacetFilterInput> filters = input.getFilters() == null ? Collections.emptyList() : input.getFilters();

return CompletableFuture.supplyAsync(() -> {
try {
// First, get all ingestion sources Urns.
final SearchResult gmsResult = _entityClient.search(
Constants.INGESTION_SOURCE_ENTITY_NAME,
query,
Collections.emptyMap(),
buildFilter(filters, Collections.emptyList()),
null,
start,
count,
context.getAuthentication(),
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ input ListIngestionSourcesInput {
An optional search query
"""
query: String

"""
Optional Facet filters to apply to the result set
"""
filters: [FacetFilterInput!]
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public class ListIngestionSourceResolverTest {

private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null);
private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null, null);

@Test
public void testGetSuccess() throws Exception {
Expand Down
6 changes: 4 additions & 2 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ To schedule a recipe called "test", to run at 5am everyday, London time with the
datahub ingest deploy --name "test" --schedule "5 * * * *" --time-zone "Europe/London" -c recipe.yaml
````

The `name` parameter specifies the name of the recipe to be uploaded.
**Note**: If the name contains whitespace, the CLI will convert it to underscores (`_`).
To update an existing recipe, use the `--urn` parameter to specify the id of the recipe to update.

**Note:** Updating a recipe will replace the existing options with what was specified in the CLI command.
I.e., not specifying a schedule in the CLI update command will remove the schedule from the recipe being updated.

### init

Expand Down
6 changes: 5 additions & 1 deletion docs/ui-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,16 @@ Once you're happy with your changes, simply click 'Done' to save.
</TabItem>
<TabItem value="cli" label="CLI" default>

Using the cli as mentioned in the [cli documentation for uploading ingestion recipes](./cli.md#ingest-deploy).
You can upload — and even update — recipes using the CLI, as described in the [CLI documentation for uploading ingestion recipes](./cli.md#ingest-deploy).
An example execution would look something like:

```bash
datahub ingest deploy --name "My Test Ingestion Source" --schedule "5 * * * *" --time-zone "UTC" -c recipe.yaml
```

This would create a new recipe with the name `My Test Ingestion Source`. Note that to update an existing recipe, its `urn` id must be passed as a parameter.
DataHub supports having multiple recipes with the same name, so the URN is used to uniquely identify the recipe to update.

</TabItem>
<TabItem value="graphql" label="GraphQL" default>

Expand Down
111 changes: 66 additions & 45 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import os
import sys
import tzlocal
import pytz
from string import Template
from datetime import datetime
from typing import Optional, Union
from typing import Optional

import click
import click_spinner
Expand Down Expand Up @@ -226,11 +226,18 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None:
help="Config file in .toml or .yaml format.",
required=True,
)
@click.option(
"--urn",
type=str,
help="Urn of recipe to update",
required=False,
)
@click.option(
"--executor-id",
type=str,
default="default",
help="Executor id to route execution requests to. Do not use this unless you have configured a custom executor.",
required=False,
)
@click.option(
"--cli-version",
Expand All @@ -256,6 +263,7 @@ async def run_func_check_upgrade(pipeline: Pipeline) -> None:
def deploy(
name: str,
config: str,
urn: str,
executor_id: str,
cli_version: str,
schedule: str,
Expand All @@ -270,62 +278,75 @@ def deploy(

datahub_graph = get_default_graph()

urns = list(datahub_graph.get_urns_by_filter(
entity_types=[IngestionSourceUrn.ENTITY_TYPE],
query=name
))

print(urns)

if len(urns) > 1:
click.echo(f"Found multiple ingestion source urns for {name}: {urns}")
exit()

endpoint = ""
input_obj = {}
graphql_query = f"mutation {endpoint}({input_obj})"
graphql_query_template = Template(
"""
mutation $endpoint ( $variable_types ) {
$endpoint ( $input )
}""")

pipeline_config = load_config_file(
config,
squirrel_original_config=True,
squirrel_field="__raw_config",
allow_stdin=True,
resolve_env_vars=False,
)

print(pipeline_config)

if len(urns) == 1:
logger.info(f"Found recipe urn {urns[0]} for {name}, will update this recipe.")
endpoint = "updateIngestionSource"
input_obj = {
"urn": urns[0],
"input": {
"name": name,
"type": pipeline_config["source"]["type"],
"schedule": {
"interval": schedule,
"timezone": time_zone,
},
"config": json.dumps(pipeline_config)
variables: dict = {
"name": name,
"type": pipeline_config["source"]["type"],
"recipe": json.dumps(pipeline_config),
"executorId": executor_id,
}
graphql_variable_types: dict = {
"$name": "String!",
"$type": "String!",
"$recipe": "String!",
"$executorId": "String!",
}
input_template: dict = {
"input": {
"name": "$name",
"type": "$type",
"config": {
"recipe": "$recipe",
"executorId": "$executorId"
}
}
}
graphql_endpoint: str

if urn:
if not datahub_graph.exists(urn):
logger.error(f"Could not find recipe for provided urn: {urn}")
exit()
logger.info(f"Found recipe URN, will update recipe.")
graphql_endpoint = "updateIngestionSource"
input_template["urn"] = "$urn"
graphql_variable_types["$urn"] = "String!"
variables["urn"] = urn
else:
logger.info(f"No ingestion source urn found for {name}. Will create a new recipe.")
endpoint = "createIngestionSource"
input_obj = {
"types": types,
"query": query,
"orFilters": orFilters,
"batchSize": batch_size,
"scrollId": scroll_id,
}
logger.info(f"No URN specified recipe urn, will create a new recipe.")
graphql_endpoint = "createIngestionSource"

if schedule:
input_template["input"]["schedule"] = "$schedule"
graphql_variable_types["$schedule"] = "UpdateIngestionSourceScheduleInput"
variables["schedule"] = {"interval": schedule, "timezone": time_zone}

if cli_version:
input_template["input"]["config"]["version"] = "$version"
graphql_variable_types["$version"] = "String"
variables["version"] = cli_version

graphql_query = graphql_query_template.substitute(
endpoint=graphql_endpoint,
variable_types=json.dumps(graphql_variable_types, sort_keys=True).replace("\"", "")[1:-1],
input=json.dumps(input_template, sort_keys=True).replace("\"", "")[1:-1]
)

exit()
#datahub_graph.execute_graphql(graphql_query)
response = datahub_graph.execute_graphql(graphql_query, variables=variables)

click.echo(
f"✅ Successfully wrote data ingestion source metadata for {ingestion_source_urn} to DataHub ({datahub_graph})"
f"✅ Successfully wrote data ingestion source metadata for recipe {name} with id: {response[graphql_endpoint]} to DataHub."
)

def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None:
Expand Down

0 comments on commit a4ff7e6

Please sign in to comment.