Skip to content

Commit

Permalink
move some inserts to specific workers
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Dec 2, 2023
1 parent 6eabfa0 commit c463907
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 42 deletions.
2 changes: 1 addition & 1 deletion config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* File managing configuration for the applicaption
* File managing configuration for the application
* */
const dotenv = require('dotenv');
const fs = require('fs');
Expand Down
5 changes: 5 additions & 0 deletions store/getGcData.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ async function getGcDataFromRetriever(match) {
type: 'gcdata',
skipParse: true,
});
// Update series id and type for pro match
await db.raw(
'UPDATE matches SET series_id = ?, series_type = ? WHERE match_id = ?',
[matchToInsert.series_id, matchToInsert.series_type, match.match_id]
).asCallback(cb);
// Persist GC data to database
await upsertPromise(db, 'match_gcdata', gcdata, {
match_id: match.match_id,
Expand Down
62 changes: 21 additions & 41 deletions store/queries.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,11 @@ function insertMatch(match, options, cb) {
5288: 'track',
5368: 'greevils_greed',
};
// We currently can call this function from many places
// There is a type to indicate source: api, gcdata, parsed
// Also an origin to indicate the context: scanner (fresh match) or request
function preprocess(cb) {
// We always do this
// don't insert anonymous account id
if (players) {
players.forEach((p) => {
Expand Down Expand Up @@ -1009,35 +1013,11 @@ function insertMatch(match, options, cb) {
}
cb();
}
function updateMatchSeriesType(cb) {
if (options.type === 'gcdata') {
db.raw(
'UPDATE matches SET series_id = ?, series_type = ? WHERE match_id = ?',
[match.series_id, match.series_type, match.match_id]
).asCallback(cb);
} else {
cb();
}
}
function upsertParsedMatch(cb) {
if (match.version) {
return upsert(
db,
'parsed_matches',
{
match_id: match.match_id,
},
{
match_id: match.match_id,
},
cb
);
}
return cb();
}
async function upsertMatchPostgres(cb) {
// Insert the pro match data: We do this if api or parser
if (options.type === 'api' && !utility.isProMatch(match)) {
// Check if the match is legit if this is API insert
// Check whether we care about this match for pro purposes
// We need the basic match data to run the check, so only do it if type is api
return cb();
}
// Check if leagueid is premium/professional
Expand Down Expand Up @@ -1191,6 +1171,8 @@ function insertMatch(match, options, cb) {
});
}
function getAverageRank(cb) {
// Only fetch the average_rank if this is a fresh match since otherwise it won't be accurate
// We currently only store this in the player_caches table, not in the match itself
if (options.origin === 'scanner') {
getMatchRankTier(match, (err, avg) => {
match.average_rank = avg || null;
Expand All @@ -1201,8 +1183,7 @@ function insertMatch(match, options, cb) {
}
}
function upsertMatchCassandra(cb) {
// NOTE parsed insert doesn't have original match info so can't archive here
// unless we insert then read it back from cassandra
// We do this regardless of type (with different sets of fields)
return cleanRowCassandra(cassandra, 'matches', match, (err, match) => {
if (err) {
return cb(err);
Expand Down Expand Up @@ -1281,6 +1262,8 @@ function insertMatch(match, options, cb) {
cb();
}
function updateCassandraPlayerCaches(cb) {
// Add the 10 player_match rows indexed by player
// We currently do this on all types
const copy = createMatchCopy(match, players);
return insertPlayerCache(copy, cb);
}
Expand Down Expand Up @@ -1308,6 +1291,7 @@ function insertMatch(match, options, cb) {
return cb();
}
function clearRedisMatch(cb) {
// Clear out the Redis caches, we do this regardless of insert type
redis.del(`match:${match.match_id}`, cb);
}
function clearRedisPlayer(cb) {
Expand All @@ -1326,6 +1310,7 @@ function insertMatch(match, options, cb) {
);
}
function decideCounts(cb) {
// We only do this if fresh match
if (options.skipCounts) {
return cb();
}
Expand All @@ -1334,16 +1319,8 @@ function insertMatch(match, options, cb) {
}
return cb();
}
function decideScenarios(cb) {
if (
options.type === 'parsed' &&
match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT
) {
return redis.rpush('scenariosQueue', match.match_id, cb);
}
return cb();
}
function decideBenchmarks(cb) {
// We only do this if fresh match
if (
options.origin === 'scanner' &&
match.match_id % 100 < config.BENCHMARKS_SAMPLE_PERCENT
Expand All @@ -1353,6 +1330,7 @@ function insertMatch(match, options, cb) {
return cb();
}
function decideMmr(cb) {
// We only do this if fresh match and ranked
async.each(
match.players,
(p, cb) => {
Expand All @@ -1379,6 +1357,7 @@ function insertMatch(match, options, cb) {
);
}
function decideProfile(cb) {
// We only do this if fresh match
async.each(
match.players,
(p, cb) => {
Expand All @@ -1403,6 +1382,7 @@ function insertMatch(match, options, cb) {
);
}
function decideGcData(cb) {
// We only do this for fresh matches
// Don't get replay URLs for event matches
if (
options.origin === 'scanner' &&
Expand All @@ -1426,6 +1406,9 @@ function insertMatch(match, options, cb) {
cb();
}
function decideReplayParse(cb) {
// We use params like skipParse and forceParse to determine whether we want to parse or not
// Otherwise this assumes a fresh match and checks to see if pro or tracked player
// Returns the created parse job (or null)
if (options.skipParse || match.game_mode === 19) {
// skipped or event games
// not parsing this match
Expand Down Expand Up @@ -1479,18 +1462,15 @@ function insertMatch(match, options, cb) {
async.series(
{
preprocess,
updateMatchSeriesType,
upsertMatchPostgres: (cb) => upsertMatchPostgres(cb),
getAverageRank,
upsertMatchCassandra,
upsertMatchBlobs,
upsertParsedMatch,
updateCassandraPlayerCaches,
clearRedisMatch,
clearRedisPlayer,
telemetry,
decideCounts,
decideScenarios,
decideBenchmarks,
decideMmr,
decideProfile,
Expand Down
14 changes: 14 additions & 0 deletions svc/parser.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import config from '../config.js';
import queue from '../store/queue.mjs';
import queries from '../store/queries.mjs';
import { promisify } from 'util';
import db from '../store/db.mjs';
const { runReliableQueue } = queue;
const { PORT, PARSER_PORT, NODE_ENV, PARSER_HOST, PARSER_PARALLELISM } = config;
const numCPUs = os.cpus().length;
Expand Down Expand Up @@ -43,6 +44,19 @@ async function runParse(match, url) {
type: 'parsed',
skipParse: true,
});
// Mark this match parsed
await db.raw(
'INSERT INTO parsed_matches(match_id) VALUES(?) ON CONFLICT DO NOTHING',
[Number(match.match_id)]
);
// Decide if we want to do scenarios (requires parsed match)
// Only if it originated from scanner to avoid triggering on requests
if (
match.origin === 'scanner' &&
match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT
) {
return redis.rpush('scenariosQueue', match.match_id, cb);
}
}
async function parseProcessor(job, cb) {
const match = job;
Expand Down

0 comments on commit c463907

Please sign in to comment.