Skip to content

Commit

Permalink
feat: add lineage support (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
eakmanrq authored Aug 12, 2024
1 parent efa1d10 commit daa03c4
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ See something that you would like to see supported? [Open an issue](https://gith
* [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html)
* [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)
* [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html)
* lineage
* Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation.
* [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html)
* [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html)
* [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html)
Expand Down
2 changes: 2 additions & 0 deletions docs/duckdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ See something that you would like to see supported? [Open an issue](https://gith
* [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html)
* [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)
* [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html)
* lineage
* Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation.
* [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html)
* [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html)
* [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html)
Expand Down
2 changes: 2 additions & 0 deletions docs/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ See something that you would like to see supported? [Open an issue](https://gith
* [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html)
* [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)
* [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html)
* lineage
* Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation.
* [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html)
* [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html)
* [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html)
Expand Down
2 changes: 2 additions & 0 deletions docs/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ See something that you would like to see supported? [Open an issue](https://gith
* [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html)
* [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)
* [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html)
* lineage
* Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation.
* [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html)
* [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html)
* [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html)
Expand Down
2 changes: 2 additions & 0 deletions docs/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ df.show(5)
* [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html)
* [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)
* [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html)
* lineage
* Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation.
* [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html)
* [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html)
* [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html)
Expand Down
2 changes: 2 additions & 0 deletions docs/standalone.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ See something that you would like to see supported? [Open an issue](https://gith
* [intersectAll](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.intersectAll.html)
* [join](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)
* [limit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.limit.html)
* lineage
* Get lineage for a specific column. [Returns a SQLGlot Node](https://sqlglot.com/sqlglot/lineage.html#Node). Can be used to get lineage SQL or HTML representation.
* [na](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.na.html)
* [orderBy](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.orderBy.html)
* [persist](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.persist.html)
Expand Down
8 changes: 8 additions & 0 deletions sqlframe/base/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from prettytable import PrettyTable
from sqlglot import Dialect
from sqlglot import expressions as exp
from sqlglot import lineage as sqlglot_lineage
from sqlglot.helper import ensure_list, flatten, object_to_dict, seq_get
from sqlglot.optimizer.pushdown_projections import pushdown_projections
from sqlglot.optimizer.qualify import qualify
Expand Down Expand Up @@ -1613,6 +1614,13 @@ def print_schema(
0,
)

def lineage(self, col: ColumnOrName, optimize: bool = True) -> sqlglot_lineage.Node:
return sqlglot_lineage.lineage(
column=self._ensure_and_normalize_col(col).alias_or_name,
sql=self._get_expressions(optimize=optimize)[0],
schema=self.session.catalog._schema,
)

def toPandas(self) -> pd.DataFrame:
return self.session._fetchdf(self._get_expressions(optimize=False))

Expand Down
11 changes: 11 additions & 0 deletions tests/unit/standalone/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,14 @@ def test_expand_star_table_alias(standalone_employee: StandaloneDataFrame):
standalone_employee.alias("blah").select("blah.*").sql(pretty=False, optimize=False)
== "WITH `t51718876` AS (SELECT CAST(`employee_id` AS INT) AS `employee_id`, CAST(`fname` AS STRING) AS `fname`, CAST(`lname` AS STRING) AS `lname`, CAST(`age` AS INT) AS `age`, CAST(`store_id` AS INT) AS `store_id` FROM VALUES (1, 'Jack', 'Shephard', 37, 1), (2, 'John', 'Locke', 65, 1), (3, 'Kate', 'Austen', 37, 2), (4, 'Claire', 'Littleton', 27, 2), (5, 'Hugo', 'Reyes', 29, 100) AS `a1`(`employee_id`, `fname`, `lname`, `age`, `store_id`)), `t37842204` AS (SELECT `employee_id`, `fname`, `lname`, `age`, `store_id` FROM `t51718876`) SELECT `t37842204`.`employee_id`, `t37842204`.`fname`, `t37842204`.`lname`, `t37842204`.`age`, `t37842204`.`store_id` FROM `t37842204`"
)


def test_lineage(standalone_employee: StandaloneDataFrame):
assert (
standalone_employee.lineage("age").source.sql()
== "SELECT a1.age AS age FROM (VALUES (1, 'Jack', 'Shephard', 37, 1), (2, 'John', 'Locke', 65, 1), (3, 'Kate', 'Austen', 37, 2), (4, 'Claire', 'Littleton', 27, 2), (5, 'Hugo', 'Reyes', 29, 100)) AS a1(employee_id, fname, lname, age, store_id)"
)
assert (
standalone_employee.session.sql("SELECT * FROM employee").lineage("age").source.sql()
== "SELECT employee.age AS age FROM employee AS employee"
)

0 comments on commit daa03c4

Please sign in to comment.