feat: Add Activate to replace PySpark Imports (#155)
feat: add pyspark replace
eakmanrq authored Aug 25, 2024
1 parent 32336fb commit 43e3557
Showing 44 changed files with 1,185 additions and 150 deletions.
84 changes: 56 additions & 28 deletions README.md
@@ -18,10 +18,10 @@ SQLFrame also has a "Standalone" session that can be used to generate SQL without an

SQLFrame is great for:

* Users who want to run PySpark code quickly locally without the overhead of starting a Spark session
* Users who want a SQL representation of their DataFrame code for debugging or sharing with others
* See [Spark Engine](https://sqlframe.readthedocs.io/en/stable/spark/) for more details
* Users who want a DataFrame API that leverages the full power of their engine to do the processing
* Users who want to run PySpark DataFrame code without the complexity of using Spark for processing

## Installation

@@ -45,44 +45,72 @@ See specific engine documentation for additional setup instructions.
## Configuration

SQLFrame generates consistently accurate yet complex SQL for engine execution.
However, when using df.sql(optimize=True), it produces more human-readable SQL.
For details on how to configure this output and leverage OpenAI to enhance the SQL, see [Generated SQL Configuration](https://sqlframe.readthedocs.io/en/stable/configuration/#generated-sql).
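
A minimal sketch of the difference, using the "Standalone" session mentioned above (the sample data and the `optimize=False` call are illustrative assumptions, not from the original README):

```python
from sqlframe.standalone import StandaloneSession
from sqlframe.standalone import functions as F

session = StandaloneSession()
df = (
    session.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])
    .where(F.col("id") > 1)
    .select("name")
)
print(df.sql(optimize=False))  # verbose SQL as it would be sent to the engine
print(df.sql(optimize=True))   # simplified, more human-readable SQL
```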

SQLFrame by default uses the Spark dialect for input and output.
This can be changed to make SQLFrame feel more like a native DataFrame API for the engine you are using.
See [Input and Output Dialect Configuration](https://sqlframe.readthedocs.io/en/stable/configuration/#input-and-output-dialect).

## Activating SQLFrame

SQLFrame can either replace pyspark imports or be used alongside them.
To replace pyspark imports, use the [activate function](https://sqlframe.readthedocs.io/en/stable/configuration/#activating-sqlframe) to set the engine to use.

```python
from sqlframe import activate

# Activate SQLFrame to run directly on DuckDB
activate(engine="duckdb")

from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()
```

SQLFrame can also be imported directly, which leaves any existing `pyspark` imports untouched and provides a more engine-native DataFrame API:

```python
from sqlframe.duckdb import DuckDBSession

session = DuckDBSession.builder.getOrCreate()
```

## Example Usage

```python
from sqlframe import activate

# Activate SQLFrame to run directly on BigQuery
activate(engine="bigquery")

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

session = SparkSession.builder.getOrCreate()
table_path = '"bigquery-public-data".samples.natality'
# Top 5 years with the greatest year-over-year % change in new families with single child
df = (
    session.table(table_path)
    .where(F.col("ever_born") == 1)
    .groupBy("year")
    .agg(F.count("*").alias("num_single_child_families"))
    .withColumn(
        "last_year_num_single_child_families",
        F.lag(F.col("num_single_child_families"), 1).over(Window.orderBy("year"))
    )
    .withColumn(
        "percent_change",
        (F.col("num_single_child_families") - F.col("last_year_num_single_child_families"))
        / F.col("last_year_num_single_child_families")
    )
    .orderBy(F.abs(F.col("percent_change")).desc())
    .select(
        F.col("year").alias("year"),
        F.format_number("num_single_child_families", 0).alias("new families single child"),
        F.format_number(F.col("percent_change") * 100, 2).alias("percent change"),
    )
    .limit(5)
)
```
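
The resulting DataFrame can then be executed or its SQL inspected; a short follow-up sketch (output omitted, not part of the original example):

```python
df.show()        # runs the query on BigQuery and prints the five result rows
print(df.sql())  # or inspect the SQL that SQLFrame would execute
```
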
88 changes: 72 additions & 16 deletions docs/bigquery.md
@@ -6,22 +6,62 @@
pip install "sqlframe[bigquery]"
```

## Enabling SQLFrame

SQLFrame can be used in two ways:

* Directly importing the `sqlframe.bigquery` package
* Using the [activate](./configuration.md#activating-sqlframe) function, which lets you keep your `pyspark.sql` imports while SQLFrame runs behind the scenes.

### Import

If converting a PySpark pipeline, all `pyspark.sql` imports should be replaced with `sqlframe.bigquery`.
In addition, many classes will have a `BigQuery` prefix.
For example, `BigQueryDataFrame` instead of `DataFrame`.


```python
# PySpark import
# from pyspark.sql import SparkSession
# from pyspark.sql import functions as F
# from pyspark.sql.dataframe import DataFrame
# SQLFrame import
from sqlframe.bigquery import BigQuerySession
from sqlframe.bigquery import functions as F
from sqlframe.bigquery import BigQueryDataFrame
```

### Activate

If you would like to continue using `pyspark.sql` but have it use SQLFrame behind the scenes, you can use the [activate](./configuration.md#activating-sqlframe) function.

```python
from sqlframe import activate
activate("bigquery", config={"default_dataset": "sqlframe.db1"})

from pyspark.sql import SparkSession
```

`SparkSession` will now be a SQLFrame `BigQuerySession` object and everything will be run on BigQuery directly.

See [activate configuration](./configuration.md#activating-sqlframe) for information on how to pass in a connection and config options.
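
As a quick sanity check (an illustrative sketch, assuming `activate` maps `SparkSession` to `BigQuerySession` as described above):

```python
from sqlframe import activate
activate("bigquery", config={"default_dataset": "sqlframe.db1"})

from pyspark.sql import SparkSession
from sqlframe.bigquery import BigQuerySession

spark = SparkSession.builder.getOrCreate()
# After activation, the "SparkSession" is actually SQLFrame's BigQuerySession
assert isinstance(spark, BigQuerySession)
```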

## Creating a Session

SQLFrame uses the [BigQuery DBAPI Connection](https://cloud.google.com/python/docs/reference/bigquery/latest/dbapi#class-googlecloudbigquerydbapiconnectionclientnone-bqstorageclientnone) to connect to BigQuery.
A BigQuerySession, which implements the PySpark Session API, can be created by passing in a `google.cloud.bigquery.dbapi.Connection` object or by allowing SQLFrame to create a connection for you.
By default, SQLFrame will create a connection by inferring it from the environment (for example using gcloud auth).
Regardless of approach, it is recommended to configure `default_dataset` in the `BigQuerySession` constructor in order to make it easier to use the catalog methods (see example below).

=== "Without Providing Connection"
=== "Import + Without Providing Connection"

```python
from sqlframe.bigquery import BigQuerySession

session = BigQuerySession(default_dataset="sqlframe.db1")
```

=== "With Providing Connection"
=== "Import + With Providing Connection"

```python
import google.auth
from google.api_core import client_info
from google.oauth2 import service_account
from google.cloud.bigquery.dbapi import connect
from sqlframe.bigquery import BigQuerySession

creds = service_account.Credentials.from_service_account_file("path/to/credentials.json")

client = google.cloud.bigquery.Client(
    project="my-project",
    credentials=creds,
    location="us-central1",
    client_info=client_info.ClientInfo(user_agent="sqlframe"),
)

conn = connect(client=client)
session = BigQuerySession(conn=conn, default_dataset="sqlframe.db1")
```

=== "Activate + Without Providing Connection"

```python
from sqlframe import activate
activate("bigquery", config={"default_dataset": "sqlframe.db1"})
from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()
```

=== "Activate + With Providing Connection"

```python
import google.auth
from google.api_core import client_info
from google.oauth2 import service_account
from google.cloud.bigquery.dbapi import connect
from sqlframe import activate
creds = service_account.Credentials.from_service_account_file("path/to/credentials.json")

client = google.cloud.bigquery.Client(
project="my-project",
credentials=creds,
location="us-central1",
client_info=client_info.ClientInfo(user_agent="sqlframe"),
)

conn = connect(client=client)
activate("bigquery", conn=conn, config={"default_dataset": "sqlframe.db1"})

from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()
```

## Using BigQuery Unique Functions

47 changes: 47 additions & 0 deletions docs/configuration.md
@@ -24,6 +24,53 @@ In this configuration, you can use BigQuery syntax for elements such as date for

SQLFrame supports multiple dialects, all of which can be specified as the `input_dialect` and `output_dialect`.
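
For example, a sketch of setting both dialects when building a session (the builder `config` keys are assumptions based on the `sqlframe.input.dialect` key shown under "Activate with Configuration" below):

```python
from sqlframe.duckdb import DuckDBSession

session = (
    DuckDBSession.builder
    .config("sqlframe.input.dialect", "duckdb")
    .config("sqlframe.output.dialect", "duckdb")
    .getOrCreate()
)
```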

## Activating SQLFrame

SQLFrame can be activated in order to replace `pyspark` imports with `sqlframe` imports for the given engine.
This allows you to use SQLFrame as a drop-in replacement for PySpark by just adding two lines of code.

### Activate with Engine

If you just provide an engine to `activate`, it will create a connection for that engine with default settings (if the engine supports it).

```python
from sqlframe import activate
activate("duckdb")

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# "spark" is not a SQLFrame DuckDBSession and will run directly on DuckDB
```

### Activate with Connection

If you provide a connection to `activate`, it will use that connection for the engine.

```python
import duckdb
from sqlframe import activate
connection = duckdb.connect("file.duckdb")
activate("duckdb", conn=connection)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# "spark" is a SQLFrame DuckDBSession and will run directly on DuckDB using `file.duckdb` for persistence
```

### Activate with Configuration

If you provide a configuration to `activate`, it will use that configuration to create a connection for the engine.

```python
from sqlframe import activate
activate("duckdb", config={"sqlframe.input.dialect": "duckdb"})

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# "spark" is a SQLFrame DuckDBSession and will run directly on DuckDB with input dialect set to DuckDB
```

## Generated SQL

### Pretty
