diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0f9a258de99ba559d280fc5ace74a3f111a9e30e..8845e4070c685230a99958fbebd9377238df32de 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -113,15 +113,8 @@ unittest_py3.13: tags: [ docker ] stage: test needs: [ ] - image: python:3.13-rc - script: - # TODO: Replace by '*python_test_script' as soon as 3.13 has been officially released. - # Python docker has problems with tox and pip so use plain pytest here - - apt update && apt install -y cargo - - touch ~/.pylinkahead.ini - - pip install pynose pytest pytest-cov jsonschema>=4.4.0 setuptools - - pip install . - - python -m pytest unittests + image: python:3.13 + script: *python_test_script # Trigger building of server image and integration tests trigger_build: diff --git a/CHANGELOG.md b/CHANGELOG.md index c18ee0ded167db384baecfbac96636e22f70b6f2..b5a0e729877076c966b2ffa207122ea46032b2bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,24 +8,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ## ### Added ### -* `ParentList` and `PropertyList` now have a `filter` function that allows to select a subset of - the contained elements by ID and/or name. + +* Official support for Python 3.13 +* Optional `realm` argument for `linkahead_admin.py set_user_password` + which defaults to `None`, i.e., the server's default realm. ### Changed ### -* `compare_entities` is now case insensitive with respect to property and - recordtype names -* `_ParentList` is now called `ParentList` -* `_Properties` is now called `PropertyList` -* `ParentList.remove` is now case insensitive when a name is used. ### Deprecated ### -* the use of the arguments `old_entity` and `new_entity` in `compare_entities` - is now deprecated. Please use `entity0` and `entity1` respectively instead. ### Removed ### ### Fixed ### +* [gitlab.indiscale.com#200](https://gitlab.indiscale.com/caosdb/src/caosdb-pylib/-/merge_requests/153) + ``linkahead_admin.py`` prints reasonable error messages when users + or roles don't exist. + ### Security ### ### Documentation ### diff --git a/examples/set_permissions.py b/examples/set_permissions.py index a558bde73897cb6827c93373cc8327efc10e6e15..4657f2cca182b567c761a777df838825f8e89aef 100755 --- a/examples/set_permissions.py +++ b/examples/set_permissions.py @@ -37,13 +37,13 @@ from caosdb import administration as admin def assert_user_and_role(): """Make sure that users and roles exist. -After calling this function, there will be a user "jane" with the role "human" -and the user "xaxys" with the role "alien". These users and roles are returned. + After calling this function, there will be a user "jane" with the role "human" + and the user "xaxys" with the role "alien". These users and roles are returned. -Returns -------- -out : tuple - ((human_user, human_role), (alien_user, alien_role)) + Returns + ------- + out : tuple + ((human_user, human_role), (alien_user, alien_role)) """ try: @@ -81,15 +81,15 @@ out : tuple def get_entities(count=1): """Retrieve one or more entities. -Parameters ----------- -count : int, optional - How many entities to retrieve. + Parameters + ---------- + count : int, optional + How many entities to retrieve. -Returns -------- -out : Container - A container of retrieved entities, the length is given by the parameter count. + Returns + ------- + out : Container + A container of retrieved entities, the length is given by the parameter count. """ cont = db.execute_query("FIND RECORD 'Human Food'", flags={ "P": "0L{n}".format(n=count)}) @@ -102,20 +102,20 @@ out : Container def set_permission(role_grant, role_deny, cont=None, general=False): """Set the permissions of some entities. -Parameters ----------- -role_grant : str - Role which is granted permissions. + Parameters + ---------- + role_grant : str + Role which is granted permissions. -role_deny : str - Role which is denied permissions. + role_deny : str + Role which is denied permissions. -cont : Container - Entities for which permissions are set. + cont : Container + Entities for which permissions are set. -general : bool, optional - If True, the permissions for the roles will be set. If False (the default), - permissions for the entities in the container will be set. + general : bool, optional + If True, the permissions for the roles will be set. If False (the default), + permissions for the entities in the container will be set. """ # Set general permissions @@ -143,23 +143,23 @@ general : bool, optional def test_permission(granted_user, denied_user, cont): """Tests if the permissions are set correctly for two users. -Parameters ----------- -granted_user : (str, str) - The user which should have permissions to retrieve the entities in `cont`. - Given as (user, password). + Parameters + ---------- + granted_user : (str, str) + The user which should have permissions to retrieve the entities in `cont`. + Given as (user, password). -denied_user : (str, str) - The user which should have no permission to retrieve the entities in `cont`. - Given as (user, password). + denied_user : (str, str) + The user which should have no permission to retrieve the entities in `cont`. + Given as (user, password). -cont : Container - Entities for which permissions are tested. + cont : Container + Entities for which permissions are tested. -Returns -------- -None + Returns + ------- + None """ diff --git a/src/doc/tutorials/complex_data_models.rst b/src/doc/tutorials/complex_data_models.rst index b33bc9ae49e9a850a73e32e396735652ecaab7d7..569acdae174a9df9d0d2b5eae9a0084d793cc90c 100644 --- a/src/doc/tutorials/complex_data_models.rst +++ b/src/doc/tutorials/complex_data_models.rst @@ -75,58 +75,3 @@ Examples b = input("Press any key to cleanup.") # cleanup everything after the user presses any button. c.delete() - - -Finding parents and properties --------- -To find a specific parent or property of an Entity, its -ParentList or PropertyList can be filtered using names, ids, or -entities. A short example: - -.. code-block:: python3 - - import linkahead as db - - # Setup a record with four properties - r = db.Record() - p1 = db.Property(id=101, name="Property 1") - p2 = db.Property(id=102, name="Property 2") - p3_1 = db.Property(id=103, name="Property 3") - p3_2 = db.Property(id=103, name="Property 3") - p4 = db.Property(name="Property") - p5 = db.Property(name="Property") - r.add_property(p1).add_property(p2).add_property(p3_1) - r.add_property(p3_2).add_property(p4).add_property(p5) - properties = r.properties - - # As r only has one property with id 101, this returns a list containing only p1 - properties.filter(pid=101) - # Result: [p1] - - # Filtering with name="Property" returns both p4 and p5, as they share their name - properties.filter(name="Property") - # Result: [p4, p5] - - # Filtering with name="Property 1" and id=102 returns both p1 and p2, because - # any property matching either criterion is returned: - properties.filter(name="Property 1", pid="102") - # Result: [p1, p2] - - p6 = db.Property(name="Property 2") - r.add_property(p6) - # If we want to find properties matching one specific property, we can also filter using - # the entity itself. In this case, only properties matching both name and id are returned, - # as long as both are set. - properties.filter(p2) - # Result: [p2] - # As p6 does not have an id yet, both candidates matching its name are returned - properties.filter(p6) - # Result: [p2, p6] - # Similarly if we match using name and id parameters, all candidates matching either are returned - properties.filter(name=p2.name, pid=p2.id) - # Result: [p2, p6], because p2 and p6 share a name - # And if both name and id match, there may also be several results when matching an entity - properties.filter(p3_1) - # Result: [p3_1, p3_2], because they share both their name and id - -The filter function of ParentList works analogously. diff --git a/src/linkahead/__init__.py b/src/linkahead/__init__.py index 567748e3b3a58fb73b91f652d82ed10f818d6014..cd54f8f4e05326579521fbbf226f027d32fa616e 100644 --- a/src/linkahead/__init__.py +++ b/src/linkahead/__init__.py @@ -42,7 +42,7 @@ from .common.datatype import (BOOLEAN, DATETIME, DOUBLE, FILE, INTEGER, LIST, REFERENCE, TEXT) # Import of the basic API classes: from .common.models import (ACL, ALL, FIX, NONE, OBLIGATORY, RECOMMENDED, - SUGGESTED, Container, DropOffBox, Entity, File, Parent, + SUGGESTED, Container, DropOffBox, Entity, File, Info, Message, Permissions, Property, Query, QueryTemplate, Record, RecordType, delete, execute_query, get_global_acl, diff --git a/src/linkahead/apiutils.py b/src/linkahead/apiutils.py index d51171c7c59fd0ae8ee202db224f2597f3e9cdae..4307caa531f2a2a0d8e68dd4ced5240e0e2c5b83 100644 --- a/src/linkahead/apiutils.py +++ b/src/linkahead/apiutils.py @@ -28,11 +28,10 @@ """ from __future__ import annotations - import logging import warnings from collections.abc import Iterable -from typing import Any, Optional, Union +from typing import Any, Union, Optional from .common.datatype import is_reference from .common.models import (SPECIAL_ATTRIBUTES, Container, Entity, File, @@ -180,27 +179,19 @@ def getCommitIn(folder): return get_commit_in(folder) -def compare_entities(entity0: Optional[Entity] = None, - entity1: Optional[Entity] = None, - compare_referenced_records: bool = False, - entity_name_id_equivalency: bool = False, - old_entity: Optional[Entity] = None, - new_entity: Optional[Entity] = None, +def compare_entities(old_entity: Entity, + new_entity: Entity, + compare_referenced_records: bool = False ) -> tuple[dict[str, Any], dict[str, Any]]: - """Compare two entities. - - Returns two dicts listing the differences between the two entities. The - order of the two returned dicts corresponds to the two input entities. - The dicts contain two keys, 'parents' and 'properties'. The list saved - under the 'parents' key contains those parents of the respective entity - that are missing in the other entity, and the 'properties' dict contains - properties and SPECIAL_ATTRIBUTES if they are missing or different from - their counterparts in the other entity. - - The value of the properties dict for each listed property is again a dict - detailing the differences between this property and its counterpart. - The characteristics that are checked to determine whether two properties - match are the following: + """Compare two entites. + + Return a tuple of dictionaries, the first index belongs to additional information for old + entity, the second index belongs to additional information for new entity. + + Additional information means in detail: + - Additional parents (a list under key "parents") + - Information about properties: + - Each property lists either an additional property or a property with a changed: - datatype - importance - value @@ -211,241 +202,152 @@ def compare_entities(entity0: Optional[Entity] = None, value is not added to the dict. If a property is of type LIST, the comparison is order-sensitive. - Comparison of multi-properties is not yet supported, so should either - entity have several instances of one Property, the comparison is aborted - and an error is raised. + In case of changed information the value listed under the respective key shows the + value that is stored in the respective entity. - Two parents match if their name and id are the same, any further - differences are ignored. + If `compare_referenced_records` is `True`, also referenced entities will be + compared using this function (which is then called with + `compare_referenced_records = False` to prevent infinite recursion in case + of circular references). - Should records referenced in the value field not be checked for equality - between the entities but for equivalency, this is possible by setting the - parameter compare_referenced_records. + Parameters + ---------- + old_entity, new_entity : Entity + Entities to be compared + compare_referenced_records : bool, optional + Whether to compare referenced records in case of both, `old_entity` and + `new_entity`, have the same reference properties and both have a Record + object as value. If set to `False`, only the corresponding Python + objects are compared which may lead to unexpected behavior when + identical records are stored in different objects. Default is False. - Params - ------ - entity0 : Entity - First entity to be compared. - entity1 : Entity - Second entity to be compared. - compare_referenced_records: bool, default: False - If set to True, values with referenced records - are not checked for equality but for - equivalency using this function. - compare_referenced_records is set to False for - these recursive calls, so references of - references need to be equal. If set to `False`, - only the Python objects are compared, which may - lead to unexpected behavior. - entity_name_id_equivalency: bool, default: False - If set to True, the comparison between an - entity and an int or str also checks whether - the int/str matches the name or id of the - entity, so Entity(id=100) == 100 == "100". """ - # ToDo: Discuss intended behaviour - # Questions that need clarification: - # - What is intended behaviour for multi-properties and multi-parents? - # - Do different inheritance levels for parents count as a difference? - # - Do we care about parents and properties of properties? - # - Should there be a more detailed comparison of parents without id? - # - Revisit filter - do we care about RecordType when matching? - # How to treat None? - # Suggestions for enhancements: - # - For the comparison of entities in value and properties, consider - # keeping a list of traversed entities, not only look at first layer - # - Make the empty_diff functionality faster by adding a parameter to - # this function so that it returns after the first found difference? - # - Add parameter to restrict diff to some characteristics - if entity0 is None and old_entity is None: - raise ValueError("Please provide the first entity as first argument (`entity0`)") - if entity1 is None and new_entity is None: - raise ValueError("Please provide the second entity as second argument (`entity1`)") - if old_entity is not None: - warnings.warn("Please use 'entity0' instead of 'old_entity'.", DeprecationWarning) - if entity0 is not None: - raise ValueError("You cannot use both, entity0 and old_entity") - entity0 = old_entity - if new_entity is not None: - warnings.warn("Please use 'entity1' instead of 'new_entity'.", DeprecationWarning) - if entity1 is not None: - raise ValueError("You cannot use both, entity1 and new_entity") - entity1 = new_entity - - diff: tuple = ({"properties": {}, "parents": []}, - {"properties": {}, "parents": []}) - - if entity0 is entity1: - return diff - - if type(entity0) is not type(entity1): + olddiff: dict[str, Any] = {"properties": {}, "parents": []} + newdiff: dict[str, Any] = {"properties": {}, "parents": []} + + if old_entity is new_entity: + return (olddiff, newdiff) + + if type(old_entity) is not type(new_entity): raise ValueError( "Comparison of different Entity types is not supported.") - # compare special attributes for attr in SPECIAL_ATTRIBUTES: - if attr == "value": + try: + oldattr = old_entity.__getattribute__(attr) + old_entity_attr_exists = True + except BaseException: + old_entity_attr_exists = False + try: + newattr = new_entity.__getattribute__(attr) + new_entity_attr_exists = True + except BaseException: + new_entity_attr_exists = False + + if old_entity_attr_exists and (oldattr == "" or oldattr is None): + old_entity_attr_exists = False + + if new_entity_attr_exists and (newattr == "" or newattr is None): + new_entity_attr_exists = False + + if not old_entity_attr_exists and not new_entity_attr_exists: continue - attr0 = entity0.__getattribute__(attr) - # we consider "" and None to be nonexistent - attr0_unset = (attr0 == "" or attr0 is None) + if ((old_entity_attr_exists ^ new_entity_attr_exists) + or (oldattr != newattr)): - attr1 = entity1.__getattribute__(attr) - # we consider "" and None to be nonexistent - attr1_unset = (attr1 == "" or attr1 is None) + if old_entity_attr_exists: + olddiff[attr] = oldattr - # in both entities the current attribute is not set - if attr0_unset and attr1_unset: - continue + if new_entity_attr_exists: + newdiff[attr] = newattr - # treat datatype separately if one datatype is an object and the other - # a string or int, and therefore may be a name or id - if attr == "datatype": - if not attr0_unset and not attr1_unset: - if isinstance(attr0, RecordType): - if attr0.name == attr1: - continue - if str(attr0.id) == str(attr1): - continue - if isinstance(attr1, RecordType): - if attr1.name == attr0: - continue - if str(attr1.id) == str(attr0): - continue + # properties # add to diff if attr has different values or is not set for one entity if (attr0_unset != attr1_unset) or (attr0 != attr1): diff[0][attr] = attr0 diff[1][attr] = attr1 - # compare value - ent0_val, ent1_val = entity0.value, entity1.value - if ent0_val != ent1_val: - same_value = False - - # Surround scalar values with a list to avoid code duplication - - # this way, the scalar values can be checked against special cases - # (compare refs, entity id equivalency etc.) in the list loop - if not isinstance(ent0_val, list) and not isinstance(ent1_val, list): - ent0_val, ent1_val = [ent0_val], [ent1_val] - - if isinstance(ent0_val, list) and isinstance(ent1_val, list): - # lists can't be the same if the lengths are different - if len(ent0_val) == len(ent1_val): - lists_match = True - for val0, val1 in zip(ent0_val, ent1_val): - if val0 == val1: - continue - # Compare Entities - if (compare_referenced_records and - isinstance(val0, Entity) and isinstance(val1, Entity)): - try: - same = empty_diff(val0, val1, False, - entity_name_id_equivalency) - except (ValueError, NotImplementedError): - same = False - if same: - continue - # Compare Entity name and id - if entity_name_id_equivalency: - if (isinstance(val0, Entity) - and isinstance(val1, (int, str))): - if (str(val0.id) == str(val1) - or str(val0.name) == str(val1)): - continue - if (isinstance(val1, Entity) - and isinstance(val0, (int, str))): - if (str(val1.id) == str(val0) - or str(val1.name) == str(val0)): - continue - # val0 and val1 could not be matched - lists_match = False - break - if lists_match: - same_value = True - - if not same_value: - diff[0]["value"] = entity0.value - diff[1]["value"] = entity1.value - - # compare properties - for prop in entity0.properties: - matching = entity1.properties.filter(prop, check_wrapped=False) if len(matching) == 0: - # entity1 has prop, entity0 does not - diff[0]["properties"][prop.name] = {} + olddiff["properties"][prop.name] = {} elif len(matching) == 1: - diff[0]["properties"][prop.name] = {} - diff[1]["properties"][prop.name] = {} - propdiff = (diff[0]["properties"][prop.name], - diff[1]["properties"][prop.name]) - - # We should compare the wrapped properties instead of the - # wrapping entities if possible: - comp1, comp2 = prop, matching[0] - if (comp1._wrapped_entity is not None - and comp2._wrapped_entity is not None): - comp1, comp2 = comp1._wrapped_entity, comp2._wrapped_entity - # Recursive call to determine the differences between properties - # Note: Can lead to infinite recursion if two properties have - # themselves or each other as subproperties - od, nd = compare_entities(comp1, comp2, compare_referenced_records, - entity_name_id_equivalency) - # We do not care about parents and properties here, discard - od.pop("parents") - od.pop("properties") - nd.pop("parents") - nd.pop("properties") - # use the remaining diff - propdiff[0].update(od) - propdiff[1].update(nd) - - # As the importance of a property is an attribute of the record - # and not the property, it is not contained in the diff returned - # by compare_entities and needs to be added separately - if (entity0.get_importance(prop) != - entity1.get_importance(matching[0])): - propdiff[0]["importance"] = entity0.get_importance(prop) - propdiff[1]["importance"] = entity1.get_importance(matching[0]) - - # in case there is no difference, we remove the dict keys again - if len(propdiff[0]) == 0 and len(propdiff[1]) == 0: - diff[0]["properties"].pop(prop.name) - diff[1]["properties"].pop(prop.name) + newdiff["properties"][prop.name] = {} + olddiff["properties"][prop.name] = {} + + if (old_entity.get_importance(prop.name) != + new_entity.get_importance(prop.name)): + olddiff["properties"][prop.name]["importance"] = \ + old_entity.get_importance(prop.name) + newdiff["properties"][prop.name]["importance"] = \ + new_entity.get_importance(prop.name) + + if (prop.datatype != matching[0].datatype): + olddiff["properties"][prop.name]["datatype"] = prop.datatype + newdiff["properties"][prop.name]["datatype"] = \ + matching[0].datatype + + if (prop.unit != matching[0].unit): + olddiff["properties"][prop.name]["unit"] = prop.unit + newdiff["properties"][prop.name]["unit"] = \ + matching[0].unit + + if (prop.value != matching[0].value): + # basic comparison of value objects says they are different + same_value = False + if compare_referenced_records: + # scalar reference + if isinstance(prop.value, Entity) and isinstance(matching[0].value, Entity): + # explicitely not recursive to prevent infinite recursion + same_value = empty_diff( + prop.value, matching[0].value, compare_referenced_records=False) + # list of references + elif isinstance(prop.value, list) and isinstance(matching[0].value, list): + # all elements in both lists actually are entity objects + # TODO: check, whether mixed cases can be allowed or should lead to an error + if (all([isinstance(x, Entity) for x in prop.value]) + and all([isinstance(x, Entity) for x in matching[0].value])): + # can't be the same if the lengths are different + if len(prop.value) == len(matching[0].value): + # do a one-by-one comparison: + # the values are the same if all diffs are empty + same_value = all( + [empty_diff(x, y, False) for x, y + in zip(prop.value, matching[0].value)]) + + if not same_value: + olddiff["properties"][prop.name]["value"] = prop.value + newdiff["properties"][prop.name]["value"] = \ + matching[0].value + + if (len(newdiff["properties"][prop.name]) == 0 + and len(olddiff["properties"][prop.name]) == 0): + newdiff["properties"].pop(prop.name) + olddiff["properties"].pop(prop.name) else: raise NotImplementedError( "Comparison not implemented for multi-properties.") - # we have not yet compared properties that do not exist in entity0 - for prop in entity1.properties: - # check how often the property appears in entity0 - num_prop_in_ent0 = len(entity0.properties.filter(prop)) - if num_prop_in_ent0 == 0: - # property is only present in entity0 - add to diff - diff[1]["properties"][prop.name] = {} - if num_prop_in_ent0 > 1: - # Check whether the property is present multiple times in entity0 - # and raise error - result would be incorrect - raise NotImplementedError( - "Comparison not implemented for multi-properties.") + for prop in new_entity.properties: + if len([0 for p in old_entity.properties if p.name == prop.name]) == 0: + newdiff["properties"][prop.name] = {} + + # parents - # compare parents - for index, parents, other_entity in [(0, entity0.parents, entity1), - (1, entity1.parents, entity0)]: - for parent in parents: - matching = other_entity.parents.filter(parent) - if len(matching) == 0: - diff[index]["parents"].append(parent.name) - continue + for parent in old_entity.parents: + if len([0 for p in new_entity.parents if p.name == parent.name]) == 0: + olddiff["parents"].append(parent.name) - return diff + for parent in new_entity.parents: + if len([0 for p in old_entity.parents if p.name == parent.name]) == 0: + newdiff["parents"].append(parent.name) + + return (olddiff, newdiff) def empty_diff(old_entity: Entity, new_entity: Entity, - compare_referenced_records: bool = False, - entity_name_id_equivalency: bool = False) -> bool: + compare_referenced_records: bool = False) -> bool: """Check whether the `compare_entities` found any differences between old_entity and new_entity. @@ -457,13 +359,10 @@ def empty_diff(old_entity: Entity, new_entity: Entity, Whether to compare referenced records in case of both, `old_entity` and `new_entity`, have the same reference properties and both have a Record object as value. - entity_name_id_equivalency : bool, optional - If set to True, the comparison between an entity and an int or str also - checks whether the int/str matches the name or id of the entity, so - Entity(id=100) == 100 == "100". + """ - olddiff, newdiff = compare_entities(old_entity, new_entity, - compare_referenced_records, entity_name_id_equivalency) + olddiff, newdiff = compare_entities( + old_entity, new_entity, compare_referenced_records) for diff in [olddiff, newdiff]: for key in ["parents", "properties"]: if len(diff[key]) > 0: @@ -485,9 +384,9 @@ def merge_entities(entity_a: Entity, ) -> Entity: """Merge entity_b into entity_a such that they have the same parents and properties. - The attributes datatype, unit, value, name and description will only be changed - in entity_a if they are None for entity_a and set for entity_b. If one of those attributes is - set in both entities and they differ, then an + datatype, unit, value, name and description will only be changed in entity_a + if they are None for entity_a and set for entity_b. If there is a + corresponding value for entity_a different from None, an EntityMergeConflictError will be raised to inform about an unresolvable merge conflict. @@ -495,6 +394,8 @@ def merge_entities(entity_a: Entity, Returns entity_a. + WARNING: This function is currently experimental and insufficiently tested. Use with care. + Parameters ---------- entity_a, entity_b : Entity @@ -527,10 +428,12 @@ def merge_entities(entity_a: Entity, """ + logger.warning( + "This function is currently experimental and insufficiently tested. Use with care.") + # Compare both entities: - diff_r1, diff_r2 = compare_entities(entity_a, entity_b, - entity_name_id_equivalency=merge_id_with_resolved_entity, - compare_referenced_records=merge_references_with_empty_diffs) + diff_r1, diff_r2 = compare_entities( + entity_a, entity_b, compare_referenced_records=merge_references_with_empty_diffs) # Go through the comparison and try to apply changes to entity_a: for key in diff_r2["parents"]: @@ -550,8 +453,7 @@ def merge_entities(entity_a: Entity, for attribute in ("datatype", "unit", "value"): if (attribute in diff_r2["properties"][key] and diff_r2["properties"][key][attribute] is not None): - if (attribute not in diff_r1["properties"][key] or - diff_r1["properties"][key][attribute] is None): + if (diff_r1["properties"][key][attribute] is None): setattr(entity_a.get_property(key), attribute, diff_r2["properties"][key][attribute]) elif force: @@ -619,18 +521,6 @@ def merge_entities(entity_a: Entity, def describe_diff(olddiff, newdiff, name=None, as_update=True): - """ - This function generates a textual representation of the differences between two entities that have been generated - using compare_entities. - - Arguments: - ---------- - olddiff: The diff output for the entity marked as "old". - newdiff: The diff output for the entity marked as "new". - - Example: - >>> describe_diff(*compare_entities(db.Record().add_property("P"), "value", db.Record())) - """ description = "" for attr in list(set(list(olddiff.keys()) + list(newdiff.keys()))): diff --git a/src/linkahead/cached.py b/src/linkahead/cached.py index 6715c16629659ff79ea66de7fea127f81317bf3c..f9179e1a54997bf99f7158b9b46e2d1068a21e47 100644 --- a/src/linkahead/cached.py +++ b/src/linkahead/cached.py @@ -107,7 +107,7 @@ If a query phrase is given, the result must be unique. If this is not what you def cached_query(query_string: str) -> Container: """A cached version of :func:`linkahead.execute_query<linkahead.common.models.execute_query>`. -All additional arguments are at their default values. + All additional arguments are at their default values. """ result = _cached_access(AccessType.QUERY, query_string, unique=False) @@ -116,7 +116,7 @@ All additional arguments are at their default values. return result -@lru_cache(maxsize=DEFAULT_SIZE) +@ lru_cache(maxsize=DEFAULT_SIZE) def _cached_access(kind: AccessType, value: Union[str, int], unique: bool = True): # This is the function that is actually cached. # Due to the arguments, the cache has kind of separate sections for cached_query and @@ -161,11 +161,12 @@ def cache_clear() -> None: def cache_info(): """Return info about the cache that is used by `cached_query` and `cached_get_entity_by`. -Returns -------- + Returns + ------- -out: named tuple - See the standard library :func:`functools.lru_cache` for details.""" + out: named tuple + See the standard library :func:`functools.lru_cache` for details. + """ return _cached_access.cache_info() @@ -188,21 +189,21 @@ def cache_fill(items: dict[Union[str, int], Any], This allows to fill the cache without actually submitting queries. Note that this does not overwrite existing entries with the same keys. -Parameters ----------- + Parameters + ---------- -items: dict - A dictionary with the entries to go into the cache. The keys must be compatible with the - AccessType given in ``kind`` + items: dict + A dictionary with the entries to go into the cache. The keys must be compatible with the + AccessType given in ``kind`` -kind: AccessType, optional - The AccessType, for example ID, name, path or query. + kind: AccessType, optional + The AccessType, for example ID, name, path or query. -unique: bool, optional - If True, fills the cache for :func:`cached_get_entity_by`, presumably with - :class:`linkahead.Entity<linkahead.common.models.Entity>` objects. If False, the cache should be - filled with :class:`linkahead.Container<linkahead.common.models.Container>` objects, for use with - :func:`cached_query`. + unique: bool, optional + If True, fills the cache for :func:`cached_get_entity_by`, presumably with + :class:`linkahead.Entity<linkahead.common.models.Entity>` objects. If False, the cache should be + filled with :class:`linkahead.Container<linkahead.common.models.Container>` objects, for use with + :func:`cached_query`. """ diff --git a/src/linkahead/common/administration.py b/src/linkahead/common/administration.py index dee341fa84dd85cbd41a77c0e2d510a96f2c4824..28ef107579fccb689b7337aed65e054cfbf36c05 100644 --- a/src/linkahead/common/administration.py +++ b/src/linkahead/common/administration.py @@ -345,20 +345,20 @@ def _get_roles(username, realm=None, **kwargs): def _set_permissions(role, permission_rules, **kwargs): """Set permissions for a role. -Parameters ----------- + Parameters + ---------- -role : str - The role for which the permissions are set. + role : str + The role for which the permissions are set. -permission_rules : iterable<PermissionRule> - An iterable with PermissionRule objects. + permission_rules : iterable<PermissionRule> + An iterable with PermissionRule objects. -**kwargs : - Additional arguments which are passed to the HTTP request. + **kwargs : + Additional arguments which are passed to the HTTP request. -Returns -------- + Returns + ------- None """ xml = etree.Element("PermissionRules") @@ -393,15 +393,15 @@ def _get_permissions(role, **kwargs): class PermissionRule(): """Permission rules. -Parameters ----------- -action : str - Either "grant" or "deny" + Parameters + ---------- + action : str + Either "grant" or "deny" -permission : str - For example ``RETRIEVE:*``. + permission : str + For example ``RETRIEVE:*``. -priority : bool, optional + priority : bool, optional Whether the priority shall be set, defaults is False. """ diff --git a/src/linkahead/common/models.py b/src/linkahead/common/models.py index 5ae04331901bf702d9e683282cd5ec95d76826ba..5689e0799f51584a39c187fd106c4d1b09e9cfc7 100644 --- a/src/linkahead/common/models.py +++ b/src/linkahead/common/models.py @@ -37,10 +37,8 @@ from __future__ import annotations # Can be removed with 3.10. import re import sys -import warnings from builtins import str from copy import deepcopy -from enum import Enum from datetime import date, datetime from functools import cmp_to_key from hashlib import sha512 @@ -48,6 +46,7 @@ from os import listdir from os.path import isdir from random import randint from tempfile import NamedTemporaryFile + from typing import TYPE_CHECKING from typing import Any, Final, Literal, Optional, TextIO, Union @@ -58,6 +57,7 @@ if TYPE_CHECKING: from os import PathLike QueryDict = dict[str, Optional[str]] + from warnings import warn from lxml import etree @@ -114,8 +114,8 @@ if TYPE_CHECKING: IMPORTANCE = Literal["OBLIGATORY", "RECOMMENDED", "SUGGESTED", "FIX", "NONE"] ROLE = Literal["Entity", "Record", "RecordType", "Property", "File"] -SPECIAL_ATTRIBUTES = ["name", "role", "datatype", "description", "file", - "id", "path", "checksum", "size", "value", "unit"] +SPECIAL_ATTRIBUTES = ["name", "role", "datatype", "description", + "id", "path", "checksum", "size", "value"] class Entity: @@ -138,10 +138,10 @@ class Entity: description: Optional[str] = None, # @ReservedAssignment datatype: Optional[DATATYPE] = None, value=None, - role=None, + **kwargs, ): - self.__role: Optional[ROLE] = role + self.__role: Optional[ROLE] = kwargs["role"] if "role" in kwargs else None self._checksum: Optional[str] = None self._size = None self._upload = None @@ -156,8 +156,8 @@ class Entity: self.datatype: Optional[DATATYPE] = datatype self.value = value self.messages = Messages() - self.properties = PropertyList() - self.parents = ParentList() + self.properties = _Properties() + self.parents = _ParentList() self.path: Optional[str] = None self.file: Optional[File] = None self.unit: Optional[str] = None @@ -873,29 +873,29 @@ class Entity: check. Note that, if checked, name or ID should not be None, lest the check fail. -Parameters ----------- + Parameters + ---------- -parent: Entity - Check for this parent. + parent: Entity + Check for this parent. -recursive: bool, optional - Whether to check recursively. + recursive: bool, optional + Whether to check recursively. -check_name: bool, optional - Whether to use the name for ancestry check. + check_name: bool, optional + Whether to use the name for ancestry check. -check_id: bool, optional - Whether to use the ID for ancestry check. + check_id: bool, optional + Whether to use the ID for ancestry check. -retrieve: bool, optional - If False, do not retrieve parents from the server. + retrieve: bool, optional + If False, do not retrieve parents from the server. -Returns -------- -out: bool - True if ``parent`` is a true parent, False otherwise. -""" + Returns + ------- + out: bool + True if ``parent`` is a true parent, False otherwise. + """ if recursive: parents = self.get_parents_recursively(retrieve=retrieve) @@ -922,7 +922,7 @@ out: bool def get_parents(self): """Get all parents of this entity. - @return: ParentList(list) + @return: _ParentList(list) """ return self.parents @@ -930,17 +930,17 @@ out: bool def get_parents_recursively(self, retrieve: bool = True) -> list[Entity]: """Get all ancestors of this entity. -Parameters ----------- + Parameters + ---------- -retrieve: bool, optional - If False, do not retrieve parents from the server. + retrieve: bool, optional + If False, do not retrieve parents from the server. -Returns -------- -out: list[Entity] - The parents of this Entity -""" + Returns + ------- + out: list[Entity] + The parents of this Entity + """ all_parents: list[Entity] = [] self._get_parent_recursively(all_parents, retrieve=retrieve) @@ -1022,7 +1022,7 @@ out: list[Entity] def get_properties(self): """Get all properties of this entity. - @return: PropertyList(list) + @return: _Properties(list) """ return self.properties @@ -1598,15 +1598,15 @@ out: list[Entity] unique=True, flags=None, sync=True): """Update this entity. -There are two possible work-flows to perform this update: -First: - 1) retrieve an entity - 2) do changes - 3) call update method + There are two possible work-flows to perform this update: + First: + 1) retrieve an entity + 2) do changes + 3) call update method -Second: - 1) construct entity with id - 2) call update method. + Second: + 1) construct entity with id + 2) call update method. For slight changes the second one it is more comfortable. Furthermore, it is possible to stay off-line until calling the update method. The name, description, unit, datatype, path, @@ -2422,14 +2422,11 @@ class File(Record): value=value, unit=unit, importance=importance, inheritance=inheritance) -class PropertyList(list): - """A list class for Property objects - - This class provides addional functionality like get/set_importance or get_by_name. - """ +class _Properties(list): + """FIXME: Add docstring.""" def __init__(self): - super().__init__() + list.__init__(self) self._importance: dict[Entity, IMPORTANCE] = dict() self._inheritance: dict[Entity, INHERITANCE] = dict() self._element_by_name: dict[str, Entity] = dict() @@ -2522,47 +2519,6 @@ class PropertyList(list): return xml2str(xml) - def filter(self, prop: Optional[Property] = None, pid: Union[None, str, int] = None, - name: Optional[str] = None, check_wrapped: bool = True) -> list: - """ - Return all Properties from the given PropertyList that match the - selection criteria. - - You can provide name or ID and all matching elements will be returned. - If both name and ID are given, elements matching either criterion will - be returned. - - If a Property is given, neither name nor ID may be set. In this case, - only elements matching both name and ID of the Property are returned. - - Also checks the original Properties wrapped within the elements of - PropertyList and will return the original Property if both wrapper and - original match. - - Params - ------ - listobject : Iterable(Property) - List to be filtered - prop : Property - Property to match name and ID with. Cannot be - set simultaneously with ID or name. - pid : str, int - Property ID to match - name : str - Property name to match - check_wrapped : bool, default: True - If set to False, only the wrapper elements - contained in the given PropertyList will be - checked, not the original Properties they wrap. - - Returns - ------- - matches : list - List containing all matching Properties - """ - return _filter_entity_list(self, pid=pid, name=name, entity=prop, - check_wrapped=check_wrapped) - def _get_entity_by_cuid(self, cuid: str): ''' Get the first entity which has the given cuid. @@ -2620,7 +2576,9 @@ class PropertyList(list): raise KeyError(str(prop) + " not found.") -class ParentList(list): +class _ParentList(list): + # TODO unclear why this class is private. Isn't it use full for users? + def _get_entity_by_cuid(self, cuid): ''' Get the first entity which has the given cuid. @@ -2635,8 +2593,8 @@ class ParentList(list): return e raise KeyError("No entity with that cuid in this container.") - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self): + list.__init__(self) self._element_by_name = dict() self._element_by_id = dict() @@ -2649,9 +2607,15 @@ class ParentList(list): if isinstance(parent, list): for p in parent: self.append(p) + return if isinstance(parent, Entity): + if parent.id: + self._element_by_id[str(parent.id)] = parent + + if parent.name: + self._element_by_name[parent.name] = parent list.append(self, parent) else: raise TypeError("Argument was not an Entity") @@ -2693,62 +2657,7 @@ class ParentList(list): return xml2str(xml) - def filter(self, parent: Optional[Parent] = None, pid: Union[None, str, int] = None, - name: Optional[str] = None, check_wrapped: bool = True) -> list: - """ - Return all Parents from the given ParentList that match the selection - criteria. - - You can provide name or ID and all matching elements will be returned. - If both name and ID are given, elements matching either criterion will - be returned. - - If a Parent is given, neither name nor ID may be set. In this case, - only elements matching both name and ID of the Parent are returned. - - Also checks the original Parents wrapped within the elements of - ParentList, will return the original Parent if both wrapper and - original match. - - Params - ------ - listobject : Iterable(Parent) - List to be filtered - parent : Parent - Parent to match name and ID with. Cannot be set - pid : str, int - Parent ID to match - name : str - Parent name to match - simultaneously with ID or name. - check_wrapped : bool, default: True - If set to False, only the wrapper elements - contained in the given ParentList will be - checked, not the original Parents they wrap. - - Returns - ------- - matches : list - List containing all matching Parents - """ - return _filter_entity_list(self, pid=pid, name=name, entity=parent, - check_wrapped=check_wrapped) - def remove(self, parent: Union[Entity, int, str]): - """ - Remove first occurrence of parent. - - Parameters - ---------- - parent: Union[Entity, int, str], the parent to be removed identified via ID or name. If a - Parent object is provided the ID and then the name is used to identify the parent to be - removed. - - Returns - ------- - None - """ - if isinstance(parent, Entity): if parent in self: list.remove(self, parent) @@ -2766,11 +2675,11 @@ class ParentList(list): # by name for e in self: - if e.name is not None and e.name.lower() == parent.name.lower(): + if e.name is not None and e.name == parent.name: list.remove(self, e) return - elif isinstance(parent, str): + elif hasattr(parent, "encode"): # by name for e in self: @@ -2789,19 +2698,6 @@ class ParentList(list): raise KeyError(str(parent) + " not found.") -class _Properties(PropertyList): - def __init__(self, *args, **kwargs): - warnings.warn(DeprecationWarning("This class is deprecated. Please use PropertyList.")) - super().__init__(*args, **kwargs) - - -class _ParentList(ParentList): - def __init__(self, *args, **kwargs): - warnings.warn(DeprecationWarning("This class is deprecated. Please use ParentList " - "(without underscore).")) - super().__init__(*args, **kwargs) - - class Messages(list): """This specialization of list stores error, warning, info, and other messages. The mentioned three messages types play a special role. @@ -5496,100 +5392,3 @@ def delete(ids: Union[list[int], range], raise_exception_on_error: bool = True): c.append(Entity(id=ids)) return c.delete(raise_exception_on_error=raise_exception_on_error) - - -def _filter_entity_list(listobject, entity: Optional[Entity] = None, pid: Union[None, str, int] = None, - name: Optional[str] = None, check_wrapped: bool = True) -> list: - """ - Return all elements from the given list that match the selection criteria. - - You can provide name or ID and all matching elements will be returned. - If both name and ID are given, elements matching either criterion will be - returned. - - If an Entity is given, neither name nor ID may be set. In this case, only - elements matching both name and ID of the Entity are returned, as long as - name and ID are both set. - - In case the elements contained in the given list are wrapped, the function - in its default configuration checks both the wrapped and wrapper Entity - against the match criteria, and will return the wrapped Entity if both - match. Note that this is currently not iterative, meaning that only the - first layer of wrapped entity is considered. - - Params - ------ - listobject : Iterable(Entity) - List to be filtered - entity : Entity - Entity to match name and ID for. Cannot be set - simultaneously with ID or name. - pid : str, int - Entity ID to match - name : str - Entity name to match - check_wrapped : bool, default: True - If set to False, only the wrapper elements - contained in the given list will be checked and - returned, not the original Entities they wrap. - - Returns - ------- - matches : list - A List containing all matching Entities - """ - # Check correct input params and setup - match_entity = False - if entity is not None: - if pid is not None or name is not None: - raise ValueError("Please provide either Entity, pid or name.") - pid = entity.id - name = entity.name - match_entity = True - - # Iterate through list and match based on given criteria - matches = [] - potentials = list(zip(listobject.copy(), [False]*len(listobject))) - for candidate, wrapped_is_checked in potentials: - name_match, pid_match, original_candidate = False, False, None - - # Parents/Properties may be wrapped - if wanted, try to match original - # Note: if we want to check all wrapped Entities, this should be - # switched. First check the wrap, then append wrapped. In this - # case we also don't need wrapped_checked, but preferentially - # append the wrapper. - if check_wrapped and not wrapped_is_checked: - try: - if candidate._wrapped_entity is not None: - original_candidate = candidate - candidate = candidate._wrapped_entity - except AttributeError: - pass - - # Check whether name/pid match - # pid and candidate.id might be int and str, so cast to str (None safe) - if pid is not None and str(candidate.id) == str(pid): - pid_match = True - elif match_entity and pid is None: - # Without a pid we cannot match - pid_match = True - if (name is not None and candidate.name is not None - and candidate.name.lower() == name.lower()): - name_match = True - elif match_entity and name is None: - # Without a name we cannot match - name_match = True - - # If the criteria are satisfied, append the match. Otherwise, check - # the wrapper if applicable - # ToDo: Check whether it would make sense to also check the RecordType - # for equality when match_entity is true to offset potentially - # missing id - if name_match and pid_match: - matches.append(candidate) - elif not match_entity and (name_match or pid_match): - matches.append(candidate) - else: - if original_candidate is not None: - potentials.append((original_candidate, True)) - return matches diff --git a/src/linkahead/common/versioning.py b/src/linkahead/common/versioning.py index 2e292e6bb031725fbd6da618c4b888c05072c46b..11cf5f6904b02954eb0b2bddc16478590df167e7 100644 --- a/src/linkahead/common/versioning.py +++ b/src/linkahead/common/versioning.py @@ -105,7 +105,7 @@ class Version(): is_head: Union[bool, str, None] = False, is_complete_history: Union[bool, str, None] = False): """Typically the `predecessors` or `successors` should not "link back" to an existing Version -object.""" + object.""" self.id = id self.date = date self.username = username diff --git a/src/linkahead/utils/create_revision.py b/src/linkahead/utils/create_revision.py index 5f6ecc8148859d0ee0908412ff80d20d465cdb25..cde4bae5b0d919977d220b2c35896dcb20e933e7 100644 --- a/src/linkahead/utils/create_revision.py +++ b/src/linkahead/utils/create_revision.py @@ -34,15 +34,15 @@ def bend_references(from_id, to_id, except_for=None): and those references are changed to point to to_id. entities having an id listed in except_for are excluded. -Parameters ----------- + Parameters + ---------- -from_id : int - the old object to which references where pointing -to_id : int - the new object to which references will be pointing -except_for : list of int - entities with id of this list will not be changed + from_id : int + the old object to which references where pointing + to_id : int + the new object to which references will be pointing + except_for : list of int + entities with id of this list will not be changed """ if except_for is None: except_for = [to_id] @@ -73,16 +73,16 @@ def create_revision(old_id, prop, value): This function changes the record with id old_id. The value of the propertye prop is changed to value. -Parameters ----------- + Parameters + ---------- -old_id : int - id of the record to be changed -prop : string - name of the property to be changed -value : type of corresponding property - the new value of the corresponding property -""" + old_id : int + id of the record to be changed + prop : string + name of the property to be changed + value : type of corresponding property + the new value of the corresponding property + """ record = db.execute_query("FIND {}".format(old_id))[0] new_rec = record.copy() new_rec.get_property(prop).value = value diff --git a/src/linkahead/utils/get_entity.py b/src/linkahead/utils/get_entity.py index 0ffd89e4dc7f214bbc72d4508f6ca4481dad7d9c..dd91cdc27b3f6adb52ddef36a59d1a0965fb662e 100644 --- a/src/linkahead/utils/get_entity.py +++ b/src/linkahead/utils/get_entity.py @@ -30,13 +30,13 @@ from .escape import escape_squoted_text def get_entity_by_name(name: str, role: Optional[str] = None) -> Entity: """Return the result of a unique query that uses the name to find the correct entity. -Submits the query "FIND {role} WITH name='{name}'". + Submits the query "FIND {role} WITH name='{name}'". -Parameters ----------- + Parameters + ---------- -role: str, optional - The role for the query, defaults to ``ENTITY``. + role: str, optional + The role for the query, defaults to ``ENTITY``. """ name = escape_squoted_text(name) if role is None: @@ -48,13 +48,13 @@ role: str, optional def get_entity_by_id(eid: Union[str, int], role: Optional[str] = None) -> Entity: """Return the result of a unique query that uses the id to find the correct entity. -Submits the query "FIND {role} WITH id='{eid}'". + Submits the query "FIND {role} WITH id='{eid}'". -Parameters ----------- + Parameters + ---------- -role: str, optional - The role for the query, defaults to ``ENTITY``. + role: str, optional + The role for the query, defaults to ``ENTITY``. """ if role is None: role = "ENTITY" @@ -65,13 +65,13 @@ role: str, optional def get_entity_by_path(path: str) -> Entity: """Return the result of a unique query that uses the path to find the correct file. -Submits the query "FIND {role} WHICH IS STORED AT '{path}'". + Submits the query "FIND {role} WHICH IS STORED AT '{path}'". -Parameters ----------- + Parameters + ---------- -role: str, optional - The role for the query, defaults to ``ENTITY``. + role: str, optional + The role for the query, defaults to ``ENTITY``. """ # type hint can be ignored, it's a unique query return execute_query(f"FIND FILE WHICH IS STORED AT '{path}'", unique=True) # type: ignore diff --git a/src/linkahead/utils/linkahead_admin.py b/src/linkahead/utils/linkahead_admin.py index f7e3b8b63f18e37e6210f2aa03f34ce5b0f688d4..ca5f3c01e0bbe95fe712761ec7f443ec88d406fd 100755 --- a/src/linkahead/utils/linkahead_admin.py +++ b/src/linkahead/utils/linkahead_admin.py @@ -33,7 +33,7 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter import linkahead as db from linkahead import administration as admin -from linkahead.exceptions import HTTPClientError +from linkahead.exceptions import HTTPClientError, HTTPResourceNotFoundError, HTTPForbiddenError __all__ = [] __version__ = 0.3 @@ -42,19 +42,42 @@ __updated__ = '2018-12-11' def do_update_role(args): - admin._update_role(name=args.role_name, description=args.role_description) + """ + Update the description of a role. + + Allowed keyword arguments: + role_name: Name of the role to update + role_description: New description of the role + """ + try: + admin._update_role(name=args.role_name, description=args.role_description) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot update role '{args.role_name}', " + f"reason: '{e.msg}'") def do_create_role(args): - admin._insert_role(name=args.role_name, description=args.role_description) + try: + admin._insert_role(name=args.role_name, description=args.role_description) + except (HTTPClientError, HTTPForbiddenError) as e: + print(f"Error: Cannot create role '{args.role_name}', " + f"reason: '{e.msg}'") def do_retrieve_role(args): - print(admin._retrieve_role(name=args.role_name)) + try: + print(admin._retrieve_role(name=args.role_name)) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot retrieve role '{args.role_name}', " + f"reason: '{e.msg}'") def do_delete_role(args): - admin._delete_role(name=args.role_name) + try: + admin._delete_role(name=args.role_name) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot delete role '{args.role_name}', " + f"reason: '{e.msg}'") def do_retrieve(args): @@ -123,25 +146,27 @@ def do_create_user(args): try: admin._insert_user(name=args.user_name, email=args.user_email, password=password) - if args.activate_user: do_activate_user(args) - except HTTPClientError as e: - print(e.msg) + except (HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot create user '{args.user_name}', " + f"reason: '{e.msg}'") def do_activate_user(args): try: admin._update_user(name=args.user_name, status="ACTIVE") - except HTTPClientError as e: - print(e.msg) + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot activate user '{args.user_name}', " + f"reason: '{e.msg}'") def do_deactivate_user(args): try: admin._update_user(name=args.user_name, status="INACTIVE") - except HTTPClientError as e: - print(e.msg) + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot deactivate user '{args.user_name}', " + f"reason: '{e.msg}'") def do_set_user_password(args): @@ -150,58 +175,110 @@ def do_set_user_password(args): else: password = args.user_password try: - admin._update_user(name=args.user_name, password=password) - except HTTPClientError as e: - print(e.msg) + admin._update_user(name=args.user_name, password=password, realm=args.realm) + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot set password for user '{args.user_name}', " + f"reason: '{e.msg}'") def do_add_user_roles(args): - roles = admin._get_roles(username=args.user_name, realm=None) + try: + roles = admin._get_roles(username=args.user_name, realm=None) + except (HTTPForbiddenError, HTTPResourceNotFoundError) as e: + print(f"Error: Cannot access roles for user '{args.user_name}', " + f"reason: '{e.msg}'") + return for r in args.user_roles: roles.add(r) - admin._set_roles(username=args.user_name, roles=roles) + try: + admin._set_roles(username=args.user_name, roles=roles) + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot add new roles for user '{args.user_name}', " + f"reason: '{e.msg}'") def do_remove_user_roles(args): - roles = admin._get_roles(username=args.user_name, realm=None) + try: + roles = admin._get_roles(username=args.user_name, realm=None) + except (HTTPForbiddenError, HTTPResourceNotFoundError) as e: + print(f"Error: Cannot access roles for user '{args.user_name}', " + f"reason: '{e.msg}'") + return for r in args.user_roles: if r in roles: roles.remove(r) - admin._set_roles(username=args.user_name, roles=roles) + try: + admin._set_roles(username=args.user_name, roles=roles) + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot remove roles from user '{args.user_name}', " + f"reason: '{e.msg}'") def do_set_user_entity(args): - admin._update_user(name=args.user_name, entity=args.user_entity) + try: + admin._update_user(name=args.user_name, entity=args.user_entity) + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot set entity for user '{args.user_name}', " + f"reason: '{e.msg}'") def do_reset_user_entity(args): - admin._update_user(name=args.user_name, entity="") + try: + admin._update_user(name=args.user_name, entity="") + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot remove entity for user '{args.user_name}', " + f"reason: '{e.msg}'") def do_set_user_email(args): - admin._update_user(name=args.user_name, email=args.user_email) + try: + admin._update_user(name=args.user_name, email=args.user_email) + except (HTTPResourceNotFoundError, HTTPForbiddenError, HTTPClientError) as e: + print(f"Error: Cannot set email for user '{args.user_name}', " + f"reason: '{e.msg}'") def do_retrieve_user(args): - print(admin._retrieve_user(name=args.user_name)) + try: + print(admin._retrieve_user(name=args.user_name)) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot retrieve user '{args.user_name}', " + f"reason: '{e.msg}'") def do_delete_user(args): - admin._delete_user(name=args.user_name) + try: + admin._delete_user(name=args.user_name) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot delete user '{args.user_name}', " + f"reason: '{e.msg}'") def do_retrieve_user_roles(args): - print(admin._get_roles(username=args.user_name)) + try: + print(admin._get_roles(username=args.user_name)) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot retrieve roles for user '{args.user_name}', " + f"reason: '{e.msg}'") def do_retrieve_role_permissions(args): - print(admin._get_permissions(role=args.role_name)) + try: + print(admin._get_permissions(role=args.role_name)) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot retrieve permissions for role '{args.role_name}', " + f"reason: '{e.msg}'") def do_grant_role_permissions(args): - perms = admin._get_permissions(args.role_name) + try: + perms = admin._get_permissions(role=args.role_name) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot access permissions for role '{args.role_name}', " + f"reason: '{e.msg}'") + return for p in args.role_permissions: g = admin.PermissionRule( @@ -215,11 +292,20 @@ def do_grant_role_permissions(args): if d in perms: perms.remove(d) perms.add(g) - admin._set_permissions(role=args.role_name, permission_rules=perms) + try: + admin._set_permissions(role=args.role_name, permission_rules=perms) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot set permissions for role '{args.role_name}', " + f"reason: '{e.msg}'") def do_revoke_role_permissions(args): - perms = admin._get_permissions(args.role_name) + try: + perms = admin._get_permissions(role=args.role_name) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot access permissions for role '{args.role_name}', " + f"reason: '{e.msg}'") + return for p in args.role_permissions: g = admin.PermissionRule( @@ -232,11 +318,20 @@ def do_revoke_role_permissions(args): if d in perms: perms.remove(d) - admin._set_permissions(role=args.role_name, permission_rules=perms) + try: + admin._set_permissions(role=args.role_name, permission_rules=perms) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot revoke permissions for role '{args.role_name}', " + f"reason: '{e.msg}'") def do_deny_role_permissions(args): - perms = admin._get_permissions(args.role_name) + try: + perms = admin._get_permissions(role=args.role_name) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot access permissions for role '{args.role_name}', " + f"reason: '{e.msg}'") + return for p in args.role_permissions: g = admin.PermissionRule( @@ -250,7 +345,11 @@ def do_deny_role_permissions(args): if d in perms: perms.remove(d) perms.add(d) - admin._set_permissions(role=args.role_name, permission_rules=perms) + try: + admin._set_permissions(role=args.role_name, permission_rules=perms) + except (HTTPResourceNotFoundError, HTTPForbiddenError) as e: + print(f"Error: Cannot deny permissions for role '{args.role_name}', " + f"reason: '{e.msg}'") def do_retrieve_entity_acl(args): @@ -364,6 +463,12 @@ USAGE metavar='USERNAME', dest="user_name", help="The name of the user who's password is to be set.") + subparser.add_argument( + metavar='REALM', + dest="realm", + nargs="?", + default=None, + help="The realm of the user who's password is to be set.") subparser.add_argument( metavar='PASSWORD', nargs="?", diff --git a/unittests/test_apiutils.py b/unittests/test_apiutils.py index cdca1280b790e09225320330dd3e4a89989ef17b..087880768516f0da97858d65a5d15247308dca2a 100644 --- a/unittests/test_apiutils.py +++ b/unittests/test_apiutils.py @@ -25,7 +25,6 @@ # Test apiutils # A. Schlemmer, 02/2018 -from io import StringIO import linkahead as db import linkahead.apiutils @@ -97,7 +96,6 @@ def test_resolve_reference(): def test_compare_entities(): - # test compare of parents, properties r1 = db.Record() r2 = db.Record() r1.add_parent("bla") @@ -146,6 +144,9 @@ def test_compare_entities(): # test compare units of properties r1 = db.Record() r2 = db.Record() + r1.add_parent("bla") + r2.add_parent("bla") + r1.add_parent("lopp") r1.add_property("test", value=2, unit="cm") r2.add_property("test", value=2, unit="m") r1.add_property("tests", value=3, unit="cm") @@ -157,6 +158,8 @@ def test_compare_entities(): diff_r1, diff_r2 = compare_entities(r1, r2) + assert len(diff_r1["parents"]) == 1 + assert len(diff_r2["parents"]) == 0 assert len(diff_r1["properties"]) == 4 assert len(diff_r2["properties"]) == 4 @@ -173,207 +176,14 @@ def test_compare_entities(): assert diff_r2["properties"]["test"]["unit"] == "m" -def test_compare_entities_battery(): - par1, par2, par3 = db.Record(), db.Record(), db.RecordType() - r1, r2, r3 = db.Record(), db.Record(), db.Record() - prop1 = db.Property() - prop2 = db.Property(name="Property 2") - prop3 = db.Property() - - # Basic tests for Properties - prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop", - "value": db.Record().add_parent(par3), "unit": '°'} - t1 = db.Record().add_parent(db.RecordType()) - t2 = db.Record().add_parent(db.RecordType()) - # Change datatype - t1.add_property(db.Property(name="datatype", **prop_settings)) - prop_settings["datatype"] = par3 - t2.add_property(db.Property(name="datatype", **prop_settings)) - # Change description - t1.add_property(db.Property(name="description", **prop_settings)) - prop_settings["description"] = "diff desc" - t2.add_property(db.Property(name="description", **prop_settings)) - # Change value to copy - t1.add_property(db.Property(name="value copy", **prop_settings)) - prop_settings["value"] = db.Record().add_parent(par3) - t2.add_property(db.Property(name="value copy", **prop_settings)) - # Change value to something different - t1.add_property(db.Property(name="value", **prop_settings)) - prop_settings["value"] = db.Record(name="n").add_parent(par3) - t2.add_property(db.Property(name="value", **prop_settings)) - # Change unit - t1.add_property(db.Property(name="unit", **prop_settings)) - prop_settings["unit"] = db.Property(unit='°') - t2.add_property(db.Property(name="unit", **prop_settings)) - # Change unit again - t1.add_property(db.Property(name="unit 2", **prop_settings)) - prop_settings["unit"] = db.Property() - t2.add_property(db.Property(name="unit 2", **prop_settings)) - # Compare - diff_0 = compare_entities(t1, t2) - diff_1 = compare_entities(t1, t2, compare_referenced_records=True) - # Check correct detection of changes - assert diff_0[0]["properties"]["datatype"] == {"datatype": db.REFERENCE} - assert diff_0[1]["properties"]["datatype"] == {"datatype": par3} - assert diff_0[0]["properties"]["description"] == {"description": "desc of prop"} - assert diff_0[1]["properties"]["description"] == {"description": "diff desc"} - assert "value" in diff_0[0]["properties"]["value copy"] - assert "value" in diff_0[1]["properties"]["value copy"] - assert "value" in diff_0[0]["properties"]["value"] - assert "value" in diff_0[1]["properties"]["value"] - assert "unit" in diff_0[0]["properties"]["unit"] - assert "unit" in diff_0[1]["properties"]["unit"] - assert "unit" in diff_0[0]["properties"]["unit 2"] - assert "unit" in diff_0[1]["properties"]["unit 2"] - # Check correct result for compare_referenced_records=True - assert "value copy" not in diff_1[0]["properties"] - assert "value copy" not in diff_1[1]["properties"] - diff_0[0]["properties"].pop("value copy") - diff_0[1]["properties"].pop("value copy") - assert diff_0 == diff_1 - - # Basic tests for Parents - t3 = db.Record().add_parent(db.RecordType("A")).add_parent(db.Record("B")) - t4 = db.Record().add_parent(db.RecordType("A")) - assert compare_entities(t3, t4)[0]['parents'] == ['B'] - assert len(compare_entities(t3, t4)[1]['parents']) == 0 - t4.add_parent(db.Record("B")) - assert empty_diff(t3, t4) - # The two following assertions document current behaviour but do not make a - # lot of sense - t4.add_parent(db.Record("B")) - assert empty_diff(t3, t4) - t3.add_parent(db.RecordType("A")).add_parent(db.Record("B")) - t4.add_parent(db.RecordType("B")).add_parent(db.Record("A")) - assert empty_diff(t3, t4) - - # Basic tests for special attributes - prop_settings = {"id": 42, "name": "Property", - "datatype": db.LIST(db.REFERENCE), "value": [db.Record()], - "unit": '€', "description": "desc of prop"} - alt_settings = {"id": 64, "name": "Property 2", - "datatype": db.LIST(db.TEXT), "value": [db.RecordType()], - "unit": '€€', "description": " ę Ě ப ཾ ཿ ∛ ∜ ㅿ ㆀ 값 "} - t5 = db.Property(**prop_settings) - t6 = db.Property(**prop_settings) - assert empty_diff(t5, t6) - # ID - t5.id = alt_settings['id'] - diff = compare_entities(t5, t6) - assert diff[0] == {'properties': {}, 'parents': [], 'id': alt_settings['id']} - assert diff[1] == {'properties': {}, 'parents': [], 'id': prop_settings['id']} - t6.id = alt_settings['id'] - assert empty_diff(t5, t6) - # Name - t5.name = alt_settings['name'] - diff = compare_entities(t5, t6) - assert diff[0] == {'properties': {}, 'parents': [], 'name': alt_settings['name']} - assert diff[1] == {'properties': {}, 'parents': [], 'name': prop_settings['name']} - t6.name = alt_settings['name'] - assert empty_diff(t5, t6) - # Description - t6.description = alt_settings['description'] - diff = compare_entities(t5, t6) - assert diff[0] == {'properties': {}, 'parents': [], 'description': prop_settings['description']} - assert diff[1] == {'properties': {}, 'parents': [], 'description': alt_settings['description']} - t5.description = alt_settings['description'] - assert empty_diff(t5, t6) - # Unit - t5.unit = alt_settings['unit'] - diff = compare_entities(t5, t6) - assert diff[0] == {'properties': {}, 'parents': [], 'unit': alt_settings['unit']} - assert diff[1] == {'properties': {}, 'parents': [], 'unit': prop_settings['unit']} - t6.unit = alt_settings['unit'] - assert empty_diff(t5, t6) - # Value - t6.value = alt_settings['value'] - diff = compare_entities(t5, t6) - assert diff[0] == {'properties': {}, 'parents': [], 'value': prop_settings['value']} - assert diff[1] == {'properties': {}, 'parents': [], 'value': alt_settings['value']} - t5.value = alt_settings['value'] - assert empty_diff(t5, t6) - # Datatype - t6.datatype = alt_settings['datatype'] - diff = compare_entities(t5, t6) - assert diff[0] == {'properties': {}, 'parents': [], 'datatype': prop_settings['datatype']} - assert diff[1] == {'properties': {}, 'parents': [], 'datatype': alt_settings['datatype']} - t5.datatype = alt_settings['datatype'] - assert empty_diff(t5, t6) - # All at once - diff = compare_entities(db.Property(**prop_settings), db.Property(**alt_settings)) - assert diff[0] == {'properties': {}, 'parents': [], **prop_settings} - assert diff[1] == {'properties': {}, 'parents': [], **alt_settings} - # Entity Type - diff = compare_entities(db.Property(value=db.Property(id=101)), - db.Property(value=db.Record(id=101))) - assert "value" in diff[0] - assert "value" in diff[1] - diff = compare_entities(db.Property(value=db.Record(id=101)), - db.Property(value=db.Record(id=101))) - assert "value" in diff[0] - assert "value" in diff[1] - assert empty_diff(db.Property(value=db.Record(id=101)), - db.Property(value=db.Record(id=101)), - compare_referenced_records=True) - - # Special cases - # Files - assert not empty_diff(db.File(path='ABC', file=StringIO("ABC")), - db.File(path='ABC', file=StringIO("Other"))) - # Importance - assert empty_diff(db.Property().add_property(prop1), - db.Property().add_property(prop1)) - assert not empty_diff(db.Property().add_property(prop1, importance=db.SUGGESTED), - db.Property().add_property(prop1, importance=db.OBLIGATORY)) - # Mixed Lists - assert empty_diff(db.Property(value=[1, 2, 'a', r1]), - db.Property(value=[1, 2, 'a', r1])) - # entity_name_id_equivalency - assert not empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]), - db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4])) - assert empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]), - db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4]), - entity_name_id_equivalency=True) - assert empty_diff(db.Property(value=1), db.Property(value=db.Record(id=1)), - entity_name_id_equivalency=True) - # entity_name_id_equivalency - prop4 = db.Property(**prop_settings).add_parent(par1).add_property(prop2) - prop4_c = db.Property(**prop_settings).add_parent(par1).add_property(prop2) - prop4.value = db.Record(id=12) - prop4_c.value = '12' - prop4.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE), - value=[12, db.Record(id=13), par1, "abc%"])) - prop4_c.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE), - value=[db.Record(id=12), "13", par1, "abc%"])) - assert not empty_diff(prop4, prop4_c, entity_name_id_equivalency=False) - assert empty_diff(prop4, prop4_c, entity_name_id_equivalency=True) - # Order invariance - t7 = db.Property(**prop_settings).add_parent(par1).add_property(prop1) - t8 = db.Property(**alt_settings).add_parent(par3).add_property(prop3) - diffs_0 = compare_entities(t7, t8), compare_entities(t7, t8, True) - diffs_1 = compare_entities(t8, t7)[::-1], compare_entities(t8, t7, True)[::-1] - assert diffs_0 == diffs_1 - prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop", - "value": db.Record().add_parent(par3), "unit": '°'} - t1.add_property(db.Property(name="description", **prop_settings)) - t2.add_property(db.Property(name="description", **prop_settings)) - try: - diffs_0 = compare_entities(t1, t2), compare_entities(t1, t2, True) - except Exception as e: - diffs_0 = type(e) - try: - diffs_1 = compare_entities(t2, t1)[::-1], compare_entities(t2, t1, True)[::-1] - except Exception as e: - diffs_1 = type(e) - assert diffs_0 == diffs_1 - - def test_compare_special_properties(): # Test for all known special properties: + SPECIAL_PROPERTIES = ("description", "name", + "checksum", "size", "path", "id") INTS = ("size", "id") HIDDEN = ("checksum", "size") - for key in SPECIAL_ATTRIBUTES: + for key in SPECIAL_PROPERTIES: set_key = key if key in HIDDEN: set_key = "_" + key @@ -411,7 +221,8 @@ def test_compare_special_properties(): assert len(diff_r1["properties"]) == 0 assert len(diff_r2["properties"]) == 0 - # compare Property objects + +def test_compare_properties(): p1 = db.Property() p2 = db.Property() @@ -662,10 +473,10 @@ def test_empty_diff(): rec_a.remove_property("RefType") rec_b.remove_property("RefType") assert empty_diff(rec_a, rec_b) - rec_a.add_property(name="RefType", datatype=db.LIST("RefType"), - value=[ref_rec_a, ref_rec_a]) - rec_b.add_property(name="RefType", datatype=db.LIST("RefType"), - value=[ref_rec_b, ref_rec_b]) + rec_a.add_property(name="RefType", datatype=db.LIST( + "RefType"), value=[ref_rec_a, ref_rec_a]) + rec_b.add_property(name="RefType", datatype=db.LIST( + "RefType"), value=[ref_rec_b, ref_rec_b]) assert not empty_diff(rec_a, rec_b) assert empty_diff(rec_a, rec_b, compare_referenced_records=True) @@ -796,12 +607,13 @@ def test_merge_id_with_resolved_entity(): # Overwrite from right to left in both cases merge_entities(recA, recB, merge_id_with_resolved_entity=True) - assert recA.get_property(rtname).value == ref_rec + assert recA.get_property(rtname).value == ref_id + assert recA.get_property(rtname).value == recB.get_property(rtname).value recA = db.Record().add_property(name=rtname, value=ref_rec) merge_entities(recB, recA, merge_id_with_resolved_entity=True) - assert recB.get_property(rtname).value == ref_id - assert recA.get_property(rtname).value == ref_rec + assert recB.get_property(rtname).value == ref_rec + assert recA.get_property(rtname).value == recB.get_property(rtname).value # id mismatches recB = db.Record().add_property(name=rtname, value=ref_id*2) @@ -817,8 +629,7 @@ def test_merge_id_with_resolved_entity(): # also works in lists: recA = db.Record().add_property( name=rtname, datatype=db.LIST(rtname), value=[ref_rec, ref_id*2]) - recB = db.Record().add_property( - name=rtname, datatype=db.LIST(rtname), value=[ref_id, ref_id*2]) + recB = db.Record().add_property(name=rtname, datatype=db.LIST(rtname), value=[ref_id, ref_id*2]) merge_entities(recA, recB, merge_id_with_resolved_entity=True) - assert recA.get_property(rtname).value == [ref_rec, ref_id*2] - assert recB.get_property(rtname).value == [ref_id, ref_id*2] + assert recA.get_property(rtname).value == [ref_id, ref_id*2] + assert recA.get_property(rtname).value == recB.get_property(rtname).value diff --git a/unittests/test_entity.py b/unittests/test_entity.py index 66cffbefe207820ddde1463f62a88788beb7df9a..abf82f0a9b557cf9d1d2365e01fedaa4eae0c565 100644 --- a/unittests/test_entity.py +++ b/unittests/test_entity.py @@ -22,17 +22,14 @@ # ** end header # """Tests for the Entity class.""" -import os # pylint: disable=missing-docstring import unittest -from pytest import raises +from lxml import etree -import linkahead -from linkahead import (INTEGER, Entity, Property, Record, RecordType, Parent, +import os +from linkahead import (INTEGER, Entity, Property, Record, RecordType, configure_connection) -from linkahead.common.models import SPECIAL_ATTRIBUTES from linkahead.connection.mockup import MockUpServerConnection -from lxml import etree UNITTESTDIR = os.path.dirname(os.path.abspath(__file__)) @@ -85,13 +82,7 @@ class TestEntity(unittest.TestCase): self.assertEqual(entity.to_xml().tag, "Property") def test_instantiation(self): - e = Entity() - for attr in SPECIAL_ATTRIBUTES: - assert hasattr(e, attr) - - def test_instantiation_bad_argument(self): - with self.assertRaises(Exception): - Entity(rol="File") + self.assertRaises(Exception, Entity()) def test_parse_role(self): """During parsing, the role of an entity is set explicitely. All other @@ -106,135 +97,3 @@ class TestEntity(unittest.TestCase): # test whether the __role property of this object has explicitely been # set. self.assertEqual(getattr(entity, "_Entity__role"), "Record") - - -def test_parent_list(): - p1 = RecordType(name="A") - pl = linkahead.common.models.ParentList([p1]) - assert p1 in pl - assert pl.index(p1) == 0 - assert RecordType(name="A") not in pl - assert RecordType(id=101) not in pl - p2 = RecordType(id=101) - pl.append(p2) - assert p2 in pl - assert len(pl) == 2 - p3 = RecordType(id=103, name='B') - pl.append(p3) - assert len(pl) == 3 - - # test removal - # remove by id only, even though element in parent list has name and id - pl.remove(RecordType(id=103)) - assert len(pl) == 2 - assert p3 not in pl - assert p2 in pl - assert p1 in pl - # Same for removal by name - pl.append(p3) - assert len(pl) == 3 - pl.remove(RecordType(name='B')) - assert len(pl) == 2 - assert p3 not in pl - # And an error if no suitable element can be found - with raises(KeyError) as ve: - pl.remove(RecordType(id=105, name='B')) - assert "not found" in str(ve.value) - assert len(pl) == 2 - - # TODO also check pl1 == pl2 - - -def test_property_list(): - # TODO: Resolve parent-list TODOs, then transfer to here. - # TODO: What other considerations have to be done with properties? - p1 = Property(name="A") - pl = linkahead.common.models.PropertyList() - pl.append(p1) - assert p1 in pl - assert Property(id=101) not in pl - p2 = Property(id=101) - pl.append(p2) - assert p1 in pl - assert p2 in pl - p3 = Property(id=103, name='B') - pl.append(p3) - - -def test_filter(): - rt1 = RecordType(id=100) - rt2 = RecordType(id=101, name="RT") - rt3 = RecordType() - p1 = Property(id=100) - p2 = Property(id=100) - p3 = Property(id=101, name="RT") - p4 = Property(id=102, name="P") - p5 = Property(id=103, name="P") - p6 = Property() - r1 = Record(id=100) - r2 = Record(id=100) - r3 = Record(id=101, name="RT") - r4 = Record(id=101, name="R") - r5 = Record(id=104, name="R") - r6 = Record(id=105, name="R") - test_ents = [rt1, rt2, rt3, p1, p2, p3, p4, p5, p6, r1, r2, r3, r4, r5, r6] - - # Setup - t1 = Property() - t1_props, t1_pars = t1.properties, t1.parents - t2 = Record() - t2_props, t2_pars = t2.properties, t2.parents - t3 = RecordType() - t3_props, t3_pars = t3.properties, t3.parents - test_colls = [t1_props, t1_pars, t2_props, t2_pars, t3_props, t3_pars] - for coll in test_colls: - for ent in test_ents: - assert ent not in coll - assert ent not in coll.filter(ent) - - # Checks with each type - for t, t_props, t_pars in [(t1, t1_props, t1_pars), (t2, t2_props, t2_pars), - (t3, t3_props, t3_pars)]: - # Properties - # Basic Checks - t.add_property(p1) - t.add_property(p3) - assert p1 in t_props.filter(pid=100) - assert p1 in t_props.filter(pid="100") - assert p1 not in t_props.filter(pid=101, name="RT") - for entity in [rt1, p2, r1, r2]: - assert entity not in t_props.filter(pid=100) - assert p1 in t_props.filter(entity) - # Check that direct addition (not wrapped) works - t_props.append(p2) - assert p2 in t_props.filter(pid=100) - assert p2 not in t_props.filter(pid=101, name="RT") - for entity in [rt1, r1, r2]: - assert entity not in t_props.filter(pid=100) - assert p2 in t_props.filter(entity) - - # Parents - # Filtering with both name and id - t.add_parent(r3) - t.add_parent(r5) - assert r3 in t_pars.filter(pid=101) - assert r5 not in t_pars.filter(pid=101) - assert r3 not in t_pars.filter(name="R") - assert r5 in t_pars.filter(name="R") - assert r3 in t_pars.filter(pid=101, name="R") - assert r5 in t_pars.filter(pid=101, name="R") - assert r3 in t_pars.filter(pid=104, name="RT") - assert r5 in t_pars.filter(pid=104, name="RT") - assert r3 not in t_pars.filter(pid=105, name="T") - assert r5 not in t_pars.filter(pid=105, name="T") - # Works also without id / name and with duplicate parents - for ent in test_ents: - t.add_parent(ent) - for ent in test_ents: - assert ent in t_pars.filter(ent) - for ent in [rt1, p1, p2, r1, r2]: - filtered = t_pars.filter(ent) - for ent2 in [rt1, p1, p2, r1, r2]: - assert ent2 in filtered - assert ent in t_pars.filter(pid=100) - assert ent in t_pars.filter(pid="100")