@@ -73,8 +73,11 @@ typedef pthread_mutex_t* MutexHandle;
7373#include " butil/strings/string_util.h"
7474#include " butil/strings/stringprintf.h"
7575#include " butil/strings/utf_string_conversions.h"
76- #include " butil/synchronization/lock .h"
76+ #include " butil/synchronization/condition_variable .h"
7777#include " butil/threading/platform_thread.h"
78+ #include " butil/threading/simple_thread.h"
79+ #include " butil/object_pool.h"
80+
7881#if defined(OS_POSIX)
7982#include " butil/errno.h"
8083#include " butil/fd_guard.h"
@@ -144,6 +147,17 @@ DEFINE_bool(log_year, false, "Log year in datetime part in each log");
144147
145148DEFINE_bool (log_func_name, false , " Log function name in each log" );
146149
150+ DEFINE_bool (async_log, false , " Use async log" );
151+
152+ DEFINE_bool (async_log_in_background_always, false , " Async log written in background always." );
153+
154+ DEFINE_int32 (max_async_log_queue_size, 100000 , " Max async log size. "
155+ " If current log count of async log > max_async_log_size, "
156+ " Use sync log to protect process." );
157+
158+ DEFINE_int32 (sleep_to_flush_async_log_s, 0 ,
159+ " If the value > 0, sleep before atexit to flush async log" );
160+
147161namespace {
148162
149163LoggingDestination logging_destination = LOG_DEFAULT;
@@ -399,8 +413,300 @@ void CloseLogFileUnlocked() {
399413 log_file = NULL ;
400414}
401415
416+ void Log2File (const std::string& log) {
417+ // We can have multiple threads and/or processes, so try to prevent them
418+ // from clobbering each other's writes.
419+ // If the client app did not call InitLogging, and the lock has not
420+ // been created do it now. We do this on demand, but if two threads try
421+ // to do this at the same time, there will be a race condition to create
422+ // the lock. This is why InitLogging should be called from the main
423+ // thread at the beginning of execution.
424+ LoggingLock::Init (LOCK_LOG_FILE, NULL );
425+ LoggingLock logging_lock;
426+ if (InitializeLogFileHandle ()) {
427+ #if defined(OS_WIN)
428+ SetFilePointer (log_file, 0 , 0 , SEEK_END);
429+ DWORD num_written;
430+ WriteFile (log_file,
431+ static_cast <const void *>(log.data ()),
432+ static_cast <DWORD>(log.size ()),
433+ &num_written,
434+ NULL );
435+ #else
436+ fwrite (log.data (), log.size (), 1 , log_file);
437+ fflush (log_file);
438+ #endif
439+ }
440+ }
441+
402442} // namespace
403443
444+ struct BAIDU_CACHELINE_ALIGNMENT LogRequest {
445+ static LogRequest* const UNCONNECTED;
446+
447+ LogRequest* next{NULL };
448+ std::string data;
449+ };
450+
451+ LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t )-1 ;
452+
453+ class AsyncLogger : public butil ::SimpleThread {
454+ public:
455+ static AsyncLogger* GetInstance ();
456+
457+ void Log (const std::string& log);
458+ void Log (std::string&& log);
459+ void StopAndJoin ();
460+
461+ private:
462+ friend struct DefaultSingletonTraits <AsyncLogger>;
463+
464+ static LogRequest _stop_req;
465+
466+ AsyncLogger ();
467+ ~AsyncLogger () override ;
468+
469+ static void AtExit () {
470+ GetInstance ()->StopAndJoin ();
471+ if (FLAGS_sleep_to_flush_async_log_s > 0 ) {
472+ ::sleep (FLAGS_sleep_to_flush_async_log_s);
473+ }
474+ }
475+
476+ void LogImpl (LogRequest* log_req);
477+
478+ void Run () override ;
479+
480+ void LogTask (LogRequest* req);
481+
482+ bool IsLogComplete (LogRequest* old_head);
483+
484+ void DoLog (LogRequest* req);
485+ void DoLog (const std::string& log);
486+
487+ butil::atomic<LogRequest*> _log_head;
488+ butil::Mutex _mutex;
489+ butil::ConditionVariable _cond;
490+ LogRequest* _current_log_request;
491+ butil::atomic<int32_t > _log_request_count;
492+ butil::atomic<bool > _stop;
493+ };
494+
495+ AsyncLogger* AsyncLogger::GetInstance () {
496+ return Singleton<AsyncLogger,
497+ LeakySingletonTraits<AsyncLogger>>::get ();
498+ }
499+
500+ AsyncLogger::AsyncLogger ()
501+ : butil::SimpleThread(" async_log_thread" )
502+ , _log_head(NULL )
503+ , _cond(&_mutex)
504+ , _current_log_request(NULL )
505+ , _stop(false ) {
506+ Start ();
507+ // We need to stop async logger and
508+ // flush all async log before exit.
509+ atexit (AtExit);
510+ }
511+
512+ AsyncLogger::~AsyncLogger () {
513+ StopAndJoin ();
514+ }
515+
516+ void AsyncLogger::Log (const std::string& log) {
517+ if (log.empty ()) {
518+ return ;
519+ }
520+
521+ bool is_full = FLAGS_max_async_log_queue_size > 0 &&
522+ _log_request_count.fetch_add (1 , butil::memory_order_relaxed) >
523+ FLAGS_max_async_log_queue_size;
524+ if (is_full || _stop.load (butil::memory_order_relaxed)) {
525+ // Async logger is full or stopped, fallback to sync log.
526+ DoLog (log);
527+ return ;
528+ }
529+
530+ auto log_req = butil::get_object<LogRequest>();
531+ if (!log_req) {
532+ // Async log failed, fallback to sync log.
533+ DoLog (log);
534+ return ;
535+ }
536+ log_req->data = log;
537+ LogImpl (log_req);
538+ }
539+
540+ void AsyncLogger::Log (std::string&& log) {
541+ if (log.empty ()) {
542+ return ;
543+ }
544+
545+ bool is_full = FLAGS_max_async_log_queue_size > 0 &&
546+ _log_request_count.fetch_add (1 , butil::memory_order_relaxed) >
547+ FLAGS_max_async_log_queue_size;
548+ if (is_full || _stop.load (butil::memory_order_relaxed)) {
549+ // Async logger is full or stopped, fallback to sync log.
550+ DoLog (log);
551+ return ;
552+ }
553+
554+ auto log_req = butil::get_object<LogRequest>();
555+ if (!log_req) {
556+ // Async log failed, fallback to sync log.
557+ DoLog (log);
558+ return ;
559+ }
560+ log_req->data = std::move (log);
561+ LogImpl (log_req);
562+ }
563+
564+ void AsyncLogger::LogImpl (LogRequest* log_req) {
565+ log_req->next = LogRequest::UNCONNECTED;
566+ // Release fence makes sure the thread getting request sees *req
567+ LogRequest* const prev_head =
568+ _log_head.exchange (log_req, butil::memory_order_release);
569+ if (prev_head != NULL ) {
570+ // Someone is logging. The async_log_thread thread may spin
571+ // until req->next to be non-UNCONNECTED. This process is not
572+ // lock-free, but the duration is so short(1~2 instructions,
573+ // depending on compiler) that the spin rarely occurs in practice
574+ // (I've not seen any spin in highly contended tests).
575+ log_req->next = prev_head;
576+ return ;
577+ }
578+ // We've got the right to write.
579+ log_req->next = NULL ;
580+
581+ if (!FLAGS_async_log_in_background_always) {
582+ // Use sync log for the LogRequest
583+ // which has got the right to write.
584+ DoLog (log_req);
585+ // Return when there's no more LogRequests.
586+ if (IsLogComplete (log_req)) {
587+ butil::return_object (log_req);
588+ return ;
589+ }
590+ }
591+
592+ BAIDU_SCOPED_LOCK (_mutex);
593+ if (_stop.load (butil::memory_order_relaxed)) {
594+ // Async logger is stopped, fallback to sync log.
595+ LogTask (log_req);
596+ } else {
597+ // Wake up async logger.
598+ _current_log_request = log_req;
599+ _cond.Signal ();
600+ }
601+ }
602+
603+ void AsyncLogger::StopAndJoin () {
604+ if (!_stop.exchange (true , butil::memory_order_relaxed)) {
605+ BAIDU_SCOPED_LOCK (_mutex);
606+ _cond.Signal ();
607+ }
608+ if (!HasBeenJoined ()) {
609+ Join ();
610+ }
611+ }
612+
613+ void AsyncLogger::Run () {
614+ while (true ) {
615+ BAIDU_SCOPED_LOCK (_mutex);
616+ while (!_stop.load (butil::memory_order_relaxed) &&
617+ !_current_log_request) {
618+ _cond.Wait ();
619+ }
620+ if (_stop.load (butil::memory_order_relaxed) &&
621+ !_current_log_request) {
622+ break ;
623+ }
624+
625+ LogTask (_current_log_request);
626+ _current_log_request = NULL ;
627+ }
628+ }
629+
630+ void AsyncLogger::LogTask (LogRequest* req) {
631+ do {
632+ // req was logged, skip it.
633+ if (req->next != NULL && req->data .empty ()) {
634+ LogRequest* const saved_req = req;
635+ req = req->next ;
636+ butil::return_object (saved_req);
637+ }
638+
639+ // Log all requests to file.
640+ while (req->next != NULL ) {
641+ LogRequest* const saved_req = req;
642+ req = req->next ;
643+ if (!saved_req->data .empty ()) {
644+ DoLog (saved_req);
645+ }
646+ // Release LogRequests until last request.
647+ butil::return_object (saved_req);
648+ }
649+ if (!req->data .empty ()) {
650+ DoLog (req);
651+ }
652+
653+ // Return when there's no more LogRequests.
654+ if (IsLogComplete (req)) {
655+ butil::return_object (req);
656+ return ;
657+ }
658+ } while (true );
659+ }
660+
661+ bool AsyncLogger::IsLogComplete (LogRequest* old_head) {
662+ if (old_head->next ) {
663+ fprintf (stderr, " old_head->next should be NULL\n " );
664+ }
665+ LogRequest* new_head = old_head;
666+ LogRequest* desired = NULL ;
667+ if (_log_head.compare_exchange_strong (
668+ new_head, desired, butil::memory_order_acquire)) {
669+ // No one added new requests.
670+ return true ;
671+ }
672+ if (new_head == old_head) {
673+ fprintf (stderr, " new_head should not be equal to old_head\n " );
674+ }
675+ // Above acquire fence pairs release fence of exchange in Log() to make
676+ // sure that we see all fields of requests set.
677+
678+ // Someone added new requests.
679+ // Reverse the list until old_head.
680+ LogRequest* tail = NULL ;
681+ LogRequest* p = new_head;
682+ do {
683+ while (p->next == LogRequest::UNCONNECTED) {
684+ sched_yield ();
685+ }
686+ LogRequest* const saved_next = p->next ;
687+ p->next = tail;
688+ tail = p;
689+ p = saved_next;
690+ if (!p) {
691+ fprintf (stderr, " p should not be NULL\n " );
692+ }
693+ } while (p != old_head);
694+
695+ // Link old list with new list.
696+ old_head->next = tail;
697+ return false ;
698+ }
699+
700+ void AsyncLogger::DoLog (LogRequest* req) {
701+ DoLog (req->data );
702+ req->data .clear ();
703+ }
704+
705+ void AsyncLogger::DoLog (const std::string& log) {
706+ Log2File (log);
707+ _log_request_count.fetch_sub (1 );
708+ }
709+
404710LoggingSettings::LoggingSettings ()
405711 : logging_dest(LOG_DEFAULT),
406712 log_file (NULL ),
@@ -473,7 +779,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
473779 const char * file, int line,
474780 const char * func) {
475781 PrintLogSeverity (os, severity);
476- #if defined(OS_LINUX)
782+ #if defined(OS_LINUX) || defined(OS_MACOSX)
477783 timeval tv;
478784 gettimeofday (&tv, NULL );
479785 time_t t = tv.tv_sec ;
@@ -495,7 +801,7 @@ void PrintLogPrefix(std::ostream& os, int severity,
495801 << std::setw (2 ) << local_tm.tm_hour << ' :'
496802 << std::setw (2 ) << local_tm.tm_min << ' :'
497803 << std::setw (2 ) << local_tm.tm_sec ;
498- #if defined(OS_LINUX)
804+ #if defined(OS_LINUX) || defined(OS_MACOSX)
499805 os << ' .' << std::setw (6 ) << tv.tv_usec ;
500806#endif
501807 if (FLAGS_log_pid) {
@@ -957,35 +1263,17 @@ class DefaultLogSink : public LogSink {
9571263
9581264 // write to log file
9591265 if ((logging_destination & LOG_TO_FILE) != 0 ) {
960- // We can have multiple threads and/or processes, so try to prevent them
961- // from clobbering each other's writes.
962- // If the client app did not call InitLogging, and the lock has not
963- // been created do it now. We do this on demand, but if two threads try
964- // to do this at the same time, there will be a race condition to create
965- // the lock. This is why InitLogging should be called from the main
966- // thread at the beginning of execution.
967- LoggingLock::Init (LOCK_LOG_FILE, NULL );
968- LoggingLock logging_lock;
969- if (InitializeLogFileHandle ()) {
970- #if defined(OS_WIN)
971- SetFilePointer (log_file, 0 , 0 , SEEK_END);
972- DWORD num_written;
973- WriteFile (log_file,
974- static_cast <const void *>(log.data ()),
975- static_cast <DWORD>(log.size ()),
976- &num_written,
977- NULL );
978- #else
979- fwrite (log.data (), log.size (), 1 , log_file);
980- fflush (log_file);
981- #endif
1266+ if (FLAGS_async_log) {
1267+ AsyncLogger::GetInstance ()->Log (std::move (log));
1268+ } else {
1269+ Log2File (log);
9821270 }
9831271 }
9841272 return true ;
9851273 }
9861274private:
987- DefaultLogSink () {}
988- ~DefaultLogSink () {}
1275+ DefaultLogSink () = default ;
1276+ ~DefaultLogSink () override = default ;
9891277friend struct DefaultSingletonTraits <DefaultLogSink>;
9901278};
9911279
0 commit comments