diff --git a/operator/src/main/java/oracle/kubernetes/operator/BaseMain.java b/operator/src/main/java/oracle/kubernetes/operator/BaseMain.java index 3e7a3c606ae..fe1f3e4d8d1 100644 --- a/operator/src/main/java/oracle/kubernetes/operator/BaseMain.java +++ b/operator/src/main/java/oracle/kubernetes/operator/BaseMain.java @@ -45,7 +45,6 @@ import oracle.kubernetes.operator.work.Fiber.CompletionCallback; import oracle.kubernetes.operator.work.Packet; import oracle.kubernetes.operator.work.Step; -import oracle.kubernetes.operator.work.ThreadFactorySingleton; import oracle.kubernetes.utils.SystemClock; /** An abstract base main class for the operator and the webhook. */ @@ -60,7 +59,7 @@ public abstract class BaseMain { static final Container container = new Container(); static final ThreadFactory threadFactory = new WrappedThreadFactory(); static ScheduledExecutorService wrappedExecutorService = - Engine.wrappedExecutorService("operator", container); // non-final to allow change in unit tests + Engine.wrappedExecutorService(container); // non-final to allow change in unit tests static final AtomicReference lastFullRecheck = new AtomicReference<>(SystemClock.now()); static final Semaphore shutdownSignal = new Semaphore(0); @@ -276,7 +275,7 @@ static Packet createPacketWithLoggingContext(String ns) { } private static class WrappedThreadFactory implements ThreadFactory { - private final ThreadFactory delegate = ThreadFactorySingleton.getInstance(); + private final ThreadFactory delegate = Thread.ofVirtual().factory(); @Override public Thread newThread(@Nonnull Runnable r) { diff --git a/operator/src/main/java/oracle/kubernetes/operator/helpers/ClientPool.java b/operator/src/main/java/oracle/kubernetes/operator/helpers/ClientPool.java index 1e9fd07785c..af04b9f1cc6 100644 --- a/operator/src/main/java/oracle/kubernetes/operator/helpers/ClientPool.java +++ b/operator/src/main/java/oracle/kubernetes/operator/helpers/ClientPool.java @@ -4,11 +4,8 @@ package oracle.kubernetes.operator.helpers; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -116,22 +113,9 @@ public ApiClient get() { } if (threadFactory != null) { - ExecutorService exec = - new ThreadPoolExecutor( - 0, - Integer.MAX_VALUE, - 60, - TimeUnit.SECONDS, - new SynchronousQueue<>(), - threadFactory) { - @Override - public void execute(Runnable command) { - super.execute(wrapRunnable(command)); - } - }; OkHttpClient httpClient = client.getHttpClient().newBuilder().addInterceptor(new HeaderModifierInterceptor()) - .dispatcher(new Dispatcher(exec)).build(); + .dispatcher(new Dispatcher(Executors.newThreadPerTaskExecutor(threadFactory))).build(); client.setHttpClient(httpClient); } diff --git a/operator/src/main/java/oracle/kubernetes/operator/work/Engine.java b/operator/src/main/java/oracle/kubernetes/operator/work/Engine.java index 346cc7f4d5d..00c4911a5ba 100644 --- a/operator/src/main/java/oracle/kubernetes/operator/work/Engine.java +++ b/operator/src/main/java/oracle/kubernetes/operator/work/Engine.java @@ -1,13 +1,21 @@ -// Copyright (c) 2018, 2021, Oracle and/or its affiliates. +// Copyright (c) 2018, 2023, Oracle and/or its affiliates. // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. package oracle.kubernetes.operator.work; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; @@ -15,7 +23,7 @@ * Collection of {@link Fiber}s. Owns an {@link Executor} to run them. */ public class Engine { - private static final int DEFAULT_THREAD_COUNT = 10; + private static final int DEFAULT_THREAD_COUNT = 2; private final AtomicReference threadPool = new AtomicReference<>(); /** @@ -29,30 +37,146 @@ public Engine(ScheduledExecutorService threadPool) { /** * Creates engine with the specified id and default container and executor. - * - * @param id Engine id */ - public Engine(String id) { - this(wrappedExecutorService(id, ContainerResolver.getDefault().getContainer())); + public Engine() { + this(wrappedExecutorService(ContainerResolver.getDefault().getContainer())); } /** * wrapped executor service. - * @param id id * @param container container * @return executor service */ - public static ScheduledExecutorService wrappedExecutorService(String id, Container container) { - ScheduledThreadPoolExecutor threadPool = - new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, new DaemonThreadFactory(id)); + public static ScheduledExecutorService wrappedExecutorService(Container container) { + ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT); threadPool.setRemoveOnCancelPolicy(true); - return wrap(container, threadPool); + return wrap(container, new VirtualScheduledExectuorService(threadPool)); } private static ScheduledExecutorService wrap(Container container, ScheduledExecutorService ex) { return container != null ? ContainerResolver.getDefault().wrapExecutor(container, ex) : ex; } + private static class VirtualScheduledExectuorService implements ScheduledExecutorService { + private final ScheduledExecutorService service; + private final ExecutorService virtualService = Executors.newVirtualThreadPerTaskExecutor(); + + public VirtualScheduledExectuorService(ScheduledExecutorService service) { + this.service = service; + } + + private Runnable wrap(Runnable command) { + return () -> virtualService.execute(command); + } + + @Nonnull + @Override + public ScheduledFuture schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) { + return service.schedule(wrap(command), delay, unit); + } + + @Nonnull + @Override + public ScheduledFuture schedule(@Nonnull Callable callable, long delay, @Nonnull TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public ScheduledFuture scheduleAtFixedRate( + @Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) { + return service.scheduleAtFixedRate(wrap(command), initialDelay, period, unit); + } + + @Nonnull + @Override + public ScheduledFuture scheduleWithFixedDelay( + @Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) { + return service.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit); + } + + @Override + public void shutdown() { + service.shutdown(); + } + + @Nonnull + @Override + public List shutdownNow() { + return service.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return service.isShutdown(); + } + + @Override + public boolean isTerminated() { + return service.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException { + return service.awaitTermination(timeout, unit); + } + + @Nonnull + @Override + public Future submit(@Nonnull Callable task) { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public Future submit(@Nonnull Runnable task, T result) { + return service.submit(wrap(task), result); + } + + @Nonnull + @Override + public Future submit(@Nonnull Runnable task) { + return service.submit(wrap(task)); + } + + @Nonnull + @Override + public List> invokeAll(@Nonnull Collection> tasks) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public List> invokeAll( + @Nonnull Collection> tasks, long timeout, @Nonnull TimeUnit unit) + throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public T invokeAny(@Nonnull Collection> tasks) + throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(@Nonnull Collection> tasks, long timeout, @Nonnull TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + service.close(); + } + + @Override + public void execute(@Nonnull Runnable command) { + virtualService.execute(command); + } + } + /** * Returns the executor. * @@ -81,25 +205,4 @@ public Fiber createFiber() { Fiber createChildFiber(Fiber parent) { return new Fiber(this, parent); } - - private static class DaemonThreadFactory implements ThreadFactory { - final AtomicInteger threadNumber = new AtomicInteger(1); - final String namePrefix; - - DaemonThreadFactory(String id) { - namePrefix = "engine-" + id + "-thread-"; - } - - public Thread newThread(@Nonnull Runnable r) { - Thread t = new Thread(r); - t.setName(namePrefix + threadNumber.getAndIncrement()); - if (!t.isDaemon()) { - t.setDaemon(true); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - } } diff --git a/operator/src/test/java/oracle/kubernetes/operator/DomainPresenceTest.java b/operator/src/test/java/oracle/kubernetes/operator/DomainPresenceTest.java index af1b09489f6..0e9bb4dc991 100644 --- a/operator/src/test/java/oracle/kubernetes/operator/DomainPresenceTest.java +++ b/operator/src/test/java/oracle/kubernetes/operator/DomainPresenceTest.java @@ -619,7 +619,7 @@ public abstract static class DomainProcessorStub implements DomainProcessor { @NotNull private Map createMakeRightFiberGateMap() { Map map = new ConcurrentHashMap<>(); - map.put(NS, new TestFiberGate(new Engine("Test"))); + map.put(NS, new TestFiberGate(new Engine())); return map; }