Skip to content

Commit 386c0cc

Browse files
committed
Improved thread scheduling
1 parent e84d385 commit 386c0cc

1 file changed

Lines changed: 17 additions & 15 deletions

File tree

ext/polars/src/file.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::io;
44
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
55
use std::path::PathBuf;
66
use std::sync::OnceLock;
7-
use std::sync::mpsc::{SyncSender, TryRecvError, sync_channel};
7+
use std::sync::mpsc::{RecvTimeoutError, SyncSender, sync_channel};
88

99
use magnus::{Error, RString, Ruby, Value, error::RubyUnavailableError, prelude::*, value::Opaque};
1010
use polars::io::mmap::MmapBytesReader;
@@ -15,7 +15,7 @@ use polars_utils::create_file;
1515

1616
use crate::error::RbPolarsErr;
1717
use crate::prelude::resolve_homedir;
18-
use crate::utils::{RubyAttach, to_rb_err};
18+
use crate::utils::{EnterPolarsExt, RubyAttach, to_rb_err};
1919
use crate::{RbErr, RbResult};
2020

2121
pub struct RbFileLikeObject {
@@ -336,22 +336,24 @@ fn start_background_ruby_thread(rb: &Ruby) {
336336

337337
// TODO save reference to thread?
338338
rb.thread_create_from_fn(move |rb2| {
339-
loop {
340-
match receiver.try_recv() {
341-
Ok((f, sender2)) => {
342-
sender2.send(f(rb2)).unwrap();
343-
}
344-
Err(TryRecvError::Empty) => {
345-
rb2.thread_sleep(std::time::Duration::from_millis(1))?;
346-
}
347-
Err(TryRecvError::Disconnected) => {
348-
todo!();
339+
rb2.detach(|| {
340+
loop {
341+
match receiver.recv_timeout(std::time::Duration::from_millis(10)) {
342+
Ok((f, sender2)) => {
343+
Ruby::attach(|rb3| sender2.send(f(rb3)).unwrap());
344+
}
345+
Err(RecvTimeoutError::Timeout) => {
346+
Ruby::attach(|rb3| rb3.thread_schedule());
347+
}
348+
Err(RecvTimeoutError::Disconnected) => {
349+
todo!();
350+
}
349351
}
350352
}
351-
}
352353

353-
#[allow(unreachable_code)]
354-
Ok(())
354+
#[allow(unreachable_code)]
355+
Ok(())
356+
})
355357
});
356358

357359
sender

0 commit comments

Comments
 (0)