Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions mooncake-store/src/serialize/serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,14 +737,16 @@ auto Serializer<Replica>::deserialize(const msgpack::object &obj,
"data, expected array"));
}

// Verify array size is correct (should have 5 elements: id, status,
// replica_type, storage_level, payload)
if (obj.via.array.size != 5) {
// Accept both legacy 4-element format [id, status, replica_type, payload]
// and new 5-element format [id, status, replica_type, storage_level, payload].
// Legacy replicas default to StorageLevel::RAM.
const auto array_size = obj.via.array.size;
if (array_size != 4 && array_size != 5) {
return tl::unexpected(SerializationError(
ErrorCode::DESERIALIZE_FAIL,
fmt::format("deserialize_msgpack Replica invalid array size: "
"expected 5, got {}",
obj.via.array.size)));
"expected 4 or 5, got {}",
array_size)));
}

auto *array_items = obj.via.array.ptr;
Expand All @@ -758,16 +760,22 @@ auto Serializer<Replica>::deserialize(const msgpack::object &obj,
// 3. Deserialize replica_type
auto replica_type_code = array_items[2].as<int8_t>();

// 4. Deserialize storage_level
auto storage_level = static_cast<StorageLevel>(array_items[3].as<int8_t>());
// 4. Deserialize storage_level (missing in legacy 4-element format → RAM)
StorageLevel storage_level = StorageLevel::RAM;
size_t payload_index = 3;
if (array_size == 5) {
storage_level =
static_cast<StorageLevel>(array_items[3].as<int8_t>());
payload_index = 4;
}

// 5. Parse payload by type
std::shared_ptr<Replica> replica;
switch (replica_type_code) {
case static_cast<int8_t>(ReplicaType::MEMORY): {
// MEMORY: payload is AllocatedBuffer
auto buffer_result = Serializer<AllocatedBuffer>::deserialize(
array_items[4], segment_view);
array_items[payload_index], segment_view);
if (!buffer_result) {
return tl::unexpected(buffer_result.error());
}
Expand All @@ -776,7 +784,7 @@ auto Serializer<Replica>::deserialize(const msgpack::object &obj,
break;
}
case static_cast<int8_t>(ReplicaType::DISK): {
const auto &payload = array_items[4];
const auto &payload = array_items[payload_index];
if (payload.type != msgpack::type::ARRAY ||
payload.via.array.size != 2) {
return tl::unexpected(
Expand All @@ -793,7 +801,7 @@ auto Serializer<Replica>::deserialize(const msgpack::object &obj,
break;
}
case static_cast<int8_t>(ReplicaType::LOCAL_DISK): {
const auto &payload = array_items[4];
const auto &payload = array_items[payload_index];
if (payload.type != msgpack::type::ARRAY ||
payload.via.array.size != 3) {
return tl::unexpected(
Expand Down