Skip to content

Commit

Permalink
pgcdc: cleanup monitor with periodic utility
Browse files: browse the repository at this point in the history
  • Loading branch information
rockwotj committed Nov 14, 2024
1 parent 1f8a650 commit cd352e0
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 38 deletions.
2 changes: 1 addition & 1 deletion internal/impl/postgresql/pglogicalstream/logical_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {
stream.standbyMessageTimeout = config.PgStandbyTimeout
stream.nextStandbyMessageDeadline = time.Now().Add(stream.standbyMessageTimeout)

monitor, err := NewMonitor(config.DBRawDSN, stream.logger, tableNames, stream.slotName, config.WalMonitorInterval)
monitor, err := NewMonitor(ctx, config.DBRawDSN, stream.logger, tableNames, stream.slotName, config.WalMonitorInterval)
if err != nil {
return nil, err
}
Expand Down
64 changes: 27 additions & 37 deletions internal/impl/postgresql/pglogicalstream/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"context"
"database/sql"
"fmt"
"maps"
"math"
"strings"
"sync"
"time"

"github.com/redpanda-data/benthos/v4/public/service"
"github.com/redpanda-data/connect/v4/internal/periodic"
)

// Report is a structure that contains the current state of the Monitor
Expand All @@ -38,16 +40,21 @@ type Monitor struct {
// finding the difference between the latest LSN and the last confirmed LSN for the replication slot
replicationLagInBytes int64

dbConn *sql.DB
slotName string
logger *service.Logger
ticker *time.Ticker
cancelTicker context.CancelFunc
ctx context.Context
dbConn *sql.DB
slotName string
logger *service.Logger
loop *periodic.Periodic
}

// NewMonitor creates a new Monitor instance
func NewMonitor(dbDSN string, logger *service.Logger, tables []string, slotName string, interval time.Duration) (*Monitor, error) {
func NewMonitor(
ctx context.Context,
dbDSN string,
logger *service.Logger,
tables []string,
slotName string,
interval time.Duration,
) (*Monitor, error) {
dbConn, err := openPgConnectionFromConfig(dbDSN)
if err != nil {
return nil, err
Expand All @@ -60,29 +67,11 @@ func NewMonitor(dbDSN string, logger *service.Logger, tables []string, slotName
slotName: slotName,
logger: logger,
}

if err = m.readTablesStat(tables); err != nil {
m.loop = periodic.NewWithContext(interval, m.readReplicationLag)
if err = m.readTablesStat(ctx, tables); err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
m.ctx = ctx
m.cancelTicker = cancel
m.ticker = time.NewTicker(interval)

go func() {
for {
select {
case <-m.ticker.C:
m.readReplicationLag()
break
case <-m.ctx.Done():
m.ticker.Stop()
return
}
}
}()

m.loop.Start()
return m, nil
}

Expand All @@ -101,7 +90,7 @@ func (m *Monitor) UpdateSnapshotProgressForTable(table string, position int) {
}

// readTablesStat reads the row count of each table; the counts are needed to calculate the snapshot ingestion progress
func (m *Monitor) readTablesStat(tables []string) error {
func (m *Monitor) readTablesStat(ctx context.Context, tables []string) error {
results := make(map[string]int64)

for _, table := range tables {
Expand All @@ -110,7 +99,7 @@ func (m *Monitor) readTablesStat(tables []string) error {
query := "SELECT COUNT(*) FROM " + tableWithoutSchema

var count int64
err := m.dbConn.QueryRow(query).Scan(&count)
err := m.dbConn.QueryRowContext(ctx, query).Scan(&count)

if err != nil {
// If the error is because the table doesn't exist, we'll set the count to 0
Expand All @@ -130,27 +119,29 @@ func (m *Monitor) readTablesStat(tables []string) error {
return nil
}

func (m *Monitor) readReplicationLag() {
result, err := m.dbConn.Query(`SELECT slot_name,
func (m *Monitor) readReplicationLag(ctx context.Context) {
result, err := m.dbConn.QueryContext(ctx, `SELECT slot_name,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots WHERE slot_name = $1;`, m.slotName)
// calculate the replication lag in bytes
// replicationLagInBytes = pg_current_wal_lsn() - restart_lsn of the replication slot
if err != nil || result.Err() != nil {
m.logger.Errorf("Error reading replication lag: %v", err)
m.logger.Warnf("Error reading replication lag: %v", err)
return
}

var slotName string
var lagbytes int64
for result.Next() {
if err = result.Scan(&slotName, &lagbytes); err != nil {
m.logger.Errorf("Error reading replication lag: %v", err)
m.logger.Warnf("Error reading replication lag: %v", err)
return
}
}

m.lock.Lock()
m.replicationLagInBytes = lagbytes
m.lock.Unlock()
}

// Report returns a snapshot of the monitor's state
Expand All @@ -161,13 +152,12 @@ func (m *Monitor) Report() *Report {
// report the replication lag
return &Report{
WalLagInBytes: m.replicationLagInBytes,
TableProgress: m.snapshotProgress,
TableProgress: maps.Clone(m.snapshotProgress),
}
}

// Stop stops the monitor
func (m *Monitor) Stop() error {
m.cancelTicker()
m.ticker.Stop()
m.loop.Stop()
return m.dbConn.Close()
}

0 comments on commit cd352e0

Please sign in to comment.