diff --git a/src/borg/archiver/_common.py b/src/borg/archiver/_common.py index 7aba725e28..769be095ed 100644 --- a/src/borg/archiver/_common.py +++ b/src/borg/archiver/_common.py @@ -127,9 +127,9 @@ def wrapper(self, args, **kwargs): ) with repository: - if repository.version not in (3,): + if repository.version not in (4,): raise Error( - f"This borg version only accepts version 3 repos for -r/--repo, " + f"This borg version only accepts version 4 repos for -r/--repo, " f"but not version {repository.version}. " f"You can use 'borg transfer' to copy archives from old to new repos." ) @@ -194,10 +194,10 @@ def wrapper(self, args, **kwargs): ) with repository: - acceptable_versions = (1,) if v1_legacy else (3,) + acceptable_versions = (1,) if v1_legacy else (4,) if repository.version not in acceptable_versions: raise Error( - f"This borg version only accepts version {' or '.join(acceptable_versions)} " + f"This borg version only accepts version {' or '.join(str(v) for v in acceptable_versions)} " f"repos for --other-repo." ) kwargs["other_repository"] = repository diff --git a/src/borg/repoobj.py b/src/borg/repoobj.py index ad8e52d6f6..f2a57f95a8 100644 --- a/src/borg/repoobj.py +++ b/src/borg/repoobj.py @@ -13,11 +13,14 @@ OBJ_MAGIC = b"BORG_OBJ" OBJ_VERSION = 0x01 +# Fixed header size per blob: OBJ_MAGIC(8) + version(1) + chunk_id(32) + meta_size(4) + data_size(4) +REPOOBJ_HEADER_SIZE = 49 + class RepoObj: - # Object header: magic (8b), format version (1b), meta size (4b), data size (4b). - obj_header = Struct("<8sBII") - ObjHeader = namedtuple("ObjHeader", "magic version meta_size data_size") + # Object header: magic (8b), format version (1b), chunk_id (32b), meta size (4b), data size (4b). + obj_header = Struct("<8sB32sII") + ObjHeader = namedtuple("ObjHeader", "magic version chunk_id meta_size data_size") @classmethod def extract_crypted_data(cls, data: bytes) -> bytes: @@ -72,7 +75,7 @@ def format( data_encrypted = self.key.encrypt(id, data_compressed) meta_packed = msgpack.packb(meta) meta_encrypted = self.key.encrypt(id, meta_packed) - hdr = self.ObjHeader(OBJ_MAGIC, OBJ_VERSION, len(meta_encrypted), len(data_encrypted)) + hdr = self.ObjHeader(OBJ_MAGIC, OBJ_VERSION, id, len(meta_encrypted), len(data_encrypted)) hdr_packed = self.obj_header.pack(*hdr) return hdr_packed + meta_encrypted + data_encrypted diff --git a/src/borg/repository.py b/src/borg/repository.py index 925725bcbb..bba937e336 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -115,15 +115,13 @@ def __init__( location = Location(url) self._location = location self.url = url - # lots of stuff in data: use 2 levels by default (data/00/00/ .. data/ff/ff/ dirs)! - data_levels = int(os.environ.get("BORG_STORE_DATA_LEVELS", "2")) ns_config = { "archives/": {"levels": [0]}, "cache/": {"levels": [0]}, "config/": {"levels": [0]}, - "data/": {"levels": [data_levels]}, "keys/": {"levels": [0]}, "locks/": {"levels": [0]}, + "packs/": {"levels": [1]}, } # Get permissions from parameter or environment variable permissions = permissions if permissions is not None else os.environ.get("BORG_REPO_PERMISSIONS", "all") @@ -136,9 +134,9 @@ def __init__( "archives": "lrw", "cache": "lrwWD", # WD for chunks., last-key-checked, ... "config": "lrW", # W for manifest - "data": "lrw", "keys": "lr", "locks": "lrwD", # borg needs to create/delete a shared lock here + "packs": "lrw", } elif permissions == "write-only": # mostly no reading permissions = { @@ -146,9 +144,9 @@ def __init__( "archives": "lw", "cache": "lrwWD", # read allowed, e.g. for chunks. cache "config": "lrW", # W for manifest - "data": "lw", # no r! "keys": "lr", "locks": "lrwD", # borg needs to create/delete a shared lock here + "packs": "lw", # no r! } elif permissions == "read-only": # mostly r/o permissions = {"": "lr", "locks": "lrwD"} @@ -171,7 +169,7 @@ def __init__( self._send_log = send_log_cb or (lambda: None) self.do_create = create self.created = False - self.acceptable_repo_versions = (3,) + self.acceptable_repo_versions = (4,) self.opened = False self.lock = None self.do_lock = lock @@ -209,10 +207,10 @@ def create(self): self.store.open() try: self.store.store("config/readme", REPOSITORY_README.encode()) - self.version = 3 + self.version = 4 self.store.store("config/version", str(self.version).encode()) self.store.store("config/id", bin_to_hex(os.urandom(32)).encode()) - # we know repo/data/ still does not have any chunks stored in it, + # we know repo/packs/ still does not have any chunks stored in it, # but for some stores, there might be a lot of empty directories and # listing them all might be rather slow, so we better cache an empty # ChunkIndex from here so that the first repo operation does not have @@ -327,22 +325,21 @@ def log_error(msg): def check_object(obj): """Check if obj looks valid.""" hdr_size = RepoObj.obj_header.size - obj_size = len(obj) - if obj_size >= hdr_size: - hdr = RepoObj.ObjHeader(*RepoObj.obj_header.unpack(obj[:hdr_size])) - if hdr.magic != OBJ_MAGIC: - log_error("invalid object magic.") - elif hdr.version != OBJ_VERSION: - log_error(f"unsupported object version: {hdr.version}.") - else: - meta = obj[hdr_size : hdr_size + hdr.meta_size] - if hdr.meta_size != len(meta): - log_error("metadata size mismatch.") - data = obj[hdr_size + hdr.meta_size : hdr_size + hdr.meta_size + hdr.data_size] - if hdr.data_size != len(data): - log_error("data size mismatch.") - else: + if len(obj) < hdr_size: log_error("too small.") + return + hdr = RepoObj.ObjHeader(*RepoObj.obj_header.unpack(obj[:hdr_size])) + if hdr.magic != OBJ_MAGIC: + log_error("invalid object magic.") + elif hdr.version != OBJ_VERSION: + log_error(f"unsupported object version: {hdr.version}.") + else: + meta = obj[hdr_size : hdr_size + hdr.meta_size] + if hdr.meta_size != len(meta): + log_error("metadata size mismatch.") + data = obj[hdr_size + hdr.meta_size : hdr_size + hdr.meta_size + hdr.data_size] + if hdr.data_size != len(data): + log_error("data size mismatch.") # TODO: progress indicator, ... partial = bool(max_duration) @@ -376,11 +373,11 @@ def check_object(obj): # As we don't do garbage collection here, this is not a problem. # We also don't know the plaintext size, so we set it to 0. init_entry = ChunkIndexEntry(flags=ChunkIndex.F_USED, size=0) - infos = self.store.list("data") + infos = self.store.list("packs") try: for info in infos: self._lock_refresh() - key = "data/%s" % info.name + key = "packs/%s" % info.name if key <= last_key_checked: # needs sorted keys continue try: @@ -412,8 +409,9 @@ def check_object(obj): # add all existing objects to the index. # borg check: the index may have corrupted objects (we did not delete them) # borg check --repair: the index will only have non-corrupted objects. - id = hex_to_bin(info.name) - chunks[id] = init_entry + pack_id = hex_to_bin(info.name) + chunk_id = pack_id # N=1: chunk_id == pack_id + chunks[chunk_id] = init_entry now = time.monotonic() if now > t_last_checkpoint + 300: # checkpoint every 5 mins t_last_checkpoint = now @@ -437,7 +435,7 @@ def check_object(obj): self, chunks, incremental=False, clear=True, force_write=True, delete_other=True ) except StoreObjectNotFound: - # it can be that there is no "data/" at all, then it crashes when iterating infos. + # it can be that there is no "packs/" at all, then it crashes when iterating infos. pass logger.info(f"Checked {objs_checked} repository objects, {objs_errors} errors.") if objs_errors == 0: @@ -456,33 +454,35 @@ def list(self, limit=None, marker=None): """ collect = True if marker is None else False result = [] - infos = self.store.list("data") # generator yielding ItemInfos + infos = self.store.list("packs") # generator yielding ItemInfos while True: self._lock_refresh() try: info = next(infos) except StoreObjectNotFound: - break # can happen e.g. if "data" does not exist, pointless to continue in that case + break # can happen e.g. if "packs" does not exist, pointless to continue in that case except StopIteration: break else: - id = hex_to_bin(info.name) + pack_id = hex_to_bin(info.name) + chunk_id = pack_id # N=1: chunk_id == pack_id if collect: - result.append((id, info.size)) + chunk_size = info.size # only correct for N=1 + result.append((chunk_id, chunk_size)) if len(result) == limit: break - elif id == marker: + elif chunk_id == marker: collect = True # note: do not collect the marker id return result def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() + pack_id = id # N=1: pack_id == chunk_id id_hex = bin_to_hex(id) - key = "data/" + id_hex + key = "packs/" + bin_to_hex(pack_id) try: if read_data: - # read everything return self.store.load(key) else: # RepoObj layout supports separately encrypted metadata and data. @@ -523,7 +523,8 @@ def put(self, id, data, wait=True): if data_size > MAX_DATA_SIZE: raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]") - key = "data/" + bin_to_hex(id) + pack_id = id # N=1: pack_id == chunk_id + key = "packs/" + bin_to_hex(pack_id) self.store.store(key, data) def delete(self, id, wait=True): @@ -533,7 +534,8 @@ def delete(self, id, wait=True): deal with async results / exceptions later. """ self._lock_refresh() - key = "data/" + bin_to_hex(id) + pack_id = id # N=1: pack_id == chunk_id + key = "packs/" + bin_to_hex(pack_id) try: self.store.delete(key) except StoreObjectNotFound: diff --git a/src/borg/testsuite/archiver/check_cmd_test.py b/src/borg/testsuite/archiver/check_cmd_test.py index aeccfd91d2..162b4c1986 100644 --- a/src/borg/testsuite/archiver/check_cmd_test.py +++ b/src/borg/testsuite/archiver/check_cmd_test.py @@ -225,7 +225,7 @@ def test_corrupted_manifest(archivers, request): archive, repository = open_archive(archiver.repository_path, "archive1") with repository: manifest = repository.get_manifest() - corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:] + corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:] repository.put_manifest(corrupted_manifest) cmd(archiver, "check", exit_code=1) output = cmd(archiver, "check", "-v", "--repair", exit_code=0) @@ -273,7 +273,7 @@ def test_manifest_rebuild_corrupted_chunk(archivers, request): archive, repository = open_archive(archiver.repository_path, "archive1") with repository: manifest = repository.get_manifest() - corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:] + corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:] repository.put_manifest(corrupted_manifest) chunk = repository.get(archive.id) corrupted_chunk = chunk + b"corrupted!" @@ -312,7 +312,7 @@ def test_spoofed_archive(archivers, request): with repository: # attacker would corrupt or delete the manifest to trigger a rebuild of it: manifest = repository.get_manifest() - corrupted_manifest = manifest[:123] + b"corrupted!" + manifest[123:] + corrupted_manifest = manifest[:250] + b"corrupted!" + manifest[250:] repository.put_manifest(corrupted_manifest) archive_dict = { "command_line": "", @@ -351,8 +351,9 @@ def test_extra_chunks(archivers, request): check_cmd_setup(archiver) cmd(archiver, "check", exit_code=0) with Repository(archiver.repository_location, exclusive=True) as repository: - chunk = fchunk(b"xxxx") - repository.put(b"01234567890123456789012345678901", chunk) + key = b"01234567890123456789012345678901" + chunk = fchunk(b"xxxx", chunk_id=key) + repository.put(key, chunk) cmd(archiver, "check", "-v", exit_code=0) # check does not deal with orphans anymore diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 5e15ec7942..becdb36354 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -53,9 +53,9 @@ def reopen(repository, exclusive: bool | None = True, create=False): ) -def fchunk(data, meta=b""): +def fchunk(data, meta=b"", chunk_id=b"\x00" * 32): # Format chunk: create a raw chunk that has a valid RepoObj layout, but does not use encryption or compression. - hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, len(meta), len(data)) + hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, chunk_id, len(meta), len(data)) assert isinstance(data, bytes) chunk = hdr + meta + data return chunk @@ -65,7 +65,7 @@ def pchunk(chunk): # Parse chunk: extract data and metadata from a raw chunk made by fchunk. hdr_size = RepoObj.obj_header.size hdr = chunk[:hdr_size] - meta_size, data_size = RepoObj.obj_header.unpack(hdr)[2:4] + meta_size, data_size = RepoObj.obj_header.unpack(hdr)[3:5] meta = chunk[hdr_size : hdr_size + meta_size] data = chunk[hdr_size + meta_size : hdr_size + meta_size + data_size] return data, meta @@ -97,7 +97,7 @@ def test_basic_operations(repo_fixtures, request): def test_read_data(repo_fixtures, request): with get_repository_from_fixture(repo_fixtures, request) as repository: meta, data = b"meta", b"data" - hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, len(meta), len(data)) + hdr = RepoObj.obj_header.pack(OBJ_MAGIC, OBJ_VERSION, H(0), len(meta), len(data)) chunk_complete = hdr + meta + data chunk_short = hdr + meta repository.put(H(0), chunk_complete)