-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathlog.py
More file actions
209 lines (182 loc) · 8.7 KB
/
log.py
File metadata and controls
209 lines (182 loc) · 8.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
''' Configurable asynchronous logger using second core as context for writing to REPL, file and/or screen
Copyright (c) 2025 Harm Lammers
The dependency on the `display` module is optional: the Log class also works if `display.py` is missing, but only for logging to file.
For logging to display this module is set up to work with the Cybo-Drummer hardware (https://github.com/HLammers/cybo-drummer), which
uses an ILI9225 display, connected as defined in the constant definitions below. To make this work with another screen the `_DisplayMonitor`
class needs to be rewritten to use different display drivers.
MIT licence:
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
'''
import machine
import asyncio
from collections import deque
from singleton import singleton
from context import Context
# The `display` module is optional: without it the logger still works, but cannot log to the monitor
try:
    from display import Display, DISPLAY_W, DISPLAY_H
    display_missing = False
except ImportError:
    # only a failing import is tolerated; any other exception is a real error and should surface
    display_missing = True # allow the display module to be missing (to allow only logging to file)
# Maximum number of messages held in the log buffer before new messages are refused
_LOG_BUF_LEN = const(32)
# Display configuration
_DSP_BAUTRATE = const(50_000_000) # passed to the display driver together with the SPI controller (presumably the SPI baud rate)
_DSP_SPI = const(0) # SPI controller
_DSP_PIN_RS = const(20) # RS (register selection) pin GPIO number
_DSP_PIN_RST = const(21) # RST (reset) pin GPIO number
_DSP_PIN_LED = const(17) # LED (backlight) pin GPIO number
# Text layout on the display
_ROW_H = const(9) # vertical distance between two text rows
_ROWS = const(19) # DISPLAY_H // _ROW_H
_COLS = const(27) # DISPLAY_W // 8
_TOP_MARGIN = const(3) # (_ROW_H - 8) // 2 + (DISPLAY_H - _ROWS * _ROW_H)
@singleton
class Log:
    ''' `asyncio`-compatible singleton logger which prints the message and can be configured to write to file and/or monitor

    Messages passed to `write` are buffered and handled by the `run` task; messages passed to `sync_write` are handled immediately.

        Args:
            to_file (bool, optional): Write logs to log file if `True`; defaults to `False`
            to_monitor (bool, optional): Write logs to display if `True`; defaults to `True` (which is overruled if the `display` import failed)
            file_name (str, optional): File name for log file; defaults to '/log.txt'
    '''

    def __init__(self, to_file: bool = False, to_monitor: bool = True, file_name: str = '/log.txt') -> None:
        if display_missing:
            to_monitor = False # overrule if the `display` import failed
        self.to_file = to_file
        self.to_monitor = to_monitor
        self.file_name = file_name
        if to_file:
            open(file_name, 'w').close() # create the log file, or truncate an existing one
        # always defined, so `deinit` can safely test it even if `run` was never started or the monitor is disabled
        self._monitor_task = None
        if to_monitor:
            self._monitor = _DisplayMonitor()
        self._core_1 = Context()
        self._deque = deque((), _LOG_BUF_LEN)
        self._put_flag = asyncio.ThreadSafeFlag()

    def write(self, msg: str|bytes|bytearray) -> None:
        ''' Add message to the logger buffer to be logged asynchronously by the `run` task

        If the buffer already holds `_LOG_BUF_LEN` messages the message is not buffered; it is only printed, preceded by a warning.

            Args:
                msg (str | bytes | bytearray): Message to write to logger
        '''
        _deque = self._deque
        if len(_deque) >= _LOG_BUF_LEN:
            print('LOG BUF FULL, NOT LOGGED:', str(msg))
            return
        _deque.append(msg)
        self._put_flag.set() # wake up the `run` task

    def sync_write(self, msg: str|bytes|bytearray) -> None:
        ''' Write message to logger synchronously (flushes to screen immediately and blocks until completed)

            Args:
                msg (str | bytes | bytearray): Message to write to logger
        '''
        print(msg if isinstance(msg, str) else msg.decode('ascii')) # pyright: ignore[reportAttributeAccessIssue]
        if self.to_monitor:
            self._monitor.write(msg, True)
        if self.to_file:
            _log_to_file(self.file_name, msg)

    async def run(self) -> None:
        ''' `asyncio` task to read from log buffer and print, write to file and/or show on display whenever data is available '''
        # bind everything used inside the loop to local names once
        _deque = self._deque
        _wait = self._put_flag.wait
        to_monitor = self.to_monitor
        if to_monitor:
            _monitor = self._monitor
            self._monitor_task = asyncio.create_task(_monitor.run())
            _monitor_write = _monitor.write
        _write_to_file = _log_to_file
        to_file = self.to_file
        file_name = self.file_name
        _core_1 = self._core_1
        _sleep = asyncio.sleep
        while True:
            while not _deque:
                await _wait()
            msg = _deque.popleft()
            print(msg if isinstance(msg, str) else msg.decode('ascii'))
            # display and file writes are handed to the `Context` instance (presumably executed on the second core - see `context` module)
            if to_monitor:
                await _core_1.assign(_monitor_write, msg)
            if to_file:
                await _core_1.assign(_write_to_file, file_name, msg)
            await _sleep(0) # give other tasks a chance to run between messages

    def deinit(self) -> None:
        ''' Stop second core context, switch off display and cancel display monitor task '''
        self._core_1.deinit()
        if self.to_monitor:
            self._monitor.deinit()
        if (_task := self._monitor_task) is not None:
            _task.cancel()
def _log_to_file(file_name, msg: str|bytes|bytearray) -> None:
''' Write message to file
Args:
file_name (str): File name for log file; defaults to '/log.txt'
msg (str | bytes | bytearray): Message to write to file
'''
with open(file_name, 'a') as f:
f.write(msg)
f.write(b'\n')
@singleton
class _DisplayMonitor:
    ''' Singleton writer for displaying log messages on ILI9225 screen

    Text is drawn row by row; once the last row has been used the screen content is scrolled up by one row for each new line.
    '''

    def __init__(self) -> None:
        self._display = Display(_DSP_SPI, _DSP_BAUTRATE, machine.Pin(_DSP_PIN_RS), machine.Pin(_DSP_PIN_RST),
                                machine.Pin(_DSP_PIN_LED, mode=machine.Pin.OUT))
        self._line_buf = bytearray(_COLS)
        self.row = 0 # row on which the next line of text is drawn
        # `Log.run` hands `write` to the second core context, so the redraw flag may be set from outside the asyncio loop;
        # `asyncio.Event.set()` is not safe for that, `asyncio.ThreadSafeFlag` is
        self._redraw = asyncio.ThreadSafeFlag()

    def _put_line(self, line: str) -> None:
        ''' Draw one line of text on the current row and advance to the next row, scrolling up one row if the screen is full

            Args:
                line (str): Text to draw (expected to be at most `_COLS` characters long)
        '''
        _display = self._display
        row = self.row
        _display.text(line, 0, row * _ROW_H + _TOP_MARGIN, 0xFFFF)
        row += 1
        if row == _ROWS:
            # screen is full: move everything up one row, wipe the bottom row and keep writing there
            _display.scroll(0, -_ROW_H)
            _display.fill_rect(0, DISPLAY_H - _ROW_H, DISPLAY_W, _ROW_H, 0)
            row = _ROWS - 1
        self.row = row

    def write(self, text: str|bytes|bytearray, sync: bool = False) -> None:
        ''' Write text to display asynchronously or synchronously; word wraps text to multiple lines if necessary

        Words which are longer than a full row are broken into row-sized pieces.

            Args:
                text (str | bytes | bytearray): Text to display
                sync (bool, optional): Synchronously show screen update if `True`; set redraw screen flag to asynchronously show screen
                                       update (by the `run` task) if `False`; defaults to `False`
        '''
        if not isinstance(text, str):
            text = text.decode('ascii') # pyright: ignore[reportAttributeAccessIssue]
        _put_line = self._put_line
        if len(text) <= _COLS:
            line = text
        else:
            line = ''
            for word in text.split():
                # a word which does not fit on a row at all is cut into row-sized pieces
                while len(word) > _COLS:
                    if line:
                        _put_line(line)
                        line = ''
                    _put_line(word[:_COLS])
                    word = word[_COLS:]
                if not word:
                    continue
                if not line:
                    line = word
                elif len(line) + 1 + len(word) <= _COLS:
                    line += ' ' + word
                else:
                    _put_line(line)
                    line = word
        if line:
            _put_line(line)
        if sync:
            self._display.show()
        else:
            self._redraw.set()

    async def run(self) -> None:
        ''' `asyncio` task to flush display buffer to screen whenever the redraw screen flag is set '''
        _wait = self._redraw.wait
        _draw_screen = self._display.async_show
        while True:
            await _wait() # the flag is reset automatically when `wait` returns
            await _draw_screen()

    def deinit(self) -> None:
        ''' Turn display off '''
        self._display.off()