diff --git a/src/client/api/email.test.ts b/src/client/api/email.test.ts new file mode 100644 index 0000000..9e1b3a8 --- /dev/null +++ b/src/client/api/email.test.ts @@ -0,0 +1,103 @@ +import { describe, test } from "bun:test"; +import { Client } from "../client"; +import { resend } from "./email"; +import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../workflow/test-utils"; +import { nanoid } from "../utils"; + +describe("email", () => { + const qstashToken = nanoid(); + const resendToken = nanoid(); + const client = new Client({ baseUrl: MOCK_QSTASH_SERVER_URL, token: qstashToken }); + + test("should use resend", async () => { + await mockQStashServer({ + execute: async () => { + await client.publishJSON({ + api: { + name: "email", + provider: resend({ token: resendToken }), + }, + body: { + from: "Acme ", + to: ["delivered@resend.dev"], + subject: "hello world", + html: "

it works!

", + }, + }); + }, + responseFields: { + body: { messageId: "msgId" }, + status: 200, + }, + receivesRequest: { + method: "POST", + token: qstashToken, + url: "http://localhost:8080/v2/publish/https://api.resend.com/emails", + body: { + from: "Acme ", + to: ["delivered@resend.dev"], + subject: "hello world", + html: "

it works!

", + }, + headers: { + authorization: `Bearer ${qstashToken}`, + "upstash-forward-authorization": resendToken, + }, + }, + }); + }); + + test("should use resend with batch", async () => { + await mockQStashServer({ + execute: async () => { + await client.publishJSON({ + api: { + name: "email", + provider: resend({ token: resendToken, batch: true }), + }, + body: [ + { + from: "Acme ", + to: ["foo@gmail.com"], + subject: "hello world", + html: "

it works!

", + }, + { + from: "Acme ", + to: ["bar@outlook.com"], + subject: "world hello", + html: "

it works!

", + }, + ], + }); + }, + responseFields: { + body: { messageId: "msgId" }, + status: 200, + }, + receivesRequest: { + method: "POST", + token: qstashToken, + url: "http://localhost:8080/v2/publish/https://api.resend.com/emails/batch", + body: [ + { + from: "Acme ", + to: ["foo@gmail.com"], + subject: "hello world", + html: "

it works!

", + }, + { + from: "Acme ", + to: ["bar@outlook.com"], + subject: "world hello", + html: "

it works!

", + }, + ], + headers: { + authorization: `Bearer ${qstashToken}`, + "upstash-forward-authorization": resendToken, + }, + }, + }); + }); +}); diff --git a/src/client/api/email.ts b/src/client/api/email.ts new file mode 100644 index 0000000..5de04d1 --- /dev/null +++ b/src/client/api/email.ts @@ -0,0 +1,19 @@ +export type EmailProviderReturnType = { + owner: "resend"; + baseUrl: "https://api.resend.com/emails" | "https://api.resend.com/emails/batch"; + token: string; +}; + +export const resend = ({ + token, + batch = false, +}: { + token: string; + batch?: boolean; +}): EmailProviderReturnType => { + return { + owner: "resend", + baseUrl: `https://api.resend.com/emails${batch ? "/batch" : ""}`, + token, + }; +}; diff --git a/src/client/api/index.ts b/src/client/api/index.ts new file mode 100644 index 0000000..0f81b62 --- /dev/null +++ b/src/client/api/index.ts @@ -0,0 +1 @@ +export { resend } from "./email"; diff --git a/src/client/api/utils.ts b/src/client/api/utils.ts new file mode 100644 index 0000000..ebaed9f --- /dev/null +++ b/src/client/api/utils.ts @@ -0,0 +1,8 @@ +import type { PublishRequest } from "../client"; + +export const appendAPIOptions = (request: PublishRequest, headers: Headers) => { + if (request.api?.name === "email") { + headers.set("Authorization", request.api.provider.token); + request.method = request.method ?? "POST"; + } +}; diff --git a/src/client/client.ts b/src/client/client.ts index a05785e..d926928 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,3 +1,5 @@ +import { appendAPIOptions } from "./api/utils"; +import type { EmailProviderReturnType } from "./api/email"; import { DLQ } from "./dlq"; import type { Duration } from "./duration"; import { HttpClient, type Requester, type RetryConfig } from "./http"; @@ -189,6 +191,7 @@ export type PublishRequest = { provider?: ProviderReturnType; analytics?: { name: "helicone"; token: string }; }; + topic?: never; /** * Use a callback url to forward the response of your destination server to your callback url. * @@ -197,14 +200,28 @@ export type PublishRequest = { * @default undefined */ callback: string; + } + | { + url?: never; + urlGroup?: never; + /** + * The api endpoint the request should be sent to. + */ + api: { + name: "email"; + provider: EmailProviderReturnType; + }; topic?: never; + callback?: string; } | { url?: never; urlGroup?: never; - api: never; + api?: never; /** * Deprecated. The topic the message should be sent to. Same as urlGroup + * + * @deprecated */ topic?: string; /** @@ -370,6 +387,8 @@ export class Client { ensureCallbackPresent(request); //If needed, this allows users to directly pass their requests to any open-ai compatible 3rd party llm directly from sdk. appendLLMOptionsIfNeeded(request, headers, this.http); + // append api options + appendAPIOptions(request, headers); // @ts-expect-error it's just internal const response = await this.publish({ @@ -434,6 +453,11 @@ export class Client { // eslint-disable-next-line @typescript-eslint/ban-ts-comment //@ts-ignore this is required otherwise message header prevent ts to compile appendLLMOptionsIfNeeded(message, message.headers, this.http); + + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + //@ts-ignore this is required otherwise message header prevent ts to compile + appendAPIOptions(message, message.headers); + (message.headers as Headers).set("Content-Type", "application/json"); } diff --git a/src/client/llm/utils.ts b/src/client/llm/utils.ts index c0fe706..7406c9d 100644 --- a/src/client/llm/utils.ts +++ b/src/client/llm/utils.ts @@ -8,7 +8,7 @@ export function appendLLMOptionsIfNeeded< // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-parameters TRequest extends PublishRequest = PublishRequest, >(request: TRequest, headers: Headers, http: Requester) { - if (!request.api) return; + if (request.api?.name === "email" || !request.api) return; const provider = request.api.provider; const analytics = request.api.analytics; diff --git a/src/client/queue.ts b/src/client/queue.ts index 8a3c705..11f9e32 100644 --- a/src/client/queue.ts +++ b/src/client/queue.ts @@ -1,3 +1,4 @@ +import { appendAPIOptions } from "./api/utils"; import type { PublishRequest, PublishResponse } from "./client"; import type { Requester } from "./http"; import { appendLLMOptionsIfNeeded, ensureCallbackPresent } from "./llm/utils"; @@ -140,6 +141,8 @@ export class Queue { // If needed, this allows users to directly pass their requests to any open-ai compatible 3rd party llm directly from sdk. appendLLMOptionsIfNeeded(request, headers, this.http); + appendAPIOptions(request, headers); + const response = await this.enqueue({ ...request, body: JSON.stringify(request.body), diff --git a/src/client/utils.ts b/src/client/utils.ts index b096df5..57689ab 100644 --- a/src/client/utils.ts +++ b/src/client/utils.ts @@ -1,4 +1,5 @@ import type { PublishRequest } from "./client"; +import { QstashError } from "./error"; const isIgnoredHeader = (header: string) => { const lowerCaseHeader = header.toLowerCase(); @@ -79,7 +80,18 @@ export function processHeaders(request: PublishRequest) { export function getRequestPath( request: Pick ): string { - return request.url ?? request.urlGroup ?? request.topic ?? `api/${request.api?.name}`; + // eslint-disable-next-line @typescript-eslint/no-deprecated + const nonApiPath = request.url ?? request.urlGroup ?? request.topic; + if (nonApiPath) return nonApiPath; + + // return llm api + if (request.api?.name === "llm") return `api/${request.api.name}`; + // return email api + if (request.api?.name === "email") { + return request.api.provider.baseUrl; + } + + throw new QstashError(`Failed to infer request path for ${JSON.stringify(request)}`); } const NANOID_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_"; @@ -110,11 +122,19 @@ export function decodeBase64(base64: string) { return new TextDecoder().decode(intArray); } catch (error) { // this error should never happen essentially. It's only a failsafe - console.warn( - `Upstash Qstash: Failed while decoding base64 "${base64}".` + - ` Decoding with atob and returning it instead. ${error}` - ); - return atob(base64); + try { + const result = atob(base64); + console.warn( + `Upstash QStash: Failed while decoding base64 "${base64}".` + + ` Decoding with atob and returning it instead. ${error}` + ); + return result; + } catch (error) { + console.warn( + `Upstash QStash: Failed to decode base64 "${base64}" with atob. Returning it as it is. ${error}` + ); + return base64; + } } } diff --git a/src/index.ts b/src/index.ts index cef6de9..15c00f9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,3 +10,4 @@ export { decodeBase64 } from "./client/utils"; export * from "./client/llm/chat"; export * from "./client/llm/types"; export * from "./client/llm/providers"; +export * from "./client/api";