Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion example/ubring_performance/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ include(FindProtobuf)
protobuf_generate_cpp(PROTO_SRC PROTO_HEADER test.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${PROTOBUF_INCLUDE_DIRS})

# Search for libthrift* by best effort. If it is not found and brpc is
# compiled with thrift protocol enabled, a link error would be reported.
Expand Down Expand Up @@ -131,4 +132,4 @@ add_executable(ubring_performance_client client.cpp ${PROTO_SRC} ${PROTO_HEADER}
add_executable(ubring_performance_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})

target_link_libraries(ubring_performance_client ${BRPC_LIB} ${DYNAMIC_LIB})
target_link_libraries(ubring_performance_server ${BRPC_LIB} ${DYNAMIC_LIB})
target_link_libraries(ubring_performance_server ${BRPC_LIB} ${DYNAMIC_LIB})
3 changes: 1 addition & 2 deletions example/ubring_performance/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ class PerformanceTest {
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_rpc_timeout_ms;
options.max_retry = 0;
// TODO A bug exists when the connection_group parameter is used.
// options.connection_group = std::to_string(reinterpret_cast<uintptr_t>(this));
options.connection_group = std::to_string(reinterpret_cast<uintptr_t>(this));
std::string server = g_servers[(rr_index++) % g_servers.size()];
_channel = new brpc::Channel();
if (_channel->Init(server.c_str(), &options) != 0) {
Expand Down
58 changes: 58 additions & 0 deletions src/brpc/ubshm/shm/shm_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,41 @@

namespace brpc {
namespace ubring {
namespace {

RETURN_CODE ReserveIpcShm(int fd, const SHM *shm)
{
#if defined(__linux__)
const int rc = posix_fallocate(fd, 0, (off_t)shm->len);
if (rc != 0) {
LOG(ERROR) << "IPC reserve shm=" << shm->name << " length=" << shm->len
<< " failed, ret(" << rc << ").";
return SHM_ERR;
}
#else
UNREFERENCE_PARAM(fd);
UNREFERENCE_PARAM(shm);
#endif
return UBRING_OK;
}

RETURN_CODE CheckIpcShmSize(int fd, const SHM *shm)
{
struct stat st;
if (fstat(fd, &st) != 0) {
LOG(ERROR) << "IPC stat shm=" << shm->name << " failed, ret(" << errno << ").";
return SHM_ERR;
}
if ((uint64_t)st.st_size < (uint64_t)shm->len) {
LOG(ERROR) << "IPC shm=" << shm->name << " actual length=" << st.st_size
<< " is shorter than requested length=" << shm->len << ".";
return SHM_ERR;
}
return UBRING_OK;
}

} // namespace

RETURN_CODE IpcShmLocalMalloc(SHM *shm)
{
int fd = shm_open(shm->name, O_CREAT | O_EXCL | O_RDWR, SHM_IPC_MODE);
Expand All @@ -50,9 +85,16 @@ RETURN_CODE IpcShmLocalMalloc(SHM *shm)
return SHM_ERR;
}

if (ReserveIpcShm(fd, shm) != UBRING_OK) {
close(fd);
shm_unlink(shm->name);
return SHM_ERR;
}

shm->addr = (uint8_t*)mmap(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (shm->addr == (uint8_t*)MAP_FAILED) {
LOG(ERROR) << "IPC map shm=" << shm->name << " length=" << shm->len << " failed, ret(" << errno << ").";
shm->addr = NULL;
close(fd);
shm_unlink(shm->name);
return SHM_ERR;
Expand All @@ -75,6 +117,7 @@ RETURN_CODE IpcShmMunmap(SHM *shm)
return SHM_ERR;
}

shm->addr = NULL;
LOG(INFO) << "IPC unmap shm=" << shm->name << " length=" << shm->len << " success.";
return UBRING_OK;
}
Expand Down Expand Up @@ -109,6 +152,8 @@ RETURN_CODE IpcShmLocalFree(SHM *shm)
int ret = munmap(shm->addr, shm->len);
if (ret != UBRING_OK) {
LOG(WARNING) << "IPC unmap shm=" << shm->name << " failed, ret=" << ret;
} else {
shm->addr = NULL;
}

ret = shm_unlink(shm->name);
Expand Down Expand Up @@ -138,9 +183,15 @@ RETURN_CODE IpcShmRemoteMalloc(SHM *shm)
return SHM_ERR;
}

if (CheckIpcShmSize(fd, shm) != UBRING_OK) {
close(fd);
return SHM_ERR;
}

shm->addr = (uint8_t*)mmap(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (shm->addr == (uint8_t*)MAP_FAILED) {
LOG(ERROR) << "IPC map shm=" << shm->name << " failed, ret=" << errno;
shm->addr = NULL;
close(fd);
return SHM_ERR;
}
Expand All @@ -157,9 +208,15 @@ RETURN_CODE IpcShmLocalMmap(SHM *shm, int prot)
return SHM_ERR;
}

if (CheckIpcShmSize(fd, shm) != UBRING_OK) {
close(fd);
return SHM_ERR;
}

shm->addr = (uint8_t*)mmap(NULL, shm->len, prot, MAP_SHARED, fd, 0);
if (shm->addr == (uint8_t*)MAP_FAILED) {
LOG(ERROR) << "IPC map shm=" << shm->name << " failed, ret=" << errno;
shm->addr = NULL;
close(fd);
return SHM_ERR;
}
Expand All @@ -182,6 +239,7 @@ RETURN_CODE IpcShmRemoteFree(SHM *shm)
return SHM_ERR;
}

shm->addr = NULL;
LOG(INFO) << "IPC free remote shm=" << shm->name << " success.";
return UBRING_OK;
}
Expand Down
7 changes: 6 additions & 1 deletion src/brpc/ubshm/shm/shm_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ RETURN_CODE ShmLocalCalloc(SHM *shm) {
LOG(ERROR) << "Failed to alloc local shm.";
return rc;
}
if (UNLIKELY(shm->addr == NULL)) {
LOG(ERROR) << "Local shm=" << shm->name << " allocated with NULL address.";
ShmFree(shm);
return SHM_ERR;
}
memset(shm->addr, 0, shm->len);
return UBRING_OK;
}
Expand Down Expand Up @@ -244,4 +249,4 @@ RETURN_CODE ShmFree(SHM *shm) {
return rc;
}
}
}
}
35 changes: 28 additions & 7 deletions src/brpc/ubshm/timer/timer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <pthread.h>
#include <sched.h>
#include <errno.h>
Expand Down Expand Up @@ -73,8 +75,10 @@ static RETURN_CODE DeleteTimerInner(uint32_t fd) {
kevent(g_epollFd, &evt, 1, NULL, 0, NULL);
#endif

#if defined(OS_LINUX)
uint64_t exp = 0;
read((int)fd, &exp, sizeof(exp));
#endif

close((int)fd);
atomic_fetch_sub(&g_totalTimerNum, 1);
Expand Down Expand Up @@ -153,7 +157,7 @@ RETURN_CODE TimerInit(void) {
if (g_timerFdCtxMap == NULL) {
g_timerFdCtxMap = (TimerFdCtx *)malloc(sizeof(TimerFdCtx) * maxSystemFd);
if (UNLIKELY(!g_timerFdCtxMap)) {
LOG(ERROR) << "Fail to malloc space for timer modules. errno=%d", errno;
LOG(ERROR) << "Fail to malloc space for timer modules. errno=" << errno;
return UBRING_ERR;
}

Expand Down Expand Up @@ -223,7 +227,10 @@ void *TimerEpoll(void *args) {
int32_t readyNum = epoll_wait(g_epollFd, readyEvents, MAX_TIMER,
TIMER_EPOLL_WAIT_TIMEOUT);
#elif defined(OS_MACOSX)
struct timespec timeout = {0, TIMER_EPOLL_WAIT_TIMEOUT * 1000000};
struct timespec timeout = {
TIMER_EPOLL_WAIT_TIMEOUT / 1000,
(TIMER_EPOLL_WAIT_TIMEOUT % 1000) * 1000000
};
int32_t readyNum = kevent(g_epollFd, NULL, 0, readyEvents, MAX_TIMER, &timeout);
#endif

Expand All @@ -249,13 +256,15 @@ void *TimerEpoll(void *args) {
int32_t timerFd = event->ident;
#endif

#if defined(OS_LINUX)
uint64_t exp = 0;
if (read(timerFd, &exp, sizeof(exp)) < 0) {
if (errno != EBADF) {
LOG(ERROR) << "Failed to read timerfd=" << timerFd << " errno=" << errno;
}
continue;
}
#endif
if (TimerFdCtxValidate((uint32_t)timerFd) != UBRING_OK) {
continue;
}
Expand Down Expand Up @@ -301,8 +310,10 @@ void DeleteTimerSafe(uint32_t fd) {
kevent(g_epollFd, &evt, 1, NULL, 0, NULL);
#endif

#if defined(OS_LINUX)
uint64_t exp = 0;
read((int)fd, &exp, sizeof(exp));
#endif

close((int)fd);
atomic_fetch_sub(&g_totalTimerNum, 1);
Expand Down Expand Up @@ -352,10 +363,20 @@ int32_t TimerStart(const itimerspec *time, void *(*cb)(void *), void *args) {
int32_t ret = epoll_ctl(g_epollFd, EPOLL_CTL_ADD, timerFd, &event);
#elif defined(OS_MACOSX)
struct kevent event;
uint64_t timeout_nsec = time->it_value.tv_sec * 1000000000ULL + time->it_value.tv_nsec;
uint64_t interval_nsec = time->it_interval.tv_sec * 1000000000ULL + time->it_interval.tv_nsec;
EV_SET(&event, timerFd, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0,
timeout_nsec / 1000000, NULL);
uint64_t timeout_nsec = time->it_value.tv_sec * 1000000000ULL + time->it_value.tv_nsec;
uint16_t event_flags = EV_ADD | EV_ENABLE;
if (interval_nsec > 0) {
timeout_nsec = interval_nsec;
} else {
event_flags |= EV_ONESHOT;
}
uint64_t timeout_msec = timeout_nsec / 1000000;
if (timeout_msec == 0) {
timeout_msec = 1;
}
EV_SET(&event, timerFd, EVFILT_TIMER, event_flags, 0,
timeout_msec, NULL);
int32_t ret = kevent(g_epollFd, &event, 1, NULL, 0, NULL);
#endif

Expand Down Expand Up @@ -434,7 +455,6 @@ RETURN_CODE TimerFdCtxValidate(uint32_t fd) {
return UBRING_ERR;
}
if (g_timerFdCtxMap[fd].status == TIMER_CONTEXT_NOT_USING) {
LOG(ERROR) << "TimerFd=" << fd << " has wrong status=" << g_timerFdCtxMap[fd].status;
return UBRING_ERR;
}
if (g_timerFdCtxMap[fd].cb == NULL) {
Expand All @@ -451,6 +471,7 @@ static int timerfd_create_macosx(int clockid, int flags) {
if (pipe(pipefd) == -1) {
return -1;
}
close(pipefd[1]);
return pipefd[0];
}

Expand All @@ -465,4 +486,4 @@ static int timerfd_settime_macosx(int fd, int flags,
#endif

} // namespace ubring
} // namespace brpc
} // namespace brpc
15 changes: 11 additions & 4 deletions src/brpc/ubshm/ub_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#if BRPC_WITH_UBRING

#include <errno.h>

#include <gflags/gflags.h>
#include <array>
#include "butil/fd_utility.h"
Expand Down Expand Up @@ -526,12 +528,12 @@ void* UBShmEndpoint::ProcessHandshakeAtServer(void* arg) {
ub_transport->_ub_state = UBShmTransport::UB_OFF;
} else {
ep->_state = S_ALLOC_SHM;
ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, (uint8_t)ep->_socket->fd()};
ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, (uint32_t)ep->_socket->fd()};
strncpy(remote_trx_shm.name, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN);

size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
// server端共享内存名称
ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint8_t)ep->_socket->fd()};
ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)ep->_socket->fd()};
char clientName[SHM_MAX_NAME_BUFF_LEN];
strncpy(clientName, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN);

Expand Down Expand Up @@ -646,10 +648,15 @@ ssize_t UBShmEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
}

ssize_t nw = 0;
errno = 0;
nw = _ub_ring->UbrTrxWritev(vec, nvec);
if (UNLIKELY(nw == -1)) {
LOG(ERROR) << "Non-blocking send msg in failed, connection has been closed.";
errno = EPIPE;
if (errno == EMSGSIZE) {
LOG(ERROR) << "Non-blocking send msg failed, message is larger than ubring capacity.";
} else {
LOG(ERROR) << "Non-blocking send msg in failed, connection has been closed.";
errno = EPIPE;
}
} else if (UNLIKELY(nw == UBRING_RETRY)) {
errno = EAGAIN;
nw = -1;
Expand Down
Loading
Loading