diff --git a/enterprise/server/raft/cache/cache.go b/enterprise/server/raft/cache/cache.go index d8306dc8fac..90e0a7adb31 100644 --- a/enterprise/server/raft/cache/cache.go +++ b/enterprise/server/raft/cache/cache.go @@ -371,22 +371,27 @@ func (rc *RaftCache) Reader(ctx context.Context, r *rspb.ResourceName, uncompres return nil, err } - var readCloser io.ReadCloser + var rsp *rfpb.GetMultiResponse err = rc.sender.Run(ctx, fileMetadataKey, func(c rfspb.ApiClient, h *rfpb.Header) error { - req := &rfpb.ReadRequest{ - Header: h, - FileRecord: fileRecord, - Offset: uncompressedOffset, - Limit: limit, + req := &rfpb.GetMultiRequest{ + Header: h, + FileRecords: []*rfpb.FileRecord{fileRecord}, } - r, err := rc.apiClient.RemoteReader(ctx, c, req) + r, err := c.GetMulti(ctx, req) if err != nil { return err } - readCloser = r + rsp = r return nil }) - return readCloser, err + if err != nil { + return nil, err + } + if len(rsp.GetResponses()) != 1 { + return nil, status.InternalError("GetMulti response did not containe requested FileRecord") + } + md := rsp.GetResponses()[0].GetFileMetadata() + return rc.fileStorer.InlineReader(md.GetStorageMetadata().GetInlineMetadata(), uncompressedOffset, limit) } type raftWriteCloser struct { @@ -548,7 +553,7 @@ func (rc *RaftCache) GetMulti(ctx context.Context, resources []*rspb.ResourceNam if !ok { return nil, status.InternalError("type is not *rfpb.FileRecord") } - req.FileRecord = append(req.FileRecord, fr) + req.FileRecords = append(req.FileRecords, fr) } return c.GetMulti(ctx, req) }) @@ -560,10 +565,10 @@ func (rc *RaftCache) GetMulti(ctx context.Context, resources []*rspb.ResourceNam for _, rsp := range rsps { fmr, ok := rsp.(*rfpb.GetMultiResponse) if !ok { - return nil, status.InternalError("response not of type *rfpb.FindMissingResponse") + return nil, status.InternalError("response not of type *rfpb.GetMultiResponse") } - for _, frd := range fmr.GetData() { - dataMap[frd.GetFileRecord().GetDigest()] = frd.GetData() + for _, r 
:= range fmr.GetResponses() { + dataMap[r.GetFileRecord().GetDigest()] = r.GetFileMetadata().GetStorageMetadata().GetInlineMetadata().GetData() } } diff --git a/enterprise/server/raft/client/client.go b/enterprise/server/raft/client/client.go index 8d2802218c8..a407a04cf0e 100644 --- a/enterprise/server/raft/client/client.go +++ b/enterprise/server/raft/client/client.go @@ -3,7 +3,6 @@ package client import ( "context" "fmt" - "io" "sync" "time" @@ -75,56 +74,6 @@ func (c *APIClient) Get(ctx context.Context, peer string) (rfspb.ApiClient, erro return c.getClient(ctx, peer) } -func RemoteReader(ctx context.Context, client rfspb.ApiClient, req *rfpb.ReadRequest) (io.ReadCloser, error) { - stream, err := client.Read(ctx, req) - if err != nil { - return nil, err - } - reader, writer := io.Pipe() - - // Bit annoying here -- the gRPC stream won't give us an error until - // we've called Recv on it. But we don't want to return a reader that - // we know will error on first read with NotFound -- we want to return - // that error now. So we'll wait for our goroutine to call Recv once - // and return any error it gets in the main thread. - firstError := make(chan error) - go func() { - readOnce := false - for { - rsp, err := stream.Recv() - if !readOnce { - firstError <- err - readOnce = true - } - if rsp != nil { - writer.Write(rsp.Data) - } - if err == io.EOF { - writer.Close() - break - } - if err != nil { - writer.CloseWithError(err) - break - } - - } - }() - err = <-firstError - - // If we get an EOF, and we're expecting one - don't return an error. 
- digestSize := req.GetFileRecord().GetDigest().GetSizeBytes() - offset := req.GetOffset() - if err == io.EOF && offset == digestSize { - return reader, nil - } - return reader, err -} - -func (c *APIClient) RemoteReader(ctx context.Context, client rfspb.ApiClient, req *rfpb.ReadRequest) (io.ReadCloser, error) { - return RemoteReader(ctx, client, req) -} - func RunNodehostFn(ctx context.Context, nhf func(ctx context.Context) error) error { if _, ok := ctx.Deadline(); !ok { c, cancel := context.WithTimeout(ctx, DefaultContextTimeout) diff --git a/enterprise/server/raft/rbuilder/rbuilder.go b/enterprise/server/raft/rbuilder/rbuilder.go index 3b03e358e7d..8fd795b4c63 100644 --- a/enterprise/server/raft/rbuilder/rbuilder.go +++ b/enterprise/server/raft/rbuilder/rbuilder.go @@ -82,6 +82,18 @@ func (bb *BatchBuilder) Add(m proto.Message) *BatchBuilder { req.Value = &rfpb.RequestUnion_FindMissing{ FindMissing: value, } + case *rfpb.GetMultiRequest: + req.Value = &rfpb.RequestUnion_GetMulti{ + GetMulti: value, + } + case *rfpb.SetMultiRequest: + req.Value = &rfpb.RequestUnion_SetMulti{ + SetMulti: value, + } + case *rfpb.MetadataRequest: + req.Value = &rfpb.RequestUnion_Metadata{ + Metadata: value, + } default: bb.setErr(status.FailedPreconditionErrorf("BatchBuilder.Add handling for %+v not implemented.", m)) return bb @@ -242,3 +254,30 @@ func (br *BatchResponse) FindMissingResponse(n int) (*rfpb.FindMissingResponse, u := br.cmd.GetUnion()[n] return u.GetFindMissing(), br.unionError(u) } + +func (br *BatchResponse) GetMultiResponse(n int) (*rfpb.GetMultiResponse, error) { + br.checkIndex(n) + if br.err != nil { + return nil, br.err + } + u := br.cmd.GetUnion()[n] + return u.GetGetMulti(), br.unionError(u) +} + +func (br *BatchResponse) SetMultiResponse(n int) (*rfpb.SetMultiResponse, error) { + br.checkIndex(n) + if br.err != nil { + return nil, br.err + } + u := br.cmd.GetUnion()[n] + return u.GetSetMulti(), br.unionError(u) +} + +func (br *BatchResponse) 
MetadataResponse(n int) (*rfpb.MetadataResponse, error) { + br.checkIndex(n) + if br.err != nil { + return nil, br.err + } + u := br.cmd.GetUnion()[n] + return u.GetMetadata(), br.unionError(u) +} diff --git a/enterprise/server/raft/replica/BUILD b/enterprise/server/raft/replica/BUILD index 4b7efaff498..d54e4f43187 100644 --- a/enterprise/server/raft/replica/BUILD +++ b/enterprise/server/raft/replica/BUILD @@ -15,7 +15,6 @@ go_library( "//enterprise/server/util/pebble", "//proto:raft_go_proto", "//proto:raft_service_go_proto", - "//server/interfaces", "//server/metrics", "//server/util/log", "//server/util/qps", @@ -52,6 +51,7 @@ go_test( "//server/util/status", "@com_github_lni_dragonboat_v4//statemachine", "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//status", "@org_golang_google_protobuf//proto", ], ) diff --git a/enterprise/server/raft/replica/replica.go b/enterprise/server/raft/replica/replica.go index 07a28076dce..8f33cae90de 100644 --- a/enterprise/server/raft/replica/replica.go +++ b/enterprise/server/raft/replica/replica.go @@ -10,6 +10,7 @@ import ( "io" "math/rand" "path/filepath" + "sort" "strconv" "sync" "time" @@ -20,7 +21,6 @@ import ( "github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/rbuilder" "github.com/buildbuddy-io/buildbuddy/enterprise/server/raft/sender" "github.com/buildbuddy-io/buildbuddy/enterprise/server/util/pebble" - "github.com/buildbuddy-io/buildbuddy/server/interfaces" "github.com/buildbuddy-io/buildbuddy/server/metrics" "github.com/buildbuddy-io/buildbuddy/server/util/log" "github.com/buildbuddy-io/buildbuddy/server/util/qps" @@ -218,6 +218,9 @@ func (sm *Replica) Usage() (*rfpb.ReplicaUsage, error) { } sm.partitionMetadataMu.Unlock() + sort.Slice(ru.Partitions, func(i, j int) bool { + return ru.Partitions[i].GetPartitionId() < ru.Partitions[j].GetPartitionId() + }) ru.EstimatedDiskBytesUsed = sizeBytes ru.ReadQps = int64(sm.readQPS.Get()) ru.RaftProposeQps = int64(sm.raftProposeQPS.Get()) @@ -927,6 
+930,85 @@ func (sm *Replica) findMissing(db ReplicaReader, req *rfpb.FindMissingRequest) ( return rsp, nil } +func (sm *Replica) getMulti(db ReplicaReader, req *rfpb.GetMultiRequest) (*rfpb.GetMultiResponse, error) { + rsp := &rfpb.GetMultiResponse{ + Responses: make([]*rfpb.GetMultiResponse_Response, len(req.GetFileRecords())), + } + + iter := db.NewIter(nil /*default iterOptions*/) + defer iter.Close() + + for i, fileRecord := range req.GetFileRecords() { + fr := fileRecord + + fileMetadataKey, err := sm.fileMetadataKey(fileRecord) + if err != nil { + return nil, err + } + + r := new(rfpb.GetMultiResponse_Response) + rsp.Responses[i] = r + r.FileRecord = fr + + fileMetadata, err := lookupFileMetadata(iter, fileMetadataKey) + if err != nil { + r.Status = gstatus.Convert(err).Proto() + } else { + r.FileMetadata = fileMetadata + sm.sendAccessTimeUpdate(fileMetadata) + } + } + + return rsp, nil +} + +func (sm *Replica) setMulti(wb pebble.Batch, req *rfpb.SetMultiRequest) (*rfpb.SetMultiResponse, error) { + rsp := &rfpb.SetMultiResponse{ + Responses: make([]*rfpb.SetMultiResponse_Response, len(req.GetFileMetadatas())), + } + + for i, fileMetadata := range req.GetFileMetadatas() { + fm := fileMetadata + + r := new(rfpb.SetMultiResponse_Response) + rsp.Responses[i] = r + r.FileRecord = fm.GetFileRecord() + + fileMetadataKey, err := sm.fileMetadataKey(fm.GetFileRecord()) + if err != nil { + return nil, err + } + buf, err := proto.Marshal(fm) + if err != nil { + return nil, err + } + if err := sm.rangeCheckedSet(wb, fileMetadataKey, buf); err != nil { + r.Status = gstatus.Convert(err).Proto() + } + } + + return rsp, nil +} + +func (sm *Replica) metadata(db ReplicaReader, req *rfpb.MetadataRequest) (*rfpb.MetadataResponse, error) { + fileMetadataKey, err := sm.fileMetadataKey(req.GetFileRecord()) + if err != nil { + return nil, err + } + + iter := db.NewIter(nil /*default iterOptions*/) + defer iter.Close() + + fileMetadata, err := lookupFileMetadata(iter, 
fileMetadataKey) + if err != nil { + return nil, err + } + fileMetadata.StorageMetadata = nil // nil out because this is not a read call. + return &rfpb.MetadataResponse{ + FileMetadata: fileMetadata, + }, nil +} + func statusProto(err error) *statuspb.Status { s, _ := gstatus.FromError(err) return s.Proto() @@ -976,6 +1058,12 @@ func (sm *Replica) handlePropose(wb pebble.Batch, req *rfpb.RequestUnion, rsp *r SimpleSplit: r, } rsp.Status = statusProto(err) + case *rfpb.RequestUnion_SetMulti: + r, err := sm.setMulti(wb, value.SetMulti) + rsp.Value = &rfpb.ResponseUnion_SetMulti{ + SetMulti: r, + } + rsp.Status = statusProto(err) default: rsp.Status = statusProto(status.UnimplementedErrorf("SyncPropose handling for %+v not implemented.", req)) } @@ -1003,6 +1091,18 @@ func (sm *Replica) handleRead(db ReplicaReader, req *rfpb.RequestUnion) *rfpb.Re FindMissing: r, } rsp.Status = statusProto(err) + case *rfpb.RequestUnion_GetMulti: + r, err := sm.getMulti(db, value.GetMulti) + rsp.Value = &rfpb.ResponseUnion_GetMulti{ + GetMulti: r, + } + rsp.Status = statusProto(err) + case *rfpb.RequestUnion_Metadata: + r, err := sm.metadata(db, value.Metadata) + rsp.Value = &rfpb.ResponseUnion_Metadata{ + Metadata: r, + } + rsp.Status = statusProto(err) default: rsp.Status = statusProto(status.UnimplementedErrorf("Read handling for %+v not implemented.", req)) } @@ -1037,21 +1137,6 @@ func (sm *Replica) validateRange(header *rfpb.Header) error { return nil } -func (sm *Replica) metadataForRecord(db pebble.Reader, fileRecord *rfpb.FileRecord) (*rfpb.FileMetadata, error) { - fileMetadataKey, err := sm.fileMetadataKey(fileRecord) - if err != nil { - return nil, err - } - - iter := db.NewIter(nil /*default iterOptions*/) - fileMetadata, err := lookupFileMetadata(iter, fileMetadataKey) - iter.Close() - if err != nil { - return nil, err - } - return fileMetadata, nil -} - type accessTimeUpdate struct { record *rfpb.FileRecord } @@ -1199,88 +1284,6 @@ func (sm *Replica) Sample(ctx 
context.Context, partitionID string, n int) ([]*rf return samples, nil } -func (sm *Replica) Metadata(ctx context.Context, header *rfpb.Header, fileRecord *rfpb.FileRecord) (*rfpb.FileMetadata, error) { - db, err := sm.leaser.DB() - if err != nil { - return nil, err - } - defer db.Close() - - if err := sm.validateRange(header); err != nil { - return nil, err - } - - return sm.metadataForRecord(db, fileRecord) -} - -func (sm *Replica) Reader(ctx context.Context, header *rfpb.Header, fileRecord *rfpb.FileRecord, offset, limit int64) (io.ReadCloser, error) { - db, err := sm.leaser.DB() - if err != nil { - return nil, err - } - - if err := sm.validateRange(header); err != nil { - db.Close() - return nil, err - } - - sm.readQPS.Inc() - - fileMetadata, err := sm.metadataForRecord(db, fileRecord) - db.Close() - - if err != nil { - return nil, err - } - rc, err := sm.fileStorer.NewReader(ctx, sm.fileDir, fileMetadata.GetStorageMetadata(), offset, limit) - if err != nil { - return nil, err - } - sm.sendAccessTimeUpdate(fileMetadata) - return rc, nil -} - -func (sm *Replica) GetMulti(ctx context.Context, header *rfpb.Header, fileRecords []*rfpb.FileRecord) ([]*rfpb.GetMultiResponse_Data, error) { - reader, err := sm.leaser.DB() - if err != nil { - return nil, err - } - defer reader.Close() - - if err := sm.validateRange(header); err != nil { - return nil, err - } - - var rsp []*rfpb.GetMultiResponse_Data - var buf bytes.Buffer - for _, fileRecord := range fileRecords { - rc, err := sm.Reader(ctx, header, fileRecord, 0, 0) - if err != nil { - return nil, err - } - _, err = io.Copy(&buf, rc) - rc.Close() - if err != nil { - return nil, err - } - - data := make([]byte, buf.Len()) - copy(data, buf.Bytes()) - rsp = append(rsp, &rfpb.GetMultiResponse_Data{FileRecord: fileRecord, Data: data}) - buf.Reset() - } - return rsp, nil -} - -func (sm *Replica) Writer(ctx context.Context, header *rfpb.Header, fileRecord *rfpb.FileRecord) (interfaces.MetadataWriteCloser, error) { - if err := 
sm.validateRange(header); err != nil { - return nil, err - } - - writeCloserMetadata := sm.fileStorer.InlineWriter(ctx, fileRecord.GetDigest().GetSizeBytes()) - return writeCloserMetadata, nil -} - // Update updates the IOnDiskStateMachine instance. The input Entry slice // is a list of continuous proposed and committed commands from clients, they // are provided together as a batch so the IOnDiskStateMachine implementation diff --git a/enterprise/server/raft/replica/replica_test.go b/enterprise/server/raft/replica/replica_test.go index 63cbc7c8e38..df0baaff2d0 100644 --- a/enterprise/server/raft/replica/replica_test.go +++ b/enterprise/server/raft/replica/replica_test.go @@ -28,6 +28,7 @@ import ( repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution" rspb "github.com/buildbuddy-io/buildbuddy/proto/resource" dbsm "github.com/lni/dragonboat/v4/statemachine" + gstatus "google.golang.org/grpc/status" ) var ( @@ -125,6 +126,29 @@ func writeLocalRangeDescriptor(t *testing.T, em *entryMaker, r *replica.Replica, require.Equal(t, 1, len(writeRsp)) } +func reader(t *testing.T, r *replica.Replica, h *rfpb.Header, fileRecord *rfpb.FileRecord) (io.ReadCloser, error) { + fs := filestore.New() + + buf, err := rbuilder.NewBatchBuilder().Add(&rfpb.GetMultiRequest{ + FileRecords: []*rfpb.FileRecord{fileRecord}, + }).ToBuf() + require.NoError(t, err) + readRsp, err := r.Lookup(buf) + require.NoError(t, err) + readBatch := rbuilder.NewBatchResponse(readRsp) + getMultiRsp, err := readBatch.GetMultiResponse(0) + require.NoError(t, err) + require.Equal(t, 1, len(getMultiRsp.GetResponses())) + err = gstatus.ErrorProto(getMultiRsp.GetResponses()[0].GetStatus()) + if err != nil { + return nil, err + } + md := getMultiRsp.GetResponses()[0].GetFileMetadata() + rc, err := fs.InlineReader(md.GetStorageMetadata().GetInlineMetadata(), 0, 0) + require.NoError(t, err) + return rc, nil +} + func writer(t *testing.T, em *entryMaker, r *replica.Replica, h *rfpb.Header, fileRecord 
*rfpb.FileRecord) interfaces.CommittedWriteCloser { fs := filestore.New() key, err := fs.PebbleKey(fileRecord) @@ -500,7 +524,6 @@ func TestReplicaScan(t *testing.T) { } func TestReplicaFileWriteSnapshotRestore(t *testing.T) { - ctx := context.Background() rootDir := testfs.MakeTempDir(t) store := &fakeStore{} repl := newTestReplica(t, rootDir, 1, 1, store) @@ -541,7 +564,7 @@ func TestReplicaFileWriteSnapshotRestore(t *testing.T) { require.Nil(t, writeCommitter.Commit()) require.Nil(t, writeCommitter.Close()) - readCloser, err := repl.Reader(ctx, header, fileRecord, 0, 0) + readCloser, err := reader(t, repl, header, fileRecord) require.NoError(t, err) require.Equal(t, r.GetDigest().GetHash(), testdigest.ReadDigestAndClose(t, readCloser).GetHash()) @@ -570,13 +593,12 @@ func TestReplicaFileWriteSnapshotRestore(t *testing.T) { require.NoError(t, err) // Verify that the file is readable. - readCloser, err = repl2.Reader(ctx, header, fileRecord, 0, 0) + readCloser, err = reader(t, repl2, header, fileRecord) require.NoError(t, err) require.Equal(t, r.GetDigest().GetHash(), testdigest.ReadDigestAndClose(t, readCloser).GetHash()) } func TestReplicaFileWriteDelete(t *testing.T) { - ctx := context.Background() rootDir := testfs.MakeTempDir(t) store := &fakeStore{} repl := newTestReplica(t, rootDir, 1, 1, store) @@ -611,7 +633,7 @@ func TestReplicaFileWriteDelete(t *testing.T) { // Verify that the file is readable. { - readCloser, err := repl.Reader(ctx, header, fileRecord, 0, 0) + readCloser, err := reader(t, repl, header, fileRecord) require.NoError(t, err) require.Equal(t, r.GetDigest().GetHash(), testdigest.ReadDigestAndClose(t, readCloser).GetHash()) } @@ -628,7 +650,7 @@ func TestReplicaFileWriteDelete(t *testing.T) { // Verify that the file is no longer readable and reading it returns a // NotFoundError. 
{ - _, err := repl.Reader(ctx, header, fileRecord, 0, 0) + _, err := reader(t, repl, header, fileRecord) require.NotNil(t, err) require.True(t, status.IsNotFoundError(err), err) } @@ -659,15 +681,19 @@ func TestUsage(t *testing.T) { { ru, err := repl.Usage() require.NoError(t, err) - require.EqualValues(t, 2100, ru.GetEstimatedDiskBytesUsed()) require.Len(t, ru.GetPartitions(), 2) - defaultUsage := ru.GetPartitions()[0] - require.EqualValues(t, 1500, defaultUsage.GetSizeBytes()) - require.EqualValues(t, 2, defaultUsage.GetTotalCount()) - anotherUsage := ru.GetPartitions()[1] - require.EqualValues(t, 600, anotherUsage.GetSizeBytes()) - require.EqualValues(t, 3, anotherUsage.GetTotalCount()) + + for _, usage := range ru.GetPartitions() { + switch usage.GetPartitionId() { + case defaultPartition: + require.EqualValues(t, 1500, usage.GetSizeBytes()) + require.EqualValues(t, 2, usage.GetTotalCount()) + case anotherPartition: + require.EqualValues(t, 600, usage.GetSizeBytes()) + require.EqualValues(t, 3, usage.GetTotalCount()) + } + } } // Delete a single record and verify updated usage. 
@@ -679,11 +705,16 @@ func TestUsage(t *testing.T) { require.EqualValues(t, 1100, ru.GetEstimatedDiskBytesUsed()) require.Len(t, ru.GetPartitions(), 2) - defaultUsage := ru.GetPartitions()[0] - require.EqualValues(t, 500, defaultUsage.GetSizeBytes()) - require.EqualValues(t, 1, defaultUsage.GetTotalCount()) - anotherUsage := ru.GetPartitions()[1] - require.EqualValues(t, 600, anotherUsage.GetSizeBytes()) - require.EqualValues(t, 3, anotherUsage.GetTotalCount()) + + for _, usage := range ru.GetPartitions() { + switch usage.GetPartitionId() { + case defaultPartition: + require.EqualValues(t, 500, usage.GetSizeBytes()) + require.EqualValues(t, 1, usage.GetTotalCount()) + case anotherPartition: + require.EqualValues(t, 600, usage.GetSizeBytes()) + require.EqualValues(t, 3, usage.GetTotalCount()) + } + } } } diff --git a/enterprise/server/raft/store/BUILD b/enterprise/server/raft/store/BUILD index 45c98d6ca81..e7a9804ddfa 100644 --- a/enterprise/server/raft/store/BUILD +++ b/enterprise/server/raft/store/BUILD @@ -78,6 +78,7 @@ go_test( "@com_github_lni_dragonboat_v4//config", "@com_github_lni_dragonboat_v4//raftio", "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//status", "@org_golang_google_protobuf//proto", ], ) diff --git a/enterprise/server/raft/store/store.go b/enterprise/server/raft/store/store.go index 64d6a10f888..64abd60dcab 100644 --- a/enterprise/server/raft/store/store.go +++ b/enterprise/server/raft/store/store.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "flag" "fmt" - "io" "net" "sort" "sync" @@ -591,7 +590,7 @@ func (s *Store) replicaForRange(rangeID uint64) (*replica.Replica, *rfpb.RangeDe // done by using the LeasedRange function. 
func (s *Store) validatedRange(header *rfpb.Header) (*replica.Replica, *rfpb.RangeDescriptor, error) { if header == nil { - return nil, nil, status.FailedPreconditionError("Nil header not allowed") + return nil, nil, status.FailedPreconditionError("Header must be set (was nil)") } r, rd, err := s.replicaForRange(header.GetRangeId()) @@ -842,6 +841,7 @@ func (s *Store) SyncRead(ctx context.Context, req *rfpb.SyncReadRequest) (*rfpb. }, nil } +// TODO(tylerw): consolidate these in a non-confusing way. func (s *Store) FindMissing(ctx context.Context, req *rfpb.FindMissingRequest) (*rfpb.FindMissingResponse, error) { _, err := s.LeasedRange(req.GetHeader()) if err != nil { @@ -864,63 +864,66 @@ func (s *Store) FindMissing(ctx context.Context, req *rfpb.FindMissingRequest) ( } func (s *Store) GetMulti(ctx context.Context, req *rfpb.GetMultiRequest) (*rfpb.GetMultiResponse, error) { - r, err := s.LeasedRange(req.GetHeader()) + _, err := s.LeasedRange(req.GetHeader()) if err != nil { return nil, err } - data, err := r.GetMulti(ctx, req.GetHeader(), req.GetFileRecord()) + shardID := req.GetHeader().GetReplica().GetShardId() + batch, err := rbuilder.NewBatchBuilder().Add(req).ToProto() if err != nil { return nil, err } - return &rfpb.GetMultiResponse{ - Data: data, - }, nil -} - -type streamWriter struct { - stream rfspb.Api_ReadServer -} - -func (w *streamWriter) Write(buf []byte) (int, error) { - err := w.stream.Send(&rfpb.ReadResponse{ - Data: buf, - }) - return len(buf), err + batch.Header = req.GetHeader() + rsp, err := client.SyncReadLocal(ctx, s.nodeHost, shardID, batch) + if err != nil { + if err == dragonboat.ErrShardNotFound { + return nil, status.OutOfRangeErrorf("%s: cluster %d not found", constants.RangeLeaseInvalidMsg, shardID) + } + return nil, err + } + return rbuilder.NewBatchResponseFromProto(rsp).GetMultiResponse(0) } -func (s *Store) Metadata(ctx context.Context, req *rfpb.MetadataRequest) (*rfpb.MetadataResponse, error) { - r, err := 
s.LeasedRange(req.GetHeader()) +func (s *Store) SetMulti(ctx context.Context, req *rfpb.SetMultiRequest) (*rfpb.SetMultiResponse, error) { + _, err := s.LeasedRange(req.GetHeader()) if err != nil { return nil, err } - md, err := r.Metadata(ctx, req.GetHeader(), req.GetFileRecord()) + shardID := req.GetHeader().GetReplica().GetShardId() + batch, err := rbuilder.NewBatchBuilder().Add(req).ToProto() if err != nil { return nil, err } - - return &rfpb.MetadataResponse{Metadata: md}, nil + batch.Header = req.GetHeader() + rsp, err := client.SyncReadLocal(ctx, s.nodeHost, shardID, batch) + if err != nil { + if err == dragonboat.ErrShardNotFound { + return nil, status.OutOfRangeErrorf("%s: cluster %d not found", constants.RangeLeaseInvalidMsg, shardID) + } + return nil, err + } + return rbuilder.NewBatchResponseFromProto(rsp).SetMultiResponse(0) } -func (s *Store) Read(req *rfpb.ReadRequest, stream rfspb.Api_ReadServer) error { - r, err := s.LeasedRange(req.GetHeader()) +func (s *Store) Metadata(ctx context.Context, req *rfpb.MetadataRequest) (*rfpb.MetadataResponse, error) { + _, err := s.LeasedRange(req.GetHeader()) if err != nil { - return err + return nil, err } - - readCloser, err := r.Reader(stream.Context(), req.GetHeader(), req.GetFileRecord(), req.GetOffset(), req.GetLimit()) + shardID := req.GetHeader().GetReplica().GetShardId() + batch, err := rbuilder.NewBatchBuilder().Add(req).ToProto() if err != nil { - return err + return nil, err } - defer readCloser.Close() - - bufSize := int64(readBufSizeBytes) - d := req.GetFileRecord().GetDigest() - if d.GetSizeBytes() > 0 && d.GetSizeBytes() < bufSize { - bufSize = d.GetSizeBytes() + batch.Header = req.GetHeader() + rsp, err := client.SyncReadLocal(ctx, s.nodeHost, shardID, batch) + if err != nil { + if err == dragonboat.ErrShardNotFound { + return nil, status.OutOfRangeErrorf("%s: cluster %d not found", constants.RangeLeaseInvalidMsg, shardID) + } + return nil, err } - copyBuf := make([]byte, bufSize) - _, err = 
io.CopyBuffer(&streamWriter{stream}, readCloser, copyBuf) - return err + return rbuilder.NewBatchResponseFromProto(rsp).MetadataResponse(0) } func (s *Store) OnEvent(updateType serf.EventType, event serf.Event) { diff --git a/enterprise/server/raft/store/store_test.go b/enterprise/server/raft/store/store_test.go index b47ac594c52..243bcd34874 100644 --- a/enterprise/server/raft/store/store_test.go +++ b/enterprise/server/raft/store/store_test.go @@ -38,6 +38,7 @@ import ( rfpb "github.com/buildbuddy-io/buildbuddy/proto/raft" rfspb "github.com/buildbuddy-io/buildbuddy/proto/raft_service" dbConfig "github.com/lni/dragonboat/v4/config" + gstatus "google.golang.org/grpc/status" ) func localAddr(t *testing.T) string { @@ -326,16 +327,23 @@ func metadataKey(t *testing.T, fr *rfpb.FileRecord) []byte { } func readRecord(ctx context.Context, t *testing.T, ts *TestingStore, fr *rfpb.FileRecord) { + fs := filestore.New() fk := metadataKey(t, fr) err := ts.Sender.Run(ctx, fk, func(c rfspb.ApiClient, h *rfpb.Header) error { - rc, err := client.RemoteReader(ctx, c, &rfpb.ReadRequest{ - Header: h, - FileRecord: fr, + rsp, err := c.GetMulti(ctx, &rfpb.GetMultiRequest{ + Header: h, + FileRecords: []*rfpb.FileRecord{fr}, }) if err != nil { return err } + require.Equal(t, 1, len(rsp.GetResponses())) + err = gstatus.ErrorProto(rsp.GetResponses()[0].GetStatus()) + require.NoError(t, err) + md := rsp.GetResponses()[0].GetFileMetadata() + rc, err := fs.InlineReader(md.GetStorageMetadata().GetInlineMetadata(), 0, 0) + require.NoError(t, err) d := testdigest.ReadDigestAndClose(t, rc) require.True(t, proto.Equal(d, fr.GetDigest())) return nil @@ -563,20 +571,19 @@ func TestPostFactoSplit(t *testing.T) { time.Sleep(10 * time.Millisecond) } + // Transfer Leadership to the new node + _, err = s4.TransferLeadership(ctx, &rfpb.TransferLeadershipRequest{ + ShardId: 2, + TargetReplicaId: 4, + }) + require.NoError(t, err) // Now verify that all keys that should be on the new node are present. 
for _, fr := range written { fmk := metadataKey(t, fr) if bytes.Compare(fmk, splitResponse.GetStart().GetEnd()) >= 0 { continue } - rd := s4.GetRange(2) - rc, err := r4.Reader(ctx, &rfpb.Header{ - RangeId: rd.GetRangeId(), - Generation: rd.GetGeneration(), - }, fr, 0, 0) - require.NoError(t, err) - d := testdigest.ReadDigestAndClose(t, rc) - require.True(t, proto.Equal(d, fr.GetDigest())) + readRecord(ctx, t, s4, fr) } } diff --git a/enterprise/server/raft/usagetracker/usagetracker.go b/enterprise/server/raft/usagetracker/usagetracker.go index 6ececc6d4ff..1a066c768d8 100644 --- a/enterprise/server/raft/usagetracker/usagetracker.go +++ b/enterprise/server/raft/usagetracker/usagetracker.go @@ -224,7 +224,7 @@ func (pu *partitionUsage) refresh(ctx context.Context, key *ReplicaSample) (skip } return false, time.Time{}, err } - atime := time.UnixMicro(rsp.GetMetadata().GetLastAccessUsec()) + atime := time.UnixMicro(rsp.GetFileMetadata().GetLastAccessUsec()) return false, atime, nil } diff --git a/proto/raft.proto b/proto/raft.proto index 9ba78c6940e..c3cb59851ae 100644 --- a/proto/raft.proto +++ b/proto/raft.proto @@ -231,6 +231,9 @@ message RequestUnion { DeleteRangeRequest delete_range = 14; SimpleSplitRequest simple_split = 15; FindMissingRequest find_missing = 16; + GetMultiRequest get_multi = 17; + SetMultiRequest set_multi = 18; + MetadataRequest metadata = 19; } } @@ -253,6 +256,9 @@ message ResponseUnion { DeleteRangeResponse delete_range = 15; SimpleSplitResponse simple_split = 16; FindMissingResponse find_missing = 17; + GetMultiResponse get_multi = 18; + SetMultiResponse set_multi = 19; + MetadataResponse metadata = 20; } } @@ -532,52 +538,55 @@ message MetadataRequest { } message MetadataResponse { - FileMetadata metadata = 1; + FileMetadata file_metadata = 1; } -message ReadRequest { +message FindMissingRequest { Header header = 1; - FileRecord file_record = 2; - int64 offset = 3; - int64 limit = 4; + repeated FileRecord file_records = 2; } -message 
ReadResponse { - bytes data = 1; +message FindMissingResponse { + repeated FileRecord missing = 1; } -message WriteRequest { +message GetMultiRequest { Header header = 1; - FileRecord file_record = 2; - bool finish_write = 3; - bytes data = 4; + repeated FileRecord file_records = 2; } -message WriteResponse { - int64 committed_size = 1; -} +message GetMultiResponse { + message Response { + // The FileRecord that was requested. + FileRecord file_record = 1; -message FindMissingRequest { - Header header = 1; - repeated FileRecord file_records = 2; -} + // The FileMetadata that matches the requested FileRecord. + FileMetadata file_metadata = 2; -message FindMissingResponse { - repeated FileRecord missing = 1; + // The result of attempting to look up that FileRecord's metadata. + google.rpc.Status status = 3; + } + + // The responses to the requests. + repeated Response responses = 1; } -message GetMultiRequest { +message SetMultiRequest { Header header = 1; - Isolation isolation = 2; - repeated FileRecord file_record = 3; + repeated FileMetadata file_metadatas = 2; } -message GetMultiResponse { - message Data { +message SetMultiResponse { + message Response { + // The FileRecord to which this response corresponds. FileRecord file_record = 1; - bytes data = 2; + + // The result of attempting to write that FileMetadata. + google.rpc.Status status = 2; } - repeated Data data = 1; + + // The responses to the requests. + repeated Response responses = 1; } message TransferLeadershipRequest { diff --git a/proto/raft_service.proto b/proto/raft_service.proto index 0929b9a9c52..b570d8924c8 100644 --- a/proto/raft_service.proto +++ b/proto/raft_service.proto @@ -20,9 +20,9 @@ service Api { rpc TransferLeadership(TransferLeadershipRequest) returns (TransferLeadershipResponse); - // Data API. + // Metadata API. 
rpc Metadata(MetadataRequest) returns (MetadataResponse); - rpc Read(ReadRequest) returns (stream ReadResponse); rpc FindMissing(FindMissingRequest) returns (FindMissingResponse); rpc GetMulti(GetMultiRequest) returns (GetMultiResponse); + rpc SetMulti(SetMultiRequest) returns (SetMultiResponse); }