Skip to content

Commit

Permalink
Merge pull request #7354 from alvasw/fix_daosnapshotservice_threading…
Browse files Browse the repository at this point in the history
…_issues

Fix DaoSnapshotService threading issues
  • Loading branch information
alejandrogarcia83 authored Jan 15, 2025
2 parents fe6a9a1 + 4ec4477 commit 3d86bbf
Showing 1 changed file with 37 additions and 32 deletions.
69 changes: 37 additions & 32 deletions core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -78,9 +80,9 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
@Setter
@Nullable
private Runnable resyncDaoStateFromResourcesHandler;
private int daoRequiresRestartHandlerAttempts = 0;
private boolean persistingBlockInProgress;
private boolean isParseBlockChainComplete;
private final AtomicInteger daoRequiresRestartHandlerAttempts = new AtomicInteger();
private final AtomicBoolean persistingBlockInProgress = new AtomicBoolean();
private final AtomicBoolean isParseBlockChainComplete = new AtomicBoolean();
private final List<Integer> heightsOfLastAppliedSnapshots = new ArrayList<>();

///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -155,7 +157,7 @@ public void onDaoStateChanged(Block block) {
// Otherwise, we do it only after the initial blockchain parsing is completed to not delay the parsing.
// In that case we get the missing hashes from the seed nodes. At any new block we do the hash calculation
// ourselves and therefore get back confidence that our DAO state is in sync with the network.
if (preferences.isUseFullModeDaoMonitor() || isParseBlockChainComplete) {
if (preferences.isUseFullModeDaoMonitor() || isParseBlockChainComplete.get()) {
// We need to execute first the daoStateMonitoringService.createHashFromBlock to get the hash created
daoStateMonitoringService.createHashFromBlock(block);
maybeCreateSnapshot(block);
Expand All @@ -167,7 +169,7 @@ public void onDaoStateChanged(Block block) {

@Override
public void onParseBlockChainComplete() {
isParseBlockChainComplete = true;
isParseBlockChainComplete.set(true);

// In case we have dao monitoring deactivated we create the snapshot after we are completed with parsing,
// and we got called back from daoStateMonitoringService once the hashes are created from peers data.
Expand Down Expand Up @@ -211,6 +213,23 @@ public void onParseBlockChainComplete() {

// We need to process during batch processing as well to write snapshots during that process.
public void maybeCreateSnapshot(Block block) {
// We protect to get called while we are not completed with persisting the daoState. This can take about
// 20 seconds, and it is not expected that we get triggered another snapshot event in that period, but this
// check guards that we would skip such calls.
if (persistingBlockInProgress.get()) {
if (preferences.isUseFullModeDaoMonitor()) {
// In case we don't use isUseFullModeDaoMonitor we might get called here too often as the parsing is much
// faster than the persistence, and we likely create only 1 snapshot during initial parsing, so
// we log only if isUseFullModeDaoMonitor is true as then parsing is likely slower, and we would
// expect that we do a snapshot at each trigger block.
log.info("We try to persist a daoState but the previous call has not completed yet. " +
"We ignore that call and skip that snapshot. " +
"Snapshot will be created at next snapshot height again. This is not to be expected with live " +
"blockchain data.");
}
return;
}

int chainHeight = block.getHeight();
if (!isSnapshotHeight(chainHeight)) {
return;
Expand All @@ -230,23 +249,6 @@ public void maybeCreateSnapshot(Block block) {
return;
}

// We protect to get called while we are not completed with persisting the daoState. This can take about
// 20 seconds, and it is not expected that we get triggered another snapshot event in that period, but this
// check guards that we would skip such calls.
if (persistingBlockInProgress) {
if (preferences.isUseFullModeDaoMonitor()) {
// In case we don't use isUseFullModeDaoMonitor we might get called here too often as the parsing is much
// faster than the persistence, and we likely create only 1 snapshot during initial parsing, so
// we log only if isUseFullModeDaoMonitor is true as then parsing is likely slower, and we would
// expect that we do a snapshot at each trigger block.
log.info("We try to persist a daoState but the previous call has not completed yet. " +
"We ignore that call and skip that snapshot. " +
"Snapshot will be created at next snapshot height again. This is not to be expected with live " +
"blockchain data.");
}
return;
}

if (daoStateCandidate != null) {
persist();
} else {
Expand All @@ -256,7 +258,7 @@ public void maybeCreateSnapshot(Block block) {

private void persist() {
long ts = System.currentTimeMillis();
persistingBlockInProgress = true;
persistingBlockInProgress.set(true);
daoStateStorageService.requestPersistence(daoStateCandidate,
blocksCandidate,
hashChainCandidate,
Expand All @@ -265,7 +267,7 @@ private void persist() {
snapshotHeight, System.currentTimeMillis() - ts);

createSnapshot();
persistingBlockInProgress = false;
persistingBlockInProgress.set(false);
});
}

Expand Down Expand Up @@ -293,7 +295,7 @@ public void revertToLastSnapshot() {
applySnapshot(false);
}

private void applySnapshot(boolean fromInitialize) {
private synchronized void applySnapshot(boolean fromInitialize) {
DaoState persistedDaoState = daoStateStorageService.getPersistedBsqState();
if (persistedDaoState == null) {
log.info("Try to apply snapshot but no stored snapshot available. That is expected at first blocks.");
Expand Down Expand Up @@ -368,7 +370,7 @@ private boolean isHeightBelowGenesisHeight(int height) {
private void resyncDaoStateFromResources() {
log.info("resyncDaoStateFromResources called");
if (resyncDaoStateFromResourcesHandler == null) {
if (++daoRequiresRestartHandlerAttempts <= 3) {
if (daoRequiresRestartHandlerAttempts.addAndGet(1) <= 3) {
log.warn("resyncDaoStateFromResourcesHandler has not been initialized yet, will try again in 10 seconds");
UserThread.runAfter(this::resyncDaoStateFromResources, 10); // a delay for the app to init
return;
Expand All @@ -377,12 +379,15 @@ private void resyncDaoStateFromResources() {
System.exit(1);
}
}
try {
daoStateStorageService.removeAndBackupAllDaoData();
// the restart handler informs the user of the need to restart bisq (in desktop mode)
resyncDaoStateFromResourcesHandler.run();
} catch (IOException e) {
log.error("Error at resyncDaoStateFromResources: {}", e.toString());

synchronized (this) {
try {
daoStateStorageService.removeAndBackupAllDaoData();
// the restart handler informs the user of the need to restart bisq (in desktop mode)
resyncDaoStateFromResourcesHandler.run();
} catch (IOException e) {
log.error("Error at resyncDaoStateFromResources: {}", e.toString());
}
}
}

Expand Down

0 comments on commit 3d86bbf

Please sign in to comment.