Skip to content

Commit

Permalink
Support common table expressions (CTEs)
Browse files Browse the repository at this point in the history
- Bump minor version to `0.1.3`
- Use [`sqlglot`](https://github.com/tobymao/sqlglot) for parsing
  • Loading branch information
loren committed Jan 16, 2025
1 parent 89f2f82 commit 20413ba
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 42 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sinker"
version = "0.1.2"
version = "0.1.3"
description = "Synchronize Postgres to Elasticsearch"
authors = ["Loren Siebert <[email protected]>"]
license = "MIT/Apache-2.0"
Expand All @@ -15,6 +15,7 @@ elasticsearch = "^8.17.0"
environs = ">=9.5,<15.0"
psycopg = "^3.1.8"
pytest-mock = "^3.10.0"
sqlglot = "^26.2.1"

[tool.poetry.group.dev.dependencies]
flake8 = ">=6,<8"
Expand Down
26 changes: 13 additions & 13 deletions src/sinker/sinker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
PGCHUNK_SIZE,
SCHEMA_TABLE_DELIMITER,
)
from .utils import generate_schema_tables
from .utils import parse_schema_tables

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,23 +107,23 @@ def setup_pg(self) -> None:
plpgsql: str = f"{schema_view_name}_fn"
create_function: str = q.CREATE_FUNCTION.format(plpgsql, SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name)
ddl_list.append(create_function)
for schema_table in generate_schema_tables(view_select_query):
# The last table is the top-level table that gets DELETE events with an ID in the replication slot.
# The materialized views do not contain the ID of the doc being deleted,
# so we'll use this table's delete events as a proxy.
# lsn,xid,data
# 0/24EDA4D8,17393,BEGIN 17393
# 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'"
# 0/24EDC1B0,17393,COMMIT 17393
# 0/24EDC228,17394,BEGIN 17394
# 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data)
# 0/24EF4718,17394,COMMIT 17394
self.parent_table, schema_tables = parse_schema_tables(view_select_query)
for schema_table in schema_tables:
schema, _, table = schema_table.rpartition(SCHEMA_TABLE_DELIMITER)
schema = schema or DEFAULT_SCHEMA
trigger_name: str = f"{SINKER_SCHEMA}_{self.view}_{schema}_{table}"
create_trigger: str = q.CREATE_TRIGGER.format(trigger_name, schema, table, plpgsql)
ddl_list.append(create_trigger)
# The last table is the top-level table that gets DELETE events with an ID in the replication slot.
# The materialized views do not contain the ID of the doc being deleted,
# so we'll use this table's delete events as a proxy.
# lsn,xid,data
# 0/24EDA4D8,17393,BEGIN 17393
# 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'"
# 0/24EDC1B0,17393,COMMIT 17393
# 0/24EDC228,17394,BEGIN 17394
# 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data)
# 0/24EF4718,17394,COMMIT 17394
self.parent_table = schema_table
create_todo_entry: str = q.CREATE_TODO_ENTRY.format(SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name)
ddl_list.append(create_todo_entry)
psycopg.connect(autocommit=True).execute("; ".join(ddl_list))
Expand Down
24 changes: 11 additions & 13 deletions src/sinker/utils.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import re
from typing import Set, Tuple

from typing import Iterable
import sqlglot
from sqlglot.expressions import Table, CTE

TABLE_RE = re.compile(r"from\s\"?(\S+)\b", re.I)


def generate_schema_tables(view_select_query: str) -> Iterable[str]:
def parse_schema_tables(view_select_query: str) -> Tuple[str, Set[str]]:
"""
Given a view select query, return a list of unique tables that are referenced in the query
in the order they were encountered.
Given a view select query, return a primary parent table and the set of unique tables that are referenced in the query.
Skip anything that looks like a function call.
:param view_select_query: The select query from the view
"""
seen: set = set()
for table_candidate in TABLE_RE.findall(view_select_query):
if "(" not in table_candidate:
if table_candidate not in seen:
seen.add(table_candidate)
yield table_candidate
parsed = sqlglot.parse_one(view_select_query)
parent_table = parsed.find(Table).name
tables = {table.name for table in parsed.find_all(Table)}
ctes = {cte.alias for cte in parsed.find_all(CTE)}
schema_tables = tables - ctes
return parent_table, schema_tables
15 changes: 0 additions & 15 deletions tests/test_generate_schema_tables.py

This file was deleted.

60 changes: 60 additions & 0 deletions tests/test_parse_schema_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from sinker.utils import parse_schema_tables


def test_parse_schema_tables():
view_select_query = """select id,
json_build_object(
'name', "name",
'otherEmailDomains',(select array_agg(split_part(email, '@', 2)) FROM unnest(emails) as email),
'emailDomains', (select array_agg(split_part(value, '@', 2))
from "EmailAddress" EA where "personId"="Person".id),
'emailAddresses', (select array_agg(value) from "EmailAddress" EA where "personId"="Person".id),
) as "person"
from "person"
"""
parent_table, schema_tables = parse_schema_tables(view_select_query)
assert parent_table == "person"
assert schema_tables == {"EmailAddress", "person"}

def test_parse_schema_tables_with_cte():
view_select_query = """
WITH
attendees AS (
SELECT DISTINCT ON (a."personId", a."hostedEventId")
a."hostedEventId",
a.status,
e.value as email,
p."primaryOrganizationId"
FROM "HostedEventAttendance" a
JOIN "Person" p ON a."personId" = p.id
JOIN "EmailAddress" e ON p.id = e."personId"
GROUP BY
a."personId",
a."hostedEventId",
a.status,
e.value,
p."primaryOrganizationId"
)
SELECT
id,
json_build_object(
'summary', "name",
'startTime', "timestamp",
'attendees', (
SELECT json_agg(json_build_object('email', attendees.email, 'eventResponse', attendees.status)) AS formatted_attendees
FROM attendees
WHERE attendees."hostedEventId" = "HostedEvent".id
),
'organizationIds',
(
SELECT array_agg(attendees."primaryOrganizationId")
FROM attendees
WHERE attendees."hostedEventId" = "HostedEvent".id
)
) AS "hosted_events"
FROM
"HostedEvent"
"""
parent_table, schema_tables = parse_schema_tables(view_select_query)
assert parent_table == "HostedEvent"
assert schema_tables == {"EmailAddress", "HostedEvent", "HostedEventAttendance", "Person"}

0 comments on commit 20413ba

Please sign in to comment.