Skip to content

Commit

Permalink
openlineage: tests: do not check whitespace in returned SQL statement after splitting it (apache#40826)
Browse files Browse the repository at this point in the history
  • Loading branch information
mobuchowski authored Jul 16, 2024
1 parent dc6cc58 commit 616c881
Showing 1 changed file with 43 additions and 52 deletions.
95 changes: 43 additions & 52 deletions tests/providers/openlineage/test_sqlparser.py
Original file line number Diff line number Diff line change
@@ -26,12 +26,10 @@
ColumnLineageDatasetFacetFieldsAdditionalInputFields,
SchemaDatasetFacet,
SchemaField,
SqlJobFacet,
)
from openlineage.client.run import Dataset
from openlineage.common.sql import DbTableMeta

from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.sqlparser import DatabaseInfo, GetTableSchemasParams, SQLParser

DB_NAME = "FOOD_DELIVERY"
@@ -315,57 +313,50 @@ def test_generate_openlineage_metadata_from_sql(self, mock_parse, parser_returns
FROM top_delivery_times
)"""
expected_schema = "PUBLIC" if parser_returns_schema else "ANOTHER_SCHEMA"
expected = OperatorLineage(
inputs=[
Dataset(
namespace="myscheme://host:port",
name=f"{expected_schema}.top_delivery_times",
facets={
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name="order_id", type="int4"),
SchemaField(name="order_placed_on", type="timestamp"),
SchemaField(name="customer_email", type="varchar"),
]
)
},
)
],
outputs=[
Dataset(
namespace="myscheme://host:port",
name=f"{expected_schema}.popular_orders_day_of_week",
facets={
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name="order_day_of_week", type="varchar"),
SchemaField(name="order_placed_on", type="timestamp"),
SchemaField(name="orders_placed", type="int4"),
]
),
"columnLineage": ColumnLineageDatasetFacet(
fields={
"order_day_of_week": ColumnLineageDatasetFacetFieldsAdditional(
inputFields=[
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
namespace="myscheme://host:port",
name=f"{expected_schema}.top_delivery_times",
field="order_placed_on",
)
],
transformationDescription="",
transformationType="",
)
}
),
},
)
],
job_facets={"sql": SqlJobFacet(query=formatted_sql)},
)

assert expected == parser.generate_openlineage_metadata_from_sql(
metadata = parser.generate_openlineage_metadata_from_sql(
sql=sql,
hook=hook,
database_info=db_info,
)

assert metadata.inputs == [
Dataset(
namespace="myscheme://host:port",
name=f"{expected_schema}.top_delivery_times",
facets={
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name="order_id", type="int4"),
SchemaField(name="order_placed_on", type="timestamp"),
SchemaField(name="customer_email", type="varchar"),
]
)
},
)
]
assert len(metadata.outputs) == 1
assert metadata.outputs[0].namespace == "myscheme://host:port"
assert metadata.outputs[0].name == f"{expected_schema}.popular_orders_day_of_week"
assert metadata.outputs[0].facets["schema"] == SchemaDatasetFacet(
fields=[
SchemaField(name="order_day_of_week", type="varchar"),
SchemaField(name="order_placed_on", type="timestamp"),
SchemaField(name="orders_placed", type="int4"),
]
)
assert metadata.outputs[0].facets["columnLineage"] == ColumnLineageDatasetFacet(
fields={
"order_day_of_week": ColumnLineageDatasetFacetFieldsAdditional(
inputFields=[
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
namespace="myscheme://host:port",
name=f"{expected_schema}.top_delivery_times",
field="order_placed_on",
)
],
transformationDescription="",
transformationType="",
)
}
)
assert metadata.job_facets["sql"].query.replace(" ", "") == formatted_sql.replace(" ", "")

0 comments on commit 616c881

Please sign in to comment.