Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IND-288] send final message per order id for order fills, with aggregated fill sfrom the entire block #658

Merged
merged 7 commits into from
Oct 20, 2023

Conversation

dydxwill
Copy link
Contributor

@dydxwill dydxwill commented Oct 18, 2023

Changelist

For order fills, the state transition of CANCELED -> FILLED isn't considered valid by the FE. This can happen if there are multiple fills in a given block for an IOC order. Because the totalFilled < order amount on the first fill, Indexer will publish a websocket message with CANCELED status. In subsequent fills, when totalFilled=order amount, Indexer will publish a follow-up message with FILLED status.

Rather than send individual messages per fill per order, send a single websocket message containing the order status of the last fill and all aggregated fills.

Test Plan

Unit tests

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

@dydxwill dydxwill added the bug Something isn't working label Oct 18, 2023
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 18, 2023

Walkthrough

The changes primarily focus on improving the handling, sorting, and aggregation of subaccount messages and fill events in the Kafka publisher. A new interface AnnotatedSubaccountMessage has been introduced, which extends SubaccountMessage and adds additional properties. The KafkaPublisher class and its methods have been updated to use this new interface. New helper functions have been added to sort and aggregate events, and existing methods have been modified to improve the overall functionality.

Changes

File Path Summary
.../ender/__tests__/lib/kafka-publisher.test.ts Introduced new functions for handling, sorting, and aggregating subaccount and trade events. Added new helper functions for generating subaccount and transfer contents.
.../ender/src/handlers/handler.ts Updated the generateConsolidatedSubaccountKafkaEvent method to include additional parameters. The SubaccountMessage type has been replaced with AnnotatedSubaccountMessage.
.../ender/src/handlers/order-fills/abstract-order-fill-handler.ts Updated the generateConsolidatedSubaccountKafkaEvent function to include additional arguments.
.../ender/src/lib/candles-generator.ts Renamed the sortTradeEvents() method to sortEvents() and updated its arguments.
.../ender/src/lib/helper.ts Added a new function convertToSubaccountMessage that converts an AnnotatedSubaccountMessage to a SubaccountMessage.
.../ender/src/lib/kafka-publisher.ts Updated the KafkaPublisher class to handle AnnotatedSubaccountMessage objects. Added new methods for sorting and aggregating events.
.../ender/src/lib/types.ts Introduced the AnnotatedSubaccountMessage interface and updated the ConsolidatedKafkaEvent type to use this new interface.

Tips

Chat with CodeRabbit Bot (@coderabbitai)

  • Mention @coderabbitai in any review comment for bot assistance.
  • Note: Review comments are made on code diffs or files, not on the PR overview.

Pause Incremental Reviews

  • Insert @coderabbitai: ignore in the PR description to halt the bot's ongoing reviews. Remove the line to resume.
  • Additionally, enter @coderabbitai pause to halt the bot's ongoing reviews as any issue or review comment. To
    resume reviews, enter @coderabbitai resume as any issue or review comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 0

Commits Files that changed from the base of the PR and between c17e215 and 4cd6724.
Files selected for processing (6)
  • indexer/services/ender/tests/lib/kafka-publisher.test.ts (3 hunks)
  • indexer/services/ender/src/handlers/handler.ts (3 hunks)
  • indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1 hunks)
  • indexer/services/ender/src/lib/candles-generator.ts (2 hunks)
  • indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
  • indexer/services/ender/src/lib/types.ts (1 hunks)
Files skipped from review due to trivial changes (2)
  • indexer/services/ender/tests/lib/kafka-publisher.test.ts
  • indexer/services/ender/src/lib/candles-generator.ts
Additional comments (Suppressed): 12
indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1)
  • 407-412: The generateConsolidatedSubaccountKafkaEvent method now accepts two additional parameters: orderId and isFill. Ensure that all calls to this method throughout the codebase have been updated to match the new signature. Also, verify that the order object is guaranteed to be non-null when accessing order.id, as this could potentially cause a runtime error if order is null or undefined.
indexer/services/ender/src/lib/types.ts (2)
  • 191-194: The new AnnotatedSubaccountMessage interface extends SubaccountMessage with two optional properties: orderId and isFill. This change seems to be in line with the PR's goal of consolidating multiple fill events for a single order into a single websocket message. However, ensure that the rest of the codebase that uses SubaccountMessage can handle the new properties without any issues.

  • 201-206: The ConsolidatedKafkaEvent type now uses AnnotatedSubaccountMessage instead of SubaccountMessage. This change is consistent with the introduction of the AnnotatedSubaccountMessage interface. However, make sure that all parts of the codebase that use ConsolidatedKafkaEvent are updated to handle AnnotatedSubaccountMessage.

indexer/services/ender/src/handlers/handler.ts (3)
  • 25-26: The import statement now includes AnnotatedSubaccountMessage which extends the previous SubaccountMessage with two additional properties: orderId and isFill. Ensure that this new type is correctly defined and used throughout the codebase.

  • 105-108: The generateConsolidatedSubaccountKafkaEvent method now accepts two additional optional parameters: orderId and isFill. Make sure that all calls to this method have been updated accordingly.

  • 117-124: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [111-122]

The subaccountMessage is now of type AnnotatedSubaccountMessage and includes the new orderId and isFill properties. Ensure that these properties are correctly populated and used in the rest of the code.

indexer/services/ender/src/lib/kafka-publisher.ts (6)
  • 28-31: The type of subaccountMessages has been changed from SubaccountMessage[] to AnnotatedSubaccountMessage[]. Ensure that all usages of subaccountMessages throughout the codebase have been updated to handle AnnotatedSubaccountMessage objects.

  • 49-50: The addEvent method has been updated to use the getMessages helper function to add an event to the appropriate message array based on its topic. This is a more flexible approach than the previous switch-case structure.

  • 81-104: The retainLastFillEventsForSubaccountMessages method has been added to retain only the last fill event per order ID in subaccountMessages. This is a significant change that affects how fill events are handled and sent to the websocket. Ensure that this change does not negatively impact any systems that rely on receiving all fill events for an order.

  • 109-117: The compareMessages helper function has been added to compare two AnnotatedSubaccountMessage objects based on block height, transaction index, and event index. This function is used in retainLastFillEventsForSubaccountMessages to determine which fill event is the latest for a given order ID.

  • 46-139: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [123-149]

The sortEvents method has been added to sort events based on block height, transaction index, and event index. This method replaces the previous sortTradeEvents method and is more flexible as it can sort both subaccount and trade events. However, it throws an error if the topic is not subaccounts or trades. Ensure that this method is only called with the appropriate topics.

  • 175-176: The retainLastFillEventsForSubaccountMessages method is now called in generateAllTopicKafkaMessages before generating Kafka messages for subaccountMessages. This ensures that only the last fill event per order ID is included in the Kafka messages.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 2

Commits Files that changed from the base of the PR and between 4cd6724 and 3e09681.
Files selected for processing (4)
  • indexer/services/ender/tests/helpers/kafka-publisher-helpers.ts (2 hunks)
  • indexer/services/ender/tests/lib/kafka-publisher.test.ts (2 hunks)
  • indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
  • indexer/services/ender/src/lib/types.ts (2 hunks)
Additional comments (Suppressed): 13
indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts (2)
  • 5-5: The import of AnnotatedSubaccountMessage is new. Ensure that this type is correctly defined and used in the code.

  • 44-51: The new function createConsolidatedKafkaEventFromSubaccount is introduced to create a ConsolidatedKafkaEvent from a AnnotatedSubaccountMessage. This function seems to be correctly implemented, but ensure that it is used properly in the rest of the codebase.

indexer/services/ender/src/lib/types.ts (4)
  • 31-37: The import of lodash (_) is new in this hunk. Ensure that it is used in the codebase and that it doesn't introduce unnecessary overhead if its functionalities are not fully utilized. If only specific methods from lodash are used, consider importing only those methods to reduce the bundle size.

  • 192-195: The AnnotatedSubaccountMessage interface extends SubaccountMessage with two optional properties: orderId and isFill. This is a good practice as it maintains the original structure of SubaccountMessage while adding new properties, thus ensuring backward compatibility.

  • 197-202: The convertToSubaccountMessage function is used to convert an AnnotatedSubaccountMessage back to a SubaccountMessage by omitting the orderId and isFill properties. This function uses lodash's omit method, which is a good choice for this operation. However, be aware that lodash's omit function can be slow for large objects because it creates a new object. If performance becomes an issue, consider using a more performant way to omit properties.

  • 209-214: The ConsolidatedKafkaEvent type now uses AnnotatedSubaccountMessage instead of SubaccountMessage. This change should be compatible with the existing codebase as long as the new properties (orderId and isFill) are optional and are handled correctly in the code that uses ConsolidatedKafkaEvent.

indexer/services/ender/__tests__/lib/kafka-publisher.test.ts (3)
  • 1-30: The new hunk introduces additional imports from the @dydxprotocol-indexer/kafka and @dydxprotocol-indexer/postgres packages, as well as from the local types and helpers modules. The AnnotatedSubaccountMessage and convertToSubaccountMessage types are now being imported, which suggests that the KafkaPublisher class may now be handling AnnotatedSubaccountMessage objects. The createConsolidatedKafkaEventFromSubaccount helper function is also being imported, which suggests that the KafkaPublisher class may now be creating ConsolidatedKafkaEvent objects from AnnotatedSubaccountMessage objects.

  • 149-218: The new hunk introduces a new test case for the sortSubaccountEvents method of the KafkaPublisher class. This method appears to sort AnnotatedSubaccountMessage objects based on the blockHeight, transactionIndex, and eventIndex properties. The test case uses the it.each function to test the sorting of AnnotatedSubaccountMessage objects based on each of these properties. The AnnotatedSubaccountMessage objects are created with different values for the property being tested, and the sortSubaccountEvents method is called with these objects. The test case then asserts that the AnnotatedSubaccountMessage objects are sorted in the expected order.

  • 221-308: The new hunk introduces a new test case for the retainLastFillEventsForSubaccountMessages method of the KafkaPublisher class. This method appears to retain only the last fill event per order ID in the subaccountMessages array. The test case creates several AnnotatedSubaccountMessage objects with different blockHeight, transactionIndex, eventIndex, contents, subaccountId, version, orderId, and isFill properties. Some of these objects represent fill events, while others do not. The retainLastFillEventsForSubaccountMessages method is called with these objects, and the test case then asserts that only the last fill event per order ID and the non-fill events are retained in the subaccountMessages array. The test case also asserts that the publish method calls the producer.send function with the expected arguments.

indexer/services/ender/src/lib/kafka-publisher.ts (4)
  • 30-36: The KafkaPublisher class now uses AnnotatedSubaccountMessage instead of SubaccountMessage. Ensure that all instances of KafkaPublisher have been updated to use AnnotatedSubaccountMessage and that the additional properties orderId and isFill are being correctly set and used where necessary.

  • 54-55: The addEvent method now uses a helper function getMessages to retrieve the appropriate message array based on the topic. This is a good refactor as it reduces code duplication and improves readability.

  • 51-143: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [130-156]

The sortEvents method sorts events based on block height, transaction index, and event index. This is a good approach to ensure that events are processed in the correct order. However, the method throws an error if the topic is not KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS or KafkaTopics.TO_WEBSOCKETS_TRADES. Ensure that this method is only called with these topics, or consider handling other topics as well.

  • 182-185: The generateAllTopicKafkaMessages method now calls retainLastFillEventsForSubaccountMessages before generating all topic Kafka messages. This is a good approach to ensure that only the most recent fill event for each order ID is included in the Kafka messages. However, ensure that this does not cause any issues with the order or timing of the Kafka messages.

Comment on lines 64 to 78
private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
switch (kafkaTopic) {
case KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS:
this.subaccountMessages.push(event.message);
break;
return this.subaccountMessages;
case KafkaTopics.TO_WEBSOCKETS_TRADES:
this.tradeMessages.push(event.message);
break;
return this.tradeMessages;
case KafkaTopics.TO_WEBSOCKETS_MARKETS:
this.marketMessages.push(event.message);
break;
return this.marketMessages;
case KafkaTopics.TO_WEBSOCKETS_CANDLES:
this.candleMessages.push(event.message);
break;
return this.candleMessages;
case KafkaTopics.TO_VULCAN:
this.vulcanMessages.push(event.message);
break;
return this.vulcanMessages;
default:
throw new Error('Invalid Topic');
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getMessages method returns an array of messages for a given Kafka topic. It uses a switch statement to determine which message array to return based on the topic. This is a good use of a switch statement as it makes the code more readable and maintainable. However, the method returns any[] | undefined, which is not type-safe. Consider using a more specific type or a union of specific types instead of any.

- private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
+ private getMessages(kafkaTopic: KafkaTopics): (AnnotatedSubaccountMessage | SingleTradeMessage | MarketMessage | CandleMessage)[] | undefined {

Comment on lines 86 to 111
public retainLastFillEventsForSubaccountMessages() {
// Create a map to store the last fill event per order ID
const lastFillEvents: Record<string, AnnotatedSubaccountMessage> = {};
const nonFillEvents: AnnotatedSubaccountMessage[] = [];

this.subaccountMessages.forEach((message) => {
if (message.isFill && message.orderId) {
const orderId = message.orderId;
// If we haven't seen this order ID before or if the current message
// has a higher block height, update the lastFillEvents for this order ID
if (
!lastFillEvents[orderId] ||
this.compareMessages(message, lastFillEvents[orderId]) > 0
) {
lastFillEvents[orderId] = message;
}
} else {
nonFillEvents.push(message);
}
});

this.subaccountMessages = Object.values(lastFillEvents)
.concat(nonFillEvents)
.map((annotatedMessage) => convertToSubaccountMessage(annotatedMessage));
this.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retainLastFillEventsForSubaccountMessages method sorts and retains only the last fill event per order ID in the subaccountMessages array. This is a good approach to ensure that only the most recent fill event for each order ID is kept. However, the method mutates the subaccountMessages array directly, which can lead to unexpected side effects. Consider creating a new array instead of mutating the existing one.

- this.subaccountMessages = Object.values(lastFillEvents)
+ const updatedSubaccountMessages = Object.values(lastFillEvents)

* and event index in ascending order per order id. Only keep the subaccount message if
* it represents the last fill event per order id.
*/
public retainLastFillEventsForSubaccountMessages() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would lead to all prior fills for an order (if there were multiple) to also be omitted, and that would be incorrect.

E.g. Imagine order A has 2 fills:

  • message 1: {fills: [ .... 40% of order size], order: [ .... status is OPEN, total filled is 40%]}
  • message 2: {fills: [ .... 60% of order size], order: [..... status is FILLED, total filled is 100%]}

The websocket message I expect to see as a client is:

  • message: {fills: [.... 40% of order size, .... 60% of order size], order: [....status is FILLED, total filled is 100%]}

However with this logic what the client sees is:

  • message: {fills: [....60% of order size], order: [....status is FILLED, total filled is 100%]}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (message.isFill && message.orderId) {
const orderId = message.orderId;
// If we haven't seen this order ID before or if the current message
// has a higher block height, update the lastFillEvents for this order ID
Copy link
Contributor

@vincentwschau vincentwschau Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// has a higher block height, update the lastFillEvents for this order ID
// was associated with a later event, update the lastFillEvents for this order ID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public sortTradeEvents() {
this.tradeMessages = this.tradeMessages.sort(
(a: SingleTradeMessage, b: SingleTradeMessage) => {
public sortEvents(kafkaTopic: KafkaTopics) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can this be made a generic function that takes in OrderedMessage and sorts them and returns a sorted list rather than checking if the topic is sortable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 0

Commits Files that changed from the base of the PR and between 3e09681 and 02b63da.
Files selected for processing (3)
  • indexer/services/ender/tests/lib/kafka-publisher.test.ts (2 hunks)
  • indexer/services/ender/src/lib/candles-generator.ts (2 hunks)
  • indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
Files skipped from review due to trivial changes (1)
  • indexer/services/ender/src/lib/candles-generator.ts
Additional comments (Suppressed): 10
indexer/services/ender/__tests__/lib/kafka-publisher.test.ts (3)
  • 1-50: The new hunk introduces additional imports that are necessary for the updated tests. The AnnotatedSubaccountMessage type is imported, which extends the SubaccountMessage type with two additional properties: orderId and isFill. This new type is used in the KafkaPublisher class, where the subaccountMessages property now holds AnnotatedSubaccountMessage objects. The convertToSubaccountMessage function is also imported, which is used to convert AnnotatedSubaccountMessage objects to SubaccountMessage objects. The defaultWalletAddress constant is imported, which is used in the tests to simulate a wallet address. The createConsolidatedKafkaEventFromSubaccount function is imported, which is used to create ConsolidatedKafkaEvent objects from AnnotatedSubaccountMessage objects. The generateFillSubaccountMessage, generateOrderSubaccountMessage, and generateTransferContents functions are imported, which are used to generate different types of subaccount messages.

  • 169-238: The new hunk introduces a new test suite for the sortSubaccountEvents method of the KafkaPublisher class. This method is used to sort subaccount events based on different fields such as blockHeight, transactionIndex, and eventIndex. The test suite includes test cases that create a KafkaPublisher instance, add subaccount events with different values for the specified field, call the sortSubaccountEvents method, and assert that the subaccount events are sorted in the expected order.

  • 241-410: The new hunk introduces a new test suite for the aggregateFillEventsForSubaccountMessages method of the KafkaPublisher class. This method is used to aggregate all fill events per order ID and sort the messages. The test suite includes a test case that creates a KafkaPublisher instance, adds several subaccount messages with different fill events and order IDs, calls the aggregateFillEventsForSubaccountMessages method, and asserts that the subaccount messages are aggregated and sorted as expected. The test case also calls the publish method and asserts that the producer.send function is called with the expected arguments.

indexer/services/ender/src/lib/kafka-publisher.ts (7)
  • 34-40: The subaccountMessages property of the KafkaPublisher class has been changed from SubaccountMessage[] to AnnotatedSubaccountMessage[]. This change is in line with the introduction of the AnnotatedSubaccountMessage type, which extends the SubaccountMessage type with two additional properties: orderId and isFill. This change allows for more detailed tracking of subaccount messages, particularly those related to order fills.

  • 57-83: The addEvent method has been simplified by using a helper function getMessages to retrieve the appropriate message array based on the topic. This change improves the readability and maintainability of the code by reducing the complexity of the addEvent method and encapsulating the logic for retrieving the message array in a separate function.

  • 91-110: Two new helper methods extractFills and replaceFills have been added to handle the extraction and replacement of fill events in the subaccount message contents. These methods improve the modularity of the code by encapsulating specific tasks in separate functions. They also improve the readability of the code by providing descriptive names for these tasks.

  • 121-165: A new method aggregateFillEventsForSubaccountMessages has been added to sort and retain only the last fill event per order ID in the subaccountMessages array. This method addresses the issue of multiple fill events for an Immediate-Or-Cancel (IOC) order causing an invalid state transition from CANCELED to FILLED in the front-end. The method improves the accuracy of the state transitions by ensuring that only the last fill event per order ID is retained.

  • 170-178: A new helper method compareMessages has been added to compare two AnnotatedSubaccountMessage objects based on block height, transaction index, and event index. This method improves the modularity of the code by encapsulating the comparison logic in a separate function. It also improves the readability of the code by providing a descriptive name for this task.

  • 55-189: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [184-202]

The sortTradeEvents method has been replaced with a more general sortEvents method that can sort any array of OrderedMessage objects. This change improves the flexibility of the code by allowing any array of OrderedMessage objects to be sorted, not just trade events.

  • 225-231: The aggregateFillEventsForSubaccountMessages method is now called in the generateAllTopicKafkaMessages method if there are any subaccount messages. This change ensures that the fill events for subaccount messages are aggregated before the Kafka messages are generated.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 1

Commits Files that changed from the base of the PR and between 02b63da and 7073324.
Files selected for processing (1)
  • indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
Additional comments (Suppressed): 6
indexer/services/ender/src/lib/kafka-publisher.ts (6)
  • 34-40: The subaccountMessages property of the KafkaPublisher class has been updated to hold AnnotatedSubaccountMessage objects instead of SubaccountMessage objects. This change is in line with the introduction of the AnnotatedSubaccountMessage type, which extends SubaccountMessage and adds orderId and isFill properties. This change will allow the KafkaPublisher class to handle messages that are annotated with additional information about the order and fill status.

  • 91-110: The extractFills and replaceFills methods have been added to extract fill events from SubaccountMessageContents and replace the fills in SubaccountMessageContents, respectively. These methods are used in the aggregateFillEventsForSubaccountMessages method to handle multiple fills in a single block for an Immediate-Or-Cancel (IOC) order.

  • 121-165: The aggregateFillEventsForSubaccountMessages method has been added to sort and retain only the last fill event per order ID in the subaccountMessages array. This method is called in the generateAllTopicKafkaMessages method before generating all topic Kafka messages. This change addresses the issue of multiple fills in a single block for an IOC order, which can lead to an invalid state transition from CANCELED to FILLED.

  • 170-178: The compareMessages method has been added to compare two AnnotatedSubaccountMessage objects based on block height, transaction index, and event index. This method is used in the aggregateFillEventsForSubaccountMessages method to determine the order of fill events.

  • 55-189: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [184-202]

The sortEvents method has been added to sort events based on block height, transaction index, and event index. This method is called in the aggregateFillEventsForSubaccountMessages method after aggregating fill events. This change ensures that the events are sorted in the correct order before being published.

  • 225-231: The publish method now calls the aggregateFillEventsForSubaccountMessages method before generating all topic Kafka messages. This change ensures that only the last fill event per order ID is retained in the subaccountMessages array before the messages are published.

Comment on lines 57 to 83
public addEvent(event: ConsolidatedKafkaEvent) {
switch (event.topic) {
this.getMessages(event.topic)!.push(event.message);
}

/**
* Helper function to get messages for a given topic.
*
* @param kafkaTopic
* @private
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
switch (kafkaTopic) {
case KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS:
this.subaccountMessages.push(event.message);
break;
return this.subaccountMessages;
case KafkaTopics.TO_WEBSOCKETS_TRADES:
this.tradeMessages.push(event.message);
break;
return this.tradeMessages;
case KafkaTopics.TO_WEBSOCKETS_MARKETS:
this.marketMessages.push(event.message);
break;
return this.marketMessages;
case KafkaTopics.TO_WEBSOCKETS_CANDLES:
this.candleMessages.push(event.message);
break;
return this.candleMessages;
case KafkaTopics.TO_VULCAN:
this.vulcanMessages.push(event.message);
break;
return this.vulcanMessages;
default:
throw new Error('Invalid Topic');
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addEvent method has been updated to use a helper function getMessages to retrieve the appropriate message array based on the topic. This change simplifies the addEvent method and makes it easier to add new topics in the future. However, the getMessages method returns any[] | undefined, which could lead to potential type safety issues. Consider refining the return type to be more specific.

- private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
+ private getMessages(kafkaTopic: KafkaTopics): (AnnotatedSubaccountMessage | SingleTradeMessage)[] | undefined {

@dydxwill dydxwill changed the title [IND-288] send final message per order id for order fills [IND-288] send final message per order id for order fills, with aggregated fill sfrom the entire block Oct 19, 2023
Comment on lines 118 to 119
* Due to separate handlers for order fills, we can be sure that if a message is annotated
* with a fill, it should only contain data about a fill / order and not transfers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Change this to note that the message should only contain a single fill / order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private extractFills(contents: string): FillSubaccountMessageContents[] | undefined {
try {
const subaccountContents:
SubaccountMessageContents = JSON.parse(contents) as SubaccountMessageContents;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's add a TODO + linear ticket for adding a better way to aggregate messages. In addition to orders, we also want to make it so perpetual positions / asset positions only have a single update per block, so a more generic solution would be better. I think the current fix is fine for the short-term.
Having to do an extra JSON.parse per order message is un-ideal, and the JSON.stringify in the order handler is now redundant. It'd be better to add more generic logic to aggregate different types of subaccount messages rather than do post-processing.

Copy link
Contributor

@vincentwschau vincentwschau Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, what do you think about updating the annotated messages such that it also contains a contents property to avoid re-parsing all the message contents? I think that's a small enough change that can be scoped into this fix, rather than waiting for the ticket to allow more generic aggregation of subaccount events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
});

// Replace the fills in the last event for fills with all the fills for that order ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Replace the fills in the last event for fills with all the fills for that order ID
// Update the last event for the order ID such that it has all the fills that occurred for the order ID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* where the first event should be the earliest event in the block.
*/
public sortEvents(msgs: OrderedMessage[]) {
if (msgs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? Would calling .sort on an empty array lead to an error / undesirable outcome?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

* @private
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is it possible to create a type other than any for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

eventIndex: 3,
contents: JSON.stringify(msg3Contents),
subaccountId: {
owner: 'owner3',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: For the sake of correct-ness, this should match msg1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 197 to 203
export function convertToSubaccountMessage(
annotatedMessage: AnnotatedSubaccountMessage,
): SubaccountMessage {
const subaccountMessage: SubaccountMessage = _.omit(annotatedMessage, ['orderId', 'isFill']);
return subaccountMessage;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is there a helpers file this can go into?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to helpers

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 0

Commits Files that changed from the base of the PR and between 7073324 and d7902ed.
Files selected for processing (6)
  • indexer/services/ender/tests/lib/kafka-publisher.test.ts (2 hunks)
  • indexer/services/ender/src/handlers/handler.ts (3 hunks)
  • indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1 hunks)
  • indexer/services/ender/src/lib/helper.ts (2 hunks)
  • indexer/services/ender/src/lib/kafka-publisher.ts (3 hunks)
  • indexer/services/ender/src/lib/types.ts (3 hunks)
Additional comments (Suppressed): 17
indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1)
  • 407-413: The generateConsolidatedSubaccountKafkaEvent method now includes two additional parameters: order?.id and true. Ensure that all calls to this method throughout the codebase have been updated to match the new signature. Also, verify that the order object is always defined when this method is called, as order?.id could potentially be undefined.
indexer/services/ender/src/handlers/handler.ts (3)
  • 10-30: The import statements have been updated to include the new AnnotatedSubaccountMessage type and SubaccountMessageContents from the respective modules. Ensure that these new types are correctly defined and exported in their respective modules.

  • 105-111: The generateConsolidatedSubaccountKafkaEvent method now accepts three additional optional parameters: orderId, isFill, and subaccountMessageContents. Make sure that all calls to this method throughout the codebase have been updated to pass these parameters where necessary.

  • 119-127: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [113-125]

The subaccountMessage object now includes the new properties orderId, isFill, and subaccountMessageContents. Ensure that these properties are correctly handled in the downstream code where this object is consumed.

indexer/services/ender/src/lib/helper.ts (2)
  • 21-38: The import statements have been updated to include SubaccountMessage from @dydxprotocol-indexer/v4-protos and AnnotatedSubaccountMessage from ./types. Ensure that these new imports are used appropriately in the code.

  • 58-69: A new function convertToSubaccountMessage has been added. This function takes an AnnotatedSubaccountMessage as input and returns a SubaccountMessage by omitting certain properties. This function seems to be used for converting an AnnotatedSubaccountMessage to a SubaccountMessage. Ensure that this function is used correctly and that the omitted properties are not required in the SubaccountMessage.

indexer/services/ender/src/lib/types.ts (3)
  • 3-12: The import statement has been updated to include SubaccountMessageContents from @dydxprotocol-indexer/postgres. Ensure that this new import is used appropriately in the codebase.

  • 32-37: The import of DateTime from luxon and the EventHandlerData interface have been removed. Verify that these changes do not affect other parts of the codebase that might be using them.

  • 180-199: A new interface AnnotatedSubaccountMessage has been introduced, extending SubaccountMessage with additional properties orderId, isFill, and subaccountMessageContents. The ConsolidatedKafkaEvent type has been updated to use AnnotatedSubaccountMessage instead of SubaccountMessage. Ensure that these changes are reflected in the rest of the codebase where these types and interfaces are used.

indexer/services/ender/__tests__/lib/kafka-publisher.test.ts (3)
  • 1-50: The new hunk introduces additional imports from @dydxprotocol-indexer/postgres and ../../src/lib/types to support the new functionality being added. It also includes new helper functions for generating subaccount and transfer contents. The changes are consistent with the PR summary and seem to be necessary for the new functionality.

  • 170-240: The sortEvents method is now being used to sort both trade and subaccount events. This is a good change as it reduces code duplication and makes the code more maintainable. However, it's important to ensure that the sortEvents method can handle both types of events correctly.

  • 242-403: The aggregateFillEventsForSubaccountMessages test case is introduced to verify the new functionality of aggregating multiple fills in a block for an IOC order into a single websocket message. The test case seems to cover the main scenarios and edge cases. However, it's important to ensure that the aggregateFillEventsForSubaccountMessages method handles all possible edge cases correctly.

indexer/services/ender/src/lib/kafka-publisher.ts (5)
  • 6-40: The new hunk introduces a new type AnnotatedSubaccountMessage and replaces the type of subaccountMessages from SubaccountMessage[] to AnnotatedSubaccountMessage[]. This change is likely to affect the rest of the code where subaccountMessages is used. Ensure that all such instances are updated to handle AnnotatedSubaccountMessage instead of SubaccountMessage.

  • 57-82: The addEvent method has been refactored to use a new helper function getMessages which returns the appropriate message array based on the provided kafkaTopic. This change improves the readability and maintainability of the code.

  • 93-152: The new method aggregateFillEventsForSubaccountMessages is introduced to aggregate multiple fill events for an order into a single message. This is a significant change and should be thoroughly tested to ensure it works as expected and does not introduce any unintended side effects.

  • 158-174: The sortTradeEvents method has been renamed to sortEvents and updated to handle any OrderedMessage, not just trade events. This change makes the method more versatile and reusable.

  • 197-203: The publish method now calls the aggregateFillEventsForSubaccountMessages method before generating all topic Kafka messages. This change is part of the main feature introduced in this PR, which is to aggregate multiple fill events for an order into a single message.

* contents contains all individual fills from that block.
*
* Due to separate handlers for order fills, we can be sure that if a message is annotated
* with a fill, it should only contain data about a single fill / order and not transfers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not transfers or positions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines +385 to +387
convertToSubaccountMessage(message4),
convertToSubaccountMessage(message2),
convertToSubaccountMessage(mergedMessage3),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the right ordering? Shouldn't message 4 come after message 3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

message 4 has txn index 1/event index 4.
msg 2 has txn index 2/ event index1.
msg 3 has txn index 3/event index 1.

if (message.isFill && message.orderId) {
const fills:
FillSubaccountMessageContents[] | undefined = message.subaccountMessageContents?.fills;
const orderId = message.orderId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 127 to 128
const lastEvent = lastEventForFills[orderId];
const fills = allFillEvents[orderId];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return -1;
} else if (Big(a.blockHeight).gt(b.blockHeight)) {
return 1;
public aggregateFillEventsForSubaccountMessages() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it still makes sense to add the TODO, to genericize so we can do a similar thing for positions / create a linear ticket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Status

Actionable comments generated: 0

Commits Files that changed from the base of the PR and between d7902ed and 9a33378.
Files selected for processing (1)
  • indexer/services/ender/src/lib/kafka-publisher.ts (3 hunks)
Additional comments (Suppressed): 6
indexer/services/ender/src/lib/kafka-publisher.ts (6)
  • 26-40: The KafkaPublisher class now uses AnnotatedSubaccountMessage[] instead of SubaccountMessage[] for subaccountMessages. This change is in line with the introduction of the AnnotatedSubaccountMessage interface, which extends SubaccountMessage with additional properties. Ensure that all usages of subaccountMessages in the codebase are updated to handle AnnotatedSubaccountMessage objects.

  • 57-82: The addEvent method has been refactored to use the new getMessages helper function, which returns the appropriate message array based on the topic. This change improves the readability and maintainability of the code.

  • 94-141: The sortTradeEvents method has been renamed to aggregateFillEventsForSubaccountMessages and updated to aggregate fill events for subaccount messages. This change is in line with the PR's goal of aggregating multiple fills in a block for an IOC order into a single websocket message. The logic seems sound, but it's important to ensure that the aggregation of fill events doesn't inadvertently omit any important information.

  • 146-154: The compareMessages helper function is introduced to compare two AnnotatedSubaccountMessage objects based on block height, transaction index, and event index. This function is used in the aggregateFillEventsForSubaccountMessages method to determine the order of fill events.

  • 160-176: The sortEvents method is introduced to sort events by block height, transaction index, and event index in ascending order. This method is used in the aggregateFillEventsForSubaccountMessages method to sort the aggregated fill events.

  • 199-205: The generateAllTopicKafkaMessages method now calls aggregateFillEventsForSubaccountMessages before generating Kafka messages for subaccountMessages. This ensures that the fill events are properly aggregated before they are published to Kafka.

@dydxwill dydxwill merged commit 11c9acb into main Oct 20, 2023
11 checks passed
@dydxwill dydxwill deleted the wl/send_final_fill_data branch October 20, 2023 15:47
clemire pushed a commit that referenced this pull request Oct 26, 2023
clemire pushed a commit that referenced this pull request Oct 26, 2023
…gated fill sfrom the entire block (#658)

(cherry picked from commit 11c9acb)
roy-dydx pushed a commit that referenced this pull request Oct 26, 2023
…gated fill sfrom the entire block (#658)

(cherry picked from commit 11c9acb)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working indexer
Development

Successfully merging this pull request may close these issues.

2 participants