Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/borg/archiver/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ def wrapper(self, args, **kwargs):
)

with repository:
if repository.version not in (3,):
if repository.version not in (4,):
raise Error(
f"This borg version only accepts version 3 repos for -r/--repo, "
f"This borg version only accepts version 4 repos for -r/--repo, "
f"but not version {repository.version}. "
f"You can use 'borg transfer' to copy archives from old to new repos."
)
Expand Down Expand Up @@ -194,10 +194,10 @@ def wrapper(self, args, **kwargs):
)

with repository:
acceptable_versions = (1,) if v1_legacy else (3,)
acceptable_versions = (1,) if v1_legacy else (4,)
if repository.version not in acceptable_versions:
raise Error(
f"This borg version only accepts version {' or '.join(acceptable_versions)} "
f"This borg version only accepts version {' or '.join(str(v) for v in acceptable_versions)} "
f"repos for --other-repo."
)
kwargs["other_repository"] = repository
Expand Down
11 changes: 7 additions & 4 deletions src/borg/repoobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
OBJ_MAGIC = b"BORG_OBJ"
OBJ_VERSION = 0x01

# Fixed header size per blob: OBJ_MAGIC(8) + version(1) + chunk_id(32) + meta_size(4) + data_size(4)
REPOOBJ_HEADER_SIZE = 49


class RepoObj:
# Object header: magic (8b), format version (1b), meta size (4b), data size (4b).
obj_header = Struct("<8sBII")
ObjHeader = namedtuple("ObjHeader", "magic version meta_size data_size")
# Object header: magic (8b), format version (1b), chunk_id (32b), meta size (4b), data size (4b).
obj_header = Struct("<8sB32sII")
ObjHeader = namedtuple("ObjHeader", "magic version chunk_id meta_size data_size")

@classmethod
def extract_crypted_data(cls, data: bytes) -> bytes:
Expand Down Expand Up @@ -72,7 +75,7 @@ def format(
data_encrypted = self.key.encrypt(id, data_compressed)
meta_packed = msgpack.packb(meta)
meta_encrypted = self.key.encrypt(id, meta_packed)
hdr = self.ObjHeader(OBJ_MAGIC, OBJ_VERSION, len(meta_encrypted), len(data_encrypted))
hdr = self.ObjHeader(OBJ_MAGIC, OBJ_VERSION, id, len(meta_encrypted), len(data_encrypted))
hdr_packed = self.obj_header.pack(*hdr)
return hdr_packed + meta_encrypted + data_encrypted

Expand Down
76 changes: 39 additions & 37 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,13 @@ def __init__(
location = Location(url)
self._location = location
self.url = url
# lots of stuff in data: use 2 levels by default (data/00/00/ .. data/ff/ff/ dirs)!
data_levels = int(os.environ.get("BORG_STORE_DATA_LEVELS", "2"))
ns_config = {
"archives/": {"levels": [0]},
"cache/": {"levels": [0]},
"config/": {"levels": [0]},
"data/": {"levels": [data_levels]},
"keys/": {"levels": [0]},
"locks/": {"levels": [0]},
"packs/": {"levels": [1]},
}
# Get permissions from parameter or environment variable
permissions = permissions if permissions is not None else os.environ.get("BORG_REPO_PERMISSIONS", "all")
Expand All @@ -136,19 +134,19 @@ def __init__(
"archives": "lrw",
"cache": "lrwWD", # WD for chunks.<HASH>, last-key-checked, ...
"config": "lrW", # W for manifest
"data": "lrw",
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lrw",
}
elif permissions == "write-only": # mostly no reading
permissions = {
"": "l",
"archives": "lw",
"cache": "lrwWD", # read allowed, e.g. for chunks.<HASH> cache
"config": "lrW", # W for manifest
"data": "lw", # no r!
"keys": "lr",
"locks": "lrwD", # borg needs to create/delete a shared lock here
"packs": "lw", # no r!
}
elif permissions == "read-only": # mostly r/o
permissions = {"": "lr", "locks": "lrwD"}
Expand All @@ -171,7 +169,7 @@ def __init__(
self._send_log = send_log_cb or (lambda: None)
self.do_create = create
self.created = False
self.acceptable_repo_versions = (3,)
self.acceptable_repo_versions = (4,)
self.opened = False
self.lock = None
self.do_lock = lock
Expand Down Expand Up @@ -209,10 +207,10 @@ def create(self):
self.store.open()
try:
self.store.store("config/readme", REPOSITORY_README.encode())
self.version = 3
self.version = 4
self.store.store("config/version", str(self.version).encode())
self.store.store("config/id", bin_to_hex(os.urandom(32)).encode())
# we know repo/data/ still does not have any chunks stored in it,
# we know repo/packs/ does not have any chunks stored in it yet,
# but for some stores, there might be a lot of empty directories and
# listing them all might be rather slow, so we better cache an empty
# ChunkIndex from here so that the first repo operation does not have
Expand Down Expand Up @@ -327,22 +325,21 @@ def log_error(msg):
def check_object(obj):
"""Check if obj looks valid."""
hdr_size = RepoObj.obj_header.size
obj_size = len(obj)
if obj_size >= hdr_size:
hdr = RepoObj.ObjHeader(*RepoObj.obj_header.unpack(obj[:hdr_size]))
if hdr.magic != OBJ_MAGIC:
log_error("invalid object magic.")
elif hdr.version != OBJ_VERSION:
log_error(f"unsupported object version: {hdr.version}.")
else:
meta = obj[hdr_size : hdr_size + hdr.meta_size]
if hdr.meta_size != len(meta):
log_error("metadata size mismatch.")
data = obj[hdr_size + hdr.meta_size : hdr_size + hdr.meta_size + hdr.data_size]
if hdr.data_size != len(data):
log_error("data size mismatch.")
else:
if len(obj) < hdr_size:
log_error("too small.")
return
hdr = RepoObj.ObjHeader(*RepoObj.obj_header.unpack(obj[:hdr_size]))
if hdr.magic != OBJ_MAGIC:
log_error("invalid object magic.")
elif hdr.version != OBJ_VERSION:
log_error(f"unsupported object version: {hdr.version}.")
else:
meta = obj[hdr_size : hdr_size + hdr.meta_size]
if hdr.meta_size != len(meta):
log_error("metadata size mismatch.")
data = obj[hdr_size + hdr.meta_size : hdr_size + hdr.meta_size + hdr.data_size]
if hdr.data_size != len(data):
log_error("data size mismatch.")

# TODO: progress indicator, ...
partial = bool(max_duration)
Expand Down Expand Up @@ -376,11 +373,11 @@ def check_object(obj):
# As we don't do garbage collection here, this is not a problem.
# We also don't know the plaintext size, so we set it to 0.
init_entry = ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0)
infos = self.store.list("data")
infos = self.store.list("packs")
try:
for info in infos:
self._lock_refresh()
key = "data/%s" % info.name
key = "packs/%s" % info.name
if key <= last_key_checked: # needs sorted keys
continue
try:
Expand Down Expand Up @@ -412,8 +409,9 @@ def check_object(obj):
# add all existing objects to the index.
# borg check: the index may have corrupted objects (we did not delete them)
# borg check --repair: the index will only have non-corrupted objects.
id = hex_to_bin(info.name)
chunks[id] = init_entry
pack_id = hex_to_bin(info.name)
chunk_id = pack_id # N=1: chunk_id == pack_id
chunks[chunk_id] = init_entry
now = time.monotonic()
if now > t_last_checkpoint + 300: # checkpoint every 5 mins
t_last_checkpoint = now
Expand All @@ -437,7 +435,7 @@ def check_object(obj):
self, chunks, incremental=False, clear=True, force_write=True, delete_other=True
)
except StoreObjectNotFound:
# it can be that there is no "data/" at all, then it crashes when iterating infos.
# "packs/" might not exist at all; in that case iterating infos raises StoreObjectNotFound.
pass
logger.info(f"Checked {objs_checked} repository objects, {objs_errors} errors.")
if objs_errors == 0:
Expand All @@ -456,33 +454,35 @@ def list(self, limit=None, marker=None):
"""
collect = True if marker is None else False
result = []
infos = self.store.list("data") # generator yielding ItemInfos
infos = self.store.list("packs") # generator yielding ItemInfos
while True:
self._lock_refresh()
try:
info = next(infos)
except StoreObjectNotFound:
break # can happen e.g. if "data" does not exist, pointless to continue in that case
break # can happen e.g. if "packs" does not exist, pointless to continue in that case
Comment thread
mr-raj12 marked this conversation as resolved.
except StopIteration:
break
else:
id = hex_to_bin(info.name)
pack_id = hex_to_bin(info.name)
chunk_id = pack_id # N=1: chunk_id == pack_id
if collect:
result.append((id, info.size))
chunk_size = info.size # only correct for N=1
result.append((chunk_id, chunk_size))
if len(result) == limit:
break
elif id == marker:
elif chunk_id == marker:
collect = True
# note: do not collect the marker id
return result

def get(self, id, read_data=True, raise_missing=True):
self._lock_refresh()
pack_id = id # N=1: pack_id == chunk_id
id_hex = bin_to_hex(id)
key = "data/" + id_hex
key = "packs/" + bin_to_hex(pack_id)
try:
if read_data:
# read everything
return self.store.load(key)
else:
# RepoObj layout supports separately encrypted metadata and data.
Expand Down Expand Up @@ -523,7 +523,8 @@ def put(self, id, data, wait=True):
if data_size > MAX_DATA_SIZE:
raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")

key = "data/" + bin_to_hex(id)
pack_id = id # N=1: pack_id == chunk_id
key = "packs/" + bin_to_hex(pack_id)
self.store.store(key, data)

def delete(self, id, wait=True):
Expand All @@ -533,7 +534,8 @@ def delete(self, id, wait=True):
deal with async results / exceptions later.
"""
self._lock_refresh()
key = "data/" + bin_to_hex(id)
pack_id = id # N=1: pack_id == chunk_id
key = "packs/" + bin_to_hex(pack_id)
try:
self.store.delete(key)
except StoreObjectNotFound:
Expand Down
11 changes: 6 additions & 5 deletions src/borg/testsuite/archiver/check_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def test_corrupted_manifest(archivers, request):
archive, repository = open_archive(archiver.repository_path, "archive1")
with repository:
manifest = repository.get_manifest()
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:]
repository.put_manifest(corrupted_manifest)
cmd(archiver, "check", exit_code=1)
output = cmd(archiver, "check", "-v", "--repair", exit_code=0)
Expand Down Expand Up @@ -273,7 +273,7 @@ def test_manifest_rebuild_corrupted_chunk(archivers, request):
archive, repository = open_archive(archiver.repository_path, "archive1")
with repository:
manifest = repository.get_manifest()
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:]
repository.put_manifest(corrupted_manifest)
chunk = repository.get(archive.id)
corrupted_chunk = chunk + b"corrupted!"
Expand Down Expand Up @@ -312,7 +312,7 @@ def test_spoofed_archive(archivers, request):
with repository:
# attacker would corrupt or delete the manifest to trigger a rebuild of it:
manifest = repository.get_manifest()
corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:]
corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:]
repository.put_manifest(corrupted_manifest)
archive_dict = {
"command_line": "",
Expand Down Expand Up @@ -351,8 +351,9 @@ def test_extra_chunks(archivers, request):
check_cmd_setup(archiver)
cmd(archiver, "check", exit_code=0)
with Repository(archiver.repository_location, exclusive=True) as repository:
chunk = fchunk(b"xxxx")
repository.put(b"01234567890123456789012345678901", chunk)
key = b"01234567890123456789012345678901"
chunk = fchunk(b"xxxx", chunk_id=key)
repository.put(key, chunk)
cmd(archiver, "check", "-v", exit_code=0) # check does not deal with orphans anymore


Expand Down
8 changes: 4 additions & 4 deletions src/borg/testsuite/repository_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def reopen(repository, exclusive: bool | None = True, create=False):
)


def fchunk(data, meta=b""):
def fchunk(data, meta=b"", chunk_id=b"\x00" * 32):
# Format chunk: create a raw chunk that has a valid RepoObj layout, but does not use encryption or compression.
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, len(meta), len(data))
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, chunk_id, len(meta), len(data))
assert isinstance(data, bytes)
chunk = hdr + meta + data
return chunk
Expand All @@ -65,7 +65,7 @@ def pchunk(chunk):
# Parse chunk: extract data and metadata from a raw chunk made by fchunk.
hdr_size = RepoObj.obj_header.size
hdr = chunk[:hdr_size]
meta_size, data_size = RepoObj.obj_header.unpack(hdr)[2:4]
meta_size, data_size = RepoObj.obj_header.unpack(hdr)[3:5]
meta = chunk[hdr_size : hdr_size + meta_size]
data = chunk[hdr_size + meta_size : hdr_size + meta_size + data_size]
return data, meta
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_basic_operations(repo_fixtures, request):
def test_read_data(repo_fixtures, request):
with get_repository_from_fixture(repo_fixtures, request) as repository:
meta, data = b"meta", b"data"
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, len(meta), len(data))
hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, H(0), len(meta), len(data))
chunk_complete = hdr + meta + data
chunk_short = hdr + meta
repository.put(H(0), chunk_complete)
Expand Down
Loading