-
Notifications
You must be signed in to change notification settings - Fork 185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP} Preliminary realtime bars support for new IB #671
base: dev
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,9 @@ | |
import re | ||
import traceback | ||
from ..trading_builtins import PollingStream | ||
import threading | ||
import json | ||
import websocket | ||
|
||
TYPE_MAP = dict( | ||
stock="STK", | ||
|
@@ -1190,4 +1193,108 @@ def _get_broker_id_from_raw_orders(self, raw_orders): | |
for leg in o["leg"]: | ||
if "orderId" in leg: | ||
ids.append(str(leg["orderId"])) | ||
return ids | ||
return ids | ||
|
||
def subscribe_to_realtime_bars(self, conid, callback, fields=None): | ||
base_url = self.data_source.base_url | ||
if base_url.startswith("https://"): | ||
ws_url = base_url.replace("https", "wss", 1) | ||
else: | ||
ws_url = base_url.replace("http", "ws", 1) | ||
|
||
try: | ||
if not hasattr(self, "wsc") or self.wsc is None: | ||
self.wsc = IBKRWebSocketClient(f"{ws_url}/ws") | ||
self.wsc.connect() | ||
|
||
self.wsc.subscribe_to_realtime_bars(conid, callback, fields) | ||
except Exception as e: | ||
logging.error(f"Error subscribing to realtime bars: {e}", exc_info=True) | ||
|
||
def unsubscribe_from_realtime_bars(self, conid): | ||
try: | ||
self.wsc.unsubscribe_from_realtime_bars(conid) | ||
except Exception as e: | ||
logging.error(f"Error unsubscribing from realtime bars: {e}", exc_info=True) | ||
|
||
class IBKRWebSocketClient: | ||
def __init__(self, url): | ||
self.url = url | ||
self.ws = None | ||
self.ws_thread = None | ||
self.subscriptions = set() | ||
self.callbacks = {} | ||
self.lock = threading.Lock() | ||
|
||
def connect(self): | ||
if not self.ws: | ||
try: | ||
self.ws = websocket.WebSocketApp( | ||
self.url, | ||
on_message=self.on_message, | ||
on_error=self.on_error, | ||
on_close=self.on_close, | ||
on_open=self.on_open, | ||
) | ||
self.ws_thread = threading.Thread(target=self.ws.run_forever, daemon=True) | ||
self.ws_thread.start() | ||
except Exception as e: | ||
logging.error(f"Error establishing WebSocket connection: {e}", exc_info=True) | ||
|
||
def on_open(self, ws): | ||
logging.info("WebSocket connection opened.") | ||
|
||
def on_message(self, ws, message): | ||
try: | ||
data = json.loads(message) | ||
topic = data.get("topic") | ||
if topic and topic.startswith("smd+"): | ||
conid = topic.split("+")[1] | ||
if conid in self.callbacks: | ||
Comment on lines
+1247
to
+1254
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unprotected WebSocket Callback ExecutionTell me moreWhat is the issue?The callback execution is not protected against slow or blocking callbacks that could affect the WebSocket message processing loop. Why this mattersIf a callback takes too long to process, it will block the WebSocket message processing thread, potentially causing message queuing and memory issues. Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews. |
||
self.callbacks[conid](data) | ||
else: | ||
logging.debug(f"Received message: {data}") | ||
except json.JSONDecodeError: | ||
Comment on lines
+1248
to
+1258
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Insufficient JSONDecodeError LoggingTell me moreWhat is the issue?The JSONDecodeError exception handler logs at INFO level but provides no details about the invalid message that caused the error. Why this mattersWithout logging the raw message content, debugging invalid JSON messages becomes more difficult as there's no way to identify what caused the parsing failure. Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews. |
||
logging.info("Received invalid JSON message.") | ||
except Exception as e: | ||
logging.error(f"Error processing WebSocket message: {e}", exc_info=True) | ||
|
||
def on_error(self, ws, error): | ||
Comment on lines
+1262
to
+1263
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incorrect WebSocket Error Logging LevelTell me moreWhat is the issue?WebSocket errors are logged at INFO level rather than ERROR level and without stack trace information. Why this mattersWebSocket errors represent connection issues that could disrupt data flow and should be treated as errors with full context for proper debugging. Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews. |
||
logging.info(f"WebSocket error: {error}") | ||
|
||
def on_close(self, ws, close_status_code, close_msg): | ||
logging.info("WebSocket connection closed.") | ||
with self.lock: | ||
Comment on lines
+1262
to
+1268
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing Error Propagation to SubscribersTell me moreWhat is the issue?WebSocket error and close events are only logged but not propagated to the subscribers, leaving them unaware of connection issues. Why this mattersSubscribers need to know about connection issues to implement appropriate error handling or retry logic in their applications. Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews. |
||
self.ws = None | ||
|
||
def subscribe_to_realtime_bars(self, conid, callback, fields=None): | ||
with self.lock: | ||
try: | ||
self.subscriptions.add(conid) | ||
self.callbacks[conid] = callback | ||
fields = fields or [] | ||
msg = f"smd+{conid}+{{\"fields\": {json.dumps(fields)}}}" | ||
if self.ws: | ||
self.ws.send(msg) | ||
logging.info(f"Subscribed to conid: {conid} with fields={fields}") | ||
else: | ||
logging.error("WebSocket is not connected.") | ||
except Exception as e: | ||
logging.error(f"Error in subscribe_to_realtime_bars: {e}", exc_info=True) | ||
|
||
def unsubscribe_from_realtime_bars(self, conid): | ||
with self.lock: | ||
try: | ||
if conid in self.subscriptions: | ||
self.subscriptions.remove(conid) | ||
self.callbacks.pop(conid, None) | ||
if self.ws: | ||
msg = f"umd+{conid}+{{}}" | ||
self.ws.send(msg) | ||
logging.info(f"Unsubscribed from conid: {conid}") | ||
else: | ||
logging.error("WebSocket is not connected.") | ||
else: | ||
logging.info(f"Conid {conid} is not subscribed.") | ||
except Exception as e: | ||
logging.error(f"Error in unsubscribe_from_realtime_bars: {e}", exc_info=True) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,4 +30,5 @@ duckdb | |
tabulate | ||
thetadata | ||
holidays | ||
psutil | ||
psutil | ||
websocket-client |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,7 @@ | |
"thetadata", | ||
"holidays", | ||
"psutil", | ||
"websocket-client" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Specify version for 'websocket-client' dependency.Tell me moreIt's great that you're adding the 'websocket-client' dependency to enable real-time data functionality! To ensure compatibility and reproducibility, I recommend specifying a version or version range for this dependency in the install_requires list, similar to how versions are specified for the other dependencies. For example, you could use 'websocket-client>=1.0.0' to require version 1.0.0 or higher. Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews. |
||
], | ||
classifiers=[ | ||
"Programming Language :: Python :: 3", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing WebSocket Reconnection Logic
Tell me more
What is the issue?
The WebSocket client is not properly handling reconnection scenarios. If the connection drops, there's no automatic reconnection mechanism.
Why this matters
In a production environment, network interruptions are common. Without automatic reconnection, real-time data feeds will stop working until manual intervention.
Chat with Korbit by mentioning @korbit-ai, and give a 👍 or 👎 to help Korbit improve your reviews.