Skip to content

Commit

Permalink
Merge pull request #798 from joaodinissf/awaitBinaryStorageExecutorQu…
Browse files Browse the repository at this point in the history
…iescence

Fix the behavior of awaitBinaryStorageExecutorTermination
  • Loading branch information
joaodinissf authored Sep 5, 2023
2 parents 8300377 + dfe9483 commit 10f4300
Showing 1 changed file with 48 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -114,6 +115,7 @@
public class MonitoredClusteringBuilderState extends ClusteringBuilderState
implements IResourceDescriptions2, IXtextTargetPlatformManager.Listener, ILayeredResourceDescriptions {

private static final int BINARY_STORAGE_EXECUTOR_PARALLELISM = 4;
public static final String PHASE_ONE_BUILD_SORTER = "com.avaloq.tools.ddk.xtext.builder.phaseOneBuildSorter"; //$NON-NLS-1$
public static final String PHASE_TWO_BUILD_SORTER = "com.avaloq.tools.ddk.xtext.builder.phaseTwoBuildSorter"; //$NON-NLS-1$

Expand Down Expand Up @@ -170,7 +172,7 @@ public class MonitoredClusteringBuilderState extends ClusteringBuilderState
return worker;
};

private final ForkJoinPool binaryStorageExecutor = new ForkJoinPool(4, factory, null, false);
private ForkJoinPool binaryStorageExecutor = new ForkJoinPool(BINARY_STORAGE_EXECUTOR_PARALLELISM, factory, null, false);

/**
* Handle to the ResourceDescriptionsData we use viewed as a IResourceDescriptions2 (with findReferences()). Parent class does not provide direct access to
Expand Down Expand Up @@ -234,7 +236,6 @@ public synchronized void load() {
}
}

/** {@inheritDoc} */
@Override
protected void ensureLoaded() {
if (!isLoaded) {
Expand Down Expand Up @@ -395,7 +396,6 @@ public synchronized ImmutableList<IResourceDescription.Delta> clean(final Set<UR
}
}

/** {@inheritDoc} */
@Override
// CHECKSTYLE:CHECK-OFF NestedTryDepth
protected Collection<Delta> doUpdate(final BuildData buildData, final ResourceDescriptionsData newData, final IProgressMonitor monitor) { // NOPMD
Expand Down Expand Up @@ -653,7 +653,7 @@ public boolean isCanceled() {
}
traceSet.ended(BuildLinkingEvent.class);
watchdog.interrupt();
awaitBinaryStorageExecutorQuiescence();
awaitBinaryStorageExecutorTermination();
}
return allDeltas;
// CHECKSTYLE:CHECK-ON NestedTryDepth
Expand Down Expand Up @@ -688,38 +688,40 @@ protected void storeBinaryResource(final Resource resource, final BuildData buil
if (isBinaryModelStorageAvailable && resource instanceof StorageAwareResource && ((StorageAwareResource) resource).getResourceStorageFacade() != null
&& fileSystemAccess instanceof IFileSystemAccessExtension3) {

CompletableFuture.runAsync(() -> doStoreBinaryResource(resource, buildData), binaryStorageExecutor);
try {
CompletableFuture.runAsync(() -> doStoreBinaryResource(resource, buildData), binaryStorageExecutor);
} catch (RejectedExecutionException e) {
String errorMessage = "Unable to submit a new task to store a binary resource."; //$NON-NLS-1$
if (binaryStorageExecutor.isShutdown()) {
LOGGER.info(errorMessage + " The worker pool has shut down."); //$NON-NLS-1$
} else {
LOGGER.error(errorMessage + " Exception information: " + e.getMessage()); //$NON-NLS-1$
}
}
}
}

protected void doStoreBinaryResource(final Resource resource, final BuildData buildData) {
IResourceStorageFacade storageFacade = ((StorageAwareResource) resource).getResourceStorageFacade();
if (resource.getResourceSet() != null) {
final long maxTaskExecutionNanos = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
final long maxTaskExecutionNanos = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);

try {
long elapsed = System.nanoTime();

storageFacade.saveResource((StorageAwareResource) resource, (IFileSystemAccessExtension3) fileSystemAccess);
buildData.getSourceLevelURICache().getSources().remove(resource.getURI());
try {
long elapsed = System.nanoTime();

elapsed = System.nanoTime() - elapsed;
if (elapsed > maxTaskExecutionNanos) {
LOGGER.info("saving binary taking longer than expected (" + elapsed + " ns) : " + resource.getURI()); //$NON-NLS-1$ //$NON-NLS-2$
}
} catch (WrappedException ex) {
LOGGER.error(FAILED_TO_SAVE_BINARY + resource.getURI(), ex.exception());
storageFacade.saveResource((StorageAwareResource) resource, (IFileSystemAccessExtension3) fileSystemAccess);
buildData.getSourceLevelURICache().getSources().remove(resource.getURI());

// CHECKSTYLE:OFF
} catch (Throwable ex) {
// CHECKSTYLE:ON
LOGGER.error(FAILED_TO_SAVE_BINARY + resource.getURI(), ex);
}
} else {
if (storageFacade instanceof DirectLinkingResourceStorageFacade) {
((DirectLinkingResourceStorageFacade) storageFacade).deleteStorage(resource.getURI(), fileSystemAccess);
elapsed = System.nanoTime() - elapsed;
if (elapsed > maxTaskExecutionNanos) {
LOGGER.info("saving binary taking longer than expected (" + elapsed + " ns) : " + resource.getURI()); //$NON-NLS-1$ //$NON-NLS-2$
}
LOGGER.info("No resourceSet found for " + resource.getURI()); //$NON-NLS-1$
} catch (WrappedException ex) {
LOGGER.error(FAILED_TO_SAVE_BINARY + resource.getURI(), ex.exception());

// CHECKSTYLE:OFF
} catch (Throwable ex) {
// CHECKSTYLE:ON
LOGGER.error(FAILED_TO_SAVE_BINARY + resource.getURI(), ex);
}
}

Expand All @@ -744,13 +746,29 @@ protected void deleteBinaryResources(final Set<URI> toBeDeleted) {
/**
* Waits until binary models are stored.
*/
protected void awaitBinaryStorageExecutorQuiescence() {
protected void awaitBinaryStorageExecutorTermination() {
LOGGER.info("Waiting for binary resource storage tasks to complete..."); //$NON-NLS-1$

// Stop accepting additional work
binaryStorageExecutor.shutdown();
int activeThreadCount = binaryStorageExecutor.getActiveThreadCount();
long queuedTaskCount = binaryStorageExecutor.getQueuedTaskCount();
if (!binaryStorageExecutor.awaitQuiescence(1, TimeUnit.MINUTES)) {

// Attempt to wait for queued work to complete
try {
if (!binaryStorageExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
LOGGER.warn(String.format("Binary resource storage tasks not completed in time, start with task / queue %d / %d; now have %d / %d", //$NON-NLS-1$
activeThreadCount, queuedTaskCount, binaryStorageExecutor.getActiveThreadCount(), binaryStorageExecutor.getQueuedTaskCount()));
binaryStorageExecutor.shutdownNow();
}

LOGGER.info("Binary resource storage executor completed."); //$NON-NLS-1$

// Be ready to accept additional work
binaryStorageExecutor = new ForkJoinPool(BINARY_STORAGE_EXECUTOR_PARALLELISM, factory, null, false);
}

/**
Expand Down Expand Up @@ -1014,7 +1032,6 @@ private List<URI> writeResources(final Collection<URI> toWrite, final BuildData
return toBuild;
}

/** {@inheritDoc} */
protected void writeNewResourceDescriptions(final BuildData buildData, final IResourceDescriptions oldState, final CurrentDescriptions newState, final ResourceDescriptionsData newData, final IProgressMonitor monitor) {
final List<List<URI>> toWriteGroups = phaseOneBuildSorter.sort(buildData.getToBeUpdated());
final List<URI> toBuild = Lists.newLinkedList();
Expand Down Expand Up @@ -1059,9 +1076,7 @@ protected void writeNewResourceDescriptions(final BuildData buildData, final IRe
@Override
protected void clearResourceSet(final ResourceSet resourceSet) {
// this is important as otherwise the resources would unexpectedly become detached from the resource set
while (!binaryStorageExecutor.awaitQuiescence(1, TimeUnit.SECONDS)) {
LOGGER.warn("Waiting for binary resource storage tasks to complete."); //$NON-NLS-1$
}
awaitBinaryStorageExecutorTermination();
traceSet.started(BuildResourceSetClearEvent.class, resourceSet.getResources().size());
try {
EmfResourceSetUtil.clearResourceSetWithoutNotifications(resourceSet);
Expand Down Expand Up @@ -1297,34 +1312,29 @@ protected void addDeletedURIsToDeltas(final Set<URI> deletedUris, final Set<Delt

// IResourceDescriptions2 interface implementation

/** {@inheritDoc} */
@Override
public Set<URI> getAllURIs() {
return myData.getAllURIs();
}

/** {@inheritDoc} */
@Override
public Iterable<IResourceDescription> findAllReferencingResources(final Set<IResourceDescription> targetResources, final ReferenceMatchPolicy matchPolicy) {
ensureLoaded();
return myData.findAllReferencingResources(targetResources, matchPolicy);
}

/** {@inheritDoc} */
@Override
public Iterable<IResourceDescription> findExactReferencingResources(final Set<IEObjectDescription> targetObjects, final ReferenceMatchPolicy matchPolicy) {
ensureLoaded();
return myData.findExactReferencingResources(targetObjects, matchPolicy);
}

/** {@inheritDoc} */
@Override
public Iterable<IReferenceDescription> findReferencesToObjects(final Set<URI> targetObjects) {
ensureLoaded();
return myData.findReferencesToObjects(targetObjects);
}

/** {@inheritDoc} */
@Override
public Iterable<IResourceDescription> getLocalResourceDescriptions() {
ensureLoaded();
Expand Down

0 comments on commit 10f4300

Please sign in to comment.