diff --git a/protocol/streaming/full_node_streaming_manager_test.go b/protocol/streaming/full_node_streaming_manager_test.go index 7d07164b58..8d8e63eaeb 100644 --- a/protocol/streaming/full_node_streaming_manager_test.go +++ b/protocol/streaming/full_node_streaming_manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/mocks" streaming "github.com/dydxprotocol/v4-chain/protocol/streaming" clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "github.com/stretchr/testify/require" ) @@ -80,9 +81,9 @@ func UpdateOrder(orderId *pv1types.IndexerOrderId, totalFilledQuantums uint64) o } } -func toStreamUpdate(offChainUpdates []ocutypes.OffChainUpdateV1, blockHeight uint32) clobtypes.StreamUpdate { +func toStreamUpdate(offChainUpdates ...ocutypes.OffChainUpdateV1) clobtypes.StreamUpdate { return clobtypes.StreamUpdate{ - BlockHeight: blockHeight, + BlockHeight: uint32(0), ExecMode: uint32(sdktypes.ExecModeFinalize), UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ @@ -117,29 +118,111 @@ func NewOrderbookSubscription( ) } -type TestCase struct { - updates *[]clobtypes.StreamUpdate - subaccountIds []uint32 - filteredUpdates *[]clobtypes.StreamUpdate +func NewStreamOrderbookFill( + blockHeight uint32, + execMode uint32, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_OrderFill{ + OrderFill: nil, + }, + } } -func TestFilterStreamUpdates(t *testing.T) { - logger := &mocks.Logger{} +func NewStreamTakerOrder( + blockHeight uint32, + execMode uint32, + order *clobtypes.Order, + remainingQuantums uint64, + optimisticallyFilledQuantums uint64, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{ + TakerOrder: &clobtypes.StreamTakerOrder{ + TakerOrder: &clobtypes.StreamTakerOrder_Order{ + Order: order, + }, + TakerOrderStatus: &clobtypes.StreamTakerOrderStatus{ + OrderStatus: uint32(clobtypes.Success), + RemainingQuantums: remainingQuantums, + OptimisticallyFilledQuantums: optimisticallyFilledQuantums, + }, + }, + }, + } +} - subaccountIdNumber := uint32(1337) - subaccountId := pv1types.IndexerSubaccountId{ - Owner: "foo", - Number: subaccountIdNumber, +func NewSubaccountUpdate( + blockHeight uint32, + execMode uint32, + subaccountId *satypes.SubaccountId, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{ + SubaccountUpdate: &satypes.StreamSubaccountUpdate{ + SubaccountId: subaccountId, + UpdatedPerpetualPositions: []*satypes.SubaccountPerpetualPosition{}, + UpdatedAssetPositions: []*satypes.SubaccountAssetPosition{}, + Snapshot: true, + }, + }, + } +} + +func NewPriceUpdate( + blockHeight uint32, + execMode uint32, +) *clobtypes.StreamUpdate { + return &clobtypes.StreamUpdate{ + BlockHeight: blockHeight, + ExecMode: execMode, + UpdateMessage: &clobtypes.StreamUpdate_PriceUpdate{ + PriceUpdate: &pricestypes.StreamPriceUpdate{ + MarketId: 1, + Price: pricestypes.MarketPrice{ + Id: 1, + Exponent: 1 ^ -6, + Price: 1, + }, + Snapshot: true, + }, + }, + } +} + +func NewIndexerOrderId(owner string, id uint32) pv1types.IndexerOrderId { + return pv1types.IndexerOrderId{ + SubaccountId: pv1types.IndexerSubaccountId{ + Owner: owner, + Number: id, + }, + ClientId: 0, + OrderFlags: 0, + ClobPairId: 0, } - orderId := pv1types.IndexerOrderId{ - SubaccountId: subaccountId, - ClientId: 0, - OrderFlags: 0, - ClobPairId: 0, +} + +func NewOrderId(owner string, id uint32) clobtypes.OrderId { + return clobtypes.OrderId{ + SubaccountId: satypes.SubaccountId{ + Owner: owner, + Number: id, + }, + ClientId: 0, + OrderFlags: 0, + ClobPairId: 0, } +} - order := pv1types.IndexerOrder{ - OrderId: orderId, +func NewIndexerOrder(id pv1types.IndexerOrderId) pv1types.IndexerOrder { + return pv1types.IndexerOrder{ + OrderId: id, Side: pv1types.IndexerOrder_SIDE_BUY, Quantums: uint64(10 ^ 6), Subticks: 1, @@ -152,116 +235,222 @@ func TestFilterStreamUpdates(t *testing.T) { ConditionType: pv1types.IndexerOrder_CONDITION_TYPE_UNSPECIFIED, ConditionalOrderTriggerSubticks: 0, } +} - newOrderId := order.OrderId - newOrderId.ClientId += 1 +func NewOrder(id clobtypes.OrderId) *clobtypes.Order { + return &clobtypes.Order{ + OrderId: id, + Side: clobtypes.Order_SIDE_BUY, + Quantums: uint64(10 ^ 6), + Subticks: 1, + GoodTilOneof: &clobtypes.Order_GoodTilBlock{ + GoodTilBlock: 10 ^ 9, + }, + TimeInForce: 10 ^ 9, + ReduceOnly: false, + ClientMetadata: 0, + ConditionType: clobtypes.Order_CONDITION_TYPE_UNSPECIFIED, + ConditionalOrderTriggerSubticks: 0, + } +} + +type TestCase struct { + updates *[]clobtypes.StreamUpdate + subaccountIds []uint32 + filteredUpdates *[]clobtypes.StreamUpdate +} + +func TestFilterStreamUpdates(t *testing.T) { + logger := &mocks.Logger{} + + subaccountIdNumber := uint32(1337) + orderId := NewIndexerOrderId("foo", subaccountIdNumber) + order := NewIndexerOrder(orderId) - newOrder := order - newOrder.OrderId = newOrderId - newOrder.Quantums += 10 ^ 6 + otherSubaccountIdNumber := uint32(2600) + otherOrderId := NewIndexerOrderId("bar", otherSubaccountIdNumber) + otherOrder := NewIndexerOrder(otherOrderId) - totalFilledQuantums := uint64(1988) + newOrderId := order.OrderId + newOrderId.ClientId += 1 + newOrder := NewIndexerOrder(newOrderId) - tests := make(map[string]TestCase) + otherNewOrderId := otherOrder.OrderId + otherNewOrderId.ClientId += 1 + otherNewOrder := NewIndexerOrder(otherNewOrderId) orderPlaceTime := time.Date(2024, 12, 25, 0, 0, 0, 0, time.UTC) openOrder := OpenOrder(&order, &orderPlaceTime) - orderCancelTime := orderPlaceTime.Add(time.Second) cancelOrder := CancelOrder(&orderId, &orderCancelTime) - orderReplaceTime := orderPlaceTime.Add(time.Minute) replaceOrder := ReplaceOrder(&orderId, &newOrder, &orderReplaceTime) - - updateOrder := UpdateOrder(&orderId, totalFilledQuantums) - - baseOffChainUpdates := []ocutypes.OffChainUpdateV1{openOrder, cancelOrder, replaceOrder, updateOrder} - baseStreamUpdates := []clobtypes.StreamUpdate{toStreamUpdate(baseOffChainUpdates, 0)} - tests["baseInScope"] = TestCase{ - updates: &baseStreamUpdates, - subaccountIds: []uint32{orderId.SubaccountId.Number}, - filteredUpdates: &baseStreamUpdates, - } - tests["baseNotInScope"] = TestCase{ - updates: &baseStreamUpdates, - subaccountIds: []uint32{0}, - filteredUpdates: nil, - } - - otherOrderId := orderId - otherSubaccountIdNumber := subaccountIdNumber + uint32(1) - otherOrderId.SubaccountId = pv1types.IndexerSubaccountId{ - Owner: "bar", - Number: otherSubaccountIdNumber, - } - otherOrder := order - otherOrder.OrderId = otherOrderId - - otherNewOrderId := otherOrder.OrderId - otherNewOrderId.ClientId += 1 - - otherNewOrder := otherOrder - otherNewOrder.OrderId = otherNewOrderId - otherNewOrder.Quantums += 10 ^ 6 + updateOrder := UpdateOrder(&orderId, uint64(1988)) otherOpenOrder := OpenOrder(&otherOrder, &orderPlaceTime) otherCancelOrder := CancelOrder(&otherOrderId, &orderCancelTime) otherReplaceOrder := ReplaceOrder(&otherOrderId, &otherNewOrder, &orderReplaceTime) - otherUpdateOrder := UpdateOrder(&otherOrderId, totalFilledQuantums) - - otherOffChainUpdates := []ocutypes.OffChainUpdateV1{ - otherOpenOrder, otherCancelOrder, otherReplaceOrder, otherUpdateOrder, - } - otherStreamUpdates := []clobtypes.StreamUpdate{toStreamUpdate(otherOffChainUpdates, 0)} - tests["otherInScope"] = TestCase{ - updates: &otherStreamUpdates, - subaccountIds: []uint32{otherSubaccountIdNumber}, - filteredUpdates: &otherStreamUpdates, - } - tests["otherNotInScope"] = TestCase{ - updates: &otherStreamUpdates, - subaccountIds: []uint32{subaccountIdNumber}, - filteredUpdates: nil, - } + otherUpdateOrder := UpdateOrder(&otherOrderId, uint64(1999)) + + baseStreamUpdate := toStreamUpdate(openOrder, cancelOrder, replaceOrder, updateOrder) + otherStreamUpdate := toStreamUpdate(otherOpenOrder, otherCancelOrder, otherReplaceOrder, otherUpdateOrder) + bothStreamUpdate := toStreamUpdate( + openOrder, + cancelOrder, + replaceOrder, + updateOrder, + otherOpenOrder, + otherCancelOrder, + otherReplaceOrder, + otherUpdateOrder, + ) - bothUpdates := []clobtypes.StreamUpdate{ - toStreamUpdate(append(baseOffChainUpdates, otherOffChainUpdates...), 0), - } - tests["bothInScope"] = TestCase{ - updates: &bothUpdates, - subaccountIds: []uint32{subaccountIdNumber, otherSubaccountIdNumber}, - filteredUpdates: &bothUpdates, - } - tests["bothOtherInScope"] = TestCase{ - updates: &bothUpdates, - subaccountIds: []uint32{otherSubaccountIdNumber}, - filteredUpdates: &otherStreamUpdates, - } - tests["bothBaseInScope"] = TestCase{ - updates: &bothUpdates, - subaccountIds: []uint32{subaccountIdNumber}, - filteredUpdates: &baseStreamUpdates, - } - tests["bothNoneInScopeWrongId"] = TestCase{ - updates: &bothUpdates, - subaccountIds: []uint32{404}, - filteredUpdates: nil, - } - tests["bothNoneInScopeNoId"] = TestCase{ - updates: &bothUpdates, - subaccountIds: []uint32{}, - filteredUpdates: nil, - } + orderBookFillUpdate := NewStreamOrderbookFill(0, 0) + clobOrder := NewOrder(NewOrderId("foo", 23)) + takerOrderUpdate := NewStreamTakerOrder(0, 0, clobOrder, 0, 0) + subaccountUpdate := NewSubaccountUpdate( + 0, + 0, + (*satypes.SubaccountId)(&orderId.SubaccountId), + ) + priceUpdate := NewPriceUpdate(0, 0) - tests["noUpdates"] = TestCase{ - updates: &[]clobtypes.StreamUpdate{}, - subaccountIds: []uint32{subaccountIdNumber}, - filteredUpdates: nil, - } - tests["noUpdatesNoId"] = TestCase{ - updates: &[]clobtypes.StreamUpdate{}, - subaccountIds: []uint32{}, - filteredUpdates: nil, + tests := map[string]TestCase{ + "baseInScope": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, + subaccountIds: []uint32{orderId.SubaccountId.Number}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, + }, + "baseNotInScope": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, + subaccountIds: []uint32{0}, + filteredUpdates: nil, + }, + "otherInScope": { + updates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, + subaccountIds: []uint32{otherSubaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, + }, + "otherNotInScope": { + updates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: nil, + }, + "bothInScope": { + updates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber, otherSubaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + }, + "bothOtherInScope": { + updates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{otherSubaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{otherStreamUpdate}, + }, + "bothBaseInScope": { + updates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate}, + }, + "bothNoneInScopeWrongId": { + updates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{404}, + filteredUpdates: nil, + }, + "bothNoneInScopeNoId": { + updates: &[]clobtypes.StreamUpdate{bothStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: nil, + }, + "noUpdates": { + updates: &[]clobtypes.StreamUpdate{}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: nil, + }, + "noUpdatesNoId": { + updates: &[]clobtypes.StreamUpdate{}, + subaccountIds: []uint32{}, + filteredUpdates: nil, + }, + "orderBookFillUpdates": { + updates: &[]clobtypes.StreamUpdate{*orderBookFillUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*orderBookFillUpdate}, + }, + "orderBookFillUpdatesDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*orderBookFillUpdate}, + }, + "orderBookFillUpdatesFilterUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, + }, + "orderBookFillUpdatesFilterAndDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *orderBookFillUpdate}, + }, + "takerOrderUpdates": { + updates: &[]clobtypes.StreamUpdate{*takerOrderUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*takerOrderUpdate}, + }, + "takerOrderUpdatesDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*takerOrderUpdate}, + }, + "takerOrderUpdatesFilterUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, + }, + "takerOrderUpdatesFilterAndDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *takerOrderUpdate}, + }, + "subaccountUpdates": { + updates: &[]clobtypes.StreamUpdate{*subaccountUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*subaccountUpdate}, + }, + "subaccountUpdatesDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*subaccountUpdate}, + }, + "subaccountUpdatesFilterUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, + }, + "subaccountUpdatesFilterAndDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *subaccountUpdate}, + }, + "priceUpdates": { + updates: &[]clobtypes.StreamUpdate{*priceUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*priceUpdate}, + }, + "priceUpdatesDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate, otherStreamUpdate}, + subaccountIds: []uint32{}, + filteredUpdates: &[]clobtypes.StreamUpdate{*priceUpdate}, + }, + "priceUpdatesFilterUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, + }, + "priceUpdatesFilterAndDropUpdate": { + updates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate, otherStreamUpdate}, + subaccountIds: []uint32{subaccountIdNumber}, + filteredUpdates: &[]clobtypes.StreamUpdate{baseStreamUpdate, *priceUpdate}, + }, } for name, testCase := range tests {