diff --git a/src/Stores/MemoryStorageProvider.ts b/src/Stores/MemoryStorageProvider.ts
index 52b5bf54..01b401eb 100644
--- a/src/Stores/MemoryStorageProvider.ts
+++ b/src/Stores/MemoryStorageProvider.ts
@@ -14,6 +14,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
     private storedFiles = new QuickLRU<string, string>({ maxSize: 128 });
     private gitlabDiscussionThreads = new Map<string, SerializedGitlabDiscussionThreads>();
     private feedGuids = new Map<string, Array<string>>();
+    private feedQueries: {[url: string]: number} = {};
+    private feedQueriesResetTime = 0;

     constructor() {
         super();
@@ -29,6 +31,7 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
         while (set.length > MAX_FEED_ITEMS) {
             set.pop();
         }
+        this.feedQueries[url] = (this.feedQueries[url] ?? 0) + 1;
     }

     async hasSeenFeed(url: string): Promise<boolean> {
@@ -108,4 +111,17 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
     public async setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void> {
         this.gitlabDiscussionThreads.set(connectionId, value);
     }
+
+    public async resetFeedQueryCount() {
+        const scores = {...this.feedQueries};
+        // Hand back the time of the previous reset before stamping the new one,
+        // so the first call returns 0 and later calls describe the window length.
+        const lastQueryTime = this.feedQueriesResetTime;
+        this.feedQueriesResetTime = Date.now();
+        this.feedQueries = {};
+        return {
+            scores,
+            lastQueryTime,
+        };
+    }
 }
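The in-memory provider counts one query per storeFeedGuids call and, on reset, hands back the totals together with the time of the previous reset, so callers can tell how long the measurement window was. A minimal usage sketch (the URL and the surrounding wiring are illustrative, not part of this change):

```ts
// Sketch only: exercising MemoryStorageProvider.resetFeedQueryCount from this diff.
const storage = new MemoryStorageProvider();

await storage.storeFeedGuids("https://example.org/feed.xml", "guid-1", "guid-2");
await storage.storeFeedGuids("https://example.org/feed.xml", "guid-3");

const { scores, lastQueryTime } = await storage.resetFeedQueryCount();
// scores: { "https://example.org/feed.xml": 2 }, i.e. one increment per store call,
// regardless of how many guids each call carried.
// lastQueryTime is 0 on the very first reset (FeedReader uses that to skip its first
// bucket-sorting pass); subsequent resets return the previous reset's timestamp.
```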
"0"); + await this.redis.del(FEED_QUERY_SCORE); + await this.redis.set(FEED_QUERY_SCORE_RESET_TIME, Date.now()); + return { + scores, + lastQueryTime, + }; + } } diff --git a/src/Stores/StorageProvider.ts b/src/Stores/StorageProvider.ts index 50175d75..2451ccfb 100644 --- a/src/Stores/StorageProvider.ts +++ b/src/Stores/StorageProvider.ts @@ -28,4 +28,8 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto storeFeedGuids(url: string, ...guids: string[]): Promise; hasSeenFeed(url: string): Promise; hasSeenFeedGuids(url: string, ...guids: string[]): Promise; + resetFeedQueryCount(): Promise<{ + scores: Record, + lastQueryTime: number + }>; } \ No newline at end of file diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 699598ad..937d5ac6 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -83,14 +83,34 @@ export class FeedReader { private connections: FeedConnection[]; - private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS); + /** + * Poll [1] every time. + * Poll [2] every 4th time. + * Poll [3] every 8th time. + * Poll [4] every 16th time. + * + * Every N hours, move between buckets. + */ + + private readonly intakeBucket = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS); + + // TODO: Make this configurable. + private readonly feedBuckets = new Map([ + [0, this.intakeBucket], // < feedInterval + [4, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 4xfeedInvterval + [8, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 8xfeedInvterval + [16, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // rest + ]); + private readonly lastBucket = 16; + private bucketIteration = 0; + // A set of last modified times for each url. - private cacheTimes: Map = new Map(); + private readonly cacheTimes: Map = new Map(); // Reason failures to url map. - private feedsFailingHttp = new Set(); - private feedsFailingParsing = new Set(); + private readonly feedsFailingHttp = new Set(); + private readonly feedsFailingParsing = new Set(); static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries"; @@ -98,10 +118,14 @@ export class FeedReader { private readonly timeouts: (NodeJS.Timeout|undefined)[]; private readonly feedsToRetain = new Set(); + get feedCount() { + return Object.values(this.feedBuckets).map(b => b.length()).reduce((a,b) => a+b, 0); + } + get sleepingInterval() { return ( // Calculate the number of MS to wait in between feeds. 
diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts
index 699598ad..937d5ac6 100644
--- a/src/feeds/FeedReader.ts
+++ b/src/feeds/FeedReader.ts
@@ -83,14 +83,34 @@ export class FeedReader {

     private connections: FeedConnection[];

-    private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);
+    /**
+     * Poll bucket [0] on every iteration.
+     * Poll bucket [4] on every 4th iteration.
+     * Poll bucket [8] on every 8th iteration.
+     * Poll bucket [16] on every 16th iteration.
+     *
+     * Periodically, feeds are moved between buckets based on how often they produce new items.
+     */
+    private readonly intakeBucket = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);
+
+    // TODO: Make this configurable.
+    private readonly feedBuckets = new Map<number, QueueWithBackoff>([
+        [0, this.intakeBucket], // < feedInterval
+        [4, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 4x feedInterval
+        [8, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 8x feedInterval
+        [16, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // rest
+    ]);
+    private readonly lastBucket = 16;
+    private bucketIteration = 0;
+
     // A set of last modified times for each url.
-    private cacheTimes: Map<string, { etag?: string, lastModified?: string }> = new Map();
+    private readonly cacheTimes: Map<string, { etag?: string, lastModified?: string }> = new Map();

     // Reason failures to url map.
-    private feedsFailingHttp = new Set<string>();
-    private feedsFailingParsing = new Set<string>();
+    private readonly feedsFailingHttp = new Set<string>();
+    private readonly feedsFailingParsing = new Set<string>();

     static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries";

@@ -98,10 +118,14 @@ export class FeedReader {
     private readonly timeouts: (NodeJS.Timeout|undefined)[];
     private readonly feedsToRetain = new Set<string>();

+    get feedCount() {
+        return [...this.feedBuckets.values()].map(b => b.length()).reduce((a, b) => a + b, 0);
+    }
+
     get sleepingInterval() {
         return (
             // Calculate the number of MS to wait in between feeds.
-            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
+            (this.config.pollIntervalSeconds * 1000) / (this.feedCount || 1)
             // And multiply by the number of concurrent readers
         ) * this.config.pollConcurrency;
     }
@@ -125,7 +149,7 @@ export class FeedReader {
         const normalisedUrl = normalizeUrl(newConnection.feedUrl);
         if (!feeds.has(normalisedUrl)) {
             log.info(`Connection added, adding "${normalisedUrl}" to queue`);
-            this.feedQueue.push(normalisedUrl);
+            this.intakeBucket.push(normalisedUrl);
             feeds.add(normalisedUrl);
             Metrics.feedsCount.inc();
             Metrics.feedsCountDeprecated.inc();
@@ -151,7 +175,7 @@ export class FeedReader {
         }
         log.info(`Connection removed, removing "${normalisedUrl}" from queue`);
         this.feedsToRetain.delete(normalisedUrl);
-        this.feedQueue.remove(normalisedUrl);
+        [...this.feedBuckets.values()].some(bucket => bucket.remove(normalisedUrl));
         feeds.delete(normalisedUrl);
         this.feedsFailingHttp.delete(normalisedUrl);
         this.feedsFailingParsing.delete(normalisedUrl);
@@ -164,6 +188,61 @@
         for (let i = 0; i < config.pollConcurrency; i++) {
             void this.pollFeeds(i);
         }
+
+        this.configureBucketInterval();
     }
+
+    /**
+     * Periodically resort feeds into the correct buckets based on how
+     * often they return new items.
+     */
+    public configureBucketInterval() {
+        // TODO: Run this function on startup to immediately sort into buckets.
+        const bucketEntries = [...this.feedBuckets.entries()];
+        setInterval(async () => {
+            const { scores, lastQueryTime } = await this.storage.resetFeedQueryCount();
+            if (lastQueryTime === 0) {
+                log.debug(`Skipping first bucket interval check, not enough data present.`);
+                // Skip, not enough data.
+                return;
+            }
+            const timePassed = Date.now() - lastQueryTime;
+            // Determine a reasonable approximation of how many times a feed in the fastest
+            // bucket could have been polled in the last time period.
+            const maximumFastestPotential = timePassed / this.sleepingInterval;
+            // TODO: This should consider all feeds, so we can catch any that don't even score.
+            for (const [url, pollCount] of Object.entries(scores)) {
+                // Remove the feed from its current bucket (remove() reports whether it was found).
+                const feedCurrentBucket = bucketEntries.find((b) => b[1].remove(url));
+                if (!feedCurrentBucket) {
+                    log.warn(`Feed ${url} was not found in any buckets!`);
+                    continue;
+                }
+                // E.g. bucket 4 polls only a quarter as often as bucket 0, so weight its potential accordingly.
+                const maximumPotential = maximumFastestPotential * (1 / (feedCurrentBucket[0] || 1));
+                log.debug(`Determining new bucket for ${url}. Maximum possible poll rate would be ${maximumPotential}. Score was ${pollCount}`);

+                // Determine the new bucket.
+                for (let bIndex = 0; bIndex < bucketEntries.length; bIndex++) {
+                    const currentBucket = bucketEntries[bIndex];
+                    const nextBucket = bucketEntries[bIndex+1];
+                    if (!nextBucket) {
+                        // We're at the end, just push to the last bucket because this is a sad feed that didn't
+                        // get any items.
+                        currentBucket[1].push(url);
+                        break;
+                    }
+                    // E.g. for bucket 4 this is (1 - 4/16) * maximumPotential, i.e. 75% of the potential.
+                    const nextBucketMinSuccessRate = (1 - (nextBucket[0] / this.lastBucket)) * maximumPotential;
+                    log.debug(`Min success rate target for bucket ${nextBucket[0]} is ${nextBucketMinSuccessRate}`);
+                    if (pollCount >= nextBucketMinSuccessRate) {
+                        log.debug(`Adding feed ${url} to bucket ${currentBucket[0]}`);
+                        currentBucket[1].push(url);
+                        break;
+                    }
+                }
+            }
+        }, 30000);
+    }
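To make the resorting rule concrete: with buckets 0/4/8/16 and lastBucket = 16, a feed stays in the fastest bucket while its score clears 75% of maximumPotential, drops to bucket 4 above 50%, and otherwise lands in bucket 8; bucket 16 is only reachable through the unscored path flagged by the TODO above. The same rule as a standalone helper (illustrative only, not part of this change):

```ts
// Illustrative: the bucket-selection rule from configureBucketInterval as a pure function.
const BUCKETS = [0, 4, 8, 16];
const LAST_BUCKET = 16;

function pickBucket(pollCount: number, maximumPotential: number): number {
    for (let bIndex = 0; bIndex < BUCKETS.length; bIndex++) {
        const next = BUCKETS[bIndex + 1];
        if (next === undefined) {
            // No further bucket: this feed produced nothing, park it in the slowest queue.
            return BUCKETS[bIndex];
        }
        // Stay in this bucket while the score clears (1 - next/last) of the potential.
        if (pollCount >= (1 - next / LAST_BUCKET) * maximumPotential) {
            return BUCKETS[bIndex];
        }
    }
    return LAST_BUCKET; // Unreachable with the buckets above; satisfies the compiler.
}

// With maximumPotential = 100:
// pickBucket(80, 100) === 0   (80 >= 75)
// pickBucket(60, 100) === 4   (60 >= 50)
// pickBucket(10, 100) === 8   (the bucket-16 threshold is 0, so any scored feed lands here)
```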

     public stop() {
@@ -185,7 +264,7 @@
                 log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
             }
         }
-        this.feedQueue.populate([...observedFeedUrls]);
+        this.intakeBucket.populate([...observedFeedUrls]);
         Metrics.feedsCount.set(observedFeedUrls.size);
         Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
         return observedFeedUrls;
@@ -199,7 +278,7 @@
      * @param url The URL to be polled.
      * @returns A boolean that returns if we saw any changes on the feed since the last poll time.
      */
-    public async pollFeed(url: string): Promise<boolean> {
+    public async pollFeed(url: string, feedQueue: QueueWithBackoff): Promise<boolean> {
         // If a feed is deleted while it is being polled, we need
         // to remember NOT to add it back to the queue. This
         // set keeps track of all the feeds that *should* be
@@ -236,6 +315,7 @@
             // If undefined, we got a not-modified.
             log.debug(`Found ${feed.items.length} entries in ${url}`);
             const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
+            log.debug(`Seen ${seenItems.join(', ')} entries for ${url} already`);
             for (const item of feed.items) {
                 // Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage.
                 if (!item.hashId) {
@@ -280,7 +360,7 @@
             this.feedsFailingParsing.delete(url);
             if (this.feedsToRetain.has(url)) {
                 // If we've removed this feed since processing it, do not requeue.
-                this.feedQueue.push(url);
+                feedQueue.push(url);
             }
         } catch (err: unknown) {
             // TODO: Proper Rust Type error.
@@ -289,7 +369,7 @@
             } else {
                 this.feedsFailingParsing.add(url);
             }
-            const backoffDuration = this.feedQueue.backoff(url);
+            const backoffDuration = feedQueue.backoff(url);
             const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
             const feedError = new FeedError(url.toString(), error, fetchKey);
             log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
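pollFeed now receives the queue the URL was popped from, so a success requeues the feed onto its own bucket and a failure backs it off within that bucket. QueueWithBackoff itself is not shown in this diff; the interface below is inferred purely from the calls made here, and is an assumption rather than the real class in src/feeds:

```ts
// Inferred from usage in this diff only; the actual QueueWithBackoff may differ.
interface InferredQueueWithBackoff {
    /** Append a URL to the tail of the queue. */
    push(url: string): void;
    /** Take the next URL due to be polled, if any. */
    pop(): string | undefined;
    /** Remove a URL wherever it sits; returns whether it was present. */
    remove(url: string): boolean;
    /** Replace the queue contents wholesale (used when loading connections). */
    populate(urls: string[]): void;
    /** Number of URLs currently queued. */
    length(): number;
    /** Re-queue a failing URL with an exponential delay; returns that delay in ms. */
    backoff(url: string): number;
}
```

The boolean return of remove matters: both removeConnection and configureBucketInterval rely on it being true only when the URL was actually found, which is how a feed's current bucket is located.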
@@ -309,29 +389,46 @@
         Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
         Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);

-        log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);
+        log.debug(`Checking for updates in ${this.feedCount} RSS/Atom feeds (worker: ${workerId})`);

-        const fetchingStarted = Date.now();
+        // TODO: Dehardcode
+        this.bucketIteration++;
+        if (this.bucketIteration > this.lastBucket) {
+            this.bucketIteration = 1;
+        }
+        // Poll bucket 0 on every pass, and bucket N on every Nth pass.
+        const queuesToPoll = [...this.feedBuckets.entries()].filter(([bucketInterval]) =>
+            bucketInterval === 0 || this.bucketIteration % bucketInterval === 0);

-        const url = this.feedQueue.pop();
-        let sleepFor = this.sleepingInterval;
+        const fetchingStarted = Date.now();

-        if (url) {
-            if (await this.pollFeed(url)) {
-                log.debug(`Feed changed and will be saved`);
-            }
-            const elapsed = Date.now() - fetchingStarted;
-            Metrics.feedFetchMs.set(elapsed);
-            Metrics.feedsFetchMsDeprecated.set(elapsed);
-            sleepFor = Math.max(this.sleepingInterval - elapsed, 0);
-            log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`);
-
-            if (elapsed > this.sleepingInterval) {
-                log.warn(`It took us longer to update the feeds than the configured pool interval`);
+        let sleepFor = 0;
+        for (const [queueNumber, queue] of queuesToPoll) {
+            log.debug(`Fetching from queue ${queueNumber}`);
+            const url = queue.pop();
+            if (url) {
+                if (await this.pollFeed(url, queue)) {
+                    log.debug(`Feed changed and will be saved`);
+                }
+                const elapsed = Date.now() - fetchingStarted;
+                log.debug(`Feed fetching took ${elapsed / 1000}s`);
+                Metrics.feedFetchMs.set(elapsed);
+                Metrics.feedsFetchMsDeprecated.set(elapsed);
+                sleepFor += Math.max(this.sleepingInterval - elapsed, 0);
+            } else {
+                // It is possible that we have more workers than feeds. This will cause the worker to just sleep.
+                log.debug(`No feeds available to poll for worker ${workerId} on this bucket`);
+                // Sleep for a full interval so we don't tight-loop.
+                if (sleepFor === 0) {
+                    sleepFor += this.sleepingInterval;
+                }
+            }
+        }
+
+        if (sleepFor === 0) {
+            log.warn(`It took us longer to update the feeds than the configured poll interval`);
         } else {
-            // It is possible that we have more workers than feeds. This will cause the worker to just sleep.
-            log.debug(`No feeds available to poll for worker ${workerId}`);
+            log.debug(`Sleeping for ${sleepFor / 1000}s`);
         }

         this.timeouts[workerId] = setTimeout(() => {