From eb7dd3ddc1bc281a323effdd88807c92bd3bc670 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 14 Nov 2024 21:05:16 +0000 Subject: [PATCH] snowpipe: batch schema migration When first running a pipeline with heterogeneous data, you might need to do a lot of column additions. Support a batch path for updating columns so that we don't have to fallback to retrying the whole batch - also it makes the experience much faster. --- .../snowflake/output_snowflake_streaming.go | 57 +++++++++++++------ internal/impl/snowflake/streaming/parquet.go | 6 +- .../impl/snowflake/streaming/schema_errors.go | 22 ++++++- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index a48cbd748..ca1e428f5 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -564,27 +564,12 @@ func (o *snowflakeStreamerOutput) WriteBatchInternal(ctx context.Context, batch } else { // Only evolve the schema if requested. if o.schemaEvolutionEnabled() { - nullColumnErr := streaming.NonNullColumnError{} - if errors.As(err, &nullColumnErr) { + schemaErr, ok := asSchemaMigrationError(o, err) + if ok { // put the channel back so that we can reopen it along with the rest of the channels to // pick up the new schema. o.channelPool.Put(channel) - // Return an error so that we release our read lock and can take the write lock - // to forcibly reopen all our channels to get a new schema. - return schemaMigrationNeededError{ - migrator: func(ctx context.Context) error { - return o.MigrateNotNullColumn(ctx, nullColumnErr) - }, - } - } - missingColumnErr := streaming.MissingColumnError{} - if errors.As(err, &missingColumnErr) { - o.channelPool.Put(channel) - return schemaMigrationNeededError{ - migrator: func(ctx context.Context) error { - return o.MigrateMissingColumn(ctx, missingColumnErr) - }, - } + return schemaErr } } reopened, reopenErr := o.openChannel(ctx, channel.Name, channel.ID) @@ -605,6 +590,42 @@ func (o *snowflakeStreamerOutput) WriteBatchInternal(ctx context.Context, batch return err } +func asSchemaMigrationError(o *snowflakeStreamerOutput, err error) (schemaMigrationNeededError, bool) { + nullColumnErr := streaming.NonNullColumnError{} + if errors.As(err, &nullColumnErr) { + // Return an error so that we release our read lock and can take the write lock + // to forcibly reopen all our channels to get a new schema. + return schemaMigrationNeededError{ + migrator: func(ctx context.Context) error { + return o.MigrateNotNullColumn(ctx, nullColumnErr) + }, + }, true + } + missingColumnErr := streaming.MissingColumnError{} + if errors.As(err, &missingColumnErr) { + return schemaMigrationNeededError{ + migrator: func(ctx context.Context) error { + return o.MigrateMissingColumn(ctx, missingColumnErr) + }, + }, true + } + batchErr := streaming.BatchSchemaMismatchError[streaming.MissingColumnError]{} + if errors.As(err, &batchErr) { + return schemaMigrationNeededError{ + migrator: func(ctx context.Context) error { + for _, missingCol := range batchErr.Errors { + // TODO(rockwood): Consider a batch SQL statement that adds N columns at a time + if err := o.MigrateMissingColumn(ctx, missingCol); err != nil { + return err + } + } + return nil + }, + }, true + } + return schemaMigrationNeededError{}, false +} + type schemaMigrationNeededError struct { migrator func(ctx context.Context) error } diff --git a/internal/impl/snowflake/streaming/parquet.go b/internal/impl/snowflake/streaming/parquet.go index 494ba0061..125821efb 100644 --- a/internal/impl/snowflake/streaming/parquet.go +++ b/internal/impl/snowflake/streaming/parquet.go @@ -34,16 +34,20 @@ func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int if !ok { return fmt.Errorf("expected object, got: %T", v) } + var missingColumns []MissingColumnError for k, v := range row { idx, ok := nameToPosition[normalizeColumnName(k)] if !ok { if !allowExtraProperties && v != nil { - return MissingColumnError{columnName: k, val: v} + missingColumns = append(missingColumns, MissingColumnError{columnName: k, val: v}) } continue } out[idx] = v } + if len(missingColumns) > 0 { + return BatchSchemaMismatchError[MissingColumnError]{missingColumns} + } return nil } diff --git a/internal/impl/snowflake/streaming/schema_errors.go b/internal/impl/snowflake/streaming/schema_errors.go index 117db476b..1524f9d5c 100644 --- a/internal/impl/snowflake/streaming/schema_errors.go +++ b/internal/impl/snowflake/streaming/schema_errors.go @@ -10,17 +10,37 @@ package streaming -import "fmt" +import ( + "errors" + "fmt" +) // SchemaMismatchError occurs when the user provided data has data that // doesn't match the schema *and* the table can be evolved to accommodate // // This can be used as a mechanism to evolve the schema dynamically. type SchemaMismatchError interface { + error ColumnName() string Value() any } +var _ error = BatchSchemaMismatchError[SchemaMismatchError]{} + +// BatchSchemaMismatchError is when multiple schema mismatch errors happen at once +type BatchSchemaMismatchError[T SchemaMismatchError] struct { + Errors []T +} + +// Error implements the error interface +func (e BatchSchemaMismatchError[T]) Error() string { + errs := []error{} + for _, err := range e.Errors { + errs = append(errs, err) + } + return errors.Join(errs...).Error() +} + var _ error = NonNullColumnError{} var _ SchemaMismatchError = NonNullColumnError{}