diff --git a/script b/script index 6d266c2d..9615fb2d 160000 --- a/script +++ b/script @@ -1 +1 @@ -Subproject commit 6d266c2de572dde8fcf70085b8fa960084aae663 +Subproject commit 9615fb2de30949f7aa369d964081f57c30f01733 diff --git a/src/llfs/ioring_impl.cpp b/src/llfs/ioring_impl.cpp index 58fa9d50..6766ba8a 100644 --- a/src/llfs/ioring_impl.cpp +++ b/src/llfs/ioring_impl.cpp @@ -161,14 +161,11 @@ Status IoRingImpl::run() noexcept // grabbing/running a completion handler? // constexpr usize kMaxNoopCount = 1000 * 1000; - auto on_scope_exit = batt::finally([this] { LLFS_DVLOG(1) << "IoRingImpl::run() " << BATT_INSPECT(this->work_count_) << " LEAVING"; }); - CompletionHandler* handler = nullptr; usize noop_count = 0; - while (this->can_run()) { LLFS_DVLOG(1) << "IoRingImpl::run() top of loop;" << BATT_INSPECT(this->work_count_); @@ -245,7 +242,6 @@ Status IoRingImpl::run() noexcept } } } - // If we are returning with a ready-to-run handler, stash it for later. // if (handler) { diff --git a/src/llfs/ioring_log_device_storage.cpp b/src/llfs/ioring_log_device_storage.cpp index ab36b2c7..9fcfa950 100644 --- a/src/llfs/ioring_log_device_storage.cpp +++ b/src/llfs/ioring_log_device_storage.cpp @@ -39,7 +39,6 @@ namespace llfs { , caller_{caller} , thread_{[this] { LLFS_VLOG(1) << "(" << this->caller_ << ") invoking IoRing::run()"; - Status io_status = this->storage_.io_ring_.run(); if (!io_status.ok()) { LLFS_LOG_WARNING() << "(" << this->caller_ << ") IoRing::run() returned: " << io_status; @@ -64,6 +63,7 @@ DefaultIoRingLogDeviceStorage::EventLoopTask::~EventLoopTask() noexcept void DefaultIoRingLogDeviceStorage::EventLoopTask::join() { this->join_called_ = true; + BATT_CHECK_OK(this->done_.await_equal(true)); this->thread_.join(); } diff --git a/src/llfs/ioring_log_initializer.ipp b/src/llfs/ioring_log_initializer.ipp new file mode 100644 index 00000000..b9937b1c --- /dev/null +++ b/src/llfs/ioring_log_initializer.ipp @@ -0,0 +1,180 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#pragma once +#ifndef LLFS_IORING_LOG_INITIALIZER_IPP +#define LLFS_IORING_LOG_INITIALIZER_IPP + +#include +// +#ifndef LLFS_DISABLE_IO_URING + +namespace llfs { + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +/*explicit*/ inline BasicIoRingLogInitializer::BasicIoRingLogInitializer( + usize n_tasks, typename IoRingImpl::File& file, const IoRingLogConfig& config, + u64 n_blocks_to_init) noexcept + : file_{file} + , config_{config} + , subtasks_(n_tasks) + , n_blocks_to_init_{n_blocks_to_init} +{ + for (auto& task : this->subtasks_) { + task.that = this; + task.buffer.clear(); + task.buffer.header.reset(); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +inline batt::Status BasicIoRingLogInitializer::run() +{ + LLFS_VLOG(1) << "IoRingLogInitializer::run() Entered; " + << BATT_INSPECT(this->config_.block_count()) + << BATT_INSPECT(this->config_.block_size()) + << BATT_INSPECT(this->config_.block_capacity()) + << BATT_INSPECT(this->n_blocks_to_init_); + { + batt::MutableBuffer memory{this->subtasks_.data(), this->subtasks_.size() * sizeof(Subtask)}; + LLFS_VLOG(1) << "memory = " << (const void*)memory.data() << ".." + << (const void*)(this->subtasks_.data() + this->subtasks_.size()); + + // Cache the file descriptor information in the kernel for faster access. + // + Status fd_status = this->file_.register_fd(); + BATT_REQUIRE_OK(fd_status); + + // Map our memory buffer to the kernel for faster I/O. + // + StatusOr buffers_status = this->file_.get_io_ring_impl()->register_buffers( + seq::single_item(std::move(memory)) | seq::boxed(), /*update=*/false); + + LLFS_VLOG(2) << "register_buffers status=" << buffers_status; + BATT_REQUIRE_OK(buffers_status); + } + auto on_scope_exit = batt::finally([&] { + this->file_.get_io_ring_impl()->unregister_buffers().IgnoreError(); + LLFS_VLOG(1) << "IoRingLogInitializer::run() Finished"; + }); + + for (auto& task : this->subtasks_) { + task.start_write(); + } + Status all_finished = this->finished_count_.await_equal(this->subtasks_.size()); + BATT_REQUIRE_OK(all_finished); + for (auto& task : this->subtasks_) { + BATT_REQUIRE_OK(task.final_status); + } + + return OkStatus(); +} + +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +inline void BasicIoRingLogInitializer::Subtask::start_write() +{ + LLFS_VLOG(2) << "[Subtask:" << this->self_index() + << "] IoRingLogInitializer::Subtask::start_write()" + << BATT_INSPECT(this->block_progress) << BATT_INSPECT(this->file_offset); + + BasicIoRingLogInitializer* const that = this->that; + const IoRingLogConfig& config = that->config_; + + batt::ConstBuffer bytes = this->buffer.as_const_buffer() + this->block_progress; + if (bytes.size() == 0) { + LLFS_VLOG(2) << " -- At end of buffer; fetching next block index..."; + + const usize block_i = that->next_block_i_.fetch_add(1); + + LLFS_VLOG(2) << " -- " << BATT_INSPECT(block_i) << "/" << that->n_blocks_to_init_; + + // If `block_i` is at or past the end of the log, we are done! Increment the finished count + // and return. + // + if (block_i >= that->n_blocks_to_init_) { + LLFS_VLOG(2) << " -- FINISHED (all blocks written)"; + this->finish(batt::OkStatus()); + return; + } + + // We have a new block to write. Reset our state and continue. + // + this->file_offset = config.physical_offset + config.block_size() * block_i; + this->block_progress = 0; + this->buffer.header.slot_offset = config.block_capacity() * block_i; + bytes = this->buffer.as_const_buffer(); + } + + LLFS_VLOG(2) << " -- async_write_some(offset=" << this->file_offset << ", bytes=[" << bytes.size() + << "])"; + + that->file_.async_write_some_fixed( + this->file_offset, bytes, /*buf_index=*/0, + batt::make_custom_alloc_handler(this->handler_memory, + [this](const batt::StatusOr& n_written) { + this->handle_write(n_written); + })); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +inline void BasicIoRingLogInitializer::Subtask::handle_write( + const batt::StatusOr& n_written) +{ + LLFS_VLOG(2) << "[Subtask:" << this->self_index() + << "] IoRingLogInitializer::Subtask::handle_write(status=" << n_written.status() + << ", n_written=" << (n_written.ok() ? *n_written : -1) << ")"; + + if (!n_written.ok()) { + this->finish(n_written.status()); + return; + } + + BATT_CHECK_GE(*n_written, 0); + + this->block_progress += *n_written; + this->file_offset += *n_written; + this->start_write(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +inline void BasicIoRingLogInitializer::Subtask::finish(const batt::Status& status) +{ + BATT_CHECK(!this->done); + + this->done = true; + this->final_status.Update(status); + const auto prior_finished_count = this->that->finished_count_.fetch_add(1); + + LLFS_VLOG(2) << BATT_INSPECT(prior_finished_count); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +template +inline usize BasicIoRingLogInitializer::Subtask::self_index() const +{ + return this - this->that->subtasks_.data(); +} + +} // namespace llfs + +#endif // LLFS_DISABLE_IO_URING + +#endif // LLFS_IORING_LOG_INITIALIZER_IPP diff --git a/src/llfs/page_allocator_config.cpp b/src/llfs/page_allocator_config.cpp index 12299c8a..809feee4 100644 --- a/src/llfs/page_allocator_config.cpp +++ b/src/llfs/page_allocator_config.cpp @@ -153,8 +153,7 @@ StatusOr> recover_storage_object( const auto page_ids = PageIdFactory{ PageCount{BATT_CHECKED_CAST(PageCount::value_type, p_allocator_config->page_count.value())}, - p_allocator_config->page_device_id, - }; + p_allocator_config->page_device_id}; return PageAllocator::recover(allocator_options, page_ids, **log_factory); } diff --git a/src/llfs/page_arena_config.cpp b/src/llfs/page_arena_config.cpp index e42b936c..fd5ad980 100644 --- a/src/llfs/page_arena_config.cpp +++ b/src/llfs/page_arena_config.cpp @@ -191,7 +191,6 @@ StatusOr recover_storage_object( }; BATT_CHECK_EQ(arena.id(), arena.allocator().get_device_id()); - return arena; } diff --git a/src/llfs/page_cache.cpp b/src/llfs/page_cache.cpp index 39c60115..2e54e31d 100644 --- a/src/llfs/page_cache.cpp +++ b/src/llfs/page_cache.cpp @@ -22,7 +22,7 @@ namespace llfs { //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -usize get_page_size(const PageCache::PageDeviceEntry* entry) +usize get_page_size(std::shared_ptr entry) { return entry ? get_page_size(entry->arena) : 0; } @@ -102,66 +102,12 @@ PageCache::PageCache(std::vector&& storage_pool, , metrics_{} , page_readers_{std::make_shared>()} { - this->cache_slot_pool_by_page_size_log2_.fill(nullptr); - - // Find the maximum page device id value. - // - page_device_id_int max_page_device_id = 0; - for (const PageArena& arena : storage_pool) { - max_page_device_id = std::max(max_page_device_id, arena.device().get_id()); - } - - // Populate this->page_devices_. - // - this->page_devices_.resize(max_page_device_id + 1); - for (PageArena& arena : storage_pool) { - const page_device_id_int device_id = arena.device().get_id(); - const auto page_size_log2 = batt::log2_ceil(arena.device().page_size()); - - BATT_CHECK_EQ(PageSize{1} << page_size_log2, arena.device().page_size()) - << "Page sizes must be powers of 2!"; - - BATT_CHECK_LT(page_size_log2, kMaxPageSizeLog2); - - // Create a slot pool for this page size if we haven't already done so. - // - if (!this->cache_slot_pool_by_page_size_log2_[page_size_log2]) { - this->cache_slot_pool_by_page_size_log2_[page_size_log2] = PageCacheSlot::Pool::make_new( - /*n_slots=*/this->options_.max_cached_pages_per_size_log2[page_size_log2], - /*name=*/batt::to_string("size_", u64{1} << page_size_log2)); - } - - BATT_CHECK_EQ(this->page_devices_[device_id], nullptr) - << "Duplicate entries found for the same device id!" << BATT_INSPECT(device_id); - - this->page_devices_[device_id] = std::make_unique( // - std::move(arena), // - batt::make_copy(this->cache_slot_pool_by_page_size_log2_[page_size_log2]) // - ); - - // We will sort these later. - // - this->page_devices_by_page_size_.emplace_back(this->page_devices_[device_id].get()); + { + batt::ScopedWriteLock state(this->state_); + state->cache_slot_pool_by_page_size_log2.fill(nullptr); } - BATT_CHECK_EQ(this->page_devices_by_page_size_.size(), storage_pool.size()); - // Sort the storage pool by page size (MUST be first). - // - std::sort(this->page_devices_by_page_size_.begin(), this->page_devices_by_page_size_.end(), - PageSizeOrder{}); - - // Index the storage pool into groups of arenas by page size. - // - for (usize size_log2 = 6; size_log2 < kMaxPageSizeLog2; ++size_log2) { - auto iter_pair = std::equal_range(this->page_devices_by_page_size_.begin(), - this->page_devices_by_page_size_.end(), - PageSize{u32{1} << size_log2}, PageSizeOrder{}); - - this->page_devices_by_page_size_log2_[size_log2] = - as_slice(this->page_devices_by_page_size_.data() + - std::distance(this->page_devices_by_page_size_.begin(), iter_pair.first), - as_range(iter_pair).size()); - } + BATT_CHECK_OK(this->add_page_devices(storage_pool)); // Register metrics. // @@ -253,7 +199,8 @@ batt::Status PageCache::register_page_reader(const PageLayoutId& layout_id, cons // void PageCache::close() { - for (const std::unique_ptr& entry : this->page_devices_) { + batt::ScopedReadLock state(this->state_); + for (const std::shared_ptr& entry : state->page_devices) { if (entry) { entry->arena.halt(); } @@ -264,7 +211,8 @@ void PageCache::close() // void PageCache::join() { - for (const std::unique_ptr& entry : this->page_devices_) { + batt::ScopedReadLock state(this->state_); + for (const std::shared_ptr& entry : state->page_devices) { if (entry) { entry->arena.join(); } @@ -302,48 +250,82 @@ StatusOr> PageCache::allocate_page_of_size_log2( LatencyTimer alloc_timer{this->metrics_.allocate_page_alloc_latency}; - Slice device_entries = this->devices_with_page_size_log2(size_log2); + static const char* kOperationName = "PageCache::allocate_page_of_size_log2"; - // TODO [tastolfi 2021-09-08] If the caller wants to wait, which device should we wait on? First - // available? Random? Round-Robin? + // TODO: [Gabe Bornstein 6/19/24] Tony mentioned there's a different method besides + // `with_retry_policy` that we could use. It could potentially cause less congestion in the + // pipeline of CheckpointGenerator -> Trim -> PageRecycler -> Etc. However, it's more complicated // - for (auto wait_arg : {batt::WaitForResource::kFalse, batt::WaitForResource::kTrue}) { - for (PageDeviceEntry* device_entry : device_entries) { - PageArena& arena = device_entry->arena; - StatusOr page_id = arena.allocator().allocate_page(wait_arg, cancel_token); - if (!page_id.ok()) { - if (page_id.status() == batt::StatusCode::kResourceExhausted) { - const u64 page_size = u64{1} << size_log2; - LLFS_LOG_INFO_FIRST_N(1) // - << "Failed to allocate page (pool is empty): " << BATT_INSPECT(page_size); - } - continue; - } - - BATT_CHECK_EQ(PageIdFactory::get_device_id(*page_id), arena.id()); - - LLFS_VLOG(1) << "allocated page " << *page_id; - - this->track_new_page_event(NewPageTracker{ - .ts = 0, - .job_id = job_id, - .page_id = *page_id, - .callers = callers, - .event_id = (int)NewPageTracker::Event::kAllocate, - }); - - // PageDevice::prepare must be thread-safe. - // - return arena.device().prepare(*page_id); - } - - if (wait_for_resource == batt::WaitForResource::kFalse) { - break; - } - } - - LLFS_LOG_WARNING() << "No arena with free space could be found"; - return Status{batt::StatusCode::kUnavailable}; // TODO [tastolfi 2021-10-20] + return // + batt::with_retry_policy( // + // TODO: [Gabe Bornstein 6/20/24] Ensure MAX waittime is < 100 ms + // + batt::ExponentialBackoff{ + .max_attempts = ~u64{0}, + .initial_delay_usec = 500, + .backoff_factor = 3, + .backoff_divisor = 2, + .max_delay_usec = 1000 * 100, + }, + kOperationName, // + [this, &size_log2, &cancel_token, &job_id, &callers, &wait_for_resource] { + this->update_available_pages(); + + batt::StatusOr> page_buffer; + batt::Status s = batt::StatusCode::kResourceExhausted; + page_buffer = s; + + Slice> device_entries = + this->devices_with_page_size_log2(size_log2); + // TODO: [Gabe Bornstein 6/20/24] Currently, we're dropping the state lock in between + // `update_available_pages` and `allocate_page`. Could this cause issues? Could we run + // out of pages? + // + batt::ScopedWriteLock state(this->state_); + // If available_pages does not have not have any pages that match device_entries, + // nothing to do. No available pages. Return and try again. + // + for (std::shared_ptr device_entry : device_entries) { + const PageArena& arena = device_entry->arena; + if (state->arenas_with_available_pages.find(arena.id()) == + state->arenas_with_available_pages.end()) { + continue; + } + StatusOr page_id = + arena.allocator().allocate_page(batt::WaitForResource::kFalse, cancel_token); + if (!page_id.ok()) { + if (page_id.status() == batt::StatusCode::kResourceExhausted) { + const u64 page_size = u64{1} << size_log2; + LLFS_LOG_INFO_FIRST_N(1) // + << "Failed to allocate page (pool is empty): " << BATT_INSPECT(page_size); + } + continue; + } + BATT_CHECK_EQ(PageIdFactory::get_device_id(*page_id), arena.id()); + + LLFS_VLOG(1) << "allocated page " << *page_id; + + this->track_new_page_event(NewPageTracker{ + .ts = 0, + .job_id = job_id, + .page_id = *page_id, + .callers = callers, + .event_id = (int)NewPageTracker::Event::kAllocate, + }); + // PageDevice::prepare must be thread-safe. + // + page_buffer = arena.device().prepare(*page_id); + break; + } + + return page_buffer; + }, // + batt::TaskSleepImpl{}, // + [](const batt::Status& s) -> bool { // + VLOG(2) << "batt::StatusCode::kResourceExhausted == " + << (s == batt::StatusCode::kResourceExhausted); + return batt::status_is_retryable(s) || (s == batt::StatusCode::kResourceExhausted); + }); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -384,7 +366,7 @@ Status PageCache::attach(const boost::uuids::uuid& user_id, slot_offset_type slo } }); - for (PageDeviceEntry* entry : this->all_devices()) { + for (std::shared_ptr entry : this->all_devices()) { BATT_CHECK_NOT_NULLPTR(entry); auto arena_status = entry->arena.allocator().attach_user(user_id, slot_offset); BATT_REQUIRE_OK(arena_status); @@ -400,7 +382,7 @@ Status PageCache::attach(const boost::uuids::uuid& user_id, slot_offset_type slo // Status PageCache::detach(const boost::uuids::uuid& user_id, slot_offset_type slot_offset) { - for (PageDeviceEntry* entry : this->all_devices()) { + for (std::shared_ptr entry : this->all_devices()) { BATT_CHECK_NOT_NULLPTR(entry); auto arena_status = entry->arena.allocator().detach_user(user_id, slot_offset); BATT_REQUIRE_OK(arena_status); @@ -420,14 +402,18 @@ void PageCache::prefetch_hint(PageId page_id) //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Slice PageCache::all_devices() const +std::vector> PageCache::all_devices() { - return as_slice(this->page_devices_by_page_size_); + batt::ScopedReadLock state(this->state_); + std::vector> devices = + state->page_devices_by_page_size; + return devices; } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Slice PageCache::devices_with_page_size(usize size) const +Slice> PageCache::devices_with_page_size( + usize size) { const usize size_log2 = batt::log2_ceil(size); BATT_CHECK_EQ(size, usize{1} << size_log2) << "page size must be a power of 2"; @@ -437,31 +423,32 @@ Slice PageCache::devices_with_page_size(usize //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Slice PageCache::devices_with_page_size_log2( - usize size_log2) const +Slice> PageCache::devices_with_page_size_log2( + usize size_log2) { BATT_CHECK_LT(size_log2, kMaxPageSizeLog2); - - return this->page_devices_by_page_size_log2_[size_log2]; + batt::ScopedReadLock state(this->state_); + return state->page_devices_by_page_size_log2[size_log2]; } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -const PageArena& PageCache::arena_for_page_id(PageId page_id) const +const PageArena& PageCache::arena_for_page_id(PageId page_id) { return this->arena_for_device_id(PageIdFactory::get_device_id(page_id)); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -const PageArena& PageCache::arena_for_device_id(page_device_id_int device_id_val) const +const PageArena& PageCache::arena_for_device_id(page_device_id_int device_id_val) { - BATT_CHECK_LT(device_id_val, this->page_devices_.size()) + batt::ScopedReadLock state(this->state_); + BATT_CHECK_LT(device_id_val, state->page_devices.size()) << "the specified page_id's device is not in the storage pool for this cache"; - BATT_CHECK_NOT_NULLPTR(this->page_devices_[device_id_val]); + BATT_CHECK_NOT_NULLPTR(state->page_devices[device_id_val]); - return this->page_devices_[device_id_val]->arena; + return state->page_devices[device_id_val]->arena; } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -531,16 +518,100 @@ void PageCache::purge(PageId page_id, u64 callers, u64 job_id) } } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +batt::Status PageCache::add_page_devices(std::vector& arenas) +{ + { + batt::ScopedWriteLock state(this->state_); + // TODO: [Gabe Bornstein 6/6/24] Read over this function and make sure it won't break anything + // with just appending some values vs. creating page_cache from scratch. + + // Find the maximum page device id value. + // + // TODO: [Gabe Bornstein 6/6/24] Consider, this only looks for the max_page_device_id in + // `arenas`. Do we need to include pre-existing ids in `page_devices` as well? + // + page_device_id_int max_page_device_id = 0; + for (const PageArena& arena : arenas) { + max_page_device_id = std::max(max_page_device_id, arena.device().get_id()); + } + + // Populate state.page_devices. + // + state->page_devices.resize(max_page_device_id + 1); + for (PageArena& arena : arenas) { + const page_device_id_int device_id = arena.device().get_id(); + const auto page_size_log2 = batt::log2_ceil(arena.device().page_size()); + + BATT_CHECK_EQ(PageSize{1} << page_size_log2, arena.device().page_size()) + << "Page sizes must be powers of 2!"; + + BATT_CHECK_LT(page_size_log2, kMaxPageSizeLog2); + + // Create a slot pool for this page size if we haven't already done so. + // + if (!state->cache_slot_pool_by_page_size_log2[page_size_log2]) { + state->cache_slot_pool_by_page_size_log2[page_size_log2] = PageCacheSlot::Pool::make_new( + /*n_slots=*/this->options_.max_cached_pages_per_size_log2[page_size_log2], + /*name=*/batt::to_string("size_", u64{1} << page_size_log2)); + } + + BATT_CHECK_EQ(state->page_devices[device_id], nullptr) + << "Duplicate entries found for the same device id!" << BATT_INSPECT(device_id); + + state->page_devices[device_id] = std::make_shared( // + std::move(arena), // + batt::make_copy(state->cache_slot_pool_by_page_size_log2[page_size_log2]) // + ); + + // We will sort these later. + // + state->page_devices_by_page_size.emplace_back(state->page_devices[device_id]); + } + // BATT_CHECK_EQ(state->page_devices_by_page_size.size(), storage_pool.size()); + + // Sort the storage pool by page size (MUST be first). + // + std::sort(state->page_devices_by_page_size.begin(), state->page_devices_by_page_size.end(), + PageSizeOrder{}); + + // Index the storage pool into groups of arenas by page size. + // + for (usize size_log2 = 6; size_log2 < kMaxPageSizeLog2; ++size_log2) { + auto iter_pair = std::equal_range(state->page_devices_by_page_size.begin(), + state->page_devices_by_page_size.end(), + PageSize{u32{1} << size_log2}, PageSizeOrder{}); + + state->page_devices_by_page_size_log2[size_log2] = + as_slice(state->page_devices_by_page_size.data() + + std::distance(state->page_devices_by_page_size.begin(), iter_pair.first), + as_range(iter_pair).size()); + } + } + + return batt::OkStatus(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +int PageCache::get_num_page_devices() +{ + batt::ScopedReadLock state(this->state_); + return state->page_devices.size(); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // PageCache::PageDeviceEntry* PageCache::get_device_for_page(PageId page_id) { + batt::ScopedReadLock state(this->state_); const page_device_id_int device_id = PageIdFactory::get_device_id(page_id); - if (BATT_HINT_FALSE(device_id >= this->page_devices_.size())) { + if (BATT_HINT_FALSE(device_id >= state->page_devices.size())) { return nullptr; } - return this->page_devices_[device_id].get(); + return state->page_devices[device_id].get(); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -588,6 +659,18 @@ auto PageCache::find_page_in_cache(PageId page_id, const Optional& }); } +void PageCache::update_available_pages() +{ + batt::ScopedWriteLock state(this->state_); + for (std::shared_ptr page_device : state->page_devices) { + if (page_device->arena.allocator().free_pool_size() != 0) { + state->arenas_with_available_pages.insert(page_device->arena.id()); + } else { + state->arenas_with_available_pages.erase(page_device->arena.id()); + } + } +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // void PageCache::async_load_page_into_slot(const PageCacheSlot::PinnedRef& pinned_slot, @@ -698,7 +781,7 @@ void PageCache::track_new_page_event(const NewPageTracker& tracker) //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -BoxedSeq PageCache::find_new_page_events(PageId page_id) const +BoxedSeq PageCache::find_new_page_events(PageId page_id) { const isize n = this->history_end_.load(); return batt::as_seq(boost::irange(isize{0}, isize(this->history_.size()))) // diff --git a/src/llfs/page_cache.hpp b/src/llfs/page_cache.hpp index da1f12ec..7afdd77c 100644 --- a/src/llfs/page_cache.hpp +++ b/src/llfs/page_cache.hpp @@ -42,6 +42,7 @@ #include #include #include +#include #include #include @@ -189,15 +190,15 @@ class PageCache : public PageLoader Status detach(const boost::uuids::uuid& user_id, slot_offset_type slot_offset); - Slice devices_with_page_size_log2(usize size_log2) const; + Slice> devices_with_page_size_log2(usize size_log2); - Slice devices_with_page_size(usize size) const; + Slice> devices_with_page_size(usize size); - Slice all_devices() const; + std::vector> all_devices(); - const PageArena& arena_for_page_id(PageId id_val) const; + const PageArena& arena_for_page_id(PageId id_val); - const PageArena& arena_for_device_id(page_device_id_int device_id_val) const; + const PageArena& arena_for_device_id(page_device_id_int device_id_val); //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // PageLoader interface @@ -229,6 +230,10 @@ class PageCache : public PageLoader */ StatusOr put_view(std::shared_ptr&& view, u64 callers, u64 job_id); + batt::Status add_page_devices(std::vector& arenas); + + int get_num_page_devices(); + //----- --- -- - - - - /** \brief Removes all cached data for the specified page. */ @@ -236,7 +241,7 @@ class PageCache : public PageLoader bool page_might_contain_key(PageId id, const KeyView& key) const; - BoxedSeq find_new_page_events(PageId page_id) const; + BoxedSeq find_new_page_events(PageId page_id); void track_new_page_event(const NewPageTracker& tracker); @@ -245,16 +250,18 @@ class PageCache : public PageLoader return this->metrics_; } - const PageCacheSlot::Pool::Metrics& metrics_for_page_size(PageSize page_size) const + const PageCacheSlot::Pool::Metrics& metrics_for_page_size(PageSize page_size) { const i32 page_size_log2 = batt::log2_ceil(page_size); + batt::ScopedReadLock state(this->state_); + BATT_CHECK_LT(static_cast(page_size_log2), - this->cache_slot_pool_by_page_size_log2_.size()); + state->cache_slot_pool_by_page_size_log2.size()); - BATT_CHECK_NOT_NULLPTR(this->cache_slot_pool_by_page_size_log2_[page_size_log2]); + BATT_CHECK_NOT_NULLPTR(state->cache_slot_pool_by_page_size_log2[page_size_log2]); - return this->cache_slot_pool_by_page_size_log2_[page_size_log2]->metrics(); + return state->cache_slot_pool_by_page_size_log2[page_size_log2]->metrics(); } private: @@ -263,6 +270,35 @@ class PageCache : public PageLoader using PageLayoutReaderMap = std::unordered_map; + struct State { + // The arenas backing up this cache, indexed by device id int. + // + std::vector> page_devices; + + // The contents of `storage_pool_`, sorted by non-decreasing page size. + // + std::vector> page_devices_by_page_size; + + // PageDevices that are not full. If a PageDevice is in this list after `update_available_pages` + // is called, it means that PageDevice has un-used pages. a call to `allocate_page` should + // succeed on that device. + // + std::unordered_set arenas_with_available_pages; + // std::vector> available_pages; + + // Slices of `this->storage_pool_` that group arenas by page size (log2). For example, + // `this->arenas_by_size_log2_[12]` is the slice of `this->storage_pool_` comprised of + // PageArenas whose page size is 4096. + // + std::array>, kMaxPageSizeLog2> + page_devices_by_page_size_log2; + + // A pool of cache slots for each page size. + // + std::array, kMaxPageSizeLog2> + cache_slot_pool_by_page_size_log2; + }; + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - explicit PageCache(std::vector&& storage_pool, @@ -297,6 +333,10 @@ class PageCache : public PageLoader batt::StatusOr find_page_in_cache( PageId page_id, const Optional& required_layout, OkIfNotFound ok_if_not_found); + // Requires that ScopedWriteLock is acquired on state before calling. + // + void update_available_pages(/*batt::ScopedWriteLock& state*/); + //----- --- -- - - - - /** \brief Populates the passed PageCacheSlot asynchronously by attempting to read the page from * storage and setting the Latch value of the slot. @@ -321,24 +361,7 @@ class PageCache : public PageLoader // PageCacheMetrics metrics_; - // The arenas backing up this cache, indexed by device id int. - // - std::vector> page_devices_; - - // The contents of `storage_pool_`, sorted by non-decreasing page size. - // - std::vector page_devices_by_page_size_; - - // Slices of `this->storage_pool_` that group arenas by page size (log2). For example, - // `this->arenas_by_size_log2_[12]` is the slice of `this->storage_pool_` comprised of - // PageArenas whose page size is 4096. - // - std::array, kMaxPageSizeLog2> page_devices_by_page_size_log2_; - - // A pool of cache slots for each page size. - // - std::array, kMaxPageSizeLog2> - cache_slot_pool_by_page_size_log2_; + batt::ReadWriteMutex state_; // A thread-safe shared map from PageLayoutId to PageReader function; layouts must be registered // with the PageCache so that we trace references during page recycling (aka garbage collection). diff --git a/src/llfs/page_id_factory.hpp b/src/llfs/page_id_factory.hpp index 8aa66c8a..58994f56 100644 --- a/src/llfs/page_id_factory.hpp +++ b/src/llfs/page_id_factory.hpp @@ -52,6 +52,7 @@ constexpr page_id_int kPageDeviceIdMask = ((page_id_int{1} << kPageDeviceIdBits) // |<-- kPageDeviceIdBits -->|<-- log_2(Max-Generation-Count) -->|<- log_2(Physical-Page-Count) ->| // +-------------------------+-----------------------------------+--------------------------------+ // + class PageIdFactory : public boost::equality_comparable { public: diff --git a/src/llfs/raw_block_file_impl.cpp b/src/llfs/raw_block_file_impl.cpp index 5985c783..95bdd210 100644 --- a/src/llfs/raw_block_file_impl.cpp +++ b/src/llfs/raw_block_file_impl.cpp @@ -39,7 +39,6 @@ namespace llfs { } }); BATT_REQUIRE_OK(batt::status_from_retval(fd)); - return std::make_unique(IoRing::File{io_ring, fd}); } diff --git a/src/llfs/storage_context.cpp b/src/llfs/storage_context.cpp index 2a412fb8..d3d438cd 100644 --- a/src/llfs/storage_context.cpp +++ b/src/llfs/storage_context.cpp @@ -42,12 +42,10 @@ Status StorageContext::add_existing_named_file(std::string&& file_name, i64 star { StatusOr fd = open_file_read_write(file_name, OpenForAppend{false}, OpenRawIO{true}); BATT_REQUIRE_OK(fd); - IoRingRawBlockFile file{IoRing::File{*this->io_ring_, *fd}}; StatusOr>> config_blocks = read_storage_file(file, start_offset); BATT_REQUIRE_OK(config_blocks); - return this->add_existing_file( batt::make_shared(std::move(file_name), std::move(*config_blocks))); } @@ -63,16 +61,15 @@ Status StorageContext::add_new_file(const std::string& file_name, IoRingRawBlockFile::open(*this->io_ring_, file_name.c_str(), /*flags=*/O_RDWR | O_CREAT | O_EXCL | O_DIRECT | O_SYNC, /*mode=*/S_IRUSR | S_IWUSR)); - - StorageFileBuilder builder{*file, /*base_offset=*/0}; - + llfs::page_device_id_int initial_device_id = + (this->page_cache_) ? this->page_cache_->get_num_page_devices() : 0; + StorageFileBuilder builder{*file, /*base_offset=*/0, initial_device_id}; Status init_status = initializer(builder); if (!init_status.ok()) { file->close().IgnoreError(); delete_file(file_name).IgnoreError(); return init_status; } - Status flush_status = builder.flush_all(); BATT_REQUIRE_OK(flush_status); } @@ -86,11 +83,125 @@ Status StorageContext::add_existing_file(const batt::SharedPtr& fil file->find_all_objects() // | seq::for_each([&](const FileOffsetPtr& slot) { LLFS_VLOG(1) << "Adding " << *slot << " to storage context"; - this->index_.emplace(slot->uuid, batt::make_shared(batt::make_copy(file), slot)); }); + return OkStatus(); +} +// TODO: [Gabe Bornstein 6/4/24] Could encapsulate some of these params in a new llfs object +// +Status StorageContext::increase_storage_capacity( + const std::filesystem::path& dir_path, u64 increase_capacity, PageSize leaf_size, + PageSizeLog2 leaf_size_log2, PageSize node_size, PageSizeLog2 node_size_log2, + const char* const kPageFileName, unsigned int max_tree_height, unsigned int max_attachments) +{ + // TODO: [Gabe Bornstein 6/3/24] A lot of this code is copy-pasted and could be de-duped. This + // code could potentially be moved into turtle_db. It basically already exists there in + // DB::create. + // + // Calculate the page counts from the total capacity and TreeOptions. + // + const auto max_in_refs_size_per_leaf = 64 * max_tree_height; + + const auto leaf_page_count = + llfs::PageCount{increase_capacity / (leaf_size + max_in_refs_size_per_leaf)}; + + const auto total_leaf_pages_size = leaf_page_count * leaf_size; + const auto total_node_pages_size = increase_capacity - total_leaf_pages_size; + + const auto node_page_count = llfs::PageCount{total_node_pages_size / node_size}; + + VLOG(1) << BATT_INSPECT(increase_capacity) << BATT_INSPECT(node_page_count) + << BATT_INSPECT(leaf_page_count); + + LOG(INFO) << "PAGE COUNT add_existing_file: " << node_page_count; + // Create the page file. + // + Status page_file_status = this->add_new_file( + (dir_path / kPageFileName).string(), + [&](llfs::StorageFileBuilder& builder) -> Status // + { + // Add an arena for node pages. + // + llfs::StatusOr> node_pool_config = + builder.add_object( + llfs::PageArenaConfigOptions{ + .uuid = None, + .page_allocator = + llfs::CreateNewPageAllocator{ + .options = + llfs::PageAllocatorConfigOptions{ + .uuid = llfs::None, + .max_attachments = max_attachments, + .page_count = node_page_count, + .log_device = + llfs::CreateNewLogDevice2WithDefaultSize{ + .uuid = llfs::None, + .device_page_size_log2=None, + .data_alignment_log2=None, + }, + .page_size_log2 = node_size_log2, + .page_device = llfs::LinkToNewPageDevice{}, + }, + }, + .page_device = + llfs::CreateNewPageDevice{ + .options = + llfs::PageDeviceConfigOptions{ + .uuid = llfs::None, + .device_id = llfs::None, + .page_count = node_page_count, + .page_size_log2 = node_size_log2, + }, + }, + }); + + BATT_REQUIRE_OK(node_pool_config); + + // Add an arena for leaf pages. + // + llfs::StatusOr> leaf_pool_config = + builder.add_object( + llfs::PageArenaConfigOptions{ + .uuid = None, + .page_allocator = + llfs::CreateNewPageAllocator{ + .options = + llfs::PageAllocatorConfigOptions{ + .uuid = llfs::None, + .max_attachments = max_attachments, + .page_count = leaf_page_count, + .log_device = + llfs::CreateNewLogDevice2WithDefaultSize{ + .uuid = llfs::None, + .device_page_size_log2=None, + .data_alignment_log2=None, + }, + .page_size_log2 = leaf_size_log2, + .page_device = llfs::LinkToNewPageDevice{}, + }, + }, + .page_device = + llfs::CreateNewPageDevice{ + .options = + llfs::PageDeviceConfigOptions{ + .uuid = llfs::None, + .device_id = llfs::None, + .page_count = leaf_page_count, + .page_size_log2 = leaf_size_log2, + }, + }, + }); + BATT_REQUIRE_OK(leaf_pool_config); + return OkStatus(); + }); + + BATT_REQUIRE_OK(page_file_status); + std::vector arenas; + BATT_CHECK_OK(this->recover_arenas(arenas)); + BATT_CHECK_NE(this->page_cache_, nullptr); + BATT_CHECK_OK(this->page_cache_->add_page_devices(arenas)); return OkStatus(); } @@ -131,6 +242,64 @@ void StorageContext::set_page_cache_options(const PageCacheOptions& options) this->page_cache_options_ = options; } +// TODO: [Gabe Bornstein 6/7/24] This could probably be a private function. +// +Status StorageContext::recover_arena(std::vector& arenas, boost::uuids::uuid uuid, + batt::SharedPtr p_object_info) +{ + if (p_object_info->p_config_slot->tag == PackedConfigSlotBase::Tag::kPageArena) { + const auto& packed_arena_config = + config_slot_cast(p_object_info->p_config_slot.object); + + // If we have already recovered a device with the same uuid before, return immediately. If we + // recover the same device twice, would could have asynchronous write conflicts in the future. + // + if (this->recovered_uuids_.find(uuid) != this->recovered_uuids_.end()) { + return OkStatus(); + } + + const std::string base_name = + batt::to_string("PageDevice_", packed_arena_config.page_device_uuid); + StatusOr arena = this->recover_object( + batt::StaticType{}, uuid, + PageAllocatorRuntimeOptions{ + .scheduler = this->get_scheduler(), + .name = batt::to_string(base_name, "_Allocator"), + }, + [&] { + LogDeviceRuntimeOptions options; + options.name = batt::to_string(base_name, "_AllocatorLog"); + return options; + }(), + IoRingFileRuntimeOptions{ + .io_ring = this->get_io_ring(), + .use_raw_io = true, + .allow_read = true, + .allow_write = true, + }); + + // TODO: [Gabe Bornstein 6/11/24] Consider, should I insert before calling recover_object? + // + this->recovered_uuids_.insert(uuid); + + BATT_REQUIRE_OK(arena); + arenas.emplace_back(std::move(*arena)); + } + return OkStatus(); +} + +// TODO: [Gabe Bornstein 6/7/24] This could probably be a private function. +// +Status StorageContext::recover_arenas(std::vector& arenas) +{ + // Add Arenas to PageCache. + // + for (const auto& [uuid, p_object_info] : this->index_) { + BATT_CHECK_OK(this->recover_arena(arenas, uuid, p_object_info)); + } + return OkStatus(); +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // StatusOr> StorageContext::get_page_cache() @@ -143,33 +312,7 @@ StatusOr> StorageContext::get_page_cache() for (const auto& [uuid, p_object_info] : this->index_) { if (p_object_info->p_config_slot->tag == PackedConfigSlotBase::Tag::kPageArena) { - const auto& packed_arena_config = - config_slot_cast(p_object_info->p_config_slot.object); - - const std::string base_name = - batt::to_string("PageDevice_", packed_arena_config.page_device_uuid); - - StatusOr arena = this->recover_object( - batt::StaticType{}, uuid, - PageAllocatorRuntimeOptions{ - .scheduler = this->scheduler_, - .name = batt::to_string(base_name, "_Allocator"), - }, - [&] { - LogDeviceRuntimeOptions options; - options.name = batt::to_string(base_name, "_AllocatorLog"); - return options; - }(), - IoRingFileRuntimeOptions{ - .io_ring = *this->io_ring_, - .use_raw_io = true, - .allow_read = true, - .allow_write = true, - }); - - BATT_REQUIRE_OK(arena); - - storage_pool.emplace_back(std::move(*arena)); + BATT_CHECK_OK(this->recover_arena(storage_pool, uuid, p_object_info)); } } diff --git a/src/llfs/storage_context.hpp b/src/llfs/storage_context.hpp index fe107c43..a929ad99 100644 --- a/src/llfs/storage_context.hpp +++ b/src/llfs/storage_context.hpp @@ -103,10 +103,22 @@ class StorageContext : public batt::RefCounted // Status add_existing_file(const batt::SharedPtr& file); + Status increase_storage_capacity(const std::filesystem::path& dir_path, u64 increase_capacity, + PageSize leaf_size, PageSizeLog2 leaf_size_log2, + PageSize node_size, PageSizeLog2 node_size_log2, + const char* const kPageFileName = "pages.llfs", + unsigned int max_tree_height = 10, + unsigned int max_attachments = 32); + + Status recover_arena(std::vector& arenas, boost::uuids::uuid uuid, + batt::SharedPtr p_object_info); + + Status recover_arenas(std::vector& arenas); + // Attempts to recover an object of a given type from this context by uuid. // - // The type of the first argument determines the return type, the tag of the packed config, and - // the type of the extra_options parameter pack. + // The type of the first argument determines the return type, the tag of the packed config, + // and the type of the extra_options parameter pack. // template boost::hash> index_; + // An index of all storage objects by uuid. + // + std::unordered_set> recovered_uuids_; + // Options that will be used to instantiate `this->page_cache_`. // PageCacheOptions page_cache_options_ = PageCacheOptions::with_default_values(); diff --git a/src/llfs/storage_context.test.cpp b/src/llfs/storage_context.test.cpp index 2918d1df..3bec0744 100644 --- a/src/llfs/storage_context.test.cpp +++ b/src/llfs/storage_context.test.cpp @@ -14,7 +14,10 @@ #include #include +#include +#include #include +#include #include #include @@ -28,115 +31,231 @@ namespace { using namespace llfs::constants; using namespace llfs::int_types; -TEST(StorageContextTest, GetPageCache) +class StorageContextTest : public ::testing::Test { - const char* storage_file_name = "/tmp/llfs_StorageContextTest_GetPageCache_storage_file.llfs"; + public: + void SetUp() override + { + this->dir_name = "/tmp/"; + const char* storage_file_name = "/tmp/llfs_StorageContextTest_GetPageCache_storage_file.llfs"; - llfs::delete_file(storage_file_name).IgnoreError(); - EXPECT_FALSE(std::filesystem::exists(std::filesystem::path{storage_file_name})); + llfs::delete_file(storage_file_name).IgnoreError(); + EXPECT_FALSE(std::filesystem::exists(std::filesystem::path{storage_file_name})); - llfs::StatusOr io = - llfs::ScopedIoRing::make_new(llfs::MaxQueueDepth{1024}, llfs::ThreadPoolSize{1}); + this->io = llfs::ScopedIoRing::make_new(llfs::MaxQueueDepth{1024}, llfs::ThreadPoolSize{1}); - ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status()); + ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status()); + // Create a StorageContext. + // + this->storage_context = batt::make_shared( + batt::Runtime::instance().default_scheduler(), io->get_io_ring()); - // Create a StorageContext. - // - batt::SharedPtr storage_context = batt::make_shared( - batt::Runtime::instance().default_scheduler(), io->get_io_ring()); + boost::uuids::uuid arena_uuid_4kb = llfs::random_uuid(); + boost::uuids::uuid arena_uuid_2mb = llfs::random_uuid(); + + // Create a storage file with two page arenas, one for small pages (4kb), one for large (2mb). + // + llfs::Status file_create_status = storage_context->add_new_file( + storage_file_name, [&](llfs::StorageFileBuilder& builder) -> llfs::Status { + llfs::StatusOr> config_4kb = + builder.add_object(llfs::PageArenaConfigOptions{ + .uuid = arena_uuid_4kb, + .page_allocator = + llfs::CreateNewPageAllocator{ + .options = + llfs::PageAllocatorConfigOptions{ + .uuid = llfs::None, + .max_attachments = 32, + .page_count = llfs::PageCount{32}, + .log_device = + llfs::CreateNewLogDevice2WithDefaultSize{ + .uuid = llfs::None, + .device_page_size_log2 = llfs::None, + .data_alignment_log2 = llfs::None, + }, + .page_size_log2 = llfs::PageSizeLog2{12}, + .page_device = llfs::LinkToNewPageDevice{}, + }, + }, + .page_device = + llfs::CreateNewPageDevice{ + .options = + llfs::PageDeviceConfigOptions{ + .uuid = llfs::None, + .device_id = llfs::None, + .page_count = llfs::PageCount{32}, + .page_size_log2 = llfs::PageSizeLog2{12}, + }, + }, + }); + BATT_REQUIRE_OK(config_4kb); + + llfs::StatusOr> config_2mb = + builder.add_object(llfs::PageArenaConfigOptions{ + .uuid = arena_uuid_2mb, + .page_allocator = + llfs::CreateNewPageAllocator{ + .options = + llfs::PageAllocatorConfigOptions{ + .uuid = llfs::None, + .max_attachments = 32, + .page_count = llfs::PageCount{1}, + .log_device = + llfs::CreateNewLogDevice2WithDefaultSize{ + .uuid = llfs::None, + .device_page_size_log2 = llfs::None, + .data_alignment_log2 = llfs::None, + }, + .page_size_log2 = llfs::PageSizeLog2{21}, + .page_device = llfs::LinkToNewPageDevice{}, + }, + }, + .page_device = + llfs::CreateNewPageDevice{ + .options = + llfs::PageDeviceConfigOptions{ + .uuid = llfs::None, + .device_id = llfs::None, + .page_count = llfs::PageCount{1}, + .page_size_log2 = llfs::PageSizeLog2{21}, + }, + }, + }); + + BATT_REQUIRE_OK(config_2mb); + return llfs::OkStatus(); + }); + ASSERT_TRUE(file_create_status.ok()) << BATT_INSPECT(file_create_status); + + llfs::StatusOr> cache = + this->storage_context->get_page_cache(); + ASSERT_TRUE(cache.ok()) << BATT_INSPECT(cache.status()); + ASSERT_NE(*cache, nullptr); + llfs::Slice> devices_4kb = + (*cache)->devices_with_page_size(4 * kKiB); + EXPECT_EQ(devices_4kb.size(), 1u); + llfs::Slice> devices_2mb = + (*cache)->devices_with_page_size(2 * kMiB); + EXPECT_EQ(devices_2mb.size(), 1u); + } - boost::uuids::uuid arena_uuid_4kb = llfs::random_uuid(); - boost::uuids::uuid arena_uuid_2mb = llfs::random_uuid(); + batt::SharedPtr storage_context; - // Create a storage file with two page arenas, one for small pages (4kb), one for large (2mb). + llfs::StatusOr io; + + const char* dir_name; +}; + +TEST_F(StorageContextTest, GetPageCache) +{ + // This test just runs SetUp() // - llfs::Status file_create_status = storage_context->add_new_file( - storage_file_name, [&](llfs::StorageFileBuilder& builder) -> llfs::Status { - llfs::StatusOr> config_4kb = - builder.add_object( - llfs::PageArenaConfigOptions{ - .uuid = arena_uuid_4kb, - .page_allocator = - llfs::CreateNewPageAllocator{ - .options = - llfs::PageAllocatorConfigOptions{ - .uuid = llfs::None, - .max_attachments = 32, - .page_count = llfs::PageCount{32}, - .log_device = - llfs::CreateNewLogDevice2WithDefaultSize{ - .uuid = llfs::None, - .device_page_size_log2 = 9, - .data_alignment_log2 = 12, - }, - .page_size_log2 = llfs::PageSizeLog2{12}, - .page_device = llfs::LinkToNewPageDevice{}, - }, - }, - .page_device = - llfs::CreateNewPageDevice{ - .options = - llfs::PageDeviceConfigOptions{ - .uuid = llfs::None, - .device_id = llfs::None, - .page_count = llfs::PageCount{32}, - .page_size_log2 = llfs::PageSizeLog2{12}, - }, - }, - }); - - BATT_REQUIRE_OK(config_4kb); - - llfs::StatusOr> config_2mb = - builder.add_object( - llfs::PageArenaConfigOptions{ - .uuid = arena_uuid_2mb, - .page_allocator = - llfs::CreateNewPageAllocator{ - .options = - llfs::PageAllocatorConfigOptions{ - .uuid = llfs::None, - .max_attachments = 32, - .page_count = llfs::PageCount{32}, - .log_device = - llfs::CreateNewLogDevice2WithDefaultSize{ - .uuid = llfs::None, - .device_page_size_log2 = 9, - .data_alignment_log2 = 12, - }, - .page_size_log2 = llfs::PageSizeLog2{21}, - .page_device = llfs::LinkToNewPageDevice{}, - }, - }, - .page_device = - llfs::CreateNewPageDevice{ - .options = - llfs::PageDeviceConfigOptions{ - .uuid = llfs::None, - .device_id = llfs::None, - .page_count = llfs::PageCount{32}, - .page_size_log2 = llfs::PageSizeLog2{21}, - }, - }, - }); - - BATT_REQUIRE_OK(config_2mb); - - return llfs::OkStatus(); - }); - ASSERT_TRUE(file_create_status.ok()) << BATT_INSPECT(file_create_status); - - llfs::StatusOr> cache = storage_context->get_page_cache(); +} + +TEST_F(StorageContextTest, IncreasePageCacheStorage) +{ + const char* storage_file_name1 = "llfs_StorageContextTest_GetPageCache_storage_file2.llfs"; + const char* storage_file_name2 = "llfs_StorageContextTest_GetPageCache_storage_file3.llfs"; + + u8 node_size_log2 = 12 /*4kb*/; + u8 leaf_size_log2 = 21 /*2mb*/; + + llfs::delete_file(storage_file_name1).IgnoreError(); + EXPECT_FALSE(std::filesystem::exists(std::filesystem::path{storage_file_name1})); + llfs::delete_file(storage_file_name2).IgnoreError(); + EXPECT_FALSE(std::filesystem::exists(std::filesystem::path{storage_file_name2})); + + BATT_CHECK_OK(this->storage_context->increase_storage_capacity( + this->dir_name, 4096, llfs::PageSize{batt::checked_cast(u64{1} << leaf_size_log2)}, + llfs::PageSizeLog2{leaf_size_log2}, + llfs::PageSize{batt::checked_cast(u64{1} << node_size_log2)}, + llfs::PageSizeLog2{node_size_log2}, storage_file_name1)); + + llfs::StatusOr> cache = this->storage_context->get_page_cache(); ASSERT_TRUE(cache.ok()) << BATT_INSPECT(cache.status()); ASSERT_NE(*cache, nullptr); - - llfs::Slice devices_4kb = + llfs::Slice> devices_4kb = (*cache)->devices_with_page_size(4 * kKiB); - EXPECT_EQ(devices_4kb.size(), 1u); - - llfs::Slice devices_2mb = + EXPECT_EQ(devices_4kb.size(), 2u); + llfs::Slice> devices_2mb = (*cache)->devices_with_page_size(2 * kMiB); - EXPECT_EQ(devices_2mb.size(), 1u); + EXPECT_EQ(devices_2mb.size(), 2u); + + BATT_CHECK_OK(this->storage_context->increase_storage_capacity( + this->dir_name, 4096, llfs::PageSize{batt::checked_cast(u64{1} << leaf_size_log2)}, + llfs::PageSizeLog2{leaf_size_log2}, + llfs::PageSize{batt::checked_cast(u64{1} << node_size_log2)}, + llfs::PageSizeLog2{node_size_log2}, storage_file_name2)); + + ASSERT_TRUE(cache.ok()) << BATT_INSPECT(cache.status()); + ASSERT_NE(*cache, nullptr); + devices_4kb = (*cache)->devices_with_page_size(4 * kKiB); + EXPECT_EQ(devices_4kb.size(), 3u); + devices_2mb = (*cache)->devices_with_page_size(2 * kMiB); + EXPECT_EQ(devices_2mb.size(), 3u); + + llfs::PageCacheMetrics& metrics = (*cache)->metrics(); + LOG(INFO) << "metrics.get_page_view_count: " << metrics.get_page_view_count.load(); + + // TODO: [Gabe Bornstein 6/18/24] Add other invariants to be verified here. + // +} + +TEST_F(StorageContextTest, RunOutOfMemory) +{ + std::string file_name = "llfs_StorageContextTest_RunOutOfMemory_storage_file"; + std::string file_extension = ".llfs"; + + u8 node_size_log2 = 12 /*4kb*/; + u8 leaf_size_log2 = 21 /*2mb*/; + u8 num_storage_increases = 100; + u8 num_pages_per_device = 1; + + llfs::StatusOr> cache = this->storage_context->get_page_cache(); + + std::thread new_page_thread([&] { + for (usize i = 0; i < num_pages_per_device * num_storage_increases; ++i) { + LOG(INFO) << "new_page_thread start: " << i; + std::unique_ptr job1 = (*cache)->new_job(); + batt::StatusOr> page1 = + job1->new_page(llfs::PageSize{4096}, batt::WaitForResource::kFalse, + llfs::OpaquePageView::page_layout_id(), llfs::Caller::Unknown, + /*cancel_token=*/llfs::None); + LOG(INFO) << "new_page_thread end: " << i; + } + }); + + std::thread increase_storage_capacity_thread([&] { + for (int i = 0; i < num_storage_increases + 1; ++i) { + std::string file_name_str = file_name + batt::to_string(i) + file_extension; + const char* storage_file_name = file_name_str.c_str(); + LOG(INFO) << "increase_storage_capacity_thread start: " << i << " with file name: " << storage_file_name; + llfs::delete_file(storage_file_name).IgnoreError(); + EXPECT_FALSE(std::filesystem::exists(std::filesystem::path{storage_file_name})); + + BATT_CHECK_OK(this->storage_context->increase_storage_capacity( + this->dir_name, 4096, llfs::PageSize{batt::checked_cast(u64{1} << leaf_size_log2)}, + llfs::PageSizeLog2{leaf_size_log2}, + llfs::PageSize{batt::checked_cast(u64{1} << node_size_log2)}, + llfs::PageSizeLog2{node_size_log2}, storage_file_name)); + + ASSERT_TRUE(cache.ok()) << BATT_INSPECT(cache.status()); + ASSERT_NE(*cache, nullptr); + llfs::Slice> devices_4kb = + (*cache)->devices_with_page_size(4 * kKiB); + EXPECT_EQ(devices_4kb.size(), i * 1u + 2u); + llfs::Slice> devices_2mb = + (*cache)->devices_with_page_size(2 * kMiB); + EXPECT_EQ(devices_2mb.size(), i * 1u + 2u); + LOG(INFO) << "increase_storage_capacity_thread end: " << i; + } + }); + + llfs::PageCacheMetrics& metrics = (*cache)->metrics(); + LOG(INFO) << "metrics.get_page_view_count: " << metrics.get_page_view_count.load(); + + new_page_thread.join(); + increase_storage_capacity_thread.join(); } } // namespace diff --git a/src/llfs/storage_file_builder.cpp b/src/llfs/storage_file_builder.cpp index 15ab2a86..95a15f79 100644 --- a/src/llfs/storage_file_builder.cpp +++ b/src/llfs/storage_file_builder.cpp @@ -25,13 +25,14 @@ namespace llfs { //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -StorageFileBuilder::StorageFileBuilder(RawBlockFile& file, i64 base_offset) noexcept +StorageFileBuilder::StorageFileBuilder(RawBlockFile& file, i64 base_offset, + page_device_id_int next_available_device_id) noexcept : file_{file} , config_blocks_{} , unused_payload_{None} , base_offset_{base_offset} , next_offset_{base_offset} - , next_available_device_id_{0} + , next_available_device_id_{next_available_device_id} , pre_flush_actions_{} { } diff --git a/src/llfs/storage_file_builder.hpp b/src/llfs/storage_file_builder.hpp index 22beca4b..009c25f3 100644 --- a/src/llfs/storage_file_builder.hpp +++ b/src/llfs/storage_file_builder.hpp @@ -31,7 +31,8 @@ class StorageFileBuilder //+++++++++++-+-+--+----- --- -- - - - - - explicit StorageFileBuilder(RawBlockFile& file, i64 base_offset) noexcept; + explicit StorageFileBuilder(RawBlockFile& file, i64 base_offset, + page_device_id_int next_available_device_id = 0) noexcept; // Add a storage object to the file. // diff --git a/src/llfs/volume.cpp b/src/llfs/volume.cpp index 18c2a7b9..688f6eea 100644 --- a/src/llfs/volume.cpp +++ b/src/llfs/volume.cpp @@ -168,7 +168,7 @@ u64 Volume::calculate_grant_size(const AppendableJob& appendable) const metadata.ids->recycler_uuid, metadata.ids->trimmer_uuid, }) { - for (PageCache::PageDeviceEntry* entry : cache->all_devices()) { + for (std::shared_ptr entry : cache->all_devices()) { BATT_CHECK_NOT_NULLPTR(entry); const PageArena& arena = entry->arena; Optional attachment = @@ -280,7 +280,7 @@ u64 Volume::calculate_grant_size(const AppendableJob& appendable) const metadata.ids->recycler_uuid, metadata.ids->trimmer_uuid, }) { - for (PageCache::PageDeviceEntry* entry : cache->all_devices()) { + for (std::shared_ptr entry : cache->all_devices()) { BATT_CHECK_NOT_NULLPTR(entry); BATT_REQUIRE_OK(entry->arena.allocator().notify_user_recovered(uuid)); } diff --git a/src/llfs/volume.test.cpp b/src/llfs/volume.test.cpp index 628190ae..47df2da4 100644 --- a/src/llfs/volume.test.cpp +++ b/src/llfs/volume.test.cpp @@ -1599,7 +1599,7 @@ TEST_F(VolumeSimTest, ConcurrentAppendJobs) LLFS_VLOG(1) << "checking ref counts..."; - for (llfs::PageCache::PageDeviceEntry* entry : + for (std::shared_ptr entry : sim.cache()->devices_with_page_size(1 * kKiB)) { BATT_CHECK_NOT_NULLPTR(entry); for (llfs::PageId page_id : page_ids) { @@ -1888,7 +1888,7 @@ void VolumeSimTest::verify_post_recovery_expectations(RecoverySimState& state, if (state.recovered_second_page) { EXPECT_FALSE(state.second_job_will_not_commit); - for (llfs::PageCache::PageDeviceEntry* entry : + for (std::shared_ptr entry : sim.cache()->devices_with_page_size(1 * kKiB)) { BATT_CHECK_NOT_NULLPTR(entry); EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); @@ -1896,7 +1896,7 @@ void VolumeSimTest::verify_post_recovery_expectations(RecoverySimState& state, ASSERT_TRUE(sim.has_data_for_page_id(state.first_page_id).ok()); EXPECT_TRUE(*sim.has_data_for_page_id(state.first_page_id)); } - for (llfs::PageCache::PageDeviceEntry* entry : + for (std::shared_ptr entry : sim.cache()->devices_with_page_size(2 * kKiB)) { BATT_CHECK_NOT_NULLPTR(entry); EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); @@ -1904,7 +1904,7 @@ void VolumeSimTest::verify_post_recovery_expectations(RecoverySimState& state, ASSERT_TRUE(sim.has_data_for_page_id(state.second_root_page_id).ok()); EXPECT_TRUE(*sim.has_data_for_page_id(state.second_root_page_id)); } - for (llfs::PageCache::PageDeviceEntry* entry : + for (std::shared_ptr entry : sim.cache()->devices_with_page_size(4 * kKiB)) { BATT_CHECK_NOT_NULLPTR(entry); EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); @@ -1913,7 +1913,7 @@ void VolumeSimTest::verify_post_recovery_expectations(RecoverySimState& state, EXPECT_TRUE(*sim.has_data_for_page_id(state.third_page_id)); } } else { - for (llfs::PageCache::PageDeviceEntry* entry : + for (std::shared_ptr entry : sim.cache()->devices_with_page_size(1 * kKiB)) { BATT_CHECK_NOT_NULLPTR(entry); EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); @@ -1921,7 +1921,7 @@ void VolumeSimTest::verify_post_recovery_expectations(RecoverySimState& state, ASSERT_TRUE(sim.has_data_for_page_id(state.first_page_id).ok()); EXPECT_TRUE(*sim.has_data_for_page_id(state.first_page_id)); } - for (llfs::PageCache::PageDeviceEntry* entry : + for (std::shared_ptr entry : sim.cache()->devices_with_page_size(2 * kKiB)) { BATT_CHECK_NOT_NULLPTR(entry); EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device); @@ -1933,7 +1933,7 @@ void VolumeSimTest::verify_post_recovery_expectations(RecoverySimState& state, } } } - for (llfs::PageCache::PageDeviceEntry* entry : + for (std::shared_ptr entry : sim.cache()->devices_with_page_size(4 * kKiB)) { BATT_CHECK_NOT_NULLPTR(entry); EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device);