|
| 1 | +''' |
| 2 | +Synchronization-related functionality |
| 3 | +''' |
| 4 | + |
| 5 | +import itertools |
| 6 | +import warnings |
| 7 | +import zfslib as zfs |
| 8 | +from zfslib_test_tools import * |
| 9 | + |
| 10 | + |
# it is time to determine which datasets need to be synced
# we walk the entire dataset structure, and sync snapshots recursively
def recursive_replicate(s, d):
    """Build a replication schedule that brings destination d in sync with source s.

    Walks s and d (d may be None) recursively and returns a flat list of
    operation tuples of the form
    (operation, source_dataset, destination_dataset, from_snapshot, to_snapshot)
    where operation is one of "create_stub", "full" or "incremental".
    """
    sched = []

    # we first collect all snapshot names, to later see if they are on both sides, one side, or what
    all_snapshots = []
    if s: all_snapshots.extend(s.get_snapshots())
    if d: all_snapshots.extend(d.get_snapshots())
    # chronological order; name is the tie-breaker so the ordering is stable
    # (a key function avoids comparing snapshot objects directly, which would
    # raise TypeError on creation-time ties)
    all_snapshots.sort(key=lambda x: (x.get_property('creation'), x.name))
    all_snapshots = [x.name for x in all_snapshots]

    snapshot_pairs = []
    paired_names = set()  # names already entered into snapshot_pairs
    for snap in all_snapshots:
        try: ssnap = s.get_snapshot(snap)
        except (KeyError, AttributeError): ssnap = None
        try: dsnap = d.get_snapshot(snap)
        except (KeyError, AttributeError): dsnap = None
        # if the source snapshot exists and is not already in the table of snapshots
        # then pair it up with its destination snapshot (if it exists) or None
        # and add it to the table of snapshots
        if ssnap and snap not in paired_names:
            snapshot_pairs.append((ssnap, dsnap))
            paired_names.add(snap)

    # now we have a list of all snapshots, paired up by name, and in chronological order
    # now we need to find the snapshot pair that happens to be the most recent common pair
    found_common_pair = False
    for idx, (m, n) in enumerate(snapshot_pairs):
        if m and n and m.name == n.name:
            found_common_pair = idx

    # we have combed through the snapshot pairs
    # time to check what the latest common pair is
    if not s.get_snapshots():
        if d is None:
            # well, no snapshots in source, just create a stub in the target
            sched.append(("create_stub", s, d, None, None))
    elif found_common_pair is False:
        # no snapshot is in common, problem!
        # theoretically destroying destination dataset and resyncing it recursively would work
        # but this requires work in the optimizer that comes later
        if d is not None and d.get_snapshots():
            warnings.warn("Asked to replicate %s into %s but %s has snapshots and both have no snapshots in common!" % (s, d, d))
        # see source snapshots, oldest first (stable key sort, see above)
        full_source_snapshots = sorted(s.get_snapshots(), key=lambda x: x.get_property('creation'))
        # send first snapshot as full snapshot
        sched.append(("full", s, d, None, full_source_snapshots[0]))
        if len(full_source_snapshots) > 1:
            # send other snapshots as incremental snapshots
            sched.append(("incremental", s, d, full_source_snapshots[0], full_source_snapshots[-1]))
    elif found_common_pair == len(snapshot_pairs) - 1:
        # the latest snapshot of both datasets that is common to both, is the latest snapshot in the source
        # we have nothing to do here because the datasets are "in sync"
        pass
    else:
        # the source dataset has more recent snapshots, not present in the destination dataset
        # we need to transfer those, one incremental step per consecutive pair
        snapshots_to_transfer = [x[0] for x in snapshot_pairs[found_common_pair:]]
        for n, x in enumerate(snapshots_to_transfer):
            if n == 0: continue
            sched.append(("incremental", s, d, snapshots_to_transfer[n - 1], x))

    # now let's apply the same argument to the children
    children_sched = []
    for c in [x for x in s.children if not isinstance(x, zfs.Snapshot)]:
        try: cd = d.get_child(c.name)
        except (KeyError, AttributeError): cd = None
        children_sched.extend(recursive_replicate(c, cd))

    # and return our schedule of operations to the parent
    return sched + children_sched
| 81 | + |
def optimize_coalesce(operation_schedule):
    """Coalesce contiguous operations that act on the same source dataset.

    Chains of incremental sends (1->2->3->4) collapse into the minimal set
    of spans (1->4) via simplify(); "full" and "create_stub" operations
    pass through unchanged.
    """
    grouped_by_source = itertools.groupby(
        operation_schedule,
        lambda op: op[1]
    )
    coalesced = []
    for _, group in [(src, list(ops)) for src, ops in grouped_by_source]:
        if not group:
            # empty group, nothing to do
            continue
        op_kind = group[0][0]
        if op_kind in ('full', 'create_stub'):
            # these cannot be merged; keep them as-is
            coalesced.extend(group)
        elif op_kind == 'incremental':
            # 1->2->3->4 => 1->4
            spans = simplify([(frm, to) for _, _, _, frm, to in group])
            head = group[0][:3]
            for frm, to in spans:
                coalesced.append(head + (frm, to))
        else:
            assert 0, "not reached: unknown operation type in %s" % group
    return coalesced
| 108 | + |
def optimize_recursivize(operation_schedule):
    '''
    Rewrite operation_schedule so that, wherever a dataset and all of its
    descendants are scheduled for exactly the same work, the per-dataset
    entries are replaced by a single "<op>_recursive" entry on the topmost
    dataset.

    Works by stashing each dataset's operations on a temporary
    _ops_schedule attribute, comparing whole subtrees, then flattening the
    surviving operations back into a list.
    '''
    # walk dataset and its non-snapshot descendants, pairing each with func(dataset)
    def recurse(dataset, func):
        results = []
        results.append((dataset, func(dataset)))
        results.extend([ x for child in dataset.children if child.__class__ != zfs.Snapshot for x in recurse(child, func) ])
        return results

    # reset a dataset's scratch schedule to empty
    def zero_out_sched(dataset):
        dataset._ops_schedule = []

    # NOTE(review): identical to zero_out_sched; used at the end to clear the
    # scratch attribute after the new schedule has been collected
    def evict_sched(dataset):
        dataset._ops_schedule = []

    # group the flat schedule by source dataset (op[1]); assumes the input is
    # already ordered by source, as produced by recursive_replicate
    operations_grouped_by_source = itertools.groupby(
        operation_schedule,
        lambda op: op[1]
    )
    operations_grouped_by_source = [ (x, list(y)) for x, y in operations_grouped_by_source ]

    # find the root dataset of every source that has scheduled operations
    roots = set()
    for root, opgroup in operations_grouped_by_source:
        while root.parent is not None:
            root = root.parent
        roots.add(root)

    # give every dataset in the affected trees an empty scratch schedule...
    for root in roots:
        recurse(root, zero_out_sched)

    # ...then stash each source's own operations on the dataset itself
    for source, opgroup in operations_grouped_by_source:
        source._ops_schedule = opgroup

    def compare(*ops_schedules):
        '''Return True when all given per-dataset schedules describe the same
        operations (same type and same snapshot names), i.e. the subtree can
        be covered by one recursive operation.'''
        assert len(ops_schedules), "operations schedules cannot be empty: %r" % ops_schedules

        # in the case of the list of operations schedules being just one (no children)
        # we return True, cos it's safe to recursively replicate this one
        if len(ops_schedules) == 1:
            return True

        # now let's check that all ops schedules are the same length
        # otherwise they are not the same and we can say the comparison isn't the same
        lens = set([ len(o) for o in ops_schedules ])
        if len(lens) != 1:
            return False

        # we have multiple schedules
        # if their type, snapshot origin and snapshot destination are all the same
        # we can say that they are "the same"
        comparisons = [
            all([
                # never attempt to recursivize operations who involve create_stub
                all(["create_stub" not in o[0] for o in ops]),
                len(set([o[0] for o in ops])) == 1,
                any([o[3] is None for o in ops]) or len(set([o[3].name for o in ops])) == 1,
                any([o[4] is None for o in ops]) or len(set([o[4].name for o in ops])) == 1,
            ])
            for ops
            in zip(*ops_schedules)
        ]
        return all(comparisons)

    # remove unnecessary stubs that stand in for only other stubs
    for root in roots:
        for dataset, _ in recurse(root, lambda d: d):
            ops = [z for x, y in recurse(dataset, lambda d: d._ops_schedule) for z in y]
            if all([o[0] == 'create_stub' for o in ops]):
                dataset._ops_schedule = []

    # wherever a whole subtree schedules identical work, collapse it onto the
    # subtree's top dataset as "<op>_recursive" entries
    for root in roots:
        for dataset, _ in recurse(root, lambda d: d):
            if compare(*[y for x, y in recurse(dataset, lambda d: d._ops_schedule)]):
                old_ops_schedule = dataset._ops_schedule
                recurse(dataset, zero_out_sched)
                for op in old_ops_schedule:
                    dataset._ops_schedule.append((
                        op[0] + "_recursive", op[1], op[2], op[3], op[4]
                    ))

    # flatten the surviving per-dataset schedules back into one list
    new_operation_schedule = []
    for root in roots:
        for dataset, ops_schedule in recurse(root, lambda d: d._ops_schedule):
            new_operation_schedule.extend(ops_schedule)

    # clear the scratch attributes before returning
    for root in roots:
        recurse(root, evict_sched)

    return new_operation_schedule
| 196 | + |
def optimize(operation_schedule, allow_recursivize = True):
    """Run the optimizer passes over a replication schedule and return it."""
    coalesced = optimize_coalesce(operation_schedule)
    return optimize_recursivize(coalesced) if allow_recursivize else coalesced
| 202 | + |
# we walk the entire dataset structure, and destroy obsolete snapshots recursively
def recursive_clear_obsolete(s, d):
    """Schedule destruction of destination snapshots/datasets absent from the source.

    Walks s (source) and d (destination) recursively and returns a list of
    ("destroy", snapshot) tuples for destination snapshots that no longer
    exist in the source, plus ("destroy_recursively", dataset) tuples for
    destination children with no source counterpart.
    """
    sched = []

    # we first collect all snapshot names, to later see if they are on both sides, one side, or what
    snapshots_in_src = {m.name for m in s.get_snapshots()}
    snapshots_in_dst = {m.name for m in d.get_snapshots()}

    # anything only in the destination is obsolete
    snapshots_to_delete = snapshots_in_dst - snapshots_in_src
    sched.extend(("destroy", d.get_snapshot(name)) for name in snapshots_to_delete)

    # now let's apply the same argument to the children
    children_sched = []
    for child_d in [x for x in d.children if not isinstance(x, zfs.Snapshot)]:
        child_s = None
        try:
            child_s = s.get_child(child_d.name)
        except (KeyError, AttributeError):
            # no counterpart in the source: the whole child dataset is obsolete
            children_sched.append(("destroy_recursively", child_d))

        if child_s:
            children_sched.extend(recursive_clear_obsolete(child_s, child_d))

    # and return our schedule of operations to the parent
    return sched + children_sched
0 commit comments