Skip to content

Commit ccda3f2

Browse files
author
Grok Compression
committed
shmem: fix osx race condition
1 parent f7572fe commit ccda3f2

2 files changed

Lines changed: 131 additions & 58 deletions

File tree

src/lib/codec/shared/Messenger.h

Lines changed: 77 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,11 @@ struct Synch
321321
};
322322
struct SharedMemoryManager
323323
{
324-
static bool initShm(const std::string& name, size_t len, grk_handle* hMapFile, char** buffer)
324+
// Windows: CreateFileMapping transparently opens or creates the mapping,
325+
// so isCreator is unused — stale segments from a previous crash are handled
326+
// automatically, unlike POSIX where O_EXCL + shm_unlink recovery is needed.
327+
static bool initShm(const std::string& name, size_t len, grk_handle* hMapFile, char** buffer,
328+
[[maybe_unused]] bool isCreator)
325329
{
326330
*hMapFile = CreateFileMapping(INVALID_HANDLE_VALUE, // use paging file
327331
NULL, // default security
@@ -447,7 +451,8 @@ struct Synch
447451
};
448452
struct SharedMemoryManager
449453
{
450-
static bool initShm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer)
454+
static bool initShm(const std::string& name, size_t len, grk_handle* shm_fd, char** buffer,
455+
bool isCreator)
451456
{
452457
if(*shm_fd)
453458
return true;
@@ -466,38 +471,63 @@ struct SharedMemoryManager
466471
return false;
467472
}
468473

469-
*shm_fd = shm_open(name.c_str(), O_CREAT | O_RDWR, 0666);
470-
if(*shm_fd < 0)
474+
if(isCreator)
471475
{
472-
getMessengerLogger()->error("Error opening shared memory: %s", strerror(errno));
473-
return false;
476+
*shm_fd = shm_open(name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
477+
if(*shm_fd < 0 && errno == EEXIST)
478+
{
479+
// stale segment from a previous crash — remove and retry
480+
shm_unlink(name.c_str());
481+
*shm_fd = shm_open(name.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
482+
}
483+
if(*shm_fd < 0)
484+
{
485+
getMessengerLogger()->error("Error creating shared memory %s: %s", name.c_str(),
486+
strerror(errno));
487+
*shm_fd = 0;
488+
return false;
489+
}
490+
int rc = ftruncate(*shm_fd, (off_t)len);
491+
if(rc)
492+
{
493+
getMessengerLogger()->error("Error truncating shared memory to %" PRIu64 " bytes: %s",
494+
(uint64_t)len, strerror(errno));
495+
close(*shm_fd);
496+
shm_unlink(name.c_str());
497+
*shm_fd = 0;
498+
return false;
499+
}
474500
}
475-
int rc = ftruncate(*shm_fd, (off_t)len);
476-
if(rc)
501+
else
477502
{
478-
getMessengerLogger()->error("Error truncating shared memory to %" PRIu64 " bytes: %s",
479-
(uint64_t)len, strerror(errno));
480-
rc = close(*shm_fd);
481-
if(rc)
482-
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
483-
rc = shm_unlink(name.c_str());
484-
// 2 == No such file or directory
485-
if(rc && errno != 2)
486-
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
487-
return false;
503+
// Non-creator: open existing segment, retry briefly if not yet created
504+
const int maxRetries = 50;
505+
const int retryDelayMs = 10;
506+
for(int attempt = 0; attempt < maxRetries; ++attempt)
507+
{
508+
*shm_fd = shm_open(name.c_str(), O_RDWR, 0666);
509+
if(*shm_fd >= 0)
510+
break;
511+
if(errno != ENOENT || attempt == maxRetries - 1)
512+
{
513+
getMessengerLogger()->error("Error opening shared memory %s: %s", name.c_str(),
514+
strerror(errno));
515+
return false;
516+
}
517+
std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs));
518+
}
488519
}
489-
*buffer = static_cast<char*>(mmap(0, len, PROT_WRITE, MAP_SHARED, *shm_fd, 0));
520+
521+
*buffer = static_cast<char*>(mmap(0, len, PROT_READ | PROT_WRITE, MAP_SHARED, *shm_fd, 0));
490522
if(*buffer == MAP_FAILED)
491523
{
492524
*buffer = nullptr;
493-
getMessengerLogger()->error("Error mapping shared memory: %s", strerror(errno));
494-
rc = close(*shm_fd);
495-
if(rc)
496-
getMessengerLogger()->error("Error closing shared memory: %s", strerror(errno));
497-
rc = shm_unlink(name.c_str());
498-
// 2 == No such file or directory
499-
if(rc && errno != 2)
500-
getMessengerLogger()->error("Error unlinking shared memory: %s", strerror(errno));
525+
getMessengerLogger()->error("Error mapping shared memory %s: %s", name.c_str(),
526+
strerror(errno));
527+
close(*shm_fd);
528+
if(isCreator)
529+
shm_unlink(name.c_str());
530+
*shm_fd = 0;
501531
}
502532

503533
return *buffer != nullptr;
@@ -674,10 +704,7 @@ static void processorThread(Messenger* messenger, std::function<void(std::string
674704

675705
struct Messenger
676706
{
677-
explicit Messenger(MessengerInit init)
678-
: running(true), initialized_(false), shutdown_(false), init_(init), outboundSynch_(nullptr),
679-
inboundSynch_(nullptr), uncompressed_buffer_(nullptr), compressed_buffer_(nullptr),
680-
uncompressed_fd_(0), compressed_fd_(0)
707+
explicit Messenger(MessengerInit init) : init_(init)
681708
{
682709
if(!isClient())
683710
{
@@ -757,15 +784,6 @@ struct Messenger
757784
}
758785
bool initBuffers(void)
759786
{
760-
#ifndef _WIN32
761-
// clean up in case of previous crash
762-
if(init_.firstLaunch(init_.isClient_))
763-
{
764-
// shm_unlink(grokUncompressedBuf.c_str());
765-
// shm_unlink(grokCompressedBuf.c_str());
766-
}
767-
#endif
768-
769787
char temp[512];
770788
sprintf(temp,
771789
"Initializing shared memory buffers: num frames %zu, "
@@ -775,9 +793,9 @@ struct Messenger
775793
getMessengerLogger()->info(temp);
776794
if(init_.uncompressedFrameSize_)
777795
{
778-
bool rc = SharedMemoryManager::initShm(grokUncompressedBuf,
779-
init_.uncompressedFrameSize_ * init_.numFrames_,
780-
&uncompressed_fd_, &uncompressed_buffer_);
796+
bool rc = SharedMemoryManager::initShm(
797+
grokUncompressedBuf, init_.uncompressedFrameSize_ * init_.numFrames_, &uncompressed_fd_,
798+
&uncompressed_buffer_, !init_.isClient_);
781799
if(!rc)
782800
return false;
783801

@@ -787,9 +805,9 @@ struct Messenger
787805
}
788806
if(init_.compressedFrameSize_)
789807
{
790-
bool rc = SharedMemoryManager::initShm(grokCompressedBuf,
791-
init_.compressedFrameSize_ * init_.numFrames_,
792-
&compressed_fd_, &compressed_buffer_);
808+
bool rc = SharedMemoryManager::initShm(
809+
grokCompressedBuf, init_.compressedFrameSize_ * init_.numFrames_, &compressed_fd_,
810+
&compressed_buffer_, !init_.isClient_);
793811
if(!rc)
794812
return false;
795813

@@ -939,9 +957,9 @@ struct Messenger
939957

940958
return (uint8_t*)(compressed_buffer_ + frameId * init_.compressedFrameSize_);
941959
}
942-
std::atomic_bool running;
943-
bool initialized_;
944-
bool shutdown_;
960+
std::atomic_bool running{true};
961+
bool initialized_ = false;
962+
bool shutdown_ = false;
945963
MessengerBlockingQueue<std::string> sendQueue;
946964
MessengerBlockingQueue<std::string> receiveQueue;
947965
MessengerBlockingQueue<BufferSrc> availableBuffers_;
@@ -956,17 +974,17 @@ struct Messenger
956974

957975
private:
958976
std::thread outbound;
959-
Synch* outboundSynch_;
977+
Synch* outboundSynch_ = nullptr;
960978

961979
std::thread inbound;
962-
Synch* inboundSynch_;
980+
Synch* inboundSynch_ = nullptr;
963981

964982
std::vector<std::thread> processors_;
965-
char* uncompressed_buffer_;
966-
char* compressed_buffer_;
983+
char* uncompressed_buffer_ = nullptr;
984+
char* compressed_buffer_ = nullptr;
967985

968-
grk_handle uncompressed_fd_;
969-
grk_handle compressed_fd_;
986+
grk_handle uncompressed_fd_ = 0;
987+
grk_handle compressed_fd_ = 0;
970988
};
971989

972990
/*************************** I/O Threads *******************************/
@@ -975,7 +993,8 @@ static void outboundThread(Messenger* messenger, const std::string& sendBuf, Syn
975993
grk_handle shm_fd = 0;
976994
char* send_buffer = nullptr;
977995

978-
if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer))
996+
if(!SharedMemoryManager::initShm(sendBuf, messageBufferLen, &shm_fd, &send_buffer,
997+
!messenger->isClient()))
979998
return;
980999
while(messenger->running)
9811000
{
@@ -998,7 +1017,8 @@ static void inboundThread(Messenger* messenger, const std::string& receiveBuf, S
9981017
grk_handle shm_fd = 0;
9991018
char* receive_buffer = nullptr;
10001019

1001-
if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer))
1020+
if(!SharedMemoryManager::initShm(receiveBuf, messageBufferLen, &shm_fd, &receive_buffer,
1021+
!messenger->isClient()))
10021022
return;
10031023
while(messenger->running)
10041024
{

tests/GrkMessengerTest.cpp

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ static void testSharedMemoryAllocation()
365365
grk_handle fd = 0;
366366
char* buf = nullptr;
367367

368-
bool rc = SharedMemoryManager::initShm("/grk_test_shm_alloc", bufLen, &fd, &buf);
368+
bool rc = SharedMemoryManager::initShm("/grk_test_shm_alloc", bufLen, &fd, &buf, true);
369369
TEST_ASSERT(rc, "initShm should succeed");
370370
TEST_ASSERT(buf != nullptr, "buffer should be non-null");
371371

@@ -380,6 +380,36 @@ static void testSharedMemoryAllocation()
380380
TEST_PASS("testSharedMemoryAllocation");
381381
}
382382

383+
#ifndef _WIN32
384+
// Test that creator handles stale shm from a previous crash (EEXIST → unlink → retry)
385+
static void testSharedMemoryCrashRecovery()
386+
{
387+
const char* name = "/grk_test_crash_recovery";
388+
const size_t bufLen = 4096;
389+
390+
// simulate stale segment left by a crashed process
391+
shm_unlink(name);
392+
int stale_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
393+
TEST_ASSERT(stale_fd >= 0, "stale shm_open should succeed");
394+
ftruncate(stale_fd, (off_t)bufLen);
395+
close(stale_fd);
396+
397+
// creator should recover: unlink stale segment and create fresh
398+
grk_handle fd = 0;
399+
char* buf = nullptr;
400+
bool rc = SharedMemoryManager::initShm(name, bufLen, &fd, &buf, true);
401+
TEST_ASSERT(rc, "initShm should recover from stale segment");
402+
TEST_ASSERT(buf != nullptr, "buffer should be non-null after recovery");
403+
404+
memset(buf, 0xCD, bufLen);
405+
TEST_ASSERT((uint8_t)buf[0] == 0xCD, "written data should persist after recovery");
406+
407+
rc = SharedMemoryManager::deinitShm(name, bufLen, fd, &buf);
408+
TEST_ASSERT(rc, "deinitShm should succeed");
409+
TEST_PASS("testSharedMemoryCrashRecovery");
410+
}
411+
#endif
412+
383413
// Test frame accessor methods
384414
static void testFrameAccessors()
385415
{
@@ -458,6 +488,18 @@ static void testClientInitBufferQueue()
458488
const size_t compressedSize = 64;
459489
const size_t numFrames = 2;
460490

491+
#ifndef _WIN32
492+
// Pre-create data buffers (simulates server creating before client opens)
493+
shm_unlink(grokUncompressedBuf.c_str());
494+
shm_unlink(grokCompressedBuf.c_str());
495+
grk_handle pre_uc_fd = 0, pre_c_fd = 0;
496+
char *pre_uc = nullptr, *pre_c = nullptr;
497+
SharedMemoryManager::initShm(grokUncompressedBuf, uncompressedSize * numFrames, &pre_uc_fd,
498+
&pre_uc, true);
499+
SharedMemoryManager::initShm(grokCompressedBuf, compressedSize * numFrames, &pre_c_fd, &pre_c,
500+
true);
501+
#endif
502+
461503
MessengerInit init(true, "Global\\grk_test3_out", "Global\\grk_test3_out_sent",
462504
"Global\\grk_test3_out_ready", "Global\\grk_test3_in",
463505
"Global\\grk_test3_in_sent", "Global\\grk_test3_in_ready", proc, 0, 0, 0, 0);
@@ -474,6 +516,14 @@ static void testClientInitBufferQueue()
474516
TEST_ASSERT(m.availableBuffers_.pop(src), "should get buffer");
475517
TEST_ASSERT(src.frameId_ == i, "frameId should match");
476518
}
519+
520+
#ifndef _WIN32
521+
munmap(pre_uc, uncompressedSize * numFrames);
522+
close(pre_uc_fd);
523+
munmap(pre_c, compressedSize * numFrames);
524+
close(pre_c_fd);
525+
#endif
526+
477527
TEST_PASS("testClientInitBufferQueue");
478528
}
479529

@@ -536,6 +586,9 @@ int GrkMessengerTest::main(int argc, char** argv)
536586
// Full protocol loopback test
537587
fprintf(stdout, "\n[Shared Memory / Buffers]\n");
538588
testSharedMemoryAllocation();
589+
#ifndef _WIN32
590+
testSharedMemoryCrashRecovery();
591+
#endif
539592
testFrameAccessors();
540593
testServerBufferQueue();
541594
testClientInitBufferQueue();

0 commit comments

Comments
 (0)