Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PostgreSQL CDC Plugin #2917

Open
wants to merge 127 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 126 commits
Commits
Show all changes
127 commits
Select commit Hold shift + click to select a range
7561a0d
Move repos into connect
Jeffail Sep 30, 2024
99164a2
Add placeholders for logging and TODOs on panics
Jeffail Sep 30, 2024
ffd356b
feat(pgstream): added support for pgoutput native plugin
le-vlad Oct 1, 2024
2a1b515
feat(pgstream): added support for pgoutput native plugin
le-vlad Oct 1, 2024
2c66b77
chore(pg_stream): updated table filtering
le-vlad Oct 2, 2024
73fb9f7
chore(): updated tests for pglogical stream
le-vlad Oct 4, 2024
5306dcc
chore(): fmt applied
le-vlad Oct 4, 2024
7031109
chore(): code re-org
le-vlad Oct 4, 2024
184cf64
chore(): added temp. replication slot and removed outdated code
le-vlad Oct 4, 2024
c388aab
chore(): fixed eslint errors && tests
le-vlad Oct 7, 2024
dd82b0f
chore(): removed panics
le-vlad Oct 7, 2024
aa230d5
fix(): table name in snapshotter
le-vlad Oct 8, 2024
3fb3996
chore(): working on stream uncomited changes
le-vlad Oct 9, 2024
878fdc5
fix(postgres): correct order for message LSN ack
le-vlad Oct 9, 2024
5b4a835
chore(): removed log line
le-vlad Oct 10, 2024
8cf7edd
chore(): working on metrics
le-vlad Oct 11, 2024
2149189
chore(): removed test case && working on monitor testing
le-vlad Oct 12, 2024
7bf0898
chore(): monitor testing
le-vlad Oct 13, 2024
f18c8e4
chore(): added backward compatibility for postgresql
le-vlad Oct 15, 2024
39fdace
chore(): updated tests for different pg versions && working on metrics
le-vlad Oct 16, 2024
4f360c9
chore(): added WAL lag streaming
le-vlad Oct 16, 2024
fac3216
chore(): added snapshot metrics
le-vlad Oct 16, 2024
0418bcf
chore(): added snapshot metrics streaming
le-vlad Oct 16, 2024
5579b16
chore(): added explicit value for snapshot batch size
le-vlad Oct 17, 2024
64770ad
chore(): updated docs
le-vlad Oct 17, 2024
803b682
chore(): updated docs
le-vlad Oct 17, 2024
34f5995
chore(): applieds golangci-lint notes
le-vlad Oct 17, 2024
622303e
chore(): working on faster snapshot processing
le-vlad Oct 23, 2024
041a55c
chore(): experimenting with object pool
le-vlad Oct 23, 2024
c7c198c
Revert "chore(): experimenting with object pool"
le-vlad Oct 23, 2024
b16738d
chore(): use common pool to process snapshot
le-vlad Oct 23, 2024
8800444
chore(): added snapshot message rate counter
le-vlad Oct 23, 2024
79428da
chore(): working on batches
le-vlad Oct 25, 2024
85abfc8
fixed(): test
le-vlad Oct 25, 2024
d4a4960
fix(): metrics
le-vlad Oct 25, 2024
0187c7a
chore(): removed unused struct
le-vlad Oct 25, 2024
fd87cbe
chore(): stabilised batches
le-vlad Oct 28, 2024
e328d5f
chore(): removed debug lines; fixed linter
le-vlad Oct 28, 2024
61dede9
Merge branch 'pg_cdc_batch' into pg_cdc
le-vlad Oct 28, 2024
a4534be
Merge branch 'main' of github.com:le-vlad/connect into pg_cdc
le-vlad Nov 1, 2024
6fd8e4f
chore(): updated tls config field && small refactoring
le-vlad Nov 1, 2024
f4e14bd
ref(): use context when create publication
le-vlad Nov 1, 2024
9ff7079
pgcdc: cleanup configuration
rockwotj Nov 4, 2024
2517627
pgcdc: simplify stream setup
rockwotj Nov 4, 2024
79d9f1f
more review feedback. This got to be a lot so just checkpointing so Vlad
rockwotj Nov 4, 2024
4e05248
Chan cleanup WIP
mihaitodor Nov 5, 2024
4a4b7ba
chore(): addressed pull requests changes
le-vlad Nov 5, 2024
ea64c1c
chore(): updated tests
le-vlad Nov 5, 2024
f169a73
chore(): applied pg notes
le-vlad Nov 5, 2024
22ff49a
chore(): removed unused vars
le-vlad Nov 5, 2024
2f15977
chore(): run make deps to fix ci pipeline
le-vlad Nov 5, 2024
a492556
fix(postgres_cdc): monitor tests
le-vlad Nov 5, 2024
0f71f7c
chore(postgres_cdc): added integration test skip check
le-vlad Nov 5, 2024
078ffd9
fix(postgres_cdc): lint warnings
le-vlad Nov 5, 2024
cf65fdc
chore(): specify monitoring && standby intervals via config
le-vlad Nov 6, 2024
1bac4b5
ref(): simplified batch flush
le-vlad Nov 6, 2024
cdd1a01
chore(): removed redundant tests + deps
le-vlad Nov 6, 2024
0f0eb6f
chore(): updated docs
le-vlad Nov 6, 2024
131c0a0
pgstream: create batcher in foreground
rockwotj Nov 7, 2024
e47ca50
pgstream: only check for done once
rockwotj Nov 7, 2024
6eae232
pgcdc: remove bool for operation
rockwotj Nov 8, 2024
35b7d98
pgcdc: update docs for mode
rockwotj Nov 8, 2024
4f13dcc
pgcdc: validate slot names can't cause SQL injection
rockwotj Nov 8, 2024
83d1db5
pgcdc: use error type for error handling, not bool
rockwotj Nov 8, 2024
3de85d0
pgcdc: import sanitization code from pgx
rockwotj Nov 8, 2024
a67ca7c
pgcdc: add note about pk in snapshot reading
rockwotj Nov 8, 2024
ca4cfdb
pgcdc: properly sanitize query
rockwotj Nov 8, 2024
a685430
pgcdc add note about how waiting for commit is buggy
rockwotj Nov 8, 2024
2d3b322
pgcdc: drop unused param
rockwotj Nov 8, 2024
743ee33
pgcdc: actually remove unused param
rockwotj Nov 8, 2024
b6789ff
pgcdc: update docs
rockwotj Nov 8, 2024
fe18543
ref(): small code refactoring
le-vlad Nov 11, 2024
3622b17
feat(): added max_parallel_snapshot_tables config field
le-vlad Nov 11, 2024
9f521ba
chore(): added pk ordering to consume snapshot
le-vlad Nov 11, 2024
77d7e22
Merge branch 'main' of github.com:redpanda-data/connect into pg_cdc
le-vlad Nov 11, 2024
0a84279
Merge branch 'main' of github.com:le-vlad/connect into pg_cdc
le-vlad Nov 11, 2024
7235fd7
fix(): enabled integration tests
le-vlad Nov 11, 2024
99dbe63
chore(): small fixes && pr notes
le-vlad Nov 11, 2024
61ea84b
chore(): updated docs && fixed lint
le-vlad Nov 11, 2024
141832f
chore(): revert integration tests
le-vlad Nov 12, 2024
51a940e
chore(): added publication updates instead of re-creation
le-vlad Nov 12, 2024
ba87494
pgcdc: prefix stat names
rockwotj Nov 12, 2024
19eda8a
pgcdc: remove lsnrestart field
rockwotj Nov 12, 2024
af512ce
pgcdc: add a high watermark utility
rockwotj Nov 12, 2024
aa4bc89
pgcdc: use watermark for log position
rockwotj Nov 12, 2024
2524a3f
pgcdc: remove layer of nesting from switch
rockwotj Nov 12, 2024
f31c71b
pgcdc: use typed duration fields
rockwotj Nov 12, 2024
5f2bce0
pgcdc: fix waiting for txn ack
rockwotj Nov 12, 2024
ec5783f
pgcdc: dedup config fields
rockwotj Nov 12, 2024
089ed64
pgcdc: fix config field defaults
rockwotj Nov 12, 2024
f8cbc95
pgcdc: properly implement watermark
rockwotj Nov 13, 2024
a74571a
pgcdc: properly ack only on commit messages, once everything is
rockwotj Nov 13, 2024
7d73864
pgcdc: there are actually 3 handlers
rockwotj Nov 13, 2024
8d0aaed
pgcdc: simplify plugin handling code
rockwotj Nov 13, 2024
f75e003
pgcdc: fix randomized ID
rockwotj Nov 13, 2024
fc29d42
pgcdc: remove unused import
rockwotj Nov 13, 2024
82cc4e1
pgcdc: always include mode
rockwotj Nov 13, 2024
1bce997
pgcdc: fix period batching and cleanup logic
rockwotj Nov 13, 2024
b85c8bc
pgcdc: fix lint error
rockwotj Nov 13, 2024
30678ed
pgcdc: regen docs
rockwotj Nov 13, 2024
408394a
chore(): added +1 to standby update to follow postgresql requirements
le-vlad Nov 13, 2024
94dcd26
Merge branch 'pg_cdc' of github.com:le-vlad/connect into pg_cdc
le-vlad Nov 13, 2024
fe05268
chore: goimports
rockwotj Nov 13, 2024
581c7d4
pgcdc: simplify shutdown in the input
rockwotj Nov 13, 2024
998b4c5
pgcdc: localize the pg stream
rockwotj Nov 13, 2024
080f3f7
pgcdc: simplify internal flow control
rockwotj Nov 13, 2024
bde31e2
pgcdc: don't produce 0 messages
rockwotj Nov 13, 2024
d1ea325
pgcdc: rename stream uncommitted to batch transactions
rockwotj Nov 13, 2024
093c0ac
pgcdc: fix config name
rockwotj Nov 13, 2024
a2a80da
pgcdc: add some TODOs
rockwotj Nov 13, 2024
8084bdf
pgcdc: update docs
rockwotj Nov 13, 2024
1f8a650
pgcdc: review feedback
rockwotj Nov 14, 2024
cd352e0
pgcdc: cleanup monitor with periodic utility
rockwotj Nov 14, 2024
5594cbd
pgcdc: fmt
rockwotj Nov 14, 2024
272eef0
pgcdc: check for non-zero duration
rockwotj Nov 14, 2024
b70f694
chore(): sanitized queries && fixed tests
le-vlad Nov 14, 2024
2a4e42b
chore(): removed wal2json support
le-vlad Nov 15, 2024
0469751
chore(): updated pgstream docs
le-vlad Nov 15, 2024
b550df9
feat(): added support for composite primary keys
le-vlad Nov 15, 2024
8137883
pgcdc: mark as enterprise licensed
rockwotj Nov 15, 2024
93f701e
chore(): applied make fmt
le-vlad Nov 15, 2024
a8574bd
Merge branch 'pg_cdc' of github.com:le-vlad/connect into pg_cdc
le-vlad Nov 15, 2024
3cfc436
pgcdc/snapshot: use context for cancellation
rockwotj Nov 15, 2024
83c39b2
Merge branch 'pg_cdc' of https://github.com/le-vlad/connect into pg_cdc
rockwotj Nov 15, 2024
b64dc1b
pgcdc: fix primary key order by clause
rockwotj Nov 15, 2024
d05aae0
pgcdc: fix zero batch check
rockwotj Nov 15, 2024
438719a
update changelog
rockwotj Nov 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 393 additions & 0 deletions docs/modules/components/pages/inputs/pg_stream.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,393 @@
= pg_stream
:type: input
:status: beta
:categories: ["Services"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.


component_type_dropdown::[]


Streams changes from a PostgreSQL database using logical replication.

Introduced in version 4.39.0.


[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
input:
label: ""
pg_stream:
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable # No default (required)
batch_transactions: true
stream_snapshot: false
snapshot_memory_safety_factor: 1
snapshot_batch_size: 0
schema: public # No default (required)
tables: [] # No default (required)
checkpoint_limit: 1024
temporary_slot: false
slot_name: ""
pg_standby_timeout: 10s
pg_wal_monitor_interval: 3s
max_parallel_snapshot_tables: 1
auto_replay_nacks: true
batching:
count: 0
byte_size: 0
period: ""
check: ""
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
input:
label: ""
pg_stream:
dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable # No default (required)
batch_transactions: true
stream_snapshot: false
snapshot_memory_safety_factor: 1
snapshot_batch_size: 0
schema: public # No default (required)
tables: [] # No default (required)
checkpoint_limit: 1024
temporary_slot: false
slot_name: ""
pg_standby_timeout: 10s
pg_wal_monitor_interval: 3s
max_parallel_snapshot_tables: 1
auto_replay_nacks: true
batching:
count: 0
byte_size: 0
period: ""
check: ""
processors: [] # No default (optional)
```

--
======

Streams changes from a PostgreSQL database for Change Data Capture (CDC).
Additionally, if `stream_snapshot` is set to true, then the existing data in the database is also streamed too.

== Metadata

This input adds the following metadata fields to each message:
- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing)
- table (Name of the table that the message originated from)
- operation (Type of operation that generated the message, such as INSERT, UPDATE, or DELETE)


== Fields

=== `dsn`

The Data Source Name for the PostgreSQL database in the form of `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]`. Please note that Postgres enforces SSL by default, you can override this with the parameter `sslmode=disable` if required.


*Type*: `string`


```yml
# Examples

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
```

=== `batch_transactions`

When set to true, transactions are batched into a single message.


*Type*: `bool`

*Default*: `true`

=== `stream_snapshot`

When set to true, the plugin will first stream a snapshot of all existing data in the database before streaming changes. In order to use this the tables that are being snapshot MUST have a primary key set so that reading from the table can be parallelized.


*Type*: `bool`

*Default*: `false`

```yml
# Examples

stream_snapshot: true
```

=== `snapshot_memory_safety_factor`

Determines the fraction of available memory that can be used for streaming the snapshot. Values between 0 and 1 represent the percentage of memory to use. Lower values make initial streaming slower but help prevent out-of-memory errors.


*Type*: `float`

*Default*: `1`

```yml
# Examples

snapshot_memory_safety_factor: 0.2
```

=== `snapshot_batch_size`

The number of rows to fetch in each batch when querying the snapshot. A value of 0 lets the plugin determine the batch size based on `snapshot_memory_safety_factor` property.


*Type*: `int`

*Default*: `0`

```yml
# Examples

snapshot_batch_size: 10000
```

=== `schema`

The PostgreSQL schema from which to replicate data.


*Type*: `string`


```yml
# Examples

schema: public
```

=== `tables`

A list of table names to include in the logical replication. Each table should be specified as a separate item.


*Type*: `array`


```yml
# Examples

tables: |2-
- my_table
- my_table_2

```

=== `checkpoint_limit`

The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given LSN will not be acknowledged unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.


*Type*: `int`

*Default*: `1024`

=== `temporary_slot`

If set to true, creates a temporary replication slot that is automatically dropped when the connection is closed.


*Type*: `bool`

*Default*: `false`

=== `slot_name`

The name of the PostgreSQL logical replication slot to use. If not provided, a random name will be generated. You can create this slot manually before starting replication if desired.


*Type*: `string`

*Default*: `""`

```yml
# Examples

slot_name: my_test_slot
```

=== `pg_standby_timeout`

Specify the standby timeout before refreshing an idle connection.


*Type*: `string`

*Default*: `"10s"`

```yml
# Examples

pg_standby_timeout: 30s
```

=== `pg_wal_monitor_interval`

How often to report changes to the replication lag.


*Type*: `string`

*Default*: `"3s"`

```yml
# Examples

pg_wal_monitor_interval: 6s
```

=== `max_parallel_snapshot_tables`

Int specifies a number of tables that will be processed in parallel during the snapshot processing stage


*Type*: `int`

*Default*: `1`

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.


*Type*: `bool`

*Default*: `true`

=== `batching`

Allows you to configure a xref:configuration:batching.adoc[batching policy].


*Type*: `object`


```yml
# Examples

batching:
byte_size: 5000
count: 0
period: 1s

batching:
count: 10
period: 1s

batching:
check: this.contains("END BATCH")
count: 0
period: 1m
```

=== `batching.count`

A number of messages at which the batch should be flushed. If `0` disables count based batching.


*Type*: `int`

*Default*: `0`

=== `batching.byte_size`

An amount of bytes at which the batch should be flushed. If `0` disables size based batching.


*Type*: `int`

*Default*: `0`

=== `batching.period`

A period in which an incomplete batch should be flushed regardless of its size.


*Type*: `string`

*Default*: `""`

```yml
# Examples

period: 1s

period: 1m

period: 500ms
```

=== `batching.check`

A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.


*Type*: `string`

*Default*: `""`

```yml
# Examples

check: this.type == "end_of_transaction"
```

=== `batching.processors`

A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.


*Type*: `array`


```yml
# Examples

processors:
- archive:
format: concatenate

processors:
- archive:
format: lines

processors:
- archive:
format: json_array
```


Loading
Loading