Skip to content

Commit

Permalink
refactor: improve service process output reading (#1490)
Browse files Browse the repository at this point in the history
### Motivation
Currently, service process output streams are read once per second on a
single shared thread. This causes issues with delayed logs, especially
when the JVM crashes at startup: 99% of the time, the JVM crash reason
is never read from the process error stream. This leaves users with
nearly undiagnosable startup issues.

Example log output with an invalid JVM flag before the change:
```
4.0.0-RC11-SNAPSHOT-f66323ca => create by Proxy 1 --start
[26.08 09:26:50.417] INFO : Starting to create 1 services for Proxy
[26.08 09:26:50.641] INFO : CloudService [uniqueId=04789662-d056-48ba-975a-f69993ed56dd task=Proxy name=Proxy-1] has been started
[26.08 09:26:50.642] INFO : The services were created based on the task. They can be managed with the service command
[26.08 09:26:50.703] INFO : CloudService [uniqueId=04789662-d056-48ba-975a-f69993ed56dd task=Proxy name=Proxy-1] has been stopped
```

### Modification
Now the log cache uses virtual threads to read the process output
streams. This is very efficient, as the whole process stream reading is
based on I/O operations, which means that a virtual thread can be
unmounted until some data is available to be read. As a result, even
when starting 2 virtual threads per service, there shouldn't be any
noticeable performance impact on the node at all.

Additionally, empty log lines are now ignored in the parent log cache
class, so there is no longer any need for log caches to implement that
handling themselves. Also, a small caching issue was fixed that caused
the cache to still contain 1 item even if the log cache entry limit was
set to 0.

### Result
Log output of services is now read immediately once data is available
from the input or output stream, making the output more responsive (as
it is printed nearly immediately), and it also catches everything that
is being logged.

Example log output with an invalid JVM flag & this change applied:
```
4.0.0-RC11-SNAPSHOT-b99d7fbc => create by Proxy 1 --start
[26.08 09:21:59.245] INFO : Starting to create 1 services for Proxy
[26.08 09:21:59.289] INFO : CloudService [uniqueId=3a718134-d248-41d4-9f40-75751e1c1d64 task=Proxy name=Proxy-1] has been started
[26.08 09:21:59.289] INFO : The services were created based on the task. They can be managed with the service command
[26.08 09:21:59.295] WARN : [Proxy-1/WARN]: Unrecognized option: -XIReallyKnowWhatIAmDoingISwear
[26.08 09:21:59.296] WARN : [Proxy-1/WARN]: Error: Could not create the Java Virtual Machine.
[26.08 09:21:59.296] WARN : [Proxy-1/WARN]: Error: A fatal exception has occurred. Program will exit.
[26.08 09:22:00.303] INFO : CloudService [uniqueId=3a718134-d248-41d4-9f40-75751e1c1d64 task=Proxy name=Proxy-1] has been stopped
```
  • Loading branch information
derklaro authored Aug 30, 2024
1 parent 784e801 commit 1ef60be
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 138 deletions.
2 changes: 1 addition & 1 deletion checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"https://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="Checker">
<module name="BeforeExecutionExclusionFileFilter">
<property name="fileNamePattern" value="(module\-info\.java$)|(.*[\\|\/]fabric[\\|\/].*$)"/>
<property name="fileNamePattern" value="(module\-info\.java$)|(.*[\\|\/]fabric[\\|\/].*|(JVM|Dockerized)Service\.java$)"/>
</module>
<module name="SuppressionFilter">
<property default="checkstyle-suppressions.xml" name="file"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public class DockerizedService extends JVMService {

protected final DockerClient dockerClient;
protected final DockerConfiguration configuration;
protected final DockerizedServiceLogCache logCache;

protected volatile String containerId;

Expand All @@ -104,13 +103,19 @@ protected DockerizedService(
@NonNull DockerClient dockerClient,
@NonNull DockerConfiguration dockerConfiguration
) {
super(tickLoop, nodeConfig, configuration, manager, eventManager, versionProvider, serviceConfigurationPreparer);
var logCache = new DockerizedServiceLogCache(nodeConfig, configuration.serviceId());
super(
tickLoop,
nodeConfig,
configuration,
manager,
eventManager,
logCache,
versionProvider,
serviceConfigurationPreparer);

this.dockerClient = dockerClient;
this.configuration = dockerConfiguration;

super.logCache = this.logCache = new DockerizedServiceLogCache(nodeConfig, this);
this.initLogHandler();
}

@Override
Expand Down Expand Up @@ -356,8 +361,8 @@ protected boolean needsImagePull(@NonNull DockerImage image) {
public final class ServiceLogCacheAdapter extends ResultCallback.Adapter<Frame> {

@Override
public void onNext(Frame object) {
DockerizedService.this.logCache.handle(object);
public void onNext(@NonNull Frame object) {
((DockerizedServiceLogCache) DockerizedService.this.logCache).handle(object);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@
package eu.cloudnetservice.modules.docker;

import com.github.dockerjava.api.model.Frame;
import eu.cloudnetservice.driver.service.ServiceId;
import eu.cloudnetservice.node.config.Configuration;
import eu.cloudnetservice.node.service.CloudService;
import eu.cloudnetservice.node.service.ServiceConsoleLogCache;
import eu.cloudnetservice.node.service.defaults.log.AbstractServiceLogCache;
import java.nio.charset.StandardCharsets;
import lombok.NonNull;

public class DockerizedServiceLogCache extends AbstractServiceLogCache {

public DockerizedServiceLogCache(@NonNull Configuration configuration, @NonNull CloudService service) {
super(configuration, service);
}

@Override
public @NonNull ServiceConsoleLogCache update() {
return this;
public DockerizedServiceLogCache(@NonNull Configuration configuration, @NonNull ServiceId associatedServiceId) {
super(configuration, associatedServiceId);
}

public void handle(@NonNull Frame frame) {
Expand All @@ -49,9 +43,7 @@ protected void handleItem(@NonNull String content, boolean comesFromErrorStream)
if (content.contains("\n") || content.contains("\r")) {
for (var input : content.split("\r")) {
for (var text : input.split("\n")) {
if (!text.trim().isEmpty()) {
super.handleItem(text, comesFromErrorStream);
}
super.handleItem(text, comesFromErrorStream);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@

package eu.cloudnetservice.node.service;

import eu.cloudnetservice.driver.service.ServiceId;
import java.util.Collection;
import java.util.Queue;
import lombok.NonNull;
import org.jetbrains.annotations.UnmodifiableView;

public interface ServiceConsoleLogCache {

@NonNull Queue<String> cachedLogMessages();

@NonNull ServiceConsoleLogCache update();
@NonNull
Queue<String> cachedLogMessages();

@NonNull CloudService service();
@NonNull
ServiceId associatedServiceId();

int logCacheSize();

Expand All @@ -42,5 +43,6 @@ public interface ServiceConsoleLogCache {
void removeHandler(@NonNull ServiceConsoleLineHandler handler);

@NonNull
@UnmodifiableView Collection<ServiceConsoleLineHandler> handlers();
@UnmodifiableView
Collection<ServiceConsoleLineHandler> handlers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import eu.cloudnetservice.driver.channel.ChannelMessageTarget;
import eu.cloudnetservice.driver.document.Document;
import eu.cloudnetservice.driver.event.EventManager;
import eu.cloudnetservice.driver.event.events.service.CloudServiceLogEntryEvent;
import eu.cloudnetservice.driver.network.HostAndPort;
import eu.cloudnetservice.driver.network.NetworkChannel;
import eu.cloudnetservice.driver.network.buffer.DataBuf;
Expand Down Expand Up @@ -100,6 +101,7 @@ public abstract class AbstractService implements CloudService {
protected final TickLoop mainThread;
protected final EventManager eventManager;
protected final Configuration configuration;
protected final ServiceConsoleLogCache logCache;
protected final CloudServiceManager cloudServiceManager;
protected final ServiceConfiguration serviceConfiguration;
protected final ServiceVersionProvider serviceVersionProvider;
Expand All @@ -116,8 +118,6 @@ public abstract class AbstractService implements CloudService {
protected final Collection<ServiceRemoteInclusion> installedInclusions = ConcurrentHashMap.newKeySet();
protected final Collection<ServiceDeployment> installedDeployments = ConcurrentHashMap.newKeySet();

protected ServiceConsoleLogCache logCache;

protected volatile NetworkChannel networkChannel;
protected volatile long connectionTimestamp = -1;

Expand All @@ -130,9 +130,11 @@ protected AbstractService(
@NonNull ServiceConfiguration configuration,
@NonNull CloudServiceManager manager,
@NonNull EventManager eventManager,
@NonNull ServiceConsoleLogCache logCache,
@NonNull ServiceVersionProvider versionProvider,
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer
) {
this.logCache = logCache;
this.mainThread = tickLoop;
this.configuration = nodeConfig;
this.eventManager = eventManager;
Expand All @@ -155,6 +157,7 @@ protected AbstractService(
ServiceLifeCycle.PREPARED,
configuration.propertyHolder().immutableCopy());
this.pushServiceInfoSnapshotUpdate(ServiceLifeCycle.PREPARED, false);
this.initStandardServiceLogHandler();

// register the service locally for now
manager.registerUnacceptedService(this);
Expand Down Expand Up @@ -782,6 +785,33 @@ protected void downloadInclusionFile(@NonNull ServiceRemoteInclusion inclusion,
this.serviceId().nodeUniqueId()};
}

/**
 * Registers a line handler on the log cache of this service that forwards every handled log line to all entries
 * of {@code logTargets}.
 * <p>
 * For an entry whose first element equals the target of the current channel message sender, a
 * {@link CloudServiceLogEntryEvent} is called through the event manager, using the second element of the entry as
 * the first argument of {@code callEvent}. For every other entry a {@code screen_new_line} channel message is sent
 * to the first element of the entry, carrying the current service info, the second element of the entry, the log
 * line and the stderr flag.
 */
protected void initStandardServiceLogHandler() {
  this.logCache.addHandler((_, line, stderr) -> {
    // the stream type only depends on the handled line, not on the target it gets forwarded to
    var streamType = stderr
      ? CloudServiceLogEntryEvent.StreamType.STDERR
      : CloudServiceLogEntryEvent.StreamType.STDOUT;

    for (var logTarget : this.logTargets) {
      var receiver = logTarget.first();
      if (!receiver.equals(ChannelMessageSender.self().toTarget())) {
        // the listener is listening remotely, send the line to the network component
        ChannelMessage.builder()
          .target(receiver)
          .channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
          .message("screen_new_line")
          .buffer(DataBuf.empty()
            .writeObject(this.currentServiceInfo)
            .writeString(logTarget.second())
            .writeString(line)
            .writeBoolean(stderr))
          .build()
          .send();
        continue;
      }

      // the current target is the node this service is running on, print it directly here
      var logEntryEvent = new CloudServiceLogEntryEvent(this.currentServiceInfo, line, streamType);
      this.eventManager.callEvent(logTarget.second(), logEntryEvent);
    }
  });
}

protected abstract void startProcess();

protected abstract void stopProcess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,14 @@ public DefaultCloudServiceManager(
})
.currentGetter(group -> this.serviceProviderByName(group.name()).serviceInfo())
.build());
// schedule the updating of the local service log cache

// schedule the service watchdog to run once per second
mainThread.scheduleTask(() -> {
for (var service : this.localCloudServices()) {
// we only need to look at running services
if (service.lifeCycle() == ServiceLifeCycle.RUNNING) {
// detect dead services and stop them
if (service.alive()) {
service.serviceConsoleLogCache().update();
LOGGER.trace("Updated service log cache of {}", service.serviceId().name());
} else {
eventManager.callEvent(new CloudServicePreForceStopEvent(service));
service.stop();
LOGGER.trace("Stopped dead service {}", service.serviceId().name());
}
if (service.lifeCycle() == ServiceLifeCycle.RUNNING && !service.alive()) {
eventManager.callEvent(new CloudServicePreForceStopEvent(service));
service.stop();
LOGGER.debug("Stopped dead service {}", service.serviceId().name());
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@
import eu.cloudnetservice.common.language.I18n;
import eu.cloudnetservice.common.tuple.Tuple2;
import eu.cloudnetservice.common.util.StringUtil;
import eu.cloudnetservice.driver.channel.ChannelMessage;
import eu.cloudnetservice.driver.channel.ChannelMessageSender;
import eu.cloudnetservice.driver.event.EventManager;
import eu.cloudnetservice.driver.event.events.service.CloudServiceLogEntryEvent;
import eu.cloudnetservice.driver.network.buffer.DataBuf;
import eu.cloudnetservice.driver.network.def.NetworkConstants;
import eu.cloudnetservice.driver.service.ServiceConfiguration;
import eu.cloudnetservice.driver.service.ServiceEnvironment;
import eu.cloudnetservice.driver.service.ServiceEnvironmentType;
Expand All @@ -37,6 +32,7 @@
import eu.cloudnetservice.node.event.service.CloudServicePreProcessStartEvent;
import eu.cloudnetservice.node.service.CloudServiceManager;
import eu.cloudnetservice.node.service.ServiceConfigurationPreparer;
import eu.cloudnetservice.node.service.ServiceConsoleLogCache;
import eu.cloudnetservice.node.service.defaults.log.ProcessServiceLogCache;
import eu.cloudnetservice.node.version.ServiceVersionProvider;
import io.vavr.CheckedFunction1;
Expand Down Expand Up @@ -86,9 +82,37 @@ public JVMService(
@NonNull ServiceVersionProvider versionProvider,
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer
) {
super(tickLoop, nodeConfig, configuration, manager, eventManager, versionProvider, serviceConfigurationPreparer);
super.logCache = new ProcessServiceLogCache(() -> this.process, nodeConfig, this);
this.initLogHandler();
var logCache = new ProcessServiceLogCache(nodeConfig, configuration.serviceId());
this(
tickLoop,
nodeConfig,
configuration,
manager,
eventManager,
logCache,
versionProvider,
serviceConfigurationPreparer);
}

/**
 * Constructs a new jvm service instance using an externally supplied log cache. This constructor does nothing on
 * its own, all given arguments are passed unchanged and in the same order to the super constructor.
 *
 * @param tickLoop                     the tick loop to pass to the super constructor.
 * @param nodeConfig                   the node configuration to pass to the super constructor.
 * @param configuration                the configuration of the service to pass to the super constructor.
 * @param manager                      the cloud service manager to pass to the super constructor.
 * @param eventManager                 the event manager to pass to the super constructor.
 * @param logCache                     the console log cache that should be used for this service.
 * @param versionProvider              the service version provider to pass to the super constructor.
 * @param serviceConfigurationPreparer the service configuration preparer to pass to the super constructor.
 */
protected JVMService(
@NonNull TickLoop tickLoop,
@NonNull Configuration nodeConfig,
@NonNull ServiceConfiguration configuration,
@NonNull CloudServiceManager manager,
@NonNull EventManager eventManager,
@NonNull ServiceConsoleLogCache logCache,
@NonNull ServiceVersionProvider versionProvider,
@NonNull ServiceConfigurationPreparer serviceConfigurationPreparer
) {
// plain delegation, the caller decides which log cache implementation is used
super(
tickLoop,
nodeConfig,
configuration,
manager,
eventManager,
logCache,
versionProvider,
serviceConfigurationPreparer);
}

@Override
Expand Down Expand Up @@ -231,6 +255,13 @@ protected void doStartProcess(
// start the process and fire the post start event
this.process = builder.start();
this.eventManager.callEvent(new CloudServicePostProcessStartEvent(this));

// start the log reading unless some user code changed the log cache type
// in that case it's up to the user to start the reading process
if (super.logCache instanceof ProcessServiceLogCache processServiceLogCache) {
processServiceLogCache.start(this.process);
LOGGER.debug("Started {} log cache for service {}", super.logCache.getClass(), this.serviceId());
}
} catch (IOException exception) {
LOGGER.error(
"Unable to start process in {} with command line {}",
Expand All @@ -240,33 +271,6 @@ protected void doStartProcess(
}
}

/**
 * Adds a handler to the log cache of this service which passes each handled log line on to all entries of
 * {@code logTargets}.
 * <p>
 * If the first element of an entry equals the target of the current channel message sender, a
 * {@link CloudServiceLogEntryEvent} is called via the event manager with the second element of the entry as the
 * first argument of {@code callEvent}. Otherwise a {@code screen_new_line} channel message containing the current
 * service info, the second element of the entry, the line and the stderr flag is sent to the first element of the
 * entry.
 */
protected void initLogHandler() {
  super.logCache.addHandler((ignored, line, stderr) -> {
    for (var logTarget : super.logTargets) {
      var target = logTarget.first();
      if (target.equals(ChannelMessageSender.self().toTarget())) {
        // the current target is the node this service is running on, print it directly here
        var streamType = CloudServiceLogEntryEvent.StreamType.STDOUT;
        if (stderr) {
          streamType = CloudServiceLogEntryEvent.StreamType.STDERR;
        }

        var logEntryEvent = new CloudServiceLogEntryEvent(this.currentServiceInfo, line, streamType);
        this.eventManager.callEvent(logTarget.second(), logEntryEvent);
      } else {
        // the listener is listening remotely, send the line to the network component
        ChannelMessage.builder()
          .target(target)
          .channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
          .message("screen_new_line")
          .buffer(DataBuf.empty()
            .writeObject(this.currentServiceInfo)
            .writeString(logTarget.second())
            .writeString(line)
            .writeBoolean(stderr))
          .build()
          .send();
      }
    }
  });
}

protected @Nullable Tuple2<Path, Attributes> prepareWrapperFile() {
// check if the wrapper file is there - unpack it if not
if (Files.notExists(WRAPPER_TEMP_FILE)) {
Expand Down
Loading

0 comments on commit 1ef60be

Please sign in to comment.