Skip to content
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
a886a66
chore: initialize repo
joshuaunity Mar 5, 2026
5fc6e42
chore: work in progress
joshuaunity Mar 9, 2026
09e879f
tests: added test to test for cpy asset feature
joshuaunity Mar 11, 2026
f588302
Merge branch 'main' of github.com:FlexMeasures/flexmeasures into feat…
joshuaunity Mar 11, 2026
3f97255
chore: removed unused file
joshuaunity Mar 11, 2026
d609e37
refactor: relocate util function
joshuaunity Mar 11, 2026
abd590b
refacto: Refactored util for copying asset
joshuaunity Mar 13, 2026
fb39eac
tests: and new test case
joshuaunity Mar 13, 2026
71810d0
Merge branch 'main' into feat/copy-assets
joshuaunity Mar 16, 2026
5371a4e
chore: move logic into @post_load
joshuaunity Mar 16, 2026
e84899e
feat: implement deep copy of asset subtree including direct sensors
joshuaunity Mar 17, 2026
510fa71
feat: add permission requirement for copying assets
joshuaunity Mar 18, 2026
bb6e959
feat: enhance asset copy endpoint with detailed OpenAPI specification…
joshuaunity Mar 18, 2026
9952e5b
feat: add test for asset copy API to ensure direct sensors are duplic…
joshuaunity Mar 18, 2026
d9eacf7
Update flexmeasures/api/v3_0/assets.py
joshuaunity Mar 18, 2026
a237a40
feat: enhance asset copy response messages for clarity based on param…
joshuaunity Mar 19, 2026
1a6c4c4
Merge branch 'feat/copy-assets' of github.com:FlexMeasures/flexmeasur…
joshuaunity Mar 19, 2026
8cf58f6
Merge branch 'main' into feat/copy-assets
joshuaunity Mar 19, 2026
3b88319
Merge branch 'main' into feat/copy-assets
joshuaunity Mar 25, 2026
46fc632
Revert same-site sensor reference preservation in asset copy (#2056)
Copilot Mar 30, 2026
54f83ca
Refactor asset copy parameters in API and OpenAPI specs to use 'accou…
joshuaunity Mar 30, 2026
4e84d1f
Merge branch 'main' into feat/copy-assets
joshuaunity Apr 1, 2026
67446b6
fix: improve sensor reference handling and format validation in asset…
joshuaunity Apr 7, 2026
b195085
feat: add audit log entry for asset copying in copy_asset function
joshuaunity Apr 7, 2026
04c25ee
feat: add option to include copy suffix when duplicating assets
joshuaunity Apr 9, 2026
3b8dd82
fix: restore post_annotation function lost during merge, fixing unuse…
Copilot Apr 10, 2026
ca4c390
feat: implement incremental naming for asset copies and add new API e…
joshuaunity Apr 13, 2026
8769a5d
chore: added changelog entry
joshuaunity Apr 13, 2026
a1b807f
Merge branch 'main' into feat/copy-assets
joshuaunity Apr 13, 2026
a2b86ab
Merge branch 'main' into feat/copy-assets
joshuaunity Apr 13, 2026
47a95bd
feat: add support for numeric graph format in asset graph template
joshuaunity Apr 13, 2026
640071a
feat: add validation to prevent copying an asset to itself or its des…
joshuaunity Apr 14, 2026
14cb654
fix: correct response message structure in asset copy rejection tests
joshuaunity Apr 14, 2026
1ac2117
Merge branch 'main' into feat/copy-assets
joshuaunity Apr 15, 2026
0a9090a
refactor: remove unnecessary asset existence check in AssetAPI
joshuaunity Apr 17, 2026
d463d3e
Merge remote-tracking branch 'origin/main' into feat/copy-assets
Flix6x Apr 17, 2026
e28aa47
chore: move changelog entry to v0.33.0
Flix6x Apr 17, 2026
383ee0a
refactor: cross-reference the CopyAssetSchema to get its field descri…
Flix6x Apr 21, 2026
2a5daea
feat: simplify endpoint description
Flix6x Apr 21, 2026
233b206
fix: drop int32 claim, which is unenforced at the Python/marshmallow …
Flix6x Apr 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 319 additions & 0 deletions flexmeasures/api/common/utils/api_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

from copy import deepcopy
import json
from timely_beliefs.beliefs.classes import BeliefsDataFrame
from timely_beliefs.sensors.func_store import knowledge_horizons
from typing import Sequence
from datetime import timedelta

Expand All @@ -13,7 +16,10 @@
from sqlalchemy.exc import IntegrityError

from flexmeasures.data import db
from flexmeasures.data.models.audit_log import AssetAuditLog
from flexmeasures.data.models.user import Account
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.utils import save_to_db
from flexmeasures.auth.policy import check_access
from flexmeasures.api.common.responses import (
Expand All @@ -22,6 +28,7 @@
request_processed,
already_received_and_successfully_processed,
)
from flexmeasures.data.schemas.generic_assets import GenericAssetSchema as AssetSchema
from flexmeasures.utils.error_utils import error_handling_router
from flexmeasures.utils.flexmeasures_inflection import capitalize

Expand Down Expand Up @@ -182,3 +189,315 @@ def get_accessible_accounts() -> list[Account]:
pass

return accounts


def convert_asset_json_fields(asset_kwargs):
"""
Convert string fields in asset_kwargs to JSON where needed.
"""
if "attributes" in asset_kwargs and isinstance(asset_kwargs["attributes"], str):
asset_kwargs["attributes"] = json.loads(asset_kwargs["attributes"])
if "sensors_to_show" in asset_kwargs and isinstance(
asset_kwargs["sensors_to_show"], str
):
asset_kwargs["sensors_to_show"] = json.loads(asset_kwargs["sensors_to_show"])
if "flex_context" in asset_kwargs and isinstance(asset_kwargs["flex_context"], str):
asset_kwargs["flex_context"] = json.loads(asset_kwargs["flex_context"])
if "flex_model" in asset_kwargs and isinstance(asset_kwargs["flex_model"], str):
asset_kwargs["flex_model"] = json.loads(asset_kwargs["flex_model"])
if "sensors_to_show_as_kpis" in asset_kwargs and isinstance(
asset_kwargs["sensors_to_show_as_kpis"], str
):
asset_kwargs["sensors_to_show_as_kpis"] = json.loads(
asset_kwargs["sensors_to_show_as_kpis"]
)
return asset_kwargs


def _copy_direct_sensors(
source_asset: GenericAsset, copied_asset: GenericAsset
) -> dict[int, int]:
"""Copy sensors directly attached to one asset.

Returns a mapping of original sensor ID → new sensor ID for every sensor copied.
"""
sensor_id_map: dict[int, int] = {}
source_sensors = db.session.scalars(
select(Sensor).filter(Sensor.generic_asset_id == source_asset.id)
).all()
for source_sensor in source_sensors:
sensor_kwargs = {}
for column in source_sensor.__table__.columns:
if column.name in [
"id",
"generic_asset_id",
"knowledge_horizon_fnc",
"knowledge_horizon_par",
]:
continue
sensor_kwargs[column.name] = deepcopy(getattr(source_sensor, column.name))

sensor_kwargs["generic_asset_id"] = copied_asset.id
# Reconstruct knowledge_horizon tuple with actual function object
# (stored in DB as function name string, but Sensor constructor expects function object)
knowledge_horizon_fnc = getattr(
knowledge_horizons, source_sensor.knowledge_horizon_fnc
)
sensor_kwargs["knowledge_horizon"] = (
knowledge_horizon_fnc,
deepcopy(source_sensor.knowledge_horizon_par),
)

new_sensor = Sensor(**sensor_kwargs)
db.session.add(new_sensor)
db.session.flush() # obtain new_sensor.id
sensor_id_map[source_sensor.id] = new_sensor.id

return sensor_id_map


# Sentinel returned by _replace_sensor_refs to signal that a containing entry
# should be omitted entirely (used when a sensor reference points to a private
# asset that is neither public nor part of the copied subtree).
_REMOVED = object()


def _is_sensor_on_public_asset(sensor_id: int) -> bool:
"""Return True if *sensor_id* belongs to a public asset (account_id is None).

Unknown sensor IDs are treated as private (returns False) so that stale
references are also cleaned up during the copy.
"""
sensor = db.session.get(Sensor, sensor_id)
if sensor is None:
return False
asset = db.session.get(GenericAsset, sensor.generic_asset_id)
return asset is not None and asset.account_id is None


def _resolve_sensor_id(sensor_id: int, sensor_id_map: dict[int, int]) -> int | object:
"""Resolve a single sensor ID during an asset copy.

Returns:

* The new sensor ID if the sensor was copied (*sensor_id* is in *sensor_id_map*).
* The original sensor ID if the sensor belongs to a public asset.
* :data:`_REMOVED` sentinel if the sensor is on a private external asset.
"""
if sensor_id in sensor_id_map:
return sensor_id_map[sensor_id]
if _is_sensor_on_public_asset(sensor_id):
return sensor_id
return _REMOVED


def _replace_sensor_refs(data, sensor_id_map: dict[int, int]):
"""Recursively replace sensor IDs inside a nested JSON structure.

Handles the two reference patterns used in flex_context, flex_model and
sensors_to_show:

* ``{"sensor": <id>}`` – single sensor reference
* ``{"sensors": [<id>, ...]}`` – list of sensor IDs

For each sensor ID encountered:

* If the ID is in *sensor_id_map* (sensor was copied) → replace with the new ID.
* If the sensor belongs to a **public** asset (``account_id`` is ``None``) →
keep the original ID as-is (publicly accessible, safe to reference).
* Otherwise (sensor on a private asset not in the copied subtree) →
the containing ``{"sensor": id}`` dict is dropped (returns :data:`_REMOVED`)
and plain integer IDs in ``{"sensors": [...]}`` lists are filtered out.
"""

if isinstance(data, dict):
return _replace_sensor_refs_in_dict(data, sensor_id_map)
if isinstance(data, list):
# Update the ids in the list with the new ones, except for sensors from public assets, which are kept as is.
return [
(
_resolve_sensor_id(sensor_id, sensor_id_map)
if isinstance(sensor_id, int)
else _replace_sensor_refs(sensor_id, sensor_id_map)
)
for sensor_id in data
]
return data


def _replace_sensor_refs_in_dict(data: dict, sensor_id_map: dict[int, int]):
"""Handle the dict case for :func:`_replace_sensor_refs`."""
result: dict = {}
for key, value in data.items():
if key == "sensor" and isinstance(value, int):
resolved = _resolve_sensor_id(value, sensor_id_map)
if resolved is _REMOVED:
# Private external sensor: signal parent to drop this entry.
return _REMOVED
result[key] = resolved
elif key == "sensors" and isinstance(value, list):
result[key] = _replace_sensors_list(value, sensor_id_map)
else:
processed = _replace_sensor_refs(value, sensor_id_map)
if processed is not _REMOVED:
result[key] = processed
# else: drop this key entirely
return result


def _replace_sensors_list(value: list, sensor_id_map: dict[int, int]) -> list:
"""Replace/filter integer IDs in a ``{"sensors": [...]}`` list."""
new_list = []
for v in value:
if not isinstance(v, int):
new_list.append(v)
continue
resolved = _resolve_sensor_id(v, sensor_id_map)
if resolved is not _REMOVED:
new_list.append(resolved)
# else: private external sensor, skip
return new_list


def _update_sensor_refs_in_subtree(
asset: GenericAsset, sensor_id_map: dict[int, int]
) -> None:
"""Update sensor references in flex_context, flex_model and sensors_to_show
for the given asset and all its descendants.

Sensor references that were copied are replaced with their new IDs.
References to public (account_id = None) sensors are kept unchanged.
References to private external sensors are removed.
"""
if asset.flex_context:
result = _replace_sensor_refs(deepcopy(asset.flex_context), sensor_id_map)
asset.flex_context = result if result is not _REMOVED else {}
if asset.flex_model:
result = _replace_sensor_refs(deepcopy(asset.flex_model), sensor_id_map)
asset.flex_model = result if result is not _REMOVED else {}
if asset.sensors_to_show:
result = _replace_sensor_refs(deepcopy(asset.sensors_to_show), sensor_id_map)
asset.sensors_to_show = result if result is not _REMOVED else []
if asset.sensors_to_show_as_kpis:
result = _replace_sensor_refs(
deepcopy(asset.sensors_to_show_as_kpis), sensor_id_map
)
asset.sensors_to_show_as_kpis = result if result is not _REMOVED else []

children = db.session.scalars(
select(GenericAsset).filter(GenericAsset.parent_asset_id == asset.id)
).all()
for child in children:
_update_sensor_refs_in_subtree(child, sensor_id_map)


def _copy_asset_subtree(
source_asset: GenericAsset,
destination_account_id: int,
destination_parent_asset_id: int | None,
asset_schema: AssetSchema,
add_copy_suffix: bool,
) -> tuple[GenericAsset, dict[int, int]]:
"""Recursively copy one asset and all descendants.

Returns a tuple of (copied_asset, sensor_id_map) where sensor_id_map maps
every original sensor ID in the entire subtree to the corresponding new ID.
"""
asset_kwargs = asset_schema.dump(source_asset)

for key in ["id", "owner", "generic_asset_type", "child_assets", "sensors"]:
asset_kwargs.pop(key, None)

if add_copy_suffix:
asset_kwargs["name"] = f"{asset_kwargs['name']} (Copy)"
asset_kwargs["account_id"] = destination_account_id
asset_kwargs["parent_asset_id"] = destination_parent_asset_id
asset_kwargs = convert_asset_json_fields(asset_kwargs)

copied_asset = GenericAsset(**asset_kwargs)
db.session.add(copied_asset)
db.session.flush()

sensor_id_map = _copy_direct_sensors(source_asset, copied_asset)

source_children = db.session.scalars(
select(GenericAsset)
.filter(GenericAsset.parent_asset_id == source_asset.id)
.order_by(GenericAsset.id)
).all()
for source_child in source_children:
_, child_sensor_map = _copy_asset_subtree(
source_asset=source_child,
destination_account_id=destination_account_id,
destination_parent_asset_id=copied_asset.id,
asset_schema=asset_schema,
add_copy_suffix=False,
)
sensor_id_map.update(child_sensor_map)

return copied_asset, sensor_id_map


def copy_asset(
asset: GenericAsset,
account=None,
parent_asset=None,
) -> GenericAsset:
"""
Copy an asset subtree to a target account and/or under a target parent asset.

The copied subtree includes:
- the selected asset
- all descendant child assets (recursively)
- all sensors directly attached to each copied asset

Resolution rules:

- If neither ``account`` nor ``parent_asset`` is given, the copy is placed in
the same account and under the same parent as the original (i.e. a sibling).
- If ``account`` is given but ``parent_asset`` is not, the copy becomes a
top-level asset (no parent) in the given account.
- If ``parent_asset`` is given but ``account`` is not, the copy is placed under
the given parent and inherits that parent's account.
- If both are given, the copy belongs to the given account and is placed under
the given parent. This allows creating a copy that belongs to a different
account than its parent.
"""
try:
asset_schema = AssetSchema()

if account is None and parent_asset is None:
target_account_id = int(asset.account_id)
target_parent_asset_id = asset.parent_asset_id
elif account is not None and parent_asset is None:
target_account_id = int(account.id)
target_parent_asset_id = None
elif account is None and parent_asset is not None:
target_account_id = int(parent_asset.account_id)
target_parent_asset_id = int(parent_asset.id)
else:
target_account_id = int(account.id)
target_parent_asset_id = int(parent_asset.id)

copied_root, sensor_id_map = _copy_asset_subtree(
source_asset=asset,
destination_account_id=target_account_id,
destination_parent_asset_id=target_parent_asset_id,
asset_schema=asset_schema,
add_copy_suffix=True,
)
if sensor_id_map:
_update_sensor_refs_in_subtree(copied_root, sensor_id_map)

AssetAuditLog.add_record(
copied_root,
(
f"Copied asset '{asset.name}': {asset.id} "
f"to '{copied_root.name}': {copied_root.id}"
),
)
db.session.commit()
Comment thread
joshuaunity marked this conversation as resolved.
return copied_root
except Exception as e:
db.session.rollback()
raise e
Loading
Loading