Skip to content

Commit

Permalink
Migrate to latest stores
Browse files Browse the repository at this point in the history
  • Loading branch information
hbcarlos committed Oct 9, 2023
1 parent 9365912 commit 014fc40
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 47 deletions.
13 changes: 10 additions & 3 deletions jupyter_collaboration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from jupyter_server.extension.application import ExtensionApp
from traitlets import Bool, Float, Type
from ypy_websocket.ystore import BaseYStore
from ypy_websocket.stores import BaseYStore

from .handlers import DocSessionHandler, YDocWebSocketHandler
from .loaders import FileLoaderMapping
Expand All @@ -22,6 +22,8 @@ class YDocExtension(ExtensionApp):
Enables Real Time Collaboration in JupyterLab
"""

_store: BaseYStore = None

disable_rtc = Bool(False, config=True, help="Whether to disable real time collaboration.")

file_poll_interval = Float(
Expand Down Expand Up @@ -80,10 +82,12 @@ def initialize_handlers(self):
for k, v in self.config.get(self.ystore_class.__name__, {}).items():
setattr(self.ystore_class, k, v)

# Instantiate the store
self._store = self.ystore_class(log=self.log)

self.ywebsocket_server = JupyterWebsocketServer(
rooms_ready=False,
auto_clean_rooms=False,
ystore_class=self.ystore_class,
log=self.log,
)

Expand All @@ -103,7 +107,7 @@ def initialize_handlers(self):
"document_cleanup_delay": self.document_cleanup_delay,
"document_save_delay": self.document_save_delay,
"file_loaders": self.file_loaders,
"ystore_class": self.ystore_class,
"store": self._store,
"ywebsocket_server": self.ywebsocket_server,
},
),
Expand All @@ -120,3 +124,6 @@ async def stop_extension(self):
],
timeout=3,
)

if self._store is not None and self._store.started.is_set():
self._store.stop()
19 changes: 13 additions & 6 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from jupyter_ydoc import ydocs as YDOCS
from tornado import web
from tornado.websocket import WebSocketHandler
from ypy_websocket.stores import BaseYStore
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.ystore import BaseYStore
from ypy_websocket.yutils import YMessageType, write_var_uint

from .loaders import FileLoaderMapping
Expand Down Expand Up @@ -62,6 +62,15 @@ def create_task(self, aw):
task.add_done_callback(self._background_tasks.discard)

async def prepare(self):
# NOTE: Initialize in the ExtensionApp.start_extension once
# https://github.com/jupyter-server/jupyter_server/issues/1329
# is done.
# We are temporarily initializing the store here because `start``
# is an async function
if self._store is not None and not self._store.started.is_set():
await self._store.start()
await self._store.initialize()

if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()
Expand All @@ -84,15 +93,13 @@ async def prepare(self):
)

file = self._file_loaders[file_id]
updates_file_path = f".{file_type}:{file_id}.y"
ystore = self._ystore_class(path=updates_file_path, log=self.log)
self.room = DocumentRoom(
self._room_id,
file_format,
file_type,
file,
self.event_logger,
ystore,
self._store,
self.log,
self._document_save_delay,
)
Expand All @@ -111,15 +118,15 @@ def initialize(
self,
ywebsocket_server: JupyterWebsocketServer,
file_loaders: FileLoaderMapping,
ystore_class: type[BaseYStore],
store: BaseYStore,
document_cleanup_delay: float | None = 60.0,
document_save_delay: float | None = 1.0,
) -> None:
self._background_tasks = set()
# File ID manager cannot be passed as argument as the extension may load after this one
self._file_id_manager = self.settings["file_id_manager"]
self._file_loaders = file_loaders
self._ystore_class = ystore_class
self._store = store
self._cleanup_delay = document_cleanup_delay
self._document_save_delay = document_save_delay
self._websocket_server = ywebsocket_server
Expand Down
57 changes: 29 additions & 28 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

from jupyter_events import EventLogger
from jupyter_ydoc import ydocs as YDOCS
from ypy_websocket.stores import BaseYStore
from ypy_websocket.websocket_server import YRoom
from ypy_websocket.ystore import BaseYStore, YDocNotFound
from ypy_websocket.yutils import write_var_uint

from .loaders import FileLoader
Expand Down Expand Up @@ -104,36 +104,28 @@ async def initialize(self) -> None:
return

self.log.info("Initializing room %s", self._room_id)

model = await self._file.load_content(self._file_format, self._file_type, True)

async with self._update_lock:
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass
if self.ystore is not None and await self.ystore.exists(self._room_id):
# Load the content from the store
await self.ystore.apply_updates(self._room_id, self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)

if not read_from_source:
# if YStore updates and source file are out-of-sync, resync updates with source
if self._document.source != model["content"]:
# TODO: Delete document from the store.
self._emit(
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
)
Expand All @@ -142,17 +134,26 @@ async def initialize(self) -> None:
self._file.path,
self.ystore.__class__.__name__,
)
read_from_source = True

if read_from_source:
doc = await self.ystore.get(self._room_id)
await self.ystore.remove(self._room_id)
version = 0
if "version" in doc:
version = doc["version"] + 1

await self.ystore.create(self._room_id, version)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

else:
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
self.log.info(
"Content in room %s loaded from file %s", self._room_id, self._file.path
)
self._document.source = model["content"]

if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)
if self.ystore is not None:
await self.ystore.create(self._room_id, 0)
await self.ystore.encode_state_as_update(self._room_id, self.ydoc)

self._last_modified = model["last_modified"]
self._document.dirty = False
Expand Down
18 changes: 13 additions & 5 deletions jupyter_collaboration/stores.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.

from __future__ import annotations

from logging import Logger

from traitlets import Int, Unicode
from traitlets.config import LoggingConfigurable
from ypy_websocket.ystore import SQLiteYStore as _SQLiteYStore
from ypy_websocket.ystore import TempFileYStore as _TempFileYStore
from ypy_websocket.stores import FileYStore
from ypy_websocket.stores import SQLiteYStore as _SQLiteYStore


class TempFileYStore(_TempFileYStore):
prefix_dir = "jupyter_ystore_"
class TempFileYStore(FileYStore):
def __init__(self, log: Logger | None = None):
super().__init__(path=".jupyter_store", log=log)


class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # type: ignore
Expand All @@ -17,7 +22,7 @@ class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): #

class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMetaclass):
db_path = Unicode(
".jupyter_ystore.db",
".jupyter_store.db",
config=True,
help="""The path to the YStore database. Defaults to '.jupyter_ystore.db' in the current
directory.""",
Expand All @@ -30,3 +35,6 @@ class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMet
help="""The document time-to-live in seconds. Defaults to None (document history is never
cleared).""",
)

def __init__(self, log: Logger | None = None):
super().__init__(path=self.db_path, log=log)
3 changes: 0 additions & 3 deletions jupyter_collaboration/websocketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from tornado.websocket import WebSocketHandler
from ypy_websocket.websocket_server import WebsocketServer, YRoom
from ypy_websocket.ystore import BaseYStore


class RoomNotFound(LookupError):
Expand All @@ -27,13 +26,11 @@ class JupyterWebsocketServer(WebsocketServer):

def __init__(
self,
ystore_class: BaseYStore,
rooms_ready: bool = True,
auto_clean_rooms: bool = True,
log: Logger | None = None,
):
super().__init__(rooms_ready, auto_clean_rooms, log)
self.ystore_class = ystore_class
self.ypatch_nb = 0
self.connected_users: dict[Any, Any] = {}
# Async loop is not yet ready at the object instantiation
Expand Down
8 changes: 6 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,20 @@ def rtc_create_SQLite_store(jp_serverapp):
setattr(SQLiteYStore, k, v)

async def _inner(type: str, path: str, content: str) -> DocumentRoom:
db = SQLiteYStore(path=f"{type}:{path}")
room_id = f"{type}:{path}"
db = SQLiteYStore()
await db.start()
await db.initialize()

if type == "notebook":
doc = YNotebook()
else:
doc = YUnicode()

doc.source = content
await db.encode_state_as_update(doc.ydoc)

await db.create(room_id, 0)
await db.encode_state_as_update(room_id, doc.ydoc)

return db

Expand Down

0 comments on commit 014fc40

Please sign in to comment.