From 51aca390ba1cfdec68efbe832da9ce8c29fb5964 Mon Sep 17 00:00:00 2001 From: Lukasz Cwik <126621805+lcwik@users.noreply.github.com> Date: Mon, 6 Nov 2023 14:44:20 -0800 Subject: [PATCH] [IND-474] Update ender transfer handler to execute updates via a SQL function. (#763) --- indexer/packages/postgres/src/constants.ts | 2 + indexer/packages/postgres/src/index.ts | 1 + .../postgres/src/models/transfer-model.ts | 22 + .../src/stores/asset-position-table.ts | 1 + .../postgres/src/stores/candle-table.ts | 1 + .../postgres/src/stores/fill-table.ts | 1 + .../src/stores/funding-index-updates-table.ts | 1 + .../postgres/src/stores/oracle-price-table.ts | 1 + .../postgres/src/stores/order-table.ts | 1 + .../src/stores/perpetual-position-table.ts | 1 + .../postgres/src/stores/pnl-ticks-table.ts | 1 + .../postgres/src/stores/subaccount-table.ts | 1 + .../postgres/src/stores/transaction-table.ts | 1 + .../postgres/src/stores/transfer-table.ts | 1 + .../handlers/transfer-handler.test.ts | 673 ++++++++++-------- .../ender/__tests__/scripts/scripts.test.ts | 63 +- indexer/services/ender/src/config.ts | 5 +- .../ender/src/handlers/transfer-handler.ts | 52 +- .../helpers/postgres/postgres-functions.ts | 2 + .../scripts/dydx_liquidity_tier_handler.sql | 2 +- .../src/scripts/dydx_transfer_handler.sql | 95 +++ .../scripts/dydx_uuid_from_transfer_parts.sql | 34 + 22 files changed, 679 insertions(+), 283 deletions(-) create mode 100644 indexer/services/ender/src/scripts/dydx_transfer_handler.sql create mode 100644 indexer/services/ender/src/scripts/dydx_uuid_from_transfer_parts.sql diff --git a/indexer/packages/postgres/src/constants.ts b/indexer/packages/postgres/src/constants.ts index a636204313..900c2e6038 100644 --- a/indexer/packages/postgres/src/constants.ts +++ b/indexer/packages/postgres/src/constants.ts @@ -11,6 +11,7 @@ import OrderModel from './models/order-model'; import PerpetualMarketModel from './models/perpetual-market-model'; import PerpetualPositionModel from './models/perpetual-position-model'; import SubaccountModel from './models/subaccount-model'; +import TransferModel from './models/transfer-model'; import { APITimeInForce, CandleResolution, @@ -95,6 +96,7 @@ export const SQL_TO_JSON_DEFINED_MODELS = [ PerpetualMarketModel, PerpetualPositionModel, SubaccountModel, + TransferModel, ]; export type SpecifiedClobPairStatus = diff --git a/indexer/packages/postgres/src/index.ts b/indexer/packages/postgres/src/index.ts index 69e941c779..f2546f00ee 100644 --- a/indexer/packages/postgres/src/index.ts +++ b/indexer/packages/postgres/src/index.ts @@ -12,6 +12,7 @@ export { default as OraclePriceModel } from './models/oracle-price-model'; export { default as OrderModel } from './models/order-model'; export { default as PerpetualMarketModel } from './models/perpetual-market-model'; export { default as PerpetualPositionModel } from './models/perpetual-position-model'; +export { default as TransferModel } from './models/transfer-model'; export * as AssetTable from './stores/asset-table'; export * as AssetPositionTable from './stores/asset-position-table'; diff --git a/indexer/packages/postgres/src/models/transfer-model.ts b/indexer/packages/postgres/src/models/transfer-model.ts index cd801cbefc..8fdb88b3dd 100644 --- a/indexer/packages/postgres/src/models/transfer-model.ts +++ b/indexer/packages/postgres/src/models/transfer-model.ts @@ -104,6 +104,28 @@ export default class TransferModel extends Model { }; } + /** + * A mapping from column name to JSON conversion expected. + * See getSqlConversionForDydxModelTypes for valid conversions. + * + * TODO(IND-239): Ensure that jsonSchema() / sqlToJsonConversions() / model fields match. + */ + static get sqlToJsonConversions() { + return { + id: 'string', + senderSubaccountId: 'string', + recipientSubaccountId: 'string', + senderWalletAddress: 'string', + recipientWalletAddress: 'string', + assetId: 'string', + size: 'string', + eventId: 'hex-string', + transactionHash: 'string', + createdAt: 'date-time', + createdAtHeight: 'string', + }; + } + id!: string; senderSubaccountId?: string; diff --git a/indexer/packages/postgres/src/stores/asset-position-table.ts b/indexer/packages/postgres/src/stores/asset-position-table.ts index bd9cce950c..127417ebac 100644 --- a/indexer/packages/postgres/src/stores/asset-position-table.ts +++ b/indexer/packages/postgres/src/stores/asset-position-table.ts @@ -20,6 +20,7 @@ import { } from '../types'; export function uuid(subaccountId: string, assetId: string): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${subaccountId}-${assetId}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/candle-table.ts b/indexer/packages/postgres/src/stores/candle-table.ts index 71afe362f0..d43efb16cf 100644 --- a/indexer/packages/postgres/src/stores/candle-table.ts +++ b/indexer/packages/postgres/src/stores/candle-table.ts @@ -22,6 +22,7 @@ import { } from '../types'; export function uuid(startedAt: IsoString, ticker: string, resolution: CandleResolution): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${startedAt}-${ticker}-${resolution}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/fill-table.ts b/indexer/packages/postgres/src/stores/fill-table.ts index 6263424ddd..daf40ff1b2 100644 --- a/indexer/packages/postgres/src/stores/fill-table.ts +++ b/indexer/packages/postgres/src/stores/fill-table.ts @@ -28,6 +28,7 @@ import { } from '../types'; export function uuid(eventId: Buffer, liquidity: Liquidity): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${eventId.toString('hex')}-${liquidity}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/funding-index-updates-table.ts b/indexer/packages/postgres/src/stores/funding-index-updates-table.ts index 24ac8d02eb..54ab4d6abd 100644 --- a/indexer/packages/postgres/src/stores/funding-index-updates-table.ts +++ b/indexer/packages/postgres/src/stores/funding-index-updates-table.ts @@ -26,6 +26,7 @@ export function uuid( eventId: Buffer, perpetualId: string, ): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${blockHeight}-${eventId.toString('hex')}-${perpetualId}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/oracle-price-table.ts b/indexer/packages/postgres/src/stores/oracle-price-table.ts index f78b82df3a..82ee2a3eb6 100644 --- a/indexer/packages/postgres/src/stores/oracle-price-table.ts +++ b/indexer/packages/postgres/src/stores/oracle-price-table.ts @@ -24,6 +24,7 @@ import { export function uuid( marketId: number, height: string, ): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${marketId.toString()}-${height}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/order-table.ts b/indexer/packages/postgres/src/stores/order-table.ts index 70f8e96f01..8e93ce61f4 100644 --- a/indexer/packages/postgres/src/stores/order-table.ts +++ b/indexer/packages/postgres/src/stores/order-table.ts @@ -26,6 +26,7 @@ export function uuid( clobPairId: string, orderFlags: string, ): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid( Buffer.from( `${subaccountId}-${clientId}-${clobPairId}-${orderFlags}`, diff --git a/indexer/packages/postgres/src/stores/perpetual-position-table.ts b/indexer/packages/postgres/src/stores/perpetual-position-table.ts index 24077a68ba..29657a10d9 100644 --- a/indexer/packages/postgres/src/stores/perpetual-position-table.ts +++ b/indexer/packages/postgres/src/stores/perpetual-position-table.ts @@ -45,6 +45,7 @@ const DEFAULT_SUBACCOUNT_UPDATE_DEFAULT_POSITION_FIELDS = { }; export function uuid(subaccountId: string, openEventId: Buffer): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${subaccountId}-${openEventId.toString('hex')}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/pnl-ticks-table.ts b/indexer/packages/postgres/src/stores/pnl-ticks-table.ts index 85438c3e01..7de840933f 100644 --- a/indexer/packages/postgres/src/stores/pnl-ticks-table.ts +++ b/indexer/packages/postgres/src/stores/pnl-ticks-table.ts @@ -22,6 +22,7 @@ export function uuid( subaccountId: string, createdAt: string, ): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid( Buffer.from( `${subaccountId}-${createdAt}`, diff --git a/indexer/packages/postgres/src/stores/subaccount-table.ts b/indexer/packages/postgres/src/stores/subaccount-table.ts index 9395d3e533..e71401cd88 100644 --- a/indexer/packages/postgres/src/stores/subaccount-table.ts +++ b/indexer/packages/postgres/src/stores/subaccount-table.ts @@ -23,6 +23,7 @@ import { } from '../types'; export function uuid(address: string, subaccountNumber: number): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${address}-${subaccountNumber}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/transaction-table.ts b/indexer/packages/postgres/src/stores/transaction-table.ts index 95a3ea1b6e..1b6311daf3 100644 --- a/indexer/packages/postgres/src/stores/transaction-table.ts +++ b/indexer/packages/postgres/src/stores/transaction-table.ts @@ -20,6 +20,7 @@ import { } from '../types'; export function uuid(blockHeight: string, transactionIndex: number): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid(Buffer.from(`${blockHeight}-${transactionIndex}`, BUFFER_ENCODING_UTF_8)); } diff --git a/indexer/packages/postgres/src/stores/transfer-table.ts b/indexer/packages/postgres/src/stores/transfer-table.ts index d32a5b3901..9b89c9fbe6 100644 --- a/indexer/packages/postgres/src/stores/transfer-table.ts +++ b/indexer/packages/postgres/src/stores/transfer-table.ts @@ -31,6 +31,7 @@ export function uuid( senderWalletAddress?: string, recipientWalletAddress?: string, ): string { + // TODO(IND-483): Fix all uuid string substitutions to use Array.join. return getUuid( Buffer.from( `${senderSubaccountId}-${recipientSubaccountId}-${senderWalletAddress}-${recipientWalletAddress}-${eventId.toString('hex')}-${assetId}`, diff --git a/indexer/services/ender/__tests__/handlers/transfer-handler.test.ts b/indexer/services/ender/__tests__/handlers/transfer-handler.test.ts index 43d397000c..fe05ed7ff9 100644 --- a/indexer/services/ender/__tests__/handlers/transfer-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/transfer-handler.test.ts @@ -52,6 +52,7 @@ import { } from '../helpers/constants'; import { updateBlockCache } from '../../src/caches/block-cache'; import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions'; +import config from '../../src/config'; describe('transferHandler', () => { beforeAll(async () => { @@ -134,336 +135,452 @@ describe('transferHandler', () => { }); }); - it('fails when TransferEvent does not contain sender subaccountId', async () => { - const transactionIndex: number = 0; - const transferEvent: TransferEventV1 = TransferEventV1.fromPartial({ - recipient: { - subaccountId: { - owner: '', - number: 0, + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'fails when TransferEvent does not contain sender subaccountId (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_TRANSFER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const transferEvent: TransferEventV1 = TransferEventV1.fromPartial({ + recipient: { + subaccountId: { + owner: '', + number: 0, + }, }, - }, - assetId: 0, - amount: 100, - }); - const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ - transferEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, - }); - - const loggerCrit = jest.spyOn(logger, 'crit'); - const loggerError = jest.spyOn(logger, 'error'); - await expect(onMessage(kafkaMessage)).rejects.toThrowError( - new ParseMessageError( - 'TransferEvent must have either a sender subaccount id or sender wallet address', - ), - ); + assetId: 0, + amount: 100, + }); + const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ + transferEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ - at: 'TransferValidator#logAndThrowParseMessageError', - message: 'TransferEvent must have either a sender subaccount id or sender wallet address', - })); - expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ - at: 'onMessage#onMessage', - message: 'Error: Unable to parse message, this must be due to a bug in V4 node', - })); - }); + const loggerCrit = jest.spyOn(logger, 'crit'); + const loggerError = jest.spyOn(logger, 'error'); + await expect(onMessage(kafkaMessage)).rejects.toThrowError( + new ParseMessageError( + 'TransferEvent must have either a sender subaccount id or sender wallet address', + ), + ); - it('fails when TransferEvent does not contain recipient subaccountId', async () => { - const transactionIndex: number = 0; - const transferEvent: TransferEventV1 = TransferEventV1.fromPartial({ - sender: { - subaccountId: { - owner: '', - number: 0, - }, - }, - assetId: 0, - amount: 100, - }); - const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ - transferEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ + at: 'TransferValidator#logAndThrowParseMessageError', + message: 'TransferEvent must have either a sender subaccount id or sender wallet address', + })); + expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ + at: 'onMessage#onMessage', + message: 'Error: Unable to parse message, this must be due to a bug in V4 node', + })); }); - const loggerCrit = jest.spyOn(logger, 'crit'); - const loggerError = jest.spyOn(logger, 'error'); - await expect(onMessage(kafkaMessage)).rejects.toThrowError( - new ParseMessageError( - 'TransferEvent must have either a recipient subaccount id or recipient wallet address', - ), - ); - - expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ - at: 'TransferValidator#logAndThrowParseMessageError', - message: 'TransferEvent must have either a recipient subaccount id or recipient wallet address', - })); - expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ - at: 'onMessage#onMessage', - message: 'Error: Unable to parse message, this must be due to a bug in V4 node', - })); - }); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'fails when TransferEvent does not contain recipient subaccountId (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_TRANSFER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; + const transferEvent: TransferEventV1 = TransferEventV1.fromPartial({ + sender: { + subaccountId: { + owner: '', + number: 0, + }, + }, + assetId: 0, + amount: 100, + }); + const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ + transferEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - it('creates new transfer for existing subaccounts', async () => { - const transactionIndex: number = 0; + const loggerCrit = jest.spyOn(logger, 'crit'); + const loggerError = jest.spyOn(logger, 'error'); + await expect(onMessage(kafkaMessage)).rejects.toThrowError( + new ParseMessageError( + 'TransferEvent must have either a recipient subaccount id or recipient wallet address', + ), + ); - const transferEvent: TransferEventV1 = defaultTransferEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ - transferEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + expect(loggerError).toHaveBeenCalledWith(expect.objectContaining({ + at: 'TransferValidator#logAndThrowParseMessageError', + message: 'TransferEvent must have either a recipient subaccount id or recipient wallet address', + })); + expect(loggerCrit).toHaveBeenCalledWith(expect.objectContaining({ + at: 'onMessage#onMessage', + message: 'Error: Unable to parse message, this must be due to a bug in V4 node', + })); }); - // Create the subaccounts - await Promise.all([ - SubaccountTable.upsert(defaultSenderSubaccount), - SubaccountTable.upsert(defaultRecipientSubaccount), - ]); - - // Confirm there are subaccounts - const subaccountIds: string[] = [defaultSenderSubaccountId, defaultRecipientSubaccountId]; - _.each(subaccountIds, async (subaccountId) => { - const existingSubaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( - subaccountId, - ); - expect(existingSubaccount).toBeDefined(); - }); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new transfer for existing subaccounts (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_TRANSFER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - // Confirm there is no existing transfer to or from the recipient/sender subaccounts - await expectNoExistingTransfers([defaultRecipientSubaccountId, defaultSenderSubaccountId]); + const transferEvent: TransferEventV1 = defaultTransferEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ + transferEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + // Create the subaccounts + await Promise.all([ + SubaccountTable.upsert(defaultSenderSubaccount), + SubaccountTable.upsert(defaultRecipientSubaccount), + ]); + + // Confirm there are subaccounts + const subaccountIds: string[] = [defaultSenderSubaccountId, defaultRecipientSubaccountId]; + _.each(subaccountIds, async (subaccountId) => { + const existingSubaccount: + SubaccountFromDatabase | undefined = await SubaccountTable.findById( + subaccountId, + ); + expect(existingSubaccount).toBeDefined(); + }); - const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer({ - recipientSubaccountId: defaultRecipientSubaccountId, - senderSubaccountId: defaultSenderSubaccountId, - }); + // Confirm there is no existing transfer to or from the recipient/sender subaccounts + await expectNoExistingTransfers([defaultRecipientSubaccountId, defaultSenderSubaccountId]); - expectTransferMatchesEvent(transferEvent, newTransfer, asset); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); - await expectTransfersSubaccountKafkaMessage( - producerSendMock, - transferEvent, - newTransfer, - asset, - ); - expectTimingStats(); - }); + const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer({ + recipientSubaccountId: defaultRecipientSubaccountId, + senderSubaccountId: defaultSenderSubaccountId, + }); - it('creates new deposit for existing subaccount', async () => { - const transactionIndex: number = 0; + expectTransferMatchesEvent(transferEvent, newTransfer, asset); - const depositEvent: TransferEventV1 = defaultDepositEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ - transferEvent: depositEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + await expectTransfersSubaccountKafkaMessage( + producerSendMock, + transferEvent, + newTransfer, + asset, + ); + if (!useSqlFunction) { + expectTimingStats(); + } }); - // Create the subaccounts - await Promise.all([ - SubaccountTable.upsert(defaultRecipientSubaccount), - ]); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new deposit for existing subaccount (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_TRANSFER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - // Confirm there is a recipient subaccount - const existingSubaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( - defaultRecipientSubaccountId, - ); - expect(existingSubaccount).toBeDefined(); + const depositEvent: TransferEventV1 = defaultDepositEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ + transferEvent: depositEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - // Confirm there is no existing transfer to or from the recipient subaccount - await expectNoExistingTransfers([defaultRecipientSubaccountId]); + // Create the subaccounts + await Promise.all([ + SubaccountTable.upsert(defaultRecipientSubaccount), + ]); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + // Confirm there is a recipient subaccount + const existingSubaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( + defaultRecipientSubaccountId, + ); + expect(existingSubaccount).toBeDefined(); - const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( - { - recipientSubaccountId: defaultRecipientSubaccountId, - }, - ); + // Confirm there is no existing transfer to or from the recipient subaccount + await expectNoExistingTransfers([defaultRecipientSubaccountId]); - expectTransferMatchesEvent(depositEvent, newTransfer, asset); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); - await expectTransfersSubaccountKafkaMessage( - producerSendMock, - depositEvent, - newTransfer, - asset, - ); - // Confirm the wallet was created - const wallet: WalletFromDatabase | undefined = await WalletTable.findById( - defaultWalletAddress, - ); - expect(wallet).toBeDefined(); - expectTimingStats(); - }); + const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( + { + recipientSubaccountId: defaultRecipientSubaccountId, + }, + ); - it('creates new deposit for previously non-existent subaccount', async () => { - const transactionIndex: number = 0; + expectTransferMatchesEvent(depositEvent, newTransfer, asset); - const depositEvent: TransferEventV1 = defaultDepositEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ - transferEvent: depositEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + await expectTransfersSubaccountKafkaMessage( + producerSendMock, + depositEvent, + newTransfer, + asset, + ); + // Confirm the wallet was created + const wallet: WalletFromDatabase | undefined = await WalletTable.findById( + defaultWalletAddress, + ); + expect(wallet).toBeDefined(); + if (!useSqlFunction) { + expectTimingStats(); + } }); - // Confirm there is no recipient subaccount - const existingSubaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( - defaultRecipientSubaccountId, - ); - expect(existingSubaccount).toBeUndefined(); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new deposit for previously non-existent subaccount (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_TRANSFER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - // Confirm there is no existing transfer to or from the recipient subaccount - await expectNoExistingTransfers([defaultRecipientSubaccountId]); + const depositEvent: TransferEventV1 = defaultDepositEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ + transferEvent: depositEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + // Confirm there is no recipient subaccount + const existingSubaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( + defaultRecipientSubaccountId, + ); + expect(existingSubaccount).toBeUndefined(); - const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( - { - recipientSubaccountId: defaultRecipientSubaccountId, - }, - ); + // Confirm there is no existing transfer to or from the recipient subaccount + await expectNoExistingTransfers([defaultRecipientSubaccountId]); - expectTransferMatchesEvent(depositEvent, newTransfer, asset); - await expectTransfersSubaccountKafkaMessage( - producerSendMock, - depositEvent, - newTransfer, - asset, - ); - // Confirm the wallet was created - const wallet: WalletFromDatabase | undefined = await WalletTable.findById( - defaultWalletAddress, - ); - const newRecipientSubaccount: SubaccountFromDatabase | undefined = await - SubaccountTable.findById( - defaultRecipientSubaccountId, - ); - expect(newRecipientSubaccount).toBeDefined(); - expect(wallet).toBeDefined(); - expectTimingStats(); - }); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); - it('creates new withdrawal for existing subaccount', async () => { - const transactionIndex: number = 0; + const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( + { + recipientSubaccountId: defaultRecipientSubaccountId, + }, + ); - const withdrawalEvent: TransferEventV1 = defaultWithdrawalEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ - transferEvent: withdrawalEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + expectTransferMatchesEvent(depositEvent, newTransfer, asset); + await expectTransfersSubaccountKafkaMessage( + producerSendMock, + depositEvent, + newTransfer, + asset, + ); + // Confirm the wallet was created + const wallet: WalletFromDatabase | undefined = await WalletTable.findById( + defaultWalletAddress, + ); + const newRecipientSubaccount: SubaccountFromDatabase | undefined = await + SubaccountTable.findById( + defaultRecipientSubaccountId, + ); + expect(newRecipientSubaccount).toBeDefined(); + expect(wallet).toBeDefined(); + if (!useSqlFunction) { + expectTimingStats(); + } }); - // Create the subaccounts - await Promise.all([ - SubaccountTable.upsert(defaultSenderSubaccount), - ]); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new withdrawal for existing subaccount (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_TRANSFER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - // Confirm there is a sender subaccount - const existingSubaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( - defaultSenderSubaccountId, - ); - expect(existingSubaccount).toBeDefined(); + const withdrawalEvent: TransferEventV1 = defaultWithdrawalEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ + transferEvent: withdrawalEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - // Confirm there is no existing transfer to or from the sender subaccount - await expectNoExistingTransfers([defaultSenderSubaccountId]); + // Create the subaccounts + await Promise.all([ + SubaccountTable.upsert(defaultSenderSubaccount), + ]); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + // Confirm there is a sender subaccount + const existingSubaccount: SubaccountFromDatabase | undefined = await SubaccountTable.findById( + defaultSenderSubaccountId, + ); + expect(existingSubaccount).toBeDefined(); - const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( - { - senderSubaccountId: defaultSenderSubaccountId, - }, - ); + // Confirm there is no existing transfer to or from the sender subaccount + await expectNoExistingTransfers([defaultSenderSubaccountId]); - expectTransferMatchesEvent(withdrawalEvent, newTransfer, asset); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); - await expectTransfersSubaccountKafkaMessage( - producerSendMock, - withdrawalEvent, - newTransfer, - asset, - ); - // Confirm the wallet was created - const wallet: WalletFromDatabase | undefined = await WalletTable.findById( - defaultWalletAddress, - ); - expect(wallet).toBeDefined(); - expectTimingStats(); - }); + const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( + { + senderSubaccountId: defaultSenderSubaccountId, + }, + ); - it('creates new transfer and the recipient subaccount', async () => { - const transactionIndex: number = 0; + expectTransferMatchesEvent(withdrawalEvent, newTransfer, asset); - const transferEvent: TransferEventV1 = defaultTransferEvent; - const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ - transferEvent, - transactionIndex, - height: defaultHeight, - time: defaultTime, - txHash: defaultTxHash, + await expectTransfersSubaccountKafkaMessage( + producerSendMock, + withdrawalEvent, + newTransfer, + asset, + ); + // Confirm the wallet was created + const wallet: WalletFromDatabase | undefined = await WalletTable.findById( + defaultWalletAddress, + ); + expect(wallet).toBeDefined(); + if (!useSqlFunction) { + expectTimingStats(); + } }); - await SubaccountTable.upsert(defaultSenderSubaccount); + it.each([ + [ + 'via knex', + false, + ], + [ + 'via SQL function', + true, + ], + ])( + 'creates new transfer and the recipient subaccount (%s)', + async ( + _name: string, + useSqlFunction: boolean, + ) => { + config.USE_TRANSFER_HANDLER_SQL_FUNCTION = useSqlFunction; + const transactionIndex: number = 0; - // Confirm there is 1 subaccount - const existingSenderSubaccount: SubaccountFromDatabase | undefined = await - SubaccountTable.findById( - defaultSenderSubaccountId, - ); - expect(existingSenderSubaccount).toBeDefined(); - const existingRecipientSubaccount: SubaccountFromDatabase | undefined = await - SubaccountTable.findById( - defaultRecipientSubaccountId, - ); - expect(existingRecipientSubaccount).toBeUndefined(); + const transferEvent: TransferEventV1 = defaultTransferEvent; + const kafkaMessage: KafkaMessage = createKafkaMessageFromTransferEvent({ + transferEvent, + transactionIndex, + height: defaultHeight, + time: defaultTime, + txHash: defaultTxHash, + }); - // Confirm there is no existing transfers - await expectNoExistingTransfers([defaultRecipientSubaccountId, defaultSenderSubaccountId]); + await SubaccountTable.upsert(defaultSenderSubaccount); - const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); - await onMessage(kafkaMessage); + // Confirm there is 1 subaccount + const existingSenderSubaccount: SubaccountFromDatabase | undefined = await + SubaccountTable.findById( + defaultSenderSubaccountId, + ); + expect(existingSenderSubaccount).toBeDefined(); + const existingRecipientSubaccount: SubaccountFromDatabase | undefined = await + SubaccountTable.findById( + defaultRecipientSubaccountId, + ); + expect(existingRecipientSubaccount).toBeUndefined(); - const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( - { - recipientSubaccountId: defaultRecipientSubaccountId, - senderSubaccountId: defaultSenderSubaccountId, - }); + // Confirm there is no existing transfers + await expectNoExistingTransfers([defaultRecipientSubaccountId, defaultSenderSubaccountId]); - expectTransferMatchesEvent(transferEvent, newTransfer, asset); - const newRecipientSubaccount: SubaccountFromDatabase | undefined = await - SubaccountTable.findById( - defaultRecipientSubaccountId, - ); - expect(newRecipientSubaccount).toBeDefined(); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); + await onMessage(kafkaMessage); - await expectTransfersSubaccountKafkaMessage( - producerSendMock, - transferEvent, - newTransfer, - asset, - ); - expectTimingStats(); - }); + const newTransfer: TransferFromDatabase = await expectAndReturnNewTransfer( + { + recipientSubaccountId: defaultRecipientSubaccountId, + senderSubaccountId: defaultSenderSubaccountId, + }); + + expectTransferMatchesEvent(transferEvent, newTransfer, asset); + const newRecipientSubaccount: SubaccountFromDatabase | undefined = await + SubaccountTable.findById( + defaultRecipientSubaccountId, + ); + expect(newRecipientSubaccount).toBeDefined(); + + await expectTransfersSubaccountKafkaMessage( + producerSendMock, + transferEvent, + newTransfer, + asset, + ); + if (!useSqlFunction) { + expectTimingStats(); + } + }); }); function createKafkaMessageFromTransferEvent({ diff --git a/indexer/services/ender/__tests__/scripts/scripts.test.ts b/indexer/services/ender/__tests__/scripts/scripts.test.ts index d0d4b51c75..3c14458953 100644 --- a/indexer/services/ender/__tests__/scripts/scripts.test.ts +++ b/indexer/services/ender/__tests__/scripts/scripts.test.ts @@ -29,6 +29,7 @@ import { uuid, TransactionTable, TransactionFromDatabase, + TransferTable, BlockTable, TendermintEventFromDatabase, } from '@dydxprotocol-indexer/postgres'; @@ -292,7 +293,7 @@ describe('SQL Function Tests', () => { 5, 6, ], - ])('dydx_uuid_from_perpetual_position_parts (%s)', async (subaccountId: IndexerSubaccountId, blockHeight: number, transactionIndex: number, eventIndex: number) => { + ])('dydx_uuid_from_perpetual_position_parts (%s, %s, %s, %s)', async (subaccountId: IndexerSubaccountId, blockHeight: number, transactionIndex: number, eventIndex: number) => { const subaccountUuid = SubaccountTable.subaccountIdToUuid(subaccountId); const eventId = TendermintEventTable.createEventId(`${blockHeight}`, transactionIndex, eventIndex); const result = await getSingleRawQueryResultRow( @@ -313,6 +314,66 @@ describe('SQL Function Tests', () => { expect(result).toEqual(SubaccountTable.subaccountIdToUuid(subaccountId)); }); + it.each([ + [ + { + owner: testConstants.defaultSubaccount.address, + number: testConstants.defaultSubaccount.subaccountNumber, + }, + { + owner: testConstants.defaultSubaccount2.address, + number: testConstants.defaultSubaccount2.subaccountNumber, + }, + undefined, + undefined, + ], + [ + { + owner: testConstants.defaultSubaccount2.address, + number: testConstants.defaultSubaccount2.subaccountNumber, + }, + undefined, + 'senderWallet', + undefined, + ], + [ + { + owner: testConstants.defaultSubaccount.address, + number: testConstants.defaultSubaccount.subaccountNumber, + }, + undefined, + undefined, + 'recipientWallet', + ], + [ + undefined, + undefined, + 'senderWallet', + 'recipientWallet', + ], + ])('dydx_uuid_from_transfer_parts (%s, %s, %s, %s)', async ( + senderSubaccountId: IndexerSubaccountId | undefined, + recipientSubaccountId: IndexerSubaccountId | undefined, + senderWalletAddress: string | undefined, + recipientWalletAddress: string | undefined) => { + const eventId: Buffer = TendermintEventTable.createEventId('1', 2, 3); + const assetId: string = '0'; + const senderSubaccountUuid: string | undefined = senderSubaccountId + ? SubaccountTable.subaccountIdToUuid(senderSubaccountId) : undefined; + const recipientSubaccountUuid: string | undefined = recipientSubaccountId + ? SubaccountTable.subaccountIdToUuid(recipientSubaccountId) : undefined; + const result = await getSingleRawQueryResultRow( + `SELECT dydx_uuid_from_transfer_parts('\\x${eventId.toString('hex')}'::bytea, '${assetId}', ${senderSubaccountUuid ? `'${senderSubaccountUuid}'` : 'NULL'}, ${recipientSubaccountUuid ? `'${recipientSubaccountUuid}'` : 'NULL'}, ${senderWalletAddress ? `'${senderWalletAddress}'` : 'NULL'}, ${recipientWalletAddress ? `'${recipientWalletAddress}'` : 'NULL'}) AS result`); + expect(result).toEqual(TransferTable.uuid( + eventId, + assetId, + senderSubaccountUuid, + recipientSubaccountUuid, + senderWalletAddress, + recipientWalletAddress, + )); + }); + it.each([ { event: { transactionIndex: 123 }, diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index ecc09c4ac6..e6fd60d9f3 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -44,10 +44,13 @@ export const configSchema = { USE_ORDER_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), + USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION: parseBoolean({ + default: true, + }), USE_SUBACCOUNT_UPDATE_SQL_FUNCTION: parseBoolean({ default: true, }), - USE_PERPETUAL_MARKET_HANDLER_SQL_FUNCTION: parseBoolean({ + USE_TRANSFER_HANDLER_SQL_FUNCTION: parseBoolean({ default: true, }), USE_UPDATE_CLOB_PAIR_HANDLER_SQL_FUNCTION: parseBoolean({ diff --git a/indexer/services/ender/src/handlers/transfer-handler.ts b/indexer/services/ender/src/handlers/transfer-handler.ts index d4d8fa6644..b3b3118517 100644 --- a/indexer/services/ender/src/handlers/transfer-handler.ts +++ b/indexer/services/ender/src/handlers/transfer-handler.ts @@ -1,17 +1,23 @@ +import { logger } from '@dydxprotocol-indexer/base'; import { AssetFromDatabase, + AssetModel, assetRefresher, protocolTranslations, + storeHelpers, SubaccountMessageContents, SubaccountTable, TendermintEventTable, TransferCreateObject, TransferFromDatabase, + TransferModel, TransferTable, WalletTable, } from '@dydxprotocol-indexer/postgres'; import { TransferEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import * as pg from 'pg'; +import config from '../config'; import { generateTransferContents } from '../helpers/kafka-helper'; import { indexerTendermintEventToTransactionIndex } from '../lib/helper'; import { ConsolidatedKafkaEvent, TransferEventType } from '../lib/types'; @@ -25,8 +31,50 @@ export class TransferHandler extends Handler { return []; } - public async internalHandle( - ): Promise { + // eslint-disable-next-line @typescript-eslint/require-await + public async internalHandle(): Promise { + if (config.USE_TRANSFER_HANDLER_SQL_FUNCTION) { + return this.handleViaSqlFunction(); + } + return this.handleViaKnex(); + } + + private async handleViaSqlFunction(): Promise { + const transactionIndex: number = indexerTendermintEventToTransactionIndex( + this.indexerTendermintEvent, + ); + const eventDataBinary: Uint8Array = this.indexerTendermintEvent.dataBytes; + const result: pg.QueryResult = await storeHelpers.rawQuery( + `SELECT dydx_transfer_handler( + ${this.block.height}, + '${this.block.time?.toISOString()}', + '${JSON.stringify(TransferEventV1.decode(eventDataBinary))}', + ${this.indexerTendermintEvent.eventIndex}, + ${transactionIndex}, + '${this.block.txHashes[transactionIndex]}' + ) AS result;`, + { txId: this.txId }, + ).catch((error: Error) => { + logger.error({ + at: 'TransferHandler#handleViaSqlFunction', + message: 'Failed to handle TransferEventV1', + error, + }); + + throw error; + }); + + const asset: AssetFromDatabase = AssetModel.fromJson( + result.rows[0].result.asset) as AssetFromDatabase; + const transfer: TransferFromDatabase = TransferModel.fromJson( + result.rows[0].result.transfer) as TransferFromDatabase; + return this.generateKafkaEvents( + transfer, + asset, + ); + } + + private async handleViaKnex(): Promise { await this.runFuncWithTimingStatAndErrorLogging( Promise.all([ this.upsertRecipientSubaccount(), diff --git a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts index ec43a44afb..6a5b2b2f18 100644 --- a/indexer/services/ender/src/helpers/postgres/postgres-functions.ts +++ b/indexer/services/ender/src/helpers/postgres/postgres-functions.ts @@ -50,6 +50,7 @@ const scripts: string[] = [ 'dydx_perpetual_market_handler.sql', 'dydx_perpetual_position_and_order_side_matching.sql', 'dydx_subaccount_update_handler.sql', + 'dydx_transfer_handler.sql', 'dydx_trim_scale.sql', 'dydx_update_clob_pair_handler.sql', 'dydx_update_perpetual_handler.sql', @@ -63,6 +64,7 @@ const scripts: string[] = [ 'dydx_uuid_from_subaccount_id.sql', 'dydx_uuid_from_subaccount_id_parts.sql', 'dydx_uuid_from_transaction_parts.sql', + 'dydx_uuid_from_transfer_parts.sql', 'dydx_create_transaction.sql', 'dydx_create_initial_rows_for_tendermint_block.sql', 'dydx_create_tendermint_event.sql', diff --git a/indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql b/indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql index b32c4212ba..c59b05ac5b 100644 --- a/indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql +++ b/indexer/services/ender/src/scripts/dydx_liquidity_tier_handler.sql @@ -13,7 +13,7 @@ BEGIN liquidity_tier_record."name" = event_data->>'name'; liquidity_tier_record."initialMarginPpm" = (event_data->'initialMarginPpm')::bigint; liquidity_tier_record."maintenanceFractionPpm" = (event_data->'maintenanceFractionPpm')::bigint; - liquidity_tier_record."basePositionNotional" = power(10, -6) * dydx_from_jsonlib_long(event_data->'basePositionNotional'); + liquidity_tier_record."basePositionNotional" = dydx_trim_scale(power(10, -6)::numeric * dydx_from_jsonlib_long(event_data->'basePositionNotional')); INSERT INTO liquidity_tiers VALUES (liquidity_tier_record.*) diff --git a/indexer/services/ender/src/scripts/dydx_transfer_handler.sql b/indexer/services/ender/src/scripts/dydx_transfer_handler.sql new file mode 100644 index 0000000000..3077c74b6c --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_transfer_handler.sql @@ -0,0 +1,95 @@ +/** + Parameters: + - block_height: the height of the block being processing. + - block_time: the time of the block being processed. + - event_data: The 'data' field of the IndexerTendermintEvent (https://github.com/dydxprotocol/v4-proto/blob/8d35c86/dydxprotocol/indexer/indexer_manager/event.proto#L25) + converted to JSON format. Conversion to JSON is expected to be done by JSON.stringify. + - event_index: The 'event_index' of the IndexerTendermintEvent. + - transaction_index: The transaction_index of the IndexerTendermintEvent after the conversion that takes into + account the block_event (https://github.com/dydxprotocol/indexer/blob/cc70982/services/ender/src/lib/helper.ts#L33) + - transaction_hash: The transaction hash corresponding to this event from the IndexerTendermintBlock 'tx_hashes'. + Returns: JSON object containing fields: + - asset: The existing asset in asset-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/asset-model.ts). + - transfer: The new transfer in transfer-model format (https://github.com/dydxprotocol/indexer/blob/cc70982/packages/postgres/src/models/transfer-model.ts). +*/ +CREATE OR REPLACE FUNCTION dydx_transfer_handler( + block_height int, block_time timestamp, event_data jsonb, event_index int, transaction_index int, + transaction_hash text) RETURNS jsonb AS $$ +DECLARE + asset_record assets%ROWTYPE; + recipient_subaccount_record subaccounts%ROWTYPE; + recipient_wallet_record wallets%ROWTYPE; + sender_wallet_record wallets%ROWTYPE; + transfer_record transfers%ROWTYPE; +BEGIN + asset_record."id" = event_data->>'assetId'; + SELECT * INTO asset_record FROM assets WHERE "id" = asset_record."id"; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Unable to find asset with assetId: %', asset_record."id"; + END IF; + + IF event_data->'recipient'->'subaccountId' IS NOT NULL THEN + transfer_record."recipientSubaccountId" = dydx_uuid_from_subaccount_id(event_data->'recipient'->'subaccountId'); + + recipient_subaccount_record."id" = transfer_record."recipientSubaccountId"; + recipient_subaccount_record."address" = event_data->'recipient'->'subaccountId'->>'owner'; + recipient_subaccount_record."subaccountNumber" = (event_data->'recipient'->'subaccountId'->'number')::int; + recipient_subaccount_record."updatedAtHeight" = block_height; + recipient_subaccount_record."updatedAt" = block_time; + + INSERT INTO subaccounts VALUES (recipient_subaccount_record.*) + ON CONFLICT ("id") DO + UPDATE + SET + "updatedAtHeight" = recipient_subaccount_record."updatedAtHeight", + "updatedAt" = recipient_subaccount_record."updatedAt"; + END IF; + + IF event_data->'sender'->'subaccountId' IS NOT NULL THEN + transfer_record."senderSubaccountId" = dydx_uuid_from_subaccount_id(event_data->'sender'->'subaccountId'); + END IF; + + IF event_data->'recipient'->'address' IS NOT NULL THEN + transfer_record."recipientWalletAddress" = event_data->'recipient'->>'address'; + + recipient_wallet_record."address" = transfer_record."recipientWalletAddress"; + INSERT INTO wallets VALUES (recipient_wallet_record.*) ON CONFLICT DO NOTHING; + END IF; + + IF event_data->'sender'->'address' IS NOT NULL THEN + transfer_record."senderWalletAddress" = event_data->'sender'->>'address'; + + sender_wallet_record."address" = transfer_record."senderWalletAddress"; + INSERT INTO wallets VALUES (sender_wallet_record.*) ON CONFLICT DO NOTHING; + END IF; + + transfer_record."assetId" = event_data->>'assetId'; + transfer_record."size" = dydx_trim_scale(dydx_from_jsonlib_long(event_data->'amount') * power(10, asset_record."atomicResolution")::numeric); + transfer_record."eventId" = dydx_event_id_from_parts(block_height, transaction_index, event_index); + transfer_record."transactionHash" = transaction_hash; + transfer_record."createdAt" = block_time; + transfer_record."createdAtHeight" = block_height; + transfer_record."id" = dydx_uuid_from_transfer_parts( + transfer_record."eventId", + transfer_record."assetId", + transfer_record."senderSubaccountId", + transfer_record."recipientSubaccountId", + transfer_record."senderWalletAddress", + transfer_record."recipientWalletAddress"); + + BEGIN + INSERT INTO transfers VALUES (transfer_record.*); + EXCEPTION + WHEN check_violation THEN + RAISE EXCEPTION 'Record: %, event: %', transfer_record, event_data; + END; + + RETURN jsonb_build_object( + 'asset', + dydx_to_jsonb(asset_record), + 'transfer', + dydx_to_jsonb(transfer_record) + ); +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/indexer/services/ender/src/scripts/dydx_uuid_from_transfer_parts.sql b/indexer/services/ender/src/scripts/dydx_uuid_from_transfer_parts.sql new file mode 100644 index 0000000000..16870fed82 --- /dev/null +++ b/indexer/services/ender/src/scripts/dydx_uuid_from_transfer_parts.sql @@ -0,0 +1,34 @@ +/** + Returns a UUID using the parts of a transfer. +*/ +CREATE OR REPLACE FUNCTION dydx_uuid_from_transfer_parts(event_id bytea, asset_id text, sender_subaccount_id uuid, recipient_subaccount_id uuid, sender_wallet_address text, recipient_wallet_address text) RETURNS uuid AS $$ +DECLARE + sender_subaccount_id_or_undefined text; + recipient_subaccount_id_or_undefined text; + sender_wallet_address_or_undefined text; + recipient_wallet_address_or_undefined text; +BEGIN + /** TODO(IND-483): Fix all uuid string substitutions to use Array.join so that we can drop the 'undefined' substitutions below. */ + IF sender_subaccount_id IS NULL THEN + sender_subaccount_id_or_undefined = 'undefined'; + ELSE + sender_subaccount_id_or_undefined = sender_subaccount_id; + END IF; + IF recipient_subaccount_id IS NULL THEN + recipient_subaccount_id_or_undefined = 'undefined'; + ELSE + recipient_subaccount_id_or_undefined = recipient_subaccount_id; + END IF; + IF sender_wallet_address IS NULL THEN + sender_wallet_address_or_undefined = 'undefined'; + ELSE + sender_wallet_address_or_undefined = sender_wallet_address; + END IF; + IF recipient_wallet_address IS NULL THEN + recipient_wallet_address_or_undefined = 'undefined'; + ELSE + recipient_wallet_address_or_undefined = recipient_wallet_address; + END IF; + return dydx_uuid(concat(sender_subaccount_id_or_undefined, '-', recipient_subaccount_id_or_undefined, '-', sender_wallet_address_or_undefined, '-', recipient_wallet_address_or_undefined, '-', encode(event_id, 'hex'), '-', asset_id)); +END; +$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;