
Commit

fix circular dep
howardchung committed Jan 9, 2025
1 parent b426878 commit c416e7f
Showing 6 changed files with 93 additions and 112 deletions.
4 changes: 0 additions & 4 deletions dev/streamArchive.ts

This file was deleted.

5 changes: 0 additions & 5 deletions fetcher/base.ts
@@ -14,8 +14,3 @@ export abstract class MatchFetcher<T> {
   public abstract checkAvailable(matchId: number): Promise<boolean>;
   // Each might also have an internal save function that's not in the interface
 }
-
-export abstract class PlayerFetcher<T> {
-  public abstract readData(accountId: number): Promise<T>;
-  public abstract getOrFetchData(accountId: number): Promise<T>;
-}
42 changes: 3 additions & 39 deletions fetcher/getArchivedData.ts
@@ -1,44 +1,7 @@
-import { isDataComplete, redisCount } from '../util/utility';
+import { redisCount } from '../util/utility';
 import { matchArchive } from '../store/archive';
 import db from '../store/db';
 import { MatchFetcher } from './base';
-import { getMatchDataFromBlobWithMetadata } from '../util/buildMatch';
 
-/**
- * Consolidates separate match data blobs and stores as a single blob in archive
- * @param matchId
- * @returns
- */
-async function doArchiveMatchFromBlobs(matchId: number) {
-  // Don't read from archive when determining whether to archive
-  const [match, metadata] = await getMatchDataFromBlobWithMetadata(matchId, {
-    noArchive: true,
-    // TODO Remove noBlobStore once migrated
-    noBlobStore: true,
-  });
-  if (match && metadata?.has_parsed) {
-    // check data completeness with isDataComplete
-    if (!isDataComplete(match as ParsedMatch)) {
-      redisCount('incomplete_archive');
-      console.log('INCOMPLETE skipping match %s', matchId);
-      return;
-    }
-    // Archive the data since it's parsed. This might also contain api and gcdata
-    const blob = Buffer.from(JSON.stringify(match));
-    const result = await matchArchive.archivePut(matchId.toString(), blob);
-    if (result) {
-      // Mark the match archived
-      await db.raw(
-        `UPDATE parsed_matches SET is_archived = TRUE WHERE match_id = ?`,
-        [matchId],
-      );
-      // TODO delete blobs
-      // await deleteMatch(matchId);
-      console.log('ARCHIVE match %s, parsed', matchId);
-    }
-    return result;
-  }
-}
-
 class ArchivedFetcher extends MatchFetcher<ParsedMatch> {
   readData = async (matchId: number): Promise<ParsedMatch | null> => {
@@ -63,7 +26,8 @@ class ArchivedFetcher extends MatchFetcher<ParsedMatch> {
     return null;
   };
   getOrFetchData = async (matchId: number) => {
-    await doArchiveMatchFromBlobs(matchId);
+    // Circular dependency if we import
+    // await doArchiveMatchFromBlobs(matchId);
     return { data: await this.readData(matchId), error: null };
   };
   checkAvailable = async (matchId: number) => {
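For context on the two new comment lines in getOrFetchData: below is a minimal, self-contained sketch of what an import cycle like getArchivedData.ts <-> buildMatch.ts can do at load time. The files a.ts and b.ts are hypothetical stand-ins, not modules from this repository.

// a.ts: stand-in for util/buildMatch.ts (hypothetical, for illustration)
import { archiveMatch } from './b';
export function buildMatch(matchId: number) {
  return { matchId };
}

// b.ts: stand-in for fetcher/getArchivedData.ts
import { buildMatch } from './a';
export function archiveMatch(matchId: number) {
  // If evaluation enters the cycle via a.ts, `buildMatch` is not yet
  // initialized while b.ts loads: a top-level use would see undefined
  // (CommonJS output) or throw a ReferenceError (native ESM). Deferring
  // the call into a function hides the problem but leaves a load-order
  // trap, hence moving doArchiveMatchFromBlobs out of the fetcher instead.
  return buildMatch(matchId);
}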
61 changes: 0 additions & 61 deletions fetcher/getPlayerArchive.ts

This file was deleted.

76 changes: 75 additions & 1 deletion util/archiveUtil.ts
@@ -1,5 +1,5 @@
 import config from '../config';
-import { blobArchive } from '../store/archive';
+import { blobArchive, matchArchive, playerArchive } from '../store/archive';
 import cassandra from '../store/cassandra';
 import QueryStream from 'pg-query-stream';
 import { Client } from 'pg';
@@ -8,6 +8,10 @@ import db from '../store/db';
 import { apiFetcher } from '../fetcher/getApiData';
 import { gcFetcher } from '../fetcher/getGcData';
 import { parsedFetcher } from '../fetcher/getParsedData';
+import type { PutObjectCommandOutput } from '@aws-sdk/client-s3';
+import { getFullPlayerMatchesWithMetadata } from './buildPlayer';
+import { getMatchDataFromBlobWithMetadata } from './buildMatch';
+import { isDataComplete, redisCount } from './utility';
 
 async function processMatch(matchId: number) {
   // Check if we should archive the blobs (should be parsed and not archived)
@@ -159,3 +163,73 @@ async function getTokenRange(size: number) {
   );
   return result.rows.map((row) => Number(row.match_id));
 }
+
+async function doArchivePlayerMatches(
+  accountId: number,
+): Promise<PutObjectCommandOutput | null> {
+  if (!playerArchive) {
+    return null;
+  }
+  // Fetch our combined list of archive and current, selecting all fields
+  const full = await getFullPlayerMatchesWithMetadata(accountId);
+  const toArchive = full[0];
+  console.log(full[1]);
+  toArchive.forEach((m, i) => {
+    Object.keys(m).forEach((key) => {
+      if (m[key as keyof ParsedPlayerMatch] === null) {
+        // Remove any null values from the matches for storage
+        delete m[key as keyof ParsedPlayerMatch];
+      }
+    });
+  });
+  // TODO (howard) Make sure the new list is longer than the old list
+  // Make sure we're archiving at least 1 match
+  if (!toArchive.length) {
+    return null;
+  }
+  // Put the blob
+  return playerArchive.archivePut(
+    accountId.toString(),
+    Buffer.from(JSON.stringify(toArchive)),
+  );
+  // TODO (howard) delete the archived values from player_caches
+  // TODO (howard) keep the 20 highest match IDs for recentMatches
+  // TODO (howard) mark the user archived so we don't need to query archive on every request
+  // TODO (howard) add redis counts
+}
+
+/**
+ * Consolidates separate match data blobs and stores as a single blob in archive
+ * @param matchId
+ * @returns
+ */
+export async function doArchiveMatchFromBlobs(matchId: number) {
+  // Don't read from archive when determining whether to archive
+  const [match, metadata] = await getMatchDataFromBlobWithMetadata(matchId, {
+    noArchive: true,
+    // TODO Remove noBlobStore once migrated
+    noBlobStore: true,
+  });
+  if (match && metadata?.has_parsed) {
+    // check data completeness with isDataComplete
+    if (!isDataComplete(match as ParsedMatch)) {
+      redisCount('incomplete_archive');
+      console.log('INCOMPLETE skipping match %s', matchId);
+      return;
+    }
+    // Archive the data since it's parsed. This might also contain api and gcdata
+    const blob = Buffer.from(JSON.stringify(match));
+    const result = await matchArchive.archivePut(matchId.toString(), blob);
+    if (result) {
+      // Mark the match archived
+      await db.raw(
+        `UPDATE parsed_matches SET is_archived = TRUE WHERE match_id = ?`,
+        [matchId],
+      );
+      // TODO delete blobs
+      // await deleteMatch(matchId);
+      console.log('ARCHIVE match %s, parsed', matchId);
+    }
+    return result;
+  }
+}
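With doArchiveMatchFromBlobs now exported from util/archiveUtil.ts, the archive write can be driven from a script or job without re-adding an edge from the fetcher layer back into util. A minimal sketch of a caller; the file name and match ID are assumptions, not part of this commit:

// hypothetical script, e.g. dev/archiveOne.ts (not part of this commit)
import { doArchiveMatchFromBlobs } from '../util/archiveUtil';

async function main() {
  // Resolves to the archivePut result for a parsed, complete match;
  // resolves to undefined when the match is skipped.
  const result = await doArchiveMatchFromBlobs(8054301234);
  console.log(result ? 'archived' : 'skipped');
}

main().catch(console.error);

Note that doArchivePlayerMatches is not exported here; its trailing TODOs suggest the player archiving path is still being wired up.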
17 changes: 15 additions & 2 deletions util/buildPlayer.ts
@@ -7,7 +7,7 @@ import { deserialize, pick, redisCount, redisCountDistinct } from './utility';
 import { gzipSync, gunzipSync } from 'zlib';
 import { cacheableCols } from '../routes/playerFields';
 import { promises as fs } from 'fs';
-import { playerArchiveFetcher } from '../fetcher/getPlayerArchive';
+import { playerArchive } from '../store/archive';
 
 export async function getPlayerMatches(
   accountId: number,
@@ -63,7 +63,7 @@ export async function getPlayerMatchesWithMetadata(
   // if dbLimit (recentMatches), don't use archive
   const archivedMatches =
     config.ENABLE_PLAYER_ARCHIVE && !queryObj.dbLimit
-      ? await playerArchiveFetcher.readData(accountId)
+      ? await readArchivedPlayerMatches(accountId)
       : [];
   const localLength = localMatches.length;
   const archivedLength = archivedMatches.length;
@@ -255,3 +255,16 @@ export async function getFullPlayerMatchesWithMetadata(
     projectAll: true,
   });
 }
+
+async function readArchivedPlayerMatches(
+  accountId: number,
+): Promise<ParsedPlayerMatch[]> {
+  if (!playerArchive) {
+    return [];
+  }
+  console.time('archive:' + accountId);
+  const blob = await playerArchive.archiveGet(accountId.toString());
+  const arr = blob ? JSON.parse(blob.toString()) : [];
+  console.timeEnd('archive:' + accountId);
+  return arr;
+}
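A sketch of the two query paths through getPlayerMatchesWithMetadata after this change; only dbLimit and projectAll are visible in this diff, so treat the exact query shape as an assumption:

// recentMatches-style query: dbLimit set, so the archive read is skipped
const [recent] = await getPlayerMatchesWithMetadata(accountId, {
  projectAll: true,
  dbLimit: 20,
});

// full history: with config.ENABLE_PLAYER_ARCHIVE on and no dbLimit,
// readArchivedPlayerMatches supplies archived rows to merge with local ones
const [all, metadata] = await getPlayerMatchesWithMetadata(accountId, {
  projectAll: true,
});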
