-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbridge.py
More file actions
107 lines (94 loc) · 3.89 KB
/
bridge.py
File metadata and controls
107 lines (94 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import asyncio
import websockets
import json
import socket
# Exchange feeds, in the order the stream_* coroutines index them:
# 0 = Binance, 1 = Coinbase, 2 = Bitstamp.
SOURCES = [
    {"name": exchange, "url": endpoint}
    for exchange, endpoint in (
        ("BINANCE", "wss://stream.binance.com:9443/ws/btcusdt@trade"),
        ("COINBASE", "wss://ws-feed.exchange.coinbase.com"),
        ("BITSTAMP", "wss://ws.bitstamp.net"),
    )
]
# Count of messages handed to send_to_cpp() so far, across every feed.
global_ticks = 0

# Every Nth forwarded message is replaced by the synthetic anomaly below.
_ANOMALY_INTERVAL = 30
_ANOMALY_FIX_MSG = (
    "8=FIX.4.2\x0135=X\x0149=ANOMALY\x0155=BTCUSDT\x01269=0\x01270=250000.00\x01"
)


async def send_to_cpp(conn, loop, fix_msg):
    """Forward one FIX message over ``conn``, best effort.

    Each call increments the module-level ``global_ticks`` counter. On every
    30th call the supplied message is *replaced* (not supplemented) by a
    fixed anomaly message priced at 250000.00.

    Args:
        conn: Connected socket handed to ``loop.sock_sendall``.
        loop: Event loop used to perform the send.
        fix_msg: Message text; encoded with ``str.encode()`` before sending.

    Send failures never propagate to the caller; they are reported on stdout
    so a dead connection is visible instead of being silently ignored.
    """
    global global_ticks
    global_ticks += 1

    if global_ticks % _ANOMALY_INTERVAL == 0:
        print("\n[GATEWAY] INJECTING FAT FINGER ANOMALY ($250,000) TO TEST FPGA...\n")
        fix_msg = _ANOMALY_FIX_MSG

    try:
        await loop.sock_sendall(conn, fix_msg.encode())
    except Exception as exc:
        # Deliberately best effort: a failed send must not tear down the
        # calling feed, but it should not vanish without a trace either.
        print(f"[GATEWAY] Failed to forward message to engine: {exc!r}")
async def stream_binance(conn, loop):
    """Relay Binance trade prices to the engine as FIX messages, forever.

    Connects to the URL of the first ``SOURCES`` entry, and for each received
    JSON message that has a ``"p"`` field sends one FIX message (sender
    ``BINANCE``, symbol ``BTCUSDT``) through ``send_to_cpp``, then pauses
    0.1 s. Any failure is reported and followed by a reconnect after 1 s.

    Args:
        conn: Socket passed through to ``send_to_cpp``.
        loop: Event loop passed through to ``send_to_cpp``.
    """
    source = SOURCES[0]
    while True:
        try:
            async with websockets.connect(source["url"]) as ws:
                print(f"[CONNECT] Connected to {source['name']}")
                async for msg in ws:
                    data = json.loads(msg)
                    # Same guard the other feeds use: a frame without a
                    # price is skipped instead of raising KeyError and
                    # forcing a needless reconnect.
                    if 'p' not in data:
                        continue
                    price = data['p']
                    fix = f"8=FIX.4.2\x0135=X\x0149=BINANCE\x0155=BTCUSDT\x01269=0\x01270={price}\x01"
                    await send_to_cpp(conn, loop, fix)
                    await asyncio.sleep(0.1)
        except Exception as exc:
            # Boundary of a long-running task: report the reason, then retry.
            print(f"[RECONNECT] {source['name']} stream failed ({exc!r}); retrying in 1s")
            await asyncio.sleep(1)
async def stream_coinbase(conn, loop):
    """Relay Coinbase ticker prices to the engine as FIX messages, forever.

    Connects to the URL of the second ``SOURCES`` entry, subscribes to the
    ``ticker`` channel for ``BTC-USD``, and for each received JSON message
    that has a ``"price"`` field sends one FIX message (sender ``COINBASE``,
    symbol ``BTCUSD``) through ``send_to_cpp``, then pauses 0.1 s. Any
    failure is reported and followed by a reconnect after 1 s.

    Args:
        conn: Socket passed through to ``send_to_cpp``.
        loop: Event loop passed through to ``send_to_cpp``.
    """
    source = SOURCES[1]
    subscribe = json.dumps({"type": "subscribe", "product_ids": ["BTC-USD"], "channels": ["ticker"]})
    while True:
        try:
            async with websockets.connect(source["url"]) as ws:
                await ws.send(subscribe)
                print(f"[CONNECT] Connected to {source['name']}")
                async for msg in ws:
                    data = json.loads(msg)
                    # Messages without a price are ignored.
                    if 'price' not in data:
                        continue
                    price = data['price']
                    fix = f"8=FIX.4.2\x0135=X\x0149=COINBASE\x0155=BTCUSD\x01269=0\x01270={price}\x01"
                    await send_to_cpp(conn, loop, fix)
                    await asyncio.sleep(0.1)
        except Exception as exc:
            # Boundary of a long-running task: report the reason, then retry.
            print(f"[RECONNECT] {source['name']} stream failed ({exc!r}); retrying in 1s")
            await asyncio.sleep(1)
async def stream_bitstamp(conn, loop):
    """Relay Bitstamp live-trade prices to the engine as FIX messages, forever.

    Connects to the URL of the third ``SOURCES`` entry, subscribes to the
    ``live_trades_btcusd`` channel, and for each received JSON message whose
    ``"data"`` member has a ``"price"`` field sends one FIX message (sender
    ``BITSTAMP``, symbol ``BTCUSD``) through ``send_to_cpp``, then pauses
    0.1 s. Any failure is reported and followed by a reconnect after 1 s.

    Args:
        conn: Socket passed through to ``send_to_cpp``.
        loop: Event loop passed through to ``send_to_cpp``.
    """
    source = SOURCES[2]
    sub_msg = {"event": "bts:subscribe", "data": {"channel": "live_trades_btcusd"}}
    while True:
        try:
            async with websockets.connect(source["url"]) as ws:
                await ws.send(json.dumps(sub_msg))
                print(f"[CONNECT] Connected to {source['name']}")
                async for msg in ws:
                    data = json.loads(msg)
                    # Messages without a nested price are ignored.
                    if 'data' not in data or 'price' not in data['data']:
                        continue
                    price = data['data']['price']
                    fix = f"8=FIX.4.2\x0135=X\x0149=BITSTAMP\x0155=BTCUSD\x01269=0\x01270={price}\x01"
                    await send_to_cpp(conn, loop, fix)
                    await asyncio.sleep(0.1)
        except Exception as exc:
            # Boundary of a long-running task: report the reason, then retry.
            print(f"[RECONNECT] {source['name']} stream failed ({exc!r}); retrying in 1s")
            await asyncio.sleep(1)
def start_tcp_server(host='0.0.0.0', port=5000):
    """Block until one TCP client connects, then return its socket.

    Args:
        host: Interface to bind. Defaults to all IPv4 interfaces.
        port: TCP port to listen on. Defaults to 5000.

    Returns:
        The accepted client socket. The caller owns it and must close it.

    Raises:
        OSError: If the address cannot be bound or the accept fails.
    """
    import os  # local import: only needed for the platform check below

    # The listening socket is needed only until the single client arrives;
    # the ``with`` block closes it instead of leaking it for the process life.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        if os.name == "posix":
            # Allow an immediate restart while the previous connection is
            # still in TIME_WAIT. Skipped elsewhere because the option has
            # different semantics on other platforms.
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        print("\n=======================================================")
        print(" [GATEWAY] Multi-Exchange Live Aggregator (TCP Mode)")
        print(f" [GATEWAY] Listening on Port {port}...")
        print("=======================================================")
        print(" Waiting for Shaurya Engine to connect...\n")
        conn, addr = server.accept()

    print(f"\n[>>>] Shaurya Engine Connected from {addr}!\n")
    return conn
async def main():
    """Accept the engine connection, then run all exchange feeds until stopped.

    Waits (blocking) in ``start_tcp_server`` for the engine to connect, starts
    one task per exchange feed, and runs them concurrently. The engine socket
    is closed on every exit path.
    """
    conn = start_tcp_server()
    # loop.sock_sendall() is documented to require a non-blocking socket.
    conn.setblocking(False)
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()

    tasks = [
        asyncio.create_task(feed(conn, loop))
        for feed in (stream_binance, stream_coinbase, stream_bitstamp)
    ]
    try:
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        # Kept from the original. Under asyncio.run() Ctrl-C normally arrives
        # as a cancellation of this task instead, which is why the cleanup
        # below lives in ``finally`` rather than in this handler.
        pass
    finally:
        print("\n[GATEWAY] Shutting down.")
        conn.close()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() re-raises Ctrl-C once main() has been cancelled;
        # exit quietly instead of printing a traceback.
        pass