Skip to content

Commit

Permalink
fix: use async fs operations to avoid blocking the main program loop
Browse files Browse the repository at this point in the history
  • Loading branch information
zone117x committed Oct 31, 2024
1 parent 0faf121 commit 9155853
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 19 deletions.
50 changes: 35 additions & 15 deletions components/client/typescript/src/predicates.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as fs from 'fs';
import * as fs from 'fs/promises';
import * as path from 'path';
import { logger } from './util/logger';
import {
Expand All @@ -17,17 +17,32 @@ const RegisteredPredicates = new Map<string, Predicate>();

const CompiledPredicateSchema = TypeCompiler.Compile(PredicateSchema);

// Async version of fs.existsSync
async function pathExists(path: string): Promise<boolean> {
try {
await fs.access(path);
return true;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return false;
}
throw error; // Re-throw other errors (e.g., permission issues)
}
}

/**
* Looks on disk and returns a map of registered Predicates, where the key is the predicate `name`
* as defined by the user.
*/
export function recallPersistedPredicatesFromDisk(basePath: string): Map<string, Predicate> {
export async function recallPersistedPredicatesFromDisk(
basePath: string
): Promise<Map<string, Predicate>> {
RegisteredPredicates.clear();
try {
if (!fs.existsSync(basePath)) return RegisteredPredicates;
for (const file of fs.readdirSync(basePath)) {
if (!(await pathExists(basePath))) return RegisteredPredicates;
for (const file of await fs.readdir(basePath)) {
if (file.endsWith('.json')) {
const text = fs.readFileSync(path.join(basePath, file), 'utf-8');
const text = await fs.readFile(path.join(basePath, file), 'utf-8');
const predicate = JSON.parse(text) as JSON;
if (CompiledPredicateSchema.Check(predicate)) {
logger.info(
Expand All @@ -44,11 +59,11 @@ export function recallPersistedPredicatesFromDisk(basePath: string): Map<string,
return RegisteredPredicates;
}

export function savePredicateToDisk(basePath: string, predicate: Predicate) {
export async function savePredicateToDisk(basePath: string, predicate: Predicate) {
const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`;
try {
fs.mkdirSync(basePath, { recursive: true });
fs.writeFileSync(predicatePath, JSON.stringify(predicate, null, 2));
await fs.mkdir(basePath, { recursive: true });
await fs.writeFile(predicatePath, JSON.stringify(predicate, null, 2));
logger.info(
`ChainhookEventObserver persisted predicate '${predicate.name}' (${predicate.uuid}) to disk`
);
Expand All @@ -60,13 +75,18 @@ export function savePredicateToDisk(basePath: string, predicate: Predicate) {
}
}

function deletePredicateFromDisk(basePath: string, predicate: Predicate) {
async function deletePredicateFromDisk(basePath: string, predicate: Predicate) {
const predicatePath = `${basePath}/predicate-${encodeURIComponent(predicate.name)}.json`;
if (fs.existsSync(predicatePath)) {
fs.rmSync(predicatePath);
try {
await fs.rm(predicatePath);
logger.info(
`ChainhookEventObserver deleted predicate '${predicate.name}' (${predicate.uuid}) from disk`
);
} catch (error: unknown) {
// ignore if the file doesn't exist
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
logger.error(error, `Failed to delete predicate`);
}
}
}

Expand Down Expand Up @@ -165,7 +185,7 @@ async function registerPredicate(
logger.info(
`ChainhookEventObserver registered '${newPredicate.name}' predicate (${newPredicate.uuid})`
);
savePredicateToDisk(observer.predicate_disk_file_path, newPredicate);
await savePredicateToDisk(observer.predicate_disk_file_path, newPredicate);
RegisteredPredicates.set(newPredicate.name, newPredicate);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to register predicate`);
Expand All @@ -190,7 +210,7 @@ async function removePredicate(
throwOnError: true,
});
logger.info(`ChainhookEventObserver removed predicate '${predicate.name}' (${predicate.uuid})`);
deletePredicateFromDisk(observer.predicate_disk_file_path, predicate);
await deletePredicateFromDisk(observer.predicate_disk_file_path, predicate);
} catch (error) {
logger.error(error, `ChainhookEventObserver unable to deregister predicate`);
}
Expand All @@ -207,7 +227,7 @@ export async function registerAllPredicatesOnObserverReady(
logger.info(`ChainhookEventObserver does not have predicates to register`);
return;
}
const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const diskPredicates = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
for (const predicate of predicates)
await registerPredicate(predicate, diskPredicates, observer, chainhook);
}
Expand All @@ -217,7 +237,7 @@ export async function removeAllPredicatesOnObserverClose(
observer: EventObserverOptions,
chainhook: ChainhookNodeOptions
) {
const diskPredicates = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const diskPredicates = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
if (diskPredicates.size === 0) {
logger.info(`ChainhookEventObserver does not have predicates to close`);
return;
Expand Down
8 changes: 4 additions & 4 deletions components/client/typescript/tests/predicates.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe('predicates', () => {
await server.start([testPredicate], async () => {});

expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true);
const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const storedPredicate = disk.get('test');
expect(storedPredicate).not.toBeUndefined();
expect(storedPredicate?.name).toBe(testPredicate.name);
Expand All @@ -102,8 +102,8 @@ describe('predicates', () => {
});

describe('pre-stored', () => {
beforeEach(() => {
savePredicateToDisk(observer.predicate_disk_file_path, {
beforeEach(async () => {
await savePredicateToDisk(observer.predicate_disk_file_path, {
uuid: 'e2777d77-473a-4c1d-9012-152deb36bf4c',
name: 'test',
version: 1,
Expand Down Expand Up @@ -164,7 +164,7 @@ describe('predicates', () => {

mockAgent.assertNoPendingInterceptors();
expect(fs.existsSync(`${observer.predicate_disk_file_path}/predicate-test.json`)).toBe(true);
const disk = recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const disk = await recallPersistedPredicatesFromDisk(observer.predicate_disk_file_path);
const storedPredicate = disk.get('test');
// Should have a different uuid
expect(storedPredicate?.uuid).not.toBe('e2777d77-473a-4c1d-9012-152deb36bf4c');
Expand Down

0 comments on commit 9155853

Please sign in to comment.