Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
  • Loading branch information
HDT3213 committed May 9, 2021
1 parent 721d9c3 commit 6617272
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 129 deletions.
12 changes: 6 additions & 6 deletions cluster/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/jolestar/go-commons-pool/v2"
)

type ConnectionFactory struct {
type connectionFactory struct {
Peer string
}

func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
c, err := client.MakeClient(f.Peer)
if err != nil {
return nil, err
Expand All @@ -26,7 +26,7 @@ func (f *ConnectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject,
return pool.NewPooledObject(c), nil
}

func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
c, ok := object.Object.(*client.Client)
if !ok {
return errors.New("type mismatch")
Expand All @@ -35,17 +35,17 @@ func (f *ConnectionFactory) DestroyObject(ctx context.Context, object *pool.Pool
return nil
}

func (f *ConnectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
// do validate
return true
}

func (f *ConnectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
// do activate
return nil
}

func (f *ConnectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
// do passivate
return nil
}
23 changes: 15 additions & 8 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package cluster provides a server side cluster which is transparent to client. You can connect to any node in the cluster to access all data in the cluster
package cluster

import (
Expand All @@ -16,6 +17,8 @@ import (
"strings"
)

// Cluster represents a node of godis cluster
// it holds part of data and coordinates other nodes to finish transactions
type Cluster struct {
self string

Expand All @@ -37,7 +40,7 @@ const (
// if only one node involved in a transaction, just execute the command don't apply tcc procedure
var allowFastTransaction = true

// start current processing as a node of cluster
// MakeCluster creates and starts a node of cluster
func MakeCluster() *Cluster {
cluster := &Cluster{
self: config.Properties.Self,
Expand All @@ -62,7 +65,7 @@ func MakeCluster() *Cluster {
cluster.peerPicker.AddNode(nodes...)
ctx := context.Background()
for _, peer := range config.Properties.Peers {
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &ConnectionFactory{
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
Peer: peer,
})
}
Expand All @@ -73,11 +76,12 @@ func MakeCluster() *Cluster {
// CmdFunc represents the handler of a redis command
type CmdFunc func(cluster *Cluster, c redis.Connection, cmdAndArgs [][]byte) redis.Reply

// Close stops current node of cluster
func (cluster *Cluster) Close() {
cluster.db.Close()
}

var router = MakeRouter()
var router = makeRouter()

func isAuthenticated(c redis.Connection) bool {
if config.Properties.RequirePass == "" {
Expand All @@ -86,16 +90,17 @@ func isAuthenticated(c redis.Connection) bool {
return c.GetPassword() == config.Properties.RequirePass
}

func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Reply) {
// Exec executes command on cluster
func (cluster *Cluster) Exec(c redis.Connection, cmdArgs [][]byte) (result redis.Reply) {
defer func() {
if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
result = &reply.UnknownErrReply{}
}
}()
cmd := strings.ToLower(string(args[0]))
cmd := strings.ToLower(string(cmdArgs[0]))
if cmd == "auth" {
return godis.Auth(cluster.db, c, args[1:])
return godis.Auth(cluster.db, c, cmdArgs[1:])
}
if !isAuthenticated(c) {
return reply.MakeErrReply("NOAUTH Authentication required")
Expand All @@ -104,14 +109,16 @@ func (cluster *Cluster) Exec(c redis.Connection, args [][]byte) (result redis.Re
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmd + "', or not supported in cluster mode")
}
result = cmdFunc(cluster, c, args)
result = cmdFunc(cluster, c, cmdArgs)
return
}

// AfterClientClose does some clean after client close connection
func (cluster *Cluster) AfterClientClose(c redis.Connection) {
cluster.db.AfterClientClose(c)
}

func Ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
func ping(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
return godis.Ping(cluster.db, args[1:])
}

Expand Down
28 changes: 13 additions & 15 deletions cluster/com.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// communicate with peers within cluster
package cluster

import (
Expand Down Expand Up @@ -33,29 +32,28 @@ func (cluster *Cluster) returnPeerClient(peer string, peerClient *client.Client)
return connectionFactory.ReturnObject(context.Background(), peerClient)
}

// relay command to peer
// relay relays command to peer
// cannot call Prepare, Commit, Rollback of self node
func (cluster *Cluster) Relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) redis.Reply {
if peer == cluster.self {
// to self db
return cluster.db.Exec(c, args)
} else {
peerClient, err := cluster.getPeerClient(peer)
if err != nil {
return reply.MakeErrReply(err.Error())
}
defer func() {
_ = cluster.returnPeerClient(peer, peerClient)
}()
return peerClient.Send(args)
}
peerClient, err := cluster.getPeerClient(peer)
if err != nil {
return reply.MakeErrReply(err.Error())
}
defer func() {
_ = cluster.returnPeerClient(peer, peerClient)
}()
return peerClient.Send(args)
}

// broadcast command to all node in cluster
func (cluster *Cluster) Broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply {
// broadcast broadcasts command to all node in cluster
func (cluster *Cluster) broadcast(c redis.Connection, args [][]byte) map[string]redis.Reply {
result := make(map[string]redis.Reply)
for _, node := range cluster.nodes {
reply := cluster.Relay(node, c, args)
reply := cluster.relay(node, c, args)
result[node] = reply
}
return result
Expand Down
6 changes: 3 additions & 3 deletions cluster/com_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ func TestRelay(t *testing.T) {
testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"})
key := RandString(4)
value := RandString(4)
ret := testCluster2.Relay("127.0.0.1:6379", nil, toArgs("SET", key, value))
ret := testCluster2.relay("127.0.0.1:6379", nil, toArgs("SET", key, value))
asserts.AssertNotError(t, ret)
ret = testCluster2.Relay("127.0.0.1:6379", nil, toArgs("GET", key))
ret = testCluster2.relay("127.0.0.1:6379", nil, toArgs("GET", key))
asserts.AssertBulkReply(t, ret, value)
}

func TestBroadcast(t *testing.T) {
testCluster2 := MakeTestCluster([]string{"127.0.0.1:6379"})
key := RandString(4)
value := RandString(4)
rets := testCluster2.Broadcast(nil, toArgs("SET", key, value))
rets := testCluster2.broadcast(nil, toArgs("SET", key, value))
for _, v := range rets {
asserts.AssertNotError(t, v)
}
Expand Down
28 changes: 15 additions & 13 deletions cluster/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
)

// Del atomically removes given keys from cluster, keys can be distributed on any node
// if the given keys are distributed on different node, Del will use try-commit-catch to remove them
func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'del' command")
Expand All @@ -17,22 +19,22 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
groupMap := cluster.groupBy(keys)
if len(groupMap) == 1 && allowFastTransaction { // do fast
for peer, group := range groupMap { // only one group
return cluster.Relay(peer, c, makeArgs("DEL", group...))
return cluster.relay(peer, c, makeArgs("DEL", group...))
}
}
// prepare
var errReply redis.Reply
txId := cluster.idGenerator.NextId()
txIdStr := strconv.FormatInt(txId, 10)
txID := cluster.idGenerator.NextId()
txIDStr := strconv.FormatInt(txID, 10)
rollback := false
for peer, group := range groupMap {
args := []string{txIdStr}
args := []string{txIDStr}
args = append(args, group...)
var resp redis.Reply
if peer == cluster.self {
resp = PrepareDel(cluster, c, makeArgs("PrepareDel", args...))
resp = prepareDel(cluster, c, makeArgs("PrepareDel", args...))
} else {
resp = cluster.Relay(peer, c, makeArgs("PrepareDel", args...))
resp = cluster.relay(peer, c, makeArgs("PrepareDel", args...))
}
if reply.IsErrorReply(resp) {
errReply = resp
Expand All @@ -43,10 +45,10 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
var respList []redis.Reply
if rollback {
// rollback
RequestRollback(cluster, c, txId, groupMap)
requestRollback(cluster, c, txID, groupMap)
} else {
// commit
respList, errReply = RequestCommit(cluster, c, txId, groupMap)
respList, errReply = requestCommit(cluster, c, txID, groupMap)
if errReply != nil {
rollback = true
}
Expand All @@ -63,19 +65,19 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
}

// args: PrepareDel id keys...
func PrepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
func prepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 3 {
return reply.MakeErrReply("ERR wrong number of arguments for 'preparedel' command")
}
txId := string(args[1])
txID := string(args[1])
keys := make([]string, 0, len(args)-2)
for i := 2; i < len(args); i++ {
arg := args[i]
keys = append(keys, string(arg))
}
txArgs := makeArgs("DEL", keys...) // actual args for cluster.db
tx := NewTransaction(cluster, c, txId, txArgs, keys)
cluster.transactions.Put(txId, tx)
tx := NewTransaction(cluster, c, txID, txArgs, keys)
cluster.transactions.Put(txID, tx)
err := tx.prepare()
if err != nil {
return reply.MakeErrReply(err.Error())
Expand All @@ -84,7 +86,7 @@ func PrepareDel(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply
}

// invoker should provide lock
func CommitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
func commitDel(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
keys := make([]string, len(tx.args))
for i, v := range tx.args {
keys[i] = string(v)
Expand Down
4 changes: 3 additions & 1 deletion cluster/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"github.com/hdt3213/godis/redis/reply"
)

// FlushDB removes all data in current database
func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
replies := cluster.Broadcast(c, args)
replies := cluster.broadcast(c, args)
var errReply reply.ErrorReply
for _, v := range replies {
if reply.IsErrorReply(v) {
Expand All @@ -20,6 +21,7 @@ func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
return reply.MakeErrReply("error occurs: " + errReply.Error())
}

// FlushAll removes all data in cluster
func FlushAll(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
return FlushDB(cluster, c, args)
}
Loading

0 comments on commit 6617272

Please sign in to comment.