Skip to content

Commit 46d5f00

Browse files
Introduction of SyncRealTimeClient
1 parent 574bb18 commit 46d5f00

5 files changed

Lines changed: 315 additions & 25 deletions

File tree

examples/market_data_streaming.py

Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Example demonstrating real-time market data streaming with the ProjectX SDK."""
22

33
import os
4+
import signal
5+
import sys
46
import threading
57
import time
68
from collections import deque
@@ -13,6 +15,28 @@
1315
API_KEY = os.environ.get("PROJECTX_API_KEY")
1416
ENVIRONMENT = os.environ.get("PROJECTX_ENVIRONMENT", "demo")
1517

18+
# Global variables for graceful shutdown
19+
shutdown_requested = False
20+
client = None
21+
monitor = None
22+
23+
24+
def signal_handler(signum, frame):
25+
"""Handle shutdown signals gracefully."""
26+
global shutdown_requested, client, monitor
27+
print("\nShutdown signal received. Cleaning up...")
28+
shutdown_requested = True
29+
30+
if monitor:
31+
monitor.stop()
32+
if client:
33+
try:
34+
client.realtime.stop()
35+
except Exception as e:
36+
print(f"Error during cleanup: {e}")
37+
38+
sys.exit(0)
39+
1640

1741
class MarketDataMonitor:
1842
"""Class to monitor and display real-time market data."""
@@ -34,9 +58,9 @@ def __init__(self, client):
3458
self.display_thread.daemon = True
3559
self.display_thread.start()
3660

37-
def on_price_update(self, data):
61+
def on_quote_update(self, contract_id, data):
3862
"""Handle real-time price updates."""
39-
contract_id = data.get("contractId")
63+
print(f"Quote update for {contract_id}: {data}")
4064
if not contract_id or contract_id not in self.subscribed_contracts:
4165
return
4266

@@ -47,26 +71,42 @@ def on_price_update(self, data):
4771
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
4872
price_data = {
4973
"timestamp": timestamp,
50-
"last": data.get("last"),
51-
"bid": data.get("bid"),
52-
"ask": data.get("ask"),
74+
"last": data.get("lastPrice"), # Field mapping: lastPrice -> last
75+
"bid": data.get("bestBid"), # Field mapping: bestBid -> bid
76+
"ask": data.get("bestAsk"), # Field mapping: bestAsk -> ask
5377
"volume": data.get("volume"),
5478
}
5579
self.price_history[contract_id].append(price_data)
80+
print(f"Processed price data: {price_data}")
5681

57-
def on_trade_update(self, data):
82+
def on_trade_update(self, contract_id, data):
5883
"""Handle real-time trade updates."""
59-
contract_id = data.get("contractId")
6084
if not contract_id or contract_id not in self.subscribed_contracts:
6185
return
6286

6387
contract_name = self.subscribed_contracts[contract_id]
6488
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
6589

6690
print(f"\n[{timestamp}] TRADE: {contract_name}")
67-
print(f" Price: {data.get('price')}")
68-
print(f" Size: {data.get('size')}")
69-
print(f" Side: {data.get('side')}")
91+
print(f" Trade data type: {type(data)}")
92+
print(f" Trade data: {data}")
93+
94+
# Handle case where data might be a list or dict
95+
try:
96+
if isinstance(data, list):
97+
print(f" Data is a list with {len(data)} items")
98+
# If data is a list, take the first item or handle accordingly
99+
trade_data = data[0] if data else {}
100+
print(f" Using first item: {trade_data}")
101+
else:
102+
trade_data = data
103+
104+
print(f" Price: {trade_data.get('price', 'N/A')}")
105+
print(f" Size: {trade_data.get('size', 'N/A')}")
106+
print(f" Side: {trade_data.get('side', 'N/A')}")
107+
except Exception as e:
108+
print(f" Error processing trade data: {e}")
109+
print(f" Raw data: {data}")
70110

71111
def subscribe_contract(self, contract):
72112
"""Subscribe to real-time data for a contract."""
@@ -80,14 +120,16 @@ def subscribe_contract(self, contract):
80120
self.price_history[contract_id] = deque(maxlen=100)
81121

82122
# Subscribe to price updates
83-
self.client.realtime.market.subscribe_prices(
84-
contract_id=contract_id, callback=self.on_price_update
123+
self.client.realtime.market.subscribe_quotes(
124+
contract_id=contract_id, callback=self.on_quote_update
85125
)
126+
print("subscribed to quotes")
86127

87128
# Subscribe to trade updates
88129
self.client.realtime.market.subscribe_trades(
89130
contract_id=contract_id, callback=self.on_trade_update
90131
)
132+
print("subscribed to trades")
91133

92134
print(f"Subscribed to real-time data for {contract.name}")
93135

@@ -133,6 +175,12 @@ def stop(self):
133175

134176
def stream_market_data():
135177
"""Stream real-time market data for selected contracts."""
178+
global client, monitor
179+
180+
# Set up signal handling for graceful shutdown
181+
signal.signal(signal.SIGINT, signal_handler)
182+
signal.signal(signal.SIGTERM, signal_handler)
183+
136184
# Initialize the client
137185
print(f"Connecting to ProjectX {ENVIRONMENT} environment...")
138186
client = ProjectXClient(username=USERNAME, api_key=API_KEY, environment=ENVIRONMENT)
@@ -192,15 +240,24 @@ def stream_market_data():
192240
# Main loop to keep the script running
193241
print("\nStreaming real-time market data. Press Ctrl+C to exit.")
194242

195-
while True:
243+
while not shutdown_requested:
196244
time.sleep(0.1)
197245

198246
except KeyboardInterrupt:
199-
print("\nExiting...")
247+
print("\nKeyboard interrupt received...")
248+
except Exception as e:
249+
print(f"\nUnexpected error: {e}")
200250
finally:
201-
# Clean up
202-
monitor.stop()
203-
client.realtime.stop()
251+
# Clean up (signal handler may have already done this)
252+
print("Final cleanup...")
253+
if monitor:
254+
monitor.stop()
255+
if client:
256+
try:
257+
client.realtime.stop()
258+
print("Real-time client stopped successfully")
259+
except Exception as e:
260+
print(f"Error stopping real-time client: {e}")
204261

205262

206263
if __name__ == "__main__":

projectx_sdk/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
RequestError,
2121
ResourceNotFoundError,
2222
)
23-
from projectx_sdk.realtime import RealTimeClient
23+
from projectx_sdk.realtime import SyncRealTimeClient
2424

2525
logger = logging.getLogger(__name__)
2626

@@ -156,10 +156,10 @@ def __init__(
156156
self.trades = TradeService(self)
157157

158158
# Real-time client (lazy-initialized)
159-
self._realtime: Optional[RealTimeClient] = None
159+
self._realtime: Optional[SyncRealTimeClient] = None
160160

161161
@property
162-
def realtime(self) -> RealTimeClient:
162+
def realtime(self) -> SyncRealTimeClient:
163163
"""
164164
Get the real-time client for WebSocket connections.
165165
@@ -170,7 +170,7 @@ def realtime(self) -> RealTimeClient:
170170
"""
171171
if not self._realtime:
172172
token = self.auth.get_token()
173-
self._realtime = RealTimeClient(
173+
self._realtime = SyncRealTimeClient(
174174
auth_token=token,
175175
environment=self.environment,
176176
user_hub_url=self.USER_HUB_URLS.get(self.environment),

0 commit comments

Comments
 (0)