Skip to content

Commit d0064c3

Browse files
committed
Made detach function Send
1 parent bfca98e commit d0064c3

16 files changed

Lines changed: 169 additions & 119 deletions

File tree

ext/polars/src/dataframe/general.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ impl RbDataFrame {
337337
lambda: Value,
338338
maintain_order: bool,
339339
) -> RbResult<Self> {
340+
let lambda = Opaque::from(lambda);
340341
rb.enter_polars_df(|| {
341342
let df = self_.df.read().clone(); // Clone so we can't deadlock on re-entrance from lambda.
342343
let gb = if maintain_order {
@@ -345,7 +346,6 @@ impl RbDataFrame {
345346
df.group_by(by.iter().map(|x| &**x))
346347
}?;
347348

348-
let lambda = Opaque::from(lambda);
349349
let function = move |df: DataFrame| {
350350
Ruby::attach(|rb| {
351351
let lambda = rb.get_inner(lambda);

ext/polars/src/expr/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ impl RbExpr {
119119
}
120120

121121
pub fn arr_to_struct(&self, name_gen: Option<Value>) -> Self {
122-
let name_gen = name_gen.map(|o| PlanCallback::new_ruby(RubyObject(o)));
122+
let name_gen = name_gen.map(|o| PlanCallback::new_ruby(RubyObject::from(o)));
123123
self.inner.clone().arr().to_struct(name_gen).into()
124124
}
125125

ext/polars/src/expr/name.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ impl RbExpr {
1414
self.inner
1515
.clone()
1616
.name()
17-
.map(PlanCallback::new_ruby(RubyObject(lambda)))
17+
.map(PlanCallback::new_ruby(RubyObject::from(lambda)))
1818
.into()
1919
}
2020

@@ -46,7 +46,7 @@ impl RbExpr {
4646
self.inner
4747
.clone()
4848
.name()
49-
.map_fields(PlanCallback::new_ruby(RubyObject(name_mapper)))
49+
.map_fields(PlanCallback::new_ruby(RubyObject::from(name_mapper)))
5050
.into()
5151
}
5252

ext/polars/src/functions/lazy.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ pub fn cum_fold(
214214
include_init: bool,
215215
) -> RbResult<RbExpr> {
216216
let exprs = exprs.to_exprs()?;
217-
let func = PlanCallback::new_ruby(RubyObject(lambda));
217+
let func = PlanCallback::new_ruby(RubyObject::from(lambda));
218218
Ok(dsl::cum_fold_exprs(
219219
acc.inner.clone(),
220220
func,
@@ -233,7 +233,7 @@ pub fn cum_reduce(
233233
return_dtype: Option<&RbDataTypeExpr>,
234234
) -> RbResult<RbExpr> {
235235
let exprs = exprs.to_exprs()?;
236-
let func = PlanCallback::new_ruby(RubyObject(lambda));
236+
let func = PlanCallback::new_ruby(RubyObject::from(lambda));
237237
Ok(dsl::cum_reduce_exprs(
238238
func,
239239
exprs,
@@ -360,7 +360,7 @@ pub fn fold(
360360
return_dtype: Option<&RbDataTypeExpr>,
361361
) -> RbResult<RbExpr> {
362362
let exprs = exprs.to_exprs()?;
363-
let func = PlanCallback::new_ruby(RubyObject(lambda));
363+
let func = PlanCallback::new_ruby(RubyObject::from(lambda));
364364
Ok(dsl::fold_exprs(
365365
acc.inner.clone(),
366366
func,
@@ -444,7 +444,7 @@ pub fn reduce(
444444
return_dtype: Option<&RbDataTypeExpr>,
445445
) -> RbResult<RbExpr> {
446446
let exprs = exprs.to_exprs()?;
447-
let func = PlanCallback::new_ruby(RubyObject(lambda));
447+
let func = PlanCallback::new_ruby(RubyObject::from(lambda));
448448
Ok(dsl::reduce_exprs(
449449
func,
450450
exprs,

ext/polars/src/lazyframe/general.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -403,12 +403,11 @@ impl RbLazyFrame {
403403
rb.enter_polars(|| {
404404
let ldf = self_.ldf.read().clone();
405405

406-
let collect_batches = ldf
407-
.clone()
408-
.collect_batches(engine.0, maintain_order, chunk_size, lazy)
409-
.map_err(RbPolarsErr::from)?;
406+
let collect_batches =
407+
ldf.clone()
408+
.collect_batches(engine.0, maintain_order, chunk_size, lazy)?;
410409

411-
RbResult::Ok(RbCollectBatches {
410+
PolarsResult::Ok(RbCollectBatches {
412411
inner: Arc::new(Mutex::new(collect_batches)),
413412
_ldf: ldf,
414413
})

ext/polars/src/lazygroupby.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ impl RbLazyGroupBy {
5151
.map_err(RbPolarsErr::from)?,
5252
};
5353

54-
let function = RubyObject(lambda);
54+
let function = RubyObject::from(lambda);
5555

5656
Ok(lgb.apply(PlanCallback::new_ruby(function), schema).into())
5757
}

ext/polars/src/ruby/gvl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub trait GvlExt {
1212

1313
fn detach<T, F>(&self, func: F) -> T
1414
where
15-
F: FnOnce() -> T;
15+
F: FnOnce() -> T + Send;
1616
}
1717

1818
unsafe extern "C" {
@@ -51,7 +51,7 @@ impl GvlExt for Ruby {
5151

5252
fn detach<T, F>(&self, func: F) -> T
5353
where
54-
F: FnOnce() -> T,
54+
F: Send + FnOnce() -> T,
5555
{
5656
if std::env::var("POLARS_GVL").is_ok() {
5757
func()

ext/polars/src/ruby/lazy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl RubyUdfLazyFrameExt for LazyFrame {
3030
};
3131

3232
// handle non-Ruby threads
33-
start_background_ruby_thread(&Ruby::get_with(function.0));
33+
start_background_ruby_thread(&Ruby::get().unwrap());
3434
let udf = ArcValue::new(function.0);
3535
let f = move |df| {
3636
if is_non_ruby_thread() {

ext/polars/src/ruby/plan_callback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ impl<Args: PlanCallbackArgs + Send + 'static, Out: PlanCallbackOut + Send + 'sta
186186
};
187187

188188
// handle non-Ruby threads
189-
start_background_ruby_thread(&Ruby::get_with(rbfn.0));
189+
start_background_ruby_thread(&Ruby::get().unwrap());
190190
let udf = ArcValue::new(rbfn.0);
191191
let f = move |args: Args| {
192192
if is_non_ruby_thread() {
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
use magnus::Value;
1+
use magnus::{Value, value::Opaque};
22

3-
#[derive(Debug)]
4-
pub struct RubyObject(pub Value);
3+
pub struct RubyObject(pub Opaque<Value>);
54

65
pub type RubyFunction = RubyObject;
76

87
impl From<Value> for RubyObject {
98
fn from(value: Value) -> Self {
10-
Self(value)
9+
Self(value.into())
1110
}
1211
}

0 commit comments

Comments
 (0)