diff --git a/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java b/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java index 10408415410a..3c16a8189434 100644 --- a/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java +++ b/extensions/opentelemetry/deployment/src/main/java/org/apache/camel/quarkus/component/opentelemetry/deployment/OpenTelemetryProcessor.java @@ -16,11 +16,19 @@ */ package org.apache.camel.quarkus.component.opentelemetry.deployment; +import java.nio.file.Paths; +import java.util.Map; + import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.opentelemetry.deployment.tracing.TracerEnabled; import org.apache.camel.quarkus.component.opentelemetry.OpenTelemetryTracerProducer; +import org.apache.camel.quarkus.core.deployment.spi.CamelServiceBuildItem; +import org.apache.camel.quarkus.core.deployment.spi.CamelServiceDestination; +import org.apache.camel.quarkus.core.deployment.spi.CamelServicePatternBuildItem; +import org.apache.camel.spi.FactoryFinder; class OpenTelemetryProcessor { @@ -38,4 +46,25 @@ AdditionalBeanBuildItem openTelemetryTracerProducerBean() { .addBeanClass(OpenTelemetryTracerProducer.class) .build(); } + + // TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669 + @BuildStep + void overrideCamelOpenTelemetryThreadPoolServices( + BuildProducer camelServicePattern, + BuildProducer camelService) { + + Map.of("thread-pool-factory", "OpenTelemetryInstrumentedThreadPoolFactory", + "thread-factory-listener", "OpenTelemetryInstrumentedThreadFactoryListener") + .forEach((serviceName, type) -> { + String servicePath = FactoryFinder.DEFAULT_PATH + serviceName; + // Disable broken original service + camelServicePattern + .produce(new CamelServicePatternBuildItem(CamelServiceDestination.DISCOVERY, false, servicePath)); + + // Replace with working + camelService.produce(new CamelServiceBuildItem(Paths.get(servicePath), + "org.apache.camel.quarkus.component.opentelemetry.patch.%s".formatted(type))); + }); + + } } diff --git a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadFactoryListener.java b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadFactoryListener.java new file mode 100644 index 000000000000..655d5a39be4a --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadFactoryListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.opentelemetry.patch; + +import java.util.concurrent.ThreadFactory; + +import io.opentelemetry.context.Context; +import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.annotations.JdkService; + +/** + * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669 + */ +@JdkService(ExecutorServiceManager.ThreadFactoryListener.FACTORY) +public class OpenTelemetryInstrumentedThreadFactoryListener implements ExecutorServiceManager.ThreadFactoryListener { + + @Override + public ThreadFactory onNewThreadFactory(ThreadFactory factory) { + return runnable -> factory.newThread(Context.current().wrap(runnable)); + } +} diff --git a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadPoolFactory.java b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadPoolFactory.java new file mode 100644 index 000000000000..f379046b6682 --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/OpenTelemetryInstrumentedThreadPoolFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.opentelemetry.patch; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.context.Context; +import org.apache.camel.quarkus.component.opentelemetry.patch.internal.CurrentContextScheduledExecutorService; +import org.apache.camel.spi.ThreadPoolFactory; +import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.spi.annotations.JdkService; +import org.apache.camel.support.DefaultThreadPoolFactory; + +/** + * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669 + */ +@JdkService(ThreadPoolFactory.FACTORY) +public class OpenTelemetryInstrumentedThreadPoolFactory extends DefaultThreadPoolFactory implements ThreadPoolFactory { + + @Override + public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return Context.taskWrapping(super.newCachedThreadPool(threadFactory)); + } + + @Override + public ExecutorService newThreadPool( + int corePoolSize, + int maxPoolSize, + long keepAliveTime, + TimeUnit timeUnit, + int maxQueueSize, + boolean allowCoreThreadTimeOut, + RejectedExecutionHandler rejectedExecutionHandler, + ThreadFactory threadFactory) + throws IllegalArgumentException { + + ExecutorService executorService = super.newThreadPool( + corePoolSize, + maxPoolSize, + keepAliveTime, + timeUnit, + maxQueueSize, + allowCoreThreadTimeOut, + rejectedExecutionHandler, + threadFactory); + + return Context.taskWrapping(executorService); + } + + @Override + public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { + return new CurrentContextScheduledExecutorService(super.newScheduledThreadPool(profile, threadFactory)); + } + +} diff --git a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/CurrentContextScheduledExecutorService.java b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/CurrentContextScheduledExecutorService.java new file mode 100644 index 000000000000..7e499657dbde --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/CurrentContextScheduledExecutorService.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.opentelemetry.patch.internal; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.opentelemetry.context.Context; + +/** + * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669 + */ +public class CurrentContextScheduledExecutorService extends ForwardingScheduledExecutorService { + + public CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) { + super(delegate); + } + + @Override + public Future submit(Callable task) { + return delegate().submit(Context.current().wrap(task)); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate().submit(Context.current().wrap(task), result); + } + + @Override + public Future submit(Runnable task) { + return delegate().submit(Context.current().wrap(task)); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate().invokeAll(wrap(Context.current(), tasks)); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate().invokeAll(wrap(Context.current(), tasks), timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate().invokeAny(wrap(Context.current(), tasks)); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate().invokeAny(wrap(Context.current(), tasks), timeout, unit); + } + + @Override + public void execute(Runnable command) { + delegate().execute(Context.current().wrap(command)); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate().schedule(Context.current().wrap(command), delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return delegate().schedule(Context.current().wrap(callable), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return delegate().scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return delegate().scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay, unit); + } +} diff --git a/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/ForwardingScheduledExecutorService.java b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/ForwardingScheduledExecutorService.java new file mode 100644 index 000000000000..e6f3d05fe6bb --- /dev/null +++ b/extensions/opentelemetry/runtime/src/main/java/org/apache/camel/quarkus/component/opentelemetry/patch/internal/ForwardingScheduledExecutorService.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.opentelemetry.patch.internal; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.opentelemetry.context.Context; + +/** + * TODO: Remove this: https://github.com/apache/camel-quarkus/issues/6669 + */ +abstract class ForwardingScheduledExecutorService implements ScheduledExecutorService { + + private final ScheduledExecutorService delegate; + + protected ForwardingScheduledExecutorService(ScheduledExecutorService delegate) { + this.delegate = delegate; + } + + ScheduledExecutorService delegate() { + return delegate; + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + protected static Collection> wrap(Context context, Collection> tasks) { + List> wrapped = new ArrayList<>(); + for (Callable task : tasks) { + wrapped.add(context.wrap(task)); + } + return wrapped; + } +}