Experimental new bucket-style queues for feeds. #911

Draft · wants to merge 1 commit into base: main
13 changes: 13 additions & 0 deletions src/Stores/MemoryStorageProvider.ts
@@ -14,6 +14,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
private storedFiles = new QuickLRU<string, string>({ maxSize: 128 });
private gitlabDiscussionThreads = new Map<string, SerializedGitlabDiscussionThreads>();
private feedGuids = new Map<string, Array<string>>();
private feedQueries: {[url: string]: number} = {};
private feedQueriesResetTime = 0;

constructor() {
super();
@@ -29,6 +31,7 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
while (set.length > MAX_FEED_ITEMS) {
set.pop();
}
this.feedQueries[url] = (this.feedQueries[url] ?? 0) + 1;
}

async hasSeenFeed(url: string): Promise<boolean> {
@@ -108,4 +111,14 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
public async setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void> {
this.gitlabDiscussionThreads.set(connectionId, value);
}

public async resetFeedQueryCount() {
const scores = {...this.feedQueries};
// Return the *previous* reset time so callers can compute the length of the
// window these scores cover, matching the Redis implementation.
const lastQueryTime = this.feedQueriesResetTime;
this.feedQueriesResetTime = Date.now();
this.feedQueries = {};
return {
scores,
lastQueryTime,
};
}
}
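
The counter added above is a simple snapshot-and-reset accumulator: each storeFeedGuids() call bumps a per-URL count, and resetFeedQueryCount() hands back the completed window and starts a new one. A minimal usage sketch, assuming the corrected semantics above where the previous reset time is returned (the feed URL and GUIDs are illustrative):

import { MemoryStorageProvider } from "./MemoryStorageProvider";

async function demo() {
    const storage = new MemoryStorageProvider();

    // Each storeFeedGuids() call counts as one poll that yielded new items.
    await storage.storeFeedGuids("https://example.org/feed.xml", "guid-1", "guid-2");
    await storage.storeFeedGuids("https://example.org/feed.xml", "guid-3");

    const first = await storage.resetFeedQueryCount();
    // first.scores => { "https://example.org/feed.xml": 2 }
    // first.lastQueryTime => 0, i.e. no previous window, so callers skip this sample.

    await storage.storeFeedGuids("https://example.org/feed.xml", "guid-4");
    const second = await storage.resetFeedQueryCount();
    // second.scores => { "https://example.org/feed.xml": 1 }
    // second.lastQueryTime is the timestamp of the first reset, giving a
    // well-defined window over which to compute a poll rate.
}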
20 changes: 20 additions & 0 deletions src/Stores/RedisStorageProvider.ts
@@ -28,6 +28,8 @@ const WIDGET_TOKENS = "widgets.tokens.";
const WIDGET_USER_TOKENS = "widgets.user-tokens.";

const FEED_GUIDS = "feeds.guids.";
const FEED_QUERY_SCORE = "feeds.queryscore";
const FEED_QUERY_SCORE_RESET_TIME = "feeds.queryscore.resettime";

const log = new Logger("RedisASProvider");

@@ -219,6 +221,7 @@ export class RedisStorageProvider extends RedisStorageContextualProvider implements IBridgeStorageProvider
const feedKey = `${FEED_GUIDS}${url}`;
await this.redis.lpush(feedKey, ...guids);
await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
await this.redis.zadd(FEED_QUERY_SCORE, "INCR", 1, url);
}

public async hasSeenFeed(url: string): Promise<boolean> {
@@ -239,4 +242,21 @@ export class RedisStorageProvider extends RedisStorageContextualProvider implements IBridgeStorageProvider
}
return guids.filter((_guid, index) => res[index][1] !== null);
}

public async resetFeedQueryCount() {
const scores: {[key: string]: number} = {};
const data = await this.redis.zrangebyscore(FEED_QUERY_SCORE, "-inf", "+inf", "WITHSCORES");
for (let index = 0; index < data.length; index += 2) {
const key = data[index];
const value = parseInt(data[index+1], 10);
scores[key] = value;
}
const lastQueryTime = parseInt((await this.redis.get(FEED_QUERY_SCORE_RESET_TIME)) ?? "0", 10);
await this.redis.del(FEED_QUERY_SCORE);
await this.redis.set(FEED_QUERY_SCORE_RESET_TIME, Date.now());
return {
scores,
lastQueryTime,
};
}
}
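
The Redis variant keeps the same counter in a sorted set: ZADD with the INCR flag behaves like ZINCRBY (one counter per member), and ZRANGEBYSCORE with WITHSCORES returns a flat [member, score, member, score, ...] array, which is why the parsing loop above steps by two. A minimal ioredis sketch of that round-trip (the key name and URL are illustrative):

import Redis from "ioredis";

async function demo() {
    const redis = new Redis();

    // ZADD <key> INCR 1 <member>: increment the member's score by one.
    await redis.zadd("feeds.queryscore", "INCR", 1, "https://example.org/feed.xml");
    await redis.zadd("feeds.queryscore", "INCR", 1, "https://example.org/feed.xml");

    // WITHSCORES yields [member1, score1, member2, score2, ...] as strings.
    const flat = await redis.zrangebyscore("feeds.queryscore", "-inf", "+inf", "WITHSCORES");
    const scores: Record<string, number> = {};
    for (let i = 0; i < flat.length; i += 2) {
        scores[flat[i]] = parseInt(flat[i + 1], 10);
    }
    // scores => { "https://example.org/feed.xml": 2 }

    await redis.quit();
}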
4 changes: 4 additions & 0 deletions src/Stores/StorageProvider.ts
@@ -28,4 +28,8 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, IStorageProvider
storeFeedGuids(url: string, ...guids: string[]): Promise<void>;
hasSeenFeed(url: string): Promise<boolean>;
hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]>;
resetFeedQueryCount(): Promise<{
scores: Record<string, number>,
lastQueryTime: number
}>;
}
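
Implementations of this method return a snapshot of per-feed poll scores plus the start of the window they cover. A sketch of the rate calculation the feed reader derives from this data (pollRates is a hypothetical helper, not part of this PR):

import { IBridgeStorageProvider } from "./StorageProvider";

// Compare each feed's observed score against the maximum number of polls
// that were possible in the elapsed window.
async function pollRates(storage: IBridgeStorageProvider, sleepingIntervalMs: number): Promise<Record<string, number>> {
    const { scores, lastQueryTime } = await storage.resetFeedQueryCount();
    if (lastQueryTime === 0) {
        return {}; // First window: no baseline to compare against yet.
    }
    const maxPossiblePolls = (Date.now() - lastQueryTime) / sleepingIntervalMs;
    return Object.fromEntries(
        Object.entries(scores).map(([url, count]) => [url, count / maxPossiblePolls]),
    );
}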
153 changes: 124 additions & 29 deletions src/feeds/FeedReader.ts
@@ -83,25 +83,49 @@ export class FeedReader {

private connections: FeedConnection[];

private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);
/**
* Poll bucket 0 (the intake bucket) on every pass.
* Poll bucket 4 every 4th pass.
* Poll bucket 8 every 8th pass.
* Poll bucket 16 every 16th pass.
*
* Every N hours, feeds are moved between buckets based on how often they produce new items.
*/

private readonly intakeBucket = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);

// TODO: Make this configurable.
private readonly feedBuckets = new Map([
[0, this.intakeBucket], // < feedInterval
[4, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 4x feedInterval
[8, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 8x feedInterval
[16, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // rest
]);
private readonly lastBucket = 16;
private bucketIteration = 0;


// A map of cache headers (etag / last-modified) for each url.
private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
private readonly cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();

// Feeds that are currently failing, tracked per failure reason.
private feedsFailingHttp = new Set();
private feedsFailingParsing = new Set();
private readonly feedsFailingHttp = new Set();
private readonly feedsFailingParsing = new Set();

static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries";

private shouldRun = true;
private readonly timeouts: (NodeJS.Timeout|undefined)[];
private readonly feedsToRetain = new Set();

get feedCount() {
// feedBuckets is a Map, so Object.values() would see no entries.
return [...this.feedBuckets.values()].map(b => b.length()).reduce((a, b) => a + b, 0);
}

get sleepingInterval() {
return (
// Calculate the number of MS to wait in between feeds.
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
(this.config.pollIntervalSeconds * 1000) / (this.feedCount || 1)
// And multiply by the number of concurrent readers
) * this.config.pollConcurrency;
}
@@ -125,7 +149,7 @@ export class FeedReader {
const normalisedUrl = normalizeUrl(newConnection.feedUrl);
if (!feeds.has(normalisedUrl)) {
log.info(`Connection added, adding "${normalisedUrl}" to queue`);
this.feedQueue.push(normalisedUrl);
this.intakeBucket.push(normalisedUrl);
feeds.add(normalisedUrl);
Metrics.feedsCount.inc();
Metrics.feedsCountDeprecated.inc();
@@ -151,7 +175,7 @@ export class FeedReader {
}
log.info(`Connection removed, removing "${normalisedUrl}" from queue`);
this.feedsToRetain.delete(normalisedUrl);
this.feedQueue.remove(normalisedUrl);
[...this.feedBuckets.values()].some(bucket => bucket.remove(normalisedUrl));
feeds.delete(normalisedUrl);
this.feedsFailingHttp.delete(normalisedUrl);
this.feedsFailingParsing.delete(normalisedUrl);
@@ -164,6 +188,61 @@ export class FeedReader {
for (let i = 0; i < config.pollConcurrency; i++) {
void this.pollFeeds(i);
}

this.configureBucketInterval();
}

/**
* Resort feeds into the correct queues based on how
* often they return new values.
*/
public configureBucketInterval() {
// TODO: Run this sorting pass on startup too, so feeds are immediately sorted into buckets.
const bucketEntries = [...this.feedBuckets.entries()];
setInterval(async () => {
const { scores, lastQueryTime } = await this.storage.resetFeedQueryCount();
if (lastQueryTime === 0) {
log.debug(`Skipping first bucket interval check, not enough data present.`);
// Skip, not enough data.
return;
}
const timePassed = Date.now() - lastQueryTime;
// Determine a reasonable approximation of how much the feed may have been polled in the
// last time period.
const maximumFastestPotential = timePassed / this.sleepingInterval;
// TODO: This should be all feeds, so we can catch any that don't even score...
for (const [url, pollCount] of Object.entries(scores)) {
// Find the feed's current bucket and remove it, ready for re-sorting.
const feedCurrentBucket = bucketEntries.find((b) => b[1].remove(url));
if (!feedCurrentBucket) {
log.warn(`Feed ${url} was not found in any bucket!`);
continue;
}
// E.g. bucket 4 is polled only a quarter as often as bucket 0, so scale its ceiling down to weight feeds fairly.
const maximumPotential = maximumFastestPotential * (1 / (feedCurrentBucket[0] || 1));
log.debug(`Determining new bucket for ${url}. Maximum possible poll rate would be ${maximumPotential}. Score was ${pollCount}`);

// Determine new bucket
for (let bIndex = 0; bIndex < bucketEntries.length; bIndex++) {
const currentBucket = bucketEntries[bIndex];
const nextBucket = bucketEntries[bIndex+1];
if (!nextBucket) {
// We're at the end, just push to the last bucket because this is a sad feed that didn't
// get any items.
currentBucket[1].push(url);
break;
}
// E.g. for bucket 4: (1 - 4/16) * maximumPotential = 0.75 * maximumPotential.
const nextBucketMinSuccessRate = (1 - (nextBucket[0] / this.lastBucket)) * maximumPotential;
log.debug(`Min success rate target for bucket ${nextBucket[0]} is ${nextBucketMinSuccessRate}`);
if (pollCount >= nextBucketMinSuccessRate) {
log.debug(`Adding feed ${url} to bucket ${currentBucket[0]}`);
currentBucket[1].push(url);
break;
}
}
}
}, 30000);
}
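
// Worked example with assumed numbers (not from this PR): if the elapsed
// window allowed maximumFastestPotential = 100 polls and a feed currently
// sits in bucket 4, its own ceiling is maximumPotential = 100 * (1/4) = 25.
// The loop above then checks, in order:
//   score >= (1 - 4/16)  * 25 = 18.75 -> move to bucket 0 (fastest)
//   score >= (1 - 8/16)  * 25 = 12.5  -> move to bucket 4
//   score >= (1 - 16/16) * 25 = 0     -> move to bucket 8 (always satisfied)
// Note that with these thresholds bucket 16 is only reachable through the
// fall-through branch, since the bucket-8 threshold is zero.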

public stop() {
@@ -185,7 +264,7 @@ export class FeedReader {
log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
}
}
this.feedQueue.populate([...observedFeedUrls]);
this.intakeBucket.populate([...observedFeedUrls]);
Metrics.feedsCount.set(observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
return observedFeedUrls;
@@ -199,7 +278,7 @@ export class FeedReader {
* @param url The URL to be polled.
* @returns A boolean that returns if we saw any changes on the feed since the last poll time.
*/
public async pollFeed(url: string): Promise<boolean> {
public async pollFeed(url: string, feedQueue: QueueWithBackoff): Promise<boolean> {
// If a feed is deleted while it is being polled, we need
// to remember NOT to add it back to the queue. This
// set keeps track of all the feeds that *should* be
@@ -236,6 +315,7 @@ export class FeedReader {
// If undefined, we got a not-modified.
log.debug(`Found ${feed.items.length} entries in ${url}`);
const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!));
log.debug(`Seen ${seenItems.join(', ')} entries for ${url} already`);
for (const item of feed.items) {
// Some feeds have a nasty habit of leading with an empty tag, making us parse it as garbage.
if (!item.hashId) {
@@ -280,7 +360,7 @@ export class FeedReader {
this.feedsFailingParsing.delete(url);
if (this.feedsToRetain.has(url)) {
// If we've removed this feed since processing it, do not requeue.
this.feedQueue.push(url);
feedQueue.push(url);
}
} catch (err: unknown) {
// TODO: Proper Rust Type error.
@@ -289,7 +369,7 @@ export class FeedReader {
} else {
this.feedsFailingParsing.add(url);
}
const backoffDuration = this.feedQueue.backoff(url);
const backoffDuration = feedQueue.backoff(url);
const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
const feedError = new FeedError(url.toString(), error, fetchKey);
log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
@@ -309,29 +389,44 @@ export class FeedReader {
Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);

log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);
log.debug(`Checking for updates in ${this.feedCount} RSS/Atom feeds (worker: ${workerId})`);

const fetchingStarted = Date.now();
// TODO: Dehardcode
if (this.bucketIteration >= this.lastBucket) {
this.bucketIteration = 0;
}
this.bucketIteration++;
// Poll a bucket when its interval divides the current iteration. Bucket 0
// (the intake bucket) is polled on every pass.
const queuesToPoll = [...this.feedBuckets.entries()].filter(([bucketInterval]) => this.bucketIteration % (bucketInterval || 1) === 0);

const url = this.feedQueue.pop();
let sleepFor = this.sleepingInterval;
const fetchingStarted = Date.now();

if (url) {
if (await this.pollFeed(url)) {
log.debug(`Feed changed and will be saved`);
}
const elapsed = Date.now() - fetchingStarted;
Metrics.feedFetchMs.set(elapsed);
Metrics.feedsFetchMsDeprecated.set(elapsed);
sleepFor = Math.max(this.sleepingInterval - elapsed, 0);
log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`);

if (elapsed > this.sleepingInterval) {
log.warn(`It took us longer to update the feeds than the configured poll interval`);
let sleepFor = 0;
for (const [queueNumber, queue] of queuesToPoll) {
log.debug(`Fetching from queue ${queueNumber}`);
const url = queue.pop();
if (url) {
if (await this.pollFeed(url, queue)) {
log.debug(`Feed changed and will be saved`);
}
const elapsed = Date.now() - fetchingStarted;
log.debug(`Feed fetching took ${elapsed / 1000}s`);
Metrics.feedFetchMs.set(elapsed);
Metrics.feedsFetchMsDeprecated.set(elapsed);
sleepFor += Math.max(this.sleepingInterval - elapsed, 0);
} else {
// It is possible that we have more workers than feeds. This will cause the worker to just sleep.
log.debug(`No feeds available to poll for worker ${workerId} on this bucket`);
// So we don't tightloop
if (sleepFor === 0) {
sleepFor += this.sleepingInterval;
}
}
}

if (sleepFor === 0) {
log.warn(`It took us longer to update the feeds than the configured poll interval`);
} else {
// It is possible that we have more workers than feeds. This will cause the worker to just sleep.
log.debug(`No feeds available to poll for worker ${workerId}`);
log.debug(`Sleeping for ${sleepFor / 1000}s`);
}

this.timeouts[workerId] = setTimeout(() => {
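
For reference, the polling cadence the bucket scheme aims for over one 16-iteration cycle, assuming the bucketIteration % (bucketInterval || 1) === 0 selection used above (a standalone sketch, not code from this PR):

// Which buckets get polled on each pass of a 16-iteration cycle.
const bucketIntervals = [0, 4, 8, 16]; // 0 is the intake bucket, polled every pass.

for (let iteration = 1; iteration <= 16; iteration++) {
    const polled = bucketIntervals.filter((interval) => iteration % (interval || 1) === 0);
    console.log(`iteration ${iteration}: buckets [${polled.join(", ")}]`);
}
// iteration 1  -> [0]
// iteration 4  -> [0, 4]
// iteration 8  -> [0, 4, 8]
// iteration 12 -> [0, 4]
// iteration 16 -> [0, 4, 8, 16]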