Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@
*.exe
*.out
*.app

# Build directories
build/
CMakeFiles/
CMakeCache.txt
cmake_install.cmake
Makefile
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ To enable the checksum (only on XRootD 5.2+):
ofs.ckslib * libXrdMultiuser.so
```

### Write Buffering

To reduce IOPS from small sequential writes, you can enable write buffering:

```
multiuser.writebuffersize <bytes>
```

Where `<bytes>` is the buffer size in bytes. Default is 0 (disabled). When enabled:
- Sequential writes smaller than the buffer size are accumulated in memory
- The buffer is flushed when full, when a non-sequential write occurs, or when the file is closed
- Buffering is automatically disabled for a file if non-sequential writes are detected

Example: Buffer up to 1MB of writes:
```
multiuser.writebuffersize 1048576
```

**Note:** Buffering is only suitable for sequential write workloads. Non-sequential writes will cause the buffer to be flushed and buffering disabled for that file.

Startup
-------

Expand Down
5 changes: 5 additions & 0 deletions configs/60-osg-multiuser.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,9 @@ if defined ?~XC_ENABLE_MULTIUSER
# checksum while it is writing a file. To turn this on, uncomment the
# following line:
# multiuser.checksumonwrite on

# Write buffering can reduce IOPS for workloads with many small sequential
# writes. Specify the buffer size in bytes (default: 0 = disabled).
# Example: Buffer up to 1MB of sequential writes
# multiuser.writebuffersize 1048576
fi
12 changes: 11 additions & 1 deletion src/MultiuserFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
#include "XrdChecksum.hh"

#include <memory>
#include <mutex>
#include <vector>

class MultiuserFile : public XrdOssDF {
public:
MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss);
MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss, size_t write_buffer_size);

virtual ~MultiuserFile() {
if (m_state) {delete m_state;}
Expand Down Expand Up @@ -127,6 +129,7 @@ public:
int Close(long long *retsz=0);

private:
int FlushWriteBuffer();
std::unique_ptr<XrdOssDF> m_wrapped;
XrdSysError &m_log;
const XrdSecEntity* m_client;
Expand All @@ -138,6 +141,13 @@ private:
bool m_checksum_on_write;
unsigned m_digests;

// Write buffering
size_t m_write_buffer_size;
std::vector<unsigned char> m_write_buffer;
off_t m_buffer_offset;
bool m_buffering_enabled;
std::mutex m_buffer_mutex;

};

#endif
27 changes: 25 additions & 2 deletions src/MultiuserFileSystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ MultiuserFileSystem::MultiuserFileSystem(XrdOss *oss, XrdSysLogger *lp, const ch
m_env(envP),
m_log(lp, "multiuser_"),
m_checksum_on_write(false),
m_digests(0)
m_digests(0),
m_write_buffer_size(0)
{
if (!oss) {
throw std::runtime_error("The multi-user plugin must be chained with another filesystem.");
Expand Down Expand Up @@ -114,6 +115,28 @@ MultiuserFileSystem::Config(XrdSysLogger *lp, const char *configfn)
return false;
}
}

// Write buffer size
if (!strcmp("multiuser.writebuffersize", val)) {
val = Config.GetWord();
if (!val || !val[0]) {
m_log.Emsg("Config", "multiuser.writebuffersize must specify a value");
Config.Close();
return false;
}
char *endptr = NULL;
errno = 0;
long int buffer_size = strtol(val, &endptr, 0);
if (errno == ERANGE || buffer_size < 0 || endptr == val || *endptr != '\0') {
m_log.Emsg("Config", "multiuser.writebuffersize must specify a valid non-negative integer");
Config.Close();
return false;
}
m_write_buffer_size = static_cast<size_t>(buffer_size);
std::stringstream ss;
ss << "Setting write buffer size to " << m_write_buffer_size << " bytes";
m_log.Emsg("Config", ss.str().c_str());
}
if (!strcmp("xrootd.chksum", val)) {
m_digests = 0;
val = Config.GetWord();
Expand Down Expand Up @@ -175,7 +198,7 @@ XrdOssDF *MultiuserFileSystem::newFile(const char *user)
{
// Call the underlying OSS newFile
std::unique_ptr<XrdOssDF> wrapped(m_oss->newFile(user));
return (MultiuserFile *)new MultiuserFile(user, std::move(wrapped), m_log, m_umask_mode, m_checksum_on_write, m_digests, this);
return (MultiuserFile *)new MultiuserFile(user, std::move(wrapped), m_log, m_umask_mode, m_checksum_on_write, m_digests, this, m_write_buffer_size);
}

int MultiuserFileSystem::Chmod(const char * path, mode_t mode, XrdOucEnv *env)
Expand Down
3 changes: 3 additions & 0 deletions src/MultiuserFileSystem.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public:
bool
Config(XrdSysLogger *lp, const char *configfn);

size_t GetWriteBufferSize() const { return m_write_buffer_size; }

XrdOssDF *newDir(const char *user=0);
XrdOssDF *newFile(const char *user=0);
int Chmod(const char * path, mode_t mode, XrdOucEnv *env=0);
Expand Down Expand Up @@ -71,6 +73,7 @@ private:
std::shared_ptr<XrdAccAuthorize> m_authz;
bool m_checksum_on_write;
unsigned m_digests;
size_t m_write_buffer_size;

};

Expand Down
140 changes: 129 additions & 11 deletions src/multiuser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ErrorSentry
};


MultiuserFile::MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss) :
MultiuserFile::MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss, size_t write_buffer_size) :
XrdOssDF(user),
m_wrapped(std::move(ossDF)),
m_log(log),
Expand All @@ -142,7 +142,10 @@ MultiuserFile::MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF,
m_nextoff(0),
m_oss(oss),
m_checksum_on_write(checksum_on_write),
m_digests(digests)
m_digests(digests),
m_write_buffer_size(write_buffer_size),
m_buffer_offset(-1),
m_buffering_enabled(write_buffer_size > 0)
{}

int MultiuserFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Expand Down Expand Up @@ -173,28 +176,143 @@ int MultiuserFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv

ssize_t MultiuserFile::Write(const void *buffer, off_t offset, size_t size)
{
// Lock protects buffer state and sequential write tracking.
// While this serializes writes to the same file, it ensures correctness
// and is consistent with the sequential write assumption.
std::lock_guard<std::mutex> lock(m_buffer_mutex);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When buffered writes are not enabled at constructor time (i.e., disabled in the config file), never take this mutex.

Merge the logic in https://github.com/opensciencegrid/xrootd-multiuser/pull/61/changes with the changes in this function; this work will conflict with that and it's straightforward to do both.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use std::unique_lock with std::defer_lock - the lock is only acquired when m_write_buffer_size > 0. Also merged the PR #61 logic to disable checksumming (instead of returning error) on non-sequential writes. Fixed in b830dd6.


if ((offset != m_nextoff) && m_state)
// Check for out-of-order writes if checksumming or buffering
if ((offset != m_nextoff) && (m_state || m_buffering_enabled))
{
std::stringstream ss;
ss << "Out-of-order writes not supported while running checksum. " << m_fname;
m_log.Emsg("Write", ss.str().c_str());
return -ENOTSUP;
// Flush any buffered data first
if (m_buffering_enabled) {
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
return flush_result;
}
// Disable buffering for the rest of this file
m_buffering_enabled = false;
}

if (m_state) {
std::stringstream ss;
ss << "Out-of-order writes not supported while running checksum. " << m_fname;
m_log.Emsg("Write", ss.str().c_str());
return -ENOTSUP;
}
}

// If buffering is enabled and configured
if (m_buffering_enabled) {
// If this is the first write or buffer is empty, initialize buffer offset
if (m_write_buffer.empty()) {
m_buffer_offset = offset;
}

// Check if this write is sequential to the buffer
off_t expected_offset = m_buffer_offset + static_cast<off_t>(m_write_buffer.size());
if (offset != expected_offset) {
// Not sequential - flush buffer and disable buffering
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
return flush_result;
}
m_buffering_enabled = false;
// Fall through to direct write
} else {
// Sequential write - check if we should buffer it
size_t total_size = m_write_buffer.size() + size;

if (total_size <= m_write_buffer_size) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are edge cases here I'd like approached differently.

If the total is over the buffer size, then copy as much data as you can to fill the buffer and then write. This way, writes are always the buffer size (except maybe the last before close) at the cost of memcpy.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to fill the buffer completely before flushing when total exceeds buffer size. After flush, remaining data is either buffered (if it fits) or written directly (if too large). All writes are now buffer-sized except possibly the last. Fixed in b830dd6.

// Buffer this write - reserve capacity to avoid reallocations
if (m_write_buffer.capacity() < total_size) {
m_write_buffer.reserve(total_size);
}
m_write_buffer.insert(m_write_buffer.end(),
static_cast<const unsigned char*>(buffer),
static_cast<const unsigned char*>(buffer) + size);
m_nextoff = offset + size;
return size;
} else {
// Buffer would exceed limit - flush and write directly
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
return flush_result;
}
// Fall through to direct write
}
}
}

// Direct write (no buffering or buffer disabled)
auto result = m_wrapped->Write(buffer, offset, size);
if (result >= 0) {m_nextoff += result;}
if (m_state)
{
m_state->Update(static_cast<const unsigned char*>(buffer), size);
if (result >= 0) {
m_nextoff = offset + result;
if (m_state && result > 0) {
// Only update checksum for the data that was actually written
m_state->Update(static_cast<const unsigned char*>(buffer), result);
}
}
return result;
}



// FlushWriteBuffer: Writes all buffered data to the underlying file system.
// Preconditions: Must be called with m_buffer_mutex held.
// Returns: 0 on success, negative error code on failure.
// Side effects:
// - Writes buffer contents via m_wrapped->Write()
// - Updates checksum state if enabled (m_state)
// - Clears m_write_buffer and resets m_buffer_offset on success
// - Handles partial writes with retry loop
// - On failure, buffer is NOT cleared to allow retry
int MultiuserFile::FlushWriteBuffer()
{
if (m_write_buffer.empty()) {
return 0;
}

size_t total_written = 0;
while (total_written < m_write_buffer.size()) {
auto result = m_wrapped->Write(m_write_buffer.data() + total_written,
m_buffer_offset + total_written,
m_write_buffer.size() - total_written);
if (result < 0) {
// Write failed - don't clear buffer, return error
return result;
}
if (result == 0) {
// No progress - treat as error
return -EIO;
}
total_written += result;
}

if (m_state) {
m_state->Update(m_write_buffer.data(), m_write_buffer.size());
}

m_write_buffer.clear();
m_buffer_offset = -1;

return 0;
}


int MultiuserFile::Close(long long *retsz)
{
std::lock_guard<std::mutex> lock(m_buffer_mutex);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly as above - if the buffering is disabled in the config file, then skip the lock.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use std::unique_lock with std::defer_lock - the lock is only acquired when m_write_buffer_size > 0, skipping the lock entirely when buffering is disabled. Fixed in b830dd6.


// Flush any remaining buffered data
if (!m_write_buffer.empty()) {
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
m_log.Emsg("Close", "Failed to flush write buffer");
// Continue with close anyway
}
}

auto close_result = m_wrapped->Close(retsz);
if (m_state)
{
Expand Down