@@ -69,6 +69,8 @@ void StfSenderOutput::start(std::shared_ptr<ConsulStfSender> pDiscoveryConfig)
6969
7070 // create stf ordering thread
7171 mStfOrderThread = create_thread_member (" stfs_order" , &StfSenderOutput::StfOrderThread, this );
72+ // create stf alloc thread
73+ mCopyAllocThread = create_thread_member (" stfs_alloc" , &StfSenderOutput::StfCopyAllocThread, this );
7274
7375 // create stf copy thread
7476 for (auto i = 0 ; i < 8 ; i++) {
@@ -125,6 +127,9 @@ void StfSenderOutput::start_standalone(std::shared_ptr<ConsulStfSender> pDiscove
125127 // create stf ordering thread
126128 mStfOrderThread = create_thread_member (" stfs_order" , &StfSenderOutput::StfOrderThread, this );
127129
130+ // create stf alloc thread
131+ mCopyAllocThread = create_thread_member (" stfs_alloc" , &StfSenderOutput::StfCopyAllocThread, this );
132+
128133 // create stf copy thread
129134 for (auto i = 0 ; i < 8 ; i++) {
130135 mCopyThreads .emplace_back (create_thread_member (" stfs_copy" , &StfSenderOutput::StfCopyThread, this ));
@@ -143,13 +148,19 @@ void StfSenderOutput::stop()
143148 mRunning = false ;
144149 mDropQueue .stop ();
145150 mScheduleQueue .stop ();
151+ mCopyAllocQueue .stop ();
146152 mCopyQueue .stop ();
147153
148154 // stop the stf ordering: on pipeline interrupt
149155 if (mStfOrderThread .joinable ()) {
150156 mStfOrderThread .join ();
151157 }
152158
159+ // stop the stf copy alloc threqad
160+ if (mCopyAllocThread .joinable ()) {
161+ mCopyAllocThread .join ();
162+ }
163+
153164 // stop copy threads
154165 for (auto &lThread : mCopyThreads ) {
155166 if (lThread.joinable ()) {
@@ -287,7 +298,12 @@ void StfSenderOutput::StfKeepThread()
287298 }
288299
289300 if (lStfOpt) {
290- std::unique_ptr<SubTimeFrame> lStf = std::move (lStfOpt.value ());
301+ std::unique_ptr<SubTimeFrame> lStf = std::move (lStfOpt.value ().mStf );
302+
303+ if (lStfOpt.value ().mMemoryPressure ) {
304+ // release immediately if we could not copy
305+ continue ;
306+ }
291307
292308 const auto lStfId = lStf->id ();
293309 const auto lStfSize = lStf->getDataSize ();
@@ -313,45 +329,77 @@ void StfSenderOutput::StfOrderThread()
313329{
314330 std::unique_ptr<SubTimeFrame> lStf;
315331 while ((lStf = mPipelineI .dequeue (eSenderIn)) != nullptr ) {
316-
317332 DDDLOG_RL (1000 , " StfOrderThread: receiving {}" , lStf->id () );
318333
319- std::unique_lock lOrderLock (mScheduledStfMapLock );
320- mStfOrderingQueue .push (lStf->id ());
321- // push the original Stf for scheduling
322- mCopyQueue .push (std::move (lStf));
334+ if (mStfCopyBuilder ) {
335+ std::unique_lock lOrderLock (mScheduledStfMapLock );
336+ mStfOrderingQueue .push (lStf->id ());
337+ // push the original Stf for allocation copy
338+ mCopyAllocQueue .push (std::move (lStf));
339+ } else {
340+ // no copying
341+ StfSchedInfo lSchedInfo;
342+ lSchedInfo.mStf = std::move (lStf);
343+ lSchedInfo.mMemoryPressure = false ;
344+ mScheduleQueue .push (std::move (lSchedInfo));
345+ }
323346 }
324347 DDDLOG (" StfOrderThread: Exiting." );
325348}
326349
327- void StfSenderOutput::StfCopyThread ()
350+ void StfSenderOutput::StfCopyAllocThread ()
328351{
329352 while (mRunning ) {
330353 std::unique_ptr<SubTimeFrame> lStf;
331- if (!mCopyQueue .pop (lStf)) {
354+ if (!mCopyAllocQueue .pop (lStf)) {
332355 break ;
333356 }
334357
335- if (mStfCopyBuilder ) {
336- // copy stf
337- if (!mStfCopyBuilder ->copyStfData (lStf)) {
338- DDMON (" stfsender" , " stf_region.full" , 1 );
339- }
358+ StfCopyInfo lStfCopyInfo;
359+
360+ // allocate memory for the stf
361+ assert (mStfCopyBuilder );
362+ mStfCopyBuilder ->allocNewStfData (lStf, lStfCopyInfo.mLinkBuffers );
363+
364+ lStfCopyInfo.mStf = std::move (lStf);
365+ mCopyQueue .push (std::move (lStfCopyInfo));
366+ }
367+ DDDLOG (" StfCopyAlloc: Exiting." );
368+ }
369+
370+ void StfSenderOutput::StfCopyThread ()
371+ {
372+ while (mRunning ) {
373+ StfCopyInfo lStfCopyInfo;
374+ if (!mCopyQueue .pop (lStfCopyInfo)) {
375+ break ;
376+ }
377+
378+ assert (mStfCopyBuilder );
379+
380+ // copy stf
381+ const bool lCopyOk = mStfCopyBuilder ->copyStfData (lStfCopyInfo.mStf , lStfCopyInfo.mLinkBuffers );
382+ if (!lCopyOk) {
383+ // Copy region is full or fragmented
384+ DDMON (" stfsender" , " stf_region.full" , 1 );
340385 }
341386
342387 // wait for our turn to schedule
343388 while (mRunning ) {
344389 std::unique_lock lOrderLock (mScheduledStfMapLock );
345390 assert (!mStfOrderingQueue .empty ());
346- assert (mStfOrderingQueue .front () <= lStf ->id ());
391+ assert (mStfOrderingQueue .front () <= lStfCopyInfo. mStf ->id ());
347392
348- if (mStfOrderingQueue .front () != lStf ->id ()) {
393+ if (mStfOrderingQueue .front () != lStfCopyInfo. mStf ->id ()) {
349394 mStfOrderingCv .wait_for (lOrderLock, 10ms);
350395 continue ;
351396 } else {
352397 mStfOrderingQueue .pop ();
353398 // push the original Stf for scheduling
354- mScheduleQueue .push (std::move (lStf));
399+ StfSchedInfo lSchedInfo;
400+ lSchedInfo.mMemoryPressure = !lCopyOk;
401+ lSchedInfo.mStf = std::move (lStfCopyInfo.mStf );
402+ mScheduleQueue .push (std::move (lSchedInfo));
355403 break ;
356404 }
357405 }
@@ -373,11 +421,13 @@ void StfSenderOutput::StfSchedulerThread()
373421#endif
374422
375423 while (mRunning ) {
376- std::unique_ptr<SubTimeFrame> lStf ;
377- if (!mScheduleQueue .pop (lStf )) {
424+ StfSchedInfo lSchedInfo ;
425+ if (!mScheduleQueue .pop (lSchedInfo )) {
378426 break ;
379427 }
380428
429+ std::unique_ptr<SubTimeFrame> lStf = std::move (lSchedInfo.mStf );
430+
381431 const auto lStfId = lStf->id ();
382432 const auto lStfSize = lStf->getDataSize ();
383433
@@ -420,7 +470,7 @@ void StfSenderOutput::StfSchedulerThread()
420470 lStfInfo.set_stf_size (lStfSize);
421471
422472 lStfInfo.mutable_stfs_info ()->set_buffer_size (mBufferSize );
423- lStfInfo.mutable_stfs_info ()->set_buffer_used (lCounters.mBuffered .mSize );
473+ lStfInfo.mutable_stfs_info ()->set_buffer_used (lSchedInfo. mMemoryPressure ? mBufferSize : lCounters.mBuffered .mSize );
424474 lStfInfo.mutable_stfs_info ()->set_num_buffered_stfs (lCounters.mBuffered .mCnt );
425475
426476 switch (lStf->header ().mOrigin ) {
@@ -487,8 +537,10 @@ void StfSenderOutput::StfSchedulerThread()
487537 mCounters .mValues .mSchedulerStfRejectedCnt += 1 ;
488538 }
489539
490- WDDLOG_RL (5000 , " TfScheduler rejected the Stf announce. stf_id={} reason={}" ,
491- lStfId, SchedulerStfInfoResponse_StfInfoStatus_Name (lSchedResponse.status ()));
540+ if (lSchedResponse.status () != SchedulerStfInfoResponse::DROP_STFS_THROTTLING) {
541+ WDDLOG_RL (5000 , " TfScheduler rejected the Stf announce. stf_id={} reason={}" ,
542+ lStfId, SchedulerStfInfoResponse_StfInfoStatus_Name (lSchedResponse.status ()));
543+ }
492544 }
493545 DDDLOG_RL (5000 , " StfSchedulerThread: Sent an STF announce. stf_id={} stf_size={}" , lStfId, lStfInfo.stf_size ());
494546 }
0 commit comments