Skip to content

Commit

Permalink
fix(js): Limit queue batch concurrency, bump timeout, add maximum wait period for serverless envs (#1124)
Browse files Browse the repository at this point in the history

Fixes #1101 

Bumps timeouts related to large uploads, limits the total amount of
bandwidth used, and avoids blocking for too long in the default case.

Adds the `LANGSMITH_TRACING_BACKGROUND` environment variable to control blocking behavior (set it to `"false"` to block on root run finalization).

Bump to 0.2 will come along with this.
  • Loading branch information
jacoblee93 authored Oct 25, 2024
1 parent 1fbcc55 commit b9dc8f2
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 69 deletions.
10 changes: 5 additions & 5 deletions js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "langsmith",
"version": "0.1.67",
"version": "0.2.0",
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
"packageManager": "[email protected]",
"files": [
Expand Down Expand Up @@ -109,9 +109,9 @@
"@babel/preset-env": "^7.22.4",
"@faker-js/faker": "^8.4.1",
"@jest/globals": "^29.5.0",
"@langchain/core": "^0.3.1",
"@langchain/langgraph": "^0.2.3",
"@langchain/openai": "^0.3.0",
"@langchain/core": "^0.3.14",
"@langchain/langgraph": "^0.2.18",
"@langchain/openai": "^0.3.11",
"@tsconfig/recommended": "^1.0.2",
"@types/jest": "^29.5.1",
"@typescript-eslint/eslint-plugin": "^5.59.8",
Expand All @@ -126,7 +126,7 @@
"eslint-plugin-no-instanceof": "^1.0.1",
"eslint-plugin-prettier": "^4.2.1",
"jest": "^29.5.0",
"langchain": "^0.3.2",
"langchain": "^0.3.3",
"openai": "^4.67.3",
"prettier": "^2.8.8",
"ts-jest": "^29.1.0",
Expand Down
80 changes: 51 additions & 29 deletions js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
isLangChainMessage,
} from "./utils/messages.js";
import {
getEnvironmentVariable,
getLangChainEnvVarsMetadata,
getLangSmithEnvironmentVariable,
getRuntimeEnvironment,
Expand Down Expand Up @@ -74,6 +75,7 @@ export interface ClientConfig {
autoBatchTracing?: boolean;
batchSizeBytesLimit?: number;
blockOnRootRunFinalization?: boolean;
traceBatchConcurrency?: number;
fetchOptions?: RequestInit;
}

Expand Down Expand Up @@ -473,7 +475,10 @@ export class Client {

private settings: Promise<LangSmithSettings> | null;

private blockOnRootRunFinalization = true;
private blockOnRootRunFinalization =
getEnvironmentVariable("LANGSMITH_TRACING_BACKGROUND") === "false";

private traceBatchConcurrency = 5;

private _serverInfo: RecordStringAny | undefined;

Expand All @@ -493,9 +498,16 @@ export class Client {
if (this.webUrl?.endsWith("/")) {
this.webUrl = this.webUrl.slice(0, -1);
}
this.timeout_ms = config.timeout_ms ?? 12_000;
this.timeout_ms = config.timeout_ms ?? 90_000;
this.caller = new AsyncCaller(config.callerOptions ?? {});
this.traceBatchConcurrency =
config.traceBatchConcurrency ?? this.traceBatchConcurrency;
if (this.traceBatchConcurrency < 1) {
throw new Error("Trace batch concurrency must be positive.");
}
this.batchIngestCaller = new AsyncCaller({
maxRetries: 2,
maxConcurrency: this.traceBatchConcurrency,
...(config.callerOptions ?? {}),
onFailedResponseHook: handle429,
});
Expand Down Expand Up @@ -753,35 +765,44 @@ export class Client {
}

/**
 * Drains the auto-batch run queue, sending queued run creates/updates
 * to the server in size-limited batches.
 *
 * Pops up to `traceBatchConcurrency` batches per pass and forwards each
 * to `processBatch`, repeating until the queue is empty.
 *
 * NOTE(review): each popped batch is awaited in turn here, so request
 * parallelism is bounded by the batch ingest caller's own concurrency
 * limit rather than by parallel awaits in this loop — confirm intended.
 */
private async drainAutoBatchQueue() {
  // Resolve the byte limit once per drain instead of once per batch.
  const batchSizeLimit = await this._getBatchSizeLimitBytes();
  while (this.autoBatchQueue.items.length > 0) {
    for (let i = 0; i < this.traceBatchConcurrency; i++) {
      const [batch, done] = this.autoBatchQueue.pop(batchSizeLimit);
      if (!batch.length) {
        // Queue exhausted mid-pass: release the waiter and re-check the
        // outer loop condition.
        done();
        break;
      }
      await this.processBatch(batch, done);
    }
  }
}

/**
 * Sends one popped batch of queued run operations to the server.
 *
 * Splits the batch into run creates and run updates, then routes the
 * payload through the multipart endpoint when the server advertises
 * support for it, falling back to the plain batch endpoint otherwise.
 *
 * @param batch Queue items popped from the auto-batch queue.
 * @param done Callback releasing the queue slot; always invoked, even on
 *   failure, so waiters are never left hanging.
 */
private async processBatch(batch: AutoBatchQueueItem[], done: () => void) {
  if (batch.length === 0) {
    done();
    return;
  }
  try {
    // Partition the queued operations by action in a single pass.
    const runCreates: RunCreate[] = [];
    const runUpdates: RunUpdate[] = [];
    for (const entry of batch) {
      if (entry.action === "create") {
        runCreates.push(entry.item as RunCreate);
      } else if (entry.action === "update") {
        runUpdates.push(entry.item as RunUpdate);
      }
    }
    const ingestParams = { runCreates, runUpdates };
    const serverInfo = await this._ensureServerInfo();
    if (serverInfo?.batch_ingest_config?.use_multipart_endpoint) {
      await this.multipartIngestRuns(ingestParams);
    } else {
      await this.batchIngestRuns(ingestParams);
    }
  } finally {
    done();
  }
}

private async processRunOperation(
item: AutoBatchQueueItem,
immediatelyTriggerBatch?: boolean
Expand Down Expand Up @@ -4152,8 +4173,9 @@ export class Client {
* @returns A promise that resolves once all currently pending traces have sent.
*/
public awaitPendingTraceBatches() {
  return Promise.all([
    // Items still waiting in the auto-batch queue...
    ...this.autoBatchQueue.items.map(({ itemPromise }) => itemPromise),
    // ...plus batches already handed to the ingest caller's p-queue,
    // which resolves via onIdle() once every in-flight request settles.
    this.batchIngestCaller.queue.onIdle(),
  ]);
}
}
2 changes: 1 addition & 1 deletion js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ export { RunTree, type RunTreeConfig } from "./run_trees.js";
export { overrideFetchImplementation } from "./singletons/fetch.js";

// Update using yarn bump-version
export const __version__ = "0.1.67";
export const __version__ = "0.2.0";
41 changes: 41 additions & 0 deletions js/src/tests/batch_client.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
waitUntilProjectFound,
waitUntilRunFound,
} from "./utils.js";
import { traceable } from "../traceable.js";

test.concurrent(
"Test persist update run",
Expand Down Expand Up @@ -241,3 +242,43 @@ test.concurrent(
},
180_000
);

// Stress test: ten ~9MB traced runs uploaded concurrently. Skipped by
// default; intended for manual verification of large-payload batching.
test.skip("very large runs", async () => {
  const langchainClient = new Client({
    autoBatchTracing: true,
    timeout_ms: 120_000,
  });

  // Unique per-run project name so reruns never collide.
  const projectName = "__test_large_runs" + uuidv4().substring(0, 4);
  await deleteProject(langchainClient, projectName);

  console.time("largeRunTimer");

  // Fire all ten traced calls at once; each returns a ~9MB string output.
  const runs = Array.from({ length: 10 }, () =>
    traceable(
      async () => {
        return "x".repeat(9000000);
      },
      {
        project_name: projectName,
        client: langchainClient,
        tracingEnabled: true,
      }
    )()
  );

  await Promise.all(runs);

  console.timeLog("largeRunTimer");

  // Block until every queued batch has actually been sent.
  await langchainClient.awaitPendingTraceBatches();

  console.timeLog("largeRunTimer");

  await Promise.all([waitUntilProjectFound(langchainClient, projectName)]);

  await langchainClient.deleteProject({ projectName });
}, 180_000);
7 changes: 4 additions & 3 deletions js/src/tests/batch_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ describe.each(ENDPOINT_TYPES)(
await new Promise((resolve) => setTimeout(resolve, 300));
});

it("Create + update batching should merge into a single call", async () => {
it.only("Create + update batching should merge into a single call", async () => {
const client = new Client({
apiKey: "test-api-key",
autoBatchTracing: true,
Expand Down Expand Up @@ -219,7 +219,7 @@ describe.each(ENDPOINT_TYPES)(
end_time: endTime,
});

await new Promise((resolve) => setTimeout(resolve, 100));
await client.awaitPendingTraceBatches();

const calledRequestParam: any = callSpy.mock.calls[0][2];
expect(await parseMockRequestBody(calledRequestParam?.body)).toEqual({
Expand Down Expand Up @@ -331,10 +331,11 @@ describe.each(ENDPOINT_TYPES)(
);
});

it("should immediately trigger a batch on root run end", async () => {
it("should immediately trigger a batch on root run end if blockOnRootRunFinalization is set", async () => {
const client = new Client({
apiKey: "test-api-key",
autoBatchTracing: true,
blockOnRootRunFinalization: true,
});
const callSpy = jest
.spyOn((client as any).batchIngestCaller, "call")
Expand Down
16 changes: 14 additions & 2 deletions js/src/tests/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable no-process-env */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { jest } from "@jest/globals";
import { Client } from "../client.js";
Expand All @@ -14,14 +15,24 @@ describe.each([[""], ["mocked"]])("Client uses %s fetch", (description) => {
globalFetchMock = jest.fn(() =>
Promise.resolve({
ok: true,
json: () => Promise.resolve({}),
json: () =>
Promise.resolve({
batch_ingest_config: {
use_multipart_endpoint: true,
},
}),
text: () => Promise.resolve(""),
})
);
overriddenFetch = jest.fn(() =>
Promise.resolve({
ok: true,
json: () => Promise.resolve({}),
json: () =>
Promise.resolve({
batch_ingest_config: {
use_multipart_endpoint: true,
},
}),
text: () => Promise.resolve(""),
})
);
Expand Down Expand Up @@ -78,6 +89,7 @@ describe.each([[""], ["mocked"]])("Client uses %s fetch", (description) => {
});

test("basic traceable implementation", async () => {
process.env.LANGSMITH_TRACING_BACKGROUND = "false";
const llm = traceable(
async function* llm(input: string) {
const response = input.repeat(2).split("");
Expand Down
13 changes: 9 additions & 4 deletions js/src/tests/traceable.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ test("404s should only log, not throw an error", async () => {
overrideFetchImplementation(overriddenFetch);
const client = new Client({
apiUrl: "https://foobar.notreal",
autoBatchTracing: false,
});
const llm = traceable(
async function* llm(input: string) {
Expand Down Expand Up @@ -1111,8 +1112,12 @@ test("argsConfigPath", async () => {

test("traceable continues execution when client throws error", async () => {
const errorClient = {
createRun: jest.fn().mockRejectedValue(new Error("Client error") as never),
updateRun: jest.fn().mockRejectedValue(new Error("Client error") as never),
createRun: jest
.fn()
.mockRejectedValue(new Error("Expected test client error") as never),
updateRun: jest
.fn()
.mockRejectedValue(new Error("Expected test client error") as never),
};

const tracedFunction = traceable(
Expand Down Expand Up @@ -1214,7 +1219,7 @@ test("traceable with processInputs throwing error does not affect invocation", a
const { client, callSpy } = mockClient();

const processInputs = jest.fn((_inputs: Readonly<KVMap>) => {
throw new Error("processInputs error");
throw new Error("totally expected test processInputs error");
});

const func = traceable(
Expand Down Expand Up @@ -1250,7 +1255,7 @@ test("traceable with processOutputs throwing error does not affect invocation",
const { client, callSpy } = mockClient();

const processOutputs = jest.fn((_outputs: Readonly<KVMap>) => {
throw new Error("processOutputs error");
throw new Error("totally expected test processInputs error");
});

const func = traceable(
Expand Down
2 changes: 1 addition & 1 deletion js/src/utils/async_caller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class AsyncCaller {

protected maxRetries: AsyncCallerParams["maxRetries"];

private queue: typeof import("p-queue")["default"]["prototype"];
queue: typeof import("p-queue")["default"]["prototype"];

private onFailedResponseHook?: ResponseCallback;

Expand Down
Loading

0 comments on commit b9dc8f2

Please sign in to comment.