Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load metadata from stream instead of headers. #31

Merged
merged 3 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export class Chat {
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content]) => {
const messages: Message[] = [
{
role: "user",
Expand Down Expand Up @@ -296,7 +296,7 @@ export class Chat {
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content]) => {
this.messages.push(
{
role: "user",
Expand Down
84 changes: 42 additions & 42 deletions content.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ export type ContentListOptions = {
cortexName?: string;
};

export type ContentMetadata = {
id: string;
title: string;
version: number;
commands: ContentCommand[];
cortex: string;
createdAt: string;
userEmail: string;
status: ContentStatus;
publishedVersion: number | null;
};

export class Content {
get id() {
return this._id;
Expand Down Expand Up @@ -194,44 +206,37 @@ export class Content {
title,
prompt,
stream,
noContentInHeaders: true,
});
const reader = res.body!.getReader();
const decoder = new TextDecoder("utf-8");

const id: string = res.headers.get("id") || "";
const version: number = parseInt(res.headers.get("version") || "0");
const userEmail = res.headers.get("userEmail") || undefined;
const createdAt: string = res.headers.get("createdAt") || "";
const status: ContentStatus = res.headers.get("status") as ContentStatus;
const publishedVersion: number | undefined = numberOrUndefined(
res.headers.get("publishedVersion"),
);
const commands: ContentCommand[] = JSON.parse(
res.headers.get("commands") || "[]",
);

const readableStream = new Readable({
read() {},
});

const contentPromise = processStream(
const contentPromise = processStream<ContentMetadata>(
reader,
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content, metadata]) => {
if (!metadata) {
throw new Error("Metadata not found in stream");
}

return new Content(
client,
id,
title,
metadata.id,
metadata.title,
content,
commands,
version,
createdAt,
status,
cortex.name,
userEmail,
publishedVersion,
metadata.commands,
metadata.version,
metadata.createdAt,
metadata.status,
metadata.cortex,
metadata.userEmail,
metadata.publishedVersion || undefined,
);
});

Expand Down Expand Up @@ -317,38 +322,33 @@ export class Content {
const res = await this.apiClient.POST(`/content/${this._id}/refine`, {
prompt,
stream: true,
noContentInHeaders: true,
});
const reader = res.body!.getReader();
const decoder = new TextDecoder("utf-8");

const version: number = parseInt(res.headers.get("version") || "0");
const createdAt = res.headers.get("createdAt") || "";
const userEmail = res.headers.get("userEmail") || undefined;
const commands: ContentCommand[] = JSON.parse(
res.headers.get("commands") || "[]",
);
const status = res.headers.get("status") as ContentStatus;
const publishedVersion: number | undefined = numberOrUndefined(
res.headers.get("publishedVersion"),
);
this._version = version;
this._commands = commands;
this._createdAt = createdAt;
this._userEmail = userEmail;
this._status = status;
this._publishedVersion = publishedVersion;

const readableStream = new Readable({
read() {},
});

const contentPromise = processStream(
const contentPromise = processStream<ContentMetadata>(
reader,
decoder,
readableStream,
opts.statusStream,
).then((content) => {
).then(([content, metadata]) => {
if (!metadata) {
throw new Error("Metadata not found in stream");
}

this._content = content;
this._version = metadata.version;
this._commands = metadata.commands;
this._createdAt = metadata.createdAt;
this._userEmail = metadata.userEmail;
this._status = metadata.status;
this._publishedVersion = metadata.publishedVersion || undefined;
this._title = metadata.title;
return this;
});

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"url": "https://github.com/cortexclick/cortex-sdk",
"type": "git"
},
"version": "0.0.4",
"version": "0.0.5",
"type": "module",
"main": "index.js",
"scripts": {
Expand Down
10 changes: 7 additions & 3 deletions utils/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { Readable } from "stream";

export async function processStream(
export async function processStream<Metadata extends Record<string, unknown>>(
reader: ReadableStreamDefaultReader<Uint8Array>,
decoder: TextDecoder,
contentStream: Readable,
statusStream?: Readable,
): Promise<string> {
): Promise<[string, Metadata | undefined]> {
let buffer = "";
let fullContent = "";
let isStatusStreamOpen = true;

let metadata: Metadata | undefined = undefined;

const processNextChunk = async (): Promise<void> => {
const { done, value } = await reader.read();
if (done) {
Expand Down Expand Up @@ -43,6 +45,8 @@ export async function processStream(
// t:s = status message
else if (json.messageType === "status" && statusStream) {
statusStream.push(line + "\n");
} else if (json.messageType === "metadata") {
metadata = json.data;
}
} catch (e) {
console.error("Error parsing JSON:", e);
Expand All @@ -57,5 +61,5 @@ export async function processStream(
contentStream.emit("error", error);
});

return fullContent;
return [fullContent, metadata];
}