From b4b10be1d39fb252adadc69ba402e7041e607ddf Mon Sep 17 00:00:00 2001 From: ale-rt Date: Sun, 30 Nov 2025 09:59:57 +0100 Subject: [PATCH] Add typing annotations --- .meta.toml | 11 +- .pre-commit-config.yaml | 7 ++ news/+a04874c0.internal.md | 1 + pyproject.toml | 2 +- setup.py | 1 + src/plone/api/addon.py | 9 +- src/plone/api/content.py | 142 +++++++++++++++++++------ src/plone/api/group.py | 4 +- src/plone/api/portal.py | 60 ++++++++--- src/plone/api/relation.py | 59 +++++++--- src/plone/api/tests/base.py | 13 ++- src/plone/api/tests/test_content.py | 3 +- src/plone/api/tests/test_doctests.py | 17 +-- src/plone/api/tests/test_env.py | 6 +- src/plone/api/tests/test_portal.py | 12 ++- src/plone/api/tests/test_user.py | 2 +- src/plone/api/tests/test_validation.py | 17 ++- src/plone/api/types.py | 13 +++ src/plone/api/user.py | 84 +++++++++++---- src/plone/api/validation.py | 19 +++- 20 files changed, 359 insertions(+), 123 deletions(-) create mode 100644 news/+a04874c0.internal.md create mode 100644 src/plone/api/types.py diff --git a/.meta.toml b/.meta.toml index 09f42117..eb3bd41c 100644 --- a/.meta.toml +++ b/.meta.toml @@ -9,6 +9,15 @@ commit-id = "2.9.0" codespell_extra_lines = """ exclude: docs/locale/.*.pot """ +extra_lines = """ +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.18.2 + hooks: + - id: mypy + additional_dependencies: + - types-decorator + exclude: docs +""" [pyproject] check_manifest_ignores = """ @@ -23,7 +32,7 @@ check_manifest_ignores = """ "requirements-docs.txt", "requirements.txt", """ -dependencies_ignores = "['Products.PrintingMailHost', 'plone.app.iterate',]" +dependencies_ignores = "['Products.DCWorkflow', 'Products.PrintingMailHost', 'plone.app.iterate',]" codespell_ignores = "manuel,checkin" [tox] diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b69f8abd..8db95272 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -85,6 +85,13 @@ repos: # """ ## +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.18.2 + hooks: + - id: mypy + additional_dependencies: + - types-decorator + exclude: docs ## # Add extra configuration options in .meta.toml: diff --git a/news/+a04874c0.internal.md b/news/+a04874c0.internal.md new file mode 100644 index 00000000..08857852 --- /dev/null +++ b/news/+a04874c0.internal.md @@ -0,0 +1 @@ +Add typing annotations [@ale-rt] diff --git a/pyproject.toml b/pyproject.toml index 8f80f884..d762fdfa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -124,7 +124,7 @@ Zope = [ ] python-dateutil = ['dateutil'] pytest-plone = ['pytest', 'zope.pytestlayer', 'plone.testing', 'plone.app.testing'] -ignore-packages = ['Products.PrintingMailHost', 'plone.app.iterate',] +ignore-packages = ['Products.DCWorkflow', 'Products.PrintingMailHost', 'plone.app.iterate',] ## # Add extra configuration options in .meta.toml: diff --git a/setup.py b/setup.py index 7e64fefb..9682d014 100644 --- a/setup.py +++ b/setup.py @@ -36,6 +36,7 @@ "plone.dexterity", "plone.i18n", "plone.registry", + "plone.supermodel", "plone.uuid", "zope.globalrequest", "Products.CMFCore", diff --git a/src/plone/api/addon.py b/src/plone/api/addon.py index 8fc8bf71..9a378a00 100644 --- a/src/plone/api/addon.py +++ b/src/plone/api/addon.py @@ -11,6 +11,7 @@ from plone.base.utils import get_installer from Products.CMFPlone.controlpanel.browser.quickinstaller import InstallerView from Products.GenericSetup import EXTENSION +from typing import Any from zope.component import getAllUtilitiesRegisteredFor from zope.globalrequest import getRequest @@ -94,7 +95,7 @@ def _get_non_installable_addons() -> NonInstallableAddons: @lru_cache(maxsize=1) -def _cached_addons() -> tuple[tuple[str, AddonInformation]]: +def _cached_addons() -> tuple[tuple[str, AddonInformation], ...]: """Return information about add-ons in this installation. :returns: Tuple of tuples with add-on id and AddonInformation. @@ -130,7 +131,7 @@ def _cached_addons() -> tuple[tuple[str, AddonInformation]]: profile_type = pid_parts[-1] if product_id not in addons: # get some basic information on the product - product = { + product: dict[str, Any] = { "id": product_id, "version": get_version(product_id), "title": product_id, @@ -271,10 +272,10 @@ def get(addon: str) -> AddonInformation: :returns: Add-on information. :rtype: string """ - addons = dict(_cached_addons()) + addons: dict[str, AddonInformation] = dict(_cached_addons()) if addon not in addons: raise InvalidParameterError(f"No add-on {addon} found.") - return _update_addon_info(addons.get(addon), _get_installer()) + return _update_addon_info(addons[addon], _get_installer()) @required_parameters("addon") diff --git a/src/plone/api/content.py b/src/plone/api/content.py index 552afffd..24687d4e 100644 --- a/src/plone/api/content.py +++ b/src/plone/api/content.py @@ -2,10 +2,15 @@ from Acquisition import aq_chain from Acquisition import aq_inner +from collections.abc import Callable +from collections.abc import Iterator from copy import copy as _copy from itertools import islice from plone.api import portal from plone.api.exc import InvalidParameterError +from plone.api.types import Container +from plone.api.types import Content +from plone.api.types import Request from plone.api.validation import at_least_one_of from plone.api.validation import mutually_exclusive_parameters from plone.api.validation import required_parameters @@ -14,6 +19,9 @@ from plone.uuid.interfaces import IUUID from Products.CMFCore.DynamicType import DynamicType from Products.CMFCore.WorkflowCore import WorkflowException +from Products.CMFPlone.WorkflowTool import WorkflowTool +from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition +from typing import Any from zope.component import ComponentLookupError from zope.component import getMultiAdapter from zope.component import getSiteManager @@ -21,6 +29,10 @@ from zope.globalrequest import getRequest from zope.interface import Interface from zope.interface import providedBy +from zope.interface.interface import InterfaceClass +from ZPublisher.BaseRequest import RequestContainer +from ZTUtils.Lazy import LazyCat +from ZTUtils.Lazy import LazyMap import transaction import uuid @@ -34,13 +46,13 @@ @required_parameters("container", "type") @at_least_one_of("id", "title") def create( - container=None, - type=None, - id=None, - title=None, - safe_id=False, + container: Container, + type: str, + id: str | None = None, + title: str | None = None, + safe_id: bool = False, **kwargs, # NOQA: C816, S101 -): +) -> Content: """Create a new content item. :param container: [required] Container object in which to create the new @@ -108,7 +120,7 @@ def create( ), ) - content = container[content_id] + content: Content = container[content_id] if not id or (safe_id and id): # Create a new id from title chooser = INameChooser(container) @@ -128,7 +140,7 @@ def create( @mutually_exclusive_parameters("path", "UID") @at_least_one_of("path", "UID") -def get(path=None, UID=None): +def get(path: str | None = None, UID: str | None = None) -> Content | None: """Get an object. :param path: Path to the object we want to get, relative to @@ -150,10 +162,10 @@ def get(path=None, UID=None): relative_path=path, ) try: - path = path.split("/") - if len(path) > 1: - parent = site.unrestrictedTraverse(path[:-1]) - content = parent.restrictedTraverse(path[-1]) + path_list = path.split("/") + if len(path_list) > 1: + parent = site.unrestrictedTraverse(path_list[:-1]) + content = parent.restrictedTraverse(path_list[-1]) else: content = site.restrictedTraverse(path[-1]) except (KeyError, AttributeError): @@ -165,11 +177,17 @@ def get(path=None, UID=None): elif UID: return uuidToObject(UID) + return None @required_parameters("source") @at_least_one_of("target", "id") -def move(source=None, target=None, id=None, safe_id=False): +def move( + source: Content, + target: Container | None = None, + id: str | None = None, + safe_id: bool = False, +) -> Content: """Move the object to the target container. :param source: [required] Object that we want to move. @@ -211,7 +229,11 @@ def move(source=None, target=None, id=None, safe_id=False): @required_parameters("obj", "new_id") -def rename(obj=None, new_id=None, safe_id=False): +def rename( + obj: Content, + new_id: str, + safe_id: bool = False, +) -> Content: """Rename the object. :param obj: [required] Object that we want to rename. @@ -239,7 +261,12 @@ def rename(obj=None, new_id=None, safe_id=False): @required_parameters("source") @at_least_one_of("target", "id") -def copy(source=None, target=None, id=None, safe_id=False): +def copy( + source: Content, + target: Container | None = None, + id: str | None = None, + safe_id: bool = False, +) -> Content: """Copy the object to the target container. :param source: [required] Object that we want to copy. @@ -285,7 +312,11 @@ def copy(source=None, target=None, id=None, safe_id=False): @mutually_exclusive_parameters("obj", "objects") @at_least_one_of("obj", "objects") -def delete(obj=None, objects=None, check_linkintegrity=True): +def delete( + obj: Content | None = None, + objects: list[Content] | None = None, + check_linkintegrity: bool = True, +): """Delete the object(s). :param obj: Object that we want to delete. @@ -327,7 +358,7 @@ def delete(obj=None, objects=None, check_linkintegrity=True): @required_parameters("obj") -def get_state(obj=None, default=_marker): +def get_state(obj: Content, default: Any = _marker) -> str: """Get the current workflow state of the object. :param obj: [required] Object that we want to get the state for. @@ -350,12 +381,17 @@ def get_state(obj=None, default=_marker): # work backwards from our end state -def _find_path(maps, path, current_state, start_state): +def _find_path( + maps: dict[str, list[tuple[str, list[str]]]], + path: list[str | Any], + current_state: str, + start_state: str, +) -> list[str] | None: paths = [] # current_state could not be on maps if it only has outgoing # transitions. i.e an initial state you are not able to return to. if current_state not in maps: - return + return None for new_transition, from_states in maps[current_state]: next_path = _copy(path) @@ -381,7 +417,9 @@ def _find_path(maps, path, current_state, start_state): return len(paths) and min(paths, key=len) or None -def _wf_transitions_for(workflow, from_state, to_state): +def _wf_transitions_for( + workflow: DCWorkflowDefinition, from_state: str, to_state: str +) -> list[str] | None: """Get list of transition IDs required to transition. from ``from_state`` to ``to_state``. @@ -395,7 +433,7 @@ def _wf_transitions_for(workflow, from_state, to_state): :returns: A list of transitions :rtype: list """ - exit_state_maps = {} + exit_state_maps: dict[str, list[str]] = {} for state in workflow.states.objectValues(): for transition in state.getTransitions(): exit_state_maps.setdefault(transition, []) @@ -419,7 +457,12 @@ def _wf_transitions_for(workflow, from_state, to_state): return _find_path(transition_maps, [], to_state, from_state) -def _transition_to(obj, workflow, to_state, **kwargs): +def _transition_to( + obj: Content, + workflow: WorkflowTool, + to_state: str, + **kwargs, +): # move from the current state to the given state # via any route we can find for wf in workflow.getWorkflowsFor(obj): @@ -453,7 +496,12 @@ def _transition_to(obj, workflow, to_state, **kwargs): @required_parameters("obj") @at_least_one_of("transition", "to_state") @mutually_exclusive_parameters("transition", "to_state") -def transition(obj=None, transition=None, to_state=None, **kwargs): +def transition( + obj: Content, + transition: str | None = None, + to_state: str | None = None, + **kwargs, +): """Perform a workflow transition. for the object or attempt to perform @@ -487,6 +535,7 @@ def transition(obj=None, transition=None, to_state=None, **kwargs): "{}".format(transition, "\n".join(sorted(transitions))), ) else: + assert isinstance(to_state, str) _transition_to(obj, workflow, to_state, **kwargs) if workflow.getInfoFor(obj, "review_state") != to_state: raise InvalidParameterError( @@ -498,7 +547,7 @@ def transition(obj=None, transition=None, to_state=None, **kwargs): @required_parameters("obj") -def disable_roles_acquisition(obj=None): +def disable_roles_acquisition(obj: Content): """Disable acquisition of local roles on given obj. Set __ac_local_roles_block__ = 1 on obj. @@ -512,7 +561,7 @@ def disable_roles_acquisition(obj=None): @required_parameters("obj") -def enable_roles_acquisition(obj=None): +def enable_roles_acquisition(obj: Content): """Enable acquisition of local roles on given obj. Set __ac_local_roles_block__ = 0 on obj. @@ -526,7 +575,11 @@ def enable_roles_acquisition(obj=None): @required_parameters("name", "context") -def get_view(name=None, context=None, request=None): +def get_view( + name: str, + context: Content, + request: Request | None = None, +): """Get a BrowserView object. :param name: [required] Name of the view. @@ -572,7 +625,7 @@ def get_view(name=None, context=None, request=None): @required_parameters("obj") -def get_uuid(obj=None): +def get_uuid(obj: Content) -> str: """Get the object's Universally Unique IDentifier (UUID). :param obj: [required] Object we want its UUID. @@ -587,7 +640,10 @@ def get_uuid(obj=None): @required_parameters("obj") -def get_path(obj=None, relative=False): +def get_path( + obj: Content, + relative: bool = False, +) -> str: """Get the path of an object. :param obj: [required] Object for which to get its path @@ -616,7 +672,7 @@ def get_path(obj=None, relative=False): return "/".join(rel_path) if rel_path else "" -def _parse_object_provides_query(query): +def _parse_object_provides_query(query: Any) -> dict[str, str | list[str]]: """Create a query for the object_provides index. :param query: [required] @@ -643,7 +699,7 @@ def _parse_object_provides_query(query): query_not = [query_not] query_not = [getattr(x, "__identifier__", x) for x in query_not] - result = {} + result: dict[str, str | list[str]] = {} if ifaces: result["query"] = ifaces @@ -655,10 +711,15 @@ def _parse_object_provides_query(query): return result -def find(context=None, depth=None, unrestricted=False, **kwargs): +def find( + context: Content | None = None, + depth: int | None = None, + unrestricted: bool = False, + **kwargs, +) -> LazyMap | LazyCat: """Find content in the portal. - :param context: Context for the search + :param context: Content for the search :type obj: Content object :param depth: How far in the content tree we want to search from context :param unrestricted: Boolean, use unrestrictedSearchResults if True @@ -668,7 +729,7 @@ def find(context=None, depth=None, unrestricted=False, **kwargs): :Example: :ref:`content-find-example` """ - query = {} + query: dict[str, Any] = {} query.update(**kwargs) # Save the original path to maybe restore it later. @@ -716,7 +777,12 @@ def find(context=None, depth=None, unrestricted=False, **kwargs): @required_parameters("obj") -def iter_ancestors(obj=None, function=None, interface=None, stop_at=_marker): +def iter_ancestors( + obj: Content, + function: Callable | None = None, + interface: InterfaceClass | None = None, + stop_at: bool | Content = _marker, +) -> Iterator[Content | RequestContainer]: """Iterate over the object ancestors. Optionally filter the ancestors: @@ -767,6 +833,7 @@ def iter_ancestors(obj=None, function=None, interface=None, stop_at=_marker): else: end = None + ancestors: Iterator[Container | RequestContainer] | Any ancestors = islice(chain, 1, end) if interface is not None: @@ -779,7 +846,12 @@ def iter_ancestors(obj=None, function=None, interface=None, stop_at=_marker): @required_parameters("obj") -def get_closest_ancestor(obj=None, function=None, interface=None, stop_at=_marker): +def get_closest_ancestor( + obj: Content, + function: Callable | None = None, + interface: InterfaceClass | None = None, + stop_at: bool | Content = _marker, +) -> Content | None: """Get the closest ancestor that matches the criteria. See :func:`~plone.api.content.iter_ancestors` for more information on the parameters. diff --git a/src/plone/api/group.py b/src/plone/api/group.py index ac009876..39d3e637 100644 --- a/src/plone/api/group.py +++ b/src/plone/api/group.py @@ -12,7 +12,7 @@ @required_parameters("groupname") def create( - groupname=None, + groupname, title=None, description=None, roles=[], @@ -48,7 +48,7 @@ def create( @required_parameters("groupname") -def get(groupname=None): +def get(groupname): """Get a group. :param groupname: [required] Name of the group we want to get. diff --git a/src/plone/api/portal.py b/src/plone/api/portal.py index 74e8bf98..f68aa62c 100644 --- a/src/plone/api/portal.py +++ b/src/plone/api/portal.py @@ -1,25 +1,35 @@ """Module that provides various utility methods on the portal level.""" from Acquisition import aq_inner +from datetime import date +from datetime import datetime +from DateTime.DateTime import DateTime +from email.mime.multipart import MIMEMultipart from email.utils import formataddr from email.utils import parseaddr from logging import getLogger from plone.api.exc import CannotGetPortalError from plone.api.exc import InvalidParameterError +from plone.api.types import Content +from plone.api.types import Request from plone.api.validation import required_parameters from plone.base.navigationroot import get_navigation_root_object from plone.registry.interfaces import IRegistry from Products.CMFCore.interfaces import ISiteRoot from Products.CMFCore.utils import getToolByName +from Products.CMFPlone.Portal import PloneSite from Products.statusmessages.interfaces import IStatusMessage +from typing import Any from zope.component import ComponentLookupError from zope.component import getUtilitiesFor from zope.component import getUtility from zope.component import providedBy from zope.component.hooks import getSite from zope.globalrequest import getRequest +from zope.interface.interface import InterfaceClass from zope.interface.interfaces import IInterface from zope.schema.interfaces import IVocabularyFactory +from zope.schema.vocabulary import SimpleVocabulary import datetime as dtime import re @@ -51,7 +61,7 @@ MISSING = object() -def get(): +def get() -> PloneSite: """Get the Plone portal object out of thin air. Without the need to import fancy Interfaces and doing multi adapter @@ -75,7 +85,7 @@ def get(): @required_parameters("context") -def get_navigation_root(context=None): +def get_navigation_root(context: Content) -> Content: """Get the navigation root object for the context. This traverses the path up and returns the nearest navigation root. @@ -92,7 +102,7 @@ def get_navigation_root(context=None): @required_parameters("name") -def get_tool(name=None): +def get_tool(name: str) -> Any: """Get a portal tool in a simple way. :param name: [required] Name of the tool you want. @@ -122,11 +132,11 @@ def get_tool(name=None): @required_parameters("recipient", "subject", "body") def send_email( - sender=None, - recipient=None, - subject=None, - body=None, - immediate=False, + sender: str | None = None, + recipient: str | None = None, + subject: str | None = None, + body: MIMEMultipart | str | None = None, + immediate: bool = False, ): """Send an email. @@ -182,7 +192,11 @@ def send_email( @required_parameters("datetime") -def get_localized_time(datetime=None, long_format=False, time_only=False): +def get_localized_time( + datetime: date | DateTime | datetime, + long_format: bool = False, + time_only: bool = False, +) -> str: """Display a date/time in a user-friendly way. It should be localized to the user's preferred language. @@ -230,7 +244,11 @@ def get_localized_time(datetime=None, long_format=False, time_only=False): @required_parameters("message") -def show_message(message=None, request=None, type="info"): +def show_message( + message: str, + request: Request | None = None, + type: str = "info", +): """Display a status message. :param message: [required] Message to show. @@ -249,7 +267,11 @@ def show_message(message=None, request=None, type="info"): @required_parameters("name") -def get_registry_record(name=None, interface=None, default=MISSING): +def get_registry_record( + name: str, + interface: InterfaceClass | None = None, + default: Any = MISSING, +) -> Any: """Get a record value from ``plone.app.registry``. :param name: [required] Name @@ -316,7 +338,11 @@ def get_registry_record(name=None, interface=None, default=MISSING): @required_parameters("name", "value") -def set_registry_record(name=None, value=None, interface=None): +def set_registry_record( + name: str, + value: Any, + interface: InterfaceClass | None = None, +): """Set a record value in the ``plone.app.registry``. :param name: [required] Name of the record @@ -368,7 +394,7 @@ def set_registry_record(name=None, value=None, interface=None): registry[name] = value -def get_default_language(): +def get_default_language() -> str: """Return the default language. :returns: language identifier @@ -382,7 +408,7 @@ def get_default_language(): return settings.default_language -def get_current_language(context=None): +def get_current_language(context: Content | None = None) -> str: """Return the current negotiated language. :param context: context object @@ -399,7 +425,7 @@ def get_current_language(context=None): ) -def translate(msgid, domain="plone", lang=None): +def translate(msgid: str, domain: str = "plone", lang: str | None = None) -> str: """Translate a message into a given language. Default to current negotiated language if no target language specified. @@ -431,7 +457,7 @@ def translate(msgid, domain="plone", lang=None): @required_parameters("name") -def get_vocabulary(name=None, context=None): +def get_vocabulary(name: str, context: Content | None = None) -> SimpleVocabulary: """Return a vocabulary object with the given name. :param name: Name of the vocabulary. @@ -458,7 +484,7 @@ def get_vocabulary(name=None, context=None): return vocabulary(context) -def get_vocabulary_names(): +def get_vocabulary_names() -> list[str]: """Return a list of vocabulary names. :returns: A sorted list of vocabulary names. diff --git a/src/plone/api/relation.py b/src/plone/api/relation.py index c46e7816..a520fd23 100644 --- a/src/plone/api/relation.py +++ b/src/plone/api/relation.py @@ -8,12 +8,15 @@ from importlib.metadata import distribution from importlib.metadata import PackageNotFoundError from plone.api.exc import InvalidParameterError +from plone.api.types import Content from plone.api.validation import at_least_one_of from plone.api.validation import required_parameters from plone.app.linkintegrity.handlers import modifiedContent from plone.app.linkintegrity.utils import referencedRelationship from plone.base.utils import base_hasattr from plone.dexterity.utils import iterSchemataForType +from plone.supermodel.model import SchemaClass +from typing import DefaultDict from z3c.relationfield import event from z3c.relationfield import RelationValue from z3c.relationfield.schema import Relation @@ -23,8 +26,10 @@ from zope.component import getUtility from zope.intid.interfaces import IIntIds from zope.lifecycleevent import modified +from zope.schema._bootstrapfields import Field import logging +import z3c.relationfield.relation try: distribution("plone.app.iterate") @@ -32,14 +37,19 @@ ITERATE_RELATION_NAME = None StagingRelationValue = None else: - from plone.app.iterate.dexterity import ITERATE_RELATION_NAME - from plone.app.iterate.dexterity.relation import StagingRelationValue + # isort: off + from plone.app.iterate.dexterity import ITERATE_RELATION_NAME # type: ignore[no-redef] + from plone.app.iterate.dexterity.relation import StagingRelationValue # type: ignore[no-redef] + + # isort: on logger = logging.getLogger(__name__) -def _get_field_and_schema_for_fieldname(field_id, portal_type): +def _get_field_and_schema_for_fieldname( + field_id: str, portal_type: str +) -> tuple[Field, SchemaClass] | None: """Get field and its schema from a portal_type.""" # Turn form.widgets.IDublinCore.title into title field_id = field_id.split(".")[-1] @@ -47,15 +57,19 @@ def _get_field_and_schema_for_fieldname(field_id, portal_type): field = schema.get(field_id, None) if field is not None: return (field, schema) + return None @at_least_one_of("source", "target", "relationship") def get( - source=None, - target=None, - relationship=None, - unrestricted=False, - as_dict=False, + source: Content | None = None, + target: Content | None = None, + relationship: str | None = None, + unrestricted: bool = False, + as_dict: bool = False, +) -> ( + list[z3c.relationfield.relation.RelationValue] + | DefaultDict[str, list[z3c.relationfield.relation.RelationValue]] ): """Get specific relations given a source/target/relationship. @@ -70,7 +84,8 @@ def get( :param as_dict: If true, return a dictionary with the relationship name as keys. :type id: bool - :returns: A list of relations + :returns: A list of relations or a dict of lists of relations + if as_dict is True. :rtype: List of RelationValue objects :Example: :ref:`relation-get-example` @@ -90,10 +105,13 @@ def get( intids = getUtility(IIntIds) relation_catalog = getUtility(ICatalog) query = {} - results = [] if as_dict: - results = defaultdict(list) + results: DefaultDict[str, list[z3c.relationfield.relation.RelationValue]] = ( + defaultdict(list) + ) + else: + results: list[z3c.relationfield.relation.RelationValue] = [] # type: ignore [no-redef] if not relation_catalog: return results @@ -123,19 +141,23 @@ def get( if as_dict: results[relation.from_attribute].append(relation) else: - results.append(relation) + results.append(relation) # type: ignore [attr-defined] else: continue else: if as_dict: results[relation.from_attribute].append(relation) else: - results.append(relation) + results.append(relation) # type: ignore [attr-defined] return results @required_parameters("source", "target", "relationship") -def create(source=None, target=None, relationship=None): +def create( + source: Content, + target: Content, + relationship: str, +): """Create a relation from source to target using zc.relation. :param source: [required] Object that the relation will originate from. @@ -191,8 +213,7 @@ def create(source=None, target=None, relationship=None): # This can only get a field from a dexterity item. field_and_schema = _get_field_and_schema_for_fieldname( - from_attribute, - source.portal_type, + from_attribute, source.portal_type ) if field_and_schema is None: @@ -252,7 +273,11 @@ def create(source=None, target=None, relationship=None): @at_least_one_of("source", "target", "relationship") -def delete(source=None, target=None, relationship=None): +def delete( + source: Content | None = None, + target: Content | None = None, + relationship: str | None = None, +): """Delete relation or relations. :param source: Object that the relation originates from. diff --git a/src/plone/api/tests/base.py b/src/plone/api/tests/base.py index a29fcf5c..7a1b39b7 100644 --- a/src/plone/api/tests/base.py +++ b/src/plone/api/tests/base.py @@ -1,5 +1,6 @@ """Base module for unittesting.""" +from OFS.Application import Application from plone.app.testing import FunctionalTesting from plone.app.testing import IntegrationTesting from plone.app.testing import login @@ -8,12 +9,18 @@ from plone.app.testing import setRoles from plone.app.testing import TEST_USER_ID from plone.app.testing import TEST_USER_NAME +from plone.testing.zca import NamedConfigurationMachine +from Products.CMFPlone.Portal import PloneSite class PloneApiLayer(PloneSandboxLayer): defaultBases = (PLONE_FIXTURE,) - def setUpZope(self, app, configurationContext): + def setUpZope( + self, + app: Application, + configurationContext: NamedConfigurationMachine, + ): """Prepare Zope instance by loading appropriate ZCMLs.""" import plone.app.dexterity @@ -25,7 +32,7 @@ def setUpZope(self, app, configurationContext): self.loadZCML(package=plone.app.contenttypes) - def setUpPloneSite(self, portal): + def setUpPloneSite(self, portal: PloneSite) -> None: """Prepare a Plone instance for testing.""" # Install into Plone site using portal_setup self.applyProfile(portal, "Products.CMFPlone:plone") @@ -38,7 +45,7 @@ def setUpPloneSite(self, portal): setRoles(portal, TEST_USER_ID, ["Manager"]) login(portal, TEST_USER_NAME) - def tearDownZope(self, app): + def tearDownZope(self, app: Application) -> None: """Tear down Zope.""" diff --git a/src/plone/api/tests/test_content.py b/src/plone/api/tests/test_content.py index fe9db170..4be3acef 100644 --- a/src/plone/api/tests/test_content.py +++ b/src/plone/api/tests/test_content.py @@ -8,6 +8,7 @@ from plone.api.content import _parse_object_provides_query from plone.api.exc import MissingParameterError from plone.api.tests.base import INTEGRATION_TESTING +from plone.api.types import Content from plone.app.contenttypes.interfaces import IFolder from plone.app.linkintegrity.exceptions import LinkIntegrityNotificationException from plone.app.textfield import RichTextValue @@ -920,7 +921,7 @@ def test_delete_with_resolved_internal_breaches(self): self.assertNotIn("blog", self.portal.keys()) self.assertNotIn("training", self.portal["events"].keys()) - def _set_text(self, obj, text): + def _set_text(self, obj: Content, text: str): obj.text = RichTextValue(text, "text/html", "text/x-html-safe") modified(obj) diff --git a/src/plone/api/tests/test_doctests.py b/src/plone/api/tests/test_doctests.py index f0ab669e..2c0d7199 100644 --- a/src/plone/api/tests/test_doctests.py +++ b/src/plone/api/tests/test_doctests.py @@ -1,5 +1,6 @@ """Boilerplate for doctest functional tests.""" +from collections.abc import Callable from importlib.metadata import distribution from importlib.metadata import PackageNotFoundError from logging import getLogger @@ -9,8 +10,10 @@ from plone.app.testing import TEST_USER_ID from plone.app.testing import TEST_USER_NAME from plone.app.testing import TEST_USER_PASSWORD +from plone.app.testing.layers import IntegrationTesting from plone.testing import layered from plone.testing.zope import Browser +from unittest.suite import TestSuite from zope.testing import renormalizing import doctest @@ -47,7 +50,7 @@ ) -def setUp(self): # pragma: no cover +def setUp(self: manuel.testing.TestCase): # pragma: no cover """Shared test environment set-up, ran before every test.""" layer = self.globs["layer"] # Update global variables within the tests. @@ -78,11 +81,11 @@ def setUp(self): # pragma: no cover def DocFileSuite( - testfile, - flags=FLAGS, - setUp=setUp, - layer=PLONE_INTEGRATION_TESTING, -): + testfile: str, + flags: int = FLAGS, + setUp: Callable = setUp, + layer: IntegrationTesting = PLONE_INTEGRATION_TESTING, +) -> TestSuite: """Returns a test suite configured with a test layer. :param testfile: Path to a doctest file. @@ -113,7 +116,7 @@ def DocFileSuite( ) -def test_suite(): +def test_suite() -> TestSuite: """Find .md files and test code examples in them.""" path = "doctests" doctests = [] diff --git a/src/plone/api/tests/test_env.py b/src/plone/api/tests/test_env.py index c82e4af5..cb1927a7 100644 --- a/src/plone/api/tests/test_env.py +++ b/src/plone/api/tests/test_env.py @@ -35,7 +35,7 @@ class TestException(Exception): class HasProtectedMethods(SimpleItem): security = AccessControl.ClassSecurityInfo() - def __init__(self, id): + def __init__(self, id: str): self.id = id @security.public @@ -117,11 +117,11 @@ def tearDown(self): """Shared test environment clean-up, ran after every test.""" AccessControl.SecurityManagement.setSecurityManager(self._old_sm) - def should_allow(self, names): + def should_allow(self, names: list[str]): for name in names: self.portal.hpm.restrictedTraverse(name) - def should_forbid(self, names): + def should_forbid(self, names: list[str]): for name in names: with self.assertRaises(Unauthorized): self.portal.hpm.restrictedTraverse(name) diff --git a/src/plone/api/tests/test_portal.py b/src/plone/api/tests/test_portal.py index e5bb0076..9f881370 100644 --- a/src/plone/api/tests/test_portal.py +++ b/src/plone/api/tests/test_portal.py @@ -16,12 +16,14 @@ from Products.CMFPlone.tests.utils import MockMailHost from Products.MailHost.interfaces import IMailHost from unittest import mock +from unittest.mock import MagicMock from zope import schema from zope.component import getUtility from zope.component.hooks import setSite from zope.interface import Interface from zope.schema.vocabulary import SimpleVocabulary from zope.site import LocalSiteManager +from ZPublisher.HTTPRequest import HTTPRequest import DateTime import unittest @@ -115,6 +117,12 @@ def test_get(self): """Test getting the portal object.""" self.assertEqual(portal.get(), self.portal) + def test_request_alias(self): + """Request alias should map to ZPublisher HTTPRequest.""" + from plone.api.types import Request + + self.assertIs(Request, HTTPRequest) + def test_get_with_sub_site(self): """Using getSite() alone is not enough to get the portal. It will return the closest site, which may return a sub site @@ -138,7 +146,7 @@ def test_get_with_sub_site(self): setSite(self.portal) @mock.patch("plone.api.portal.getSite") - def test_get_no_site(self, getSite): + def test_get_no_site(self, getSite: MagicMock): """Test error msg when getSite() returns None.""" getSite.return_value = None from plone.api.exc import CannotGetPortalError @@ -270,7 +278,7 @@ def test_send_email_without_configured_mailhost(self): self.portal.MailHost.smtp_host = old_smtp_host @mock.patch("plone.api.portal.parseaddr") - def test_send_email_parseaddr(self, mock_parseaddr): + def test_send_email_parseaddr(self, mock_parseaddr: MagicMock): """Simulate faulty parsing in parseaddr, from_address should be default email_from_address. """ diff --git a/src/plone/api/tests/test_user.py b/src/plone/api/tests/test_user.py index 4f8283fb..9e1c9c4e 100644 --- a/src/plone/api/tests/test_user.py +++ b/src/plone/api/tests/test_user.py @@ -41,7 +41,7 @@ def _check_userid_and_username_different(self): username = user.getUserName() self.assertNotEqual(userid, username) - def _set_emaillogin(self, value): + def _set_emaillogin(self, value: bool): api.portal.set_registry_record("plone.use_email_as_login", value) def test_create_no_email(self): diff --git a/src/plone/api/tests/test_validation.py b/src/plone/api/tests/test_validation.py index 7ed4dd5c..414324f0 100644 --- a/src/plone/api/tests/test_validation.py +++ b/src/plone/api/tests/test_validation.py @@ -10,7 +10,9 @@ import unittest -def undecorated_func(arg1=None, arg2=None, arg3=None): +def undecorated_func( + arg1: str | None = None, arg2: str | None = None, arg3: str | None = None +): return "foo" @@ -181,6 +183,19 @@ def test_single_arg_missing(self): with self.assertRaises(MissingParameterError): _func() + def test_single_non_default_arg_missing(self): + """Test that MissingParameterError is raised even without defaults.""" + from plone.api.exc import MissingParameterError + + @required_parameters("arg1") + def _func(arg1, arg2=None): + return "foo" + + with self.assertRaises(MissingParameterError): + _func() + + self.assertEqual(_func("hello"), "foo") + def test_one_missing_one_provided(self): """Test that MissingParameterError is raised if only one of the required parameters is missing. diff --git a/src/plone/api/types.py b/src/plone/api/types.py new file mode 100644 index 00000000..570721e0 --- /dev/null +++ b/src/plone/api/types.py @@ -0,0 +1,13 @@ +"""Shared internal typing aliases used across plone.api modules.""" + +from plone.dexterity.content import DexterityContent as DexterityContext +from Products.CMFCore.PortalFolder import PortalFolder +from typing import TypeAlias +from ZPublisher.HTTPRequest import HTTPRequest + +# Public-facing context type for content APIs. +Content: TypeAlias = DexterityContext +Container: TypeAlias = PortalFolder + +# Shared request type for browser/request-aware APIs. +Request: TypeAlias = HTTPRequest diff --git a/src/plone/api/user.py b/src/plone/api/user.py index a82de89c..93095ab6 100644 --- a/src/plone/api/user.py +++ b/src/plone/api/user.py @@ -2,6 +2,9 @@ from AccessControl.Permission import getPermissions from AccessControl.SecurityManagement import getSecurityManager +from AccessControl.users import SpecialUser +from collections.abc import Iterable +from collections.abc import Iterator from contextlib import contextmanager from plone.api import env from plone.api import portal @@ -9,23 +12,27 @@ from plone.api.exc import InvalidParameterError from plone.api.exc import MissingParameterError from plone.api.exc import UserNotFoundError +from plone.api.types import Content from plone.api.validation import at_least_one_of from plone.api.validation import mutually_exclusive_parameters from plone.api.validation import required_parameters from Products.CMFPlone.RegistrationTool import get_member_by_login_name from Products.PlonePAS.interfaces.plugins import ILocalRolesPlugin +from Products.PlonePAS.tools.groupdata import GroupData +from Products.PlonePAS.tools.memberdata import MemberData +from typing import cast import random import string def create( - email=None, - username=None, - password=None, - roles=("Member",), - properties=None, -): + email: str | None = None, + username: str | None = None, + password: str | None = None, + roles: list[str] | tuple[str, ...] = ("Member",), + properties: dict[str, str] | None = None, +) -> MemberData: """Create a user. :param email: [required] Email for the new user. @@ -95,19 +102,20 @@ def create( roles, properties=properties, ) - # If user_id differs from login_name (e.g. UUID as user id with # email as login), update the login name accordingly. if user_id != login_name: pas = portal.get_tool("acl_users") pas.updateLoginName(user_id, login_name) - return get(userid=user_id) + user = get(userid=user_id) + assert isinstance(user, MemberData) + return user @mutually_exclusive_parameters("userid", "username") @at_least_one_of("userid", "username") -def get(userid=None, username=None): +def get(userid: str | None = None, username: str | None = None) -> MemberData | None: """Get a user. Plone provides both a unique, unchanging identifier for a user (the @@ -141,11 +149,11 @@ def get(userid=None, username=None): ) -def get_current(): +def get_current() -> SpecialUser | MemberData: """Get the currently logged-in user. :returns: Currently logged-in user - :rtype: MemberData object + :rtype: MemberData object or SpecialUser for anonymous user :Example: :ref:`user-get-current-example` """ portal_membership = portal.get_tool("portal_membership") @@ -153,7 +161,9 @@ def get_current(): @mutually_exclusive_parameters("groupname", "group") -def get_users(groupname=None, group=None): +def get_users( + groupname: str | None = None, group: GroupData | None = None +) -> list[MemberData]: """Get all users or all users filtered by group. Arguments ``group`` and ``groupname`` are mutually exclusive. @@ -186,7 +196,7 @@ def get_users(groupname=None, group=None): @mutually_exclusive_parameters("username", "user") @at_least_one_of("username", "user") -def delete(username=None, user=None): +def delete(username: str | None = None, user: MemberData | None = None): """Delete a user. Arguments ``username`` and ``user`` are mutually exclusive. You can either @@ -202,11 +212,11 @@ def delete(username=None, user=None): :Example: :ref:`user-delete-example` """ portal_membership = portal.get_tool("portal_membership") - user_id = username or user.id + user_id = username or cast(MemberData, user).id portal_membership.deleteMembers((user_id,)) -def is_anonymous(): +def is_anonymous() -> bool: """Check if the currently logged-in user is anonymous. :returns: True if the current user is anonymous, False otherwise. @@ -217,7 +227,12 @@ def is_anonymous(): @mutually_exclusive_parameters("username", "user") -def get_roles(username=None, user=None, obj=None, inherit=True): +def get_roles( + username: str | None = None, + user: MemberData | None = None, + obj: Content | None = None, + inherit: bool = True, +) -> list[str] | tuple[str, ...]: """Get user's site-wide or local roles. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -275,13 +290,17 @@ def get_roles(username=None, user=None, obj=None, inherit=True): @contextmanager -def _nop_context_manager(): +def _nop_context_manager() -> Iterator[None]: """Do nothing (trivial context manager).""" yield @mutually_exclusive_parameters("username", "user") -def get_permissions(username=None, user=None, obj=None): +def get_permissions( + username: str | None = None, + user: MemberData | None = None, + obj: Content | None = None, +) -> dict[str, bool]: """Get user's site-wide or local permissions. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -316,7 +335,12 @@ def get_permissions(username=None, user=None, obj=None): @mutually_exclusive_parameters("username", "user") -def has_permission(permission, username=None, user=None, obj=None): +def has_permission( + permission: str, + username: str | None = None, + user: MemberData | None = None, + obj: Content | None = None, +) -> bool: """Check whether this user has the given permission. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -363,7 +387,12 @@ def has_permission(permission, username=None, user=None, obj=None): @required_parameters("roles") @mutually_exclusive_parameters("username", "user") -def grant_roles(username=None, user=None, obj=None, roles=None): +def grant_roles( + username: str | None = None, + user: MemberData | None = None, + obj: Content | None = None, + roles: list[str] | tuple[str, ...] | None = None, +): """Grant roles to a user. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -394,8 +423,10 @@ def grant_roles(username=None, user=None, obj=None, roles=None): roles = list(roles) # These roles cannot be granted - if "Anonymous" in roles or "Authenticated" in roles: - raise InvalidParameterError + if "Anonymous" in roles or "Authenticated" in roles or not isinstance(roles, list): # type: ignore + raise InvalidParameterError( + "Roles must be a list of strings and cannot include 'Anonymous' or 'Authenticated'" + ) if obj is None: actual_roles = get_roles(user=user) @@ -412,7 +443,12 @@ def grant_roles(username=None, user=None, obj=None, roles=None): @required_parameters("roles") @mutually_exclusive_parameters("username", "user") -def revoke_roles(username=None, user=None, obj=None, roles=None): +def revoke_roles( + username: str | None = None, + user: MemberData | None = None, + obj: Content | None = None, + roles: Iterable[str] | None = None, +): """Revoke roles from a user. Arguments ``username`` and ``user`` are mutually exclusive. You @@ -438,6 +474,8 @@ def revoke_roles(username=None, user=None, obj=None, roles=None): if user is None: raise InvalidParameterError("User could not be found") + if roles is None: + roles = set() try: roles_set = set(roles) if not all(isinstance(role, str) for role in roles_set): diff --git a/src/plone/api/validation.py b/src/plone/api/validation.py index 9976d75b..502c5b8c 100644 --- a/src/plone/api/validation.py +++ b/src/plone/api/validation.py @@ -1,14 +1,19 @@ """Provide decorators for validating parameters.""" +from collections.abc import Callable from decorator import decorator from functools import wraps from plone.api.exc import InvalidParameterError from plone.api.exc import MissingParameterError +from typing import Any import inspect -def _get_arg_spec(func, validator_args): +def _get_arg_spec( + func: Callable, + validator_args: tuple[str, ...], +) -> list[str]: """Get the arguments specified in the function spec. and check that the decorator doesn't refer to non-existent args. @@ -37,7 +42,11 @@ def _get_arg_spec(func, validator_args): return signature_args -def _get_supplied_args(signature_params, args, kwargs): +def _get_supplied_args( + signature_params: list[str] | tuple[str, ...], + args: Any, + kwargs: dict[str, Any], +) -> list[Any]: """Return names of all args that have been passed in. either as positional or keyword arguments, and are not None. @@ -55,7 +64,7 @@ def _get_supplied_args(signature_params, args, kwargs): return supplied_args -def required_parameters(*required_params): +def required_parameters(*required_params: str) -> Callable: """Test whether all of the specified parameters have been supplied and are not None. Todo: add an optional flag to allow None values through as valid parameters @@ -90,7 +99,7 @@ def wrapped(*args, **kwargs): return _required_parameters -def mutually_exclusive_parameters(*exclusive_params): +def mutually_exclusive_parameters(*exclusive_params: str) -> Callable: """Provide decorator. The decorator raises an exception if more than one @@ -126,7 +135,7 @@ def wrapped(function, *args, **kwargs): return _mutually_exclusive_parameters -def at_least_one_of(*candidate_params): +def at_least_one_of(*candidate_params: str) -> Callable: """Provide a decorator. The decorator raises an exception if none of the