From 63f54f82ef407d735e5629efaea4e1e9dac2fb55 Mon Sep 17 00:00:00 2001 From: dvoytenko Date: Thu, 1 Feb 2024 13:36:19 -0800 Subject: [PATCH 1/4] Fix race condition between response end and the trace end --- .../src/processor/composite-span-processor.ts | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/packages/otel/src/processor/composite-span-processor.ts b/packages/otel/src/processor/composite-span-processor.ts index 06123f0..498f2b9 100644 --- a/packages/otel/src/processor/composite-span-processor.ts +++ b/packages/otel/src/processor/composite-span-processor.ts @@ -11,6 +11,7 @@ import { getVercelRequestContextAttributes } from "../vercel-request-context/att /** @internal */ export class CompositeSpanProcessor implements SpanProcessor { private readonly rootSpanIds = new Map(); + private readonly waitSpanEnd = new Map void>(); constructor(private processors: SpanProcessor[]) {} @@ -45,7 +46,29 @@ export class CompositeSpanProcessor implements SpanProcessor { // Flush the streams to avoid data loss. const vrc = getVercelRequestContext(); if (vrc) { - vrc.waitUntil(() => this.forceFlush()); + vrc.waitUntil(async () => { + console.log( + "QQQQ: waitUntil: pending? ", + traceId, + this.rootSpanIds.has(traceId) + ); + if (this.rootSpanIds.has(traceId)) { + // Not root has not completed yet, so no point in flushing. + // Need to wait for onEnd. + const promise = new Promise((resolve) => { + this.waitSpanEnd.set(traceId, resolve); + }); + await Promise.race([ + promise, + new Promise((resolve) => { + console.log("QQQQ: waitUntil: timeout", traceId); + setTimeout(resolve, 50); + }), + ]); + } + console.log("QQQQ: waitUntil: execute", traceId); + return this.forceFlush(); + }); } } @@ -57,13 +80,20 @@ export class CompositeSpanProcessor implements SpanProcessor { onEnd(span: ReadableSpan): void { const { traceId, spanId } = span.spanContext(); const isRoot = this.rootSpanIds.get(traceId) === spanId; - if (isRoot) { - this.rootSpanIds.delete(traceId); - } for (const spanProcessor of this.processors) { spanProcessor.onEnd(span); } + + if (isRoot) { + this.rootSpanIds.delete(traceId); + const pending = this.waitSpanEnd.get(traceId); + if (pending) { + console.log("QQQQ: onEnd: resolve pending", traceId); + this.waitSpanEnd.delete(traceId); + pending(); + } + } } } From ef121f289a514f6c37fc7e5ca29c418f7f41b403 Mon Sep 17 00:00:00 2001 From: dvoytenko Date: Thu, 1 Feb 2024 13:39:53 -0800 Subject: [PATCH 2/4] changeset --- .changeset/early-coats-pull.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/early-coats-pull.md diff --git a/.changeset/early-coats-pull.md b/.changeset/early-coats-pull.md new file mode 100644 index 0000000..1eb0dec --- /dev/null +++ b/.changeset/early-coats-pull.md @@ -0,0 +1,5 @@ +--- +"@vercel/otel": patch +--- + +Eliminate race condition between response end and trace end From c67c01a2e065c1279a79dbcf32c5b2cc335de108 Mon Sep 17 00:00:00 2001 From: dvoytenko Date: Thu, 1 Feb 2024 13:46:15 -0800 Subject: [PATCH 3/4] remove pending on timeout --- packages/otel/src/processor/composite-span-processor.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/otel/src/processor/composite-span-processor.ts b/packages/otel/src/processor/composite-span-processor.ts index 498f2b9..f7ee638 100644 --- a/packages/otel/src/processor/composite-span-processor.ts +++ b/packages/otel/src/processor/composite-span-processor.ts @@ -61,8 +61,11 @@ export class CompositeSpanProcessor implements SpanProcessor { await Promise.race([ promise, new Promise((resolve) => { - console.log("QQQQ: waitUntil: timeout", traceId); - setTimeout(resolve, 50); + setTimeout(() => { + console.log("QQQQ: waitUntil: timeout", traceId); + this.waitSpanEnd.delete(traceId); + resolve(undefined); + }, 50); }), ]); } From 364208932ef6af2b1fac9b6d6198bd3f704b8266 Mon Sep 17 00:00:00 2001 From: dvoytenko Date: Thu, 1 Feb 2024 17:19:35 -0800 Subject: [PATCH 4/4] drain response and clear timeout --- .github/workflows/release-snapshot.yaml | 4 ++-- packages/otel/src/exporters/otlp-exporter-base.ts | 2 ++ .../otel/src/processor/composite-span-processor.ts | 14 +++++--------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/.github/workflows/release-snapshot.yaml b/.github/workflows/release-snapshot.yaml index 44040ff..1b0d3eb 100644 --- a/.github/workflows/release-snapshot.yaml +++ b/.github/workflows/release-snapshot.yaml @@ -45,7 +45,7 @@ on: jobs: release-snapshot: - name: Release + name: Release Snapshot runs-on: ubuntu-latest steps: - name: Checkout @@ -68,7 +68,7 @@ jobs: run: pnpm install - name: Build - run: pnpm build + run: pnpm --filter @vercel/otel build - name: Add SHORT_SHA env property with commit short sha run: echo "SHORT_SHA=`echo ${{ github.sha }} | cut -c1-8`" >> $GITHUB_ENV diff --git a/packages/otel/src/exporters/otlp-exporter-base.ts b/packages/otel/src/exporters/otlp-exporter-base.ts index 6ee0754..7f941a3 100644 --- a/packages/otel/src/exporters/otlp-exporter-base.ts +++ b/packages/otel/src/exporters/otlp-exporter-base.ts @@ -67,6 +67,8 @@ export abstract class OTLPExporterEdgeBase< .then((res) => { diag.debug("@vercel/otel/otlp: onSuccess", res.status, res.statusText); onSuccess(); + // Drain the response body. + void res.arrayBuffer().catch(() => undefined); }) .catch((err) => { diag.error("@vercel/otel/otlp: onError", err); diff --git a/packages/otel/src/processor/composite-span-processor.ts b/packages/otel/src/processor/composite-span-processor.ts index f7ee638..77ae88d 100644 --- a/packages/otel/src/processor/composite-span-processor.ts +++ b/packages/otel/src/processor/composite-span-processor.ts @@ -47,29 +47,26 @@ export class CompositeSpanProcessor implements SpanProcessor { const vrc = getVercelRequestContext(); if (vrc) { vrc.waitUntil(async () => { - console.log( - "QQQQ: waitUntil: pending? ", - traceId, - this.rootSpanIds.has(traceId) - ); if (this.rootSpanIds.has(traceId)) { // Not root has not completed yet, so no point in flushing. // Need to wait for onEnd. const promise = new Promise((resolve) => { this.waitSpanEnd.set(traceId, resolve); }); + let timer: NodeJS.Timeout | undefined; await Promise.race([ promise, new Promise((resolve) => { - setTimeout(() => { - console.log("QQQQ: waitUntil: timeout", traceId); + timer = setTimeout(() => { this.waitSpanEnd.delete(traceId); resolve(undefined); }, 50); }), ]); + if (timer) { + clearTimeout(timer); + } } - console.log("QQQQ: waitUntil: execute", traceId); return this.forceFlush(); }); } @@ -92,7 +89,6 @@ export class CompositeSpanProcessor implements SpanProcessor { this.rootSpanIds.delete(traceId); const pending = this.waitSpanEnd.get(traceId); if (pending) { - console.log("QQQQ: onEnd: resolve pending", traceId); this.waitSpanEnd.delete(traceId); pending(); }