Skip to content

Commit 860b8b2

Browse files
mivertowski and claude committed
Fix clippy warnings and formatting issues
- Use derive(Default) with #[default] attribute for SizeBucket and QueueTier
- Remove unused std::io::Write import in persistent_fdtd.rs
- Apply cargo fmt formatting fixes across multiple files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5fa0559 commit 860b8b2

12 files changed

Lines changed: 311 additions & 85 deletions

File tree

CHANGELOG.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,43 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
### Added
1313

14+
#### Multi-Kernel Dispatch and Persistent Message Routing
15+
16+
- **`#[derive(PersistentMessage)]` macro** (`ringkernel-derive`)
17+
- Automatic `handler_id` generation for GPU kernel dispatch
18+
- Inline payload serialization with response tracking
19+
- Compile-time handler registration
20+
21+
- **`KernelDispatcher`** (`ringkernel-core/src/dispatcher.rs`) - **NEW FILE**
22+
- Type-based message routing via K2K broker
23+
- `DispatcherBuilder` with fluent configuration API
24+
- `DispatcherConfig` for routing behavior customization
25+
- `DispatcherMetrics` for observability (messages dispatched, errors, latency)
26+
27+
- **CUDA Handler Dispatch Code Generator** (`ringkernel-cuda-codegen/src/ring_kernel.rs`)
28+
- `CudaDispatchTable` for handler registration
29+
- Switch-based dispatch code generation
30+
- `ExtendedH2KMessage` struct generation for typed payloads
31+
32+
- **Queue Tiering System** (`ringkernel-core/src/queue.rs`)
33+
- `QueueTier` enum: Small (256), Medium (1024), Large (4096), ExtraLarge (16384)
34+
- `QueueFactory` for creating appropriately-sized message queues
35+
- `QueueMonitor` for queue health checking with configurable thresholds
36+
- `QueueMetrics` for observability (enqueue/dequeue counts, peak depth)
37+
- `for_throughput()` method for automatic tier selection based on message rate
38+
39+
- **Persistent Message Infrastructure** (`ringkernel-core/src/persistent_message.rs`) - **NEW FILE**
40+
- `PersistentMessage` trait for GPU-dispatchable messages
41+
- `DispatchTable` for runtime handler registration
42+
- `HandlerId` type for type-safe handler identification
43+
44+
#### CUDA NVRTC Compilation
45+
46+
- **`compile_ptx()` function** (`ringkernel-cuda/src/lib.rs`)
47+
- Wraps `cudarc::nvrtc::compile_ptx` for downstream crates
48+
- Compile CUDA source to PTX without direct cudarc dependency
49+
- Returns PTX string or compilation error
50+
1451
#### Memory Pool Management
1552

1653
- **Size-Stratified Memory Pool** (`ringkernel-core/src/memory.rs`)

README.md

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,54 @@ let buffer = cache.acquire::<f32>(4, ReductionOp::Sum)?;
270270
// Buffer automatically returned to cache on drop
271271
```
272272

273+
### Multi-Kernel Dispatch
274+
275+
Route messages to GPU kernels based on message type:
276+
277+
```rust
278+
use ringkernel_core::prelude::*;
279+
280+
// Define a persistent message with automatic handler ID
281+
#[derive(PersistentMessage)]
282+
struct ComputeTask {
283+
data: Vec<f32>,
284+
}
285+
286+
// Create dispatcher with K2K broker
287+
let dispatcher = DispatcherBuilder::new(k2k_broker)
288+
.with_default_priority(Priority::Normal)
289+
.build();
290+
291+
// Dispatch routes to appropriate kernel based on message type
292+
dispatcher.dispatch(kernel_id, envelope).await?;
293+
294+
// Check metrics
295+
let metrics = dispatcher.metrics();
296+
println!("Dispatched: {}, Errors: {}", metrics.messages_dispatched, metrics.errors);
297+
```
298+
299+
### Queue Tiering
300+
301+
Select queue capacity based on expected throughput:
302+
303+
```rust
304+
use ringkernel_core::queue::{QueueTier, QueueFactory, QueueMonitor};
305+
306+
// Automatic tier selection based on message rate
307+
let tier = QueueTier::for_throughput(10_000, 100); // 10k msg/s, 100ms buffer
308+
// Returns QueueTier::Medium (1024 capacity)
309+
310+
// Manual tier selection
311+
let queue = QueueFactory::create_spsc(QueueTier::Large); // 4096 capacity
312+
313+
// Monitor queue health
314+
let monitor = QueueMonitor::new(queue, QueueMonitorConfig::default());
315+
let health = monitor.check_health();
316+
println!("Depth: {}, Healthy: {}", health.current_depth, health.is_healthy);
317+
```
318+
319+
Available tiers: `Small` (256), `Medium` (1024), `Large` (4096), `ExtraLarge` (16384).
320+
273321
### Kernel-to-Kernel Messaging
274322

275323
Direct communication between kernels:
@@ -335,6 +383,38 @@ struct Matrix4x4 {
335383
}
336384
```
337385

386+
### Domain System
387+
388+
Organize messages by business domain with automatic type ID allocation:
389+
390+
```rust
391+
use ringkernel::prelude::*;
392+
393+
// 20 predefined domains with reserved type ID ranges
394+
#[derive(RingMessage)]
395+
#[message(domain = "FraudDetection")] // Type IDs 1000-1099
396+
struct SuspiciousTransaction {
397+
#[message(id)]
398+
id: MessageId,
399+
amount: f64,
400+
risk_score: f32,
401+
}
402+
403+
#[derive(RingMessage)]
404+
#[message(domain = "ProcessIntelligence")] // Type IDs 1500-1599
405+
struct ActivityEvent {
406+
#[message(id)]
407+
id: MessageId,
408+
case_id: u64,
409+
activity: String,
410+
}
411+
412+
// Check domain at runtime
413+
let domain = Domain::from_type_id(1050); // Returns Some(Domain::FraudDetection)
414+
```
415+
416+
Available domains: `GraphAnalytics`, `StatisticalML`, `Compliance`, `RiskManagement`, `OrderMatching`, `MarketData`, `Settlement`, `Accounting`, `NetworkAnalysis`, `FraudDetection`, `TimeSeries`, `Simulation`, `Banking`, `BehavioralAnalytics`, `ProcessIntelligence`, `Clearing`, `TreasuryManagement`, `PaymentProcessing`, `FinancialAudit`, `Custom`.
417+
338418
## Examples
339419

340420
The repository includes 20+ working examples organized by category:
@@ -410,7 +490,7 @@ See the [Showcase Applications Guide](docs/15-showcase-applications.md) for deta
410490
## Testing
411491

412492
```bash
413-
# Run all tests (775+ tests)
493+
# Run all tests (825+ tests)
414494
cargo test --workspace
415495

416496
# CUDA backend tests (requires NVIDIA GPU)
@@ -577,7 +657,25 @@ Detailed documentation is also available in the `docs/` directory:
577657

578658
## GPU Code Generation
579659

580-
Write GPU kernels in Rust and transpile them to CUDA C or WGSL. Both transpilers support three kernel types with unified API:
660+
Write GPU kernels in Rust and transpile them to CUDA C or WGSL. Both transpilers support three kernel types with a unified API.
661+
662+
### CUDA PTX Compilation
663+
664+
Compile CUDA source to PTX without directly depending on cudarc:
665+
666+
```rust
667+
use ringkernel_cuda::compile_ptx;
668+
669+
let cuda_source = r#"
670+
extern "C" __global__ void add(float* a, float* b, float* c, int n) {
671+
int idx = blockIdx.x * blockDim.x + threadIdx.x;
672+
if (idx < n) c[idx] = a[idx] + b[idx];
673+
}
674+
"#;
675+
676+
let ptx = compile_ptx(cuda_source)?;
677+
// Load PTX into CUDA module and execute
678+
```
581679

582680
### Backend Selection
583681

crates/ringkernel-core/src/analytics_context.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,13 +178,20 @@ impl AnalyticsContext {
178178
/// # Type Parameters
179179
///
180180
/// * `T` - Element type (must be Copy and have a meaningful zero value)
181-
pub fn allocate_typed<T: Copy + Default + 'static>(&mut self, count: usize) -> AllocationHandle {
181+
pub fn allocate_typed<T: Copy + Default + 'static>(
182+
&mut self,
183+
count: usize,
184+
) -> AllocationHandle {
182185
let size = count * std::mem::size_of::<T>();
183186
let handle = self.allocate(size);
184187

185188
// Track typed allocation
186189
self.stats.typed_allocations += 1;
187-
*self.stats.allocations_by_type.entry(TypeId::of::<T>()).or_insert(0) += 1;
190+
*self
191+
.stats
192+
.allocations_by_type
193+
.entry(TypeId::of::<T>())
194+
.or_insert(0) += 1;
188195

189196
handle
190197
}
@@ -337,7 +344,9 @@ impl AnalyticsContextBuilder {
337344
/// Build the context.
338345
pub fn build(self) -> AnalyticsContext {
339346
let mut ctx = match self.expected_allocations {
340-
Some(cap) => AnalyticsContext::with_capacity(self.name, cap.max(self.preallocations.len())),
347+
Some(cap) => {
348+
AnalyticsContext::with_capacity(self.name, cap.max(self.preallocations.len()))
349+
}
341350
None => AnalyticsContext::new(self.name),
342351
};
343352

crates/ringkernel-core/src/dispatcher.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,7 @@ impl KernelDispatcher {
247247
// Dispatch via K2K broker
248248
let receipt = self
249249
.broker
250-
.send_priority(
251-
source,
252-
kernel_id,
253-
envelope,
254-
self.config.default_priority,
255-
)
250+
.send_priority(source, kernel_id, envelope, self.config.default_priority)
256251
.await?;
257252

258253
// Update metrics

crates/ringkernel-core/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,16 +86,16 @@ pub mod prelude {
8686
};
8787
pub use crate::context::*;
8888
pub use crate::control::*;
89+
pub use crate::dispatcher::{
90+
DispatcherBuilder, DispatcherConfig, DispatcherMetrics, KernelDispatcher,
91+
};
8992
pub use crate::domain::{Domain, DomainMessage, DomainParseError};
9093
pub use crate::error::*;
9194
pub use crate::health::{
9295
BackoffStrategy, CircuitBreaker, CircuitBreakerConfig, CircuitBreakerStats, CircuitState,
9396
DegradationLevel, DegradationManager, DegradationStats, HealthCheck, HealthCheckResult,
9497
HealthChecker, HealthStatus, KernelHealth, KernelWatchdog, LoadSheddingPolicy, RetryPolicy,
9598
};
96-
pub use crate::dispatcher::{
97-
DispatcherBuilder, DispatcherConfig, DispatcherMetrics, KernelDispatcher,
98-
};
9999
pub use crate::hlc::*;
100100
pub use crate::k2k::{
101101
DeliveryStatus, K2KBroker, K2KBuilder, K2KConfig, K2KEndpoint, K2KMessage,

crates/ringkernel-core/src/memory.rs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -324,13 +324,14 @@ pub fn create_pool(
324324
///
325325
/// Provides predefined size classes for efficient multi-size pooling.
326326
/// Allocations are rounded up to the smallest bucket that fits.
327-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
327+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
328328
pub enum SizeBucket {
329329
/// Tiny buffers (256 bytes) - metadata, small messages.
330330
Tiny,
331331
/// Small buffers (1 KB) - typical message payloads.
332332
Small,
333333
/// Medium buffers (4 KB) - page-sized allocations.
334+
#[default]
334335
Medium,
335336
/// Large buffers (16 KB) - batch operations.
336337
Large,
@@ -399,12 +400,6 @@ impl SizeBucket {
399400
}
400401
}
401402

402-
impl Default for SizeBucket {
403-
fn default() -> Self {
404-
Self::Medium
405-
}
406-
}
407-
408403
impl std::fmt::Display for SizeBucket {
409404
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410405
match self {
@@ -442,7 +437,11 @@ impl StratifiedPoolStats {
442437

443438
/// Get hit rate for a specific bucket.
444439
pub fn bucket_hit_rate(&self, bucket: SizeBucket) -> f64 {
445-
let allocs = self.allocations_per_bucket.get(&bucket).copied().unwrap_or(0);
440+
let allocs = self
441+
.allocations_per_bucket
442+
.get(&bucket)
443+
.copied()
444+
.unwrap_or(0);
446445
let hits = self.hits_per_bucket.get(&bucket).copied().unwrap_or(0);
447446
if allocs == 0 {
448447
0.0
@@ -495,7 +494,10 @@ impl StratifiedMemoryPool {
495494

496495
for bucket in SizeBucket::ALL {
497496
let pool_name = format!("{}_{}", name, bucket);
498-
buckets.insert(bucket, MemoryPool::new(pool_name, bucket.size(), max_buffers_per_bucket));
497+
buckets.insert(
498+
bucket,
499+
MemoryPool::new(pool_name, bucket.size(), max_buffers_per_bucket),
500+
);
499501
}
500502

501503
Self {
@@ -552,7 +554,10 @@ impl StratifiedMemoryPool {
552554

553555
/// Get current size of a specific bucket pool.
554556
pub fn bucket_size(&self, bucket: SizeBucket) -> usize {
555-
self.buckets.get(&bucket).map(|p| p.current_size()).unwrap_or(0)
557+
self.buckets
558+
.get(&bucket)
559+
.map(|p| p.current_size())
560+
.unwrap_or(0)
556561
}
557562

558563
/// Get total buffers currently pooled across all buckets.
@@ -662,7 +667,10 @@ pub fn create_stratified_pool_with_capacity(
662667
name: impl Into<String>,
663668
max_buffers_per_bucket: usize,
664669
) -> SharedStratifiedPool {
665-
Arc::new(StratifiedMemoryPool::with_capacity(name, max_buffers_per_bucket))
670+
Arc::new(StratifiedMemoryPool::with_capacity(
671+
name,
672+
max_buffers_per_bucket,
673+
))
666674
}
667675

668676
// ============================================================================
@@ -698,7 +706,11 @@ impl std::fmt::Debug for PressureReaction {
698706
match self {
699707
Self::None => write!(f, "PressureReaction::None"),
700708
Self::Shrink { target_utilization } => {
701-
write!(f, "PressureReaction::Shrink {{ target_utilization: {} }}", target_utilization)
709+
write!(
710+
f,
711+
"PressureReaction::Shrink {{ target_utilization: {} }}",
712+
target_utilization
713+
)
702714
}
703715
Self::Callback(_) => write!(f, "PressureReaction::Callback(<fn>)"),
704716
}
@@ -1006,9 +1018,9 @@ mod tests {
10061018
let pool = StratifiedMemoryPool::new("test");
10071019

10081020
// Allocate different sizes
1009-
let buf1 = pool.allocate(100); // Tiny
1010-
let buf2 = pool.allocate(500); // Small
1011-
let buf3 = pool.allocate(2000); // Medium
1021+
let buf1 = pool.allocate(100); // Tiny
1022+
let buf2 = pool.allocate(500); // Small
1023+
let buf3 = pool.allocate(2000); // Medium
10121024

10131025
assert_eq!(buf1.bucket(), SizeBucket::Tiny);
10141026
assert_eq!(buf2.bucket(), SizeBucket::Small);
@@ -1046,14 +1058,20 @@ mod tests {
10461058
let pool = StratifiedMemoryPool::new("test");
10471059

10481060
// Allocate from different buckets
1049-
let _buf1 = pool.allocate(100); // Tiny
1050-
let _buf2 = pool.allocate(500); // Small
1051-
let _buf3 = pool.allocate(100); // Tiny again
1061+
let _buf1 = pool.allocate(100); // Tiny
1062+
let _buf2 = pool.allocate(500); // Small
1063+
let _buf3 = pool.allocate(100); // Tiny again
10521064

10531065
let stats = pool.stats();
10541066
assert_eq!(stats.total_allocations, 3);
1055-
assert_eq!(stats.allocations_per_bucket.get(&SizeBucket::Tiny), Some(&2));
1056-
assert_eq!(stats.allocations_per_bucket.get(&SizeBucket::Small), Some(&1));
1067+
assert_eq!(
1068+
stats.allocations_per_bucket.get(&SizeBucket::Tiny),
1069+
Some(&2)
1070+
);
1071+
assert_eq!(
1072+
stats.allocations_per_bucket.get(&SizeBucket::Small),
1073+
Some(&1)
1074+
);
10571075
}
10581076

10591077
#[test]
@@ -1173,7 +1191,9 @@ mod tests {
11731191
let none = PressureReaction::None;
11741192
assert!(format!("{:?}", none).contains("None"));
11751193

1176-
let shrink = PressureReaction::Shrink { target_utilization: 0.5 };
1194+
let shrink = PressureReaction::Shrink {
1195+
target_utilization: 0.5,
1196+
};
11771197
assert!(format!("{:?}", shrink).contains("0.5"));
11781198

11791199
let callback = PressureReaction::Callback(Box::new(|_| {}));

crates/ringkernel-core/src/persistent_message.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,7 @@ mod tests {
256256

257257
table.register(HandlerRegistration::new(1, "fraud_check", 1001));
258258
table.register(HandlerRegistration::new(2, "aggregate", 1002));
259-
table.register(
260-
HandlerRegistration::new(3, "pattern_detect", 1003).with_response(2003),
261-
);
259+
table.register(HandlerRegistration::new(3, "pattern_detect", 1003).with_response(2003));
262260

263261
assert_eq!(table.len(), 3);
264262
assert_eq!(table.max_handler_id(), 3);

0 commit comments

Comments
 (0)