Skip to content

Commit 1078ea4

Browse files
committed
perf(qdp): Implement async prefetching and native f32 dispatch pipelines
1 parent d7fd681 commit 1078ea4

14 files changed

Lines changed: 400 additions & 83 deletions

File tree

qdp/qdp-core/src/dlpack.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,10 @@ impl GpuStateVector {
257257
let (shape, strides) = if let Some(num_samples) = self.num_samples {
258258
// Batch: [num_samples, state_len_per_sample]
259259
debug_assert!(
260-
num_samples > 0 && self.size_elements.is_multiple_of(num_samples),
261-
"Batch state vector size must be divisible by num_samples"
260+
num_samples > 0 && self.size_elements % num_samples == 0,
261+
"Batch mismatch: {} elements cannot be evenly divided into {} samples",
262+
self.size_elements,
263+
num_samples
262264
);
263265
let state_len_per_sample = self.size_elements / num_samples;
264266
let shape = vec![num_samples as i64, state_len_per_sample as i64];

qdp/qdp-core/src/gpu/encodings/amplitude.rs

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,216 @@ impl QuantumEncoder for AmplitudeEncoder {
457457
Ok(batch_state_vector)
458458
}
459459

460+
/// Encode multiple samples in a single GPU allocation and kernel launch for f32 inputs
461+
#[cfg(target_os = "linux")]
462+
fn encode_batch_f32(
463+
&self,
464+
device: &Arc<CudaDevice>,
465+
batch_data: &[f32],
466+
num_samples: usize,
467+
sample_size: usize,
468+
num_qubits: usize,
469+
) -> Result<GpuStateVector> {
470+
crate::profile_scope!("AmplitudeEncoder::encode_batch_f32");
471+
472+
// Validate inputs. Preprocessor::validate_batch currently accepts f64 only,
473+
// so for f32 batches fall back to a basic length check until f32 validation exists.
474+
let state_len = 1 << num_qubits;
475+
if batch_data.len() != num_samples * sample_size {
476+
return Err(MahoutError::InvalidInput("batch_data length mismatch".into()));
477+
}
478+
479+
let batch_state_vector = {
480+
crate::profile_scope!("GPU::AllocBatch_f32");
481+
GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
482+
};
483+
484+
// Upload input data to GPU
485+
let input_batch_gpu = {
486+
crate::profile_scope!("GPU::H2D_InputBatch_f32");
487+
device.htod_sync_copy(batch_data).map_err(|e| {
488+
MahoutError::MemoryAllocation(format!("Failed to upload batch input: {:?}", e))
489+
})?
490+
};
491+
492+
// Compute inverse norms on GPU using warp-reduced kernel
493+
let inv_norms_gpu = {
494+
crate::profile_scope!("GPU::BatchNormKernel_f32");
495+
use cudarc::driver::DevicePtrMut;
496+
let mut buffer = device.alloc_zeros::<f32>(num_samples).map_err(|e| {
497+
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e))
498+
})?;
499+
500+
let ret = unsafe {
501+
launch_l2_norm_batch_f32(
502+
*input_batch_gpu.device_ptr() as *const f32,
503+
num_samples,
504+
sample_size,
505+
*buffer.device_ptr_mut() as *mut f32,
506+
std::ptr::null_mut(), // default stream
507+
)
508+
};
509+
510+
if ret != 0 {
511+
return Err(MahoutError::KernelLaunch(format!(
512+
"Norm reduction kernel failed: {} ({})",
513+
ret,
514+
cuda_error_to_string(ret)
515+
)));
516+
}
517+
buffer
518+
};
519+
520+
// Validate norms on host
521+
{
522+
crate::profile_scope!("GPU::NormValidation_f32");
523+
let host_inv_norms = device
524+
.dtoh_sync_copy(&inv_norms_gpu)
525+
.map_err(|e| MahoutError::Cuda(format!("Failed to copy norms to host: {:?}", e)))?;
526+
527+
if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) {
528+
return Err(MahoutError::InvalidInput(
529+
"One or more samples have zero or invalid norm".to_string(),
530+
));
531+
}
532+
}
533+
534+
// Launch batch kernel
535+
{
536+
crate::profile_scope!("GPU::BatchKernelLaunch_f32");
537+
use cudarc::driver::DevicePtr;
538+
let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
539+
MahoutError::InvalidInput(
540+
"Batch state vector precision mismatch (expected float32 buffer)".to_string(),
541+
)
542+
})?;
543+
let ret = unsafe {
544+
launch_amplitude_encode_batch_f32(
545+
*input_batch_gpu.device_ptr() as *const f32,
546+
state_ptr as *mut c_void,
547+
*inv_norms_gpu.device_ptr() as *const f32,
548+
num_samples,
549+
sample_size,
550+
state_len,
551+
std::ptr::null_mut(), // default stream
552+
)
553+
};
554+
555+
if ret != 0 {
556+
return Err(MahoutError::KernelLaunch(format!(
557+
"Batch kernel launch failed: {} ({})",
558+
ret,
559+
cuda_error_to_string(ret)
560+
)));
561+
}
562+
}
563+
564+
{
565+
crate::profile_scope!("GPU::Synchronize");
566+
device
567+
.synchronize()
568+
.map_err(|e| MahoutError::Cuda(format!("Sync failed: {:?}", e)))?;
569+
}
570+
571+
Ok(batch_state_vector)
572+
}
573+
574+
#[cfg(target_os = "linux")]
575+
unsafe fn encode_batch_from_gpu_ptr_f32(
576+
&self,
577+
device: &Arc<CudaDevice>,
578+
input_batch_d: *const c_void,
579+
num_samples: usize,
580+
sample_size: usize,
581+
num_qubits: usize,
582+
stream: *mut c_void,
583+
) -> Result<GpuStateVector> {
584+
let state_len = 1 << num_qubits;
585+
if sample_size == 0 {
586+
return Err(MahoutError::InvalidInput(
587+
"Sample size cannot be zero".into(),
588+
));
589+
}
590+
if sample_size > state_len {
591+
return Err(MahoutError::InvalidInput(format!(
592+
"Sample size {} exceeds state vector size {} (2^{} qubits)",
593+
sample_size, state_len, num_qubits
594+
)));
595+
}
596+
let input_batch_d = input_batch_d as *const f32;
597+
let batch_state_vector = {
598+
crate::profile_scope!("GPU::AllocBatch_f32");
599+
GpuStateVector::new_batch(device, num_samples, num_qubits, Precision::Float32)?
600+
};
601+
let inv_norms_gpu = {
602+
crate::profile_scope!("GPU::BatchNormKernel_f32");
603+
use cudarc::driver::DevicePtrMut;
604+
let mut buffer = device.alloc_zeros::<f32>(num_samples).map_err(|e| {
605+
MahoutError::MemoryAllocation(format!("Failed to allocate norm buffer: {:?}", e))
606+
})?;
607+
let ret = unsafe {
608+
launch_l2_norm_batch_f32(
609+
input_batch_d,
610+
num_samples,
611+
sample_size,
612+
*buffer.device_ptr_mut() as *mut f32,
613+
stream,
614+
)
615+
};
616+
if ret != 0 {
617+
return Err(MahoutError::KernelLaunch(format!(
618+
"Norm reduction kernel failed with CUDA error code: {} ({})",
619+
ret,
620+
cuda_error_to_string(ret)
621+
)));
622+
}
623+
buffer
624+
};
625+
{
626+
crate::profile_scope!("GPU::NormValidation_f32");
627+
let host_inv_norms = device
628+
.dtoh_sync_copy(&inv_norms_gpu)
629+
.map_err(|e| MahoutError::Cuda(format!("Failed to copy norms to host: {:?}", e)))?;
630+
if host_inv_norms.iter().any(|v| !v.is_finite() || *v == 0.0) {
631+
return Err(MahoutError::InvalidInput(
632+
"One or more samples have zero or invalid norm".to_string(),
633+
));
634+
}
635+
}
636+
{
637+
crate::profile_scope!("GPU::BatchKernelLaunch_f32");
638+
use cudarc::driver::DevicePtr;
639+
let state_ptr = batch_state_vector.ptr_f32().ok_or_else(|| {
640+
MahoutError::InvalidInput(
641+
"Batch state vector precision mismatch (expected float32 buffer)".to_string(),
642+
)
643+
})?;
644+
let ret = unsafe {
645+
launch_amplitude_encode_batch_f32(
646+
input_batch_d,
647+
state_ptr as *mut c_void,
648+
*inv_norms_gpu.device_ptr() as *const f32,
649+
num_samples,
650+
sample_size,
651+
state_len,
652+
stream,
653+
)
654+
};
655+
if ret != 0 {
656+
return Err(MahoutError::KernelLaunch(format!(
657+
"Batch kernel launch failed with CUDA error code: {} ({})",
658+
ret,
659+
cuda_error_to_string(ret)
660+
)));
661+
}
662+
}
663+
{
664+
crate::profile_scope!("GPU::Synchronize");
665+
sync_cuda_stream(stream, "CUDA stream synchronize failed")?;
666+
}
667+
Ok(batch_state_vector)
668+
}
669+
460670
fn name(&self) -> &'static str {
461671
"amplitude"
462672
}

qdp/qdp-core/src/gpu/encodings/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,41 @@ pub trait QuantumEncoder: Send + Sync {
134134
self.name()
135135
)))
136136
}
137+
138+
/// Encode multiple samples in a single GPU allocation and kernel launch using f32 inputs.
139+
fn encode_batch_f32(
140+
&self,
141+
_device: &Arc<CudaDevice>,
142+
_batch_data: &[f32],
143+
_num_samples: usize,
144+
_sample_size: usize,
145+
_num_qubits: usize,
146+
) -> Result<GpuStateVector> {
147+
Err(MahoutError::NotImplemented(format!(
148+
"encode_batch_f32 not implemented for {}",
149+
self.name()
150+
)))
151+
}
152+
153+
/// Encode batch from existing GPU pointer (zero-copy) for f32 inputs.
154+
///
155+
/// # Safety
156+
/// Caller must ensure `input_batch_d` points to valid GPU memory (f32).
157+
#[cfg(target_os = "linux")]
158+
unsafe fn encode_batch_from_gpu_ptr_f32(
159+
&self,
160+
_device: &Arc<CudaDevice>,
161+
_input_batch_d: *const c_void,
162+
_num_samples: usize,
163+
_sample_size: usize,
164+
_num_qubits: usize,
165+
_stream: *mut c_void,
166+
) -> Result<GpuStateVector> {
167+
Err(MahoutError::NotImplemented(format!(
168+
"encode_batch_from_gpu_ptr_f32 not supported for {}",
169+
self.name()
170+
)))
171+
}
137172
}
138173

139174
// Encoding implementations

qdp/qdp-core/src/gpu/pipeline.rs

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ where
290290
"Alignment must be greater than zero".to_string(),
291291
));
292292
}
293-
if !host_data.len().is_multiple_of(align_elements) {
293+
if host_data.len() % align_elements != 0 {
294294
return Err(MahoutError::InvalidInput(format!(
295295
"Host data length {} is not aligned to {} elements",
296296
host_data.len(),
@@ -403,14 +403,14 @@ where
403403

404404
// Record copy start if overlap tracking enabled
405405
// Note: Overlap tracking is optional observability - failures should not stop the pipeline
406-
if let Some(ref tracker) = overlap_tracker
407-
&& let Err(e) = tracker.record_copy_start(&ctx.stream_copy, event_slot)
408-
{
409-
log::warn!(
410-
"Chunk {}: Failed to record copy start event: {}. Overlap tracking may be incomplete.",
411-
chunk_idx,
412-
e
413-
);
406+
if let Some(ref tracker) = overlap_tracker {
407+
if let Err(e) = tracker.record_copy_start(&ctx.stream_copy, event_slot) {
408+
log::warn!(
409+
"Chunk {}: Failed to record copy start event: {}. Overlap tracking may be incomplete.",
410+
chunk_idx,
411+
e
412+
);
413+
}
414414
}
415415

416416
unsafe {
@@ -422,14 +422,14 @@ where
422422

423423
// Record copy end if overlap tracking enabled
424424
// Note: Overlap tracking is optional observability - failures should not stop the pipeline
425-
if let Some(ref tracker) = overlap_tracker
426-
&& let Err(e) = tracker.record_copy_end(&ctx.stream_copy, event_slot)
427-
{
428-
log::warn!(
429-
"Chunk {}: Failed to record copy end event: {}. Overlap tracking may be incomplete.",
430-
chunk_idx,
431-
e
432-
);
425+
if let Some(ref tracker) = overlap_tracker {
426+
if let Err(e) = tracker.record_copy_end(&ctx.stream_copy, event_slot) {
427+
log::warn!(
428+
"Chunk {}: Failed to record copy end event: {}. Overlap tracking may be incomplete.",
429+
chunk_idx,
430+
e
431+
);
432+
}
433433
}
434434

435435
ctx.record_copy_done(event_slot)?;
@@ -456,28 +456,28 @@ where
456456

457457
// Record compute start if overlap tracking enabled
458458
// Note: Overlap tracking is optional observability - failures should not stop the pipeline
459-
if let Some(ref tracker) = overlap_tracker
460-
&& let Err(e) = tracker.record_compute_start(&ctx.stream_compute, event_slot)
461-
{
462-
log::warn!(
463-
"Chunk {}: Failed to record compute start event: {}. Overlap tracking may be incomplete.",
464-
chunk_idx,
465-
e
466-
);
459+
if let Some(ref tracker) = overlap_tracker {
460+
if let Err(e) = tracker.record_compute_start(&ctx.stream_compute, event_slot) {
461+
log::warn!(
462+
"Chunk {}: Failed to record compute start event: {}. Overlap tracking may be incomplete.",
463+
chunk_idx,
464+
e
465+
);
466+
}
467467
}
468468

469469
kernel_launcher(&ctx.stream_compute, input_ptr, chunk_offset, chunk.len())?;
470470

471471
// Record compute end if overlap tracking enabled
472472
// Note: Overlap tracking is optional observability - failures should not stop the pipeline
473-
if let Some(ref tracker) = overlap_tracker
474-
&& let Err(e) = tracker.record_compute_end(&ctx.stream_compute, event_slot)
475-
{
476-
log::warn!(
477-
"Chunk {}: Failed to record compute end event: {}. Overlap tracking may be incomplete.",
478-
chunk_idx,
479-
e
480-
);
473+
if let Some(ref tracker) = overlap_tracker {
474+
if let Err(e) = tracker.record_compute_end(&ctx.stream_compute, event_slot) {
475+
log::warn!(
476+
"Chunk {}: Failed to record compute end event: {}. Overlap tracking may be incomplete.",
477+
chunk_idx,
478+
e
479+
);
480+
}
481481
}
482482
}
483483

@@ -489,11 +489,10 @@ where
489489
// Note: log_overlap now handles both success and failure cases internally,
490490
// logging at appropriate levels (INFO for visibility, DEBUG for details).
491491
#[allow(clippy::manual_is_multiple_of)]
492-
if let Some(ref tracker) = overlap_tracker
493-
&& (chunk_idx % 10 == 0 || chunk_idx == 0)
494-
{
495-
// Only log every Nth chunk to avoid excessive logging
496-
// Note: log_overlap waits for events to complete, which may take time
492+
if let Some(ref tracker) = overlap_tracker {
493+
if chunk_idx % 10 == 0 || chunk_idx == 0 {
494+
// Only log every Nth chunk to avoid excessive logging
495+
// Note: log_overlap waits for events to complete, which may take time
497496
// If events fail (e.g., invalid resource handle), log_overlap will log
498497
// at INFO level so it's visible in both debug and info modes
499498
if let Err(e) = tracker.log_overlap(chunk_idx) {
@@ -508,6 +507,7 @@ where
508507
}
509508
// Don't fail the pipeline - overlap tracking is optional observability
510509
}
510+
}
511511
}
512512

513513
// Keep buffer alive until synchronization

0 commit comments

Comments
 (0)