Add Snowflake `application` connection parameter to configuration (#1266)
* Add Snowflake `application` parameter to configuration

* Set default application parameter if it is not specified

* Adjust tests for connection params

* Use empty string to skip setting the application parameter

* Set default value for application parameter

* Fix if check bug

* Uppercase SNOWFLAKE_APPLICATION_ID and re-use in tests

* Add note in docs about application parameter for snowflake

* Update text for snowflake's application connection parameter

* Fix typo

* Update docs/website/docs/dlt-ecosystem/destinations/snowflake.md

Co-authored-by: VioletM <[email protected]>

* Update snowflake.md

* Update doc

---------

Co-authored-by: VioletM <[email protected]>
sultaniman and VioletM authored Apr 24, 2024
1 parent 4763496 commit 5d296bc
Showing 3 changed files with 61 additions and 15 deletions.
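In short, `SnowflakeCredentials` gains an `application` field that defaults to `dltHub_dlt`. The value is appended to the connection URL and connector parameters unless the connection string already carries one, and an empty string opts out. A minimal sketch of the resulting behavior, modeled on the tests added in this commit:

```py
from dlt.destinations.impl.snowflake.configuration import SnowflakeCredentials

creds = SnowflakeCredentials()
creds.parse_native_representation(
    "snowflake://user1:pass1@host1/db1?warehouse=warehouse1&role=role1"
)

# the default identifier is appended when the URL does not carry one
assert "application=dltHub_dlt" in str(creds.to_url())

# an empty string skips the parameter entirely
creds.application = ""
assert "application=" not in str(creds.to_url())
```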
14 changes: 14 additions & 0 deletions dlt/destinations/impl/snowflake/configuration.py
@@ -49,6 +49,9 @@ def _read_private_key(private_key: str, password: Optional[str] = None) -> bytes
    )


+SNOWFLAKE_APPLICATION_ID = "dltHub_dlt"
+
+
@configspec(init=False)
class SnowflakeCredentials(ConnectionStringCredentials):
    drivername: Final[str] = dataclasses.field(default="snowflake", init=False, repr=False, compare=False)  # type: ignore[misc]
@@ -60,6 +63,7 @@ class SnowflakeCredentials(ConnectionStringCredentials):
    authenticator: Optional[str] = None
    private_key: Optional[TSecretStrValue] = None
    private_key_passphrase: Optional[TSecretStrValue] = None
+    application: Optional[str] = SNOWFLAKE_APPLICATION_ID

    __config_gen_annotations__: ClassVar[List[str]] = ["password", "warehouse", "role"]

@@ -85,6 +89,10 @@ def to_url(self) -> URL:
            query["warehouse"] = self.warehouse
        if self.role and "role" not in query:
            query["role"] = self.role
+
+        if self.application != "" and "application" not in query:
+            query["application"] = self.application
+
        return URL.create(
            self.drivername,
            self.username,
@@ -99,6 +107,7 @@ def to_connector_params(self) -> Dict[str, Any]:
        private_key: Optional[bytes] = None
        if self.private_key:
            private_key = _read_private_key(self.private_key, self.private_key_passphrase)
+
        conn_params = dict(
            self.query or {},
            user=self.username,
@@ -109,8 +118,13 @@ def to_connector_params(self) -> Dict[str, Any]:
            role=self.role,
            private_key=private_key,
        )
+
        if self.authenticator:
            conn_params["authenticator"] = self.authenticator
+
+        if self.application != "" and "application" not in conn_params:
+            conn_params["application"] = self.application
+
        return conn_params


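The same default flows into the parameters handed to the Snowflake dbapi client by `to_connector_params`. A short sketch, again following the tests further below:

```py
from dlt.destinations.impl.snowflake.configuration import (
    SNOWFLAKE_APPLICATION_ID,
    SnowflakeCredentials,
)

creds = SnowflakeCredentials()
creds.parse_native_representation("snowflake://user1:pass1@host1/db1")

# the default application identifier is forwarded to the connector
assert creds.to_connector_params()["application"] == SNOWFLAKE_APPLICATION_ID

# a user-supplied value replaces the default
creds.application = "custom_app_id"
assert creds.to_connector_params()["application"] == "custom_app_id"
```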
31 changes: 17 additions & 14 deletions docs/website/docs/dlt-ecosystem/destinations/snowflake.md
@@ -6,8 +6,8 @@ keywords: [Snowflake, destination, data warehouse]

# Snowflake

-## Install dlt with Snowflake
-**To install the dlt library with Snowflake dependencies, run:**
+## Install `dlt` with Snowflake
+**To install the `dlt` library with Snowflake dependencies, run:**
```sh
pip install dlt[snowflake]
```
@@ -25,7 +25,7 @@ pip install -r requirements.txt
```
This will install `dlt` with the `snowflake` extra, which contains the Snowflake Python dbapi client.

-**3. Create a new database, user, and give dlt access.**
+**3. Create a new database, user, and give `dlt` access.**

Read the next chapter below.
Read the next chapter below.

Expand All @@ -44,7 +44,6 @@ In the case of Snowflake, the **host** is your [Account Identifier](https://docs

The **warehouse** and **role** are optional if you assign defaults to your user. In the example below, we do not do that, so we set them explicitly.


### Setup the database user and permissions
The instructions below assume that you use the default account setup that you get after creating a Snowflake account. You should have a default warehouse named **COMPUTE_WH** and a Snowflake account. Below, we create a new database, user, and assign permissions. The permissions are very generous. A more experienced user can easily reduce `dlt` permissions to just one schema in the database.
```sql
@@ -57,7 +56,7 @@ CREATE ROLE DLT_LOADER_ROLE;
GRANT ROLE DLT_LOADER_ROLE TO USER loader;
-- give database access to new role
GRANT USAGE ON DATABASE dlt_data TO DLT_LOADER_ROLE;
--- allow dlt to create new schemas
+-- allow `dlt` to create new schemas
GRANT CREATE SCHEMA ON DATABASE dlt_data TO ROLE DLT_LOADER_ROLE
-- allow access to a warehouse named COMPUTE_WH
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO DLT_LOADER_ROLE;
@@ -143,22 +142,22 @@ Names of tables and columns in [schemas](../../general-usage/schema.md) are kept

## Staging support

-Snowflake supports S3 and GCS as file staging destinations. dlt will upload files in the parquet format to the bucket provider and will ask Snowflake to copy their data directly into the db.
+Snowflake supports S3 and GCS as file staging destinations. `dlt` will upload files in the parquet format to the bucket provider and will ask Snowflake to copy their data directly into the db.

Alternatively to parquet files, you can also specify jsonl as the staging file format. For this, set the `loader_file_format` argument of the `run` command of the pipeline to `jsonl`.

### Snowflake and Amazon S3

-Please refer to the [S3 documentation](./filesystem.md#aws-s3) to learn how to set up your bucket with the bucket_url and credentials. For S3, the dlt Redshift loader will use the AWS credentials provided for S3 to access the S3 bucket if not specified otherwise (see config options below). Alternatively, you can create a stage for your S3 Bucket by following the instructions provided in the [Snowflake S3 documentation](https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration).
+Please refer to the [S3 documentation](./filesystem.md#aws-s3) to learn how to set up your bucket with the bucket_url and credentials. For S3, the `dlt` Redshift loader will use the AWS credentials provided for S3 to access the S3 bucket if not specified otherwise (see config options below). Alternatively, you can create a stage for your S3 Bucket by following the instructions provided in the [Snowflake S3 documentation](https://docs.snowflake.com/en/user-guide/data-load-s3-config-storage-integration).
The basic steps are as follows:

* Create a storage integration linked to GCS and the right bucket
* Grant access to this storage integration to the Snowflake role you are using to load the data into Snowflake.
* Create a stage from this storage integration in the PUBLIC namespace, or the namespace of the schema of your data.
* Also grant access to this stage for the role you are using to load data into Snowflake.
-* Provide the name of your stage (including the namespace) to dlt like so:
+* Provide the name of your stage (including the namespace) to `dlt` like so:

-To prevent dlt from forwarding the S3 bucket credentials on every command, and set your S3 stage, change these settings:
+To prevent `dlt` from forwarding the S3 bucket credentials on every command, and set your S3 stage, change these settings:

```toml
[destination]
@@ -168,7 +167,7 @@ stage_name="PUBLIC.my_s3_stage"
To run Snowflake with S3 as the staging destination:

```py
-# Create a dlt pipeline that will load
+# Create a `dlt` pipeline that will load
# chess player data to the Snowflake destination
# via staging on S3
pipeline = dlt.pipeline(
@@ -187,7 +186,7 @@ Please refer to the [Google Storage filesystem documentation](./filesystem.md#go
* Grant access to this storage integration to the Snowflake role you are using to load the data into Snowflake.
* Create a stage from this storage integration in the PUBLIC namespace, or the namespace of the schema of your data.
* Also grant access to this stage for the role you are using to load data into Snowflake.
-* Provide the name of your stage (including the namespace) to dlt like so:
+* Provide the name of your stage (including the namespace) to `dlt` like so:

```toml
[destination]
@@ -197,7 +196,7 @@ stage_name="PUBLIC.my_gcs_stage"
To run Snowflake with GCS as the staging destination:

```py
-# Create a dlt pipeline that will load
+# Create a `dlt` pipeline that will load
# chess player data to the Snowflake destination
# via staging on GCS
pipeline = dlt.pipeline(
@@ -218,7 +217,7 @@ Please consult the Snowflake Documentation on [how to create a stage for your Az
* Grant access to this storage integration to the Snowflake role you are using to load the data into Snowflake.
* Create a stage from this storage integration in the PUBLIC namespace, or the namespace of the schema of your data.
* Also grant access to this stage for the role you are using to load data into Snowflake.
-* Provide the name of your stage (including the namespace) to dlt like so:
+* Provide the name of your stage (including the namespace) to `dlt` like so:

```toml
[destination]
@@ -228,7 +227,7 @@ stage_name="PUBLIC.my_azure_stage"
To run Snowflake with Azure as the staging destination:

```py
-# Create a dlt pipeline that will load
+# Create a `dlt` pipeline that will load
# chess player data to the Snowflake destination
# via staging on Azure
pipeline = dlt.pipeline(
@@ -255,5 +254,9 @@ This destination [integrates with dbt](../transformations/dbt/dbt.md) via [dbt-s
### Syncing of `dlt` state
This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination)

+### Snowflake connection identifier
+We enable Snowflake to identify that the connection is created by `dlt`. Snowflake will use this identifier to better understand the usage patterns
+associated with `dlt` integration. The connection identifier is `dltHub_dlt`.
+
<!--@@@DLT_TUBA snowflake-->
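The identifier can be overridden or suppressed through configuration. A sketch using environment variables; the `CREDENTIALS__APPLICATION` key and the empty-string opt-out mirror the tests below, while the `resolve_configuration` import path is the usual one in `dlt.common.configuration`:

```py
import os

from dlt.common.configuration import resolve_configuration
from dlt.destinations.impl.snowflake.configuration import SnowflakeCredentials

os.environ["CREDENTIALS__PASSWORD"] = "pass"

# replace the default "dltHub_dlt" identifier with a custom one
os.environ["CREDENTIALS__APPLICATION"] = "my_app"
c = resolve_configuration(
    SnowflakeCredentials(),
    explicit_value="snowflake://user1@host1/db1?warehouse=warehouse1&role=role1",
)
assert "application=my_app" in str(c.to_url())

# an empty string drops the parameter from the connection entirely
os.environ["CREDENTIALS__APPLICATION"] = ""
c = resolve_configuration(
    SnowflakeCredentials(),
    explicit_value="snowflake://user1@host1/db1?warehouse=warehouse1&role=role1",
)
assert "application=" not in str(c.to_url())
```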

31 changes: 30 additions & 1 deletion tests/load/snowflake/test_snowflake_configuration.py
@@ -10,6 +10,7 @@
from dlt.common.utils import digest128

from dlt.destinations.impl.snowflake.configuration import (
+    SNOWFLAKE_APPLICATION_ID,
    SnowflakeClientConfiguration,
    SnowflakeCredentials,
)
@@ -21,7 +22,7 @@


def test_connection_string_with_all_params() -> None:
-    url = "snowflake://user1:pass1@host1/db1?warehouse=warehouse1&role=role1&private_key=cGs%3D&private_key_passphrase=paphr"
+    url = "snowflake://user1:pass1@host1/db1?application=dltHub_dlt&warehouse=warehouse1&role=role1&private_key=cGs%3D&private_key_passphrase=paphr"

    creds = SnowflakeCredentials()
    creds.parse_native_representation(url)
@@ -36,9 +37,20 @@ def test_connection_string_with_all_params() -> None:
    assert creds.private_key_passphrase == "paphr"

    expected = make_url(url)
+    to_url_value = str(creds.to_url())

    # Test URL components regardless of query param order
    assert make_url(creds.to_native_representation()) == expected
+    assert to_url_value == str(expected)
+
+    creds.application = "custom"
+    url = "snowflake://user1:pass1@host1/db1?application=custom&warehouse=warehouse1&role=role1&private_key=cGs%3D&private_key_passphrase=paphr"
+    creds.parse_native_representation(url)
+    expected = make_url(url)
+    to_url_value = str(creds.to_url())
+    assert make_url(creds.to_native_representation()) == expected
+    assert to_url_value == str(expected)
+    assert "application=custom" in str(expected)


def test_to_connector_params() -> None:
@@ -66,6 +78,8 @@ def test_to_connector_params() -> None:
        password=None,
        warehouse="warehouse1",
        role="role1",
+        # default application identifier will be used
+        application=SNOWFLAKE_APPLICATION_ID,
    )

    # base64 encoded DER key
@@ -79,6 +93,8 @@ def test_to_connector_params() -> None:
    creds.host = "host1"
    creds.warehouse = "warehouse1"
    creds.role = "role1"
+    # set application identifier and check it
+    creds.application = "custom_app_id"

    params = creds.to_connector_params()

@@ -92,6 +108,7 @@ def test_to_connector_params() -> None:
        password=None,
        warehouse="warehouse1",
        role="role1",
+        application="custom_app_id",
    )


@@ -103,12 +120,14 @@ def test_snowflake_credentials_native_value(environment) -> None:
    )
    # set password via env
    os.environ["CREDENTIALS__PASSWORD"] = "pass"
+    os.environ["CREDENTIALS__APPLICATION"] = "dlt"
    c = resolve_configuration(
        SnowflakeCredentials(),
        explicit_value="snowflake://user1@host1/db1?warehouse=warehouse1&role=role1",
    )
    assert c.is_resolved()
    assert c.password == "pass"
+    assert "application=dlt" in str(c.to_url())
    # # but if password is specified - it is final
    c = resolve_configuration(
        SnowflakeCredentials(),
@@ -126,6 +145,16 @@ def test_snowflake_credentials_native_value(environment) -> None:
    )
    assert c.is_resolved()
    assert c.private_key == "pk"
+    assert "application=dlt" in str(c.to_url())
+
+    # check with application = "" it should not be in connection string
+    os.environ["CREDENTIALS__APPLICATION"] = ""
+    c = resolve_configuration(
+        SnowflakeCredentials(),
+        explicit_value="snowflake://user1@host1/db1?warehouse=warehouse1&role=role1",
+    )
+    assert c.is_resolved()
+    assert "application=" not in str(c.to_url())


def test_snowflake_configuration() -> None:
