Skip to content

Commit

Permalink
Merge pull request #3212 from tloncorp/hm/resubscribe-ddos-prevention
Browse files Browse the repository at this point in the history
subscriptions: kick resub ddos protection
  • Loading branch information
arthyn authored Feb 22, 2024
2 parents 401bc2c + e8233e7 commit ccb391d
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 61 deletions.
116 changes: 86 additions & 30 deletions desk/app/channels.hoon
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
:: XX chat thread entries can no longer be edited. maybe fix before
:: release?
::
:: note: all subscriptions are handled by the subscriber library so
:: we can have resubscribe loop protection.
::
/- c=channels, g=groups, ha=hark
/- meta
/+ default-agent, verb, dbug, sparse, neg=negotiate
/+ utils=channel-utils, volume
/+ utils=channel-utils, volume, s=subscriber
:: performance, keep warm
/+ channel-json
::
Expand All @@ -24,7 +27,7 @@
|%
+$ card card:agent:gall
+$ current-state
$: %2
$: %3
=v-channels:c
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c) ::TODO vestigial, in groups-ui now, remove me
Expand All @@ -33,6 +36,8 @@
:: .pending-ref-edits: for migration, see also +poke %negotiate-notif
::
pending-ref-edits=(jug ship [=kind:c name=term])
:: delayed resubscribes
=^subs:s
==
--
=| current-state
Expand Down Expand Up @@ -83,8 +88,9 @@
++ on-arvo
|= [=wire sign=sign-arvo]
^- (quip card _this)
~& strange-diary-arvo+wire
`this
=^ cards state
abet:(arvo:cor wire sign)
[cards this]
--
|_ [=bowl:gall cards=(list card)]
++ abet [(flop cards) state]
Expand All @@ -99,30 +105,51 @@
::
++ safe-watch
|= [=wire =dock =path]
|= delay=?
^+ cor
?: (~(has by wex.bowl) wire dock) cor
(emit %pass wire %agent dock %watch path)
=^ caz=(list card) subs
(~(subscribe s [subs bowl]) wire dock path delay)
(emil caz)
::
++ load
|= =vase
|^ ^+ cor
=+ !<(old=versioned-state vase)
=? old ?=(%0 -.old) (state-0-to-1 old)
=? old ?=(%1 -.old) (state-1-to-2 old)
?> ?=(%2 -.old)
=? old ?=(%2 -.old) (state-2-to-3 old)
?> ?=(%3 -.old)
=. state old
inflate-io
::
+$ versioned-state $%(state-2 state-1 state-0)
+$ state-2 current-state
+$ versioned-state $%(state-3 state-2 state-1 state-0)
+$ state-3 current-state
::
+$ state-2
$: %2
=v-channels:c
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c) ::TODO vestigial, in groups-ui now, remove me
hidden-posts=(set id-post:c)
::
:: .pending-ref-edits: for migration, see also +poke %negotiate-notif
::
pending-ref-edits=(jug ship [=kind:c name=term])
==
+$ state-1
$: %1
v-channels=(map nest:c v-channel-1)
voc=(map [nest:c plan:c] (unit said:c))
pins=(list nest:c)
hidden-posts=(set id-post:c)
==
++ state-2-to-3
|= s=state-2
^- state-3
%= s - %3
pending-ref-edits [pending-ref-edits.s *^subs:^s]
==
++ v-channel-1
|^ ,[global local]
+$ global
Expand Down Expand Up @@ -242,6 +269,12 @@
:: happen. that way, local subs get established without issue.
inflate-io
::
++ unsubscribe
|= [=wire =dock]
^+ cor
=^ caz=(list card) subs
(~(unsubscribe s [subs bowl]) wire dock)
(emil caz)
++ inflate-io
:: initiate version negotiation with our own channels-server
::
Expand Down Expand Up @@ -285,17 +318,17 @@
=(wire path)
==
?: keep cor
(emit %pass pole %agent [sub-ship dude] %leave ~)
(unsubscribe pole [sub-ship dude])
::
:: watch all the subscriptions we expect to have
::
=. cor watch-groups
=. cor (watch-groups |)
::
=. cor
%+ roll
~(tap by v-channels)
|= [[=nest:c *] core=_cor]
ca-abet:ca-safe-sub:(ca-abed:ca-core:core nest)
ca-abet:(ca-safe-sub:(ca-abed:ca-core:core nest) |)
::
cor
::
Expand Down Expand Up @@ -411,7 +444,7 @@
|= [=nest:c =plan:c]
?. (~(has by v-channels) nest)
=/ wire (said-wire nest plan)
(safe-watch wire [ship.nest server] wire)
((safe-watch wire [ship.nest server] wire) |)
::TODO not guaranteed to resolve, we might have partial backlog
ca-abet:(ca-said:(ca-abed:ca-core nest) plan)
::
Expand Down Expand Up @@ -476,7 +509,7 @@
::
[%groups ~]
?+ -.sign !!
%kick watch-groups
%kick (watch-groups &)
%watch-ack
?~ p.sign
cor
Expand Down Expand Up @@ -545,6 +578,18 @@
``loob+!>((~(has by v-channels) kind.pole ship name.pole))
==
::
++ arvo
|= [=(pole knot) sign=sign-arvo]
^+ cor
?+ pole ~|(bad-arvo-take/pole !!)
[%~.~ %cancel-retry rest=*] cor
::
[%~.~ %retry rest=*]
=^ caz=(list card) subs
(~(handle-wakeup s [subs bowl]) pole)
(emil caz)
==
::
++ unreads
^- unreads:c
%- ~(gas by *unreads:c)
Expand All @@ -566,8 +611,14 @@
++ emit |=(=card ca-core(cor (^emit card)))
++ emil |=(caz=(list card) ca-core(cor (^emil caz)))
++ give |=(=gift:agent:gall ca-core(cor (^give gift)))
++ safe-watch |=([=wire =dock =path] ca-core(cor (^safe-watch +<)))
++ ca-perms ~(. perms:utils our.bowl now.bowl nest group.perm.perm.channel)
++ safe-watch
|= [=wire =dock =path]
|= delay=?
ca-core(cor ((^safe-watch wire dock path) delay))
++ unsubscribe
|= [=wire =dock]
ca-core(cor (^unsubscribe wire dock))
++ ca-abet
%_ cor
v-channels
Expand Down Expand Up @@ -608,7 +659,7 @@
=. last-read.remark.channel now.bowl
=. ca-core ca-give-unread
=. ca-core (ca-response %join group)
ca-safe-sub
(ca-safe-sub |)
::
:: handle an action from the client
::
Expand Down Expand Up @@ -687,19 +738,23 @@
(~(has by wex.bowl) [ca-sub-wire ship.nest dap.bowl])
::
++ ca-safe-sub
|= delay=?
?: ca-has-sub ca-core
?^ posts.channel ca-start-updates
?^ posts.channel (ca-start-updates delay)
=. load.net.channel |
%. delay
%^ safe-watch (weld ca-area /checkpoint) [ship.nest server]
?. =(our.bowl ship.nest)
=/ count ?:(=(%diary kind.nest) '20' '100')
/[kind.nest]/[name.nest]/checkpoint/before/[count]
/[kind.nest]/[name.nest]/checkpoint/time-range/(scot %da *@da)
::
++ ca-start-updates
|= delay=?
:: not most optimal time, should maintain last heard time instead
=/ tim=(unit time)
(bind (ram:on-v-posts:c posts.channel) head)
%. delay
%^ safe-watch ca-sub-wire [ship.nest server]
/[kind.nest]/[name.nest]/updates/(scot %da (fall tim *@da))
::
Expand All @@ -724,9 +779,9 @@
~
=/ =path /[kind.nest]/[name.nest]/create
=/ =wire (weld ca-area /create)
(emit %pass wire %agent [our.bowl server] %watch path)
((safe-watch wire [our.bowl server] path) |)
::
%kick ca-safe-sub
%kick (ca-safe-sub &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed creation" u.p.sign)
Expand All @@ -739,16 +794,15 @@
=+ !<(=update:c q.cage)
=. ca-core (ca-u-channels update)
=. ca-core ca-give-unread
=. ca-core
(emit %pass (weld ca-area /create) %agent [ship.nest server] %leave ~)
ca-safe-sub
=. ca-core (unsubscribe (weld ca-area /create) [ship.nest server])
(ca-safe-sub |)
==
::
++ ca-take-update
|= =sign:agent:gall
^+ ca-core
?+ -.sign ca-core
%kick ca-safe-sub
%kick (ca-safe-sub &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed subscription" u.p.sign)
Expand All @@ -767,7 +821,7 @@
^+ ca-core
?+ -.sign ca-core
:: only if kicked prematurely
%kick ?:(load.net.channel ca-core ca-safe-sub)
%kick ?:(load.net.channel ca-core (ca-safe-sub &))
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed partial checkpoint" u.p.sign)
Expand All @@ -786,7 +840,7 @@
^+ ca-core
?+ -.sign ca-core
:: only hit if kicked prematurely (we %leave after the first %fact)
%kick ca-sync-backlog
%kick (ca-sync-backlog &)
%watch-ack
?~ p.sign ca-core
%- (slog leaf+"{<dap.bowl>}: Failed backlog" u.p.sign)
Expand All @@ -805,11 +859,11 @@
^+ ca-core
=. load.net.channel &
=. ca-core (ca-apply-checkpoint chk &)
=. ca-core ca-start-updates
=. ca-core (ca-start-updates |)
=. ca-core (ca-fetch-contacts chk)
=. ca-core ca-sync-backlog
=. ca-core (ca-sync-backlog |)
=/ wire (weld ca-area /checkpoint)
(emit %pass wire %agent [ship.nest server] %leave ~)
(unsubscribe wire [ship.nest server])
::
++ ca-fetch-contacts
|= chk=u-checkpoint:c
Expand Down Expand Up @@ -848,8 +902,10 @@
ca-core
::
++ ca-sync-backlog
|= delay=?
=/ checkpoint-start (pry:on-v-posts:c posts.channel)
?~ checkpoint-start ca-core
%. delay
%^ safe-watch (weld ca-area /backlog) [ship.nest server]
%+ welp
/[kind.nest]/[name.nest]/checkpoint/time-range
Expand All @@ -860,7 +916,7 @@
|= chk=u-checkpoint:c
=. ca-core (ca-apply-checkpoint chk |)
=/ wire (weld ca-area /backlog)
(emit %pass wire %agent [ship.nest server] %leave ~)
(unsubscribe wire [ship.nest server])
::
++ ca-apply-logs
|= =log:c
Expand Down Expand Up @@ -1568,7 +1624,7 @@
++ ca-recheck
|= sects=(set sect:g)
:: if our read permissions restored, re-subscribe
?: (can-read:ca-perms our.bowl) ca-safe-sub
?: (can-read:ca-perms our.bowl) (ca-safe-sub |)
ca-core
::
:: assorted helpers
Expand All @@ -1578,7 +1634,7 @@
:: leave the subscription only
::
++ ca-simple-leave
(emit %pass ca-sub-wire %agent [ship.nest server] %leave ~)
(unsubscribe ca-sub-wire [ship.nest server])
::
:: Leave the subscription, tell people about it, and delete our local
:: state for the channel
Expand Down
Loading

0 comments on commit ccb391d

Please sign in to comment.