@@ -60,7 +60,8 @@ extern ATOMIC_INT disp_hup;
6060static unsigned int q_next , q_last ; /* Fallback when atomics are absent */
6161extern volatile ATOMIC_INT disp_hup ;
6262#endif
63- static unsigned int q_depth , processing_suspended , overflowed ;
63+ static unsigned int q_depth , overflowed ;
64+ static ATOMIC_UNSIGNED processing_suspended ;
6465static ATOMIC_UNSIGNED currently_used , max_used ;
6566static int queue_full_warning = 0 ;
6667static int persist_fd = -1 ;
@@ -69,10 +70,48 @@ static int persist_sync = 0;
6970
7071void reset_suspended (void )
7172{
72- processing_suspended = 0 ;
73+ AUDIT_ATOMIC_STORE ( processing_suspended , 0 ) ;
7374 queue_full_warning = 0 ;
7475}
7576
77+ /*
78+ * Increment the queue depth counter and preserve the largest value seen.
79+ * The max update is best-effort and uses compare-exchange to avoid losing
80+ * concurrent updates from the producer and consumer threads.
81+ */
82+ static unsigned int increase_used_count (void )
83+ {
84+ unsigned int used ;
85+ unsigned int max ;
86+
87+ #ifdef HAVE_ATOMIC
88+ used = atomic_fetch_add_explicit (& currently_used , 1 ,
89+ memory_order_relaxed ) + 1 ;
90+ max = atomic_load_explicit (& max_used , memory_order_relaxed );
91+ while (used > max &&
92+ !atomic_compare_exchange_weak_explicit (& max_used , & max , used ,
93+ memory_order_relaxed , memory_order_relaxed ))
94+ ;
95+ #else
96+ used = ++ currently_used ;
97+ if (used > max_used )
98+ max_used = used ;
99+ #endif
100+ return used ;
101+ }
102+
103+ /*
104+ * Decrement the queue depth counter after the consumer removes an event.
105+ */
106+ static void decrease_used_count (void )
107+ {
108+ #ifdef HAVE_ATOMIC
109+ atomic_fetch_sub_explicit (& currently_used , 1 , memory_order_relaxed );
110+ #else
111+ currently_used -- ;
112+ #endif
113+ }
114+
76115static int queue_load_file (int fd )
77116{
78117 FILE * f ;
@@ -116,9 +155,8 @@ static int queue_load_file(int fd)
116155 q_next = count % q_depth ;
117156 q_last = 0 ;
118157 #endif
119- currently_used = count ;
120- if (max_used < count )
121- max_used = count ;
158+ AUDIT_ATOMIC_STORE (currently_used , count );
159+ AUDIT_ATOMIC_STORE (max_used , count );
122160
123161 fclose (f );
124162 return 0 ;
@@ -139,7 +177,7 @@ int init_queue_extended(unsigned int size, int flags, const char *path)
139177 q_depth = size ;
140178 q = malloc (q_depth * sizeof (event_t * ));
141179 if (q == NULL ) {
142- processing_suspended = 1 ;
180+ AUDIT_ATOMIC_STORE ( processing_suspended , 1 ) ;
143181 return -1 ;
144182 }
145183
@@ -202,7 +240,7 @@ static int do_overflow_action(struct disp_conf *config)
202240 case O_SUSPEND :
203241 syslog (LOG_ALERT ,
204242 "Auditd is suspending event passing to plugins due to overflowing its queue." );
205- processing_suspended = 1 ;
243+ AUDIT_ATOMIC_STORE ( processing_suspended , 1 ) ;
206244 break ;
207245 case O_SINGLE :
208246 syslog (LOG_ALERT ,
@@ -231,7 +269,7 @@ int enqueue(event_t *e, struct disp_conf *config)
231269{
232270 unsigned int n , retry_cnt = 0 ;
233271
234- if (processing_suspended ) {
272+ if (AUDIT_ATOMIC_LOAD ( processing_suspended ) ) {
235273 free (e );
236274 return 1 ;
237275 }
@@ -271,9 +309,7 @@ int enqueue(event_t *e, struct disp_conf *config)
271309#else
272310 q_next = (n + 1 ) % q_depth ;
273311#endif
274- currently_used ++ ;
275- if (currently_used > max_used )
276- max_used = currently_used ;
312+ increase_used_count ();
277313 if (persist_fd >= 0 ) {
278314 if (write (persist_fd , e -> data , e -> hdr .size ) < 0 ) {
279315 /* Log error but continue - persistence is not critical */
@@ -329,7 +365,7 @@ static event_t *dequeue_common(void)
329365#else
330366 q_last = (n + 1 ) % q_depth ;
331367#endif
332- currently_used -- ;
368+ decrease_used_count () ;
333369 } else
334370 e = NULL ;
335371
@@ -413,18 +449,21 @@ void increase_queue_depth(unsigned int size)
413449
414450void write_queue_state (FILE * f )
415451{
416- fprintf (f , "current plugin queue depth = %u\n" , currently_used );
417- fprintf (f , "max plugin queue depth used = %u\n" , max_used );
452+ fprintf (f , "current plugin queue depth = %u\n" ,
453+ AUDIT_ATOMIC_LOAD (currently_used ));
454+ fprintf (f , "max plugin queue depth used = %u\n" ,
455+ AUDIT_ATOMIC_LOAD (max_used ));
418456 fprintf (f , "plugin queue size = %u\n" , q_depth );
419457 fprintf (f , "plugin queue overflow detected = %s\n" ,
420458 overflowed ? "yes" : "no" );
421459 fprintf (f , "plugin queueing suspended = %s\n" ,
422- processing_suspended ? "yes" : "no" );
460+ AUDIT_ATOMIC_LOAD (processing_suspended ) ?
461+ "yes" : "no" );
423462}
424463
425464void resume_queue (void )
426465{
427- processing_suspended = 0 ;
466+ AUDIT_ATOMIC_STORE ( processing_suspended , 0 ) ;
428467}
429468
430469void destroy_queue (void )
@@ -438,7 +477,7 @@ void destroy_queue(void)
438477 pthread_mutex_destroy (& queue_lock );
439478 sem_destroy (& queue_nonempty );
440479 if (persist_fd >= 0 ) {
441- if (currently_used == 0 ) {
480+ if (AUDIT_ATOMIC_LOAD ( currently_used ) == 0 ) {
442481 if (ftruncate (persist_fd , 0 ) < 0 ) {
443482 /* Log error but continue - cleanup is not critical */
444483 syslog (LOG_WARNING , "Failed to truncate persistent queue file" );
@@ -460,23 +499,23 @@ void destroy_queue(void)
460499 q_last = 0 ;
461500#endif
462501 q_depth = 0 ;
463- processing_suspended = 1 ;
464- currently_used = 0 ;
465- max_used = 0 ;
502+ AUDIT_ATOMIC_STORE ( processing_suspended , 1 ) ;
503+ AUDIT_ATOMIC_STORE ( currently_used , 0 ) ;
504+ AUDIT_ATOMIC_STORE ( max_used , 0 ) ;
466505 overflowed = 0 ;
467506}
468507
469508unsigned int queue_current_depth (void )
470509{
471- return currently_used ;
510+ return AUDIT_ATOMIC_LOAD ( currently_used ) ;
472511}
473512
474513unsigned int queue_max_depth (void )
475514{
476- return max_used ;
515+ return AUDIT_ATOMIC_LOAD ( max_used ) ;
477516}
478517
479518int queue_overflowed_p (void )
480519{
481- return overflowed ;
520+ return overflowed ;
482521}
0 commit comments