diff --git a/server/etcdserver/apply/apply.go b/server/etcdserver/apply/apply.go index 058870b1dc2..f5180b0c072 100644 --- a/server/etcdserver/apply/apply.go +++ b/server/etcdserver/apply/apply.go @@ -71,9 +71,9 @@ type applierV3 interface { // delegates the actual execution to the applyFunc method. Apply(ctx context.Context, r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3, applyFunc applyFunc) *Result - Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) - Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) - DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) + Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) + Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) + DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) @@ -157,16 +157,16 @@ func (a *applierV3backend) Apply(ctx context.Context, r *pb.InternalRaftRequest, return applyFunc(ctx, r, shouldApplyV3) } -func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { - return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, txn, p) +func (a *applierV3backend) Put(ctx context.Context, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { + return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, p) } -func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { - return mvcctxn.DeleteRange(a.kv, txn, dr) +func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + return mvcctxn.DeleteRange(a.kv, dr) } -func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { - return mvcctxn.Range(ctx, a.lg, a.kv, txn, r) +func (a *applierV3backend) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + return mvcctxn.Range(ctx, a.lg, a.kv, r) } func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { @@ -255,7 +255,7 @@ type applierV3Capped struct { // with Puts so that the number of keys in the store is capped. func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } -func (a *applierV3Capped) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (a *applierV3Capped) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { return nil, nil, errors.ErrNoSpace } @@ -447,9 +447,9 @@ func newQuotaApplierV3(lg *zap.Logger, quotaBackendBytesCfg int64, be backend.Ba return "aApplierV3{app, serverstorage.NewBackendQuota(lg, quotaBackendBytesCfg, be, "v3-applier")} } -func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (a *quotaApplierV3) Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { ok := a.q.Available(p) - resp, trace, err := a.applierV3.Put(ctx, txn, p) + resp, trace, err := a.applierV3.Put(ctx, p) if err == nil && !ok { err = errors.ErrNoSpace } diff --git a/server/etcdserver/apply/apply_auth.go b/server/etcdserver/apply/apply_auth.go index 4873653ece1..51cf5ad8685 100644 --- a/server/etcdserver/apply/apply_auth.go +++ b/server/etcdserver/apply/apply_auth.go @@ -24,7 +24,6 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/txn" "go.etcd.io/etcd/server/v3/lease" - "go.etcd.io/etcd/server/v3/storage/mvcc" ) type authApplierV3 struct { @@ -65,7 +64,7 @@ func (aa *authApplierV3) Apply(ctx context.Context, r *pb.InternalRaftRequest, s return ret } -func (aa *authApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (aa *authApplierV3) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil { return nil, nil, err } @@ -84,17 +83,17 @@ func (aa *authApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, r *pb.PutRe return nil, nil, err } } - return aa.applierV3.Put(ctx, txn, r) + return aa.applierV3.Put(ctx, r) } -func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (aa *authApplierV3) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err } - return aa.applierV3.Range(ctx, txn, r) + return aa.applierV3.Range(ctx, r) } -func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func (aa *authApplierV3) DeleteRange(r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err } @@ -105,7 +104,7 @@ func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest } } - return aa.applierV3.DeleteRange(txn, r) + return aa.applierV3.DeleteRange(r) } func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { diff --git a/server/etcdserver/apply/apply_auth_test.go b/server/etcdserver/apply/apply_auth_test.go index a772a1823e5..0e2252f3ab1 100644 --- a/server/etcdserver/apply/apply_auth_test.go +++ b/server/etcdserver/apply/apply_auth_test.go @@ -445,7 +445,7 @@ func TestAuthApplierV3_Put(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { setAuthInfo(authApplier, tc.userName) - _, _, err := authApplier.Put(ctx, nil, tc.request) + _, _, err := authApplier.Put(ctx, tc.request) require.Equalf(t, tc.expectError, err, "Put returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err) }) } @@ -466,7 +466,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) { // The user should be able to put the key setAuthInfo(authApplier, userWriteOnly) - _, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{ + _, _, err = authApplier.Put(ctx, &pb.PutRequest{ Key: []byte(key), Value: []byte("1"), Lease: LeaseId, @@ -475,7 +475,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) { // Put a key under the lease outside user's key range setAuthInfo(authApplier, userRoot) - _, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{ + _, _, err = authApplier.Put(ctx, &pb.PutRequest{ Key: []byte(keyOutsideRange), Value: []byte("1"), Lease: LeaseId, @@ -484,7 +484,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) { // The user should not be able to put the key anymore setAuthInfo(authApplier, userWriteOnly) - _, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{ + _, _, err = authApplier.Put(ctx, &pb.PutRequest{ Key: []byte(key), Value: []byte("1"), Lease: LeaseId, @@ -532,7 +532,7 @@ func TestAuthApplierV3_Range(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { setAuthInfo(authApplier, tc.userName) - _, err := authApplier.Range(ctx, nil, tc.request) + _, err := authApplier.Range(ctx, tc.request) require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err) }) } @@ -596,7 +596,7 @@ func TestAuthApplierV3_DeleteRange(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { setAuthInfo(authApplier, tc.userName) - _, err := authApplier.DeleteRange(nil, tc.request) + _, err := authApplier.DeleteRange(tc.request) require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err) }) } @@ -703,7 +703,7 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) { // Put a key under the lease outside user's key range setAuthInfo(authApplier, userRoot) - _, _, err = authApplier.Put(ctx, nil, &pb.PutRequest{ + _, _, err = authApplier.Put(ctx, &pb.PutRequest{ Key: []byte(keyOutsideRange), Value: []byte("1"), Lease: LeaseId, diff --git a/server/etcdserver/apply/corrupt.go b/server/etcdserver/apply/corrupt.go index 040f294aeba..e04d28c6f39 100644 --- a/server/etcdserver/apply/corrupt.go +++ b/server/etcdserver/apply/corrupt.go @@ -20,7 +20,6 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/pkg/v3/traceutil" "go.etcd.io/etcd/server/v3/etcdserver/errors" - "go.etcd.io/etcd/server/v3/storage/mvcc" ) type applierV3Corrupt struct { @@ -29,15 +28,15 @@ type applierV3Corrupt struct { func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } -func (a *applierV3Corrupt) Put(_ context.Context, _ mvcc.TxnWrite, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (a *applierV3Corrupt) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { return nil, nil, errors.ErrCorrupt } -func (a *applierV3Corrupt) Range(_ context.Context, _ mvcc.TxnRead, _ *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3Corrupt) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, error) { return nil, errors.ErrCorrupt } -func (a *applierV3Corrupt) DeleteRange(_ mvcc.TxnWrite, _ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func (a *applierV3Corrupt) DeleteRange(_ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { return nil, errors.ErrCorrupt } diff --git a/server/etcdserver/apply/uber_applier.go b/server/etcdserver/apply/uber_applier.go index 201defa385b..72e591ed7ef 100644 --- a/server/etcdserver/apply/uber_applier.go +++ b/server/etcdserver/apply/uber_applier.go @@ -153,13 +153,13 @@ func (a *uberApplier) dispatch(ctx context.Context, r *pb.InternalRaftRequest, s switch { case r.Range != nil: op = "Range" - ar.Resp, ar.Err = a.applyV3.Range(ctx, nil, r.Range) + ar.Resp, ar.Err = a.applyV3.Range(ctx, r.Range) case r.Put != nil: op = "Put" - ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, nil, r.Put) + ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, r.Put) case r.DeleteRange != nil: op = "DeleteRange" - ar.Resp, ar.Err = a.applyV3.DeleteRange(nil, r.DeleteRange) + ar.Resp, ar.Err = a.applyV3.DeleteRange(r.DeleteRange) case r.Txn != nil: op = "Txn" ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(ctx, r.Txn) diff --git a/server/etcdserver/txn/txn.go b/server/etcdserver/txn/txn.go index ecd554629c3..6070389fd86 100644 --- a/server/etcdserver/txn/txn.go +++ b/server/etcdserver/txn/txn.go @@ -31,9 +31,7 @@ import ( "go.etcd.io/etcd/server/v3/storage/mvcc" ) -func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { - resp = &pb.PutResponse{} - resp.Header = &pb.ResponseHeader{} +func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { trace = traceutil.Get(ctx) // create put tracing if the trace in context is empty if trace.IsEmpty() { @@ -42,17 +40,25 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t traceutil.Field{Key: "key", Value: string(p.Key)}, traceutil.Field{Key: "req_size", Value: p.Size()}, ) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) } - val, leaseID := p.Value, lease.LeaseID(p.Lease) - if txnWrite == nil { - if leaseID != lease.NoLease { - if l := lessor.Lookup(leaseID); l == nil { - return nil, nil, lease.ErrLeaseNotFound - } + leaseID := lease.LeaseID(p.Lease) + if leaseID != lease.NoLease { + if l := lessor.Lookup(leaseID); l == nil { + return nil, nil, lease.ErrLeaseNotFound } - txnWrite = kv.Write(trace) - defer txnWrite.End() } + txnWrite := kv.Write(trace) + defer txnWrite.End() + resp, err = put(ctx, txnWrite, p) + return resp, trace, err +} + +func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) { + trace := traceutil.Get(ctx) + resp = &pb.PutResponse{} + resp.Header = &pb.ResponseHeader{} + val, leaseID := p.Value, lease.LeaseID(p.Lease) var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { @@ -61,13 +67,13 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t }, "get previous kv pair") if err != nil { - return nil, nil, err + return nil, err } } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { // ignore_{lease,value} flag expects previous key-value pair - return nil, nil, errors.ErrKeyNotFound + return nil, errors.ErrKeyNotFound } } if p.IgnoreValue { @@ -84,19 +90,21 @@ func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, t resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID) trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) - return resp, trace, nil + return resp, nil } -func DeleteRange(kv mvcc.KV, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { +func DeleteRange(kv mvcc.KV, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + txnWrite := kv.Write(traceutil.TODO()) + defer txnWrite.End() + resp, err := deleteRange(txnWrite, dr) + return resp, err +} + +func deleteRange(txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { resp := &pb.DeleteRangeResponse{} resp.Header = &pb.ResponseHeader{} end := mkGteRange(dr.RangeEnd) - if txnWrite == nil { - txnWrite = kv.Write(traceutil.TODO()) - defer txnWrite.End() - } - if dr.PrevKv { rr, err := txnWrite.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{}) if err != nil { @@ -114,17 +122,23 @@ func DeleteRange(kv mvcc.KV, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) return resp, nil } -func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace := traceutil.Get(ctx) + if trace.IsEmpty() { + trace = traceutil.New("range", lg) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) + } + txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace) + defer txnRead.End() + return executeRange(ctx, lg, txnRead, r) +} + +func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { trace := traceutil.Get(ctx) resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} - if txnRead == nil { - txnRead = kv.Read(mvcc.ConcurrentReadTxMode, trace) - defer txnRead.End() - } - limit := r.Limit if r.SortOrder != pb.RangeRequest_NONE || r.MinModRevision != 0 || r.MaxModRevision != 0 || @@ -226,7 +240,6 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit ctx = context.WithValue(ctx, traceutil.TraceKey, trace) } isWrite := !IsTxnReadonly(rt) - // When the transaction contains write operations, we use ReadTx instead of // ConcurrentReadTx to avoid extra overhead of copying buffer. var txnWrite mvcc.TxnWrite @@ -235,7 +248,6 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit } else { txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.ConcurrentReadTxMode, trace)) } - var txnPath []bool trace.StepWithFunction( func() { @@ -243,22 +255,11 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit }, "compare", ) - - if isWrite { - trace.AddField(traceutil.Field{Key: "read_only", Value: false}) - if _, err := checkRequests(txnWrite, rt, txnPath, - func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil { - txnWrite.End() - return nil, nil, err - } - } - if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil { + err := checkTxn(ctx, txnWrite, rt, isWrite, lessor, txnPath) + if err != nil { txnWrite.End() return nil, nil, err } - trace.Step("check requests") - txnResp, _ := newTxnResp(rt, txnPath) - // When executing mutable txnWrite ops, etcd must hold the txnWrite lock so // readers do not see any intermediate results. Since writes are // serialized on the raft loop, the revision in the read view will @@ -267,7 +268,35 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit txnWrite.End() txnWrite = kv.Write(trace) } - _, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp) + txnResp, err := txn(ctx, lg, txnWrite, rt, isWrite, txnPath) + txnWrite.End() + + trace.AddField( + traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)}, + traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision}, + ) + return txnResp, trace, err +} + +func checkTxn(ctx context.Context, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, lessor lease.Lessor, txnPath []bool) error { + trace := traceutil.Get(ctx) + if isWrite { + trace.AddField(traceutil.Field{Key: "read_only", Value: false}) + if _, err := checkRequests(txnWrite, rt, txnPath, + func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil { + return err + } + } + if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil { + return err + } + trace.Step("check requests") + return nil +} + +func txn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, isWrite bool, txnPath []bool) (*pb.TxnResponse, error) { + txnResp, _ := newTxnResp(rt, txnPath) + _, err := executeTxn(ctx, lg, txnWrite, rt, txnPath, txnResp) if err != nil { if isWrite { // end txn to release locks before panic @@ -283,14 +312,8 @@ func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWit if len(txnWrite.Changes()) != 0 { rev++ } - txnWrite.End() - txnResp.Header.Revision = rev - trace.AddField( - traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)}, - traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision}, - ) - return txnResp, trace, err + return txnResp, err } // newTxnResp allocates a txn response for a txn request given a path. @@ -324,7 +347,7 @@ func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txn return txnResp, txnCount } -func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) { +func executeTxn(ctx context.Context, lg *zap.Logger, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) { trace := traceutil.Get(ctx) reqs := rt.Success if !txnPath[0] { @@ -339,7 +362,7 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less traceutil.Field{Key: "req_type", Value: "range"}, traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)}, traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)}) - resp, err := Range(ctx, lg, kv, txnWrite, tv.RequestRange) + resp, err := executeRange(ctx, lg, txnWrite, tv.RequestRange) if err != nil { return 0, fmt.Errorf("applyTxn: failed Range: %w", err) } @@ -350,21 +373,21 @@ func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Less traceutil.Field{Key: "req_type", Value: "put"}, traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)}, traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()}) - resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut) + resp, err := put(ctx, txnWrite, tv.RequestPut) if err != nil { return 0, fmt.Errorf("applyTxn: failed Put: %w", err) } respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp trace.StopSubTrace() case *pb.RequestOp_RequestDeleteRange: - resp, err := DeleteRange(kv, txnWrite, tv.RequestDeleteRange) + resp, err := deleteRange(txnWrite, tv.RequestDeleteRange) if err != nil { return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err) } respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp case *pb.RequestOp_RequestTxn: resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn - applyTxns, err := applyTxn(ctx, lg, kv, lessor, txnWrite, tv.RequestTxn, txnPath[1:], resp) + applyTxns, err := executeTxn(ctx, lg, txnWrite, tv.RequestTxn, txnPath[1:], resp) if err != nil { // don't wrap the error. It's a recursive call and err should be already wrapped return 0, err diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index ff6de473e93..c25108aaaf8 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -131,7 +131,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, err = txn.Range(ctx, s.Logger(), s.KV(), nil, r) } + get := func() { resp, err = txn.Range(ctx, s.Logger(), s.KV(), r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { err = serr return nil, err