Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): move amount collection in ILP connector #2626

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/backend/src/accounting/psql/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { TransactionOrKnex } from 'objection'
import { v4 as uuid } from 'uuid'
import { Asset } from '../../asset/model'
import { BaseService } from '../../shared/baseService'
import { TelemetryService } from '../../telemetry/service'
import { isTransferError, TransferError } from '../errors'
import {
AccountingService,
Expand Down Expand Up @@ -36,7 +35,6 @@ import {
import { LedgerTransfer, LedgerTransferType } from './ledger-transfer/model'

export interface ServiceDependencies extends BaseService {
telemetry?: TelemetryService
knex: TransactionOrKnex
withdrawalThrottleDelay?: number
}
Expand Down
21 changes: 2 additions & 19 deletions packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { TransactionOrKnex } from 'objection'
import { BaseService } from '../shared/baseService'
import { TelemetryService } from '../telemetry/service'
import { collectTelemetryAmount } from '../telemetry/transaction-amount'
import { TransferError, isTransferError } from './errors'

export enum LiquidityAccountType {
Expand Down Expand Up @@ -93,7 +91,6 @@ export interface TransferToCreate {
}

export interface BaseAccountingServiceDependencies extends BaseService {
telemetry?: TelemetryService
withdrawalThrottleDelay?: number
}

Expand Down Expand Up @@ -121,7 +118,7 @@ export async function createAccountToAccountTransfer(
transferArgs
} = args

const { withdrawalThrottleDelay, telemetry, logger } = deps
const { withdrawalThrottleDelay } = deps

const { sourceAccount, destinationAccount, sourceAmount, destinationAmount } =
transferArgs
Expand Down Expand Up @@ -196,22 +193,8 @@ export async function createAccountToAccountTransfer(
withdrawalThrottleDelay
})
}

if (
destinationAccount.onDebit &&
telemetry &&
sourceAccount.asset.code &&
sourceAccount.asset.scale
) {
collectTelemetryAmount(telemetry, logger, {
amount: sourceAmount,
asset: {
code: sourceAccount.asset.code,
scale: sourceAccount.asset.scale
}
})
}
},

void: async (): Promise<void | TransferError> => {
const error = await voidTransfers(pendingTransferIdsOrError)

Expand Down
2 changes: 0 additions & 2 deletions packages/backend/src/accounting/tigerbeetle/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { v4 as uuid } from 'uuid'

import { BaseService } from '../../shared/baseService'
import { validateId } from '../../shared/utils'
import { TelemetryService } from '../../telemetry/service'
import {
AccountAlreadyExistsError,
BalanceTransferError,
Expand Down Expand Up @@ -49,7 +48,6 @@ export const convertToTigerbeetleAccountCode: {
}

export interface ServiceDependencies extends BaseService {
telemetry?: TelemetryService
tigerbeetle: Client
withdrawalThrottleDelay?: number
}
Expand Down
14 changes: 6 additions & 8 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,6 @@ export function initIocContainer(
const knex = await deps.use('knex')
const config = await deps.use('config')

let telemetry: TelemetryService | undefined
if (config.enableTelemetry && config.openTelemetryCollectors.length > 0) {
telemetry = await deps.use('telemetry')
}

if (config.useTigerbeetle) {
container.singleton('tigerbeetle', async (deps) => {
const config = await deps.use('config')
Expand All @@ -226,7 +221,6 @@ export function initIocContainer(

return createTigerbeetleAccountingService({
logger,
telemetry,
knex,
tigerbeetle,
withdrawalThrottleDelay: config.withdrawalThrottleDelay
Expand All @@ -235,7 +229,6 @@ export function initIocContainer(

return createPsqlAccountingService({
logger,
telemetry,
knex,
withdrawalThrottleDelay: config.withdrawalThrottleDelay
})
Expand Down Expand Up @@ -362,6 +355,10 @@ export function initIocContainer(

container.singleton('connectorApp', async (deps) => {
const config = await deps.use('config')
let telemetry: TelemetryService | undefined
if (config.enableTelemetry) {
telemetry = await deps.use('telemetry')
}
return await createConnectorService({
logger: await deps.use('logger'),
redis: await deps.use('redis'),
Expand All @@ -371,7 +368,8 @@ export function initIocContainer(
peerService: await deps.use('peerService'),
ratesService: await deps.use('ratesService'),
streamServer: await deps.use('streamServer'),
ilpAddress: config.ilpAddress
ilpAddress: config.ilpAddress,
telemetry
})
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import { StreamServer } from '@interledger/stream-receiver'
import { RafikiServices } from '../rafiki'
import { MockAccountingService } from '../test/mocks/accounting-service'
import { TestLoggerFactory } from './test-logger'
import { MockTelemetryService } from '../../../../../tests/telemetry'

interface MockRafikiServices extends RafikiServices {
accounting: MockAccountingService
telemetry: MockTelemetryService
}

export const RafikiServicesFactory = Factory.define<MockRafikiServices>(
Expand All @@ -20,6 +22,7 @@ export const RafikiServicesFactory = Factory.define<MockRafikiServices>(
.attr('accounting', () => {
return new MockAccountingService()
})
.attr('telemetry', () => new MockTelemetryService())
.attr('logger', TestLoggerFactory.build())
.attr(
'walletAddresses',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { collectTelemetryAmount } from '../../../../../telemetry/transaction-amount'
import { ILPContext, ILPMiddleware } from '../rafiki'

export function createTelemetryMiddleware(): ILPMiddleware {
return async (
{ request, services, accounts, response }: ILPContext,
next: () => Promise<void>
): Promise<void> => {
await next()
if (
services.telemetry &&
Number(request.prepare.amount) &&
response.fulfill
) {
const { code, scale } = accounts.outgoing.asset
collectTelemetryAmount(services.telemetry, services.logger, {
amount: BigInt(request.prepare.amount),
asset: { code: code, scale: scale }
})
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
ZeroCopyIlpPrepare,
createIlpPacketMiddleware
} from './middleware/ilp-packet'
import { TelemetryService } from '../../../../telemetry/service'

// Model classes that represent an Interledger sender, receiver, or
// connector SHOULD implement this ConnectorAccount interface.
Expand Down Expand Up @@ -71,6 +72,7 @@ export interface AccountingService {
export interface RafikiServices {
//router: Router
accounting: AccountingService
telemetry?: TelemetryService
walletAddresses: WalletAddressService
logger: Logger
incomingPayments: IncomingPaymentService
Expand Down Expand Up @@ -159,6 +161,9 @@ export class Rafiki<T = any> {
get walletAddresses(): WalletAddressService {
return config.walletAddresses
},
get telemetry(): TelemetryService | undefined {
return config.telemetry
},

logger
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import assert from 'assert'
import { IlpResponse, OutgoingAccount, ZeroCopyIlpPrepare } from '../..'
import { IncomingAccountFactory, RafikiServicesFactory } from '../../factories'
import { createTelemetryMiddleware } from '../../middleware/telemetry'
import { createILPContext } from '../../utils'

import { IlpFulfill } from 'ilp-packet'
import * as telemetry from '../../../../../../telemetry/transaction-amount'

const incomingAccount = IncomingAccountFactory.build({ id: 'alice' })

assert.ok(incomingAccount.id)
const services = RafikiServicesFactory.build({})

const ctx = createILPContext({
services,
request: {
prepare: {
amount: 100n
} as unknown as ZeroCopyIlpPrepare,
rawPrepare: Buffer.from('')
},
accounts: {
incoming: incomingAccount,
outgoing: { asset: { code: 'USD', scale: 2 } } as OutgoingAccount
},
state: {
unfulfillable: false,
incomingAccount: {
quote: 'exists'
}
},
response: {
fulfill: 'exists' as unknown as IlpFulfill
} as IlpResponse
})

jest.mock('../../../../../../telemetry/transaction-amount')
const middleware = createTelemetryMiddleware()
const next = jest.fn().mockImplementation(() => Promise.resolve())

beforeEach(async () => {
incomingAccount.balance = 100n
incomingAccount.asset.scale = 2
incomingAccount.asset.code = 'USD'
})

describe('Telemetry Middleware', function () {
it('does not gather telemetry if telemetry is not enabled (service is undefined)', async () => {
const collectAmountSpy = jest
.spyOn(telemetry, 'collectTelemetryAmount')
.mockImplementation(() => Promise.resolve())

await middleware(
{ ...ctx, services: { ...ctx.services, telemetry: undefined } },
next
)
expect(collectAmountSpy).not.toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})

it('does not gather telemetry if response.fulfill undefined', async () => {
const collectAmountSpy = jest.spyOn(telemetry, 'collectTelemetryAmount')

await middleware(
{ ...ctx, response: { fulfill: undefined } as IlpResponse },
next
)

expect(collectAmountSpy).not.toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})

it('does not gather telemetry if amount is invalid', async () => {
const collectAmountSpy = jest.spyOn(telemetry, 'collectTelemetryAmount')

await middleware(
{
...ctx,
request: {
...ctx.request,
prepare: { amount: '0' } as ZeroCopyIlpPrepare
}
},
next
)

expect(collectAmountSpy).not.toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})

it('gathers telemetry without blocking middleware chain', async () => {
let nextCalled = false
const next = jest.fn().mockImplementation(() => {
nextCalled = true
return Promise.resolve()
})

const collectAmountSpy = jest
.spyOn(telemetry, 'collectTelemetryAmount')
.mockImplementation(() => {
expect(nextCalled).toBe(true)
return Promise.resolve()
})

await middleware(ctx, next)

expect(collectAmountSpy).toHaveBeenCalled()
expect(next).toHaveBeenCalled()
})
})
7 changes: 7 additions & 0 deletions packages/backend/src/payment-method/ilp/connector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ import {
createStreamController
} from './core'

import { TelemetryService } from '../../../telemetry/service'
import { createTelemetryMiddleware } from './core/middleware/telemetry'

interface ServiceDependencies extends BaseService {
redis: Redis
ratesService: RatesService
accountingService: AccountingService
telemetry?: TelemetryService
walletAddressService: WalletAddressService
incomingPaymentService: IncomingPaymentService
peerService: PeerService
Expand All @@ -44,6 +48,7 @@ export async function createConnectorService({
redis,
ratesService,
accountingService,
telemetry,
walletAddressService,
incomingPaymentService,
peerService,
Expand All @@ -57,6 +62,7 @@ export async function createConnectorService({
service: 'ConnectorService'
}),
accounting: accountingService,
telemetry,
walletAddresses: walletAddressService,
incomingPayments: incomingPaymentService,
peers: peerService,
Expand All @@ -76,6 +82,7 @@ export async function createConnectorService({

// Local pay
createBalanceMiddleware(),
createTelemetryMiddleware(),

// Outgoing Rules
createStreamController(),
Expand Down
Loading