From 0418bcf9fe02fedd8c4cd71ff04fcca38c70b91b Mon Sep 17 00:00:00 2001 From: Vladyslav Len Date: Wed, 16 Oct 2024 13:50:01 +0200 Subject: [PATCH] chore(): added snapshot metrics streaming --- internal/impl/postgresql/integration_test.go | 2 -- internal/impl/postgresql/pglogicalstream/logical_stream.go | 2 -- internal/impl/postgresql/pglogicalstream/monitor.go | 1 - internal/impl/postgresql/pglogicalstream/snapshotter.go | 1 - internal/impl/postgresql/pglogicalstream/stream_message.go | 2 +- 5 files changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/impl/postgresql/integration_test.go b/internal/impl/postgresql/integration_test.go index d8147a0fe..efb8c6aaf 100644 --- a/internal/impl/postgresql/integration_test.go +++ b/internal/impl/postgresql/integration_test.go @@ -701,8 +701,6 @@ file: } func TestIntegrationPgMultiVersionsCDCForPgOutputStreamComittedPlugin(t *testing.T) { - // running tests in the look to test different PostgreSQL versions - t.Parallel() for _, v := range []string{"17", "16", "15", "14", "13", "12", "11", "10"} { tmpDir := t.TempDir() pool, err := dockertest.NewPool("") diff --git a/internal/impl/postgresql/pglogicalstream/logical_stream.go b/internal/impl/postgresql/pglogicalstream/logical_stream.go index 5300e3594..9234e40e8 100644 --- a/internal/impl/postgresql/pglogicalstream/logical_stream.go +++ b/internal/impl/postgresql/pglogicalstream/logical_stream.go @@ -206,8 +206,6 @@ func NewPgStream(config Config) (*Stream, error) { SnapshotAction: "export", }, version, stream.snapshotter) if err != nil { - fmt.Println(err) - fmt.Println("Failed to create replication slot", err.Error()) return nil, err } stream.snapshotName = createSlotResult.SnapshotName diff --git a/internal/impl/postgresql/pglogicalstream/monitor.go b/internal/impl/postgresql/pglogicalstream/monitor.go index f9f6852d8..662fc93b2 100644 --- a/internal/impl/postgresql/pglogicalstream/monitor.go +++ b/internal/impl/postgresql/pglogicalstream/monitor.go @@ -123,7 +123,6 @@ func (m *Monitor) readTablesStat(tables []string) error { results[tableWithoutSchema] = count } - fmt.Println("Table stat", results) m.tableStat = results return nil } diff --git a/internal/impl/postgresql/pglogicalstream/snapshotter.go b/internal/impl/postgresql/pglogicalstream/snapshotter.go index a1e44e9dd..54d9c800c 100644 --- a/internal/impl/postgresql/pglogicalstream/snapshotter.go +++ b/internal/impl/postgresql/pglogicalstream/snapshotter.go @@ -98,7 +98,6 @@ func (s *Snapshotter) prepare() error { return err } if _, err := s.pgConnection.Exec(fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s';", s.snapshotName)); err != nil { - fmt.Println("Failed to prepare snapshot", err) return err } diff --git a/internal/impl/postgresql/pglogicalstream/stream_message.go b/internal/impl/postgresql/pglogicalstream/stream_message.go index d2f6e72c2..e4abd06e9 100644 --- a/internal/impl/postgresql/pglogicalstream/stream_message.go +++ b/internal/impl/postgresql/pglogicalstream/stream_message.go @@ -14,7 +14,7 @@ type StreamMessageChanges struct { Operation string `json:"operation"` Schema string `json:"schema"` Table string `json:"table"` - TableSnapshotProgress *float64 `json:"table_snapshot_progress"` + TableSnapshotProgress *float64 `json:"table_snapshot_progress,omitempty"` // For deleted messages - there will be old changes if replica identity set to full or empty changes Data map[string]any `json:"data"` }