From daa03c498fd35a93d1413164ba770475a8eca80b Mon Sep 17 00:00:00 2001 From: Ryan Eakman <6326532+eakmanrq@users.noreply.github.com> Date: Sun, 11 Aug 2024 18:57:26 -0700 Subject: [PATCH] feat: add lineage support (#140) --- docs/bigquery.md | 2 ++ docs/duckdb.md | 2 ++ docs/postgres.md | 2 ++ docs/snowflake.md | 2 ++ docs/spark.md | 2 ++ docs/standalone.md | 2 ++ sqlframe/base/dataframe.py | 8 ++++++++ tests/unit/standalone/test_dataframe.py | 11 +++++++++++ 8 files changed, 31 insertions(+) diff --git a/docs/bigquery.md b/docs/bigquery.md index 0e543bb..51085c8 100644 --- a/docs/bigquery.md +++ b/docs/bigquery.md @@ -214,6 +214,8 @@ See something that you would like to see supported? [Open an issue](https://gith * [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html) * [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html) * [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html) +* lineage + * Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation. * [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html) * [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html) * [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html) diff --git a/docs/duckdb.md b/docs/duckdb.md index a9eb03a..835f032 100644 --- a/docs/duckdb.md +++ b/docs/duckdb.md @@ -187,6 +187,8 @@ See something that you would like to see supported? [Open an issue](https://gith * [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html) * [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html) * [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html) +* lineage + * Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation. * [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html) * [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html) * [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html) diff --git a/docs/postgres.md b/docs/postgres.md index 3619aed..23138d4 100644 --- a/docs/postgres.md +++ b/docs/postgres.md @@ -198,6 +198,8 @@ See something that you would like to see supported? [Open an issue](https://gith * [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html) * [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html) * [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html) +* lineage + * Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation. * [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html) * [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html) * [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html) diff --git a/docs/snowflake.md b/docs/snowflake.md index ebdf97a..620e0eb 100644 --- a/docs/snowflake.md +++ b/docs/snowflake.md @@ -209,6 +209,8 @@ See something that you would like to see supported? [Open an issue](https://gith * [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html) * [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html) * [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html) +* lineage + * Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation. * [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html) * [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html) * [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html) diff --git a/docs/spark.md b/docs/spark.md index 41d1fa7..4511442 100644 --- a/docs/spark.md +++ b/docs/spark.md @@ -156,6 +156,8 @@ df.show(5) * [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html) * [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html) * [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html) +* lineage + * Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation. * [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html) * [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html) * [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html) diff --git a/docs/standalone.md b/docs/standalone.md index 51b88bd..7269842 100644 --- a/docs/standalone.md +++ b/docs/standalone.md @@ -133,6 +133,8 @@ See something that you would like to see supported? [Open an issue](https://gith * [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html) * [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html) * [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html) +* lineage + * Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation. * [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html) * [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html) * [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html) diff --git a/sqlframe/base/dataframe.py b/sqlframe/base/dataframe.py index 08665c1..823e5aa 100644 --- a/sqlframe/base/dataframe.py +++ b/sqlframe/base/dataframe.py @@ -17,6 +17,7 @@ from prettytable import PrettyTable from sqlglot import Dialect from sqlglot import expressions as exp +from sqlglot import lineage as sqlglot_lineage from sqlglot.helper import ensure_list, flatten, object_to_dict, seq_get from sqlglot.optimizer.pushdown_projections import pushdown_projections from sqlglot.optimizer.qualify import qualify @@ -1613,6 +1614,13 @@ def print_schema( 0, ) + def lineage(self, col: ColumnOrName, optimize: bool = True) -> sqlglot_lineage.Node: + return sqlglot_lineage.lineage( + column=self._ensure_and_normalize_col(col).alias_or_name, + sql=self._get_expressions(optimize=optimize)[0], + schema=self.session.catalog._schema, + ) + def toPandas(self) -> pd.DataFrame: return self.session._fetchdf(self._get_expressions(optimize=False)) diff --git a/tests/unit/standalone/test_dataframe.py b/tests/unit/standalone/test_dataframe.py index 80f84a2..865bb60 100644 --- a/tests/unit/standalone/test_dataframe.py +++ b/tests/unit/standalone/test_dataframe.py @@ -137,3 +137,14 @@ def test_expand_star_table_alias(standalone_employee: StandaloneDataFrame): standalone_employee.alias("blah").select("blah.*").sql(pretty=False, optimize=False) == "WITH `t51718876` AS (SELECT CAST(`employee_id` AS INT) AS `employee_id`, CAST(`fname` AS STRING) AS `fname`, CAST(`lname` AS STRING) AS `lname`, CAST(`age` AS INT) AS `age`, CAST(`store_id` AS INT) AS `store_id` FROM VALUES (1, 'Jack', 'Shephard', 37, 1), (2, 'John', 'Locke', 65, 1), (3, 'Kate', 'Austen', 37, 2), (4, 'Claire', 'Littleton', 27, 2), (5, 'Hugo', 'Reyes', 29, 100) AS `a1`(`employee_id`, `fname`, `lname`, `age`, `store_id`)), `t37842204` AS (SELECT `employee_id`, `fname`, `lname`, `age`, `store_id` FROM `t51718876`) SELECT `t37842204`.`employee_id`, `t37842204`.`fname`, `t37842204`.`lname`, `t37842204`.`age`, `t37842204`.`store_id` FROM `t37842204`" ) + + +def test_lineage(standalone_employee: StandaloneDataFrame): + assert ( + standalone_employee.lineage("age").source.sql() + == "SELECT a1.age AS age FROM (VALUES (1, 'Jack', 'Shephard', 37, 1), (2, 'John', 'Locke', 65, 1), (3, 'Kate', 'Austen', 37, 2), (4, 'Claire', 'Littleton', 27, 2), (5, 'Hugo', 'Reyes', 29, 100)) AS a1(employee_id, fname, lname, age, store_id)" + ) + assert ( + standalone_employee.session.sql("SELECT * FROM employee").lineage("age").source.sql() + == "SELECT employee.age AS age FROM employee AS employee" + )