Skip to content

Commit

Permalink
Fix context propagation bug that would link two parents in some cases (
Browse files Browse the repository at this point in the history
…#906)

What changed?

ignore active span in ExecuteWorkflow and ExecuteActivity.
Why?

Because workflowThreads are coroutines, it's possible that two workflows are referencing each other as parents.
  • Loading branch information
shijiesheng authored Jun 3, 2024
1 parent 93104af commit 12aacfe
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public TracingPropagator(Tracer tracer) {
}

public Span spanByServiceMethod(String serviceMethod) {
return tracer.buildSpan(serviceMethod).asChildOf(tracer.activeSpan()).start();
return tracer.buildSpan(serviceMethod).start();
}

public Span spanForExecuteWorkflow(DecisionContext context) {
Expand All @@ -64,6 +64,8 @@ public Span spanForExecuteWorkflow(DecisionContext context) {

return tracer
.buildSpan(EXECUTE_WORKFLOW)
.ignoreActiveSpan() // ignore active span to start a new trace that ONLY links the start
// workflow context
.addReference(
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
.withTag(TAG_WORKFLOW_TYPE, context.getWorkflowType().getName())
Expand All @@ -76,6 +78,8 @@ public Span spanForExecuteActivity(PollForActivityTaskResponse task) {
SpanContext parent = extract(task.getHeader());
return tracer
.buildSpan(EXECUTE_ACTIVITY)
.ignoreActiveSpan() // ignore active span to start a new trace that ONLY links the execute
// workflow context
.addReference(
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
.withTag(
Expand All @@ -100,6 +104,7 @@ public Span spanForExecuteLocalActivity(Task task) {
Span span =
tracer
.buildSpan(EXECUTE_LOCAL_ACTIVITY)
.ignoreActiveSpan()
.addReference(References.FOLLOWS_FROM, parent)
.withTag(TAG_WORKFLOW_ID, params.getWorkflowExecution().getWorkflowId())
.withTag(TAG_WORKFLOW_RUN_ID, params.getWorkflowExecution().getRunId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.uber.cadence.DomainAlreadyExistsError;
import com.uber.cadence.RegisterDomainRequest;
import com.uber.cadence.activity.ActivityMethod;
import com.uber.cadence.activity.ActivityOptions;
import com.uber.cadence.client.*;
import com.uber.cadence.common.RetryOptions;
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
import com.uber.cadence.serviceclient.ClientOptions;
Expand All @@ -41,8 +43,8 @@
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.junit.Assume;
import org.junit.Test;
Expand Down Expand Up @@ -108,7 +110,16 @@ public Integer Double(Integer i) {
}

public static class TestWorkflowImpl implements TestWorkflow {
private final TestActivity activities = Workflow.newActivityStub(TestActivity.class);
private final TestActivity activities =
Workflow.newActivityStub(
TestActivity.class,
new ActivityOptions.Builder()
.setRetryOptions(
new RetryOptions.Builder()
.setInitialInterval(Duration.ofSeconds(10))
.setMaximumAttempts(2)
.build())
.build());

@Override
public Integer AddOneThenDouble(Integer n) {
Expand Down Expand Up @@ -153,6 +164,85 @@ public void testStartWorkflowGRPC() {
testStartWorkflowHelper(service, mockTracer, true);
}

@Test
public void testStartMultipleWorkflowGRPC() {
Assume.assumeTrue(useDockerService);
MockTracer mockTracer = new MockTracer();
IWorkflowService service =
new Thrift2ProtoAdapter(
IGrpcServiceStubs.newInstance(
ClientOptions.newBuilder().setTracer(mockTracer).setPort(7833).build()));
try {
service.RegisterDomain(new RegisterDomainRequest().setName(DOMAIN));
} catch (DomainAlreadyExistsError e) {
logger.info("domain already registered");
} catch (Exception e) {
fail("fail to register domain: " + e);
}

WorkflowClient client =
WorkflowClient.newInstance(
service, WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());

WorkerFactory workerFactory =
WorkerFactory.newInstance(
client, WorkerFactoryOptions.newBuilder().setMaxWorkflowThreadCount(2).build());
Worker worker;
worker =
workerFactory.newWorker(
TASK_LIST, WorkerOptions.newBuilder().setMaxConcurrentWorkflowExecutionSize(2).build());
worker.registerActivitiesImplementations(new TestActivityImpl(mockTracer, true));
worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class, DoubleWorkflowImpl.class);
workerFactory.start();

List<CompletableFuture<Void>> futures = new ArrayList<>();

for (int i = 0; i < 100; i++) {
int finalI = i;
futures.add(
CompletableFuture.runAsync(
() -> {
Span rootSpan = mockTracer.buildSpan("workflow=" + finalI).start();
rootSpan.setBaggageItem(CONTEXT_KEY, CONTEXT_VALUE);
mockTracer.activateSpan(rootSpan);
client.newWorkflowStub(TestWorkflow.class).AddOneThenDouble(finalI);
rootSpan.finish();
}));
}
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
} catch (Exception e) {
fail("workflow failure: " + e);
} finally {
// test debug log
StringBuilder sb = new StringBuilder();

List<MockSpan> spans = mockTracer.finishedSpans();
spans.forEach(
span -> {
sb.append(span.toString()).append("\n");
});
logger.info("spans: " + sb);
workerFactory.shutdown();

// assert activity span should have only 1 parent
List<MockSpan> filtered =
spans
.stream()
.filter(
s ->
s.operationName().contains("ExecuteActivity")
|| s.operationName().contains("ExecuteLocalActivity")
|| s.operationName().contains("ExecuteWorkflow"))
.collect(Collectors.toList());
assertFalse(filtered.isEmpty());
filtered.forEach(
s -> {
assertEquals(1, s.references().size());
});
}
}

@Test
public void testSignalWithStartWorkflowTchannel() {
Assume.assumeTrue(useDockerService);
Expand Down

0 comments on commit 12aacfe

Please sign in to comment.