Merge pull request #2978 from rockwotj/streaming-opt
snowflake: optimize performance of common data paths
rockwotj authored Nov 4, 2024
2 parents 7ea878e + 7c62dca commit 86922d7
Showing 12 changed files with 527 additions and 299 deletions.
10 changes: 10 additions & 0 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -81,6 +81,7 @@ output:
mapping: "" # No default (optional)
init_statement: | # No default (optional)
CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
build_parallelism: 1
batching:
count: 0
byte_size: 0
@@ -335,6 +336,15 @@ init_statement: |2
ALTER TABLE t1 ADD COLUMN a2 NUMBER;
```
=== `build_parallelism`
The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.
*Type*: `int`
*Default*: `1`
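
Raising this value lets the connector build the Snowflake output with more goroutines at the cost of extra CPU; watch `snowflake_build_output_latency_ns` before changing it. Roughly, bounded build parallelism looks like the sketch below, which uses `golang.org/x/sync/errgroup` and a hypothetical `buildChunk` helper rather than the connector's actual code:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// buildChunk stands in for the per-chunk encoding work (hypothetical).
func buildChunk(chunk []int) error {
	_ = chunk // convert + encode the chunk here
	return nil
}

// buildAll encodes chunks with at most buildParallelism goroutines in flight.
func buildAll(chunks [][]int, buildParallelism int) error {
	var g errgroup.Group
	g.SetLimit(buildParallelism)
	for _, c := range chunks {
		c := c // capture for pre-1.22 toolchains
		g.Go(func() error { return buildChunk(c) })
	}
	return g.Wait()
}

func main() {
	fmt.Println(buildAll([][]int{{1, 2}, {3, 4}, {5, 6}}, 2))
}
```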
=== `batching`
Allows you to configure a xref:configuration:batching.adoc[batching policy].
63 changes: 40 additions & 23 deletions internal/impl/snowflake/output_snowflake_streaming.go
@@ -21,19 +21,20 @@ import (
)

const (
ssoFieldAccount = "account"
ssoFieldUser = "user"
ssoFieldRole = "role"
ssoFieldDB = "database"
ssoFieldSchema = "schema"
ssoFieldTable = "table"
ssoFieldKey = "private_key"
ssoFieldKeyFile = "private_key_file"
ssoFieldKeyPass = "private_key_pass"
ssoFieldInitStatement = "init_statement"
ssoFieldBatching = "batching"
ssoFieldChannelPrefix = "channel_prefix"
ssoFieldMapping = "mapping"
ssoFieldAccount = "account"
ssoFieldUser = "user"
ssoFieldRole = "role"
ssoFieldDB = "database"
ssoFieldSchema = "schema"
ssoFieldTable = "table"
ssoFieldKey = "private_key"
ssoFieldKeyFile = "private_key_file"
ssoFieldKeyPass = "private_key_pass"
ssoFieldInitStatement = "init_statement"
ssoFieldBatching = "batching"
ssoFieldChannelPrefix = "channel_prefix"
ssoFieldMapping = "mapping"
ssoFieldBuildParallelism = "build_parallelism"
)

func snowflakeStreamingOutputConfig() *service.ConfigSpec {
@@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
ALTER TABLE t1 ALTER COLUMN c1 DROP NOT NULL;
ALTER TABLE t1 ADD COLUMN a2 NUMBER;
`),
service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Default(1).Advanced(),
service.NewBatchPolicyField(ssoFieldBatching),
service.NewOutputMaxInFlightField(),
service.NewStringField(ssoFieldChannelPrefix).
@@ -266,6 +268,10 @@ func newSnowflakeStreamer(
return nil, err
}
}
buildParallelism, err := conf.FieldInt(ssoFieldBuildParallelism)
if err != nil {
return nil, err
}
var channelPrefix string
if conf.Contains(ssoFieldChannelPrefix) {
channelPrefix, err = conf.FieldString(ssoFieldChannelPrefix)
@@ -331,8 +337,11 @@ func newSnowflakeStreamer(
logger: mgr.Logger(),
buildTime: mgr.Metrics().NewTimer("snowflake_build_output_latency_ns"),
uploadTime: mgr.Metrics().NewTimer("snowflake_upload_latency_ns"),
convertTime: mgr.Metrics().NewTimer("snowflake_convert_latency_ns"),
serializeTime: mgr.Metrics().NewTimer("snowflake_serialize_latency_ns"),
compressedOutput: mgr.Metrics().NewCounter("snowflake_compressed_output_size_bytes"),
initStatementsFn: initStatementsFn,
buildParallelism: buildParallelism,
}
return o, nil
}
@@ -345,6 +354,9 @@ type snowflakeStreamerOutput struct {
compressedOutput *service.MetricCounter
uploadTime *service.MetricTimer
buildTime *service.MetricTimer
convertTime *service.MetricTimer
serializeTime *service.MetricTimer
buildParallelism int

channelPrefix, db, schema, table string
mapping *bloblang.Executor
@@ -368,11 +380,12 @@ func (o *snowflakeStreamerOutput) openNewChannel(ctx context.Context) (*streamin
func (o *snowflakeStreamerOutput) openChannel(ctx context.Context, name string, id int16) (*streaming.SnowflakeIngestionChannel, error) {
o.logger.Debugf("opening snowflake streaming channel: %s", name)
return o.client.OpenChannel(ctx, streaming.ChannelOptions{
ID: id,
Name: name,
DatabaseName: o.db,
SchemaName: o.schema,
TableName: o.table,
ID: id,
Name: name,
DatabaseName: o.db,
SchemaName: o.schema,
TableName: o.table,
BuildParallelism: o.buildParallelism,
})
}

@@ -415,12 +428,16 @@ func (o *snowflakeStreamerOutput) WriteBatch(ctx context.Context, batch service.
return fmt.Errorf("unable to open snowflake streaming channel: %w", err)
}
}
o.logger.Debugf("inserting rows using channel %s", channel.Name)
stats, err := channel.InsertRows(ctx, batch)
o.compressedOutput.Incr(int64(stats.CompressedOutputSize))
o.uploadTime.Timing(stats.UploadTime.Nanoseconds())
o.buildTime.Timing(stats.BuildTime.Nanoseconds())
// If there is some kind of failure, try to reopen the channel
if err != nil {
if err == nil {
o.logger.Debugf("done inserting rows using channel %s, stats: %+v", channel.Name, stats)
o.compressedOutput.Incr(int64(stats.CompressedOutputSize))
o.uploadTime.Timing(stats.UploadTime.Nanoseconds())
o.buildTime.Timing(stats.BuildTime.Nanoseconds())
o.convertTime.Timing(stats.ConvertTime.Nanoseconds())
o.serializeTime.Timing(stats.SerializeTime.Nanoseconds())
} else {
reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID)
if reopenErr == nil {
o.channelPool.Put(reopened)
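
The rewritten error handling records the per-phase stats only when `InsertRows` succeeds, and on failure tries to reopen the channel so a retried batch starts from a clean one. A self-contained sketch of that shape, with stand-in types rather than the connector's API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Minimal stand-ins for the connector's channel and pool (illustrative only).
type channel struct {
	Name    string
	ID      int16
	healthy bool
}

func (c *channel) InsertRows(ctx context.Context, rows []any) (int, error) {
	if !c.healthy {
		return 0, errors.New("channel invalidated")
	}
	return len(rows), nil
}

func openChannel(ctx context.Context, name string, id int16) (*channel, error) {
	return &channel{Name: name, ID: id, healthy: true}, nil
}

// insertWithReopen mirrors the error path above: keep the channel on success,
// otherwise put a freshly reopened channel back and surface the error.
func insertWithReopen(ctx context.Context, pool chan *channel, ch *channel, rows []any) error {
	n, err := ch.InsertRows(ctx, rows)
	if err == nil {
		fmt.Printf("inserted %d rows on %s\n", n, ch.Name)
		pool <- ch
		return nil
	}
	if reopened, rerr := openChannel(ctx, ch.Name, ch.ID); rerr == nil {
		pool <- reopened
	}
	return err
}

func main() {
	pool := make(chan *channel, 1)
	stale := &channel{Name: "rpcn_1", ID: 1} // healthy=false, so the first insert fails
	fmt.Println("first attempt:", insertWithReopen(context.Background(), pool, stale, []any{"row"}))
	fresh := <-pool
	fmt.Println("second attempt:", insertWithReopen(context.Background(), pool, fresh, []any{"row"}))
}
```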
4 changes: 2 additions & 2 deletions internal/impl/snowflake/streaming/int128/int128.go
@@ -227,8 +227,8 @@ func (i Num) ToBigEndian() []byte {

// AppendBigEndian converts an Int128 into big endian bytes
func (i Num) AppendBigEndian(b []byte) []byte {
b = binary.BigEndian.AppendUint64(b[0:8], uint64(i.hi))
return binary.BigEndian.AppendUint64(b[8:16], i.lo)
b = binary.BigEndian.AppendUint64(b, uint64(i.hi))
return binary.BigEndian.AppendUint64(b, i.lo)
}

// ToInt64 casts an Int128 to a int64 by truncating the bytes.
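
The `AppendBigEndian` fix matters because the old version appended onto sub-slices of the destination (`b[0:8]`, `b[8:16]`), so it panicked for a nil or short `b` and did not preserve the caller's existing bytes; the corrected version simply appends both 64-bit halves onto `b`. A standalone illustration of the append semantics using only the standard library (not the package's code):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	hi := uint64(0x0102030405060708)
	lo := uint64(0x090A0B0C0D0E0F10)

	// Correct append semantics: the 16 big-endian bytes land *after*
	// whatever the caller already has in the destination slice.
	prefix := []byte{0xAA, 0xBB}
	out := binary.BigEndian.AppendUint64(prefix, hi)
	out = binary.BigEndian.AppendUint64(out, lo)
	fmt.Printf("% x\n", out) // aa bb 01 02 ... 0f 10 (prefix preserved, 18 bytes)

	// Appending to nil also works and yields exactly 16 bytes.
	fmt.Println(len(binary.BigEndian.AppendUint64(binary.BigEndian.AppendUint64(nil, hi), lo)))
}
```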
16 changes: 16 additions & 0 deletions internal/impl/snowflake/streaming/int128/int128_test.go
@@ -11,8 +11,10 @@
package int128

import (
"crypto/rand"
"fmt"
"math"
"slices"
"testing"

"github.com/stretchr/testify/require"
@@ -388,3 +390,17 @@ func TestFitsInPrec(t *testing.T) {
require.NoError(t, err)
require.True(t, n.FitsInPrecision(38), snowflakeNumberTiny)
}

func TestToBytes(t *testing.T) {
for i := 0; i < 100; i++ {
input := make([]byte, 16)
_, err := rand.Read(input)
require.NoError(t, err)
n := FromBigEndian(input)
require.Equal(t, input, n.ToBigEndian())
require.Equal(t, input, n.AppendBigEndian(nil))
cloned := slices.Clone(input)
require.Equal(t, input, n.AppendBigEndian(cloned)[16:32])
require.Equal(t, input, cloned) // Make sure cloned isn't mutated
}
}
9 changes: 5 additions & 4 deletions internal/impl/snowflake/streaming/integration_test.go
@@ -88,10 +88,11 @@ func TestAllSnowflakeDatatypes(t *testing.T) {
ctx := context.Background()
restClient, streamClient := setup(t)
channelOpts := streaming.ChannelOptions{
Name: t.Name(),
DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"),
SchemaName: "PUBLIC",
TableName: "TEST_TABLE_KITCHEN_SINK",
Name: t.Name(),
DatabaseName: envOr("SNOWFLAKE_DB", "BABY_DATABASE"),
SchemaName: "PUBLIC",
TableName: "TEST_TABLE_KITCHEN_SINK",
BuildParallelism: 1,
}
_, err := restClient.RunSQL(ctx, streaming.RunSQLRequest{
Database: channelOpts.DatabaseName,
84 changes: 48 additions & 36 deletions internal/impl/snowflake/streaming/parquet.go
@@ -11,76 +11,86 @@
package streaming

import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/format"
"github.com/redpanda-data/benthos/v4/public/service"
"github.com/segmentio/encoding/thrift"
)

func messageToRow(msg *service.Message) (map[string]any, error) {
// messageToRow converts a message into columnar form using the provided name to index mapping.
// We have to materialize the column into a row so that we can know if a column is null - the
// msg can be sparse, but the row must not be sparse.
func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int) error {
v, err := msg.AsStructured()
if err != nil {
return nil, fmt.Errorf("error extracting object from message: %w", err)
return fmt.Errorf("error extracting object from message: %w", err)
}
row, ok := v.(map[string]any)
if !ok {
return nil, fmt.Errorf("expected object, got: %T", v)
return fmt.Errorf("expected object, got: %T", v)
}
mapped := make(map[string]any, len(row))
for k, v := range row {
mapped[normalizeColumnName(k)] = v
idx, ok := nameToPosition[normalizeColumnName(k)]
if !ok {
// TODO(schema): Unknown column, we just skip it.
// In the future we may evolve the schema based on the new data.
continue
}
out[idx] = v
}
return mapped, nil
return nil
}
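
`messageToRow` now fills a caller-provided, fixed-width row instead of allocating a map per message: slots a sparse message never touches stay `nil` and become explicit NULLs, and columns not in the schema are skipped. A standalone sketch of the same mapping shape, with a hypothetical `toDenseRow` using plain maps rather than the connector's types:

```go
package main

import "fmt"

// toDenseRow writes a sparse record into a fixed-width row using a
// name -> position index; untouched slots remain nil and stand for NULL.
func toDenseRow(record map[string]any, out []any, nameToPosition map[string]int) {
	for k, v := range record {
		if idx, ok := nameToPosition[k]; ok {
			out[idx] = v
		}
		// Unknown columns are skipped (no schema evolution yet).
	}
}

func main() {
	pos := map[string]int{"A": 0, "B": 1, "C": 2}
	row := make([]any, len(pos))
	toDenseRow(map[string]any{"B": 42, "X": "ignored"}, row, pos)
	fmt.Println(row) // [<nil> 42 <nil>]
}
```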

// TODO: If the memory pressure is too great from writing all
// records buffered as a single row group, then consider
// return some kind of iterator of chunks of rows that we can
// then feed into the actual parquet construction process.
//
// If a single parquet file is too much, we can consider having multiple
// parquet files in a single bdec file.
func constructRowGroup(
batch service.MessageBatch,
schema *parquet.Schema,
transformers map[string]*dataTransformer,
) ([]parquet.Row, error) {
transformers []*dataTransformer,
) ([]parquet.Row, []*statsBuffer, error) {
// We write all of our data in a columnar fashion, but need to pivot that data so that we can feed it into
	// our parquet library (which sadly will redo the pivot - maybe we need a lower level abstraction...).
// So create a massive matrix that we will write stuff in columnar form, but then we don't need to move any
// data to create rows of the data via an in-place transpose operation.
//
	// TODO: Consider caching/pooling this matrix as I expect many are similarly sized.
matrix := make([]parquet.Value, len(batch)*len(schema.Fields()))
rowWidth := len(schema.Fields())
for idx, field := range schema.Fields() {
// The column index is consistent between two schemas that are the same because the schema fields are always
// in sorted order.
columnIndex := idx
t := transformers[field.Name()]
t.buf.Prepare(matrix, columnIndex, rowWidth)
t.stats.Reset()
matrix := make([]parquet.Value, len(batch)*rowWidth)
nameToPosition := make(map[string]int, rowWidth)
stats := make([]*statsBuffer, rowWidth)
buffers := make([]typedBuffer, rowWidth)
for idx, t := range transformers {
leaf, ok := schema.Lookup(t.name)
if !ok {
return nil, nil, fmt.Errorf("invariant failed: unable to find column %q", t.name)
}
buffers[idx] = t.bufferFactory()
buffers[idx].Prepare(matrix, leaf.ColumnIndex, rowWidth)
stats[idx] = &statsBuffer{}
nameToPosition[t.name] = idx
}
// First we need to shred our record into columns, snowflake's data model
// is thankfully a flat list of columns, so no dremel style record shredding
// is needed
row := make([]any, rowWidth)
for _, msg := range batch {
row, err := messageToRow(msg)
err := messageToRow(msg, row, nameToPosition)
if err != nil {
return nil, err
return nil, nil, err
}
// We **must** write a null, so iterate over the schema not the record,
// which might be sparse
for name, t := range transformers {
v := row[name]
err = t.converter.ValidateAndConvert(t.stats, v, t.buf)
for i, v := range row {
t := transformers[i]
s := stats[i]
b := buffers[i]
err = t.converter.ValidateAndConvert(s, v, b)
if err != nil {
return nil, fmt.Errorf("invalid data for column %s: %w", name, err)
// TODO(schema): if this is a null value err then we can evolve the schema to mark it null.
return nil, nil, fmt.Errorf("invalid data for column %s: %w", t.name, err)
}
// reset the column as nil for the next row
row[i] = nil
}
}
// Now all our values have been written to each buffer - here is where we do our matrix
Expand All @@ -90,7 +100,7 @@ func constructRowGroup(
rowStart := i * rowWidth
rows[i] = matrix[rowStart : rowStart+rowWidth]
}
return rows, nil
return rows, stats, nil
}
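
The row group is built into a single flat, row-major matrix of `len(batch) * rowWidth` values: each column writes with a stride of `rowWidth`, so producing rows afterwards is just reslicing the same backing array with no copying. A small standalone sketch of that layout trick (plain ints instead of `parquet.Value`):

```go
package main

import "fmt"

func main() {
	const numRows, rowWidth = 3, 2

	// One flat, row-major allocation shared by every column.
	matrix := make([]int, numRows*rowWidth)

	// Columns write their values with a stride of rowWidth...
	for col := 0; col < rowWidth; col++ {
		for r := 0; r < numRows; r++ {
			matrix[r*rowWidth+col] = (col+1)*100 + r
		}
	}

	// ...so each row is just a reslice of the backing array - no copies.
	for r := 0; r < numRows; r++ {
		fmt.Println(matrix[r*rowWidth : r*rowWidth+rowWidth])
	}
	// Output:
	// [100 200]
	// [101 201]
	// [102 202]
}
```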

type parquetFileData struct {
@@ -99,9 +109,10 @@ type parquetFileData struct {
metadata map[string]string
}

func writeParquetFile(writer io.Writer, rpcnVersion string, data parquetFileData) (err error) {
pw := parquet.NewGenericWriter[map[string]any](
writer,
func writeParquetFile(rpcnVersion string, data parquetFileData) (out []byte, err error) {
b := bytes.NewBuffer(nil)
pw := parquet.NewGenericWriter[any](
b,
data.schema,
parquet.CreatedBy("RedpandaConnect", rpcnVersion, "unknown"),
// Recommended by the Snowflake team to enable data page stats
@@ -121,6 +132,7 @@ func writeParquetFile(writer io.Writer, rpcnVersion string, data parquetFileData
return
}
err = pw.Close()
out = b.Bytes()
return
}
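
`writeParquetFile` now buffers the file internally and hands back the encoded bytes, which suits the in-memory upload path better than streaming through an `io.Writer`. A minimal standalone sketch of the buffer-and-return pattern with `parquet-go` (a simplified record type and hypothetical `writeToBytes` helper; the real writer passes an explicit schema and uses `NewGenericWriter[any]` with extra options):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/parquet-go/parquet-go"
)

type record struct {
	A int64  `parquet:"a"`
	B string `parquet:"b"`
}

// writeToBytes encodes rows into an in-memory parquet file and returns it.
func writeToBytes(rows []record) ([]byte, error) {
	b := bytes.NewBuffer(nil)
	w := parquet.NewGenericWriter[record](b)
	if _, err := w.Write(rows); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

func main() {
	out, err := writeToBytes([]record{{A: 1, B: "x"}, {A: 2, B: "y"}})
	fmt.Println(len(out), err)
}
```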

