-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlisteners.js
289 lines (261 loc) · 10.9 KB
/
listeners.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
// SPDX-FileCopyrightText: 2023 Mix Irving <[email protected]>
//
// SPDX-License-Identifier: LGPL-3.0-only
const {
where,
and,
or,
live: dbLive,
isDecrypted,
type,
toPullStream,
} = require('ssb-db2/operators')
const {
validator: {
group: {
addMember: isAddMember,
excludeMember: isExcludeMember,
initEpoch: isInitEpoch,
},
},
keySchemes,
} = require('private-group-spec')
const pull = require('pull-stream')
const paraMap = require('pull-paramap')
const clarify = require('clarify-error')
const Epochs = require('./lib/epochs')
const { reAddMembers, createNewEpoch } = require('./lib/exclude')
const hookClose = require('./lib/hook-close')
function randomTimeout(config) {
if (!config) throw new Error('Please give config')
const timeoutLow = config.tribes2?.timeoutLow ?? 5
const timeoutHigh = config.tribes2?.timeoutHigh ?? 30
const timeoutRandom = Math.random() * (timeoutHigh - timeoutLow) + timeoutLow
return timeoutRandom * 1000
}
module.exports = function startListeners(ssb, config, onError) {
hookClose(ssb)
let isClosed = false
hookClose.onClose(() => { isClosed = true })
const { getTipEpochs, getPreferredEpoch, getMembers } = Epochs(ssb)
ssb.metafeeds.findOrCreate((err, myRoot) => {
// prettier-ignore
if (err) return onError(clarify(err, 'Error getting own root in start()'))
// check if we've been excluded
pull(
ssb.db.query(
where(and(isDecrypted('box2'), type('group/exclude-member'))),
dbLive({ old: true }),
toPullStream()
),
pull.filter(isExcludeMember),
pull.filter((msg) =>
// it's an exclusion of us
msg.value.content.excludes.includes(myRoot.id)
),
pull.drain(
(msg) => {
const groupId = msg.value.content.recps[0]
getTipEpochs(groupId, (err, tipEpochs) => {
// prettier-ignore
if (err) return onError(clarify(err, 'Error on getting tip epochs after finding exclusion of ourselves'))
const excludeEpochRootId = msg.value.content.tangles.members.root
const excludeIsInTipEpoch = tipEpochs
.map((tip) => tip.id)
.includes(excludeEpochRootId)
// ignore the exclude if it's an old one (we were added back to the group later)
if (!excludeIsInTipEpoch) return
ssb.box2.excludeGroupInfo(groupId, (err) => {
// prettier-ignore
if (err) return onError(clarify(err, 'Error on excluding group info after finding exclusion of ourselves'))
})
})
},
(err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Error on looking for exclude messages excluding us'))
}
)
)
// look for new epochs that we're added to
pull(
ssb.db.query(
where(and(isDecrypted('box2'), type('group/add-member'))),
dbLive({ old: true }),
toPullStream()
),
pull.filter(isAddMember),
// groups/epochs we're added to
pull.filter((msg) => {
return msg.value.content.recps.includes(myRoot.id)
}),
// to find new epochs we only check groups we've accepted the invite to
paraMap((msg, cb) => {
pull(
ssb.box2.listGroupIds(),
pull.filter((groupId) => groupId === msg.value.content.recps[0]),
pull.take(1),
pull.collect((err, groupIds) => {
// prettier-ignore
if (err) return cb(clarify(err, "Error getting groups we're already in when looking for new epochs"))
cb(null, groupIds.length ? msg : null)
})
)
}, 4),
pull.filter(Boolean),
pull.drain(
(msg) => {
const groupId = msg.value.content.recps[0]
const secret = Buffer.from(msg.value.content.secret, 'base64')
ssb.box2.addGroupInfo(groupId, { key: secret }, (err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Cannot add new epoch key that we found'))
const newKeyPick = {
key: secret,
scheme: keySchemes.private_group,
}
// TODO: naively guessing that this is the latest key for now
ssb.box2.pickGroupWriteKey(groupId, newKeyPick, (err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Error switching to new epoch key that we found'))
ssb.db.reindexEncrypted((err) => {
// prettier-ignore
if (err && !isClosed) onError(clarify(err, 'Error reindexing after finding new epoch'))
})
})
})
},
(err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, "Error finding new epochs we've been added to"))
}
)
)
// listen for new epochs and update groupInfo as required
pull(
ssb.db.query(
where(or(type('group/init'), type('group/add-member'))),
dbLive({ old: true }),
toPullStream()
),
pull.filter((msg) => isInitEpoch(msg) || isAddMember(msg)),
pull.map((msg) => msg.value.content.recps[0]),
pull.drain(
(groupId) => {
ssb.box2.getGroupInfo(groupId, (err, groupInfo) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'fatal error in live updating groupInfo'))
if (!groupInfo) return // group that we've not accepted an invite to yet
if (groupInfo.excluded) return // group where we were excluded
getPreferredEpoch(groupId, (err, epochInfo) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'fatal error getting preferred epoch'))
if (groupInfo.writeKey.key.equals(epochInfo.secret)) return
ssb.box2.addGroupInfo(
groupId,
{ key: epochInfo.secret },
(err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Error adding new epoch key'))
const newKeyPick = {
key: epochInfo.secret,
scheme: keySchemes.private_group,
}
ssb.box2.pickGroupWriteKey(groupId, newKeyPick, (err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Error picking group write key'))
ssb.db.reindexEncrypted((err) => {
// prettier-ignore
if (err && !isClosed) onError(clarify(err, 'Error reindexing after finding new epoch'))
})
})
}
)
})
})
},
(err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Problem listening to new messages'))
}
)
)
// recover from half-finished excludeMembers() calls
const recoverExclude = config.tribes2?.recoverExclude ?? false
if (recoverExclude) {
pull(
ssb.tribes2.list({ live: true }),
pull.unique('id'),
pull.map((group) =>
pull(
getPreferredEpoch.stream(group.id, { live: true }),
pull.unique('id'),
pull.drain(
(preferredEpoch) => {
// re-add missing people to a new epoch if the epoch creator didn't add everyone but they added us.
// we're only doing this for the preferred epoch atm
const timeout = randomTimeout(config)
const timeoutId = setTimeout(() => {
reAddMembers(ssb, group.id, null, (err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Failed re-adding members to epoch that missed some'))
})
}, timeout)
hookClose.onClose(() => clearTimeout(timeoutId))
// if we find an exclude and it's not excluding us but we don't find a new epoch, even after a while, then create a new epoch, since we assume that the excluder crashed or something
pull(
getMembers.stream(preferredEpoch.id, { live: true }),
pull.filter((members) => members.toExclude.length),
pull.take(1),
pull.drain(
() => {
const timeout = randomTimeout(config)
const timeoutId = setTimeout(() => {
ssb.tribes2.get(group.id, (err, group) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, "Couldn't get group info when checking for missing epochs to recover"))
// checking if we were one of the members who got excluded now, in that case we ignore this
if (group.excluded) return
getPreferredEpoch(
group.id,
(err, newPreferredEpoch) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, "Couldn't get preferred epoch when checking for missing epochs to recover"))
// if we've found a new epoch then we don't need to create one ourselves
if (preferredEpoch.id !== newPreferredEpoch.id)
return
createNewEpoch(ssb, group.id, null, (err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, "Couldn't create new epoch to recover from a missing one"))
})
}
)
})
}, timeout)
hookClose.onClose(() => clearTimeout(timeoutId))
},
(err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, "Couldn't get info on exclusion events when looking for epochs that fail to get created"))
}
)
)
},
(err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, "Failed finding new preferred epochs when looking for them to add missing members to or when checking if an epoch is missing"))
}
)
)
),
pull.drain(
() => {},
(err) => {
// prettier-ignore
if (err && !isClosed) return onError(clarify(err, 'Failed listing groups when trying to find missing epochs or epochs with missing members'))
}
)
)
}
})
}