From 10182ad97e5badad0983305bef1419a7d81413f8 Mon Sep 17 00:00:00 2001 From: xiaost Date: Wed, 29 Mar 2017 16:19:57 +0800 Subject: [PATCH] init commit --- .gitignore | 38 ++ README.md | 38 ++ cache/cache.go | 214 +++++++++++ cache/cache_test.go | 54 +++ cache/data.go | 109 ++++++ cache/data_test.go | 89 +++++ cache/gen_proto.sh | 3 + cache/hash.go | 73 ++++ cache/hash_test.go | 50 +++ cache/index.go | 228 +++++++++++ cache/index.pb.go | 622 +++++++++++++++++++++++++++++++ cache/index.proto | 21 ++ cache/index_test.go | 199 ++++++++++ cache/pool.go | 74 ++++ cache/pool_test.go | 34 ++ cache/shard.go | 286 ++++++++++++++ cache/shard_test.go | 88 +++++ main.go | 94 +++++ protocol/memcache/common.go | 36 ++ protocol/memcache/parser.go | 160 ++++++++ protocol/memcache/parser_test.go | 197 ++++++++++ server/common.go | 38 ++ server/common_test.go | 107 ++++++ server/memcache.go | 299 +++++++++++++++ server/memcache_test.go | 88 +++++ tools/mcstat/mcstat.go | 144 +++++++ 26 files changed, 3383 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 cache/cache.go create mode 100644 cache/cache_test.go create mode 100644 cache/data.go create mode 100644 cache/data_test.go create mode 100755 cache/gen_proto.sh create mode 100644 cache/hash.go create mode 100644 cache/hash_test.go create mode 100644 cache/index.go create mode 100644 cache/index.pb.go create mode 100644 cache/index.proto create mode 100644 cache/index_test.go create mode 100644 cache/pool.go create mode 100644 cache/pool_test.go create mode 100644 cache/shard.go create mode 100644 cache/shard_test.go create mode 100644 main.go create mode 100644 protocol/memcache/common.go create mode 100644 protocol/memcache/parser.go create mode 100644 protocol/memcache/parser_test.go create mode 100644 server/common.go create mode 100644 server/common_test.go create mode 100644 server/memcache.go create mode 100644 server/memcache_test.go create mode 100644 tools/mcstat/mcstat.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..55645b2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# external packages folder +vendor/ + + +# bin +blobcached +tools/mcstat/mcstat + +# default cache path +cachedata/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..3d5d414 --- /dev/null +++ b/README.md @@ -0,0 +1,38 @@ +Blobcached +===== +Blobcached is a memcached protocol-compatible cache server for blob on SSD. 
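+
+### Quick start
+A minimal sketch of building and trying the server, using the flag defaults defined in `main.go` (`-addr`, `-path`, `-size`, `-shards`, `-ttl`); `nc` is used here only as an example memcached text-protocol client, so substitute any client you prefer.
+
+```sh
+go build -o blobcached .
+./blobcached -addr :11211 -path cachedata -shards 16
+
+# store and read back a 5-byte value over the memcached text protocol
+printf 'set hello 0 0 5\r\nworld\r\nget hello\r\n' | nc 127.0.0.1 11211
+```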
+ +### Supported commands +| Command | Format | +| ------ | ------ | +| get | get []+\r\n | +| set | set [noreply]\r\n\r\n | +| delete | delete [noreply]\r\n | +| touch | touch [noreply]\r\n | +| stats | stats\r\n | + +### How it works +#### concepts +| Name | | +| ------ | ------ | +| indexfile | an indexfile contains many of `items` powered by [blotdb](https://github.com/boltdb/bolt) | +| datafile | a regular file powered by [mmap](https://github.com/edsrzf/mmap-go) for storing values | +| item | an item is made up of `key`, `offset`, `term`, `size` anchoring the value in datafile | +| term | everytime the `datafile` is full, the `term` of `datafile` is increased | + +#### Command: Set +* get the `offset` and `term` of `datafile` +* write value to the `datafile` +* write `item` with the `offset`, `term` and `key` to the `indexfile` + +#### Command: Get +* get the `item` by `key` +* check `term` and `offset` of the `item` against `datafile` +* read value from the `datafile` + +#### Command: Touch +* implemented by `get` & `set` + +#### GC +* Blobcached scans and removes expired or invalid `items` in the `indexfile` +* by default, the rate up to 32k items/second diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 0000000..2d30548 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,214 @@ +package cache + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/pkg/errors" +) + +var ( + ErrNotFound = errors.New("key not found") + ErrValueSize = errors.New("value size exceeded") +) + +const ( + MaxShards = 128 + MaxValueSize = int64(128 << 20) // 128MB + MinShardSize = MaxValueSize + 4096 +) + +type CacheMetrics struct { + GetTotal int64 // number of get request + GetHits int64 // number of items that hit from data + GetMisses int64 // number of items that not found + GetExpired int64 // number of items that expired when get + SetTotal int64 // number of set request + DelTotal int64 // number of del request + Expired int64 // number of items that expired + Evicted int64 // number of items evicted + EvictedAge int64 // min age of the last evicted item +} + +func (m *CacheMetrics) Add(o CacheMetrics) { + m.GetTotal += o.GetTotal + m.GetHits += o.GetHits + m.GetMisses += o.GetMisses + m.GetExpired += o.GetExpired + m.SetTotal += o.SetTotal + m.DelTotal += o.DelTotal + m.Expired += o.Expired + m.Evicted += o.Evicted + // use min age + if m.EvictedAge <= 0 || (o.EvictedAge > 0 && o.EvictedAge < m.EvictedAge) { + m.EvictedAge = o.EvictedAge + } +} + +type CacheStats struct { + Keys uint64 // number of keys + Bytes uint64 // bytes of keys that used + LastUpdate int64 // stat time, the stat is async updated +} + +func (st *CacheStats) Add(o CacheStats) { + st.Keys += o.Keys + st.Bytes += o.Bytes + // use oldest time + if o.LastUpdate < st.LastUpdate { + st.LastUpdate = o.LastUpdate + } +} + +type Cache struct { + hash ConsistentHash + shards []*Shard + + options CacheOptions +} + +type Allocator interface { + Malloc(n int) []byte + Free([]byte) +} + +type CacheOptions struct { + ShardNum int + Size int64 + TTL int64 + Allocator Allocator +} + +var DefualtCacheOptions = CacheOptions{ + ShardNum: 16, + Size: 32 * MaxValueSize, // 32*128MB = 4GB + TTL: 0, + Allocator: NewAllocatorPool(), +} + +func NewCache(path string, options *CacheOptions) (*Cache, error) { + os.MkdirAll(path, 0700) + if options == nil { + options = &CacheOptions{} + *options = DefualtCacheOptions + } + if options.Allocator == nil { + options.Allocator = NewAllocatorPool() + } + if options.ShardNum > MaxShards { + 
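+		// clamp to MaxShards (128): shard files are created as shard.000 .. shard.127 at most, and files for unused shard numbers are removed below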
options.ShardNum = MaxShards + } + if options.Size/int64(options.ShardNum) < MinShardSize { + options.ShardNum = int(options.Size / MinShardSize) + } + var err error + cache := Cache{options: *options} + cache.shards = make([]*Shard, options.ShardNum) + for i := 0; i < MaxShards; i++ { + fn := filepath.Join(path, fmt.Sprintf("shard.%03d", i)) + if i >= options.ShardNum { // rm unused files + os.Remove(fn + indexSubfix) + os.Remove(fn + dataSubfix) + continue + } + sopts := &ShardOptions{ + Size: options.Size / int64(options.ShardNum), + TTL: options.TTL, + Allocator: options.Allocator, + } + cache.shards[i], err = LoadCacheShard(fn, sopts) + if err != nil { + return nil, errors.Wrap(err, "load cache shard") + } + } + cache.hash = NewConsistentHashTable(len(cache.shards)) + return &cache, nil +} + +func (c *Cache) Close() error { + var err error + for _, s := range c.shards { + er := s.Close() + if er != nil { + err = er + } + } + return err +} + +func (c *Cache) getshard(key string) *Shard { + return c.shards[c.hash.Get(key)] +} + +type Item struct { + Key string + Value []byte + Timestamp int64 + TTL uint32 + Flags uint32 + + allocator Allocator +} + +func (i *Item) Free() { + if i.Value != nil && i.allocator != nil { + i.allocator.Free(i.Value) + i.allocator = nil + i.Value = nil + } +} + +func (c *Cache) Set(item Item) error { + if int64(len(item.Value)) > MaxValueSize { + return ErrValueSize + } + s := c.getshard(item.Key) + return s.Set(item) +} + +func (c *Cache) Get(key string) (Item, error) { + s := c.getshard(key) + return s.Get(key) +} + +func (c *Cache) Del(key string) error { + s := c.getshard(key) + return s.Del(key) +} + +func (c *Cache) GetMetrics() CacheMetrics { + var m CacheMetrics + for _, s := range c.GetMetricsByShards() { + m.Add(s) + } + return m +} + +func (c *Cache) GetStats() CacheStats { + var st CacheStats + for _, s := range c.GetStatsByShards() { + st.Add(s) + } + return st +} + +func (c *Cache) GetMetricsByShards() []CacheMetrics { + var ret = make([]CacheMetrics, len(c.shards), len(c.shards)) + for i, s := range c.shards { + ret[i] = s.GetMetrics() + } + return ret +} + +func (c *Cache) GetStatsByShards() []CacheStats { + var ret = make([]CacheStats, len(c.shards), len(c.shards)) + for i, s := range c.shards { + ret[i] = s.GetStats() + } + return ret +} + +func (c *Cache) GetOptions() CacheOptions { + return c.options +} diff --git a/cache/cache_test.go b/cache/cache_test.go new file mode 100644 index 0000000..b338a20 --- /dev/null +++ b/cache/cache_test.go @@ -0,0 +1,54 @@ +package cache + +import ( + "bytes" + "io/ioutil" + "math/rand" + "os" + "strconv" + "testing" +) + +func TestCache(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cache") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + c, err := NewCache(dir, nil) + if err != nil { + t.Fatal(err) + } + b := make([]byte, 100) + + n := 1000 + for i := 0; i < n; i++ { + key := strconv.FormatInt(rand.Int63(), 16) + rand.Read(b) + if err := c.Set(Item{Key: key, Value: b}); err != nil { + t.Fatal(err) + } + item, err := c.Get(key) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(item.Value, b) { + t.Fatal("set get not equal") + } + if err := c.Del(key); err != nil { + t.Fatal(err) + } + if _, err := c.Get(key); err != ErrNotFound { + t.Fatal("should not found") + } + } + m := c.GetMetrics() + + k := int64(n) + if m.GetTotal != 2*k || m.DelTotal != k || m.SetTotal != k || m.GetMisses != k || m.GetHits != k { + t.Fatal("metrics err", m) + } + if err := c.Close(); err != nil { + 
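+		// Close shuts down every shard: it closes the bolt index and unmaps the data file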
t.Fatal(err) + } +} diff --git a/cache/data.go b/cache/data.go new file mode 100644 index 0000000..da92105 --- /dev/null +++ b/cache/data.go @@ -0,0 +1,109 @@ +package cache + +import ( + "encoding/binary" + "os" + "sync" + + mmap "github.com/edsrzf/mmap-go" + "github.com/pkg/errors" +) + +var ( + ErrOutOfRange = errors.New("out of data range") + ErrHeader = errors.New("header err") + ErrShortRead = errors.New("short read") + + datamagic = uint64(20126241245322) + datahdrsize = int64(16) + maxsize = int64(1 << 30) // 1GB +) + +type dataheader struct { + magic uint64 + size int64 +} + +func (h *dataheader) Unmarshal(b []byte) error { + if len(b) < int(datahdrsize) { + return ErrOutOfRange + } + h.magic = binary.BigEndian.Uint64(b[:8]) + h.size = int64(binary.BigEndian.Uint64(b[8:16])) + return nil +} + +func (h *dataheader) MarshalTo(b []byte) { + binary.BigEndian.PutUint64(b[:8], h.magic) + binary.BigEndian.PutUint64(b[8:16], uint64(h.size)) +} + +type CacheData struct { + mu sync.RWMutex + data mmap.MMap +} + +func LoadCacheData(fn string, sz int64) (*CacheData, error) { + f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return nil, errors.Wrap(err, "os.OpenFile") + } + if err := f.Truncate(sz); err != nil { + return nil, errors.Wrap(err, "truncate data") + } + defer f.Close() + d, err := mmap.MapRegion(f, int(sz), mmap.RDWR, 0, 0) + if err != nil { + return nil, errors.Wrap(err, "mmap.MapRegion") + } + if len(d) != int(sz) { + panic("mmap bytes len err") + } + return &CacheData{data: d}, nil +} + +func (d *CacheData) Close() error { + d.mu.Lock() + defer d.mu.Unlock() + return d.data.Unmap() +} + +func (d *CacheData) Size() int64 { + return int64(len(d.data)) +} + +func (d *CacheData) Read(offset int64, b []byte) error { + d.mu.Lock() + defer d.mu.Unlock() + if int(offset) >= len(d.data) { + return ErrOutOfRange + } + var hdr dataheader + err := hdr.Unmarshal(d.data[offset:]) + if err != nil { + return err + } + if hdr.magic != datamagic || hdr.size != int64(len(b)) || hdr.size > MaxValueSize { + return ErrHeader + } + offset += datahdrsize + if int(offset+int64(hdr.size)) > len(d.data) { + return ErrOutOfRange + } + copy(b, d.data[offset:offset+hdr.size]) + return nil +} + +func (d *CacheData) Write(offset int64, b []byte) error { + d.mu.RLock() + defer d.mu.RUnlock() + sz := int64(len(b)) + voffset := offset + datahdrsize + if int(voffset+sz) > len(d.data) { + return ErrOutOfRange + } + hdr := dataheader{magic: datamagic, size: sz} + hdr.MarshalTo(d.data[offset : offset+datahdrsize]) + copy(d.data[voffset:voffset+sz], b) + return nil +} diff --git a/cache/data_test.go b/cache/data_test.go new file mode 100644 index 0000000..4029920 --- /dev/null +++ b/cache/data_test.go @@ -0,0 +1,89 @@ +package cache + +import ( + "bytes" + "crypto/rand" + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +func TestCacheData(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cachedata") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "data") + c, err := LoadCacheData(fn, 1024) + if err != nil { + t.Fatal(err) + } + if c.Size() != 1024 { + t.Fatal("size err") + } + b := make([]byte, 8) + rand.Read(b) + if err := c.Write(1000, b); err != nil { + t.Fatal(err) + } + + bb := make([]byte, 8) + if err := c.Read(1000, bb); err != nil { + t.Fatal(err) + } + if !bytes.Equal(bb, b) { + t.Fatal("bytes not equal") + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} + +func TestCacheDataReadErr(t *testing.T) { + dir, err := 
ioutil.TempDir("", "test_cachedata") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "data") + c, err := LoadCacheData(fn, 1024) + if err != nil { + t.Fatal(err) + } + + bb := make([]byte, 8) + if err := c.Read(1024, bb); err != ErrOutOfRange { + t.Fatal("should out of range") + } + if err := c.Read(1023, bb); err != ErrOutOfRange { + t.Fatal("should out of range") + } + if err := c.Read(1000, bb); err != ErrHeader { + t.Fatal("should err header") + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} +func TestCacheDataWriteErr(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cachedata") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "data") + c, err := LoadCacheData(fn, 1024) + if err != nil { + t.Fatal(err) + } + b := make([]byte, 8) + rand.Read(b) + if err := c.Write(1001, b); err != ErrOutOfRange { + t.Fatal("should out of range") + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/cache/gen_proto.sh b/cache/gen_proto.sh new file mode 100755 index 0000000..3f1b818 --- /dev/null +++ b/cache/gen_proto.sh @@ -0,0 +1,3 @@ +#! /bin/bash +PATH_GOGOPROTOBUF=$GOPATH/src/github.com/gogo/protobuf +protoc --proto_path=$GOPATH/src/:$PATH_GOGOPROTOBUF/protobuf/:. --gogofaster_out=. *.proto diff --git a/cache/hash.go b/cache/hash.go new file mode 100644 index 0000000..afb08b1 --- /dev/null +++ b/cache/hash.go @@ -0,0 +1,73 @@ +package cache + +import ( + "crypto/md5" + "encoding/binary" + "fmt" + "hash" + "hash/fnv" + "sort" + "sync" +) + +const ( + vhashnodes = 1000 +) + +type consistentNode struct { + pos uint64 + v int +} + +type consistentNodes []consistentNode + +func (n consistentNodes) Len() int { return len(n) } +func (n consistentNodes) Swap(i, j int) { n[i], n[j] = n[j], n[i] } +func (n consistentNodes) Less(i, j int) bool { return n[i].pos < n[j].pos } + +// a simplify consistent hash impelment without weight +type ConsistentHash struct { + nodes consistentNodes +} + +var hashPool = sync.Pool{New: func() interface{} { + return fnv.New64a() +}} + +func dohash(b []byte) uint64 { + h := hashPool.Get().(hash.Hash64) + defer hashPool.Put(h) + h.Reset() + h.Write(b) + return h.Sum64() +} + +// NewConsistentHashTable creates a ConsistentHash with value[0, n) +func NewConsistentHashTable(n int) ConsistentHash { + var h ConsistentHash + h.nodes = make(consistentNodes, 0, n*vhashnodes) + for i := 0; i < n; i++ { + for j := 0; j < vhashnodes; j++ { + // we use md5 distribute vnodes + b := md5.Sum([]byte(fmt.Sprintf("hash-%d-%d", i, j))) + h.nodes = append(h.nodes, consistentNode{binary.BigEndian.Uint64(b[:]), i}) + } + } + sort.Sort(h.nodes) + return h +} + +func (c *ConsistentHash) search(k uint64) int { + i := sort.Search(len(c.nodes), func(i int) bool { + return c.nodes[i].pos >= k + }) + if i == len(c.nodes) { + return c.nodes[0].v + } + return c.nodes[i].v +} + +func (c *ConsistentHash) Get(key string) int { + h := dohash([]byte(key)) + return c.search(h) +} diff --git a/cache/hash_test.go b/cache/hash_test.go new file mode 100644 index 0000000..f1fa7a9 --- /dev/null +++ b/cache/hash_test.go @@ -0,0 +1,50 @@ +package cache + +import ( + "math/rand" + "strconv" + "testing" +) + +func TestConsistentHash(t *testing.T) { + n1 := 5 + n2 := 16 + c1 := NewConsistentHashTable(n1) + c2 := NewConsistentHashTable(n2) + + m1 := make(map[int]int) + total := 100000 + match := 0 + for i := 0; i < total; i++ { + s := strconv.FormatInt(rand.Int63(), 36) + strconv.FormatInt(rand.Int63(), 36) 
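+		// map the same key through both tables; the match rate measured below should stay close to n1/n2 when keys spread evenly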
+ h1 := c1.Get(s) + h2 := c2.Get(s) + if h1 == h2 { + match += 1 + } + m1[h1] += 1 + } + + for i := 0; i < n1; i++ { + diff := 0 + for j := 0; j < n1; j++ { + diff += m1[i] - m1[j] + } + if diff > int(float32(total/n1)*0.1) { + t.Fatal("total", total, "slot", i, "num", m1[i], "diff err") + } + } + + matchRate := float32(match) / float32(total) + expectRate := float32(n1) / float32(n2) + if matchRate < expectRate*0.9 || matchRate > expectRate*1.1 { + t.Fatal("match rate err", matchRate, "expect", expectRate) + } +} + +func BenchmarkDoHash(b *testing.B) { + for i := 0; i < b.N; i++ { + s := strconv.FormatInt(rand.Int63(), 36) + strconv.FormatInt(rand.Int63(), 36) + dohash([]byte(s)) + } +} diff --git a/cache/index.go b/cache/index.go new file mode 100644 index 0000000..9ae6cdd --- /dev/null +++ b/cache/index.go @@ -0,0 +1,228 @@ +package cache + +import ( + "sync" + "time" + + "github.com/boltdb/bolt" + "github.com/pkg/errors" +) + +type CacheIndex struct { + db *bolt.DB + + mu sync.RWMutex + meta IndexMeta +} + +var ( + indexMetaBucket = []byte("meta") + indexDataBucket = []byte("data") + indexMetaKey = []byte("indexmeta") +) + +func LoadCacheIndex(fn string, datasize int64) (*CacheIndex, error) { + var err error + var index CacheIndex + index.db, err = bolt.Open(fn, 0600, &bolt.Options{Timeout: 10 * time.Second}) + if err != nil { + return nil, errors.Wrap(err, "bolt.Open") + } + index.db.NoSync = true // for improve performance + err = index.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(indexMetaBucket) + if err != nil { + return err + } + v := bucket.Get(indexMetaKey) + if v == nil { + return nil + } + return index.meta.Unmarshal(v) + }) + if err != nil { + return nil, errors.Wrap(err, "bolt.Update") + } + meta := &index.meta + if meta.DataSize != datasize { + meta.DataSize = datasize + } + if meta.Head > datasize { + meta.Head = 0 + meta.Term += 1 // datasize changed, move to next term + } + return &index, nil +} + +func (i *CacheIndex) Close() error { + i.mu.Lock() + defer i.mu.Unlock() + return i.db.Close() +} + +func (i *CacheIndex) Get(key string) (*IndexItem, error) { + var item IndexItem + err := i.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(indexDataBucket) + if bucket == nil { + return ErrNotFound + } + v := bucket.Get([]byte(key)) + if v == nil { + return ErrNotFound + } + return item.Unmarshal(v) + }) + if err != nil { + return nil, err + } + meta := i.GetIndexMeta() + if meta.IsValidate(item) { + return &item, nil + } + return nil, ErrNotFound +} + +func (i *CacheIndex) Reserve(size int32) (*IndexItem, error) { + i.mu.Lock() + defer i.mu.Unlock() + meta := &i.meta + if int64(size) > meta.DataSize { + return nil, errors.New("not enough space") + } + if meta.Head+datahdrsize+int64(size) > meta.DataSize { + meta.Head = 0 + meta.Term += 1 + } + idx := &IndexItem{Term: meta.Term, Offset: meta.Head, ValueSize: size, Timestamp: time.Now().Unix()} + meta.Head += datahdrsize + int64(size) + + err := i.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(indexMetaBucket) + if err != nil { + return err + } + b, err := i.meta.Marshal() + if err != nil { + return err + } + return bucket.Put(indexMetaKey, b) + }) + if err != nil { + return nil, err + } + return idx, nil +} + +func (i *CacheIndex) Set(key string, item *IndexItem) error { + return i.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(indexDataBucket) + if err != nil { + return err + } + b, _ := item.Marshal() + return 
bucket.Put([]byte(key), b) + }) +} + +func (i *CacheIndex) Del(key string) error { + return i.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(indexDataBucket) + if err != nil { + return err + } + return bucket.Delete([]byte(key)) + }) +} + +func (i *CacheIndex) Dels(keys []string) error { + if len(keys) == 0 { + return nil + } + return i.db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(indexDataBucket) + if err != nil { + return err + } + for _, key := range keys { + if err := bucket.Delete([]byte(key)); err != nil { + return err + } + } + return nil + }) +} + +func (i *CacheIndex) GetIndexMeta() IndexMeta { + i.mu.RLock() + defer i.mu.RUnlock() + return i.meta +} + +func (i *CacheIndex) Iter(lastkey string, maxIter int, f func(key string, item IndexItem) error) error { + var item IndexItem + err := i.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(indexDataBucket) + if bucket == nil { + return nil + } + c := bucket.Cursor() + + var k, v []byte + + if lastkey == "" { + k, v = c.First() + } else { + k, v = c.Seek([]byte(lastkey)) + if string(k) == lastkey { + k, v = c.Next() + } + } + for k != nil && maxIter > 0 { + maxIter -= 1 + if err := item.Unmarshal(v); err != nil { + return err + } + if err := f(string(k), item); err != nil { + return err + } + k, v = c.Next() + } + return nil + }) + return err +} + +func (i *CacheIndex) GetKeys() (n uint64, err error) { + err = i.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(indexDataBucket) + if bucket != nil { + st := bucket.Stats() + n = uint64(st.KeyN) + } + return nil + }) + return +} + +// extends index.pb.go >>>>>>>>>>>>>>>>>>>>>> + +// IsValidate return if the item is validate +func (m IndexMeta) IsValidate(i IndexItem) bool { + if i.Term > m.Term { + return false + } + if i.Term == m.Term { + return i.Offset < m.Head && i.Offset+int64(i.ValueSize) <= m.DataSize + } + // i.Term < m.Term + if i.Term+1 != m.Term { + return false + } + return m.Head <= i.Offset && i.Offset+int64(i.ValueSize) <= m.DataSize +} + +// TotalSize returns bytes used including index & data of the item +func (i IndexItem) TotalSize() int64 { + return int64(i.Size()) + datahdrsize + int64(i.ValueSize) +} diff --git a/cache/index.pb.go b/cache/index.pb.go new file mode 100644 index 0000000..0603bc8 --- /dev/null +++ b/cache/index.pb.go @@ -0,0 +1,622 @@ +// Code generated by protoc-gen-gogo. +// source: index.proto +// DO NOT EDIT! + +/* + Package cache is a generated protocol buffer package. + + It is generated from these files: + index.proto + + It has these top-level messages: + IndexMeta + IndexItem +*/ +package cache + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" +import _ "github.com/gogo/protobuf/gogoproto" + +import io "io" +import github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type IndexMeta struct { + Term int64 `protobuf:"varint,1,req,name=Term" json:"Term"` + Head int64 `protobuf:"varint,2,req,name=Head" json:"Head"` + DataSize int64 `protobuf:"varint,3,req,name=DataSize" json:"DataSize"` +} + +func (m *IndexMeta) Reset() { *m = IndexMeta{} } +func (m *IndexMeta) String() string { return proto.CompactTextString(m) } +func (*IndexMeta) ProtoMessage() {} +func (*IndexMeta) Descriptor() ([]byte, []int) { return fileDescriptorIndex, []int{0} } + +type IndexItem struct { + Term int64 `protobuf:"varint,1,req,name=Term" json:"Term"` + Offset int64 `protobuf:"varint,2,req,name=Offset" json:"Offset"` + ValueSize int32 `protobuf:"varint,3,req,name=ValueSize" json:"ValueSize"` + Timestamp int64 `protobuf:"varint,4,req,name=Timestamp" json:"Timestamp"` + TTL uint32 `protobuf:"varint,5,req,name=TTL" json:"TTL"` + Flags uint32 `protobuf:"varint,6,req,name=Flags" json:"Flags"` +} + +func (m *IndexItem) Reset() { *m = IndexItem{} } +func (m *IndexItem) String() string { return proto.CompactTextString(m) } +func (*IndexItem) ProtoMessage() {} +func (*IndexItem) Descriptor() ([]byte, []int) { return fileDescriptorIndex, []int{1} } + +func init() { + proto.RegisterType((*IndexMeta)(nil), "cache.IndexMeta") + proto.RegisterType((*IndexItem)(nil), "cache.IndexItem") +} +func (m *IndexMeta) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *IndexMeta) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintIndex(data, i, uint64(m.Term)) + data[i] = 0x10 + i++ + i = encodeVarintIndex(data, i, uint64(m.Head)) + data[i] = 0x18 + i++ + i = encodeVarintIndex(data, i, uint64(m.DataSize)) + return i, nil +} + +func (m *IndexItem) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *IndexItem) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintIndex(data, i, uint64(m.Term)) + data[i] = 0x10 + i++ + i = encodeVarintIndex(data, i, uint64(m.Offset)) + data[i] = 0x18 + i++ + i = encodeVarintIndex(data, i, uint64(m.ValueSize)) + data[i] = 0x20 + i++ + i = encodeVarintIndex(data, i, uint64(m.Timestamp)) + data[i] = 0x28 + i++ + i = encodeVarintIndex(data, i, uint64(m.TTL)) + data[i] = 0x30 + i++ + i = encodeVarintIndex(data, i, uint64(m.Flags)) + return i, nil +} + +func encodeFixed64Index(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Index(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintIndex(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *IndexMeta) Size() (n int) { + var l int + _ = l + n += 1 + sovIndex(uint64(m.Term)) + n += 
1 + sovIndex(uint64(m.Head)) + n += 1 + sovIndex(uint64(m.DataSize)) + return n +} + +func (m *IndexItem) Size() (n int) { + var l int + _ = l + n += 1 + sovIndex(uint64(m.Term)) + n += 1 + sovIndex(uint64(m.Offset)) + n += 1 + sovIndex(uint64(m.ValueSize)) + n += 1 + sovIndex(uint64(m.Timestamp)) + n += 1 + sovIndex(uint64(m.TTL)) + n += 1 + sovIndex(uint64(m.Flags)) + return n +} + +func sovIndex(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozIndex(x uint64) (n int) { + return sovIndex(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *IndexMeta) Unmarshal(data []byte) error { + var hasFields [1]uint64 + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: IndexMeta: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: IndexMeta: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType) + } + m.Term = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Term |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000001) + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Head", wireType) + } + m.Head = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Head |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000002) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DataSize", wireType) + } + m.DataSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.DataSize |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000004) + default: + iNdEx = preIndex + skippy, err := skipIndex(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthIndex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + if hasFields[0]&uint64(0x00000001) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Term") + } + if hasFields[0]&uint64(0x00000002) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Head") + } + if hasFields[0]&uint64(0x00000004) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("DataSize") + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *IndexItem) Unmarshal(data []byte) error { + var hasFields [1]uint64 + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift 
+ if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: IndexItem: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: IndexItem: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType) + } + m.Term = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Term |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000001) + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) + } + m.Offset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Offset |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000002) + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ValueSize", wireType) + } + m.ValueSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.ValueSize |= (int32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000004) + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + m.Timestamp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Timestamp |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000008) + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType) + } + m.TTL = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.TTL |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000010) + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Flags", wireType) + } + m.Flags = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIndex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Flags |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + hasFields[0] |= uint64(0x00000020) + default: + iNdEx = preIndex + skippy, err := skipIndex(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthIndex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + if hasFields[0]&uint64(0x00000001) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Term") + } + if hasFields[0]&uint64(0x00000002) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Offset") + } + if hasFields[0]&uint64(0x00000004) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("ValueSize") + } + if hasFields[0]&uint64(0x00000008) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Timestamp") + } + if hasFields[0]&uint64(0x00000010) == 0 { + 
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("TTL") + } + if hasFields[0]&uint64(0x00000020) == 0 { + return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Flags") + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipIndex(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowIndex + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowIndex + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowIndex + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthIndex + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowIndex + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipIndex(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthIndex = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowIndex = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("index.proto", fileDescriptorIndex) } + +var fileDescriptorIndex = []byte{ + // 228 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0xcc, 0x4b, 0x49, + 0xad, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x4d, 0x4e, 0x4c, 0xce, 0x48, 0x95, 0xd2, + 0x4d, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xcf, 0x4f, 0xcf, 0xd7, + 0x07, 0xcb, 0x26, 0x95, 0xa6, 0x81, 0x79, 0x60, 0x0e, 0x98, 0x05, 0xd1, 0xa5, 0xe4, 0xcd, 0xc5, + 0xe9, 0x09, 0x32, 0xc4, 0x37, 0xb5, 0x24, 0x51, 0x48, 0x88, 0x8b, 0x25, 0x24, 0xb5, 0x28, 0x57, + 0x82, 0x51, 0x81, 0x49, 0x83, 0xd9, 0x89, 0xe5, 0xc4, 0x3d, 0x79, 0x06, 0x90, 0x98, 0x47, 0x6a, + 0x62, 0x8a, 0x04, 0x13, 0x92, 0x98, 0x18, 0x17, 0x87, 0x4b, 0x62, 0x49, 0x62, 0x70, 0x66, 0x55, + 0xaa, 0x04, 0x33, 0x42, 0x5c, 0xa9, 0x9f, 0x11, 0x6a, 0x9a, 0x67, 0x49, 0x6a, 0x2e, 0x56, 0xd3, + 0x44, 0xb8, 0xd8, 0xfc, 0xd3, 0xd2, 0x8a, 0x53, 0x4b, 0x50, 0xcc, 0x13, 0xe7, 0xe2, 0x0c, 0x4b, + 0xcc, 0x29, 0x4d, 0x85, 0x1b, 0xc8, 0x8a, 0x90, 0x08, 0xc9, 0xcc, 0x4d, 0x2d, 0x2e, 0x49, 0xcc, + 0x2d, 0x90, 0x60, 0x41, 0xd2, 0x21, 0xc8, 0xc5, 0x1c, 0x12, 0xe2, 0x23, 0xc1, 0xaa, 0xc0, 0xa4, + 0xc1, 0x0b, 0x15, 0x12, 0xe6, 0x62, 0x75, 0xcb, 0x49, 0x4c, 0x2f, 0x96, 0x60, 0x43, 0x08, 0x3a, + 0x89, 0x9c, 0x78, 0x28, 0xc7, 0x70, 
0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, + 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x00, 0x08, 0x00, 0x00, 0xff, 0xff, 0x63, 0x2a, 0xe4, 0x44, + 0x38, 0x01, 0x00, 0x00, +} diff --git a/cache/index.proto b/cache/index.proto new file mode 100644 index 0000000..e0aa389 --- /dev/null +++ b/cache/index.proto @@ -0,0 +1,21 @@ +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +package cache; + +option (gogoproto.goproto_getters_all) = false; + +message IndexMeta { + required int64 Term = 1 [(gogoproto.nullable) = false]; + required int64 Head = 2 [(gogoproto.nullable) = false]; + required int64 DataSize = 3 [(gogoproto.nullable) = false]; +} + + +message IndexItem { + required int64 Term = 1 [(gogoproto.nullable) = false]; + required int64 Offset = 2 [(gogoproto.nullable) = false]; + required int32 ValueSize = 3 [(gogoproto.nullable) = false]; + required int64 Timestamp = 4 [(gogoproto.nullable) = false]; + required uint32 TTL = 5 [(gogoproto.nullable) = false]; + required uint32 Flags = 6 [(gogoproto.nullable) = false]; +} diff --git a/cache/index_test.go b/cache/index_test.go new file mode 100644 index 0000000..ba8ebca --- /dev/null +++ b/cache/index_test.go @@ -0,0 +1,199 @@ +package cache + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +func TestCacheIndexReserve(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cacheindex") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "index") + c, err := LoadCacheIndex(fn, 1024) + if err != nil { + t.Fatal(err) + } + item, err := c.Reserve(400) + if err != nil { + t.Fatal(err) + } + if item.Offset != 0 || item.Term != 0 { + t.Fatal("item err", item) + } + item1, err := c.Reserve(400) + if item1.Offset != item.Offset+400+datahdrsize { + t.Fatal("item err", item1) + } + item2, err := c.Reserve(400) + if item2.Offset != 0 || item2.Term != 1 { + t.Fatal("item err", item2) + } + meta := c.GetIndexMeta() + if meta.Term != 1 || meta.Head != 400+datahdrsize { + t.Fatal("meta err", meta) + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} + +func TestCacheIndexGetSet(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cacheindex") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "index") + c, err := LoadCacheIndex(fn, 1024) + if err != nil { + t.Fatal(err) + } + var item1, item2 *IndexItem + item1, _ = c.Reserve(400) + if err := c.Set("k1", item1); err != nil { + t.Fatal(err) + } + item2, _ = c.Reserve(400) + if err := c.Set("k2", item2); err != nil { + t.Fatal(err) + } + t1, err := c.Get("k1") + if err != nil { + t.Fatal(err) + } + t2, err := c.Get("k2") + if err != nil { + t.Fatal(err) + } + if *t1 != *item1 && *t2 != *item2 { + t.Fatal("get err", *t1, *t2) + } + c.Reserve(400) // overwrite item1 + _, err = c.Get("k1") + if err != ErrNotFound { + t.Fatal("should not found") + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} + +func TestCacheIndexSizeChanged(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cacheindex") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "index") + c, err := LoadCacheIndex(fn, 1024) + if err != nil { + t.Fatal(err) + } + var item *IndexItem + item, _ = c.Reserve(300) + c.Set("k1", item) + item, _ = c.Reserve(300) + c.Set("k2", item) + item, _ = c.Reserve(300) + c.Set("k3", item) + if err := c.Close(); err != nil { + t.Fatal(err) + } + c, err = LoadCacheIndex(fn, 500) + if err != nil { + t.Fatal(err) + } + _, err = c.Get("k1") + if 
err != nil { + t.Fatal(err) + } + _, err = c.Get("k2") + if err != ErrNotFound { + t.Fatal("should not found") + } + _, err = c.Get("k3") + if err != ErrNotFound { + t.Fatal("should not found") + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} + +func TestCacheIndexDel(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cacheindex") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "index") + c, err := LoadCacheIndex(fn, 1024) + if err != nil { + t.Fatal(err) + } + var item *IndexItem + item, _ = c.Reserve(300) + c.Set("k1", item) + item, _ = c.Reserve(300) + c.Set("k2", item) + item, _ = c.Reserve(300) + c.Set("k3", item) + if err := c.Del("k1"); err != nil { + t.Fatal(err) + } + if err := c.Dels([]string{"k2", "k3"}); err != nil { + t.Fatal(err) + } + for _, k := range []string{"k1", "k2", "k3"} { + if _, err := c.Get(k); err != ErrNotFound { + t.Fatal(k, "should not found") + } + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} + +func TestCacheIndexIter(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cacheindex") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + fn := filepath.Join(dir, "index") + c, err := LoadCacheIndex(fn, 1024) + if err != nil { + t.Fatal(err) + } + item1, _ := c.Reserve(300) + c.Set("k1", item1) + item2, _ := c.Reserve(300) + c.Set("k2", item2) + + iterkeys := 0 + + err = c.Iter("", 100, func(key string, item IndexItem) error { + iterkeys += 1 + if key == "k1" && item != *item1 { + t.Fatal(key, "item err", item) + } + if key == "k2" && item != *item2 { + t.Fatal(key, "item err", item) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + if iterkeys != 2 { + t.Fatal("iter keys", iterkeys) + } + if err := c.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/cache/pool.go b/cache/pool.go new file mode 100644 index 0000000..53bef6c --- /dev/null +++ b/cache/pool.go @@ -0,0 +1,74 @@ +package cache + +import ( + "sync" + "sync/atomic" +) + +var bytessize = []int{ + 4 << 10, 8 << 10, 16 << 10, 32 << 10, 64 << 10, 128 << 10, 256 << 10, 512 << 10, + 1 << 20, 2 << 20, 4 << 20, 8 << 20, 16 << 20, 32 << 20, 64 << 20, + int(MaxValueSize), +} + +type AllocatorPoolMetrics struct { + Malloc int64 + Free int64 + New int64 + + ErrMalloc int64 + ErrFree int64 +} + +type AllocatorPool struct { + pools []*sync.Pool + metrics AllocatorPoolMetrics +} + +func NewAllocatorPool() Allocator { + var p AllocatorPool + makeBytesPool := func(n int) *sync.Pool { + return &sync.Pool{New: func() interface{} { + atomic.AddInt64(&p.metrics.New, 1) + return make([]byte, n, n) + }} + } + p.pools = make([]*sync.Pool, len(bytessize)) + for i, n := range bytessize { + p.pools[i] = makeBytesPool(n) + } + return &p +} + +func (p *AllocatorPool) Malloc(n int) []byte { + atomic.AddInt64(&p.metrics.Malloc, 1) + for i, v := range bytessize { + if v >= n { + b := p.pools[i].Get().([]byte) + return b[:n] + } + } + atomic.AddInt64(&p.metrics.ErrMalloc, 1) + return make([]byte, n, n) +} + +func (p *AllocatorPool) Free(b []byte) { + atomic.AddInt64(&p.metrics.Free, 1) + for i, n := range bytessize { + if n == cap(b) { + p.pools[i].Put(b) + return + } + } + atomic.AddInt64(&p.metrics.ErrFree, 1) +} + +func (p *AllocatorPool) GetMetrics() AllocatorPoolMetrics { + var ret AllocatorPoolMetrics + ret.Malloc = atomic.LoadInt64(&p.metrics.Malloc) + ret.Free = atomic.LoadInt64(&p.metrics.Free) + ret.New = atomic.LoadInt64(&p.metrics.New) + ret.ErrMalloc = atomic.LoadInt64(&p.metrics.ErrMalloc) + ret.ErrFree = 
atomic.LoadInt64(&p.metrics.ErrFree) + return ret +} diff --git a/cache/pool_test.go b/cache/pool_test.go new file mode 100644 index 0000000..a35128e --- /dev/null +++ b/cache/pool_test.go @@ -0,0 +1,34 @@ +package cache + +import "testing" + +func TestAllocatorPool(t *testing.T) { + allocator := NewAllocatorPool() + b := allocator.Malloc(4000) + if len(b) != 4000 || cap(b) != 4096 { + t.Fatalf("malloc err: len:%d cap:%d", len(b), cap(b)) + } + allocator.Free(b) + b = allocator.Malloc(100 << 10) + if len(b) != 100<<10 || cap(b) != 128<<10 { + t.Fatalf("malloc err: len:%d cap:%d", len(b), cap(b)) + } + allocator.Free(b) + + for i := 0; i < 1000; i++ { + b := allocator.Malloc(8 << 10) + allocator.Free(b) + } + + metrics := allocator.(*AllocatorPool).GetMetrics() + if metrics.Malloc != 1002 || metrics.Free != 1002 || metrics.New != 3 { + t.Fatal("metrics err", metrics) + } + + allocator.Malloc(int(MaxValueSize + 1)) + allocator.Free(make([]byte, 1)) + metrics = allocator.(*AllocatorPool).GetMetrics() + if metrics.ErrMalloc != 1 || metrics.ErrFree != 1 { + t.Fatal("metrics err", metrics) + } +} diff --git a/cache/shard.go b/cache/shard.go new file mode 100644 index 0000000..69ab68d --- /dev/null +++ b/cache/shard.go @@ -0,0 +1,286 @@ +package cache + +import ( + "log" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" +) + +const ( + indexSubfix = ".idx" + dataSubfix = ".dat" +) + +type Shard struct { + mu sync.RWMutex + index *CacheIndex + data *CacheData + + options ShardOptions + + stats CacheStats + metrics CacheMetrics + exit chan struct{} +} + +type ShardOptions struct { + Size int64 + TTL int64 + Allocator Allocator +} + +var DefualtShardOptions = ShardOptions{ + Size: MinShardSize, + TTL: 0, + Allocator: NewAllocatorPool(), +} + +func LoadCacheShard(fn string, options *ShardOptions) (*Shard, error) { + if options == nil { + options = &ShardOptions{} + *options = DefualtShardOptions + } + if options.Size <= 0 { + options.Size = MinShardSize + } + if options.Allocator == nil { + options.Allocator = NewAllocatorPool() + } + + var err error + s := Shard{options: *options} + s.index, err = LoadCacheIndex(fn+indexSubfix, options.Size) + if err != nil { + return nil, errors.Wrap(err, "LoadIndex") + } + s.data, err = LoadCacheData(fn+dataSubfix, options.Size) + if err != nil { + return nil, errors.Wrap(err, "LoadData") + } + + s.stats.Keys, err = s.index.GetKeys() + if err != nil { + return nil, err + } + var st gcstat // sample 10000 keys for stats.Bytes + if err := s.scanKeysForGC(10000, &st); err != nil { + return nil, err + } + s.stats.Bytes = uint64(float64(s.stats.Keys) * float64(st.ActiveBytes) / float64(st.Active)) + s.stats.LastUpdate = time.Now().Unix() + + s.exit = make(chan struct{}) + go s.GCLoop() + return &s, nil +} + +func (s *Shard) Close() error { + close(s.exit) + err1 := s.index.Close() + err2 := s.data.Close() + if err1 != nil { + return errors.Wrap(err1, "close index") + } + if err2 != nil { + return errors.Wrap(err1, "close data") + } + return nil +} + +func (s *Shard) GetMetrics() CacheMetrics { + var m CacheMetrics + m.GetTotal = atomic.LoadInt64(&s.metrics.GetTotal) + m.GetHits = atomic.LoadInt64(&s.metrics.GetHits) + m.GetMisses = atomic.LoadInt64(&s.metrics.GetMisses) + m.GetExpired = atomic.LoadInt64(&s.metrics.GetExpired) + m.SetTotal = atomic.LoadInt64(&s.metrics.SetTotal) + m.DelTotal = atomic.LoadInt64(&s.metrics.DelTotal) + m.Expired = atomic.LoadInt64(&s.metrics.Expired) + m.Evicted = atomic.LoadInt64(&s.metrics.Evicted) + m.EvictedAge = 
atomic.LoadInt64(&s.metrics.EvictedAge) + return m +} + +func (s *Shard) GetStats() CacheStats { + var st CacheStats + st.Keys = atomic.LoadUint64(&s.stats.Keys) + st.Bytes = atomic.LoadUint64(&s.stats.Bytes) + st.LastUpdate = atomic.LoadInt64(&s.stats.LastUpdate) + return st +} + +func (s *Shard) Set(ci Item) error { + atomic.AddInt64(&s.metrics.SetTotal, 1) + s.mu.Lock() + defer s.mu.Unlock() + ii, err := s.index.Reserve(int32(len(ci.Value))) + if err != nil { + return errors.Wrap(err, "reserve index") + } + ii.TTL = ci.TTL + ii.Flags = ci.Flags + if err := s.data.Write(ii.Offset, ci.Value); err != nil { + return errors.Wrap(err, "write data") + } + if err := s.index.Set(ci.Key, ii); err != nil { + return errors.Wrap(err, "update index") + } + return nil +} + +func (s *Shard) Get(key string) (Item, error) { + var ci Item + atomic.AddInt64(&s.metrics.GetTotal, 1) + s.mu.RLock() + defer s.mu.RUnlock() + ii, err := s.index.Get(key) + if err != nil { + if err == ErrNotFound { + atomic.AddInt64(&s.metrics.GetMisses, 1) + } + return ci, err + } + + age := time.Now().Unix() - ii.Timestamp + if (s.options.TTL > 0 && age >= s.options.TTL) || (ii.TTL > 0 && age > int64(ii.TTL)) { + atomic.AddInt64(&s.metrics.GetMisses, 1) + atomic.AddInt64(&s.metrics.GetExpired, 1) + atomic.AddInt64(&s.metrics.Expired, 1) + s.index.Del(key) + return ci, ErrNotFound + } + + ci.Key = key + ci.Timestamp = ii.Timestamp + ci.TTL = ii.TTL + ci.Flags = ii.Flags + + allocator := s.options.Allocator + ci.Value = allocator.Malloc(int(ii.ValueSize)) + ci.allocator = allocator + + err = s.data.Read(ii.Offset, ci.Value) + if err == ErrOutOfRange { + err = ErrNotFound // data size changed? + } + if err != nil { + if err == ErrNotFound { + atomic.AddInt64(&s.metrics.GetMisses, 1) + } + s.options.Allocator.Free(ci.Value) + ci.Value = nil + return ci, err + } + atomic.AddInt64(&s.metrics.GetHits, 1) + return ci, nil +} + +func (s *Shard) Del(key string) error { + atomic.AddInt64(&s.metrics.DelTotal, 1) + s.mu.RLock() + defer s.mu.RUnlock() + return s.index.Del(key) +} + +type gcstat struct { + Scanned uint64 + Purged uint64 + Active uint64 + ActiveBytes uint64 + + LastKey string + LastFinish time.Time +} + +func timeSub(t1, t2 time.Time, round time.Duration) time.Duration { + t1 = t1.Round(round) + t2 = t2.Round(round) + return t1.Sub(t2) +} + +func (s *Shard) GCLoop() { + const scanItemsPerRound = 100 + const sleepTimePerRound = 50 * time.Millisecond // ~ scan 2k items per second + + var st gcstat + + st.LastFinish = time.Now() + + for { + select { + case <-s.exit: + return + case <-time.After(sleepTimePerRound): + } + n := st.Scanned + err := s.scanKeysForGC(scanItemsPerRound, &st) + if err != nil { + log.Printf("Iter keys err: %s", err) + continue + } + newScanned := st.Scanned - n + if newScanned >= scanItemsPerRound { + continue // scanning + } + // newScanned < scanItemsPerRound, end of index + now := time.Now() + cost := timeSub(now, st.LastFinish, time.Millisecond) + log.Printf("shard[%p] gc scanned:%d purged:%d keys cost %v", + s, st.Scanned, st.Purged, cost) + + // save stats to CacheStats + atomic.StoreUint64(&s.stats.Keys, st.Active) + atomic.StoreUint64(&s.stats.Bytes, st.ActiveBytes) + atomic.StoreInt64(&s.stats.LastUpdate, now.Unix()) + + st.Scanned = 0 + st.Purged = 0 + st.Active = 0 + st.ActiveBytes = 0 + st.LastKey = "" + + if cost < time.Minute { // rate limit + select { + case <-s.exit: + return + case <-time.After(time.Minute - cost): + } + } + st.LastFinish = time.Now() + } +} + +func (s *Shard) 
scanKeysForGC(maxIter int, st *gcstat) error { + now := time.Now().Unix() + meta := s.index.GetIndexMeta() + var pendingDeletes []string + err := s.index.Iter(st.LastKey, maxIter, func(key string, ii IndexItem) error { + st.Scanned += 1 + st.LastKey = key + age := time.Now().Unix() - ii.Timestamp + if (s.options.TTL > 0 && age >= s.options.TTL) || (ii.TTL > 0 && age > int64(ii.TTL)) { + st.Purged += 1 + atomic.AddInt64(&s.metrics.Expired, 1) + pendingDeletes = append(pendingDeletes, key) + return nil + } + if !meta.IsValidate(ii) { // the item may overwritten by other keys + st.Purged += 1 + atomic.AddInt64(&s.metrics.Evicted, 1) + atomic.StoreInt64(&s.metrics.EvictedAge, now-ii.Timestamp) + pendingDeletes = append(pendingDeletes, key) + return nil + } + st.Active += 1 + st.ActiveBytes += uint64(int64(len(key)) + ii.TotalSize()) + return nil + }) + err2 := s.index.Dels(pendingDeletes) + if err == nil { + err = err2 + } + return err +} diff --git a/cache/shard_test.go b/cache/shard_test.go new file mode 100644 index 0000000..d07b81c --- /dev/null +++ b/cache/shard_test.go @@ -0,0 +1,88 @@ +package cache + +import ( + "bytes" + "crypto/rand" + "io/ioutil" + "os" + "testing" + "time" +) + +func TestCacheMetrics(t *testing.T) { + m1 := CacheMetrics{} + m2 := CacheMetrics{1, 1, 1, 1, 1, 1, 1, 1, 1} + m1.Add(m2) + if m1 != m2 { + t.Fatal("not equal", m1, m2) + } +} + +func TestShardSetGet(t *testing.T) { + dir, err := ioutil.TempDir("", "test_cachedata") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + s, err := LoadCacheShard(dir, &ShardOptions{Size: 1024, TTL: 1}) + if err != nil { + t.Fatal(err) + } + b := make([]byte, 300) + rand.Read(b) + + _, err = s.Get("k1") + if err != ErrNotFound { + t.Fatal("should not found") + } + if err := s.Set(Item{Key: "k1", Value: b}); err != nil { + t.Fatal(err) + } + ci, err := s.Get("k1") + if err != nil { + t.Fatal(err) + } + if ci.Key != "k1" { + t.Fatal("key not equal") + } + if !bytes.Equal(ci.Value, b) { + t.Fatal("set get not equal") + } + + var st gcstat + + if testing.Short() { + goto EndOfTest + } + + s.Set(Item{Key: "k2", Value: b}) + s.Set(Item{Key: "k3", Value: b}) + s.Set(Item{Key: "k4", Value: b}) + + st.LastKey = "" + s.scanKeysForGC(100, &st) + + time.Sleep(1020 * time.Millisecond) + + st.LastKey = "" + s.scanKeysForGC(100, &st) + + for _, key := range []string{"k1", "k2", "k3", "k4"} { + if _, err := s.Get(key); err != ErrNotFound { + t.Fatal("should not found") + } + } + { + m1 := s.GetMetrics() + m2 := CacheMetrics{6, 1, 5, 0, 4, 0, 3, 1, 0} + if m1 != m2 { + t.Logf("\nget %+v\nexpect %+v", m1, m2) + t.Fatal("metrics err") + } + } + +EndOfTest: + if err := s.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..112ca74 --- /dev/null +++ b/main.go @@ -0,0 +1,94 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net" + + "github.com/xiaost/blobcached/cache" + "github.com/xiaost/blobcached/server" +) + +const VERSION = "0.1.0" + +func main() { + var ( + bindAddr string + cachePath string + cacheSize int64 + cacheShards int64 + cacheTTL int64 + + printVersion bool + ) + + flag.StringVar(&bindAddr, + "addr", ":11211", + "the addr that blobcached listen on.") + + flag.StringVar(&cachePath, + "path", "cachedata", + "the cache path used by blobcached to store items.") + + flag.Int64Var(&cacheSize, + "size", int64(cache.DefualtCacheOptions.Size), + "cache file size used by blobcached to store items. 
") + + flag.Int64Var(&cacheShards, + "shards", int64(cache.DefualtCacheOptions.ShardNum), + "cache shards for performance purpose. max shards is 128.") + + flag.Int64Var(&cacheTTL, + "ttl", 0, + "the global ttl of cache items.") + + flag.BoolVar(&printVersion, "v", false, + "print the version and exit") + + flag.Parse() + + if printVersion { + fmt.Println("Blobcached %s", VERSION) + } + + l, err := net.Listen("tcp", bindAddr) + if err != nil { + log.Fatal(err) + } + + // cacheSize: limit to [2*MaxValueSize, +) + if cacheSize <= 2*cache.MaxValueSize { + cacheSize = 2 * cache.MaxValueSize + log.Printf("warn: cache size invaild. set to %d", cacheSize) + } + + // cacheShards: limit to (0, cache.MaxShards] and per cache shard size >= cache.MaxValueSize + if (cacheShards <= 0 || cacheShards > cache.MaxShards) || + (cacheSize/cacheShards < cache.MaxValueSize) { + cacheShards = int64(cache.DefualtCacheOptions.ShardNum) + if cacheSize/cacheShards < cache.MaxValueSize { + cacheShards = cacheSize / cache.MaxValueSize + } + log.Printf("warn: cache shards invaild. set to %d", cacheShards) + } + + if cacheTTL < 0 { + cacheTTL = 0 + } + + allocator := cache.NewAllocatorPool() + + options := &cache.CacheOptions{ + ShardNum: int(cacheShards), + Size: cacheSize, + TTL: cacheTTL, + Allocator: allocator, + } + c, err := cache.NewCache(cachePath, options) + if err != nil { + log.Fatal(err) + } + s := server.NewMemcacheServer(l, c, allocator) + log.Fatal(s.Serv()) +} diff --git a/protocol/memcache/common.go b/protocol/memcache/common.go new file mode 100644 index 0000000..05a66ac --- /dev/null +++ b/protocol/memcache/common.go @@ -0,0 +1,36 @@ +package memcache + +var ( + MaxCommandInfoBytes = 512 + + RspErr = []byte("ERROR\r\n") + RspStored = []byte("STORED\r\n") + RspNotStored = []byte("NOT_STORED\r\n") + RspExists = []byte("EXISTS\r\n") + RspNotFound = []byte("NOT_FOUND\r\n") + RspDeleted = []byte("DELETED\r\n") + RspEnd = []byte("END\r\n") + RspTouched = []byte("TOUCHED\r\n") + + EOL = []byte("\r\n") +) + +func MakeRspClientErr(err error) []byte { + return []byte("CLIENT_ERROR " + err.Error() + "\r\n") +} + +func MakeRspServerErr(err error) []byte { + return []byte("SERVER_ERROR " + err.Error() + "\r\n") +} + +type CommandInfo struct { + Cmd string + Key string + Keys []string // for retrieval commands + Delta uint64 // for incr/decr + Flags uint32 + Exptime uint32 + PayloadLen int64 + CasUnique int64 + NoReply bool +} diff --git a/protocol/memcache/parser.go b/protocol/memcache/parser.go new file mode 100644 index 0000000..4e6452e --- /dev/null +++ b/protocol/memcache/parser.go @@ -0,0 +1,160 @@ +package memcache + +import ( + "bytes" + "errors" + "fmt" +) + +var ( + ErrNeedMoreData = errors.New("need more data") + + errCommand = errors.New("command err") +) + +// ParseCommand parses cmd from `data` and return CommandInfo +// return ErrNeedMoreData if data not contains '\n' +// implements: https://github.com/memcached/memcached/blob/master/doc/protocol.txt +func ParseCommand(data []byte) (advance int, cmdinfo *CommandInfo, err error) { + idx := bytes.IndexByte(data, '\n') + if idx < 0 { + return 0, nil, ErrNeedMoreData + } + advance = idx + 1 + left := bytes.TrimSpace(data[:advance]) + if len(left) == 0 { + return advance, nil, errCommand + } + + var cmd string + idx = bytes.IndexByte(left, ' ') + if idx < 0 { // single cmd + cmd = string(left) + left = left[0:0] + } else { + cmd = string(left[:idx]) + left = left[idx+1:] // remove cmd + } + + var parser func(cmd string, left []byte) (*CommandInfo, error) 
+	switch cmd {
+	case "set", "add", "replace", "append", "prepend", "cas":
+		parser = parseStorageCommands
+	case "get", "gets":
+		parser = parseRetrievalCommands
+	case "delete":
+		parser = parseDeleteCommand
+	case "incr", "decr":
+		parser = parseIncrDecrCommands
+	case "touch":
+		parser = parseTouchCommand
+	default:
+		parser = parseOtherCommands
+	}
+	cmdinfo, err = parser(cmd, left)
+	return advance, cmdinfo, err
+}
+
+var (
+	norepl = []byte("noreply")
+)
+
+// parse:
+// <key> <flags> <exptime> <bytes> [noreply]
+// cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]
+func parseStorageCommands(cmd string, line []byte) (*CommandInfo, error) {
+	c := CommandInfo{Cmd: cmd}
+
+	if cmd != "cas" {
+		n, _ := fmt.Sscanf(string(line), "%s %d %d %d",
+			&c.Key, &c.Flags, &c.Exptime, &c.PayloadLen)
+		if n != 4 {
+			return nil, errCommand
+		}
+	} else {
+		n, _ := fmt.Sscanf(string(line), "%s %d %d %d %d",
+			&c.Key, &c.Flags, &c.Exptime, &c.PayloadLen, &c.CasUnique)
+		if n != 5 {
+			return nil, errCommand
+		}
+	}
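+	// "noreply", when present, is the trailing token of the command line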
+	if bytes.HasSuffix(line, norepl) {
+		c.NoReply = true
+	}
+	return &c, nil
+}
+
+// parse:
+// get <key>*
+// gets <key>*
+func parseRetrievalCommands(cmd string, line []byte) (*CommandInfo, error) {
+	if len(line) == 0 {
+		return nil, errCommand
+	}
+	bb := bytes.Split(line, []byte(" "))
+	c := CommandInfo{Cmd: cmd}
+	c.Keys = make([]string, 0, len(bb))
+	for _, b := range bb {
+		c.Keys = append(c.Keys, string(b))
+	}
+	c.Key = c.Keys[0]
+	return &c, nil
+}
+
+// parse:
+// delete <key> [noreply]
func parseDeleteCommand(cmd string, line []byte) (*CommandInfo, error) {
+	c := CommandInfo{Cmd: cmd}
+	bb := bytes.Split(line, []byte(" "))
+	c.Key = string(bb[0])
+	if len(bb) == 1 {
+		return &c, nil
+	}
+	if len(bb) != 2 || !bytes.Equal(bb[1], norepl) {
+		return nil, errCommand
+	}
+	c.NoReply = true
+	return &c, nil
+}
+
+// parse:
+// incr <key> <value> [noreply]
+// decr <key> <value> [noreply]
+func parseIncrDecrCommands(cmd string, line []byte) (*CommandInfo, error) {
+	c := CommandInfo{Cmd: cmd}
+	n, _ := fmt.Sscanf(string(line), "%s %d", &c.Key, &c.Delta)
+	if n != 2 {
+		return nil, errCommand
+	}
+	if bytes.HasSuffix(line, norepl) {
+		c.NoReply = true
+	}
+	return &c, nil
+}
+
+// parse:
+// touch <key> <exptime> [noreply]
+func parseTouchCommand(cmd string, line []byte) (*CommandInfo, error) {
+	c := CommandInfo{Cmd: cmd}
+	n, _ := fmt.Sscanf(string(line), "%s %d", &c.Key, &c.Exptime)
+	if n != 2 {
+		return nil, errCommand
+	}
+	if bytes.HasSuffix(line, norepl) {
+		c.NoReply = true
+	}
+	return &c, nil
+}
+
+func parseOtherCommands(cmd string, line []byte) (*CommandInfo, error) {
+	c := CommandInfo{Cmd: cmd}
+	if cmd == "stats" {
+		bb := bytes.Split(line, []byte(" "))
+		c.Keys = make([]string, len(bb), len(bb))
+		for i, b := range bb {
+			c.Keys[i] = string(b)
+		}
+		return &c, nil
+	}
+	return nil, errCommand
+}
diff --git a/protocol/memcache/parser_test.go b/protocol/memcache/parser_test.go
new file mode 100644
index 0000000..0de82a3
--- /dev/null
+++ b/protocol/memcache/parser_test.go
@@ -0,0 +1,197 @@
+package memcache
+
+import "testing"
+
+func TestParseSet(t *testing.T) {
+	b := []byte("set k1 1 2 3 noreply\r\nxxx\r\n")
+	advance, cmd, err := ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "set" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if cmd.Key != "k1" {
+		t.Fatal("key err", cmd.Key)
+	}
+	if cmd.Flags != 1 {
+		t.Fatal("flags err", cmd.Flags)
+	}
+	if cmd.Exptime != 2 {
+		t.Fatal("exptime err", cmd.Exptime)
+	}
+	if cmd.PayloadLen != 3 {
+		t.Fatal("payload len err", cmd.PayloadLen)
+	}
+	if cmd.NoReply != true {
+		t.Fatal("norepl err")
+	}
+	if string(b[advance:]) != "xxx\r\n" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+}
+
+func TestParseCAS(t *testing.T) {
+	b := []byte("cas k1 1 2 3 4 noreply\r\nxxx\r\n")
+	advance, cmd, err := ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "cas" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if cmd.Key != "k1" {
+		t.Fatal("key err", cmd.Key)
+	}
+	if cmd.Flags != 1 {
+		t.Fatal("flags err", cmd.Flags)
+	}
+	if cmd.Exptime != 2 {
+		t.Fatal("exptime err", cmd.Exptime)
+	}
+	if cmd.PayloadLen != 3 {
+		t.Fatal("payload len err", cmd.PayloadLen)
+	}
+	if cmd.CasUnique != 4 {
+		t.Fatal("cas uniq err", cmd.CasUnique)
+	}
+	if cmd.NoReply != true {
+		t.Fatal("norepl err")
+	}
+	if string(b[advance:]) != "xxx\r\n" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+}
+
+func TestParseGet(t *testing.T) {
+	b := []byte("get k1 k2\r\nxxx\r\n")
+	advance, cmd, err := ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "get" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if cmd.Key != "k1" {
+		t.Fatal("key err", cmd.Key)
+	}
+	if len(cmd.Keys) != 2 {
+		t.Fatal("keys number err", len(cmd.Keys))
+	}
+	if cmd.Keys[0] != "k1" {
+		t.Fatal("keys[0] err", cmd.Keys[0])
+	}
+	if cmd.Keys[1] != "k2" {
+		t.Fatal("keys[1] err", cmd.Keys[1])
+	}
+	if string(b[advance:]) != "xxx\r\n" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+}
+
+func TestParseDelete(t *testing.T) {
+	b := []byte("delete k1 noreply\r\nxxx\r\n")
+	advance, cmd, err := ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "delete" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if cmd.Key != "k1" {
+		t.Fatal("key err", cmd.Key)
+	}
+	if cmd.NoReply != true {
+		t.Fatal("norepl err")
+	}
+	if string(b[advance:]) != "xxx\r\n" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+}
+
+func TestParseIncr(t *testing.T) {
+	b := []byte("incr k1 7 noreply\r\nxxx\r\n")
+	advance, cmd, err := ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "incr" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if cmd.Key != "k1" {
+		t.Fatal("key err", cmd.Key)
+	}
+	if cmd.Delta != 7 {
+		t.Fatal("delta err", cmd.Delta)
+	}
+	if cmd.NoReply != true {
+		t.Fatal("norepl err")
+	}
+	if string(b[advance:]) != "xxx\r\n" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+}
+
+func TestParseTouch(t *testing.T) {
+	b := []byte("touch k1 7 noreply\r\nxxx\r\n")
+	advance, cmd, err := ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "touch" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if cmd.Key != "k1" {
+		t.Fatal("key err", cmd.Key)
+	}
+	if cmd.Exptime != 7 {
+		t.Fatal("exptime err", cmd.Exptime)
+	}
+	if cmd.NoReply != true {
+		t.Fatal("norepl err")
+	}
+	if string(b[advance:]) != "xxx\r\n" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+}
+
+func TestParseStats(t *testing.T) {
+	b := []byte("stats\r\n")
+	advance, cmd, err := ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "stats" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if string(b[advance:]) != "" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+
+	b = []byte("stats settings\r\n")
+	advance, cmd, err = ParseCommand(b)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if cmd.Cmd != "stats" {
+		t.Fatal("cmd err", cmd.Cmd)
+	}
+	if len(cmd.Keys) != 1 || cmd.Keys[0] != "settings" {
+		t.Fatal("stats args err")
+	}
+	if string(b[advance:]) != "" {
+		t.Fatal("left buf err", string(b[advance:]))
+	}
+}
+
+func TestParseErr(t *testing.T) {
+	b := []byte("xxx k1 7 noreply\r\nxxx\r\n")
+	advance, _, err := ParseCommand(b)
+	if err != errCommand {
+		t.Fatal("err != errCommand", err)
+	}
+	if advance != len("xxx k1 7 noreply\r\n") {
+		t.Fatal("advance err", advance)
+	}
+}
diff --git a/server/common.go b/server/common.go
new file mode 100644
index 0000000..0a99738
--- /dev/null
+++ b/server/common.go
@@ -0,0 +1,38 @@
+package server
+
+import (
+	"io"
+
+	"github.com/xiaost/blobcached/cache"
+)
+
+const Version = "1.0"
+
+type Cache interface {
+	Set(item cache.Item) error
+	Get(key string) (cache.Item, error)
+	Del(key string) error
+	GetOptions() cache.CacheOptions
+	GetMetrics() cache.CacheMetrics
+	GetMetricsByShards() []cache.CacheMetrics
+	GetStats() cache.CacheStats
+	GetStatsByShards() []cache.CacheStats
+}
+
+type Allocator interface {
+	Malloc(n int) []byte
+	Free([]byte)
+}
+
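+// WriterCounter wraps an io.Writer and counts the bytes written through it.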
+type WriterCounter struct {
+	W io.Writer
+	N int64
+}
+
+func (w *WriterCounter) Write(p []byte) (int, error) {
+	n, err := w.W.Write(p)
+	if n > 0 {
+		w.N += int64(n)
+	}
+	return n, err
+}
diff --git a/server/common_test.go b/server/common_test.go
new file mode 100644
index 0000000..95db6c9
--- /dev/null
+++ b/server/common_test.go
@@ -0,0 +1,107 @@
+package server
+
+import (
+	"sync"
+	"time"
+	"unsafe"
+
+	"github.com/xiaost/blobcached/cache"
+)
+
+type InMemoryCache struct {
+	m map[string]cache.Item
+
+	options cache.CacheOptions
+
+	stats   cache.CacheStats
+	metrics cache.CacheMetrics
+}
+
+func NewInMemoryCache() *InMemoryCache {
+	c := &InMemoryCache{}
+	c.m = make(map[string]cache.Item)
+	c.options.Allocator = cache.NewAllocatorPool()
+	return c
+}
+
+func (c *InMemoryCache) Set(item cache.Item) error {
+	c.metrics.SetTotal += 1
+	item.Timestamp = time.Now().Unix()
+	c.m[item.Key] = item
+	c.updateStats()
+	return nil
+}
+
+func (c *InMemoryCache) Get(key string) (cache.Item, error) {
+	c.metrics.GetTotal += 1
+	item, ok := c.m[key]
+	if ok {
+		c.metrics.GetHits += 1
+		return item, nil
+	}
+	c.metrics.GetMisses += 1
+	return item, cache.ErrNotFound
+}
+
+func (c *InMemoryCache) Del(key string) error {
+	c.metrics.DelTotal += 1
+	delete(c.m, key)
+	c.updateStats()
+	return nil
+}
+
+func (c *InMemoryCache) updateStats() {
+	c.stats.Keys = uint64(len(c.m))
+	c.stats.Bytes = 0
+	for _, it := range c.m {
+		c.stats.Bytes += uint64(len(it.Key) + len(it.Value))
+	}
+	c.stats.LastUpdate = time.Now().Unix()
+}
+
+func (c *InMemoryCache) GetOptions() cache.CacheOptions {
+	return c.options
+}
+
+func (c *InMemoryCache) GetMetrics() cache.CacheMetrics {
+	return c.metrics
+}
+
+func (c *InMemoryCache) GetMetricsByShards() []cache.CacheMetrics {
+	return []cache.CacheMetrics{c.metrics}
+}
+
+func (c *InMemoryCache) GetStats() cache.CacheStats {
+	return c.stats
+}
+
+func (c *InMemoryCache) GetStatsByShards() []cache.CacheStats {
+	return []cache.CacheStats{c.stats}
+}
+
+type DebugAllocator struct {
+	mu  sync.Mutex
+	all map[unsafe.Pointer]bool
+}
+
+func NewDebugAllocator() *DebugAllocator {
+	return &DebugAllocator{all: make(map[unsafe.Pointer]bool)}
+}
+
+func (a *DebugAllocator) Free(b []byte) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	p := unsafe.Pointer(&b[0])
+	if !a.all[p] {
+		panic(p)
+	}
+}
+
+func (a *DebugAllocator) Malloc(n int) []byte {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	b := make([]byte, n)
+	p := unsafe.Pointer(&b[0])
+	a.all[p] = true
+	return b
+}
diff --git a/server/memcache.go b/server/memcache.go
new file mode 100644
index 0000000..f773de8
--- /dev/null
+++ b/server/memcache.go
@@ -0,0 +1,299 @@
+package server
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"net"
+	"os"
+	"sync/atomic"
+	"time"
+
+	"github.com/xiaost/blobcached/cache"
+	"github.com/xiaost/blobcached/protocol/memcache"
+)
+
+var errNotSupportedCommand = errors.New("not supported command")
+
+type ServerMetrics struct {
+	BytesRead        uint64 // Total number of bytes read by this server
+	BytesWritten     uint64 // Total number of bytes sent by this server
+	CurrConnections  int64  // Number of active connections
+	TotalConnections uint64 // Total number of connections opened since the server started running
+}
+
+type MemcacheServer struct {
+	l         net.Listener
+	cache     Cache
+	allocator Allocator
+	metrics   ServerMetrics
+
+	startTime time.Time
+}
+
+func NewMemcacheServer(l net.Listener, cache Cache, allocator Allocator) *MemcacheServer {
+	return &MemcacheServer{l: l, cache: cache, allocator: allocator}
+}
+
+func (s *MemcacheServer) Serv() error {
+	s.startTime = time.Now()
+	log.Printf("memcache server listening on %s", s.l.Addr())
+	for {
+		conn, err := s.l.Accept()
+		if err != nil {
+			return err
+		}
+		if tcpconn, ok := conn.(*net.TCPConn); ok {
+			tcpconn.SetKeepAlive(true)
+			tcpconn.SetKeepAlivePeriod(30 * time.Second)
+		}
+		atomic.AddUint64(&s.metrics.TotalConnections, 1)
+		go func(conn net.Conn) {
+			atomic.AddInt64(&s.metrics.CurrConnections, 1)
+
+			s.Handle(conn)
+			conn.Close()
+
+			atomic.AddInt64(&s.metrics.CurrConnections, -1)
+		}(conn)
+	}
+}
+
+func (s *MemcacheServer) Handle(conn net.Conn) {
+	const maxReadPerRequest = 2 * cache.MaxValueSize
+	r := &io.LimitedReader{R: conn, N: maxReadPerRequest}
+	w := &WriterCounter{conn, 0}
+	var rbuf *bufio.Reader
+	for {
+		atomic.AddUint64(&s.metrics.BytesRead, uint64(maxReadPerRequest-r.N))
+		atomic.AddUint64(&s.metrics.BytesWritten, uint64(w.N))
+
+		r.N = maxReadPerRequest
+		w.N = 0
+
+		conn.SetDeadline(time.Now().Add(48 * time.Hour))
+
+		if rbuf == nil {
+			rbuf = bufio.NewReader(r)
+		}
+		b, err := rbuf.ReadSlice('\n')
+		if err != nil {
+			if err != io.EOF {
+				log.Printf("read %s err: %s", conn.RemoteAddr(), err)
+			}
+			return
+		}
+		advance, cmdinfo, err := memcache.ParseCommand(b)
+		if err != nil {
+			log.Printf("parse %s command err: %s", conn.RemoteAddr(), err)
+			w.Write(memcache.MakeRspClientErr(err))
+			return
+		}
+		if advance != len(b) {
+			panic("advance != len(b)")
+		}
+
+		// avoid blocking on reading data block or writing rsp
+		conn.SetDeadline(time.Now().Add(60 * time.Second))
+
+		var ww io.Writer = w
+		if cmdinfo.NoReply {
+			ww = ioutil.Discard
+		}
+
+		switch cmdinfo.Cmd {
+		case "get", "gets":
+			// some clients always use "gets" instead of "get"
+			// we implement "gets" with fake cas uniq
+			err = s.HandleGet(ww, cmdinfo)
+		case "set":
+			err = s.HandleSet(ww, rbuf, cmdinfo)
+		case "delete":
+			err = s.HandleDel(ww, cmdinfo)
+		case "touch":
+			err = s.HandleTouch(ww, cmdinfo)
+		case "stats":
+			err = s.HandleStats(ww)
+		default:
+			ww.Write(memcache.MakeRspServerErr(errNotSupportedCommand))
+			return
+		}
+
+		if err != nil {
+			log.Printf("client %s process err: %s", conn.RemoteAddr(), err)
+			return
+		}
+
+		if advance > (4<<10) && rbuf.Buffered() == 0 {
+			// if parse cmd use memory > 4kb, free Reader to avoid memory issue
+			// it is safe to free it if buffered nothing
+			rbuf = nil
+		}
+	}
+}
+
+func (s *MemcacheServer) HandleSet(w io.Writer, r *bufio.Reader, cmdinfo *memcache.CommandInfo) error {
+	if cmdinfo.PayloadLen > cache.MaxValueSize-4096 {
+		w.Write(memcache.MakeRspClientErr(cache.ErrValueSize))
+		return cache.ErrValueSize
+	}
+
+	b := s.allocator.Malloc(int(cmdinfo.PayloadLen) + 2) // including \r\n
+	defer s.allocator.Free(b)
+
+	_, err := io.ReadFull(r, b)
+	if err != nil {
+		return err
+	}
+	value := b[:len(b)-2] // remove \r\n
+	item := cache.Item{Key: cmdinfo.Key, Value: value, Flags: cmdinfo.Flags}
+
+	/* https://github.com/memcached/memcached/blob/master/doc/protocol.txt
+
+	Expiration times
+	----------------
+
+	Some commands involve a client sending some kind of expiration time
+	(relative to an item or to an operation requested by the client) to
+	the server. In all such cases, the actual value sent may either be
+	Unix time (number of seconds since January 1, 1970, as a 32-bit
+	value), or a number of seconds starting from current time. In the
+	latter case, this number of seconds may not exceed 60*60*24*30 (number
+	of seconds in 30 days); if the number sent by a client is larger than
+	that, the server will consider it to be real Unix time value rather
+	than an offset from current time.
+
+	*/
+	if cmdinfo.Exptime <= 30*86400 {
+		item.TTL = cmdinfo.Exptime
+	} else {
+		now := time.Now().Unix()
+		if now >= int64(cmdinfo.Exptime) {
+			_, err = w.Write(memcache.RspStored)
+			return err
+		} else {
+			item.TTL = uint32(int64(cmdinfo.Exptime) - now)
+		}
+	}
+
+	if err := s.cache.Set(item); err != nil {
+		w.Write(memcache.MakeRspServerErr(err))
+		return err
+	}
+	_, err = w.Write(memcache.RspStored)
+	return err
+}
+
+func (s *MemcacheServer) HandleGet(w io.Writer, cmdinfo *memcache.CommandInfo) error {
+	var prepend string
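+	// the \r\n terminating each data block is written as the prefix of the
+	// next "VALUE" line (or of the final "END"), saving one write per key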
+	for _, k := range cmdinfo.Keys {
+		item, err := s.cache.Get(k)
+		if err == cache.ErrNotFound {
+			continue
+		}
+		if err != nil {
+			log.Printf("get key %s err: %s", k, err)
+			continue
+		}
+		// VALUE <key> <flags> <bytes> [<cas unique>]\r\n
+		// <data block>\r\n
+		if cmdinfo.Cmd == "gets" {
+			fmt.Fprintf(w, "%sVALUE %s %d %d %d\r\n", prepend, k, item.Flags, len(item.Value), 0)
+		} else {
+			fmt.Fprintf(w, "%sVALUE %s %d %d\r\n", prepend, k, item.Flags, len(item.Value))
+		}
+		w.Write(item.Value)
+		item.Free()
+		prepend = "\r\n" // reduce len(cmdinfo.Keys) times w.Write("\r\n")
+	}
+	_, err := w.Write([]byte(prepend + "END\r\n"))
+	return err
+}
+
+func (s *MemcacheServer) HandleTouch(w io.Writer, cmdinfo *memcache.CommandInfo) error {
+	item, err := s.cache.Get(cmdinfo.Key)
+	if err != nil {
+		if err == cache.ErrNotFound {
+			_, err = w.Write(memcache.RspNotFound)
+		} else {
+			_, err = w.Write(memcache.MakeRspServerErr(err))
+		}
+		return err
+	}
+	defer item.Free()
+
+	if cmdinfo.Exptime <= 30*86400 {
+		item.TTL = cmdinfo.Exptime
+	} else {
+		now := time.Now().Unix()
+		if now >= int64(cmdinfo.Exptime) { // already expired
+			w.Write(memcache.RspTouched)
+			return s.cache.Del(cmdinfo.Key)
+		} else {
+			item.TTL = uint32(int64(cmdinfo.Exptime) - now)
+		}
+	}
+	if err := s.cache.Set(item); err != nil {
+		w.Write(memcache.MakeRspServerErr(err))
+		return err
+	}
+	_, err = w.Write(memcache.RspTouched)
+	return err
+}
+
+func (s *MemcacheServer) HandleDel(w io.Writer, cmdinfo *memcache.CommandInfo) error {
+	err := s.cache.Del(cmdinfo.Key)
+	if err != nil {
+		w.Write(memcache.MakeRspServerErr(err))
+		return nil
+	}
+	_, err = w.Write(memcache.RspDeleted)
+	return err
+}
+
+func (s *MemcacheServer) HandleStats(w io.Writer) error {
+	var buf bytes.Buffer
+	writeStat := func(name string, v interface{}) {
+		fmt.Fprintf(&buf, "STAT %s %v\r\n", name, v)
+	}
+
+	now := time.Now()
+	writeStat("pid", os.Getpid())
+	writeStat("uptime", int64(now.Sub(s.startTime).Seconds()))
+	writeStat("time", now.Unix())
+	writeStat("version", Version+"(blobcached)")
+
+	// options
+	options := s.cache.GetOptions()
+	writeStat("limit_maxbytes", options.Size)
+
+	// server metrics
+	writeStat("curr_connections", atomic.LoadInt64(&s.metrics.CurrConnections))
+	writeStat("total_connections", atomic.LoadUint64(&s.metrics.TotalConnections))
+	writeStat("bytes_read", atomic.LoadUint64(&s.metrics.BytesRead))
+	writeStat("bytes_written", atomic.LoadUint64(&s.metrics.BytesWritten))
+
+	// cache stats
+	stats := s.cache.GetStats()
+	writeStat("curr_items", stats.Keys)
+	writeStat("bytes", stats.Bytes)
+
+	// cache metrics
+	metrics := s.cache.GetMetrics()
+	writeStat("cmd_get", metrics.GetTotal)
+	writeStat("cmd_set", metrics.SetTotal)
+	writeStat("get_hits", metrics.GetHits)
+	writeStat("get_misses", metrics.GetMisses)
+	writeStat("get_expired", metrics.GetExpired)
+	writeStat("reclaimed", metrics.Expired)
+	writeStat("evictions", metrics.Evicted)
+	writeStat("last_evicted_age", metrics.EvictedAge)
+
+	buf.Write(memcache.RspEnd)
+	_, err := w.Write(buf.Bytes())
+	return err
+}
diff --git a/server/memcache_test.go b/server/memcache_test.go
new file mode 100644
index 0000000..847ab4f
--- /dev/null
+++ b/server/memcache_test.go
@@ -0,0 +1,88 @@
+package server
+
+import (
+	"bytes"
+	"fmt"
+	"net"
+	"strings"
+	"testing"
+
+	"github.com/bradfitz/gomemcache/memcache"
+)
+
+func getstat(lines string, name string, value interface{}) {
+	for _, line := range strings.Split(string(lines), "\r\n") {
+		if !strings.Contains(line, name) {
+			continue
+		}
+		fmt.Sscanf(line, "STAT "+name+" %v", value)
+		return
+	}
+}
+
+func TestMemcacheServer(t *testing.T) {
+	l, err := net.ListenTCP("tcp", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer l.Close()
+
+	s := NewMemcacheServer(l, NewInMemoryCache(), NewDebugAllocator())
+	go s.Serv()
+
+	mc := memcache.New(l.Addr().String())
+	if err := mc.Set(&memcache.Item{Key: "k1", Value: []byte("v1")}); err != nil {
+		t.Fatal(err)
+	}
+	if err := mc.Set(&memcache.Item{Key: "k2", Value: []byte("v2")}); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := mc.Touch("k2", 3); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := mc.Touch("k-notfound", 4); err != memcache.ErrCacheMiss {
+		t.Fatal("should not found")
+	}
+
+	m, err := mc.GetMulti([]string{"k1", "k2", "k3"})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if g, e := len(m), 2; g != e {
+		t.Fatalf("GetMulti: got len(map) = %d, want = %d", g, e)
+	}
+	if _, ok := m["k1"]; !ok {
+		t.Fatal("GetMulti: didn't get key 'k1'")
+	}
+	if _, ok := m["k2"]; !ok {
+		t.Fatal("GetMulti: didn't get key 'k2'")
+	}
+	if g, e := string(m["k1"].Value), "v1"; g != e {
+		t.Errorf("GetMulti: k1: got %q, want %q", g, e)
+	}
+	if g, e := string(m["k2"].Value), "v2"; g != e {
+		t.Errorf("GetMulti: k2: got %q, want %q", g, e)
+	}
+
+	if err := mc.Delete("k1"); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := mc.Get("k1"); err != memcache.ErrCacheMiss {
+		t.Fatal("should cache miss")
+	}
+
+	var buf bytes.Buffer
+	s.HandleStats(&buf)
+	t.Log("stat\n", buf.String())
+
+	var n int
+	getstat(buf.String(), "curr_connections", &n)
+	if n != 1 {
+		t.Fatal("stat err:", buf.String())
+	}
+
+}
diff --git a/tools/mcstat/mcstat.go b/tools/mcstat/mcstat.go
new file mode 100644
index 0000000..8d744e7
--- /dev/null
+++ b/tools/mcstat/mcstat.go
@@ -0,0 +1,144 @@
+package main
+
+import (
+	"bufio"
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+)
+
+func PrintUsageAndExit() {
+	fmt.Printf("%s addr [interval]\n", os.Args[0])
+	os.Exit(1)
+}
+
+type Stats struct {
+	CmdSet  int64
+	CmdGet  int64
+	GetHits int64
+	Rx      int64
+	Tx      int64
+	Keys    int64
+
+	Bytes    int64
+	MaxBytes int64
+}
+
+func (s Stats) Sub(o Stats) Stats {
+	s.CmdSet -= o.CmdSet
+	s.CmdGet -= o.CmdGet
+	s.GetHits -= o.GetHits
+	s.Rx -= o.Rx
+	s.Tx -= o.Tx
+	// Keys, Bytes and MaxBytes are reported as absolute values, not diffed
+	return s
+}
+
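+// ParseStats reads a single "stats" response from r, up to the terminating
+// "END" line, and extracts the counters that mcstat displays.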
+func ParseStats(r *bufio.Reader) (Stats, error) {
+	var st Stats
+	mapping := map[string]interface{}{
+		"cmd_set":        &st.CmdSet,
+		"cmd_get":        &st.CmdGet,
+		"get_hits":       &st.GetHits,
+		"bytes_read":     &st.Rx,
+		"bytes_written":  &st.Tx,
+		"curr_items":     &st.Keys,
+		"bytes":          &st.Bytes,
+		"limit_maxbytes": &st.MaxBytes,
+	}
+	for {
+		line, err := r.ReadString('\n')
+		if err != nil {
+			return st, err
+		}
+		line = line[:len(line)-2] // remove \r\n
+		if line == "END" {
+			break
+		}
+		for name, v := range mapping {
+			if !strings.Contains(line, name) {
+				continue
+			}
+			fmt.Sscanf(line, "STAT "+name+" %v", v)
+		}
+	}
+	return st, nil
+}
+
+func main() {
+	if len(os.Args) != 2 && len(os.Args) != 3 {
+		PrintUsageAndExit()
+	}
+
+	addr := os.Args[1]
+
+	var interval int
+	if len(os.Args) == 3 {
+		interval, _ = strconv.Atoi(os.Args[2])
+	}
+	if interval <= 0 {
+		interval = 1
+	}
+
+	if _, _, err := net.SplitHostPort(addr); err != nil {
+		addr = net.JoinHostPort(addr, "11211")
+	}
+
+	conn, err := net.Dial("tcp", addr)
+	if err != nil {
+		log.Fatal(err)
+	}
+	r := bufio.NewReader(conn)
+
+	var st0 Stats
+	for i := 0; ; i++ {
+		conn.SetDeadline(time.Now().Add(10 * time.Second))
+		_, err := conn.Write([]byte("stats\r\n"))
+		if err != nil {
+			log.Fatal(err)
+		}
+		st1, err := ParseStats(r)
+		if err != nil {
+			log.Fatal(err)
+		}
+		if i == 0 {
+			st0 = st1
+			time.Sleep(time.Duration(interval) * time.Second)
+			continue
+		}
+		diff := st1.Sub(st0)
+		if (i-1)%50 == 0 {
+			fmt.Printf("%8s %8s %8s %6s %10s %10s %10s %18s\n",
+				"SET", "GET", "HITS", "HITR", "KEYS", "RX", "TX", "USAGE")
+		}
+		fmt.Printf("%8s %8s %8s %6.2f %10d %10s %10s %18s\n",
+			readableNum(diff.CmdSet), readableNum(diff.CmdGet),
+			readableNum(diff.GetHits), float32(diff.GetHits)/float32(diff.CmdGet), st1.Keys,
+			readableSize(diff.Rx), readableSize(diff.Tx),
+			readableSize(st1.Bytes)+"/"+readableSize(st1.MaxBytes))
+		st0 = st1
+		time.Sleep(time.Duration(interval) * time.Second)
+	}
+}
+
+func readableNum(i int64) string {
+	if i < (1 << 10) {
+		return fmt.Sprintf("%d", i)
+	}
+	if i < (1 << 20) {
+		return fmt.Sprintf("%.2fK", float32(i)/(1<<10))
+	}
+	if i < (1 << 30) {
+		return fmt.Sprintf("%.2fM", float32(i)/(1<<20))
+	}
+	return fmt.Sprintf("%.2fG", float32(i)/(1<<30))
+}
+
+func readableSize(i int64) string {
+	return readableNum(i) + "B"
+}