Skip to content

Commit

Permalink
Remove RLock/RUnlock from BatchTx
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jul 28, 2023
1 parent b4f8a7b commit 2976998
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 87 deletions.
4 changes: 2 additions & 2 deletions server/auth/range_perm_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"go.etcd.io/etcd/pkg/v3/adt"
)

func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions {
func getMergedPerms(tx UnsafeAuthReader, userName string) *unifiedRangePermissions {
user := tx.UnsafeGetUser(userName)
if user == nil {
return nil
Expand Down Expand Up @@ -127,7 +127,7 @@ func (as *authStore) isRangeOpPermitted(userName string, key, rangeEnd []byte, p
return checkKeyInterval(as.lg, rangePerm, key, rangeEnd, permtyp)
}

func (as *authStore) refreshRangePermCache(tx AuthReadTx) {
func (as *authStore) refreshRangePermCache(tx UnsafeAuthReader) {
// Note that every authentication configuration update calls this method and it invalidates the entire
// rangePermCache and reconstruct it based on information of users and roles stored in the backend.
// This can be a costly operation.
Expand Down
28 changes: 17 additions & 11 deletions server/auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ type AuthBackend interface {
}

type AuthBatchTx interface {
AuthReadTx
Lock()
Unlock()
UnsafeAuthReader
UnsafeSaveAuthEnabled(enabled bool)
UnsafeSaveAuthRevision(rev uint64)
UnsafePutUser(*authpb.User)
Expand All @@ -216,14 +218,18 @@ type AuthBatchTx interface {
}

type AuthReadTx interface {
RLock()
RUnlock()
UnsafeAuthReader
}

type UnsafeAuthReader interface {
UnsafeReadAuthEnabled() bool
UnsafeReadAuthRevision() uint64
UnsafeGetUser(string) *authpb.User
UnsafeGetRole(string) *authpb.Role
UnsafeGetAllUsers() []*authpb.User
UnsafeGetAllRoles() []*authpb.Role
Lock()
Unlock()
}

type authStore struct {
Expand Down Expand Up @@ -354,8 +360,8 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
// to avoid putting it in the critical section of the tx lock.
revision, err := func() (uint64, error) {
tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()
tx.RLock()
defer tx.RUnlock()

user = tx.UnsafeGetUser(username)
if user == nil {
Expand All @@ -382,13 +388,13 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) {
func (as *authStore) Recover(be AuthBackend) {
as.be = be
tx := be.ReadTx()
tx.Lock()
tx.RLock()

enabled := tx.UnsafeReadAuthEnabled()
as.setRevision(tx.UnsafeReadAuthRevision())
as.refreshRangePermCache(tx)

tx.Unlock()
tx.RUnlock()

as.enabledMu.Lock()
as.enabled = enabled
Expand Down Expand Up @@ -864,8 +870,8 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
}

tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()
tx.RLock()
defer tx.RUnlock()

user := tx.UnsafeGetUser(userName)
if user == nil {
Expand Down Expand Up @@ -906,8 +912,8 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error {
}

tx := as.be.ReadTx()
tx.Lock()
defer tx.Unlock()
tx.RLock()
defer tx.RUnlock()
u := tx.UnsafeGetUser(authInfo.Username)

if u == nil {
Expand Down
6 changes: 6 additions & 0 deletions server/auth/store_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ func (t txMock) Lock() {
func (t txMock) Unlock() {
}

func (t txMock) RLock() {
}

func (t txMock) RUnlock() {
}

func (t txMock) UnsafeSaveAuthEnabled(enabled bool) {
t.be.enabled = enabled
}
Expand Down
14 changes: 1 addition & 13 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type Bucket interface {
}

type BatchTx interface {
ReadTx
Lock()
Unlock()
UnsafeReader
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
Expand Down Expand Up @@ -103,18 +103,6 @@ func (t *batchTx) Unlock() {
t.Mutex.Unlock()
}

// BatchTx interface embeds ReadTx interface. But RLock() and RUnlock() do not
// have appropriate semantics in BatchTx interface. Therefore should not be called.
// TODO: might want to decouple ReadTx and BatchTx

func (t *batchTx) RLock() {
panic("unexpected RLock")
}

func (t *batchTx) RUnlock() {
panic("unexpected RUnlock")
}

func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
_, err := t.tx.CreateBucket(bucket.Name())
if err != nil && err != bolt.ErrBucketExists {
Expand Down
3 changes: 3 additions & 0 deletions server/storage/backend/read_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (
type ReadTx interface {
RLock()
RUnlock()
UnsafeReader
}

type UnsafeReader interface {
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
Expand Down
2 changes: 1 addition & 1 deletion server/storage/mvcc/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
hashStorageMaxSize = 10
)

func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
func unsafeHashByRev(tx backend.UnsafeReader, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
h := newKVHasher(compactRevision, revision, keep)
err := tx.UnsafeForEach(schema.Key, func(k, v []byte) error {
h.WriteKeyValue(k, v)
Expand Down
39 changes: 22 additions & 17 deletions server/storage/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ import (
)

type storeTxnRead struct {
s *store
storeTxnCommon
tx backend.ReadTx
}

type storeTxnCommon struct {
s *store
tx backend.UnsafeReader

firstRev int64
rev int64
Expand All @@ -54,17 +59,17 @@ func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
return newMetricsTxnRead(&storeTxnRead{storeTxnCommon{s, tx, firstRev, rev, trace}, tx})
}

func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
func (tr *storeTxnRead) Rev() int64 { return tr.rev }
func (tr *storeTxnCommon) FirstRev() int64 { return tr.firstRev }
func (tr *storeTxnCommon) Rev() int64 { return tr.rev }

func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
func (tr *storeTxnCommon) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}

func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
func (tr *storeTxnCommon) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
rev := ro.Rev
if rev > curRev {
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
Expand Down Expand Up @@ -132,7 +137,7 @@ func (tr *storeTxnRead) End() {
}

type storeTxnWrite struct {
storeTxnRead
storeTxnCommon
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
Expand All @@ -144,10 +149,10 @@ func (s *store) Write(trace *traceutil.Trace) TxnWrite {
tx := s.b.BatchTx()
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
Expand Down Expand Up @@ -217,7 +222,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {

d, err := kv.Marshal()
if err != nil {
tw.storeTxnRead.s.lg.Fatal(
tw.storeTxnCommon.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
Expand All @@ -240,7 +245,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
}
err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
tw.storeTxnRead.s.lg.Error(
tw.storeTxnCommon.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
Expand Down Expand Up @@ -278,13 +283,13 @@ func (tw *storeTxnWrite) delete(key []byte) {
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)

ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
ibytes = appendMarkTombstone(tw.storeTxnCommon.s.lg, ibytes)

kv := mvccpb.KeyValue{Key: key}

d, err := kv.Marshal()
if err != nil {
tw.storeTxnRead.s.lg.Fatal(
tw.storeTxnCommon.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
Expand All @@ -293,7 +298,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
if err != nil {
tw.storeTxnRead.s.lg.Fatal(
tw.storeTxnCommon.s.lg.Fatal(
"failed to tombstone an existing key",
zap.String("key", string(key)),
zap.Error(err),
Expand All @@ -307,7 +312,7 @@ func (tw *storeTxnWrite) delete(key []byte) {
if leaseID != lease.NoLease {
err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
if err != nil {
tw.storeTxnRead.s.lg.Error(
tw.storeTxnCommon.s.lg.Error(
"failed to detach old lease from a key",
zap.Error(err),
)
Expand Down
4 changes: 2 additions & 2 deletions server/storage/mvcc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
"go.etcd.io/etcd/server/v3/storage/schema"
)

func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool) {
func UnsafeReadFinishedCompact(tx backend.UnsafeReader) (finishedComact int64, found bool) {
_, finishedCompactBytes := tx.UnsafeRange(schema.Meta, schema.FinishedCompactKeyName, nil, 0)
if len(finishedCompactBytes) != 0 {
return bytesToRev(finishedCompactBytes[0]).main, true
}
return 0, false
}

func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found bool) {
func UnsafeReadScheduledCompact(tx backend.UnsafeReader) (scheduledComact int64, found bool) {
_, scheduledCompactBytes := tx.UnsafeRange(schema.Meta, schema.ScheduledCompactKeyName, nil, 0)
if len(scheduledCompactBytes) != 0 {
return bytesToRev(scheduledCompactBytes[0]).main, true
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/alarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *alarmBackend) GetAllAlarms() ([]*etcdserverpb.AlarmMember, error) {
return s.unsafeGetAllAlarms(tx)
}

func (s *alarmBackend) unsafeGetAllAlarms(tx backend.ReadTx) ([]*etcdserverpb.AlarmMember, error) {
func (s *alarmBackend) unsafeGetAllAlarms(tx backend.UnsafeReader) ([]*etcdserverpb.AlarmMember, error) {
var ms []*etcdserverpb.AlarmMember
err := tx.UnsafeForEach(Alarm, func(k, v []byte) error {
var m etcdserverpb.AlarmMember
Expand Down
22 changes: 14 additions & 8 deletions server/storage/schema/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,11 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) {
}

func (atx *authBatchTx) UnsafeReadAuthEnabled() bool {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeReadAuthEnabled()
return unsafeReadAuthEnabled(atx.tx)
}

func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 {
arx := &authReadTx{tx: atx.tx, lg: atx.lg}
return arx.UnsafeReadAuthRevision()
return unsafeReadAuthRevision(atx.tx)
}

func (atx *authBatchTx) Lock() {
Expand All @@ -117,7 +115,11 @@ func (atx *authBatchTx) Unlock() {
}

func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
_, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
return unsafeReadAuthEnabled(atx.tx)
}

func unsafeReadAuthEnabled(tx backend.UnsafeReader) bool {
_, vs := tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0)
if len(vs) == 1 {
if bytes.Equal(vs[0], authEnabled) {
return true
Expand All @@ -127,18 +129,22 @@ func (atx *authReadTx) UnsafeReadAuthEnabled() bool {
}

func (atx *authReadTx) UnsafeReadAuthRevision() uint64 {
_, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
return unsafeReadAuthRevision(atx.tx)
}

func unsafeReadAuthRevision(tx backend.UnsafeReader) uint64 {
_, vs := tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0)
if len(vs) != 1 {
// this can happen in the initialization phase
return 0
}
return binary.BigEndian.Uint64(vs[0])
}

func (atx *authReadTx) Lock() {
func (atx *authReadTx) RLock() {
atx.tx.RLock()
}

func (atx *authReadTx) Unlock() {
func (atx *authReadTx) RUnlock() {
atx.tx.RUnlock()
}
Loading

0 comments on commit 2976998

Please sign in to comment.