Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit 6d13567

Browse files
authored
Merge branch 'main' into name-only
2 parents 5d6b5ae + 8410d19 commit 6d13567

3 files changed

Lines changed: 171 additions & 72 deletions

File tree

  • packages
    • jumpstarter-driver-sdwire/jumpstarter_driver_sdwire
    • jumpstarter/jumpstarter/common

packages/jumpstarter-driver-dutlink/jumpstarter_driver_dutlink/driver.py

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,30 @@
11
from __future__ import annotations
22

3-
import os
43
import time
54
from collections.abc import AsyncGenerator
65
from dataclasses import dataclass, field
76

87
import pyudev
98
import usb.core
109
import usb.util
11-
from anyio import fail_after, sleep
12-
from anyio.streams.file import FileReadStream, FileWriteStream
10+
from anyio import sleep
1311
from jumpstarter_driver_composite.driver import CompositeInterface
1412
from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface
1513
from jumpstarter_driver_power.driver import PowerInterface, PowerReading
1614
from jumpstarter_driver_pyserial.driver import PySerial
1715
from serial.serialutil import SerialException
1816

17+
from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device
1918
from jumpstarter.driver import Driver, export
2019

2120

2221
@dataclass(kw_only=True)
2322
class DutlinkConfig:
2423
serial: str | None = field(default=None)
2524
timeout_s: int = field(default=20) # 20 seconds, power control sequences can block USB for a long time
25+
storage_timeout: int = field(default=10)
26+
storage_leeway: int = field(default=6)
27+
storage_fsync_timeout: int = field(default=900)
2628

2729
dev: usb.core.Device = field(init=False)
2830
itf: usb.core.Interface = field(init=False)
@@ -187,44 +189,29 @@ def dut(self):
187189
def off(self):
188190
return self.control("off")
189191

190-
async def wait_for_storage_device(self):
191-
with fail_after(20):
192-
while True:
193-
self.logger.debug(f"waiting for storage device {self.storage_device}")
194-
if os.path.exists(self.storage_device):
195-
self.logger.debug(f"storage device {self.storage_device} is ready")
196-
# https://stackoverflow.com/a/2774125
197-
fd = os.open(self.storage_device, os.O_WRONLY)
198-
try:
199-
if os.lseek(fd, 0, os.SEEK_END) > 0:
200-
break
201-
finally:
202-
os.close(fd)
203-
await sleep(1)
204-
205192
@export
206193
async def write(self, src: str):
207194
self.host()
208-
await self.wait_for_storage_device()
209-
async with await FileWriteStream.from_path(self.storage_device) as stream:
210-
async with self.resource(src) as res:
211-
total_bytes = 0
212-
next_print = 0
213-
async for chunk in res:
214-
await stream.send(chunk)
215-
if total_bytes > next_print:
216-
self.logger.debug(f"{self.storage_device} written {total_bytes / (1024 * 1024)} MB")
217-
next_print += 50 * 1024 * 1024
218-
total_bytes += len(chunk)
195+
async with self.resource(src) as res:
196+
await write_to_storage_device(
197+
self.storage_device,
198+
res,
199+
timeout=self.storage_timeout,
200+
leeway=self.storage_leeway,
201+
fsync_timeout=self.storage_fsync_timeout,
202+
logger=self.logger,
203+
)
219204

220205
@export
221206
async def read(self, dst: str):
222207
self.host()
223-
await self.wait_for_storage_device()
224-
async with await FileReadStream.from_path(self.storage_device) as stream:
225-
async with self.resource(dst) as res:
226-
async for chunk in stream:
227-
await res.send(chunk)
208+
async with self.resource(dst) as res:
209+
await read_from_storage_device(
210+
self.storage_device,
211+
res,
212+
timeout=self.storage_timeout,
213+
logger=self.logger,
214+
)
228215

229216

230217
@dataclass(kw_only=True)

packages/jumpstarter-driver-sdwire/jumpstarter_driver_sdwire/driver.py

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
from __future__ import annotations
22

3-
import os
43
from dataclasses import dataclass, field
54

65
import pyudev
76
import usb.core
87
import usb.util
9-
from anyio import fail_after, sleep
10-
from anyio.streams.file import FileReadStream, FileWriteStream
118
from jumpstarter_driver_opendal.driver import StorageMuxFlasherInterface
129

10+
from jumpstarter.common.storage import read_from_storage_device, write_to_storage_device
1311
from jumpstarter.driver import Driver, export
1412

1513

@@ -20,6 +18,9 @@ class SDWire(StorageMuxFlasherInterface, Driver):
2018
itf: usb.core.Interface = field(init=False)
2119

2220
storage_device: str | None = field(default=None)
21+
storage_timeout: int = field(default=10)
22+
storage_leeway: int = field(default=6)
23+
storage_fsync_timeout: int = field(default=900)
2324

2425
def effective_storage_device(self):
2526
if self.storage_device is None:
@@ -101,45 +102,26 @@ def dut(self):
101102
def off(self):
102103
self.host()
103104

104-
async def wait_for_storage_device(self):
105-
with fail_after(10):
106-
storage_device = self.effective_storage_device()
107-
108-
while True:
109-
# https://stackoverflow.com/a/2774125
110-
try:
111-
fd = os.open(storage_device, os.O_WRONLY)
112-
if os.lseek(fd, 0, os.SEEK_END) > 0:
113-
break
114-
except OSError as e:
115-
match e.errno:
116-
case 123: # No medium found
117-
pass
118-
case 5: # Input/output error
119-
pass
120-
case _:
121-
raise
122-
finally:
123-
if "fd" in locals():
124-
os.close(fd)
125-
await sleep(1)
126-
127-
return storage_device
128-
129105
@export
130106
async def write(self, src: str):
131107
self.host()
132-
storage_device = await self.wait_for_storage_device()
133-
async with await FileWriteStream.from_path(storage_device) as stream:
134-
async with self.resource(src) as res:
135-
async for chunk in res:
136-
await stream.send(chunk)
108+
async with self.resource(src) as res:
109+
await write_to_storage_device(
110+
self.effective_storage_device(),
111+
res,
112+
timeout=self.storage_timeout,
113+
leeway=self.storage_leeway,
114+
fsync_timeout=self.storage_fsync_timeout,
115+
logger=self.logger,
116+
)
137117

138118
@export
139119
async def read(self, dst: str):
140120
self.host()
141-
storage_device = await self.wait_for_storage_device()
142-
async with await FileReadStream.from_path(storage_device) as stream:
143-
async with self.resource(dst) as res:
144-
async for chunk in stream:
145-
await res.send(chunk)
121+
async with self.resource(dst) as res:
122+
await read_from_storage_device(
123+
self.effective_storage_device(),
124+
res,
125+
timeout=self.storage_timeout,
126+
logger=self.logger,
127+
)
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import errno
2+
import os
3+
from logging import Logger
4+
from typing import Literal
5+
6+
from anyio import fail_after, sleep
7+
from anyio.abc import AnyByteStream
8+
from anyio.streams.file import FileReadStream, FileWriteStream
9+
10+
11+
async def wait_for_storage_device( # noqa: C901
12+
storage_device: os.PathLike,
13+
mode: Literal["wb", "rb"],
14+
timeout: int = 10,
15+
*,
16+
logger: Logger | None = None,
17+
) -> os.PathLike:
18+
with fail_after(timeout):
19+
while True:
20+
# https://stackoverflow.com/a/2774125
21+
try:
22+
match mode:
23+
case "wb":
24+
fd = os.open(storage_device, os.O_WRONLY)
25+
case "rb":
26+
fd = os.open(storage_device, os.O_RDONLY)
27+
case _:
28+
raise ValueError("invalid mode: {}".format(mode))
29+
with os.fdopen(fd, mode): # to prevent fd from leaking
30+
if os.lseek(fd, 0, os.SEEK_END) > 0:
31+
if logger:
32+
logger.info("storage device {} is ready".format(storage_device))
33+
break
34+
if logger:
35+
logger.debug("waiting for storage device {} to have a nonzero size".format(storage_device))
36+
except FileNotFoundError:
37+
if logger:
38+
logger.debug("waiting for storage device {} to appear".format(storage_device))
39+
except OSError as e:
40+
match e.errno:
41+
case errno.ENOMEDIUM | errno.EIO:
42+
if logger:
43+
logger.debug("waiting for storage device {} to be ready".format(storage_device))
44+
case _:
45+
raise
46+
47+
await sleep(1)
48+
49+
return storage_device
50+
51+
52+
async def write_to_storage_device(
53+
storage_device: os.PathLike,
54+
resource: AnyByteStream,
55+
timeout: int = 10,
56+
fsync_timeout: int = 900,
57+
leeway: int = 6,
58+
*,
59+
logger: Logger | None = None,
60+
):
61+
path = await wait_for_storage_device(
62+
storage_device,
63+
mode="wb",
64+
timeout=timeout,
65+
logger=logger,
66+
)
67+
with os.fdopen(os.open(path, os.O_WRONLY), "wb") as file:
68+
async with FileWriteStream(file) as stream:
69+
total_bytes = 0
70+
next_print = 0
71+
async for chunk in resource:
72+
await stream.send(chunk)
73+
if logger:
74+
total_bytes += len(chunk)
75+
if total_bytes > next_print:
76+
logger.info(
77+
"written {} MB to storage device {}".format(
78+
total_bytes / (1024 * 1024),
79+
storage_device,
80+
)
81+
)
82+
next_print += 50 * 1024 * 1024
83+
84+
with fail_after(fsync_timeout):
85+
while True:
86+
try:
87+
if logger:
88+
logger.info("fsyncing storage device {}, please wait".format(storage_device))
89+
os.fsync(file.fileno())
90+
except OSError as e:
91+
if e.errno == errno.EIO:
92+
await sleep(1)
93+
continue
94+
else:
95+
raise
96+
else:
97+
break
98+
99+
await sleep(leeway)
100+
101+
102+
async def read_from_storage_device(
103+
storage_device: os.PathLike,
104+
resource: AnyByteStream,
105+
timeout: int = 10,
106+
*,
107+
logger: Logger | None = None,
108+
):
109+
path = await wait_for_storage_device(
110+
storage_device,
111+
mode="rb",
112+
timeout=timeout,
113+
logger=logger,
114+
)
115+
with os.fdopen(os.open(path, os.O_RDONLY), "rb") as file:
116+
async with FileReadStream(file) as stream:
117+
total_bytes = 0
118+
next_print = 0
119+
async for chunk in stream:
120+
await resource.send(chunk)
121+
if logger:
122+
total_bytes += len(chunk)
123+
if total_bytes > next_print:
124+
logger.info(
125+
"read {} MB from storage device {}".format(
126+
total_bytes / (1024 * 1024),
127+
storage_device,
128+
)
129+
)
130+
next_print += 50 * 1024 * 1024

0 commit comments

Comments
 (0)