diff --git a/proxy/CHANGELOG.md b/proxy/CHANGELOG.md index 07d8297f..10182f4e 100644 --- a/proxy/CHANGELOG.md +++ b/proxy/CHANGELOG.md @@ -1,5 +1,9 @@ # Overlord-proxy +## Version 1.9.0 +1. complete memcache binary protocol. +2. fix memcache text request key&data no copy bug. + ## Version 1.8.5 1. add slowlog file rotate based on size 2. add slowlog file total limit diff --git a/proxy/proto/memcache/binary/node_conn.go b/proxy/proto/memcache/binary/node_conn.go index be6a27a4..8726f170 100644 --- a/proxy/proto/memcache/binary/node_conn.go +++ b/proxy/proto/memcache/binary/node_conn.go @@ -62,11 +62,15 @@ func (n *nodeConn) Write(m *proto.Message) (err error) { err = errors.WithStack(ErrAssertReq) return } + if _, ok := noNeedNodeTypes[mcr.respType]; ok { + return + } + _ = n.bw.Write(magicReqBytes) cmd := mcr.respType - if cmd == RequestTypeGetQ || cmd == RequestTypeGetKQ { - cmd = RequestTypeGetK + if noq, ok := qReplaceNoQTypes[cmd]; ok { + cmd = noq } _ = n.bw.Write(cmd.Bytes()) _ = n.bw.Write(mcr.keyLen) @@ -100,6 +104,15 @@ func (n *nodeConn) Read(m *proto.Message) (err error) { return } mcr.data = mcr.data[:0] + + if _, ok := noNeedNodeTypes[mcr.respType]; ok { + if mcr.respType == RequestTypeVersion { + versionRespHeader(mcr) + mcr.data = append(mcr.data, versionRespBytes...) + } + return + } + REREAD: var bs []byte if bs, err = n.br.ReadExact(requestHeaderLen); err == bufio.ErrBufferFull { @@ -143,3 +156,13 @@ func (n *nodeConn) Close() error { func (n *nodeConn) Closed() bool { return atomic.LoadInt32(&n.state) == closed } + +func versionRespHeader(req *MCRequest) { + req.magic = magicResp + copy(req.keyLen, zeroTwoBytes) + copy(req.extraLen, zeroBytes) + copy(req.status, zeroTwoBytes) + copy(req.bodyLen, versionFourBytes) + copy(req.opaque, zeroFourBytes) + copy(req.cas, zeroEightBytes) +} diff --git a/proxy/proto/memcache/binary/proxy_conn.go b/proxy/proto/memcache/binary/proxy_conn.go index 03508648..be65d41e 100644 --- a/proxy/proto/memcache/binary/proxy_conn.go +++ b/proxy/proto/memcache/binary/proxy_conn.go @@ -15,6 +15,7 @@ import ( // memcached binary protocol: https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped const ( requestHeaderLen = 24 + proxyReadBufSize = 1024 ) type proxyConn struct { @@ -27,7 +28,7 @@ type proxyConn struct { func NewProxyConn(rw *libnet.Conn) proto.ProxyConn { p := &proxyConn{ // TODO: optimus zero - br: bufio.NewReader(rw, bufio.Get(1024)), + br: bufio.NewReader(rw, bufio.Get(proxyReadBufSize)), bw: bufio.NewWriter(rw), completed: true, } @@ -80,21 +81,30 @@ NEXTGET: return } switch req.respType { + case RequestTypeNoop, RequestTypeVersion, RequestTypeQuit, RequestTypeQuitQ: + req.key = req.key[:0] + req.data = req.data[:0] + return case RequestTypeSet, RequestTypeAdd, RequestTypeReplace, RequestTypeGet, RequestTypeGetK, - RequestTypeDelete, RequestTypeIncr, RequestTypeDecr, RequestTypeAppend, RequestTypePrepend, RequestTypeTouch, RequestTypeGat: + RequestTypeDelete, RequestTypeIncr, RequestTypeDecr, RequestTypeAppend, RequestTypePrepend, + RequestTypeTouch, RequestTypeGat: if err = p.decodeCommon(m, req); err == bufio.ErrBufferFull { p.br.Advance(-requestHeaderLen) return } return - case RequestTypeGetQ, RequestTypeGetKQ: + case RequestTypeGetQ, RequestTypeGetKQ, RequestTypeSetQ, RequestTypeAddQ, RequestTypeReplaceQ, + RequestTypeIncrQ, RequestTypeDecrQ, RequestTypeAppendQ, RequestTypePrependQ: + REQAGAIN: if err = p.decodeCommon(m, req); err == bufio.ErrBufferFull { - p.br.Advance(-requestHeaderLen) - return + if err = p.br.Read(); err != nil { + return + } + goto REQAGAIN // NOTE: try to read again for this request } goto NEXTGET } - err = errors.Wrapf(ErrBadRequest, "MC decoder unsupport command:%x", req.respType) + err = errors.Wrapf(ErrBadRequest, "MC decoder unsupport command:%d", req.respType) return } diff --git a/proxy/proto/memcache/binary/request.go b/proxy/proto/memcache/binary/request.go index 147bce53..a24c8446 100644 --- a/proxy/proto/memcache/binary/request.go +++ b/proxy/proto/memcache/binary/request.go @@ -14,12 +14,14 @@ const ( ) var ( - magicReqBytes = []byte{magicReq} - magicRespBytes = []byte{magicResp} - zeroBytes = []byte{0x00} - zeroTwoBytes = []byte{0x00, 0x00} - zeroFourBytes = []byte{0x00, 0x00, 0x00, 0x00} - zeroEightBytes = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + magicReqBytes = []byte{magicReq} + magicRespBytes = []byte{magicResp} + zeroBytes = []byte{0x00} + zeroTwoBytes = []byte{0x00, 0x00} + zeroFourBytes = []byte{0x00, 0x00, 0x00, 0x00} + zeroEightBytes = []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + versionRespBytes = []byte{'1', '.', '5', '.', '1', '2'} + versionFourBytes = []byte{0x00, 0x00, 0x00, 0x06} ) // RequestType is the protocol-agnostic identifier for the command @@ -27,84 +29,114 @@ type RequestType byte // all memcache request type const ( - RequestTypeGet RequestType = 0x00 - RequestTypeSet RequestType = 0x01 - RequestTypeAdd RequestType = 0x02 - RequestTypeReplace RequestType = 0x03 - RequestTypeDelete RequestType = 0x04 - RequestTypeIncr RequestType = 0x05 - RequestTypeDecr RequestType = 0x06 - RequestTypeGetQ RequestType = 0x09 - RequestTypeNoop RequestType = 0x0a - RequestTypeGetK RequestType = 0x0c - RequestTypeGetKQ RequestType = 0x0d - RequestTypeAppend RequestType = 0x0e - RequestTypePrepend RequestType = 0x0f - // RequestTypeSetQ = 0x11 - // RequestTypeAddQ = 0x12 - // RequestTypeReplaceQ = 0x13 - // RequestTypeIncrQ = 0x15 - // RequestTypeDecrQ = 0x16 - // RequestTypeAppendQ = 0x19 - // RequestTypePrependQ = 0x1a - RequestTypeTouch RequestType = 0x1c - RequestTypeGat RequestType = 0x1d - // RequestTypeGatQ = 0x1e - RequestTypeUnknown RequestType = 0xff + RequestTypeGet RequestType = 0x00 + RequestTypeSet RequestType = 0x01 + RequestTypeAdd RequestType = 0x02 + RequestTypeReplace RequestType = 0x03 + RequestTypeDelete RequestType = 0x04 + RequestTypeIncr RequestType = 0x05 + RequestTypeDecr RequestType = 0x06 + RequestTypeQuit RequestType = 0x07 + RequestTypeGetQ RequestType = 0x09 + RequestTypeNoop RequestType = 0x0a + RequestTypeVersion RequestType = 0x0b + RequestTypeGetK RequestType = 0x0c + RequestTypeGetKQ RequestType = 0x0d + RequestTypeAppend RequestType = 0x0e + RequestTypePrepend RequestType = 0x0f + RequestTypeSetQ RequestType = 0x11 + RequestTypeAddQ RequestType = 0x12 + RequestTypeReplaceQ RequestType = 0x13 + RequestTypeIncrQ RequestType = 0x15 + RequestTypeDecrQ RequestType = 0x16 + RequestTypeQuitQ RequestType = 0x17 + RequestTypeAppendQ RequestType = 0x19 + RequestTypePrependQ RequestType = 0x1a + RequestTypeTouch RequestType = 0x1c + RequestTypeGat RequestType = 0x1d + RequestTypeGatQ RequestType = 0x1e + RequestTypeUnknown RequestType = 0xff ) var ( - getBytes = []byte{byte(RequestTypeGet)} - setBytes = []byte{byte(RequestTypeSet)} - addBytes = []byte{byte(RequestTypeAdd)} - replaceBytes = []byte{byte(RequestTypeReplace)} - deleteBytes = []byte{byte(RequestTypeDelete)} - incrBytes = []byte{byte(RequestTypeIncr)} - decrBytes = []byte{byte(RequestTypeDecr)} - getQBytes = []byte{byte(RequestTypeGetQ)} - noopBytes = []byte{byte(RequestTypeNoop)} - getKBytes = []byte{byte(RequestTypeGetK)} - getKQBytes = []byte{byte(RequestTypeGetKQ)} - appendBytes = []byte{byte(RequestTypeAppend)} - prependBytes = []byte{byte(RequestTypePrepend)} - // setQBytes = []byte{byte(RequestTypeSetQ)} - // addQBytes = []byte{byte(RequestTypeAddQ)} - // replaceQBytes = []byte{byte(RequestTypeReplaceQ)} - // incrQBytes = []byte{byte(RequestTypeIncrQ)} - // decrQBytes = []byte{byte(RequestTypeDecrQ)} - // appendQBytes = []byte{byte(RequestTypeAppendQ)} - // prependQBytes = []byte{byte(RequestTypePrependQ)} - touchBytes = []byte{byte(RequestTypeTouch)} - gatBytes = []byte{byte(RequestTypeGat)} - // gatQBytes = []byte{byte(RequestTypeGatQ)} - unknownBytes = []byte{byte(RequestTypeUnknown)} + noNeedNodeTypes = map[RequestType]struct{}{ + RequestTypeQuit: struct{}{}, + RequestTypeNoop: struct{}{}, + RequestTypeVersion: struct{}{}, + RequestTypeQuitQ: struct{}{}, + } + qReplaceNoQTypes = map[RequestType]RequestType{ + RequestTypeGetQ: RequestTypeGetK, + RequestTypeGetKQ: RequestTypeGetK, + RequestTypeSetQ: RequestTypeSet, + RequestTypeAddQ: RequestTypeAdd, + RequestTypeReplaceQ: RequestTypeReplace, + RequestTypeIncrQ: RequestTypeIncr, + RequestTypeDecrQ: RequestTypeDecr, + RequestTypeAppendQ: RequestTypeAppend, + RequestTypePrependQ: RequestTypePrepend, + RequestTypeGatQ: RequestTypeGat, + } +) + +var ( + getBytes = []byte{byte(RequestTypeGet)} + setBytes = []byte{byte(RequestTypeSet)} + addBytes = []byte{byte(RequestTypeAdd)} + replaceBytes = []byte{byte(RequestTypeReplace)} + deleteBytes = []byte{byte(RequestTypeDelete)} + incrBytes = []byte{byte(RequestTypeIncr)} + decrBytes = []byte{byte(RequestTypeDecr)} + quitBytes = []byte{byte(RequestTypeQuit)} + getQBytes = []byte{byte(RequestTypeGetQ)} + noopBytes = []byte{byte(RequestTypeNoop)} + versionBytes = []byte{byte(RequestTypeVersion)} + getKBytes = []byte{byte(RequestTypeGetK)} + getKQBytes = []byte{byte(RequestTypeGetKQ)} + appendBytes = []byte{byte(RequestTypeAppend)} + prependBytes = []byte{byte(RequestTypePrepend)} + setQBytes = []byte{byte(RequestTypeSetQ)} + addQBytes = []byte{byte(RequestTypeAddQ)} + replaceQBytes = []byte{byte(RequestTypeReplaceQ)} + incrQBytes = []byte{byte(RequestTypeIncrQ)} + decrQBytes = []byte{byte(RequestTypeDecrQ)} + quitQBytes = []byte{byte(RequestTypeQuitQ)} + appendQBytes = []byte{byte(RequestTypeAppendQ)} + prependQBytes = []byte{byte(RequestTypePrependQ)} + touchBytes = []byte{byte(RequestTypeTouch)} + gatBytes = []byte{byte(RequestTypeGat)} + gatQBytes = []byte{byte(RequestTypeGatQ)} + unknownBytes = []byte{byte(RequestTypeUnknown)} ) const ( - getString = "get" - setString = "set" - addString = "add" - replaceString = "replace" - deleteString = "delete" - incrString = "incr" - decrString = "decr" - getQString = "getq" - noopString = "noop" - getKString = "getk" - getKQString = "getkq" - appendString = "append" - prependString = "prepend" - // setQString = "setq" - // addQString = "addq" - // replaceQString = "replaceq" - // incrQString = "incrq" - // decrQString = "decrq" - // appendQString = "appendq" - // prependQString = "prependq" - touchString = "touch" - gatString = "gat" - // gatQString = "gatq" - unknownString = "unknown" + getString = "get" + setString = "set" + addString = "add" + replaceString = "replace" + deleteString = "delete" + incrString = "incr" + decrString = "decr" + quitString = "quit" + getQString = "getq" + noopString = "noop" + versionString = "version" + getKString = "getk" + getKQString = "getkq" + appendString = "append" + prependString = "prepend" + setQString = "setq" + addQString = "addq" + replaceQString = "replaceq" + incrQString = "incrq" + decrQString = "decrq" + quitQString = "quitq" + appendQString = "appendq" + prependQString = "prependq" + touchString = "touch" + gatString = "gat" + gatQString = "gatq" + unknownString = "unknown" ) // Bytes get reqtype bytes. @@ -124,10 +156,14 @@ func (rt RequestType) Bytes() []byte { return incrBytes case RequestTypeDecr: return decrBytes + case RequestTypeQuit: + return quitBytes case RequestTypeGetQ: return getQBytes case RequestTypeNoop: return noopBytes + case RequestTypeVersion: + return versionBytes case RequestTypeGetK: return getKBytes case RequestTypeGetKQ: @@ -136,26 +172,28 @@ func (rt RequestType) Bytes() []byte { return appendBytes case RequestTypePrepend: return prependBytes - // case RequestTypeSetQ: - // return setQBytes - // case RequestTypeAddQ: - // return addQBytes - // case RequestTypeReplaceQ: - // return replaceQBytes - // case RequestTypeIncrQ: - // return incrQBytes - // case RequestTypeDecrQ: - // return decrQBytes - // case RequestTypeAppendQ: - // return appendQBytes - // case RequestTypePrependQ: - // return prependQBytes + case RequestTypeSetQ: + return setQBytes + case RequestTypeAddQ: + return addQBytes + case RequestTypeReplaceQ: + return replaceQBytes + case RequestTypeIncrQ: + return incrQBytes + case RequestTypeDecrQ: + return decrQBytes + case RequestTypeQuitQ: + return quitQBytes + case RequestTypeAppendQ: + return appendQBytes + case RequestTypePrependQ: + return prependQBytes case RequestTypeTouch: return touchBytes case RequestTypeGat: return gatBytes - // case RequestTypeGatQ: - // return gatQBytes + case RequestTypeGatQ: + return gatQBytes } return unknownBytes } @@ -177,10 +215,14 @@ func (rt RequestType) String() string { return incrString case RequestTypeDecr: return decrString + case RequestTypeQuit: + return quitString case RequestTypeGetQ: return getQString case RequestTypeNoop: return noopString + case RequestTypeVersion: + return versionString case RequestTypeGetK: return getKString case RequestTypeGetKQ: @@ -189,26 +231,28 @@ func (rt RequestType) String() string { return appendString case RequestTypePrepend: return prependString - // case RequestTypeSetQ: - // return setQString - // case RequestTypeAddQ: - // return addQString - // case RequestTypeReplaceQ: - // return replaceQString - // case RequestTypeIncrQ: - // return incrQString - // case RequestTypeDecrQ: - // return decrQString - // case RequestTypeAppendQ: - // return appendQString - // case RequestTypePrependQ: - // return prependQString + case RequestTypeSetQ: + return setQString + case RequestTypeAddQ: + return addQString + case RequestTypeReplaceQ: + return replaceQString + case RequestTypeIncrQ: + return incrQString + case RequestTypeDecrQ: + return decrQString + case RequestTypeQuitQ: + return quitQString + case RequestTypeAppendQ: + return appendQString + case RequestTypePrependQ: + return prependQString case RequestTypeTouch: return touchString case RequestTypeGat: return gatString - // case RequestTypeGatQ: - // return gatQString + case RequestTypeGatQ: + return gatQString } return unknownString } diff --git a/proxy/proto/memcache/node_conn.go b/proxy/proto/memcache/node_conn.go index 940c9d42..66076e4a 100644 --- a/proxy/proto/memcache/node_conn.go +++ b/proxy/proto/memcache/node_conn.go @@ -16,7 +16,7 @@ const ( opened = int32(0) closed = int32(1) - nodeReadBufSize = 512 * 1024 // NOTE: 2MB + nodeReadBufSize = 2 * 1024 * 1024 // NOTE: 2MB ) type nodeConn struct { diff --git a/proxy/proto/memcache/proxy_conn.go b/proxy/proto/memcache/proxy_conn.go index e1da2f4f..3dc988a3 100644 --- a/proxy/proto/memcache/proxy_conn.go +++ b/proxy/proto/memcache/proxy_conn.go @@ -16,6 +16,8 @@ import ( // memcached protocol: https://github.com/memcached/memcached/blob/master/doc/protocol.txt const ( serverErrorPrefix = "SERVER_ERROR " + + proxyReadBufSize = 1024 ) var ( @@ -33,7 +35,7 @@ type proxyConn struct { func NewProxyConn(rw *libnet.Conn) proto.ProxyConn { p := &proxyConn{ // TODO: optimus zero - br: bufio.NewReader(rw, bufio.Get(1024)), + br: bufio.NewReader(rw, bufio.Get(proxyReadBufSize)), bw: bufio.NewWriter(rw), completed: true, } @@ -254,12 +256,10 @@ func (p *proxyConn) decodeGetAndTouch(m *proto.Message, bs []byte, reqType Reque return } } - var ( b, e int ns = bs[eE:] ) - for { ns = ns[e:] b, e = nextField(ns) @@ -281,14 +281,18 @@ func WithReq(m *proto.Message, rtype RequestType, key []byte, data []byte) { if req == nil { req := GetReq() req.respType = rtype - req.key = key - req.data = data + req.key = req.key[:0] + req.key = append(req.key, key...) + req.data = req.data[:0] + req.data = append(req.data, data...) m.WithRequest(req) } else { mcreq := req.(*MCRequest) mcreq.respType = rtype - mcreq.key = key - mcreq.data = data + mcreq.key = mcreq.key[:0] + mcreq.key = append(mcreq.key, key...) + mcreq.data = mcreq.data[:0] + mcreq.data = append(mcreq.data, data...) } } @@ -385,14 +389,12 @@ func (p *proxyConn) Encode(m *proto.Message) (err error) { err = proto.ErrQuit return } - if mcr.respType == RequestTypeVersion { _ = p.bw.Write(versionReplyBytes) _ = p.bw.Write(version.Bytes()) err = p.bw.Write(crlfBytes) return } - if mcr.respType == RequestTypeSetNoreply { return } diff --git a/proxy/proto/redis/proxy_conn.go b/proxy/proto/redis/proxy_conn.go index 590ef42b..2ff47d08 100644 --- a/proxy/proto/redis/proxy_conn.go +++ b/proxy/proto/redis/proxy_conn.go @@ -13,6 +13,10 @@ import ( "github.com/pkg/errors" ) +const ( + proxyReadBufSize = 1024 +) + var ( nullBytes = []byte("-1\r\n") okBytes = []byte("OK\r\n") @@ -40,7 +44,7 @@ type proxyConn struct { // NewProxyConn creates new redis Encoder and Decoder. func NewProxyConn(conn *libnet.Conn) proto.ProxyConn { r := &proxyConn{ - br: bufio.NewReader(conn, bufio.Get(1024)), + br: bufio.NewReader(conn, bufio.Get(proxyReadBufSize)), bw: bufio.NewWriter(conn), completed: true, resp: &resp{}, diff --git a/version/version.go b/version/version.go index da45b8e5..213c7bec 100644 --- a/version/version.go +++ b/version/version.go @@ -9,8 +9,8 @@ import ( // Define overlord version consts const ( OverlordMajor = 1 - OverlordMinor = 8 - OverlordPatch = 5 + OverlordMinor = 9 + OverlordPatch = 0 ) var (