Skip to content

Commit

Permalink
Migrated to new conn
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Nov 21, 2023
1 parent 0bff002 commit e9380e8
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 54 deletions.
77 changes: 43 additions & 34 deletions src/datascript/sync/client.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
(:require
[datascript.conn :as conn]
[datascript.db :as db]
[datascript.serialize :as serialize]))
[datascript.serialize :as serialize]
[datascript.util :as util]))

(defn client-id []
(long (* (rand) 0x1FFFFFFFFFFFFF)))
Expand All @@ -15,24 +16,25 @@

(defn on-tx [conn report]
(when-not (:server? (:tx-meta report))
(let [{:keys [client-id send-fn server-idx pending]} (meta conn)
(let [{:keys [client-id send-fn server-idx connected?]} @(:atom conn)
tx {:tx-data (db/tx-from-datoms (:tx-data report))
:tx-id (new-tx-id client-id)}]
(send-fn
{:message :transacting
:server-idx @server-idx
:txs [tx]})
(swap! pending conj tx))))
(when connected?
(send-fn
{:message :transacting
:server-idx server-idx
:txs [tx]}))
(swap! (:atom conn) update :pending conj tx))))

(defn create-conn [patterns send-fn]
(let [res (atom nil :meta
{:client-id (client-id)
:server-db (atom nil)
:pending (atom #?(:clj clojure.lang.PersistentQueue/EMPTY
:cljs cljs.core.PersistentQueue.EMPTY))
:server-idx (atom nil)
:send-fn send-fn
:listeners (atom {})})]
(let [res (@#'conn/make-conn
{:db nil
:client-id (client-id)
:server-db nil
:pending util/empty-queue
:server-idx nil
:connected? true
:send-fn send-fn})]
(send-fn {:message :catching-up})
res))

Expand All @@ -41,33 +43,40 @@
:catched-up
(let [{:keys [snapshot server-idx]} body
db (serialize/from-serializable snapshot)]
(reset! conn db)
(reset! (:server-db (meta conn)) db)
(reset! (:server-idx (meta conn)) server-idx)
(conn/listen! conn :sync #(on-tx conn %)))
(conn/reset-conn! conn db {:server? true})
(swap! (:atom conn)
(fn [atom]
(-> atom
(assoc :server-db db)
(assoc :server-idx server-idx)
(update :listeners assoc :sync #(on-tx conn %))))))

:transacted
(let [{:keys [tx-data tx-id server-idx]} body
{*server-db :server-db
*server-idx :server-idx
*pending :pending
*listeners :listeners} (meta conn)
report (conn/with @*server-db tx-data {:server? true})
{server-db :server-db
pending :pending
listeners :listeners} @(:atom conn)
report (conn/with server-db tx-data {:server? true})
server-db' (:db-after report)]
(reset! *server-db server-db')
(reset! *server-idx server-idx)
(if (= tx-id (:tx-id (peek @*pending)))
(swap! *pending pop)
(swap! (:atom conn) assoc
:server-db server-db'
:server-idx server-idx)
(if (= tx-id (:tx-id (peek pending)))
(swap! (:atom conn) update :pending pop)
(do
(reset! conn (reduce conn/db-with server-db' @*pending))
(doseq [[_ callback] @*listeners]
(reset! conn (reduce conn/db-with server-db' pending))
(doseq [[_ callback] listeners]
(callback report))))))
nil)

(defn server-disconnected [conn]
;; TODO impl me
)
(swap! (:atom conn)
#(-> %
(assoc :connected? false)
(update :listeners dissoc :sync))))

(defn server-connected [conn]
;; TODO impl me
)
(swap! (:atom conn) assoc :connected? true)
(let [{:keys [send-fn server-idx]} @(:atom conn)]
(send-fn {:message :catching-up
:server-idx server-idx})))
23 changes: 11 additions & 12 deletions src/datascript/sync/server.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,29 @@
[datascript.serialize :as serialize]))

(defn- client [conn channel]
(get @(:clients (meta conn)) channel))
(-> conn :atom deref :clients (get channel)))

(defn on-tx [conn report]
(let [*clients (:clients (meta conn))
(let [clients (:clients @(:atom conn))
msg {:message :transacted
:tx-data (db/tx-from-datoms (:tx-data report))
:tx-id (:tx-id (:tx-meta report))
:server-idx (:db/current-tx (:tempids report))}]
(doseq [[channel {:keys [status send-fn pending]}] @*clients]
(doseq [[channel {:keys [status send-fn pending]}] clients]
(if (= :active status)
(do
(when pending
(doseq [msg pending]
(send-fn channel msg))
(swap! *clients update client dissoc :pending))
(swap! (:atom conn) update :clients update client dissoc :pending))
(send-fn channel msg))
(swap! *clients update client update :pending (fnil conj []) msg)))))
(swap! (:atom conn) update :clients update client update :pending (fnil conj []) msg)))))

(defn client-connected [conn channel send-fn]
(let [*clients (:clients (meta conn))
clients' (swap! *clients assoc channel
{:status :connected
:send-fn send-fn})]
(let [clients' (:clients
(swap! (:atom conn) update :clients assoc channel
{:status :connected
:send-fn send-fn}))]
(when (= 1 (count clients'))
(conn/listen! conn :sync #(on-tx conn %)))
nil))
Expand All @@ -47,7 +47,7 @@
{:message :catched-up
:snapshot (serialize/serializable db) ;; TODO patterns
:server-idx server-idx})
(swap! (:clients (meta conn)) update channel
(swap! (:atom conn) update :clients update channel
(fn [client]
(-> client
(assoc :status :active)
Expand All @@ -60,8 +60,7 @@
nil)

(defn client-disconnected [conn channel]
(let [*clients (:clients (meta conn))
clients' (swap! *clients dissoc channel)]
(let [clients' (:clients (swap! (:atom conn) update :clients dissoc channel))]
(when (= 0 (count clients'))
(conn/unlisten! conn :sync))
nil))
4 changes: 4 additions & 0 deletions src/datascript/util.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
`(when *debug*
(println ~@body)))

(def empty-queue
#?(:clj clojure.lang.PersistentQueue/EMPTY
:cljs cljs.core.PersistentQueue.EMPTY))

(defn- rand-bits [pow]
(rand-int (bit-shift-left 1 pow)))

Expand Down
20 changes: 12 additions & 8 deletions test/datascript/test/sync.clj
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,19 @@
:c2 c2}))

(defn wait-all [{:keys [server c1 c2]}]
(wait-on (:pending (meta c1)) empty?)
(wait-on (:pending (meta c2)) empty?)
(wait-on (:server-idx (meta c1)) #(= % (:max-tx @server)))
(wait-on (:server-idx (meta c2)) #(= % (:max-tx @server))))
(wait-on (:atom c1)
#(and
(empty? (:pending %))
(= (:server-idx %) (:max-tx @server))))
(wait-on (:atom c2)
#(and
(empty? (:pending %))
(= (:server-idx %) (:max-tx @server)))))

(deftest test-sync
(let [{:keys [server c1 c2] :as setup} (setup)]
(d/transact! c1 [[:db/add 1 :name "Ivan"]])
(wait-all setup)
(wait-all setup)
(is (= #{[1 :name "Ivan"]}
(tdc/all-datoms @c1)))
(is (= #{[1 :name "Ivan"]}
Expand All @@ -78,6 +82,6 @@
(tdc/all-datoms @server)))))


; (t/test-ns *ns*)
; (t/run-test-var #'test-conn)

(comment
(t/test-ns *ns*)
(t/run-test-var #'test-conn))

0 comments on commit e9380e8

Please sign in to comment.