From 616c8816bf6b9a0e448a335a134b3a806d430bd0 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 17 Jul 2024 00:14:22 +0200 Subject: [PATCH] openlineage: tests: do not check whitespace in returned SQL statement after splitting it (#40826) --- tests/providers/openlineage/test_sqlparser.py | 95 +++++++++---------- 1 file changed, 43 insertions(+), 52 deletions(-) diff --git a/tests/providers/openlineage/test_sqlparser.py b/tests/providers/openlineage/test_sqlparser.py index 30e74eefcb55..020d8384c6bc 100644 --- a/tests/providers/openlineage/test_sqlparser.py +++ b/tests/providers/openlineage/test_sqlparser.py @@ -26,12 +26,10 @@ ColumnLineageDatasetFacetFieldsAdditionalInputFields, SchemaDatasetFacet, SchemaField, - SqlJobFacet, ) from openlineage.client.run import Dataset from openlineage.common.sql import DbTableMeta -from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.sqlparser import DatabaseInfo, GetTableSchemasParams, SQLParser DB_NAME = "FOOD_DELIVERY" @@ -315,57 +313,50 @@ def test_generate_openlineage_metadata_from_sql(self, mock_parse, parser_returns FROM top_delivery_times )""" expected_schema = "PUBLIC" if parser_returns_schema else "ANOTHER_SCHEMA" - expected = OperatorLineage( - inputs=[ - Dataset( - namespace="myscheme://host:port", - name=f"{expected_schema}.top_delivery_times", - facets={ - "schema": SchemaDatasetFacet( - fields=[ - SchemaField(name="order_id", type="int4"), - SchemaField(name="order_placed_on", type="timestamp"), - SchemaField(name="customer_email", type="varchar"), - ] - ) - }, - ) - ], - outputs=[ - Dataset( - namespace="myscheme://host:port", - name=f"{expected_schema}.popular_orders_day_of_week", - facets={ - "schema": SchemaDatasetFacet( - fields=[ - SchemaField(name="order_day_of_week", type="varchar"), - SchemaField(name="order_placed_on", type="timestamp"), - SchemaField(name="orders_placed", type="int4"), - ] - ), - "columnLineage": ColumnLineageDatasetFacet( - fields={ - "order_day_of_week": ColumnLineageDatasetFacetFieldsAdditional( - inputFields=[ - ColumnLineageDatasetFacetFieldsAdditionalInputFields( - namespace="myscheme://host:port", - name=f"{expected_schema}.top_delivery_times", - field="order_placed_on", - ) - ], - transformationDescription="", - transformationType="", - ) - } - ), - }, - ) - ], - job_facets={"sql": SqlJobFacet(query=formatted_sql)}, - ) - - assert expected == parser.generate_openlineage_metadata_from_sql( + metadata = parser.generate_openlineage_metadata_from_sql( sql=sql, hook=hook, database_info=db_info, ) + + assert metadata.inputs == [ + Dataset( + namespace="myscheme://host:port", + name=f"{expected_schema}.top_delivery_times", + facets={ + "schema": SchemaDatasetFacet( + fields=[ + SchemaField(name="order_id", type="int4"), + SchemaField(name="order_placed_on", type="timestamp"), + SchemaField(name="customer_email", type="varchar"), + ] + ) + }, + ) + ] + assert len(metadata.outputs) == 1 + assert metadata.outputs[0].namespace == "myscheme://host:port" + assert metadata.outputs[0].name == f"{expected_schema}.popular_orders_day_of_week" + assert metadata.outputs[0].facets["schema"] == SchemaDatasetFacet( + fields=[ + SchemaField(name="order_day_of_week", type="varchar"), + SchemaField(name="order_placed_on", type="timestamp"), + SchemaField(name="orders_placed", type="int4"), + ] + ) + assert metadata.outputs[0].facets["columnLineage"] == ColumnLineageDatasetFacet( + fields={ + "order_day_of_week": ColumnLineageDatasetFacetFieldsAdditional( + inputFields=[ + ColumnLineageDatasetFacetFieldsAdditionalInputFields( + namespace="myscheme://host:port", + name=f"{expected_schema}.top_delivery_times", + field="order_placed_on", + ) + ], + transformationDescription="", + transformationType="", + ) + } + ) + assert metadata.job_facets["sql"].query.replace(" ", "") == formatted_sql.replace(" ", "")