Skip to content

Commit 671a0bf

Browse files
Adding Testing
WIP
1 parent fa54dbc commit 671a0bf

6 files changed

Lines changed: 1541 additions & 39 deletions

File tree

examples/ex_local.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
# Basic example for accessing ZFS on local machine
2-
32
import sys
43
import zfslib as zfs
54
from zfslib_ex_common import *

tests/data_mounts.tsv

Lines changed: 604 additions & 0 deletions
Large diffs are not rendered by default.

tests/data_nomounts.tsv

Lines changed: 604 additions & 0 deletions
Large diffs are not rendered by default.

tests/test_zfslib.py

Lines changed: 256 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,258 @@
1+
import unittest
2+
from datetime import datetime
13
import zfslib as zfs
4+
from zfslib_test_tools import *
25

3-
# TBD
6+
# TODO:
7+
# [.] Write negative tests for get_mounts=False
8+
9+
# Load / init test data
10+
properties = ['name', 'creation', 'used', 'available', 'referenced', 'mountpoint','mounted']
11+
12+
zlist_data = load_test_data('data_mounts', properties)
13+
poolset = zfs.TestPoolSet()
14+
poolset.parse_zfs_r_output(zlist_data, properties=properties)
15+
pool_names = pool_names = [p.name for p in poolset if True]
16+
17+
18+
class PoolSetTests(unittest.TestCase):
19+
20+
def test_pool_names(self):
21+
self.assertEqual(pool_names, ['bpool', 'dpool', 'rpool'])
22+
23+
24+
def test_zfs_list_parsing(self):
25+
lines = zlist_data.splitlines()
26+
for line in lines:
27+
line = line.decode('utf-8') if isinstance(line, bytes) else line
28+
if line.strip() == '': continue
29+
row = line.split('\t')
30+
name = row[0]
31+
obj = poolset.lookup(name)
32+
if name.find('@') > -1:
33+
self.assertIsInstance(obj, zfs.Snapshot)
34+
if not obj.dataset is None:
35+
self.assertIsInstance(obj.dataset, zfs.Dataset)
36+
self.assertIsInstance(obj.pool, zfs.Pool)
37+
self.assertIsInstance(obj.parent, (zfs.Dataset, zfs.Pool))
38+
39+
elif name.find('/') > -1:
40+
self.assertIsInstance(obj, zfs.Dataset)
41+
self.assertIsInstance(obj.pool, zfs.Pool)
42+
self.assertIsInstance(obj.parent, (zfs.Dataset, zfs.Pool))
43+
44+
else:
45+
self.assertIsInstance(obj, zfs.Pool)
46+
47+
48+
def test_poolset_get_pool(self):
49+
for pname in pool_names:
50+
pool = poolset.get_pool(pname)
51+
self.assertIsInstance(pool, zfs.Pool)
52+
self.assertEqual(pool.name, pname)
53+
54+
with self.assertRaises(KeyError):
55+
poolset.get_pool('pool')
56+
poolset.get_pool('*pool')
57+
58+
59+
# Test PoolSet.lookup and properties: name
60+
def test_poolset_lookup_pool(self):
61+
pool = poolset.lookup('dpool')
62+
self.assertIsInstance(pool, zfs.Pool)
63+
self.assertEqual(pool.name, 'dpool')
64+
65+
# Lookup should fail unless pool name is specified
66+
with self.assertRaises(KeyError):
67+
poolset.lookup('pool')
68+
poolset.lookup('*pool')
69+
70+
71+
# Test PoolSet.lookup, ZFSItem.lookup and properties: name, pool, dataset, parent, path, dspath
72+
def test_poolset_lookup_dataset(self):
73+
for (pool_c, dspath_c) in [
74+
('bpool','BOOT')
75+
,('rpool','ROOT')
76+
,('rpool','ROOT/ubuntu_n2qr5q')
77+
,('rpool','ROOT/ubuntu_n2qr5q/usr')
78+
,('rpool','ROOT/ubuntu_n2qr5q/usr/local')
79+
]:
80+
ds_fullpath = '{}/{}'.format(pool_c, dspath_c)
81+
ds_name = dspath_c[dspath_c.rfind('/')+1:] if '/' in dspath_c else dspath_c
82+
ds = poolset.lookup(ds_fullpath)
83+
self.assertIsInstance(ds, zfs.Dataset)
84+
self.assertEqual(ds.name, ds_name)
85+
self.assertEqual(ds.path, ds_fullpath)
86+
self.assertEqual(ds.dspath, dspath_c)
87+
88+
pool = ds.pool
89+
self.assertIsInstance(pool, zfs.Pool)
90+
self.assertEqual(pool.name, pool_c)
91+
self.assertEqual(pool, ds.parent.pool)
92+
93+
94+
ds = poolset.lookup('bpool/BOOT')
95+
self.assertIsInstance(ds.parent, zfs.Pool)
96+
97+
# Lookup should fail unless full path to dataset is specified
98+
with self.assertRaises(KeyError):
99+
poolset.lookup('bpool/*OOT')
100+
poolset.lookup('bpool/OOT')
101+
102+
103+
# Test PoolSet.lookup, ZfsItem.lookup, ZfsItem.get_child, and properties: name, pool, dataset, parent, path
104+
def test_poolset_lookup_snapshot(self):
105+
for (pool_c, spath_c, cdate_c) in [
106+
('bpool','BOOT/ubuntu_n2qr5q@autozsys_68frge', '1608162058')
107+
,('bpool','BOOT/ubuntu_n2qr5q@autozsys_wo1pfb', '1608266786')
108+
,('bpool','BOOT/ubuntu_n2qr5q@autozsys_fzhwyn', '1608401468')
109+
]:
110+
snap_fullpath = '{}/{}'.format(pool_c, spath_c)
111+
snap_name = spath_c[spath_c.rfind('@')+1:]
112+
snap = poolset.lookup(snap_fullpath)
113+
self.assertIsInstance(snap, zfs.Snapshot)
114+
self.assertIsInstance(snap.parent, zfs.Dataset)
115+
dataset = snap.dataset
116+
self.assertIsInstance(dataset, zfs.Dataset)
117+
self.assertEqual(dataset.name, snap.parent.name)
118+
pool = snap.pool
119+
self.assertIsInstance(pool, zfs.Pool)
120+
self.assertEqual(pool.name, pool_c)
121+
self.assertEqual(pool, snap.parent.pool)
122+
123+
self.assertEqual(snap.name, snap_name)
124+
self.assertEqual(snap.path, snap_fullpath)
125+
self.assertEqual(snap.get_property('creation'), cdate_c)
126+
127+
# Lookup should fail unless full path to snapshot is specified
128+
with self.assertRaises(KeyError):
129+
poolset.lookup('BOOT/ubuntu_n2qr5q@autozsys_68frge')
130+
poolset.lookup('autozsys_68frge')
131+
poolset.lookup('@autozsys_68frge')
132+
133+
134+
135+
def test_zfsitem_get_child_n_properties(self):
136+
137+
pool = poolset.lookup('rpool')
138+
self.assertIsInstance(pool, zfs.Pool)
139+
ds = pool.lookup('ROOT')
140+
self.assertIsInstance(ds, zfs.Dataset)
141+
ds2=ds.get_child('ubuntu_n2qr5q')
142+
self.assertIsInstance(ds2, zfs.Dataset)
143+
self.assertEqual(ds2.get_property('used'), '12518498304')
144+
ds3=ds2.get_child('srv')
145+
self.assertIsInstance(ds3, zfs.Dataset)
146+
self.assertEqual(ds3.get_property('used'), '327680')
147+
snap = ds3.get_child('autozsys_j57yyo')
148+
self.assertIsInstance(snap, zfs.Snapshot)
149+
150+
# Negative tests
151+
with self.assertRaises(KeyError):
152+
ds2.get_child('foo')
153+
ds3.get_child('bar')
154+
155+
156+
ds = poolset.lookup('dpool/other')
157+
self.assertIsInstance(ds, zfs.Dataset)
158+
s_ts='1608446351'
159+
self.assertEqual(ds.get_property('creation'), s_ts)
160+
self.assertEqual(ds.creation, datetime.fromtimestamp(int(s_ts)))
161+
self.assertEqual(ds.get_property('used'), '280644734976')
162+
self.assertEqual(ds.get_property('available'), '3564051083264')
163+
self.assertEqual(ds.get_property('referenced'), '266918285312')
164+
self.assertEqual(ds.mountpoint, '/dpool/other')
165+
self.assertEqual(ds.mounted, True)
166+
self.assertEqual(ds.has_mount, True)
167+
168+
169+
ds = poolset.lookup('bpool/BOOT')
170+
self.assertIsInstance(ds, zfs.Dataset)
171+
self.assertEqual(ds.get_property('creation'), '1608154061')
172+
self.assertEqual(ds.get_property('used'), '190410752')
173+
self.assertEqual(ds.get_property('available'), '1686265856')
174+
self.assertEqual(ds.get_property('referenced'), '98304')
175+
self.assertEqual(ds.mountpoint, 'none')
176+
self.assertEqual(ds.has_mount, False)
177+
self.assertEqual(ds.mounted, False)
178+
179+
180+
def test_pool_get_datasets_etc(self):
181+
ds_counts = {}
182+
ds_counts['bpool'] = 2
183+
ds_counts['dpool'] = 4
184+
ds_counts['rpool'] = 20
185+
for pool in poolset:
186+
self.assertIs(pool.name in ds_counts, True)
187+
ds_count = ds_counts[pool.name]
188+
self.assertIsInstance(pool, zfs.Pool)
189+
all_ds = pool.get_all_datasets()
190+
self.assertEqual(len(all_ds), ds_count)
191+
192+
all_ds = pool.get_all_datasets(with_depth=True)
193+
self.assertEqual(len(all_ds), ds_count)
194+
t=all_ds[0]
195+
self.assertIsInstance(t, tuple)
196+
self.assertEqual(len(t), 2)
197+
self.assertIsInstance(t[0], int)
198+
self.assertIsInstance(t[1], zfs.Dataset)
199+
200+
self.assertIsInstance(all_ds, list)
201+
seen={}
202+
for (depth, ds) in all_ds:
203+
self.assertIsInstance(ds, zfs.Dataset)
204+
self.assertIs(ds.path in seen, False, 'Duplicate object returned in Pool.get_all_datasets(): {}'.format(ds.path))
205+
seen[ds.path] = True
206+
self.assertEqual(ds, poolset.lookup(ds.path))
207+
self.assertEqual(ds, pool.lookup(ds.dspath))
208+
self.assertEqual(ds, pool.get_dataset(ds.dspath))
209+
210+
211+
# Negative Tests
212+
pool = poolset.lookup('rpool')
213+
with self.assertRaises(KeyError):
214+
pool.get_dataset('foo')
215+
pool.get_dataset('ubuntu_n2qr5q/srv')
216+
pool.get_dataset('OOT/ubuntu_n2qr5q/srv')
217+
pool.get_dataset('*OOT/ubuntu_n2qr5q/srv')
218+
219+
220+
221+
def test_snapable_get_snapshots(self):
222+
pool = poolset.lookup('rpool')
223+
ds = pool.get_dataset('USERDATA/jbloggs_jb327m')
224+
self.assertIsInstance(ds, zfs.Dataset)
225+
snaps_all = ds.get_all_snapshots()
226+
self.assertIs(len(snaps_all), 56)
227+
snaps = ds.get_snapshots(flt=lambda s: int(s.get_property('used')) >= 3000000 and int(s.get_property('used')) <= 6000000)
228+
self.assertIs(len(snaps), 4)
229+
snaps = ds.get_snapshots(flt=lambda s: int(s.get_property('used')) >= 6000000 and int(s.get_property('used')) <= 9000000)
230+
self.assertIs(len(snaps), 4)
231+
snaps = ds.get_snapshots(flt=lambda s: True)
232+
self.assertIs(len(snaps), len(snaps_all))
233+
snaps = ds.get_snapshots()
234+
self.assertIs(len(snaps), len(snaps_all))
235+
snaps = ds.get_snapshots(index=True)
236+
self.assertIs(len(snaps), len(snaps_all))
237+
238+
snaps = ds.get_snapshots(flt=lambda s: int(s.get_property('used')) >= 3000000 and int(s.get_property('used')) <= 6000000, index=True)
239+
self.assertEqual(len(snaps), 4)
240+
self.assertIsInstance(snaps[1], tuple)
241+
self.assertEqual(len(snaps[1]), 2)
242+
self.assertIsInstance(snaps[1][0], int)
243+
self.assertIsInstance(snaps[1][1], zfs.Snapshot)
244+
self.assertEqual(snaps[1][0], 10)
245+
self.assertEqual(snaps[1][1].name, 'autozsys_wp6x2l')
246+
247+
248+
# Negative tests
249+
with self.assertRaises(AssertionError):
250+
ds.get_snapshots(flt=None)
251+
ds.get_snapshots(index='test')
252+
253+
254+
255+
256+
if __name__ == "__main__":
257+
#import sys;sys.argv = ['', 'Test.testName']
258+
unittest.main()

tests/zfslib_test_tools.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,31 @@
22
import os
33
import subprocess
44

5+
6+
# Loads and validates test data
7+
def load_test_data(alias, properties):
8+
fpath="./tests/{}.tsv".format(alias)
9+
assert os.path.isfile(fpath), "Test Data file {} not found.".format(fpath)
10+
ret=[]
11+
rc = 0
12+
with open(fpath) as f:
13+
while True:
14+
rc = rc + 1
15+
line = f.readline()
16+
if not line:
17+
break
18+
row = line.split('\t')
19+
assert len(row) == len(properties) \
20+
,"Row number {} in file {} has incorrect number of columns ({}) expecting {}.".format(
21+
rc, fpath, len(row), len(properties)
22+
)
23+
ret.append(line)
24+
return '\n'.join(ret)
25+
26+
27+
28+
29+
530
def simplify(x):
631
'''Take a list of tuples where each tuple is in form [v1,v2,...vn]
732
and then coalesce all tuples tx and ty where tx[v1] equals ty[v2],

0 commit comments

Comments
 (0)