Skip to content

Commit

Permalink
fix: replace usage of virtual threads for service output reading (#1520)
Browse files Browse the repository at this point in the history
### Motivation
A previous pull request replaced the old service output reading with a
new, virtual thread based implementation. However, it was discovered
that virtual threads are not actually a good fit for the use case, as
file I/O is actually blocking on virtual threads and will block all
other virtual thread operations.

### Modification
Replace virtual threads for service output reading with a scaling
scheduled thread pool to allow the same reading throughput regardless of
the service count.

### Result
The virtual thread scheduler is no longer blocked due to blocking file
I/O operations due to service output reading.
  • Loading branch information
derklaro authored Oct 2, 2024
1 parent 63936a3 commit 1d34f97
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import eu.cloudnetservice.node.service.ServiceConfigurationPreparer;
import eu.cloudnetservice.node.service.ServiceConsoleLogCache;
import eu.cloudnetservice.node.service.defaults.log.ProcessServiceLogCache;
import eu.cloudnetservice.node.service.defaults.log.ProcessServiceLogReadScheduler;
import eu.cloudnetservice.node.version.ServiceVersionProvider;
import io.vavr.CheckedFunction1;
import java.io.File;
Expand Down Expand Up @@ -80,9 +81,10 @@ public JVMService(
@NonNull CloudServiceManager manager,
@NonNull EventManager eventManager,
@NonNull ServiceVersionProvider versionProvider,
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer,
@NonNull ProcessServiceLogReadScheduler processLogReadScheduler
) {
var logCache = new ProcessServiceLogCache(nodeConfig, configuration.serviceId());
var logCache = new ProcessServiceLogCache(nodeConfig, configuration.serviceId(), processLogReadScheduler);
this(
tickLoop,
nodeConfig,
Expand Down Expand Up @@ -193,23 +195,33 @@ protected void startProcess() {

@Override
protected void stopProcess() {
if (this.process != null) {
// try to send a shutdown command
var process = this.process;
if (process != null) {
// try to send a shutdown command (still needs the process instance to be present)
this.runCommand("end");
this.runCommand("stop");
this.process = null;

// try to wait for the process to terminate normally, setting the
// terminated flag to true if the process exited in the given time frame
var terminated = false;
try {
// wait until the process termination seconds exceeded
if (this.process.waitFor(this.configuration.processTerminationTimeoutSeconds(), TimeUnit.SECONDS)) {
this.process.exitValue(); // validation that the process terminated
this.process = null; // reset as there is no fall-through
return;
if (process.waitFor(this.configuration.processTerminationTimeoutSeconds(), TimeUnit.SECONDS)) {
process.exitValue(); // validation that the process terminated
terminated = true;
}
} catch (IllegalThreadStateException | InterruptedException ignored) { // force shutdown the process
}
// force destroy the process now - not much we can do here more than that
this.process.toHandle().destroyForcibly();
this.process = null;

// force-destroy the process in case it didn't terminate normally
if (!terminated) {
process.toHandle().destroyForcibly();
}

// stop the reading process when the process exited
if (this.logCache instanceof ProcessServiceLogCache processServiceLogCache) {
processServiceLogCache.stop();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import eu.cloudnetservice.node.service.CloudService;
import eu.cloudnetservice.node.service.CloudServiceManager;
import eu.cloudnetservice.node.service.defaults.JVMService;
import eu.cloudnetservice.node.service.defaults.log.ProcessServiceLogReadScheduler;
import eu.cloudnetservice.node.version.ServiceVersionProvider;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand All @@ -34,19 +35,22 @@ public class JVMLocalCloudServiceFactory extends BaseLocalCloudServiceFactory {
protected final TickLoop mainThread;
protected final EventManager eventManager;
protected final CloudServiceManager cloudServiceManager;
protected final ProcessServiceLogReadScheduler processLogReadScheduler;

@Inject
public JVMLocalCloudServiceFactory(
@NonNull TickLoop tickLoop,
@NonNull Configuration nodeConfig,
@NonNull CloudServiceManager cloudServiceManager,
@NonNull EventManager eventManager,
@NonNull ServiceVersionProvider versionProvider
@NonNull ServiceVersionProvider versionProvider,
@NonNull ProcessServiceLogReadScheduler processLogReadScheduler
) {
super(nodeConfig, versionProvider);
this.mainThread = tickLoop;
this.eventManager = eventManager;
this.cloudServiceManager = cloudServiceManager;
this.processLogReadScheduler = processLogReadScheduler;
}

@Override
Expand All @@ -66,7 +70,8 @@ public JVMLocalCloudServiceFactory(
manager,
this.eventManager,
this.versionProvider,
preparer);
preparer,
this.processLogReadScheduler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package eu.cloudnetservice.node.service.defaults.log;

import com.google.common.base.Preconditions;
import eu.cloudnetservice.common.util.StringUtil;
import eu.cloudnetservice.driver.service.ServiceId;
import eu.cloudnetservice.node.config.Configuration;
import java.io.BufferedReader;
Expand All @@ -25,40 +27,99 @@

public class ProcessServiceLogCache extends AbstractServiceLogCache {

public ProcessServiceLogCache(@NonNull Configuration configuration, @NonNull ServiceId associatedServiceId) {
private final ProcessServiceLogReadScheduler scheduler;

private volatile ProcessHandle targetProcess;
private BufferedReader outStreamReader;
private BufferedReader errStreamReader;

public ProcessServiceLogCache(
@NonNull Configuration configuration,
@NonNull ServiceId associatedServiceId,
@NonNull ProcessServiceLogReadScheduler scheduler
) {
super(configuration, associatedServiceId);
this.scheduler = scheduler;
}

public void start(@NonNull Process process) {
var inputStreamReader = process.inputReader(StandardCharsets.UTF_8);
this.startStreamReadingTask(inputStreamReader, false);
Preconditions.checkState(this.targetProcess == null);
this.targetProcess = process.toHandle();
this.outStreamReader = process.inputReader(StandardCharsets.UTF_8);
this.errStreamReader = process.errorReader(StandardCharsets.UTF_8);
this.scheduler.schedule(this);
}

public void stop() {
try {
var outReader = this.outStreamReader;
var errReader = this.errStreamReader;
if (outReader != null && errReader != null) {
outReader.close();
errReader.close();
this.outStreamReader = null;
this.errStreamReader = null;
}

var errorStreamReader = process.errorReader(StandardCharsets.UTF_8);
this.startStreamReadingTask(errorStreamReader, true);
// no longer targeting a process, always reset the target process
// in case something went wrong elsewhere to allow re-using this
// log cache in that case anyway
this.targetProcess = null;
} catch (IOException exception) {
LOGGER.error("Failed to close process streams of service {}", this.associatedServiceId.name(), exception);
}
}

protected void startStreamReadingTask(@NonNull BufferedReader reader, boolean isErrorStream) {
var serviceName = this.associatedServiceId.name();
var streamTypeDisplayName = isErrorStream ? "error" : "output";
var threadName = String.format("%s %s-stream reader", serviceName, streamTypeDisplayName);
public boolean readProcessOutputContent() {
try {
var outReader = this.outStreamReader;
var errReader = this.errStreamReader;
if (outReader == null || errReader == null) {
return false;
}

// try to read all lines from both stream if content is available
// these calls do not block in case the readers have no content
// available yet
this.readLinesFromStream(outReader, false);
this.readLinesFromStream(errReader, true);

// check if the target process terminated, we can stop reading
// the data streams in that case
// the data that was buffered is now removed from the reader and
// no now data will become available if the process is dead
var targetProcess = this.targetProcess;
if (targetProcess == null || !targetProcess.isAlive()) {
this.stop(); // call stop to ensure that the termination is properly handled (prevent state mismatch)
return false;
}

return true;
} catch (IOException exception) {
// stream close and read can happen concurrently, so in case the stream
// closed we don't want to log the exception but rather signal that the
// service was stopped. "stream closed" is the message for both the reader
// being closed and the file descriptor being no longer available (process terminated)
var message = StringUtil.toLower(exception.getMessage());
if (message != null && message.equals("stream closed")) {
this.stop(); // call stop to ensure that the termination is properly handled (prevent state mismatch)
LOGGER.debug("Encountered closed out/err stream for service {}, stopping", associatedServiceId);
return false;
} else {
LOGGER.error("Unable to read out/err stream of service {}", this.associatedServiceId, exception);
return true; // couldn't read this time, but maybe we can read next time?
}
}
}

Thread.ofVirtual()
.name(threadName)
.inheritInheritableThreadLocals(false)
.start(() -> {
while (true) {
try {
var logLine = reader.readLine();
if (logLine == null) {
// reached EOF, process terminated
break;
}
private void readLinesFromStream(@NonNull BufferedReader stream, boolean errStream) throws IOException {
while (stream.ready()) {
var line = stream.readLine();
if (line == null) {
break;
}

this.handleItem(logLine, isErrorStream);
} catch (IOException exception) {
LOGGER.error("Exception reading {} stream of service {}", streamTypeDisplayName, serviceName, exception);
}
}
});
this.handleItem(line, errStream);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2019-2024 CloudNetService team & contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package eu.cloudnetservice.node.service.defaults.log;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.inject.Singleton;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;

@Singleton
public final class ProcessServiceLogReadScheduler {

private static final int LOG_READ_DELAY_MS = Integer.getInteger("cloudnet.process-log-read-delay", 25);
private static final int READ_WORKER_MAXIMUM = Integer.getInteger("cloudnet.process-log-worker-maximum", 25);
private static final int READ_ACTIONS_PER_WORKER = Integer.getInteger("cloudnet.process-log-actions-per-worker", 5);

private final AtomicInteger runningReaderActions;
private final ScheduledThreadPoolExecutor executor;

public ProcessServiceLogReadScheduler() {
var threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.NORM_PRIORITY)
.setNameFormat("process-log-reader-%d")
.build();
this.executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
this.runningReaderActions = new AtomicInteger(0);
}

public void schedule(@NonNull ProcessServiceLogCache logCache) {
var runningReaderActions = this.runningReaderActions.getAndIncrement();
if (runningReaderActions != 0 && runningReaderActions % READ_ACTIONS_PER_WORKER == 0) {
var expectedWorkerCount = (runningReaderActions / READ_ACTIONS_PER_WORKER) + 1;
this.adjustWorkerCount(expectedWorkerCount);
}

var readTask = new ProcessServiceLogReadTask(logCache, this);
this.executor.scheduleWithFixedDelay(readTask, 0, LOG_READ_DELAY_MS, TimeUnit.MILLISECONDS);
}

private void notifyLogCacheReadEnd() {
var runningReaderActions = this.runningReaderActions.decrementAndGet();
if (runningReaderActions != 0 && runningReaderActions % READ_ACTIONS_PER_WORKER == 0) {
var expectedWorkerCount = runningReaderActions / READ_ACTIONS_PER_WORKER;
this.adjustWorkerCount(expectedWorkerCount);
}
}

private void adjustWorkerCount(int expectedWorkerCount) {
var newCorePoolSize = Math.min(expectedWorkerCount, READ_WORKER_MAXIMUM);
if (this.executor.getCorePoolSize() != newCorePoolSize) {
this.executor.setCorePoolSize(expectedWorkerCount);
}
}

private record ProcessServiceLogReadTask(
@NonNull ProcessServiceLogCache logCache,
@NonNull ProcessServiceLogReadScheduler scheduler
) implements Runnable {

private static final RuntimeException CANCEL_EXCEPTION = new RuntimeException("cancelled, reached stream EOF");

@Override
public void run() {
// read the content from the stream, in case the stream closed notify the
// scheduler about this and stop scheduling the next by throwing an exception
var streamsStillOpen = this.logCache.readProcessOutputContent();
if (!streamsStillOpen) {
this.scheduler.notifyLogCacheReadEnd();
throw CANCEL_EXCEPTION;
}
}
}
}

0 comments on commit 1d34f97

Please sign in to comment.