Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Florio committed May 23, 2021
0 parents commit d5cef24
Show file tree
Hide file tree
Showing 16 changed files with 861 additions and 0 deletions.
18 changes: 18 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

# Personal
notes.md
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# CRDT
Go implementation of some [CRDTs](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)

## WIP
This is still a work in progress in the early stage. A lot can (and probably) will change. Currently, the implemented CRDTs are:
- GCounter: grow only counter
- GSet: grow only set
- Counter: a regular counter (increase and decrease)

The next in my pipeline are:
- Set: a regular set
- Register: a register (key-value pair)
- Dictionary: map of key-value pair

A How-to will follow as soon as I am satisfied with the initial implementation.
168 changes: 168 additions & 0 deletions counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package main

import (
"sync"
"time"

"github.com/google/uuid"

"github.com/elleFlorio/crdt/network"
)

type counter struct {
id uint32
replica string
stateInc map[string]int64
stateDec map[string]int64
bufferInc map[string]int64
bufferDec map[string]int64
bufLock sync.Mutex
net network.Overlay
ch chan network.Message
}

func NewCounter(net network.Overlay, syncTimeMs int) *counter {
return newCounter(nil, net, syncTimeMs)
}

func ConnectCounter(id uint32, net network.Overlay, syncTimeMs int) *counter {
return newCounter(&id, net, syncTimeMs)
}

func newCounter(id *uint32, net network.Overlay, syncTimeMs int) *counter {
var cntId uint32
if id == nil {
cntId = uuid.New().ID()
} else {
cntId = *id
}
cntChan := make(chan network.Message, 10)
cntStateInc := make(map[string]int64)
cntStateDec := make(map[string]int64)
cntBufInc := make(map[string]int64)
cntBufDec := make(map[string]int64)
cnt := &counter{
id: cntId,
replica: net.GetLocalAddr(),
stateInc: cntStateInc,
stateDec: cntStateDec,
bufferInc: cntBufInc,
bufferDec: cntBufDec,
net: net,
ch: cntChan,
}

net.Listen(cntChan)
cnt.listen()
cnt.synchronize(syncTimeMs)

return cnt
}

func (c *counter) listen() {
go func() {
for msg := range c.ch {
if msg.Id == c.id {
received := msg.Payload.(counterDState)
dState := c.getDelta(received)
c.store(dState)
}
}
}()
}

func (c *counter) store(dState counterDState) {
c.bufLock.Lock()
if dState.DStateInc > 0 {
c.stateInc[dState.Replica] = dState.DStateInc
c.bufferInc[dState.Replica] = dState.DStateInc
}

if dState.DStateDec > 0 {
c.stateDec[dState.Replica] = dState.DStateDec
c.bufferDec[dState.Replica] = dState.DStateDec
}

c.bufLock.Unlock()
}

func (c *counter) getDelta(dState counterDState) counterDState {
// No need to compute the minimum delta
return dState
}

func (c *counter) synchronize(intevalMs int) {
ticker := time.NewTicker(time.Duration(intevalMs) * time.Millisecond)

go func() {
for range ticker.C {
replicas := c.net.GetNodes()
dStates := make(map[string]*counterDState)

c.bufLock.Lock()
for _, replica := range replicas {

for replicaBuf, dStateInc := range c.bufferInc {
if replica != replicaBuf && replica != c.replica {
if dState, ok := dStates[replicaBuf]; ok {
dState.DStateInc = dStateInc
} else {
dStates[replica] = &counterDState{replicaBuf, dStateInc, 0}
}
}
}

for replicaBuf, dStateDec := range c.bufferDec {
if replica != replicaBuf && replica != c.replica {
if dState, ok := dStates[replicaBuf]; ok {
dState.DStateDec = dStateDec
} else {
dStates[replica] = &counterDState{replicaBuf, 0, dStateDec}
}
}
}
}

for replica, payload := range dStates {
msg := network.Message{
Id: c.id,
Payload: *payload,
}
c.net.Send(msg, replica)
}

c.bufferInc = make(map[string]int64)
c.bufferDec = make(map[string]int64)
c.bufLock.Unlock()
}

}()
}

func (c *counter) Id() uint32 {
return c.id
}

func (c *counter) Inc() {
delta := c.stateInc[c.replica] + 1
dState := counterDState{c.replica, delta, 0}
c.store(dState)
}

func (c *counter) Dec() {
delta := c.stateDec[c.replica] + 1
dState := counterDState{c.replica, 0, delta}
c.store(dState)
}

func (c *counter) Read() int64 {
var total int64 = 0
for _, inc := range c.stateInc {
total += inc
}
for _, dec := range c.stateDec {
total -= dec
}

return total
}
81 changes: 81 additions & 0 deletions counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"testing"
"time"

"github.com/elleFlorio/crdt/network"
)

func TestCounter(t *testing.T) {
var state1 int64 = 0
var state2 int64 = 0

node1 := network.CreateLocalNode()
node2 := network.CreateLocalNode()

counter1 := NewCounter(node1, 10)
counter2 := ConnectCounter(counter1.Id(), node2, 10)

counter1.Inc()
counter1.Inc()

time.Sleep(time.Duration(20) * time.Millisecond)

state1 = counter1.Read()
state2 = counter2.Read()

if state1 != 2 {
t.Fatalf("Reference state is not correct: State %d - Expected %d", state1, 2)
}

if state1 != state2 {
t.Fatalf("States are not equals: State1 %d - State2 %d", state1, state2)
}

counter2.Inc()

time.Sleep(time.Duration(20) * time.Millisecond)

state1 = counter1.Read()
state2 = counter2.Read()

if state2 != 3 {
t.Fatalf("Reference state is not correct: State %d - Expected %d", state2, 3)
}

if state1 != state2 {
t.Fatalf("States are not equals: State1 %d - State2 %d", state1, state2)
}

counter1.Dec()
counter1.Dec()

time.Sleep(time.Duration(20) * time.Millisecond)

state1 = counter1.Read()
state2 = counter2.Read()

if state1 != 1 {
t.Fatalf("Reference state is not correct: State %d - Expected %d", state1, 1)
}

if state1 != state2 {
t.Fatalf("States are not equals: State1 %d - State2 %d", state1, state2)
}

counter2.Dec()

time.Sleep(time.Duration(20) * time.Millisecond)

state1 = counter1.Read()
state2 = counter2.Read()

if state2 != 0 {
t.Fatalf("Reference state is not correct: State %d - Expected %d", state2, 0)
}

if state1 != state2 {
t.Fatalf("States are not equals: State1 %d - State2 %d", state1, state2)
}
}
17 changes: 17 additions & 0 deletions dstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

type gCounterDState struct {
Replica string
DState uint64
}

type gSetDState struct {
Replica string
DState []GSetElement
}

type counterDState struct {
Replica string
DStateInc int64
DStateDec int64
}
Loading

0 comments on commit d5cef24

Please sign in to comment.