Skip to content
Snippets Groups Projects
Commit 653ae056 authored by Joscha Schmiedt's avatar Joscha Schmiedt
Browse files

WIP: Fixing mypy errors

parent e163e732
Branches
Tags
2 merge requests!143Release 0.15.0,!134Resolve "Fix mypy errors in models.py and complete type hints"
Pipeline #49446 failed
...@@ -54,6 +54,9 @@ if TYPE_CHECKING and sys.version_info > (3, 7): ...@@ -54,6 +54,9 @@ if TYPE_CHECKING and sys.version_info > (3, 7):
from datetime import datetime from datetime import datetime
from typing import Any, Dict, Optional, Type, Union, List, TextIO, Tuple, Literal from typing import Any, Dict, Optional, Type, Union, List, TextIO, Tuple, Literal
from .datatype import DATATYPE from .datatype import DATATYPE
from tempfile import _TemporaryFileWrapper
from io import BufferedWriter
from warnings import warn from warnings import warn
...@@ -62,15 +65,26 @@ from lxml import etree ...@@ -62,15 +65,26 @@ from lxml import etree
from ..configuration import get_config from ..configuration import get_config
from ..connection.connection import get_connection from ..connection.connection import get_connection
from ..connection.encode import MultipartParam, multipart_encode from ..connection.encode import MultipartParam, multipart_encode
from ..exceptions import (AmbiguousEntityError, AuthorizationError, from ..exceptions import (
ConsistencyError, EmptyUniqueQueryError, AmbiguousEntityError,
EntityDoesNotExistError, EntityError, AuthorizationError,
EntityHasNoDatatypeError, HTTPURITooLongError, ConsistencyError,
LinkAheadConnectionError, LinkAheadException, EmptyUniqueQueryError,
MismatchingEntitiesError, PagingConsistencyError, EntityDoesNotExistError,
QueryNotUniqueError, TransactionError, EntityError,
UniqueNamesError, UnqualifiedParentsError, EntityHasNoAclError,
UnqualifiedPropertiesError) EntityHasNoDatatypeError,
HTTPURITooLongError,
LinkAheadConnectionError,
LinkAheadException,
MismatchingEntitiesError,
PagingConsistencyError,
QueryNotUniqueError,
TransactionError,
UniqueNamesError,
UnqualifiedParentsError,
UnqualifiedPropertiesError,
)
from .datatype import ( from .datatype import (
BOOLEAN, BOOLEAN,
DATETIME, DATETIME,
...@@ -95,7 +109,8 @@ FIX = "FIX" ...@@ -95,7 +109,8 @@ FIX = "FIX"
ALL = "ALL" ALL = "ALL"
NONE = "NONE" NONE = "NONE"
if TYPE_CHECKING: if TYPE_CHECKING:
INHERITANCE = Literal["OBLIGATORY", "SUGGESTED", "RECOMMENDED", "FIX", "ALL", "NONE"] INHERITANCE = Literal["OBLIGATORY", "SUGGESTED", "RECOMMENDED", "ALL", "NONE"]
IMPORTANCE = Literal["OBLIGATORY", "RECOMMENDED", "SUGGESTED", "FIX", "NONE"]
SPECIAL_ATTRIBUTES = ["name", "role", "datatype", "description", SPECIAL_ATTRIBUTES = ["name", "role", "datatype", "description",
"id", "path", "checksum", "size", "value"] "id", "path", "checksum", "size", "value"]
...@@ -387,6 +402,10 @@ class Entity: ...@@ -387,6 +402,10 @@ class Entity:
ACL will be revoked. ACL will be revoked.
""" """
# @review Florian Spreckelsen 2022-03-17 # @review Florian Spreckelsen 2022-03-17
if self.acl is None:
raise EntityHasNoAclError("This entity does not have an ACL (yet).")
self.acl.grant(realm=realm, username=username, role=role, self.acl.grant(realm=realm, username=username, role=role,
permission=permission, priority=priority, permission=permission, priority=priority,
revoke_denial=revoke_denial) revoke_denial=revoke_denial)
...@@ -429,6 +448,9 @@ class Entity: ...@@ -429,6 +448,9 @@ class Entity:
ACL will be revoked. ACL will be revoked.
""" """
# @review Florian Spreckelsen 2022-03-17 # @review Florian Spreckelsen 2022-03-17
if self.acl is None:
raise EntityHasNoAclError("This entity does not have an ACL (yet).")
self.acl.deny(realm=realm, username=username, role=role, self.acl.deny(realm=realm, username=username, role=role,
permission=permission, priority=priority, permission=permission, priority=priority,
revoke_grant=revoke_grant) revoke_grant=revoke_grant)
...@@ -452,12 +474,14 @@ class Entity: ...@@ -452,12 +474,14 @@ class Entity:
priority=priority) priority=priority)
def is_permitted(self, permission: Permission, role: Optional[str] = None): def is_permitted(self, permission: Permission, role: Optional[str] = None):
if role is None: if self.acl is None:
# pylint: disable=unsupported-membership-test raise EntityHasNoAclError("This entity does not have an ACL (yet).")
if role is None and self.permissions is not None:
# pylint: disable=unsupported-membership-test
return permission in self.permissions return permission in self.permissions
else:
self.acl.is_permitted(permission=permission) self.acl.is_permitted(role, permission=permission)
def get_all_messages(self) -> Messages: def get_all_messages(self) -> Messages:
ret = Messages() ret = Messages()
...@@ -534,20 +558,26 @@ class Entity: ...@@ -534,20 +558,26 @@ class Entity:
""" """
if self.get_property(property_name) is None: prop = self.get_property(property_name)
if prop is None:
return self return self
if self.get_property(property_name).value is None:
property_value = prop.value
if property_value is None:
remove_if_empty_afterwards = False remove_if_empty_afterwards = False
empty_afterwards = False empty_afterwards = False
if isinstance(self.get_property(property_name).value, list): if isinstance(property_value, list):
if value in self.get_property(property_name).value: if value in property_value:
self.get_property(property_name).value.remove(value) property_value.remove(value)
if self.get_property(property_name).value == []: if property_value == []:
self.get_property(property_name).value = None property_value = None
empty_afterwards = True empty_afterwards = True
elif self.get_property(property_name).value == value: elif property_value == value:
self.get_property(property_name).value = None property_value = None
empty_afterwards = True empty_afterwards = True
if remove_if_empty_afterwards and empty_afterwards: if remove_if_empty_afterwards and empty_afterwards:
self.remove_property(property_name) self.remove_property(property_name)
...@@ -576,10 +606,10 @@ class Entity: ...@@ -576,10 +606,10 @@ class Entity:
id: Optional[int] = None, id: Optional[int] = None,
name: Optional[str] = None, name: Optional[str] = None,
description: Optional[str] = None, description: Optional[str] = None,
datatype: Optional[str] = None, datatype: Optional[DATATYPE] = None,
unit: Optional[str] = None, unit: Optional[str] = None,
importance: Optional[str] = None, importance: Optional[IMPORTANCE] = None,
inheritance: Union[str, INHERITANCE, None] = None, inheritance: Optional[INHERITANCE] = None,
) -> Entity: # @ReservedAssignment ) -> Entity: # @ReservedAssignment
"""Add a property to this entity. """Add a property to this entity.
...@@ -755,7 +785,7 @@ class Entity: ...@@ -755,7 +785,7 @@ class Entity:
parent: Union[Entity, int, str, None] = None, parent: Union[Entity, int, str, None] = None,
id: Optional[int] = None, id: Optional[int] = None,
name: Optional[str] = None, name: Optional[str] = None,
inheritance: Union[INHERITANCE, str, None] = None, inheritance: INHERITANCE = "NONE",
): # @ReservedAssignment ): # @ReservedAssignment
"""Add a parent to this entity. """Add a parent to this entity.
...@@ -771,7 +801,7 @@ class Entity: ...@@ -771,7 +801,7 @@ class Entity:
Name of the parent entity. Ignored if `parent is not Name of the parent entity. Ignored if `parent is not
none`. none`.
inheritance : str, INHERITANCE inheritance : str, INHERITANCE
One of ``obligatory``, ``recommended``, ``suggested``, or ``fix``. Specifies the One of ``obligatory``, ``recommended``, ``suggested``, or ``all``. Specifies the
minimum importance which parent properties need to have to be inherited by this minimum importance which parent properties need to have to be inherited by this
entity. If no `inheritance` is given, no properties will be inherited by the child. entity. If no `inheritance` is given, no properties will be inherited by the child.
This parameter is case-insensitive. This parameter is case-insensitive.
...@@ -1018,7 +1048,7 @@ out: List[Entity] ...@@ -1018,7 +1048,7 @@ out: List[Entity]
return p return p
else: else:
raise ValueError("argument should be entity, int , string") raise ValueError("pattern argument should be Entity, int or str")
return None return None
...@@ -1035,8 +1065,10 @@ out: List[Entity] ...@@ -1035,8 +1065,10 @@ out: List[Entity]
""" """
SPECIAL_SELECTORS = ["unit", "value", "description", "id", "name"] SPECIAL_SELECTORS = ["unit", "value", "description", "id", "name"]
if not isinstance(selector, (tuple, list)): if isinstance(selector, str):
selector = [selector] selector = [selector]
elif isinstance(selector, tuple):
selector = list(selector)
ref = self ref = self
...@@ -1051,7 +1083,7 @@ out: List[Entity] ...@@ -1051,7 +1083,7 @@ out: List[Entity]
special_selector = None special_selector = None
# iterating through the entity tree according to the selector # iterating through the entity tree according to the selector
prop: Optional[Property] = None
for subselector in selector: for subselector in selector:
# selector does not match the structure, we cannot get a # selector does not match the structure, we cannot get a
# property of non-entity # property of non-entity
...@@ -1078,7 +1110,6 @@ out: List[Entity] ...@@ -1078,7 +1110,6 @@ out: List[Entity]
ref = prop ref = prop
# if we saved a special selector before, apply it # if we saved a special selector before, apply it
if special_selector is None: if special_selector is None:
return prop.value return prop.value
else: else:
...@@ -1195,7 +1226,7 @@ out: List[Entity] ...@@ -1195,7 +1226,7 @@ out: List[Entity]
def to_xml( def to_xml(
self, self,
xml: Optional[etree._Element] = None, xml: Optional[etree._Element] = None,
add_properties: Optional[INHERITANCE] = ALL, add_properties: INHERITANCE = "ALL",
local_serialization: bool = False, local_serialization: bool = False,
) -> etree._Element: ) -> etree._Element:
"""Generate an xml representation of this entity. If the parameter xml """Generate an xml representation of this entity. If the parameter xml
...@@ -1207,6 +1238,10 @@ out: List[Entity] ...@@ -1207,6 +1238,10 @@ out: List[Entity]
@param xml: an xml element to which all attributes, parents, @param xml: an xml element to which all attributes, parents,
properties, and messages properties, and messages
are to be added. are to be added.
FIXME: Add documentation for the add_properties parameter.
FIXME: Add docuemntation for the local_serialization parameter.
@return: xml representation of this entity. @return: xml representation of this entity.
""" """
...@@ -1849,7 +1884,12 @@ class Parent(Entity): ...@@ -1849,7 +1884,12 @@ class Parent(Entity):
self.set_flag("inheritance", inheritance) self.set_flag("inheritance", inheritance)
self.__affiliation = None self.__affiliation = None
def to_xml(self, xml: Optional[etree._Element] = None, add_properties=None): def to_xml(
self,
xml: Optional[etree._Element] = None,
add_properties: INHERITANCE = "NONE",
local_serialization: bool = False,
):
if xml is None: if xml is None:
xml = etree.Element("Parent") xml = etree.Element("Parent")
...@@ -1919,11 +1959,20 @@ class Property(Entity): ...@@ -1919,11 +1959,20 @@ class Property(Entity):
datatype=datatype, value=value, role="Property") datatype=datatype, value=value, role="Property")
self.unit = unit self.unit = unit
def to_xml(self, xml: Optional[etree._Element] = None, add_properties=ALL): def to_xml(
self,
xml: Optional[etree._Element] = None,
add_properties: INHERITANCE = "ALL",
local_serialization: bool = False,
):
if xml is None: if xml is None:
xml = etree.Element("Property") xml = etree.Element("Property")
return super(Property, self).to_xml(xml, add_properties) return super(Property, self).to_xml(
xml=xml,
add_properties=add_properties,
local_serialization=local_serialization,
)
def is_reference(self, server_retrieval=False): def is_reference(self, server_retrieval=False):
"""Returns whether this Property is a reference """Returns whether this Property is a reference
...@@ -2028,7 +2077,7 @@ class RecordType(Entity): ...@@ -2028,7 +2077,7 @@ class RecordType(Entity):
parent: Union[Entity, int, str, None] = None, parent: Union[Entity, int, str, None] = None,
id: Optional[int] = None, id: Optional[int] = None,
name: Optional[str] = None, name: Optional[str] = None,
inheritance: Union[INHERITANCE, str, None] = OBLIGATORY, inheritance: INHERITANCE = "OBLIGATORY",
): ):
"""Add a parent to this RecordType """Add a parent to this RecordType
...@@ -2048,8 +2097,8 @@ class RecordType(Entity): ...@@ -2048,8 +2097,8 @@ class RecordType(Entity):
name : str name : str
Name of the parent entity. Ignored if `parent is not Name of the parent entity. Ignored if `parent is not
none`. none`.
inheritance : str, default OBLIGATORY inheritance : INHERITANCE, default OBLIGATORY
One of ``obligatory``, ``recommended``, ``suggested``, or ``fix``. Specifies the One of ``obligatory``, ``recommended``, ``suggested``, or ``all``. Specifies the
minimum importance which parent properties need to have to be inherited by this minimum importance which parent properties need to have to be inherited by this
entity. If no `inheritance` is given, no properties will be inherited by the child. entity. If no `inheritance` is given, no properties will be inherited by the child.
This parameter is case-insensitive. This parameter is case-insensitive.
...@@ -2075,12 +2124,18 @@ class RecordType(Entity): ...@@ -2075,12 +2124,18 @@ class RecordType(Entity):
def to_xml( def to_xml(
self, self,
xml: Optional[etree._Element] = None, xml: Optional[etree._Element] = None,
add_properties: Optional[INHERITANCE] = ALL, add_properties: INHERITANCE = "ALL",
local_serialization: bool = False,
) -> etree._Element: ) -> etree._Element:
if xml is None: if xml is None:
xml = etree.Element("RecordType") xml = etree.Element("RecordType")
return Entity.to_xml(self, xml, add_properties) return Entity.to_xml(
self,
xml=xml,
add_properties=add_properties,
local_serialization=local_serialization,
)
class Record(Entity): class Record(Entity):
...@@ -2104,11 +2159,20 @@ class Record(Entity): ...@@ -2104,11 +2159,20 @@ class Record(Entity):
Entity.__init__(self, name=name, id=id, description=description, Entity.__init__(self, name=name, id=id, description=description,
role="Record") role="Record")
def to_xml(self, xml=None, add_properties=ALL): def to_xml(
self,
xml: Optional[etree._Element] = None,
add_properties: INHERITANCE = "ALL",
local_serialization: bool = False,
):
if xml is None: if xml is None:
xml = etree.Element("Record") xml = etree.Element("Record")
return Entity.to_xml(self, xml, add_properties=ALL) return super().to_xml(
xml=xml,
add_properties=add_properties,
local_serialization=local_serialization,
)
class File(Record): class File(Record):
...@@ -2175,7 +2239,7 @@ class File(Record): ...@@ -2175,7 +2239,7 @@ class File(Record):
def to_xml( def to_xml(
self, self,
xml: Optional[etree._Element] = None, xml: Optional[etree._Element] = None,
add_properties: Optional[INHERITANCE] = ALL, add_properties: INHERITANCE = "ALL",
local_serialization: bool = False, local_serialization: bool = False,
): ):
"""Convert this file to an xml element. """Convert this file to an xml element.
...@@ -2189,7 +2253,7 @@ class File(Record): ...@@ -2189,7 +2253,7 @@ class File(Record):
return Entity.to_xml(self, xml=xml, add_properties=add_properties, return Entity.to_xml(self, xml=xml, add_properties=add_properties,
local_serialization=local_serialization) local_serialization=local_serialization)
def download(self, target=None): def download(self, target: Optional[str] = None):
"""Download this file-entity's actual file from the file server. It """Download this file-entity's actual file from the file server. It
will be stored to the target or will be hold as a temporary file. will be stored to the target or will be hold as a temporary file.
...@@ -2199,7 +2263,7 @@ class File(Record): ...@@ -2199,7 +2263,7 @@ class File(Record):
self.clear_server_messages() self.clear_server_messages()
if target: if target:
file_ = open(target, 'wb') file_: Union[BufferedWriter, _TemporaryFileWrapper] = open(target, "wb")
else: else:
file_ = NamedTemporaryFile(mode='wb', delete=False) file_ = NamedTemporaryFile(mode='wb', delete=False)
checksum = File.download_from_path(file_, self.path) checksum = File.download_from_path(file_, self.path)
...@@ -2340,9 +2404,7 @@ class _Properties(list): ...@@ -2340,9 +2404,7 @@ class _Properties(list):
return self return self
def to_xml( def to_xml(self, add_to_element: etree._Element, add_properties: INHERITANCE):
self, add_to_element: etree._Element, add_properties: Union[str, INHERITANCE]
):
for p in self: for p in self:
importance = self._importance.get(p) importance = self._importance.get(p)
...@@ -4152,11 +4214,17 @@ class ACI(): ...@@ -4152,11 +4214,17 @@ class ACI():
if self.role is not None: if self.role is not None:
e.set("role", self.role) e.set("role", self.role)
else: else:
if self.username is None:
raise LinkAheadException(
"An ACI must have either a role or a username."
)
e.set("username", self.username) e.set("username", self.username)
if self.realm is not None: if self.realm is not None:
e.set("realm", self.realm) e.set("realm", self.realm)
p = etree.Element("Permission") p = etree.Element("Permission")
if self.permission is None:
raise LinkAheadException("An ACI must have a permission.")
p.set("name", self.permission) p.set("name", self.permission)
e.append(p) e.append(p)
...@@ -4206,7 +4274,7 @@ class ACL(): ...@@ -4206,7 +4274,7 @@ class ACL():
role = e.get("role") role = e.get("role")
username = e.get("username") username = e.get("username")
realm = e.get("realm") realm = e.get("realm")
priority = e.get("priority") priority = self._get_boolean_priority(e.get("priority"))
for p in e: for p in e:
if p.tag == "Permission": if p.tag == "Permission":
...@@ -4564,12 +4632,18 @@ class Query(): ...@@ -4564,12 +4632,18 @@ class Query():
if isinstance(q, etree._Element): if isinstance(q, etree._Element):
self.q = q.get("string") self.q = q.get("string")
self.results = int(q.get("results")) results = q.get("results")
if results is None:
raise LinkAheadException(
"The query result count is not available in the response."
)
self.results = int(results)
if q.get("cached") is None: cached_value = q.get("cached")
if cached_value is None:
self.cached = False self.cached = False
else: else:
self.cached = q.get("cached").lower() == "true" self.cached = cached_value.lower() == "true"
self.etag = q.get("etag") self.etag = q.get("etag")
for m in q: for m in q:
...@@ -4883,8 +4957,12 @@ class Permissions(): ...@@ -4883,8 +4957,12 @@ class Permissions():
for e in xml: for e in xml:
if e.tag == "Permission": if e.tag == "Permission":
self._perms.add(Permission(name=e.get("name"), name = e.get("name")
description=e.get("description"))) if name is None:
raise LinkAheadException(
"The permission element has no name attribute."
)
self._perms.add(Permission(name=name, description=e.get("description")))
def __contains__(self, p): def __contains__(self, p):
if isinstance(p, Permission): if isinstance(p, Permission):
...@@ -4917,15 +4995,18 @@ def parse_xml(xml: Union[str, etree._Element]): ...@@ -4917,15 +4995,18 @@ def parse_xml(xml: Union[str, etree._Element]):
def _parse_single_xml_element(elem: etree._Element): def _parse_single_xml_element(elem: etree._Element):
classmap = { classmap = {
'record': Record, "record": Record,
'recordtype': RecordType, "recordtype": RecordType,
'property': Property, "property": Property,
'file': File, "file": File,
'parent': Parent, "parent": Parent,
'entity': Entity} "entity": Entity,
}
if elem.tag.lower() in classmap: if elem.tag.lower() in classmap:
klass = classmap.get(elem.tag.lower()) klass = classmap.get(elem.tag.lower())
if klass is None:
raise LinkAheadException("No class for tag '{}' found.".format(elem.tag))
entity = klass() entity = klass()
Entity._from_xml(entity, elem) Entity._from_xml(entity, elem)
...@@ -4953,8 +5034,8 @@ def _parse_single_xml_element(elem: etree._Element): ...@@ -4953,8 +5034,8 @@ def _parse_single_xml_element(elem: etree._Element):
return Message(type='History', description=elem.get("transaction")) return Message(type='History', description=elem.get("transaction"))
elif elem.tag.lower() == 'stats': elif elem.tag.lower() == 'stats':
counts = elem.find("counts") counts = elem.find("counts")
attrib = str(counts.attrib) if counts is not None else None
return Message(type="Counts", description=None, body=counts.attrib) return Message(type="Counts", description=None, body=attrib)
elif elem.tag == "EntityACL": elif elem.tag == "EntityACL":
return ACL(xml=elem) return ACL(xml=elem)
elif elem.tag == "Permissions": elif elem.tag == "Permissions":
...@@ -4962,11 +5043,19 @@ def _parse_single_xml_element(elem: etree._Element): ...@@ -4962,11 +5043,19 @@ def _parse_single_xml_element(elem: etree._Element):
elif elem.tag == "UserInfo": elif elem.tag == "UserInfo":
return UserInfo(xml=elem) return UserInfo(xml=elem)
elif elem.tag == "TimeZone": elif elem.tag == "TimeZone":
return TimeZone(zone_id=elem.get("id"), offset=elem.get("offset"), return TimeZone(
display_name=elem.text.strip()) zone_id=elem.get("id"),
offset=elem.get("offset"),
display_name=elem.text.strip() if elem.text is not None else "",
)
else: else:
return Message(type=elem.tag, code=elem.get( code = elem.get("code")
"code"), description=elem.get("description"), body=elem.text) return Message(
type=elem.tag,
code=int(code) if code is not None else None,
description=elem.get("description"),
body=elem.text,
)
def _evaluate_and_add_error(parent_error: TransactionError, ent: Union[Entity, Container]): def _evaluate_and_add_error(parent_error: TransactionError, ent: Union[Entity, Container]):
...@@ -5000,7 +5089,7 @@ def _evaluate_and_add_error(parent_error: TransactionError, ent: Union[Entity, C ...@@ -5000,7 +5089,7 @@ def _evaluate_and_add_error(parent_error: TransactionError, ent: Union[Entity, C
if err.code is not None: if err.code is not None:
if int(err.code) == 101: # ent doesn't exist if int(err.code) == 101: # ent doesn't exist
new_exc = EntityDoesNotExistError(entity=ent, new_exc: EntityError = EntityDoesNotExistError(entity=ent,
error=err) error=err)
elif int(err.code) == 110: # ent has no data type elif int(err.code) == 110: # ent has no data type
new_exc = EntityHasNoDatatypeError(entity=ent, new_exc = EntityHasNoDatatypeError(entity=ent,
......
...@@ -353,6 +353,9 @@ class UnqualifiedPropertiesError(EntityError): ...@@ -353,6 +353,9 @@ class UnqualifiedPropertiesError(EntityError):
""" """
class EntityHasNoAclError(EntityError):
"""This entity has no ACL (yet)."""
class EntityDoesNotExistError(EntityError): class EntityDoesNotExistError(EntityError):
"""This entity does not exist.""" """This entity does not exist."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment