Skip to content

Commit c49b72f

Browse files
author
Jason M. Miller
committed
feat: Add names to IO threads
This would have sped up debugging #243 had it already been implemented, and other threads are named, so I'm guessing the lack of a name is an oversight.
1 parent 0e45a86 commit c49b72f

1 file changed

Lines changed: 71 additions & 65 deletions

File tree

libshpool/src/protocol.rs

Lines changed: 71 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -249,81 +249,87 @@ impl Client {
249249
let exit_status = AtomicI32::new(1);
250250
thread::scope(|s| {
251251
// stdin -> sock
252-
let stdin_to_sock_h = s.spawn(|| -> anyhow::Result<()> {
253-
let _s = span!(Level::INFO, "stdin->sock").entered();
254-
let mut stdin = std::io::stdin().lock();
255-
let mut buf = vec![0; consts::BUF_SIZE];
256-
257-
loop {
258-
let nread = stdin.read(&mut buf).context("reading stdin from user")?;
259-
if nread == 0 {
260-
continue;
261-
}
262-
debug!("read {} bytes", nread);
252+
let stdin_to_sock_h = thread::Builder::new()
253+
.name("stdin_to_sock".to_string())
254+
.spawn_scoped(s, || -> anyhow::Result<()> {
255+
let _s = span!(Level::INFO, "stdin->sock").entered();
256+
let mut stdin = std::io::stdin().lock();
257+
let mut buf = vec![0; consts::BUF_SIZE];
258+
259+
loop {
260+
let nread = stdin.read(&mut buf).context("reading stdin from user")?;
261+
if nread == 0 {
262+
continue;
263+
}
264+
debug!("read {} bytes", nread);
263265

264-
let to_write = &buf[..nread];
265-
trace!("created to_write='{}'", String::from_utf8_lossy(to_write));
266+
let to_write = &buf[..nread];
267+
trace!("created to_write='{}'", String::from_utf8_lossy(to_write));
266268

267-
write_client_stream.write_all(to_write)?;
268-
write_client_stream.flush().context("flushing client")?;
269-
}
270-
});
269+
write_client_stream.write_all(to_write)?;
270+
write_client_stream.flush().context("flushing client")?;
271+
}
272+
})
273+
.unwrap();
271274

272275
// sock -> stdout
273-
let sock_to_stdout_h = s.spawn(|| -> anyhow::Result<()> {
274-
let _s = span!(Level::INFO, "sock->stdout").entered();
275-
276-
let mut stdout = std::io::stdout().lock();
277-
let mut buf = vec![0; consts::BUF_SIZE];
278-
279-
loop {
280-
let chunk = match Chunk::read_into(&mut read_client_stream, &mut buf) {
281-
Ok(c) => c,
282-
Err(err) => {
283-
error!("reading chunk: {:?}", err);
284-
return Err(err);
276+
let sock_to_stdout_h = thread::Builder::new()
277+
.name("sock_to_stdout".to_string())
278+
.spawn_scoped(s, || -> anyhow::Result<()> {
279+
let _s = span!(Level::INFO, "sock->stdout").entered();
280+
281+
let mut stdout = std::io::stdout().lock();
282+
let mut buf = vec![0; consts::BUF_SIZE];
283+
284+
loop {
285+
let chunk = match Chunk::read_into(&mut read_client_stream, &mut buf) {
286+
Ok(c) => c,
287+
Err(err) => {
288+
error!("reading chunk: {:?}", err);
289+
return Err(err);
290+
}
291+
};
292+
293+
if !chunk.buf.is_empty() {
294+
debug!(
295+
"chunk='{}' kind={:?} len={}",
296+
String::from_utf8_lossy(chunk.buf),
297+
chunk.kind,
298+
chunk.buf.len()
299+
);
285300
}
286-
};
287-
288-
if !chunk.buf.is_empty() {
289-
debug!(
290-
"chunk='{}' kind={:?} len={}",
291-
String::from_utf8_lossy(chunk.buf),
292-
chunk.kind,
293-
chunk.buf.len()
294-
);
295-
}
296301

297-
match chunk.kind {
298-
ChunkKind::Heartbeat => {
299-
trace!("got heartbeat chunk");
300-
}
301-
ChunkKind::Data => {
302-
stdout.write_all(chunk.buf).context("writing chunk to stdout")?;
303-
304-
if let Err(e) = stdout.flush() {
305-
if e.kind() == std::io::ErrorKind::WouldBlock {
306-
// If the fd is busy, we are likely just getting
307-
// flooded with output and don't need to worry about
308-
// flushing every last byte. Flushing is really
309-
// about interactive situations where we want to
310-
// see echoed bytes immediately.
311-
continue;
302+
match chunk.kind {
303+
ChunkKind::Heartbeat => {
304+
trace!("got heartbeat chunk");
305+
}
306+
ChunkKind::Data => {
307+
stdout.write_all(chunk.buf).context("writing chunk to stdout")?;
308+
309+
if let Err(e) = stdout.flush() {
310+
if e.kind() == std::io::ErrorKind::WouldBlock {
311+
// If the fd is busy, we are likely just getting
312+
// flooded with output and don't need to worry about
313+
// flushing every last byte. Flushing is really
314+
// about interactive situations where we want to
315+
// see echoed bytes immediately.
316+
continue;
317+
}
312318
}
319+
debug!("flushed stdout");
320+
}
321+
ChunkKind::ExitStatus => {
322+
let mut status_reader = io::Cursor::new(chunk.buf);
323+
let stat = status_reader
324+
.read_i32::<LittleEndian>()
325+
.context("reading exit status from exit status chunk")?;
326+
info!("got exit status frame (status={})", stat);
327+
exit_status.store(stat, Ordering::Release);
313328
}
314-
debug!("flushed stdout");
315-
}
316-
ChunkKind::ExitStatus => {
317-
let mut status_reader = io::Cursor::new(chunk.buf);
318-
let stat = status_reader
319-
.read_i32::<LittleEndian>()
320-
.context("reading exit status from exit status chunk")?;
321-
info!("got exit status frame (status={})", stat);
322-
exit_status.store(stat, Ordering::Release);
323329
}
324330
}
325-
}
326-
});
331+
})
332+
.unwrap();
327333

328334
loop {
329335
let mut nfinished_threads = 0;

0 commit comments

Comments
 (0)