Skip to content

Commit

Permalink
IntelliJ: set application pool threads to daemon
Browse files Browse the repository at this point in the history
so that they don't block the application from exiting.
  • Loading branch information
ting-yuan committed Nov 14, 2024
1 parent 58bf5ad commit 992bcfa
Showing 1 changed file with 245 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright 2000-2023 JetBrains s.r.o. and contributors. Use of this source code is governed by the Apache 2.0 license.
package com.intellij.util.concurrency;

import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.progress.ProcessCanceledException;
import com.intellij.openapi.util.LowMemoryWatcherManager;
import com.intellij.util.IncorrectOperationException;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;

import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiConsumer;

import static com.intellij.util.concurrency.AppExecutorUtil.propagateContextOrCancellation;

/**
* A ThreadPoolExecutor which also implements {@link ScheduledExecutorService} by awaiting scheduled tasks in a separate thread
* and then executing them in the owned ThreadPoolExecutor.
* Unlike the existing {@link ScheduledThreadPoolExecutor}, this pool is unbounded.
* @see AppExecutorUtil#getAppScheduledExecutorService()
*/
@ApiStatus.Internal
public final class AppScheduledExecutorService extends SchedulingWrapper {
static final String POOLED_THREAD_PREFIX = "ApplicationImpl pooled thread ";
private final @NotNull String myName;
private final LowMemoryWatcherManager myLowMemoryWatcherManager;

private static final class Holder {
private static final AppScheduledExecutorService INSTANCE = new AppScheduledExecutorService("Global instance", 1, TimeUnit.MINUTES);
}

static @NotNull ScheduledExecutorService getInstance() {
return Holder.INSTANCE;
}

AppScheduledExecutorService(@NotNull String name, long keepAliveTime, @NotNull TimeUnit unit) {
super(new BackendThreadPoolExecutor(new MyThreadFactory(), keepAliveTime, unit), new AppDelayQueue());
myName = name;
myLowMemoryWatcherManager = new LowMemoryWatcherManager(this);
}

private MyThreadFactory getCountingThreadFactory() {
return (MyThreadFactory)((BackendThreadPoolExecutor)backendExecutorService).getThreadFactory();
}

private static final class MyThreadFactory extends CountingThreadFactory {
private BiConsumer<? super Thread, ? super Runnable> newThreadListener;
private final ThreadFactory myThreadFactory = Executors.privilegedThreadFactory();

@Override
public @NotNull Thread newThread(final @NotNull Runnable r) {
Thread thread = myThreadFactory.newThread(r);
thread.setDaemon(true);
thread.setName(POOLED_THREAD_PREFIX + counter.incrementAndGet());

thread.setPriority(Thread.NORM_PRIORITY - 1);

BiConsumer<? super Thread, ? super Runnable> listener = newThreadListener;
if (listener != null) {
listener.accept(thread, r);
}
return thread;
}

void setNewThreadListener(@NotNull BiConsumer<? super Thread, ? super Runnable> threadListener) {
if (newThreadListener != null) throw new IllegalStateException("Listener was already set: " + newThreadListener);
newThreadListener = threadListener;
}
}

public void setNewThreadListener(@NotNull BiConsumer<? super Thread, ? super Runnable> threadListener) {
getCountingThreadFactory().setNewThreadListener(threadListener);
}

@Override
public @NotNull List<Runnable> shutdownNow() {
return notAllowedMethodCall();
}

@Override
public void shutdown() {
notAllowedMethodCall();
}

static List<Runnable> notAllowedMethodCall() {
throw new IncorrectOperationException("You must not call this method on the global app pool");
}

@Override
void onDelayQueuePurgedOnShutdown() {
((BackendThreadPoolExecutor)backendExecutorService).superShutdown();
}

void shutdownAppScheduledExecutorService() {
// LowMemoryWatcher starts background threads so stop it now to avoid RejectedExecutionException
myLowMemoryWatcherManager.shutdown();
doShutdown();
delayQueue.shutdown(new SchedulingWrapper.MyScheduledFutureTask<Void>(()->{}, null, 0) {
@Override
boolean executeMeInBackendExecutor() {
set(null);
return false;
}
});
}

@Override
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException {
// let this task to bubble through the AppDelayQueue global queue to make sure there are no in-flight tasks leaked on stack of com.intellij.util.concurrency.AppDelayQueue.transferrerThread
long deadline = System.nanoTime() + unit.toNanos(timeout);
if (!delayQueue.awaitTermination(deadline - System.nanoTime(), TimeUnit.NANOSECONDS)) {
return false;
}

return super.awaitTermination(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@TestOnly
public @NotNull String statistics() {
return myName + " threads created counter = " + getCountingThreadFactory().counter;
}

@TestOnly
public String dumpQueue() {
return delayQueue.toString();
}

public int getBackendPoolExecutorSize() {
return ((BackendThreadPoolExecutor)backendExecutorService).getPoolSize();
}

@TestOnly
void setBackendPoolCorePoolSize(int size) {
((BackendThreadPoolExecutor)backendExecutorService).superSetCorePoolSize(size);
}

static final class BackendThreadPoolExecutor extends ThreadPoolExecutor {

BackendThreadPoolExecutor(@NotNull ThreadFactory factory,
long keepAliveTime,
@NotNull TimeUnit unit) {
super(1, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<>(), factory);
}

@Override
public void execute(@NotNull Runnable command) {
super.execute(capturePropagationAndCancellationContext(command));
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return newTaskFor(Executors.callable(runnable, value));
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return capturePropagationAndCancellationContext(callable);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null && !(t instanceof ProcessCanceledException)) {
Logger.getInstance(SchedulingWrapper.class).error("Worker exited due to exception", t);
}
}

private void superShutdown() {
super.shutdown();
}

private @NotNull List<Runnable> superShutdownNow() {
return super.shutdownNow();
}

// stub out sensitive methods
@Override
public void shutdown() {
notAllowedMethodCall();
}

@Override
public @NotNull List<Runnable> shutdownNow() {
return notAllowedMethodCall();
}

@Override
public void setCorePoolSize(int corePoolSize) {
notAllowedMethodCall();
}

private void superSetCorePoolSize(int corePoolSize) {
super.setCorePoolSize(corePoolSize);
}

@Override
public void allowCoreThreadTimeOut(boolean value) {
notAllowedMethodCall();
}

@Override
public void setMaximumPoolSize(int maximumPoolSize) {
notAllowedMethodCall();
}

@Override
public void setKeepAliveTime(long time, TimeUnit unit) {
notAllowedMethodCall();
}

void superSetKeepAliveTime(long time, TimeUnit unit) {
super.setKeepAliveTime(time, unit);
}

@Override
public void setThreadFactory(@NotNull ThreadFactory threadFactory) {
notAllowedMethodCall();
}
}

public static @NotNull Thread getPeriodicTasksThread() {
return Holder.INSTANCE.delayQueue.getThread();
}

@TestOnly
void waitForLowMemoryWatcherManagerInit(int timeout, @NotNull TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
myLowMemoryWatcherManager.waitForInitComplete(timeout, unit);
}

public static @NotNull Runnable capturePropagationAndCancellationContext(@NotNull Runnable command) {
if (!propagateContextOrCancellation()) {
return command;
}
return Propagation.capturePropagationAndCancellationContext(command);
}

public static <T> @NotNull FutureTask<T> capturePropagationAndCancellationContext(@NotNull Callable<T> callable) {
if (!propagateContextOrCancellation()) {
return new FutureTask<>(callable);
}
return Propagation.capturePropagationAndCancellationContext(callable);
}
}

0 comments on commit 992bcfa

Please sign in to comment.