From ee13822979d6063500d5ad3cbb7fdf553aba6c51 Mon Sep 17 00:00:00 2001
From: joel
Date: Wed, 24 Jan 2024 18:53:58 -0500
Subject: [PATCH] split long lines

---
 go/adbc/driver/snowflake/bulk_ingestion.go | 42 +++++++++++++++++++---
 1 file changed, 38 insertions(+), 4 deletions(-)

diff --git a/go/adbc/driver/snowflake/bulk_ingestion.go b/go/adbc/driver/snowflake/bulk_ingestion.go
index 602d5c32e9..61b1b1da73 100644
--- a/go/adbc/driver/snowflake/bulk_ingestion.go
+++ b/go/adbc/driver/snowflake/bulk_ingestion.go
@@ -228,7 +228,17 @@ func (st *statement) ingestStream(ctx context.Context) (nrows int64, err error)
 	pool := newBufferPool(int(st.ingestOptions.targetFileSize))
 	buffers := make(chan *bytes.Buffer, st.ingestOptions.writerConcurrency)
 	g.Go(func() error {
-		return runParallelParquetWriters(gCtx, schema, int(st.ingestOptions.targetFileSize), int(st.ingestOptions.writerConcurrency), parquetProps, arrowProps, pool.GetBuffer, records, buffers)
+		return runParallelParquetWriters(
+			gCtx,
+			schema,
+			int(st.ingestOptions.targetFileSize),
+			int(st.ingestOptions.writerConcurrency),
+			parquetProps,
+			arrowProps,
+			pool.GetBuffer,
+			records,
+			buffers,
+		)
 	})
 
 	// Create a temporary stage, we can't start uploading until it has been created
@@ -289,7 +299,14 @@ func readRecords(ctx context.Context, rdr array.RecordReader, out chan<- arrow.R
 	return nil
 }
 
-func writeParquet(schema *arrow.Schema, w io.Writer, in <-chan arrow.Record, targetSize int, parquetProps *parquet.WriterProperties, arrowProps pqarrow.ArrowWriterProperties) error {
+func writeParquet(
+	schema *arrow.Schema,
+	w io.Writer,
+	in <-chan arrow.Record,
+	targetSize int,
+	parquetProps *parquet.WriterProperties,
+	arrowProps pqarrow.ArrowWriterProperties,
+) error {
 	limitWr := &limitWriter{w: w, limit: targetSize}
 	pqWriter, err := pqarrow.NewFileWriter(schema, limitWr, parquetProps, arrowProps)
 	if err != nil {
@@ -312,7 +329,17 @@ func writeParquet(schema *arrow.Schema, w io.Writer, in <-chan arrow.Record, tar
 	return io.EOF
 }
 
-func runParallelParquetWriters(ctx context.Context, schema *arrow.Schema, targetSize int, concurrency int, parquetProps *parquet.WriterProperties, arrowProps pqarrow.ArrowWriterProperties, newBuffer func() *bytes.Buffer, in <-chan arrow.Record, out chan<- *bytes.Buffer) error {
+func runParallelParquetWriters(
+	ctx context.Context,
+	schema *arrow.Schema,
+	targetSize int,
+	concurrency int,
+	parquetProps *parquet.WriterProperties,
+	arrowProps pqarrow.ArrowWriterProperties,
+	newBuffer func() *bytes.Buffer,
+	in <-chan arrow.Record,
+	out chan<- *bytes.Buffer,
+) error {
 	var once sync.Once
 	defer close(out)
 
@@ -378,7 +405,14 @@ func uploadStream(ctx context.Context, cn snowflakeConn, r io.Reader, name strin
 	return nil
 }
 
-func uploadAllStreams(ctx context.Context, cn snowflakeConn, streams <-chan *bytes.Buffer, concurrency int, freeBuffer func(*bytes.Buffer), uploadCallback func()) error {
+func uploadAllStreams(
+	ctx context.Context,
+	cn snowflakeConn,
+	streams <-chan *bytes.Buffer,
+	concurrency int,
+	freeBuffer func(*bytes.Buffer),
+	uploadCallback func(),
+) error {
 	g, ctx := errgroup.WithContext(ctx)
 	g.SetLimit(concurrency)
 
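
Note for reviewers, not part of the patch itself: the functions whose signatures are split above form the driver's channel-based ingestion pipeline. Records are read onto a channel, fanned out to parallel Parquet writers, and the finished buffers are uploaded to a temporary stage. For anyone new to the pattern, below is a minimal, self-contained sketch of the same errgroup fan-out shape. It is illustrative only: runParallelWriters, the string records, and the channel capacities are hypothetical stand-ins, and the real writers serialize records to Parquet and rotate buffers near targetFileSize rather than copy strings.

package main

import (
	"bytes"
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Fan records out to `concurrency` writer goroutines that all drain the same
// input channel; each finished buffer is handed to out for upload.
// Hypothetical stand-in for the driver's runParallelParquetWriters.
func runParallelWriters(ctx context.Context, concurrency int, in <-chan string, out chan<- *bytes.Buffer) error {
	defer close(out) // signal the upload side that no more buffers are coming
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < concurrency; i++ {
		g.Go(func() error {
			for rec := range in {
				buf := bytes.NewBufferString(rec) // the real code writes Parquet here
				select {
				case out <- buf:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	return g.Wait()
}

func main() {
	in := make(chan string, 3)
	out := make(chan *bytes.Buffer, 3)
	for _, rec := range []string{"a", "b", "c"} {
		in <- rec
	}
	close(in)
	if err := runParallelWriters(context.Background(), 2, in, out); err != nil {
		panic(err)
	}
	for buf := range out {
		fmt.Println(buf.String()) // the real code uploads each buffer to the stage
	}
}

Sharing a single input channel is what load-balances records across writers; the driver's version additionally recycles buffers through a pool (pool.GetBuffer) to bound memory use.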