diff --git a/pyproject.toml b/pyproject.toml index 8631c44..88a8ff9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "sinker" -version = "0.1.2" +version = "0.1.3" description = "Synchronize Postgres to Elasticsearch" authors = ["Loren Siebert "] license = "MIT/Apache-2.0" @@ -15,6 +15,7 @@ elasticsearch = "^8.17.0" environs = ">=9.5,<15.0" psycopg = "^3.1.8" pytest-mock = "^3.10.0" +sqlglot = "^26.2.1" [tool.poetry.group.dev.dependencies] flake8 = ">=6,<8" diff --git a/src/sinker/sinker.py b/src/sinker/sinker.py index a538618..6047dd6 100644 --- a/src/sinker/sinker.py +++ b/src/sinker/sinker.py @@ -17,7 +17,7 @@ PGCHUNK_SIZE, SCHEMA_TABLE_DELIMITER, ) -from .utils import generate_schema_tables +from .utils import parse_schema_tables logger = logging.getLogger(__name__) @@ -107,23 +107,23 @@ def setup_pg(self) -> None: plpgsql: str = f"{schema_view_name}_fn" create_function: str = q.CREATE_FUNCTION.format(plpgsql, SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name) ddl_list.append(create_function) - for schema_table in generate_schema_tables(view_select_query): + # The parent table is the top-level table that gets DELETE events with an ID in the replication slot. + # The materialized views do not contain the ID of the doc being deleted, + # so we'll use this table's delete events as a proxy. 
+ # lsn,xid,data + # 0/24EDA4D8,17393,BEGIN 17393 + # 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'" + # 0/24EDC1B0,17393,COMMIT 17393 + # 0/24EDC228,17394,BEGIN 17394 + # 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data) + # 0/24EF4718,17394,COMMIT 17394 + self.parent_table, schema_tables = parse_schema_tables(view_select_query) + for schema_table in schema_tables: schema, _, table = schema_table.rpartition(SCHEMA_TABLE_DELIMITER) schema = schema or DEFAULT_SCHEMA trigger_name: str = f"{SINKER_SCHEMA}_{self.view}_{schema}_{table}" create_trigger: str = q.CREATE_TRIGGER.format(trigger_name, schema, table, plpgsql) ddl_list.append(create_trigger) - # The last table is the top-level table that gets DELETE events with an ID in the replication slot. - # The materialized views do not contain the ID of the doc being deleted, - # so we'll use this table's delete events as a proxy. - # lsn,xid,data - # 0/24EDA4D8,17393,BEGIN 17393 - # 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'" - # 0/24EDC1B0,17393,COMMIT 17393 - # 0/24EDC228,17394,BEGIN 17394 - # 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data) - # 0/24EF4718,17394,COMMIT 17394 - self.parent_table = schema_table create_todo_entry: str = q.CREATE_TODO_ENTRY.format(SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name) ddl_list.append(create_todo_entry) psycopg.connect(autocommit=True).execute("; ".join(ddl_list)) diff --git a/src/sinker/utils.py b/src/sinker/utils.py index cfed5c9..79e606f 100644 --- a/src/sinker/utils.py +++ b/src/sinker/utils.py @@ -1,20 +1,18 @@ -import re +from typing import Set, Tuple -from typing import Iterable +import sqlglot +from sqlglot.expressions import Table, CTE -TABLE_RE = re.compile(r"from\s\"?(\S+)\b", re.I) - -def generate_schema_tables(view_select_query: str) -> Iterable[str]: +def parse_schema_tables(view_select_query: str) -> Tuple[str, Set[str]]: """ - Given a 
view select query, return a list of unique tables that are referenced in the query - in the order they were encountered. + Given a view select query, return the primary parent table and the set of unique tables referenced in the query, excluding CTE aliases. Skip anything that looks like a function call. :param view_select_query: The select query from the view """ - seen: set = set() - for table_candidate in TABLE_RE.findall(view_select_query): - if "(" not in table_candidate: - if table_candidate not in seen: - seen.add(table_candidate) - yield table_candidate + parsed = sqlglot.parse_one(view_select_query) + parent_table = parsed.find(Table).name + tables = {table.name for table in parsed.find_all(Table)} + ctes = {cte.alias for cte in parsed.find_all(CTE)} + schema_tables = tables - ctes + return parent_table, schema_tables diff --git a/tests/test_generate_schema_tables.py b/tests/test_generate_schema_tables.py deleted file mode 100644 index ef8cb14..0000000 --- a/tests/test_generate_schema_tables.py +++ /dev/null @@ -1,15 +0,0 @@ -from sinker.utils import generate_schema_tables - - -def test_generate_schema_tables(): - view_select_query = """select id, - json_build_object( - 'name', "name", - 'otherEmailDomains',(select array_agg(split_part(email, '@', 2)) FROM unnest(emails) as email), - 'emailDomains', (select array_agg(split_part(value, '@', 2)) - from "EmailAddress" EA where "personId"="Person".id), - 'emailAddresses', (select array_agg(value) from "EmailAddress" EA where "personId"="Person".id), - ) as "person" - from "person" - """ - assert list(generate_schema_tables(view_select_query)) == ["EmailAddress", "person"] diff --git a/tests/test_parse_schema_tables.py b/tests/test_parse_schema_tables.py new file mode 100644 index 0000000..1b7e9f6 --- /dev/null +++ b/tests/test_parse_schema_tables.py @@ -0,0 +1,60 @@ +from sinker.utils import parse_schema_tables + + +def test_parse_schema_tables(): + view_select_query = """select id, + json_build_object( + 'name', "name", + 
'otherEmailDomains',(select array_agg(split_part(email, '@', 2)) FROM unnest(emails) as email), + 'emailDomains', (select array_agg(split_part(value, '@', 2)) + from "EmailAddress" EA where "personId"="Person".id), + 'emailAddresses', (select array_agg(value) from "EmailAddress" EA where "personId"="Person".id), + ) as "person" + from "person" + """ + parent_table, schema_tables = parse_schema_tables(view_select_query) + assert parent_table == "person" + assert schema_tables == {"EmailAddress", "person"} + +def test_parse_schema_tables_with_cte(): + view_select_query = """ + WITH + attendees AS ( + SELECT DISTINCT ON (a."personId", a."hostedEventId") + a."hostedEventId", + a.status, + e.value as email, + p."primaryOrganizationId" + FROM "HostedEventAttendance" a + JOIN "Person" p ON a."personId" = p.id + JOIN "EmailAddress" e ON p.id = e."personId" + GROUP BY + a."personId", + a."hostedEventId", + a.status, + e.value, + p."primaryOrganizationId" + ) + SELECT + id, + json_build_object( + 'summary', "name", + 'startTime', "timestamp", + 'attendees', ( + SELECT json_agg(json_build_object('email', attendees.email, 'eventResponse', attendees.status)) AS formatted_attendees + FROM attendees + WHERE attendees."hostedEventId" = "HostedEvent".id + ), + 'organizationIds', + ( + SELECT array_agg(attendees."primaryOrganizationId") + FROM attendees + WHERE attendees."hostedEventId" = "HostedEvent".id + ) + ) AS "hosted_events" + FROM + "HostedEvent" + """ + parent_table, schema_tables = parse_schema_tables(view_select_query) + assert parent_table == "HostedEvent" + assert schema_tables == {"EmailAddress", "HostedEvent", "HostedEventAttendance", "Person"}