Skip to content
Snippets Groups Projects
Select Git revision
  • 338c635af7725c41d62eae378177065a4b41fdfc
  • main default protected
  • dev protected
  • f-yaml-parser-enums
  • f-fix-paths
  • f-fix-validate-to-dict
  • f-labfolder-converter
  • f-state-machine-script
  • f-xlsx-converter-warnings-errors
  • f-rename
  • f-extra-deps
  • f-more-jsonschema-export
  • f-henrik
  • f-fix-89
  • f-trigger-advanced-user-tools
  • f-real-rename-test
  • f-linkahead-rename
  • f-register-integrationtests
  • f-fix-id
  • f-h5-files
  • f-json-schema
  • v0.14.0
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.0-numpy2
  • v0.10.0
  • v0.9.0
  • v0.8.0
  • v0.7.0
  • v0.6.1
  • v0.6.0
  • v0.5.0
  • v0.4.1
  • v0.4.0
  • v0.3.1
  • v0.3.0
37 results

CHANGELOG.md

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    To find the state of this project's repository at the time of any of these versions, check out the tags.
    test_apiutils.py 19.26 KiB
    #
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
    # Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com>
    # Copyright (C) 2022 Daniel Hornung <d.hornung@indiscale.com>
    # Copyright (C) 2020-2022 IndiScale GmbH <info@indiscale.com>
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    #
    # Test apiutils
    # A. Schlemmer, 02/2018
    
    
    import pytest
    import caosdb as db
    import caosdb.apiutils
    from caosdb.apiutils import (apply_to_ids, compare_entities, create_id_query,
                                 empty_diff, EntityMergeConflictError,
                                 resolve_reference, merge_entities)
    
    from caosdb.common.models import SPECIAL_ATTRIBUTES
    
    
    def test_apply_to_ids():
        """``apply_to_ids`` must map the ids of a record, its parents and its properties."""
        record = db.Record(id=23)
        record.add_parent(db.RecordType(id=3456))
        record.add_property(db.Property(id=23345, datatype=db.INTEGER))

        def negate(entity_id):
            return entity_id * -1

        apply_to_ids([record], negate)

        # Sanity check of the mapping function itself ...
        assert negate(3456) == -3456
        # ... and of every id that is expected to have been mapped.
        assert record.parents[0].id == -3456
        assert record.properties[0].id == -23345
        assert record.id == -23
    
    
    def test_id_query():
        """``create_id_query`` must chain all given ids with ``OR`` in one query string."""
        expected_query = "FIND ENTITY WITH ID=1 OR ID=2 OR ID=3 OR ID=4 OR ID=5"
        assert create_id_query([1, 2, 3, 4, 5]) == expected_query
    
    
    def test_resolve_reference():
        """Check that ``resolve_reference`` turns reference values into entities.

        ``caosdb.apiutils.retrieve_entity_with_id`` is replaced by a stub that
        wraps the requested id in a ``db.Record``.  The original function is
        restored in a ``finally`` clause, so a failing assertion cannot leak the
        stub into tests that run afterwards.
        """
        original_retrieve_entity_with_id = caosdb.apiutils.retrieve_entity_with_id
        caosdb.apiutils.retrieve_entity_with_id = lambda eid: db.Record(id=eid)

        try:
            prop = db.Property(id=1, datatype=db.REFERENCE, value=100)
            prop.is_valid = lambda: True
            items = [200, 300, 400]
            prop_list = db.Property(datatype=db.LIST(db.REFERENCE),
                                    value=items)
            prop_list2 = db.Property(datatype=db.LIST(db.REFERENCE),
                                     value=[db.Record(id=500)])
            resolve_reference(prop)
            resolve_reference(prop_list)
            resolve_reference(prop_list2)

            # A single reference id becomes an entity carrying that id.
            assert prop.value.id == 100
            assert isinstance(prop.value, db.Entity)

            # A list of ids becomes a list of entities in the same order.
            prop_list_ids = []

            for i in prop_list.value:
                prop_list_ids.append(i.id)
                assert isinstance(i, db.Entity)
            assert prop_list_ids == items

            # A list that already holds entities keeps them.
            for i in prop_list2.value:
                assert i.id == 500
                assert isinstance(i, db.Entity)

            # A non-reference property must be left untouched.
            no_reference = db.Property(id=5000, datatype=db.INTEGER, value=2)
            resolve_reference(no_reference)
            assert no_reference.value == 2
            assert no_reference.datatype is db.INTEGER
        finally:
            # restore retrieve_entity_with_id, even if an assertion above failed
            caosdb.apiutils.retrieve_entity_with_id = original_retrieve_entity_with_id
    
    
    def test_compare_entities():
        """Compare two records that differ in one parent and in several properties."""
        first = db.Record()
        first.add_parent("bla")
        first.add_parent("lopp")
        first.add_property("test", value=2)
        first.add_property("tests", value=3)
        first.add_property("tester", value=3)
        first.add_property("tests_234234", value=45)

        second = db.Record()
        second.add_parent("bla")
        second.add_property("test", value=2)
        second.add_property("tests", value=45)
        second.add_property("tester", )
        second.add_property("tests_TT", value=45)

        diff_first, diff_second = compare_entities(first, second)
        both_diffs = (diff_first, diff_second)

        assert len(diff_first["parents"]) == 1
        assert len(diff_second["parents"]) == 0
        assert len(diff_first["properties"]) == 3
        assert len(diff_second["properties"]) == 3

        # "test" has the same value on both sides and must not be reported.
        for diff in both_diffs:
            assert "test" not in diff["properties"]

        # Properties present on both sides but with differing values.
        for name in ("tests", "tester"):
            for diff in both_diffs:
                assert name in diff["properties"]

        # Properties that exist on one side only.
        assert "tests_234234" in diff_first["properties"]
        assert "tests_TT" in diff_second["properties"]
    
    
    def test_compare_entities_units():
        """Compare two records whose properties also differ in their units."""
        first = db.Record()
        first.add_parent("bla")
        first.add_parent("lopp")
        first.add_property("test", value=2, unit="cm")
        first.add_property("tests", value=3, unit="cm")
        first.add_property("tester", value=3)
        first.add_property("tests_234234", value=45, unit="cm")

        second = db.Record()
        second.add_parent("bla")
        second.add_property("test", value=2, unit="m")
        second.add_property("tests", value=45, unit="cm")
        second.add_property("tester", )
        second.add_property("tests_TT", value=45, unit="cm")

        diff_first, diff_second = compare_entities(first, second)

        assert len(diff_first["parents"]) == 1
        assert len(diff_second["parents"]) == 0
        assert len(diff_first["properties"]) == 4
        assert len(diff_second["properties"]) == 4

        # Properties present on both sides but with differing values.
        for name in ("tests", "tester"):
            for diff in (diff_first, diff_second):
                assert name in diff["properties"]

        # Properties that exist on one side only.
        assert "tests_234234" in diff_first["properties"]
        assert "tests_TT" in diff_second["properties"]

        # Same value but different unit: each diff reports its own unit.
        assert diff_first["properties"]["test"]["unit"] == "cm"
        assert diff_second["properties"]["test"]["unit"] == "m"
    
    
    def test_compare_special_properties():
        """Compare records that are equal, then different, in one special attribute.

        Each listed attribute is first set to the same value on both records
        (the diffs must not mention it) and then changed on the second record
        (both diffs must report their respective value).
        """
        special_keys = ("description", "name",
                        "checksum", "size", "path", "id")
        integer_keys = ("size", "id")
        # These are assigned through their underscore-prefixed attribute.
        hidden_keys = ("checksum", "size")

        for key in special_keys:
            attribute = "_" + key if key in hidden_keys else key
            if key in integer_keys:
                initial, changed = 1, 2
            else:
                initial, changed = "bla 1", "bla test"

            r1 = db.Record()
            r2 = db.Record()
            setattr(r1, attribute, initial)
            setattr(r2, attribute, initial)

            # Identical values: nothing is reported at all.
            diff_r1, diff_r2 = compare_entities(r1, r2)
            assert key not in diff_r1
            assert key not in diff_r2
            assert len(diff_r1["parents"]) == 0
            assert len(diff_r2["parents"]) == 0
            assert len(diff_r1["properties"]) == 0
            assert len(diff_r2["properties"]) == 0

            setattr(r2, attribute, changed)

            # Differing values: each diff carries the value of its own record.
            diff_r1, diff_r2 = compare_entities(r1, r2)
            assert key in diff_r1
            assert key in diff_r2
            assert diff_r1[key] == initial
            assert diff_r2[key] == changed
            assert len(diff_r1["properties"]) == 0
            assert len(diff_r2["properties"]) == 0
    
    
    @pytest.mark.xfail
    def test_compare_properties():
        """Compare two ``db.Property`` objects by importance and by value (expected to fail)."""

        def assert_no_parent_or_property_diff(first_diff, second_diff):
            # Neither diff may list any parents or properties.
            for section in ("parents", "properties"):
                for diff in (first_diff, second_diff):
                    assert len(diff[section]) == 0

        p1 = db.Property()
        p2 = db.Property()

        diff_p1, diff_p2 = compare_entities(p1, p2)
        assert_no_parent_or_property_diff(diff_p1, diff_p2)

        p1.importance = "SUGGESTED"
        diff_p1, diff_p2 = compare_entities(p1, p2)
        assert_no_parent_or_property_diff(diff_p1, diff_p2)
        assert "importance" in diff_p1
        assert diff_p1["importance"] == "SUGGESTED"

        # TODO: I'm not sure why it is not like this:
        # assert diff_p2["importance"] is None
        # ... but:
        assert "importance" not in diff_p2

        p2.importance = "SUGGESTED"
        p1.value = 42
        p2.value = 4

        diff_p1, diff_p2 = compare_entities(p1, p2)
        assert_no_parent_or_property_diff(diff_p1, diff_p2)

        # Comparing values currently does not seem to be implemented:
        assert "value" in diff_p1
        assert diff_p1["value"] == 42
        assert "value" in diff_p2
        assert diff_p2["value"] == 4
    
    
    def test_copy_entities():
        """``Record.copy`` must yield an equal record that shares no parent/property objects."""
        original = db.Record(name="A")
        original.add_parent(name="B")
        original.add_property(name="C", value=4, importance="OBLIGATORY")
        original.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
        original.description = "A fancy test record"

        duplicate = original.copy()

        assert duplicate is not original
        assert duplicate.name == "A"
        assert duplicate.role == original.role
        assert duplicate.parents[0].name == "B"
        # parent and property objects are not shared among copy and original:
        assert duplicate.parents[0] is not original.parents[0]

        for index in range(2):
            copied_prop = duplicate.properties[index]
            source_prop = original.properties[index]
            assert copied_prop is not source_prop
            for special in SPECIAL_ATTRIBUTES:
                assert getattr(copied_prop, special) == getattr(
                    source_prop, special)
            assert duplicate.get_importance(
                copied_prop) == original.get_importance(source_prop)
    
    
    def test_merge_entities():
        """Merging must add the source's parent and properties and keep the target's own."""
        source = db.Record(name="A")
        source.add_parent(name="B")
        source.add_property(name="C", value=4, importance="OBLIGATORY")
        source.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
        source.description = "A fancy test record"

        target = db.Record()
        target.add_property(name="F", value="text")
        merge_entities(target, source)

        assert target.get_parents()[0].name == "B"

        # "C" and "D" come from the source, "F" was on the target before the merge.
        expected_properties = (("C", 4), ("D", [3, 4, 7]), ("F", "text"))
        for prop_name, prop_value in expected_properties:
            assert target.get_property(prop_name).name == prop_name
            assert target.get_property(prop_name).value == prop_value
    
    
    def test_merge_bug_conflict():
        """A missing datatype must merge with ``TEXT``; ``INTEGER`` vs ``TEXT`` must conflict."""
        untyped = db.Record()
        untyped.add_property(name="C", value=4)
        text_typed = db.Record()
        text_typed.add_property(name="C", value=4, datatype="TEXT")
        # No datatype on the target side: this merge is expected to go through.
        merge_entities(untyped, text_typed)

        integer_typed = db.Record()
        integer_typed.add_property(name="C", value=4, datatype="INTEGER")
        with pytest.raises(EntityMergeConflictError):
            merge_entities(integer_typed, text_typed)
    
    
    def test_merge_bug_109():
        """Merging a list-valued property must not duplicate the list values."""
        record_type = db.RecordType(name="TestBug")
        list_prop = db.Property(name="test_bug_property",
                                datatype=db.LIST(db.INTEGER))

        source = db.Record(name="TestRecord")
        source.add_parent(record_type)
        source.add_property(list_prop, value=[18, 19])

        target = db.Record(name="TestRecord")
        target.add_parent(record_type)

        merge_entities(target, source)

        assert source.get_property("test_bug_property").value == [18, 19]
        assert target.get_property("test_bug_property").value == [18, 19]

        values_once = "<Value>18</Value>\n    <Value>19</Value>"
        values_twice = "<Value>18</Value>\n    <Value>19</Value>\n    <Value>18</Value>\n    <Value>19</Value>"

        # The string form of each record must list the values exactly once.
        for record in (source, target):
            assert values_once in str(record)
            assert values_twice not in str(record)
    
    
    @pytest.mark.xfail
    def test_bug_109():
        """Re-adding another record's list property must not duplicate its values (expected to fail)."""
        record_type = db.RecordType(name="TestBug")
        list_prop = db.Property(name="test_bug_property",
                                datatype=db.LIST(db.INTEGER))

        source = db.Record(name="TestRecord")
        source.add_parent(record_type)
        source.add_property(list_prop, value=[18, 19])

        target = db.Record(name="TestRecord")
        target.add_parent(record_type)
        # Hand the property object of ``source`` directly to ``target``.
        target.add_property(source.get_property("test_bug_property"))

        assert source.get_property("test_bug_property").value == [18, 19]
        assert target.get_property("test_bug_property").value == [18, 19]

        values_once = "<Value>18</Value>\n    <Value>19</Value>"
        values_twice = "<Value>18</Value>\n    <Value>19</Value>\n    <Value>18</Value>\n    <Value>19</Value>"

        # The string form of each record must list the values exactly once.
        for record in (source, target):
            assert values_once in str(record)
            assert values_twice not in str(record)
    
    
    def test_wrong_merge_conflict_reference():
        """Test a wrongly detected merge conflict in case of two records referencing
        two different, but identical objects.

        Three merges are exercised: the default merge (must succeed), a merge
        with ``merge_references_with_empty_diffs=False`` (must conflict), and a
        default merge after the referenced records were made different (must
        conflict as well).
        """
        # Two identical license records will be referenced from both records to be
        # merged
        license_rt = db.RecordType(name="license")
        license_rec_a = db.Record(name="CC-BY-3.0").add_parent(license_rt)
        license_rec_b = db.Record(name="CC-BY-3.0").add_parent(license_rt)

        # two referencing records
        dataset_rt = db.RecordType(name="Dataset")
        title_prop = db.Property(name="title", datatype=db.TEXT)
        doi_prop = db.Property(name="DOI", datatype=db.TEXT)
        rec_a = db.Record().add_parent(dataset_rt)
        rec_a.add_property(name=license_rt.name,
                           datatype=license_rt.name, value=license_rec_a)
        rec_a.add_property(name=title_prop.name, value="Some dataset title")

        rec_b = db.Record().add_parent(dataset_rt)
        rec_b.add_property(name=license_rt.name,
                           datatype=license_rt.name, value=license_rec_b)
        rec_b.add_property(name=doi_prop.name, value="https://doi.org/12345.678")

        merge_entities(rec_a, rec_b)
        assert rec_a.get_property(license_rt.name) is not None
        assert rec_a.get_property(license_rt.name).value is not None
        assert isinstance(rec_a.get_property(license_rt.name).value, db.Record)
        assert rec_a.get_property(license_rt.name).value.name == license_rec_a.name
        assert rec_a.get_property(license_rt.name).value.name == license_rec_b.name
        assert rec_a.get_property("title").value == "Some dataset title"
        assert rec_a.get_property("doi").value == "https://doi.org/12345.678"

        # Reset rec_a
        rec_a = db.Record().add_parent(dataset_rt)
        rec_a.add_property(name=license_rt.name,
                           datatype=license_rt.name, value=license_rec_a)
        rec_a.add_property(name=title_prop.name, value="Some dataset title")

        # this does not compare referenced records, so it will fail
        with pytest.raises(EntityMergeConflictError):
            merge_entities(rec_a, rec_b, merge_references_with_empty_diffs=False)

        # ... as should this, of course
        rec_b.get_property(license_rt.name).value.name = "Another license"
        # The exception info is not inspected, so it is not bound to a name.
        with pytest.raises(EntityMergeConflictError):
            merge_entities(rec_a, rec_b)
    
    
    def test_empty_diff():
        """Walk two records through a series of changes and check ``empty_diff``.

        The records are modified step by step; after each step ``empty_diff``
        must be truthy exactly when both records agree in name, parents,
        properties and id.  Every assertion depends on the state left by the
        preceding steps, so the statement order matters.
        """
    
        rec_a = db.Record(name="A")
        rec_b = db.Record(name="B")
    
        # different names
        assert empty_diff(rec_a, rec_a)
        assert not empty_diff(rec_a, rec_b)
    
        # same parent, names still differ
        rec_a.add_parent(name="RT")
        rec_b.add_parent(name="RT")
        assert empty_diff(rec_a, rec_a)
        assert not empty_diff(rec_a, rec_b)
    
        rec_b.name = "A"
        assert empty_diff(rec_a, rec_b)
    
        # a property on one side only, then on both, then with differing values
        rec_a.add_property(name="some_prop", value=1)
        assert not empty_diff(rec_a, rec_b)
    
        rec_b.add_property(name="some_prop", value=1)
        assert empty_diff(rec_a, rec_b)
    
        rec_b.get_property("some_prop").value = 2
        assert not empty_diff(rec_a, rec_b)
    
        rec_b.get_property("some_prop").value = 1
        rec_b.add_property(name="some_other_prop", value="Test")
        assert not empty_diff(rec_a, rec_b)
    
        rec_a.add_property(name="some_other_prop", value="Test")
        assert empty_diff(rec_a, rec_b)
    
        # reference identical records, but different Python Record objects
        ref_rec_a = db.Record(name="Ref").add_parent(name="RefType")
        ref_rec_b = db.Record(name="Ref").add_parent(name="RefType")
        rec_a.add_property(name="RefType", datatype="RefType", value=ref_rec_a)
        rec_b.add_property(name="RefType", datatype="RefType", value=ref_rec_b)
        # the default is `compare_referenced_records=False`, so the diff shouldn't
        # be empty (different Python objects are referenced.)
        assert not empty_diff(rec_a, rec_b)
        # when looking into the referenced record, the diffs should be empty again
        assert empty_diff(rec_a, rec_b, compare_referenced_records=True)
    
        # The same for lists of references
        rec_a.remove_property("RefType")
        rec_b.remove_property("RefType")
        assert empty_diff(rec_a, rec_b)
        rec_a.add_property(name="RefType", datatype=db.LIST(
            "RefType"), value=[ref_rec_a, ref_rec_a])
        rec_b.add_property(name="RefType", datatype=db.LIST(
            "RefType"), value=[ref_rec_b, ref_rec_b])
        assert not empty_diff(rec_a, rec_b)
        assert empty_diff(rec_a, rec_b, compare_referenced_records=True)
    
        # special case of ids
        rec_a = db.Record(id=12)
        rec_b = db.Record()
        assert not empty_diff(rec_a, rec_b)
        rec_b.id = 13
        assert not empty_diff(rec_a, rec_b)
        rec_b.id = 12
        assert empty_diff(rec_a, rec_b)
    
    
    def test_force_merge():
        """Test whether a forced merge overwrites existing properties correctly."""

        def expect_conflict(target, source):
            # A plain (non-forced) merge of conflicting entities has to raise.
            with pytest.raises(EntityMergeConflictError) as conflict:
                merge_entities(target, source)
            return conflict

        # name overwrite
        rec_a = db.Record(name="A")
        rec_b = db.Record(name="B")

        expect_conflict(rec_a, rec_b)

        merge_entities(rec_a, rec_b, force=True)
        assert rec_a.name == "B"
        # unchanged
        assert rec_b.name == "B"

        # description overwrite
        rec_a = db.Record()
        rec_a.description = "something"
        rec_b = db.Record()
        rec_b.description = "something else"

        conflict = expect_conflict(rec_a, rec_b)
        assert str(conflict.value) == """Conflict in special attribute description:
    A: something
    B: something else"""

        merge_entities(rec_a, rec_b, force=True)
        assert rec_a.description == "something else"
        # unchanged
        assert rec_b.description == "something else"

        # property overwrite
        rec_a = db.Record()
        rec_a.add_property(name="propA", value="something")
        rec_b = db.Record()
        rec_b.add_property(name="propA", value="something else")

        expect_conflict(rec_a, rec_b)

        merge_entities(rec_a, rec_b, force=True)
        assert rec_a.get_property("propA").value == "something else"
        # unchanged
        assert rec_b.get_property("propA").value == "something else"

        # don't remove a property that's not in rec_b
        rec_a = db.Record()
        rec_a.add_property(name="propA", value="something")
        rec_a.add_property(name="propB", value=5.0)
        rec_b = db.Record()
        rec_b.add_property(name="propA", value="something else")

        merge_entities(rec_a, rec_b, force=True)
        assert rec_a.get_property("propA").value == "something else"
        assert rec_a.get_property("propB").value == 5.0

        # also overwrite datatypes ...
        rt_a = db.RecordType()
        rt_a.add_property(name="propA", datatype=db.INTEGER)
        rt_b = db.RecordType()
        rt_b.add_property(name="propA", datatype=db.TEXT)

        expect_conflict(rt_a, rt_b)

        merge_entities(rt_a, rt_b, force=True)
        assert rt_a.get_property("propA").datatype == db.TEXT
        # unchanged
        assert rt_b.get_property("propA").datatype == db.TEXT

        # ... and units
        rec_a = db.Record()
        rec_a.add_property(name="propA", value=5, unit="m")
        rec_b = db.Record()
        rec_b.add_property(name="propA", value=5, unit="cm")

        expect_conflict(rec_a, rec_b)
        merge_entities(rec_a, rec_b, force=True)
        assert rec_a.get_property("propA").unit == "cm"
        # unchanged
        assert rec_b.get_property("propA").unit == "cm"
    
    
    def test_merge_missing_list_datatype_82():
        """Merging two properties, where the list-valued one has no datatype."""
        list_of_double = f"LIST<{db.DOUBLE}>"

        # With an explicit list datatype the forced merge takes over that datatype.
        target = db.Record().add_property("a", 5, datatype="B")
        typed_source = db.Record().add_property("a", [1, 2], datatype=list_of_double)
        merge_entities(target, typed_source, force=True)
        assert target.get_property("a").datatype == list_of_double

        # Without a datatype on the list-valued property a TypeError is expected.
        target = db.Record().add_property("a", 5, datatype="B")
        untyped_source = db.Record().add_property("a", [1, 2])
        with pytest.raises(TypeError) as type_error:
            merge_entities(target, untyped_source, force=True)
        assert "Invalid datatype: List valued properties" in str(type_error.value)