Skip to content

Commit 0a53dd2

Browse files
author
lhh
committed
Refactor bthread span lifecycle management and optimize span API with smart pointer reuse (#3068)
1 parent dc8f2e6 commit 0a53dd2

11 files changed

Lines changed: 96 additions & 65 deletions

File tree

docs/cn/rpcz.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,14 @@ bthread_attr_t attr = { BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL };
6666
bthread_start_urgent(&tid, &attr, thread_proc, arg);
6767
```
6868

69-
注意:使用这种方式创建子bthread来发送rpc,请确保rpc在server返回response之前完成,否则可能导致使用被释放的Span对象而出core。
69+
### Span生命周期管理
70+
71+
brpc使用智能指针(`std::shared_ptr`/`std::weak_ptr`)管理Span对象的生命周期,并通过自旋锁保护并发访问,解决了以下问题:
72+
73+
1. **Use-after-free防护**:父Span通过`shared_ptr`持有子Span的强引用,TLS中使用`weak_ptr`存储,确保Span对象在被访问时仍然有效。即使server在子bthread完成前返回response,也不会导致访问已释放的Span对象。
74+
75+
2. **线程安全**:使用自旋锁保护`_client_list`和`_info`的并发修改,支持多个bthread同时创建子span或添加annotation。
76+
77+
3. **自动生命周期管理**:当父Span销毁时,会自动清理所有子Span(通过`_client_list.clear()`),无需手动管理。
78+
79+
使用`BTHREAD_INHERIT_SPAN`创建子bthread时,不再需要担心Span对象的生命周期问题,可以安全地在异步场景中使用。

src/brpc/builtin/rpcz_service.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,15 @@ static void PrintAnnotations(
211211
PrintElapse(os, anno_time, last_time);
212212
os << ' ';
213213
if (span) {
214-
os << '[' << span_type_str << ' ' << SPAN_ID_STR << '=' << Hex(span->span_id()) << "] ";
214+
const char* short_type = "SPAN";
215+
if (span->type() == SPAN_TYPE_SERVER) {
216+
short_type = "Server";
217+
} else if (span->type() == SPAN_TYPE_CLIENT) {
218+
short_type = "Client";
219+
} else if (span->type() == SPAN_TYPE_BTHREAD) {
220+
short_type = "Bthread";
221+
}
222+
os << '[' << short_type << " SPAN#" << Hex(span->span_id()) << "] ";
215223
}
216224
os << WebEscape(a);
217225
if (a.empty() || butil::back_char(a) != '\n') {
@@ -292,11 +300,11 @@ static void PrintClientSpan(
292300

293301
if (PrintAnnotationsAndRealTimeSpan(os, span.sent_real_us(),
294302
last_time, extr, num_extr, &span)) {
295-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Requested(" << span.request_size() << ") [1]" << std::endl;
303+
os << " [Client SPAN#" << Hex(span.span_id()) << "] Requested(" << span.request_size() << ") [1]" << std::endl;
296304
}
297305
if (PrintAnnotationsAndRealTimeSpan(os, span.received_real_us(),
298306
last_time, extr, num_extr, &span)) {
299-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Received response(" << span.response_size() << ")";
307+
os << " [Client SPAN#" << Hex(span.span_id()) << "] Received response(" << span.response_size() << ")";
300308
if (span.base_cid() != 0 && span.ending_cid() != 0) {
301309
int64_t ver = span.ending_cid() - span.base_cid();
302310
if (ver >= 1) {
@@ -310,13 +318,13 @@ static void PrintClientSpan(
310318

311319
if (PrintAnnotationsAndRealTimeSpan(os, span.start_parse_real_us(),
312320
last_time, extr, num_extr, &span)) {
313-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the response in a new bthread" << std::endl;
321+
os << " [Client SPAN#" << Hex(span.span_id()) << "] Processing the response in a new bthread" << std::endl;
314322
}
315323

316324
if (PrintAnnotationsAndRealTimeSpan(
317325
os, span.start_callback_real_us(),
318326
last_time, extr, num_extr, &span)) {
319-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] " << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
327+
os << " [Client SPAN#" << Hex(span.span_id()) << "] " << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
320328
}
321329

322330
PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
@@ -340,9 +348,9 @@ static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* la
340348
extr[num_extr++] = &client_extr;
341349

342350
// Print span id for bthread span context identification
343-
os << " [BthreadSpan " << SPAN_ID_STR << '=' << Hex(span.span_id());
351+
os << " [Bthread SPAN#" << Hex(span.span_id());
344352
if (span.parent_span_id() != 0) {
345-
os << " parent_span=" << Hex(span.parent_span_id());
353+
os << " parent#" << Hex(span.parent_span_id());
346354
}
347355
os << "] ";
348356

@@ -377,15 +385,15 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
377385
if (PrintAnnotationsAndRealTimeSpan(
378386
os, span.start_parse_real_us(),
379387
&last_time, extr, ARRAY_SIZE(extr), &span)) {
380-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the request in a new bthread" << std::endl;
388+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Processing the request in a new bthread" << std::endl;
381389
}
382390

383391
bool entered_user_method = false;
384392
if (PrintAnnotationsAndRealTimeSpan(
385393
os, span.start_callback_real_us(),
386394
&last_time, extr, ARRAY_SIZE(extr), &span)) {
387395
entered_user_method = true;
388-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Enter " << WebEscape(span.full_method_name()) << std::endl;
396+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Enter " << WebEscape(span.full_method_name()) << std::endl;
389397
}
390398

391399
const int nclient = span.client_spans_size();
@@ -402,16 +410,16 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
402410
os, span.start_send_real_us(),
403411
&last_time, extr, ARRAY_SIZE(extr), &span)) {
404412
if (entered_user_method) {
405-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Leave " << WebEscape(span.full_method_name()) << std::endl;
413+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Leave " << WebEscape(span.full_method_name()) << std::endl;
406414
} else {
407-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responding" << std::endl;
415+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Responding" << std::endl;
408416
}
409417
}
410418

411419
if (PrintAnnotationsAndRealTimeSpan(
412420
os, span.sent_real_us(),
413421
&last_time, extr, ARRAY_SIZE(extr), &span)) {
414-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responded(" << span.response_size() << ')' << std::endl;
422+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Responded(" << span.response_size() << ')' << std::endl;
415423
}
416424

417425
PrintAnnotations(os, std::numeric_limits<int64_t>::max(),

src/brpc/channel.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
515515
method_name = NULL_METHOD_STR;
516516
}
517517
std::shared_ptr<Span> span = Span::CreateClientSpan(
518-
*method_name, start_send_real_us - start_send_us);
518+
method_name, start_send_real_us - start_send_us);
519519
if (span) {
520520
ControllerPrivateAccessor accessor(cntl);
521521
span->set_log_id(cntl->log_id());

src/brpc/controller.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,7 @@ uint64_t Controller::trace_id() const {
14051405
}
14061406
return 0;
14071407
}
1408+
14081409
uint64_t Controller::span_id() const {
14091410
if (auto span = _span.lock()) {
14101411
return span->span_id();
@@ -1737,4 +1738,24 @@ void Controller::DoPrintLogPrefix(std::ostream& os) const {
17371738
}
17381739
}
17391740

1741+
1742+
ControllerPrivateAccessor& ControllerPrivateAccessor::set_span(
1743+
const std::shared_ptr<Span>& span) {
1744+
_cntl->_span = span;
1745+
return *this;
1746+
}
1747+
1748+
ControllerPrivateAccessor& ControllerPrivateAccessor::set_span(Span* span) {
1749+
if (span) {
1750+
_cntl->_span = span->shared_from_this();
1751+
} else {
1752+
_cntl->_span.reset();
1753+
}
1754+
return *this;
1755+
}
1756+
1757+
std::shared_ptr<Span> ControllerPrivateAccessor::span() const {
1758+
return _cntl->_span.lock();
1759+
}
1760+
17401761
} // namespace brpc

src/brpc/details/controller_private_accessor.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ class Message;
3030
}
3131
}
3232

33-
3433
namespace brpc {
3534

35+
class Span;
36+
3637
class AuthContext;
3738

3839
// A wrapper to access some private methods/fields of `Controller'
@@ -90,17 +91,16 @@ class ControllerPrivateAccessor {
9091
return *this;
9192
}
9293

93-
ControllerPrivateAccessor &set_span(std::shared_ptr<Span> span) {
94-
_cntl->_span = span;
95-
return *this;
96-
}
94+
// Overloaded set_span methods to support both shared_ptr and raw pointer
95+
ControllerPrivateAccessor &set_span(const std::shared_ptr<Span>& span);
96+
ControllerPrivateAccessor &set_span(Span* span);
9797

9898
ControllerPrivateAccessor &set_request_protocol(ProtocolType protocol) {
9999
_cntl->_request_protocol = protocol;
100100
return *this;
101101
}
102102

103-
std::shared_ptr<Span> span() const { return _cntl->_span.lock(); }
103+
std::shared_ptr<Span> span() const;
104104

105105
uint32_t pipelined_count() const { return _cntl->_pipelined_count; }
106106
void set_pipelined_count(uint32_t count) { _cntl->_pipelined_count = count; }

src/brpc/policy/couchbase_protocol.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ void ProcessCouchbaseResponse(InputMessageBase* msg_base) {
160160
}
161161

162162
ControllerPrivateAccessor accessor(cntl);
163-
Span* span = accessor.span();
164-
if (span) {
163+
if (auto span = accessor.span()) {
165164
span->set_base_real_us(msg->base_real_us());
166165
span->set_received_us(msg->received_us());
167166
span->set_response_size(msg->meta.length());

src/brpc/span.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,11 @@ void SetTlsParentSpan(std::shared_ptr<Span> span) {
7373
using namespace bthread;
7474
LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
7575
if (ls.rpcz_parent_span) {
76-
delete static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span);
76+
*static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span) = span;
77+
} else {
78+
ls.rpcz_parent_span = new std::weak_ptr<Span>(span);
79+
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls);
7780
}
78-
ls.rpcz_parent_span = new std::weak_ptr<Span>(span);
79-
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls);
8081
}
8182

8283
std::shared_ptr<Span> GetTlsParentSpan() {
@@ -94,9 +95,7 @@ void ClearTlsParentSpan() {
9495
using namespace bthread;
9596
LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
9697
if (ls.rpcz_parent_span) {
97-
delete static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span);
98-
ls.rpcz_parent_span = nullptr;
99-
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls);
98+
static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span)->reset();
10099
}
101100
}
102101

src/brpc/span.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ friend class SpanContainer;
252252

253253
class SpanContainer : public bvar::Collected {
254254
public:
255-
explicit SpanContainer(std::shared_ptr<Span> span) : _span(span) {}
255+
explicit SpanContainer(const std::shared_ptr<Span>& span) : _span(span) {}
256256
~SpanContainer() {}
257257

258258
// Implementations of bvar::Collected

src/bthread/bthread.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,21 @@ uint64_t bthread_cpu_clock_ns(void) {
659659
return 0;
660660
}
661661

662+
int bthread_set_span_funcs(bthread_create_span_fn create_fn,
663+
bthread_destroy_span_fn destroy_fn,
664+
bthread_end_span_fn end_fn) {
665+
if ((create_fn && destroy_fn && end_fn) ||
666+
(!create_fn && !destroy_fn && !end_fn)) {
667+
bthread::g_create_bthread_span = create_fn;
668+
bthread::g_rpcz_parent_span_dtor = destroy_fn;
669+
bthread::g_end_bthread_span = end_fn;
670+
return 0;
671+
}
672+
673+
errno = EINVAL;
674+
return -1;
675+
}
676+
662677
} // extern "C"
663678

664679
void bthread_attr_set_name(bthread_attr_t* attr, const char* name) {

src/bthread/key.cpp

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,6 @@
3535

3636
namespace bthread {
3737

38-
void* (*g_create_bthread_span)() = NULL;
39-
40-
void (*g_rpcz_parent_span_dtor)(void*) = NULL;
41-
42-
void (*g_end_bthread_span)() = NULL;
43-
4438
DEFINE_uint32(key_table_list_size, 4000,
4539
"The maximum length of the KeyTableList. Once this value is "
4640
"exceeded, a portion of the KeyTables will be moved to the "
@@ -252,11 +246,6 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
252246
if (g) {
253247
g->current_task()->local_storage.keytable = old_kt;
254248
}
255-
256-
if (tls_bls.rpcz_parent_span && g_rpcz_parent_span_dtor) {
257-
g_rpcz_parent_span_dtor(tls_bls.rpcz_parent_span);
258-
tls_bls.rpcz_parent_span = NULL;
259-
}
260249
}
261250

262251
void append(KeyTable* keytable) {
@@ -417,11 +406,6 @@ static void cleanup_pthread(void* arg) {
417406
delete kt;
418407
// After deletion: tls may be set during deletion.
419408
tls_bls.keytable = NULL;
420-
421-
if (tls_bls.rpcz_parent_span && g_rpcz_parent_span_dtor) {
422-
g_rpcz_parent_span_dtor(tls_bls.rpcz_parent_span);
423-
tls_bls.rpcz_parent_span = NULL;
424-
}
425409
}
426410
}
427411

@@ -500,11 +484,6 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) {
500484
if (g) {
501485
g->current_task()->local_storage.keytable = old_kt;
502486
}
503-
504-
if (bthread::tls_bls.rpcz_parent_span && bthread::g_rpcz_parent_span_dtor) {
505-
bthread::g_rpcz_parent_span_dtor(bthread::tls_bls.rpcz_parent_span);
506-
bthread::tls_bls.rpcz_parent_span = NULL;
507-
}
508487
// TODO: return_keytable may race with this function, we don't destroy
509488
// the mutex right now.
510489
// pthread_mutex_destroy(&pool->mutex);
@@ -694,19 +673,4 @@ void* bthread_get_assigned_data() {
694673
return bthread::tls_bls.assigned_data;
695674
}
696675

697-
int bthread_set_span_funcs(bthread_create_span_fn create_fn,
698-
bthread_destroy_span_fn destroy_fn,
699-
bthread_end_span_fn end_fn) {
700-
if ((create_fn && destroy_fn && end_fn) ||
701-
(!create_fn && !destroy_fn && !end_fn)) {
702-
bthread::g_create_bthread_span = create_fn;
703-
bthread::g_rpcz_parent_span_dtor = destroy_fn;
704-
bthread::g_end_bthread_span = end_fn;
705-
return 0;
706-
}
707-
708-
errno = EINVAL;
709-
return -1;
710-
}
711-
712676
} // extern "C"

0 commit comments

Comments
 (0)