Skip to content

Commit

Permalink
Merge pull request #33 from absolutelightning/lock-free
Browse files Browse the repository at this point in the history
Lock free and single event loop
  • Loading branch information
absolutelightning authored Jan 7, 2025
2 parents b0eb8fb + 79b899c commit 105d0cd
Show file tree
Hide file tree
Showing 11 changed files with 3 additions and 62 deletions.
8 changes: 0 additions & 8 deletions datastructures/hnsw/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"math"
"math/rand"
"sort"
"sync"
"time"

"github.com/absolutelightning/gods/queues/priorityqueue"
Expand All @@ -24,7 +23,6 @@ type HNSW struct {
LayerFactor float64 // Probability factor, but we won't use it with the current randomLevel
EfSearch int // number of candidates during search
DistFunc DistanceFunc // distance function
lock sync.Mutex // lock for thread-safe operations
EntryPoint *Node // top entry point into the graph
Rand *rand.Rand // random number generator
}
Expand Down Expand Up @@ -96,9 +94,6 @@ func (h *HNSW) randomLevel() int {

// Insert adds a new element `vector` into the HNSW graph.
func (h *HNSW) Insert(vector Vector) string {
h.lock.Lock()
defer h.lock.Unlock()

level := h.randomLevel()

// Create the new node
Expand Down Expand Up @@ -440,9 +435,6 @@ func min(a, b int) int {
}

func (h *HNSW) Delete(nodeID string) bool {
h.lock.Lock()
defer h.lock.Unlock()

if len(h.Layers) == 0 {
return false
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func main() {
log.Fatal(gnet.Run(
tredsServer,
"tcp://0.0.0.0:"+strconv.Itoa(tredsServer.Port),
gnet.WithMulticore(true),
// Single Event loop
gnet.WithMulticore(false),
gnet.WithReusePort(false),
gnet.WithTCPKeepAlive(300*time.Second),
))
Expand Down
2 changes: 0 additions & 2 deletions server/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ func executeDiscard() ExecutionHook {
return gnet.None
}

ts.LockClientTransaction()
defer ts.UnlockClientTransaction()
delete(ts.GetClientTransaction(), c.RemoteAddr().String())

res := "OK"
Expand Down
3 changes: 0 additions & 3 deletions server/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ func executeExec() ExecutionHook {
return gnet.None
}

ts.LockClientTransaction()
defer ts.UnlockClientTransaction()

clientTransaction, ok := ts.GetClientTransaction()[c.RemoteAddr().String()]
if !ok {
ts.RespondErr(c, fmt.Errorf("no transaction started"))
Expand Down
3 changes: 0 additions & 3 deletions server/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func executeMulti() ExecutionHook {
return gnet.None
}

ts.LockClientTransaction()
defer ts.UnlockClientTransaction()

ts.GetClientTransaction()[c.RemoteAddr().String()] = make([]string, 0)

res := "OK"
Expand Down
3 changes: 0 additions & 3 deletions server/psubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ func executePSubscribeCommand() ExecutionHook {
allChannels[channelPrefix] = struct{}{}
}

ts.LockChannelSubs()
defer ts.UnlockChannelSubs()

for channel := range allChannels {
prevData, ok := subscriptionData.Get([]byte(channel))
if !ok {
Expand Down
2 changes: 0 additions & 2 deletions server/punsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ func executePUnsubscribeCommand() ExecutionHook {
allChannels[channelPrefix] = struct{}{}
}

ts.LockChannelSubs()
defer ts.UnlockChannelSubs()
for channel := range allChannels {
prevData, ok := subscriptionData.Get([]byte(channel))
if !ok {
Expand Down
27 changes: 0 additions & 27 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"

wal "github.com/hashicorp/raft-wal"
Expand Down Expand Up @@ -39,10 +38,8 @@ type Server struct {
tredsCommandRegistry commands.CommandRegistry
tredsServerCommandRegistry ServerCommandRegistry
clientTransaction map[string][]string
clientTransactionLock *sync.Mutex

channelSubscriptionData *radix.Tree
channelSubscriptionLock *sync.Mutex
connectionSubscription map[string]map[string]struct{}

connectionMap map[string]gnet.Conn
Expand Down Expand Up @@ -206,10 +203,8 @@ func New(port, segmentSize int, bindAddr, advertiseAddr, serverId string, applyT
id: config.LocalID,
raftApplyTimeout: applyTimeout,
clientTransaction: make(map[string][]string),
clientTransactionLock: &sync.Mutex{},
connP: connPool.NewConnPool(time.Second * 5),
channelSubscriptionData: radix.New(),
channelSubscriptionLock: &sync.Mutex{},
connectionSubscription: make(map[string]map[string]struct{}),
connectionMap: make(map[string]gnet.Conn),
}, nil
Expand Down Expand Up @@ -276,22 +271,6 @@ func (ts *Server) GetRaftApplyTimeout() time.Duration {
return ts.raftApplyTimeout
}

func (ts *Server) LockClientTransaction() {
ts.clientTransactionLock.Lock()
}

func (ts *Server) UnlockClientTransaction() {
ts.clientTransactionLock.Unlock()
}

func (ts *Server) LockChannelSubs() {
ts.channelSubscriptionLock.Lock()
}

func (ts *Server) UnlockChannelSubs() {
ts.channelSubscriptionLock.Unlock()
}

func (ts *Server) SetChannelSubscriptionData(data *radix.Tree) {
ts.channelSubscriptionData = data
}
Expand Down Expand Up @@ -323,8 +302,6 @@ func (ts *Server) OnTraffic(c gnet.Conn) gnet.Action {

// Check for transaction first, if transaction just enqueue the command
if _, ok := ts.clientTransaction[c.RemoteAddr().String()]; ok {
ts.clientTransactionLock.Lock()
defer ts.clientTransactionLock.Unlock()
ts.clientTransaction[c.RemoteAddr().String()] = append(ts.clientTransaction[c.RemoteAddr().String()], inp)
res := "QUEUED"
_, errConn := c.Write([]byte(resp.EncodeSimpleString(res)))
Expand Down Expand Up @@ -428,14 +405,10 @@ func (ts *Server) OnClose(c gnet.Conn, _ error) gnet.Action {
}

func (ts *Server) CleanUpClientTransaction(c gnet.Conn) {
ts.clientTransactionLock.Lock()
defer ts.clientTransactionLock.Unlock()
delete(ts.clientTransaction, c.RemoteAddr().String())
}

func (ts *Server) CleanUpChannelSubscriptions(c gnet.Conn) {
ts.channelSubscriptionLock.Lock()
defer ts.channelSubscriptionLock.Unlock()
// use connectionSubscription map to delete all subscriptions for this connection
if _, ok := ts.connectionSubscription[c.RemoteAddr().String()]; ok {
for channel := range ts.connectionSubscription[c.RemoteAddr().String()] {
Expand Down
2 changes: 0 additions & 2 deletions server/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ func executeSubscribeCommandName() ExecutionHook {
allChannels[channel] = struct{}{}
}

ts.LockChannelSubs()
defer ts.UnlockChannelSubs()
for channel := range allChannels {
prevData, ok := subscriptionData.Get([]byte(channel))
if !ok {
Expand Down
10 changes: 1 addition & 9 deletions server/treds_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"log"
"strings"
"sync"
"time"

"github.com/hashicorp/raft"
Expand All @@ -22,7 +21,6 @@ type TredsFsm struct {
cmdRegistry commands.CommandRegistry
tredsStore store.Store
conn gnet.Conn
storeLock *sync.Mutex
}

func (t *TredsFsm) Apply(log *raft.Log) interface{} {
Expand All @@ -35,10 +33,6 @@ func (t *TredsFsm) Apply(log *raft.Log) interface{} {
if err != nil {
return err
}
if commandReg.IsWrite {
t.storeLock.Lock()
defer t.storeLock.Unlock()
}
currentStore := t.tredsStore
if currentStore != nil {
return commandReg.Execute(args, currentStore)
Expand Down Expand Up @@ -85,8 +79,6 @@ func (t *TredsFsm) Restore(old io.ReadCloser) error {
return err
}
ts := store.NewTredsStore()
t.storeLock.Lock()
defer t.storeLock.Unlock()
err = ts.Restore(data)
t.tredsStore = ts
if err != nil {
Expand All @@ -96,5 +88,5 @@ func (t *TredsFsm) Restore(old io.ReadCloser) error {
}

func NewTredsFsm(registry commands.CommandRegistry, store store.Store) *TredsFsm {
return &TredsFsm{cmdRegistry: registry, tredsStore: store, storeLock: &sync.Mutex{}}
return &TredsFsm{cmdRegistry: registry, tredsStore: store}
}
2 changes: 0 additions & 2 deletions server/unsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func executeUnsubscribeCommand() ExecutionHook {

subscriptionData := ts.GetChannelSubscriptionData()

ts.LockChannelSubs()
defer ts.UnlockChannelSubs()
for _, channel := range args {
prevData, ok := subscriptionData.Get([]byte(channel))
if !ok {
Expand Down

0 comments on commit 105d0cd

Please sign in to comment.