Skip to content

Commit 61ded60

Browse files
committed
Replaced module-level logger with synapse_client.logger and routed raw REST calls through the /api service layer
1 parent 42fa95f commit 61ded60

1 file changed

Lines changed: 75 additions & 61 deletions

File tree

synapseclient/models/services/migration.py

Lines changed: 75 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import asyncio
88
import collections.abc
99
import json
10-
import logging
1110
import os
1211
import sqlite3
1312
import sys
@@ -27,8 +26,12 @@
2726

2827
from synapseclient import Synapse
2928
from synapseclient.api import get_entity_type, rest_get_paginated_async
30-
from synapseclient.api.entity_services import get_children
29+
from synapseclient.api.entity_services import (
30+
get_children,
31+
update_entity_file_handle_version,
32+
)
3133
from synapseclient.api.file_services import get_file_handle_for_download_async
34+
from synapseclient.api.storage_location_services import get_storage_location_setting
3235
from synapseclient.api.table_services import get_columns
3336
from synapseclient.core import utils
3437
from synapseclient.core.constants import concrete_types
@@ -72,14 +75,12 @@
7275
# Maximum concurrent file copy.
7376
MAX_CONCURRENT_FILE_COPIES = max(int(Synapse().max_threads / 2), 1)
7477

75-
logger = logging.getLogger(__name__)
76-
7778

7879
# =============================================================================
7980
# Indexing Helper Functions
8081
# =============================================================================
8182
async def _verify_storage_location_ownership_async(
82-
storage_location_id: str,
83+
storage_location_id: int,
8384
*,
8485
synapse_client: Optional[Synapse] = None,
8586
) -> None:
@@ -94,7 +95,10 @@ async def _verify_storage_location_ownership_async(
9495
ValueError: If the user does not own the storage location.
9596
"""
9697
try:
97-
await synapse_client.rest_get_async(f"/storageLocation/{storage_location_id}")
98+
await get_storage_location_setting(
99+
storage_location_id=storage_location_id,
100+
synapse_client=synapse_client,
101+
)
98102
except SynapseError:
99103
raise ValueError(
100104
f"Unable to verify ownership of storage location {storage_location_id}. "
@@ -117,7 +121,7 @@ def _get_default_db_path(entity_id: str) -> str:
117121

118122
async def _get_version_numbers_async(
119123
entity_id: str,
120-
synapse_client: "Synapse",
124+
synapse_client: Optional[Synapse] = None,
121125
) -> AsyncGenerator[int, None]:
122126
"""Get all version numbers for an entity.
123127
@@ -164,13 +168,18 @@ def _join_column_names(columns: List[Any]) -> str:
164168
return ",".join(_escape_column_name(c) for c in columns)
165169

166170

167-
def _check_indexed(cursor: sqlite3.Cursor, entity_id: str) -> bool:
171+
def _check_indexed(
172+
cursor: sqlite3.Cursor,
173+
entity_id: str,
174+
synapse_client: Optional[Synapse] = None,
175+
) -> bool:
168176
"""Check if an entity has already been indexed.
169177
If so, it can skip reindexing it.
170178
171179
Arguments:
172180
cursor: The cursor object from the connection to the SQLite database.
173181
entity_id: The entity ID to check.
182+
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
174183
175184
Returns:
176185
True if the entity is already indexed.
@@ -180,10 +189,10 @@ def _check_indexed(cursor: sqlite3.Cursor, entity_id: str) -> bool:
180189
).fetchone()
181190

182191
if indexed_row:
183-
logger.debug("%s already indexed, skipping", entity_id)
192+
synapse_client.logger.debug(f"{entity_id} already indexed, skipping")
184193
return True
185194

186-
logger.debug("%s not yet indexed, indexing now", entity_id)
195+
synapse_client.logger.debug(f"{entity_id} not yet indexed, indexing now")
187196
return False
188197

189198

@@ -569,7 +578,11 @@ def _update_migration_database(
569578

570579

571580
def _confirm_migration(
572-
cursor: sqlite3.Cursor, dest_storage_location_id: str, force: bool = False
581+
cursor: sqlite3.Cursor,
582+
dest_storage_location_id: str,
583+
force: bool = False,
584+
*,
585+
synapse_client: Optional[Synapse] = None,
573586
) -> bool:
574587
"""Confirm migration with user if in interactive mode.
575588
@@ -578,6 +591,7 @@ def _confirm_migration(
578591
dest_storage_location_id: Destination storage location ID.
579592
force: If running in an interactive shell, migration requires an interactice confirmation.
580593
This can be bypassed by using the force=True option. Defaults to False.
594+
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
581595
582596
Returns:
583597
True if migration should proceed, False otherwise.
@@ -592,7 +606,7 @@ def _confirm_migration(
592606
).fetchone()[0]
593607

594608
if count == 0:
595-
logger.info("No items for migration.")
609+
synapse_client.logger.info("No items for migration.")
596610
return False
597611

598612
if sys.stdout.isatty():
@@ -601,11 +615,10 @@ def _confirm_migration(
601615
)
602616
return user_input.strip().lower() == "y"
603617
else:
604-
logger.info(
605-
"%s items for migration. "
618+
synapse_client.logger.info(
619+
f"{count} items for migration. "
606620
"force option not used, and console input not available to confirm migration, aborting. "
607-
"Use the force option or run from an interactive shell to proceed with migration.",
608-
count,
621+
"Use the force option or run from an interactive shell to proceed with migration."
609622
)
610623
return False
611624

@@ -770,7 +783,7 @@ async def index_files_for_migration_async(
770783
synapse_client=client,
771784
)
772785
except IndexingError as ex:
773-
logger.exception(
786+
client.logger.exception(
774787
f"Aborted due to failure to index entity {ex.entity_id} of type {ex.concrete_type}. "
775788
"Use continue_on_error=True to skip individual failures."
776789
)
@@ -793,7 +806,7 @@ async def _index_entity_async(
793806
include_table_files: bool,
794807
continue_on_error: bool,
795808
*,
796-
synapse_client: "Synapse",
809+
synapse_client: Optional[Synapse] = None,
797810
) -> None:
798811
"""Recursively index an entity and its children into migrations database.
799812
@@ -807,7 +820,7 @@ async def _index_entity_async(
807820
file_version_strategy: Strategy for file versions.
808821
include_table_files: Whether to include table-attached files.
809822
continue_on_error: Whether to continue on errors.
810-
synapse_client: The Synapse client.
823+
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
811824
"""
812825
entity_id = utils.id_of(entity)
813826
retrieved_entity = await get_entity_type(
@@ -816,7 +829,7 @@ async def _index_entity_async(
816829
concrete_type = retrieved_entity.type
817830

818831
# Check if already indexed
819-
is_indexed = _check_indexed(cursor, entity_id)
832+
is_indexed = _check_indexed(cursor, entity_id, synapse_client)
820833
try:
821834
if not is_indexed:
822835
if concrete_type == concrete_types.FILE_ENTITY:
@@ -867,7 +880,7 @@ async def _index_entity_async(
867880
raise
868881
except Exception as ex:
869882
if continue_on_error:
870-
logger.warning(f"Error indexing entity {entity_id}: {ex}")
883+
synapse_client.logger.warning(f"Error indexing entity {entity_id}: {ex}")
871884
tb_str = "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))
872885
migration_type = MigrationType.from_concrete_type(concrete_type).value
873886
_record_indexing_error(cursor, entity_id, migration_type, parent_id, tb_str)
@@ -883,7 +896,7 @@ async def _index_file_entity_async(
883896
source_storage_location_ids: List[str],
884897
file_version_strategy: str,
885898
*,
886-
synapse_client: "Synapse",
899+
synapse_client: Optional[Synapse] = None,
887900
) -> None:
888901
"""Index a file entity for migration.
889902
@@ -894,10 +907,10 @@ async def _index_file_entity_async(
894907
dest_storage_location_id: Destination storage location ID.
895908
source_storage_location_ids: List of source storage locations.
896909
file_version_strategy: Strategy for file versions.
897-
synapse_client: The Synapse client.
910+
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
898911
"""
899912
entity_id = utils.id_of(entity)
900-
logger.info("Indexing file entity %s", entity_id)
913+
synapse_client.logger.info(f"Indexing file entity {entity_id}")
901914

902915
entity_versions: List[Tuple[Any, Optional[int]]] = []
903916

@@ -941,7 +954,7 @@ async def _index_file_entity_async(
941954
async def _get_table_file_handle_rows_async(
942955
entity_id: str,
943956
*,
944-
synapse_client: "Synapse",
957+
synapse_client: Optional[Synapse] = None,
945958
) -> List[Tuple[int, int, Dict[str, Any]]]:
946959
"""Get the table file handle rows for a given entity.
947960
@@ -955,7 +968,6 @@ async def _get_table_file_handle_rows_async(
955968
from synapseclient.models import Table
956969
from synapseclient.models.file import FileHandle
957970

958-
# Get file handle columns using the async API
959971
columns = await get_columns(table_id=entity_id, synapse_client=synapse_client)
960972
file_handle_columns = [c for c in columns if c.column_type == "FILEHANDLEID"]
961973

@@ -994,7 +1006,7 @@ async def _index_table_entity_async(
9941006
dest_storage_location_id: str,
9951007
source_storage_location_ids: List[str],
9961008
*,
997-
synapse_client: "Synapse",
1009+
synapse_client: Optional[Synapse] = None,
9981010
) -> None:
9991011
"""Index a table entity's file attachments for migration.
10001012
@@ -1006,7 +1018,7 @@ async def _index_table_entity_async(
10061018
source_storage_location_ids: List of source storage locations to filter.
10071019
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
10081020
"""
1009-
logger.info("Indexing table entity %s", entity_id)
1021+
synapse_client.logger.info(f"Indexing table entity {entity_id}")
10101022
insert_values = []
10111023
async for row_id, row_version, file_handles in _get_table_file_handle_rows_async(
10121024
entity_id=entity_id, synapse_client=synapse_client
@@ -1048,7 +1060,7 @@ async def _index_container_async(
10481060
include_table_files: bool,
10491061
continue_on_error: bool,
10501062
*,
1051-
synapse_client: "Synapse",
1063+
synapse_client: Optional[Synapse] = None,
10521064
) -> None:
10531065
"""Index a container (Project or Folder) and its children.
10541066
@@ -1068,7 +1080,7 @@ async def _index_container_async(
10681080
entity_id=entity_id, synapse_client=synapse_client
10691081
)
10701082
concrete_type = retrieved_entity.type
1071-
logger.info(
1083+
synapse_client.logger.info(
10721084
f'Indexing {concrete_type[concrete_type.rindex(".") + 1 :]} {entity_id}'
10731085
)
10741086

@@ -1134,7 +1146,7 @@ async def _migrate_item_async(
11341146
dest_storage_location_id: str,
11351147
semaphore: asyncio.Semaphore,
11361148
*,
1137-
synapse_client: "Synapse",
1149+
synapse_client: Optional[Synapse] = None,
11381150
) -> Dict[str, Any]:
11391151
"""Migrate a single item.
11401152
@@ -1209,17 +1221,16 @@ async def _create_new_file_version_async(
12091221
entity_id: str,
12101222
to_file_handle_id: str,
12111223
*,
1212-
synapse_client: "Synapse",
1224+
synapse_client: Optional[Synapse] = None,
12131225
) -> None:
12141226
"""Create a new version of a file entity with the new file handle.
12151227
12161228
Arguments:
12171229
entity_id: The file entity ID.
12181230
to_file_handle_id: The new file handle ID.
1219-
synapse_client: The Synapse client.
1231+
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
12201232
"""
1221-
client = Synapse.get_client(synapse_client=synapse_client)
1222-
client.logger.info("Creating new version for file entity %s", entity_id)
1233+
synapse_client.logger.info(f"Creating new version for file entity {entity_id}")
12231234

12241235
entity = await get_async(
12251236
synapse_id=entity_id,
@@ -1236,38 +1247,31 @@ async def _migrate_file_version_async(
12361247
from_file_handle_id: str,
12371248
to_file_handle_id: str,
12381249
*,
1239-
synapse_client: "Synapse",
1250+
synapse_client: Optional[Synapse] = None,
12401251
) -> None:
12411252
"""Migrate/update an existing file version with a new file handle.
12421253
12431254
Arguments:
1244-
entity_id: The file entity ID.
1245-
version: The version number.
1255+
entity_id: The Synapse ID of the entity.
1256+
version: The version number of the entity.
12461257
from_file_handle_id: The original file handle ID.
12471258
to_file_handle_id: The new file handle ID.
1248-
synapse_client: The Synapse client.
1259+
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
12491260
"""
1250-
client = Synapse.get_client(synapse_client=synapse_client)
1251-
client.logger.info(
1252-
"Updating file handle for file entity %s version %s", entity_id, version
1253-
)
1254-
1255-
await client.rest_put_async(
1256-
f"/entity/{entity_id}/version/{version}/filehandle",
1257-
body=json.dumps(
1258-
{
1259-
"oldFileHandleId": from_file_handle_id,
1260-
"newFileHandleId": to_file_handle_id,
1261-
}
1262-
),
1261+
await update_entity_file_handle_version(
1262+
entity_id=entity_id,
1263+
version=version,
1264+
old_file_handle_id=from_file_handle_id,
1265+
new_file_handle_id=to_file_handle_id,
1266+
synapse_client=synapse_client,
12631267
)
12641268

12651269

12661270
async def _migrate_table_attached_file_async(
12671271
key: MigrationKey,
12681272
to_file_handle_id: str,
12691273
*,
1270-
synapse_client: "Synapse",
1274+
synapse_client: Optional[Synapse] = None,
12711275
) -> None:
12721276
"""Migrate/update a table attached file with a new file handle.
12731277
@@ -1367,17 +1371,25 @@ async def migrate_indexed_files_async(
13671371
This is the second step in migrating files to a new storage location.
13681372
Files must first be indexed using `index_files_for_migration_async`.
13691373
1374+
**Interactive confirmation:** When called from an interactive shell and
1375+
`force=False` (the default), this function will print the number of items
1376+
queued for migration and prompt the user to confirm before proceeding
1377+
(``"N items for migration to <location>. Proceed? (y/n)?``). If standard
1378+
output is not connected to an interactive terminal (e.g. a script or CI
1379+
environment), migration is aborted unless ``force=True`` is set.
1380+
13701381
Arguments:
13711382
db_path: Path to SQLite database created by index_files_for_migration_async.
13721383
create_table_snapshots: Whether to create table snapshots before migrating. Defaults to True.
13731384
continue_on_error: Whether to continue on individual migration errors. Defaults to False.
1374-
force: If running in an interactive shell, migration requires an interactice confirmation.
1375-
This can be bypassed by using the force=True option. Defaults to False.
1376-
max_concurrent_copies: Maximum concurrent file copy operations. Defaults to None.
1385+
force: Skip the interactive confirmation prompt and proceed with migration
1386+
automatically. Set to ``True`` when running non-interactively (scripts,
1387+
CI, automated pipelines). Defaults to False.
13771388
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
13781389
13791390
Returns:
1380-
MigrationResult object or None if migration was aborted.
1391+
MigrationResult object, or None if migration was aborted (user declined
1392+
the confirmation prompt, or the session is non-interactive and force=False).
13811393
"""
13821394
client = Synapse.get_client(synapse_client=synapse_client)
13831395

@@ -1395,9 +1407,11 @@ async def migrate_indexed_files_async(
13951407
dest_storage_location_id = existing_settings.dest_storage_location_id
13961408

13971409
# Confirm migration
1398-
confirmed = _confirm_migration(cursor, dest_storage_location_id, force)
1410+
confirmed = _confirm_migration(
1411+
cursor, dest_storage_location_id, force, synapse_client=client
1412+
)
13991413
if not confirmed:
1400-
logger.info("Migration aborted.")
1414+
client.logger.info("Migration aborted.")
14011415
return None
14021416

14031417
# Execute migration
@@ -1419,7 +1433,7 @@ async def _execute_migration_async(
14191433
create_table_snapshots: bool,
14201434
continue_on_error: bool,
14211435
*,
1422-
synapse_client: "Synapse",
1436+
synapse_client: Optional[Synapse] = None,
14231437
) -> None:
14241438
"""Execute the actual file migration.
14251439
@@ -1430,7 +1444,7 @@ async def _execute_migration_async(
14301444
create_table_snapshots: Whether to create table snapshots.
14311445
continue_on_error: Whether to continue on errors.
14321446
max_concurrent: Maximum concurrent operations.
1433-
synapse_client: The Synapse client.
1447+
synapse_client: If not passed in and caching was not disabled by `Synapse.allow_client_caching(False)` this will use the last created instance from the Synapse class constructor.
14341448
"""
14351449
pending_file_handles: Set[str] = set()
14361450
completed_file_handles: Set[str] = set()

0 commit comments

Comments
 (0)