Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/azul/indexer/mirror_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,7 @@ def delete_it_files(self):
"""
assert self.catalog in config.integration_test_catalogs, R(
'Not an IT catalog', self.catalog)
prefix = self._mirror_prefix
assert len(prefix) > 1 and prefix.endswith('/'), prefix
object_keys = self._storage.list_objects(prefix)
assert len(object_keys) <= 300, R('Too many objects', len(object_keys))
self._storage.delete_objects(object_keys, batch_size=100)
self._storage.delete_prefix(self._mirror_prefix)


@attrs.frozen(kw_only=True, slots=False)
Expand Down
42 changes: 31 additions & 11 deletions src/azul/service/manifest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
)
from azul.lib.functions import (
compose,
iif,
)
from azul.lib.json import (
copy_json,
Expand Down Expand Up @@ -571,14 +572,17 @@ class CachedManifestNotFound(Exception):
manifest_key: ManifestKey


@attrs.frozen(kw_only=True)
class ManifestService(QueryService):
file_url_func: FileUrlFunc
class BaseManifestService:

@cached_property
def storage_service(self) -> StorageService:
return StorageService()


@attrs.frozen(kw_only=True)
class ManifestService(BaseManifestService, QueryService):
file_url_func: FileUrlFunc

def get_manifest(self,
*,
format: ManifestFormat,
Expand Down Expand Up @@ -785,7 +789,25 @@ def command_lines(self,
type Cells = dict[str, str]


class ManifestGenerator(metaclass=ABCMeta):
class ManifestAccessor:
service: BaseManifestService

def __init__(self, service: BaseManifestService):
self.service = service

@property
def storage(self) -> StorageService:
return self.service.storage_service

@classmethod
def _manifest_prefix(cls, is_it: bool) -> str:
return 'manifests/' + iif(is_it, '_it/')

def delete_it_files(self):
self.storage.delete_prefix(self._manifest_prefix(is_it=True))


class ManifestGenerator(ManifestAccessor, metaclass=ABCMeta):
"""
A generator for manifests. A manifest is an exhaustive representation of
the documents in the aggregate index for a particular entity type. The
Expand All @@ -797,6 +819,8 @@ class ManifestGenerator(metaclass=ABCMeta):
# descendants must be inexpensive. If a property getter performs and
# expensive computation or I/O, it should cache its return value.

service: ManifestService

@classmethod
@abstractmethod
def format(cls) -> ManifestFormat:
Expand Down Expand Up @@ -959,8 +983,7 @@ def __init__(self,

:param service: the service to use when querying the index
"""
super().__init__()
self.service = service
super().__init__(service)
self.catalog = catalog
self.filters = filters
self.file_url_func = service.file_url_func
Expand Down Expand Up @@ -1007,7 +1030,8 @@ def manifest_key(self) -> ManifestKey:

@classmethod
def s3_object_key(cls, manifest_key: ManifestKey) -> str:
return 'manifests' + '/' + cls.s3_object_key_base(manifest_key)
is_it = config.catalogs[manifest_key.catalog].is_integration_test_catalog
return cls._manifest_prefix(is_it) + cls.s3_object_key_base(manifest_key)

@classmethod
def s3_object_key_base(cls, manifest_key: ManifestKey) -> str:
Expand Down Expand Up @@ -1264,10 +1288,6 @@ def write(self,
"""
raise NotImplementedError

@property
def storage(self) -> StorageService:
return self.service.storage_service


class ClientSidePagingManifestGenerator(ManifestGenerator, metaclass=ABCMeta):
"""
Expand Down
6 changes: 6 additions & 0 deletions src/azul/service/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,12 @@ def delete_objects(self,
self._s3.delete_objects(**request)
log.info('Deleted %d objects overall', num_keys)

def delete_prefix(self, prefix: str) -> None:
assert len(prefix) > 1 and prefix.endswith('/'), prefix
object_keys = self.list_objects(prefix)
assert len(object_keys) <= 300, R('Too many objects', len(object_keys))
self.delete_objects(object_keys, batch_size=100)

def list_objects(self, prefix: str) -> OrderedSet[str]:
keys: OrderedSet[str] = OrderedSet()
num_keys = 0
Expand Down
27 changes: 22 additions & 5 deletions test/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@
Token,
)
from azul.service.manifest_service import (
BaseManifestService,
ManifestAccessor,
ManifestFormat,
ManifestGenerator,
)
Expand Down Expand Up @@ -470,6 +472,8 @@ class Catalog:
for flag in ['index', 'delete', 'mirror']
]

manifest_accessor = ManifestAccessor(BaseManifestService())

self._assert_queues_empty(config.indexer_fail_queue_names)
if index:
self._reset_indexer()
Expand Down Expand Up @@ -518,6 +522,12 @@ class Catalog:
bundle_fqids=catalog.bundles)
self._test_single_entity_response(catalog=catalog.name)

# `test_manifest` and `test_manifest_tagging_race` assert how many times
# the step function is executed when retrieving the manifests, with the
# expectation that there are no pre-existing cached manifests. This
# deletion is necessary to enforce that expectation, especially when
# performing consecutive IT runs with the same seed.
manifest_accessor.delete_it_files()
for catalog in catalogs:
self._test_manifest(catalog.name)
self._test_manifest_tagging_race(catalog.name)
Expand All @@ -527,6 +537,9 @@ class Catalog:
public_source=catalog.public_source,
ma_source=catalog.ma_source)

if delete:
manifest_accessor.delete_it_files()

if mirror and config.enable_mirroring:
self._test_mirroring(delete=delete)

Expand Down Expand Up @@ -605,6 +618,7 @@ def _test_manifest(self, catalog: CatalogName):
filters = self._manifest_filters(catalog)
execution_ids = set()
coin_flip = bool(self.random.getrandbits(1))
num_old_executions = 0
for i, fetch in enumerate([coin_flip, coin_flip, not coin_flip]):
with self.subTest('manifest', catalog=catalog, format=format, i=i, fetch=fetch):
args = dict(catalog=catalog, filters=json.dumps(filters))
Expand Down Expand Up @@ -642,15 +656,18 @@ def worker(_):
bucket, key = one(self._manifest_objects(responses))
if i == 0:
aws.s3.delete_object(Bucket=bucket, Key=key)
# One execution to generate the manifest
self.assertEqual(1, len(execution_ids))
# One execution to generate the manifest. However, if
# this test was recently run using the same seed,
# previous executions will be tracked in the token.
self.assertLessEqual(1, len(execution_ids))
num_old_executions = len(execution_ids) - 1
elif i == 1:
# One more execution to re-generate the manifest
self.assertEqual(2, len(execution_ids))
self.assertEqual(num_old_executions + 2, len(execution_ids))
elif i == 2:
# Only fetch mode changed, cached manifest will be used,
# and no additional executions are expectect
self.assertEqual(2, len(execution_ids))
# and no additional executions are expected
self.assertEqual(num_old_executions + 2, len(execution_ids))
else:
assert False

Expand Down
Loading