Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(go/adbc/driver/snowflake): improve bulk ingestion speed #1456

Merged
merged 24 commits into from
Jan 26, 2024

Conversation

joellubi
Copy link
Member

@joellubi joellubi commented Jan 12, 2024

What

  • Replace Snowflake bulk ingestion with Parquet-based approach with higher throughput and better type support
    • Previously: INSERT bind parameters were uploaded to a CSV-based stage, once per record batch
    • Now: Parquet files written concurrently to stage independently of record batch size. Parquet logical types are used to infer schema on COPY.
  • Tests to validate type support and consistency through Arrow -> Parquet -> Snowflake -> Arrow roundtrip
  • Improved type mapping between Arrow <-> Snowflake timestamps. TIMESTAMP_LTZ is more consistent with Arrow timestamp semantics than TIMESTAMP_TZ, which can lead to lossy roundtrips.
  • Minor bugfix where Snowflake local timestamps with timezone set to UTC were being interpreted as non-local.

Why

Methodology

The general approach for ingestion is most clearly demonstrated by the path taken when stmt.Bind() for a single record is used:

IngestRecord

flowchart LR
    A(Record) --> B(Write Parquet)
    B --> C(Upload File)
    C --> D(Execute COPY)
    D --> E(Check Row Count)
Loading

The Arrow record is written to a Parquet file due to its logical type support, compressibility, and native Snowflake support. The file is then uploaded to a temporary Snowflake stage via PUT query, and then loaded into the target table via COPY query. Once the COPY has finished, one more query to check the resulting row count is dispatched to accurately return the number of rows affected. This is used instead of counting the Arrow rows written in case there are any undetected losses when importing the uploaded file into Snowflake.

A similar approach is taken when ingesting an arbitrarily large stream of records via stmt.BindStream(), but makes use of several opportunities to parallelize the work involved at different stages:

IngestStream

flowchart LR

    A(Read Records) --> B(Write Parquet)

    A --> C(Write Parquet)
    A --> D(Write Parquet)
    A --> E(Write Parquet)

    B --> J(Buffer Pool)
    C --> J
    D --> J
    E --> J

    J --> K(Upload File)
    J --> L(Upload File)

    K --> M(Finalize COPY)
    L --> M

    M --> N(Check Row Count)


    O(File Ready) --> P(Execute COPY)
    P --> O
Loading

The same steps are used, but the stream of records is now distributed among a pool of Parquet writers. This step is inherently CPU-bound, so it is desirable for it to scale independently with the availability of logical cores for writing/compression. These Parquet files are written to a buffer pool in memory to help decouple the upload stage from writing, and so that a writer can start working on the next file while the last file it wrote is being uploaded. Uploads from the buffer pool also benefit from parallelism, but more so to maximize network utilization by limiting idle time between uploads and amortizing potential slowdown in any one upload.

Technically, only a single COPY command is required after the last file is uploaded in order to load the Parquet files into the Snowflake table. However, on many warehouses this operation takes as long or even longer than the upload itself but can be made faster by paying for a larger warehouse. Given the batched approach taken and that the COPY command is idempotent, we can execute COPY repeatedly as files are uploaded to load them into the table on an ongoing basis. These COPY queries are executed asynchronously and listen for an upload-completed callback to ensure at least one file will be loaded by the query (otherwise it will no-op so this just prevents spamming Snowflake with a bunch of no-op COPYs).

Empirically, ingestion works reasonably well on an XS warehouse. COPY speed is no longer a bottleneck with an S warehouse with high-speed home internet, or on an M warehouse with same-region data center networking.

Performance

Running on GCP e2-medium (shared-core 1 vCPU, 4GB RAM)
Snowflake warehouse size M, same GCP region as Snowflake account
Default ingestion settings

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):

  • Current: 11m50s
  • New: 14s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):

  • Current: Didn't attempt
  • New: 1m16s

This configuration is CPU bound, so I did another attempt with more cores available...
Now with GCP e2-standard-4 (4 vCPU, 16GB RAM)

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):

  • Current: 11m17s
  • New: 9.5s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):

  • Current: 1h47m
  • New: 45s

Considerations

  • Snowflake guides indicate that ingestion via CSV is the fastest. Experimentally, it does appear to be true that a COPY query on staged CSV files executes much faster than for Parquet files. However by distributing the COPY workloads in parallel to the batched file uploads, overall performance is better with Parquet since it can be compressed much more efficiently allowing the upload to complete in less time and with fewer bytes transferred than with CSV. Type support is also much better.
  • Single-Record ingestion performance is slightly worse than the previous INSERT-bind approach. As a rough idea, a record that previously ingested in about 1.7s now ingests in about 2.5s. However, the new approach does come with expanded type support and better consistency with the streaming approach.
  • An ingestion run that fails part-way through may leave the table with partial results. Transaction semantics may be added in the future by overriding the CopyConcurrency parameter to be 0, in which case only the final COPY will execute.

Additional Work

Blocking

  • Timestamps will roundtrip properly after Arrow GH-39466 is closed. A test is included but skipped for now.
  • Date64 will roundtrip properly after Arrow GH-39456 is closed. A test is included but skipped for now.

Non-Blocking

  • Compression codec and level are included in ingestOptions but are not configurable using stmt.SetOption(). It is trivial to add this, but it would be nice to be able to use the currently internal CompressionCodecFromString method to automatically pick up support for any other codecs added in the future. Captured in feat(go/adbc/driver/snowflake): Expose file compression options for bulk ingestion #1473.
  • List and Map types have some issues on ingestion. Snowflake returns SQL execution internal error whenever repetition level is greater than 0. Still some more investigation to do here. This is non-blocking because neither type was previously supported for ingestion.
  • Context cancelation is supported for all goroutines and queries executed as part of ingestion, except for the PUT query (i.e. file uploads). This issue is being tracked in gosnowflake 1028. In practice, it likely takes just a few seconds for in-progress uploads to complete and properly conclude cancelation. Once this issue is fixed, the queries would be canceled in Snowflake, allowing the process to exit faster and reduce unnecessary work.
  • The code previously meant to map Snowflake types to Go types is no longer used. It may still be useful for binding an Arrow record to an arbitrary Update query, but stmt.Prepare should be implemented first to follow ADBC spec for binding parameters.

@github-actions github-actions bot added this to the ADBC Libraries 0.10.0 milestone Jan 12, 2024
@joellubi joellubi marked this pull request as ready for review January 14, 2024 20:23
Comment on lines -215 to +218
q := int64(t) / int64(math.Pow10(int(srcMeta.Scale)))
r := int64(t) % int64(math.Pow10(int(srcMeta.Scale)))
v, err := arrow.TimestampFromTime(time.Unix(q, r), dt.Unit)
if err != nil {
return nil, err
}
tb.Append(v)
tb.Append(arrow.Timestamp(t))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, did Snowflake change how they return timestamp data? In addition, this change to use tb.Append(arrow.Timestamp(t)) doesn't take into account the timestamp unit for the input data type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's entirely possible that they did, the behavior seems to vary between different types (such as TIMESTAMP_TZ vs TIMESTAMP_LTZ). When I switched the type to TIMESTAMP_LTZ, all results were returned in this format rather than the struct representation.

In this particular case, I observed that the returned Int64 value corresponds to the scalar value of the timestamp in whatever unit is specified by the type. If the scale is 3, then the Int64 is already meant to denote milliseconds, and so on. Since the values are already in the unit specified by scale, we can just add the existing value to the array. The test TestSqlIngestTimestampTypes confirms the roundtrip behavior (currently skipped but it does pass when the upstream changes are pulled in).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the values are already in the unit specified by scale, we can just add the existing value to the array

This is assuming the unit for the arrow column matches the scale that snowflake is returning which i'm not sure is a valid assumption. We should ensure we scale the value properly to the desired arrow unit. (which is what the previous code did). Are we sure we can always assume the unit for the Arrow column will match the scale that Snowflake returned?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might have to make this assumption one way or another. In the existing approach we rely on srcMeta.Scale being correct, which comes from Snowflake. If that doesn't match the actual scale of the int64 value we get then the calculations to get q and r (sec and nsec) will be wrong as well. Given that assumption, the additional computation doesn't end up being necessary.

Copy link
Member

@zeroshade zeroshade left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't read through the driver_test.go yet, but had a bunch of comments.

go/adbc/driver/snowflake/statement.go Outdated Show resolved Hide resolved
go/adbc/driver/snowflake/statement.go Show resolved Hide resolved
go/adbc/driver/snowflake/statement.go Outdated Show resolved Hide resolved
go/adbc/driver/snowflake/statement.go Outdated Show resolved Hide resolved
go/adbc/driver/snowflake/bulk_ingestion.go Outdated Show resolved Hide resolved
go/adbc/driver/snowflake/bulk_ingestion.go Outdated Show resolved Hide resolved
go/adbc/driver/snowflake/bulk_ingestion.go Outdated Show resolved Hide resolved
go/adbc/driver/snowflake/bulk_ingestion.go Outdated Show resolved Hide resolved
go/adbc/driver/snowflake/bulk_ingestion.go Outdated Show resolved Hide resolved
Comment on lines +526 to +534
type limitWriter struct {
w io.Writer
limit int

bytesWritten int
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than tracking the bytes written like this, the parquet writer already exposes the ability to check how many bytes have been written via a TotalBytesWritten method. Having this just seems redundant.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thanks for pointing that out, I wasn't aware that bytes were being tracked by the writer. I did give this a try but it appears that the most similar method exposed by the parquet writer is RowGroupTotalBytesWritten. This is pretty close but unfortunately the count resets with each new row group. Since multiple row groups may be created in a single call to Write in the case of records larger than the max row group size, it doesn't seem there's a straightforward way to account for all bytes written with this approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For situations like this, a single row group per file makes the most sense. We should set the max row group length to math.MaxInt64, at which point you can use the RowGroupTotalBytesWritten to handle it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach makes sense to me. I did try to add the parquet writer option parquet.WithMaxRowGroupLength(math.MaxInt64) but the value RowGroupTotalBytesWritten returns still doesn't seem to agree exactly with the total number of bytes the writer actually has going through it. It doesn't report as many bytes written for a given write as the limitWriter does. As a result the file sizes written using this approach are typically between 13-14 MB, whereas they were previously between 10-11 which is what I would expect. Any idea why this might be the case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the RowGroupTotalBytesWritten likely doesn't include the parquet metadata being written, but that still seems like a bit more of a discrepency than I'd expect. Maybe there's a spot in the library that I missed updating this value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened arrow@39789 to track this. I spent a bit of time looking into this but I think I'd like to isolate it with a test separately to help figure it out. Any objection to leaving the current approach with a reference to the issue, and following up with a fix separately?

@zeroshade
Copy link
Member

@joellubi this looks pretty good to me, do we know why that single test is failing?

pool := newBufferPool(int(st.ingestOptions.targetFileSize))
buffers := make(chan *bytes.Buffer, st.ingestOptions.writerConcurrency)
g.Go(func() error {
return runParallelParquetWriters(gCtx, schema, int(st.ingestOptions.targetFileSize), int(st.ingestOptions.writerConcurrency), parquetProps, arrowProps, pool.GetBuffer, records, buffers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split this line for readability?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I split this and a few other long ones.

@joellubi
Copy link
Member Author

@zeroshade How do you feel about the resolution to changes you requested, and about handling the row group bytes written as a separate issue? I rebased to bring in the v15 libraries and was able to un-skip the test that depended on the issues fixed upstream. All appears to be green now.

@zeroshade
Copy link
Member

@joellubi Looks good to me and we can address the bytes written as a separate issue. Thanks a ton for this! Would you mind commenting with any updated benchmarks on this comparing the old version to this, just so we have them in the notes (so we can include them in the release notes when this gets pushed out)

@zeroshade zeroshade merged commit b57e19b into apache:main Jan 26, 2024
33 checks passed
@joellubi
Copy link
Member Author

@joellubi Looks good to me and we can address the bytes written as a separate issue. Thanks a ton for this! Would you mind commenting with any updated benchmarks on this comparing the old version to this, just so we have them in the notes (so we can include them in the release notes when this gets pushed out)

Thanks! I just re-ran the benchmarks on the current main branch in the same cloud environment as the existing benchmarks. The numbers came out basically the same so I would say the numbers in the PR description and in the documentation are accurate.

soumyadsanyal pushed a commit to soumyadsanyal/arrow-adbc that referenced this pull request Jan 31, 2024
…1456)

# What

- Replace Snowflake bulk ingestion with Parquet-based approach with
higher throughput and better type support
- Previously: INSERT bind parameters were uploaded to a CSV-based stage,
once per record batch
- Now: Parquet files written concurrently to stage independently of
record batch size. Parquet logical types are used to infer schema on
COPY.
- Tests to validate type support and consistency through Arrow ->
Parquet -> Snowflake -> Arrow roundtrip
- Improved type mapping between Arrow <-> Snowflake timestamps.
[TIMESTAMP_LTZ](https://docs.snowflake.com/en/sql-reference/data-types-datetime#timestamp-ltz-timestamp-ntz-timestamp-tz)
is more consistent with Arrow timestamp semantics than TIMESTAMP_TZ,
which can lead to lossy roundtrips.
- Minor bugfix where Snowflake local timestamps with timezone set to UTC
were being interpreted as non-local.

# Why

- Implements apache#1327, which comes from improvement request apache#1322
- BindStream ingestion is significantly faster
- Arrow type support is improved

# Methodology

The general approach for ingestion is most clearly demonstrated by the
path taken when `stmt.Bind()` for a single record is used:
### IngestRecord
```mermaid
flowchart LR
    A(Record) --> B(Write Parquet)
    B --> C(Upload File)
    C --> D(Execute COPY)
    D --> E(Check Row Count)
```
The Arrow record is written to a Parquet file due to its logical type
support, compressibility, and native Snowflake support. The file is then
uploaded to a temporary Snowflake stage via PUT query, and then loaded
into the target table via COPY query. Once the COPY has finished, one
more query to check the resulting row count is dispatched to accurately
return the number of rows affected. This is used instead of counting the
Arrow rows written in case there are any undetected losses when
importing the uploaded file into Snowflake.

A similar approach is taken when ingesting an arbitrarily large stream
of records via `stmt.BindStream()`, but makes use of several
opportunities to parallelize the work involved at different stages:

### IngestStream
```mermaid
flowchart LR

    A(Read Records) --> B(Write Parquet)

    A --> C(Write Parquet)
    A --> D(Write Parquet)
    A --> E(Write Parquet)

    B --> J(Buffer Pool)
    C --> J
    D --> J
    E --> J

    J --> K(Upload File)
    J --> L(Upload File)

    K --> M(Finalize COPY)
    L --> M

    M --> N(Check Row Count)


    O(File Ready) --> P(Execute COPY)
    P --> O
```
The same steps are used, but the stream of records is now distributed
among a pool of Parquet writers. This step is inherently CPU-bound, so
it is desirable for it to scale independently with the availability of
logical cores for writing/compression. These Parquet files are written
to a buffer pool in memory to help decouple the upload stage from
writing, and so that a writer can start working on the next file _while_
the last file it wrote is being uploaded. Uploads from the buffer pool
also benefit from parallelism, but more so to maximize network
utilization by limiting idle time between uploads and amortizing
potential slowdown in any one upload.

Technically, only a single COPY command is required after the last file
is uploaded in order to load the Parquet files into the Snowflake table.
However, on many warehouses this operation takes as long or even longer
than the upload itself but can be made faster by paying for a larger
warehouse. Given the batched approach taken and that the COPY command is
idempotent, we can execute COPY repeatedly as files are uploaded to load
them into the table on an ongoing basis. These COPY queries are executed
asynchronously and listen for an upload-completed callback to ensure at
least one file will be loaded by the query (otherwise it will no-op so
this just prevents spamming Snowflake with a bunch of no-op COPYs).

Empirically, ingestion works reasonably well on an XS warehouse. COPY
speed is no longer a bottleneck with an S warehouse with high-speed home
internet, or on an M warehouse with same-region data center networking.

# Performance

Running on GCP e2-medium (shared-core 1 vCPU, 4GB RAM)
Snowflake warehouse size M, same GCP region as Snowflake account
Default ingestion settings

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):
- Current: 11m50s
- New: 14s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):
- Current: Didn't attempt
- New: 1m16s

_This configuration is CPU bound, so I did another attempt with more
cores available..._
Now with GCP e2-standard-4 (4 vCPU, 16GB RAM)

Benchmarking TPC-H Lineitem @ SF1 (6M Rows):
- Current: 11m17s
- New: 9.5s

Benchmarking TPC-H Lineitem @ SF10 (60M Rows):
- Current: 1h47m
- New: 45s

# Considerations

- Snowflake
[guides](https://community.snowflake.com/s/article/How-to-Load-Terabytes-Into-Snowflake-Speeds-Feeds-and-Techniques)
indicate that ingestion via CSV is the fastest. Experimentally, it does
appear to be true that a COPY query on staged CSV files executes much
faster than for Parquet files. However by distributing the COPY
workloads _in parallel to_ the batched file uploads, overall performance
is better with Parquet since it can be compressed _much_ more
efficiently allowing the upload to complete in less time and with fewer
bytes transferred than with CSV. Type support is also much better.
- Single-Record ingestion performance is slightly worse than the
previous INSERT-bind approach. As a rough idea, a record that previously
ingested in about 1.7s now ingests in about 2.5s. However, the new
approach does come with expanded type support and better consistency
with the streaming approach.
- An ingestion run that fails part-way through may leave the table with
partial results. Transaction semantics may be added in the future by
overriding the CopyConcurrency parameter to be 0, in which case only the
final COPY will execute.

# Additional Work

### Blocking
- ~Timestamps will roundtrip properly after Arrow
[GH-39466](apache/arrow#39466) is closed. A
test is included but skipped for now.~
- ~Date64 will roundtrip properly after Arrow
[GH-39456](apache/arrow#39456) is closed. A
test is included but skipped for now.~

### Non-Blocking
- Compression codec and level are included in `ingestOptions` but are
not configurable using `stmt.SetOption()`. It is trivial to add this,
but it would be nice to be able to use the currently internal
[CompressionCodecFromString](https://github.com/apache/arrow/blob/e6323646558ee01234ce58af273c5a834745f298/go/parquet/internal/gen-go/parquet/parquet.go#L387-L399)
method to automatically pick up support for any other codecs added in
the future. Captured in apache#1473.
- List and Map types have some issues on ingestion. Snowflake returns
`SQL execution internal error` whenever repetition level is greater than
0. Still some more investigation to do here. This is non-blocking
because neither type was previously supported for ingestion.
- Context cancelation is supported for all goroutines and queries
executed as part of ingestion, _except_ for the PUT query (i.e. file
uploads). This issue is being tracked in gosnowflake
[1028](snowflakedb/gosnowflake#1028). In
practice, it likely takes just a few seconds for in-progress uploads to
complete and properly conclude cancelation. Once this issue is fixed,
the queries would be canceled in Snowflake, allowing the process to exit
faster and reduce unnecessary work.
- ~The code previously meant to map Snowflake types to Go types is no
longer used. It may still be useful for binding an Arrow record to an
arbitrary Update query, but `stmt.Prepare` should be implemented first
to follow ADBC spec for binding parameters.~
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants