-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduler_regular.py
More file actions
282 lines (233 loc) · 16.5 KB
/
Copy pathscheduler_regular.py
File metadata and controls
282 lines (233 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# ==========================================================
# FILE: scheduler_regular.py
# ==========================================================
# 🚨 [AI 에이전트(Copilot/Claude) 절대 주의 - 환각(Hallucination) 방어막]
# 제1헌법: queue_ledger.get_queue 등 모든 파일 I/O 및 락 점유 메서드는 무조건 asyncio.to_thread로 래핑하여 이벤트 루프 교착(Deadlock)을 원천 차단함.
# MODIFIED: [V44.47 이벤트 루프 데드락 영구 소각] 동기식 블로킹 호출 전면 비동기 래핑.
# ==========================================================
import logging
import datetime
from zoneinfo import ZoneInfo
import asyncio
import random
from scheduler_core import is_market_open, get_budget_allocation
async def scheduled_regular_trade(context):
try:
is_open = await asyncio.wait_for(asyncio.to_thread(is_market_open), timeout=10.0)
except asyncio.TimeoutError:
logging.error("⚠️ is_market_open 달력 API 타임아웃. 평일이므로 강제 개장 처리합니다.")
est = ZoneInfo('America/New_York')
is_open = datetime.datetime.now(est).weekday() < 5
if not is_open:
return
app_data = context.job.data
cfg, broker, strategy, tx_lock = app_data['cfg'], app_data['broker'], app_data['strategy'], app_data['tx_lock']
strategy_rev = app_data.get('strategy_rev')
queue_ledger = app_data.get('queue_ledger')
if tx_lock is None:
logging.warning("⚠️ [regular_trade] tx_lock 미초기화. 이번 사이클 스킵.")
await context.bot.send_message(chat_id=context.job.chat_id, text="⚠️ <b>[시스템 경고]</b> tx_lock 미초기화로 정규장 주문을 1회 스킵합니다.", parse_mode='HTML')
return
jitter_seconds = random.randint(0, 180)
await context.bot.send_message(
chat_id=context.job.chat_id,
text=f"🌃 <b>[04:05 EST] 통합 주문 장전 및 스냅샷 박제!</b>\n"
f"🛡️ 서버 접속 부하 방지를 위해 <b>{jitter_seconds}초</b> 대기 후 안전하게 주문 전송을 시도합니다.",
parse_mode='HTML'
)
await asyncio.sleep(jitter_seconds)
MAX_RETRIES = 15
RETRY_DELAY = 60
async def _do_regular_trade():
est = ZoneInfo('America/New_York')
_now_est = datetime.datetime.now(est)
today_str_est = _now_est.strftime("%Y-%m-%d")
async with tx_lock:
cash, holdings = await asyncio.to_thread(broker.get_account_balance)
if holdings is None:
return False, "❌ 계좌 정보를 불러오지 못했습니다."
safe_holdings = holdings if isinstance(holdings, dict) else {}
# MODIFIED: [맹점 2 수술] 파이썬 인자 평가 순서 함정으로 인한 동기 블로킹 방어
# cfg.get_active_tickers()를 메인 루프에서 분리하여 선제적으로 비동기 스캔
active_tickers_list = await asyncio.to_thread(cfg.get_active_tickers)
# 🚨 [비동기 래핑] get_budget_allocation 내부 파일 I/O 데드락 방어
sorted_tickers, allocated_cash = await asyncio.to_thread(get_budget_allocation, cash, active_tickers_list, cfg)
plans = {}
msgs = {t: "" for t in sorted_tickers}
all_success_map = {t: True for t in sorted_tickers}
v_rev_tickers = []
for t in sorted_tickers:
# 🚨 [비동기 래핑] 파일 I/O 데드락 방어
is_locked = await asyncio.to_thread(cfg.check_lock, t, "REG")
if is_locked:
skip_msg = (
f"⚠️ <b>[{t}] REG 잠금 미해제 — 주문 스킵</b>\n"
f"▫️ 전날 REG 잠금이 자정 초기화 시 해제되지 않아 오늘 04:05 EST 주문 루프에서 제외되었습니다.\n"
f"▫️ 수동으로 잠금 해제 후 상태를 확인하십시오."
)
await context.bot.send_message(context.job.chat_id, skip_msg, parse_mode='HTML')
continue
h = safe_holdings.get(t) or {}
safe_avg = float(h.get('avg') or 0.0)
safe_qty = int(float(h.get('qty') or 0))
curr_p = 0.0
prev_c = 0.0
for _api_retry in range(3):
curr_p = float(await asyncio.to_thread(broker.get_current_price, t) or 0.0)
prev_c = float(await asyncio.to_thread(broker.get_previous_close, t) or 0.0)
if curr_p > 0 and prev_c > 0:
break
await asyncio.sleep(2.0)
if curr_p <= 0 or prev_c <= 0:
msgs[t] += (
f"🚨 <b>[{t}] 전일 종가/현재가 API 3회 결측 감지!</b>\n"
f"▫️ 매수 방어선을 장전하지 못하고 다음 종목으로 넘어갑니다(continue 바이패스).\n"
)
all_success_map[t] = False
await context.bot.send_message(context.job.chat_id, msgs[t], parse_mode='HTML')
continue
# 🚨 [비동기 래핑] 파일 I/O 데드락 방어
version = await asyncio.to_thread(cfg.get_version, t)
is_manual_vwap = await asyncio.to_thread(getattr(cfg, 'get_manual_vwap_mode', lambda x: False), t)
if version == "V_REV" and is_manual_vwap:
msgs[t] += f"🛡️ <b>[{t}] V-REV 수동 시그널 모드 가동 중</b>\n"
msgs[t] += "▫️ 봇 자동 주문이 락다운되었습니다. V앱에서 장 마감 30분 전 세팅으로 수동 장전하십시오.\n"
await context.bot.send_message(context.job.chat_id, msgs[t], parse_mode='HTML')
continue
if version == "V_REV" or (version == "V14" and is_manual_vwap):
loc_orders = []
if version == "V_REV":
# 🚨 [비동기 래핑] 큐 장부 스레드 락 점유 원천 차단
q_data = await asyncio.to_thread(queue_ledger.get_queue, t)
v_rev_q_qty = sum(item.get("qty", 0) for item in q_data)
msgs[t] += f"🛡️ <b>[{t}] V-REV 예방적 덫 장전 기능 전면 소각</b>\n"
msgs[t] += "▫️ 자전거래(FDS) 의심을 회피하고 AVWAP 암살자가 자유롭게 타격하도록 예방 덫 기능을 영구 소각했습니다.\n"
plan_result = {"orders": [], "trigger_loc": False, "total_q": v_rev_q_qty}
if hasattr(strategy_rev, 'save_daily_snapshot'):
await asyncio.to_thread(strategy_rev.save_daily_snapshot, t, plan_result)
await asyncio.to_thread(cfg.set_lock, t, "REG")
all_success_map[t] = True
v_rev_tickers.append((t, version))
continue
elif version == "V14":
ma_5day = float(await asyncio.to_thread(broker.get_5day_ma, t) or 0.0)
v14_vwap_plugin = strategy.v14_vwap_plugin
# 🚨 [비동기 래핑] 내부 스냅샷 저장 등 파일 I/O 방어
v14_plan = await asyncio.to_thread(
v14_vwap_plugin.get_plan,
ticker=t, current_price=curr_p, avg_price=safe_avg, qty=safe_qty,
prev_close=prev_c, ma_5day=ma_5day, market_type="REG",
available_cash=allocated_cash.get(t, 0.0), is_simulation=False,
is_snapshot_mode=True
)
loc_orders = v14_plan.get('core_orders', [])
msgs[t] += f"🛡️ <b>[{t}] 무매4(VWAP) 예방적 LOC 덫 장전 완료</b>\n"
sell_success_count = 0
for o in loc_orders:
res = await asyncio.to_thread(broker.send_order, t, o['side'], o['qty'], o['price'], o['type'])
is_success = res.get('rt_cd') == '0'
if not is_success: all_success_map[t] = False
if is_success and o['side'] == 'SELL':
sell_success_count += 1
err_msg = res.get('msg1', '오류')
status_icon = '✅' if is_success else f'❌({err_msg})'
msgs[t] += f"└ {o['desc']} {o['qty']}주 (${o['price']}): {status_icon}\n"
await asyncio.sleep(0.2)
if all_success_map[t] and len(loc_orders) > 0:
await asyncio.to_thread(cfg.set_lock, t, "REG")
msgs[t] += "\n🔒 <b>방어선 전송 완료 (매매 잠금 설정됨)</b>"
elif sell_success_count > 0:
await asyncio.to_thread(cfg.set_lock, t, "REG")
msgs[t] += "\n⚠️ <b>일부 방어선 구축 실패 (반쪽짜리 잠금 설정됨)</b>"
elif len(loc_orders) == 0:
msgs[t] += "\n⚠️ <b>전송할 방어선(예산/수량)이 없습니다.</b>"
else:
msgs[t] += "\n⚠️ <b>일부 방어선 구축 실패 (잠금 보류)</b>"
await context.bot.send_message(context.job.chat_id, msgs[t], parse_mode='HTML')
v_rev_tickers.append((t, version))
continue
ma_5day = float(await asyncio.to_thread(broker.get_5day_ma, t) or 0.0)
# 🚨 [비동기 래핑] 내부 파일 I/O 및 로드 방어
plan = await asyncio.to_thread(
strategy.get_plan,
t, curr_p, safe_avg, safe_qty, prev_c, ma_5day=ma_5day, market_type="REG", available_cash=allocated_cash.get(t, 0.0), is_snapshot_mode=True
)
if hasattr(strategy, 'v14_plugin') and hasattr(strategy.v14_plugin, 'save_daily_snapshot'):
await asyncio.to_thread(strategy.v14_plugin.save_daily_snapshot, t, plan)
plans[t] = plan
if plan.get('core_orders', []) or plan.get('orders', []):
is_rev = plan.get('is_reverse', False)
msgs[t] += f"🔄 <b>[{t}] 리버스 주문 실행</b>\n" if is_rev else f"💎 <b>[{t}] 정규장 주문 실행</b>\n"
for t, ver in v_rev_tickers:
mod_name = "V-REV" if ver == "V_REV" else "무매4(VWAP)"
if ver == "V_REV":
msg = f"🎺 <b>[{t}] {mod_name} 예방적 방어망 전면 철거 완료</b>\n"
msg += f"▫️ 프리장이 개장했습니다! 자전거래(FDS) 의심을 회피하고 <b>AVWAP 암살자</b>의 기동력을 극대화하기 위해 예방적 덫(물리/가상)을 전면 소각했습니다.\n"
msg += f"▫️ <b>1회분 예산은 가상 에스크로에 100% 안전하게 격리 보존되며</b>, 암살자는 오직 잉여 현금만으로 자유롭게 타격합니다.\n"
msg += f"▫️ 장 후반(15:27 EST) VWAP 엔진이 깨어나 보존된 1회분 예산으로 정밀 타격을 개시합니다! 편안한 밤 보내십시오! 🌙💤\n"
else:
msg = f"🎺 <b>[{t}] {mod_name} 예방적 방어망 장전 완료</b>\n"
msg += f"▫️ 프리장이 개장했습니다! 시스템 다운 등 최악의 블랙스완을 대비하여 <b>지층별 분리 종가(LOC) 덫</b>을 KIS 서버에 선제 전송했습니다.\n"
msg += f"▫️ 서버가 무사하다면 장 후반(15:27 EST)에 스스로 깨어나 이 덫을 거두고 추세(60% 허들)를 스캔하여 새로운 최적 전술로 교체합니다! 편안한 밤 보내십시오! 🌙💤\n"
await context.bot.send_message(chat_id=context.job.chat_id, text=msg, parse_mode='HTML')
for t in sorted_tickers:
if t not in plans: continue
target_orders = plans[t].get('core_orders', plans[t].get('orders', []))
for o in target_orders:
res = await asyncio.to_thread(broker.send_order, t, o['side'], o['qty'], o['price'], o['type'])
if res.get('rt_cd') != '0': all_success_map[t] = False
err_msg = res.get('msg1', '오류')
status_icon = '✅' if res.get('rt_cd') == '0' else f'❌({err_msg})'
msgs[t] += f"└ 1차 필수: {o['desc']} {o['qty']}주: {status_icon}\n"
await asyncio.sleep(0.2)
for t in sorted_tickers:
if t not in plans: continue
target_bonus = plans[t].get('bonus_orders', [])
for o in target_bonus:
res = await asyncio.to_thread(broker.send_order, t, o['side'], o['qty'], o['price'], o['type'])
msgs[t] += f"└ 2차 보너스: {o['desc']} {o['qty']}주: {'✅' if res.get('rt_cd')=='0' else '❌(잔금패스)'}\n"
await asyncio.sleep(0.2)
for t in sorted_tickers:
if t not in plans: continue
target_orders = plans[t].get('core_orders', plans[t].get('orders', []))
target_bonus = plans[t].get('bonus_orders', [])
if not target_orders and not target_bonus: continue
if all_success_map[t] and len(target_orders) > 0:
await asyncio.to_thread(cfg.set_lock, t, "REG")
msgs[t] += "\n🔒 <b>필수 주문 정상 전송 완료 (잠금 설정됨)</b>"
elif not all_success_map[t] and len(target_orders) > 0:
msgs[t] += "\n⚠️ <b>일부 필수 주문 실패 (매매 잠금 보류)</b>"
elif len(target_bonus) > 0:
await asyncio.to_thread(cfg.set_lock, t, "REG")
msgs[t] += "\n🔒 <b>보너스 주문만 전송 완료 (잠금 설정됨)</b>"
if not any(tx[0] == t for tx in v_rev_tickers):
await context.bot.send_message(chat_id=context.job.chat_id, text=msgs[t], parse_mode='HTML')
return True, "SUCCESS"
for attempt in range(1, MAX_RETRIES + 1):
try:
success, fail_reason = await asyncio.wait_for(_do_regular_trade(), timeout=300.0)
if success:
if attempt > 1:
await context.bot.send_message(chat_id=context.job.chat_id, text=f"✅ <b>[통신 복구] {attempt}번째 재시도 끝에 전송을 완수했습니다!</b>", parse_mode='HTML')
return
except Exception as e:
logging.error(f"정규장 전송 에러 ({attempt}/{MAX_RETRIES}): {e}", exc_info=True)
if attempt == 1:
await context.bot.send_message(
chat_id=context.job.chat_id,
text=f"⚠️ <b>[API 통신 지연 감지]</b>\n한투 서버 불안정. 1분 뒤 재시도합니다! 🛡️\n<code>사유: {type(e).__name__}: {e}</code>",
parse_mode='HTML'
)
else:
logging.warning(f"정규장 조건 미충족 ({attempt}/{MAX_RETRIES}): {fail_reason}")
if attempt == 1:
await context.bot.send_message(
chat_id=context.job.chat_id,
text=f"⚠️ <b>[API 통신 지연 감지]</b>\n한투 서버 불안정. 1분 뒤 재시도합니다! 🛡️\n<code>사유: {fail_reason}</code>",
parse_mode='HTML'
)
if attempt < MAX_RETRIES:
if attempt != 1 and attempt % 5 == 0:
await context.bot.send_message(chat_id=context.job.chat_id, text=f"⚠️ <b>[API 통신 지연 감지]</b>\n한투 서버 불안정. 1분 뒤 재시도합니다! 🛡️", parse_mode='HTML')
await asyncio.sleep(RETRY_DELAY)
await context.bot.send_message(chat_id=context.job.chat_id, text="🚨 <b>[긴급 에러] 통신 복구 최종 실패. 수동 점검 요망!</b>", parse_mode='HTML')