55import os
66import glob
77import time
8+ import ctypes
89from collections import defaultdict
910
1011import numpy as np
@@ -93,6 +94,20 @@ def __init__(self, ptr, shape, dtype):
9394 'version' : 2 ,
9495 }
9596
97+ _TORCH_TO_CTYPE = {
98+ torch .uint8 : ctypes .c_uint8 ,
99+ torch .float32 : ctypes .c_float ,
100+ }
101+
102+ def _cpu_tensor (ptr , shape , dtype ):
103+ '''Zero-copy CPU tensor from a raw pointer via ctypes.'''
104+ ctype = _TORCH_TO_CTYPE [dtype ]
105+ n = 1
106+ for s in shape :
107+ n *= s
108+ arr = (ctype * n ).from_address (ptr )
109+ return torch .frombuffer (arr , dtype = dtype ).reshape (shape )
110+
96111class PuffeRL :
97112 def __init__ (self , args , vec , policy , verbose = True ):
98113 config = args ['train' ]
@@ -103,16 +118,25 @@ def __init__(self, args, vec, policy, verbose=True):
103118 torch .backends .cudnn .benchmark = True
104119
105120 self ._vec = vec
121+ self .gpu = vec .gpu
106122 total_agents = vec .total_agents
107123 self .total_agents = total_agents
108124 obs_dtype = _OBS_DTYPE_MAP .get (vec .obs_dtype , torch .uint8 )
109125
110- self .vec_obs = torch .as_tensor (_CudaPtr (vec .gpu_obs_ptr ,
111- (total_agents , vec .obs_size ), obs_dtype ))
112- self .vec_rewards = torch .as_tensor (_CudaPtr (vec .gpu_rewards_ptr ,
113- (total_agents ,), torch .float32 ))
114- self .vec_terminals = torch .as_tensor (_CudaPtr (vec .gpu_terminals_ptr ,
115- (total_agents ,), torch .float32 ))
126+ if self .gpu :
127+ self .vec_obs = torch .as_tensor (_CudaPtr (vec .gpu_obs_ptr ,
128+ (total_agents , vec .obs_size ), obs_dtype ))
129+ self .vec_rewards = torch .as_tensor (_CudaPtr (vec .gpu_rewards_ptr ,
130+ (total_agents ,), torch .float32 ))
131+ self .vec_terminals = torch .as_tensor (_CudaPtr (vec .gpu_terminals_ptr ,
132+ (total_agents ,), torch .float32 ))
133+ else :
134+ self .vec_obs = _cpu_tensor (vec .obs_ptr ,
135+ (total_agents , vec .obs_size ), obs_dtype )
136+ self .vec_rewards = _cpu_tensor (vec .rewards_ptr ,
137+ (total_agents ,), torch .float32 )
138+ self .vec_terminals = _cpu_tensor (vec .terminals_ptr ,
139+ (total_agents ,), torch .float32 )
116140
117141 vec .reset ()
118142 horizon = config ['horizon' ]
@@ -148,7 +172,7 @@ def __init__(self, args, vec, policy, verbose=True):
148172 self .last_log_step = 0
149173 self .last_log_time = time .time ()
150174 self .start_time = time .time ()
151- self .profile = Profile ()
175+ self .profile = Profile (gpu = self . gpu )
152176 self .verbose = verbose
153177
154178 self .model_size = sum (p .numel () for p in policy .parameters () if p .requires_grad )
@@ -203,9 +227,13 @@ def rollouts(self):
203227 self .values [t ] = value .flatten ()
204228
205229 prof .mark (2 )
206- actions_gpu = (action .T if action .dim () > 1 else action .unsqueeze (- 1 )).to (dtype = torch .float32 , device = 'cuda' ).contiguous ()
207- self ._vec .step (actions_gpu .data_ptr ())
208- torch .cuda .synchronize ()
230+ actions_flat = (action .T if action .dim () > 1 else action .unsqueeze (- 1 )).to (dtype = torch .float32 ).contiguous ()
231+ if self .gpu :
232+ actions_flat = actions_flat .cuda ()
233+ self ._vec .gpu_step (actions_flat .data_ptr ())
234+ torch .cuda .synchronize ()
235+ else :
236+ self ._vec .cpu_step (actions_flat .data_ptr ())
209237 o , r , d = self .vec_obs , self .vec_rewards , self .vec_terminals
210238 prof .mark (3 )
211239
@@ -348,7 +376,7 @@ def log(self):
348376 'train_misc' : perf [P .TRAIN_MISC ],
349377 'train_forward' : perf [P .TRAIN_FORWARD ],
350378 },
351- 'util' : dict (_C .get_utilization (self .args .get ('gpu_id' , 0 ))),
379+ 'util' : dict (_C .get_utilization (self .args .get ('gpu_id' , 0 ))) if self . gpu else {} ,
352380 }
353381 self .last_log_time = time .time ()
354382 self .last_log_step = self .global_step
@@ -376,7 +404,8 @@ def create_pufferl(cls, args):
376404 os .environ ['CUDA_VISIBLE_DEVICES' ] = str (local_rank )
377405
378406 args ['vec' ]['num_buffers' ] = 1
379- vec = _C .create_vec (args )
407+ gpu = 1 if device == 'cuda' else 0
408+ vec = _C .create_vec (args , gpu )
380409 policy = load_policy (args , vec )
381410
382411 if 'LOCAL_RANK' in os .environ :
@@ -395,7 +424,8 @@ def create_pufferl(cls, args):
395424def compute_puff_advantage (values , rewards , terminals ,
396425 ratio , advantages , gamma , gae_lambda , vtrace_rho_clip , vtrace_c_clip ):
397426 num_steps , horizon = values .shape
398- _C .puff_advantage (
427+ fn = _C .puff_advantage if values .is_cuda else _C .puff_advantage_cpu
428+ fn (
399429 values .data_ptr (), rewards .data_ptr (), terminals .data_ptr (),
400430 ratio .data_ptr (), advantages .data_ptr (),
401431 num_steps , horizon ,
@@ -406,16 +436,26 @@ class Profile:
406436 '''Matches pufferlib.cu profiling: accumulate ms, report seconds.'''
407437 ROLLOUT , EVAL_GPU , EVAL_ENV , TRAIN , TRAIN_MISC , TRAIN_FORWARD , NUM = range (7 )
408438
409- def __init__ (self ):
439+ def __init__ (self , gpu = True ):
410440 self .accum = [0.0 ] * Profile .NUM
411- self ._events = [torch .cuda .Event (enable_timing = True ) for _ in range (4 )]
441+ self .gpu = gpu
442+ if gpu :
443+ self ._events = [torch .cuda .Event (enable_timing = True ) for _ in range (4 )]
444+ else :
445+ self ._stamps = [0.0 ] * 4
412446
413447 def mark (self , idx ):
414- self ._events [idx ].record ()
448+ if self .gpu :
449+ self ._events [idx ].record ()
450+ else :
451+ self ._stamps [idx ] = time .perf_counter ()
415452
416453 def elapsed (self , idx , start_ev , end_ev ):
417- self ._events [end_ev ].synchronize ()
418- self .accum [idx ] += self ._events [start_ev ].elapsed_time (self ._events [end_ev ])
454+ if self .gpu :
455+ self ._events [end_ev ].synchronize ()
456+ self .accum [idx ] += self ._events [start_ev ].elapsed_time (self ._events [end_ev ])
457+ else :
458+ self .accum [idx ] += (self ._stamps [end_ev ] - self ._stamps [start_ev ]) * 1000.0
419459
420460 def read_and_reset (self ):
421461 out = [v / 1000.0 for v in self .accum ]
0 commit comments