Skip to content

Commit

Permalink
Store validators in IndexedDB and serve StakingService from the exten…
Browse files Browse the repository at this point in the history
…sion (#700)

* Create a StakingQuerier

* Store validator infos in IndexedDB

* Fix bug with how new epochs were created; save updated validator infos during epoch transition

* Keep latest known block height up to date

* Remove unneeded package

* Tweak comment

* Fix dependency

* Fix typo

* Add new IDB method to iterate validator infos

* Implement StakingService in the extension

* Add comment

* Add comment

* Format

* Add brackets

* Add comment
  • Loading branch information
jessepinho authored Mar 7, 2024
1 parent 873303d commit 3754c70
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 13 deletions.
2 changes: 1 addition & 1 deletion apps/extension/.env
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

PRAX=lkpmkhpnhknhmibgnmmhdhgdilepfghe
IDB_VERSION=26
IDB_VERSION=27
USDC_ASSET_ID="reum7wQmk/owgvGMWMZn/6RFPV24zIKq3W6In/WwZgg="
MINIFRONT_URL=https://app.testnet.penumbra.zone/
PENUMBRA_NODE_PD_URL=https://grpc.testnet.penumbra.zone/
9 changes: 5 additions & 4 deletions apps/extension/src/impls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,26 @@ import {
import { QueryService as GovernanceService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/governance/v1/governance_connect';
import { QueryService as SctService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/sct/v1/sct_connect';
import { QueryService as ShieldedPoolService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/shielded_pool/v1/shielded_pool_connect';
import { QueryService as StakeService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/stake/v1/stake_connect';
import { QueryService as StakingService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/stake/v1/stake_connect';
import { TendermintProxyService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/util/tendermint_proxy/v1/tendermint_proxy_connect';
import { CustodyService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/custody/v1/custody_connect';
import { ViewService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/view/v1/view_connect';
import { custodyImpl } from '@penumbra-zone/router/src/grpc/custody';
import { sctImpl } from '@penumbra-zone/router/src/grpc/sct';
import { stakingImpl } from '@penumbra-zone/router/src/grpc/staking';
import { viewImpl } from '@penumbra-zone/router/src/grpc/view-protocol-server';

import { localExtStorage } from '@penumbra-zone/storage';
const grpcEndpoint = await localExtStorage.get('grpcEndpoint');

const penumbraProxies = [
IbcProxy,

AppService,
CompactBlockService,
DexService,
DexSimulationService,
GovernanceService,
IbcProxy,
ShieldedPoolService,
StakeService,
TendermintProxyService,
].map(
serviceType =>
Expand All @@ -50,6 +49,8 @@ export const rpcImpls = [
// @ts-expect-error TODO: accept partial impl
[CustodyService, rethrowImplErrors(CustodyService, custodyImpl)],
[SctService, rethrowImplErrors(SctService, sctImpl)],
// @ts-expect-error TODO: accept partial impl
[StakingService, rethrowImplErrors(StakingService, stakingImpl)],
[ViewService, rethrowImplErrors(ViewService, viewImpl)],
// rpc remote proxies
...penumbraProxies,
Expand Down
61 changes: 53 additions & 8 deletions packages/query/src/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,15 @@ export class BlockProcessor implements BlockProcessorInterface {
private async syncAndStore() {
const fullSyncHeight = await this.indexedDb.getFullSyncHeight();
const startHeight = fullSyncHeight ? fullSyncHeight + 1n : 0n;
const latestBlockHeight = await this.querier.tendermint.latestBlockHeight();
let latestKnownBlockHeight = await this.querier.tendermint.latestBlockHeight();

let nextEpochStartHeight: bigint | undefined = startHeight === 0n ? 0n : undefined;
// In the `for` loop below, we only update validator infos once we've
// reached the latest known epoch. This means that, if a user is syncing for
// the first time, they could experience a broken UI until the latest known
// epoch is reached, since they may have delegation tokens but no validator
// info to go with them. So we'll update validator infos at the beginning of
// sync as well, and force the rest of sync to wait until it's done.
if (startHeight === 0n) await this.updateValidatorInfos();

// this is an indefinite stream of the (compact) chain from the network
// intended to run continuously
Expand All @@ -142,10 +148,6 @@ export class BlockProcessor implements BlockProcessorInterface {
keepAlive: true,
abortSignal: this.abortController.signal,
})) {
if (nextEpochStartHeight === compactBlock.height) {
await this.indexedDb.addEpoch(nextEpochStartHeight);
nextEpochStartHeight = undefined;
}
if (compactBlock.appParametersUpdated) {
await this.indexedDb.saveAppParams(await this.querier.app.appParams());
}
Expand All @@ -169,7 +171,7 @@ export class BlockProcessor implements BlockProcessorInterface {
const flushReasons = {
scannerWantsFlush,
interval: compactBlock.height % 1000n === 0n,
new: compactBlock.height > latestBlockHeight,
new: compactBlock.height > latestKnownBlockHeight,
};

const recordsByCommitment = new Map<StateCommitment, SpendableNoteRecord | SwapRecord>();
Expand Down Expand Up @@ -236,8 +238,19 @@ export class BlockProcessor implements BlockProcessorInterface {
await this.saveTransactionInfos(compactBlock.height, relevantTx);
}

// We only query Tendermint for the latest known block height once, when
// the block processor starts running. Once we're caught up, though, the
// chain will of course continue adding blocks, and we'll keep processing
// them. So, we need to update `latestKnownBlockHeight` once we've passed
// it.
if (compactBlock.height > latestKnownBlockHeight) {
latestKnownBlockHeight = compactBlock.height;
}

const isLastBlockOfEpoch = !!compactBlock.epochRoot;
if (isLastBlockOfEpoch) nextEpochStartHeight = compactBlock.height + 1n;
if (isLastBlockOfEpoch) {
void this.handleEpochTransition(compactBlock.height, latestKnownBlockHeight);
}
}
}

Expand Down Expand Up @@ -365,4 +378,36 @@ export class BlockProcessor implements BlockProcessorInterface {
);
}
}

private async handleEpochTransition(
endHeightOfPreviousEpoch: bigint,
latestKnownBlockHeight: bigint,
): Promise<void> {
const { sctParams } = await this.querier.app.appParams();
const nextEpochStartHeight = endHeightOfPreviousEpoch + 1n;

await this.indexedDb.addEpoch(nextEpochStartHeight);

const nextEpochIsLatestKnownEpoch =
sctParams && latestKnownBlockHeight - nextEpochStartHeight < sctParams.epochDuration;

// If we're doing a full sync from block 0, there could be hundreds or even
// thousands of epoch transitions in the chain already. If we update
// validator infos on every epoch transition, we'd be making tons of
// unnecessary calls to the RPC node for validator infos. Instead, we'll
// only get updated validator infos once we're within the latest known
// epoch.
if (nextEpochIsLatestKnownEpoch) void this.updateValidatorInfos();
}

private async updateValidatorInfos(): Promise<void> {
for await (const validatorInfoResponse of this.querier.staking.allValidatorInfos()) {
if (!validatorInfoResponse.validatorInfo) continue;

// Await the upsert. This makes it possible for users of this method to
// await the entire method, if they want to block all other code until all
// validator infos have been upserted.
await this.indexedDb.upsertValidatorInfo(validatorInfoResponse.validatorInfo);
}
}
}
22 changes: 22 additions & 0 deletions packages/query/src/queriers/staking.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { PromiseClient } from '@connectrpc/connect';
import { createClient } from './utils';
import { StakingQuerierInterface } from '@penumbra-zone/types';
import { QueryService as StakingService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/stake/v1/stake_connect';
import { ValidatorInfoResponse } from '@buf/penumbra-zone_penumbra.bufbuild_es/penumbra/core/component/stake/v1/stake_pb';

export class StakingQuerier implements StakingQuerierInterface {
private readonly client: PromiseClient<typeof StakingService>;

constructor({ grpcEndpoint }: { grpcEndpoint: string }) {
this.client = createClient(grpcEndpoint, StakingService);
}

allValidatorInfos(): AsyncIterable<ValidatorInfoResponse> {
/**
* Include inactive validators when saving to our local database, since we
* serve the `ValidatorInfo` RPC method from the extension, and may receive
* requests for inactive validators.
*/
return this.client.validatorInfo({ showInactive: true });
}
}
3 changes: 3 additions & 0 deletions packages/query/src/root-querier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ShieldedPoolQuerier } from './queriers/shielded-pool';
import { RootQuerierInterface } from '@penumbra-zone/types';
import { IbcClientQuerier } from './queriers/ibc-client';
import { CnidariumQuerier } from './queriers/cnidarium';
import { StakingQuerier } from './queriers/staking';

// Given the amount of query services, this root querier aggregates them all
// to make it easier for consumers
Expand All @@ -14,6 +15,7 @@ export class RootQuerier implements RootQuerierInterface {
readonly tendermint: TendermintQuerier;
readonly shieldedPool: ShieldedPoolQuerier;
readonly ibcClient: IbcClientQuerier;
readonly staking: StakingQuerier;
readonly cnidarium: CnidariumQuerier;

constructor({ grpcEndpoint }: { grpcEndpoint: string }) {
Expand All @@ -22,6 +24,7 @@ export class RootQuerier implements RootQuerierInterface {
this.tendermint = new TendermintQuerier({ grpcEndpoint });
this.shieldedPool = new ShieldedPoolQuerier({ grpcEndpoint });
this.ibcClient = new IbcClientQuerier({ grpcEndpoint });
this.staking = new StakingQuerier({ grpcEndpoint });
this.cnidarium = new CnidariumQuerier({ grpcEndpoint });
}
}
7 changes: 7 additions & 0 deletions packages/router/src/grpc/staking/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { QueryService as StakingService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/stake/v1/stake_connect';
import { ServiceImpl } from '@connectrpc/connect';
import { validatorInfo } from './validator-info';

export type Impl = ServiceImpl<typeof StakingService>;

export const stakingImpl: Pick<Impl, 'validatorInfo'> = { validatorInfo };
89 changes: 89 additions & 0 deletions packages/router/src/grpc/staking/validator-info.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { validatorInfo } from './validator-info';
import { IndexedDbMock, MockServices } from '../test-utils';
import { HandlerContext, createContextValues, createHandlerContext } from '@connectrpc/connect';
import { QueryService as StakingService } from '@buf/penumbra-zone_penumbra.connectrpc_es/penumbra/core/component/stake/v1/stake_connect';
import { ServicesInterface } from '@penumbra-zone/types';
import { servicesCtx } from '../../ctx';
import {
ValidatorInfoRequest,
ValidatorInfoResponse,
ValidatorState_ValidatorStateEnum,
} from '@buf/penumbra-zone_penumbra.bufbuild_es/penumbra/core/component/stake/v1/stake_pb';
import { PartialMessage } from '@bufbuild/protobuf';

describe('ValidatorInfo request handler', () => {
let mockServices: MockServices;
let mockIndexedDb: IndexedDbMock;
let mockCtx: HandlerContext;
const mockValidatorInfoResponse1 = new ValidatorInfoResponse({
validatorInfo: {
validator: { name: 'Validator 1' },
status: { state: { state: ValidatorState_ValidatorStateEnum.ACTIVE } },
},
});
const mockValidatorInfoResponse2 = new ValidatorInfoResponse({
validatorInfo: {
validator: { name: 'Validator 2' },
status: { state: { state: ValidatorState_ValidatorStateEnum.INACTIVE } },
},
});

beforeEach(() => {
vi.resetAllMocks();

const mockIterateValidatorInfos = {
next: vi.fn(),
[Symbol.asyncIterator]: () => mockIterateValidatorInfos,
};
mockIterateValidatorInfos.next.mockResolvedValueOnce({
value: mockValidatorInfoResponse1.validatorInfo,
});
mockIterateValidatorInfos.next.mockResolvedValueOnce({
value: mockValidatorInfoResponse2.validatorInfo,
});
mockIterateValidatorInfos.next.mockResolvedValueOnce({ done: true });

mockIndexedDb = {
iterateValidatorInfos: () => mockIterateValidatorInfos,
};
mockServices = {
getWalletServices: vi.fn(() =>
Promise.resolve({ indexedDb: mockIndexedDb }),
) as MockServices['getWalletServices'],
};
mockCtx = createHandlerContext({
service: StakingService,
method: StakingService.methods.validatorInfo,
protocolName: 'mock',
requestMethod: 'MOCK',
url: '/mock',
contextValues: createContextValues().set(
servicesCtx,
mockServices as unknown as ServicesInterface,
),
});
});

it('streams `ValidatorInfoResponse`s from the results of the database query', async () => {
const req = new ValidatorInfoRequest({ showInactive: true });

const results: (ValidatorInfoResponse | PartialMessage<ValidatorInfoResponse>)[] = [];
for await (const result of validatorInfo(req, mockCtx)) {
results.push(result);
}

expect(results).toEqual([mockValidatorInfoResponse1, mockValidatorInfoResponse2]);
});

it('does not include inactive validators by default', async () => {
const req = new ValidatorInfoRequest();

const results: (ValidatorInfoResponse | PartialMessage<ValidatorInfoResponse>)[] = [];
for await (const result of validatorInfo(req, mockCtx)) {
results.push(result);
}

expect(results).toEqual([mockValidatorInfoResponse1]);
});
});
22 changes: 22 additions & 0 deletions packages/router/src/grpc/staking/validator-info.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { getStateEnumFromValidatorInfo } from '@penumbra-zone/getters';
import { Impl } from '.';
import { servicesCtx } from '../../ctx';
import {
ValidatorInfoResponse,
ValidatorState_ValidatorStateEnum,
} from '@buf/penumbra-zone_penumbra.bufbuild_es/penumbra/core/component/stake/v1/stake_pb';

export const validatorInfo: Impl['validatorInfo'] = async function* (req, ctx) {
const services = ctx.values.get(servicesCtx);
const { indexedDb } = await services.getWalletServices();

for await (const validatorInfo of indexedDb.iterateValidatorInfos()) {
if (
!req.showInactive &&
getStateEnumFromValidatorInfo(validatorInfo) === ValidatorState_ValidatorStateEnum.INACTIVE
)
continue;

yield new ValidatorInfoResponse({ validatorInfo });
}
};
1 change: 1 addition & 0 deletions packages/router/src/grpc/test-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface IndexedDbMock {
iterateSpendableNotes?: () => Partial<AsyncIterable<Mock>>;
iterateSwaps?: () => Partial<AsyncIterable<Mock>>;
iterateTransactionInfo?: () => Partial<AsyncIterable<Mock>>;
iterateValidatorInfos?: () => Partial<AsyncIterable<Mock>>;
subscribe?: (table: string) => Partial<AsyncIterable<Mock>>;
getSwapByCommitment?: Mock;
getEpochByHeight?: Mock;
Expand Down
1 change: 1 addition & 0 deletions packages/storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"dependencies": {
"@penumbra-zone/constants": "workspace:*",
"@penumbra-zone/crypto-web": "workspace:*",
"@penumbra-zone/getters": "workspace:*",
"idb": "^8.0.0"
},
"devDependencies": {
Expand Down
27 changes: 27 additions & 0 deletions packages/storage/src/indexed-db/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IDBPDatabase, openDB, StoreNames } from 'idb';
import {
bech32IdentityKey,
bech32ToUint8Array,
IDB_TABLES,
IdbConstants,
Expand Down Expand Up @@ -47,6 +48,8 @@ import { AppParameters } from '@buf/penumbra-zone_penumbra.bufbuild_es/penumbra/
import { IdbCursorSource } from './stream';

import '@penumbra-zone/polyfills/ReadableStream[Symbol.asyncIterator]';
import { ValidatorInfo } from '@buf/penumbra-zone_penumbra.bufbuild_es/penumbra/core/component/stake/v1/stake_pb';
import { getIdentityKeyFromValidatorInfo } from '@penumbra-zone/getters';

interface IndexedDbProps {
dbVersion: number; // Incremented during schema changes
Expand Down Expand Up @@ -94,6 +97,7 @@ export class IndexedDb implements IndexedDbInterface {
db.createObjectStore('GAS_PRICES');
db.createObjectStore('POSITIONS', { keyPath: 'id.inner' });
db.createObjectStore('EPOCHS', { autoIncrement: true });
db.createObjectStore('VALIDATOR_INFOS');
},
});
const constants = {
Expand Down Expand Up @@ -469,6 +473,29 @@ export class IndexedDb implements IndexedDbInterface {
return epoch;
}

/**
* Inserts the validator info into the database, or updates an existing
* validator info if one with the same identity key exists.
*/
async upsertValidatorInfo(validatorInfo: ValidatorInfo): Promise<void> {
const identityKeyAsBech32 = bech32IdentityKey(getIdentityKeyFromValidatorInfo(validatorInfo));

await this.u.update({
table: 'VALIDATOR_INFOS',
key: identityKeyAsBech32,
value: validatorInfo.toJson() as Jsonified<ValidatorInfo>,
});
}

/**
* Iterates over all validator infos in the database.
*/
async *iterateValidatorInfos() {
yield* new ReadableStream(
new IdbCursorSource(this.db.transaction('VALIDATOR_INFOS').store.openCursor(), ValidatorInfo),
);
}

private addSctUpdates(txs: IbdUpdates, sctUpdates: ScanBlockResult['sctUpdates']): void {
if (sctUpdates.set_position) {
txs.add({
Expand Down
Loading

0 comments on commit 3754c70

Please sign in to comment.