Skip to content

Commit

Permalink
pgcdc: remove bool for operation
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj committed Nov 8, 2024
1 parent e47ca50 commit 6eae232
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
6 changes: 2 additions & 4 deletions internal/impl/postgresql/input_pg_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -401,8 +400,7 @@ func (p *pgStreamInput) Connect(ctx context.Context) error {

batchMsg := service.NewMessage(mb)

streaming := strconv.FormatBool(message.IsStreaming)
batchMsg.MetaSet("streaming", streaming)
batchMsg.MetaSet("mode", string(message.Mode))
batchMsg.MetaSet("table", message.Changes[0].Table)
batchMsg.MetaSet("operation", message.Changes[0].Operation)
if message.Changes[0].TableSnapshotProgress != nil {
Expand All @@ -419,7 +417,7 @@ func (p *pgStreamInput) Connect(ctx context.Context) error {
p.logger.Debugf("Flush batch error: %w", err)
break
}
if message.IsStreaming {
if message.Mode == pglogicalstream.StreamModeStreaming {
if !p.flushBatch(ctx, cp, flushedBatch, latestOffset, true) {
break
}
Expand Down
6 changes: 3 additions & 3 deletions internal/impl/postgresql/pglogicalstream/logical_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (s *Stream) streamMessagesAsync() {
Changes: []StreamMessageChanges{
*message,
},
IsStreaming: true,
Mode: StreamModeStreaming,
WALLagBytes: &metrics.WalLagInBytes,
}
}
Expand Down Expand Up @@ -511,7 +511,7 @@ func (s *Stream) streamMessagesAsync() {
s.messages <- StreamMessage{
Lsn: &lsn,
Changes: pgoutputChanges,
IsStreaming: true,
Mode: StreamModeStreaming,
WALLagBytes: &metrics.WalLagInBytes,
}
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func (s *Stream) processSnapshot(ctx context.Context) error {

tableProgress := s.monitor.GetSnapshotProgressForTable(tableWithoutSchema)
snapshotChangePacket.Changes[0].TableSnapshotProgress = &tableProgress
snapshotChangePacket.IsStreaming = false
snapshotChangePacket.Mode = StreamModeSnapshot

waitingFromBenthos := time.Now()
s.messages <- snapshotChangePacket
Expand Down
9 changes: 8 additions & 1 deletion internal/impl/postgresql/pglogicalstream/stream_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@ type StreamMessageMetrics struct {
IsStreaming bool `json:"is_streaming"`
}

type StreamMode string

Check failure on line 28 in internal/impl/postgresql/pglogicalstream/stream_message.go

View workflow job for this annotation

GitHub Actions / golangci-lint

exported: exported type StreamMode should have comment or be unexported (revive)

const (
StreamModeStreaming StreamMode = "streaming"

Check failure on line 31 in internal/impl/postgresql/pglogicalstream/stream_message.go

View workflow job for this annotation

GitHub Actions / golangci-lint

exported: exported const StreamModeStreaming should have comment (or a comment on this block) or be unexported (revive)
StreamModeSnapshot StreamMode = "snapshot"
)

// StreamMessage represents a single message after it has been decoded by the plugin
type StreamMessage struct {
Lsn *string `json:"lsn"`
Changes []StreamMessageChanges `json:"changes"`
IsStreaming bool `json:"is_streaming"`
Mode StreamMode `json:"mode"`
WALLagBytes *int64 `json:"wal_lag_bytes"`
}

0 comments on commit 6eae232

Please sign in to comment.