Add blob split and splice API #282

Open: wants to merge 1 commit into base: main
246 changes: 246 additions & 0 deletions build/bazel/remote/execution/v2/remote_execution.proto
@@ -430,6 +430,103 @@ service ContentAddressableStorage {
rpc GetTree(GetTreeRequest) returns (stream GetTreeResponse) {
option (google.api.http) = { get: "/v2/{instance_name=**}/blobs/{root_digest.hash}/{root_digest.size_bytes}:getTree" };
}

// Split a blob into chunks.
//
// This splitting API aims to reduce download traffic between client and
// server, e.g., if a client needs to fetch a large blob that has been
// modified only slightly since the last build. In this case, there is no need
// to fetch the entire blob data; only the binary differences between the two
// blob versions, which are typically determined by deduplication techniques
// such as content-defined chunking, need to be transferred.
//
// Clients can use this API before downloading a blob to determine which parts
// of the blob are already present locally and do not need to be downloaded
// again. The server splits the blob into chunks according to a specified
// content-defined chunking algorithm and returns a list of the chunk digests
// in the order in which the chunks have to be concatenated to assemble the
// requested blob.
//
// A client can expect the following guarantees from the server if a split
// request is answered successfully:
// 1. The blob chunks are stored in CAS.
// 2. Concatenating the blob chunks in the order of the digest list returned
// by the server results in the original blob.
//
// The usage of this API is optional for clients but it allows them to
// download only the missing parts of a large blob instead of the entire blob
// data, which in turn can considerably reduce download network traffic.
//
// Since the generated chunks are stored as blobs, they are subject to the
// same lifetime rules as other blobs. However, their lifetime is extended if
// they are part of the result of a split blob request.
//
// It is recommended that clients verify that the digest of the blob assembled
// from the fetched chunks matches the requested blob digest.
//
// If several clients use blob splitting, it is recommended that they request
// the same splitting algorithm to benefit from each other's chunking data. In
// combination with blob splicing, an agreement about the chunking algorithm
// is recommended since both the client and the server side can benefit from
// each other's chunking data.
//
// Servers are free to implement this functionality, but they need to declare
// whether they support it or not by setting the
// [CacheCapabilities.blob_split_support][build.bazel.remote.execution.v2.CacheCapabilities.blob_split_support]
// field accordingly.
//
// Errors:
//
// * `NOT_FOUND`: The requested blob is not present in the CAS.
// * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the blob
// chunks.
rpc SplitBlob(SplitBlobRequest) returns (SplitBlobResponse) {
option (google.api.http) = { get: "/v2/{instance_name=**}/blobs/{blob_digest.hash}/{blob_digest.size_bytes}:splitBlob" };
}
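To make the intended client-side flow concrete, here is a minimal sketch of how a client could combine SplitBlob with a local chunk cache. Only the RPC, message, and field names come from this proto; the generated-module handle `re_pb2`, the injected `fetch_blob` callable, and the `local_chunks` dictionary are illustrative assumptions, not part of this PR.

```python
# Illustrative sketch only. `re_pb2` stands for the Python module generated
# from remote_execution.proto, `fetch_blob` is any function that downloads a
# single blob (e.g. via ByteStream), and `local_chunks` maps chunk hashes to
# locally cached chunk data -- all assumptions, not part of this proposal.
import hashlib

def download_via_split(cas_stub, re_pb2, instance_name, blob_digest,
                       fetch_blob, local_chunks):
    response = cas_stub.SplitBlob(re_pb2.SplitBlobRequest(
        instance_name=instance_name,
        blob_digest=blob_digest,
        # Use an algorithm advertised by the server, or DEFAULT.
        chunking_algorithm=re_pb2.ChunkingAlgorithm.DEFAULT,
        digest_function=re_pb2.DigestFunction.SHA256,
    ))

    parts = []
    for chunk_digest in response.chunk_digests:
        data = local_chunks.get(chunk_digest.hash)
        if data is None:
            # Only chunks that are not available locally are downloaded.
            data = fetch_blob(instance_name, chunk_digest)
            local_chunks[chunk_digest.hash] = data
        parts.append(data)

    blob = b"".join(parts)
    # Recommended above: verify the assembled blob against the requested digest.
    assert hashlib.sha256(blob).hexdigest() == blob_digest.hash
    assert len(blob) == blob_digest.size_bytes
    return blob
```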

// Splice a blob from chunks.
//
// This is the complementary operation to the
// [ContentAddressableStorage.SplitBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SplitBlob]
// function to handle the chunked upload of large blobs and thus save upload
// traffic.
//
// If a client needs to upload a large blob and is able to split a blob into
// chunks locally according to some content-defined chunking algorithm, it can
// first determine which parts of the blob are already available in the remote
// CAS and upload the missing chunks, and then use this API to instruct the
// server to splice the original blob from the remotely available blob chunks.
//
// To ensure data consistency of the CAS, the server verifies that the digest
// of the spliced result matches the digest provided in the request and
// rejects the splice request if this check fails.
//
// The usage of this API is optional for clients but it allows them to upload
// only the missing parts of a large blob instead of the entire blob data,
// which in turn can considerably reduce upload network traffic.
//
// In order to split a blob into chunks, it is recommended that the client use
// one of the chunking algorithms advertised by the server in the
// [CacheCapabilities.supported_chunking_algorithms][build.bazel.remote.execution.v2.CacheCapabilities.supported_chunking_algorithms]
// field so that client and server benefit from each other's chunking data. If
// several clients use blob splicing, it is recommended that they use the same
// chunking algorithm to split their blobs into chunks.
//
// Servers are free to implement this functionality, but they need to declare
// whether they support it or not by setting the
// [CacheCapabilities.blob_splice_support][build.bazel.remote.execution.v2.CacheCapabilities.blob_splice_support]
// field accordingly.
//
// Errors:
//
// * `NOT_FOUND`: At least one of the blob chunks is not present in the CAS.
// * `RESOURCE_EXHAUSTED`: There is insufficient disk quota to store the
// spliced blob.
// * `INVALID_ARGUMENT`: The digest of the spliced blob is different from the
// provided expected digest.
rpc SpliceBlob(SpliceBlobRequest) returns (SpliceBlobResponse) {
option (google.api.http) = { post: "/v2/{instance_name=**}/blobs:spliceBlob" body: "*" };
}
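The complementary upload path could look roughly as follows. This is a sketch under the same assumptions as the SplitBlob example above (`re_pb2` for the generated module, SHA-256 as the digest function), with `chunk_blob` standing in for a local content-defined chunker; it uses the existing FindMissingBlobs and BatchUpdateBlobs RPCs in the way described in the review discussion further down.

```python
# Illustrative sketch only; `chunk_blob` is any local content-defined chunker
# implementing the agreed algorithm, and `re_pb2` is the generated module.
import hashlib

def upload_via_splice(cas_stub, re_pb2, instance_name, blob, chunk_blob):
    chunks = chunk_blob(blob)  # list of bytes objects
    chunk_digests = [
        re_pb2.Digest(hash=hashlib.sha256(c).hexdigest(), size_bytes=len(c))
        for c in chunks
    ]

    # 1. Ask the CAS which chunks it does not know yet.
    missing = cas_stub.FindMissingBlobs(re_pb2.FindMissingBlobsRequest(
        instance_name=instance_name,
        blob_digests=chunk_digests,
    )).missing_blob_digests
    missing_hashes = {d.hash for d in missing}

    # 2. Upload only the missing chunks (this is where traffic is saved).
    upload_requests = [
        re_pb2.BatchUpdateBlobsRequest.Request(digest=d, data=c)
        for d, c in zip(chunk_digests, chunks) if d.hash in missing_hashes
    ]
    if upload_requests:
        cas_stub.BatchUpdateBlobs(re_pb2.BatchUpdateBlobsRequest(
            instance_name=instance_name,
            requests=upload_requests,
        ))

    # 3. Ask the server to splice the original blob from the chunks.
    blob_digest = re_pb2.Digest(
        hash=hashlib.sha256(blob).hexdigest(), size_bytes=len(blob))
    cas_stub.SpliceBlob(re_pb2.SpliceBlobRequest(
        instance_name=instance_name,
        blob_digest=blob_digest,
        chunk_digests=chunk_digests,
    ))
    return blob_digest
```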
}

// The Capabilities service may be used by remote execution clients to query
@@ -1814,6 +1911,97 @@ message GetTreeResponse {
string next_page_token = 2;
}

// A request message for
// [ContentAddressableStorage.SplitBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SplitBlob].
message SplitBlobRequest {
// The instance of the execution system to operate against. A server may
// support multiple instances of the execution system (with their own workers,
// storage, caches, etc.). The server MAY require use of this field to select
// between them in an implementation-defined fashion, otherwise it can be
// omitted.
string instance_name = 1;

// The digest of the blob to be split.
Digest blob_digest = 2;

// The chunking algorithm to be used. Must be DEFAULT or one of the algorithms
// advertised by the
// [CacheCapabilities.supported_chunking_algorithms][build.bazel.remote.execution.v2.CacheCapabilities.supported_chunking_algorithms]
// field.
ChunkingAlgorithm.Value chunking_algorithm = 3;

// The digest function of the blob to be split.
//
// If the digest function used is one of MD5, MURMUR3, SHA1, SHA256,
// SHA384, SHA512, or VSO, the client MAY leave this field unset. In
// that case the server SHOULD infer the digest function using the
// length of the blob digest hashes and the digest functions announced
// in the server's capabilities.
DigestFunction.Value digest_function = 4;
}

// A response message for
// [ContentAddressableStorage.SplitBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SplitBlob].
message SplitBlobResponse {
// The ordered list of digests of the chunks into which the blob was split.
// The original blob is assembled by concatenating the chunk data according to
// the order of the digests given by this list.
repeated Digest chunk_digests = 1;

// The digest function of the chunks.
//
// If the digest function used is one of MD5, MURMUR3, SHA1, SHA256,
// SHA384, SHA512, or VSO, the client MAY leave this field unset. In
// that case the server SHOULD infer the digest function using the
// length of the blob digest hashes and the digest functions announced
// in the server's capabilities.
DigestFunction.Value digest_function = 2;
}

// A request message for
// [ContentAddressableStorage.SpliceBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SpliceBlob].
message SpliceBlobRequest {
// The instance of the execution system to operate against. A server may
// support multiple instances of the execution system (with their own workers,
// storage, caches, etc.). The server MAY require use of this field to select
// between them in an implementation-defined fashion, otherwise it can be
// omitted.
string instance_name = 1;

// Expected digest of the spliced blob.
Digest blob_digest = 2;

// The ordered list of digests of the chunks which need to be concatenated to
// assemble the original blob.
repeated Digest chunk_digests = 3;

// The digest function of the blob to be spliced as well as of the chunks to
// be concatenated.
//
// If the digest function used is one of MD5, MURMUR3, SHA1, SHA256,
// SHA384, SHA512, or VSO, the client MAY leave this field unset. In
// that case the server SHOULD infer the digest function using the
// length of the blob digest hashes and the digest functions announced
// in the server's capabilities.
DigestFunction.Value digest_function = 4;
}
Contributor:

I wonder if we should design this SpliceBlobRequest as part of BatchUpdateBlobsRequest.

The problem is that we are not pushing the chunk blobs in this request, only sending the CAS server metadata regarding combining some blobs into a larger blob. There could be a delay between the BatchUpdateBlobs RPC call and the SpliceBlob RPC call. That delay could be a few milliseconds, or it could be weeks or months, by which time some of the uploaded chunks may have expired from the CAS server. There is no transaction guarantee between the two RPCs.

So a way to have some form of transaction guarantee would be to send this as part of the same RPC that uploads all the blobs.

Reply:

> The problem is that we are not pushing the chunk blobs in this request, only sending the CAS server metadata regarding combining some blobs into a larger blob.

But that is the overarching principle behind the whole protocol: we upload blobs and later refer to them by their digests. All of that works because the CAS promises to keep all blobs fresh (i.e., not forget about them for a reasonable amount of time) whenever its answer implies it knows about them (that could be the answer to a FindMissingBlobs request or a success statement for a blob upload request). The typical workflow for a client using this request would be to split the blob locally, use FindMissingBlobs to find out which chunks are not yet known to the CAS, then (batch) upload only the ones not yet known (that is where the savings in traffic come from), and finally request a splice of all of them. All of this works because of the promise of the CAS to keep referenced objects alive.

To give a prominent example where the protocol already relies on that guarantee to keep objects alive, consider the request to execute an action. That request does not upload any blobs, yet it still expects them to be there because a recent interaction with the CAS showed that they are in the CAS already. In a sense, that example also shows that blob splicing is nothing fundamentally new, but just an optimisation: the client could already request that an action calling cat be executed, in a request that is independent of the blob upload. However, making it explicitly an operation on the CAS leaves huge room for optimization: there is no need to spawn an action-execution environment, the CAS knows ahead of time the hash and size of the blob that is to be stored as a result of that request, and if it knows the blob in question it does not even have to do anything (apart from keeping that blob fresh), etc.


// A response message for
// [ContentAddressableStorage.SpliceBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SpliceBlob].
message SpliceBlobResponse {
// Computed digest of the spliced blob.
Digest blob_digest = 1;

// The digest function of the spliced blob.
//
// If the digest function used is one of MD5, MURMUR3, SHA1, SHA256,
// SHA384, SHA512, or VSO, the client MAY leave this field unset. In
// that case the server SHOULD infer the digest function using the
// length of the blob digest hashes and the digest functions announced
// in the server's capabilities.
DigestFunction.Value digest_function = 2;
}

// A request message for
// [Capabilities.GetCapabilities][build.bazel.remote.execution.v2.Capabilities.GetCapabilities].
message GetCapabilitiesRequest {
@@ -2000,6 +2188,42 @@ message Compressor {
}
}

// Content-defined chunking algorithms used for splitting blobs into chunks.
message ChunkingAlgorithm {
enum Value {
// No special constraints on the chunking algorithm used. If a client selects
// this value, it does not care which exact chunking algorithm is used. Thus,
// the server is free to use any chunking algorithm at its disposal. A server
// does not need to advertise this value.
DEFAULT = 0;

// Content-defined chunking using Rabin fingerprints. An implementation of
// this scheme is presented in this paper
// https://link.springer.com/chapter/10.1007/978-1-4613-9323-8_11. The final
Collaborator:

This document is behind a paywall. Any chance we can link to a spec that is freely accessible?

Collaborator:

Is it this paper? http://cui.unige.ch/tcs/cours/algoweb/2002/articles/art_marculescu_andrei_1.pdf

Even though that paper provides a fairly thorough mathematical definition of how Rabin fingerprints work, it's not entirely obvious to me how it translates to an actual algorithm for us to use. Any chance we can include some pseudocode, or link to a publication that provides it?

Author:

Exactly, that is the paper, I can update the link. I really spent some time trying to find a nice algorithmic description of the Rabin fingerprint method, but failed. I could only find some real implementations on GitHub, but I assume this is not something we would like to link here. There is also the original paper on the Rabin method http://www.xmailserver.org/rabin.pdf, but that one doesn't seem to help either. The paper above gives a reasonable introduction to how Rabin fingerprinting works and even some thoughts about how to implement it, so I thought that is the best source to link here.

Author:

Actually, the paper about FastCDC https://ieeexplore.ieee.org/document/9055082 also contains an algorithmic description of the Rabin fingerprinting technique (Algorithm 3, RabinCDC8KB). The only thing missing there is a precise description of the precomputed U and T arrays. Even though they provide links to other papers where these arrays are supposedly defined, I could not find any definition in those papers.

// implementation of this algorithm should be configured to have the
// following properties on resulting chunk sizes.
// - Minimum chunk size: 128 KB
Collaborator:

Does this mean that there is a...

1 - ((2^19-1)/(2^19))^(128*1024) = 22.12...%

... probability that chunks actually contain a cutoff point that wasn't taken because it would violate the minimum chunk size? That probability sounds a lot higher than I'd imagine to be acceptable.

Consider the case where you inject some arbitrary data close to the beginning of a chunk that had a cutoff point within the first 128 KB that wasn't respected. If the injected data causes the cutoff point to be pushed above the 128 KB boundary, that would cause the cutoff point to be respected. This could in turn cause successive chunks to use different cutoff points as well.

Maybe it makes more sense to pick 16 KB or 32 KB here?

// - Average chunk size: 512 KB (0x000000000007FFFF bit mask)
Collaborator:

Assuming that the idea is that you process input byte for byte, compute a fingerprint over a sliding window, and create a chunk if the last 19 bits of the hash are all zeroes or ones (which is it?), are you sure that this will give an average chunk size of 512 KB? That would only hold if there was no minimum size, right? So shouldn't the average chunk size be 128+512 = 640 KB?

Author:

All of the 19 lowest bits of the fingerprint need to be zero for the expression (fp & mask) to become 0, in which case you found a chunk boundary. Regarding your question about the average chunk size, you are right, the actual average chunk size = expected chunk size (512 KB) + minimum chunk size (128 KB). I will state this more clearly in the comments, thanks for pointing this out!
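Writing that reasoning out (purely a restatement of the exchange above, assuming boundary positions after the minimum are geometrically distributed and ignoring truncation at the maximum):

```latex
% After the 128 KB minimum, each position is a boundary with probability
% p = 2^{-19} (19-bit mask), so the distance to the next boundary is
% geometric with mean 1/p = 2^{19} bytes = 512 KB.
\[
  \mathbb{E}[\text{chunk size}] \approx 128\ \text{KB} + 2^{19}\ \text{B}
                                 = 128\ \text{KB} + 512\ \text{KB}
                                 = 640\ \text{KB}
\]
% (ignoring truncation at the 2048 KB maximum chunk size)
```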

Collaborator:

I just did some measurements, and it looks like using fp & mask == 0 is actually a pretty bad choice. The reason being that it's pretty easy to craft windows for which fp == 0, namely a sequence consisting exclusively of 0-bytes. This has also been observed here:

https://ieeexplore.ieee.org/document/9006560

> Although using a cut value of zero seems to be a natural choice in theory, it turns out to be a poor choice in practice (Figure 2).

I noticed this especially when creating chunks of a Linux kernel source tarball (decompressed), for the same reason as stated in the article:

> It turns out that tar files use zeroes to pad out internal file structures. These padded structures cause an explosion of 4-byte (or smaller) length chunks if the cut value is also zero. In fact, over 98% of the chunks are 4-bytes long (Figure 2, (W16,P15,M16,C0) Table I).

I didn't observe the 4-byte chunks, for the reason that I used a minimum size as documented.

Results look a lot better if I use fp & mask == mask.

// - Maximum chunk size: 2048 KB
@EdSchouten (Collaborator), Mar 19, 2024:

Assuming my understanding of the algorithm is correct, doesn't that mean that there is a...

((2^19-1)/(2^19))^((2048-128)*1024) = 2.352...%

... probability that chunks end up reaching the maximum size? This sounds relatively high, but is arguably unavoidable.

A bit odd that these kinds of algorithms don't attempt to account for this, for example by repeatedly rerunning the algorithm with a smaller bit mask (18, 17, 16, [...] bits) until a match is found. That way you get a more even spread of data across such chunks, and need to upload fewer chunks in case data is injected into/removed from neighbouring chunks that both reach the 2 MB limit.

// The irreducible polynomial to be used for the modulo divisions is the
// following 64-bit polynomial of degree 53: 0x003DA3358B4DC173. The window
// size to be used is 64 bits.
Collaborator:

[ NOTE: I'm absolutely not an expert on content defined chunking algorithms! ]

Is a window size of 64 bits intentional? If I look at stuff like https://pdos.csail.mit.edu/papers/lbfs:sosp01/lbfs.pdf, it seems they are using 48 bytes, while in their case they are aiming for min=2KB, avg=8KB, max=64KB. Shouldn't this be scaled proportionally?

To me it's also not fully clear why an algorithm like this makes a distinction between a minimum chunk size and a window size. Why wouldn't one simply pick a 128 KB window and slide over that?

Author:

You are right, that was a mistake from my side; what was meant is a 64-byte window size. I took this value from an existing implementation. Thanks for finding this!

The window size is a general parameter of a rolling hash function; the hash value (also called fingerprint) for a specific byte position is calculated over that window of bytes. Then you move forward by one byte and calculate the hash value for this new window of bytes again. Thanks to the rolling-hash property, this process can be done very efficiently. So the window size influences the hash value at a specific byte position and thus the locations of the actual chunk boundaries. Theoretically, we could use a window size equal to the minimum chunk size of 128 KB, but such a large window is not common in the implementations of content-defined chunking algorithms I have seen so far.
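For readers who prefer code over prose, here is a rough sketch of the chunking loop being discussed. It is not the normative Rabin scheme from the proto comment: `RollingHash` is a toy polynomial rolling hash standing in for a real Rabin fingerprint (64-byte window), the boundary test is `(fp & mask) == 0` with a 19-bit mask as proposed, and the hash is simply reset at each boundary for brevity; a later comment in this thread reports better results with `(fp & mask) == mask`.

```python
# Rough sketch of the discussed chunking loop; RollingHash is a toy
# polynomial rolling hash standing in for a real Rabin fingerprint.
import collections

MIN_SIZE = 128 * 1024
MAX_SIZE = 2048 * 1024
MASK = 0x7FFFF  # 19 bits -> boundary probability 2^-19 per position

class RollingHash:
    """Toy rolling hash over a fixed-size byte window (not Rabin)."""
    BASE = 257
    MOD = (1 << 61) - 1

    def __init__(self, window=64):
        self.window = window
        self.buf = collections.deque()
        self.msb = pow(self.BASE, window - 1, self.MOD)
        self.fp = 0

    def roll(self, byte):
        if len(self.buf) == self.window:
            # Drop the contribution of the byte leaving the window.
            self.fp = (self.fp - self.buf.popleft() * self.msb) % self.MOD
        self.buf.append(byte)
        self.fp = (self.fp * self.BASE + byte) % self.MOD
        return self.fp

def chunk_boundaries(data):
    """Yield end offsets of chunks, honouring min/avg/max chunk sizes."""
    start = 0
    h = RollingHash()
    for i, byte in enumerate(data):
        fp = h.roll(byte)
        size = i + 1 - start
        # Boundaries are only considered once the minimum size is reached;
        # the maximum size forces a cut regardless of the fingerprint.
        if (size >= MIN_SIZE and (fp & MASK) == 0) or size >= MAX_SIZE:
            yield i + 1
            start = i + 1
            h = RollingHash()
    if start < len(data):
        yield len(data)
```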

RABINCDC = 1;

// Content-defined chunking using the FastCDC algorithm. The algorithm is
// described in this paper https://ieeexplore.ieee.org/document/9055082
// (Algorithm 2, FastCDC8KB). The algorithm should be configured to have the
// following properties on resulting chunk sizes.
// - Minimum chunk size: 128 KB
// - Average chunk size: 512 KB
// - Maximum chunk size: 2048 KB
Comment on lines +2217 to +2219
@EdSchouten (Collaborator), Mar 19, 2024:

Looking at algorithm 2 in that paper, I see that I can indeed plug in these values to MinSize, MaxSize, and NormalSize. But what values should I use for MaskS and MaskL now? (There's also MaskA, but that seems unused by the stock algorithm.)

Author:

Good question. As far as I have understood the algorithm, it is the normalized chunking technique they use that allows keeping these mask values as they are while changing the min/max/average chunk sizes according to your needs.

@EdSchouten (Collaborator), Mar 19, 2024:

Looking at algorithm 2 in that paper that attempts to compute 8 KB chunks, I think the probability of a random chunk having size at most x should be as follows (link):

[image: desmos-graph]

Now if I change that graph to use the minimum/average/maximum chunk sizes that you propose while leaving the bitmasks unaltered, I see this (link):

[image: desmos-graph-2]

If I change it so that MaskS has 21 bits and MaskL has 17, then the graph starts to resemble the original one (link):

[image: desmos-graph-3]

So I do think the masks need to be adjusted as well.

@EdSchouten (Collaborator), Mar 19, 2024:

Hmm... Making the graphs above was worthwhile. I think it gives good insight into why FastCDC was designed the way it is. They are essentially trying to mimic a normal distribution (link):

[image: desmos-graph-5]

Using a literal normal distribution would not be desirable, because it would mean that the probability of creating a chunk at a given point depends not only on the data within the window, but also on the size of the current chunk, and that is exactly what CDC tries to prevent. That is why they emulate it using three partial functions.

Yeah, we should make sure to set MaskS and MaskL accordingly.

Author:

Wow, thanks @EdSchouten for this great analysis. I have to admit I did not look into these mask values in that much detail, because they were also not really explained in detail in the paper, but your concerns are absolutely right and we have to set the mask values accordingly when the min/max/average chunk sizes are changed. I am just asking myself, since the paper authors mentioned they derived the mask values empirically, how should we adapt them?

@luluz66, @sluongng since you mentioned you were working with the FastCDC algorithm with chunk sizes of 500 KB, I am wondering how you handled the adaptation of the mask values or whether you did it at all. Can you share your experience here? Thank you so much.

Contributor:

We have our fork of CDC implementation here https://github.com/buildbuddy-io/fastcdc-go/blob/47805a2ecd550cb875f1b797a47a1a648a1feed1/fastcdc.go#L162-L172

This work is very much in progress and not final. We hope that the current API will come with configurable knobs so that downstream implementations can choose what's best for their use cases and data.

@EdSchouten (Collaborator), Mar 20, 2024:

> I am just asking myself, since the paper authors mentioned they derived the mask values empirically, how should we adapt them?

I think that the authors of the paper just used the following heuristics:

  1. Don't use the bottom 16 bits, because those likely contain low quality data.
  2. Don't use the top 16 bits to get an equal comparison against many common Rabin fingerprinting implementations that use a 48 byte window.

(1) sounds like a good idea, but (2) can likely be ignored. So just pick 21 out of the top 48 bits, and then another 17 bits that are a subset of the former.
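To illustrate where those masks enter, here is a sketch of the FastCDC cut-point search in the shape of Algorithm 2 from the paper, with MaskS/MaskL set to 21 and 17 bits chosen above the low 16 bits as suggested in this thread (MaskL a subset of MaskS). The gear table here is only a placeholder: the proposal specifies a 64-bit Mersenne Twister seeded with 0, which Python's 32-bit MT does not reproduce, so treat this as a structural illustration rather than a reference implementation.

```python
# Structural sketch of FastCDC's cut-point search (shape of Algorithm 2 in
# the FastCDC paper). Masks follow the 21-bit/17-bit suggestion from this
# thread; GEAR is a placeholder table, NOT the normative MT19937-64 table.
import random

MIN_SIZE = 128 * 1024
NORMAL_SIZE = 512 * 1024
MAX_SIZE = 2048 * 1024
MASK_S = ((1 << 21) - 1) << 16  # 21 bits above the low 16 -> stricter test
MASK_L = ((1 << 17) - 1) << 16  # 17-bit subset of MASK_S -> looser test

_rng = random.Random(0)
GEAR = [_rng.getrandbits(64) for _ in range(256)]  # placeholder gear table

def fastcdc_cut(data):
    """Return the length of the next chunk taken from the front of data."""
    n = min(len(data), MAX_SIZE)
    if len(data) <= MIN_SIZE:
        return len(data)
    fp = 0
    i = MIN_SIZE  # cut-point skipping: never cut inside the minimum region
    while i < min(n, NORMAL_SIZE):
        fp = ((fp << 1) + GEAR[data[i]]) & 0xFFFFFFFFFFFFFFFF
        if (fp & MASK_S) == 0:  # harder-to-match mask before NORMAL_SIZE
            return i + 1
        i += 1
    while i < n:
        fp = ((fp << 1) + GEAR[data[i]]) & 0xFFFFFFFFFFFFFFFF
        if (fp & MASK_L) == 0:  # easier-to-match mask after NORMAL_SIZE
            return i + 1
        i += 1
    return n
```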

// The 256 64-bit random numbers in the Gear table are to be created with
// the Mersenne Twister pseudo-random number generator for 64-bit numbers
// with a state size of 19937 bits and a seed of 0.
FASTCDC = 2;
}
}

// Capabilities of the remote cache system.
message CacheCapabilities {
// All the digest functions supported by the remote cache.
@@ -2033,6 +2257,28 @@ message CacheCapabilities {
// [BatchUpdateBlobs][build.bazel.remote.execution.v2.ContentAddressableStorage.BatchUpdateBlobs]
// requests.
repeated Compressor.Value supported_batch_update_compressors = 7;

// All specifically defined chunking algorithms supported by the remote cache
// in addition to a DEFAULT implementation. A remote cache may support
// multiple chunking algorithms simultaneously. A server always has to provide
// a DEFAULT implementation, but does not need to advertise it here. This
// field mainly exists for negotiation purposes between client and server
// to agree on an exact chunking algorithm.
repeated ChunkingAlgorithm.Value supported_chunking_algorithms = 8;

// Whether blob splitting is supported for the particular server/instance. If
// yes, the server/instance implements the specified behavior for blob
// splitting and a meaningful result can be expected from the
// [ContentAddressableStorage.SplitBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SplitBlob]
// operation.
bool blob_split_support = 9;

// Whether blob splicing is supported for the particular server/instance. If
// yes, the server/instance implements the specified behavior for blob
// splicing and a meaningful result can be expected from the
// [ContentAddressableStorage.SpliceBlob][build.bazel.remote.execution.v2.ContentAddressableStorage.SpliceBlob]
// operation.
bool blob_splice_support = 10;
}
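As a usage note, a client would gate its use of the new RPCs on these capability fields. The sketch below relies on the same illustrative `re_pb2` assumption as the earlier examples and on the Capabilities service's existing GetCapabilities call; this particular fallback policy is just one possible choice.

```python
# Sketch: pick a chunking algorithm (or opt out) based on the advertised
# capabilities; `re_pb2` and the stub objects are illustrative assumptions.
def negotiate_chunking(capabilities_stub, re_pb2, instance_name):
    caps = capabilities_stub.GetCapabilities(
        re_pb2.GetCapabilitiesRequest(instance_name=instance_name))
    cache = caps.cache_capabilities
    if not (cache.blob_split_support or cache.blob_splice_support):
        return None  # fall back to plain blob download/upload
    supported = set(cache.supported_chunking_algorithms)
    # Prefer a specifically advertised algorithm so that client and server
    # produce the same chunks; DEFAULT is always available as a fallback.
    for algo in (re_pb2.ChunkingAlgorithm.FASTCDC,
                 re_pb2.ChunkingAlgorithm.RABINCDC):
        if algo in supported:
            return algo
    return re_pb2.ChunkingAlgorithm.DEFAULT
```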

// Capabilities of the remote execution system.