feat(go/adbc/driver/snowflake): improve bulk ingestion speed #1456

Merged (24 commits, Jan 26, 2024)
90 changes: 60 additions & 30 deletions docs/source/driver/snowflake.rst
@@ -259,37 +259,61 @@ Bulk Ingestion
Bulk ingestion is supported. The mapping from Arrow types to Snowflake types
is provided below.

Bulk ingestion is implemented by writing Arrow data to Parquet file(s) and uploading (via PUT) to a temporary internal stage.
One or more COPY queries are executed in order to load the data into the target table.

In order for the driver to leverage this temporary stage, the user must have
the `CREATE STAGE <https://docs.snowflake.com/en/sql-reference/sql/create-stage>`_ privilege on the schema. In addition,
the current database and schema for the session must be set. If these are not set, the ``CREATE TEMPORARY STAGE`` command
executed by the driver can fail with the following error:

.. code-block:: sql

    CREATE TEMPORARY STAGE ADBC$BIND FILE_FORMAT = (TYPE = PARQUET USE_LOGICAL_TYPE = TRUE BINARY_AS_TEXT = FALSE)
    CANNOT perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA' or use a qualified name.

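For illustration, a minimal sketch of this flow using the driver's Go API follows. It is not
a prescribed recipe: the DSN, warehouse, table name, and Arrow module version are placeholders
to adapt to your environment.

.. code-block:: go

    package main

    import (
        "context"
        "log"

        "github.com/apache/arrow-adbc/go/adbc"
        "github.com/apache/arrow-adbc/go/adbc/driver/snowflake"
        "github.com/apache/arrow/go/v14/arrow/array"
        "github.com/apache/arrow/go/v14/arrow/memory"
    )

    // bulkIngest loads every record from rdr into the target table via the
    // Parquet + PUT + COPY path described above.
    func bulkIngest(ctx context.Context, rdr array.RecordReader) error {
        drv := snowflake.NewDriver(memory.DefaultAllocator)

        // The DSN sets the current database and schema so that the driver's
        // CREATE TEMPORARY STAGE command can succeed (placeholder values).
        db, err := drv.NewDatabase(map[string]string{
            adbc.OptionKeyURI: "user:password@account/my_db/my_schema?warehouse=my_wh",
        })
        if err != nil {
            return err
        }

        cnxn, err := db.Open(ctx)
        if err != nil {
            return err
        }
        defer cnxn.Close()

        stmt, err := cnxn.NewStatement()
        if err != nil {
            return err
        }
        defer stmt.Close()

        // Standard ADBC bulk ingestion options: create the target table and load into it.
        if err := stmt.SetOption(adbc.OptionKeyIngestTargetTable, "LINEITEM"); err != nil {
            return err
        }
        if err := stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate); err != nil {
            return err
        }

        // Bind the Arrow stream and execute: the driver writes Parquet files,
        // PUTs them to a temporary internal stage, and runs COPY queries.
        if err := stmt.BindStream(ctx, rdr); err != nil {
            return err
        }
        nrows, err := stmt.ExecuteUpdate(ctx)
        if err != nil {
            return err
        }
        log.Printf("ingested %d rows", nrows)
        return nil
    }
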
The following informal benchmark demonstrates expected performance using default ingestion settings::

    Running on GCP e2-standard-4 (4 vCPU, 16GB RAM)
    Snowflake warehouse size M, same GCP region as Snowflake account
    Default ingestion settings

    TPC-H Lineitem (16 Columns):
        Scale Factor 1 (6M Rows): 9.5s
        Scale Factor 10 (60M Rows): 45s

The default settings for ingestion should be well balanced for many real-world configurations. If required, performance
and resource usage may be tuned with the following options on the :cpp:class:`AdbcStatement` object (a usage sketch follows the list):

``adbc.snowflake.rpc.ingest_writer_concurrency``
    Number of Parquet files to write in parallel. Default attempts to maximize workers based on logical cores detected,
    but may need to be adjusted if running in a constrained environment. If set to 0, default value is used. Cannot be negative.

``adbc.snowflake.rpc.ingest_upload_concurrency``
    Number of Parquet files to upload in parallel. Greater concurrency can smooth out TCP congestion and help make
    use of available network bandwidth, but will increase memory utilization. Default is 8. If set to 0, default value is used.
    Cannot be negative.

``adbc.snowflake.rpc.ingest_copy_concurrency``
    Maximum number of COPY operations to run concurrently. Bulk ingestion performance is optimized by executing COPY
    queries as files are still being uploaded. Snowflake COPY speed scales with warehouse size, so smaller warehouses
    may benefit from setting this value higher to ensure long-running COPY queries do not block newly uploaded files
    from being loaded. Default is 4. If set to 0, only a single COPY query will be executed as part of ingestion,
    once all files have finished uploading. Cannot be negative.

``adbc.snowflake.rpc.ingest_target_file_size``
    Approximate size of Parquet files written during ingestion. Actual size will be slightly larger, depending on
    the size of the footer/metadata. Default is 10 MB. If set to 0, file size has no limit. Cannot be negative.

Partitioned Result Sets
-----------------------

Partitioned result sets are not currently supported.

Performance
-----------

When querying Snowflake data, results are potentially fetched in parallel from multiple endpoints.
A limited number of batches are queued per endpoint, though data is always
returned to the client in the order of the endpoints.
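
Regardless of how the fetch is parallelized, the client consumes a single ordered stream. A minimal
sketch of reading results with the Go API follows; the query text is a placeholder:

.. code-block:: go

    if err := stmt.SetSqlQuery("SELECT * FROM LINEITEM"); err != nil {
        return err
    }
    rdr, _, err := stmt.ExecuteQuery(ctx)
    if err != nil {
        return err
    }
    defer rdr.Release()

    for rdr.Next() {
        rec := rdr.Record() // batches arrive in endpoint order
        _ = rec             // process the record batch
    }
    if err := rdr.Err(); err != nil {
        return err
    }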

@@ -490,16 +514,19 @@ indicated are done to ensure consistency of the stream of record batches.
- Notes

* - integral types
- number(38, 0)
- All integral types in Snowflake are stored as numbers for which neither
precision nor scale can be specified.

* - float/double
- float64
- Snowflake does not distinguish between float or double. Both are 64-bit values.

* - decimal/numeric
- numeric
- Snowflake will respect the precision/scale of the Arrow type. See the
``adbc.snowflake.sql.client_option.use_high_precision`` option for exceptions to this
behavior.

* - time
- time64[ns]
@@ -513,8 +540,9 @@
| timestamp_ntz
| timestamp_tz
- timestamp[ns]
- Local time zone will be used, except for timestamp_ntz, which is not an instant.
In this case no timezone will be present in the type. Physical values will be
UTC-normalized.

* - | variant
| object
Expand All @@ -523,7 +551,9 @@ indicated are done to ensure consistency of the stream of record batches.
- Snowflake does not provide information about nested
types. Values will be strings in a format similar to JSON that
can be parsed. The Arrow type will contain a metadata key
``logicalType`` with the Snowflake field type. Arrow Struct and
Map types will be stored as objects when ingested. List types will
be stored as arrays.

* - | geography
| geometry