Postgres replication #392
Conversation
this looks super solid already. I had a few questions, mostly about the transition from syncing the snapshot table to when we start replication from a slot (do we lose data?)
do you think that going for binary pgoutput is a good idea after all? I think the JSON one will be simpler and faster:
- messages are created AFAIK on the server, so you do not need to decode them
- the structure is almost ready to be loaded
we can actually keep both, but we'd add a lot of code that should probably be tested
```py
return dlt.resource(
    table_replication_items,
    name=table,
    write_disposition="merge",
```
for tables with INSERT only (i.e. logs), `append` will also work?
Yes, should work, didn't think of that. I can fetch details about the publication using the `pg_publication` catalog and use `append` if it only publishes insert. Will be more efficient than `merge`.
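A minimal sketch of what that check could look like, assuming a psycopg2 connection string; the `pg_publication` catalog exposes `pubinsert`, `pubupdate`, and `pubdelete` flags, and the helper name is illustrative:

```py
import psycopg2


def choose_write_disposition(conn_str: str, pub_name: str) -> str:
    """Return "append" for insert-only publications, "merge" otherwise (sketch)."""
    with psycopg2.connect(conn_str) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT pubinsert, pubupdate, pubdelete FROM pg_publication WHERE pubname = %s;",
                (pub_name,),
            )
            pubinsert, pubupdate, pubdelete = cur.fetchone()
    # an insert-only publication never emits updates or deletes, so plain append is safe
    return "append" if pubinsert and not (pubupdate or pubdelete) else "merge"
```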
```py
from copy import deepcopy
from dataclasses import dataclass, field

import psycopg2
```
there will be people using `sql_database` that are not using postgres replication. we are not surfacing this into the top-level init? anyway... I think we may land it in a separate verified source (if we do not share anything with the SQLAlchemy source)
Separate source makes sense to me. Only thing is that the `table_snapshot` resource for initial loading uses the `sql_table` resource under the hood. How should we handle one verified source depending on another?
I'd just assume that the user did `dlt init` twice: for replication and for sql_database. if you cannot import `sql_table` you can raise an exception and tell the user to use `dlt init`. we have nothing better now :)
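A hedged sketch of that import guard; the module path assumes the user ran `dlt init sql_database <destination>` in the same project:

```py
try:
    # the snapshot/initial-load resource reuses sql_table from the sql_database verified source
    from sql_database import sql_table
except ImportError as e:
    raise ImportError(
        "The sql_database verified source is required for initial table snapshots. "
        "Run `dlt init sql_database <destination>` in this project to add it."
    ) from e
```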
```py
        self.last_commit_ts: pendulum.DateTime
        self.last_commit_lsn = None

    def __call__(self, msg: ReplicationMessage) -> None:
```
do I understand correctly that we get messages one by one? I hope that our python code will keep up. also we need some batching (as you say in the ticket)
Yeah, messages come in one by one. I will implement batching and then do some load tests to see if performance is acceptable for larger volumes.
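A minimal sketch of such batching with psycopg2's replication consumer loop, assuming a hypothetical `BatchingConsumer`; `StopReplication` is psycopg2's documented way to break out of `consume_stream()`:

```py
from typing import Any, List

from psycopg2.extras import StopReplication  # raised to break out of consume_stream()


class BatchingConsumer:
    """Buffers replication messages and stops once a full batch has been collected."""

    def __init__(self, target_batch_size: int) -> None:
        self.target_batch_size = target_batch_size
        self.items: List[Any] = []

    def __call__(self, msg: Any) -> None:
        self.items.append(msg.payload)  # the real code would decode the pgoutput message here
        if len(self.items) >= self.target_batch_size:
            raise StopReplication  # ends cur.consume_stream(...) so the batch can be yielded


# usage sketch, with `cur` being a replication cursor after start_replication():
#     consumer = BatchingConsumer(target_batch_size=1000)
#     try:
#         cur.consume_stream(consumer)
#     except StopReplication:
#         pass
#     yield from consumer.items
```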
```py
        the table schema.
        """
        # store table schema information
        columns = {c.name: _to_dlt_column_schema(c) for c in decoded_msg.columns}
```
when do we get this message? is it coming with every change? it looked like it in jsonpgout
See my previous comment.
@rudolfix see my replies on your comments. Regarding
That being said,
@jorritsandbrink it should be easy. if there is > 1 table we still have one LSN with which we track new messages, right? if so the change is trivial: you can specify a table name which is resolved dynamically. drawbacks:
this is so good! I have a few code improvements and one optimization that will make this way faster. I love that we have so many tests.
it also looks to me that dlt-hub/dlt#1105 is crucial to support replication of many tables from a single resource
## Initialize the pipeline

```bash
dlt init pg_replication duckdb
```
this is a good place to tell the user to use `dlt init sql_database ...` if they want to use this initial resource
Extended the README: 57b5e1e
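For reference, the added README section presumably boils down to something like this (the destination name is just an example):

```bash
# also add the sql_database verified source if you want the initial snapshot load
dlt init sql_database duckdb
```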
sources/pg_replication/__init__.py (outdated)

```py
from .helpers import _gen_replication_resource_name, get_pub_ops, replication_items


@dlt.sources.config.with_config(sections=("sources", "pg_replication"))
```
I assume you need this trick because you want to generate a resource name dynamically. now this is supported without tricks. see e.g. the kinesis resource:

```py
@dlt.resource(
    name=lambda args: args["stream_name"],
    primary_key="_kinesis_msg_id",
    standalone=True,
)
def kinesis_stream(
    stream_name: str = dlt.config.value,
```

I think you can move all the settings to the decorator now
Done in eec75f0
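Applied to this source, the pattern would look roughly like this; the decorator arguments mirror the kinesis example above, while the body and the call into `replication_items` are only illustrative:

```py
@dlt.resource(
    name=lambda args: _gen_replication_resource_name(args["slot_name"], args["pub_name"]),
    standalone=True,
)
def replication_resource(
    slot_name: str = dlt.config.value,
    pub_name: str = dlt.config.value,
):
    """Resource name is derived from slot and publication, no with_config trick needed."""
    yield from replication_items(slot_name=slot_name, pub_name=pub_name)  # hypothetical signature
```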
```py
from .schema_types import _to_dlt_column_schema, _to_dlt_val


@dlt.sources.config.with_config(sections=("sources", "pg_replication"))
```
I think you can declare it as a `dlt.source`? it returns many resources now.
Yes, but it only returns resources on the first call (when it creates the slot) and only if `persist_snapshots` is `True`. So I'd say it's primarily a helper to set up replication, rather than a source.
sources/pg_replication/helpers.py (outdated)

```py
    columns: Optional[Dict[str, TTableHintTemplate[TAnySchemaColumns]]] = None,
    target_batch_size: int = 1000,
    flush_slot: bool = True,
    write_disposition: TWriteDisposition = "append",  # TODO: remove after https://github.com/dlt-hub/dlt/issues/1031 has been released
```
this is already merged and available in 0.4.7a0 (pre-release)
Works nicely with 0.4.7 indeed. I removed the obsolete lines of code in 5636e07.
sources/pg_replication/helpers.py (outdated)

```py
    """
    # start where we left off in previous run
    resource_name = _gen_replication_resource_name(slot_name, pub_name)
    start_lsn = dlt.current.resource_state(resource_name).get("last_commit_lsn", 0)
```
is there a reason not to use Incremental? the code below is surely faster than incremental...
I'd say it's simpler without `Incremental`, didn't see a reason to use it.
sources/pg_replication/helpers.py (outdated)

```py
        write_disposition=write_disposition,  # TODO: remove after https://github.com/dlt-hub/dlt/issues/1031 has been released
    )
    yield from gen
    if gen.generated_all:
```
this is good! just one thing: if `gen` is closed (i.e. by `add_limit`) your cleanup code will not be executed because the yield raises GeneratorExit... please do:

```py
try:
    yield from gen
finally:
    ...  # your finalization code
```

I do not like this because it is not intuitive... I do not see how to make it easier without rewriting the code
I think in this case we need the cleanup code to only execute if the batch has been yielded completely. We can't split at any LSN, it needs to be the LSN of a `Commit` message. This is because transactions need to be processed atomically, and transactions can consist of multiple messages (e.g. 100 inserted records = 100 messages). The code has been set up such that batch boundaries are always `Commit` messages. Replication can also only start (`start_lsn`) at the beginning of a transaction. So if we have a transaction of 100 inserts and `add_limit(50)`, we don't want to persist resource state, because we haven't yielded the entire transaction.
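A sketch of the resulting pattern, assuming the generator exposes `generated_all` and `last_commit_lsn` as in the snippet quoted above:

```py
from typing import Any, Iterator

import dlt


def yield_batch(resource_name: str, batch_gen: Any) -> Iterator[Any]:
    """Yield one batch and persist the commit LSN only if the batch completed."""
    yield from batch_gen
    if batch_gen.generated_all:
        # the last yielded item was a Commit message, so this LSN is a safe restart point;
        # deliberately not wrapped in try/finally: if add_limit() closes the generator
        # mid-transaction, the stored LSN must not advance
        dlt.current.resource_state(resource_name)["last_commit_lsn"] = batch_gen.last_commit_lsn
```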
```py
    # https://wiki.postgresql.org/wiki/Retrieve_primary_key_columns
    cur.execute(
        f"""
        SELECT a.attname
```
this is probably good enough. but `sql_table` returns the primary key (also a compound one) in the `primary_key` hint. is that not enough?
is it a primary key or some kind of unique column? we use SQLAlchemy reflection to get the table schema. what is the schema of this table?
Okay I get it now. `sql_table` creates a resource for the snapshot table, which doesn't have a primary key, because it is created with a CTAS statement:

```py
cur.execute(
    f"""
    START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
    SET TRANSACTION SNAPSHOT '{snapshot_name}';
    CREATE TABLE {snapshot_qual_name} AS SELECT {col_str} FROM {qual_name};
    """
)
```

I point `_get_pk` at the original table (`table_name` instead of `snapshot_table_name`), which does have a primary key, hence the difference.
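For completeness, the query behind `_get_pk` is presumably the standard one from the Postgres wiki page referenced in the snippet; roughly (pointed at the original table, compound keys yield multiple rows):

```py
cur.execute(
    f"""
    SELECT a.attname
    FROM pg_index i
    JOIN pg_attribute a
      ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
    WHERE i.indrelid = '{qual_name}'::regclass
      AND i.indisprimary;
    """
)
pk_columns = [row[0] for row in cur.fetchall()]
```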
sources/pg_replication/helpers.py
Outdated
Stores table schema information from Relation message in object state. | ||
""" | ||
# store table schema information | ||
columns = {c.name: _to_dlt_column_schema(c) for c in decoded_msg.columns} |
is it a full table schema or just modified columns?
`decoded_msg.columns` has all columns, modified or not.
sources/pg_replication/helpers.py
Outdated
data_item["lsn"] = lsn | ||
if for_delete: | ||
data_item["deleted_ts"] = commit_ts | ||
return dlt.mark.with_hints( |
this is all correct but will be very slow. what happens currently is:
- you yield items one by one
- each item has full hints with it

this will generate a lot of unnecessary processing in the pipeline.
what you should do instead (I think it is not very complicated):
- buffer data items per table name to be able to yield them together
- keep schema changes per table and yield them once

you yield stuff when we reach batch size:
- for each table and schema change: `yield dlt.mark.with_hints([], dlt.mark.make_hints(...))` - you can yield an empty list just to update hints
- for each table and data items: `yield dlt.mark.with_table_name(data_items[table_name], table_name)` where `data_items[table_name]` is a list of data items collected for a particular table

the speedup will be significant. and this code structure also makes it easier to yield arrow tables if we want to be even faster
tell me WDYT
Much better indeed! I started building this, but faced the "hint accumulation issue" that's blocking progress. This is on hold until that issue has been completed.
This has now been implemented in 5636e07.
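A minimal sketch of that buffered dispatch, with illustrative buffer names; `dlt.mark.with_hints`, `dlt.mark.make_hints`, and `dlt.mark.with_table_name` are the calls suggested above:

```py
from typing import Any, Dict, List

import dlt


def flush_batch(
    data_items: Dict[str, List[Any]],
    schema_changes: Dict[str, Any],
):
    """Yield buffered schema updates and rows once per table instead of per message."""
    # one (possibly empty) hint item per table whose schema changed
    for table_name, columns in schema_changes.items():
        yield dlt.mark.with_hints(
            [],  # empty list: only the hints are applied, no rows are loaded
            dlt.mark.make_hints(table_name=table_name, columns=columns),
        )
    # one list of rows per table, dispatched by table name
    for table_name, items in data_items.items():
        yield dlt.mark.with_table_name(items, table_name)
```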
LGTM! I have one question about the missing PK in SQLAlchemy. let's take a look, the PK should really be there.
I expect that for large loads this will be slow. we are doing a lot of operations per row. also the pgoutput decoder is quite heavy.
```py
    elif isinstance(decoded_msg, Delete):
        column_data = decoded_msg.old_tuple.column_data
        table_name = self.last_table_schema[decoded_msg.relation_id]["name"]
        data_item = self.gen_data_item(
```
this looks good.
sources/pg_replication/helpers.py (outdated)

```py
    - `target_batch_size` is reached
    - a table's schema has changed
    """
    op = (msg.payload[:1]).decode("utf-8")
```
you could compare binary strings without decoding to make it a little bit faster. also you could put the more frequent operations (I/U/D) at the top
Implemented in 7bd211b.
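A sketch of that micro-optimization; the single-byte tags are pgoutput's documented message types, and the branch order puts the frequent data operations first:

```py
def classify_op(payload: bytes) -> str:
    """Classify a pgoutput message by its first byte without decoding the payload."""
    op = payload[:1]
    if op == b"I":
        return "insert"
    if op == b"U":
        return "update"
    if op == b"D":
        return "delete"
    if op == b"B":
        return "begin"
    if op == b"C":
        return "commit"
    if op == b"R":
        return "relation"  # carries the table schema
    return "other"  # e.g. b"T" (Truncate) or b"O" (Origin)
```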
@jorritsandbrink I finally enabled tests on our CI, and most of them are passing, but:

@rudolfix The first issue is actually also version related. I was testing on Postgres 16 locally, but have been able to reproduce both issues on Postgres 13.

@jorritsandbrink should we spawn another postgres instance just to test replication? I do not want to make it too complicated and I'm totally fine with version 15.

@rudolfix yes, using a separate instance for replication sounds good.
LGTM!
Tell us what you do here
- verified source

Relevant issue
dlt-hub/dlt#933

More PR info
Adds initial support for postgres replication. Some things are still missing, but this is a good time to get feedback.

- `pgoutput` plugin
- `psycopg2`'s support for logical replication—this streams messages from `pgoutput` into Python in an endless loop
- `pypgoutput` to decode `pgoutput`'s binary messages—the library's functionality to consume messages and transform them into "change events" (Pydantic models) is not used because it only works on Linux
- `dlt`-compatible Python objects, e.g. the string `"t"` becomes the boolean `True`. "Binary mode" would be faster, but less robust.
- ~~Relies on a dedicated replication slot and publication for a table, i.e. two tables means two slots and two publications. This provides granular control and does not introduce significant overhead if I'm not mistaken.~~ No longer the case, changed because of user feedback. It is now possible to replicate one table, multiple tables, or an entire schema using a single publication.
- Adds two resource types: `table_snapshot` for initial load, and `table_changes` for CDC.
  - `table_snapshot` persists the state of the table in the snapshot that gets exported when creating a replication slot into a physical table, and then uses the `sql_table` resource to do the rest
  - `table_changes` generates and yields "data items" (`TDataItem`) and "metadata items" (`DataItemWithMeta`) from decoded replication messages. Items are first stored in-memory in a list, before they are yielded from this list.
- `init_replication` to set up a replication slot and publication. This function optionally persists snapshot tables representing the state of the exact moment the replication slot got created. It then returns `sql_table` resources to enable an initial load. Users do not need to use `init_replication`—they can create a slot and publication in any other way they see fit.
- `replication_resource` to create a `DltResource` that consumes a slot/publication and generates data items with metadata (`DataItemWithMeta`). It dispatches data to multiple tables if the publication publishes changes for multiple tables.
- `include_columns` argument to exclude any columns not provided as input (or includes all columns if not provided)
- ~~Organizes code in subfolder under `sql_database`: `/sources/sql_database/pg_replication`~~ Moved to its own top-level folder.

What's not (yet) included:
- ~~Chunking mechanism to limit batch size in `table_changes`~~ implemented
- ~~`DltSource` to handle multiple tables / an entire database~~ no longer applies—multiple tables are now handled at the resource level
- `truncate` operation
- ~~Perhaps some more data type mapping~~ done—common types are handled and exotic types default to `text`
- ~~Deletion of snapshot table after it has been consumed~~ not implemented—couldn't find a good way to do this
- ~~Example pipeline~~ done
- ~~More tests~~ done
Deletion of snapshot table after it has been consumednot implemented—couldn't find a good way to do thisExample pipelinedoneMore testsdone