Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Store arbitrary relations from events #11391

Merged
merged 12 commits into from
Nov 22, 2021
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1712,11 +1712,11 @@ def _handle_event_relations(

# Relations must have a type and parent event ID.
rel_type = relation.get("rel_type")
if not rel_type:
if not isinstance(rel_type, str):
return

parent_id = relation.get("event_id")
if not parent_id:
if not isinstance(parent_id, str):
return

# Annotations have a key field.
Expand Down
67 changes: 43 additions & 24 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import attr

from synapse.api.constants import EventContentFields
from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -171,6 +171,9 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
self._purged_chain_cover_index,
)

# The event_thread_relation background update was replaced with the
# event_arbitrary_relations one, which handles any relation to avoid
# needing to potentially crawl the entire events table in the future.
self.db_pool.updates.register_noop_background_update("event_thread_relation")
clokep marked this conversation as resolved.
Show resolved Hide resolved

self.db_pool.updates.register_background_update_handler(
Expand Down Expand Up @@ -1109,21 +1112,20 @@ async def _event_arbitrary_relations(
last_event_id = progress.get("last_event_id", "")

def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
# Iterate over events which do not appear in the event_relations
# table -- they might have a relation that was not previously stored
# due to an unknown relation type.
# Fetch events and then filter based on whether the event has a
# relation or not.
txn.execute(
"""
SELECT event_id, json FROM event_json
LEFT JOIN event_relations USING (event_id)
WHERE event_id > ? AND event_relations.event_id IS NULL
WHERE event_id > ?
ORDER BY event_id LIMIT ?
""",
(last_event_id, batch_size),
)

results = list(txn)
missing_relations = []
# (event_id, parent_id, rel_type) for each relation
missing_relations: List[Tuple[str, str, str]] = []
clokep marked this conversation as resolved.
Show resolved Hide resolved
for (event_id, event_json_raw) in results:
try:
event_json = db_to_json(event_json_raw)
Expand All @@ -1140,32 +1142,49 @@ def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
if not relates_to or not isinstance(relates_to, dict):
continue

# The only expected relation type would be from threads, but
# there could be other unknown ones.
# If the relation type or parent event ID is not a string,
# skip it.
clokep marked this conversation as resolved.
Show resolved Hide resolved
#
# Do not consider relation types that have existed for a long time,
# only include the new thread relation and any unknown relations.
clokep marked this conversation as resolved.
Show resolved Hide resolved
rel_type = relates_to.get("rel_type")
if not isinstance(rel_type, str):
if not isinstance(rel_type, str) or rel_type in (
RelationTypes.ANNOTATION,
RelationTypes.REFERENCE,
RelationTypes.REPLACE,
):
continue

# Get the parent ID.
parent_id = relates_to.get("event_id")
if not isinstance(parent_id, str):
continue

missing_relations.append((event_id, parent_id, rel_type))

# Insert the missing data.
self.db_pool.simple_insert_many_txn(
txn=txn,
table="event_relations",
values=[
{
"event_id": event_id,
"relates_to_Id": parent_id,
"relation_type": rel_type,
}
for event_id, parent_id, rel_type in missing_relations
],
)
# Insert the missing data. Note that we upsert here in case the event
clokep marked this conversation as resolved.
Show resolved Hide resolved
# has already been processed.
if missing_relations:
self.db_pool.simple_upsert_many_txn(
txn=txn,
table="event_relations",
key_names=("event_id",),
key_values=[(r[0],) for r in missing_relations],
value_names=("relates_to_id", "relation_type"),
value_values=[r[1:] for r in missing_relations],
)

# Iterate the parent IDs and invalidate caches.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we usually clear caches as part of background updates, but in this case I think it could be a bit broken without doing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems sensible

for parent_id in {r[1] for r in missing_relations}:
cache_tuple = (parent_id,)
self._invalidate_cache_and_stream(
txn, self.get_relations_for_event, cache_tuple
)
self._invalidate_cache_and_stream(
txn, self.get_aggregation_groups_for_event, cache_tuple
)
self._invalidate_cache_and_stream(
txn, self.get_thread_summary, cache_tuple
)

if results:
latest_event_id = results[-1][0]
Expand Down
64 changes: 64 additions & 0 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def default_config(self) -> dict:
return config

def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()

self.user_id, self.user_token = self._create_user("alice")
self.user2_id, self.user2_token = self._create_user("bob")

Expand Down Expand Up @@ -858,3 +860,65 @@ def _create_user(self, localpart: str) -> Tuple[str, str]:
access_token = self.login(localpart, "abc123")

return user_id, access_token

def test_background_update(self):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting this in this file seems dirty, but it seems best to keep relations tests together? Maybe it would be better to start a storage test file and we can add to it in the future when fleshing out some unit tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's best here.

"""Test the event_arbitrary_relations background update."""
channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍")
self.assertEquals(200, channel.code, channel.json_body)
annotation_event_id_good = channel.json_body["event_id"]

channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="A")
self.assertEquals(200, channel.code, channel.json_body)
annotation_event_id_bad = channel.json_body["event_id"]

channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
self.assertEquals(200, channel.code, channel.json_body)
thread_event_id = channel.json_body["event_id"]

# Clean-up the table as if the inserts did not happen during event creation.
self.get_success(
self.store.db_pool.simple_delete_many(
table="event_relations",
column="event_id",
iterable=(annotation_event_id_bad, thread_event_id),
keyvalues={},
desc="RelationsTestCase.test_background_update",
)
)

# Only the "good" annotation should be found.
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10",
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertEquals(
[ev["event_id"] for ev in channel.json_body["chunk"]],
[annotation_event_id_good],
)

# Insert and run the background update.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{"update_name": "event_arbitrary_relations", "progress_json": "{}"},
)
)

# Ugh, have to reset this flag
self.store.db_pool.updates._all_done = False
self.wait_for_background_updates()

# The "good" annotation and the thread should be found, but not the "bad"
# annotation.
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10",
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertCountEqual(
[ev["event_id"] for ev in channel.json_body["chunk"]],
[annotation_event_id_good, thread_event_id],
)
7 changes: 6 additions & 1 deletion tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ def wait_on_thread(self, deferred, timeout=10):
time.sleep(0.01)

def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
"""
Block until all background database updates have completed.

Note that callers must ensure that's a store property created on the
testcase.
"""
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
Expand Down