From cd352e004ef9ba174953ba9e79ba1a9e7d8ceb6c Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 14 Nov 2024 02:43:13 +0000 Subject: [PATCH] pgcdc: cleanup monitor with periodic utility --- .../pglogicalstream/logical_stream.go | 2 +- .../postgresql/pglogicalstream/monitor.go | 64 ++++++++----------- 2 files changed, 28 insertions(+), 38 deletions(-) diff --git a/internal/impl/postgresql/pglogicalstream/logical_stream.go b/internal/impl/postgresql/pglogicalstream/logical_stream.go index 19b237ef8..2c6c6bb11 100644 --- a/internal/impl/postgresql/pglogicalstream/logical_stream.go +++ b/internal/impl/postgresql/pglogicalstream/logical_stream.go @@ -221,7 +221,7 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) { stream.standbyMessageTimeout = config.PgStandbyTimeout stream.nextStandbyMessageDeadline = time.Now().Add(stream.standbyMessageTimeout) - monitor, err := NewMonitor(config.DBRawDSN, stream.logger, tableNames, stream.slotName, config.WalMonitorInterval) + monitor, err := NewMonitor(ctx, config.DBRawDSN, stream.logger, tableNames, stream.slotName, config.WalMonitorInterval) if err != nil { return nil, err } diff --git a/internal/impl/postgresql/pglogicalstream/monitor.go b/internal/impl/postgresql/pglogicalstream/monitor.go index e55ae3976..c1bd1ccd9 100644 --- a/internal/impl/postgresql/pglogicalstream/monitor.go +++ b/internal/impl/postgresql/pglogicalstream/monitor.go @@ -12,12 +12,14 @@ import ( "context" "database/sql" "fmt" + "maps" "math" "strings" "sync" "time" "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/connect/v4/internal/periodic" ) // Report is a structure that contains the current state of the Monitor @@ -38,16 +40,21 @@ type Monitor struct { // finding the difference between the latest LSN and the last confirmed LSN for the replication slot replicationLagInBytes int64 - dbConn *sql.DB - slotName string - logger *service.Logger - ticker *time.Ticker - cancelTicker context.CancelFunc - ctx context.Context + dbConn *sql.DB + slotName string + logger *service.Logger + loop *periodic.Periodic } // NewMonitor creates a new Monitor instance -func NewMonitor(dbDSN string, logger *service.Logger, tables []string, slotName string, interval time.Duration) (*Monitor, error) { +func NewMonitor( + ctx context.Context, + dbDSN string, + logger *service.Logger, + tables []string, + slotName string, + interval time.Duration, +) (*Monitor, error) { dbConn, err := openPgConnectionFromConfig(dbDSN) if err != nil { return nil, err @@ -60,29 +67,11 @@ func NewMonitor(dbDSN string, logger *service.Logger, tables []string, slotName slotName: slotName, logger: logger, } - - if err = m.readTablesStat(tables); err != nil { + m.loop = periodic.NewWithContext(interval, m.readReplicationLag) + if err = m.readTablesStat(ctx, tables); err != nil { return nil, err } - - ctx, cancel := context.WithCancel(context.Background()) - m.ctx = ctx - m.cancelTicker = cancel - m.ticker = time.NewTicker(interval) - - go func() { - for { - select { - case <-m.ticker.C: - m.readReplicationLag() - break - case <-m.ctx.Done(): - m.ticker.Stop() - return - } - } - }() - + m.loop.Start() return m, nil } @@ -101,7 +90,7 @@ func (m *Monitor) UpdateSnapshotProgressForTable(table string, position int) { } // we need to read the tables stat to calculate the snapshot ingestion progress -func (m *Monitor) readTablesStat(tables []string) error { +func (m *Monitor) readTablesStat(ctx context.Context, tables []string) error { results := make(map[string]int64) for _, table := range tables { @@ -110,7 +99,7 @@ func (m *Monitor) readTablesStat(tables []string) error { query := "SELECT COUNT(*) FROM " + tableWithoutSchema var count int64 - err := m.dbConn.QueryRow(query).Scan(&count) + err := m.dbConn.QueryRowContext(ctx, query).Scan(&count) if err != nil { // If the error is because the table doesn't exist, we'll set the count to 0 @@ -130,14 +119,14 @@ func (m *Monitor) readTablesStat(tables []string) error { return nil } -func (m *Monitor) readReplicationLag() { - result, err := m.dbConn.Query(`SELECT slot_name, +func (m *Monitor) readReplicationLag(ctx context.Context) { + result, err := m.dbConn.QueryContext(ctx, `SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes FROM pg_replication_slots WHERE slot_name = $1;`, m.slotName) // calculate the replication lag in bytes // replicationLagInBytes = latestLsn - confirmedLsn if err != nil || result.Err() != nil { - m.logger.Errorf("Error reading replication lag: %v", err) + m.logger.Warnf("Error reading replication lag: %v", err) return } @@ -145,12 +134,14 @@ func (m *Monitor) readReplicationLag() { var lagbytes int64 for result.Next() { if err = result.Scan(&slotName, &lagbytes); err != nil { - m.logger.Errorf("Error reading replication lag: %v", err) + m.logger.Warnf("Error reading replication lag: %v", err) return } } + m.lock.Lock() m.replicationLagInBytes = lagbytes + m.lock.Unlock() } // Report returns a snapshot of the monitor's state @@ -161,13 +152,12 @@ func (m *Monitor) Report() *Report { // report the replication lag return &Report{ WalLagInBytes: m.replicationLagInBytes, - TableProgress: m.snapshotProgress, + TableProgress: maps.Clone(m.snapshotProgress), } } // Stop stops the monitor func (m *Monitor) Stop() error { - m.cancelTicker() - m.ticker.Stop() + m.loop.Stop() return m.dbConn.Close() }