Skip to content

Commit

Permalink
fix(wallet): one ledger-tip req per pollInterval
Browse files Browse the repository at this point in the history
WHAT: replaced `delay` operator with `auditTime` operator, which
starts a timer once the first isSettled$ emission, and emits the last
value, ignoring previous ones.
WHY: isSettled$ observable emissions triggered ledger-tip
requests, delayed by pollingInterval.
Flapping isSettled$ caused multiple requests to be done,
because the flapping was delayed.
Throttling is in place for the case where the ledger-tip request is
not resolved yet.
  • Loading branch information
mirceahasegan committed Oct 18, 2024
1 parent b760734 commit b272ea9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
10 changes: 6 additions & 4 deletions packages/wallet/src/services/TipTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import {
EMPTY,
Observable,
Subject,
auditTime,
combineLatest,
concat,
delay,
distinctUntilChanged,
exhaustMap,
filter,
Expand Down Expand Up @@ -56,17 +56,19 @@ export class TipTracker extends PersistentDocumentTrackerSubject<Cardano.Tip> {
// - if it's not settled for maxPollInterval
combineLatest([
triggerOrInterval$(syncStatus.isSettled$.pipe(filter((isSettled) => isSettled)), maxPollInterval).pipe(
// trigger fetch after some delay once fully synced and online
delay(minPollInterval),
// Do not allow more than one fetch per minPollInterval.
// The first syncStatus starts a minPollInterval timer. Only the latest emission from the minPollInterval
// is emitted in case the syncStatus changes multiple times.
auditTime(minPollInterval),
// trigger fetch on start
startWith(null)
),
connectionStatus$
]).pipe(
// Throttle syncing by interval, cancel ongoing request on external trigger
tap(([, connectionStatus]) => {
logger.debug(connectionStatus === ConnectionStatus.down ? 'Skipping fetch tip' : 'Fetching tip...');
}),
// Throttle syncing by interval, cancel ongoing request on external trigger
exhaustMap(([, connectionStatus]) =>
connectionStatus === ConnectionStatus.down
? EMPTY
Expand Down
48 changes: 46 additions & 2 deletions packages/wallet/test/services/TipTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Cardano } from '@cardano-sdk/core';
import { ConnectionStatus, TipTracker } from '../../src/services';
import { InMemoryDocumentStore } from '../../src/persistence';
import { Milliseconds, SyncStatus } from '../../src';
import { Observable, firstValueFrom, of } from 'rxjs';
import { NEVER, Observable, firstValueFrom, of, take, takeUntil, timer } from 'rxjs';
import { createStubObservable, createTestScheduler } from '@cardano-sdk/util-dev';
import { dummyLogger } from 'ts-log';

Expand All @@ -15,6 +15,8 @@ const mockTips = {
y: { hash: 'hy' }
} as unknown as Record<string, Cardano.Tip>;

const trueFalse = { f: false, t: true };

describe('TipTracker', () => {
const pollInterval: Milliseconds = 1; // delays emission after trigger
let store: InMemoryDocumentStore<Cardano.Tip>;
Expand All @@ -26,6 +28,48 @@ describe('TipTracker', () => {
connectionStatus$ = of(ConnectionStatus.up);
});

it('calls the provider as soon as subscribed', () => {
createTestScheduler().run(({ cold, expectObservable }) => {
const provider$ = createStubObservable<Cardano.Tip>(cold('a|', mockTips));
const tracker$ = new TipTracker({
connectionStatus$,
logger,
maxPollInterval: Number.MAX_VALUE,
minPollInterval: pollInterval,
provider$,
store,
syncStatus: { isSettled$: NEVER } as unknown as SyncStatus
});
expectObservable(tracker$.asObservable().pipe(take(1))).toBe('(a|)', mockTips);
});
});

it('LW-11686 ignores multiple syncStatus emissions during pollInterval', () => {
const poll: Milliseconds = 3;
const sync = '-ttt-t----|';
const tipT = 'a---b---c-|';
// a-b--c-d|
createTestScheduler().run(({ cold, expectObservable }) => {
const syncStatus: Partial<SyncStatus> = { isSettled$: cold(sync, trueFalse) };
const provider$ = createStubObservable<Cardano.Tip>(
cold('(a|)', mockTips),
cold('(b|)', mockTips),
cold('(c|)', mockTips),
cold('(d|)', mockTips)
);
const tracker$ = new TipTracker({
connectionStatus$,
logger,
maxPollInterval: Number.MAX_VALUE,
minPollInterval: poll,
provider$,
store,
syncStatus: syncStatus as SyncStatus
});
expectObservable(tracker$.asObservable().pipe(takeUntil(timer(10)))).toBe(tipT, mockTips);
});
});

it('calls the provider immediately, only emitting distinct values, with throttling', () => {
createTestScheduler().run(({ cold, expectObservable }) => {
const syncStatus: Partial<SyncStatus> = { isSettled$: cold('---a---bc--d|') };
Expand All @@ -48,7 +92,7 @@ describe('TipTracker', () => {
});
});

it('starting offline, then coming online should subscribe to provider immediatelly for initial fetch', () => {
it('starting offline, then coming online should subscribe to provider immediately for initial fetch', () => {
createTestScheduler().run(({ cold, hot, expectObservable, expectSubscriptions }) => {
const connectionStatusOffOn$ = hot('d--u----|', {
d: ConnectionStatus.down,
Expand Down

0 comments on commit b272ea9

Please sign in to comment.