Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make reads and writes linearizable #4996

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions enterprise/server/raft/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,27 @@ func (rc *RaftCache) Reader(ctx context.Context, r *rspb.ResourceName, uncompres
return nil, err
}

var readCloser io.ReadCloser
var rsp *rfpb.GetMultiResponse
err = rc.sender.Run(ctx, fileMetadataKey, func(c rfspb.ApiClient, h *rfpb.Header) error {
req := &rfpb.ReadRequest{
Header: h,
FileRecord: fileRecord,
Offset: uncompressedOffset,
Limit: limit,
req := &rfpb.GetMultiRequest{
Header: h,
FileRecords: []*rfpb.FileRecord{fileRecord},
}
r, err := rc.apiClient.RemoteReader(ctx, c, req)
r, err := c.GetMulti(ctx, req)
if err != nil {
return err
}
readCloser = r
rsp = r
return nil
})
return readCloser, err
if err != nil {
return nil, err
}
if len(rsp.GetResponses()) != 1 {
return nil, status.InternalError("GetMulti response did not containe requested FileRecord")
}
md := rsp.GetResponses()[0].GetFileMetadata()
return rc.fileStorer.InlineReader(md.GetStorageMetadata().GetInlineMetadata(), uncompressedOffset, limit)
}

type raftWriteCloser struct {
Expand Down Expand Up @@ -548,7 +553,7 @@ func (rc *RaftCache) GetMulti(ctx context.Context, resources []*rspb.ResourceNam
if !ok {
return nil, status.InternalError("type is not *rfpb.FileRecord")
}
req.FileRecord = append(req.FileRecord, fr)
req.FileRecords = append(req.FileRecords, fr)
}
return c.GetMulti(ctx, req)
})
Expand All @@ -560,10 +565,10 @@ func (rc *RaftCache) GetMulti(ctx context.Context, resources []*rspb.ResourceNam
for _, rsp := range rsps {
fmr, ok := rsp.(*rfpb.GetMultiResponse)
if !ok {
return nil, status.InternalError("response not of type *rfpb.FindMissingResponse")
return nil, status.InternalError("response not of type *rfpb.GetMultiResponse")
}
for _, frd := range fmr.GetData() {
dataMap[frd.GetFileRecord().GetDigest()] = frd.GetData()
for _, r := range fmr.GetResponses() {
dataMap[r.GetFileRecord().GetDigest()] = r.GetFileMetadata().GetStorageMetadata().GetInlineMetadata().GetData()
}
}

Expand Down
51 changes: 0 additions & 51 deletions enterprise/server/raft/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client
import (
"context"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -75,56 +74,6 @@ func (c *APIClient) Get(ctx context.Context, peer string) (rfspb.ApiClient, erro
return c.getClient(ctx, peer)
}

func RemoteReader(ctx context.Context, client rfspb.ApiClient, req *rfpb.ReadRequest) (io.ReadCloser, error) {
stream, err := client.Read(ctx, req)
if err != nil {
return nil, err
}
reader, writer := io.Pipe()

// Bit annoying here -- the gRPC stream won't give us an error until
// we've called Recv on it. But we don't want to return a reader that
// we know will error on first read with NotFound -- we want to return
// that error now. So we'll wait for our goroutine to call Recv once
// and return any error it gets in the main thread.
firstError := make(chan error)
go func() {
readOnce := false
for {
rsp, err := stream.Recv()
if !readOnce {
firstError <- err
readOnce = true
}
if rsp != nil {
writer.Write(rsp.Data)
}
if err == io.EOF {
writer.Close()
break
}
if err != nil {
writer.CloseWithError(err)
break
}

}
}()
err = <-firstError

// If we get an EOF, and we're expecting one - don't return an error.
digestSize := req.GetFileRecord().GetDigest().GetSizeBytes()
offset := req.GetOffset()
if err == io.EOF && offset == digestSize {
return reader, nil
}
return reader, err
}

func (c *APIClient) RemoteReader(ctx context.Context, client rfspb.ApiClient, req *rfpb.ReadRequest) (io.ReadCloser, error) {
return RemoteReader(ctx, client, req)
}

func RunNodehostFn(ctx context.Context, nhf func(ctx context.Context) error) error {
if _, ok := ctx.Deadline(); !ok {
c, cancel := context.WithTimeout(ctx, DefaultContextTimeout)
Expand Down
39 changes: 39 additions & 0 deletions enterprise/server/raft/rbuilder/rbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ func (bb *BatchBuilder) Add(m proto.Message) *BatchBuilder {
req.Value = &rfpb.RequestUnion_FindMissing{
FindMissing: value,
}
case *rfpb.GetMultiRequest:
req.Value = &rfpb.RequestUnion_GetMulti{
GetMulti: value,
}
case *rfpb.SetMultiRequest:
req.Value = &rfpb.RequestUnion_SetMulti{
SetMulti: value,
}
case *rfpb.MetadataRequest:
req.Value = &rfpb.RequestUnion_Metadata{
Metadata: value,
}
default:
bb.setErr(status.FailedPreconditionErrorf("BatchBuilder.Add handling for %+v not implemented.", m))
return bb
Expand Down Expand Up @@ -242,3 +254,30 @@ func (br *BatchResponse) FindMissingResponse(n int) (*rfpb.FindMissingResponse,
u := br.cmd.GetUnion()[n]
return u.GetFindMissing(), br.unionError(u)
}

func (br *BatchResponse) GetMultiResponse(n int) (*rfpb.GetMultiResponse, error) {
br.checkIndex(n)
if br.err != nil {
return nil, br.err
}
u := br.cmd.GetUnion()[n]
return u.GetGetMulti(), br.unionError(u)
}

func (br *BatchResponse) SetMultiResponse(n int) (*rfpb.SetMultiResponse, error) {
br.checkIndex(n)
if br.err != nil {
return nil, br.err
}
u := br.cmd.GetUnion()[n]
return u.GetSetMulti(), br.unionError(u)
}

func (br *BatchResponse) MetadataResponse(n int) (*rfpb.MetadataResponse, error) {
br.checkIndex(n)
if br.err != nil {
return nil, br.err
}
u := br.cmd.GetUnion()[n]
return u.GetMetadata(), br.unionError(u)
}
2 changes: 1 addition & 1 deletion enterprise/server/raft/replica/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ go_library(
"//enterprise/server/util/pebble",
"//proto:raft_go_proto",
"//proto:raft_service_go_proto",
"//server/interfaces",
"//server/metrics",
"//server/util/log",
"//server/util/qps",
Expand Down Expand Up @@ -52,6 +51,7 @@ go_test(
"//server/util/status",
"@com_github_lni_dragonboat_v4//statemachine",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
],
)
Loading