Skip to content

Commit

Permalink
Introduce Kubernetes KV interface to etcd client
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jun 20, 2024
1 parent a043da5 commit 61005d1
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 0 deletions.
2 changes: 2 additions & 0 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Client struct {
Watcher
Auth
Maintenance
Kubernetes Kubernetes

conn *grpc.ClientConn

Expand Down Expand Up @@ -447,6 +448,7 @@ func newClient(cfg *Config) (*Client, error) {
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Kubernetes = NewKubernetes(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)

Expand Down
184 changes: 184 additions & 0 deletions client/v3/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package clientv3

import (
"context"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
)

func NewKubernetes(c *Client) Kubernetes {
return &kubernetes{kv: RetryKVClient(c)}
}

type Kubernetes interface {
Get(ctx context.Context, key string, opts GetOptions) (KubernetesGetResponse, error)
List(ctx context.Context, prefix string, opts ListOptions) (KubernetesListResponse, error)
Count(ctx context.Context, prefix string) (int64, error)
OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (KubernetesPutResponse, error)
OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (KubernetesDeleteResponse, error)
}

type GetOptions struct {
Revision int64
}

type ListOptions struct {
Revision int64
Limit int64
Continue string
}

type PutOptions struct {
ExpectedRevision int64
GetOnFailure bool
// Deprecated: Should be replaced with TTL when Kubernetes starts using one lease per object.
LeaseID LeaseID
}

type DeleteOptions struct {
ExpectedRevision int64
GetOnFailure bool
}

type KubernetesGetResponse struct {
KV *mvccpb.KeyValue
Revision int64
}

type KubernetesListResponse struct {
KVs []*mvccpb.KeyValue
Count int64
Revision int64
}

type KubernetesPutResponse struct {
KV *mvccpb.KeyValue
Succeeded bool
Revision int64
}

type KubernetesDeleteResponse struct {
KV *mvccpb.KeyValue
Succeeded bool
Revision int64
}

type kubernetes struct {
kv pb.KVClient
}

func (k kubernetes) Get(ctx context.Context, key string, opts GetOptions) (resp KubernetesGetResponse, err error) {
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision))
if err != nil {
return resp, toErr(ctx, err)
}
resp.Revision = rangeResp.Header.Revision
if len(rangeResp.Kvs) == 1 {
resp.KV = rangeResp.Kvs[0]
}
return resp, nil
}

func (k kubernetes) List(ctx context.Context, prefix string, opts ListOptions) (resp KubernetesListResponse, err error) {
rangeStart := prefix + opts.Continue
rangeEnd := GetPrefixRangeEnd(prefix)

rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(rangeStart),
RangeEnd: []byte(rangeEnd),
Limit: opts.Limit,
Revision: opts.Revision,
})
if err != nil {
return resp, toErr(ctx, err)
}
resp.KVs = rangeResp.Kvs
resp.Count = rangeResp.Count
resp.Revision = rangeResp.Header.Revision
return resp, nil
}

func (k kubernetes) Count(ctx context.Context, prefix string) (int64, error) {
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(prefix),
RangeEnd: []byte(GetPrefixRangeEnd(prefix)),
CountOnly: true,
})
if err != nil {
return 0, toErr(ctx, err)
}
return resp.Count, nil
}

func (k kubernetes) OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (resp KubernetesPutResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}

var onFailure *pb.RequestOp
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
}

txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, onSuccess, onFailure)
if err != nil {
return resp, toErr(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}

func (k kubernetes) OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (resp KubernetesDeleteResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}

var onFailure *pb.RequestOp
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
}

txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, onSuccess, onFailure)
if err != nil {
return resp, toErr(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}

func (k kubernetes) optimisticTxn(ctx context.Context, key string, expectRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{
{
Result: pb.Compare_EQUAL,
Target: pb.Compare_MOD,
Key: []byte(key),
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectRevision},
},
},
Success: []*pb.RequestOp{onSuccess},
Failure: []*pb.RequestOp{onFailure},
}
return k.kv.Txn(ctx, txn)
}

func getRequest(key string, revision int64) *pb.RangeRequest {
return &pb.RangeRequest{
Key: []byte(key),
Revision: revision,
Limit: 1,
}
}

func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
getResponse := resp.GetResponseRange()
if len(getResponse.Kvs) == 1 {
return getResponse.Kvs[0]
}
return nil
}

0 comments on commit 61005d1

Please sign in to comment.