Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Store arbitrary relations from events #11391

Merged
merged 12 commits into from
Nov 22, 2021
1 change: 1 addition & 0 deletions changelog.d/11391.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Store and allow querying of arbitrary event relations.
27 changes: 13 additions & 14 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
richvdh marked this conversation as resolved.
Show resolved Hide resolved
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1696,34 +1696,33 @@ def non_null_str_or_none(val: Any) -> Optional[str]:
},
)

def _handle_event_relations(self, txn, event):
"""Handles inserting relation data during peristence of events
def _handle_event_relations(
self, txn: LoggingTransaction, event: EventBase
) -> None:
"""Handles inserting relation data during persistence of events

Args:
txn
event (EventBase)
txn: The current database transaction.
event: The event which might have relations.
"""
relation = event.content.get("m.relates_to")
if not relation:
# No relations
return

# Relations must have a type and parent event ID.
rel_type = relation.get("rel_type")
if rel_type not in (
RelationTypes.ANNOTATION,
RelationTypes.REFERENCE,
RelationTypes.REPLACE,
RelationTypes.THREAD,
):
# Unknown relation type
if not rel_type:
clokep marked this conversation as resolved.
Show resolved Hide resolved
return

parent_id = relation.get("event_id")
if not parent_id:
clokep marked this conversation as resolved.
Show resolved Hide resolved
# Invalid relation
return

aggregation_key = relation.get("key")
# Annotations have a key field.
aggregation_key = None
if rel_type == RelationTypes.ANNOTATION:
aggregation_key = relation.get("key")

self.db_pool.simple_insert_txn(
txn,
Expand Down
44 changes: 29 additions & 15 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@

import attr

from synapse.api.constants import EventContentFields, RelationTypes
from synapse.api.constants import EventContentFields
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
Expand Down Expand Up @@ -171,8 +171,11 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
self._purged_chain_cover_index,
)

self.db_pool.updates.register_noop_background_update("event_thread_relation")
clokep marked this conversation as resolved.
Show resolved Hide resolved

self.db_pool.updates.register_background_update_handler(
"event_thread_relation", self._event_thread_relation
"event_arbitrary_relations",
self._event_arbitrary_relations,
)

################################################################################
Expand Down Expand Up @@ -1099,11 +1102,16 @@ def purged_chain_cover_txn(txn) -> int:

return result

async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
"""Background update handler which will store thread relations for existing events."""
async def _event_arbitrary_relations(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update handler which will store previously unknown relations for existing events."""
last_event_id = progress.get("last_event_id", "")

def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
# Iterate over events which do not appear in the event_relations
clokep marked this conversation as resolved.
Show resolved Hide resolved
# table -- they might have a relation that was not previously stored
# due to an unknown relation type.
txn.execute(
"""
SELECT event_id, json FROM event_json
Expand All @@ -1115,7 +1123,7 @@ def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
)

results = list(txn)
missing_thread_relations = []
missing_relations = []
clokep marked this conversation as resolved.
Show resolved Hide resolved
for (event_id, event_json_raw) in results:
try:
event_json = db_to_json(event_json_raw)
Expand All @@ -1127,19 +1135,23 @@ def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
)
continue

# If there's no relation (or it is not a thread), skip!
# If there's no relation, skip!
relates_to = event_json["content"].get("m.relates_to")
if not relates_to or not isinstance(relates_to, dict):
continue
if relates_to.get("rel_type") != RelationTypes.THREAD:

# The only expected relation type would be from threads, but
# there could be other unknown ones.
rel_type = relates_to.get("rel_type")
if not isinstance(rel_type, str):
continue

# Get the parent ID.
parent_id = relates_to.get("event_id")
if not isinstance(parent_id, str):
continue

missing_thread_relations.append((event_id, parent_id))
missing_relations.append((event_id, parent_id, rel_type))

# Insert the missing data.
self.db_pool.simple_insert_many_txn(
Expand All @@ -1149,26 +1161,28 @@ def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
{
"event_id": event_id,
"relates_to_Id": parent_id,
clokep marked this conversation as resolved.
Show resolved Hide resolved
"relation_type": RelationTypes.THREAD,
"relation_type": rel_type,
}
for event_id, parent_id in missing_thread_relations
for event_id, parent_id, rel_type in missing_relations
],
)

if results:
latest_event_id = results[-1][0]
self.db_pool.updates._background_update_progress_txn(
txn, "event_thread_relation", {"last_event_id": latest_event_id}
txn, "event_arbitrary_relations", {"last_event_id": latest_event_id}
)

return len(results)

num_rows = await self.db_pool.runInteraction(
desc="event_thread_relation", func=_event_thread_relation_txn
desc="event_arbitrary_relations", func=_event_arbitrary_relations_txn
)

if not num_rows:
await self.db_pool.updates._end_background_update("event_thread_relation")
await self.db_pool.updates._end_background_update(
"event_arbitrary_relations"
)

return num_rows

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

-- Check old events for thread relations.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6502, 'event_thread_relation', '{}');
(6507, 'event_arbitrary_relations', '{}');
47 changes: 47 additions & 0 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2019 New Vector Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -765,6 +766,52 @@ def test_aggregations_redaction_prevents_access_to_aggregations(self):
self.assertIn("chunk", channel.json_body)
self.assertEquals(channel.json_body["chunk"], [])

def test_unknown_relations(self):
"""Unknown relations should be accepted."""
channel = self._send_relation("m.relation.test", "m.room.test")
self.assertEquals(200, channel.code, channel.json_body)
event_id = channel.json_body["event_id"]

channel = self.make_request(
"GET",
"/_matrix/client/unstable/rooms/%s/relations/%s?limit=1"
% (self.room, self.parent_id),
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)

# We expect to get back a single pagination result, which is the full
# relation event we sent above.
self.assertEquals(len(channel.json_body["chunk"]), 1, channel.json_body)
self.assert_dict(
{"event_id": event_id, "sender": self.user_id, "type": "m.room.test"},
channel.json_body["chunk"][0],
)

# We also expect to get the original event (the id of which is self.parent_id)
self.assertEquals(
channel.json_body["original_event"]["event_id"], self.parent_id
)

# When bundling the unknown relation is not included.
channel = self.make_request(
"GET",
"/rooms/%s/event/%s" % (self.room, self.parent_id),
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertNotIn("m.relations", channel.json_body["unsigned"])

# But unknown relations can be directly queried.
channel = self.make_request(
"GET",
"/_matrix/client/unstable/rooms/%s/aggregations/%s?limit=1"
% (self.room, self.parent_id),
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertEquals(channel.json_body["chunk"], [])

def _send_relation(
self,
relation_type: str,
Expand Down