Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conan.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"version": "0.5",
"requires": [
"batteries/0.70.1",
"batteries/0.70.3",
"boost/1.88.0",
"bzip2/1.0.8",
"cli11/2.5.0",
Expand Down
11 changes: 1 addition & 10 deletions src/llfs/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,13 @@
namespace llfs {

using batt::buffer_from_struct;
using batt::byte_distance;
using batt::ConstBuffer;
using batt::make_buffer;
using batt::mutable_buffer_from_struct;
using batt::MutableBuffer;
using batt::resize_buffer;

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
/** \brief Returns the distance, in bytes, from `begin` to `end`. If `end` is less than `begin`,
* the result is negative.
*/
inline isize byte_distance(const void* begin, const void* end)
{
return static_cast<const u8*>(end) - static_cast<const u8*>(begin);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
/** \brief Returns an empty sequence of ConstBuffer objects.
Expand Down
41 changes: 31 additions & 10 deletions src/llfs/committable_page_cache_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,23 @@ Status CommittablePageCacheJob::start_writing_new_pages()
return OkStatus();
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
Status CommittablePageCacheJob::await_new_page_writes() noexcept
{
if (this->write_new_pages_context_) {
BATT_REQUIRE_OK(this->write_new_pages_context_->await_finish());
}
return OkStatus();
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
/*explicit*/ CommittablePageCacheJob::WriteNewPagesContext::WriteNewPagesContext(
CommittablePageCacheJob* that) noexcept
: that{that}
, job{that->job_.get()}
, op_count{0}
, normalized_iop_count{0}
, used_byte_count{0}
, total_byte_count{0}
, done_counter{0}
Expand All @@ -329,6 +339,15 @@ Status CommittablePageCacheJob::start_writing_new_pages()
{
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
CommittablePageCacheJob::WriteNewPagesContext::~WriteNewPagesContext() noexcept
{
// Wait for any pending asynchronous writes to finish.
//
this->await_finish().IgnoreError();
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
Status CommittablePageCacheJob::WriteNewPagesContext::start()
Expand All @@ -346,18 +365,22 @@ Status CommittablePageCacheJob::WriteNewPagesContext::start()
// throughput.
//
this->n_ops = this->job->get_new_pages().size();
this->ops = PageWriteOp::allocate_array(this->n_ops, this->done_counter);
this->ops = std::make_unique<PageWriteOp[]>(n_ops);
LLFS_VLOG(1) << "commit(PageCacheJob): writing new pages";
{
usize i = 0;
for (auto& p : this->job->get_new_pages()) {
auto increment_i = batt::finally([&i] {
++i;
});

const PageId page_id = p.first;

// There's no need to write recovered pages, since they are already durable; skip.
//
if (this->job->is_recovered_page(page_id)) {
LLFS_VLOG(1) << "commit(PageCacheJob): skipping recovered page " << page_id;
this->ops[i].get_handler()(batt::OkStatus());
this->ops[i].get_handler(page_id, &this->done_counter)(batt::OkStatus());
continue;
}

Expand All @@ -382,14 +405,12 @@ Status CommittablePageCacheJob::WriteNewPagesContext::start()
const usize page_size = page_header.size;
const usize used_size = page_header.used_size();

this->ops[i].page_id = page_id;

this->job->cache().async_write_new_page(std::move(*pinned_page), this->ops[i].get_handler());
this->job->cache().async_write_new_page(
std::move(*pinned_page), this->ops[i].get_handler(page_id, &this->done_counter));

this->total_byte_count += page_size;
this->used_byte_count += used_size;
this->op_count += page_size / 4096;
++i;
this->normalized_iop_count += page_size / kDirectIOBlockSize;
}
}

Expand Down Expand Up @@ -422,13 +443,13 @@ Status CommittablePageCacheJob::WriteNewPagesContext::await_finish()
});
#endif // LLFS_TRACK_NEW_PAGE_EVENTS

all_ops_status.Update(op.result);
all_ops_status.Update(op.result());
}
BATT_REQUIRE_OK(all_ops_status);

this->job->cache().metrics().total_bytes_written += this->total_byte_count;
this->job->cache().metrics().used_bytes_written += this->used_byte_count;
this->job->cache().metrics().total_write_ops += this->op_count;
this->job->cache().metrics().total_write_ops += this->normalized_iop_count;

return OkStatus();
}
Expand Down
8 changes: 7 additions & 1 deletion src/llfs/committable_page_cache_job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class CommittablePageCacheJob
*/
Status start_writing_new_pages();

/** \brief Waits for asynchronous writes of new pages to complete.
*/
Status await_new_page_writes() noexcept;

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
private:
struct DeviceUpdateState {
Expand All @@ -172,7 +176,7 @@ class CommittablePageCacheJob
struct WriteNewPagesContext {
CommittablePageCacheJob* const that;
const PageCacheJob* const job;
u64 op_count;
u64 normalized_iop_count;
u64 used_byte_count;
u64 total_byte_count;
batt::Watch<i64> done_counter;
Expand All @@ -186,6 +190,8 @@ class CommittablePageCacheJob
WriteNewPagesContext(const WriteNewPagesContext&) = delete;
WriteNewPagesContext& operator=(const WriteNewPagesContext&) = delete;

~WriteNewPagesContext() noexcept;

Status start();

Status await_finish();
Expand Down
5 changes: 5 additions & 0 deletions src/llfs/packed_pointer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ struct PackedPointer {
return reinterpret_cast<const T*>(reinterpret_cast<const u8*>(this) + this->offset);
}

void reset_unsafe(T* ptr)
{
this->offset = byte_distance(this, ptr);
}

template <typename Dst>
void reset(T* ptr, Dst* dst)
{
Expand Down
53 changes: 39 additions & 14 deletions src/llfs/page_write_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,51 @@ namespace llfs {

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
/*static*/ std::unique_ptr<PageWriteOp[]> PageWriteOp::allocate_array(
usize n, batt::Watch<i64>& done_counter)
/*explicit*/ PageWriteOp::PageWriteOp() noexcept
{
auto ops = std::make_unique<PageWriteOp[]>(n);
for (auto& op : as_slice(ops.get(), n)) {
op.done_counter = &done_counter;
}
return ops;
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
/*explicit*/ PageWriteOp::PageWriteOp() noexcept
PageWriteOp::~PageWriteOp() noexcept
{
BATT_CHECK_EQ(this->is_pending(), false) << "This write op has an uninvoked handler!";
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
PageWriteOp::~PageWriteOp() noexcept
batt::CustomAllocHandler<PageWriteOp::HandlerImpl> PageWriteOp::get_handler(
Comment thread
tonyastolfi marked this conversation as resolved.
PageId id, batt::Watch<i64>* done_counter) noexcept
{
BATT_CHECK_EQ(std::exchange(this->pending_, true), false)
<< "Only one handler per PageWriteOp is allowed at a time!";

BATT_CHECK_NOT_NULLPTR(done_counter);

this->page_id_ = id;
this->done_counter_ = done_counter;

return make_custom_alloc_handler(this->handler_memory_, HandlerImpl{.op_ = this});
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void PageWriteOp::HandlerImpl::operator()(PageDevice::WriteResult result) const noexcept
{
this->op_->handle_write(std::move(result));
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void PageWriteOp::handle_write(PageDevice::WriteResult result) noexcept
{
BATT_CHECK_EQ(std::exchange(this->pending_, false), true)
<< "PageWriteOp handlers may only be invoked once!";

BATT_CHECK_NOT_NULLPTR(this->done_counter_);

this->result_ = std::move(result);
this->done_counter_->fetch_add(1);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -48,12 +73,12 @@ Status parallel_drop_pages(const std::vector<PageId>& deleted_pages, PageCache&
}

batt::Watch<i64> done_counter{0};
std::unique_ptr<PageWriteOp[]> ops = PageWriteOp::allocate_array(n_ops, done_counter);
auto ops = std::make_unique<PageWriteOp[]>(n_ops);
{
usize i = 0;
for (const PageId page_id : deleted_pages) {
ops[i].page_id = page_id;
cache.arena_for_page_id(page_id).device().drop(page_id, ops[i].get_handler());
cache.arena_for_page_id(page_id).device().drop(page_id,
ops[i].get_handler(page_id, &done_counter));
++i;

#if LLFS_TRACK_NEW_PAGE_EVENTS
Expand All @@ -79,10 +104,10 @@ Status parallel_drop_pages(const std::vector<PageId>& deleted_pages, PageCache&
//
Status overall_status;
for (PageWriteOp& op : as_slice(ops.get(), n_ops)) {
Status page_status = op.result;
Status page_status = op.result();
overall_status.Update(page_status);
if (page_status.ok()) {
cache.purge(op.page_id, callers | Caller::PageCacheJob_commit_2, job_id);
cache.purge(op.page_id(), callers | Caller::PageCacheJob_commit_2, job_id);
}
}

Expand Down
79 changes: 66 additions & 13 deletions src/llfs/page_write_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,83 @@

namespace llfs {

// Represents/tracks a single async PageDevice::write/drop operation.
//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+---------------
//
/** \brief Represents/tracks a single async PageDevice::write/drop operation.
*/

struct PageWriteOp {
PageWriteOp(const PageWriteOp&) = delete;
PageWriteOp& operator=(const PageWriteOp&) = delete;
struct HandlerImpl {
PageWriteOp* op_;

batt::HandlerMemory<256> handler_memory;
PageId page_id;
batt::Watch<i64>* done_counter = nullptr;
PageDevice::WriteResult result;
void operator()(PageDevice::WriteResult result) const noexcept;
};

static std::unique_ptr<PageWriteOp[]> allocate_array(usize n, batt::Watch<i64>& done_counter);
//+++++++++++-+-+--+----- --- -- - - - -

explicit PageWriteOp() noexcept;

PageWriteOp(const PageWriteOp&) = delete;
PageWriteOp& operator=(const PageWriteOp&) = delete;

~PageWriteOp() noexcept;

auto get_handler()
//+++++++++++-+-+--+----- --- -- - - - -

/** \brief Returns a write handler for this op; there must only be one active handler per op at a
* time! If `this->get_handler()` is called twice before invoking the first returned handler (or
* a copy of it), this function will panic.
*/
batt::CustomAllocHandler<PageWriteOp::HandlerImpl> get_handler(
PageId id, batt::Watch<i64>* done_counter) noexcept;

/** \brief Returns the most recent result that was passed to the handler; only valid after
* get_handler has been called and the returned handler has been invoked.
*/
const PageDevice::WriteResult& result() const noexcept
{
return this->result_;
}

/** \brief Returns the most recent PageId passed to get_handler.
*/
PageId page_id() const noexcept
{
return make_custom_alloc_handler(this->handler_memory, [this](PageDevice::WriteResult result) {
this->result = std::move(result);
this->done_counter->fetch_add(1);
});
return this->page_id_;
}

/** \brief Returns true iff this op has a handler which hasn't been invoked.
*/
bool is_pending() const noexcept
{
return this->pending_;
}

//+++++++++++-+-+--+----- --- -- - - - -
private:
void handle_write(PageDevice::WriteResult result) noexcept;

//+++++++++++-+-+--+----- --- -- - - - -

/** \brief Pre-allocated memory for the write handler.
*/
batt::HandlerMemory<256> handler_memory_;

/** \brief The Watch to increment when this operation's handler is invoked.
*/
batt::Watch<i64>* done_counter_ = nullptr;

/** \brief The most recent page associated with this op.
*/
PageId page_id_;

/** \brief The result of the last operation.
*/
PageDevice::WriteResult result_;

/** \brief Set to true when get_handler is called; set to false when that handler is invoked.
*/
bool pending_ = false;
};

// Drops a range of PageIds in parallel.
Expand Down
3 changes: 3 additions & 0 deletions src/llfs/volume.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ StatusOr<SlotRange> Volume::append(AppendableJob&& appendable, batt::Grant& gran
if (write_new_pages_asap()) {
BATT_REQUIRE_OK(appendable.job.start_writing_new_pages());
}
auto wait_for_async_ops = batt::finally([&] {
appendable.job.await_new_page_writes().IgnoreError();
});

StatusOr<SlotRange> result;

Expand Down
2 changes: 2 additions & 0 deletions src/llfs/volume.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,8 @@ void VolumeSimTest::run_recovery_sim(u32 seed)
llfs::PageGraphNodeView::page_reader());

const auto main_task_fn = [&] {
BATT_DEBUG_INFO(BATT_INSPECT(seed));

sim.set_inject_failures_mode(false);

LLFS_VLOG(1) << "Entered main task;" << BATT_INSPECT(seed) << BATT_INSPECT(sim.is_running());
Expand Down