Skip to content

Commit

Permalink
improve memory performance, disable sql logger in prod
Browse files Browse the repository at this point in the history
  • Loading branch information
jonluca committed Apr 19, 2023
1 parent 9862400 commit 2f718b0
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 113 deletions.
34 changes: 18 additions & 16 deletions electron-src/data/base-database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,27 @@ export class BaseDatabase<T> {
const dialect = new SqliteDialect({ database: sqliteDb });
const options: KyselyConfig = {
dialect,
log(event): void {
const isError = event.level === "error";
log: !debugLoggingEnabled
? ["error"]
: (event) => {
const isError = event.level === "error";

if (isError || debugLoggingEnabled) {
const { sql, parameters } = event.query;
if (isError || debugLoggingEnabled) {
const { sql, parameters } = event.query;

const { queryDurationMillis } = event;
const duration = queryDurationMillis.toFixed(2);
const params = (parameters as string[]) || [];
const formattedSql = format(sql, { params: params.map((l) => String(l)), language: "sqlite" });
if (event.level === "query") {
logger.debug(`[Query - ${duration}ms]:\n${formattedSql}\n`);
}
const { queryDurationMillis } = event;
const duration = queryDurationMillis.toFixed(2);
const params = (parameters as string[]) || [];
const formattedSql = format(sql, { params: params.map((l) => String(l)), language: "sqlite" });
if (event.level === "query") {
logger.debug(`[Query - ${duration}ms]:\n${formattedSql}\n`);
}

if (isError) {
logger.error(`[SQL Error - ${duration}ms]: ${event.error}\n\n${formattedSql}\n`);
}
}
},
if (isError) {
logger.error(`[SQL Error - ${duration}ms]: ${event.error}\n\n${formattedSql}\n`);
}
}
},
};

const db = new Kysely<T>(options);
Expand Down
4 changes: 2 additions & 2 deletions electron-src/data/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,14 @@ export class SQLDatabase extends BaseDatabase<MesssagesDatabase> {
};

getAllMessageTexts = async (limit?: number, offset?: number) => {
let query = this.baseAllMessageQuery().select(["text", "guid"]);
let query = this.baseAllMessageQuery().select("text").distinct();
if (limit) {
query = query.limit(limit);
}
if (offset) {
query = query.offset(offset);
}
return await query.execute();
return (await query.execute()).map((r) => r.text!);
};
countAllMessageTexts = async (distinct = false): Promise<number> => {
const query = this.baseAllMessageQuery().select((e) => {
Expand Down
11 changes: 8 additions & 3 deletions electron-src/data/embeddings-database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,19 @@ export class EmbeddingsDatabase extends BaseDatabase<EmbeddingsDb> {
const result = await this.db.selectFrom("embeddings").select("text").execute();
return result.map((l) => l.text!);
};
getExistingText = async (text: string[]): Promise<string[]> => {
await this.initialize();
const result = await this.db.selectFrom("embeddings").select("text").where("text", "in", text).execute();
return result.map((l) => l.text!);
};

insertEmbeddings = async (embeddings: { text: string; embedding: number[] }[]) => {
insertEmbeddings = async (embeddings: { input: string; values: number[] }[]) => {
await this.initialize();
const values = embeddings.map((e) => {
const typedBuffer = new Float32Array(e.embedding);
const typedBuffer = new Float32Array(e.values);
const buffer = Buffer.from(typedBuffer.buffer);
return {
text: e.text,
text: e.input,
embedding: buffer,
};
});
Expand Down
32 changes: 8 additions & 24 deletions electron-src/semantic-search/batch-utils.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,20 @@
import logger from "../utils/logger";
import type { OpenAIApi } from "openai";
import type { Chunk, SemanticSearchMetadata, SemanticSearchVector } from "./semantic-search";
import type { SemanticSearchVector } from "./semantic-search";
import { isRateLimitExceeded } from "./semantic-search";
import { pRateLimit } from "p-ratelimit";

export class BatchOpenAi {
private openai: OpenAIApi;
private batch: PendingVector[] = [];
private batchSize = 500; // create 500 embeddings at a time with the openai api
private batch: string[] = [];
private batchSize = 500;

constructor(openai: OpenAIApi) {
this.openai = openai;
}

async addPendingVectors(chunks: Chunk[], id: string) {
const pendingVectors = chunks.map(({ text, start, end }, index) => {
return {
id: `${id}:${index}`,
input: text,
metadata: {
index,
id,
text,
end,
start,
},
};
});
this.batch.push(...pendingVectors);
async addPendingVectors(chunks: string[]) {
this.batch.push(...chunks);

if (this.batch.length >= this.batchSize) {
return await this.flush();
Expand All @@ -48,7 +35,6 @@ export class BatchOpenAi {
interface PendingVector {
id: string;
input: string;
metadata: SemanticSearchMetadata;
}

export const OPENAI_EMBEDDING_MODEL = "text-embedding-ada-002";
Expand All @@ -60,16 +46,15 @@ const rateLimit = pRateLimit({
rate: 3500, // 3500 calls per minute
concurrency: 60, // no more than 60 running at once
});
const embeddingsFromPendingVectors = async (pendingVectors: PendingVector[], openai: OpenAIApi) => {
const embeddingsFromPendingVectors = async (pendingVectors: string[], openai: OpenAIApi) => {
const vectors: SemanticSearchVector[] = [];

let timeout = 10_000;
while (pendingVectors.length) {
try {
const input = pendingVectors.map((l) => l.input);
const { data: embed } = await rateLimit(() =>
openai.createEmbedding({
input,
input: pendingVectors,
model: OPENAI_EMBEDDING_MODEL,
}),
);
Expand All @@ -78,9 +63,8 @@ const embeddingsFromPendingVectors = async (pendingVectors: PendingVector[], ope
const embedding = embeddings[i].embedding;
if (embedding) {
const vector: SemanticSearchVector = {
id: pendingVectors[i].id,
metadata: pendingVectors[i].metadata,
values: embedding || [],
input: pendingVectors[i],
};
vectors.push(vector);
}
Expand Down
8 changes: 4 additions & 4 deletions electron-src/semantic-search/semantic-search-stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ import { GPT4Tokenizer } from "gpt4-tokenizer";

const tokenizer = new GPT4Tokenizer({ type: "gpt3" });

export const getStatsForText = (text: { text: string | null }[]) => {
export const getStatsForText = (text: string[]) => {
let totalTokens = 0;
const uniqueText = new Set<string>();
for (const line of text) {
if (line.text && !uniqueText.has(line.text)) {
uniqueText.add(line.text);
const tokens = tokenizer.estimateTokenCount(line.text);
if (line && !uniqueText.has(line)) {
uniqueText.add(line);
const tokens = tokenizer.estimateTokenCount(line);
totalTokens += tokens;
}
}
Expand Down
84 changes: 25 additions & 59 deletions electron-src/semantic-search/semantic-search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,16 @@ import { handleIpc } from "../ipc/ipc";
import logger from "../utils/logger";
import { BatchOpenAi, OPENAI_EMBEDDING_MODEL } from "./batch-utils";
import pMap from "p-map";
import { uniqBy } from "lodash-es";

export interface SemanticSearchMetadata {
id: string;
text: string;
start: number;
end: number;
[key: string]: any;
}

export interface SemanticSearchVector {
id: string;
input: string;
values: number[];
metadata: SemanticSearchMetadata;
}

export interface PostContent {
chunks: Chunk[];
}

export interface Chunk {
text: string;
start: number;
end: number;
}
const tokenizer = new GPT4Tokenizer({ type: "gpt3" });
const debugLoggingEnabled = process.env.DEBUG_LOGGING === "true";

export const MAX_INPUT_TOKENS = 1000;
export const MAX_INPUT_TOKENS = 7000;

export function isRateLimitExceeded(err: unknown): boolean {
return (
Expand All @@ -50,26 +31,20 @@ export function isRateLimitExceeded(err: unknown): boolean {
let numCompleted = 0;

const splitIntoChunks = (content: string, maxInputTokens = MAX_INPUT_TOKENS) => {
const chunks: Chunk[] = [];

let start = 0;

const chunked = tokenizer.chunkText(content, maxInputTokens);

for (const chunk of chunked) {
chunks.push({
start,
end: start + chunk.text.length,
text: chunk.text,
});

start += chunk.text.length + 1;
if (content.length < 2000) {
return [content];
}
const chunks: string[] = [];

const encoded = tokenizer.encode(content);
for (let i = 0; i < encoded.length; i += maxInputTokens) {
const chunk = encoded.slice(i, i + maxInputTokens);
chunks.push(tokenizer.decode(chunk));
}
return chunks;
};

const PAGE_SIZE = 100_000;
const PAGE_SIZE = 30_000;

export const createEmbeddings = async ({ openAiKey }: { openAiKey: string }) => {
logger.info("Creating embeddings");
Expand All @@ -78,8 +53,6 @@ export const createEmbeddings = async ({ openAiKey }: { openAiKey: string }) =>
const messageCount = await dbWorker.worker.countAllMessageTexts();

const pages = Math.ceil(messageCount / PAGE_SIZE);
const existingText = await dbWorker.embeddingsWorker.getAllText();
const set = new Set(existingText);

const configuration = new Configuration({
apiKey: openAiKey,
Expand All @@ -88,21 +61,17 @@ export const createEmbeddings = async ({ openAiKey }: { openAiKey: string }) =>
const openai = new OpenAIApi(configuration);

const batchOpenai = new BatchOpenAi(openai);
const processMessage = async (message: Awaited<ReturnType<typeof dbWorker.worker.getAllMessageTexts>>[number]) => {
const processMessage = async (message: string) => {
try {
if (!message.text) {
if (!message) {
return;
}

const chunks = splitIntoChunks(message.text);
const itemEmbeddings = await batchOpenai.addPendingVectors(chunks, message.guid);

const chunks = splitIntoChunks(message);
const itemEmbeddings = await batchOpenai.addPendingVectors(chunks);
if (itemEmbeddings.length) {
try {
logger.info(`Inserting ${itemEmbeddings.length} vectors`);
const embeddings = itemEmbeddings.map((l) => ({ embedding: l.values, text: l.metadata.text }));
await dbWorker.embeddingsWorker.insertEmbeddings(embeddings);
logger.info(`Inserted ${itemEmbeddings.length} vectors`);
await dbWorker.embeddingsWorker.insertEmbeddings(itemEmbeddings);
numCompleted += itemEmbeddings.length;
} catch (e) {
logger.error(e);
Expand All @@ -117,17 +86,14 @@ export const createEmbeddings = async ({ openAiKey }: { openAiKey: string }) =>
for (let i = 0; i < pages; i++) {
const messages = await dbWorker.worker.getAllMessageTexts(PAGE_SIZE, i * PAGE_SIZE);
logger.info(`Got ${messages.length} messages - ${i + 1} of ${pages}`);

numCompleted = existingText.length;
const notParsed = messages.filter((m) => m.text && !set.has(m.text));

const uniqueMessages = uniqBy(notParsed, "text");

await pMap(uniqueMessages, processMessage, { concurrency: 100 });

if (debugLoggingEnabled) {
logger.info(`Completed ${numCompleted} of ${messageCount} (${Math.round((numCompleted / messageCount) * 100)}%)`);
}
const now = performance.now();
const existingText = await dbWorker.embeddingsWorker.getExistingText(messages);
logger.info(`Got existing text in ${performance.now() - now}ms`);
const set = new Set(existingText);
numCompleted += existingText.length;
const notParsed = messages.filter((m) => !set.has(m));
await pMap(notParsed, processMessage, { concurrency: 50 });
logger.info(`Completed ${numCompleted} of ${messageCount} (${Math.round((numCompleted / messageCount) * 100)}%)`);
}
logger.info("Done creating embeddings");
};
Expand Down Expand Up @@ -159,7 +125,7 @@ export async function semanticQuery({ queryText, openAiKey }: SemanticQueryOpts)
return [];
}
// save embedding
await dbWorker.embeddingsWorker.insertEmbeddings([{ embedding, text: queryText }]);
await dbWorker.embeddingsWorker.insertEmbeddings([{ values: embedding, input: queryText }]);
floatEmbedding = new Float32Array(embedding);
}

Expand Down
6 changes: 2 additions & 4 deletions electron-src/utils/flags.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import type { App } from "electron";
export const addFlags = (app: App) => {
process.env.UV_THREADPOOL_SIZE = "128";
process.env.NODE_OPTIONS = "--max-old-space-size=32678";
app.commandLine.appendSwitch(
"enable-features",
"HardwareMediaKeyHandling,MediaSessionService,WebGPU,WebGPUDeveloperFeatures,WebGPUImportTexture,CSSVideoDynamicRangeMediaQueries,ExtraWebGLVideoTextureMetadata",
);
app.commandLine.appendSwitch("ignore-connections-limit", "localhost");
app.commandLine.appendArgument("--enable-experimental-web-platform-features");
app.commandLine.appendSwitch(
'--js-flags="--max-old-space-size=32678 --max-semi-space-size=32678 --use-largepages=silent"',
);
app.commandLine.appendSwitch('--js-flags="--max-old-space-size=32678');
app.commandLine.appendSwitch("--remote-allow-origins=*");
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"author": "JonLuca DeCaro <[email protected]>",
"main": "build/electron-src/index.js",
"name": "MiMessage",
"version": "1.1.0",
"version": "1.1.1",
"productName": "Mimessage",
"description": "Apple Messages UI alternative, with export, search, and more.",
"scripts": {
Expand Down

0 comments on commit 2f718b0

Please sign in to comment.