diff --git a/internal/impl/postgresql/input_pg_stream.go b/internal/impl/postgresql/input_pg_stream.go index 780253dfc..466ad1861 100644 --- a/internal/impl/postgresql/input_pg_stream.go +++ b/internal/impl/postgresql/input_pg_stream.go @@ -12,7 +12,6 @@ import ( "context" "encoding/json" "fmt" - "strconv" "sync" "sync/atomic" "time" @@ -401,8 +400,7 @@ func (p *pgStreamInput) Connect(ctx context.Context) error { batchMsg := service.NewMessage(mb) - streaming := strconv.FormatBool(message.IsStreaming) - batchMsg.MetaSet("streaming", streaming) + batchMsg.MetaSet("mode", string(message.Mode)) batchMsg.MetaSet("table", message.Changes[0].Table) batchMsg.MetaSet("operation", message.Changes[0].Operation) if message.Changes[0].TableSnapshotProgress != nil { @@ -419,7 +417,7 @@ func (p *pgStreamInput) Connect(ctx context.Context) error { p.logger.Debugf("Flush batch error: %w", err) break } - if message.IsStreaming { + if message.Mode == pglogicalstream.StreamModeStreaming { if !p.flushBatch(ctx, cp, flushedBatch, latestOffset, true) { break } diff --git a/internal/impl/postgresql/pglogicalstream/logical_stream.go b/internal/impl/postgresql/pglogicalstream/logical_stream.go index f3f0bd40b..d745f6f3a 100644 --- a/internal/impl/postgresql/pglogicalstream/logical_stream.go +++ b/internal/impl/postgresql/pglogicalstream/logical_stream.go @@ -448,7 +448,7 @@ func (s *Stream) streamMessagesAsync() { Changes: []StreamMessageChanges{ *message, }, - IsStreaming: true, + Mode: StreamModeStreaming, WALLagBytes: &metrics.WalLagInBytes, } } @@ -511,7 +511,7 @@ func (s *Stream) streamMessagesAsync() { s.messages <- StreamMessage{ Lsn: &lsn, Changes: pgoutputChanges, - IsStreaming: true, + Mode: StreamModeStreaming, WALLagBytes: &metrics.WalLagInBytes, } } @@ -661,7 +661,7 @@ func (s *Stream) processSnapshot(ctx context.Context) error { tableProgress := s.monitor.GetSnapshotProgressForTable(tableWithoutSchema) snapshotChangePacket.Changes[0].TableSnapshotProgress = &tableProgress - snapshotChangePacket.IsStreaming = false + snapshotChangePacket.Mode = StreamModeSnapshot waitingFromBenthos := time.Now() s.messages <- snapshotChangePacket diff --git a/internal/impl/postgresql/pglogicalstream/stream_message.go b/internal/impl/postgresql/pglogicalstream/stream_message.go index 3a139fcc2..7c6b8531c 100644 --- a/internal/impl/postgresql/pglogicalstream/stream_message.go +++ b/internal/impl/postgresql/pglogicalstream/stream_message.go @@ -25,10 +25,17 @@ type StreamMessageMetrics struct { IsStreaming bool `json:"is_streaming"` } +type StreamMode string + +const ( + StreamModeStreaming StreamMode = "streaming" + StreamModeSnapshot StreamMode = "snapshot" +) + // StreamMessage represents a single message after it has been decoded by the plugin type StreamMessage struct { Lsn *string `json:"lsn"` Changes []StreamMessageChanges `json:"changes"` - IsStreaming bool `json:"is_streaming"` + Mode StreamMode `json:"mode"` WALLagBytes *int64 `json:"wal_lag_bytes"` }