The g-set workload is almost the same as the broadcast problem: we want to keep track of a shared, constantly growing set of elements. Clients issue `add` requests to add an element to the set, and `read` requests to get all current elements. To do this, we're going to build a state-based CRDT called a G-set (a grow-only set).
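As a rough sketch of what's on the wire, here are the message bodies our handlers will deal with. Ancillary fields like `msg_id` are omitted, and the element values are made up:

```rb
# Illustrative message bodies (element values are made up):
add_req  = {type: "add", element: 5}      # client asks a node to add 5
add_ok   = {type: "add_ok"}               # the node's acknowledgement
read_req = {type: "read"}                 # client asks for the whole set
read_ok  = {type: "read_ok", value: [5]}  # reply with every element we know of
```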
In the previous chapter we built a broadcast system which replicates messages between all nodes. Each node maintained a set of broadcast elements, and new elements were incrementally added to that set via `broadcast` messages. To ensure that every message made it to every node, we retried broadcasts endlessly until every neighbor had acknowledged the request.
This raises a few issues. First, we're doing an awful lot of book-keeping on a per-message basis--keeping track of exactly who needs to receive what updates. Second, our implementation spawns a thread for each request, which means with ten thousand incomplete broadcast requests, we have ten thousand threads, each spamming the network with messages. Could we be more efficient?
If our set of elements is reasonably small, such that it fits in a single message, we could do something even more aggressive. Instead of trying to work out who needs what elements when, we could simply send our entire set to our neighbors periodically. We'll deliver the same elements multiple times, but that's fine: set union is idempotent.
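Here's that idempotence in miniature, as a quick Ruby sketch:

```rb
require 'set'

set    = Set[1, 2]
update = [2, 3]   # the same replication payload, delivered twice
set |= update
set |= update     # re-delivery changes nothing: union is idempotent
set.to_a.sort     # => [1, 2, 3]
```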
Let's begin, then, by adding a small task scheduler to `node.rb`: something that can handle the periodic broadcast of our state to our neighbors. We'll keep track of a list of tasks we'd like to run once the node starts, and, once we get an `init` message, spawn threads to run those tasks.
```rb
def initialize
  @node_id = nil
  @node_ids = nil
  @next_msg_id = 0

  @handlers = {}
  @callbacks = {}
  @periodic_tasks = [] # New!

  @lock = Monitor.new
  @log_lock = Mutex.new

  # Register an initial handler for the init message
  on "init" do |msg|
    # Set our node ID and IDs
    @node_id = msg[:body][:node_id]
    @node_ids = msg[:body][:node_ids]
    reply! msg, type: "init_ok"
    log "Node #{@node_id} initialized"

    # Spawn periodic task handlers
    start_periodic_tasks!
  end
end
```
We'll add a small method which appends a callback and a time interval to the periodic task list:
```rb
# Periodically evaluates block every dt seconds--helpful for building
# periodic replication tasks, timeouts, etc.
def every(dt, &block)
  @periodic_tasks << {dt: dt, f: block}
end
```
And to start the tasks, we'll run through each task and start a thread for it.
```rb
# Launches threads to process periodic handlers
def start_periodic_tasks!
  @periodic_tasks.each do |task|
    Thread.new do
      loop do
        task[:f].call
        sleep task[:dt]
      end
    end
  end
end
```
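With that in place, registering a periodic task is a one-liner. As a purely hypothetical example of the API--not something our G-set server needs--a node could log a heartbeat every two seconds:

```rb
# Hypothetical: once init arrives and start_periodic_tasks! runs, this
# task logs a heartbeat on its own thread every two seconds.
node = Node.new
node.every 2 do
  node.log "heartbeat"
end
node.main!
```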
Now let's create a new file for our G-set node: `g_set.rb`. We'll keep track of our elements in `@set`, and return its value on reads. For adds, we'll merge the added element into `@set`.
```rb
#!/usr/bin/env ruby

require_relative 'node'
require 'set'

class GSetServer
  attr_reader :node

  def initialize
    @node = Node.new
    @lock = Mutex.new
    @set = Set.new

    @node.on "read" do |msg|
      @lock.synchronize do
        @node.reply! msg, type: "read_ok", value: @set.to_a
      end
    end

    @node.on "add" do |msg|
      @lock.synchronize do
        @set.add msg[:body][:element]
      end
      @node.reply! msg, type: "add_ok"
    end
  end
end

GSetServer.new.node.main!
```
We haven't done any kind of inter-node replication yet, but let's see what happens anyway. We'll have Maelstrom run the `g-set` workload against this binary. In particular, we're looking to make sure that reads and writes can actually execute, and that some values get returned:
```
$ ./maelstrom test -w g-set --bin g_set.rb
...
INFO [2021-02-25 10:48:13,039] jepsen worker 3 - jepsen.util 3 :invoke :add 0
INFO [2021-02-25 10:48:13,042] jepsen worker 3 - jepsen.util 3 :ok :add 0
INFO [2021-02-25 10:48:13,312] jepsen worker 3 - jepsen.util 3 :invoke :read nil
INFO [2021-02-25 10:48:13,315] jepsen worker 3 - jepsen.util 3 :ok :read [0]
...
:lost-count 119,
```
So we lost lots of values (by failing to make them available on all nodes), but at least local node state is being updated correctly. Now let's figure out how to replicate state back and forth.
We'll invent a new type of message: `replicate`, which will carry the full value of the set with it. We won't bother with acknowledgement: if a message is lost, later attempts will also include all its elements. When we receive a `replicate` message, we'll take the union of our current set and the sending node's elements.
@node.on "replicate" do |msg|
@lock.synchronize do
@set |= msg[:body][:value]
end
end
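Concretely, a `replicate` message as seen by this handler might look like the following sketch (node IDs and elements are illustrative; our Node library parses bodies into hashes with symbolized keys):

```rb
# A replicate message after parsing (values illustrative):
msg = {src:  "n2",
       dest: "n1",
       body: {type: "replicate", value: [0, 3, 7]}}
```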
Every five seconds, we'll have our node broadcast its state to all other nodes. As in the broadcast protocol, we could use a more sophisticated topology here to be more efficient, but this is a good place to start.
```rb
@node.every 5 do
  set = @lock.synchronize do
    @set.to_a
  end
  @node.log "Replicating current set #{set}"
  @node.node_ids.each do |n|
    # Don't replicate to self
    unless n == @node.node_id
      @node.send! n, type: "replicate", value: set
    end
  end
end
```
Let's see how that goes:
```
$ ./maelstrom test -w g-set --bin g_set.rb --time-limit 10
...
 :valid? true,
 :lost-count 0,
 :lost (),
 :stable-count 19,
 :stale-count 17,
 :stale (0 1 2 3 4 5 6 7 8 11 12 13 14 15 16 17 18),
 :never-read-count 0,
 :stable-latencies {0 0,
                    0.5 2533,
                    0.95 8083,
                    0.99 8083,
                    1 8083},
 :attempt-count 19,
 :never-read (),
 :duplicated {}},
:valid? true}

Everything looks good! ヽ(‘ー`)ノ
```
Hey, look at that! No writes lost! And check out the network statistics:
```
:net {:stats {:all {:send-count 184,
                    :recv-count 184,
                    :msg-count 184,
                    :msgs-per-op 3.9148936},
              :clients {:send-count 104,
                        :recv-count 104,
                        :msg-count 104},
              :servers {:send-count 80,
                        :recv-count 80,
                        :msg-count 80,
                        :msgs-per-op 1.7021277}},
```
80 inter-server messages to accomplish 19 writes. In terms of message count, this is already better than the best broadcast system we designed in the previous chapter. Note that we're actually running for 20 seconds here: ten seconds of mixed reads and writes, plus a ten-second quiet period. That's four replication rounds, times five nodes, times four messages each.
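To double-check that arithmetic:

```rb
rounds = 20 / 5         # a 20-second window, one replication round every 5 seconds
nodes  = 5
peers  = nodes - 1      # each node skips itself
rounds * nodes * peers  # => 80 inter-server messages
```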
What happens if we ramp up the request rate?
```
$ ./maelstrom test -w g-set --bin g_set.rb --time-limit 10 --rate 100
...
:net {:stats {:all {:send-count 2136,
                    :recv-count 2136,
                    :msg-count 2136,
                    :msgs-per-op 2.1085885},
              :clients {:send-count 2036,
                        :recv-count 2036,
                        :msg-count 2036},
              :servers {:send-count 100,
                        :recv-count 100,
                        :msg-count 100,
                        :msgs-per-op 0.09871668}},
```
The broadcast system we built exchanged a number of messages linear in the number of broadcast requests. This periodic replication approach sends a constant number of messages per second--though of course the messages themselves grow over time.
The downside, of course, is latency: because replication is so asynchronous, reads can lag writes by roughly 2.5 seconds at the median, and as much as five seconds in extreme cases. (Maelstrom reports even higher latencies--around eight seconds here--but that's a measurement artifact caused by the quiet period at the end of the test, where no reads are performed.) We could mitigate this latency by exchanging messages more frequently, or by replicating on every new write, like the broadcast systems we built earlier.
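As a hypothetical sketch of that second option--not part of our final server--we could push state eagerly whenever an `add` arrives, alongside the periodic task. The `replicate_now!` helper is assumed here, standing in for the body of our `every` task:

```rb
# Hypothetical: eager replication on every add, trading messages for latency.
@node.on "add" do |msg|
  @lock.synchronize do
    @set.add msg[:body][:element]
  end
  @node.reply! msg, type: "add_ok"
  replicate_now! # assumed helper: sends our full set to every peer
end
```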
What about network partitions? Can we still make progress when the network is partitioned? Are writes lost?
```
$ ./maelstrom test -w g-set --bin g_set.rb --time-limit 30 --rate 10 --nemesis partition
...
Everything looks good! ヽ(‘ー`)ノ
```
This system delivers total availability: even when nodes are partitioned away from one another, reads and writes can still take place, because every request is served entirely from local state and replication happens asynchronously in the background. There's no impact on individual request latency!
Nothing we've done here is all that specific to sets, really: it could work for any kind of state-based CRDT. Let's see if we can pull the replication system apart from the set data structure itself. What would such a G-set look like? For starters, it'd have a set somewhere inside it:
```rb
class GSet
  attr_reader :set

  def initialize(set = Set.new)
    @set = set
  end
end
```
We need a way to serialize and deserialize this set to and from a JSON-compatible representation. We did this implicitly in our `replicate` handlers before, but now we'll need to be explicit: our sets are represented as JSON arrays.
```rb
class GSet
  ...

  # Returns a new GSet from a JSON representation
  def from_json(json_array)
    GSet.new json_array.to_set
  end

  # Turns a GSet into a JSON representation: an array.
  def to_json
    @set.to_a
  end
end
```
We want a way to render the G-set so that clients can read it. Here, too, we use an array.
```rb
# Returns a JSON-serializable representation of the effective value of this
# GSet.
def read
  @set.to_a
end
```
We'll also want a way to merge two GSets together, producing a new GSet.
```rb
# Merging two GSets means taking the set union of their sets. Set#| returns
# a fresh Set, which keeps GSet immutable.
def merge(other)
  GSet.new(@set | other.set)
end
```
Finally, we'll want a way to add individual elements to our sets.
```rb
# Returns a new GSet with this element added.
def add(element)
  GSet.new(@set | [element])
end
```
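With those four operations in place, we can sanity-check the properties that make this replication scheme safe: merge is commutative and idempotent (and associative, too). A quick irb-style sketch:

```rb
a = GSet.new.add(1).add(2)
b = a.from_json([2, 3])  # as if decoded from a replicate message

a.merge(b).read.sort == b.merge(a).read.sort  # => true: merge commutes
a.merge(a).read.sort == a.read.sort           # => true: merge is idempotent
a.merge(b).to_json.sort                       # => [1, 2, 3], ready for the wire
```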
Now, let's replace our `GSetServer`'s set with an empty GSet.
```rb
class GSetServer
  attr_reader :node

  def initialize
    @node = Node.new
    @lock = Mutex.new
    @crdt = GSet.new
    ...
```
In each of our handlers, we'll replace direct set operations with the generic versions on `@crdt`. Since GSet is immutable, we can also remove some of our defensive locking.
@node.on "read" do |msg|
@node.reply! msg, type: "read_ok", value: @crdt.read
end
@node.on "add" do |msg|
@lock.synchronize do
@crdt = @crdt.add msg[:body][:element]
end
@node.reply! msg, type: "add_ok"
end
@node.on "replicate" do |msg|
other = @crdt.from_json(msg[:body][:value])
@lock.synchronize do
@crdt = @crdt.merge(other)
end
end
@node.every 5 do
@node.log "Replicating current value #{@crdt.to_json}"
@node.node_ids.each do |n|
# Don't replicate to self
unless n == @node.node_id
@node.send! n, type: "replicate", value: @crdt.to_json
end
end
end
We can double-check that this behaves identically to the hardcoded set:
```
$ ./maelstrom test -w g-set --bin g_set.rb --time-limit 20 --rate 10
...
 :stable-latencies {0 0,
                    0.5 2233,
                    0.95 4642,
                    0.99 4844,
                    1 4844},
 :attempt-count 97,
 :never-read (),
 :duplicated {}},
:valid? true}

Everything looks good! ヽ(‘ー`)ノ
```
Excellent! Now that we have a general-purpose CRDT server, we can implement support for all kinds of datatypes. Let's try a counter.