Skip to content

Commit

Permalink
refactor queue methods
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Dec 2, 2023
1 parent 82c66fc commit 13444c9
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 120 deletions.
42 changes: 20 additions & 22 deletions routes/spec.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,21 +1027,19 @@ The OpenDota API offers 50,000 free calls per month and a rate limit of 60 reque
},
},
route: () => '/players/:account_id/refresh',
func: (req, res, cb) => {
redis.rpush(
func: async (req, res, cb) => {
try {
const length = await queue.addJob(
'fhQueue',
JSON.stringify({
account_id: req.params.account_id || '1',
}),
(err, length) => {
if (err) {
return cb(err);
}
return res.json({
length,
});
}
);
}));
return res.json({
length,
});
} catch(e) {
return cb(e);
}
},
},
},
Expand Down Expand Up @@ -1620,16 +1618,16 @@ The OpenDota API offers 50,000 free calls per month and a rate limit of 60 reque
},
},
route: () => '/request/:jobId',
func: (req, res, cb) => {
queue.getJob(req.params.jobId, (err, job) => {
if (err) {
return cb(err);
}
if (job) {
return res.json({ ...job, jobId: job.id });
}
return res.json(null);
});
func: async (req, res, cb) => {
try {
const job = await queue.getReliableJob(req.params.jobId);
if (job) {
return res.json({ ...job, jobId: job.id });
}
return res.json(null);
} catch(e) {
return cb(e);
}
},
},
},
Expand Down
73 changes: 40 additions & 33 deletions store/queries.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ function insertPlayer(db, player, indexPlayer, cb) {
cb
);
}
// Promise-returning wrapper around the callback-style insertPlayer,
// for callers using async/await (e.g. the profiler service and tests).
export const insertPlayerPromise = util.promisify(insertPlayer);
function bulkIndexPlayer(bulkActions, cb) {
// Bulk call to ElasticSearch
if (bulkActions.length > 0) {
Expand Down Expand Up @@ -1315,7 +1316,8 @@ function insertMatch(match, options, cb) {
return cb();
}
if (options.origin === 'scanner') {
return redis.rpush('countsQueue', JSON.stringify(match), cb);
queue.addJob('countsQueue', JSON.stringify(match));
return cb();
}
return cb();
}
Expand All @@ -1325,7 +1327,8 @@ function insertMatch(match, options, cb) {
options.origin === 'scanner' &&
match.match_id % 100 < config.BENCHMARKS_SAMPLE_PERCENT
) {
return redis.rpush('parsedBenchmarksQueue', match.match_id, cb);
queue.addJob('parsedBenchmarksQueue', match.match_id);
return cb();
}
return cb();
}
Expand All @@ -1341,45 +1344,44 @@ function insertMatch(match, options, cb) {
p.account_id !== utility.getAnonymousAccountId() &&
config.ENABLE_RANDOM_MMR_UPDATE
) {
redis.rpush(
queue.addJob(
'mmrQueue',
JSON.stringify({
match_id: match.match_id,
account_id: p.account_id,
}),
cb
);
cb();
} else {
cb();
}
},
cb
);
}
function decideProfile(cb) {
async function decideProfile(cb) {
  // We only do this if fresh match: sample a percentage of scanner-origin
  // matches and upsert a placeholder players row for each non-anonymous
  // participant so they exist in the DB before any profile data arrives.
  if (
    match.match_id % 100 < Number(config.SCANNER_PLAYER_PERCENT) &&
    options.origin === 'scanner' &&
    match.players
  ) {
    try {
      const filteredPlayers = match.players.filter((p) => {
        return (
          p.account_id && p.account_id !== utility.getAnonymousAccountId()
        );
      });
      // Add a placeholder player with just the ID
      await Promise.all(
        filteredPlayers.map((p) =>
          upsertPromise(
            db,
            'players',
            { account_id: p.account_id },
            { account_id: p.account_id }
          )
        )
      );
      // We could also queue a profile job here but seems like a lot to update name after each match
      cb();
    } catch (e) {
      cb(e);
    }
  } else {
    cb();
  }
}
function decideGcData(cb) {
// We only do this for fresh matches
Expand All @@ -1389,14 +1391,14 @@ function insertMatch(match, options, cb) {
match.game_mode !== 19 &&
match.match_id % 100 < Number(config.GCDATA_PERCENT)
) {
redis.rpush(
queue.addJob(
'gcQueue',
JSON.stringify({
match_id: match.match_id,
pgroup: match.pgroup,
}),
cb
);
cb();
} else {
cb();
}
Expand All @@ -1422,7 +1424,7 @@ function insertMatch(match, options, cb) {
cb(err, Boolean(score))
);
},
(err, hasTrackedPlayer) => {
async (err, hasTrackedPlayer) => {
if (err) {
return cb(err);
}
Expand All @@ -1435,7 +1437,8 @@ function insertMatch(match, options, cb) {
if (hasTrackedPlayer) {
priority = -2;
}
return queue.addJob(
try {
const job = await queue.addReliableJob(
'parse',
{
data: {
Expand All @@ -1452,8 +1455,11 @@ function insertMatch(match, options, cb) {
priority,
attempts: options.attempts || 15,
},
cb
);
cb(null, job);
} catch(e) {
cb(e);
}
}
return cb();
}
Expand All @@ -1473,7 +1479,7 @@ function insertMatch(match, options, cb) {
decideCounts,
decideBenchmarks,
decideMmr,
decideProfile,
decideProfile: (cb) => decideProfile(cb),
decideGcData,
decideMetaParse,
decideReplayParse,
Expand Down Expand Up @@ -1595,6 +1601,7 @@ export default {
upsert,
upsertPromise,
insertPlayer,
insertPlayerPromise,
bulkIndexPlayer,
insertMatchPromise,
insertPlayerRating,
Expand Down
32 changes: 14 additions & 18 deletions store/queue.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import moment from 'moment';
import async from 'async';
import redis from './redis.mjs';
import db from './db.mjs';

function runQueue(queueName, parallelism, processor) {
function processOneJob(cb) {
redis.blpop(queueName, '0', (err, job) => {
Expand Down Expand Up @@ -85,8 +86,11 @@ function runReliableQueue(queueName, parallelism, processor) {
});
}
}
function addJob(queueName, job, options, cb) {
db.raw(
// Enqueue a fire-and-forget job onto the Redis list backing the named
// queue. Resolves with RPUSH's result — the list length after the push.
async function addJob(queueName, job) {
  const length = await redis.rpush(queueName, job);
  return length;
}
async function addReliableJob(queueName, job, options) {
const result = await db.raw(
`INSERT INTO queue(type, timestamp, attempts, data, next_attempt_time, priority)
VALUES (?, ?, ?, ?, ?, ?)
RETURNING *`,
Expand All @@ -98,26 +102,18 @@ function addJob(queueName, job, options, cb) {
new Date(),
options.priority || 10,
]
).asCallback((err, result) => {
if (err) {
return cb(err);
}
return cb(err, result.rows[0]);
});
}
function getJob(jobId, cb) {
db.raw('SELECT * FROM queue WHERE id = ?', [jobId]).asCallback(
(err, result) => {
if (err) {
return cb(err);
}
return cb(err, result.rows[0]);
}
);
return result.rows[0];
}
// Fetch a single persistent (Postgres-backed) job row by primary key.
// Resolves with the row, or undefined when no job matches the id.
async function getReliableJob(jobId) {
  const { rows } = await db.raw('SELECT * FROM queue WHERE id = ?', [jobId]);
  return rows[0];
}

// Public queue API: Redis-list helpers (runQueue/addJob) and
// Postgres-backed reliable-job helpers (runReliableQueue/addReliableJob/
// getReliableJob). Note: `getJob` was renamed to `getReliableJob` in this
// module, so it must not be exported — referencing the old name here
// would throw a ReferenceError at module load.
export default {
  runQueue,
  runReliableQueue,
  addJob,
  addReliableJob,
  getReliableJob,
};
4 changes: 2 additions & 2 deletions svc/autofullhistory.mjs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Randomly requests history refreshes for users to fill in missing matches
import db from '../store/db.mjs';
import redis from '../store/redis.mjs';
import queue from '../store/queue.mjs';

while(true) {
const result = await db.raw(
"SELECT account_id from players TABLESAMPLE SYSTEM_ROWS(100) where last_match_time > (now() - interval '7 day')"
);
console.log(result.rows);
await Promise.all(result.rows.map(row => redis.rpush(
await Promise.all(result.rows.map(row => queue.addJob(
'fhQueue',
JSON.stringify({
account_id: row.account_id,
Expand Down
2 changes: 1 addition & 1 deletion svc/parser.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async function parseProcessor(job, cb) {
match.origin === 'scanner' &&
match.match_id % 100 < config.SCENARIOS_SAMPLE_PERCENT
) {
return redis.rpush('scenariosQueue', match.match_id, cb);
await queue.addJob('scenariosQueue', match.match_id);
}
console.log('[PARSER] completed parse of match %s', match.match_id);
cb(null, match.match_id);
Expand Down
54 changes: 13 additions & 41 deletions svc/profiler.mjs
Original file line number Diff line number Diff line change
@@ -1,33 +1,22 @@
// Processes a queue of requests to update Steam profile data
// TODO use top level await
import async from 'async';
// Updates Steam profile data for players periodically
import queries from '../store/queries.mjs';
import db from '../store/db.mjs';
import utility from '../util/utility.mjs';
const { insertPlayer, bulkIndexPlayer } = queries;
const { insertPlayerPromise, bulkIndexPlayer } = queries;
const { getData, generateJob, convert64to32 } = utility;
function getSummaries(cb) {
db.raw(

while(true) {
// To optimize the api call we need to do 100 players at a time
// We sample 100 random rows from the DB, with the downside that we might update a lot of inactive players
// Alternatively we could also trigger updates from match insert to target active players
const result = await db.raw(
'SELECT account_id from players TABLESAMPLE SYSTEM_ROWS(100)'
).asCallback((err, result) => {
if (err) {
return cb(err);
}
);
const container = generateJob('api_summaries', {
players: result.rows,
});
// Request rank_tier data for these players
// result.rows.forEach((row) => {
// redis.rpush('mmrQueue', JSON.stringify({
// match_id: null,
// account_id: row.account_id,
// }));
// });
return getData(container.url, (err, body) => {
if (err) {
// couldn't get data from api, non-retryable
return cb(JSON.stringify(err));
}
// We can also queue a rank tier/MMR request for these players
const body = await getDataPromise(container.url);
const results = body.response.players.filter((player) => player.steamid);
const bulkUpdate = results.reduce((acc, player) => {
acc.push(
Expand All @@ -51,23 +40,6 @@ function getSummaries(cb) {
console.log(err);
}
});
// player summaries response
return async.each(
results,
(player, cb) => {
insertPlayer(db, player, false, cb);
},
cb
);
});
});
}
function start() {
getSummaries((err) => {
if (err) {
throw err;
}
return setTimeout(start, 500);
});
await Promise.all(results.map(player => insertPlayerPromise(db, player, false)));
await new Promise(resolve => setTimeout(resolve, 1000));
}
start();
4 changes: 1 addition & 3 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const supertest = require('supertest');
const stripeLib = require('stripe');
const pg = require('pg');
const fs = require('fs');
const util = require('util');
const cassandraDriver = require('cassandra-driver');
const swaggerParser = require('@apidevtools/swagger-parser');
const config = require('../config');
Expand Down Expand Up @@ -884,9 +883,8 @@ async function loadMatches(cb) {

async function loadPlayers(cb) {
  console.log('loading players');
  // Insert all fixture player summaries in parallel via the promisified
  // helper exported by queries (third arg true = also index the player).
  await Promise.all(
    summariesApi.response.players.map((p) =>
      queries.insertPlayerPromise(db, p, true)
    )
  );
  cb();
}
Expand Down

0 comments on commit 13444c9

Please sign in to comment.