Skip to content

Commit ddaa3ec

Browse files
committed
Added run_in_background function
1 parent 30c952c commit ddaa3ec

1 file changed

Lines changed: 18 additions & 9 deletions

File tree

ext/polars/src/file.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,21 +147,15 @@ impl Write for RbFileLikeObject {
147147
if is_non_ruby_thread() {
148148
let buf2 = buf.to_vec();
149149
let mut self2 = self.clone();
150-
let f = move |_rb: &Ruby| -> Box<dyn Any + Send> {
150+
let f = move |_rb: &Ruby| -> Result<usize, io::Error> {
151151
let result = self2.write(&buf2);
152152
if result.is_ok() {
153153
// flush writes for now
154154
self2.flush().unwrap();
155155
}
156-
Box::new(result)
156+
result
157157
};
158-
let (sender, receiver) = sync_channel(0);
159-
POLARS_RUBY_SENDER
160-
.get()
161-
.unwrap()
162-
.send((Box::new(f), sender))
163-
.unwrap();
164-
return *receiver.recv().unwrap().downcast().unwrap();
158+
return run_in_background(f);
165159
}
166160

167161
let expects_str = self.expects_str;
@@ -373,6 +367,21 @@ fn start_background_thread(rb: &Ruby) {
373367
});
374368
}
375369

370+
fn run_in_background<T, F>(f: F) -> T
371+
where
372+
T: Send + 'static,
373+
F: FnOnce(&Ruby) -> T + Send + 'static,
374+
{
375+
let f2 = move |rb: &Ruby| -> Box<dyn Any + Send> { Box::new(f(rb)) };
376+
let (sender, receiver) = sync_channel(0);
377+
POLARS_RUBY_SENDER
378+
.get()
379+
.unwrap()
380+
.send((Box::new(f2), sender))
381+
.unwrap();
382+
*receiver.recv().unwrap().downcast().unwrap()
383+
}
384+
376385
fn is_non_ruby_thread() -> bool {
377386
matches!(Ruby::get(), Err(RubyUnavailableError::NonRubyThread))
378387
}

0 commit comments

Comments
 (0)