Skip to content

Commit

Permalink
[server][dvc] Enable parallel transfers for each partition during blo…
Browse files Browse the repository at this point in the history
…b transfer. (#1266)
  • Loading branch information
jingy-li authored Oct 30, 2024
1 parent 416b7db commit 8889ab3
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new DaVinciBlobFinder(storeClient));
new DaVinciBlobFinder(storeClient),
baseDir);
manager.start();
return manager;
} catch (Exception e) {
Expand Down Expand Up @@ -111,7 +112,8 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir, blobSnapshotManager),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new ServerBlobFinder(customizedViewFuture));
new ServerBlobFinder(customizedViewFuture),
baseDir);
manager.start();
return manager;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
import com.linkedin.venice.blobtransfer.BlobFinder;
import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.exceptions.VeniceBlobTransferFileNotFoundException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VenicePeersConnectionException;
import com.linkedin.venice.exceptions.VenicePeersNotFoundException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.Utils;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -26,147 +27,172 @@
*/
public class NettyP2PBlobTransferManager implements P2PBlobTransferManager<Void> {
private static final Logger LOGGER = LogManager.getLogger(NettyP2PBlobTransferManager.class);
protected static final int MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST = 3;
protected static final int MAX_TIMEOUT_FOR_BLOB_TRANSFER_IN_MIN = 60;
// Log messages format
private static final String NO_PEERS_FOUND_ERROR_MSG_FORMAT =
"Replica: %s are not found any peers for the requested blob.";
private static final String NO_VALID_PEERS_MSG_FORMAT =
"Replica %s failed to connect to any peer, after trying all possible hosts.";
private static final String FETCHED_BLOB_SUCCESS_MSG =
"Replica {} successfully fetched blob from peer {} in {} seconds";
private static final String PEER_CONNECTION_EXCEPTION_MSG =
"Replica {} get error when connect to peer: {}. Exception: {}";
private static final String PEER_NO_SNAPSHOT_MSG =
"Replica {} peer {} does not have the requested blob. Exception: {}";
private static final String FAILED_TO_FETCH_BLOB_MSG =
"Replica {} failed to fetch blob from peer {}. Deleting partially downloaded blobs. Exception: {}";

private final P2PBlobTransferService blobTransferService;
// netty client is responsible to make requests against other peers for blob fetching
protected final NettyFileTransferClient nettyClient;
// peer finder is responsible to find the peers that have the requested blob
protected final BlobFinder peerFinder;
private final String baseDir;

public NettyP2PBlobTransferManager(
P2PBlobTransferService blobTransferService,
NettyFileTransferClient nettyClient,
BlobFinder peerFinder) {
BlobFinder peerFinder,
String baseDir) {
this.blobTransferService = blobTransferService;
this.nettyClient = nettyClient;
this.peerFinder = peerFinder;
this.baseDir = baseDir;
}

@Override
public void start() throws Exception {
blobTransferService.start();
}

@Override
public CompletionStage<InputStream> get(String storeName, int version, int partition)
throws VenicePeersNotFoundException {
CompletableFuture<InputStream> resultFuture = new CompletableFuture<>();
// 1. Discover peers for the requested blob
BlobPeersDiscoveryResponse response = peerFinder.discoverBlobPeers(storeName, version, partition);
if (response == null || response.isError() || response.getDiscoveryResult() == null
|| response.getDiscoveryResult().isEmpty()) {
// error case 1: no peers are found for the requested blob
String errorMsg = String.format(
NO_PEERS_FOUND_ERROR_MSG_FORMAT,
Utils.getReplicaId(Version.composeKafkaTopic(storeName, version), partition));
resultFuture.completeExceptionally(new VenicePeersNotFoundException(errorMsg));
return resultFuture;
}

List<String> discoverPeers = response.getDiscoveryResult();
LOGGER
.info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);

// 2: Process peers sequentially to fetch the blob
processPeersSequentially(discoverPeers, storeName, version, partition, resultFuture);

return resultFuture;
}

/**
* Get the blobs for the given storeName and partition
* error cases:
* 1. [Fatal Case] If no peers info are found for the requested blob, a VenicePeersNotFoundException is thrown.
* Process the peers sequentially to fetch the blob for the given storeName and partition
* - Error cases:
* - Fatal cases, skip bootstrapping from blob:
* 1. If no peers info are found for the requested blob, a VenicePeersNotFoundException is thrown.
* In this case, blob transfer is not used for bootstrapping at all.
* 2. If one host connect error, it will throw VenicePeersCannotConnectException and retry connecting to the peer again
* After MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST times, if still failed to connect, move to the next possible host.
* 3. If the connected host does not have the requested file,
* a VeniceBlobTransferFileNotFoundException is thrown, and the process moves on to the next available host.
* 4. [Fatal Case] If any unexpected exception occurs, such as InterruptedException, ExecutionException, or TimeoutException
* during the file/metadata transfer, a VeniceException is thrown, and blob transfer is skipped for bootstrapping to save time.
* 5. [Fatal Case] If all peers fail to connect or have no snapshot, a VenicePeersNotFoundException is thrown,
* 2. If all peers fail to connect or have no snapshot, a VenicePeersNotFoundException is thrown,
* and Kafka is used for bootstrapping instead.
*
* success case:
* - Non-fatal cases, move to the next possible host:
* 3. If one host connect error, it will throw VenicePeersCannotConnectException then move to the next possible host.
* 4. If the connected host does not have the requested file,
* a VeniceBlobTransferFileNotFoundException is thrown, and the process moves on to the next available host.
* 5. If any unexpected exception occurs, such as InterruptedException, ExecutionException, or TimeoutException
* during the file/metadata transfer, a VeniceException is thrown,
* and the process moves on to the next possible host, and the partially downloaded blobs are deleted.
*
* - Success case:
* 1. If the blob is successfully fetched from a peer, an InputStream of the blob is returned.
*
* @param peers the list of peers to process
* @param storeName the name of the store
* @param version the version of the store
* @param partition the partition of the store
* @return the InputStream of the blob
* @throws VenicePeersNotFoundException
* @param resultFuture the future to complete with the InputStream of the blob
*/
@Override
public CompletionStage<InputStream> get(String storeName, int version, int partition)
throws VenicePeersNotFoundException {
// error case 1: no peers are found for the requested blob
BlobPeersDiscoveryResponse response = peerFinder.discoverBlobPeers(storeName, version, partition);
if (response == null || response.isError()) {
throw new VenicePeersNotFoundException("Failed to obtain the peers for the requested blob");
}
private void processPeersSequentially(
List<String> peers,
String storeName,
int version,
int partition,
CompletableFuture<InputStream> resultFuture) {
String replicaId = Utils.getReplicaId(Version.composeKafkaTopic(storeName, version), partition);
Instant startTime = Instant.now();

List<String> discoverPeers = response.getDiscoveryResult();
if (discoverPeers == null || discoverPeers.isEmpty()) {
throw new VenicePeersNotFoundException("No peers found for the requested blob");
}
LOGGER
.info("Discovered peers {} for store {} version {} partition {}", discoverPeers, storeName, version, partition);
// Create a CompletableFuture that represents the chain of processing all peers
CompletableFuture<Void> chainOfPeersFuture = CompletableFuture.completedFuture(null);

Instant startTime = Instant.now();
for (String peer: discoverPeers) {
String chosenHost = peer.split("_")[0];
int retryCount = 0;
while (retryCount < MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST) {
try {
// instanceName comes as a format of <hostName>_<applicationPort>
LOGGER.info("Attempt {} to connect to host: {}", retryCount + 1, chosenHost);
CompletableFuture<InputStream> inputStreamFuture =
nettyClient.get(chosenHost, storeName, version, partition).toCompletableFuture();
InputStream inputStream = inputStreamFuture.get(MAX_TIMEOUT_FOR_BLOB_TRANSFER_IN_MIN, TimeUnit.MINUTES);
LOGGER.info(
"Successfully fetched blob from peer {} for store {} partition {} version {} in {} seconds",
peer,
storeName,
partition,
version,
Duration.between(startTime, Instant.now()).getSeconds());
return CompletableFuture.completedFuture(inputStream);
} catch (Exception e) {
if (e.getCause() instanceof VenicePeersConnectionException) {
// error case 2: failed to connect to the peer,
// solution: retry connecting to the peer again up to MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST times
LOGGER.warn(
"Get error when connect to peer: {} for store {} version {} partition {}, retrying {}/{}",
peer,
storeName,
version,
partition,
retryCount + 1,
MAX_RETRIES_FOR_BLOB_TRANSFER_PER_HOST,
e);
retryCount++;
} else if (e.getCause() instanceof VeniceBlobTransferFileNotFoundException) {
// error case 3: the connected host does not have the requested file,
// solution: move to next possible host
LOGGER.warn(
"Peer {} does not have the requested blob for store {} version {} partition {}, moving to next possible host.",
peer,
storeName,
version,
partition,
e);
break;
} else {
// error case 4:
// other exceptions (InterruptedException, ExecutionException, TimeoutException) that are not expected,
// solution: do not use blob transfer to bootstrap at all for saving time
String errorMessage = String.format(
"Failed to connect to peer %s for partition %d store %s version %d with exception. "
+ "Skip bootstrap the partition from blob transfer.",
peer,
partition,
storeName,
version);
LOGGER.error(errorMessage, e);
throw new VeniceException(errorMessage, e);
}
// Iterate through each peer and chain the futures
for (int currentPeerIndex = 0; currentPeerIndex < peers.size(); currentPeerIndex++) {
final int peerIndex = currentPeerIndex;
// Chain the next operation to the previous future
chainOfPeersFuture = chainOfPeersFuture.thenCompose(v -> {
String chosenHost = peers.get(peerIndex).split("_")[0];

if (resultFuture.isDone()) {
// If the result future is already completed, skip the current peer
return CompletableFuture.completedFuture(null);
}
}

LOGGER.warn(
"Failed to connect to peer {} for partition {} store {} version {} after {} attempts, "
+ "moving to next possible host to bootstrap the partition.",
peer,
partition,
storeName,
version,
retryCount);
// Attempt to fetch the blob from the current peer asynchronously
LOGGER.info("Attempting to connect to host: {}", chosenHost);

return nettyClient.get(chosenHost, storeName, version, partition)
.toCompletableFuture()
.thenAccept(inputStream -> {
// Success case: Complete the future with the input stream
LOGGER.info(
FETCHED_BLOB_SUCCESS_MSG,
replicaId,
chosenHost,
Duration.between(startTime, Instant.now()).getSeconds());
resultFuture.complete(inputStream);
})
.exceptionally(ex -> {
handlePeerFetchException(ex, chosenHost, storeName, version, partition, replicaId);
return null;
});
});
}

// error case 5: no valid peers found for the requested blob after trying all possible hosts,
// solution: do not use blob at all.
String errorMessage = String.format(
"Failed to connect to any peer for partition %d store %s version %d, after trying all possible hosts.",
partition,
storeName,
version);
LOGGER.warn(errorMessage);
throw new VenicePeersNotFoundException(errorMessage);
// error case 2: no valid peers found for the requested blob after trying all possible hosts, skip bootstrapping
// from blob.
chainOfPeersFuture.thenRun(() -> {
if (!resultFuture.isDone()) {
resultFuture.completeExceptionally(
new VenicePeersNotFoundException(String.format(NO_VALID_PEERS_MSG_FORMAT, replicaId)));
}
});
}

/**
* Handle the exception thrown when fetching the blob from a peer.
*/
private void handlePeerFetchException(
Throwable ex,
String chosenHost,
String storeName,
int version,
int partition,
String replicaId) {
if (ex.getCause() instanceof VenicePeersConnectionException) {
// error case 3: failed to connect to the peer, move to the next possible host
LOGGER.error(PEER_CONNECTION_EXCEPTION_MSG, replicaId, chosenHost, ex.getMessage());
} else if (ex.getCause() instanceof VeniceBlobTransferFileNotFoundException) {
// error case 4: the connected host does not have the requested file, move to the next available host
LOGGER.error(PEER_NO_SNAPSHOT_MSG, replicaId, chosenHost, ex.getMessage());
} else {
// error case 5: other exceptions (InterruptedException, ExecutionException, TimeoutException) that are not
// expected, move to the next possible host
RocksDBUtils.deletePartitionDir(baseDir, storeName, version, partition);
LOGGER.error(FAILED_TO_FETCH_BLOB_MSG, replicaId, chosenHost, ex.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -61,6 +62,7 @@ public CompletionStage<InputStream> get(String host, String storeName, int versi
// Attach the file handler to the pipeline
// Attach the metadata handler to the pipeline
ch.pipeline()
.addLast(new IdleStateHandler(0, 0, 60))
.addLast(new MetadataAggregator(MAX_METADATA_CONTENT_LENGTH))
.addLast(new P2PFileTransferClientHandler(baseDir, inputStream, storeName, version, partition))
.addLast(new P2PMetadataTransferHandler(storageMetadataService, baseDir, storeName, version, partition));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -51,7 +52,10 @@
@ChannelHandler.Sharable
public class P2PFileTransferServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger LOGGER = LogManager.getLogger(P2PFileTransferServerHandler.class);

// Maximum timeout for blob transfer in minutes per partition
// TODO: make this configurable in store level
private static final int MAX_TIMEOUT_FOR_BLOB_TRANSFER_IN_MIN = 30;
private static final String TRANSFER_TIMEOUT_ERROR_MSG_FORMAT = "Timeout for transferring blob %s file %s";
private boolean useZeroCopy = false;
private final String baseDir;
private BlobSnapshotManager blobSnapshotManager;
Expand Down Expand Up @@ -131,8 +135,19 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpReque
return;
}

// Set up the time limitation for the transfer
long startTime = System.currentTimeMillis();

// transfer files
for (File file: files) {
if (System.currentTimeMillis() - startTime >= TimeUnit.MINUTES.toMillis(MAX_TIMEOUT_FOR_BLOB_TRANSFER_IN_MIN)) {
String errMessage = String
.format(TRANSFER_TIMEOUT_ERROR_MSG_FORMAT, blobTransferRequest.getFullResourceName(), file.getName());
LOGGER.error(errMessage);
setupResponseAndFlush(HttpResponseStatus.REQUEST_TIMEOUT, errMessage.getBytes(), false, ctx);
return;
}

sendFile(file, ctx);
}

Expand Down
Loading

0 comments on commit 8889ab3

Please sign in to comment.