async getGcData
howardchung committed Dec 1, 2023
1 parent 3840b22 commit bf3adcc
Showing 9 changed files with 129 additions and 142 deletions.

dev/archiveTest.mjs (11 changes: 9 additions & 2 deletions)
@@ -1,8 +1,15 @@
 import { archivePut } from '../store/archive.js';
-import { getArchivedMatch, getMatchData, getPlayerMatchData } from '../store/queries.js';
+import {
+  getArchivedMatch,
+  getMatchData,
+  getPlayerMatchData,
+} from '../store/queries.js';
 
 // Read some match data
-const match = {...await getMatchData(7465883253), players: await getPlayerMatchData(7465883253)};
+const match = {
+  ...(await getMatchData(7465883253)),
+  players: await getPlayerMatchData(7465883253),
+};
 const blob = Buffer.from(JSON.stringify(match));
 
 // Archive it
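
Note: the parentheses prettier adds around the spread argument are purely stylistic; `...await expr` already awaits before spreading, so both forms build the same object. A standalone sketch (top-level await, valid in .mjs modules; the async source is hypothetical):

// Hypothetical async source standing in for getMatchData.
const fake = async () => ({ match_id: 1 });
const a = { ...(await fake()) }; // explicit form after this commit
const b = { ...await fake() }; // equivalent without parentheses
console.log(a, b); // { match_id: 1 } { match_id: 1 }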

store/archive.js (2 changes: 1 addition & 1 deletion)
@@ -73,7 +73,7 @@ async function archivePut(key, blob) {
       data.length
     );
     return result;
-  } catch(e) {
+  } catch (e) {
     console.error('[ARCHIVE] put error:', e.Code);
     return null;
   }

store/buildMatch.js (11 changes: 5 additions & 6 deletions)
@@ -123,17 +123,16 @@ async function getMatch(matchId) {
   // if so we prefer the archive since Cassandra may contain an unparsed version
   const isParsed = Boolean(
     (
-      await db.raw(
-        'select match_id from parsed_matches where match_id = ?',
-        [matchId]
-      )
+      await db.raw('select match_id from parsed_matches where match_id = ?', [
+        matchId,
+      ])
     ).rows[0]
   );
   let match = null;
   if (isParsed) {
-    match = await getArchivedMatch(matchId) || await getMatchData();
+    match = (await getArchivedMatch(matchId)) || (await getMatchData());
   } else {
-    match = await getMatchData(matchId) || await getArchivedMatch(matchId);
+    match = (await getMatchData(matchId)) || (await getArchivedMatch(matchId));
   }
   if (!match) {
     // if we still don't have it, try backfilling it from Steam API and then check again
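
Note: here too the added parentheses change nothing at runtime, since `await` binds tighter than `||`; the substance is the read ordering the comment describes. A sketch of that fallback pattern, with a hypothetical helper name:

// readWithFallback is hypothetical; primary/fallback stand in for
// getMatchData/getArchivedMatch. Any falsy result triggers the fallback.
async function readWithFallback(matchId, primary, fallback) {
  return (await primary(matchId)) || (await fallback(matchId));
}
// Parsed matches prefer the archive; otherwise prefer Cassandra first.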

store/queries.js (7 changes: 5 additions & 2 deletions)
@@ -720,6 +720,8 @@ function upsert(db, table, row, conflict, cb) {
   });
 }
 
+const upsertPromise = util.promisify(upsert);
+
 function insertPlayer(db, player, indexPlayer, cb) {
   if (player.steamid) {
     // this is a login, compute the account_id from steamid
@@ -1343,7 +1345,7 @@ function insertMatch(match, options, cb) {
   }
 
   function upsertMatchBlobs(cb) {
-    // TODO (howard) this function is meant to eventually replace the cassandra match/player_match tables
+    // TODO this function is meant to eventually replace the cassandra match/player_match tables
     // TODO remove pgroup from this since we don't actually need it stored
 
     // It's a temporary store (postgres table) holding data for each possible stage of ingestion, api/gcdata/replay/meta etc.
@@ -1698,14 +1700,15 @@ async function getArchivedMatch(matchId) {
       utility.redisCount(redis, 'match_archive_read');
       return result;
     }
-  } catch(e) {
+  } catch (e) {
     console.error(e);
   }
   return null;
 }
 
 module.exports = {
   upsert,
+  upsertPromise,
   insertPlayer,
   bulkIndexPlayer,
   insertMatchPromise,
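
Note: `util.promisify` works here because `upsert(db, table, row, conflict, cb)` follows Node's error-first, callback-last convention, so the new export can be awaited directly. A hedged usage sketch; the caller, table name, and the reading of `conflict` as the ON CONFLICT key columns are illustrative assumptions, not taken from this diff:

const { upsertPromise } = require('../store/queries');

// Hypothetical caller: upsert a row keyed on account_id.
async function savePlayerRow(db, player) {
  await upsertPromise(db, 'notable_players', player, {
    account_id: player.account_id,
  });
}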

svc/backupscanner.js (24 changes: 12 additions & 12 deletions)
@@ -38,18 +38,18 @@ function processMatch(matchId, cb) {
         }
         const match = body.result;
         try {
-            await insertMatchPromise(match, {
-              type: 'api',
-              origin: 'scanner',
-              skipCounts: false,
-            });
-            // Set with long expiration (1 month) to avoid picking up the same matches again
-            // If GetMatchHistoryBySequenceNum is out for a long time, this might be a problem
-            redis.setex(`scanner_insert:${match.match_id}`, 3600 * 24 * 30, 1);
-            cb();
-          } catch(e) {
-            cb(e);
-          }
+          await insertMatchPromise(match, {
+            type: 'api',
+            origin: 'scanner',
+            skipCounts: false,
+          });
+          // Set with long expiration (1 month) to avoid picking up the same matches again
+          // If GetMatchHistoryBySequenceNum is out for a long time, this might be a problem
+          redis.setex(`scanner_insert:${match.match_id}`, 3600 * 24 * 30, 1);
+          cb();
+        } catch (e) {
+          cb(e);
+        }
       }
     );
   });

svc/gcdata.js (9 changes: 7 additions & 2 deletions)
@@ -9,9 +9,14 @@ const utility = require('../util/utility');
 const { getRetrieverArr } = utility;
 const retrieverArr = getRetrieverArr();
 
-function processGcData(job, cb) {
+async function processGcData(job, cb) {
   job.useGcDataArr = true;
-  getGcData(job, cb);
+  try {
+    await getGcData(job);
+    cb();
+  } catch (e) {
+    cb(e);
+  }
 }
 
 queue.runQueue(
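
Note: the queue runner still hands its processor a callback, so the now-async `getGcData` is bridged with try/catch. That bridge generalizes; `asCallbackProcessor` below is a hypothetical helper sketch, not part of this commit:

// Adapt an async job handler to an error-first callback processor.
function asCallbackProcessor(asyncFn) {
  return async (job, cb) => {
    try {
      cb(null, await asyncFn(job));
    } catch (e) {
      cb(e);
    }
  };
}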

svc/parser.js (74 changes: 25 additions & 49 deletions)
@@ -5,7 +5,7 @@
  * Stream is run through a series of processors to count/aggregate it into a single object
  * This object is passed to insertMatch to persist the data into the database.
  * */
-const cp = require('child_process');
+const { exec } = require('child_process');
 const async = require('async');
 const numCPUs = require('os').cpus().length;
 const express = require('express');
@@ -14,77 +14,53 @@ const getGcData = require('../util/getGcData');
 const config = require('../config');
 const queue = require('../store/queue');
 const queries = require('../store/queries');
+const { promisify } = require('util');
 
 const { insertMatchPromise } = queries;
 const { buildReplayUrl } = utility;
+const execPromise = promisify(exec);
 
 const app = express();
 app.get('/healthz', (req, res) => {
   res.end('ok');
 });
 app.listen(config.PORT || config.PARSER_PORT);
 
-function runParse(match, job, cb) {
-  let { url } = match;
+async function runParse(match, url) {
   if (config.NODE_ENV === 'test') {
     url = `https://odota.github.io/testfiles/${match.match_id}_1.dem`;
   }
   console.log(new Date(), url);
-  cp.exec(
+  const {stdout} = await execPromise(
     `curl --max-time 180 --fail ${url} | ${
       url && url.slice(-3) === 'bz2' ? 'bunzip2' : 'cat'
     } | curl -X POST -T - ${
       config.PARSER_HOST
     } | node processors/createParsedDataBlob.js ${match.match_id}`,
-    { shell: true, maxBuffer: 10 * 1024 * 1024 },
-    async (err, stdout) => {
-      if (err) {
-        return cb(err);
-      }
-      const result = { ...JSON.parse(stdout), ...match };
-      try {
-        await insertMatchPromise(result, {
-          type: 'parsed',
-          skipParse: true,
-        });
-        cb();
-      } catch (e) {
-        cb(e);
-      }
-    }
+    { shell: true, maxBuffer: 10 * 1024 * 1024 }
   );
+  const result = { ...JSON.parse(stdout), ...match };
+  await insertMatchPromise(result, {
+    type: 'parsed',
+    skipParse: true,
+  });
 }
 
-function parseProcessor(job, cb) {
+async function parseProcessor(job, cb) {
   const match = job;
-  async.series(
-    {
-      getDataSource(cb) {
-        getGcData(match, (err, result) => {
-          if (err) {
-            return cb(err);
-          }
-          match.url = buildReplayUrl(
-            result.match_id,
-            result.cluster,
-            result.replay_salt
-          );
-          return cb(err);
-        });
-      },
-      runParse(cb) {
-        runParse(match, job, cb);
-      },
-    },
-    (err) => {
-      if (err) {
-        console.error(err.stack || err);
-      } else {
-        console.log('completed parse of match %s', match.match_id);
-      }
-      return cb(err, match.match_id);
-    }
-  );
+  try {
+    const result = await getGcData(match);
+    const url = buildReplayUrl(
+      result.match_id,
+      result.cluster,
+      result.replay_salt
+    );
+    await runParse(match, url);
+    console.log('[PARSER] completed parse of match %s', match.match_id);
+    cb(null, match.match_id);
+  } catch (e) {
+    cb(e);
+  }
 }
 
 queue.runReliableQueue(
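
Note: the old `if (err) return cb(err)` branch disappears because Node ships a custom promisified form of `child_process.exec`: `promisify(exec)` resolves to `{ stdout, stderr }` and rejects when the command exits non-zero, so a failing curl pipeline now surfaces as a thrown error inside `runParse`. A minimal standalone illustration:

const { exec } = require('child_process');
const { promisify } = require('util');
const execPromise = promisify(exec);

async function demo() {
  const { stdout } = await execPromise('echo hello');
  console.log(stdout.trim()); // "hello"
  // Non-zero exit rejects instead of passing err to a callback:
  await execPromise('exit 1').catch((e) => console.error(e.code)); // 1
}
demo();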

(Diffs for the remaining 2 changed files were not loaded.)