diff --git a/example/README.md b/example/README.md
new file mode 100644
index 00000000..531a8cf9
--- /dev/null
+++ b/example/README.md
@@ -0,0 +1,68 @@
+# raftexample
+
+raftexample is an example usage of etcd's [raft library](https://github.com/etcd-io/raft). It provides a simple REST API for a key-value store cluster backed by the [Raft][raft] consensus algorithm.
+
+[raft]: http://raftconsensus.github.io/
+
+## Getting Started
+
+### Building raftexample
+
+To build raftexample:
+
+```sh
+git clone git@github.com:etcd-io/raft.git
+cd ./raft/example
+go mod tidy
+go build -o bin/raftexample main.go
+```
+
+### Running a single-node raftexample
+
+First start a single-member cluster of raftexample:
+
+```sh
+./bin/raftexample --id 1 --cluster http://127.0.0.1:12379 --port 12379
+```
+
+Each raftexample process maintains a single raft instance and a key-value server.
+The process's comma-separated list of peers (--cluster), its raft ID, i.e. its index into the peer list (--id), and the HTTP key-value server port (--port) are passed on the command line.
+
+Next, store a value ("hello") under a key ("my-key"):
+
+```sh
+curl -L http://127.0.0.1:12379/my-key -XPUT -d hello
+```
+
+Finally, retrieve the stored key:
+
+```sh
+curl -L http://127.0.0.1:12379/my-key
+```
+
+### Dynamic cluster reconfiguration
+
+Nodes can be added to or removed from a running cluster using requests to the REST API.
+
+For example, suppose we have a 3-node cluster that was started with the commands:
+
+```sh
+./bin/raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12379
+./bin/raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22379
+./bin/raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32379
+```
+
+A fourth node with ID 4 can be added by starting it and then issuing a POST to an existing member:
+```sh
+./bin/raftexample --id 4 --cluster http://127.0.0.1:42379 --port 42379
+
+curl -L http://127.0.0.1:12379/4 -XPOST -d http://127.0.0.1:42379
+```
+
+We can remove a node using a DELETE request:
+```sh
+curl -L http://127.0.0.1:12379/3 -XDELETE
+```
+
+Node 3 should shut itself down once the cluster has processed this request.
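+
+### Checking cluster status
+
+Each node also serves a read-only status endpoint on the same HTTP port (handled in `server/server.go` in this change). It reports that node's view of the raft state, including the current term, leader, commit index and per-peer progress. For example, to inspect node 1:
+
+```sh
+curl -L http://127.0.0.1:12379/cluster/status
+```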
diff --git a/example/go.mod b/example/go.mod new file mode 100644 index 00000000..cd8cb45d --- /dev/null +++ b/example/go.mod @@ -0,0 +1,17 @@ +module go.etcd.io/raft/v3/example + +go 1.21 + +require ( + github.com/sirupsen/logrus v1.9.3 + go.etcd.io/raft/v3 v3.0.0 +) + +replace go.etcd.io/raft/v3 => ../ + +require ( + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + golang.org/x/sys v0.15.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/example/go.sum b/example/go.sum new file mode 100644 index 00000000..44734740 --- /dev/null +++ b/example/go.sum @@ -0,0 +1,61 @@ +github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA= +github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/example/main.go b/example/main.go new file mode 100644 index 00000000..d2b9dd16 --- /dev/null +++ b/example/main.go @@ -0,0 +1,77 @@ +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + "time" + + log "github.com/sirupsen/logrus" + + "go.etcd.io/raft/v3/example/server" +) + +func main() { + cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") + id := flag.Uint64("id", 1, "node ID") + kvport := flag.Int("port", 9021, "key-value server port") + logLevel := flag.String("log-level", "info", "You can set 
the logging level") + logCaller := flag.Bool("log-caller", false, "You can set the logging Caller") + //join := flag.Bool("join", false, "join an existing cluster") + flag.Parse() + + newLogLevel, err := log.ParseLevel(*logLevel) + if err != nil { + newLogLevel = log.InfoLevel + } + + log.SetLevel(newLogLevel) + if *logCaller { + log.SetReportCaller(true) + } + + var members = []server.Member{} + for i, v := range strings.Split(*cluster, ",") { + members = append(members, server.Member{ + NodeID: uint64(i + 1), + Host: v, + Learner: false, + }) + } + cfg := &server.Config{ + NodeId: *id, + ListenPort: *kvport, + TickInterval: 500 * time.Millisecond, + ElectionTick: 5, + Members: members, + } + fmt.Println(cfg.Members) + + if err := cfg.Verify(); err != nil { + log.Fatalf("config verify err: %v", err) + } + + log.Infof("cfg: %s", cfg) + + svr := server.NewServer(cfg) + svr.Start() + + sig := waitForSigterm() + log.Infof("service received signal %s", sig) + log.Infof("gracefully shutting down http service at :%v", *kvport) + + svr.Stop() + log.Infof("successfully shut down the service") +} + +func waitForSigterm() os.Signal { + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) + for { + sig := <-ch + return sig + } +} diff --git a/example/server/bytespool.go b/example/server/bytespool.go new file mode 100644 index 00000000..984feb31 --- /dev/null +++ b/example/server/bytespool.go @@ -0,0 +1,85 @@ +package server + +import ( + "sync" +) + +func newBytes(size int) func() interface{} { + return func() interface{} { + return make([]byte, size) + } +} + +const ( + zeroSize int = 1 << 14 // 16K + + // 1K - 2K - 4K - 8K - 16K - 32K - 64K + numPools = 7 + sizeStep = 2 + startSize int = 1 << 10 // 1K + maxSize int = 1 << 16 // 64K +) + +var ( + zero = make([]byte, zeroSize) + + pools [numPools]sync.Pool + poolSize [numPools]int +) + +func init() { + size := startSize + for ii := 0; ii < numPools; ii++ { + pools[ii] = sync.Pool{ + New: newBytes(size), + } + poolSize[ii] = size + size *= sizeStep + } +} + +// GetPool returns a sync.Pool that generates bytes slice with the size. +// Return nil if no such pool exists. +func GetPool(size int) *sync.Pool { + for idx, psize := range poolSize { + if size <= psize { + return &pools[idx] + } + } + return nil +} + +// Alloc returns a bytes slice with the size. +// Make a new bytes slice if oversize. +func Alloc(size int) []byte { + if pool := GetPool(size); pool != nil { + b := pool.Get().([]byte) + return b[:size] + } + return make([]byte, size) +} + +// Free puts the bytes slice into suitable pool. +// Discard the bytes slice if oversize. +func Free(b []byte) { + size := cap(b) + if size > maxSize { + return + } + + b = b[0:size] + for ii := numPools - 1; ii >= 0; ii-- { + if size >= poolSize[ii] { + pools[ii].Put(b) // nolint: staticcheck + return + } + } +} + +// Zero clean up the bytes slice b to zero. +func Zero(b []byte) { + for len(b) > 0 { + n := copy(b, zero) + b = b[n:] + } +} diff --git a/example/server/codec.go b/example/server/codec.go new file mode 100644 index 00000000..bb5d9c7c --- /dev/null +++ b/example/server/codec.go @@ -0,0 +1,122 @@ +package server + +import ( + "bytes" + "encoding/binary" + "hash/crc32" + "io" + + log "github.com/sirupsen/logrus" + + pb "go.etcd.io/raft/v3/raftpb" +) + +type raftMsgs []pb.Message + +func (msgs raftMsgs) Len() int { + return len([]pb.Message(msgs)) +} + +// msgcnt 4 bytes +// len|recoder +// ... 
+// len|recoder +// crc 4 bytes +func (msgs raftMsgs) Encode(w io.Writer) error { + crc := crc32.NewIEEE() + mw := io.MultiWriter(w, crc) + cnt := uint32(msgs.Len()) + b := make([]byte, 4) + + // write header + binary.BigEndian.PutUint32(b, cnt) + if _, err := w.Write(b); err != nil { + return err + } + + // write body + for i := 0; i < msgs.Len(); i++ { + buf := Alloc(4 + msgs[i].Size()) + binary.BigEndian.PutUint32(buf, uint32(msgs[i].Size())) + _, err := msgs[i].MarshalTo(buf[4:]) + if err != nil { + Free(buf) + return err + } + if _, err = mw.Write(buf); err != nil { + Free(buf) + return err + } + Free(buf) + } + + // write checksum + binary.BigEndian.PutUint32(b, crc.Sum32()) + _, err := w.Write(b) + return err +} + +func (msgs raftMsgs) Decode(r io.Reader) (raftMsgs, error) { + w := crc32.NewIEEE() + tr := io.TeeReader(r, w) + + b := make([]byte, 4) + // read msgcnt + if _, err := io.ReadFull(r, b); err != nil { + log.Errorf("read head[msgcnt] error: %v", err) + return nil, err + } + cnt := binary.BigEndian.Uint32(b) + msgs = make([]pb.Message, 0, cnt) + for i := 0; i < int(cnt); i++ { + // read recorder len + if _, err := io.ReadFull(tr, b); err != nil { + log.Errorf("read the %d's recorder len error: %v", i, err) + return nil, err + } + msglen := binary.BigEndian.Uint32(b) + data := Alloc(int(msglen)) + // read recorder + if _, err := io.ReadFull(tr, data); err != nil { + log.Errorf("read the %d's recorder error: %v", i, err) + Free(data) + return nil, err + } + var msg pb.Message + if err := msg.Unmarshal(data); err != nil { + Free(data) + return nil, err + } + Free(data) + msgs = append(msgs, msg) + } + // read checksum + if _, err := io.ReadFull(r, b); err != nil { + log.Errorf("read checksum error: %v", err) + return nil, err + } + if binary.BigEndian.Uint32(b) != w.Sum32() { + log.Error("checksum not match") + return nil, ErrInvalidData + } + + return msgs, nil +} + +func normalEntryEncode(id uint64, data []byte) []byte { + var buf bytes.Buffer + err := binary.Write(&buf, binary.BigEndian, id) + if err != nil { + panic(err) + } + _, err = buf.Write(data) + if err != nil { + panic(err) + } + return buf.Bytes() +} + +func normalEntryDecode(data []byte) (uint64, []byte) { + id := binary.BigEndian.Uint64(data[0:8]) + return id, data[8:] +} diff --git a/example/server/config.go b/example/server/config.go new file mode 100644 index 00000000..f2899f1d --- /dev/null +++ b/example/server/config.go @@ -0,0 +1,88 @@ +package server + +import ( + "encoding/json" + "fmt" + "time" +) + +type Config struct { + // NodeID is the identity of the local node. NodeID cannot be 0. + // This parameter is required. + NodeId uint64 `json:"nodeId"` + ListenPort int `json:"listen_port"` + + // TickInterval is the interval of timer which check heartbeat and election timeout. + // The default value is 100ms. + TickInterval time.Duration `json:"tick_interval"` + // HeartbeatTick is the heartbeat interval. A leader sends heartbeat + // message to maintain the leadership every heartbeat interval. + // The default value is 1. + HeartbeatTick int `json:"heartbeat_tick"` + // ElectionTick is the election timeout. If a follower does not receive any message + // from the leader of current term during ElectionTick, it will become candidate and start an election. + // ElectionTick must be greater than HeartbeatTick. + // We suggest to use ElectionTick = 5 * HeartbeatTick to avoid unnecessary leader switching. + // The default value is 5. 
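+	// Both HeartbeatTick and ElectionTick are counted in ticks of TickInterval, so the
+	// effective election timeout is ElectionTick * TickInterval.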
+ ElectionTick int `json:"election_tick"` + + // MaxSnapConcurrency limits the max number of snapshot concurrency. + // the default value is 10. + MaxSnapConcurrency int `json:"max_snapshots"` + + // SnapshotTimeout is the snapshot timeout in memory. + // the default value is 10s + SnapshotTimeout int `json:"snapshot_timeout"` + + ProposeTimeout int `json:"propose_timeout"` + + Members []Member `json:"-"` + + // Applied is the last applied index. It should only be set when restarting + Applied uint64 `json:"-"` + + SM StateMachine `json:"-"` +} + +func (cfg *Config) Verify() error { + if cfg.NodeId == 0 { + return fmt.Errorf("invalid nodeid=%d", cfg.NodeId) + } + + if cfg.ListenPort == 0 { + return fmt.Errorf("invalid listen port=%d", cfg.ListenPort) + } + + if cfg.TickInterval <= 0 { + cfg.TickInterval = 100 * time.Millisecond + } + + if cfg.HeartbeatTick <= 0 { + cfg.HeartbeatTick = 1 + } + + if cfg.ElectionTick <= 0 { + cfg.ElectionTick = 5 + } + + if cfg.MaxSnapConcurrency <= 0 { + cfg.MaxSnapConcurrency = 10 + } + + if cfg.SnapshotTimeout <= 0 { + cfg.SnapshotTimeout = 10 + } + + if cfg.ProposeTimeout <= 0 { + cfg.ProposeTimeout = 10 + } + return nil +} + +func (cfg *Config) String() string { + b, err := json.Marshal(cfg) + if err != nil { + return fmt.Sprintf("config Marshal err :%w", err) + } + return string(b) +} diff --git a/example/server/engine.go b/example/server/engine.go new file mode 100644 index 00000000..90841bb1 --- /dev/null +++ b/example/server/engine.go @@ -0,0 +1,20 @@ +package server + +type Engine struct { + store *Store +} + +func NewEngine() *Engine { + return &Engine{ + store: NewStore(), + } +} + +func (e *Engine) Apply(data [][]byte) error { + + return e.store.Batch(data) +} + +func (e *Engine) Get(key string) (string, bool) { + return e.store.Get(key) +} diff --git a/example/server/errors.go b/example/server/errors.go new file mode 100644 index 00000000..c203e6f4 --- /dev/null +++ b/example/server/errors.go @@ -0,0 +1,13 @@ +package server + +import ( + "errors" +) + +var ( + ErrInvalidData = errors.New("raftserver: Invalid data") + ErrStopped = errors.New("raftserver: server stopped") + ErrNotFoundNotifier = errors.New("raftserver: not found notifier") + ErrTimeout = errors.New("raftserver: request timed out") + ErrNoPeers = errors.New("raftserver: no peers in config") +) diff --git a/example/server/kvstore.go b/example/server/kvstore.go new file mode 100644 index 00000000..ec19eca5 --- /dev/null +++ b/example/server/kvstore.go @@ -0,0 +1,67 @@ +package server + +import ( + "bytes" + "encoding/gob" + "sync" + + log "github.com/sirupsen/logrus" +) + +// Store a key-value Storage +type Store struct { + mu sync.RWMutex + kv map[string]string // current committed key-value pairs +} + +type kv struct { + Key string + Val string +} + +func NewStore() *Store { + return &Store{ + kv: make(map[string]string), + } +} + +func (s *Store) Apply(data [][]byte, index uint64) error { + for _, v := range data { + var dataKv kv + dec := gob.NewDecoder(bytes.NewBuffer(v)) + if err := dec.Decode(&dataKv); err != nil { + log.Fatalf("raftexample: could not decode message (%v)", err) + } + s.mu.Lock() + s.kv[dataKv.Key] = dataKv.Val + s.mu.Unlock() + + log.Infof("kv key=%s, val=%s", dataKv.Key, dataKv.Val) + } + + return nil +} + +func (s *Store) Batch(data [][]byte) error { + for _, v := range data { + var dataKv kv + dec := gob.NewDecoder(bytes.NewBuffer(v)) + if err := dec.Decode(&dataKv); err != nil { + log.Fatalf("raftexample: could not decode message (%v)", err) + } + 
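+		// each element is a gob-encoded kv pair (produced by Propose in raft_server.go);
+		// apply it to the map under the write lock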
s.mu.Lock() + s.kv[dataKv.Key] = dataKv.Val + s.mu.Unlock() + + log.Infof("kv key=%s, val=%s", dataKv.Key, dataKv.Val) + } + + return nil +} + +func (s *Store) Get(key string) (string, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + v, ok := s.kv[key] + return v, ok +} diff --git a/example/server/log_storage.go b/example/server/log_storage.go new file mode 100644 index 00000000..1edaebec --- /dev/null +++ b/example/server/log_storage.go @@ -0,0 +1,212 @@ +package server + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" + + "go.etcd.io/raft/v3" + pb "go.etcd.io/raft/v3/raftpb" + + log "github.com/sirupsen/logrus" +) + +type raftLogStorage struct { + nodeId uint64 + walMu sync.RWMutex + wal *raft.MemoryStorage + ssc *SnapshotCollection + sm StateMachine + cs pb.ConfState + memberMu sync.RWMutex + members map[uint64]Member + applied uint64 + snapIndex uint64 +} + +func NewRaftLogStorage(nodeId uint64, sm StateMachine, shotter *SnapshotCollection) (*raftLogStorage, error) { + rs := &raftLogStorage{ + nodeId: nodeId, + ssc: shotter, + members: make(map[uint64]Member), + sm: sm, + } + + rs.wal = raft.NewMemoryStorage() + + return rs, nil +} + +func (s *raftLogStorage) InitialState() (pb.HardState, pb.ConfState, error) { + hs, _, err := s.wal.InitialState() + return hs, s.cs, err +} + +func (s *raftLogStorage) Term(index uint64) (uint64, error) { + s.walMu.RLock() + defer s.walMu.RUnlock() + return s.wal.Term(index) +} + +func (s *raftLogStorage) LastIndex() (uint64, error) { + s.walMu.RLock() + defer s.walMu.RUnlock() + return s.wal.LastIndex() +} + +func (s *raftLogStorage) FirstIndex() (uint64, error) { + s.walMu.RLock() + defer s.walMu.RUnlock() + return s.wal.FirstIndex() +} + +func (s *raftLogStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { + s.walMu.RLock() + defer s.walMu.RUnlock() + return s.wal.Entries(lo, hi, maxSize) +} + +func (s *raftLogStorage) Snapshot() (pb.Snapshot, error) { + var members []*Member + s.memberMu.RLock() + cs := s.cs + for _, m := range s.members { + member := &Member{} + *member = m + members = append(members, member) + } + s.memberMu.RUnlock() + + st, err := s.sm.Snapshot() + if err != nil { + return pb.Snapshot{}, err + } + snapIndex := st.Index() + snapTerm, err := s.Term(snapIndex) + if err != nil { + st.Close() + return pb.Snapshot{}, err + } + if snapIndex > s.Applied() { + st.Close() + return pb.Snapshot{}, fmt.Errorf("snapIndex(%d) greater than applied(%d)", snapIndex, s.Applied()) + } + name := st.Name() + snap := &snapshot{ + st: st, + meta: SnapshotMeta{ + Name: name, + Index: snapIndex, + Term: snapTerm, + Mbs: members, + Voters: cs.Voters, + Learners: cs.Learners, + }, + } + runtime.SetFinalizer(snap, func(snap *snapshot) { + snap.Close() + }) + if err = s.ssc.Set(snap); err != nil { + log.Errorf("set snapshot(%s) error: %v", name, err) + return pb.Snapshot{}, err + } + log.Infof("generator a snapshot(%s)", name) + return pb.Snapshot{ + Data: []byte(name), + Metadata: pb.SnapshotMetadata{ + ConfState: cs, + Index: snapIndex, + Term: snapTerm, + }, + }, nil +} + +func (s *raftLogStorage) SaveEntries(entries []pb.Entry) error { + s.walMu.Lock() + defer s.walMu.Unlock() + return s.wal.Append(entries) +} + +func (s *raftLogStorage) SaveHardState(hs pb.HardState) error { + s.walMu.Lock() + defer s.walMu.Unlock() + return s.wal.SetHardState(hs) +} + +func (s *raftLogStorage) SetApplied(applied uint64) { + atomic.StoreUint64(&s.applied, applied) +} + +func (s *raftLogStorage) SetSnapIndex(index uint64) { + 
atomic.StoreUint64(&s.snapIndex, index) +} + +func (s *raftLogStorage) Applied() uint64 { + return atomic.LoadUint64(&s.applied) +} + +func (s *raftLogStorage) Truncate(index uint64) error { + s.walMu.Lock() + defer s.walMu.Unlock() + return s.wal.Compact(index) +} + +func (s *raftLogStorage) ApplySnapshot(st pb.Snapshot) error { + s.walMu.Lock() + defer s.walMu.Unlock() + if err := s.wal.ApplySnapshot(st); err != nil { + return err + } + s.SetApplied(st.Metadata.Index) + return nil +} + +func (s *raftLogStorage) confState() pb.ConfState { + var cs pb.ConfState + + for _, m := range s.members { + if m.Learner { + cs.Learners = append(cs.Learners, m.NodeID) + } else { + cs.Voters = append(cs.Voters, m.NodeID) + } + } + + return cs +} + +func (s *raftLogStorage) AddMembers(m Member) { + s.memberMu.Lock() + defer s.memberMu.Unlock() + s.members[m.NodeID] = m + s.cs = s.confState() +} + +func (s *raftLogStorage) RemoveMember(id uint64) { + s.memberMu.Lock() + defer s.memberMu.Unlock() + delete(s.members, id) + s.cs = s.confState() +} + +func (s *raftLogStorage) SetMembers(members []*Member) { + mbs := make(map[uint64]Member) + s.memberMu.Lock() + defer s.memberMu.Unlock() + for i := 0; i < len(members); i++ { + mbs[members[i].NodeID] = *members[i] + } + s.members = mbs + s.cs = s.confState() +} + +func (s *raftLogStorage) GetMember(id uint64) (Member, bool) { + s.memberMu.RLock() + defer s.memberMu.RUnlock() + m, hit := s.members[id] + return m, hit +} + +func (s *raftLogStorage) Close() { +} diff --git a/example/server/notify.go b/example/server/notify.go new file mode 100644 index 00000000..ea58b2c3 --- /dev/null +++ b/example/server/notify.go @@ -0,0 +1,56 @@ +package server + +import ( + "context" +) + +type notifier chan error + +func newNotifier() notifier { + return make(chan error, 1) +} + +func (nc notifier) notify(err error) { + select { + case nc <- err: + default: + } +} + +func (nc notifier) wait(ctx context.Context, stopc <-chan struct{}) error { + select { + case err := <-nc: + return err + case <-ctx.Done(): + return ctx.Err() + case <-stopc: + return ErrStopped + } +} + +type readIndexNotifier struct { + ch chan struct{} + err error +} + +func newReadIndexNotifier() *readIndexNotifier { + return &readIndexNotifier{ + ch: make(chan struct{}), + } +} + +func (nr *readIndexNotifier) Notify(err error) { + nr.err = err + close(nr.ch) +} + +func (nr *readIndexNotifier) Wait(ctx context.Context, stopc <-chan struct{}) error { + select { + case <-nr.ch: + return nr.err + case <-ctx.Done(): + return ctx.Err() + case <-stopc: + return ErrStopped + } +} diff --git a/example/server/raft_server.go b/example/server/raft_server.go new file mode 100644 index 00000000..b0de8bfa --- /dev/null +++ b/example/server/raft_server.go @@ -0,0 +1,706 @@ +package server + +import ( + "bytes" + "context" + "encoding/gob" + "strconv" + "sync" + "sync/atomic" + "time" + + log "github.com/sirupsen/logrus" + + "go.etcd.io/raft/v3" + pb "go.etcd.io/raft/v3/raftpb" +) + +const ( + applyChCapacity = 128 +) + +type RaftServer interface { + Start(rh RequestHandler) + Stop() + + Propose(ctx context.Context, key, value string) error + ReadIndex(ctx context.Context) error + TransferLeadership(ctx context.Context, leader, transferee uint64) + AddMember(ctx context.Context, member Member) error + RemoveMember(ctx context.Context, nodeID uint64) error + IsLeader() bool + Status() Status + + // In order to prevent log expansion, the application needs to call this method. 
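+	// Truncate compacts the in-memory raft log: entries before the given index are discarded
+	// (see raftLogStorage.Truncate, which delegates to MemoryStorage.Compact).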
+ Truncate(index uint64) error +} + +// toApply contains entries, snapshot to be applied. Once +// an toApply is consumed, the entries will be persisted to +// to raft storage concurrently; the application must read +// raftDone before assuming the raft messages are stable. +type toApply struct { + entries []pb.Entry + snapshot pb.Snapshot + + // notifyc synchronizes example server applies with the raft node + notifyc chan struct{} +} + +type raftServer struct { + cfg Config + proposeTimeout time.Duration + tickInterval time.Duration + snapTimeout time.Duration + lead uint64 + node raft.Node + snapshotCollection *SnapshotCollection + logStorage *raftLogStorage + sm StateMachine + uid atomic.Uint64 + readNotifier atomic.Value + notifiers sync.Map + tr Transport + applyWait WaitTime + leaderChangeMu sync.RWMutex + leaderChangeClosed bool + leaderChangeC chan struct{} + readStateC chan raft.ReadState + applyc chan toApply + snapshotC chan Snapshot + snapMsgc chan pb.Message + readwaitc chan struct{} + stopc chan struct{} + once sync.Once +} + +func NewRaftServer(cfg *Config) (RaftServer, error) { + if err := cfg.Verify(); err != nil { + return nil, err + } + proposeTimeout := time.Duration(cfg.ProposeTimeout) * time.Second + snapTimeout := time.Duration(cfg.SnapshotTimeout) * time.Second + rs := &raftServer{ + cfg: *cfg, + proposeTimeout: proposeTimeout, + tickInterval: cfg.TickInterval, + snapTimeout: snapTimeout, + snapshotCollection: NewSnapshotCollection(cfg.MaxSnapConcurrency, snapTimeout), + sm: cfg.SM, + applyWait: NewTimeList(), + leaderChangeClosed: false, + leaderChangeC: make(chan struct{}, 1), + readStateC: make(chan raft.ReadState, 64), + applyc: make(chan toApply, applyChCapacity), + snapshotC: make(chan Snapshot), + snapMsgc: make(chan pb.Message, cfg.MaxSnapConcurrency), + readwaitc: make(chan struct{}, 1), + stopc: make(chan struct{}), + } + rs.readNotifier.Store(newReadIndexNotifier()) + + begin := time.Now() + logStorage, err := NewRaftLogStorage(cfg.NodeId, rs.sm, rs.snapshotCollection) + if err != nil { + return nil, err + } + lastIndex, _ := logStorage.LastIndex() + firstIndex, _ := logStorage.FirstIndex() + hs, _, err := logStorage.InitialState() + if err != nil { + return nil, err + } + + log.Infof("load raft wal success, total: %dus, firstIndex: %d, lastIndex: %d, members: %v", + time.Since(begin).Microseconds(), firstIndex, lastIndex, cfg.Members) + + rs.logStorage = logStorage + raftCfg := &raft.Config{ + ID: cfg.NodeId, + ElectionTick: cfg.ElectionTick, + HeartbeatTick: cfg.HeartbeatTick, + Storage: logStorage, + MaxSizePerMsg: 64 * 1024 * 1024, + MaxInflightMsgs: 1024, + CheckQuorum: true, + PreVote: true, + Logger: log.StandardLogger(), + } + rs.tr = NewTransport(cfg.ListenPort, rs) + for _, m := range cfg.Members { + rs.addMember(m) + } + if hs.Commit < cfg.Applied { + cfg.Applied = hs.Commit + } + raftCfg.Applied = cfg.Applied + logStorage.SetApplied(cfg.Applied) + rs.node = raft.RestartNode(raftCfg) + + return rs, nil +} + +func (s *raftServer) Start(rh RequestHandler) { + go s.tr.Serve(rh) + go s.raftStart() + go s.raftApply() + go s.linearizableReadLoop() +} + +func (s *raftServer) Stop() { + s.once.Do(func() { + s.tr.Stop() + s.node.Stop() + close(s.stopc) + s.snapshotCollection.Stop() + s.logStorage.Close() + }) +} + +func (s *raftServer) Propose(ctx context.Context, key, value string) (err error) { + id := s.uid.Add(1) + + var buf bytes.Buffer + if err = gob.NewEncoder(&buf).Encode(kv{Key: key, Val: value}); err != nil { + log.Fatalf("raft propose(%d) err 
:%v", id, err) + } + + return s.propose(ctx, id, pb.EntryNormal, normalEntryEncode(id, buf.Bytes())) +} + +func (s *raftServer) propose(ctx context.Context, id uint64, entryType pb.EntryType, data []byte) (err error) { + nr := newNotifier() + s.notifiers.Store(id, nr) + var cancel context.CancelFunc + + if _, ok := ctx.Deadline(); !ok { + ctx, cancel = context.WithTimeout(ctx, s.proposeTimeout) + defer cancel() + } + defer func() { + s.notifiers.Delete(id) + }() + log.Infof("propose id %v entryType %s data %s", id, entryType, string(data)) + msg := pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: entryType, Data: data}}} + if err = s.node.Step(ctx, msg); err != nil { + return + } + + return nr.wait(ctx, s.stopc) +} + +func (s *raftServer) IsLeader() bool { + return atomic.LoadUint64(&s.lead) == s.cfg.NodeId +} + +func (s *raftServer) ReadIndex(ctx context.Context) error { + var cancel context.CancelFunc + if _, ok := ctx.Deadline(); !ok { + ctx, cancel = context.WithTimeout(ctx, s.proposeTimeout) + defer cancel() + } + // wait for read state notification + nr := s.readNotifier.Load().(*readIndexNotifier) + select { + case s.readwaitc <- struct{}{}: + default: + } + return nr.Wait(ctx, s.stopc) +} + +func (s *raftServer) TransferLeadership(ctx context.Context, leader, transferee uint64) { + s.node.TransferLeadership(ctx, leader, transferee) +} + +func (s *raftServer) changeMember(ctx context.Context, cc pb.ConfChange) (err error) { + data, err := cc.Marshal() + if err != nil { + return err + } + return s.propose(ctx, cc.ID, pb.EntryConfChange, data) +} + +func (s *raftServer) AddMember(ctx context.Context, member Member) (err error) { + body, err := member.Marshal() + if err != nil { + return err + } + addType := pb.ConfChangeAddNode + if member.Learner { + addType = pb.ConfChangeAddLearnerNode + } + cc := pb.ConfChange{ + ID: s.uid.Add(1), + Type: addType, + NodeID: member.NodeID, + Context: body, + } + return s.changeMember(ctx, cc) +} + +func (s *raftServer) RemoveMember(ctx context.Context, peerId uint64) (err error) { + cc := pb.ConfChange{ + ID: s.uid.Add(1), + Type: pb.ConfChangeRemoveNode, + NodeID: peerId, + } + return s.changeMember(ctx, cc) +} + +func (s *raftServer) Status() Status { + st := s.node.Status() + status := Status{ + Id: st.ID, + Term: st.Term, + Vote: st.Vote, + Commit: st.Commit, + Leader: st.Lead, + RaftState: st.RaftState.String(), + Applied: s.logStorage.Applied(), + RaftApplied: st.Applied, + ApplyingLength: len(s.applyc), + LeadTransferee: st.LeadTransferee, + } + for id, pr := range st.Progress { + var host string + if m, ok := s.logStorage.GetMember(id); ok { + host = m.Host + } + peer := Peer{ + Id: id, + Host: host, + Match: pr.Match, + Next: pr.Next, + State: pr.State.String(), + Paused: pr.IsPaused(), + PendingSnapshot: pr.PendingSnapshot, + RecentActive: pr.RecentActive, + IsLearner: pr.IsLearner, + InflightFull: pr.Inflights.Full(), + InflightCount: pr.Inflights.Count(), + } + status.Peers = append(status.Peers, peer) + } + return status +} + +func (s *raftServer) Truncate(index uint64) error { + return s.logStorage.Truncate(index) +} + +func (s *raftServer) notify(id uint64, err error) { + val, hit := s.notifiers.Load(id) + if !hit { + return + } + val.(notifier).notify(err) +} + +func (s *raftServer) getSnapshot(name string) *snapshot { + return s.snapshotCollection.Get(name) +} + +func (s *raftServer) reportSnapshot(to uint64, status raft.SnapshotStatus) { + s.node.ReportSnapshot(to, status) +} + +func (s *raftServer) deleteSnapshot(name 
string) { + s.snapshotCollection.Delete(name) +} + +func (s *raftServer) raftApply() { + for { + select { + case ap := <-s.applyc: + entries := ap.entries + snap := ap.snapshot + n := len(s.applyc) + for i := 0; i < n && raft.IsEmptySnap(snap); i++ { + ap = <-s.applyc + entries = append(entries, ap.entries...) + snap = ap.snapshot + } + s.applyEntries(entries) + + // wait for the raft routine to finish the disk writes before triggering a + // snapshot. or applied index might be greater than the last index in raft + // storage, since the raft routine might be slower than toApply routine. + <-ap.notifyc + + s.applySnapshotFinish(snap) + s.applyWait.Trigger(s.logStorage.Applied()) + case snapMsg := <-s.snapMsgc: + go s.processSnapshotMessage(snapMsg) + case snap := <-s.snapshotC: + s.applySnapshot(snap) + case <-s.stopc: + return + } + } +} + +func (s *raftServer) applyConfChange(entry pb.Entry) { + var cc pb.ConfChange + if err := cc.Unmarshal(entry.Data); err != nil { + log.Panicf("unmarshal confchange error: %v", err) + return + } + if entry.Index <= s.logStorage.Applied() { + s.notify(cc.ID, nil) + return + } + switch cc.Type { + case pb.ConfChangeAddNode, pb.ConfChangeAddLearnerNode: + var member Member + if err := member.Unmarshal(cc.Context); err != nil { + log.Panicf("failed to unmarshal context that in conf change, error: %v", err) + } + s.addMember(member) + case pb.ConfChangeRemoveNode: + s.removeMember(cc.NodeID) + default: + } + s.node.ApplyConfChange(cc) + if err := s.sm.ApplyMemberChange(ConfChange(cc), entry.Index); err != nil { + log.Panicf("application sm toApply member change error: %v", err) + } + + s.notify(cc.ID, nil) +} + +func (s *raftServer) applyEntries(entries []pb.Entry) { + var ( + prIds []uint64 + pendinsDatas [][]byte + lastIndex uint64 + ) + if len(entries) == 0 { + return + } + for _, ent := range entries { + switch ent.Type { + case pb.EntryConfChange: + if len(pendinsDatas) > 0 { + if err := s.sm.Apply(pendinsDatas, lastIndex); err != nil { + log.Panicf("StateMachine toApply error: %v", err) + } + for i := 0; i < len(prIds); i++ { + s.notify(prIds[i], nil) + } + pendinsDatas = pendinsDatas[0:0] + prIds = prIds[0:0] + } + s.applyConfChange(ent) + case pb.EntryNormal: + if len(ent.Data) == 0 { + continue + } + id, data := normalEntryDecode(ent.Data) + if ent.Index <= s.logStorage.Applied() { // this message should be ignored + s.notify(id, nil) + continue + } + pendinsDatas = append(pendinsDatas, data) + prIds = append(prIds, id) + lastIndex = ent.Index + default: + } + } + + if len(pendinsDatas) > 0 { + if err := s.sm.Apply(pendinsDatas, lastIndex); err != nil { + log.Panicf("StateMachine toApply error: %v", err) + } + for i := 0; i < len(prIds); i++ { + s.notify(prIds[i], nil) + } + } + + if len(entries) > 0 { + // save applied id + s.logStorage.SetApplied(entries[len(entries)-1].Index) + } +} + +func (s *raftServer) applySnapshotFinish(st pb.Snapshot) { + if raft.IsEmptySnap(st) { + return + } + log.Infof("node[%d] toApply snapshot[meta: %s, name: %s]", s.cfg.NodeId, st.Metadata.String(), string(st.Data)) + if err := s.logStorage.ApplySnapshot(st); err != nil { + log.Panicf("toApply snapshot error: %v", err) + } +} + +func (s *raftServer) applySnapshot(snap Snapshot) { + meta := snap.(*applySnapshot).meta + nr := snap.(*applySnapshot).nr + log.Infof("toApply snapshot(%s) data......", meta.Name) + // read snapshot data + if err := s.sm.ApplySnapshot(meta, snap); err != nil { + log.Errorf("toApply snapshot(%s) error: %v", meta.Name, err) + nr.notify(err) + 
return + } + log.Infof("toApply snapshot(%s) success", meta.Name) + s.updateMembers(meta.Mbs) + s.logStorage.SetApplied(meta.Index) + nr.notify(nil) +} + +func (s *raftServer) processSnapshotMessage(m pb.Message) { + name := string(m.Snapshot.Data) + st := s.getSnapshot(name) + if st == nil { + log.Errorf("not found snapshot(%s)", name) + s.reportSnapshot(m.To, raft.SnapshotFailure) + return + } + defer s.deleteSnapshot(name) + if err := s.tr.SendSnapshot(m.To, st); err != nil { + s.reportSnapshot(m.To, raft.SnapshotFailure) + log.Errorf("send snapshot(%s) to node(%d) error: %v", name, m.To, err) + return + } + s.reportSnapshot(m.To, raft.SnapshotFinish) + // send snapshot message to m.TO + s.tr.Send([]pb.Message{m}) +} + +func (s *raftServer) raftStart() { + ticker := time.NewTicker(s.tickInterval) + defer ticker.Stop() + islead := false + + for { + select { + case <-s.stopc: + return + case <-ticker.C: + s.node.Tick() + case rd := <-s.node.Ready(): + if rd.SoftState != nil { + leader := atomic.SwapUint64(&s.lead, rd.SoftState.Lead) + if rd.SoftState.Lead != leader { + var leaderHost string + if m, ok := s.logStorage.GetMember(rd.SoftState.Lead); ok { + leaderHost = m.Host + } + if rd.SoftState.Lead == raft.None { + s.leaderChangeMu.Lock() + s.leaderChangeC = make(chan struct{}, 1) + s.leaderChangeClosed = false + s.leaderChangeMu.Unlock() + } else { + if !s.leaderChangeClosed { + close(s.leaderChangeC) + s.leaderChangeClosed = true + } + } + s.sm.LeaderChange(rd.SoftState.Lead, leaderHost) + } + + islead = s.IsLeader() + } + + if len(rd.ReadStates) != 0 { + select { + case s.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]: + case <-s.stopc: + return + default: + log.Warn("read state chan is not ready!!!") + } + } + notifyc := make(chan struct{}, 1) + ap := toApply{ + entries: rd.CommittedEntries, + snapshot: rd.Snapshot, + notifyc: notifyc, + } + + s.updateCommittedIndex(&ap) + + select { + case s.applyc <- ap: + case <-s.stopc: + return + } + + // the leader can write to its disk in parallel with replicating to the followers and then + // writing to their disks. + // For more details, check raft thesis 10.2.1 + if islead { + s.tr.Send(s.processMessages(rd.Messages)) + } + + // Append the new entries to log storage. 
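+				// Note: this example keeps the raft log in raft.MemoryStorage, so appended
+				// entries are not durable across process restarts.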
+ err := s.logStorage.SaveEntries(rd.Entries) + if err != nil { + log.Panicf("save raft entries error: %v", err) + } + + if !raft.IsEmptyHardState(rd.HardState) { + if err := s.logStorage.SaveHardState(rd.HardState); err != nil { + log.Panicf("save raft hardstate error: %v", err) + } + } + + if !islead { + s.tr.Send(s.processMessages(rd.Messages)) + } + + // leader already processed 'MsgSnap' and signaled + notifyc <- struct{}{} + + s.node.Advance() + } + } +} + +func (s *raftServer) updateCommittedIndex(ap *toApply) { + var ci uint64 + if len(ap.entries) != 0 { + ci = ap.entries[len(ap.entries)-1].Index + } + if ap.snapshot.Metadata.Index > ci { + ci = ap.snapshot.Metadata.Index + } + if ci != 0 { + s.sm.UpdateCommittedIndex(ci) + } +} + +func (s *raftServer) processMessages(ms []pb.Message) []pb.Message { + sentAppResp := false + for i := len(ms) - 1; i >= 0; i-- { + if _, hit := s.logStorage.GetMember(ms[i].To); !hit { + ms[i].To = 0 + } + if ms[i].Type == pb.MsgAppResp { + if sentAppResp { + ms[i].To = 0 + } else { + sentAppResp = true + } + } + + if ms[i].Type == pb.MsgSnap { + select { + case s.snapMsgc <- ms[i]: + default: + s.snapshotCollection.Delete(string(ms[i].Snapshot.Data)) + s.node.ReportSnapshot(ms[i].To, raft.SnapshotFailure) + } + ms[i].To = 0 + } + } + return ms +} + +func (s *raftServer) readIndexOnce() error { + ctx, cancel := context.WithTimeout(context.Background(), s.proposeTimeout) + defer cancel() + readId := strconv.AppendUint([]byte{}, s.uid.Add(1), 10) + s.leaderChangeMu.RLock() + leaderChangeC := s.leaderChangeC + s.leaderChangeMu.RUnlock() + select { + case <-leaderChangeC: + case <-s.stopc: + return ErrStopped + case <-ctx.Done(): + return ctx.Err() + } + err := s.node.ReadIndex(ctx, readId) + if err != nil { + log.Errorf("read index error: %v", err) + return err + } + + done := false + var rs raft.ReadState + for !done { + select { + case rs = <-s.readStateC: + done = bytes.Equal(rs.RequestCtx, readId) + if !done { + log.Warn("ignored out-of-date read index response") + } + case <-ctx.Done(): + log.Warnf("raft read index timeout, the length of applyC is %d", len(s.applyc)) + return ctx.Err() + case <-s.stopc: + return ErrStopped + } + } + if s.logStorage.Applied() < rs.Index { + select { + case <-s.applyWait.Wait(rs.Index): + case <-s.stopc: + return ErrStopped + } + } + return nil +} + +func (s *raftServer) linearizableReadLoop() { + for { + select { + case <-s.readwaitc: + case <-s.stopc: + return + } + nextnr := newReadIndexNotifier() + nr := s.readNotifier.Load().(*readIndexNotifier) + + var err error + for { + err = s.readIndexOnce() + if err == nil || err == ErrStopped { + break + } + } + + nr.Notify(err) + s.readNotifier.Store(nextnr) + } +} + +func (s *raftServer) addMember(member Member) { + s.logStorage.AddMembers(member) + if member.NodeID != s.cfg.NodeId { + s.tr.AddMember(member) + } +} + +func (s *raftServer) removeMember(id uint64) { + s.logStorage.RemoveMember(id) + s.tr.RemoveMember(id) +} + +func (s *raftServer) updateMembers(mbs []*Member) { + s.logStorage.SetMembers(mbs) + s.tr.SetMembers(mbs) +} + +func (s *raftServer) handleMessage(msgs raftMsgs) error { + ctx, cancel := context.WithTimeout(context.Background(), s.proposeTimeout) + defer cancel() + for i := 0; i < msgs.Len(); i++ { + if err := s.node.Step(ctx, msgs[i]); err != nil { + return err + } + } + return nil +} + +func (s *raftServer) handleSnapshot(st Snapshot) error { + select { + case s.snapshotC <- st: + case <-s.stopc: + return ErrStopped + } + + return 
st.(*applySnapshot).nr.wait(context.TODO(), s.stopc) +} diff --git a/example/server/server.go b/example/server/server.go new file mode 100644 index 00000000..3b041998 --- /dev/null +++ b/example/server/server.go @@ -0,0 +1,143 @@ +package server + +import ( + "context" + "io" + "net/http" + "strconv" + "sync" + + log "github.com/sirupsen/logrus" +) + +type Server struct { + raftServer RaftServer + engine *Engine + rh RequestHandler + + mu sync.RWMutex + leader Member + + appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned. + committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned. +} + +func NewServer(cfg *Config) *Server { + server := &Server{} + + cfg.SM = server + raft, err := NewRaftServer(cfg) + if err != nil { + log.Fatalf("start raft server err: %v", err) + } + + server.raftServer = raft + server.engine = NewEngine() + server.rh = newRequestHandler(server) + + return server +} + +func (s *Server) Start() { + s.raftServer.Start(s.rh) +} + +func (s *Server) Stop() { + s.raftServer.Stop() +} + +// newRequestHandler +func newRequestHandler(s *Server) RequestHandler { + return func(w http.ResponseWriter, r *http.Request) bool { + return s.requestHandler(w, r) + } +} + +// requestHandler +func (s *Server) requestHandler(w http.ResponseWriter, r *http.Request) bool { + key := r.RequestURI + defer r.Body.Close() + switch key { + case "/cluster/status": + status := s.raftServer.Status() + w.Write([]byte(status.String())) + return true + } + + switch r.Method { + case http.MethodPut: + v, err := io.ReadAll(r.Body) + if err != nil { + log.Printf("Failed to read on PUT (%v)", err) + http.Error(w, "Failed on PUT", http.StatusBadRequest) + return true + } + + err = s.raftServer.Propose(context.Background(), key, string(v)) + if err != nil { + log.Printf("Failed to read on PUT (%v)", err) + http.Error(w, "Failed on PUT", http.StatusBadRequest) + return true + } + + // optimistic-- no waiting for ack from raft. 
value is not yet + // committed so a subsequent get on the key may return old value + w.WriteHeader(http.StatusNoContent) + return true + case http.MethodGet: + if v, ok := s.engine.Get(key); ok { + w.Write([]byte(v)) + } else { + http.Error(w, "Failed to GET", http.StatusNotFound) + } + return true + case http.MethodPost: + url, err := io.ReadAll(r.Body) + if err != nil { + log.Printf("Failed to read on POST (%v)", err) + http.Error(w, "Failed on POST", http.StatusBadRequest) + return true + } + + nodeID, err := strconv.ParseUint(key[1:], 0, 64) + if err != nil { + log.Printf("Failed to convert ID for conf change (%v)", err) + http.Error(w, "Failed on POST", http.StatusBadRequest) + return true + } + + member := Member{ + NodeID: nodeID, + Host: string(url), + Learner: false, + Context: nil, + } + + err = s.raftServer.AddMember(context.Background(), member) + if err != nil { + log.Printf("Failed to AddMember(%s) on POST (%v)", member.String(), err) + http.Error(w, "Failed on POST", http.StatusBadRequest) + } else { + w.WriteHeader(http.StatusNoContent) + } + return true + case http.MethodDelete: + nodeID, err := strconv.ParseUint(key[1:], 0, 64) + if err != nil { + log.Printf("Failed to convert ID for conf change (%v)", err) + http.Error(w, "Failed on DELETE", http.StatusBadRequest) + return true + } + + err = s.raftServer.RemoveMember(context.Background(), nodeID) + if err != nil { + log.Printf("Failed to RemoveMember on DELETE (%v)", err) + http.Error(w, "Failed on DELETE", http.StatusBadRequest) + } else { + w.WriteHeader(http.StatusNoContent) + } + return true + default: + return false + } +} diff --git a/example/server/snapshot.go b/example/server/snapshot.go new file mode 100644 index 00000000..b7fd2488 --- /dev/null +++ b/example/server/snapshot.go @@ -0,0 +1,198 @@ +package server + +import ( + "container/list" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + "go.etcd.io/raft/v3" +) + +type snapshot struct { + st Snapshot + meta SnapshotMeta + expire time.Time +} + +func (s *snapshot) Read() ([]byte, error) { + return s.st.Read() +} + +func (s *snapshot) Name() string { + return s.st.Name() +} + +func (s *snapshot) Index() uint64 { + return s.st.Index() +} + +func (s *snapshot) Close() { + s.st.Close() +} + +type applySnapshot struct { + meta SnapshotMeta + r io.Reader + nr notifier +} + +func newApplySnapshot(r io.Reader) Snapshot { + return &applySnapshot{ + r: r, + nr: newNotifier(), + } +} + +func (s *applySnapshot) Read() ([]byte, error) { + b := make([]byte, 4) + crc := crc32.NewIEEE() + tr := io.TeeReader(s.r, crc) + + // read msg header 4 bytes + _, err := io.ReadFull(s.r, b) + if err != nil { + if err != io.EOF { + log.Errorf("read header of snapshot error: %v", err) + } + return nil, err + } + + // read msg body + msgLen := int(binary.BigEndian.Uint32(b)) // recorder len + body := make([]byte, msgLen) + if _, err = io.ReadFull(tr, body); err != nil { + log.Errorf("read recorder of snapshot error: %v len(%d)", err, msgLen) + return nil, err + } + + // read checksum and check + if _, err = io.ReadFull(s.r, b); err != nil { + log.Errorf("read checksum of snapshot error: %v", err) + return nil, err + } + if binary.BigEndian.Uint32(b) != crc.Sum32() { + log.Error("checksum not match") + return nil, ErrInvalidData + } + return body, nil +} + +func (s *applySnapshot) Name() string { + return s.meta.Name +} + +func (s *applySnapshot) Index() uint64 { + return s.meta.Index +} + +func (s *applySnapshot) Close() { + // 
nothing to do +} + +// SnapshotCollection +type SnapshotCollection struct { + sync.Mutex + maxSnapshot int + timeout time.Duration + evictList *list.List + snaps map[string]*list.Element + stopC chan struct{} +} + +// NewSnapshotCollection +func NewSnapshotCollection(maxSnapshot int, timeout time.Duration) *SnapshotCollection { + snapshotCollection := &SnapshotCollection{ + maxSnapshot: maxSnapshot, + timeout: timeout, + evictList: list.New(), + snaps: make(map[string]*list.Element), + stopC: make(chan struct{}), + } + + go snapshotCollection.evict() + return snapshotCollection +} + +func (s *SnapshotCollection) evict() { + ticker := time.NewTicker(s.timeout) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.Lock() + elem := s.evictList.Front() + if elem != nil { + snap := elem.Value.(*snapshot) + if time.Since(snap.expire) >= 0 { + s.evictList.Remove(elem) + delete(s.snaps, snap.st.Name()) + } + } + s.Unlock() + case <-s.stopC: + s.deleteAll() + return + } + } +} + +func (s *SnapshotCollection) Set(st *snapshot) error { + s.Lock() + defer s.Unlock() + if s.evictList.Len() >= s.maxSnapshot { + elem := s.evictList.Front() + snap := elem.Value.(*snapshot) + if time.Since(snap.expire) < 0 { + return raft.ErrSnapshotTemporarilyUnavailable + } + s.evictList.Remove(elem) + delete(s.snaps, snap.st.Name()) + } + if _, hit := s.snaps[st.Name()]; hit { + return fmt.Errorf("snapshot(%s) exist", st.Name()) + } + st.expire = time.Now().Add(s.timeout) + s.snaps[st.Name()] = s.evictList.PushBack(st) + return nil +} + +func (s *SnapshotCollection) Get(key string) *snapshot { + s.Lock() + defer s.Unlock() + + if v, ok := s.snaps[key]; ok { + snap := v.Value.(*snapshot) + snap.expire = time.Now().Add(s.timeout) + s.evictList.MoveToBack(v) + return snap + } + return nil +} + +func (s *SnapshotCollection) Delete(key string) { + s.Lock() + defer s.Unlock() + if v, ok := s.snaps[key]; ok { + delete(s.snaps, key) + s.evictList.Remove(v) + } +} + +func (s *SnapshotCollection) deleteAll() { + s.Lock() + defer s.Unlock() + for key, val := range s.snaps { + delete(s.snaps, key) + s.evictList.Remove(val) + } +} + +func (s *SnapshotCollection) Stop() { + close(s.stopC) +} diff --git a/example/server/statemachine.go b/example/server/statemachine.go new file mode 100644 index 00000000..3d3495ca --- /dev/null +++ b/example/server/statemachine.go @@ -0,0 +1,81 @@ +package server + +import ( + "sync/atomic" + + log "github.com/sirupsen/logrus" +) + +type Snapshot interface { + Read() ([]byte, error) + Name() string + Index() uint64 + Close() +} + +// The StateMachine interface is supplied by the application to persist/snapshot data of application. 
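+// In this example *Server implements StateMachine below: Apply feeds committed entries into the
+// in-memory key-value engine, while ApplyMemberChange, Snapshot and ApplySnapshot are left as stubs.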
+type StateMachine interface { + Apply(data [][]byte, index uint64) error + ApplyMemberChange(cc ConfChange, index uint64) error + Snapshot() (Snapshot, error) + ApplySnapshot(meta SnapshotMeta, st Snapshot) error + + UpdateCommittedIndex(index uint64) + LeaderChange(leader uint64, host string) +} + +func (s *Server) Lookup(key string) (string, bool) { + return s.engine.Get(key) +} + +// Server +func (s *Server) Apply(data [][]byte, index uint64) error { + s.updateApplyIndex(index) + + return s.engine.Apply(data) +} + +func (s *Server) ApplyMemberChange(cc ConfChange, index uint64) error { + s.updateApplyIndex(index) + + //TODO implement me + log.Warning("implement me ApplyMemberChange") + return nil +} + +func (s *Server) Snapshot() (Snapshot, error) { + //TODO implement me + log.Warning("implement me Snapshot") + return nil, nil +} + +func (s *Server) ApplySnapshot(meta SnapshotMeta, st Snapshot) error { + //TODO implement me + log.Warning("implement me ApplySnapshot") + + return nil +} + +func (s *Server) updateApplyIndex(index uint64) { + if atomic.LoadUint64(&s.appliedIndex) < index { + atomic.StoreUint64(&s.appliedIndex, index) + } +} + +func (s *Server) UpdateCommittedIndex(index uint64) { + if atomic.LoadUint64(&s.committedIndex) < index { + atomic.StoreUint64(&s.committedIndex, index) + } +} + +func (s *Server) LeaderChange(leaderID uint64, host string) { + s.mu.Lock() + defer s.mu.Unlock() + member := Member{ + NodeID: leaderID, + Host: host, + Learner: false, + } + s.leader = member + log.Infof("receive leader change, member %s", member.String()) +} diff --git a/example/server/transport.go b/example/server/transport.go new file mode 100644 index 00000000..0d43d2d6 --- /dev/null +++ b/example/server/transport.go @@ -0,0 +1,231 @@ +package server + +import ( + "bufio" + "context" + "fmt" + "net/http" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + pb "go.etcd.io/raft/v3/raftpb" +) + +type Transport interface { + Serve(rh RequestHandler) + Stop() + + Send(msgs []pb.Message) + SendSnapshot(to uint64, st *snapshot) error + RemoveMember(id uint64) + AddMember(m Member) + SetMembers(members []*Member) +} + +type raftServerHandler interface { + handleMessage(msgs raftMsgs) error + handleSnapshot(st Snapshot) error +} + +type transport struct { + port int + raftServerHandler raftServerHandler + rh RequestHandler + httpSvr *http.Server + mu sync.RWMutex + senders map[uint64]*transportSender + pool sync.Pool + once sync.Once +} + +const ( + raftMsgUrl = "/raftMsgs" + snapshotUrl = "/snapshot" +) + +func NewTransport(port int, raftServerHandler raftServerHandler) Transport { + tr := &transport{ + port: port, + raftServerHandler: raftServerHandler, + senders: make(map[uint64]*transportSender), + } + tr.httpSvr = &http.Server{ + Addr: fmt.Sprintf(":%d", port), + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + Handler: func(tr *transport) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + tr.handlerWrapper(w, r) + } + }(tr), + } + tr.pool = sync.Pool{ + New: func() interface{} { + return bufio.NewReader(nil) + }, + } + return tr +} + +type RequestHandler func(w http.ResponseWriter, r *http.Request) bool + +func (tr *transport) Serve(rh RequestHandler) { + tr.rh = rh + if err := tr.httpSvr.ListenAndServe(); err != nil { + if err != http.ErrServerClosed { + log.Panicf("raft transport listen error: %v", err) + } + } +} + +func (tr *transport) handlerWrapper(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case raftMsgUrl: + 
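+		// batched raft messages from a peer's transportSender (sent as PUT requests to /raftMsgs)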
tr.handleRaftMsg(w, r) + return + case snapshotUrl: + tr.handleSnapshot(w, r) + return + case "/ping": + w.WriteHeader(http.StatusOK) + return + default: + if tr.rh != nil && tr.rh(w, r) { + return + } + http.Error(w, fmt.Sprintf("unsupported path requested: %q", r.URL.Path), http.StatusBadRequest) + return + } +} + +func (tr *transport) RemoveMember(id uint64) { + tr.mu.Lock() + if sender, hit := tr.senders[id]; hit { + delete(tr.senders, id) + sender.Stop() + } + tr.mu.Unlock() +} + +func (tr *transport) AddMember(m Member) { + tr.mu.Lock() + if _, hit := tr.senders[m.NodeID]; !hit { + tr.senders[m.NodeID] = newTransportSender(m.NodeID, m.Host) + } + tr.mu.Unlock() +} + +func (tr *transport) SetMembers(members []*Member) { + senderMap := make(map[uint64]*transportSender) + tr.mu.Lock() + for _, m := range members { + if sender, hit := tr.senders[m.NodeID]; hit { + senderMap[m.NodeID] = sender + delete(tr.senders, m.NodeID) + } else { + senderMap[m.NodeID] = newTransportSender(m.NodeID, m.Host) + } + } + for _, sender := range tr.senders { + sender.Stop() + } + tr.senders = senderMap + tr.mu.Unlock() +} + +func (tr *transport) Stop() { + tr.once.Do(func() { + tr.httpSvr.Shutdown(context.TODO()) + tr.mu.Lock() + for id, sender := range tr.senders { + sender.Stop() + delete(tr.senders, id) + } + tr.mu.Unlock() + }) +} + +func (tr *transport) Send(msgs []pb.Message) { + msgMap := map[uint64][]pb.Message{} + for i := 0; i < len(msgs); i++ { + if msgs[i].To == 0 { + continue + } + msgGroup, hit := msgMap[msgs[i].To] + if !hit { + msgMap[msgs[i].To] = []pb.Message{msgs[i]} + } else { + msgMap[msgs[i].To] = append(msgGroup, msgs[i]) + } + } + + for id, m := range msgMap { + tr.mu.RLock() + sender, hit := tr.senders[id] + tr.mu.RUnlock() + if hit { + sender.Send(m) + } else { + log.Warnf("ignore these messages, because not found sender for node(%d)", id) + } + } +} + +func (tr *transport) SendSnapshot(to uint64, st *snapshot) error { + tr.mu.RLock() + sender, hit := tr.senders[to] + tr.mu.RUnlock() + if !hit { + return fmt.Errorf("not found sender(%d)", to) + } + if err := sender.SendSnapshot(st); err != nil { + return err + } + return nil +} + +func (tr *transport) handleSnapshot(w http.ResponseWriter, r *http.Request) { + buffer := bufio.NewReader(r.Body) + snap := newApplySnapshot(buffer) + metaData, err := snap.Read() + if err != nil { + w.WriteHeader(http.StatusExpectationFailed) + return + } + meta := SnapshotMeta{} + if err := meta.Unmarshal(metaData); err != nil { + w.WriteHeader(http.StatusExpectationFailed) + return + } + snap.(*applySnapshot).meta = meta + log.Infof("recv snapshot meta: %s", meta.String()) + + if err := tr.raftServerHandler.handleSnapshot(snap); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) +} + +func (tr *transport) handleRaftMsg(w http.ResponseWriter, r *http.Request) { + var msgs raftMsgs + buffer := tr.pool.Get().(*bufio.Reader) + buffer.Reset(r.Body) + defer func() { + buffer.Reset(nil) + tr.pool.Put(buffer) + }() + msgs, err := msgs.Decode(buffer) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + if err := tr.raftServerHandler.handleMessage(msgs); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/example/server/transport_sender.go b/example/server/transport_sender.go new file mode 100644 index 00000000..e61ab85e --- /dev/null +++ b/example/server/transport_sender.go @@ -0,0 +1,190 @@ +package server + +import ( 
+ "bytes" + "context" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "net/http" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + pb "go.etcd.io/raft/v3/raftpb" +) + +const ( + inputChannelSize = 128 + maxMsgSize = 64 * 1024 * 1024 + maxIdleConns = 10 + maxIdleConnsPerHost = 5 + rwBufferSize = 16 * 1024 + idleConnTimeout = 30 * time.Second + requestTimeout = 10 * time.Second +) + +type transportSender struct { + nodeId uint64 + msgUrl string + snapUrl string + inputc chan raftMsgs + stopc chan struct{} + client *http.Client + once sync.Once +} + +func newTransportSender(nodeId uint64, host string) *transportSender { + tr := &http.Transport{ + MaxIdleConns: maxIdleConns, + MaxIdleConnsPerHost: maxIdleConnsPerHost, + WriteBufferSize: rwBufferSize, + ReadBufferSize: rwBufferSize, + IdleConnTimeout: idleConnTimeout, + DisableCompression: true, + } + sender := &transportSender{ + nodeId: nodeId, + msgUrl: fmt.Sprintf("%s%s", host, raftMsgUrl), + snapUrl: fmt.Sprintf("%s%s", host, snapshotUrl), + inputc: make(chan raftMsgs, inputChannelSize), + client: &http.Client{Transport: tr}, + stopc: make(chan struct{}), + } + + go sender.loopSend() + return sender +} + +func (sender *transportSender) Send(msgs []pb.Message) { + select { + case sender.inputc <- raftMsgs(msgs): + case <-sender.stopc: + default: + } +} + +func (sender *transportSender) Stop() { + sender.once.Do(func() { + close(sender.stopc) + }) +} + +func (sender *transportSender) loopSend() { + buffer := &bytes.Buffer{} + var errCnt uint64 + + for { + select { + case msgs := <-sender.inputc: + buffer.Reset() + var size int + for i := 0; i < len(msgs); i++ { + size += msgs[i].Size() + } + for i := 0; i < inputChannelSize && size < maxMsgSize; i++ { + var done bool + select { + case ms := <-sender.inputc: + for i := 0; i < len(ms); i++ { + size += ms[i].Size() + } + msgs = append(msgs, ms...) 
+func (sender *transportSender) loopSend() {
+	buffer := &bytes.Buffer{}
+	var errCnt uint64
+
+	for {
+		select {
+		case msgs := <-sender.inputc:
+			buffer.Reset()
+			var size int
+			for i := 0; i < len(msgs); i++ {
+				size += msgs[i].Size()
+			}
+			for i := 0; i < inputChannelSize && size < maxMsgSize; i++ {
+				var done bool
+				select {
+				case ms := <-sender.inputc:
+					for j := 0; j < len(ms); j++ {
+						size += ms[j].Size()
+					}
+					msgs = append(msgs, ms...)
+				default:
+					done = true
+				}
+				if done {
+					break
+				}
+			}
+			err := msgs.Encode(buffer)
+			if err != nil {
+				continue
+			}
+			req, err := http.NewRequest(http.MethodPut, sender.msgUrl, buffer)
+			if err != nil {
+				continue
+			}
+			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+			req = req.WithContext(ctx)
+			req.Header.Set("Content-Type", "application/octet-stream")
+			resp, err := sender.client.Do(req)
+			cancel()
+			if err != nil {
+				errCnt++
+				if errCnt == 1 {
+					log.Errorf("send msgs to %s error: %v, msg count: %d", sender.msgUrl, err, len(msgs))
+				}
+				continue
+			}
+			resp.Body.Close()
+			prev := errCnt
+			errCnt = 0
+			if prev > 0 {
+				log.Infof("send msgs to %s succeeded, peer has recovered", sender.msgUrl)
+			}
+		case <-sender.stopc:
+			return
+		}
+	}
+}
+
+// SendSnapshot streams the snapshot metadata followed by the snapshot data to
+// the peer. Each block is framed as a 4-byte big-endian length, the data, and
+// a CRC32 checksum of the data.
+func (sender *transportSender) SendSnapshot(snap *snapshot) error {
+	rd, wr := io.Pipe()
+	req, err := http.NewRequest(http.MethodPut, sender.snapUrl, rd)
+	if err != nil {
+		return fmt.Errorf("new snapshot request error: %v", err)
+	}
+	metaData, err := snap.meta.Marshal()
+	if err != nil {
+		return fmt.Errorf("marshal snapshot meta error: %v", err)
+	}
+	go func() {
+		var err error
+		// Close the write side with the final error so a failed upload is
+		// surfaced to the HTTP request instead of being silently truncated.
+		defer func() {
+			wr.CloseWithError(err)
+		}()
+		write := func(data []byte) error {
+			b := make([]byte, 4)
+			crc := crc32.NewIEEE()
+			mw := io.MultiWriter(wr, crc)
+			binary.BigEndian.PutUint32(b, uint32(len(data)))
+			if _, err := wr.Write(b); err != nil {
+				return err
+			}
+			if _, err := mw.Write(data); err != nil {
+				return err
+			}
+			binary.BigEndian.PutUint32(b, crc.Sum32())
+			if _, err := wr.Write(b); err != nil {
+				return err
+			}
+			return nil
+		}
+		if err = write(metaData); err != nil {
+			return
+		}
+		for {
+			// snap.Read returns an error (such as io.EOF) once all data has been read.
+			data, rerr := snap.Read()
+			if rerr != nil {
+				break
+			}
+			if err = write(data); err != nil {
+				break
+			}
+		}
+	}()
+
+	log.Infof("send snapshot(%s) to node(%d) %s", snap.Name(), sender.nodeId, sender.snapUrl)
+	resp, err := sender.client.Do(req)
+	if err != nil {
+		rd.CloseWithError(err)
+		return fmt.Errorf("send snapshot error: %v", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("send snapshot returned unexpected code(%d)", resp.StatusCode)
+	}
+	log.Infof("send snapshot(%s) to node(%d) success", snap.Name(), sender.nodeId)
+	return nil
+}
diff --git a/example/server/types.go b/example/server/types.go
new file mode 100644
index 00000000..d581afed
--- /dev/null
+++ b/example/server/types.go
@@ -0,0 +1,115 @@
+package server
+
+import (
+	"encoding/json"
+	"fmt"
+
+	pb "go.etcd.io/raft/v3/raftpb"
+)
+
+type (
+	ConfChange pb.ConfChange
+)
+
+// Peer reports the replication progress of a single cluster member.
+type Peer struct {
+	Id              uint64 `json:"id"`
+	Host            string `json:"host"`
+	Match           uint64 `json:"match"`
+	Next            uint64 `json:"next"`
+	State           string `json:"state"`
+	Paused          bool   `json:"paused"`
+	PendingSnapshot uint64 `json:"pendingSnapshot"`
+	RecentActive    bool   `json:"active"`
+	IsLearner       bool   `json:"isLearner"`
+	InflightFull    bool   `json:"isInflightFull"`
+	InflightCount   int    `json:"inflightCount"`
+}
+
+// Status is a JSON-friendly view of the local raft node's state.
+type Status struct {
+	Id             uint64 `json:"nodeId"`
+	Term           uint64 `json:"term"`
+	Vote           uint64 `json:"vote"`
+	Commit         uint64 `json:"commit"`
+	Leader         uint64 `json:"leader"`
+	RaftState      string `json:"raftState"`
+	Applied        uint64 `json:"applied"`
+	RaftApplied    uint64 `json:"raftApplied"`
+	LeadTransferee uint64 `json:"transferee"`
+	ApplyingLength int    `json:"applyingLength"`
+	Peers          []Peer `json:"peers"`
+}
+
+type Members struct {
+	Mbs []Member `json:"members"`
+}
+
+type Member struct {
+	NodeID  uint64 `json:"nodeID,omitempty"`
+	Host    string `json:"host,omitempty"`
+	Learner bool   `json:"learner,omitempty"`
+	Context []byte `json:"context,omitempty"`
+}
+
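+// SnapshotMeta describes a snapshot (name, raft index/term, and membership)
+// and is sent ahead of the snapshot data when a snapshot is streamed.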
`json:"learner,omitempty"` + Context []byte `json:"context,omitempty"` +} + +type SnapshotMeta struct { + Name string `json:"name,omitempty"` + Index uint64 `json:"index,omitempty"` + Term uint64 `json:"term,omitempty"` + Mbs []*Member `json:"mbs,omitempty"` + Voters []uint64 `json:"voters,omitempty"` + Learners []uint64 `json:"learners,omitempty"` +} + +func (s *Status) Marshal() ([]byte, error) { + return json.Marshal(s) +} + +func (s *Status) Unmarshal(data []byte) error { + err := json.Unmarshal(data, &s) + + return err +} + +func (s *Status) String() string { + b, err := s.Marshal() + if err != nil { + return fmt.Sprintf("Status Marshal err :%w", err) + } + return string(b) +} + +func (m *Member) Marshal() ([]byte, error) { + return json.Marshal(m) +} + +func (m *Member) Unmarshal(data []byte) error { + err := json.Unmarshal(data, &m) + + return err +} + +func (m *Member) String() string { + b, err := m.Marshal() + if err != nil { + return fmt.Sprintf("Member Marshal err :%w", err) + } + return string(b) +} + +func (s *SnapshotMeta) Marshal() ([]byte, error) { + return json.Marshal(s) +} + +func (s *SnapshotMeta) Unmarshal(data []byte) error { + err := json.Unmarshal(data, &s) + + return err +} + +func (s *SnapshotMeta) String() string { + b, err := s.Marshal() + if err != nil { + return fmt.Sprintf("SnapshotMeta Marshal err :%w", err) + } + return string(b) + +} diff --git a/example/server/wait_time.go b/example/server/wait_time.go new file mode 100644 index 00000000..f13af7db --- /dev/null +++ b/example/server/wait_time.go @@ -0,0 +1,83 @@ +package server + +import ( + "sort" + "sync" +) + +type WaitTime interface { + Wait(deadline uint64) <-chan struct{} + Trigger(deadline uint64) +} + +var closec chan struct{} + +func init() { + closec = make(chan struct{}) + close(closec) +} + +type item struct { + key uint64 + ch chan struct{} +} + +type itemSlice []item + +type timeList struct { + sync.Mutex + lastDeadline uint64 + items itemSlice +} + +func NewTimeList() WaitTime { + return &timeList{ + items: make(itemSlice, 0, 128), + } +} + +func (t *timeList) Wait(deadline uint64) <-chan struct{} { + t.Lock() + defer t.Unlock() + if t.lastDeadline >= deadline { + return closec + } + + i := sort.Search(len(t.items), func(i int) bool { + return t.items[i].key >= deadline + }) + if i < len(t.items) && t.items[i].key == deadline { + return t.items[i].ch + } + it := item{ + key: deadline, + ch: make(chan struct{}), + } + if i == len(t.items) { + t.items = append(t.items, it) + return it.ch + } + t.items = append(t.items, it) // this for expand memory space + copy(t.items[i+1:], t.items[i:len(t.items)-1]) + t.items[i] = it + return it.ch +} + +func (t *timeList) Trigger(deadline uint64) { + t.Lock() + defer t.Unlock() + t.lastDeadline = deadline + index := sort.Search(len(t.items), func(i int) bool { + return t.items[i].key > deadline + }) + + for i := 0; i < index; i++ { + close(t.items[i].ch) + } + if index == len(t.items) { + t.items = t.items[0:0] + return + } + copy(t.items[0:index], t.items[index:]) + t.items = t.items[0 : len(t.items)-index] +}