diff --git a/src/linkahead/apiutils.py b/src/linkahead/apiutils.py index 49336aa8db24fba663337185c5c37a346330c4cd..31e49aa9a25ea8a58a0cdf2b5e81f6c489a36b5f 100644 --- a/src/linkahead/apiutils.py +++ b/src/linkahead/apiutils.py @@ -275,9 +275,11 @@ def compare_entities(entity0: Optional[Entity] = None, if entity1 is not None: raise ValueError("You cannot use both entity1 and new_entity") entity1 = new_entity + assert entity0 is not None + assert entity1 is not None - diff: tuple = ({"properties": {}, "parents": []}, - {"properties": {}, "parents": []}) + diff: tuple[dict[str, Any], dict[str, Any]] = ({"properties": {}, "parents": []}, + {"properties": {}, "parents": []}) if entity0 is entity1: return diff @@ -550,9 +552,10 @@ def merge_entities(entity_a: Entity, """ # Compare both entities: - diff_r1, diff_r2 = compare_entities(entity_a, entity_b, - entity_name_id_equivalency=merge_id_with_resolved_entity, - compare_referenced_records=merge_references_with_empty_diffs) + diff_r1, diff_r2 = compare_entities( + entity_a, entity_b, + entity_name_id_equivalency=merge_id_with_resolved_entity, + compare_referenced_records=merge_references_with_empty_diffs) # Go through the comparison and try to apply changes to entity_a: for key in diff_r2["parents"]: diff --git a/src/linkahead/common/administration.py b/src/linkahead/common/administration.py index 28ef107579fccb689b7337aed65e054cfbf36c05..4f39d55626b121b077105a2b0f6bb5b311febaeb 100644 --- a/src/linkahead/common/administration.py +++ b/src/linkahead/common/administration.py @@ -91,7 +91,7 @@ def get_server_properties() -> dict[str, Optional[str]]: props: dict[str, Optional[str]] = dict() for elem in xml.getroot(): - props[elem.tag] = elem.text + props[elem.tag] = str(elem.text) return props @@ -156,7 +156,10 @@ def generate_password(length: int): def _retrieve_user(name: str, realm: Optional[str] = None, **kwargs): con = get_connection() try: - return con._http_request(method="GET", path="User/" + (realm + "/" + name if realm is not None else name), **kwargs).read() + return con._http_request( + method="GET", + path="User/" + (realm + "/" + name if realm is not None else name), + **kwargs).read() except HTTPForbiddenError as e: e.msg = "You are not permitted to retrieve this user." raise @@ -198,7 +201,9 @@ def _update_user(name: str, if entity is not None: params["entity"] = str(entity) try: - return con.put_form_data(entity_uri_segment="User/" + (realm + "/" + name if realm is not None else name), params=params, **kwargs).read() + return con.put_form_data(entity_uri_segment="User/" + (realm + "/" + + name if realm is not None else name), + params=params, **kwargs).read() except HTTPResourceNotFoundError as e: e.msg = "User does not exist." raise e @@ -246,7 +251,9 @@ def _insert_user(name: str, def _insert_role(name, description, **kwargs): con = get_connection() try: - return con.post_form_data(entity_uri_segment="Role", params={"role_name": name, "role_description": description}, **kwargs).read() + return con.post_form_data(entity_uri_segment="Role", + params={"role_name": name, "role_description": description}, + **kwargs).read() except HTTPForbiddenError as e: e.msg = "You are not permitted to insert a new role." raise @@ -259,7 +266,9 @@ def _insert_role(name, description, **kwargs): def _update_role(name, description, **kwargs): con = get_connection() try: - return con.put_form_data(entity_uri_segment="Role/" + name, params={"role_description": description}, **kwargs).read() + return con.put_form_data(entity_uri_segment="Role/" + name, + params={"role_description": description}, + **kwargs).read() except HTTPForbiddenError as e: e.msg = "You are not permitted to update this role." raise @@ -301,8 +310,10 @@ def _set_roles(username, roles, realm=None, **kwargs): body = xml2str(xml) con = get_connection() try: - body = con._http_request(method="PUT", path="UserRoles/" + (realm + "/" + - username if realm is not None else username), body=body, **kwargs).read() + body = con._http_request(method="PUT", + path="UserRoles/" + (realm + "/" + + username if realm is not None else username), + body=body, **kwargs).read() except HTTPForbiddenError as e: e.msg = "You are not permitted to set this user's roles." raise @@ -369,7 +380,8 @@ def _set_permissions(role, permission_rules, **kwargs): body = xml2str(xml) con = get_connection() try: - return con._http_request(method="PUT", path="PermissionRules/" + role, body=body, **kwargs).read() + return con._http_request(method="PUT", path="PermissionRules/" + role, body=body, + **kwargs).read() except HTTPForbiddenError as e: e.msg = "You are not permitted to set this role's permissions." raise @@ -381,7 +393,9 @@ def _set_permissions(role, permission_rules, **kwargs): def _get_permissions(role, **kwargs): con = get_connection() try: - return PermissionRule._parse_body(con._http_request(method="GET", path="PermissionRules/" + role, **kwargs).read()) + return PermissionRule._parse_body(con._http_request(method="GET", + path="PermissionRules/" + role, + **kwargs).read()) except HTTPForbiddenError as e: e.msg = "You are not permitted to retrieve this role's permissions." raise @@ -429,7 +443,8 @@ class PermissionRule(): if permission is None: raise ValueError(f"Permission is missing in PermissionRule xml: {elem}") priority = PermissionRule._parse_boolean(elem.get("priority")) - return PermissionRule(elem.tag, permission, priority if priority is not None else False) + return PermissionRule(str(elem.tag), permission, + priority if priority is not None else False) @staticmethod def _parse_body(body: str): diff --git a/src/linkahead/utils/plantuml.py b/src/linkahead/utils/plantuml.py index 19594d6e856e740fe2c58c5128eead31c37485ce..59e3c34dd04c2425aef46b6d9e2411f75b747aca 100644 --- a/src/linkahead/utils/plantuml.py +++ b/src/linkahead/utils/plantuml.py @@ -130,9 +130,9 @@ def recordtypes_to_plantuml_string(iterable, classes = [el for el in iterable if isinstance(el, db.RecordType)] - dependencies = {} - inheritances = {} - properties = [p for p in iterable if isinstance(p, db.Property)] + dependencies: dict = {} + inheritances: dict = {} + properties: list = [p for p in iterable if isinstance(p, db.Property)] grouped = [g for g in iterable if isinstance(g, Grouped)] def _add_properties(c, importance=None): @@ -272,7 +272,8 @@ package \"The property P references an instance of D\" <<Rectangle>> { return result -def retrieve_substructure(start_record_types, depth, result_id_set=None, result_container=None, cleanup=True): +def retrieve_substructure(start_record_types, depth, result_id_set=None, result_container=None, + cleanup=True): """Recursively retrieves LinkAhead record types and properties, starting from given initial types up to a specific depth.