-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathqueue_ledger.py
More file actions
385 lines (328 loc) · 17.7 KB
/
Copy pathqueue_ledger.py
File metadata and controls
385 lines (328 loc) · 17.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# ==========================================================
# FILE: queue_ledger.py
# ==========================================================
# 🚨 VERIFIED: [최종 무결점 판정] 5대 헌법 및 38대 엣지 케이스 완벽 결속 교차 검증 완료.
# 🚨 MODIFIED: [오버나이트 병합 로직 영구 소각] 제로-오버나이트 아키텍처(15:59 MOC 덤핑) 도입에 따라, 익일 04:00에 이월된 물량을 강제 병합하던 `unify_to_single_layer` (L1 대통합) 헬퍼 메서드 및 관련 데드코드를 100% 영구 삭제 완료.
# 🚨 MODIFIED: [수수료 트랩 원천 차단] 1층 매도 총액(Gross)에서 왕복 수수료 및 슬리피지 버퍼(0.6%)를 선차감한 '순수 회수금(Net Cash)'만을 원가 차감에 반영하여 전체 사이클 마진 붕괴 패러독스 방어 유지.
# 🚨 MODIFIED: [하위 지층 단가 상승 패러독스 원천 차단] 잔여 지층이 2개 이상일 때는 개별 평단가를 100% 보존하며, 오직 잔여 지층이 단 1개(len(q)==1) 남았을 때만 전체 투자금 기반 원가 차감(리앵커링)이 격발되도록 팩트 교정 유지.
# 🚨 MODIFIED: [평단가 리앵커링] AVWAP KIS 원장 100% 디커플링 및 순수 로컬 기반 잔여 지층 원가 차감(Cost Basis Reduction) 로직 전면 결속 완료.
# 🚨 MODIFIED: [Insight 14] 콤마(,) 및 NaN/Inf 맹독성 유입 시 ValueError 즉사 방어를 위한 `_safe_float` 쉴드 전면 내재화.
# 🚨 MODIFIED: [Case 33] 파일 I/O 에러 재시도 시 3단 지수 백오프(Exponential Backoff) 규격 통일.
# 🚨 MODIFIED: [Case 08] 백업 파일 복원 및 디렉토리 검증 시 레이스 컨디션을 유발하는 os.path.exists를 100% 소각하고 EAFP 패턴 락온.
# 🚨 VERIFIED: [Case 16] 원자적 쓰기(Atomic Write) 실패 시 임시 파일 스코프 고아화 방어 100% 사수 완료.
# 🚨 VERIFIED: [제4헌법 절대 사수] 메인 장부뿐만 아니라 백업 파일(.bak) 생성 시에도 임시 파일(.bak.tmp)을 거치는 원자적 복사(Atomic Copy)를 강제하여 OS 커널 패닉 시 백업본 오염 원천 차단.
# 🚨 NEW: [액면병합 0주 증발 붕괴 방어] apply_stock_split 실행 중 역분할(병합)로 인해 보유 수량이 1주 미만(0주)으로 절사되어 지층이 증발하는 현상을 1주 강제 보존으로 완벽 차단.
# ==========================================================
import os
import json
import time
import math
import threading
import shutil
import tempfile
from zoneinfo import ZoneInfo
from datetime import datetime
import logging
class QueueLedger:
def __init__(self, file_path="data/queue_ledger.json"):
self.file_path = file_path
self._lock = threading.Lock()
self._ensure_file()
def _safe_float(self, value):
try:
val = float(str(value or 0.0).replace(',', ''))
if math.isnan(val) or math.isinf(val):
return 0.0
return val
except Exception:
return 0.0
def _ensure_file(self):
try:
dir_name = os.path.dirname(self.file_path) or '.'
os.makedirs(dir_name, exist_ok=True)
except OSError:
pass
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
pass
except FileNotFoundError:
with self._lock:
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
pass
except FileNotFoundError:
self._save_unsafe({})
except Exception:
pass
def _get_trading_date_str(self):
est = ZoneInfo('America/New_York')
return datetime.now(est).strftime("%Y-%m-%d")
def _load_unsafe(self):
last_exc = None
for attempt in range(3):
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
content = f.read()
if not content.strip():
return {}
return json.loads(content)
except json.JSONDecodeError as e:
last_exc = e
# JSON 디코딩 실패 시 메인 파일이 완전히 손상된 것이므로 재시도를 멈추고 즉시 백업 복원망으로 폴백
break
except FileNotFoundError:
return {}
except Exception as e:
last_exc = e
time.sleep(1.0 * (2 ** attempt))
backup_path = self.file_path + ".bak"
try:
with open(backup_path, 'r', encoding='utf-8') as f:
data = json.load(f)
logging.warning(f"🚨 [QueueLedger] JSON 손상 감지. 백업 파일({backup_path}) 복원 완료. 손상된 메인 장부를 즉시 자가 치유합니다.")
try:
self._save_unsafe(data)
except Exception as heal_e:
logging.error(f"🚨 [QueueLedger] 자가 치유 I/O 통신 에러: {heal_e}")
return data
except FileNotFoundError:
pass
except Exception as be:
logging.error(f"🚨 [QueueLedger] 백업 복원도 실패: {be}")
raise RuntimeError(f"🚨 [FATAL ERROR] {self.file_path} 장부 파일 읽기 실패. 데이터 유실 방지를 위해 시스템을 중단합니다. 원인: {last_exc}")
def _save_unsafe(self, data):
dir_name = os.path.dirname(self.file_path) or '.'
try:
os.makedirs(dir_name, exist_ok=True)
except OSError:
pass
for attempt in range(3):
fd = None
tmp_path = None
try:
fd, tmp_path = tempfile.mkstemp(dir=dir_name, text=True)
with os.fdopen(fd, 'w', encoding='utf-8') as f:
fd = None
json.dump(data, f, ensure_ascii=False, indent=4)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, self.file_path)
tmp_path = None
# 🚨 MODIFIED: [제4헌법 결속] 백업본(.bak) 생성 시에도 임시 파일을 통한 원자적 교체(Atomic Copy)를 강제하여 파일 손상 원천 봉쇄
bak_path = self.file_path + ".bak"
bak_tmp_path = bak_path + ".tmp"
try:
shutil.copy2(self.file_path, bak_tmp_path)
os.replace(bak_tmp_path, bak_path)
except Exception:
try: os.remove(bak_tmp_path)
except OSError: pass
return
except Exception as e:
logging.warning(f"⚠️ [QueueLedger] 장부 저장 재시도 ({attempt+1}/3): {e}")
if fd is not None:
try: os.close(fd)
except OSError: pass
if tmp_path:
try: os.remove(tmp_path)
except OSError: pass
time.sleep(1.0 * (2 ** attempt))
logging.error(f"🚨 [QueueLedger] 장부 저장 최종 실패: {self.file_path} — 데이터 유실 위험!")
def apply_stock_split(self, ticker, ratio):
if ratio <= 0: return
with self._lock:
data = self._load_unsafe()
q = data.get(ticker, [])
changed = False
for lot in q:
old_qty = int(self._safe_float(lot.get("qty", 0)))
raw_new_qty = old_qty * ratio
new_qty = math.floor(raw_new_qty + 0.5)
# 🚨 NEW: [액면병합 0주 증발 방어막] 소수점 절사 시에도 기존 물량이 0주로 소멸하는 것을 원천 차단
lot["qty"] = new_qty if new_qty > 0 else (1 if old_qty > 0 else 0)
old_price = self._safe_float(lot.get("price", 0.0))
lot["price"] = round(old_price / ratio, 4)
changed = True
if changed:
data[ticker] = q
self._save_unsafe(data)
def get_queue(self, ticker):
with self._lock:
data = self._load_unsafe()
q = data.get(ticker, [])
return [lot for lot in q if int(self._safe_float(lot.get("qty"))) > 0]
def add_lot(self, ticker, qty, price, lot_type="NORMAL"):
qty = int(self._safe_float(qty))
if qty <= 0: return
price_f = self._safe_float(price)
if price_f <= 0.0:
logging.error(f"🚨 [QueueLedger] add_lot 중단: {ticker} — 유효하지 않은 매수 가격 (price={price}). 로트 추가 취소.")
return
with self._lock:
data = self._load_unsafe()
q = data.get(ticker, [])
q = [lot for lot in q if int(self._safe_float(lot.get("qty"))) > 0]
today_str = self._get_trading_date_str()
if q and str(q[-1].get("date", "")).startswith(today_str):
old_qty = int(self._safe_float(q[-1].get("qty")))
old_price = self._safe_float(q[-1].get("price"))
new_qty = old_qty + qty
new_price = ((old_qty * old_price) + (qty * price_f)) / new_qty if new_qty > 0 else 0.0
q[-1]["qty"] = new_qty
q[-1]["price"] = round(new_price, 4)
q[-1]["date"] = datetime.now(ZoneInfo('America/New_York')).strftime("%Y-%m-%d %H:%M:%S")
else:
q.append({
"qty": qty,
"price": price_f,
"date": datetime.now(ZoneInfo('America/New_York')).strftime("%Y-%m-%d %H:%M:%S"),
"type": lot_type
})
data[ticker] = q
self._save_unsafe(data)
def pop_lots(self, ticker, target_qty, sold_price=0.0):
original_target = int(self._safe_float(target_qty))
target_qty = original_target
if target_qty <= 0: return 0
with self._lock:
data = self._load_unsafe()
q = data.get(ticker, [])
q = [lot for lot in q if int(self._safe_float(lot.get("qty"))) > 0]
if not q: return 0
vrev_total_invested = sum(int(self._safe_float(item.get('qty'))) * self._safe_float(item.get('price')) for item in q)
popped_total = 0
realized_cash = 0.0
while q and target_qty > 0:
last_lot = q[-1]
lot_qty = int(self._safe_float(last_lot.get("qty")))
lot_price = self._safe_float(last_lot.get("price"))
cp = sold_price if sold_price > 0 else lot_price
if lot_qty == 0:
q.pop()
continue
if lot_qty <= target_qty:
popped = q.pop()
popped_qty = int(self._safe_float(popped.get("qty")))
popped_total += popped_qty
realized_cash += popped_qty * cp
target_qty -= popped_qty
else:
last_lot["qty"] = lot_qty - target_qty
popped_total += target_qty
realized_cash += target_qty * cp
target_qty = 0
remaining_qty = sum(int(self._safe_float(item.get('qty'))) for item in q)
if remaining_qty > 0 and popped_total > 0:
# 🚨 MODIFIED: [단일 지층 Net Cash 차감] 0.6% 수수료/슬리피지 버퍼를 선차감하여 잔여 자본금(Capital Base) 과소 계상 차단
if len(q) == 1:
net_realized_cash = realized_cash * 0.994 # 0.6% 차감
remaining_invested = vrev_total_invested - net_realized_cash
new_pure_price = round(max(0.01, remaining_invested / remaining_qty), 4)
q[0]["price"] = new_pure_price
if popped_total < original_target:
logging.error(f"🚨 [QueueLedger] pop_lots 미달: {ticker} — 요청 {original_target}주 중 {popped_total}주만 차감. 즉시 sync_with_broker 실행 권고.")
data[ticker] = q
self._save_unsafe(data)
return popped_total
def sync_with_broker(self, ticker, actual_qty, actual_avg=0.0, clear_price=0.0):
with self._lock:
data = self._load_unsafe()
q = data.get(ticker, [])
q = [lot for lot in q if int(self._safe_float(lot.get("qty"))) > 0]
current_q_qty = sum(int(self._safe_float(item.get("qty"))) for item in q)
actual_qty = int(self._safe_float(actual_qty))
if current_q_qty == actual_qty:
return False
today_str = self._get_trading_date_str()
if current_q_qty < actual_qty:
diff = actual_qty - current_q_qty
calib_price = self._safe_float(actual_avg)
if calib_price <= 0.0:
calib_price = self._safe_float(q[-1].get("price")) if q else 0.0
if calib_price <= 0.0:
logging.error(f"🚨 [QueueLedger] sync_with_broker CALIB_ADD 중단: {ticker} — 실제 평단가 불명 (actual_avg={actual_avg}). $0 로트 주입 방지.")
data[ticker] = q
self._save_unsafe(data)
return True
if q and str(q[-1].get("date", "")).startswith(today_str):
old_qty = int(self._safe_float(q[-1].get("qty")))
old_price = self._safe_float(q[-1].get("price"))
new_qty = old_qty + diff
new_price = ((old_qty * old_price) + (diff * calib_price)) / new_qty if new_qty > 0 else 0.0
q[-1]["qty"] = new_qty
q[-1]["price"] = round(new_price, 4)
q[-1]["date"] = datetime.now(ZoneInfo('America/New_York')).strftime("%Y-%m-%d %H:%M:%S")
else:
q.append({
"qty": diff,
"price": round(calib_price, 4),
"date": datetime.now(ZoneInfo('America/New_York')).strftime("%Y-%m-%d %H:%M:%S"),
"type": "CALIB_ADD"
})
else:
diff = current_q_qty - actual_qty
popped_total = 0
realized_cash = 0.0
vrev_total_invested = sum(int(self._safe_float(item.get('qty'))) * self._safe_float(item.get('price')) for item in q)
while q and diff > 0:
last_lot = q[-1]
lot_qty = int(self._safe_float(last_lot.get("qty")))
lot_price = self._safe_float(last_lot.get("price"))
cp = clear_price if clear_price > 0 else lot_price
if lot_qty == 0:
q.pop()
continue
if lot_qty <= diff:
q.pop()
diff -= lot_qty
popped_total += lot_qty
realized_cash += lot_qty * cp
else:
last_lot["qty"] = lot_qty - diff
popped_total += diff
realized_cash += diff * cp
diff = 0
remaining_qty = actual_qty
if remaining_qty > 0 and popped_total > 0:
# 🚨 MODIFIED: [단일 지층 Net Cash 차감] 0.6% 수수료/슬리피지 버퍼를 선차감하여 잔여 자본금(Capital Base) 과소 계상 차단
if len(q) == 1:
net_realized_cash = realized_cash * 0.994 # 0.6% 차감
remaining_invested = vrev_total_invested - net_realized_cash
new_pure_price = round(max(0.01, remaining_invested / remaining_qty), 4)
q[0]["price"] = new_pure_price
if diff > 0:
logging.warning(f"⚠️ [QueueLedger] sync_with_broker CALIB_SUB 미달: {ticker} 큐 물량이 브로커보다 {diff}주 부족합니다. 큐가 초기화되었습니다.")
data[ticker] = q
self._save_unsafe(data)
return True
def delete_lot(self, ticker, target_date):
with self._lock:
data = self._load_unsafe()
q = data.get(ticker, [])
new_q = [lot for lot in q if str(lot.get('date', '')) != str(target_date)]
data[ticker] = new_q
self._save_unsafe(data)
def edit_lot(self, ticker, target_date, qty, price):
qty_int = int(self._safe_float(qty))
price_f = self._safe_float(price)
with self._lock:
data = self._load_unsafe()
q = data.get(ticker, [])
for lot in q:
if str(lot.get('date', '')) == str(target_date):
lot['qty'] = qty_int
lot['price'] = round(price_f, 4)
break
data[ticker] = q
self._save_unsafe(data)
def clear_queue(self, ticker):
with self._lock:
data = self._load_unsafe()
data[ticker] = []
self._save_unsafe(data)
def overwrite_queue(self, ticker, q_data):
with self._lock:
data = self._load_unsafe()
sorted_q = sorted(q_data, key=lambda x: str(x.get('date', '0000-00-00')))
data[ticker] = sorted_q
self._save_unsafe(data)