From 13444c9091ae1dcfd62f839c3a70f07969b8509e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 2 Dec 2023 15:06:57 +0000 Subject: [PATCH] refactor queue methods --- routes/spec.mjs | 42 +++++++++++------------- store/queries.mjs | 73 ++++++++++++++++++++++------------------- store/queue.mjs | 32 ++++++++---------- svc/autofullhistory.mjs | 4 +-- svc/parser.mjs | 2 +- svc/profiler.mjs | 54 ++++++++---------------------- test/test.js | 4 +-- 7 files changed, 91 insertions(+), 120 deletions(-) diff --git a/routes/spec.mjs b/routes/spec.mjs index d269f58e4..c1edf399f 100644 --- a/routes/spec.mjs +++ b/routes/spec.mjs @@ -1027,21 +1027,19 @@ The OpenDota API offers 50,000 free calls per month and a rate limit of 60 reque }, }, route: () => '/players/:account_id/refresh', - func: (req, res, cb) => { - redis.rpush( + func: async (req, res, cb) => { + try { + const length = await queue.addJob( 'fhQueue', JSON.stringify({ account_id: req.params.account_id || '1', - }), - (err, length) => { - if (err) { - return cb(err); - } - return res.json({ - length, - }); - } - ); + })); + return res.json({ + length, + }); + } catch(e) { + return cb(e); + } }, }, }, @@ -1620,16 +1618,16 @@ The OpenDota API offers 50,000 free calls per month and a rate limit of 60 reque }, }, route: () => '/request/:jobId', - func: (req, res, cb) => { - queue.getJob(req.params.jobId, (err, job) => { - if (err) { - return cb(err); - } - if (job) { - return res.json({ ...job, jobId: job.id }); - } - return res.json(null); - }); + func: async (req, res, cb) => { + try { + const job = await queue.getReliableJob(req.params.jobId); + if (job) { + return res.json({ ...job, jobId: job.id }); + } + return res.json(null); + } catch(e) { + return cb(e); + } }, }, }, diff --git a/store/queries.mjs b/store/queries.mjs index 06144ddcc..e90be2071 100644 --- a/store/queries.mjs +++ b/store/queries.mjs @@ -718,6 +718,7 @@ function insertPlayer(db, player, indexPlayer, cb) { cb ); } +export const insertPlayerPromise = util.promisify(insertPlayer); function bulkIndexPlayer(bulkActions, cb) { // Bulk call to ElasticSearch if (bulkActions.length > 0) { @@ -1315,7 +1316,8 @@ function insertMatch(match, options, cb) { return cb(); } if (options.origin === 'scanner') { - return redis.rpush('countsQueue', JSON.stringify(match), cb); + queue.addJob('countsQueue', JSON.stringify(match)); + return cb(); } return cb(); } @@ -1325,7 +1327,8 @@ function insertMatch(match, options, cb) { options.origin === 'scanner' && match.match_id % 100 < config.BENCHMARKS_SAMPLE_PERCENT ) { - return redis.rpush('parsedBenchmarksQueue', match.match_id, cb); + queue.addJob('parsedBenchmarksQueue', match.match_id); + return cb(); } return cb(); } @@ -1341,14 +1344,14 @@ function insertMatch(match, options, cb) { p.account_id !== utility.getAnonymousAccountId() && config.ENABLE_RANDOM_MMR_UPDATE ) { - redis.rpush( + queue.addJob( 'mmrQueue', JSON.stringify({ match_id: match.match_id, account_id: p.account_id, }), - cb ); + cb(); } else { cb(); } @@ -1356,30 +1359,29 @@ function insertMatch(match, options, cb) { cb ); } - function decideProfile(cb) { + async function decideProfile(cb) { // We only do this if fresh match - async.each( - match.players, - (p, cb) => { - if ( - match.match_id % 100 < Number(config.SCANNER_PLAYER_PERCENT) && - options.origin === 'scanner' && - p.account_id && - p.account_id !== utility.getAnonymousAccountId() - ) { - upsert( - db, - 'players', - { account_id: p.account_id }, - { account_id: p.account_id }, - cb - ); - } else { - cb(); - } - }, - cb - ); + if (match.match_id % 100 < Number(config.SCANNER_PLAYER_PERCENT) && options.origin === 'scanner' && match.players) { + try { + const filteredPlayers = match.players.filter(p => { + return p.account_id && + p.account_id !== utility.getAnonymousAccountId(); + }); + // Add a placeholder player with just the ID + await Promise.all(filteredPlayers.map(p => upsertPromise( + db, + 'players', + { account_id: p.account_id }, + { account_id: p.account_id }, + ))); + // We could also queue a profile job here but seems like a lot to update name after each match + cb(); + } catch(e) { + cb(e); + } + } else { + cb(); + } } function decideGcData(cb) { // We only do this for fresh matches @@ -1389,14 +1391,14 @@ function insertMatch(match, options, cb) { match.game_mode !== 19 && match.match_id % 100 < Number(config.GCDATA_PERCENT) ) { - redis.rpush( + queue.addJob( 'gcQueue', JSON.stringify({ match_id: match.match_id, pgroup: match.pgroup, }), - cb ); + cb(); } else { cb(); } @@ -1422,7 +1424,7 @@ function insertMatch(match, options, cb) { cb(err, Boolean(score)) ); }, - (err, hasTrackedPlayer) => { + async (err, hasTrackedPlayer) => { if (err) { return cb(err); } @@ -1435,7 +1437,8 @@ function insertMatch(match, options, cb) { if (hasTrackedPlayer) { priority = -2; } - return queue.addJob( + try { + const job = await queue.addReliableJob( 'parse', { data: { @@ -1452,8 +1455,11 @@ function insertMatch(match, options, cb) { priority, attempts: options.attempts || 15, }, - cb ); + cb(null, job); + } catch(e) { + cb(e); + } } return cb(); } @@ -1473,7 +1479,7 @@ function insertMatch(match, options, cb) { decideCounts, decideBenchmarks, decideMmr, - decideProfile, + decideProfile: (cb) => decideProfile(cb), decideGcData, decideMetaParse, decideReplayParse, @@ -1595,6 +1601,7 @@ export default { upsert, upsertPromise, insertPlayer, + insertPlayerPromise, bulkIndexPlayer, insertMatchPromise, insertPlayerRating, diff --git a/store/queue.mjs b/store/queue.mjs index e26ba8e49..d15c8763b 100644 --- a/store/queue.mjs +++ b/store/queue.mjs @@ -2,6 +2,7 @@ import moment from 'moment'; import async from 'async'; import redis from './redis.mjs'; import db from './db.mjs'; + function runQueue(queueName, parallelism, processor) { function processOneJob(cb) { redis.blpop(queueName, '0', (err, job) => { @@ -85,8 +86,11 @@ function runReliableQueue(queueName, parallelism, processor) { }); } } -function addJob(queueName, job, options, cb) { - db.raw( +async function addJob(queueName, job) { + return await redis.rpush(queueName, job); +} +async function addReliableJob(queueName, job, options) { + const result = await db.raw( `INSERT INTO queue(type, timestamp, attempts, data, next_attempt_time, priority) VALUES (?, ?, ?, ?, ?, ?) RETURNING *`, @@ -98,26 +102,18 @@ function addJob(queueName, job, options, cb) { new Date(), options.priority || 10, ] - ).asCallback((err, result) => { - if (err) { - return cb(err); - } - return cb(err, result.rows[0]); - }); -} -function getJob(jobId, cb) { - db.raw('SELECT * FROM queue WHERE id = ?', [jobId]).asCallback( - (err, result) => { - if (err) { - return cb(err); - } - return cb(err, result.rows[0]); - } ); + return result.rows[0]; +} +async function getReliableJob(jobId) { + const result = await db.raw('SELECT * FROM queue WHERE id = ?', [jobId]); + return result.rows[0]; } + export default { runQueue, runReliableQueue, + addReliableJob, + getReliableJob, addJob, - getJob, }; diff --git a/svc/autofullhistory.mjs b/svc/autofullhistory.mjs index 05da71cfe..f27fc4894 100644 --- a/svc/autofullhistory.mjs +++ b/svc/autofullhistory.mjs @@ -1,13 +1,13 @@ // Randomly requests history refreshes for users to fill in missing matches import db from '../store/db.mjs'; -import redis from '../store/redis.mjs'; +import queue from '../store/queue.mjs'; while(true) { const result = await db.raw( "SELECT account_id from players TABLESAMPLE SYSTEM_ROWS(100) where last_match_time > (now() - interval '7 day')" ); console.log(result.rows); - await Promise.all(result.rows.map(row => redis.rpush( + await Promise.all(result.rows.map(row => queue.addJob( 'fhQueue', JSON.stringify({ account_id: row.account_id, diff --git a/svc/parser.mjs b/svc/parser.mjs index 2eeca9c01..488e855bb 100755 --- a/svc/parser.mjs +++ b/svc/parser.mjs @@ -63,7 +63,7 @@ async function parseProcessor(job, cb) { match.origin === 'scanner' && match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT ) { - return redis.rpush('scenariosQueue', match.match_id, cb); + await queue.addJob('scenariosQueue', match.match_id); } console.log('[PARSER] completed parse of match %s', match.match_id); cb(null, match.match_id); diff --git a/svc/profiler.mjs b/svc/profiler.mjs index 8db056a36..198cdaadf 100644 --- a/svc/profiler.mjs +++ b/svc/profiler.mjs @@ -1,33 +1,22 @@ -// Processes a queue of requests to update Steam profile data -// TODO use top level await -import async from 'async'; +// Updates Steam profile data for players periodically import queries from '../store/queries.mjs'; import db from '../store/db.mjs'; import utility from '../util/utility.mjs'; -const { insertPlayer, bulkIndexPlayer } = queries; +const { insertPlayerPromise, bulkIndexPlayer } = queries; const { getData, generateJob, convert64to32 } = utility; -function getSummaries(cb) { - db.raw( + +while(true) { + // To optimize the api call we need to do 100 players at a time + // We sample 100 random rows from the DB, with the downside that we might update a lot of inactive players + // Alternatively we could also trigger updates from match insert to target active players + const result = await db.raw( 'SELECT account_id from players TABLESAMPLE SYSTEM_ROWS(100)' - ).asCallback((err, result) => { - if (err) { - return cb(err); - } + ); const container = generateJob('api_summaries', { players: result.rows, }); - // Request rank_tier data for these players - // result.rows.forEach((row) => { - // redis.rpush('mmrQueue', JSON.stringify({ - // match_id: null, - // account_id: row.account_id, - // })); - // }); - return getData(container.url, (err, body) => { - if (err) { - // couldn't get data from api, non-retryable - return cb(JSON.stringify(err)); - } + // We can also queue a rank tier/MMR request for these players + const body = await getDataPromise(container.url); const results = body.response.players.filter((player) => player.steamid); const bulkUpdate = results.reduce((acc, player) => { acc.push( @@ -51,23 +40,6 @@ function getSummaries(cb) { console.log(err); } }); - // player summaries response - return async.each( - results, - (player, cb) => { - insertPlayer(db, player, false, cb); - }, - cb - ); - }); - }); -} -function start() { - getSummaries((err) => { - if (err) { - throw err; - } - return setTimeout(start, 500); - }); + await Promise.all(results.map(player => insertPlayerPromise(db, player, false))); + await new Promise(resolve => setTimeout(resolve, 1000)); } -start(); diff --git a/test/test.js b/test/test.js index cd29c22f3..d8663a224 100644 --- a/test/test.js +++ b/test/test.js @@ -10,7 +10,6 @@ const supertest = require('supertest'); const stripeLib = require('stripe'); const pg = require('pg'); const fs = require('fs'); -const util = require('util'); const cassandraDriver = require('cassandra-driver'); const swaggerParser = require('@apidevtools/swagger-parser'); const config = require('../config'); @@ -884,9 +883,8 @@ async function loadMatches(cb) { async function loadPlayers(cb) { console.log('loading players'); - const insertPlayerPromise = util.promisify(queries.insertPlayer); await Promise.all( - summariesApi.response.players.map((p) => insertPlayerPromise(db, p, true)) + summariesApi.response.players.map((p) => queries.insertPlayerPromise(db, p, true)) ); cb(); }