Skip to content

Commit

Permalink
Broadcast server awareness to all clients (#73)
Browse files Browse the repository at this point in the history
* Add methods to modify the awareness server side, server being a regular user

* Server can subscribe to awareness changes (only those from the frontend)

* mypy

* Add tests

* ruff and mypy

* Avoid depending on nest_asyncio

* Use the YRoom task group for asyncrhonous call

* Remove the Awareness object in favor of a futur one in pycrdt

* Apply suggestions from code review

Co-authored-by: David Brochart <[email protected]>

* Log an error if the task group is not initialized

* Update pycrdt_websocket/yroom.py

Co-authored-by: David Brochart <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Remove the logger in the Awareness args

* Docstring

* Update to pycrdt 0.10 and observe the awareness changes to broadcast local changes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* mypy

* Apply suggestions

Co-authored-by: David Brochart <[email protected]>

* Broadcast the changes only if the state actually changes

* Apply suggestions from code review

Co-authored-by: David Brochart <[email protected]>

* Broadcast all updates

Co-authored-by: David Brochart <[email protected]>

* Update pyproject.toml

Co-authored-by: David Brochart <[email protected]>

---------

Co-authored-by: David Brochart <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: David Brochart <[email protected]>
  • Loading branch information
4 people authored Oct 9, 2024
1 parent 78daf4e commit 826446f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
39 changes: 36 additions & 3 deletions pycrdt_websocket/yroom.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from functools import partial
from inspect import isawaitable
from logging import Logger, getLogger
from typing import Awaitable, Callable
from typing import Any, Awaitable, Callable

from anyio import (
TASK_STATUS_IGNORED,
Expand All @@ -16,16 +16,17 @@
from anyio.abc import TaskGroup, TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pycrdt import (
Awareness,
Doc,
Subscription,
YMessageType,
YSyncMessageType,
create_awareness_message,
create_sync_message,
create_update_message,
handle_sync_message,
)

from .awareness import Awareness
from .websocket import Websocket
from .ystore import BaseYStore
from .yutils import put_updates
Expand Down Expand Up @@ -77,11 +78,12 @@ def __init__(
ydoc: An optional document for the room (a new one is created otherwise).
"""
self.ydoc = Doc() if ydoc is None else ydoc
self.awareness = Awareness(self.ydoc)
self.ready_event = Event()
self.ready = ready
self.ystore = ystore
self.log = log or getLogger(__name__)
self.awareness = Awareness(self.ydoc)
self.awareness.observe(self.send_server_awareness)
self.clients = set()
self._on_message = None
self.exception_handler = exception_handler
Expand Down Expand Up @@ -304,3 +306,34 @@ async def serve(self, websocket: Websocket):
self.clients.remove(websocket)
except Exception as exception:
self._handle_exception(exception)

def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any]) -> None:
"""
Callback to broadcast the server awareness to clients.
Arguments:
type: The change type.
changes: The awareness changes.
"""
if type != "update" or changes[1] != "local":
return

if self._task_group is not None:
updated_clients = [v for value in changes[0].values() for v in value]
state = self.awareness.encode_awareness_update(updated_clients)
message = create_awareness_message(state)
self._task_group.start_soon(self._send_server_awareness, message)
else:
self.log.error("Cannot broadcast server awareness: YRoom not started")

async def _send_server_awareness(self, state: bytes) -> None:
try:
async with create_task_group() as tg:
for client in self.clients:
self.log.debug(
"Sending awareness from server to client with endpoint: %s",
client.path,
)
tg.start_soon(client.send, state)
except Exception as e:
self.log.error("Error while broadcasting awareness changes: %s", e)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ classifiers = [
dependencies = [
"anyio >=3.6.2,<5",
"sqlite-anyio >=0.2.3,<0.3.0",
"pycrdt >=0.9.16,<0.10.0",
"pycrdt >=0.10.1,<0.11.0",
]

[project.optional-dependencies]
Expand Down

0 comments on commit 826446f

Please sign in to comment.