Skip to content
Snippets Groups Projects

Add type hints to models.py

Merged Henrik tom Wörden requested to merge f-type-hints into dev
1 unresolved thread
Files
2
@@ -35,6 +35,7 @@ transactions.
from __future__ import annotations # Can be removed with 3.10.
from __future__ import print_function, unicode_literals
from enum import Enum
import re
import sys
@@ -46,7 +47,14 @@ from os import listdir
from os.path import isdir
from random import randint
from tempfile import NamedTemporaryFile
from typing import Any, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from datetime import datetime
from typing import Any, Literal, Optional, Type, Union, List, TextIO
from warnings import warn
from lxml import etree
@@ -72,13 +80,23 @@ from .versioning import Version
_ENTITY_URI_SEGMENT = "Entity"
# importances/inheritance
OBLIGATORY = "OBLIGATORY"
SUGGESTED = "SUGGESTED"
RECOMMENDED = "RECOMMENDED"
FIX = "FIX"
ALL = "ALL"
NONE = "NONE"
class Inheritance(Enum):
OBLIGATORY = "OBLIGATORY"
SUGGESTED = "SUGGESTED"
RECOMMENDED = "RECOMMENDED"
FIX = "FIX"
ALL = "ALL"
NONE = "NONE"
OBLIGATORY = Inheritance.OBLIGATORY
SUGGESTED = Inheritance.SUGGESTED
RECOMMENDED = Inheritance.RECOMMENDED
FIX = Inheritance.FIX
ALL = Inheritance.ALL
NONE = Inheritance.NONE
SPECIAL_ATTRIBUTES = ["name", "role", "datatype", "description",
@@ -98,15 +116,23 @@ class Entity:
by the user to control several server-side plug-ins.
"""
def __init__(self, name=None, id=None, description=None, # @ReservedAssignment
datatype=None, value=None, **kwargs):
def __init__(
self,
name: Optional[str] = None,
id: Optional[int] = None,
description: Optional[str] = None, # @ReservedAssignment
datatype: Optional[str] = None,
value=None,
**kwargs,
):
self.__role = kwargs["role"] if "role" in kwargs else None
self._checksum = None
self._checksum: Optional[str]= None
self._size = None
self._upload = None
# If an entity is used (e.g. as parent), it is wrapped instead of being used directly.
# see Entity._wrap()
self._wrapped_entity = None
self._wrapped_entity: Optional[Entity] = None
self._version = None
self._cuid = None
self._flags = dict()
@@ -120,7 +146,7 @@ class Entity:
self.path = None
self.file = None
self.unit = None
self.acl = None
self.acl: Optional[ACL] = None
self.permissions = None
self.is_valid = lambda: False
self.is_deleted = lambda: False
@@ -324,8 +350,15 @@ class Entity:
def pickup(self, new_pickup):
self.__pickup = new_pickup
def grant(self, realm=None, username=None, role=None,
permission=None, priority=False, revoke_denial=True):
def grant(
self,
realm: Optional[str] = None,
username: Optional[str] = None,
role: Optional[str] = None,
permission: str = None,
priority: bool = False,
revoke_denial: bool = True,
):
"""Grant a permission to a user or role for this entity.
You must specify either only the username and the realm, or only the
@@ -359,8 +392,15 @@ class Entity:
permission=permission, priority=priority,
revoke_denial=revoke_denial)
def deny(self, realm=None, username=None, role=None,
permission=None, priority=False, revoke_grant=True):
def deny(
self,
realm: Optional[str] = None,
username: Optional[str] = None,
role: Optional[str] = None,
permission: str = None,
priority: bool = False,
revoke_grant: bool = True,
):
"""Deny a permission to a user or role for this entity.
You must specify either only the username and the realm, or only the
@@ -412,7 +452,7 @@ class Entity:
permission=permission,
priority=priority)
def is_permitted(self, permission, role=None):
def is_permitted(self, permission: str, role: Optional[str] = None):
if role is None:
# pylint: disable=unsupported-membership-test
@@ -420,7 +460,7 @@ class Entity:
else:
self.acl.is_permitted(permission=permission)
def get_all_messages(self):
def get_all_messages(self) -> Messages:
ret = Messages()
ret.append(self.messages)
@@ -519,8 +559,29 @@ class Entity:
return self
def add_property(self, property=None, value=None, id=None, name=None, description=None, datatype=None,
unit=None, importance=None, inheritance=None): # @ReservedAssignment
def add_property(
self,
property: Union[int, str, Entity, None] = None,
value: Union[
int,
str,
bool,
datetime,
Entity,
List[int],
List[str],
List[bool],
List[Entity],
None,
] = None,
id: Optional[int] = None,
name: Optional[str] = None,
description: Optional[str] = None,
datatype: Optional[str] = None,
unit: Optional[str] = None,
importance: Optional[str] = None,
inheritance: Union[str, Inheritance, None] = None,
) -> Entity: # @ReservedAssignment
"""Add a property to this entity.
The first parameter is meant to identify the property entity either via
@@ -690,7 +751,13 @@ class Entity:
return self
def add_parent(self, parent=None, id=None, name=None, inheritance=None): # @ReservedAssignment
def add_parent(
self,
parent: Union[Entity, int, str, None] = None,
id: Optional[int] = None,
name: Optional[str] = None,
inheritance: Union[str, Inheritance, None] = None,
): # @ReservedAssignment
"""Add a parent to this entity.
Parameters
@@ -704,7 +771,7 @@ class Entity:
name : str
Name of the parent entity. Ignored if `parent is not
none`.
inheritance : str
inheritance : str, Inheritance
One of ``obligatory``, ``recommended``, ``suggested``, or ``fix``. Specifies the
minimum importance which parent properties need to have to be inherited by this
entity. If no `inheritance` is given, no properties will be inherited by the child.
@@ -811,7 +878,7 @@ out: bool
return self.parents
def get_parents_recursively(self, retrieve: bool = True):
def get_parents_recursively(self, retrieve: bool = True) -> List[Entity]:
"""Get all ancestors of this entity.
Parameters
@@ -826,12 +893,12 @@ out: List[Entity]
The parents of this Entity
"""
all_parents = []
all_parents: List[Entity] = []
self._get_parent_recursively(all_parents, retrieve=retrieve)
return all_parents
def _get_parent_recursively(self, all_parents: list, retrieve: bool = True):
def _get_parent_recursively(self, all_parents: List[Entity], retrieve: bool = True):
"""Get all ancestors with a little helper.
As a side effect of this method, the ancestors are added to
@@ -862,7 +929,7 @@ out: List[Entity]
all_parents.append(w_parent)
w_parent._get_parent_recursively(all_parents, retrieve=retrieve)
def get_parent(self, key):
def get_parent(self, key: Union[int, Entity, str]) -> Union[Entity, None]:
"""Return the first parent matching the key or None if no match exists.
Parameters
@@ -1124,7 +1191,12 @@ out: List[Entity]
return False
def to_xml(self, xml=None, add_properties=ALL, local_serialization=False):
def to_xml(
self,
xml: Optional[etree._Element] = None,
add_properties=ALL,
local_serialization=False,
):
"""Generate an xml representation of this entity. If the parameter xml
is given, all attributes, parents, properties, and messages of this
entity will be added to it instead of creating a new element.
@@ -1146,7 +1218,7 @@ out: List[Entity]
# unwrap wrapped entity
if self._wrapped_entity is not None:
xml = self._wrapped_entity.to_xml(xml, add_properties)
xml: etree._Element = self._wrapped_entity.to_xml(xml, add_properties)
if self.id is not None:
xml.set("id", str(self.id))
@@ -1394,8 +1466,14 @@ out: List[Entity]
return Container().append(self).retrieve(
unique=unique, raise_exception_on_error=raise_exception_on_error, flags=flags)
def insert(self, raise_exception_on_error=True, unique=True,
sync=True, strict=False, flags=None):
def insert(
self,
raise_exception_on_error=True,
unique=True,
sync=True,
strict=False,
flags: Optional[dict] = None,
):
"""Insert this entity into a LinkAhead server. A successful insertion will
generate a new persistent ID for this entity. This entity can be
identified, retrieved, updated, and deleted via this ID until it has
@@ -1878,7 +1956,13 @@ class Property(Entity):
class Message(object):
def __init__(self, type=None, code=None, description=None, body=None): # @ReservedAssignment
def __init__(
self,
type: Optional[str] = None,
code: Optional[int] = None,
description=None,
body: str = None,
): # @ReservedAssignment
self.description = description
self.type = type if type is not None else "Info"
self.code = int(code) if code is not None else None
@@ -1926,7 +2010,7 @@ class RecordType(Entity):
property=property, id=id, name=name, description=description, datatype=datatype,
value=value, unit=unit, importance=importance, inheritance=inheritance)
def add_parent(self, parent=None, id=None, name=None, inheritance=OBLIGATORY):
def add_parent(self, parent=None, id=None, name=None, inheritance:Union[str, Inheritance]=OBLIGATORY):
"""Add a parent to this RecordType
Parameters
@@ -1982,7 +2066,12 @@ class Record(Entity):
property=property, id=id, name=name, description=description, datatype=datatype,
value=value, unit=unit, importance=importance, inheritance=inheritance)
def __init__(self, name=None, id=None, description=None): # @ReservedAssignment
def __init__(
self,
name: Optional[str] = None,
id: Optional[int] = None,
description: Optional[str] = None,
): # @ReservedAssignment
Entity.__init__(self, name=name, id=id, description=description,
role="Record")
@@ -2023,9 +2112,17 @@ class File(Record):
"""
def __init__(self, name=None, id=None, description=None, # @ReservedAssignment
path=None, file=None, pickup=None, # @ReservedAssignment
thumbnail=None, from_location=None):
def __init__(
self,
name: Optional[str] = None,
id: Optional[int] = None,
description: Optional[str] = None, # @ReservedAssignment
path: Optional[str] = None,
file: Union[str, TextIO, None] = None,
pickup: Optional[str] = None, # @ReservedAssignment
thumbnail: Optional[str] = None,
from_location=None,
):
Record.__init__(self, id=id, name=name, description=description)
self.role = "File"
self.datatype = None
@@ -2161,7 +2258,7 @@ class _Properties(list):
if property is not None:
self._importance[property] = importance
def get_by_name(self, name):
def get_by_name(self, name: str) -> Property:
"""Get a property of this list via it's name. Raises a LinkAheadException
if not exactly one property has this name.
@@ -2203,7 +2300,9 @@ class _Properties(list):
return self
def to_xml(self, add_to_element, add_properties):
def to_xml(
self, add_to_element: etree._Element, add_properties: Union[str, Inheritance]
):
for p in self:
importance = self._importance.get(p)
@@ -2213,7 +2312,7 @@ class _Properties(list):
pelem = p.to_xml(xml=etree.Element("Property"), add_properties=FIX)
if p in self._importance:
pelem.set("importance", importance)
pelem.set("importance", str(importance))
if p in self._inheritance:
pelem.set("flag", "inheritance:" +
@@ -2228,7 +2327,7 @@ class _Properties(list):
return xml2str(xml)
def _get_entity_by_cuid(self, cuid):
def _get_entity_by_cuid(self, cuid: str):
'''
Get the first entity which has the given cuid.
Note: this method is intended for internal use.
@@ -2242,7 +2341,7 @@ class _Properties(list):
return e
raise KeyError("No entity with that cuid in this container.")
def remove(self, prop):
def remove(self, prop: Union[Entity, int]):
if isinstance(prop, Entity):
if prop in self:
list.remove(self, prop)
@@ -2366,7 +2465,7 @@ class _ParentList(list):
return xml2str(xml)
def remove(self, parent):
def remove(self, parent: Union[Entity, int, str]):
if isinstance(parent, Entity):
if parent in self:
list.remove(self, parent)
@@ -3968,7 +4067,15 @@ def get_global_acl():
class ACI():
def __init__(self, realm, username, role, permission):
"""FIXME: Add docstring"""
def __init__(
self,
realm: Optional[str],
username: Optional[str],
role: Optional[str],
permission: Optional[str],
):
self.role = role
self.username = username
self.realm = realm
@@ -3984,7 +4091,7 @@ class ACI():
def __repr__(self):
return str(self.realm) + ":" + str(self.username) + ":" + str(self.role) + ":" + str(self.permission)
def add_to_element(self, e):
def add_to_element(self, e: etree._Element):
if self.role is not None:
e.set("role", self.role)
else:
@@ -3998,16 +4105,17 @@ class ACI():
class ACL():
"""FIXME: Add docstring"""
global_acl = None
global_acl: Optional[ACL] = None
def __init__(self, xml=None):
def __init__(self, xml: Optional[etree._Element] = None):
if xml is not None:
self.parse_xml(xml)
else:
self.clear()
def parse_xml(self, xml):
def parse_xml(self, xml: etree._Element):
"""Clear this ACL and parse the xml.
Iterate over the rules in the xml and add each rule to this ACL.
@@ -4016,14 +4124,14 @@ class ACL():
Parameters
----------
xml : lxml.etree.Element
xml : lxml.etree._Element
The xml element containing the ACL rules, i.e. <Grant> and <Deny>
rules.
"""
self.clear()
self._parse_xml(xml)
def _parse_xml(self, xml):
def _parse_xml(self, xml: etree._Element):
"""Parse the xml.
Iterate over the rules in the xml and add each rule to this ACL.
@@ -4032,7 +4140,7 @@ class ACL():
Parameters
----------
xml : lxml.etree.Element
xml : lxml.etree._Element
The xml element containing the ACL rules, i.e. <Grant> and <Deny>
rules.
"""
@@ -4056,7 +4164,7 @@ class ACL():
permission=permission, priority=priority,
revoke_grant=False)
def combine(self, other):
def combine(self, other: ACL):
""" Combine and return new instance."""
result = ACL()
result._grants.update(other._grants)
@@ -4078,15 +4186,15 @@ class ACL():
len(self._priority_denials) + len(self._denials) == 0
def clear(self):
self._grants = set()
self._denials = set()
self._priority_grants = set()
self._priority_denials = set()
self._grants: set[ACI] = set()
self._denials: set[ACI] = set()
self._priority_grants: set[ACI] = set()
self._priority_denials: set[ACI] = set()
def _get_boolean_priority(self, priority):
def _get_boolean_priority(self, priority: str):
return str(priority).lower() in ["true", "1", "yes", "y"]
def _remove_item(self, item, priority):
def _remove_item(self, item, priority: str):
try:
self._denials.remove(item)
except KeyError:
@@ -4106,8 +4214,14 @@ class ACL():
except KeyError:
pass
def revoke_grant(self, username=None, realm=None,
role=None, permission=None, priority=False):
def revoke_grant(
self,
username: Optional[str] = None,
realm: Optional[str] = None,
role: Optional[str] = None,
permission: str = None,
priority: Union[bool, str] = False,
):
priority = self._get_boolean_priority(priority)
item = ACI(role=role, username=username,
realm=realm, permission=permission)
@@ -4132,8 +4246,15 @@ class ACL():
if item in self._denials:
self._denials.remove(item)
def grant(self, permission, username=None, realm=None, role=None,
priority=False, revoke_denial=True):
def grant(
self,
permission: str,
username: Optional[str] = None,
realm: Optional[str] = None,
role: Optional[str] = None,
priority: bool = False,
revoke_denial: bool = True,
):
"""Grant a permission to a user or role.
You must specify either only the username and the realm, or only the
@@ -4174,8 +4295,15 @@ class ACL():
else:
self._grants.add(item)
def deny(self, username=None, realm=None, role=None,
permission=None, priority=False, revoke_grant=True):
def deny(
self,
username: Optional[str] = None,
realm: Optional[str] = None,
role: Optional[str] = None,
permission: str = None,
priority: bool = False,
revoke_grant: bool = True,
):
"""Deny a permission to a user or role for this entity.
You must specify either only the username and the realm, or only the
@@ -4216,7 +4344,7 @@ class ACL():
else:
self._denials.add(item)
def to_xml(self, xml=None):
def to_xml(self, xml: Optional[etree._Element] = None):
if xml is None:
xml = etree.Element("EntityACL")
@@ -4246,7 +4374,7 @@ class ACL():
return xml
def get_acl_for_role(self, role):
def get_acl_for_role(self, role: str) -> ACL:
ret = ACL()
for aci in self._grants:
@@ -4292,7 +4420,7 @@ class ACL():
return ret
def get_permissions_for_user(self, username, realm=None):
def get_permissions_for_user(self, username: str, realm: Optional[str] = None):
acl = self.get_acl_for_user(username, realm)
_grants = set()
@@ -4313,7 +4441,7 @@ class ACL():
return ((_grants - _denials) | _priority_grants) - _priority_denials
def get_permissions_for_role(self, role):
def get_permissions_for_role(self, role: str):
acl = self.get_acl_for_role(role)
_grants = set()
@@ -4701,7 +4829,7 @@ class Permissions():
return str(self._perms)
def parse_xml(xml):
def parse_xml(xml: Union[str, etree._Element]):
"""parse a string or tree representation of an xml document to a set of
entities (records, recordtypes, properties, or files).
@@ -4710,9 +4838,9 @@ def parse_xml(xml):
"""
if isinstance(xml, etree._Element):
elem = xml
elem: etree._Element = xml
else:
elem = etree.fromstring(xml)
elem: etree._Element = etree.fromstring(xml)
return _parse_single_xml_element(elem)
@@ -4771,7 +4899,7 @@ def _parse_single_xml_element(elem):
"code"), description=elem.get("description"), body=elem.text)
def _evaluate_and_add_error(parent_error, ent):
def _evaluate_and_add_error(parent_error: TransactionError, ent: Entity):
"""Evaluate the error message(s) attached to entity and add a
corresponding exception to parent_error.
@@ -4874,7 +5002,7 @@ def _evaluate_and_add_error(parent_error, ent):
return parent_error
def raise_errors(arg0):
def raise_errors(arg0: Union[Entity, QueryTemplate, Container]):
"""Raise a TransactionError depending on the error code(s) inside
Entity, QueryTemplate or Container arg0. More detailed errors may
be attached to the TransactionError depending on the contents of
@@ -4901,7 +5029,7 @@ def raise_errors(arg0):
raise transaction_error
def delete(ids, raise_exception_on_error=True):
def delete(ids:Union[List[int], range], raise_exception_on_error=True):
c = Container()
if isinstance(ids, list) or isinstance(ids, range):
Loading