Commit

* move config to correct position
* extend SQLAlchemy type mapping
* add initial support for postgres replication
* add credentials instruction
* undo adding secret
* add module docstring
* use from import to prevent AttributeError when running test_dlt_init.py
* enable multiple tables per publication
* add support for schema replication
* add support for unmapped data types
* add test for init_replication
* update docstrings
* return resource instead of single-element list
* add example pipeline
* add more example pipelines
* add nullability hints
* add README
* add sql_database dependency instruction
* batch data items per table and yield hints only once
* postpone replication column hints to preserve order
* refactor to use resource decorator
* add support for table schema changes
* optimize message type detection for performance
* upgrade dlt to 0.4.8
* enables to run tests in parallel
* fixes format
* make test more specific to handle postgres version differences
* add postgres server version requirement for schema replication functionality
* removed whitespace
* explicitly fetch credentials from pg_replication source
* add superuser check
* updates lock file
* use psycopg2-binary instead of psycopg2
* use destination-specific escape identifier
* replace string literal with int literal
* include pypgoutput decoders in library

Co-authored-by: Jorrit Sandbrink <[email protected]>
Co-authored-by: Marcin Rudolf <[email protected]>
1 parent 817822c · commit df0cb7e · 18 changed files with 2,891 additions and 5 deletions.
@@ -0,0 +1,79 @@
# Postgres replication

[Postgres](https://www.postgresql.org/) is one of the most popular relational database management systems. This verified source uses Postgres' replication functionality to efficiently process changes in tables (a process often referred to as _Change Data Capture_ or CDC). It uses [logical decoding](https://www.postgresql.org/docs/current/logicaldecoding.html) and the standard built-in `pgoutput` [output plugin](https://www.postgresql.org/docs/current/logicaldecoding-output-plugin.html).

Resources that can be loaded using this verified source are:

| Name                 | Description                                     |
|----------------------|-------------------------------------------------|
| replication_resource | Load published messages from a replication slot |
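Under the hood, replication is driven by a publication and a logical replication slot on the source database. As a minimal sketch of the equivalent SQL (the names `my_pub`, `my_slot`, and `my_table` are hypothetical; the source's `init_replication` helper can set these objects up for you):

```sql
-- Publish DML operations (insert/update/delete) for one table.
CREATE PUBLICATION my_pub FOR TABLE my_table;

-- Create a logical replication slot that uses the built-in pgoutput plugin.
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');
```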
## Initialize the pipeline

```bash
dlt init pg_replication duckdb
```

This uses `duckdb` as the destination, but you can choose any of the supported [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).
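For example, to initialize the same source against BigQuery instead (any supported destination name works in this position):

```bash
dlt init pg_replication bigquery
```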
## Add `sql_database` source

```bash
dlt init sql_database duckdb
```

This source internally depends on the [sql_database](../sql_database/README.md) verified source to perform initial loads. You can skip this step if you don't do initial loads.
## Set up user

The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned:

```sql
CREATE ROLE replication_user WITH LOGIN REPLICATION;
```

It also needs the `CREATE` privilege on the database:

```sql
GRANT CREATE ON DATABASE dlt_data TO replication_user;
```
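To verify that the attributes took effect, you can query the standard `pg_roles` catalog view (plain Postgres, nothing specific to this source):

```sql
-- Both boolean columns should show true for replication_user.
SELECT rolname, rolcanlogin, rolreplication
FROM pg_roles
WHERE rolname = 'replication_user';
```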
### Set up RDS

1. You must enable replication for your RDS Postgres instance via a **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
2. `WITH LOGIN REPLICATION;` does not work on RDS; instead, run:
   ```sql
   GRANT rds_replication TO replication_user;
   ```
3. Do not fall back to a non-SSL connection; enforce SSL through the connection parameters:
   ```toml
   sources.pg_replication.credentials="postgresql://loader:[email protected]:5432/dlt_data?sslmode=require&connect_timeout=300"
   ```
## Add credentials

1. Open `.dlt/secrets.toml`.
2. Enter your Postgres credentials:

   ```toml
   [sources.pg_replication]
   credentials="postgresql://replication_user:<<password>>@localhost:5432/dlt_data"
   ```
3. Enter credentials for your chosen destination as per the [docs](https://dlthub.com/docs/dlt-ecosystem/destinations/).
## Run the pipeline

1. Install the necessary dependencies by running the following command:

   ```bash
   pip install -r requirements.txt
   ```

2. Run the pipeline with the command:

   ```bash
   python pg_replication_pipeline.py
   ```

3. To make sure that everything is loaded as expected, use the command:

   ```bash
   dlt pipeline pg_replication_pipeline show
   ```
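For orientation, a minimal replication pipeline can look like the sketch below. This is an illustrative sketch rather than the shipped `pg_replication_pipeline.py`; the slot and publication names are hypothetical and assumed to exist already (for example, created with the `init_replication` helper mentioned in the module docstrings):

```python
import dlt

from pg_replication import replication_resource

# Hypothetical names; the replication slot and publication must already exist.
changes = replication_resource(slot_name="my_slot", pub_name="my_pub")

pipeline = dlt.pipeline(
    pipeline_name="pg_replication_pipeline",
    destination="duckdb",
    dataset_name="replicated_data",
)

# Loads all changes published since the last run (the LSN is kept in state).
print(pipeline.run(changes))
```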
@@ -0,0 +1,103 @@
"""Replicates postgres tables in batch using logical decoding.""" | ||
|
||
from typing import Dict, Sequence, Optional, Iterable, Union | ||
|
||
import dlt | ||
|
||
from dlt.common.typing import TDataItem | ||
from dlt.common.schema.typing import TTableSchemaColumns | ||
from dlt.extract.items import DataItemWithMeta | ||
from dlt.sources.credentials import ConnectionStringCredentials | ||
|
||
from .helpers import advance_slot, get_max_lsn, ItemGenerator | ||
|
||
|
||
@dlt.resource( | ||
name=lambda args: args["slot_name"] + "_" + args["pub_name"], | ||
standalone=True, | ||
) | ||
def replication_resource( | ||
slot_name: str, | ||
pub_name: str, | ||
credentials: ConnectionStringCredentials = dlt.secrets.value, | ||
include_columns: Optional[Dict[str, Sequence[str]]] = None, | ||
columns: Optional[Dict[str, TTableSchemaColumns]] = None, | ||
target_batch_size: int = 1000, | ||
flush_slot: bool = True, | ||
) -> Iterable[Union[TDataItem, DataItemWithMeta]]: | ||
"""Resource yielding data items for changes in one or more postgres tables. | ||
- Relies on a replication slot and publication that publishes DML operations | ||
(i.e. `insert`, `update`, and/or `delete`). Helper `init_replication` can be | ||
used to set this up. | ||
- Maintains LSN of last consumed message in state to track progress. | ||
- At start of the run, advances the slot upto last consumed message in previous run. | ||
- Processes in batches to limit memory usage. | ||
Args: | ||
slot_name (str): Name of the replication slot to consume replication messages from. | ||
pub_name (str): Name of the publication that publishes DML operations for the table(s). | ||
credentials (ConnectionStringCredentials): Postgres database credentials. | ||
include_columns (Optional[Dict[str, Sequence[str]]]): Maps table name(s) to | ||
sequence of names of columns to include in the generated data items. | ||
Any column not in the sequence is excluded. If not provided, all columns | ||
are included. For example: | ||
``` | ||
include_columns={ | ||
"table_x": ["col_a", "col_c"], | ||
"table_y": ["col_x", "col_y", "col_z"], | ||
} | ||
``` | ||
columns (Optional[Dict[str, TTableHintTemplate[TAnySchemaColumns]]]): Maps | ||
table name(s) to column hints to apply on the replicated table(s). For example: | ||
``` | ||
columns={ | ||
"table_x": {"col_a": {"data_type": "complex"}}, | ||
"table_y": {"col_y": {"precision": 32}}, | ||
} | ||
``` | ||
target_batch_size (int): Desired number of data items yielded in a batch. | ||
Can be used to limit the data items in memory. Note that the number of | ||
data items yielded can be (far) greater than `target_batch_size`, because | ||
all messages belonging to the same transaction are always processed in | ||
the same batch, regardless of the number of messages in the transaction | ||
and regardless of the value of `target_batch_size`. The number of data | ||
items can also be smaller than `target_batch_size` when the replication | ||
slot is exhausted before a batch is full. | ||
flush_slot (bool): Whether processed messages are discarded from the replication | ||
slot. Recommended value is True. Be careful when setting False—not flushing | ||
can eventually lead to a “disk full” condition on the server, because | ||
the server retains all the WAL segments that might be needed to stream | ||
the changes via all of the currently open replication slots. | ||
Yields: | ||
Data items for changes published in the publication. | ||
""" | ||
# start where we left off in previous run | ||
start_lsn = dlt.current.resource_state().get("last_commit_lsn", 0) | ||
if flush_slot: | ||
advance_slot(start_lsn, slot_name, credentials) | ||
|
||
# continue until last message in replication slot | ||
options = {"publication_names": pub_name, "proto_version": "1"} | ||
upto_lsn = get_max_lsn(slot_name, options, credentials) | ||
if upto_lsn is None: | ||
return "Replication slot is empty." | ||
|
||
# generate items in batches | ||
while True: | ||
gen = ItemGenerator( | ||
credentials=credentials, | ||
slot_name=slot_name, | ||
options=options, | ||
upto_lsn=upto_lsn, | ||
start_lsn=start_lsn, | ||
target_batch_size=target_batch_size, | ||
include_columns=include_columns, | ||
columns=columns, | ||
) | ||
yield from gen | ||
if gen.generated_all: | ||
dlt.current.resource_state()["last_commit_lsn"] = gen.last_commit_lsn | ||
break | ||
start_lsn = gen.last_commit_lsn |
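A hedged usage sketch tying the documented parameters together; the slot, publication, table, and column names are hypothetical, taken from the docstring examples above:

```python
from pg_replication import replication_resource

changes = replication_resource(
    slot_name="my_slot",  # hypothetical; must already exist
    pub_name="my_pub",  # hypothetical; must already exist
    include_columns={"table_x": ["col_a", "col_c"]},  # drop all other columns of table_x
    columns={"table_x": {"col_a": {"data_type": "complex"}}},  # apply a column hint
    target_batch_size=1000,  # soft cap; whole transactions stay in one batch
    flush_slot=True,  # discard consumed messages so the server does not retain WAL forever
)
```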