# # This file is a part of the LinkAhead Project. # # Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com> # Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com> # Copyright (C) 2022 Daniel Hornung <d.hornung@indiscale.com> # Copyright (C) 2020-2022 IndiScale GmbH <info@indiscale.com> # Copyright (C) 2018 Research Group Biomedical Physics, # Max-Planck-Institute for Dynamics and Self-Organization Göttingen # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # # Test apiutils # A. Schlemmer, 02/2018 from io import StringIO import linkahead as db import linkahead.apiutils import pytest from linkahead.apiutils import (EntityMergeConflictError, apply_to_ids, compare_entities, create_id_query, empty_diff, merge_entities, resolve_reference) from linkahead.common.models import SPECIAL_ATTRIBUTES def test_apply_to_ids(): parent = db.RecordType(id=3456) rec = db.Record(id=23) p = db.Property(id=23345, datatype=db.INTEGER) rec.add_parent(parent) rec.add_property(p) def invert(id_): return id_ * -1 apply_to_ids([rec], invert) assert invert(3456) == -3456 assert rec.parents[0].id == -3456 assert rec.properties[0].id == -23345 assert rec.id == -23 def test_id_query(): ids = [1, 2, 3, 4, 5] assert create_id_query(ids) == 'FIND ENTITY WITH ID=1 OR ID=2 OR ID=3 OR '\ 'ID=4 OR ID=5' def test_resolve_reference(): original_retrieve_entity_with_id = linkahead.apiutils.retrieve_entity_with_id linkahead.apiutils.retrieve_entity_with_id = lambda eid: db.Record(id=eid) prop = db.Property(id=1, datatype=db.REFERENCE, value=100) prop.is_valid = lambda: True items = [200, 300, 400] prop_list = db.Property(datatype=db.LIST(db.REFERENCE), value=items) prop_list2 = db.Property(datatype=db.LIST(db.REFERENCE), value=[db.Record(id=500)]) resolve_reference(prop) resolve_reference(prop_list) resolve_reference(prop_list2) assert prop.value.id == 100 assert isinstance(prop.value, db.Entity) prop_list_ids = [] for i in prop_list.value: prop_list_ids.append(i.id) assert isinstance(i, db.Entity) assert prop_list_ids == items for i in prop_list2.value: assert i.id == 500 assert isinstance(i, db.Entity) no_reference = db.Property(id=5000, datatype=db.INTEGER, value=2) resolve_reference(no_reference) assert no_reference.value == 2 assert no_reference.datatype is db.INTEGER # restore retrive_entity_with_id linkahead.apiutils.retrieve_entity_with_id = original_retrieve_entity_with_id def test_compare_entities(): # test compare of parents, properties r1 = db.Record() r2 = db.Record() r1.add_parent("bla") r2.add_parent("bla") r1.add_parent("lopp") r1.add_property("test", value=2) r2.add_property("test", value=2) r1.add_property("testi", importance=linkahead.SUGGESTED, value=2) r2.add_property("testi", importance=linkahead.RECOMMENDED, value=2) r1.add_property("tests", value=3) r2.add_property("tests", value=45) r1.add_property("tester", value=3) r2.add_property("tester", ) r1.add_property("tests_234234", value=45) r2.add_property("tests_TT", value=45) diff_r1, diff_r2 = compare_entities(r1, r2) assert len(diff_r1["parents"]) == 1 assert len(diff_r2["parents"]) == 0 assert len(diff_r1["properties"]) == 4 assert len(diff_r2["properties"]) == 4 assert "test" not in diff_r1["properties"] assert "test" not in diff_r2["properties"] assert "tests" in diff_r1["properties"] assert "tests" in diff_r2["properties"] assert "testi" in diff_r1["properties"] assert "testi" in diff_r2["properties"] assert "tester" in diff_r1["properties"] assert "tester" in diff_r2["properties"] assert "tests_234234" in diff_r1["properties"] assert "tests_TT" in diff_r2["properties"] # test compare units of properties r1 = db.Record() r2 = db.Record() r1.add_property("test", value=2, unit="cm") r2.add_property("test", value=2, unit="m") r1.add_property("tests", value=3, unit="cm") r2.add_property("tests", value=45, unit="cm") r1.add_property("tester", value=3) r2.add_property("tester", ) r1.add_property("tests_234234", value=45, unit="cm") r2.add_property("tests_TT", value=45, unit="cm") diff_r1, diff_r2 = compare_entities(r1, r2) assert len(diff_r1["properties"]) == 4 assert len(diff_r2["properties"]) == 4 assert "tests" in diff_r1["properties"] assert "tests" in diff_r2["properties"] assert "tester" in diff_r1["properties"] assert "tester" in diff_r2["properties"] assert "tests_234234" in diff_r1["properties"] assert "tests_TT" in diff_r2["properties"] assert diff_r1["properties"]["test"]["unit"] == "cm" assert diff_r2["properties"]["test"]["unit"] == "m" def test_compare_entities_battery(): par1, par2, par3 = db.Record(), db.Record(), db.RecordType() r1, r2, r3 = db.Record(), db.Record(), db.Record() prop1 = db.Property() prop2 = db.Property(name="Property 2") prop3 = db.Property() # Basic tests for Properties prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop", "value": db.Record().add_parent(par3), "unit": '°'} t1 = db.Record().add_parent(db.RecordType()) t2 = db.Record().add_parent(db.RecordType()) # Change datatype t1.add_property(db.Property(name="datatype", **prop_settings)) prop_settings["datatype"] = par3 t2.add_property(db.Property(name="datatype", **prop_settings)) # Change description t1.add_property(db.Property(name="description", **prop_settings)) prop_settings["description"] = "diff desc" t2.add_property(db.Property(name="description", **prop_settings)) # Change value to copy t1.add_property(db.Property(name="value copy", **prop_settings)) prop_settings["value"] = db.Record().add_parent(par3) t2.add_property(db.Property(name="value copy", **prop_settings)) # Change value to something different t1.add_property(db.Property(name="value", **prop_settings)) prop_settings["value"] = db.Record(name="n").add_parent(par3) t2.add_property(db.Property(name="value", **prop_settings)) # Change unit t1.add_property(db.Property(name="unit", **prop_settings)) prop_settings["unit"] = db.Property(unit='°') t2.add_property(db.Property(name="unit", **prop_settings)) # Change unit again t1.add_property(db.Property(name="unit 2", **prop_settings)) prop_settings["unit"] = db.Property() t2.add_property(db.Property(name="unit 2", **prop_settings)) # Compare diff_0 = compare_entities(t1, t2) diff_1 = compare_entities(t1, t2, compare_referenced_records=True) # Check correct detection of changes assert diff_0[0]["properties"]["datatype"] == {"datatype": db.REFERENCE} assert diff_0[1]["properties"]["datatype"] == {"datatype": par3} assert diff_0[0]["properties"]["description"] == {"description": "desc of prop"} assert diff_0[1]["properties"]["description"] == {"description": "diff desc"} assert "value" in diff_0[0]["properties"]["value copy"] assert "value" in diff_0[1]["properties"]["value copy"] assert "value" in diff_0[0]["properties"]["value"] assert "value" in diff_0[1]["properties"]["value"] assert "unit" in diff_0[0]["properties"]["unit"] assert "unit" in diff_0[1]["properties"]["unit"] assert "unit" in diff_0[0]["properties"]["unit 2"] assert "unit" in diff_0[1]["properties"]["unit 2"] # Check correct result for compare_referenced_records=True assert "value copy" not in diff_1[0]["properties"] assert "value copy" not in diff_1[1]["properties"] diff_0[0]["properties"].pop("value copy") diff_0[1]["properties"].pop("value copy") assert diff_0 == diff_1 # Basic tests for Parents t3 = db.Record().add_parent(db.RecordType("A")).add_parent(db.Record("B")) t4 = db.Record().add_parent(db.RecordType("A")) assert compare_entities(t3, t4)[0]['parents'] == ['B'] assert len(compare_entities(t3, t4)[1]['parents']) == 0 t4.add_parent(db.Record("B")) assert empty_diff(t3, t4) # The two following assertions document current behaviour but do not make a # lot of sense t4.add_parent(db.Record("B")) assert empty_diff(t3, t4) t3.add_parent(db.RecordType("A")).add_parent(db.Record("B")) t4.add_parent(db.RecordType("B")).add_parent(db.Record("A")) assert empty_diff(t3, t4) # Basic tests for special attributes prop_settings = {"id": 42, "name": "Property", "datatype": db.LIST(db.REFERENCE), "value": [db.Record()], "unit": '€', "description": "desc of prop"} alt_settings = {"id": 64, "name": "Property 2", "datatype": db.LIST(db.TEXT), "value": [db.RecordType()], "unit": '€€', "description": " ę Ě ப ཾ ཿ ∛ ∜ ㅿ ㆀ 값 "} t5 = db.Property(**prop_settings) t6 = db.Property(**prop_settings) assert empty_diff(t5, t6) # ID t5.id = alt_settings['id'] diff = compare_entities(t5, t6) assert diff[0] == {'properties': {}, 'parents': [], 'id': alt_settings['id']} assert diff[1] == {'properties': {}, 'parents': [], 'id': prop_settings['id']} t6.id = alt_settings['id'] assert empty_diff(t5, t6) # Name t5.name = alt_settings['name'] diff = compare_entities(t5, t6) assert diff[0] == {'properties': {}, 'parents': [], 'name': alt_settings['name']} assert diff[1] == {'properties': {}, 'parents': [], 'name': prop_settings['name']} t6.name = alt_settings['name'] assert empty_diff(t5, t6) # Description t6.description = alt_settings['description'] diff = compare_entities(t5, t6) assert diff[0] == {'properties': {}, 'parents': [], 'description': prop_settings['description']} assert diff[1] == {'properties': {}, 'parents': [], 'description': alt_settings['description']} t5.description = alt_settings['description'] assert empty_diff(t5, t6) # Unit t5.unit = alt_settings['unit'] diff = compare_entities(t5, t6) assert diff[0] == {'properties': {}, 'parents': [], 'unit': alt_settings['unit']} assert diff[1] == {'properties': {}, 'parents': [], 'unit': prop_settings['unit']} t6.unit = alt_settings['unit'] assert empty_diff(t5, t6) # Value t6.value = alt_settings['value'] diff = compare_entities(t5, t6) assert diff[0] == {'properties': {}, 'parents': [], 'value': prop_settings['value']} assert diff[1] == {'properties': {}, 'parents': [], 'value': alt_settings['value']} t5.value = alt_settings['value'] assert empty_diff(t5, t6) # Datatype t6.datatype = alt_settings['datatype'] diff = compare_entities(t5, t6) assert diff[0] == {'properties': {}, 'parents': [], 'datatype': prop_settings['datatype']} assert diff[1] == {'properties': {}, 'parents': [], 'datatype': alt_settings['datatype']} t5.datatype = alt_settings['datatype'] assert empty_diff(t5, t6) # All at once diff = compare_entities(db.Property(**prop_settings), db.Property(**alt_settings)) assert diff[0] == {'properties': {}, 'parents': [], **prop_settings} assert diff[1] == {'properties': {}, 'parents': [], **alt_settings} # Entity Type diff = compare_entities(db.Property(value=db.Property(id=101)), db.Property(value=db.Record(id=101))) assert "value" in diff[0] assert "value" in diff[1] diff = compare_entities(db.Property(value=db.Record(id=101)), db.Property(value=db.Record(id=101))) assert "value" in diff[0] assert "value" in diff[1] assert empty_diff(db.Property(value=db.Record(id=101)), db.Property(value=db.Record(id=101)), compare_referenced_records=True) # Special cases # Files assert not empty_diff(db.File(path='ABC', file=StringIO("ABC")), db.File(path='ABC', file=StringIO("Other"))) # Importance assert empty_diff(db.Property().add_property(prop1), db.Property().add_property(prop1)) assert not empty_diff(db.Property().add_property(prop1, importance=db.SUGGESTED), db.Property().add_property(prop1, importance=db.OBLIGATORY)) # Mixed Lists assert empty_diff(db.Property(value=[1, 2, 'a', r1]), db.Property(value=[1, 2, 'a', r1])) # entity_name_id_equivalency assert not empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]), db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4])) assert empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]), db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4]), entity_name_id_equivalency=True) assert empty_diff(db.Property(value=1), db.Property(value=db.Record(id=1)), entity_name_id_equivalency=True) # entity_name_id_equivalency prop4 = db.Property(**prop_settings).add_parent(par1).add_property(prop2) prop4_c = db.Property(**prop_settings).add_parent(par1).add_property(prop2) prop4.value = db.Record(id=12) prop4_c.value = '12' prop4.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE), value=[12, db.Record(id=13), par1, "abc%"])) prop4_c.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE), value=[db.Record(id=12), "13", par1, "abc%"])) assert not empty_diff(prop4, prop4_c, entity_name_id_equivalency=False) assert empty_diff(prop4, prop4_c, entity_name_id_equivalency=True) # Order invariance t7 = db.Property(**prop_settings).add_parent(par1).add_property(prop1) t8 = db.Property(**alt_settings).add_parent(par3).add_property(prop3) diffs_0 = compare_entities(t7, t8), compare_entities(t7, t8, True) diffs_1 = compare_entities(t8, t7)[::-1], compare_entities(t8, t7, True)[::-1] assert diffs_0 == diffs_1 prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop", "value": db.Record().add_parent(par3), "unit": '°'} t1.add_property(db.Property(name="description", **prop_settings)) t2.add_property(db.Property(name="description", **prop_settings)) try: diffs_0 = compare_entities(t1, t2), compare_entities(t1, t2, True) except Exception as e: diffs_0 = type(e) try: diffs_1 = compare_entities(t2, t1)[::-1], compare_entities(t2, t1, True)[::-1] except Exception as e: diffs_1 = type(e) assert diffs_0 == diffs_1 def test_compare_special_properties(): # Test for all known special properties: INTS = ("size", "id") HIDDEN = ("checksum", "size") for key in SPECIAL_ATTRIBUTES: set_key = key if key in HIDDEN: set_key = "_" + key r1 = db.Record() r2 = db.Record() if key not in INTS: setattr(r1, set_key, "bla 1") setattr(r2, set_key, "bla 1") else: setattr(r1, set_key, 1) setattr(r2, set_key, 1) diff_r1, diff_r2 = compare_entities(r1, r2) assert key not in diff_r1 assert key not in diff_r2 assert len(diff_r1["parents"]) == 0 assert len(diff_r2["parents"]) == 0 assert len(diff_r1["properties"]) == 0 assert len(diff_r2["properties"]) == 0 if key not in INTS: setattr(r2, set_key, "bla test") else: setattr(r2, set_key, 2) diff_r1, diff_r2 = compare_entities(r1, r2) assert key in diff_r1 assert key in diff_r2 if key not in INTS: assert diff_r1[key] == "bla 1" assert diff_r2[key] == "bla test" else: assert diff_r1[key] == 1 assert diff_r2[key] == 2 assert len(diff_r1["properties"]) == 0 assert len(diff_r2["properties"]) == 0 # compare Property objects p1 = db.Property() p2 = db.Property() diff_r1, diff_r2 = compare_entities(p1, p2) assert len(diff_r1["parents"]) == 0 assert len(diff_r2["parents"]) == 0 assert len(diff_r1["properties"]) == 0 assert len(diff_r2["properties"]) == 0 diff_r1, diff_r2 = compare_entities(p1, p2) assert len(diff_r1["parents"]) == 0 assert len(diff_r2["parents"]) == 0 assert len(diff_r1["properties"]) == 0 assert len(diff_r2["properties"]) == 0 p1.value = 42 p2.value = 4 diff_r1, diff_r2 = compare_entities(p1, p2) assert len(diff_r1["parents"]) == 0 assert len(diff_r2["parents"]) == 0 assert len(diff_r1["properties"]) == 0 assert len(diff_r2["properties"]) == 0 # Comparing values currently does not seem to be implemented: assert "value" in diff_r1 assert diff_r1["value"] == 42 assert "value" in diff_r2 assert diff_r2["value"] == 4 def test_copy_entities(): r = db.Record(name="A") r.add_parent(name="B") r.add_property(name="C", value=4, importance="OBLIGATORY") r.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY") r.description = "A fancy test record" c = r.copy() assert c is not r assert c.name == "A" assert c.role == r.role assert c.parents[0].name == "B" # parent and property objects are not shared among copy and original: assert c.parents[0] is not r.parents[0] for i in [0, 1]: assert c.properties[i] is not r.properties[i] for special in SPECIAL_ATTRIBUTES: assert getattr(c.properties[i], special) == getattr( r.properties[i], special) assert c.get_importance( c.properties[i]) == r.get_importance(r.properties[i]) def test_merge_entities(): r = db.Record(name="A") r.add_parent(name="B") r.add_property(name="C", value=4, importance="OBLIGATORY") r.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY") r.description = "A fancy test record" r2 = db.Record() r2.add_property(name="F", value="text") merge_entities(r2, r) assert r2.get_parents()[0].name == "B" assert r2.get_property("C").name == "C" assert r2.get_property("C").value == 4 assert r2.get_property("D").name == "D" assert r2.get_property("D").value == [3, 4, 7] assert r2.get_property("F").name == "F" assert r2.get_property("F").value == "text" def test_merge_bug_conflict(): r = db.Record() r.add_property(name="C", value=4) r2 = db.Record() r2.add_property(name="C", value=4, datatype="TEXT") merge_entities(r, r2) r3 = db.Record() r3.add_property(name="C", value=4, datatype="INTEGER") with pytest.raises(EntityMergeConflictError): merge_entities(r3, r2) def test_merge_bug_109(): rt = db.RecordType(name="TestBug") p = db.Property(name="test_bug_property", datatype=db.LIST(db.INTEGER)) r_b = db.Record(name="TestRecord") r_b.add_parent(rt) r_b.add_property(p, value=[18, 19]) r_a = db.Record(name="TestRecord") r_a.add_parent(rt) merge_entities(r_a, r_b) assert r_b.get_property("test_bug_property").value == [18, 19] assert r_a.get_property("test_bug_property").value == [18, 19] assert "<Value>18</Value>\n <Value>19</Value>" in str(r_b) assert "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>" not in str( r_b) assert "<Value>18</Value>\n <Value>19</Value>" in str(r_a) assert "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>" not in str( r_a) @pytest.mark.xfail def test_bug_109(): rt = db.RecordType(name="TestBug") p = db.Property(name="test_bug_property", datatype=db.LIST(db.INTEGER)) r_b = db.Record(name="TestRecord") r_b.add_parent(rt) r_b.add_property(p, value=[18, 19]) r_a = db.Record(name="TestRecord") r_a.add_parent(rt) r_a.add_property(r_b.get_property("test_bug_property")) assert r_b.get_property("test_bug_property").value == [18, 19] assert r_a.get_property("test_bug_property").value == [18, 19] assert "<Value>18</Value>\n <Value>19</Value>" in str(r_b) assert "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>" not in str( r_b) assert "<Value>18</Value>\n <Value>19</Value>" in str(r_a) assert "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>" not in str( r_a) @pytest.mark.xfail(reason="Issue https://gitlab.com/linkahead/linkahead-pylib/-/issues/111") def test_failing_merge_entities_111(): prop_a = db.Property() prop_parent = db.Property(name="prop_parent") prop_b = db.Property(name="b", datatype=db.DOUBLE, unit="µs", value=1.1).add_parent(prop_parent) print(prop_b) db.apiutils.merge_entities(prop_a, prop_b) assert prop_a.name == prop_b.name # OK assert prop_parent.name in [par.name for par in prop_a.get_parents()] # OK assert prop_a.value == prop_b.value # fails assert prop_a.datatype == db.DOUBLE # fails assert prop_a.unit == prop_b.unit # fails def test_wrong_merge_conflict_reference(): """Test a wrongly detected merge conflict in case of two records referencing two different, but identical objects. """ # Two identical license records will be referenced from both records to be # merged license_rt = db.RecordType(name="license") license_rec_a = db.Record(name="CC-BY-3.0").add_parent(license_rt) license_rec_b = db.Record(name="CC-BY-3.0").add_parent(license_rt) # two referencing records dataset_rt = db.RecordType(name="Dataset") title_prop = db.Property(name="title", datatype=db.TEXT) doi_prop = db.Property(name="DOI", datatype=db.TEXT) rec_a = db.Record().add_parent(dataset_rt) rec_a.add_property(name=license_rt.name, datatype=license_rt.name, value=license_rec_a) rec_a.add_property(name=title_prop.name, value="Some dataset title") rec_b = db.Record().add_parent(dataset_rt) rec_b.add_property(name=license_rt.name, datatype=license_rt.name, value=license_rec_b) rec_b.add_property(name=doi_prop.name, value="https://doi.org/12345.678") merge_entities(rec_a, rec_b) assert rec_a.get_property(license_rt.name) is not None assert rec_a.get_property(license_rt.name).value is not None assert isinstance(rec_a.get_property(license_rt.name).value, db.Record) assert rec_a.get_property(license_rt.name).value.name == license_rec_a.name assert rec_a.get_property(license_rt.name).value.name == license_rec_b.name assert rec_a.get_property("title").value == "Some dataset title" assert rec_a.get_property("doi").value == "https://doi.org/12345.678" # Reset rec_a rec_a = db.Record().add_parent(dataset_rt) rec_a.add_property(name=license_rt.name, datatype=license_rt.name, value=license_rec_a) rec_a.add_property(name=title_prop.name, value="Some dataset title") # this does not compare referenced records, so it will fail with pytest.raises(EntityMergeConflictError): merge_entities(rec_a, rec_b, merge_references_with_empty_diffs=False) # ... as should this, of course rec_b.get_property(license_rt.name).value.name = "Another license" with pytest.raises(EntityMergeConflictError) as re: merge_entities(rec_a, rec_b) def test_empty_diff(): rec_a = db.Record(name="A") rec_b = db.Record(name="B") assert empty_diff(rec_a, rec_a) assert not empty_diff(rec_a, rec_b) rec_a.add_parent(name="RT") rec_b.add_parent(name="RT") assert empty_diff(rec_a, rec_a) assert not empty_diff(rec_a, rec_b) rec_b.name = "A" assert empty_diff(rec_a, rec_b) rec_a.add_property(name="some_prop", value=1) assert not empty_diff(rec_a, rec_b) rec_b.add_property(name="some_prop", value=1) assert empty_diff(rec_a, rec_b) rec_b.get_property("some_prop").value = 2 assert not empty_diff(rec_a, rec_b) rec_b.get_property("some_prop").value = 1 rec_b.add_property(name="some_other_prop", value="Test") assert not empty_diff(rec_a, rec_b) rec_a.add_property(name="some_other_prop", value="Test") assert empty_diff(rec_a, rec_b) # reference identical records, but different Python Record objects ref_rec_a = db.Record(name="Ref").add_parent(name="RefType") ref_rec_b = db.Record(name="Ref").add_parent(name="RefType") rec_a.add_property(name="RefType", datatype="RefType", value=ref_rec_a) rec_b.add_property(name="RefType", datatype="RefType", value=ref_rec_b) # the default is `compare_referenced_records=False`, so the diff shouldn't # be empty (different Python objects are referenced.) assert not empty_diff(rec_a, rec_b) # when looking into the referenced record, the diffs should be empty again assert empty_diff(rec_a, rec_b, compare_referenced_records=True) # The same for lists of references rec_a.remove_property("RefType") rec_b.remove_property("RefType") assert empty_diff(rec_a, rec_b) rec_a.add_property(name="RefType", datatype=db.LIST("RefType"), value=[ref_rec_a, ref_rec_a]) rec_b.add_property(name="RefType", datatype=db.LIST("RefType"), value=[ref_rec_b, ref_rec_b]) assert not empty_diff(rec_a, rec_b) assert empty_diff(rec_a, rec_b, compare_referenced_records=True) # special case of ids rec_a = db.Record(id=12) rec_b = db.Record() assert not empty_diff(rec_a, rec_b) rec_b.id = 13 assert not empty_diff(rec_a, rec_b) rec_b.id = 12 assert empty_diff(rec_a, rec_b) def test_force_merge(): """Test whether a forced merge overwrites existing properties correctly.""" # name overwrite recA = db.Record(name="A") recB = db.Record(name="B") with pytest.raises(EntityMergeConflictError): merge_entities(recA, recB) merge_entities(recA, recB, force=True) assert "B" == recA.name # unchanged assert "B" == recB.name # description overwrite recA = db.Record() recA.description = "something" recB = db.Record() recB.description = "something else" with pytest.raises(EntityMergeConflictError) as emce: merge_entities(recA, recB) assert str(emce.value) == """Conflict in special attribute description: A: something B: something else""" merge_entities(recA, recB, force=True) assert recA.description == "something else" # unchanged assert recB.description == "something else" # property overwrite recA = db.Record() recA.add_property(name="propA", value="something") recB = db.Record() recB.add_property(name="propA", value="something else") with pytest.raises(EntityMergeConflictError): merge_entities(recA, recB) merge_entities(recA, recB, force=True) assert recA.get_property("propA").value == "something else" # unchanged assert recB.get_property("propA").value == "something else" # don't remove a property that's not in recB recA = db.Record() recA.add_property(name="propA", value="something") recA.add_property(name="propB", value=5.0) recB = db.Record() recB.add_property(name="propA", value="something else") merge_entities(recA, recB, force=True) assert recA.get_property("propA").value == "something else" assert recA.get_property("propB").value == 5.0 # also overwrite datatypes ... rtA = db.RecordType() rtA.add_property(name="propA", datatype=db.INTEGER) rtB = db.RecordType() rtB.add_property(name="propA", datatype=db.TEXT) with pytest.raises(EntityMergeConflictError): merge_entities(rtA, rtB) merge_entities(rtA, rtB, force=True) assert rtA.get_property("propA").datatype == db.TEXT # unchanged assert rtB.get_property("propA").datatype == db.TEXT # ... and units recA = db.Record() recA.add_property(name="propA", value=5, unit="m") recB = db.Record() recB.add_property(name="propA", value=5, unit="cm") with pytest.raises(EntityMergeConflictError): merge_entities(recA, recB) merge_entities(recA, recB, force=True) assert recA.get_property("propA").unit == "cm" # unchanged assert recB.get_property("propA").unit == "cm" def test_merge_missing_list_datatype_82(): """Merging two properties, where the list-valued one has no datatype.""" recA = db.Record().add_property("a", 5, datatype="B") recB_with_DT = db.Record().add_property("a", [1, 2], datatype=f"LIST<{db.DOUBLE}>") merge_entities(recA, recB_with_DT, force=True) assert recA.get_property("a").datatype == f"LIST<{db.DOUBLE}>" recA = db.Record().add_property("a", 5, datatype="B") recB_without_DT = db.Record().add_property("a", [1, 2]) with pytest.raises(TypeError) as te: merge_entities(recA, recB_without_DT, force=True) assert "Invalid datatype: List valued properties" in str(te.value) def test_merge_id_with_resolved_entity(): rtname = "TestRT" ref_id = 123 ref_rec = db.Record(id=ref_id).add_parent(name=rtname) # recA has the resolved referenced record as value, recB its id. Otherwise, # they are identical. recA = db.Record().add_property(name=rtname, value=ref_rec) recB = db.Record().add_property(name=rtname, value=ref_id) # default is strict: raise error since values are different with pytest.raises(EntityMergeConflictError): merge_entities(recA, recB) # Overwrite from right to left in both cases merge_entities(recA, recB, merge_id_with_resolved_entity=True) assert recA.get_property(rtname).value == ref_rec recA = db.Record().add_property(name=rtname, value=ref_rec) merge_entities(recB, recA, merge_id_with_resolved_entity=True) assert recB.get_property(rtname).value == ref_id assert recA.get_property(rtname).value == ref_rec # id mismatches recB = db.Record().add_property(name=rtname, value=ref_id*2) with pytest.raises(EntityMergeConflictError): merge_entities(recA, recB, merge_id_with_resolved_entity=True) other_rec = db.Record(id=None).add_parent(name=rtname) recA = db.Record().add_property(name=rtname, value=other_rec) recB = db.Record().add_property(name=rtname, value=ref_id) with pytest.raises(EntityMergeConflictError): merge_entities(recA, recB, merge_id_with_resolved_entity=True) # also works in lists: recA = db.Record().add_property( name=rtname, datatype=db.LIST(rtname), value=[ref_rec, ref_id*2]) recB = db.Record().add_property( name=rtname, datatype=db.LIST(rtname), value=[ref_id, ref_id*2]) merge_entities(recA, recB, merge_id_with_resolved_entity=True) assert recA.get_property(rtname).value == [ref_rec, ref_id*2] assert recB.get_property(rtname).value == [ref_id, ref_id*2]