Skip to content

Commit 528c898

Browse files
author
lhh
committed
Refactor bthread span lifecycle management and optimize span API with smart pointer reuse (#3068)
1 parent dc8f2e6 commit 528c898

11 files changed

Lines changed: 111 additions & 75 deletions

File tree

docs/cn/rpcz.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,14 @@ bthread_attr_t attr = { BTHREAD_STACKTYPE_NORMAL, BTHREAD_INHERIT_SPAN, NULL };
6666
bthread_start_urgent(&tid, &attr, thread_proc, arg);
6767
```
6868

69-
注意:使用这种方式创建子bthread来发送rpc,请确保rpc在server返回response之前完成,否则可能导致使用被释放的Span对象而出core。
69+
### Span生命周期管理
70+
71+
brpc使用智能指针(`std::shared_ptr`/`std::weak_ptr`)管理Span对象的生命周期,并通过自旋锁保护并发访问,解决了以下问题:
72+
73+
1. **Use-after-free防护**:父Span通过`shared_ptr`持有子Span的强引用,TLS中使用`weak_ptr`存储,确保Span对象在被访问时仍然有效。即使server在子bthread完成前返回response,也不会导致访问已释放的Span对象。
74+
75+
2. **线程安全**:使用自旋锁保护`_client_list`和`_info`的并发修改,支持多个bthread同时创建子span或添加annotation。
76+
77+
3. **自动生命周期管理**:当父Span销毁时,会自动清理所有子Span(通过`_client_list.clear()`),无需手动管理。
78+
79+
使用`BTHREAD_INHERIT_SPAN`创建子bthread时,不再需要担心Span对象的生命周期问题,可以安全地在异步场景中使用。

src/brpc/builtin/rpcz_service.cpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,15 @@ static void PrintAnnotations(
211211
PrintElapse(os, anno_time, last_time);
212212
os << ' ';
213213
if (span) {
214-
os << '[' << span_type_str << ' ' << SPAN_ID_STR << '=' << Hex(span->span_id()) << "] ";
214+
const char* short_type = "SPAN";
215+
if (span->type() == SPAN_TYPE_SERVER) {
216+
short_type = "Server";
217+
} else if (span->type() == SPAN_TYPE_CLIENT) {
218+
short_type = "Client";
219+
} else if (span->type() == SPAN_TYPE_BTHREAD) {
220+
short_type = "Bthread";
221+
}
222+
os << '[' << short_type << " SPAN#" << Hex(span->span_id()) << "] ";
215223
}
216224
os << WebEscape(a);
217225
if (a.empty() || butil::back_char(a) != '\n') {
@@ -292,11 +300,11 @@ static void PrintClientSpan(
292300

293301
if (PrintAnnotationsAndRealTimeSpan(os, span.sent_real_us(),
294302
last_time, extr, num_extr, &span)) {
295-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Requested(" << span.request_size() << ") [1]" << std::endl;
303+
os << " [Client SPAN#" << Hex(span.span_id()) << "] Requested(" << span.request_size() << ") [1]" << std::endl;
296304
}
297305
if (PrintAnnotationsAndRealTimeSpan(os, span.received_real_us(),
298306
last_time, extr, num_extr, &span)) {
299-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Received response(" << span.response_size() << ")";
307+
os << " [Client SPAN#" << Hex(span.span_id()) << "] Received response(" << span.response_size() << ")";
300308
if (span.base_cid() != 0 && span.ending_cid() != 0) {
301309
int64_t ver = span.ending_cid() - span.base_cid();
302310
if (ver >= 1) {
@@ -310,13 +318,13 @@ static void PrintClientSpan(
310318

311319
if (PrintAnnotationsAndRealTimeSpan(os, span.start_parse_real_us(),
312320
last_time, extr, num_extr, &span)) {
313-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the response in a new bthread" << std::endl;
321+
os << " [Client SPAN#" << Hex(span.span_id()) << "] Processing the response in a new bthread" << std::endl;
314322
}
315323

316324
if (PrintAnnotationsAndRealTimeSpan(
317325
os, span.start_callback_real_us(),
318326
last_time, extr, num_extr, &span)) {
319-
os << " [ClientSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] " << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
327+
os << " [Client SPAN#" << Hex(span.span_id()) << "] " << (span.async() ? " Enter user's done" : " Back to user's callsite") << std::endl;
320328
}
321329

322330
PrintAnnotations(os, std::numeric_limits<int64_t>::max(),
@@ -340,9 +348,9 @@ static void PrintBthreadSpan(std::ostream& os, const RpczSpan& span, int64_t* la
340348
extr[num_extr++] = &client_extr;
341349

342350
// Print span id for bthread span context identification
343-
os << " [BthreadSpan " << SPAN_ID_STR << '=' << Hex(span.span_id());
351+
os << " [Bthread SPAN#" << Hex(span.span_id());
344352
if (span.parent_span_id() != 0) {
345-
os << " parent_span=" << Hex(span.parent_span_id());
353+
os << " parent#" << Hex(span.parent_span_id());
346354
}
347355
os << "] ";
348356

@@ -377,15 +385,15 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
377385
if (PrintAnnotationsAndRealTimeSpan(
378386
os, span.start_parse_real_us(),
379387
&last_time, extr, ARRAY_SIZE(extr), &span)) {
380-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Processing the request in a new bthread" << std::endl;
388+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Processing the request in a new bthread" << std::endl;
381389
}
382390

383391
bool entered_user_method = false;
384392
if (PrintAnnotationsAndRealTimeSpan(
385393
os, span.start_callback_real_us(),
386394
&last_time, extr, ARRAY_SIZE(extr), &span)) {
387395
entered_user_method = true;
388-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Enter " << WebEscape(span.full_method_name()) << std::endl;
396+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Enter " << WebEscape(span.full_method_name()) << std::endl;
389397
}
390398

391399
const int nclient = span.client_spans_size();
@@ -402,16 +410,16 @@ static void PrintServerSpan(std::ostream& os, const RpczSpan& span,
402410
os, span.start_send_real_us(),
403411
&last_time, extr, ARRAY_SIZE(extr), &span)) {
404412
if (entered_user_method) {
405-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Leave " << WebEscape(span.full_method_name()) << std::endl;
413+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Leave " << WebEscape(span.full_method_name()) << std::endl;
406414
} else {
407-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responding" << std::endl;
415+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Responding" << std::endl;
408416
}
409417
}
410418

411419
if (PrintAnnotationsAndRealTimeSpan(
412420
os, span.sent_real_us(),
413421
&last_time, extr, ARRAY_SIZE(extr), &span)) {
414-
os << " [ServerSpan " << SPAN_ID_STR << '=' << Hex(span.span_id()) << "] Responded(" << span.response_size() << ')' << std::endl;
422+
os << " [Server SPAN#" << Hex(span.span_id()) << "] Responded(" << span.response_size() << ')' << std::endl;
415423
}
416424

417425
PrintAnnotations(os, std::numeric_limits<int64_t>::max(),

src/brpc/channel.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
515515
method_name = NULL_METHOD_STR;
516516
}
517517
std::shared_ptr<Span> span = Span::CreateClientSpan(
518-
*method_name, start_send_real_us - start_send_us);
518+
method_name, start_send_real_us - start_send_us);
519519
if (span) {
520520
ControllerPrivateAccessor accessor(cntl);
521521
span->set_log_id(cntl->log_id());

src/brpc/controller.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,7 @@ uint64_t Controller::trace_id() const {
14051405
}
14061406
return 0;
14071407
}
1408+
14081409
uint64_t Controller::span_id() const {
14091410
if (auto span = _span.lock()) {
14101411
return span->span_id();
@@ -1737,4 +1738,24 @@ void Controller::DoPrintLogPrefix(std::ostream& os) const {
17371738
}
17381739
}
17391740

1741+
1742+
ControllerPrivateAccessor& ControllerPrivateAccessor::set_span(
1743+
const std::shared_ptr<Span>& span) {
1744+
_cntl->_span = span;
1745+
return *this;
1746+
}
1747+
1748+
ControllerPrivateAccessor& ControllerPrivateAccessor::set_span(Span* span) {
1749+
if (span) {
1750+
_cntl->_span = span->shared_from_this();
1751+
} else {
1752+
_cntl->_span.reset();
1753+
}
1754+
return *this;
1755+
}
1756+
1757+
std::shared_ptr<Span> ControllerPrivateAccessor::span() const {
1758+
return _cntl->_span.lock();
1759+
}
1760+
17401761
} // namespace brpc

src/brpc/details/controller_private_accessor.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ class Message;
3030
}
3131
}
3232

33-
3433
namespace brpc {
3534

35+
class Span;
36+
3637
class AuthContext;
3738

3839
// A wrapper to access some private methods/fields of `Controller'
@@ -90,17 +91,16 @@ class ControllerPrivateAccessor {
9091
return *this;
9192
}
9293

93-
ControllerPrivateAccessor &set_span(std::shared_ptr<Span> span) {
94-
_cntl->_span = span;
95-
return *this;
96-
}
94+
// Overloaded set_span methods to support both shared_ptr and raw pointer
95+
ControllerPrivateAccessor &set_span(const std::shared_ptr<Span>& span);
96+
ControllerPrivateAccessor &set_span(Span* span);
9797

9898
ControllerPrivateAccessor &set_request_protocol(ProtocolType protocol) {
9999
_cntl->_request_protocol = protocol;
100100
return *this;
101101
}
102102

103-
std::shared_ptr<Span> span() const { return _cntl->_span.lock(); }
103+
std::shared_ptr<Span> span() const;
104104

105105
uint32_t pipelined_count() const { return _cntl->_pipelined_count; }
106106
void set_pipelined_count(uint32_t count) { _cntl->_pipelined_count = count; }

src/brpc/policy/couchbase_protocol.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ void ProcessCouchbaseResponse(InputMessageBase* msg_base) {
160160
}
161161

162162
ControllerPrivateAccessor accessor(cntl);
163-
Span* span = accessor.span();
164-
if (span) {
163+
if (auto span = accessor.span()) {
165164
span->set_base_real_us(msg->base_real_us());
166165
span->set_received_us(msg->received_us());
167166
span->set_response_size(msg->meta.length());

src/brpc/span.cpp

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ void DestroyRpczParentSpan(void* ptr) {
6262
void EndBthreadSpan() {
6363
std::shared_ptr<Span> span = GetTlsParentSpan();
6464
if (span) {
65-
bthread_id_t id = {bthread_self()};
66-
span->set_ending_cid(id);
65+
span->set_ending_tid(bthread_self());
6766
}
6867

6968
ClearTlsParentSpan();
@@ -73,10 +72,11 @@ void SetTlsParentSpan(std::shared_ptr<Span> span) {
7372
using namespace bthread;
7473
LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
7574
if (ls.rpcz_parent_span) {
76-
delete static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span);
75+
*static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span) = span;
76+
} else {
77+
ls.rpcz_parent_span = new std::weak_ptr<Span>(span);
78+
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls);
7779
}
78-
ls.rpcz_parent_span = new std::weak_ptr<Span>(span);
79-
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls);
8080
}
8181

8282
std::shared_ptr<Span> GetTlsParentSpan() {
@@ -94,9 +94,7 @@ void ClearTlsParentSpan() {
9494
using namespace bthread;
9595
LocalStorage ls = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls);
9696
if (ls.rpcz_parent_span) {
97-
delete static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span);
98-
ls.rpcz_parent_span = nullptr;
99-
BAIDU_SET_VOLATILE_THREAD_LOCAL(tls_bls, ls);
97+
static_cast<std::weak_ptr<Span>*>(ls.rpcz_parent_span)->reset();
10098
}
10199
}
102100

@@ -122,9 +120,6 @@ void SpanDeleter::operator()(Span* r) const {
122120
// children.
123121
r->_client_list.clear();
124122
r->_info.clear();
125-
// Destroy the spinlocks, as the destructor might not be invoked.
126-
pthread_spin_destroy(&r->_client_list_spinlock);
127-
pthread_spin_destroy(&r->_info_spinlock);
128123
butil::return_object(r);
129124
}
130125

@@ -200,7 +195,8 @@ Span::Span(Forbidden) {
200195
}
201196

202197
Span::~Span() {
203-
// The destruction of the spinlock has been handled in SpanDeleter.
198+
pthread_spin_destroy(&_client_list_spinlock);
199+
pthread_spin_destroy(&_info_spinlock);
204200
}
205201

206202
std::shared_ptr<Span> Span::CreateClientSpan(const std::string& full_method_name,
@@ -212,7 +208,7 @@ std::shared_ptr<Span> Span::CreateClientSpan(const std::string& full_method_name
212208
std::shared_ptr<Span> span(span_raw, SpanDeleter());
213209
span->_log_id = 0;
214210
span->_base_cid = INVALID_BTHREAD_ID;
215-
span->_ending_cid = INVALID_BTHREAD_ID;
211+
span->_ending_cid = INVALID_BTHREAD_ID; // Client Span uses ending_cid
216212
span->_type = SPAN_TYPE_CLIENT;
217213
span->_async = false;
218214
span->_protocol = PROTOCOL_UNKNOWN;
@@ -254,7 +250,7 @@ std::shared_ptr<Span> Span::CreateBthreadSpan(const std::string& full_method_nam
254250
std::shared_ptr<Span> span(span_raw, SpanDeleter());
255251
span->_log_id = 0;
256252
span->_base_cid = INVALID_BTHREAD_ID;
257-
span->_ending_cid = INVALID_BTHREAD_ID;
253+
span->_ending_tid = INVALID_BTHREAD; // Bthread Span uses ending_tid
258254
span->_type = SPAN_TYPE_BTHREAD;
259255
span->_async = false;
260256
span->_protocol = PROTOCOL_UNKNOWN;
@@ -307,7 +303,7 @@ std::shared_ptr<Span> Span::CreateServerSpan(
307303
span->_parent_span_id = parent_span_id;
308304
span->_log_id = 0;
309305
span->_base_cid = INVALID_BTHREAD_ID;
310-
span->_ending_cid = INVALID_BTHREAD_ID;
306+
span->_ending_cid = INVALID_BTHREAD_ID; // Server Span uses ending_cid
311307
span->_type = SPAN_TYPE_SERVER;
312308
span->_async = false;
313309
span->_protocol = PROTOCOL_UNKNOWN;

src/brpc/span.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ friend class SpanContainer;
125125
void set_log_id(uint64_t cid) { _log_id = cid; }
126126
void set_base_cid(bthread_id_t id) { _base_cid = id; }
127127
void set_ending_cid(bthread_id_t id) { _ending_cid = id; }
128+
void set_ending_tid(bthread_t tid) { _ending_tid = tid; }
128129
void set_remote_side(const butil::EndPoint& pt) { _remote_side = pt; }
129130
void set_protocol(ProtocolType p) { _protocol = p; }
130131
void set_error_code(int error_code) { _error_code = error_code; }
@@ -144,7 +145,12 @@ friend class SpanContainer;
144145
void set_sent_us(int64_t tm)
145146
{ _sent_real_us = tm + _base_real_us; }
146147

147-
bool is_active() const { return _ending_cid == INVALID_BTHREAD_ID; }
148+
bool is_active() const {
149+
if (_type == SPAN_TYPE_BTHREAD) {
150+
return _ending_tid == INVALID_BTHREAD;
151+
}
152+
return _ending_cid == INVALID_BTHREAD_ID;
153+
}
148154

149155
std::weak_ptr<Span> local_parent() const { return _local_parent; }
150156
static std::shared_ptr<Span> tls_parent() {
@@ -161,6 +167,7 @@ friend class SpanContainer;
161167
uint64_t log_id() const { return _log_id; }
162168
bthread_id_t base_cid() const { return _base_cid; }
163169
bthread_id_t ending_cid() const { return _ending_cid; }
170+
bthread_t ending_tid() const { return _ending_tid; }
164171
const butil::EndPoint& remote_side() const { return _remote_side; }
165172
SpanType type() const { return _type; }
166173
ProtocolType protocol() const { return _protocol; }
@@ -215,6 +222,7 @@ friend class SpanContainer;
215222
uint64_t _log_id;
216223
bthread_id_t _base_cid;
217224
bthread_id_t _ending_cid;
225+
bthread_t _ending_tid; // Used for bthread span to store the ending bthread tid
218226
butil::EndPoint _remote_side;
219227
SpanType _type;
220228
bool _async;
@@ -252,7 +260,7 @@ friend class SpanContainer;
252260

253261
class SpanContainer : public bvar::Collected {
254262
public:
255-
explicit SpanContainer(std::shared_ptr<Span> span) : _span(span) {}
263+
explicit SpanContainer(const std::shared_ptr<Span>& span) : _span(span) {}
256264
~SpanContainer() {}
257265

258266
// Implementations of bvar::Collected

src/bthread/bthread.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,21 @@ uint64_t bthread_cpu_clock_ns(void) {
659659
return 0;
660660
}
661661

662+
int bthread_set_span_funcs(bthread_create_span_fn create_fn,
663+
bthread_destroy_span_fn destroy_fn,
664+
bthread_end_span_fn end_fn) {
665+
if ((create_fn && destroy_fn && end_fn) ||
666+
(!create_fn && !destroy_fn && !end_fn)) {
667+
bthread::g_create_bthread_span = create_fn;
668+
bthread::g_rpcz_parent_span_dtor = destroy_fn;
669+
bthread::g_end_bthread_span = end_fn;
670+
return 0;
671+
}
672+
673+
errno = EINVAL;
674+
return -1;
675+
}
676+
662677
} // extern "C"
663678

664679
void bthread_attr_set_name(bthread_attr_t* attr, const char* name) {

0 commit comments

Comments
 (0)