Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
[DLOG] Sync zk before opening ledger
Browse files Browse the repository at this point in the history
DLog uses a separate zookeeper session for dlog metadata and
bookkeeper metadata. This can introduce a race condition when a dlog
client creates a new segment and adds it to a log. If the dlog
metadata session has a newer view of the data than the bookkeeper
metadata session, then the dlog session will see there's a new segment
to be opened, but the zk session will throw a NoSuchLedger exception
when it actually tries to open it.

To solve this, we need to tell the bookkeeper zk session to ensure it
has the latest view of the data at the moment it tries to open a
ledger.
  • Loading branch information
ivankelly committed Dec 10, 2019
1 parent 5f9f3ac commit acbc36e
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.net.NetUtils;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -224,6 +225,78 @@ public void createComplete(int rc, LedgerHandle lh, Object ctx) {
return promise;
}

public CompletableFuture<LedgerHandle> openLedger(long lid) {
BookKeeper bk;
try {
bk = get();

CompletableFuture<LedgerHandle> promise = new CompletableFuture<>();
// the bookkeeper client uses a different zk session to the session used
// to read the list of segments. Because of this, it is possible that the bk
// client session has an older view of the data, and a ledger visible to the
// dlog session is not yet visible to the bk session. For this reason, force
// the zk session to sync with the leader before trying to open a ledger.
zkc.get().sync("/", (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue()) {
bk.asyncOpenLedger(lid, BookKeeper.DigestType.CRC32, passwd,
(rc2, lh, ctx2) -> {
if (BKException.Code.OK == rc2) {
promise.complete(lh);
} else {
promise.completeExceptionally(BKException.create(rc2));
}
}, null);
} else {
LOG.error("Error syncing to ZK leader",
KeeperException.create(KeeperException.Code.get(rc)));
promise.completeExceptionally(new BKException.ZKException());
}
}, null);
return promise;
} catch (IOException ioe) {
return FutureUtils.exception(ioe);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return FutureUtils.exception(ie);
}
}

public CompletableFuture<LedgerHandle> openLedgerNoRecovery(long lid) {
BookKeeper bk;
try {
bk = get();

CompletableFuture<LedgerHandle> promise = new CompletableFuture<>();
// the bookkeeper client uses a different zk session to the session used
// to read the list of segments. Because of this, it is possible that the bk
// client session has an older view of the data, and a ledger visible to the
// dlog session is not yet visible to the bk session. For this reason, force
// the zk session to sync with the leader before trying to open a ledger.
zkc.get().sync("/", (rc, path, ctx) -> {
if (rc == KeeperException.Code.OK.intValue()) {
bk.asyncOpenLedgerNoRecovery(lid, BookKeeper.DigestType.CRC32, passwd,
(rc2, lh, ctx2) -> {
if (BKException.Code.OK == rc2) {
promise.complete(lh);
} else {
promise.completeExceptionally(BKException.create(rc2));
}
}, null);
} else {
LOG.error("Error syncing to ZK leader",
KeeperException.create(KeeperException.Code.get(rc)));
promise.completeExceptionally(new BKException.ZKException());
}
}, null);
return promise;
} catch (IOException ioe) {
return FutureUtils.exception(ioe);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return FutureUtils.exception(ie);
}
}

public CompletableFuture<Void> deleteLedger(long lid,
final boolean ignoreNonExistentLedger) {
BookKeeper bk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.distributedlog.impl.logsegment;

import static com.google.common.base.Charsets.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
Expand All @@ -35,14 +33,14 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogSegmentMetadata;
Expand All @@ -59,7 +57,7 @@
/**
* BookKeeper ledger based log segment entry reader.
*/
public class BKLogSegmentEntryReader implements SafeRunnable, LogSegmentEntryReader, AsyncCallback.OpenCallback {
public class BKLogSegmentEntryReader implements SafeRunnable, LogSegmentEntryReader {

private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class);

Expand Down Expand Up @@ -285,7 +283,7 @@ boolean hasReadEnoughEntries() {
}
}

private final BookKeeper bk;
private final BookKeeperClient bkc;
private final DistributedLogConfiguration conf;
private final OrderedScheduler scheduler;
private final long lssn;
Expand Down Expand Up @@ -332,7 +330,7 @@ boolean hasReadEnoughEntries() {
BKLogSegmentEntryReader(LogSegmentMetadata metadata,
LedgerHandle lh,
long startEntryId,
BookKeeper bk,
BookKeeperClient bkc,
OrderedScheduler scheduler,
DistributedLogConfiguration conf,
StatsLogger statsLogger,
Expand All @@ -344,7 +342,7 @@ boolean hasReadEnoughEntries() {
this.deserializeRecordSet = conf.getDeserializeRecordSetOnReads();
this.lh = lh;
this.nextEntryId = Math.max(startEntryId, 0);
this.bk = bk;
this.bkc = bkc;
this.conf = conf;
this.numPrefetchEntries = conf.getNumPrefetchEntriesPerLogSegment();
this.maxPrefetchEntries = conf.getMaxPrefetchEntriesPerLogSegment();
Expand Down Expand Up @@ -432,22 +430,17 @@ public synchronized void onLogSegmentMetadataUpdated(LogSegmentMetadata segment)
return;
}
// segment is closed from inprogress, then re-open the log segment
bk.asyncOpenLedger(
segment.getLogSegmentId(),
BookKeeper.DigestType.CRC32,
conf.getBKDigestPW().getBytes(UTF_8),
this,
segment);
bkc.openLedger(segment.getLogSegmentId())
.whenComplete((ledger, exception) -> {
if (exception != null) {
failOrRetryOpenLedger(exception, segment);
} else {
handleSegmentOpened(ledger, segment);
}
});
}

@Override
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
LogSegmentMetadata segment = (LogSegmentMetadata) ctx;
if (BKException.Code.OK != rc) {
// fail current reader or retry opening the reader
failOrRetryOpenLedger(rc, segment);
return;
}
private void handleSegmentOpened(LedgerHandle lh, LogSegmentMetadata segment) {
// switch to new ledger handle if the log segment is moved to completed.
CacheEntry longPollRead = null;
synchronized (this) {
Expand All @@ -474,15 +467,15 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
notifyReaders();
}

private void failOrRetryOpenLedger(int rc, final LogSegmentMetadata segment) {
private void failOrRetryOpenLedger(Throwable ex, final LogSegmentMetadata segment) {
if (isClosed()) {
return;
}
if (isBeyondLastAddConfirmed()) {
// if the reader is already caught up, let's fail the reader immediately
// as we need to pull the latest metadata of this log segment.
completeExceptionally(new BKTransmitException("Failed to open ledger for reading log segment "
+ getSegment(), rc),
+ getSegment(), ex),
true);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.distributedlog.impl.logsegment;

import static com.google.common.base.Charsets.UTF_8;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.AsyncCallback;
Expand Down Expand Up @@ -56,26 +54,10 @@
*/
public class BKLogSegmentEntryStore implements
LogSegmentEntryStore,
AsyncCallback.OpenCallback,
AsyncCallback.DeleteCallback {

private static final Logger logger = LoggerFactory.getLogger(BKLogSegmentEntryReader.class);

private static class OpenReaderRequest {

private final LogSegmentMetadata segment;
private final long startEntryId;
private final CompletableFuture<LogSegmentEntryReader> openPromise;

OpenReaderRequest(LogSegmentMetadata segment,
long startEntryId) {
this.segment = segment;
this.startEntryId = startEntryId;
this.openPromise = new CompletableFuture<LogSegmentEntryReader>();
}

}

private static class DeleteLogSegmentRequest {

private final LogSegmentMetadata segment;
Expand All @@ -88,7 +70,6 @@ private static class DeleteLogSegmentRequest {

}

private final byte[] passwd;
private final ZooKeeperClient zkc;
private final BookKeeperClient bkc;
private final OrderedScheduler scheduler;
Expand All @@ -111,7 +92,6 @@ public BKLogSegmentEntryStore(DistributedLogConfiguration conf,
this.dynConf = dynConf;
this.zkc = zkc;
this.bkc = bkc;
this.passwd = conf.getBKDigestPW().getBytes(UTF_8);
this.scheduler = scheduler;
this.allocator = allocator;
this.statsLogger = statsLogger;
Expand Down Expand Up @@ -194,50 +174,36 @@ public CompletableFuture<LogSegmentEntryReader> openReader(LogSegmentMetadata se
} catch (IOException e) {
return FutureUtils.exception(e);
}
OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId);

CompletableFuture<LedgerHandle> ledgerF;
if (segment.isInProgress()) {
bk.asyncOpenLedgerNoRecovery(
segment.getLogSegmentId(),
BookKeeper.DigestType.CRC32,
passwd,
this,
request);
ledgerF = bkc.openLedgerNoRecovery(segment.getLogSegmentId());
} else {
bk.asyncOpenLedger(
segment.getLogSegmentId(),
BookKeeper.DigestType.CRC32,
passwd,
this,
request);
}
return request.openPromise;
}

@Override
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
OpenReaderRequest request = (OpenReaderRequest) ctx;
if (BKException.Code.OK != rc) {
FutureUtils.completeExceptionally(
request.openPromise,
new BKTransmitException("Failed to open ledger handle for log segment " + request.segment, rc));
return;
}
// successfully open a ledger
try {
LogSegmentEntryReader reader = new BKLogSegmentEntryReader(
request.segment,
lh,
request.startEntryId,
bkc.get(),
scheduler,
conf,
statsLogger,
failureInjector);
FutureUtils.complete(request.openPromise, reader);
} catch (IOException e) {
FutureUtils.completeExceptionally(request.openPromise, e);
ledgerF = bkc.openLedger(segment.getLogSegmentId());
}

CompletableFuture<LogSegmentEntryReader> promise = new CompletableFuture<>();
ledgerF.thenApply((ledger) -> {
return new BKLogSegmentEntryReader(
segment,
ledger,
startEntryId,
bkc,
scheduler,
conf,
statsLogger,
failureInjector);
})
.whenComplete((reader, exception) -> {
if (exception != null) {
promise.completeExceptionally(
new BKTransmitException(
"Failed to open ledger handle for log segment " + segment, exception));
} else {
promise.complete(reader);
}
});
return promise;
}

@Override
Expand All @@ -249,39 +215,27 @@ public CompletableFuture<LogSegmentRandomAccessEntryReader> openRandomAccessRead
} catch (IOException e) {
return FutureUtils.exception(e);
}
final CompletableFuture<LogSegmentRandomAccessEntryReader> openPromise =
new CompletableFuture<LogSegmentRandomAccessEntryReader>();
AsyncCallback.OpenCallback openCallback = new AsyncCallback.OpenCallback() {
@Override
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
if (BKException.Code.OK != rc) {
FutureUtils.completeExceptionally(
openPromise,
new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc));
return;
}
LogSegmentRandomAccessEntryReader reader = new BKLogSegmentRandomAccessEntryReader(
segment,
lh,
conf);
FutureUtils.complete(openPromise, reader);
}
};

CompletableFuture<LedgerHandle> ledgerF;
if (segment.isInProgress() && !fence) {
bk.asyncOpenLedgerNoRecovery(
segment.getLogSegmentId(),
BookKeeper.DigestType.CRC32,
passwd,
openCallback,
null);
ledgerF = bkc.openLedgerNoRecovery(segment.getLogSegmentId());
} else {
bk.asyncOpenLedger(
segment.getLogSegmentId(),
BookKeeper.DigestType.CRC32,
passwd,
openCallback,
null);
ledgerF = bkc.openLedger(segment.getLogSegmentId());
}
return openPromise;

CompletableFuture<LogSegmentRandomAccessEntryReader> promise = new CompletableFuture<>();
ledgerF.thenApply((ledger) -> {
return new BKLogSegmentRandomAccessEntryReader(segment, ledger, conf);
})
.whenComplete((reader, exception) -> {
if (exception != null) {
promise.completeExceptionally(
new BKTransmitException(
"Failed to open ledger handle for log segment " + segment, exception));
} else {
promise.complete(reader);
}
});
return promise;
}
}
5 changes: 5 additions & 0 deletions stream/distributedlog/protocol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>bookkeeper-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Loading

0 comments on commit acbc36e

Please sign in to comment.