
Commit 3b6b659

Merge pull request #114 from cpsievert/fix/chunk-large-dataframe-register
fix: chunk large DataFrames in register to avoid duckdb-rs panic
2 parents: 2eaeedb + c22a981

1 file changed: src/reader/duckdb.rs (116 additions, 10 deletions)

```diff
@@ -516,17 +516,58 @@ impl Reader for DuckDBReader {
             )));
         }
 
-        // Convert DataFrame to Arrow query params
-        let params = dataframe_to_arrow_params(df)?;
+        // Workaround for a duckdb-rs limitation (not a DuckDB limitation).
+        //
+        // duckdb-rs's `ArrowVTab` writes each RecordBatch into a single DuckDB
+        // `DataChunk`, which has a fixed capacity of `STANDARD_VECTOR_SIZE`.
+        // That constant is defined in DuckDB's C++ source at
+        // `src/include/duckdb/common/constants.hpp` and is currently 2048.
+        // When a RecordBatch exceeds this, `FlatVector::copy` panics with
+        // `assertion failed: data.len() <= self.capacity()`.
+        //
+        // We chunk large DataFrames to stay within this limit. The first chunk
+        // creates the table (letting DuckDB infer the schema from Arrow), and
+        // subsequent chunks INSERT into it.
+        const MAX_ARROW_BATCH_ROWS: usize = 2048;
+        let total_rows = df.height();
+
+        if total_rows <= MAX_ARROW_BATCH_ROWS {
+            // Small DataFrame: register in a single batch
+            let params = dataframe_to_arrow_params(df)?;
+            let sql = format!(
+                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
+                name
+            );
+            self.conn.execute(&sql, params).map_err(|e| {
+                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
+            })?;
+        } else {
+            // Large DataFrame: create table from first chunk, then insert remaining chunks
+            let first_chunk = df.slice(0, MAX_ARROW_BATCH_ROWS);
+            let params = dataframe_to_arrow_params(first_chunk)?;
+            let create_sql = format!(
+                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
+                name
+            );
+            self.conn.execute(&create_sql, params).map_err(|e| {
+                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
+            })?;
 
-        // Create temp table from Arrow data
-        let sql = format!(
-            "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
-            name
-        );
-        self.conn.execute(&sql, params).map_err(|e| {
-            GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
-        })?;
+            let mut offset = MAX_ARROW_BATCH_ROWS;
+            while offset < total_rows {
+                let chunk_size = std::cmp::min(MAX_ARROW_BATCH_ROWS, total_rows - offset);
+                let chunk = df.slice(offset as i64, chunk_size);
+                let params = dataframe_to_arrow_params(chunk)?;
+                let insert_sql = format!("INSERT INTO \"{}\" SELECT * FROM arrow(?, ?)", name);
+                self.conn.execute(&insert_sql, params).map_err(|e| {
+                    GgsqlError::ReaderError(format!(
+                        "Failed to insert chunk into table '{}': {}",
+                        name, e
+                    ))
+                })?;
+                offset += chunk_size;
+            }
+        }
 
         // Track the table so we can unregister it later
         self.registered_tables.insert(name.to_string());
```
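
The `offset`/`chunk_size` loop above is the heart of the fix, so here is a quick standalone sketch of its arithmetic (not part of the commit): every row lands in exactly one chunk, and no chunk exceeds the 2048-row cap. The `total` value is an arbitrary illustration.

```rust
// Mirrors the offset/chunk_size loop from the diff, checked in isolation.
const MAX_ARROW_BATCH_ROWS: usize = 2048;

fn main() {
    let total = 5000; // 5000 rows -> chunks of 2048, 2048, and 904
    let mut covered = 0;
    let mut offset = 0;
    while offset < total {
        let chunk_size = std::cmp::min(MAX_ARROW_BATCH_ROWS, total - offset);
        covered += chunk_size;
        offset += chunk_size;
    }
    assert_eq!(covered, total); // the chunks partition the rows exactly
}
```
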
```diff
@@ -783,4 +824,69 @@ mod tests {
         let result = reader.execute_sql("SELECT * FROM data").unwrap();
         assert_eq!(result.height(), 3);
     }
+
+    #[test]
+    fn test_register_large_dataframe() {
+        // duckdb-rs Arrow vtab has a vector capacity of 2048 rows. DataFrames
+        // larger than this must be chunked to avoid a panic.
+        let mut reader = DuckDBReader::from_connection_string("duckdb://memory").unwrap();
+
+        let n = 3000;
+        let ids: Vec<i32> = (0..n).collect();
+        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
+        let names: Vec<String> = (0..n).map(|i| format!("item_{}", i)).collect();
+
+        let df = DataFrame::new(vec![
+            Column::new("id".into(), ids),
+            Column::new("value".into(), values),
+            Column::new("name".into(), names),
+        ])
+        .unwrap();
+
+        reader.register("large_table", df).unwrap();
+
+        // Verify row count
+        let result = reader
+            .execute_sql("SELECT COUNT(*) as cnt FROM large_table")
+            .unwrap();
+        let count = result.column("cnt").unwrap().i64().unwrap().get(0).unwrap();
+        assert_eq!(count, n as i64);
+
+        // Verify first and last rows survived chunking intact
+        let result = reader
+            .execute_sql("SELECT id, name FROM large_table ORDER BY id LIMIT 1")
+            .unwrap();
+        assert_eq!(
+            result.column("id").unwrap().i32().unwrap().get(0).unwrap(),
+            0
+        );
+        assert_eq!(
+            result
+                .column("name")
+                .unwrap()
+                .str()
+                .unwrap()
+                .get(0)
+                .unwrap(),
+            "item_0"
+        );
+
+        let result = reader
+            .execute_sql("SELECT id, name FROM large_table ORDER BY id DESC LIMIT 1")
+            .unwrap();
+        assert_eq!(
+            result.column("id").unwrap().i32().unwrap().get(0).unwrap(),
+            (n - 1) as i32
+        );
+        assert_eq!(
+            result
+                .column("name")
+                .unwrap()
+                .str()
+                .unwrap()
+                .get(0)
+                .unwrap(),
+            format!("item_{}", n - 1)
+        );
+    }
 }
```
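
The commit slices at the polars `DataFrame` level, but the same workaround could be applied at the Arrow layer by slicing each oversized `RecordBatch` before it reaches the vtab. A minimal sketch of that alternative, assuming the arrow crate's zero-copy `RecordBatch::slice`; `split_batch` is a hypothetical helper, not part of this commit:

```rust
use arrow::record_batch::RecordBatch;

const MAX_ARROW_BATCH_ROWS: usize = 2048;

/// Split a RecordBatch into slices that each fit in one DuckDB DataChunk.
fn split_batch(batch: &RecordBatch) -> Vec<RecordBatch> {
    let total = batch.num_rows();
    let mut out = Vec::with_capacity(total.div_ceil(MAX_ARROW_BATCH_ROWS));
    let mut offset = 0;
    while offset < total {
        let len = MAX_ARROW_BATCH_ROWS.min(total - offset);
        // slice() is zero-copy: each view shares the parent batch's buffers.
        out.push(batch.slice(offset, len));
        offset += len;
    }
    out
}
```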
