Skip to content

Commit b67a962

Browse files
small refactoring :)
1 parent d1a0eca commit b67a962

2 files changed

Lines changed: 143 additions & 103 deletions

File tree

cloudinary_cli/modules/clone.py

Lines changed: 137 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import re
1212

1313
DEFAULT_MAX_RESULTS = 500
14+
ALLOWED_TYPE_VALUES = ("upload", "private", "authenticated")
1415

1516

1617
@command("clone",
@@ -49,6 +50,30 @@
4950
"bandwidth costs."))
5051
def clone(target, force, overwrite, concurrent_workers, fields,
5152
search_exp, async_, notification_url, ttl):
53+
target_config, auth_token = _validate_clone_inputs(target)
54+
if not target_config:
55+
return False
56+
57+
source_assets = search_assets(search_exp, force)
58+
if not source_assets or not source_assets.get('resources'):
59+
logger.error(style(f"No asset(s) found in {cloudinary.config().cloud_name}", fg="red"))
60+
return False
61+
62+
upload_list = _prepare_upload_list(
63+
source_assets, target_config, overwrite, async_,
64+
notification_url, auth_token, ttl, fields
65+
)
66+
67+
logger.info(style(f"Copying {len(upload_list)} asset(s) from "
68+
f"{cloudinary.config().cloud_name} to "
69+
f"{target_config.cloud_name}", fg="blue"))
70+
71+
run_tasks_concurrently(upload_file, upload_list, concurrent_workers)
72+
73+
return True
74+
75+
76+
def _validate_clone_inputs(target):
5277
if not target:
5378
print_help_and_exit()
5479

@@ -57,12 +82,12 @@ def clone(target, force, overwrite, concurrent_workers, fields,
5782
logger.error("The specified config does not exist or the "
5883
"CLOUDINARY_URL scheme provided is invalid "
5984
"(expecting to start with 'cloudinary://').")
60-
return False
85+
return None, None
6186

6287
if cloudinary.config().cloud_name == target_config.cloud_name:
6388
logger.error("Target environment cannot be the same "
6489
"as source environment.")
65-
return False
90+
return None, None
6691

6792
auth_token = cloudinary.config().auth_token
6893
if auth_token:
@@ -74,13 +99,13 @@ def clone(target, force, overwrite, concurrent_workers, fields,
7499
except Exception as e:
75100
logger.error(f"{e} - auth_token validation failed. "
76101
"Please double-check your auth_token parameters.")
77-
return False
102+
return None, None
103+
104+
return target_config, auth_token
78105

79-
source_assets = search_assets(force, search_exp)
80-
if not source_assets:
81-
# End command if search_exp contains unsupported type(s)
82-
return False
83106

107+
def _prepare_upload_list(source_assets, target_config, overwrite, async_,
108+
notification_url, auth_token, ttl, fields):
84109
upload_list = []
85110
for r in source_assets.get('resources'):
86111
updated_options, asset_url = process_metadata(r, overwrite, async_,
@@ -89,42 +114,13 @@ def clone(target, force, overwrite, concurrent_workers, fields,
89114
normalize_list_params(fields))
90115
updated_options.update(config_to_dict(target_config))
91116
upload_list.append((asset_url, {**updated_options}))
117+
return upload_list
92118

93-
source_cloud_name = cloudinary.config().cloud_name
94-
if not upload_list:
95-
logger.error(style('No asset(s) found in '
96-
f'{source_cloud_name}', fg="red"))
97-
return False
98-
99-
logger.info(style(f'Copying {len(upload_list)} asset(s) from '
100-
f'{source_cloud_name} to '
101-
f'{target_config.cloud_name}', fg="blue"))
102-
103-
run_tasks_concurrently(upload_file, upload_list, concurrent_workers)
104-
105-
return True
106119

107-
108-
def search_assets(force, search_exp):
109-
# Prevent other unsupported types to prevent
110-
# avoidable errors during the upload process
111-
# and append the default types in not in the
112-
# search expression
113-
ALLOWED_TYPES = {"type:upload", "type:private", "type:authenticated",
114-
"type=upload", "type=private", "type=authenticated"}
115-
if search_exp and re.search(r"\btype\s*[:=]\s*\w+", search_exp):
116-
exp_types = re.findall(r"\btype\s*[:=]\s*\w+", search_exp)
117-
exp_types_cleaned = [''.join(t.split()) for t in exp_types]
118-
unallowed_types = [t for t in exp_types_cleaned if t not in ALLOWED_TYPES]
119-
if unallowed_types:
120-
logger.error("Unsupported type(s) in search expression: "
121-
f"{', '.join(unallowed_types)}. "
122-
"Only upload/private/authenticated types allowed.")
123-
return False
124-
elif search_exp:
125-
search_exp += " AND (type:upload OR type:private OR type:authenticated)"
126-
else:
127-
search_exp = "type:upload OR type:private OR type:authenticated"
120+
def search_assets(search_exp, force):
121+
search_exp = _normalize_search_expression(search_exp)
122+
if not search_exp:
123+
return False
128124

129125
search = cloudinary.search.Search().expression(search_exp)
130126
search.fields(['tags', 'context', 'access_control',
@@ -137,64 +133,110 @@ def search_assets(force, search_exp):
137133
return res
138134

139135

140-
def process_metadata(res, overwrite, async_, notification_url,
141-
auth_token, ttl, copy_fields=""):
142-
cloned_options = {}
143-
acc_ctl = res.get('access_control')
144-
pub_id = res.get('public_id')
145-
del_type = res.get('type')
136+
def _normalize_search_expression(search_exp):
137+
"""
138+
Ensures the search expression has a valid 'type' filter.
139+
140+
- If no expression is given, a default is created.
141+
- If 'type' filters exist, they are validated.
142+
- If no 'type' filters exist, the default is appended.
143+
"""
144+
default_types_str = " OR ".join(f"type:{t}" for t in ALLOWED_TYPE_VALUES)
145+
146+
if not search_exp:
147+
return default_types_str
148+
149+
# Use a simple regex to find all 'type' filters
150+
found_types = re.findall(r"\btype\s*[:=]\s*(\w+)", search_exp)
151+
152+
if not found_types:
153+
# No 'type' filter found, so append the default
154+
return f"{search_exp} AND ({default_types_str})"
155+
156+
# A 'type' filter was found, so validate it
157+
invalid_types = {t for t in found_types if t not in ALLOWED_TYPE_VALUES}
158+
159+
if invalid_types:
160+
error_msg = ", ".join(f"type:{t}" for t in invalid_types)
161+
logger.error(
162+
f"Unsupported type(s) in search expression: {error_msg}. "
163+
f"Only {', '.join(ALLOWED_TYPE_VALUES)} types allowed."
164+
)
165+
return None
166+
167+
# All found types are valid, so return the original expression
168+
return search_exp
169+
170+
171+
def process_metadata(res, overwrite, async_, notification_url, auth_token, ttl, copy_fields=None):
172+
if copy_fields is None:
173+
copy_fields = []
174+
asset_url = _get_asset_url(res, auth_token, ttl)
175+
cloned_options = _build_cloned_options(res, overwrite, async_, notification_url, copy_fields)
176+
177+
return cloned_options, asset_url
178+
179+
180+
def _get_asset_url(res, auth_token, ttl):
181+
if not (isinstance(res.get('access_control'), list) and
182+
len(res.get('access_control')) > 0 and
183+
isinstance(res['access_control'][0], dict) and
184+
res['access_control'][0].get("access_type") == "token"):
185+
return res.get('secure_url')
186+
146187
reso_type = res.get('resource_type')
188+
del_type = res.get('type')
189+
pub_id = res.get('public_id')
147190
file_format = res.get('format')
148-
if (
149-
isinstance(acc_ctl, list)
150-
and len(acc_ctl) > 0
151-
and isinstance(acc_ctl[0], dict)
152-
and acc_ctl[0].get("access_type") == "token"
153-
):
154-
# Generate a time-limited URL for restricted assets
155-
# Use private url if no auth_token provided
156-
if auth_token:
157-
# Don't add format if asset is raw
158-
pub_id_format = (pub_id if reso_type == "raw"
159-
else f"{pub_id}.{file_format}")
160-
asset_url = cloudinary.utils.cloudinary_url(
161-
pub_id_format,
162-
type=del_type,
163-
resource_type=reso_type,
164-
auth_token={"duration": ttl},
165-
secure=True,
166-
sign_url=True)
167-
else:
168-
expiry_date = int(time.time()) + ttl
169-
asset_url = cloudinary.utils.private_download_url(
170-
pub_id,
171-
file_format,
172-
resource_type=reso_type,
173-
type=del_type,
174-
expires_at=expiry_date)
175-
else:
176-
asset_url = res.get('secure_url')
177-
cloned_options['access_control'] = acc_ctl
178-
cloned_options['public_id'] = pub_id
179-
cloned_options['type'] = del_type
180-
cloned_options['resource_type'] = reso_type
181-
cloned_options['overwrite'] = overwrite
182-
cloned_options['async'] = async_
183-
if "tags" in copy_fields:
184-
cloned_options['tags'] = res.get('tags')
185-
if "context" in copy_fields:
186-
cloned_options['context'] = res.get('context')
191+
192+
if auth_token:
193+
# Raw assets already have the format in the public_id
194+
pub_id_format = pub_id if reso_type == "raw" else f"{pub_id}.{file_format}"
195+
return cloudinary.utils.cloudinary_url(
196+
pub_id_format,
197+
type=del_type,
198+
resource_type=reso_type,
199+
auth_token={"duration": ttl},
200+
secure=True,
201+
sign_url=True
202+
)
203+
204+
# Use private url if no auth_token provided
205+
return cloudinary.utils.private_download_url(
206+
pub_id,
207+
file_format,
208+
resource_type=reso_type,
209+
type=del_type,
210+
expires_at=int(time.time()) + ttl
211+
)
212+
213+
214+
def _build_cloned_options(res, overwrite, async_, notification_url, copy_fields):
215+
# 1. Start with mandatory options
216+
cloned_options = {
217+
'overwrite': overwrite,
218+
'async': async_,
219+
}
220+
221+
# 2. Copy fields from source asset. Some are standard, others are from user input.
222+
fields_to_copy = {'public_id', 'type', 'resource_type', 'access_control'}.union(copy_fields)
223+
cloned_options.update({field: res.get(field) for field in fields_to_copy})
224+
225+
# 3. Handle fields that are added only if they have a truthy value
226+
if res.get('display_name'):
227+
cloned_options['display_name'] = res['display_name']
228+
229+
# This is required to put the asset in the correct asset_folder
230+
# when copying from a fixed to DF (dynamic folder) cloud as if
231+
# you just pass a `folder` param to a DF cloud, it will append
232+
# this to the `public_id` and we don't want this.
187233
if res.get('folder'):
188-
# This is required to put the asset in the correct asset_folder
189-
# when copying from a fixed to DF (dynamic folder) cloud as if
190-
# you just pass a `folder` param to a DF cloud, it will append
191-
# this to the `public_id` and we don't want this.
192-
cloned_options['asset_folder'] = res.get('folder')
234+
cloned_options['asset_folder'] = res['folder']
193235
elif res.get('asset_folder'):
194-
cloned_options['asset_folder'] = res.get('asset_folder')
195-
if res.get('display_name'):
196-
cloned_options['display_name'] = res.get('display_name')
236+
cloned_options['asset_folder'] = res['asset_folder']
237+
197238
if notification_url:
198239
cloned_options['notification_url'] = notification_url
199240

200-
return cloned_options, asset_url
241+
# 4. Clean up any None values before returning
242+
return {k: v for k, v in cloned_options.items() if v is not None}

test/test_modules/test_cli_clone.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
import unittest
22
from unittest.mock import patch, MagicMock
33
import re
4-
import importlib.util
5-
import os
6-
7-
# Get the path to the clone module and load it directly to avoid conflicts with the command object
8-
clone_module_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'cloudinary_cli', 'modules', 'clone.py'))
9-
spec = importlib.util.spec_from_file_location("clone_module", clone_module_path)
10-
clone_module = importlib.util.module_from_spec(spec)
11-
spec.loader.exec_module(clone_module)
4+
import sys
5+
6+
# Import the modules package, which will load the clone module.
7+
# The 'clone' name in the package is the command object, so we get the module from sys.modules.
8+
import cloudinary_cli.modules
9+
clone_module = sys.modules['cloudinary_cli.modules.clone']
1210

1311
from cloudinary_cli.defaults import logger
1412

0 commit comments

Comments
 (0)