Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added server/app/backend/__init__.py
Empty file.
174 changes: 174 additions & 0 deletions server/app/backend/local_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from typing import Iterator, Dict, Type, Union
import logging
import json
import os
import hashlib
import threading
import weakref

from app.model import AssetAdministrationShellDescriptor, SubmodelDescriptor
from basyx.aas import model
from basyx.aas.model import provider as sdk_provider

from app.model.descriptor import Descriptor
from app.adapter import jsonization


logger = logging.getLogger(__name__)


# We need to resolve the Descriptor type in order to deserialize it again from JSON
DESCRIPTOR_TYPE_TO_STRING: Dict[Type[Union[AssetAdministrationShellDescriptor, SubmodelDescriptor]], str] = {
AssetAdministrationShellDescriptor: "AssetAdministrationShellDescriptor",
SubmodelDescriptor: "SubmodelDescriptor",
}


class LocalFileDescriptorStore(sdk_provider.AbstractObjectStore[model.Identifier, Descriptor]):
"""
An ObjectStore implementation for :class:`~app.model.descriptor.Descriptor` BaSyx Python SDK objects backed
by a local file based local backend
"""
def __init__(self, directory_path: str):
"""
Initializer of class LocalFileDescriptorStore

:param directory_path: Path to the local file backend (the path where you want to store your AAS JSON files)
"""
self.directory_path: str = directory_path.rstrip("/")

# A dictionary of weak references to local replications of stored objects. Objects are kept in this cache as
# long as there is any other reference in the Python application to them. We use this to make sure that only one
# local replication of each object is kept in the application and retrieving an object from the store always
# returns the **same** (not only equal) object. Still, objects are forgotten, when they are not referenced
# anywhere else to save memory.
self._object_cache: weakref.WeakValueDictionary[model.Identifier, Descriptor] \
= weakref.WeakValueDictionary()
self._object_cache_lock = threading.Lock()

def check_directory(self, create=False):
"""
Check if the directory exists and created it if not (and requested to do so)

:param create: If True and the database does not exist, try to create it
"""
if not os.path.exists(self.directory_path):
if not create:
raise FileNotFoundError("The given directory ({}) does not exist".format(self.directory_path))
# Create directory
os.mkdir(self.directory_path)
logger.info("Creating directory {}".format(self.directory_path))

def get_descriptor_by_hash(self, hash_: str) -> Descriptor:
"""
Retrieve an AAS Descriptor object from the local file by its identifier hash

:raises KeyError: If the respective file could not be found
"""
# Try to get the correct file
try:
with open("{}/{}.json".format(self.directory_path, hash_), "r") as file:
obj = json.load(file, cls=jsonization.ServerAASFromJsonDecoder)
except FileNotFoundError as e:
raise KeyError("No Descriptor with hash {} found in local file database".format(hash_)) from e
# If we still have a local replication of that object (since it is referenced from anywhere else), update that
# replication and return it.
with self._object_cache_lock:
if obj.id in self._object_cache:
old_obj = self._object_cache[obj.id]
old_obj.update_from(obj)
return old_obj
self._object_cache[obj.id] = obj
return obj

def get_item(self, identifier: model.Identifier) -> Descriptor:
"""
Retrieve an AAS Descriptor object from the local file by its :class:`~basyx.aas.model.base.Identifier`

:raises KeyError: If the respective file could not be found
"""
try:
return self.get_descriptor_by_hash(self._transform_id(identifier))
except KeyError as e:
raise KeyError("No Identifiable with id {} found in local file database".format(identifier)) from e

def add(self, x: Descriptor) -> None:
"""
Add a Descriptor object to the store

:raises KeyError: If an object with the same id exists already in the object store
"""
logger.debug("Adding object %s to Local File Store ...", repr(x))
if os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(x.id))):
raise KeyError("Descriptor with id {} already exists in local file database".format(x.id))
with open("{}/{}.json".format(self.directory_path, self._transform_id(x.id)), "w") as file:
# Usually, we don't need to serialize the modelType, since during HTTP requests, we know exactly if this
# is an AASDescriptor or SubmodelDescriptor. However, here we cannot distinguish them, so to deserialize
# them successfully, we hack the `modelType` into the JSON.
serialized = json.loads(
json.dumps(x, cls=jsonization.ServerAASToJsonEncoder)
)
serialized["modelType"] = DESCRIPTOR_TYPE_TO_STRING[type(x)]
json.dump(serialized, file, indent=4)
with self._object_cache_lock:
self._object_cache[x.id] = x

def discard(self, x: Descriptor) -> None:
"""
Delete an :class:`~app.model.descriptor.Descriptor` AAS object from the local file store

:param x: The object to be deleted
:raises KeyError: If the object does not exist in the database
"""
logger.debug("Deleting object %s from Local File Store database ...", repr(x))
try:
os.remove("{}/{}.json".format(self.directory_path, self._transform_id(x.id)))
except FileNotFoundError as e:
raise KeyError("No AAS Descriptor object with id {} exists in local file database".format(x.id)) from e
with self._object_cache_lock:
self._object_cache.pop(x.id, None)

def __contains__(self, x: object) -> bool:
"""
Check if an object with the given :class:`~basyx.aas.model.base.Identifier` or the same
:class:`~basyx.aas.model.base.Identifier` as the given object is contained in the local file database

:param x: AAS object :class:`~basyx.aas.model.base.Identifier` or :class:`~app.model.descriptor.Descriptor`
AAS object
:return: ``True`` if such an object exists in the database, ``False`` otherwise
"""
if isinstance(x, model.Identifier):
identifier = x
elif isinstance(x, Descriptor):
identifier = x.id
else:
return False
logger.debug("Checking existence of Descriptor object with id %s in database ...", repr(x))
return os.path.exists("{}/{}.json".format(self.directory_path, self._transform_id(identifier)))

def __len__(self) -> int:
"""
Retrieve the number of objects in the local file database

:return: The number of objects (determined from the number of documents)
"""
logger.debug("Fetching number of documents from database ...")
return len(os.listdir(self.directory_path))

def __iter__(self) -> Iterator[Descriptor]:
"""
Iterate all :class:`~app.model.descriptor.Descriptor` objects in the local folder.

This method returns an iterator, containing only a list of all identifiers in the database and retrieving
the identifiable objects on the fly.
"""
logger.debug("Iterating over objects in database ...")
for name in os.listdir(self.directory_path):
yield self.get_descriptor_by_hash(name.rstrip(".json"))

@staticmethod
def _transform_id(identifier: model.Identifier) -> str:
"""
Helper method to represent an ASS Identifier as a string to be used as Local file document id
"""
return hashlib.sha256(identifier.encode("utf-8")).hexdigest()
45 changes: 45 additions & 0 deletions server/app/model/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Iterable, Dict, TypeVar, Iterator

from basyx.aas import model
from basyx.aas.model import provider as sdk_provider

from app.model.descriptor import Descriptor

_DESCRIPTOR_TYPE = TypeVar("_DESCRIPTOR_TYPE", bound=Descriptor)

class DictDescriptorStore(sdk_provider.AbstractObjectStore[model.Identifier, _DESCRIPTOR_TYPE]):
"""
A local in-memory object store for :class:`~app.model.descriptor.Descriptor` objects, backed by a dict, mapping
:class:`~basyx.aas.model.base.Identifier` → :class:`~app.model.descriptor.Descriptor`
"""

def __init__(self, descriptors: Iterable[_DESCRIPTOR_TYPE] = ()) -> None:
self._backend: Dict[model.Identifier, _DESCRIPTOR_TYPE] = {}
for x in descriptors:
self.add(x)

def get_item(self, identifier: model.Identifier) -> _DESCRIPTOR_TYPE:
return self._backend[identifier]

def add(self, x: _DESCRIPTOR_TYPE) -> None:
if x.id in self._backend and self._backend.get(x.id) is not x:
raise KeyError("Descriptor object with same id {} is already stored in this store"
.format(x.id))
self._backend[x.id] = x

def discard(self, x: _DESCRIPTOR_TYPE) -> None:
if self._backend.get(x.id) is x:
del self._backend[x.id]

def __contains__(self, x: object) -> bool:
if isinstance(x, model.Identifier):
return x in self._backend
if not isinstance(x, Descriptor):
return False
return self._backend.get(x.id) is x

def __len__(self) -> int:
return len(self._backend)

def __iter__(self) -> Iterator[_DESCRIPTOR_TYPE]:
return iter(self._backend.values())
Empty file added server/test/backend/__init__.py
Empty file.
91 changes: 91 additions & 0 deletions server/test/backend/test_local_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright (c) 2026 the Eclipse BaSyx Authors
#
# This program and the accompanying materials are made available under the terms of the MIT License, available in
# the LICENSE file of this project.
#
# SPDX-License-Identifier: MIT
import os.path
import shutil

from unittest import TestCase

from app.backend import local_file
from app import model
from app.model import provider

store_path: str = os.path.dirname(__file__) + "/local_file_test_folder"
source_core: str = "file://localhost/{}/".format(store_path)


class LocalFileBackendTest(TestCase):
def setUp(self) -> None:
self.descriptor_store = local_file.LocalFileDescriptorStore(store_path)
self.descriptor_store.check_directory(create=True)
self.mock_endpoint = model.Endpoint(
interface="AAS-3.0",
protocol_information=model.ProtocolInformation(href="https://example.org/")
)
self.aasd1 = model.AssetAdministrationShellDescriptor(id_="https://example.org/AASDescriptor/1",
endpoints=[self.mock_endpoint])
self.aasd2 = model.AssetAdministrationShellDescriptor(id_="https://example.org/AASDescriptor/2",
endpoints=[self.mock_endpoint])
self.sd1 = model.SubmodelDescriptor(id_="https://example.org/SubmodelDescriptor/1",
endpoints=[self.mock_endpoint])
self.sd2 = model.SubmodelDescriptor(id_="https://example.org/SubmodelDescriptor/2",
endpoints=[self.mock_endpoint])

def tearDown(self) -> None:
try:
self.descriptor_store.clear()
finally:
shutil.rmtree(store_path)


def test_add(self) -> None:
self.descriptor_store.add(self.aasd1)
# Note that this test is only checking that there are no errors during adding.
# The actual logic is tested together with retrieval in `test_retrieval`.

def test_retrieval(self) -> None:
self.descriptor_store.add(self.sd1)

# When retrieving the object, we should get the *same* instance as we added
retrieved_descriptor = self.descriptor_store.get_item("https://example.org/SubmodelDescriptor/1")
self.assertIs(self.sd1, retrieved_descriptor)

def test_iterating(self) -> None:
self.descriptor_store.add(self.sd1)
self.descriptor_store.add(self.sd2)
self.descriptor_store.add(self.aasd1)
self.descriptor_store.add(self.aasd2)
self.assertEqual(4, len(self.descriptor_store))

# Iterate objects, add them to a DictDescriptorStore and check them
retrieved_descriptor_store = provider.DictDescriptorStore()
for item in self.descriptor_store:
retrieved_descriptor_store.add(item)
self.assertEqual(4, len(retrieved_descriptor_store))
self.assertIn(self.sd1, retrieved_descriptor_store)
self.assertIn(self.sd2, retrieved_descriptor_store)
self.assertIn(self.aasd1, retrieved_descriptor_store)
self.assertIn(self.aasd2, retrieved_descriptor_store)

def test_key_errors(self) -> None:
self.descriptor_store.add(self.aasd1)
with self.assertRaises(KeyError) as cm:
self.descriptor_store.add(self.aasd1)
self.assertEqual("'Descriptor with id https://example.org/AASDescriptor/1 already exists in "
"local file database'", str(cm.exception))

self.descriptor_store.discard(self.aasd1)
with self.assertRaises(KeyError) as cm:
self.descriptor_store.get_item("https://example.org/AASDescriptor/1")
self.assertIsNone(self.descriptor_store.get("https://example.org/AASDescriptor/1"))
self.assertEqual("'No Identifiable with id https://example.org/AASDescriptor/1 found in local "
"file database'", str(cm.exception))

def test_reload_discard(self) -> None:
self.descriptor_store.add(self.sd1)
self.descriptor_store = local_file.LocalFileDescriptorStore(store_path)
self.descriptor_store.discard(self.sd1)
self.assertNotIn(self.sd1, self.descriptor_store)
Empty file added server/test/model/__init__.py
Empty file.
52 changes: 52 additions & 0 deletions server/test/model/test_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import unittest

from app import model
from app.model.provider import DictDescriptorStore

class DictDescriptorStoreTest(unittest.TestCase):
def setUp(self) -> None:
self.mock_endpoint = model.Endpoint(
interface="AAS-3.0",
protocol_information=model.ProtocolInformation(href="https://example.org/")
)
self.aasd1 = model.AssetAdministrationShellDescriptor(id_="https://example.org/AASDescriptor/1",
endpoints=[self.mock_endpoint])
self.aasd2 = model.AssetAdministrationShellDescriptor(id_="https://example.org/AASDescriptor/2",
endpoints=[self.mock_endpoint])
self.sd1 = model.SubmodelDescriptor(id_="https://example.org/SubmodelDescriptor/1",
endpoints=[self.mock_endpoint])
self.sd2 = model.SubmodelDescriptor(id_="https://example.org/SubmodelDescriptor/2",
endpoints=[self.mock_endpoint])

def test_store_retrieve(self) -> None:
descriptor_store: DictDescriptorStore = DictDescriptorStore()
descriptor_store.add(self.aasd1)
descriptor_store.add(self.aasd2)
self.assertIn(self.aasd1, descriptor_store)
self.assertFalse(self.sd1 in descriptor_store)

aasd3 = model.AssetAdministrationShellDescriptor(id_="https://example.org/AASDescriptor/1",
endpoints=[self.mock_endpoint])
with self.assertRaises(KeyError) as cm:
descriptor_store.add(aasd3)
self.assertEqual("'Descriptor object with same id https://example.org/AASDescriptor/1 is already "
"stored in this store'", str(cm.exception))
self.assertEqual(2, len(descriptor_store))
self.assertIs(self.aasd1, descriptor_store.get("https://example.org/AASDescriptor/1"))

descriptor_store.discard(self.aasd1)
with self.assertRaises(KeyError) as cm:
descriptor_store.get_item("https://example.org/AASDescriptor/1")
self.assertIsNone(descriptor_store.get("https://example.org/AASDescriptor/1"))
self.assertEqual("'https://example.org/AASDescriptor/1'", str(cm.exception))
self.assertIs(self.aasd2, descriptor_store.pop())
self.assertEqual(0, len(descriptor_store))

def test_store_update(self) -> None:
descriptor_store1: DictDescriptorStore = DictDescriptorStore()
descriptor_store2: DictDescriptorStore = DictDescriptorStore()
descriptor_store1.add(self.sd1)
descriptor_store2.add(self.sd2)
descriptor_store1.update(descriptor_store2)
self.assertIsInstance(descriptor_store1, DictDescriptorStore)
self.assertIn(self.sd2, descriptor_store1)
Loading