Skip to content

Commit

Permalink
Merge pull request #86 from grafana/handle-errors-in-streams
Browse files Browse the repository at this point in the history
feat(openai): handle error messages in chat completion streams
  • Loading branch information
sd2k authored Sep 27, 2023
2 parents a74c827 + 0e7e873 commit d4f70ac
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions src/llms/openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { isLiveChannelMessageEvent, LiveChannelAddress, LiveChannelMessageEvent,
import { getBackendSrv, getGrafanaLiveSrv, logDebug } from "@grafana/runtime";

import { pipe, Observable, UnaryFunction } from "rxjs";
import { filter, map, scan, takeWhile } from "rxjs/operators";
import { filter, map, scan, takeWhile, tap } from "rxjs/operators";

import { LLM_PLUGIN_ID, LLM_PLUGIN_ROUTE, setLLMPluginVersion } from "./constants";
import { LLMAppHealthCheck } from "./types";
Expand Down Expand Up @@ -170,6 +170,12 @@ export interface Usage {
total_tokens: number;
}

/** The error response from the Grafana LLM app when trying to call the chat completions API. */
interface ChatCompletionsErrorResponse {
/** The error message. */
error: string;
}

/** A response from the OpenAI Chat Completions API. */
export interface ChatCompletionsResponse<T = Choice> {
/** The ID of the request. */
Expand Down Expand Up @@ -230,6 +236,11 @@ export function isDoneMessage(message: any): message is DoneMessage {
return message.done !== undefined
}

/** Return true if the response is an error response. */
export function isErrorResponse(response: any): response is ChatCompletionsErrorResponse {
return response.error !== undefined;
}

/**
* An rxjs operator that extracts the content messages from a stream of chat completion responses.
*
Expand All @@ -240,7 +251,7 @@ export function isDoneMessage(message: any): message is DoneMessage {
* { role: 'system', content: 'You are a great bot.' },
* { role: 'user', content: 'Hello, bot.' },
* ]}).pipe(extractContent());
* stream.subscribe(console.log);
* stream.subscribe({ next: console.log, error: console.error });
* // Output:
* // ['Hello', '? ', 'How ', 'are ', 'you', '?']
*/
Expand All @@ -262,7 +273,7 @@ export function extractContent(): UnaryFunction<Observable<ChatCompletionsRespon
* { role: 'system', content: 'You are a great bot.' },
* { role: 'user', content: 'Hello, bot.' },
* ]}).pipe(accumulateContent());
* stream.subscribe(console.log);
* stream.subscribe({ next: console.log, error: console.error });
* // Output:
* // ['Hello', 'Hello! ', 'Hello! How ', 'Hello! How are ', 'Hello! How are you', 'Hello! How are you?']
*/
Expand Down Expand Up @@ -297,7 +308,7 @@ export async function chatCompletions(request: ChatCompletionsRequest): Promise<
* { role: 'system', content: 'You are a great bot.' },
* { role: 'user', content: 'Hello, bot.' },
* ]}).pipe(extractContent());
* stream.subscribe(console.log);
* stream.subscribe({ next: console.log, error: console.error });
* // Output:
* // ['Hello', '? ', 'How ', 'are ', 'you', '?']
*
Expand All @@ -306,7 +317,7 @@ export async function chatCompletions(request: ChatCompletionsRequest): Promise<
* { role: 'system', content: 'You are a great bot.' },
* { role: 'user', content: 'Hello, bot.' },
* ]}).pipe(accumulateContent());
* stream.subscribe(console.log);
* stream.subscribe({ next: console.log, error: console.error });
* // Output:
* // ['Hello', 'Hello! ', 'Hello! How ', 'Hello! How are ', 'Hello! How are you', 'Hello! How are you?']
*/
Expand All @@ -321,7 +332,12 @@ export function streamChatCompletions(request: ChatCompletionsRequest): Observab
.getStream(channel)
.pipe(filter((event) => isLiveChannelMessageEvent(event))) as Observable<LiveChannelMessageEvent<ChatCompletionsResponse<ChatCompletionsChunk>>>
return messages.pipe(
takeWhile((event) => !isDoneMessage(event.message.choices[0].delta)),
tap((event) => {
if (isErrorResponse(event.message)) {
throw new Error(event.message.error);
}
}),
takeWhile((event) => isErrorResponse(event.message) || !isDoneMessage(event.message.choices[0].delta)),
map((event) => event.message),
);
}
Expand Down

0 comments on commit d4f70ac

Please sign in to comment.