This repository was archived by the owner on Sep 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathioutils.py
More file actions
74 lines (58 loc) · 2.4 KB
/
ioutils.py
File metadata and controls
74 lines (58 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from select import select
from socket import socket
from typing import IO, Callable
import logging
# Module-level logger named after this module; IOWrapper uses it to report short reads.
logger = logging.getLogger(__name__)
class IOWrapper:
    """Read and write big-endian binary fields over a file-like stream.

    Wraps a binary stream (``io_stream``) and, optionally, the socket it is
    associated with. When both a socket and a one-shot timeout callback are
    set, the next read first polls the socket with ``select`` and invokes the
    callback every time a poll interval elapses without the socket becoming
    readable. The callback is cleared once the socket is readable.

    NOTE(review): readiness is checked on the socket, not on ``io_stream``.
    If ``io_stream`` buffers data it already pulled from the socket, the
    callback can fire although a read would succeed -- confirm against how
    callers construct ``io_stream``.
    """

    # Class-level default so instances created without __init__ still poll
    # at the historical one-second interval.
    poll_interval: float = 1.0

    def __init__(self, io_stream: IO, socket_: socket = None, *,
                 poll_interval: float = 1.0):
        """Store the stream and optional socket.

        Args:
            io_stream: Binary stream used for every read and write.
            socket_: Socket polled for readability while a timeout callback
                is pending; ``None`` disables polling.
            poll_interval: Seconds ``select`` waits before the pending
                timeout callback is invoked again.
        """
        self.io_stream = io_stream
        self.socket = socket_
        self.poll_interval = poll_interval
        self.next_timeout_cb = None

    def set_next_timeout_cb(self, cb: Callable) -> None:
        """Register *cb* to be called on idle polls during the next read."""
        self.next_timeout_cb = cb

    def _wait_until_readable(self) -> None:
        """Poll the socket until readable, firing the callback on idle polls.

        The socket's blocking mode and timeout are restored afterwards, also
        when the callback or ``select`` raises, so a failing callback does
        not leave the socket non-blocking.
        """
        sock = self.socket
        saved_timeout = sock.gettimeout()
        sock.setblocking(False)
        try:
            while not select([sock], [], [], self.poll_interval)[0]:
                self.next_timeout_cb()
        except BaseException:
            # Best-effort restore: the callback may have closed the socket,
            # and a failure here must not mask the original exception.
            try:
                sock.setblocking(True)
                sock.settimeout(saved_timeout)
            except OSError:
                pass
            raise
        # The callback is one-shot: it only guards the read it was set for.
        self.next_timeout_cb = None
        sock.setblocking(True)
        sock.settimeout(saved_timeout)

    def read_or_eof(self, size, field):
        """Read exactly *size* bytes or raise ``EOFError``.

        Args:
            size: Number of bytes required.
            field: Label of the value being read; becomes the ``EOFError``
                argument when the stream ends early.

        Returns:
            The *size* bytes read.

        Raises:
            EOFError: If the stream ends before *size* bytes were read.
        """
        if self.next_timeout_cb and self.socket:
            self._wait_until_readable()

        # Raw/unbuffered streams may return fewer bytes than requested
        # without being at EOF, so keep reading until the request is
        # satisfied or the stream reports end-of-data (empty read).
        pieces = []
        missing = size
        while missing > 0:
            piece = self.io_stream.read(missing)
            if not piece:
                break
            pieces.append(piece)
            missing -= len(piece)
        buf: bytes = b"".join(pieces)

        if len(buf) != size:
            logger.info("buffer content bytes: %s", buf)
            logger.info("buffer size: %s", len(buf))
            logger.info("size: %s", size)
            raise EOFError(field)
        return buf

    def _read_uint(self, width: int, field: str) -> int:
        """Read *width* bytes and decode them as a big-endian unsigned int."""
        raw = self.read_or_eof(width, field)
        return int.from_bytes(raw, byteorder='big', signed=False)

    def _write_uint(self, value: int, width: int) -> None:
        """Write *value* as a *width*-byte big-endian unsigned int.

        ``int.to_bytes`` raises ``OverflowError`` when *value* is negative
        or does not fit in *width* bytes.
        """
        self.io_stream.write(value.to_bytes(width, byteorder='big', signed=False))

    def read_short(self, field: str) -> int:
        """Read a 2-byte big-endian unsigned integer."""
        return self._read_uint(2, field)

    def read_long(self, field: str) -> int:
        """Read a 4-byte big-endian unsigned integer."""
        return self._read_uint(4, field)

    def read_byte(self, field: str) -> int:
        """Read a single byte as an unsigned integer."""
        return self._read_uint(1, field)

    def read_bytes(self, n: int, field: str) -> bytes:
        """Read exactly *n* raw bytes."""
        return self.read_or_eof(n, field)

    def read_string(self, n: int, field: str) -> str:
        """Read exactly *n* bytes and decode them as UTF-8."""
        return self.read_or_eof(n, field).decode("utf-8")

    def write_short(self, v: int):
        """Write *v* as a 2-byte big-endian unsigned integer."""
        self._write_uint(v, 2)

    def write_long(self, v: int):
        """Write *v* as a 4-byte big-endian unsigned integer."""
        self._write_uint(v, 4)

    def write_byte(self, v: int):
        """Write *v* as a single unsigned byte."""
        self._write_uint(v, 1)

    def write_bytes(self, b: bytes):
        """Write the raw bytes *b* unchanged."""
        self.io_stream.write(b)

    def write_string(self, s: str):
        """Write *s* encoded as UTF-8 (no length prefix or terminator)."""
        self.io_stream.write(s.encode("utf-8"))

    def flush(self):
        """Flush the underlying stream."""
        self.io_stream.flush()

    def close(self):
        """Close the underlying stream."""
        self.io_stream.close()