diff --git a/src/linkahead/common/datatype.py b/src/linkahead/common/datatype.py index c0c15feca240112f1f8e33a0cd37932151fcd9f0..65e6246c0287f0af07aa604f4bc18ce54615cae2 100644 --- a/src/linkahead/common/datatype.py +++ b/src/linkahead/common/datatype.py @@ -24,6 +24,10 @@ # import re +import sys + +if sys.version_info >= (3, 8): + from typing import Literal from ..exceptions import EmptyUniqueQueryError, QueryNotUniqueError @@ -34,6 +38,8 @@ DATETIME = "DATETIME" INTEGER = "INTEGER" FILE = "FILE" BOOLEAN = "BOOLEAN" +if sys.version_info >= (3, 8): + DATATYPE = Literal["DOUBLE", "REFERENCE", "TEXT", "DATETIME", "INTEGER", "FILE", "BOOLEAN"] def LIST(datatype): diff --git a/src/linkahead/common/models.py b/src/linkahead/common/models.py index 6f6e4c8f51674e17174af7cc03ec0375c6afde8b..2828b1ad5bbe84e1120d0da65f23686d9245212a 100644 --- a/src/linkahead/common/models.py +++ b/src/linkahead/common/models.py @@ -35,6 +35,7 @@ transactions. from __future__ import annotations # Can be removed with 3.10. from __future__ import print_function, unicode_literals +from enum import Enum import re import sys @@ -46,7 +47,14 @@ from os import listdir from os.path import isdir from random import randint from tempfile import NamedTemporaryFile -from typing import Any, Optional + +from typing import TYPE_CHECKING + +if TYPE_CHECKING and sys.version_info > (3, 7): + from datetime import datetime + from typing import Any, Dict, Optional, Type, Union, List, TextIO, Tuple, Literal + from .datatype import DATATYPE + from warnings import warn from lxml import etree @@ -63,8 +71,16 @@ from ..exceptions import (AmbiguousEntityError, AuthorizationError, QueryNotUniqueError, TransactionError, UniqueNamesError, UnqualifiedParentsError, UnqualifiedPropertiesError) -from .datatype import (BOOLEAN, DATETIME, DOUBLE, INTEGER, TEXT, - get_list_datatype, is_list_datatype, is_reference) +from .datatype import ( + BOOLEAN, + DATETIME, + DOUBLE, + INTEGER, + TEXT, + get_list_datatype, + is_list_datatype, + is_reference, +) from .state import State from .timezone import TimeZone from .utils import uuid, xml2str @@ -72,14 +88,14 @@ from .versioning import Version _ENTITY_URI_SEGMENT = "Entity" -# importances/inheritance OBLIGATORY = "OBLIGATORY" SUGGESTED = "SUGGESTED" RECOMMENDED = "RECOMMENDED" FIX = "FIX" ALL = "ALL" NONE = "NONE" - +if TYPE_CHECKING: + INHERITANCE = Literal["OBLIGATORY", "SUGGESTED", "RECOMMENDED", "FIX", "ALL", "NONE"] SPECIAL_ATTRIBUTES = ["name", "role", "datatype", "description", "id", "path", "checksum", "size", "value"] @@ -98,41 +114,50 @@ class Entity: by the user to control several server-side plug-ins. """ - def __init__(self, name=None, id=None, description=None, # @ReservedAssignment - datatype=None, value=None, **kwargs): + def __init__( + self, + name: Optional[str] = None, + id: Optional[int] = None, + description: Optional[str] = None, # @ReservedAssignment + datatype: Optional[DATATYPE] = None, + value=None, + **kwargs, + ): + self.__role = kwargs["role"] if "role" in kwargs else None - self._checksum = None + self._checksum: Optional[str] = None self._size = None self._upload = None # If an entity is used (e.g. as parent), it is wrapped instead of being used directly. # see Entity._wrap() - self._wrapped_entity = None - self._version = None - self._cuid = None - self._flags = dict() + self._wrapped_entity: Optional[Entity] = None + self._version: Optional[Version] = None + self._cuid: Optional[str] = None + self._flags: Dict[str, str] = dict() self.__value = None - self.__datatype = None - self.datatype = datatype + self.__datatype: Optional[DATATYPE] = None + self.datatype: Optional[DATATYPE] = datatype self.value = value self.messages = Messages() self.properties = _Properties() self.parents = _ParentList() - self.path = None - self.file = None - self.unit = None - self.acl = None - self.permissions = None + self.path: Optional[str] = None + self.file: Optional[File] = None + self.unit: Optional[str] = None + self.acl: Optional[ACL] = None + self.permissions: Optional[Permissions] = None self.is_valid = lambda: False self.is_deleted = lambda: False self.name = name self.description = description - self.id = id - self.state = None + self.id: Optional[int] = id + self.state: Optional[State] = None def copy(self): """ Return a copy of entity. + FIXME: This method doesn't have a deep keyword argument. If deep == True return a deep copy, recursively copying all sub entities. Standard properties are copied using add_property. @@ -178,7 +203,7 @@ class Entity: return self._wrapped_entity.version @version.setter - def version(self, version): + def version(self, version: Optional[Version]): self._version = version @property @@ -250,14 +275,14 @@ class Entity: return self._wrapped_entity.description - @property - def checksum(self): - return self._checksum - @description.setter def description(self, new_description): self.__description = new_description + @property + def checksum(self): + return self._checksum + @property def unit(self): if self.__unit is not None or self._wrapped_entity is None: @@ -324,8 +349,15 @@ class Entity: def pickup(self, new_pickup): self.__pickup = new_pickup - def grant(self, realm=None, username=None, role=None, - permission=None, priority=False, revoke_denial=True): + def grant( + self, + realm: Optional[str] = None, + username: Optional[str] = None, + role: Optional[str] = None, + permission: Optional[str] = None, + priority: bool = False, + revoke_denial: bool = True, + ): """Grant a permission to a user or role for this entity. You must specify either only the username and the realm, or only the @@ -359,8 +391,15 @@ class Entity: permission=permission, priority=priority, revoke_denial=revoke_denial) - def deny(self, realm=None, username=None, role=None, - permission=None, priority=False, revoke_grant=True): + def deny( + self, + realm: Optional[str] = None, + username: Optional[str] = None, + role: Optional[str] = None, + permission: Optional[str] = None, + priority: bool = False, + revoke_grant: bool = True, + ): """Deny a permission to a user or role for this entity. You must specify either only the username and the realm, or only the @@ -412,7 +451,7 @@ class Entity: permission=permission, priority=priority) - def is_permitted(self, permission, role=None): + def is_permitted(self, permission: Permission, role: Optional[str] = None): if role is None: # pylint: disable=unsupported-membership-test @@ -420,7 +459,7 @@ class Entity: else: self.acl.is_permitted(permission=permission) - def get_all_messages(self): + def get_all_messages(self) -> Messages: ret = Messages() ret.append(self.messages) @@ -519,8 +558,29 @@ class Entity: return self - def add_property(self, property=None, value=None, id=None, name=None, description=None, datatype=None, - unit=None, importance=None, inheritance=None): # @ReservedAssignment + def add_property( + self, + property: Union[int, str, Entity, None] = None, + value: Union[ + int, + str, + bool, + datetime, + Entity, + List[int], + List[str], + List[bool], + List[Entity], + None, + ] = None, + id: Optional[int] = None, + name: Optional[str] = None, + description: Optional[str] = None, + datatype: Optional[str] = None, + unit: Optional[str] = None, + importance: Optional[str] = None, + inheritance: Union[str, INHERITANCE, None] = None, + ) -> Entity: # @ReservedAssignment """Add a property to this entity. The first parameter is meant to identify the property entity either via @@ -690,7 +750,13 @@ class Entity: return self - def add_parent(self, parent=None, id=None, name=None, inheritance=None): # @ReservedAssignment + def add_parent( + self, + parent: Union[Entity, int, str, None] = None, + id: Optional[int] = None, + name: Optional[str] = None, + inheritance: Union[INHERITANCE, str, None] = None, + ): # @ReservedAssignment """Add a parent to this entity. Parameters @@ -704,7 +770,7 @@ class Entity: name : str Name of the parent entity. Ignored if `parent is not none`. - inheritance : str + inheritance : str, INHERITANCE One of ``obligatory``, ``recommended``, ``suggested``, or ``fix``. Specifies the minimum importance which parent properties need to have to be inherited by this entity. If no `inheritance` is given, no properties will be inherited by the child. @@ -811,7 +877,7 @@ out: bool return self.parents - def get_parents_recursively(self, retrieve: bool = True): + def get_parents_recursively(self, retrieve: bool = True) -> List[Entity]: """Get all ancestors of this entity. Parameters @@ -826,12 +892,12 @@ out: List[Entity] The parents of this Entity """ - all_parents = [] + all_parents: List[Entity] = [] self._get_parent_recursively(all_parents, retrieve=retrieve) return all_parents - def _get_parent_recursively(self, all_parents: list, retrieve: bool = True): + def _get_parent_recursively(self, all_parents: List[Entity], retrieve: bool = True): """Get all ancestors with a little helper. As a side effect of this method, the ancestors are added to @@ -862,7 +928,7 @@ out: List[Entity] all_parents.append(w_parent) w_parent._get_parent_recursively(all_parents, retrieve=retrieve) - def get_parent(self, key): + def get_parent(self, key: Union[int, Entity, str]) -> Union[Entity, None]: """Return the first parent matching the key or None if no match exists. Parameters @@ -911,7 +977,7 @@ out: List[Entity] return self.properties - def get_property(self, pattern): + def get_property(self, pattern: Union[int, str, Entity]) -> Union[Property, None]: """ Return the first matching property or None. Parameters @@ -956,7 +1022,9 @@ out: List[Entity] return None - def _get_value_for_selector(self, selector): + def _get_value_for_selector( + self, selector: Union[str, List[str], Tuple[str]] + ) -> Any: """return the value described by the selector A selector is a list or a tuple of strings describing a path in an @@ -1093,7 +1161,7 @@ out: List[Entity] return ret - def get_errors_deep(self, roots=None): + def get_errors_deep(self, roots=None) -> List[Tuple[str, List[Entity]]]: """Get all error messages of this entity and all sub-entities / parents / properties. @@ -1124,7 +1192,12 @@ out: List[Entity] return False - def to_xml(self, xml=None, add_properties=ALL, local_serialization=False): + def to_xml( + self, + xml: Optional[etree._Element] = None, + add_properties: Optional[INHERITANCE] = ALL, + local_serialization: bool = False, + ) -> etree._Element: """Generate an xml representation of this entity. If the parameter xml is given, all attributes, parents, properties, and messages of this entity will be added to it instead of creating a new element. @@ -1144,7 +1217,6 @@ out: List[Entity] assert isinstance(xml, etree._Element) # unwrap wrapped entity - if self._wrapped_entity is not None: xml = self._wrapped_entity.to_xml(xml, add_properties) @@ -1394,8 +1466,14 @@ out: List[Entity] return Container().append(self).retrieve( unique=unique, raise_exception_on_error=raise_exception_on_error, flags=flags) - def insert(self, raise_exception_on_error=True, unique=True, - sync=True, strict=False, flags=None): + def insert( + self, + raise_exception_on_error=True, + unique=True, + sync=True, + strict=False, + flags: Optional[dict] = None, + ): """Insert this entity into a LinkAhead server. A successful insertion will generate a new persistent ID for this entity. This entity can be identified, retrieved, updated, and deleted via this ID until it has @@ -1758,14 +1836,20 @@ class Parent(Entity): def affiliation(self, affiliation): self.__affiliation = affiliation - def __init__(self, id=None, name=None, description=None, inheritance=None): # @ReservedAssignment + def __init__( + self, + id: Optional[int] = None, + name: Optional[str] = None, + description: Optional[str] = None, + inheritance: Optional[INHERITANCE] = None, + ): # @ReservedAssignment Entity.__init__(self, id=id, name=name, description=description) if inheritance is not None: self.set_flag("inheritance", inheritance) self.__affiliation = None - def to_xml(self, xml=None, add_properties=None): + def to_xml(self, xml: Optional[etree._Element] = None, add_properties=None): if xml is None: xml = etree.Element("Parent") @@ -1822,13 +1906,20 @@ class Property(Entity): return super(Property, self).add_parent(parent=parent, id=id, name=name, inheritance=inheritance) - def __init__(self, name=None, id=None, description=None, datatype=None, - value=None, unit=None): + def __init__( + self, + name: Optional[str] = None, + id: Optional[int] = None, + description: Optional[str] = None, + datatype: Union[DATATYPE, None] = None, + value=None, + unit: Optional[str] = None, + ): Entity.__init__(self, id=id, name=name, description=description, datatype=datatype, value=value, role="Property") self.unit = unit - def to_xml(self, xml=None, add_properties=ALL): + def to_xml(self, xml: Optional[etree._Element] = None, add_properties=ALL): if xml is None: xml = etree.Element("Property") @@ -1878,13 +1969,19 @@ class Property(Entity): class Message(object): - def __init__(self, type=None, code=None, description=None, body=None): # @ReservedAssignment + def __init__( + self, + type: Optional[str] = None, + code: Optional[int] = None, + description: Optional[str] = None, + body: Optional[str] = None, + ): # @ReservedAssignment self.description = description self.type = type if type is not None else "Info" self.code = int(code) if code is not None else None self.body = body - def to_xml(self, xml=None): + def to_xml(self, xml: Optional[etree._Element] = None): if xml is None: xml = etree.Element(str(self.type)) @@ -1926,7 +2023,13 @@ class RecordType(Entity): property=property, id=id, name=name, description=description, datatype=datatype, value=value, unit=unit, importance=importance, inheritance=inheritance) - def add_parent(self, parent=None, id=None, name=None, inheritance=OBLIGATORY): + def add_parent( + self, + parent: Union[Entity, int, str, None] = None, + id: Optional[int] = None, + name: Optional[str] = None, + inheritance: Union[INHERITANCE, str, None] = OBLIGATORY, + ): """Add a parent to this RecordType Parameters @@ -1959,11 +2062,21 @@ class RecordType(Entity): return super().add_parent(parent=parent, id=id, name=name, inheritance=inheritance) - def __init__(self, name=None, id=None, description=None, datatype=None): # @ReservedAssignment + def __init__( + self, + name: Optional[str] = None, + id: Optional[int] = None, + description: Optional[str] = None, + datatype: Optional[DATATYPE] = None, + ): # @ReservedAssignment Entity.__init__(self, name=name, id=id, description=description, datatype=datatype, role="RecordType") - def to_xml(self, xml=None, add_properties=ALL): + def to_xml( + self, + xml: Optional[etree._Element] = None, + add_properties: Optional[INHERITANCE] = ALL, + ) -> etree._Element: if xml is None: xml = etree.Element("RecordType") @@ -1982,7 +2095,12 @@ class Record(Entity): property=property, id=id, name=name, description=description, datatype=datatype, value=value, unit=unit, importance=importance, inheritance=inheritance) - def __init__(self, name=None, id=None, description=None): # @ReservedAssignment + def __init__( + self, + name: Optional[str] = None, + id: Optional[int] = None, + description: Optional[str] = None, + ): # @ReservedAssignment Entity.__init__(self, name=name, id=id, description=description, role="Record") @@ -2023,9 +2141,17 @@ class File(Record): """ - def __init__(self, name=None, id=None, description=None, # @ReservedAssignment - path=None, file=None, pickup=None, # @ReservedAssignment - thumbnail=None, from_location=None): + def __init__( + self, + name: Optional[str] = None, + id: Optional[int] = None, + description: Optional[str] = None, # @ReservedAssignment + path: Optional[str] = None, + file: Union[str, TextIO, None] = None, + pickup: Optional[str] = None, # @ReservedAssignment + thumbnail: Optional[str] = None, + from_location=None, + ): Record.__init__(self, id=id, name=name, description=description) self.role = "File" self.datatype = None @@ -2046,7 +2172,12 @@ class File(Record): if self.pickup is None: self.pickup = from_location - def to_xml(self, xml=None, add_properties=ALL, local_serialization=False): + def to_xml( + self, + xml: Optional[etree._Element] = None, + add_properties: Optional[INHERITANCE] = ALL, + local_serialization: bool = False, + ): """Convert this file to an xml element. @return: xml element @@ -2142,6 +2273,7 @@ class File(Record): class _Properties(list): + """FIXME: Add docstring.""" def __init__(self): list.__init__(self) @@ -2161,7 +2293,7 @@ class _Properties(list): if property is not None: self._importance[property] = importance - def get_by_name(self, name): + def get_by_name(self, name: str) -> Property: """Get a property of this list via it's name. Raises a LinkAheadException if not exactly one property has this name. @@ -2176,7 +2308,12 @@ class _Properties(list): return self - def append(self, property, importance=None, inheritance=None): # @ReservedAssignment + def append( + self, + property: Union[List[Entity], Entity], + importance=None, + inheritance: Union[str, INHERITANCE, None] = None, + ): # @ReservedAssignment if isinstance(property, list): for p in property: self.append(p, importance, inheritance) @@ -2203,7 +2340,9 @@ class _Properties(list): return self - def to_xml(self, add_to_element, add_properties): + def to_xml( + self, add_to_element: etree._Element, add_properties: Union[str, INHERITANCE] + ): for p in self: importance = self._importance.get(p) @@ -2213,7 +2352,7 @@ class _Properties(list): pelem = p.to_xml(xml=etree.Element("Property"), add_properties=FIX) if p in self._importance: - pelem.set("importance", importance) + pelem.set("importance", str(importance)) if p in self._inheritance: pelem.set("flag", "inheritance:" + @@ -2228,7 +2367,7 @@ class _Properties(list): return xml2str(xml) - def _get_entity_by_cuid(self, cuid): + def _get_entity_by_cuid(self, cuid: str): ''' Get the first entity which has the given cuid. Note: this method is intended for internal use. @@ -2242,7 +2381,7 @@ class _Properties(list): return e raise KeyError("No entity with that cuid in this container.") - def remove(self, prop): + def remove(self, prop: Union[Entity, int]): if isinstance(prop, Entity): if prop in self: list.remove(self, prop) @@ -2366,7 +2505,7 @@ class _ParentList(list): return xml2str(xml) - def remove(self, parent): + def remove(self, parent: Union[Entity, int, str]): if isinstance(parent, Entity): if parent in self: list.remove(self, parent) @@ -2813,7 +2952,7 @@ class Container(list): return error_list - def get_entity_by_name(self, name, case_sensitive=True): + def get_entity_by_name(self, name: str, case_sensitive: bool = True): """Get the first entity which has the given name. Note: If several entities are in this list which have the same name, this method will only return the first and ignore the others. @@ -3071,8 +3210,14 @@ class Container(list): raise LinkAheadException( "The server's response didn't contain the expected elements. The configuration of this client might be invalid (especially the url).") - def _sync(self, container, unique, raise_exception_on_error, - name_case_sensitive=False, strategy=_basic_sync): + def _sync( + self, + container: Container, + unique: bool, + raise_exception_on_error: bool, + name_case_sensitive: bool = False, + strategy=_basic_sync, + ): """Synchronize this container (C1) with another container (C2). That is: 1) Synchronize any entity e1 in C1 with the @@ -3118,13 +3263,18 @@ class Container(list): self._timestamp = container._timestamp self._srid = container._srid - def _calc_sync_dict(self, remote_container, unique, - raise_exception_on_error, name_case_sensitive): + def _calc_sync_dict( + self, + remote_container: Container, + unique: bool, + raise_exception_on_error: bool, + name_case_sensitive: bool, + ): # self is local, remote_container is remote. # which is to be synced with which: # sync_dict[local_entity]=sync_remote_enities - sync_dict = dict() + sync_dict: Dict[Entity, Optional[List[Entity]]] = dict() # list of remote entities which already have a local equivalent used_remote_entities = [] @@ -3265,7 +3415,7 @@ class Container(list): return sync_dict @staticmethod - def _find_dependencies_in_container(container): + def _find_dependencies_in_container(container: Container): """Find elements in a container that are a dependency of another element of the same. Parameters @@ -3458,8 +3608,14 @@ class Container(list): return self - def retrieve(self, query=None, unique=True, - raise_exception_on_error=True, sync=True, flags=None): + def retrieve( + self, + query: Union[str, list, None] = None, + unique: bool = True, + raise_exception_on_error: bool = True, + sync: bool = True, + flags=None, + ): """Retrieve all entities in this container identified via their id if present and via their name otherwise. Any locally already existing attributes (name, description, ...) will be preserved. Any such @@ -3747,25 +3903,25 @@ class Container(list): self._linearize() # TODO: This is a possible solution for ticket#137 -# retrieved = Container() -# for entity in self: -# if entity.is_valid(): -# retrieved.append(entity) -# if len(retrieved)>0: -# retrieved = retrieved.retrieve(raise_exception_on_error=False, sync=False) -# for e_remote in retrieved: -# if e_remote.id is not None: -# try: -# self.get_entity_by_id(e_remote.id).is_valid=e_remote.is_valid -# continue -# except KeyError: -# pass -# if e_remote.name is not None: -# try: -# self.get_entity_by_name(e_remote.name).is_valid=e_remote.is_valid -# continue -# except KeyError: -# pass + # retrieved = Container() + # for entity in self: + # if entity.is_valid(): + # retrieved.append(entity) + # if len(retrieved)>0: + # retrieved = retrieved.retrieve(raise_exception_on_error=False, sync=False) + # for e_remote in retrieved: + # if e_remote.id is not None: + # try: + # self.get_entity_by_id(e_remote.id).is_valid=e_remote.is_valid + # continue + # except KeyError: + # pass + # if e_remote.name is not None: + # try: + # self.get_entity_by_name(e_remote.name).is_valid=e_remote.is_valid + # continue + # except KeyError: + # pass for entity in self: if entity.is_valid(): continue @@ -3968,7 +4124,15 @@ def get_global_acl(): class ACI(): - def __init__(self, realm, username, role, permission): + """FIXME: Add docstring""" + + def __init__( + self, + realm: Optional[str], + username: Optional[str], + role: Optional[str], + permission: Optional[str], + ): self.role = role self.username = username self.realm = realm @@ -3984,7 +4148,7 @@ class ACI(): def __repr__(self): return str(self.realm) + ":" + str(self.username) + ":" + str(self.role) + ":" + str(self.permission) - def add_to_element(self, e): + def add_to_element(self, e: etree._Element): if self.role is not None: e.set("role", self.role) else: @@ -3998,16 +4162,17 @@ class ACI(): class ACL(): + """FIXME: Add docstring""" - global_acl = None + global_acl: Optional[ACL] = None - def __init__(self, xml=None): + def __init__(self, xml: Optional[etree._Element] = None): if xml is not None: self.parse_xml(xml) else: self.clear() - def parse_xml(self, xml): + def parse_xml(self, xml: etree._Element): """Clear this ACL and parse the xml. Iterate over the rules in the xml and add each rule to this ACL. @@ -4016,14 +4181,14 @@ class ACL(): Parameters ---------- - xml : lxml.etree.Element + xml : lxml.etree._Element The xml element containing the ACL rules, i.e. <Grant> and <Deny> rules. """ self.clear() self._parse_xml(xml) - def _parse_xml(self, xml): + def _parse_xml(self, xml: etree._Element): """Parse the xml. Iterate over the rules in the xml and add each rule to this ACL. @@ -4032,7 +4197,7 @@ class ACL(): Parameters ---------- - xml : lxml.etree.Element + xml : lxml.etree._Element The xml element containing the ACL rules, i.e. <Grant> and <Deny> rules. """ @@ -4056,7 +4221,7 @@ class ACL(): permission=permission, priority=priority, revoke_grant=False) - def combine(self, other): + def combine(self, other: ACL): """ Combine and return new instance.""" result = ACL() result._grants.update(other._grants) @@ -4078,15 +4243,15 @@ class ACL(): len(self._priority_denials) + len(self._denials) == 0 def clear(self): - self._grants = set() - self._denials = set() - self._priority_grants = set() - self._priority_denials = set() + self._grants: set[ACI] = set() + self._denials: set[ACI] = set() + self._priority_grants: set[ACI] = set() + self._priority_denials: set[ACI] = set() - def _get_boolean_priority(self, priority): + def _get_boolean_priority(self, priority: Any): return str(priority).lower() in ["true", "1", "yes", "y"] - def _remove_item(self, item, priority): + def _remove_item(self, item, priority: bool): try: self._denials.remove(item) except KeyError: @@ -4106,8 +4271,14 @@ class ACL(): except KeyError: pass - def revoke_grant(self, username=None, realm=None, - role=None, permission=None, priority=False): + def revoke_grant( + self, + username: Optional[str] = None, + realm: Optional[str] = None, + role: Optional[str] = None, + permission: Optional[str] = None, + priority: Union[bool, str] = False, + ): priority = self._get_boolean_priority(priority) item = ACI(role=role, username=username, realm=realm, permission=permission) @@ -4132,8 +4303,15 @@ class ACL(): if item in self._denials: self._denials.remove(item) - def grant(self, permission, username=None, realm=None, role=None, - priority=False, revoke_denial=True): + def grant( + self, + permission: Optional[str], + username: Optional[str] = None, + realm: Optional[str] = None, + role: Optional[str] = None, + priority: bool = False, + revoke_denial: bool = True, + ): """Grant a permission to a user or role. You must specify either only the username and the realm, or only the @@ -4174,8 +4352,15 @@ class ACL(): else: self._grants.add(item) - def deny(self, username=None, realm=None, role=None, - permission=None, priority=False, revoke_grant=True): + def deny( + self, + username: Optional[str] = None, + realm: Optional[str] = None, + role: Optional[str] = None, + permission: Optional[str] = None, + priority: bool = False, + revoke_grant: bool = True, + ): """Deny a permission to a user or role for this entity. You must specify either only the username and the realm, or only the @@ -4216,7 +4401,7 @@ class ACL(): else: self._denials.add(item) - def to_xml(self, xml=None): + def to_xml(self, xml: Optional[etree._Element] = None): if xml is None: xml = etree.Element("EntityACL") @@ -4246,7 +4431,7 @@ class ACL(): return xml - def get_acl_for_role(self, role): + def get_acl_for_role(self, role: str) -> ACL: ret = ACL() for aci in self._grants: @@ -4292,7 +4477,7 @@ class ACL(): return ret - def get_permissions_for_user(self, username, realm=None): + def get_permissions_for_user(self, username: str, realm: Optional[str] = None): acl = self.get_acl_for_user(username, realm) _grants = set() @@ -4313,7 +4498,7 @@ class ACL(): return ((_grants - _denials) | _priority_grants) - _priority_denials - def get_permissions_for_role(self, role): + def get_permissions_for_role(self, role: str): acl = self.get_acl_for_role(role) _grants = set() @@ -4371,10 +4556,10 @@ class Query(): def getFlag(self, key): return self.flags.get(key) - def __init__(self, q): - self.flags = dict() + def __init__(self, q: Union[str, etree._Element]): + self.flags: Dict[str, str] = dict() self.messages = Messages() - self.cached = None + self.cached: Optional[bool] = None self.etag = None if isinstance(q, etree._Element): @@ -4419,8 +4604,13 @@ class Query(): yield next_page index += page_length - def execute(self, unique=False, raise_exception_on_error=True, cache=True, - page_length=None): + def execute( + self, + unique: bool = False, + raise_exception_on_error: bool = True, + cache: bool = True, + page_length: Optional[int] = None, + ) -> Union[Container, int]: """Execute a query (via a server-requests) and return the results. Parameters @@ -4507,8 +4697,14 @@ class Query(): return cresp -def execute_query(q, unique=False, raise_exception_on_error=True, cache=True, - flags=None, page_length=None): +def execute_query( + q: str, + unique: bool = False, + raise_exception_on_error: bool = True, + cache: bool = True, + flags: Optional[Dict[str, str]] = None, + page_length: Optional[int] = None, +) -> Union[Container, int]: """Execute a query (via a server-requests) and return the results. Parameters @@ -4600,7 +4796,7 @@ class DropOffBox(list): class UserInfo(): - def __init__(self, xml): + def __init__(self, xml: etree._Element): self.roles = [role.text for role in xml.findall("Roles/Role")] self.name = xml.get("username") self.realm = xml.get("realm") @@ -4652,7 +4848,7 @@ class Info(): class Permission(): - def __init__(self, name, description=None): + def __init__(self, name: str, description: Optional[str] = None): self.name = name self.description = description @@ -4676,13 +4872,13 @@ class Permissions(): known_permissions = None - def __init__(self, xml): + def __init__(self, xml: etree._Element): self.parse_xml(xml) def clear(self): self._perms = set() - def parse_xml(self, xml): + def parse_xml(self, xml: etree._Element): self.clear() for e in xml: @@ -4703,7 +4899,7 @@ class Permissions(): return str(self._perms) -def parse_xml(xml): +def parse_xml(xml: Union[str, etree._Element]): """parse a string or tree representation of an xml document to a set of entities (records, recordtypes, properties, or files). @@ -4712,14 +4908,14 @@ def parse_xml(xml): """ if isinstance(xml, etree._Element): - elem = xml + elem: etree._Element = xml else: elem = etree.fromstring(xml) return _parse_single_xml_element(elem) -def _parse_single_xml_element(elem): +def _parse_single_xml_element(elem: etree._Element): classmap = { 'record': Record, 'recordtype': RecordType, @@ -4773,7 +4969,7 @@ def _parse_single_xml_element(elem): "code"), description=elem.get("description"), body=elem.text) -def _evaluate_and_add_error(parent_error, ent): +def _evaluate_and_add_error(parent_error: TransactionError, ent: Union[Entity, Container]): """Evaluate the error message(s) attached to entity and add a corresponding exception to parent_error. @@ -4782,7 +4978,7 @@ def _evaluate_and_add_error(parent_error, ent): parent_error : TransactionError Parent error to which the new exception will be attached. This exception will be a direct child. - ent : Entity + ent : Entity or Container Entity that caused the TransactionError. An exception is created depending on its error message(s). @@ -4876,7 +5072,7 @@ def _evaluate_and_add_error(parent_error, ent): return parent_error -def raise_errors(arg0): +def raise_errors(arg0: Union[Entity, QueryTemplate, Container]): """Raise a TransactionError depending on the error code(s) inside Entity, QueryTemplate or Container arg0. More detailed errors may be attached to the TransactionError depending on the contents of @@ -4903,7 +5099,7 @@ def raise_errors(arg0): raise transaction_error -def delete(ids, raise_exception_on_error=True): +def delete(ids: Union[List[int], range], raise_exception_on_error=True): c = Container() if isinstance(ids, list) or isinstance(ids, range):