Postgres replication #392

Merged · 44 commits · May 2, 2024

Commits (44)
cb58fbc
WIP
Feb 25, 2024
d218a3a
WIP
Mar 3, 2024
945da6d
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
Mar 3, 2024
fa9a4c1
move config to correct position
Mar 4, 2024
4cdf823
extend SQLAlchemy type mapping
Mar 12, 2024
8808812
add initial support for postgres replication
Mar 12, 2024
1914acf
add credentials instruction
Mar 12, 2024
36739ec
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
Mar 12, 2024
cc6a11d
undo adding secret
Mar 12, 2024
f815361
add module docstring
Mar 12, 2024
a318fee
use from import to prevent AttributeError when running test_dlt_init.py
Mar 13, 2024
8aed399
enable multiple tables per publication
Mar 15, 2024
9fc3c39
add support for schema replication
Mar 16, 2024
8c2f905
add support for unmapped data types
Mar 16, 2024
a0af605
add test for init_replication
Mar 17, 2024
051830c
update docstrings
Mar 17, 2024
656989a
return resource instead of single-element list
Mar 17, 2024
d014645
add example pipeline
Mar 18, 2024
269422e
add more example pipelines
Mar 18, 2024
c674f24
add nullability hints
Mar 18, 2024
a919c82
add README
Mar 18, 2024
57b5e1e
add sql_database dependency instruction
Mar 19, 2024
5636e07
batch data items per table and yield hints only once
Mar 22, 2024
2713464
postpone replication column hints to preserve order
Mar 22, 2024
eec75f0
refactor to use resource decorator
Mar 22, 2024
aae3754
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
Mar 22, 2024
493147d
add support for table schema changes
Mar 23, 2024
7bd211b
optimize message type detection for performance
Mar 25, 2024
48442ba
upgrade dlt to 0.4.8
Apr 9, 2024
d303efd
Merge branch 'master' of https://github.com/dlt-hub/verified-sources …
Apr 9, 2024
524945f
enables to run tests in parallel
rudolfix Apr 14, 2024
ab005a1
Merge branch 'master' into 933-postgres-replication
rudolfix Apr 14, 2024
c596180
fixes format
rudolfix Apr 14, 2024
34610b6
make test more specific to handle postgres version differences
Apr 15, 2024
7a07045
add postgres server version requirement for schema replication functi…
Apr 15, 2024
61712b4
removed whitespace
Apr 15, 2024
fd1d973
explicitly fetch credentials from pg_replication source
Apr 22, 2024
8bc4da3
add superuser check
Apr 22, 2024
796c980
Merge branch 'master' into 933-postgres-replication
rudolfix May 1, 2024
77fb1dd
updates lock file
rudolfix May 1, 2024
8a1d910
use psycopg2-binary instead of psycopg2
May 2, 2024
b0d2abb
use destination-specific escape identifier
May 2, 2024
f63ceff
replace string literal with int literal
May 2, 2024
22758fe
include pypgoutput decoders in library
May 2, 2024
13 changes: 12 additions & 1 deletion poetry.lock


4 changes: 4 additions & 0 deletions pyproject.toml
@@ -32,6 +32,7 @@ black = "^23.3.0"
pypdf2 = "^3.0.1"
greenlet = "<3.0.0"
confluent-kafka = "^2.3.0"
types-psycopg2 = "^2.9.0"
pytest-mock = "^3.12.0"
twisted = "22.10.0"
pytest-forked = "^1.6.0"
@@ -41,6 +42,9 @@ sqlalchemy = ">=1.4"
pymysql = "^1.0.3"
connectorx = ">=0.3.1"

[tool.poetry.group.pg_replication.dependencies]
psycopg2-binary = ">=2.9.9"

[tool.poetry.group.google_sheets.dependencies]
google-api-python-client = "^2.78.0"

2 changes: 1 addition & 1 deletion sources/.dlt/example.secrets.toml
@@ -19,4 +19,4 @@ location = "US"
## chess pipeline
# the section below defines secrets for "chess_dlt_config_example" source in chess/__init__.py
[sources.chess]
secret_str="secret string" # a string secret
secret_str="secret string" # a string secret
2 changes: 1 addition & 1 deletion sources/filesystem/helpers.py
@@ -1,5 +1,5 @@
"""Helpers for the filesystem resource."""
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Type, Union
from typing import Any, Dict, Iterable, List, Optional, Type, Union
from fsspec import AbstractFileSystem # type: ignore

from dlt.common.configuration import resolve_type
79 changes: 79 additions & 0 deletions sources/pg_replication/README.md
@@ -0,0 +1,79 @@
# Postgres replication
[Postgres](https://www.postgresql.org/) is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes in tables (a process often referred to as _Change Data Capture_ or CDC). It uses [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) and the standard built-in `pgoutput` [output plugin](https://www.postgresql.org/docs/current/logicaldecoding-output-plugin.html).

Resources that can be loaded using this verified source are:

| Name | Description |
|----------------------|-------------------------------------------------|
| replication_resource | Load published messages from a replication slot |
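
Under the hood, the resource reads change messages from a logical replication slot that is attached to a publication on the source database. The following is only a minimal sketch of creating these server-side objects manually; the connection string, publication, slot, and table names are placeholders, and the source's `init_replication` helper can also set them up for you:

```python
import psycopg2

# Placeholder names: adjust the connection string, publication, slot, and table
# to your setup. The `init_replication` helper can create these objects instead.
conn = psycopg2.connect(
    "postgresql://replication_user:<<password>>@localhost:5432/dlt_data"
)
conn.autocommit = True
with conn.cursor() as cur:
    # publish DML operations (insert/update/delete) for the table(s) to replicate
    cur.execute("CREATE PUBLICATION my_pub FOR TABLE my_table;")
    # create a logical replication slot that uses the built-in pgoutput plugin
    cur.execute("SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');")
conn.close()
```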

## Initialize the pipeline

```bash
dlt init pg_replication duckdb
```

Contributor: this is a good place to tell the user to use `dlt init sql_database ...` if they want to use this initial resource

Collaborator Author: Extended the README: 57b5e1e

This uses `duckdb` as the destination, but you can choose any of the supported [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Add `sql_database` source

```bash
dlt init sql_database duckdb
```

The `pg_replication` source depends on the [sql_database](../sql_database/README.md) verified source internally to perform initial loads. You can skip this step if you don't do initial loads.

## Set up user

The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned:

```sql
CREATE ROLE replication_user WITH LOGIN REPLICATION;
```

It also needs `CREATE` privilege on the database:

```sql
GRANT CREATE ON DATABASE dlt_data TO replication_user;
```

### Set up RDS
1. You must enable replication for your RDS Postgres instance via a **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
2. `WITH LOGIN REPLICATION;` does not work on RDS; instead run:
```sql
GRANT rds_replication TO replication_user;
```
3. Prevent a fallback to a non-SSL connection by setting the connection parameters:
```toml
sources.pg_replication.credentials="postgresql://loader:[email protected]:5432/dlt_data?sslmode=require&connect_timeout=300"
```


## Add credentials
1. Open `.dlt/secrets.toml`.
2. Enter your Postgres credentials:

```toml
[sources.pg_replication]
credentials="postgresql://replication_user:<<password>>@localhost:5432/dlt_data"
```
3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Run the pipeline

1. Install the necessary dependencies by running the following command:

```bash
pip install -r requirements.txt
```

2. Now the pipeline can be run using the command below (a minimal example script is sketched after this list):

```bash
python pg_replication_pipeline.py
```

3. To make sure that everything is loaded as expected, use the command:

```bash
dlt pipeline pg_replication_pipeline show
```
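
Below is a minimal sketch of such a script, assuming a publication `my_pub` and a replication slot `my_slot` already exist (both names are placeholders); the `pg_replication_pipeline.py` added in this PR contains fuller examples:

```python
import dlt

from pg_replication import replication_resource

# placeholders: the publication and slot must already exist on the source database
SLOT_NAME = "my_slot"
PUB_NAME = "my_pub"

pipeline = dlt.pipeline(
    pipeline_name="pg_replication_pipeline",
    destination="duckdb",
    dataset_name="pg_replication_data",
)

# credentials are resolved from .dlt/secrets.toml ([sources.pg_replication])
changes = replication_resource(slot_name=SLOT_NAME, pub_name=PUB_NAME)

# each run loads the changes published since the previous run
info = pipeline.run(changes)
print(info)
```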
103 changes: 103 additions & 0 deletions sources/pg_replication/__init__.py
@@ -0,0 +1,103 @@
"""Replicates postgres tables in batch using logical decoding."""

from typing import Dict, Sequence, Optional, Iterable, Union

import dlt

from dlt.common.typing import TDataItem
from dlt.common.schema.typing import TTableSchemaColumns
from dlt.extract.items import DataItemWithMeta
from dlt.sources.credentials import ConnectionStringCredentials

from .helpers import advance_slot, get_max_lsn, ItemGenerator


@dlt.resource(
    name=lambda args: args["slot_name"] + "_" + args["pub_name"],
    standalone=True,
)
def replication_resource(
    slot_name: str,
    pub_name: str,
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    include_columns: Optional[Dict[str, Sequence[str]]] = None,
    columns: Optional[Dict[str, TTableSchemaColumns]] = None,
    target_batch_size: int = 1000,
    flush_slot: bool = True,
) -> Iterable[Union[TDataItem, DataItemWithMeta]]:
    """Resource yielding data items for changes in one or more postgres tables.

    - Relies on a replication slot and publication that publishes DML operations
      (i.e. `insert`, `update`, and/or `delete`). Helper `init_replication` can be
      used to set this up.
    - Maintains LSN of last consumed message in state to track progress.
    - At start of the run, advances the slot up to the last consumed message of the previous run.
    - Processes in batches to limit memory usage.

    Args:
        slot_name (str): Name of the replication slot to consume replication messages from.
        pub_name (str): Name of the publication that publishes DML operations for the table(s).
        credentials (ConnectionStringCredentials): Postgres database credentials.
        include_columns (Optional[Dict[str, Sequence[str]]]): Maps table name(s) to
            sequence of names of columns to include in the generated data items.
            Any column not in the sequence is excluded. If not provided, all columns
            are included. For example:
            ```
            include_columns={
                "table_x": ["col_a", "col_c"],
                "table_y": ["col_x", "col_y", "col_z"],
            }
            ```
        columns (Optional[Dict[str, TTableSchemaColumns]]): Maps table name(s) to
            column hints to apply on the replicated table(s). For example:
            ```
            columns={
                "table_x": {"col_a": {"data_type": "complex"}},
                "table_y": {"col_y": {"precision": 32}},
            }
            ```
        target_batch_size (int): Desired number of data items yielded in a batch.
            Can be used to limit the data items in memory. Note that the number of
            data items yielded can be (far) greater than `target_batch_size`, because
            all messages belonging to the same transaction are always processed in
            the same batch, regardless of the number of messages in the transaction
            and regardless of the value of `target_batch_size`. The number of data
            items can also be smaller than `target_batch_size` when the replication
            slot is exhausted before a batch is full.
        flush_slot (bool): Whether processed messages are discarded from the replication
            slot. Recommended value is True. Be careful when setting False: not flushing
            can eventually lead to a "disk full" condition on the server, because
            the server retains all the WAL segments that might be needed to stream
            the changes via all of the currently open replication slots.

    Yields:
        Data items for changes published in the publication.
    """
    # start where we left off in previous run
    start_lsn = dlt.current.resource_state().get("last_commit_lsn", 0)
    if flush_slot:
        advance_slot(start_lsn, slot_name, credentials)

    # continue until last message in replication slot
    options = {"publication_names": pub_name, "proto_version": "1"}
    upto_lsn = get_max_lsn(slot_name, options, credentials)
    if upto_lsn is None:
        return "Replication slot is empty."

    # generate items in batches
    while True:
        gen = ItemGenerator(
            credentials=credentials,
            slot_name=slot_name,
            options=options,
            upto_lsn=upto_lsn,
            start_lsn=start_lsn,
            target_batch_size=target_batch_size,
            include_columns=include_columns,
            columns=columns,
        )
        yield from gen
        if gen.generated_all:
            dlt.current.resource_state()["last_commit_lsn"] = gen.last_commit_lsn
            break
        start_lsn = gen.last_commit_lsn
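
As an illustration of the `include_columns`, `columns`, and `target_batch_size` options described in the docstring, a call might look like the following sketch (slot, publication, table, and column names are placeholders):

```python
import dlt

from pg_replication import replication_resource

changes = replication_resource(
    slot_name="dlt_slot",  # placeholder slot name
    pub_name="dlt_pub",    # placeholder publication name
    # keep only selected columns of table_x in the generated data items
    include_columns={"table_x": ["col_a", "col_c"]},
    # apply an explicit column hint to the replicated table
    columns={"table_x": {"col_a": {"data_type": "complex"}}},
    # yield roughly 500 data items per batch to bound memory usage
    target_batch_size=500,
)

pipeline = dlt.pipeline(pipeline_name="pg_replication_pipeline", destination="duckdb")
pipeline.run(changes)
```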