-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IND-288] send final message per order id for order fills, with aggregated fill sfrom the entire block #658
Conversation
WalkthroughThe changes primarily focus on improving the handling, sorting, and aggregation of subaccount messages and fill events in the Kafka publisher. A new interface Changes
TipsChat with CodeRabbit Bot (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Files selected for processing (6)
- indexer/services/ender/tests/lib/kafka-publisher.test.ts (3 hunks)
- indexer/services/ender/src/handlers/handler.ts (3 hunks)
- indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1 hunks)
- indexer/services/ender/src/lib/candles-generator.ts (2 hunks)
- indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
- indexer/services/ender/src/lib/types.ts (1 hunks)
Files skipped from review due to trivial changes (2)
- indexer/services/ender/tests/lib/kafka-publisher.test.ts
- indexer/services/ender/src/lib/candles-generator.ts
Additional comments (Suppressed): 12
indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1)
- 407-412: The
generateConsolidatedSubaccountKafkaEvent
method now accepts two additional parameters:orderId
andisFill
. Ensure that all calls to this method throughout the codebase have been updated to match the new signature. Also, verify that theorder
object is guaranteed to be non-null when accessingorder.id
, as this could potentially cause a runtime error iforder
is null or undefined.indexer/services/ender/src/lib/types.ts (2)
191-194: The new
AnnotatedSubaccountMessage
interface extendsSubaccountMessage
with two optional properties:orderId
andisFill
. This change seems to be in line with the PR's goal of consolidating multiple fill events for a single order into a single websocket message. However, ensure that the rest of the codebase that usesSubaccountMessage
can handle the new properties without any issues.201-206: The
ConsolidatedKafkaEvent
type now usesAnnotatedSubaccountMessage
instead ofSubaccountMessage
. This change is consistent with the introduction of theAnnotatedSubaccountMessage
interface. However, make sure that all parts of the codebase that useConsolidatedKafkaEvent
are updated to handleAnnotatedSubaccountMessage
.indexer/services/ender/src/handlers/handler.ts (3)
25-26: The import statement now includes
AnnotatedSubaccountMessage
which extends the previousSubaccountMessage
with two additional properties:orderId
andisFill
. Ensure that this new type is correctly defined and used throughout the codebase.105-108: The
generateConsolidatedSubaccountKafkaEvent
method now accepts two additional optional parameters:orderId
andisFill
. Make sure that all calls to this method have been updated accordingly.117-124: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [111-122]
The
subaccountMessage
is now of typeAnnotatedSubaccountMessage
and includes the neworderId
andisFill
properties. Ensure that these properties are correctly populated and used in the rest of the code.indexer/services/ender/src/lib/kafka-publisher.ts (6)
28-31: The type of
subaccountMessages
has been changed fromSubaccountMessage[]
toAnnotatedSubaccountMessage[]
. Ensure that all usages ofsubaccountMessages
throughout the codebase have been updated to handleAnnotatedSubaccountMessage
objects.49-50: The
addEvent
method has been updated to use thegetMessages
helper function to add an event to the appropriate message array based on its topic. This is a more flexible approach than the previous switch-case structure.81-104: The
retainLastFillEventsForSubaccountMessages
method has been added to retain only the last fill event per order ID insubaccountMessages
. This is a significant change that affects how fill events are handled and sent to the websocket. Ensure that this change does not negatively impact any systems that rely on receiving all fill events for an order.109-117: The
compareMessages
helper function has been added to compare twoAnnotatedSubaccountMessage
objects based on block height, transaction index, and event index. This function is used inretainLastFillEventsForSubaccountMessages
to determine which fill event is the latest for a given order ID.46-139: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [123-149]
The
sortEvents
method has been added to sort events based on block height, transaction index, and event index. This method replaces the previoussortTradeEvents
method and is more flexible as it can sort both subaccount and trade events. However, it throws an error if the topic is not subaccounts or trades. Ensure that this method is only called with the appropriate topics.
- 175-176: The
retainLastFillEventsForSubaccountMessages
method is now called ingenerateAllTopicKafkaMessages
before generating Kafka messages forsubaccountMessages
. This ensures that only the last fill event per order ID is included in the Kafka messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 2
Files selected for processing (4)
- indexer/services/ender/tests/helpers/kafka-publisher-helpers.ts (2 hunks)
- indexer/services/ender/tests/lib/kafka-publisher.test.ts (2 hunks)
- indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
- indexer/services/ender/src/lib/types.ts (2 hunks)
Additional comments (Suppressed): 13
indexer/services/ender/__tests__/helpers/kafka-publisher-helpers.ts (2)
5-5: The import of
AnnotatedSubaccountMessage
is new. Ensure that this type is correctly defined and used in the code.44-51: The new function
createConsolidatedKafkaEventFromSubaccount
is introduced to create aConsolidatedKafkaEvent
from aAnnotatedSubaccountMessage
. This function seems to be correctly implemented, but ensure that it is used properly in the rest of the codebase.indexer/services/ender/src/lib/types.ts (4)
31-37: The import of lodash (
_
) is new in this hunk. Ensure that it is used in the codebase and that it doesn't introduce unnecessary overhead if its functionalities are not fully utilized. If only specific methods from lodash are used, consider importing only those methods to reduce the bundle size.192-195: The
AnnotatedSubaccountMessage
interface extendsSubaccountMessage
with two optional properties:orderId
andisFill
. This is a good practice as it maintains the original structure ofSubaccountMessage
while adding new properties, thus ensuring backward compatibility.197-202: The
convertToSubaccountMessage
function is used to convert anAnnotatedSubaccountMessage
back to aSubaccountMessage
by omitting theorderId
andisFill
properties. This function uses lodash'somit
method, which is a good choice for this operation. However, be aware that lodash'somit
function can be slow for large objects because it creates a new object. If performance becomes an issue, consider using a more performant way to omit properties.209-214: The
ConsolidatedKafkaEvent
type now usesAnnotatedSubaccountMessage
instead ofSubaccountMessage
. This change should be compatible with the existing codebase as long as the new properties (orderId
andisFill
) are optional and are handled correctly in the code that usesConsolidatedKafkaEvent
.indexer/services/ender/__tests__/lib/kafka-publisher.test.ts (3)
1-30: The new hunk introduces additional imports from the
@dydxprotocol-indexer/kafka
and@dydxprotocol-indexer/postgres
packages, as well as from the localtypes
andhelpers
modules. TheAnnotatedSubaccountMessage
andconvertToSubaccountMessage
types are now being imported, which suggests that theKafkaPublisher
class may now be handlingAnnotatedSubaccountMessage
objects. ThecreateConsolidatedKafkaEventFromSubaccount
helper function is also being imported, which suggests that theKafkaPublisher
class may now be creatingConsolidatedKafkaEvent
objects fromAnnotatedSubaccountMessage
objects.149-218: The new hunk introduces a new test case for the
sortSubaccountEvents
method of theKafkaPublisher
class. This method appears to sortAnnotatedSubaccountMessage
objects based on theblockHeight
,transactionIndex
, andeventIndex
properties. The test case uses theit.each
function to test the sorting ofAnnotatedSubaccountMessage
objects based on each of these properties. TheAnnotatedSubaccountMessage
objects are created with different values for the property being tested, and thesortSubaccountEvents
method is called with these objects. The test case then asserts that theAnnotatedSubaccountMessage
objects are sorted in the expected order.221-308: The new hunk introduces a new test case for the
retainLastFillEventsForSubaccountMessages
method of theKafkaPublisher
class. This method appears to retain only the last fill event per order ID in thesubaccountMessages
array. The test case creates severalAnnotatedSubaccountMessage
objects with differentblockHeight
,transactionIndex
,eventIndex
,contents
,subaccountId
,version
,orderId
, andisFill
properties. Some of these objects represent fill events, while others do not. TheretainLastFillEventsForSubaccountMessages
method is called with these objects, and the test case then asserts that only the last fill event per order ID and the non-fill events are retained in thesubaccountMessages
array. The test case also asserts that thepublish
method calls theproducer.send
function with the expected arguments.indexer/services/ender/src/lib/kafka-publisher.ts (4)
30-36: The
KafkaPublisher
class now usesAnnotatedSubaccountMessage
instead ofSubaccountMessage
. Ensure that all instances ofKafkaPublisher
have been updated to useAnnotatedSubaccountMessage
and that the additional propertiesorderId
andisFill
are being correctly set and used where necessary.54-55: The
addEvent
method now uses a helper functiongetMessages
to retrieve the appropriate message array based on the topic. This is a good refactor as it reduces code duplication and improves readability.51-143: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [130-156]
The
sortEvents
method sorts events based on block height, transaction index, and event index. This is a good approach to ensure that events are processed in the correct order. However, the method throws an error if the topic is notKafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS
orKafkaTopics.TO_WEBSOCKETS_TRADES
. Ensure that this method is only called with these topics, or consider handling other topics as well.
- 182-185: The
generateAllTopicKafkaMessages
method now callsretainLastFillEventsForSubaccountMessages
before generating all topic Kafka messages. This is a good approach to ensure that only the most recent fill event for each order ID is included in the Kafka messages. However, ensure that this does not cause any issues with the order or timing of the Kafka messages.
private getMessages(kafkaTopic: KafkaTopics): any[] | undefined { | ||
switch (kafkaTopic) { | ||
case KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS: | ||
this.subaccountMessages.push(event.message); | ||
break; | ||
return this.subaccountMessages; | ||
case KafkaTopics.TO_WEBSOCKETS_TRADES: | ||
this.tradeMessages.push(event.message); | ||
break; | ||
return this.tradeMessages; | ||
case KafkaTopics.TO_WEBSOCKETS_MARKETS: | ||
this.marketMessages.push(event.message); | ||
break; | ||
return this.marketMessages; | ||
case KafkaTopics.TO_WEBSOCKETS_CANDLES: | ||
this.candleMessages.push(event.message); | ||
break; | ||
return this.candleMessages; | ||
case KafkaTopics.TO_VULCAN: | ||
this.vulcanMessages.push(event.message); | ||
break; | ||
return this.vulcanMessages; | ||
default: | ||
throw new Error('Invalid Topic'); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The getMessages
method returns an array of messages for a given Kafka topic. It uses a switch statement to determine which message array to return based on the topic. This is a good use of a switch statement as it makes the code more readable and maintainable. However, the method returns any[] | undefined
, which is not type-safe. Consider using a more specific type or a union of specific types instead of any
.
- private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
+ private getMessages(kafkaTopic: KafkaTopics): (AnnotatedSubaccountMessage | SingleTradeMessage | MarketMessage | CandleMessage)[] | undefined {
public retainLastFillEventsForSubaccountMessages() { | ||
// Create a map to store the last fill event per order ID | ||
const lastFillEvents: Record<string, AnnotatedSubaccountMessage> = {}; | ||
const nonFillEvents: AnnotatedSubaccountMessage[] = []; | ||
|
||
this.subaccountMessages.forEach((message) => { | ||
if (message.isFill && message.orderId) { | ||
const orderId = message.orderId; | ||
// If we haven't seen this order ID before or if the current message | ||
// has a higher block height, update the lastFillEvents for this order ID | ||
if ( | ||
!lastFillEvents[orderId] || | ||
this.compareMessages(message, lastFillEvents[orderId]) > 0 | ||
) { | ||
lastFillEvents[orderId] = message; | ||
} | ||
} else { | ||
nonFillEvents.push(message); | ||
} | ||
}); | ||
|
||
this.subaccountMessages = Object.values(lastFillEvents) | ||
.concat(nonFillEvents) | ||
.map((annotatedMessage) => convertToSubaccountMessage(annotatedMessage)); | ||
this.sortEvents(KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The retainLastFillEventsForSubaccountMessages
method sorts and retains only the last fill event per order ID in the subaccountMessages
array. This is a good approach to ensure that only the most recent fill event for each order ID is kept. However, the method mutates the subaccountMessages
array directly, which can lead to unexpected side effects. Consider creating a new array instead of mutating the existing one.
- this.subaccountMessages = Object.values(lastFillEvents)
+ const updatedSubaccountMessages = Object.values(lastFillEvents)
* and event index in ascending order per order id. Only keep the subaccount message if | ||
* it represents the last fill event per order id. | ||
*/ | ||
public retainLastFillEventsForSubaccountMessages() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would lead to all prior fills for an order (if there were multiple) to also be omitted, and that would be incorrect.
E.g. Imagine order A has 2 fills:
- message 1: {fills: [ .... 40% of order size], order: [ .... status is OPEN, total filled is 40%]}
- message 2: {fills: [ .... 60% of order size], order: [..... status is FILLED, total filled is 100%]}
The websocket message I expect to see as a client is:
- message: {fills: [.... 40% of order size, .... 60% of order size], order: [....status is FILLED, total filled is 100%]}
However with this logic what the client sees is:
- message: {fills: [....60% of order size], order: [....status is FILLED, total filled is 100%]}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if (message.isFill && message.orderId) { | ||
const orderId = message.orderId; | ||
// If we haven't seen this order ID before or if the current message | ||
// has a higher block height, update the lastFillEvents for this order ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// has a higher block height, update the lastFillEvents for this order ID | |
// was associated with a later event, update the lastFillEvents for this order ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public sortTradeEvents() { | ||
this.tradeMessages = this.tradeMessages.sort( | ||
(a: SingleTradeMessage, b: SingleTradeMessage) => { | ||
public sortEvents(kafkaTopic: KafkaTopics) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can this be made a generic function that takes in OrderedMessage
and sorts them and returns a sorted list rather than checking if the topic is sortable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…wl/send_final_fill_data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Files selected for processing (3)
- indexer/services/ender/tests/lib/kafka-publisher.test.ts (2 hunks)
- indexer/services/ender/src/lib/candles-generator.ts (2 hunks)
- indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
Files skipped from review due to trivial changes (1)
- indexer/services/ender/src/lib/candles-generator.ts
Additional comments (Suppressed): 10
indexer/services/ender/__tests__/lib/kafka-publisher.test.ts (3)
1-50: The new hunk introduces additional imports that are necessary for the updated tests. The
AnnotatedSubaccountMessage
type is imported, which extends theSubaccountMessage
type with two additional properties:orderId
andisFill
. This new type is used in theKafkaPublisher
class, where thesubaccountMessages
property now holdsAnnotatedSubaccountMessage
objects. TheconvertToSubaccountMessage
function is also imported, which is used to convertAnnotatedSubaccountMessage
objects toSubaccountMessage
objects. ThedefaultWalletAddress
constant is imported, which is used in the tests to simulate a wallet address. ThecreateConsolidatedKafkaEventFromSubaccount
function is imported, which is used to createConsolidatedKafkaEvent
objects fromAnnotatedSubaccountMessage
objects. ThegenerateFillSubaccountMessage
,generateOrderSubaccountMessage
, andgenerateTransferContents
functions are imported, which are used to generate different types of subaccount messages.169-238: The new hunk introduces a new test suite for the
sortSubaccountEvents
method of theKafkaPublisher
class. This method is used to sort subaccount events based on different fields such asblockHeight
,transactionIndex
, andeventIndex
. The test suite includes test cases that create aKafkaPublisher
instance, add subaccount events with different values for the specified field, call thesortSubaccountEvents
method, and assert that the subaccount events are sorted in the expected order.241-410: The new hunk introduces a new test suite for the
aggregateFillEventsForSubaccountMessages
method of theKafkaPublisher
class. This method is used to aggregate all fill events per order ID and sort the messages. The test suite includes a test case that creates aKafkaPublisher
instance, adds several subaccount messages with different fill events and order IDs, calls theaggregateFillEventsForSubaccountMessages
method, and asserts that the subaccount messages are aggregated and sorted as expected. The test case also calls thepublish
method and asserts that theproducer.send
function is called with the expected arguments.indexer/services/ender/src/lib/kafka-publisher.ts (7)
34-40: The
subaccountMessages
property of theKafkaPublisher
class has been changed fromSubaccountMessage[]
toAnnotatedSubaccountMessage[]
. This change is in line with the introduction of theAnnotatedSubaccountMessage
type, which extends theSubaccountMessage
type with two additional properties:orderId
andisFill
. This change allows for more detailed tracking of subaccount messages, particularly those related to order fills.57-83: The
addEvent
method has been simplified by using a helper functiongetMessages
to retrieve the appropriate message array based on the topic. This change improves the readability and maintainability of the code by reducing the complexity of theaddEvent
method and encapsulating the logic for retrieving the message array in a separate function.91-110: Two new helper methods
extractFills
andreplaceFills
have been added to handle the extraction and replacement of fill events in the subaccount message contents. These methods improve the modularity of the code by encapsulating specific tasks in separate functions. They also improve the readability of the code by providing descriptive names for these tasks.121-165: A new method
aggregateFillEventsForSubaccountMessages
has been added to sort and retain only the last fill event per order ID in thesubaccountMessages
array. This method addresses the issue of multiple fill events for an Immediate-Or-Cancel (IOC) order causing an invalid state transition fromCANCELED
toFILLED
in the front-end. The method improves the accuracy of the state transitions by ensuring that only the last fill event per order ID is retained.170-178: A new helper method
compareMessages
has been added to compare twoAnnotatedSubaccountMessage
objects based on block height, transaction index, and event index. This method improves the modularity of the code by encapsulating the comparison logic in a separate function. It also improves the readability of the code by providing a descriptive name for this task.55-189: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [184-202]
The
sortTradeEvents
method has been replaced with a more generalsortEvents
method that can sort any array ofOrderedMessage
objects. This change improves the flexibility of the code by allowing any array ofOrderedMessage
objects to be sorted, not just trade events.
- 225-231: The
aggregateFillEventsForSubaccountMessages
method is now called in thegenerateAllTopicKafkaMessages
method if there are any subaccount messages. This change ensures that the fill events for subaccount messages are aggregated before the Kafka messages are generated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 1
Files selected for processing (1)
- indexer/services/ender/src/lib/kafka-publisher.ts (4 hunks)
Additional comments (Suppressed): 6
indexer/services/ender/src/lib/kafka-publisher.ts (6)
34-40: The
subaccountMessages
property of theKafkaPublisher
class has been updated to holdAnnotatedSubaccountMessage
objects instead ofSubaccountMessage
objects. This change is in line with the introduction of theAnnotatedSubaccountMessage
type, which extendsSubaccountMessage
and addsorderId
andisFill
properties. This change will allow theKafkaPublisher
class to handle messages that are annotated with additional information about the order and fill status.91-110: The
extractFills
andreplaceFills
methods have been added to extract fill events fromSubaccountMessageContents
and replace the fills inSubaccountMessageContents
, respectively. These methods are used in theaggregateFillEventsForSubaccountMessages
method to handle multiple fills in a single block for an Immediate-Or-Cancel (IOC) order.121-165: The
aggregateFillEventsForSubaccountMessages
method has been added to sort and retain only the last fill event per order ID in thesubaccountMessages
array. This method is called in thegenerateAllTopicKafkaMessages
method before generating all topic Kafka messages. This change addresses the issue of multiple fills in a single block for an IOC order, which can lead to an invalid state transition fromCANCELED
toFILLED
.170-178: The
compareMessages
method has been added to compare twoAnnotatedSubaccountMessage
objects based on block height, transaction index, and event index. This method is used in theaggregateFillEventsForSubaccountMessages
method to determine the order of fill events.55-189: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [184-202]
The
sortEvents
method has been added to sort events based on block height, transaction index, and event index. This method is called in theaggregateFillEventsForSubaccountMessages
method after aggregating fill events. This change ensures that the events are sorted in the correct order before being published.
- 225-231: The
publish
method now calls theaggregateFillEventsForSubaccountMessages
method before generating all topic Kafka messages. This change ensures that only the last fill event per order ID is retained in thesubaccountMessages
array before the messages are published.
public addEvent(event: ConsolidatedKafkaEvent) { | ||
switch (event.topic) { | ||
this.getMessages(event.topic)!.push(event.message); | ||
} | ||
|
||
/** | ||
* Helper function to get messages for a given topic. | ||
* | ||
* @param kafkaTopic | ||
* @private | ||
*/ | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
private getMessages(kafkaTopic: KafkaTopics): any[] | undefined { | ||
switch (kafkaTopic) { | ||
case KafkaTopics.TO_WEBSOCKETS_SUBACCOUNTS: | ||
this.subaccountMessages.push(event.message); | ||
break; | ||
return this.subaccountMessages; | ||
case KafkaTopics.TO_WEBSOCKETS_TRADES: | ||
this.tradeMessages.push(event.message); | ||
break; | ||
return this.tradeMessages; | ||
case KafkaTopics.TO_WEBSOCKETS_MARKETS: | ||
this.marketMessages.push(event.message); | ||
break; | ||
return this.marketMessages; | ||
case KafkaTopics.TO_WEBSOCKETS_CANDLES: | ||
this.candleMessages.push(event.message); | ||
break; | ||
return this.candleMessages; | ||
case KafkaTopics.TO_VULCAN: | ||
this.vulcanMessages.push(event.message); | ||
break; | ||
return this.vulcanMessages; | ||
default: | ||
throw new Error('Invalid Topic'); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The addEvent
method has been updated to use a helper function getMessages
to retrieve the appropriate message array based on the topic. This change simplifies the addEvent
method and makes it easier to add new topics in the future. However, the getMessages
method returns any[] | undefined
, which could lead to potential type safety issues. Consider refining the return type to be more specific.
- private getMessages(kafkaTopic: KafkaTopics): any[] | undefined {
+ private getMessages(kafkaTopic: KafkaTopics): (AnnotatedSubaccountMessage | SingleTradeMessage)[] | undefined {
* Due to separate handlers for order fills, we can be sure that if a message is annotated | ||
* with a fill, it should only contain data about a fill / order and not transfers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Change this to note that the message should only contain a single fill / order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private extractFills(contents: string): FillSubaccountMessageContents[] | undefined { | ||
try { | ||
const subaccountContents: | ||
SubaccountMessageContents = JSON.parse(contents) as SubaccountMessageContents; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Let's add a TODO + linear ticket for adding a better way to aggregate messages. In addition to orders, we also want to make it so perpetual positions / asset positions only have a single update per block, so a more generic solution would be better. I think the current fix is fine for the short-term.
Having to do an extra JSON.parse per order message is un-ideal, and the JSON.stringify in the order handler is now redundant. It'd be better to add more generic logic to aggregate different types of subaccount messages rather than do post-processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, what do you think about updating the annotated messages such that it also contains a contents
property to avoid re-parsing all the message contents? I think that's a small enough change that can be scoped into this fix, rather than waiting for the ticket to allow more generic aggregation of subaccount events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
}); | ||
|
||
// Replace the fills in the last event for fills with all the fills for that order ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Replace the fills in the last event for fills with all the fills for that order ID | |
// Update the last event for the order ID such that it has all the fills that occurred for the order ID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* where the first event should be the earliest event in the block. | ||
*/ | ||
public sortEvents(msgs: OrderedMessage[]) { | ||
if (msgs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? Would calling .sort
on an empty array lead to an error / undesirable outcome?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
* @private | ||
*/ | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
private getMessages(kafkaTopic: KafkaTopics): any[] | undefined { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is it possible to create a type other than any
for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
eventIndex: 3, | ||
contents: JSON.stringify(msg3Contents), | ||
subaccountId: { | ||
owner: 'owner3', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: For the sake of correct-ness, this should match msg1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts
Show resolved
Hide resolved
export function convertToSubaccountMessage( | ||
annotatedMessage: AnnotatedSubaccountMessage, | ||
): SubaccountMessage { | ||
const subaccountMessage: SubaccountMessage = _.omit(annotatedMessage, ['orderId', 'isFill']); | ||
return subaccountMessage; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is there a helpers file this can go into?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to helpers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Files selected for processing (6)
- indexer/services/ender/tests/lib/kafka-publisher.test.ts (2 hunks)
- indexer/services/ender/src/handlers/handler.ts (3 hunks)
- indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1 hunks)
- indexer/services/ender/src/lib/helper.ts (2 hunks)
- indexer/services/ender/src/lib/kafka-publisher.ts (3 hunks)
- indexer/services/ender/src/lib/types.ts (3 hunks)
Additional comments (Suppressed): 17
indexer/services/ender/src/handlers/order-fills/abstract-order-fill-handler.ts (1)
- 407-413: The
generateConsolidatedSubaccountKafkaEvent
method now includes two additional parameters:order?.id
andtrue
. Ensure that all calls to this method throughout the codebase have been updated to match the new signature. Also, verify that theorder
object is always defined when this method is called, asorder?.id
could potentially beundefined
.indexer/services/ender/src/handlers/handler.ts (3)
10-30: The import statements have been updated to include the new
AnnotatedSubaccountMessage
type andSubaccountMessageContents
from the respective modules. Ensure that these new types are correctly defined and exported in their respective modules.105-111: The
generateConsolidatedSubaccountKafkaEvent
method now accepts three additional optional parameters:orderId
,isFill
, andsubaccountMessageContents
. Make sure that all calls to this method throughout the codebase have been updated to pass these parameters where necessary.119-127: > Note: This review was outside of the patch, so it was mapped to the patch with the greatest overlap. Original lines [113-125]
The
subaccountMessage
object now includes the new propertiesorderId
,isFill
, andsubaccountMessageContents
. Ensure that these properties are correctly handled in the downstream code where this object is consumed.indexer/services/ender/src/lib/helper.ts (2)
21-38: The import statements have been updated to include
SubaccountMessage
from@dydxprotocol-indexer/v4-protos
andAnnotatedSubaccountMessage
from./types
. Ensure that these new imports are used appropriately in the code.58-69: A new function
convertToSubaccountMessage
has been added. This function takes anAnnotatedSubaccountMessage
as input and returns aSubaccountMessage
by omitting certain properties. This function seems to be used for converting anAnnotatedSubaccountMessage
to aSubaccountMessage
. Ensure that this function is used correctly and that the omitted properties are not required in theSubaccountMessage
.indexer/services/ender/src/lib/types.ts (3)
3-12: The import statement has been updated to include
SubaccountMessageContents
from@dydxprotocol-indexer/postgres
. Ensure that this new import is used appropriately in the codebase.32-37: The import of
DateTime
fromluxon
and theEventHandlerData
interface have been removed. Verify that these changes do not affect other parts of the codebase that might be using them.180-199: A new interface
AnnotatedSubaccountMessage
has been introduced, extendingSubaccountMessage
with additional propertiesorderId
,isFill
, andsubaccountMessageContents
. TheConsolidatedKafkaEvent
type has been updated to useAnnotatedSubaccountMessage
instead ofSubaccountMessage
. Ensure that these changes are reflected in the rest of the codebase where these types and interfaces are used.indexer/services/ender/__tests__/lib/kafka-publisher.test.ts (3)
1-50: The new hunk introduces additional imports from
@dydxprotocol-indexer/postgres
and../../src/lib/types
to support the new functionality being added. It also includes new helper functions for generating subaccount and transfer contents. The changes are consistent with the PR summary and seem to be necessary for the new functionality.170-240: The
sortEvents
method is now being used to sort both trade and subaccount events. This is a good change as it reduces code duplication and makes the code more maintainable. However, it's important to ensure that thesortEvents
method can handle both types of events correctly.242-403: The
aggregateFillEventsForSubaccountMessages
test case is introduced to verify the new functionality of aggregating multiple fills in a block for an IOC order into a single websocket message. The test case seems to cover the main scenarios and edge cases. However, it's important to ensure that theaggregateFillEventsForSubaccountMessages
method handles all possible edge cases correctly.indexer/services/ender/src/lib/kafka-publisher.ts (5)
6-40: The new hunk introduces a new type
AnnotatedSubaccountMessage
and replaces the type ofsubaccountMessages
fromSubaccountMessage[]
toAnnotatedSubaccountMessage[]
. This change is likely to affect the rest of the code wheresubaccountMessages
is used. Ensure that all such instances are updated to handleAnnotatedSubaccountMessage
instead ofSubaccountMessage
.57-82: The
addEvent
method has been refactored to use a new helper functiongetMessages
which returns the appropriate message array based on the providedkafkaTopic
. This change improves the readability and maintainability of the code.93-152: The new method
aggregateFillEventsForSubaccountMessages
is introduced to aggregate multiple fill events for an order into a single message. This is a significant change and should be thoroughly tested to ensure it works as expected and does not introduce any unintended side effects.158-174: The
sortTradeEvents
method has been renamed tosortEvents
and updated to handle anyOrderedMessage
, not just trade events. This change makes the method more versatile and reusable.197-203: The
publish
method now calls theaggregateFillEventsForSubaccountMessages
method before generating all topic Kafka messages. This change is part of the main feature introduced in this PR, which is to aggregate multiple fill events for an order into a single message.
* contents contains all individual fills from that block. | ||
* | ||
* Due to separate handlers for order fills, we can be sure that if a message is annotated | ||
* with a fill, it should only contain data about a single fill / order and not transfers. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not transfers or positions
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
convertToSubaccountMessage(message4), | ||
convertToSubaccountMessage(message2), | ||
convertToSubaccountMessage(mergedMessage3), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this the right ordering? Shouldn't message 4 come after message 3?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes.
message 4 has txn index 1/event index 4.
msg 2 has txn index 2/ event index1.
msg 3 has txn index 3/event index 1.
if (message.isFill && message.orderId) { | ||
const fills: | ||
FillSubaccountMessageContents[] | undefined = message.subaccountMessageContents?.fills; | ||
const orderId = message.orderId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
const lastEvent = lastEventForFills[orderId]; | ||
const fills = allFillEvents[orderId]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return -1; | ||
} else if (Big(a.blockHeight).gt(b.blockHeight)) { | ||
return 1; | ||
public aggregateFillEventsForSubaccountMessages() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think it still makes sense to add the TODO, to genericize so we can do a similar thing for positions / create a linear ticket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 0
Files selected for processing (1)
- indexer/services/ender/src/lib/kafka-publisher.ts (3 hunks)
Additional comments (Suppressed): 6
indexer/services/ender/src/lib/kafka-publisher.ts (6)
26-40: The
KafkaPublisher
class now usesAnnotatedSubaccountMessage[]
instead ofSubaccountMessage[]
forsubaccountMessages
. This change is in line with the introduction of theAnnotatedSubaccountMessage
interface, which extendsSubaccountMessage
with additional properties. Ensure that all usages ofsubaccountMessages
in the codebase are updated to handleAnnotatedSubaccountMessage
objects.57-82: The
addEvent
method has been refactored to use the newgetMessages
helper function, which returns the appropriate message array based on the topic. This change improves the readability and maintainability of the code.94-141: The
sortTradeEvents
method has been renamed toaggregateFillEventsForSubaccountMessages
and updated to aggregate fill events for subaccount messages. This change is in line with the PR's goal of aggregating multiple fills in a block for an IOC order into a single websocket message. The logic seems sound, but it's important to ensure that the aggregation of fill events doesn't inadvertently omit any important information.146-154: The
compareMessages
helper function is introduced to compare twoAnnotatedSubaccountMessage
objects based on block height, transaction index, and event index. This function is used in theaggregateFillEventsForSubaccountMessages
method to determine the order of fill events.160-176: The
sortEvents
method is introduced to sort events by block height, transaction index, and event index in ascending order. This method is used in theaggregateFillEventsForSubaccountMessages
method to sort the aggregated fill events.199-205: The
generateAllTopicKafkaMessages
method now callsaggregateFillEventsForSubaccountMessages
before generating Kafka messages forsubaccountMessages
. This ensures that the fill events are properly aggregated before they are published to Kafka.
…gated fill sfrom the entire block (#658)
Changelist
For order fills, the state transition of
CANCELED
->FILLED
isn't considered valid by the FE. This can happen if there are multiple fills in a given block for an IOC order. Because the totalFilled < order amount on the first fill, Indexer will publish a websocket message withCANCELED
status. In subsequent fills, when totalFilled=order amount, Indexer will publish a follow-up message withFILLED
status.Rather than send individual messages per fill per order, send a single websocket message containing the order status of the last fill and all aggregated fills.
Test Plan
Unit tests
Author/Reviewer Checklist
state-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.