Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP} Preliminary realtime bars support for new IB #671

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion lumibot/brokers/interactive_brokers_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import re
import traceback
from ..trading_builtins import PollingStream
import threading
import json
import websocket

TYPE_MAP = dict(
stock="STK",
Expand Down Expand Up @@ -1190,4 +1193,108 @@ def _get_broker_id_from_raw_orders(self, raw_orders):
for leg in o["leg"]:
if "orderId" in leg:
ids.append(str(leg["orderId"]))
return ids
return ids

def subscribe_to_realtime_bars(self, conid, callback, fields=None):
base_url = self.data_source.base_url
if base_url.startswith("https://"):
ws_url = base_url.replace("https", "wss", 1)
else:
ws_url = base_url.replace("http", "ws", 1)

try:
if not hasattr(self, "wsc") or self.wsc is None:
self.wsc = IBKRWebSocketClient(f"{ws_url}/ws")
self.wsc.connect()

Comment on lines +1198 to +1210
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing WebSocket Reconnection Logic category Functionality

Tell me more
What is the issue?

The WebSocket client is not properly handling reconnection scenarios. If the connection drops, there's no automatic reconnection mechanism.

Why this matters

In a production environment, network interruptions are common. Without automatic reconnection, real-time data feeds will stop working until manual intervention.

Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews.

self.wsc.subscribe_to_realtime_bars(conid, callback, fields)
except Exception as e:
logging.error(f"Error subscribing to realtime bars: {e}", exc_info=True)

def unsubscribe_from_realtime_bars(self, conid):
try:
self.wsc.unsubscribe_from_realtime_bars(conid)
except Exception as e:
logging.error(f"Error unsubscribing from realtime bars: {e}", exc_info=True)

class IBKRWebSocketClient:
def __init__(self, url):
self.url = url
self.ws = None
self.ws_thread = None
self.subscriptions = set()
self.callbacks = {}
self.lock = threading.Lock()

def connect(self):
if not self.ws:
try:
self.ws = websocket.WebSocketApp(
self.url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open,
)
self.ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True)
self.ws_thread.start()
except Exception as e:
logging.error(f"Error establishing WebSocket connection: {e}", exc_info=True)

def on_open(self, ws):
logging.info("WebSocket connection opened.")

def on_message(self, ws, message):
try:
data = json.loads(message)
topic = data.get("topic")
if topic and topic.startswith("smd+"):
conid = topic.split("+")[1]
if conid in self.callbacks:
Comment on lines +1247 to +1254
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unprotected WebSocket Callback Execution category Functionality

Tell me more
What is the issue?

The callback execution is not protected against slow or blocking callbacks that could affect the WebSocket message processing loop.

Why this matters

If a callback takes too long to process, it will block the WebSocket message processing thread, potentially causing message queuing and memory issues.

Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews.

self.callbacks[conid](data)
else:
logging.debug(f"Received message: {data}")
except json.JSONDecodeError:
Comment on lines +1248 to +1258
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Insufficient JSONDecodeError Logging category Error Handling

Tell me more
What is the issue?

The JSONDecodeError exception handler logs at INFO level but provides no details about the invalid message that caused the error.

Why this matters

Without logging the raw message content, debugging invalid JSON messages becomes more difficult as there's no way to identify what caused the parsing failure.

Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews.

logging.info("Received invalid JSON message.")
except Exception as e:
logging.error(f"Error processing WebSocket message: {e}", exc_info=True)

def on_error(self, ws, error):
Comment on lines +1262 to +1263
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect WebSocket Error Logging Level category Error Handling

Tell me more
What is the issue?

WebSocket errors are logged at INFO level rather than ERROR level and without stack trace information.

Why this matters

WebSocket errors represent connection issues that could disrupt data flow and should be treated as errors with full context for proper debugging.

Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews.

logging.info(f"WebSocket error: {error}")

def on_close(self, ws, close_status_code, close_msg):
logging.info("WebSocket connection closed.")
with self.lock:
Comment on lines +1262 to +1268
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Error Propagation to Subscribers category Functionality

Tell me more
What is the issue?

WebSocket error and close events are only logged but not propagated to the subscribers, leaving them unaware of connection issues.

Why this matters

Subscribers need to know about connection issues to implement appropriate error handling or retry logic in their applications.

Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews.

self.ws = None

def subscribe_to_realtime_bars(self, conid, callback, fields=None):
with self.lock:
try:
self.subscriptions.add(conid)
self.callbacks[conid] = callback
fields = fields or []
msg = f"smd+{conid}+{{\"fields\": {json.dumps(fields)}}}"
if self.ws:
self.ws.send(msg)
logging.info(f"Subscribed to conid: {conid} with fields={fields}")
else:
logging.error("WebSocket is not connected.")
except Exception as e:
logging.error(f"Error in subscribe_to_realtime_bars: {e}", exc_info=True)

def unsubscribe_from_realtime_bars(self, conid):
with self.lock:
try:
if conid in self.subscriptions:
self.subscriptions.remove(conid)
self.callbacks.pop(conid, None)
if self.ws:
msg = f"umd+{conid}+{{}}"
self.ws.send(msg)
logging.info(f"Unsubscribed from conid: {conid}")
else:
logging.error("WebSocket is not connected.")
else:
logging.info(f"Conid {conid} is not subscribed.")
except Exception as e:
logging.error(f"Error in unsubscribe_from_realtime_bars: {e}", exc_info=True)
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ duckdb
tabulate
thetadata
holidays
psutil
psutil
websocket-client
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"thetadata",
"holidays",
"psutil",
"websocket-client"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specify version for 'websocket-client' dependency. category Third-party Libraries

Tell me more

It's great that you're adding the 'websocket-client' dependency to enable real-time data functionality! To ensure compatibility and reproducibility, I recommend specifying a version or version range for this dependency in the install_requires list, similar to how versions are specified for the other dependencies. For example, you could use 'websocket-client>=1.0.0' to require version 1.0.0 or higher.

Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews.

],
classifiers=[
"Programming Language :: Python :: 3",
Expand Down
Loading