Skip to content

Commit

Permalink
split long lines
Browse files Browse the repository at this point in the history
  • Loading branch information
joellubi committed Jan 24, 2024
1 parent 0853403 commit ee13822
Showing 1 changed file with 38 additions and 4 deletions.
42 changes: 38 additions & 4 deletions go/adbc/driver/snowflake/bulk_ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,17 @@ func (st *statement) ingestStream(ctx context.Context) (nrows int64, err error)
pool := newBufferPool(int(st.ingestOptions.targetFileSize))
buffers := make(chan *bytes.Buffer, st.ingestOptions.writerConcurrency)
g.Go(func() error {
return runParallelParquetWriters(gCtx, schema, int(st.ingestOptions.targetFileSize), int(st.ingestOptions.writerConcurrency), parquetProps, arrowProps, pool.GetBuffer, records, buffers)
return runParallelParquetWriters(
gCtx,
schema,
int(st.ingestOptions.targetFileSize),
int(st.ingestOptions.writerConcurrency),
parquetProps,
arrowProps,
pool.GetBuffer,
records,
buffers,
)
})

// Create a temporary stage, we can't start uploading until it has been created
Expand Down Expand Up @@ -289,7 +299,14 @@ func readRecords(ctx context.Context, rdr array.RecordReader, out chan<- arrow.R
return nil
}

func writeParquet(schema *arrow.Schema, w io.Writer, in <-chan arrow.Record, targetSize int, parquetProps *parquet.WriterProperties, arrowProps pqarrow.ArrowWriterProperties) error {
func writeParquet(
schema *arrow.Schema,
w io.Writer,
in <-chan arrow.Record,
targetSize int,
parquetProps *parquet.WriterProperties,
arrowProps pqarrow.ArrowWriterProperties,
) error {
limitWr := &limitWriter{w: w, limit: targetSize}
pqWriter, err := pqarrow.NewFileWriter(schema, limitWr, parquetProps, arrowProps)
if err != nil {
Expand All @@ -312,7 +329,17 @@ func writeParquet(schema *arrow.Schema, w io.Writer, in <-chan arrow.Record, tar
return io.EOF
}

func runParallelParquetWriters(ctx context.Context, schema *arrow.Schema, targetSize int, concurrency int, parquetProps *parquet.WriterProperties, arrowProps pqarrow.ArrowWriterProperties, newBuffer func() *bytes.Buffer, in <-chan arrow.Record, out chan<- *bytes.Buffer) error {
func runParallelParquetWriters(
ctx context.Context,
schema *arrow.Schema,
targetSize int,
concurrency int,
parquetProps *parquet.WriterProperties,
arrowProps pqarrow.ArrowWriterProperties,
newBuffer func() *bytes.Buffer,
in <-chan arrow.Record,
out chan<- *bytes.Buffer,
) error {
var once sync.Once
defer close(out)

Expand Down Expand Up @@ -378,7 +405,14 @@ func uploadStream(ctx context.Context, cn snowflakeConn, r io.Reader, name strin
return nil
}

func uploadAllStreams(ctx context.Context, cn snowflakeConn, streams <-chan *bytes.Buffer, concurrency int, freeBuffer func(*bytes.Buffer), uploadCallback func()) error {
func uploadAllStreams(
ctx context.Context,
cn snowflakeConn,
streams <-chan *bytes.Buffer,
concurrency int,
freeBuffer func(*bytes.Buffer),
uploadCallback func(),
) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(concurrency)

Expand Down

0 comments on commit ee13822

Please sign in to comment.