feat!: Add Table class to support merge, update and delete statements (#235)

Co-authored-by: Juan Barreto <[email protected]>
zerodarkzone and jbarreto-riot authored Jan 22, 2025
1 parent fc02ed0 commit 8b23ea2
Showing 37 changed files with 2,212 additions and 65 deletions.
212 changes: 212 additions & 0 deletions docs/bigquery.md
@@ -599,3 +599,215 @@ See something that you would like to see supported? [Open an issue](https://gith
* [rowsBetween](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.WindowSpec.rowsBetween.html)
* sql
* SQLFrame Specific: Get the SQL representation of the WindowSpec


## Extra Functionality not Present in PySpark

SQLFrame supports the following extra functionality that is not available in PySpark.

### Table Class

SQLFrame provides a `Table` class that supports DML operations beyond the PySpark API: `update`, `delete`, and `merge`. An instance of this class is returned by the `table` method of the `DataFrameReader` class, and by `session.table` as used in the example below.

```python
from google.api_core import client_info
from google.cloud import bigquery
from google.cloud.bigquery.dbapi import connect
from google.oauth2 import service_account
from sqlframe.bigquery import BigQuerySession
from sqlframe.base.table import WhenMatched, WhenNotMatched, WhenNotMatchedBySource

# Load service-account credentials from a local key file
creds = service_account.Credentials.from_service_account_file("path/to/credentials.json")

client = bigquery.Client(
    project="my-project",
    credentials=creds,
    location="us-central1",
    client_info=client_info.ClientInfo(user_agent="sqlframe"),
)

conn = connect(client=client)
session = BigQuerySession(conn=conn, default_dataset="sqlframe.db1")

df_employee = session.createDataFrame(
    [
        {"id": 1, "fname": "Jack", "lname": "Shephard", "age": 37, "store_id": 1},
        {"id": 2, "fname": "John", "lname": "Locke", "age": 65, "store_id": 2},
        {"id": 3, "fname": "Kate", "lname": "Austen", "age": 37, "store_id": 3},
        {"id": 4, "fname": "Claire", "lname": "Littleton", "age": 27, "store_id": 1},
        {"id": 5, "fname": "Hugo", "lname": "Reyes", "age": 29, "store_id": 3},
    ]
)

# Persist the DataFrame so it can be read back as a table
df_employee.write.mode("overwrite").saveAsTable("employee")

table_employee = session.table("employee")  # This object is the BigQuery implementation of `Table`
```

#### Update Statement
The `update` method of the `Table` class is equivalent to the `UPDATE table_name` statement in standard SQL.

```python
# Generates a `LazyExpression` object which can be executed using the `execute` method
update_expr = table_employee.update(
    set_={"age": table_employee["age"] + 1},
    where=table_employee["id"] == 1,
)

# Executes the update statement
update_expr.execute()

# Show the result
table_employee.show()
```

Output:
```
+----+--------+-----------+-----+----------+
| id | fname  | lname     | age | store_id |
+----+--------+-----------+-----+----------+
| 1  | Jack   | Shephard  | 38  | 1        |
| 2  | John   | Locke     | 65  | 2        |
| 3  | Kate   | Austen    | 37  | 3        |
| 4  | Claire | Littleton | 27  | 1        |
| 5  | Hugo   | Reyes     | 29  | 3        |
+----+--------+-----------+-----+----------+
```
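
For readers who think in SQL first, here is a minimal sketch of the statement that the `update` call above corresponds to. It uses Python's built-in `sqlite3` module as a stand-in for BigQuery, an assumption made only so the snippet runs without credentials; the SQL that SQLFrame actually generates for BigQuery may be quoted or formatted differently.

```python
import sqlite3

# SQLite is only a stand-in here; the documented example runs against BigQuery.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employee "
    "(id INTEGER, fname TEXT, lname TEXT, age INTEGER, store_id INTEGER)"
)
conn.executemany(
    "INSERT INTO employee VALUES (?, ?, ?, ?, ?)",
    [
        (1, "Jack", "Shephard", 37, 1),
        (2, "John", "Locke", 65, 2),
        (3, "Kate", "Austen", 37, 3),
        (4, "Claire", "Littleton", 27, 1),
        (5, "Hugo", "Reyes", 29, 3),
    ],
)

# Hand-written counterpart of update(set_={"age": age + 1}, where=id == 1)
conn.execute("UPDATE employee SET age = age + 1 WHERE id = 1")

# Collect (id, age) pairs into a dict to inspect the effect
ages = dict(conn.execute("SELECT id, age FROM employee ORDER BY id"))
print(ages)
```

Only the row with `id = 1` changes, which agrees with the output table above.
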

#### Delete Statement
The `delete` method of the `Table` class is equivalent to the `DELETE FROM table_name` statement in standard SQL.

```python
# Generates a `LazyExpression` object which can be executed using the `execute` method
delete_expr = table_employee.delete(
    where=table_employee["id"] == 1,
)

# Executes the delete statement
delete_expr.execute()

# Show the result
table_employee.show()
```

Output:
```
+----+--------+-----------+-----+----------+
| id | fname  | lname     | age | store_id |
+----+--------+-----------+-----+----------+
| 2  | John   | Locke     | 65  | 2        |
| 3  | Kate   | Austen    | 37  | 3        |
| 4  | Claire | Littleton | 27  | 1        |
| 5  | Hugo   | Reyes     | 29  | 3        |
+----+--------+-----------+-----+----------+
```
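
The `where` predicate of `delete` plays the same role as the `WHERE` clause of a hand-written `DELETE`. The sketch below illustrates this with the standard-library `sqlite3` module, again assumed as a local stand-in for BigQuery, and uses the cursor's `rowcount` to confirm how many rows the predicate removed. The reduced two-column schema is a simplification for this illustration.

```python
import sqlite3

# SQLite stand-in with a reduced schema; the documented example targets BigQuery.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employee (id INTEGER, fname TEXT)")
conn.executemany(
    "INSERT INTO employee VALUES (?, ?)",
    [(1, "Jack"), (2, "John"), (3, "Kate"), (4, "Claire"), (5, "Hugo")],
)

# Parameterized counterpart of delete(where=id == 1)
cursor = conn.execute("DELETE FROM employee WHERE id = ?", (1,))
deleted_rows = cursor.rowcount

remaining_ids = [row[0] for row in conn.execute("SELECT id FROM employee ORDER BY id")]
print(deleted_rows, remaining_ids)
```
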

#### Merge Statement

The `merge` method of the `Table` class is equivalent to the `MERGE INTO table_name` statement supported by some SQL engines.

```python
df_new_employee = session.createDataFrame(
    [
        {"id": 1, "fname": "Jack", "lname": "Shephard", "age": 38, "store_id": 1, "delete": False},
        {"id": 2, "fname": "Cate", "lname": "Austen", "age": 39, "store_id": 5, "delete": False},
        {"id": 5, "fname": "Ugo", "lname": "Reyes", "age": 29, "store_id": 3, "delete": True},
        {"id": 6, "fname": "Sun-Hwa", "lname": "Kwon", "age": 27, "store_id": 5, "delete": False},
    ]
)

# Generates a `LazyExpression` object which can be executed using the `execute` method
merge_expr = table_employee.merge(
    df_new_employee,
    condition=table_employee["id"] == df_new_employee["id"],
    clauses=[
        # Matched rows with the same first name: update the age
        WhenMatched(condition=table_employee["fname"] == df_new_employee["fname"]).update(
            set_={
                "age": df_new_employee["age"],
            }
        ),
        # Matched rows flagged for deletion in the source: delete them
        WhenMatched(condition=df_new_employee["delete"]).delete(),
        # Source rows with no match in the target: insert them
        WhenNotMatched().insert(
            values={
                "id": df_new_employee["id"],
                "fname": df_new_employee["fname"],
                "lname": df_new_employee["lname"],
                "age": df_new_employee["age"],
                "store_id": df_new_employee["store_id"],
            }
        ),
    ],
)

# Executes the merge statement
merge_expr.execute()

# Show the result
table_employee.show()
```

Output:
```
+----+---------+-----------+-----+----------+
| id | fname   | lname     | age | store_id |
+----+---------+-----------+-----+----------+
| 1  | Jack    | Shephard  | 38  | 1        |
| 2  | John    | Locke     | 65  | 2        |
| 3  | Kate    | Austen    | 37  | 3        |
| 4  | Claire  | Littleton | 27  | 1        |
| 6  | Sun-Hwa | Kwon      | 27  | 5        |
+----+---------+-----------+-----+----------+
```
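
The order of the clauses in the `merge` call explains the output above. The following pure-Python sketch reproduces it under one assumption: for each row, at most one clause fires, namely the first whose condition holds, which is the rule BigQuery documents for `MERGE`. `simulate_merge` is a hypothetical helper written for this illustration and is not part of SQLFrame. The target is assumed to hold the five original rows.

```python
# Target table keyed by id (state assumed before the merge)
target = {
    1: {"fname": "Jack", "lname": "Shephard", "age": 37, "store_id": 1},
    2: {"fname": "John", "lname": "Locke", "age": 65, "store_id": 2},
    3: {"fname": "Kate", "lname": "Austen", "age": 37, "store_id": 3},
    4: {"fname": "Claire", "lname": "Littleton", "age": 27, "store_id": 1},
    5: {"fname": "Hugo", "lname": "Reyes", "age": 29, "store_id": 3},
}
source = [
    {"id": 1, "fname": "Jack", "lname": "Shephard", "age": 38, "store_id": 1, "delete": False},
    {"id": 2, "fname": "Cate", "lname": "Austen", "age": 39, "store_id": 5, "delete": False},
    {"id": 5, "fname": "Ugo", "lname": "Reyes", "age": 29, "store_id": 3, "delete": True},
    {"id": 6, "fname": "Sun-Hwa", "lname": "Kwon", "age": 27, "store_id": 5, "delete": False},
]


def simulate_merge(target, source):
    """Hypothetical helper: apply the three clauses with first-match-wins semantics."""
    result = {key: dict(row) for key, row in target.items()}
    for src in source:
        key = src["id"]
        if key not in target:
            # WHEN NOT MATCHED THEN INSERT
            result[key] = {col: src[col] for col in ("fname", "lname", "age", "store_id")}
        elif target[key]["fname"] == src["fname"]:
            # First WHEN MATCHED clause: same first name, so update the age
            result[key]["age"] = src["age"]
        elif src["delete"]:
            # Second WHEN MATCHED clause: source row is flagged for deletion
            del result[key]
        # Otherwise no clause applies and the target row is left untouched
    return result


merged = simulate_merge(target, source)
print(sorted(merged))
```

Row 2 stays as `John Locke` because neither matched clause applies (`Cate` differs from `John`, and `delete` is `False`), while row 5 falls through to the delete clause because `Ugo` differs from `Hugo`.
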

Some engines, such as BigQuery, support an additional clause in the `merge` statement: `WHEN NOT MATCHED BY SOURCE THEN DELETE`. It deletes target rows that have no matching row in the source.

```python
df_new_employee = session.createDataFrame(
    [
        {"id": 1, "fname": "Jack", "lname": "Shephard", "age": 38, "store_id": 1},
        {"id": 2, "fname": "Cate", "lname": "Austen", "age": 39, "store_id": 5},
        {"id": 5, "fname": "Hugo", "lname": "Reyes", "age": 29, "store_id": 3},
        {"id": 6, "fname": "Sun-Hwa", "lname": "Kwon", "age": 27, "store_id": 5},
    ]
)

# Generates a `LazyExpression` object which can be executed using the `execute` method
merge_expr = table_employee.merge(
    df_new_employee,
    condition=table_employee["id"] == df_new_employee["id"],
    clauses=[
        # Matched rows with the same first name: update the age
        WhenMatched(condition=table_employee["fname"] == df_new_employee["fname"]).update(
            set_={
                "age": df_new_employee["age"],
            }
        ),
        # Source rows with no match in the target: insert them
        WhenNotMatched().insert(
            values={
                "id": df_new_employee["id"],
                "fname": df_new_employee["fname"],
                "lname": df_new_employee["lname"],
                "age": df_new_employee["age"],
                "store_id": df_new_employee["store_id"],
            }
        ),
        # Target rows with no match in the source: delete them
        WhenNotMatchedBySource().delete(),
    ],
)

# Executes the merge statement
merge_expr.execute()

# Show the result
table_employee.show()
```

Output:
```
+----+---------+-----------+-----+----------+
| id | fname   | lname     | age | store_id |
+----+---------+-----------+-----+----------+
| 1  | Jack    | Shephard  | 38  | 1        |
| 2  | John    | Locke     | 65  | 2        |
| 5  | Hugo    | Reyes     | 29  | 3        |
| 6  | Sun-Hwa | Kwon      | 27  | 5        |
+----+---------+-----------+-----+----------+
```
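
Which rows each clause sees can be worked out with plain set arithmetic on the join key. This is a small sketch, assuming the target holds the five original employee ids before the merge; the variable names are chosen for this illustration only.

```python
# Join keys present on each side of the merge
target_ids = {1, 2, 3, 4, 5}
source_ids = {1, 2, 5, 6}

# Rows seen by WhenMatched (present on both sides)
matched = target_ids & source_ids
# Rows seen by WhenNotMatched (only in the source, so they are inserted)
not_matched = source_ids - target_ids
# Rows seen by WhenNotMatchedBySource (only in the target, so they are deleted)
not_matched_by_source = target_ids - source_ids

# Ids remaining after the deletes and inserts are applied
final_ids = sorted((target_ids - not_matched_by_source) | not_matched)
print(final_ids)
```

Ids 3 and 4 exist only in the target, so the `WhenNotMatchedBySource().delete()` clause removes them, which is why they are absent from the output above.
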