Skip to content

Commit

Permalink
L1 Market Data (#24)
Browse files Browse the repository at this point in the history
* Adds market data producer and types

* Adds market l1 ring buffer. Refactors socket/gateway servers

* Adds scaffolding for market data server on port 8889

* adds function to send a message to all connected clients

* OrderBook sending market data

* MarketData server starts on port 8889. Adds libs for ssl and sqlite

* Refactors python load test into trading client to ease testing market data

* Testing pulling packages from alpine instead of from source

* Tries reconfiguring workflow for github

* Typo in libspdlog-dev

* Trying older catch library

* Catch2 / catch packages in ubuntu giving me issues. Rolling back

* Transmitting market data

* order book tests still failling

* Fixes github workflow
  • Loading branch information
sneilan authored Nov 27, 2023
1 parent 95c47c3 commit 7c3e64a
Show file tree
Hide file tree
Showing 19 changed files with 482 additions and 233 deletions.
21 changes: 13 additions & 8 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,23 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Download spdlog
- name: Install dependencies
run: |
mkdir third_party
cd third_party
git clone https://github.com/gabime/spdlog.git
cd spdlog
mkdir build && cd build
cmake ..
sudo make install
sudo apt-get install libspdlog-dev
# - name: Download spdlog
# run: |
# mkdir third_party
# cd third_party
# git clone https://github.com/gabime/spdlog.git
# cd spdlog
# mkdir build && cd build
# cmake ..
# sudo make install

- name: Download Catch2
run: |
mkdir third_party
cd third_party
git clone https://github.com/catchorg/Catch2.git
cd Catch2
Expand Down
9 changes: 5 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
FROM alpine:3.14

RUN apk add g++ git cmake make
RUN apk update
RUN apk add g++ git cmake make openssl sqlite-dev openssl-dev spdlog-dev

# Testing framework
RUN git clone https://github.com/catchorg/Catch2.git && \
Expand All @@ -9,9 +10,9 @@ RUN git clone https://github.com/catchorg/Catch2.git && \
cmake --build build/ --target install

# Logging library
RUN git clone https://github.com/gabime/spdlog.git && \
cd spdlog && mkdir build && cd build && \
cmake .. && make -j && make install
# RUN git clone https://github.com/gabime/spdlog.git && \
# cd spdlog && mkdir build && cd build && \
# cmake .. && make -j && make install

WORKDIR /
COPY /scripts/deploy/run.sh /run.sh
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ services:
build: .
ports:
- "8888:8888"
- "8889:8889"
volumes:
- ./:/app
1 change: 1 addition & 0 deletions scripts/deploy/make.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/bin/sh

cd /app
rm main test
cmake .
make
1 change: 1 addition & 0 deletions scripts/goClient/readme.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Go client is WIP! Use at own peril. Barely documented.
85 changes: 0 additions & 85 deletions scripts/loadTest.py

This file was deleted.

134 changes: 134 additions & 0 deletions scripts/python_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import socket
import sys
import time
from struct import pack, unpack
import random
import threading


class Client:
port = None
host = None

def log(self, msg):
print(msg)

def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
response = self.sock.recv(1024)
self.log(response)

def disconnect(self):
self.sock.close()

def listener(self):
raise NotImplementedError()

def start_listener(self):
self.thread = threading.Thread(target=self.listener)
self.thread.start()

def stop_listener(self):
raise NotImplementedError()


class MarketClient(Client):
host = '0.0.0.0'
port = 8889

def listener(self):
while True:
data = self.sock.recv(16)
print(data)
print(len(data))
if not data:
return

# c is char
# Q is unsigned long long
# i is 4 byte integer
# x is 1 byte padding.
format_string = 'BcxxiQ'
unpacked_data = unpack(format_string, data)

version = unpacked_data[0]
msg_type = unpacked_data[1].decode()
val = unpacked_data[2]
time_ms = unpacked_data[3]

self.handle_notification(msg_type, val, time_ms)

def handle_notification(self, msg_type: str, val: int, time_ms: int):
self.log({'msg_type': msg_type, 'val': val, 'time_ms': time_ms})


class TradingClient(Client):
host = '0.0.0.0'
port = 8888

type_to_msg = {
'u': 'updated',
'f': 'filled',
'r': 'recieved'
}

def __init__(self):
pass

def trade(self, price: int, quantity: int, side: str):
assert side in ['b', 's'], 'Side must be b or s for buy and sell'

message = pack(
'cii',
bytes(side, 'ascii'),
price,
quantity,
)

self.sock.sendall(message)

def listener(self):
while True:
data = self.sock.recv(21)
if not data:
return

# c is char
# Q is unsigned long long
# i is 4 byte integer
format_string = 'Qiii'
unpacked_data = unpack(format_string, data[1:])
msg_type = chr(data[0])
message = self.type_to_msg[msg_type]

id = unpacked_data[0]
quantity = unpacked_data[1]
filled_quantity = unpacked_data[2]
client_id = unpacked_data[3]

self.handle_notification(id, quantity, filled_quantity, client_id)

def handle_notification(self, id, quantity, filled_quantity, client_id):
self.log({'id': id, 'quantity': quantity, 'filled_quantity': filled_quantity, 'client_id': client_id})


client = TradingClient()
client.connect()
client.start_listener()

mkt = MarketClient()
mkt.connect()
mkt.start_listener()

while True:
char = sys.stdin.read(1)
if char == 'b':
break
time.sleep(.1)
print("Placing trade.")
price = random.randint(101, 999)
quantity = random.randint(1, 10)
side = 'b' if random.randint(0, 1) == 0 else 's'
client.trade(price, quantity, side)

51 changes: 49 additions & 2 deletions src/gateway/gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,55 @@ Gateway::Gateway(Producer<NewOrderEvent>* incoming_msg_producer,
this->order_pool = order_pool;
}

Gateway::~Gateway() throw() {
// incoming_msg_producer->cleanup();
Gateway::~Gateway() throw() {}

void Gateway::handleOutgoingMessage() {
ORDER_MMAP_OFFSET *offset = outgoing_message_consumer->get();
if (offset != nullptr) {
Order *order = order_pool->offset_to_pointer(*offset);

// message type (char)
// sequence ID (unsigned long long)
// total quantity (integer)
// filled quantity (integer)
// (TODO)
// last fill price (integer)
// last quantity filled (integer)

// hacked out for prototype purposes. Should replace with FIX or protobuf.
// Maybe even Apache Avro.
int total_size = sizeof(char) + sizeof(order->id) +
sizeof(order->quantity) +
sizeof(order->filled_quantity) + sizeof(order->clientId);
char buffer[total_size];

char orderRecieved = 'r';
char orderUpdated = 'u';
char orderFilled = 'f';
// @TODO cancelled later.

if (order->unfilled_quantity() == order->quantity) {
buffer[0] = orderRecieved;
} else if (order->unfilled_quantity() == 0) {
buffer[0] = orderFilled;
} else {
buffer[0] = orderUpdated;
}

int offset = 1;
std::memcpy(buffer + offset, &order->id, sizeof(order->id));
offset += sizeof(order->id);
std::memcpy(buffer + offset, &order->quantity, sizeof(order->quantity));
offset += sizeof(order->quantity);
std::memcpy(buffer + offset, &order->filled_quantity,
sizeof(order->filled_quantity));
offset += sizeof(order->clientId);
std::memcpy(buffer + offset, &order->clientId, sizeof(order->clientId));

sendMessage(order->clientId, buffer, total_size);
SPDLOG_DEBUG("Sent {} message {} about order {}", order->clientId,
buffer[0], order->id);
}
}

void Gateway::readMessage(int client_id, char *message) {
Expand Down
3 changes: 3 additions & 0 deletions src/gateway/gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ class Gateway : public SocketServer {
void newClient(int client_id) override;
void disconnected(int client_id) override;
void readMessage(int client_id, char *message) override;
void handleOutgoingMessage() override;
void run();

private:
Producer<NewOrderEvent>* incoming_msg_producer;
Consumer<ORDER_MMAP_OFFSET> *outgoing_message_consumer;
MMapObjectPool<Order> *order_pool;
};
#endif
35 changes: 35 additions & 0 deletions src/gateway/market_data.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "market_data.h"

MarketData::MarketData(Consumer<L1MarketData>* market_l1_data_consumer) {
this->market_l1_data_consumer = market_l1_data_consumer;
}

MarketData::~MarketData() throw() {}

// I don't care about these for now because I am implementing authentication later.
// For now anyone can get market data if they connect.

void MarketData::newClient(int client_id) {
const char *msg = "Welcome new market data consumer";
if (!sendMessage(client_id, const_cast<char *>(msg), strlen(msg))) {
forceDisconnect(client_id);
}
}
void MarketData::disconnected(int client_id) {};
void MarketData::readMessage(int client_id, char *message) {};

void MarketData::handleOutgoingMessage() {
L1MarketData *market_data = market_l1_data_consumer->get();
if (market_data == nullptr) {
return;
}

// For every client, send market data.
sendMessageToAllClients((char*)market_data, sizeof(L1MarketData));
SPDLOG_DEBUG("Sent Mkt {} Value {} ", market_data->type, market_data->val);
}

void MarketData::run() {
bindSocket(8889);
listenToSocket();
}
Loading

0 comments on commit 7c3e64a

Please sign in to comment.