From b1f61faa0167e9f5e5489556ce4d0b12f5c08742 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=83=AD=E4=B8=9A=E6=98=8C?= Date: Fri, 29 May 2026 20:01:14 +0800 Subject: [PATCH] Fix UBRing macOS cleanup and timer handling --- example/ubring_performance/CMakeLists.txt | 3 +- example/ubring_performance/client.cpp | 3 +- src/brpc/ubshm/shm/shm_ipc.cpp | 58 +++++++++++++++++++++++ src/brpc/ubshm/shm/shm_mgr.cpp | 7 ++- src/brpc/ubshm/timer/timer_mgr.cpp | 35 +++++++++++--- src/brpc/ubshm/ub_endpoint.cpp | 15 ++++-- src/brpc/ubshm/ub_ring.cpp | 30 +++++++----- src/butil/compat.h | 2 +- 8 files changed, 126 insertions(+), 27 deletions(-) diff --git a/example/ubring_performance/CMakeLists.txt b/example/ubring_performance/CMakeLists.txt index 729381ccb8..45e624885f 100644 --- a/example/ubring_performance/CMakeLists.txt +++ b/example/ubring_performance/CMakeLists.txt @@ -32,6 +32,7 @@ include(FindProtobuf) protobuf_generate_cpp(PROTO_SRC PROTO_HEADER test.proto) # include PROTO_HEADER include_directories(${CMAKE_CURRENT_BINARY_DIR}) +include_directories(${PROTOBUF_INCLUDE_DIRS}) # Search for libthrift* by best effort. If it is not found and brpc is # compiled with thrift protocol enabled, a link error would be reported. @@ -131,4 +132,4 @@ add_executable(ubring_performance_client client.cpp ${PROTO_SRC} ${PROTO_HEADER} add_executable(ubring_performance_server server.cpp ${PROTO_SRC} ${PROTO_HEADER}) target_link_libraries(ubring_performance_client ${BRPC_LIB} ${DYNAMIC_LIB}) -target_link_libraries(ubring_performance_server ${BRPC_LIB} ${DYNAMIC_LIB}) \ No newline at end of file +target_link_libraries(ubring_performance_server ${BRPC_LIB} ${DYNAMIC_LIB}) diff --git a/example/ubring_performance/client.cpp b/example/ubring_performance/client.cpp index c14268a430..0fb11fda75 100644 --- a/example/ubring_performance/client.cpp +++ b/example/ubring_performance/client.cpp @@ -107,8 +107,7 @@ class PerformanceTest { options.connection_type = FLAGS_connection_type; options.timeout_ms = FLAGS_rpc_timeout_ms; options.max_retry = 0; - // TODO A bug exists when the connection_group parameter is used. - // options.connection_group = std::to_string(reinterpret_cast(this)); + options.connection_group = std::to_string(reinterpret_cast(this)); std::string server = g_servers[(rr_index++) % g_servers.size()]; _channel = new brpc::Channel(); if (_channel->Init(server.c_str(), &options) != 0) { diff --git a/src/brpc/ubshm/shm/shm_ipc.cpp b/src/brpc/ubshm/shm/shm_ipc.cpp index 7e934c7568..a63e9cdd7c 100644 --- a/src/brpc/ubshm/shm/shm_ipc.cpp +++ b/src/brpc/ubshm/shm/shm_ipc.cpp @@ -29,6 +29,41 @@ namespace brpc { namespace ubring { +namespace { + +RETURN_CODE ReserveIpcShm(int fd, const SHM *shm) +{ +#if defined(__linux__) + const int rc = posix_fallocate(fd, 0, (off_t)shm->len); + if (rc != 0) { + LOG(ERROR) << "IPC reserve shm=" << shm->name << " length=" << shm->len + << " failed, ret(" << rc << ")."; + return SHM_ERR; + } +#else + UNREFERENCE_PARAM(fd); + UNREFERENCE_PARAM(shm); +#endif + return UBRING_OK; +} + +RETURN_CODE CheckIpcShmSize(int fd, const SHM *shm) +{ + struct stat st; + if (fstat(fd, &st) != 0) { + LOG(ERROR) << "IPC stat shm=" << shm->name << " failed, ret(" << errno << ")."; + return SHM_ERR; + } + if ((uint64_t)st.st_size < (uint64_t)shm->len) { + LOG(ERROR) << "IPC shm=" << shm->name << " actual length=" << st.st_size + << " is shorter than requested length=" << shm->len << "."; + return SHM_ERR; + } + return UBRING_OK; +} + +} // namespace + RETURN_CODE IpcShmLocalMalloc(SHM *shm) { int fd = shm_open(shm->name, O_CREAT | O_EXCL | O_RDWR, SHM_IPC_MODE); @@ -50,9 +85,16 @@ RETURN_CODE IpcShmLocalMalloc(SHM *shm) return SHM_ERR; } + if (ReserveIpcShm(fd, shm) != UBRING_OK) { + close(fd); + shm_unlink(shm->name); + return SHM_ERR; + } + shm->addr = (uint8_t*)mmap(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (shm->addr == (uint8_t*)MAP_FAILED) { LOG(ERROR) << "IPC map shm=" << shm->name << " length=" << shm->len << " failed, ret(" << errno << ")."; + shm->addr = NULL; close(fd); shm_unlink(shm->name); return SHM_ERR; @@ -75,6 +117,7 @@ RETURN_CODE IpcShmMunmap(SHM *shm) return SHM_ERR; } + shm->addr = NULL; LOG(INFO) << "IPC unmap shm=" << shm->name << " length=" << shm->len << " success."; return UBRING_OK; } @@ -109,6 +152,8 @@ RETURN_CODE IpcShmLocalFree(SHM *shm) int ret = munmap(shm->addr, shm->len); if (ret != UBRING_OK) { LOG(WARNING) << "IPC unmap shm=" << shm->name << " failed, ret=" << ret; + } else { + shm->addr = NULL; } ret = shm_unlink(shm->name); @@ -138,9 +183,15 @@ RETURN_CODE IpcShmRemoteMalloc(SHM *shm) return SHM_ERR; } + if (CheckIpcShmSize(fd, shm) != UBRING_OK) { + close(fd); + return SHM_ERR; + } + shm->addr = (uint8_t*)mmap(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (shm->addr == (uint8_t*)MAP_FAILED) { LOG(ERROR) << "IPC map shm=" << shm->name << " failed, ret=" << errno; + shm->addr = NULL; close(fd); return SHM_ERR; } @@ -157,9 +208,15 @@ RETURN_CODE IpcShmLocalMmap(SHM *shm, int prot) return SHM_ERR; } + if (CheckIpcShmSize(fd, shm) != UBRING_OK) { + close(fd); + return SHM_ERR; + } + shm->addr = (uint8_t*)mmap(NULL, shm->len, prot, MAP_SHARED, fd, 0); if (shm->addr == (uint8_t*)MAP_FAILED) { LOG(ERROR) << "IPC map shm=" << shm->name << " failed, ret=" << errno; + shm->addr = NULL; close(fd); return SHM_ERR; } @@ -182,6 +239,7 @@ RETURN_CODE IpcShmRemoteFree(SHM *shm) return SHM_ERR; } + shm->addr = NULL; LOG(INFO) << "IPC free remote shm=" << shm->name << " success."; return UBRING_OK; } diff --git a/src/brpc/ubshm/shm/shm_mgr.cpp b/src/brpc/ubshm/shm/shm_mgr.cpp index 3f819857b2..39a54dd1df 100644 --- a/src/brpc/ubshm/shm/shm_mgr.cpp +++ b/src/brpc/ubshm/shm/shm_mgr.cpp @@ -114,6 +114,11 @@ RETURN_CODE ShmLocalCalloc(SHM *shm) { LOG(ERROR) << "Failed to alloc local shm."; return rc; } + if (UNLIKELY(shm->addr == NULL)) { + LOG(ERROR) << "Local shm=" << shm->name << " allocated with NULL address."; + ShmFree(shm); + return SHM_ERR; + } memset(shm->addr, 0, shm->len); return UBRING_OK; } @@ -244,4 +249,4 @@ RETURN_CODE ShmFree(SHM *shm) { return rc; } } -} \ No newline at end of file +} diff --git a/src/brpc/ubshm/timer/timer_mgr.cpp b/src/brpc/ubshm/timer/timer_mgr.cpp index e53833f95e..d12d063cec 100644 --- a/src/brpc/ubshm/timer/timer_mgr.cpp +++ b/src/brpc/ubshm/timer/timer_mgr.cpp @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +#ifndef _GNU_SOURCE #define _GNU_SOURCE +#endif #include #include #include @@ -73,8 +75,10 @@ static RETURN_CODE DeleteTimerInner(uint32_t fd) { kevent(g_epollFd, &evt, 1, NULL, 0, NULL); #endif +#if defined(OS_LINUX) uint64_t exp = 0; read((int)fd, &exp, sizeof(exp)); +#endif close((int)fd); atomic_fetch_sub(&g_totalTimerNum, 1); @@ -153,7 +157,7 @@ RETURN_CODE TimerInit(void) { if (g_timerFdCtxMap == NULL) { g_timerFdCtxMap = (TimerFdCtx *)malloc(sizeof(TimerFdCtx) * maxSystemFd); if (UNLIKELY(!g_timerFdCtxMap)) { - LOG(ERROR) << "Fail to malloc space for timer modules. errno=%d", errno; + LOG(ERROR) << "Fail to malloc space for timer modules. errno=" << errno; return UBRING_ERR; } @@ -223,7 +227,10 @@ void *TimerEpoll(void *args) { int32_t readyNum = epoll_wait(g_epollFd, readyEvents, MAX_TIMER, TIMER_EPOLL_WAIT_TIMEOUT); #elif defined(OS_MACOSX) - struct timespec timeout = {0, TIMER_EPOLL_WAIT_TIMEOUT * 1000000}; + struct timespec timeout = { + TIMER_EPOLL_WAIT_TIMEOUT / 1000, + (TIMER_EPOLL_WAIT_TIMEOUT % 1000) * 1000000 + }; int32_t readyNum = kevent(g_epollFd, NULL, 0, readyEvents, MAX_TIMER, &timeout); #endif @@ -249,6 +256,7 @@ void *TimerEpoll(void *args) { int32_t timerFd = event->ident; #endif +#if defined(OS_LINUX) uint64_t exp = 0; if (read(timerFd, &exp, sizeof(exp)) < 0) { if (errno != EBADF) { @@ -256,6 +264,7 @@ void *TimerEpoll(void *args) { } continue; } +#endif if (TimerFdCtxValidate((uint32_t)timerFd) != UBRING_OK) { continue; } @@ -301,8 +310,10 @@ void DeleteTimerSafe(uint32_t fd) { kevent(g_epollFd, &evt, 1, NULL, 0, NULL); #endif +#if defined(OS_LINUX) uint64_t exp = 0; read((int)fd, &exp, sizeof(exp)); +#endif close((int)fd); atomic_fetch_sub(&g_totalTimerNum, 1); @@ -352,10 +363,20 @@ int32_t TimerStart(const itimerspec *time, void *(*cb)(void *), void *args) { int32_t ret = epoll_ctl(g_epollFd, EPOLL_CTL_ADD, timerFd, &event); #elif defined(OS_MACOSX) struct kevent event; - uint64_t timeout_nsec = time->it_value.tv_sec * 1000000000ULL + time->it_value.tv_nsec; uint64_t interval_nsec = time->it_interval.tv_sec * 1000000000ULL + time->it_interval.tv_nsec; - EV_SET(&event, timerFd, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, - timeout_nsec / 1000000, NULL); + uint64_t timeout_nsec = time->it_value.tv_sec * 1000000000ULL + time->it_value.tv_nsec; + uint16_t event_flags = EV_ADD | EV_ENABLE; + if (interval_nsec > 0) { + timeout_nsec = interval_nsec; + } else { + event_flags |= EV_ONESHOT; + } + uint64_t timeout_msec = timeout_nsec / 1000000; + if (timeout_msec == 0) { + timeout_msec = 1; + } + EV_SET(&event, timerFd, EVFILT_TIMER, event_flags, 0, + timeout_msec, NULL); int32_t ret = kevent(g_epollFd, &event, 1, NULL, 0, NULL); #endif @@ -434,7 +455,6 @@ RETURN_CODE TimerFdCtxValidate(uint32_t fd) { return UBRING_ERR; } if (g_timerFdCtxMap[fd].status == TIMER_CONTEXT_NOT_USING) { - LOG(ERROR) << "TimerFd=" << fd << " has wrong status=" << g_timerFdCtxMap[fd].status; return UBRING_ERR; } if (g_timerFdCtxMap[fd].cb == NULL) { @@ -451,6 +471,7 @@ static int timerfd_create_macosx(int clockid, int flags) { if (pipe(pipefd) == -1) { return -1; } + close(pipefd[1]); return pipefd[0]; } @@ -465,4 +486,4 @@ static int timerfd_settime_macosx(int fd, int flags, #endif } // namespace ubring -} // namespace brpc \ No newline at end of file +} // namespace brpc diff --git a/src/brpc/ubshm/ub_endpoint.cpp b/src/brpc/ubshm/ub_endpoint.cpp index b4c728c057..7b4868209a 100644 --- a/src/brpc/ubshm/ub_endpoint.cpp +++ b/src/brpc/ubshm/ub_endpoint.cpp @@ -17,6 +17,8 @@ #if BRPC_WITH_UBRING +#include + #include #include #include "butil/fd_utility.h" @@ -526,12 +528,12 @@ void* UBShmEndpoint::ProcessHandshakeAtServer(void* arg) { ub_transport->_ub_state = UBShmTransport::UB_OFF; } else { ep->_state = S_ALLOC_SHM; - ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, (uint8_t)ep->_socket->fd()}; + ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, (uint32_t)ep->_socket->fd()}; strncpy(remote_trx_shm.name, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN); size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE; // server端共享内存名称 - ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint8_t)ep->_socket->fd()}; + ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)ep->_socket->fd()}; char clientName[SHM_MAX_NAME_BUFF_LEN]; strncpy(clientName, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN); @@ -646,10 +648,15 @@ ssize_t UBShmEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { } ssize_t nw = 0; + errno = 0; nw = _ub_ring->UbrTrxWritev(vec, nvec); if (UNLIKELY(nw == -1)) { - LOG(ERROR) << "Non-blocking send msg in failed, connection has been closed."; - errno = EPIPE; + if (errno == EMSGSIZE) { + LOG(ERROR) << "Non-blocking send msg failed, message is larger than ubring capacity."; + } else { + LOG(ERROR) << "Non-blocking send msg in failed, connection has been closed."; + errno = EPIPE; + } } else if (UNLIKELY(nw == UBRING_RETRY)) { errno = EAGAIN; nw = -1; diff --git a/src/brpc/ubshm/ub_ring.cpp b/src/brpc/ubshm/ub_ring.cpp index 0ea64f07c1..11a5d9b311 100644 --- a/src/brpc/ubshm/ub_ring.cpp +++ b/src/brpc/ubshm/ub_ring.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include #include @@ -91,9 +92,6 @@ RETURN_CODE UBRing::UbrTrxClose() { if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) { ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = UBR_STATE_CLOSED; } - if (UNLIKELY(ShmRemoteFree(&_trx->remoteShm) != UBRING_OK)) { - LOG(WARNING) << "Force close, remote shm " << _trx->remoteShm.name << " free failed."; - } if (UNLIKELY(UbrTrxFreeShm(_trx) != UBRING_OK)) { LOG(WARNING) << "Force close, local shm " << _trx->localShm.name << " free failed."; } @@ -321,10 +319,6 @@ void *UBRing::UbrAsynClearCallback(void *args) return NULL; } - if (UNLIKELY(ShmRemoteFree(&trx->remoteShm) != UBRING_OK)) { - LOG(ERROR) << "Trx close, remote shm " << trx->remoteShm.name << " free failed."; - } - if (UNLIKELY(UbrTrxFreeShm(trx) != UBRING_OK)) { LOG(ERROR) << "Trx close, wait for local shm " << trx->localShm.name << " free fail."; } @@ -348,6 +342,12 @@ int UBRing::UbrTrxSend(const void *buf, uint32_t bufLen) uint32_t remainChunkNum = (_trx->ubrTx.writePos > tail) ? (tail + cap - _trx->ubrTx.writePos) : (tail - _trx->ubrTx.writePos); uint32_t needMsgChunkNum = CalcUbrMsgChunkCnt(bufLen); + if (needMsgChunkNum >= cap) { + LOG(ERROR) << "Ubr send failed, payload length=" << bufLen + << " needs " << needMsgChunkNum << " chunks, capacity=" << cap << "."; + errno = EMSGSIZE; + return UBRING_ERR; + } if (remainChunkNum < needMsgChunkNum) { return UBRING_RETRY; } @@ -653,7 +653,7 @@ RETURN_CODE UBRing::UbrTrxFreeShm(UbrTrx *trx) RETURN_CODE remoteRc = UBRING_OK; if (trx->remoteShm.addr != NULL) { - remoteRc = IpcShmRemoteFree(&trx->remoteShm); + remoteRc = ShmRemoteFree(&trx->remoteShm); } if (remoteRc != UBRING_OK) { LOG(WARNING) << "Free remote shm " << trx->remoteShm.name << " failed, rc=" << remoteRc; @@ -795,6 +795,7 @@ int UBRing::UbrAllocateServerShm(SHM* remote_trx_shm, SHM* local_trx_shm) { if (UNLIKELY((ShmLocalCalloc(local_trx_shm)) != UBRING_OK)) { LOG(ERROR) << "Trx apply local shared memory failed."; + ShmRemoteFree(remote_trx_shm); return -1; } @@ -808,9 +809,9 @@ int UBRing::UbrAllocateServerShm(SHM* remote_trx_shm, SHM* local_trx_shm) { _trx->type = TCP_TRX; if (UNLIKELY((UbrServerTrxInit(local_trx_shm, remote_trx_shm)) != UBRING_OK)) { LOG(ERROR) << "Server trx init failed."; - ShmRemoteFree(remote_trx_shm); UbrTrxFreeShm(_trx); UBRingManager::ReleaseUbrTrxFromMgr(_trx); + _trx = nullptr; return -1; } return 0; @@ -826,6 +827,7 @@ int UBRing::UbrAllocateLocalShm(SHM *local_trx_shm, const char *shm_name) _trx->type = TCP_TRX; if (UNLIKELY((ApplyAndMapLocalShm(local_trx_shm, shm_name)) != UBRING_OK)) { LOG(ERROR) << "Trx apply or map local shared memory failed, localName=" << shm_name; + _trx = nullptr; return -1; } return 0; @@ -873,7 +875,7 @@ RETURN_CODE UBRing::UbrMapRemoteShmAddTimer(SHM *localTrxShm, const char *localN if (UNLIKELY(UbrAddTimer() != UBRING_OK)) { LOG(ERROR) << "Ubr add timer failed, localName=" << localName; - ShmRemoteFree(&remoteTrxShm); + ShmRemoteFree(&_trx->remoteShm); return UBRING_ERR; } @@ -884,7 +886,7 @@ RETURN_CODE UBRing::UbrMapRemoteShmAddTimer(SHM *localTrxShm, const char *localN LOG(ERROR) << "Local shm " << localTrxShm->name << " wait for connect remote map timeout."; DeleteTimerSafe((uint32_t)_trx->hbTimerFd); DeleteTimerSafe((uint32_t)_trx->timerFd); - ShmRemoteFree(&remoteTrxShm); + ShmRemoteFree(&_trx->remoteShm); return UBRING_ERR_TIMEOUT; } @@ -961,6 +963,12 @@ RETURN_CODE UBRing::WritevHasEnoughSpace(size_t bufLen) uint32_t remainChunkNum = (_trx->ubrTx.writePos > tail) ? (tail + cap - _trx->ubrTx.writePos) : (tail - _trx->ubrTx.writePos); uint32_t needMsgChunkNum = CalcUbrMsgChunkCnt((uint32_t)bufLen); + if (needMsgChunkNum >= cap) { + LOG(ERROR) << "Ubr write failed, payload length=" << bufLen + << " needs " << needMsgChunkNum << " chunks, capacity=" << cap << "."; + errno = EMSGSIZE; + return UBRING_ERR; + } if (remainChunkNum < needMsgChunkNum) { return UBRING_RETRY; } diff --git a/src/butil/compat.h b/src/butil/compat.h index d75e16337f..53fcc1a8c0 100644 --- a/src/butil/compat.h +++ b/src/butil/compat.h @@ -36,7 +36,7 @@ struct pthread_spinlock_t { dispatch_semaphore_t sem; }; inline int pthread_spin_init(pthread_spinlock_t *__lock, int __pshared) { - if (__pshared != 0) { + if (__pshared != 0 && __pshared != PTHREAD_PROCESS_PRIVATE) { return EINVAL; } __lock->sem = dispatch_semaphore_create(1);