Skip to content

Commit

Permalink
First step moving to virtual threads as part of reducing thread conte…
Browse files Browse the repository at this point in the history
…ntion
  • Loading branch information
rjeberhard committed Nov 17, 2023
1 parent 31bdd1f commit db7642b
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import oracle.kubernetes.operator.work.Fiber.CompletionCallback;
import oracle.kubernetes.operator.work.Packet;
import oracle.kubernetes.operator.work.Step;
import oracle.kubernetes.operator.work.ThreadFactorySingleton;
import oracle.kubernetes.utils.SystemClock;

/** An abstract base main class for the operator and the webhook. */
Expand All @@ -60,7 +59,7 @@ public abstract class BaseMain {
static final Container container = new Container();
static final ThreadFactory threadFactory = new WrappedThreadFactory();
static ScheduledExecutorService wrappedExecutorService =
Engine.wrappedExecutorService("operator", container); // non-final to allow change in unit tests
Engine.wrappedExecutorService(container); // non-final to allow change in unit tests
static final AtomicReference<OffsetDateTime> lastFullRecheck =
new AtomicReference<>(SystemClock.now());
static final Semaphore shutdownSignal = new Semaphore(0);
Expand Down Expand Up @@ -276,7 +275,7 @@ static Packet createPacketWithLoggingContext(String ns) {
}

private static class WrappedThreadFactory implements ThreadFactory {
private final ThreadFactory delegate = ThreadFactorySingleton.getInstance();
private final ThreadFactory delegate = Thread.ofVirtual().factory();

@Override
public Thread newThread(@Nonnull Runnable r) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
package oracle.kubernetes.operator.helpers;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -116,22 +113,9 @@ public ApiClient get() {
}

if (threadFactory != null) {
ExecutorService exec =
new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory) {
@Override
public void execute(Runnable command) {
super.execute(wrapRunnable(command));
}
};
OkHttpClient httpClient =
client.getHttpClient().newBuilder().addInterceptor(new HeaderModifierInterceptor())
.dispatcher(new Dispatcher(exec)).build();
.dispatcher(new Dispatcher(Executors.newThreadPerTaskExecutor(threadFactory))).build();
client.setHttpClient(httpClient);
}

Expand Down
171 changes: 137 additions & 34 deletions operator/src/main/java/oracle/kubernetes/operator/work/Engine.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
// Copyright (c) 2018, 2021, Oracle and/or its affiliates.
// Copyright (c) 2018, 2023, Oracle and/or its affiliates.
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.

package oracle.kubernetes.operator.work;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;

/**
* Collection of {@link Fiber}s. Owns an {@link Executor} to run them.
*/
public class Engine {
private static final int DEFAULT_THREAD_COUNT = 10;
private static final int DEFAULT_THREAD_COUNT = 2;
private final AtomicReference<ScheduledExecutorService> threadPool = new AtomicReference<>();

/**
Expand All @@ -29,30 +37,146 @@ public Engine(ScheduledExecutorService threadPool) {

/**
* Creates engine with the specified id and default container and executor.
*
* @param id Engine id
*/
public Engine(String id) {
this(wrappedExecutorService(id, ContainerResolver.getDefault().getContainer()));
public Engine() {
this(wrappedExecutorService(ContainerResolver.getDefault().getContainer()));
}

/**
* wrapped executor service.
* @param id id
* @param container container
* @return executor service
*/
public static ScheduledExecutorService wrappedExecutorService(String id, Container container) {
ScheduledThreadPoolExecutor threadPool =
new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT, new DaemonThreadFactory(id));
public static ScheduledExecutorService wrappedExecutorService(Container container) {
ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(DEFAULT_THREAD_COUNT);
threadPool.setRemoveOnCancelPolicy(true);
return wrap(container, threadPool);
return wrap(container, new VirtualScheduledExectuorService(threadPool));
}

private static ScheduledExecutorService wrap(Container container, ScheduledExecutorService ex) {
return container != null ? ContainerResolver.getDefault().wrapExecutor(container, ex) : ex;
}

private static class VirtualScheduledExectuorService implements ScheduledExecutorService {
private final ScheduledExecutorService service;
private final ExecutorService virtualService = Executors.newVirtualThreadPerTaskExecutor();

public VirtualScheduledExectuorService(ScheduledExecutorService service) {
this.service = service;
}

private Runnable wrap(Runnable command) {
return () -> virtualService.execute(command);
}

@Nonnull
@Override
public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
return service.schedule(wrap(command), delay, unit);
}

@Nonnull
@Override
public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
throw new UnsupportedOperationException();
}

@Nonnull
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
return service.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
}

@Nonnull
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
return service.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
}

@Override
public void shutdown() {
service.shutdown();
}

@Nonnull
@Override
public List<Runnable> shutdownNow() {
return service.shutdownNow();
}

@Override
public boolean isShutdown() {
return service.isShutdown();
}

@Override
public boolean isTerminated() {
return service.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
return service.awaitTermination(timeout, unit);
}

@Nonnull
@Override
public <T> Future<T> submit(@Nonnull Callable<T> task) {
throw new UnsupportedOperationException();
}

@Nonnull
@Override
public <T> Future<T> submit(@Nonnull Runnable task, T result) {
return service.submit(wrap(task), result);
}

@Nonnull
@Override
public Future<?> submit(@Nonnull Runnable task) {
return service.submit(wrap(task));
}

@Nonnull
@Override
public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Nonnull
@Override
public <T> List<Future<T>> invokeAll(
@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit)
throws InterruptedException {
throw new UnsupportedOperationException();
}

@Nonnull
@Override
public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}

@Override
public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}

@Override
public void close() {
service.close();
}

@Override
public void execute(@Nonnull Runnable command) {
virtualService.execute(command);
}
}

/**
* Returns the executor.
*
Expand Down Expand Up @@ -81,25 +205,4 @@ public Fiber createFiber() {
Fiber createChildFiber(Fiber parent) {
return new Fiber(this, parent);
}

private static class DaemonThreadFactory implements ThreadFactory {
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DaemonThreadFactory(String id) {
namePrefix = "engine-" + id + "-thread-";
}

public Thread newThread(@Nonnull Runnable r) {
Thread t = new Thread(r);
t.setName(namePrefix + threadNumber.getAndIncrement());
if (!t.isDaemon()) {
t.setDaemon(true);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ public abstract static class DomainProcessorStub implements DomainProcessor {
@NotNull
private Map<String, FiberGate> createMakeRightFiberGateMap() {
Map<String, FiberGate> map = new ConcurrentHashMap<>();
map.put(NS, new TestFiberGate(new Engine("Test")));
map.put(NS, new TestFiberGate(new Engine()));
return map;
}

Expand Down

0 comments on commit db7642b

Please sign in to comment.