Skip to content

Commit

Permalink
websockets transactions events metrics (#1429)
Browse files Browse the repository at this point in the history
* Enhance ApiMetricsService with new transaction and batch update counters.

* Updated WebSocketPublisherController to emit metrics events after processing transactions and batch updates.

* Update web.socket controller spec
  • Loading branch information
cfaur09 authored Jan 8, 2025
1 parent 8135bf8 commit 0937b65
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 1 deletion.
41 changes: 40 additions & 1 deletion src/common/metrics/api.metrics.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { MetricsService } from '@multiversx/sdk-nestjs-monitoring';
import { forwardRef, Inject, Injectable } from "@nestjs/common";
import { OnEvent } from '@nestjs/event-emitter';
import { register, Histogram, Gauge } from 'prom-client';
import { register, Histogram, Gauge, Counter } from 'prom-client';
import { ApiConfigService } from "src/common/api-config/api.config.service";
import { GatewayService } from "../gateway/gateway.service";
import { ProtocolService } from "../protocol/protocol.service";
Expand All @@ -19,6 +19,9 @@ export class ApiMetricsService {
private static lastProcessedNonceGauge: Gauge<string>;
private static lastProcessedBatchProcessorNonce: Gauge<string>;
private static lastProcessedTransactionCompletedProcessorNonce: Gauge<string>;
private static transactionsCompletedCounter: Counter<string>;
private static transactionsPendingResultsCounter: Counter<string>;
private static batchUpdatesCounter: Counter<string>;

constructor(
private readonly apiConfigService: ApiConfigService,
Expand Down Expand Up @@ -105,6 +108,27 @@ export class ApiMetricsService {
labelNames: ['shardId'],
});
}

if (!ApiMetricsService.transactionsCompletedCounter) {
ApiMetricsService.transactionsCompletedCounter = new Counter({
name: 'websocket_transactions_completed_total',
help: 'Total number of completed transactions processed via websocket',
});
}

if (!ApiMetricsService.transactionsPendingResultsCounter) {
ApiMetricsService.transactionsPendingResultsCounter = new Counter({
name: 'websocket_transactions_pending_results_total',
help: 'Total number of transactions with pending results processed via websocket',
});
}

if (!ApiMetricsService.batchUpdatesCounter) {
ApiMetricsService.batchUpdatesCounter = new Counter({
name: 'websocket_batch_updates_total',
help: 'Total number of batch updates processed via websocket',
});
}
}

@OnEvent(MetricsEvents.SetVmQuery)
Expand Down Expand Up @@ -158,6 +182,21 @@ export class ApiMetricsService {
ApiMetricsService.lastProcessedTransactionCompletedProcessorNonce.set({ shardId }, nonce);
}

@OnEvent(MetricsEvents.SetTransactionsCompleted)
recordTransactionsCompleted(payload: { transactions: any[] }) {
ApiMetricsService.transactionsCompletedCounter.inc(payload.transactions.length);
}

@OnEvent(MetricsEvents.SetTransactionsPendingResults)
recordTransactionsPendingResults(payload: { transactions: any[] }) {
ApiMetricsService.transactionsPendingResultsCounter.inc(payload.transactions.length);
}

@OnEvent(MetricsEvents.SetBatchUpdated)
recordBatchUpdated() {
ApiMetricsService.batchUpdatesCounter.inc();
}

async getMetrics(): Promise<string> {
const shardIds = await this.protocolService.getShardIds();
if (this.apiConfigService.getIsTransactionProcessorCronActive()) {
Expand Down
13 changes: 13 additions & 0 deletions src/common/websockets/web-socket-publisher-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,45 @@ import { ShardTransaction } from "@elrondnetwork/transaction-processor";
import { Controller } from "@nestjs/common";
import { EventPattern } from "@nestjs/microservices";
import { WebSocketPublisherService } from "src/common/websockets/web-socket-publisher-service";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { MetricsEvents } from "src/utils/metrics-events.constants";

@Controller()
export class WebSocketPublisherController {
private logger = new OriginLogger(WebSocketPublisherController.name);

constructor(
private readonly webSocketPublisherService: WebSocketPublisherService,
private readonly eventEmitter: EventEmitter2,
) { }

@EventPattern('transactionsCompleted')
async transactionsCompleted(transactions: ShardTransaction[]) {
for (const transaction of transactions) {
await this.webSocketPublisherService.onTransactionCompleted(transaction);
}

this.eventEmitter.emit(MetricsEvents.SetTransactionsCompleted, {
transactions,
});
}

@EventPattern('transactionsPendingResults')
async transactionsPendingResults(transactions: ShardTransaction[]) {
for (const transaction of transactions) {
await this.webSocketPublisherService.onTransactionPendingResults(transaction);
}

this.eventEmitter.emit(MetricsEvents.SetTransactionsPendingResults, {
transactions,
});
}

@EventPattern('onBatchUpdated')
onBatchUpdated(payload: { address: string, batchId: string, txHashes: string[] }) {
this.logger.log(`Notifying batch updated for address ${payload.address}, batch id '${payload.batchId}', hashes ${payload.txHashes} `);
this.webSocketPublisherService.onBatchUpdated(payload.address, payload.batchId, payload.txHashes);

this.eventEmitter.emit(MetricsEvents.SetBatchUpdated);
}
}
13 changes: 13 additions & 0 deletions src/test/unit/controllers/web.socket.publiser.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { ShardTransaction } from "@elrondnetwork/transaction-processor";
import { TestingModule, Test } from "@nestjs/testing";
import { EventEmitter2 } from "@nestjs/event-emitter";
import { WebSocketPublisherController } from "src/common/websockets/web-socket-publisher-controller";
import { WebSocketPublisherService } from "src/common/websockets/web-socket-publisher-service";
import { MetricsEvents } from "src/utils/metrics-events.constants";

describe('WebSocketPublisherController', () => {
let controller: WebSocketPublisherController;
let webSocketPublisherService: WebSocketPublisherService;
let eventEmitter: EventEmitter2;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
Expand All @@ -19,29 +22,39 @@ describe('WebSocketPublisherController', () => {
onBatchUpdated: jest.fn(),
},
},
{
provide: EventEmitter2,
useValue: {
emit: jest.fn(),
},
},
],
}).compile();

controller = module.get<WebSocketPublisherController>(WebSocketPublisherController);
webSocketPublisherService = module.get<WebSocketPublisherService>(WebSocketPublisherService);
eventEmitter = module.get<EventEmitter2>(EventEmitter2);
});

it('should handle transactionsCompleted event', async () => {
const mockTransactions = [{}, {}] as ShardTransaction[];
await controller.transactionsCompleted(mockTransactions);
expect(webSocketPublisherService.onTransactionCompleted).toHaveBeenCalledTimes(mockTransactions.length);
expect(eventEmitter.emit).toHaveBeenCalledWith(MetricsEvents.SetTransactionsCompleted, { transactions: mockTransactions });
});

it('should handle transactionsPendingResults event', async () => {
const mockTransactions = [{}, {}] as ShardTransaction[];
await controller.transactionsPendingResults(mockTransactions);
expect(webSocketPublisherService.onTransactionPendingResults).toHaveBeenCalledTimes(mockTransactions.length);
expect(eventEmitter.emit).toHaveBeenCalledWith(MetricsEvents.SetTransactionsPendingResults, { transactions: mockTransactions });
});

it('should handle onBatchUpdated event', () => {
const mockPayload = { address: 'testAddress', batchId: 'testBatchId', txHashes: ['hash1', 'hash2'] };
controller.onBatchUpdated(mockPayload);
expect(webSocketPublisherService.onBatchUpdated).toHaveBeenCalledWith(mockPayload.address, mockPayload.batchId, mockPayload.txHashes);
expect(webSocketPublisherService.onBatchUpdated).toHaveBeenCalledTimes(1);
expect(eventEmitter.emit).toHaveBeenCalledWith(MetricsEvents.SetBatchUpdated);
});
});
3 changes: 3 additions & 0 deletions src/utils/metrics-events.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ export enum MetricsEvents {
SetLastProcessedNonce = "setLastProcessedNonce",
SetLastProcessedBatchProcessorNonce = "setLastProcessedBatchProcessorNonce",
SetLastProcessedTransactionCompletedProcessorNonce = "setLastProcessedTransactionCompletedProcessorNonce",
SetTransactionsCompleted = "setTransactionsCompleted",
SetTransactionsPendingResults = "setTransactionsPendingResults",
SetBatchUpdated = "setBatchUpdated",
}

0 comments on commit 0937b65

Please sign in to comment.