#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com>
# Copyright (C) 2022 Daniel Hornung <d.hornung@indiscale.com>
# Copyright (C) 2020-2022 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# Test apiutils
# A. Schlemmer, 02/2018
"""Unit tests for the helpers in ``caosdb.apiutils``.

Covered here: ``apply_to_ids``, ``create_id_query``, ``resolve_reference``,
``compare_entities``, ``empty_diff`` and ``merge_entities``, plus a test of
``Entity.copy``.
"""

import pytest
import caosdb as db
import caosdb.apiutils
from caosdb.apiutils import (apply_to_ids, compare_entities, create_id_query,
                             empty_diff, EntityMergeConflictError,
                             resolve_reference, merge_entities)

from caosdb.common.models import SPECIAL_ATTRIBUTES


def test_apply_to_ids():
    """``apply_to_ids`` maps the ids of a record, its parents and properties."""
    parent = db.RecordType(id=3456)
    rec = db.Record(id=23)
    p = db.Property(id=23345, datatype=db.INTEGER)
    rec.add_parent(parent)
    rec.add_property(p)

    def invert(id_):
        return id_ * -1

    apply_to_ids([rec], invert)

    # Sanity check of the helper itself before checking its effect.
    assert invert(3456) == -3456
    assert rec.parents[0].id == -3456
    assert rec.properties[0].id == -23345
    assert rec.id == -23


def test_id_query():
    """``create_id_query`` joins the given ids with ``OR`` into one query."""
    ids = [1, 2, 3, 4, 5]
    assert create_id_query(ids) == ('FIND ENTITY WITH ID=1 OR ID=2 OR ID=3 OR '
                                    'ID=4 OR ID=5')


def test_resolve_reference():
    """``resolve_reference`` replaces referenced ids by entity objects.

    ``caosdb.apiutils.retrieve_entity_with_id`` is replaced by a stub that
    returns ``db.Record(id=eid)``.
    """
    original_retrieve_entity_with_id = caosdb.apiutils.retrieve_entity_with_id
    caosdb.apiutils.retrieve_entity_with_id = lambda eid: db.Record(id=eid)

    # The stub replaces a module attribute, so it must be removed again even
    # when one of the assertions below fails; otherwise it would leak into
    # every test that runs afterwards.
    try:
        prop = db.Property(id=1, datatype=db.REFERENCE, value=100)
        prop.is_valid = lambda: True
        items = [200, 300, 400]
        prop_list = db.Property(datatype=db.LIST(db.REFERENCE),
                                value=items)
        prop_list2 = db.Property(datatype=db.LIST(db.REFERENCE),
                                 value=[db.Record(id=500)])
        resolve_reference(prop)
        resolve_reference(prop_list)
        resolve_reference(prop_list2)

        # A single reference id becomes an entity with that id.
        assert prop.value.id == 100
        assert isinstance(prop.value, db.Entity)

        # Every id in a list of references becomes an entity, order preserved.
        prop_list_ids = []
        for entity in prop_list.value:
            prop_list_ids.append(entity.id)
            assert isinstance(entity, db.Entity)
        assert prop_list_ids == items

        # A list that already holds a Record still holds an entity, same id.
        for entity in prop_list2.value:
            assert entity.id == 500
            assert isinstance(entity, db.Entity)

        # Properties that are not references are left untouched.
        no_reference = db.Property(id=5000, datatype=db.INTEGER, value=2)
        resolve_reference(no_reference)
        assert no_reference.value == 2
        assert no_reference.datatype is db.INTEGER
    finally:
        # restore retrieve_entity_with_id
        caosdb.apiutils.retrieve_entity_with_id = \
            original_retrieve_entity_with_id


def test_compare_entities():
    """``compare_entities`` reports differing parents and properties only."""
    r1 = db.Record()
    r2 = db.Record()
    r1.add_parent("bla")
    r2.add_parent("bla")
    r1.add_parent("lopp")
    r1.add_property("test", value=2)
    r2.add_property("test", value=2)
    r1.add_property("tests", value=3)
    r2.add_property("tests", value=45)
    r1.add_property("tester", value=3)
    r2.add_property("tester")
    r1.add_property("tests_234234", value=45)
    r2.add_property("tests_TT", value=45)

    diff_r1, diff_r2 = compare_entities(r1, r2)

    assert len(diff_r1["parents"]) == 1
    assert len(diff_r2["parents"]) == 0
    assert len(diff_r1["properties"]) == 3
    assert len(diff_r2["properties"]) == 3

    # "test" is identical in both records, so it is in neither diff.
    assert "test" not in diff_r1["properties"]
    assert "test" not in diff_r2["properties"]

    assert "tests" in diff_r1["properties"]
    assert "tests" in diff_r2["properties"]

    assert "tester" in diff_r1["properties"]
    assert "tester" in diff_r2["properties"]

    assert "tests_234234" in diff_r1["properties"]
    assert "tests_TT" in diff_r2["properties"]


def test_compare_entities_units():
    """``compare_entities`` also reports properties that differ in unit."""
    r1 = db.Record()
    r2 = db.Record()
    r1.add_parent("bla")
    r2.add_parent("bla")
    r1.add_parent("lopp")
    r1.add_property("test", value=2, unit="cm")
    r2.add_property("test", value=2, unit="m")
    r1.add_property("tests", value=3, unit="cm")
    r2.add_property("tests", value=45, unit="cm")
    r1.add_property("tester", value=3)
    r2.add_property("tester")
    r1.add_property("tests_234234", value=45, unit="cm")
    r2.add_property("tests_TT", value=45, unit="cm")

    diff_r1, diff_r2 = compare_entities(r1, r2)

    assert len(diff_r1["parents"]) == 1
    assert len(diff_r2["parents"]) == 0
    assert len(diff_r1["properties"]) == 4
    assert len(diff_r2["properties"]) == 4

    assert "tests" in diff_r1["properties"]
    assert "tests" in diff_r2["properties"]

    assert "tester" in diff_r1["properties"]
    assert "tester" in diff_r2["properties"]

    assert "tests_234234" in diff_r1["properties"]
    assert "tests_TT" in diff_r2["properties"]

    # Same value, different unit: the diff carries the respective unit.
    assert diff_r1["properties"]["test"]["unit"] == "cm"
    assert diff_r2["properties"]["test"]["unit"] == "m"


def test_compare_special_properties():
    """``compare_entities`` reports differences in special attributes."""
    # Test for all known special properties:
    SPECIAL_PROPERTIES = ("description", "name",
                          "checksum", "size", "path", "id")
    INTS = ("size", "id")
    HIDDEN = ("checksum", "size")

    for key in SPECIAL_PROPERTIES:
        # Keys listed in HIDDEN are set through the underscore-prefixed
        # attribute name.
        set_key = key
        if key in HIDDEN:
            set_key = "_" + key
        r1 = db.Record()
        r2 = db.Record()
        if key not in INTS:
            setattr(r1, set_key, "bla 1")
            setattr(r2, set_key, "bla 1")
        else:
            setattr(r1, set_key, 1)
            setattr(r2, set_key, 1)

        # Equal values: the key does not show up in either diff.
        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key not in diff_r1
        assert key not in diff_r2
        assert len(diff_r1["parents"]) == 0
        assert len(diff_r2["parents"]) == 0
        assert len(diff_r1["properties"]) == 0
        assert len(diff_r2["properties"]) == 0

        if key not in INTS:
            setattr(r2, set_key, "bla test")
        else:
            setattr(r2, set_key, 2)

        # Different values: each diff carries the value of its own entity.
        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key in diff_r1
        assert key in diff_r2
        if key not in INTS:
            assert diff_r1[key] == "bla 1"
            assert diff_r2[key] == "bla test"
        else:
            assert diff_r1[key] == 1
            assert diff_r2[key] == 2
        assert len(diff_r1["properties"]) == 0
        assert len(diff_r2["properties"]) == 0


@pytest.mark.xfail
def test_compare_properties():
    """Compare two ``Property`` entities (currently expected to fail)."""
    p1 = db.Property()
    p2 = db.Property()

    diff_r1, diff_r2 = compare_entities(p1, p2)
    assert len(diff_r1["parents"]) == 0
    assert len(diff_r2["parents"]) == 0
    assert len(diff_r1["properties"]) == 0
    assert len(diff_r2["properties"]) == 0

    p1.importance = "SUGGESTED"
    diff_r1, diff_r2 = compare_entities(p1, p2)
    assert len(diff_r1["parents"]) == 0
    assert len(diff_r2["parents"]) == 0
    assert len(diff_r1["properties"]) == 0
    assert len(diff_r2["properties"]) == 0
    assert "importance" in diff_r1
    assert diff_r1["importance"] == "SUGGESTED"

    # TODO: I'm not sure why it is not like this:
    # assert diff_r2["importance"] is None
    # ... but:
    assert "importance" not in diff_r2

    p2.importance = "SUGGESTED"
    p1.value = 42
    p2.value = 4

    diff_r1, diff_r2 = compare_entities(p1, p2)
    assert len(diff_r1["parents"]) == 0
    assert len(diff_r2["parents"]) == 0
    assert len(diff_r1["properties"]) == 0
    assert len(diff_r2["properties"]) == 0

    # Comparing values currently does not seem to be implemented:
    assert "value" in diff_r1
    assert diff_r1["value"] == 42
    assert "value" in diff_r2
    assert diff_r2["value"] == 4


def test_copy_entities():
    """``Entity.copy`` yields an equal record with its own child objects."""
    r = db.Record(name="A")
    r.add_parent(name="B")
    r.add_property(name="C", value=4, importance="OBLIGATORY")
    r.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    r.description = "A fancy test record"

    c = r.copy()

    assert c is not r
    assert c.name == "A"
    assert c.role == r.role
    assert c.parents[0].name == "B"
    # parent and property objects are not shared among copy and original:
    assert c.parents[0] is not r.parents[0]

    for i in [0, 1]:
        assert c.properties[i] is not r.properties[i]
        for special in SPECIAL_ATTRIBUTES:
            assert getattr(c.properties[i], special) == getattr(
                r.properties[i], special)
        assert c.get_importance(
            c.properties[i]) == r.get_importance(r.properties[i])


def test_merge_entities():
    """``merge_entities`` adds parents and properties to its first argument."""
    r = db.Record(name="A")
    r.add_parent(name="B")
    r.add_property(name="C", value=4, importance="OBLIGATORY")
    r.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    r.description = "A fancy test record"

    r2 = db.Record()
    r2.add_property(name="F", value="text")
    merge_entities(r2, r)

    assert r2.get_parents()[0].name == "B"
    assert r2.get_property("C").name == "C"
    assert r2.get_property("C").value == 4
    assert r2.get_property("D").name == "D"
    assert r2.get_property("D").value == [3, 4, 7]

    # The property r2 had before the merge is still there.
    assert r2.get_property("F").name == "F"
    assert r2.get_property("F").value == "text"


def test_merge_bug_conflict():
    """A missing datatype merges cleanly; two different datatypes conflict."""
    r = db.Record()
    r.add_property(name="C", value=4)
    r2 = db.Record()
    r2.add_property(name="C", value=4, datatype="TEXT")
    merge_entities(r, r2)

    r3 = db.Record()
    r3.add_property(name="C", value=4, datatype="INTEGER")
    with pytest.raises(EntityMergeConflictError):
        merge_entities(r3, r2)


def test_merge_bug_109():
    """Merging a list-valued property must not duplicate the list entries."""
    rt = db.RecordType(name="TestBug")
    p = db.Property(name="test_bug_property", datatype=db.LIST(db.INTEGER))

    r_b = db.Record(name="TestRecord")
    r_b.add_parent(rt)
    r_b.add_property(p, value=[18, 19])

    r_a = db.Record(name="TestRecord")
    r_a.add_parent(rt)

    merge_entities(r_a, r_b)

    assert r_b.get_property("test_bug_property").value == [18, 19]
    assert r_a.get_property("test_bug_property").value == [18, 19]

    # NOTE(review): the whitespace after each "\n" below has to match the
    # indentation of the string representation exactly -- confirm.
    assert "<Value>18</Value>\n <Value>19</Value>" in str(r_b)
    assert ("<Value>18</Value>\n <Value>19</Value>\n "
            "<Value>18</Value>\n <Value>19</Value>") not in str(r_b)

    assert "<Value>18</Value>\n <Value>19</Value>" in str(r_a)
    assert ("<Value>18</Value>\n <Value>19</Value>\n "
            "<Value>18</Value>\n <Value>19</Value>") not in str(r_a)


@pytest.mark.xfail
def test_bug_109():
    """Re-adding another record's list property (currently expected to fail)."""
    rt = db.RecordType(name="TestBug")
    p = db.Property(name="test_bug_property", datatype=db.LIST(db.INTEGER))

    r_b = db.Record(name="TestRecord")
    r_b.add_parent(rt)
    r_b.add_property(p, value=[18, 19])

    r_a = db.Record(name="TestRecord")
    r_a.add_parent(rt)
    r_a.add_property(r_b.get_property("test_bug_property"))

    assert r_b.get_property("test_bug_property").value == [18, 19]
    assert r_a.get_property("test_bug_property").value == [18, 19]

    # NOTE(review): the whitespace after each "\n" below has to match the
    # indentation of the string representation exactly -- confirm.
    assert "<Value>18</Value>\n <Value>19</Value>" in str(r_b)
    assert ("<Value>18</Value>\n <Value>19</Value>\n "
            "<Value>18</Value>\n <Value>19</Value>") not in str(r_b)

    assert "<Value>18</Value>\n <Value>19</Value>" in str(r_a)
    assert ("<Value>18</Value>\n <Value>19</Value>\n "
            "<Value>18</Value>\n <Value>19</Value>") not in str(r_a)


def test_wrong_merge_conflict_reference():
    """Test a wrongly detected merge conflict in case of two records
    referencing two different, but identical objects.

    """
    # Two identical license records will be referenced from both records to be
    # merged
    license_rt = db.RecordType(name="license")
    license_rec_a = db.Record(name="CC-BY-3.0").add_parent(license_rt)
    license_rec_b = db.Record(name="CC-BY-3.0").add_parent(license_rt)

    # two referencing records
    dataset_rt = db.RecordType(name="Dataset")
    title_prop = db.Property(name="title", datatype=db.TEXT)
    doi_prop = db.Property(name="DOI", datatype=db.TEXT)
    rec_a = db.Record().add_parent(dataset_rt)
    rec_a.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_a)
    rec_a.add_property(name=title_prop.name, value="Some dataset title")

    rec_b = db.Record().add_parent(dataset_rt)
    rec_b.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_b)
    rec_b.add_property(name=doi_prop.name, value="https://doi.org/12345.678")

    merge_entities(rec_a, rec_b)
    assert rec_a.get_property(license_rt.name) is not None
    assert rec_a.get_property(license_rt.name).value is not None
    assert isinstance(rec_a.get_property(license_rt.name).value, db.Record)
    assert rec_a.get_property(license_rt.name).value.name == license_rec_a.name
    assert rec_a.get_property(license_rt.name).value.name == license_rec_b.name
    assert rec_a.get_property("title").value == "Some dataset title"
    assert rec_a.get_property("doi").value == "https://doi.org/12345.678"

    # Reset rec_a
    rec_a = db.Record().add_parent(dataset_rt)
    rec_a.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_a)
    rec_a.add_property(name=title_prop.name, value="Some dataset title")

    # this does not compare referenced records, so it will fail
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b, merge_references_with_empty_diffs=False)

    # ... as should this, of course
    rec_b.get_property(license_rt.name).value.name = "Another license"
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b)


def test_empty_diff():
    """``empty_diff`` is true exactly when two entities do not differ."""
    rec_a = db.Record(name="A")
    rec_b = db.Record(name="B")

    assert empty_diff(rec_a, rec_a)
    assert not empty_diff(rec_a, rec_b)

    rec_a.add_parent(name="RT")
    rec_b.add_parent(name="RT")
    assert empty_diff(rec_a, rec_a)
    assert not empty_diff(rec_a, rec_b)

    rec_b.name = "A"
    assert empty_diff(rec_a, rec_b)

    rec_a.add_property(name="some_prop", value=1)
    assert not empty_diff(rec_a, rec_b)

    rec_b.add_property(name="some_prop", value=1)
    assert empty_diff(rec_a, rec_b)

    rec_b.get_property("some_prop").value = 2
    assert not empty_diff(rec_a, rec_b)

    rec_b.get_property("some_prop").value = 1
    rec_b.add_property(name="some_other_prop", value="Test")
    assert not empty_diff(rec_a, rec_b)

    rec_a.add_property(name="some_other_prop", value="Test")
    assert empty_diff(rec_a, rec_b)

    # reference identical records, but different Python Record objects
    ref_rec_a = db.Record(name="Ref").add_parent(name="RefType")
    ref_rec_b = db.Record(name="Ref").add_parent(name="RefType")
    rec_a.add_property(name="RefType", datatype="RefType", value=ref_rec_a)
    rec_b.add_property(name="RefType", datatype="RefType", value=ref_rec_b)
    # the default is `compare_referenced_records=False`, so the diff shouldn't
    # be empty (different Python objects are referenced.)
    assert not empty_diff(rec_a, rec_b)
    # when looking into the referenced record, the diffs should be empty again
    assert empty_diff(rec_a, rec_b, compare_referenced_records=True)

    # The same for lists of references
    rec_a.remove_property("RefType")
    rec_b.remove_property("RefType")
    assert empty_diff(rec_a, rec_b)
    rec_a.add_property(name="RefType", datatype=db.LIST(
        "RefType"), value=[ref_rec_a, ref_rec_a])
    rec_b.add_property(name="RefType", datatype=db.LIST(
        "RefType"), value=[ref_rec_b, ref_rec_b])
    assert not empty_diff(rec_a, rec_b)
    assert empty_diff(rec_a, rec_b, compare_referenced_records=True)

    # special case of ids
    rec_a = db.Record(id=12)
    rec_b = db.Record()
    assert not empty_diff(rec_a, rec_b)
    rec_b.id = 13
    assert not empty_diff(rec_a, rec_b)
    rec_b.id = 12
    assert empty_diff(rec_a, rec_b)


def test_force_merge():
    """Test whether a forced merge overwrites existing properties correctly."""

    # name overwrite
    recA = db.Record(name="A")
    recB = db.Record(name="B")

    with pytest.raises(EntityMergeConflictError):
        merge_entities(recA, recB)

    merge_entities(recA, recB, force=True)
    assert "B" == recA.name
    # unchanged
    assert "B" == recB.name

    # description overwrite
    recA = db.Record()
    recA.description = "something"
    recB = db.Record()
    recB.description = "something else"

    with pytest.raises(EntityMergeConflictError) as emce:
        merge_entities(recA, recB)
    # NOTE(review): the message is compared verbatim, whitespace included --
    # confirm the separators between its three parts against the actual
    # exception text.
    assert str(emce.value) == (
        """Conflict in special attribute description: A: something B: something else"""
    )

    merge_entities(recA, recB, force=True)
    assert recA.description == "something else"
    # unchanged
    assert recB.description == "something else"

    # property overwrite
    recA = db.Record()
    recA.add_property(name="propA", value="something")
    recB = db.Record()
    recB.add_property(name="propA", value="something else")

    with pytest.raises(EntityMergeConflictError):
        merge_entities(recA, recB)

    merge_entities(recA, recB, force=True)
    assert recA.get_property("propA").value == "something else"
    # unchanged
    assert recB.get_property("propA").value == "something else"

    # don't remove a property that's not in recB
    recA = db.Record()
    recA.add_property(name="propA", value="something")
    recA.add_property(name="propB", value=5.0)
    recB = db.Record()
    recB.add_property(name="propA", value="something else")

    merge_entities(recA, recB, force=True)
    assert recA.get_property("propA").value == "something else"
    assert recA.get_property("propB").value == 5.0

    # also overwrite datatypes ...
    rtA = db.RecordType()
    rtA.add_property(name="propA", datatype=db.INTEGER)
    rtB = db.RecordType()
    rtB.add_property(name="propA", datatype=db.TEXT)

    with pytest.raises(EntityMergeConflictError):
        merge_entities(rtA, rtB)

    merge_entities(rtA, rtB, force=True)
    assert rtA.get_property("propA").datatype == db.TEXT
    # unchanged
    assert rtB.get_property("propA").datatype == db.TEXT

    # ... and units
    recA = db.Record()
    recA.add_property(name="propA", value=5, unit="m")
    recB = db.Record()
    recB.add_property(name="propA", value=5, unit="cm")

    with pytest.raises(EntityMergeConflictError):
        merge_entities(recA, recB)

    merge_entities(recA, recB, force=True)
    assert recA.get_property("propA").unit == "cm"
    # unchanged
    assert recB.get_property("propA").unit == "cm"


def test_merge_missing_list_datatype_82():
    """Merging two properties, where the list-valued one has no datatype."""

    # With an explicit list datatype the forced merge takes that datatype over.
    recA = db.Record().add_property("a", 5, datatype="B")
    recB_with_DT = db.Record().add_property(
        "a", [1, 2], datatype=f"LIST<{db.DOUBLE}>")
    merge_entities(recA, recB_with_DT, force=True)
    assert recA.get_property("a").datatype == f"LIST<{db.DOUBLE}>"

    # Without a datatype on the list-valued property the merge must refuse.
    recA = db.Record().add_property("a", 5, datatype="B")
    recB_without_DT = db.Record().add_property("a", [1, 2])
    with pytest.raises(TypeError) as te:
        merge_entities(recA, recB_without_DT, force=True)
    assert "Invalid datatype: List valued properties" in str(te.value)