From c463907a440474ac9c80e0b3fd287b80561a141c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 2 Dec 2023 11:28:00 +0000 Subject: [PATCH] move some inserts to specific workers --- config.js | 2 +- store/getGcData.mjs | 5 ++++ store/queries.mjs | 62 +++++++++++++++------------------------------ svc/parser.mjs | 14 ++++++++++ 4 files changed, 41 insertions(+), 42 deletions(-) diff --git a/config.js b/config.js index a11bc0c31..9fe52a097 100755 --- a/config.js +++ b/config.js @@ -1,5 +1,5 @@ /** - * File managing configuration for the applicaption + * File managing configuration for the application * */ const dotenv = require('dotenv'); const fs = require('fs'); diff --git a/store/getGcData.mjs b/store/getGcData.mjs index 54387803a..5886905f1 100644 --- a/store/getGcData.mjs +++ b/store/getGcData.mjs @@ -60,6 +60,11 @@ async function getGcDataFromRetriever(match) { type: 'gcdata', skipParse: true, }); + // Update series id and type for pro match + await db.raw( + 'UPDATE matches SET series_id = ?, series_type = ? WHERE match_id = ?', + [matchToInsert.series_id, matchToInsert.series_type, match.match_id] + ).asCallback(cb); // Persist GC data to database await upsertPromise(db, 'match_gcdata', gcdata, { match_id: match.match_id, diff --git a/store/queries.mjs b/store/queries.mjs index d31aa270e..28a4ee314 100644 --- a/store/queries.mjs +++ b/store/queries.mjs @@ -963,7 +963,11 @@ function insertMatch(match, options, cb) { 5288: 'track', 5368: 'greevils_greed', }; + // We currently can call this function from many places + // There is a type to indicate source: api, gcdata, parsed + // Also an origin to indicate the context: scanner (fresh match) or request function preprocess(cb) { + // We always do this // don't insert anonymous account id if (players) { players.forEach((p) => { @@ -1009,35 +1013,11 @@ function insertMatch(match, options, cb) { } cb(); } - function updateMatchSeriesType(cb) { - if (options.type === 'gcdata') { - db.raw( - 'UPDATE matches SET series_id = ?, series_type = ? WHERE match_id = ?', - [match.series_id, match.series_type, match.match_id] - ).asCallback(cb); - } else { - cb(); - } - } - function upsertParsedMatch(cb) { - if (match.version) { - return upsert( - db, - 'parsed_matches', - { - match_id: match.match_id, - }, - { - match_id: match.match_id, - }, - cb - ); - } - return cb(); - } async function upsertMatchPostgres(cb) { + // Insert the pro match data: We do this if api or parser if (options.type === 'api' && !utility.isProMatch(match)) { - // Check if the match is legit if this is API insert + // Check whether we care about this match for pro purposes + // We need the basic match data to run the check, so only do it if type is api return cb(); } // Check if leagueid is premium/professional @@ -1191,6 +1171,8 @@ function insertMatch(match, options, cb) { }); } function getAverageRank(cb) { + // Only fetch the average_rank if this is a fresh match since otherwise it won't be accurate + // We currently only store this in the player_caches table, not in the match itself if (options.origin === 'scanner') { getMatchRankTier(match, (err, avg) => { match.average_rank = avg || null; @@ -1201,8 +1183,7 @@ function insertMatch(match, options, cb) { } } function upsertMatchCassandra(cb) { - // NOTE parsed insert doesn't have original match info so can't archive here - // unless we insert then read it back from cassandra + // We do this regardless of type (with different sets of fields) return cleanRowCassandra(cassandra, 'matches', match, (err, match) => { if (err) { return cb(err); @@ -1281,6 +1262,8 @@ function insertMatch(match, options, cb) { cb(); } function updateCassandraPlayerCaches(cb) { + // Add the 10 player_match rows indexed by player + // We currently do this on all types const copy = createMatchCopy(match, players); return insertPlayerCache(copy, cb); } @@ -1308,6 +1291,7 @@ function insertMatch(match, options, cb) { return cb(); } function clearRedisMatch(cb) { + // Clear out the Redis caches, we do this regardless of insert type redis.del(`match:${match.match_id}`, cb); } function clearRedisPlayer(cb) { @@ -1326,6 +1310,7 @@ function insertMatch(match, options, cb) { ); } function decideCounts(cb) { + // We only do this if fresh match if (options.skipCounts) { return cb(); } @@ -1334,16 +1319,8 @@ function insertMatch(match, options, cb) { } return cb(); } - function decideScenarios(cb) { - if ( - options.type === 'parsed' && - match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT - ) { - return redis.rpush('scenariosQueue', match.match_id, cb); - } - return cb(); - } function decideBenchmarks(cb) { + // We only do this if fresh match if ( options.origin === 'scanner' && match.match_id % 100 < config.BENCHMARKS_SAMPLE_PERCENT @@ -1353,6 +1330,7 @@ function insertMatch(match, options, cb) { return cb(); } function decideMmr(cb) { + // We only do this if fresh match and ranked async.each( match.players, (p, cb) => { @@ -1379,6 +1357,7 @@ function insertMatch(match, options, cb) { ); } function decideProfile(cb) { + // We only do this if fresh match async.each( match.players, (p, cb) => { @@ -1403,6 +1382,7 @@ function insertMatch(match, options, cb) { ); } function decideGcData(cb) { + // We only do this for fresh matches // Don't get replay URLs for event matches if ( options.origin === 'scanner' && @@ -1426,6 +1406,9 @@ function insertMatch(match, options, cb) { cb(); } function decideReplayParse(cb) { + // We use params like skipParse and forceParse to determine whether we want to parse or not + // Otherwise this assumes a fresh match and checks to see if pro or tracked player + // Returns the created parse job (or null) if (options.skipParse || match.game_mode === 19) { // skipped or event games // not parsing this match @@ -1479,18 +1462,15 @@ function insertMatch(match, options, cb) { async.series( { preprocess, - updateMatchSeriesType, upsertMatchPostgres: (cb) => upsertMatchPostgres(cb), getAverageRank, upsertMatchCassandra, upsertMatchBlobs, - upsertParsedMatch, updateCassandraPlayerCaches, clearRedisMatch, clearRedisPlayer, telemetry, decideCounts, - decideScenarios, decideBenchmarks, decideMmr, decideProfile, diff --git a/svc/parser.mjs b/svc/parser.mjs index ff7e23f6e..4b8dba652 100755 --- a/svc/parser.mjs +++ b/svc/parser.mjs @@ -14,6 +14,7 @@ import config from '../config.js'; import queue from '../store/queue.mjs'; import queries from '../store/queries.mjs'; import { promisify } from 'util'; +import db from '../store/db.mjs'; const { runReliableQueue } = queue; const { PORT, PARSER_PORT, NODE_ENV, PARSER_HOST, PARSER_PARALLELISM } = config; const numCPUs = os.cpus().length; @@ -43,6 +44,19 @@ async function runParse(match, url) { type: 'parsed', skipParse: true, }); + // Mark this match parsed + await db.raw( + 'INSERT INTO parsed_matches(match_id) VALUES(?) ON CONFLICT DO NOTHING', + [Number(match.match_id)] + ); + // Decide if we want to do scenarios (requires parsed match) + // Only if it originated from scanner to avoid triggering on requests + if ( + match.origin === 'scanner' && + match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT + ) { + return redis.rpush('scenariosQueue', match.match_id, cb); + } } async function parseProcessor(job, cb) { const match = job;