Select Git revision
CHANGELOG.md
-
Henrik tom Wörden authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
To find the state of this project's repository at the time of any of these versions, check out the tags.
test_apiutils.py 31.56 KiB
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com>
# Copyright (C) 2022 Daniel Hornung <d.hornung@indiscale.com>
# Copyright (C) 2020-2022 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# Test apiutils
# A. Schlemmer, 02/2018
from io import StringIO
import linkahead as db
import linkahead.apiutils
import pytest
from linkahead.apiutils import (EntityMergeConflictError, apply_to_ids,
compare_entities, create_id_query, empty_diff,
merge_entities, resolve_reference)
from linkahead.common.models import SPECIAL_ATTRIBUTES
def test_apply_to_ids():
    """Check that ``apply_to_ids`` applies the callback to all IDs of a record.

    After the call, the record's own ID as well as the IDs of its parent and
    of its property must carry the value returned by the callback.
    """
    record = db.Record(id=23)
    record.add_parent(db.RecordType(id=3456))
    record.add_property(db.Property(id=23345, datatype=db.INTEGER))

    def negate(id_):
        return id_ * -1

    apply_to_ids([record], negate)

    # Sanity check of the callback itself.
    assert negate(3456) == -3456

    assert record.parents[0].id == -3456
    assert record.properties[0].id == -23345
    assert record.id == -23
def test_id_query():
    """``create_id_query`` must chain all given IDs with ``OR`` in one query."""
    expected_query = ('FIND ENTITY WITH ID=1 OR ID=2 OR ID=3 OR '
                      'ID=4 OR ID=5')
    actual_query = create_id_query([1, 2, 3, 4, 5])
    assert actual_query == expected_query
def test_resolve_reference():
    """Check ``resolve_reference`` on single, list-valued and non-reference properties.

    ``linkahead.apiutils.retrieve_entity_with_id`` is temporarily replaced by a
    stub that returns a ``db.Record`` carrying the requested id.  The original
    function is restored in a ``finally`` clause so that a failing assertion
    cannot leak the stub into tests that run afterwards.
    """
    original_retrieve_entity_with_id = linkahead.apiutils.retrieve_entity_with_id
    linkahead.apiutils.retrieve_entity_with_id = lambda eid: db.Record(id=eid)

    try:
        prop = db.Property(id=1, datatype=db.REFERENCE, value=100)
        prop.is_valid = lambda: True
        items = [200, 300, 400]
        # list of plain ids
        prop_list = db.Property(datatype=db.LIST(db.REFERENCE),
                                value=items)
        # list that already contains an entity object
        prop_list2 = db.Property(datatype=db.LIST(db.REFERENCE),
                                 value=[db.Record(id=500)])

        resolve_reference(prop)
        resolve_reference(prop_list)
        resolve_reference(prop_list2)

        # single reference: the id has been replaced by an entity with that id
        assert prop.value.id == 100
        assert isinstance(prop.value, db.Entity)

        # list of ids: every element is an entity, order of ids is preserved
        prop_list_ids = []
        for entity in prop_list.value:
            prop_list_ids.append(entity.id)
            assert isinstance(entity, db.Entity)
        assert prop_list_ids == items

        # list of entities: elements are still entities with the same id
        for entity in prop_list2.value:
            assert entity.id == 500
            assert isinstance(entity, db.Entity)

        # non-reference properties must be left untouched
        no_reference = db.Property(id=5000, datatype=db.INTEGER, value=2)
        resolve_reference(no_reference)
        assert no_reference.value == 2
        assert no_reference.datatype is db.INTEGER
    finally:
        # restore retrieve_entity_with_id, even if an assertion above failed
        linkahead.apiutils.retrieve_entity_with_id = original_retrieve_entity_with_id
def test_compare_entities():
    """Compare two records differing in parents and in several property aspects.

    The first part covers parents, values, importance and datatype of
    properties; the second part covers units of properties.
    """
    # Part 1: parents and properties
    first, second = db.Record(), db.Record()
    first.add_parent("bla")
    second.add_parent("bla")
    first.add_parent("lopp")  # only the first record has this parent

    first.add_property("test", value=2)
    second.add_property("test", value=2)
    first.add_property("testi", importance=linkahead.SUGGESTED, value=2)
    second.add_property("testi", importance=linkahead.RECOMMENDED, value=2)
    first.add_property("tests", value=3)
    second.add_property("tests", value=45)
    first.add_property("tester", value=3)
    second.add_property("tester")
    first.add_property("tests_234234", value=45)
    second.add_property("tests_TT", value=45)
    first.add_property("datatype", value=45, datatype=db.INTEGER)
    second.add_property("datatype", value=45)

    diff_first, diff_second = compare_entities(first, second)
    both_diffs = (diff_first, diff_second)

    assert len(diff_first["parents"]) == 1
    assert len(diff_second["parents"]) == 0
    for diff in both_diffs:
        assert len(diff["properties"]) == 5

    # identical property: reported in neither diff
    for diff in both_diffs:
        assert "test" not in diff["properties"]

    # properties present in both records but differing in some aspect
    for prop_name in ("tests", "testi", "tester"):
        for diff in both_diffs:
            assert prop_name in diff["properties"]

    # properties that exist in only one of the records
    assert "tests_234234" in diff_first["properties"]
    assert "tests_TT" in diff_second["properties"]

    # differing datatype is reported under the "datatype" key of the property
    for diff in both_diffs:
        assert "datatype" in diff["properties"]
        assert "datatype" in diff["properties"]["datatype"]

    # Part 2: units of properties
    first, second = db.Record(), db.Record()
    first.add_property("test", value=2, unit="cm")
    second.add_property("test", value=2, unit="m")
    first.add_property("tests", value=3, unit="cm")
    second.add_property("tests", value=45, unit="cm")
    first.add_property("tester", value=3)
    second.add_property("tester")
    first.add_property("tests_234234", value=45, unit="cm")
    second.add_property("tests_TT", value=45, unit="cm")

    diff_first, diff_second = compare_entities(first, second)
    both_diffs = (diff_first, diff_second)

    for diff in both_diffs:
        assert len(diff["properties"]) == 4

    for prop_name in ("tests", "tester"):
        for diff in both_diffs:
            assert prop_name in diff["properties"]

    assert "tests_234234" in diff_first["properties"]
    assert "tests_TT" in diff_second["properties"]

    # each diff carries the unit of its own record
    assert diff_first["properties"]["test"]["unit"] == "cm"
    assert diff_second["properties"]["test"]["unit"] == "m"
def test_compare_entities_battery():
    """Run a broad battery of ``compare_entities`` / ``empty_diff`` checks.

    Covered in order: differences in attributes of properties (datatype,
    description, value, unit), parents, special attributes of the compared
    entity itself, entity types of values, files, importance, mixed lists,
    the ``entity_name_id_equivalency`` flag, and invariance of the result
    under swapping the two arguments.
    """
    par1, par2, par3 = db.Record(), db.Record(), db.RecordType()
    r1, r2, r3 = db.Record(), db.Record(), db.Record()
    prop1 = db.Property()
    prop2 = db.Property(name="Property 2")
    prop3 = db.Property()

    # Basic tests for Properties
    # ``prop_settings`` is mutated step by step below: each pair of properties
    # added to t1 and t2 differs in exactly the one setting changed in between.
    prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop",
                     "value": db.Record().add_parent(par3), "unit": '°'}
    t1 = db.Record().add_parent(db.RecordType())
    t2 = db.Record().add_parent(db.RecordType())
    # Change datatype
    t1.add_property(db.Property(name="datatype", **prop_settings))
    prop_settings["datatype"] = par3
    t2.add_property(db.Property(name="datatype", **prop_settings))
    # Change description
    t1.add_property(db.Property(name="description", **prop_settings))
    prop_settings["description"] = "diff desc"
    t2.add_property(db.Property(name="description", **prop_settings))
    # Change value to copy
    # (a new Record object that is built exactly like the previous value)
    t1.add_property(db.Property(name="value copy", **prop_settings))
    prop_settings["value"] = db.Record().add_parent(par3)
    t2.add_property(db.Property(name="value copy", **prop_settings))
    # Change value to something different
    t1.add_property(db.Property(name="value", **prop_settings))
    prop_settings["value"] = db.Record(name="n").add_parent(par3)
    t2.add_property(db.Property(name="value", **prop_settings))
    # Change unit
    t1.add_property(db.Property(name="unit", **prop_settings))
    prop_settings["unit"] = db.Property(unit='°')
    t2.add_property(db.Property(name="unit", **prop_settings))
    # Change unit again
    t1.add_property(db.Property(name="unit 2", **prop_settings))
    prop_settings["unit"] = db.Property()
    t2.add_property(db.Property(name="unit 2", **prop_settings))

    # Compare
    diff_0 = compare_entities(t1, t2)
    diff_1 = compare_entities(t1, t2, compare_referenced_records=True)

    # Check correct detection of changes
    assert diff_0[0]["properties"]["datatype"] == {"datatype": db.REFERENCE}
    assert diff_0[1]["properties"]["datatype"] == {"datatype": par3}
    assert diff_0[0]["properties"]["description"] == {"description": "desc of prop"}
    assert diff_0[1]["properties"]["description"] == {"description": "diff desc"}
    assert "value" in diff_0[0]["properties"]["value copy"]
    assert "value" in diff_0[1]["properties"]["value copy"]
    assert "value" in diff_0[0]["properties"]["value"]
    assert "value" in diff_0[1]["properties"]["value"]
    assert "unit" in diff_0[0]["properties"]["unit"]
    assert "unit" in diff_0[1]["properties"]["unit"]
    assert "unit" in diff_0[0]["properties"]["unit 2"]
    assert "unit" in diff_0[1]["properties"]["unit 2"]
    # Check correct result for compare_referenced_records=True
    assert "value copy" not in diff_1[0]["properties"]
    assert "value copy" not in diff_1[1]["properties"]
    # Apart from "value copy", both comparison modes must yield the same diff.
    diff_0[0]["properties"].pop("value copy")
    diff_0[1]["properties"].pop("value copy")
    assert diff_0 == diff_1

    # Basic tests for Parents
    t3 = db.Record().add_parent(db.RecordType("A")).add_parent(db.Record("B"))
    t4 = db.Record().add_parent(db.RecordType("A"))
    assert compare_entities(t3, t4)[0]['parents'] == ['B']
    assert len(compare_entities(t3, t4)[1]['parents']) == 0
    t4.add_parent(db.Record("B"))
    assert empty_diff(t3, t4)
    # The two following assertions document current behaviour but do not make a
    # lot of sense
    t4.add_parent(db.Record("B"))
    assert empty_diff(t3, t4)
    t3.add_parent(db.RecordType("A")).add_parent(db.Record("B"))
    t4.add_parent(db.RecordType("B")).add_parent(db.Record("A"))
    assert empty_diff(t3, t4)

    # Basic tests for special attributes
    prop_settings = {"id": 42, "name": "Property",
                     "datatype": db.LIST(db.REFERENCE), "value": [db.Record()],
                     "unit": '€', "description": "desc of prop"}
    alt_settings = {"id": 64, "name": "Property 2",
                    "datatype": db.LIST(db.TEXT), "value": [db.RecordType()],
                    "unit": '€€', "description": " ę Ě ப ཾ ཿ ∛ ∜ ㅿ ㆀ 값 "}
    t5 = db.Property(**prop_settings)
    t6 = db.Property(**prop_settings)
    assert empty_diff(t5, t6)
    # Each section below changes one attribute on one side, checks that exactly
    # this attribute shows up in the diff, then aligns the other side again.
    # ID
    t5.id = alt_settings['id']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'id': alt_settings['id']}
    assert diff[1] == {'properties': {}, 'parents': [], 'id': prop_settings['id']}
    t6.id = alt_settings['id']
    assert empty_diff(t5, t6)
    # Name
    t5.name = alt_settings['name']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'name': alt_settings['name']}
    assert diff[1] == {'properties': {}, 'parents': [], 'name': prop_settings['name']}
    t6.name = alt_settings['name']
    assert empty_diff(t5, t6)
    # Description
    t6.description = alt_settings['description']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'description': prop_settings['description']}
    assert diff[1] == {'properties': {}, 'parents': [], 'description': alt_settings['description']}
    t5.description = alt_settings['description']
    assert empty_diff(t5, t6)
    # Unit
    t5.unit = alt_settings['unit']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'unit': alt_settings['unit']}
    assert diff[1] == {'properties': {}, 'parents': [], 'unit': prop_settings['unit']}
    t6.unit = alt_settings['unit']
    assert empty_diff(t5, t6)
    # Value
    t6.value = alt_settings['value']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'value': prop_settings['value']}
    assert diff[1] == {'properties': {}, 'parents': [], 'value': alt_settings['value']}
    t5.value = alt_settings['value']
    assert empty_diff(t5, t6)
    # Datatype
    t6.datatype = alt_settings['datatype']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'datatype': prop_settings['datatype']}
    assert diff[1] == {'properties': {}, 'parents': [], 'datatype': alt_settings['datatype']}
    t5.datatype = alt_settings['datatype']
    assert empty_diff(t5, t6)
    # All at once
    diff = compare_entities(db.Property(**prop_settings), db.Property(**alt_settings))
    assert diff[0] == {'properties': {}, 'parents': [], **prop_settings}
    assert diff[1] == {'properties': {}, 'parents': [], **alt_settings}
    # Entity Type
    diff = compare_entities(db.Property(value=db.Property(id=101)),
                            db.Property(value=db.Record(id=101)))
    assert "value" in diff[0]
    assert "value" in diff[1]
    # Two distinct Record objects with the same id: reported as different by
    # default, but not with compare_referenced_records=True.
    diff = compare_entities(db.Property(value=db.Record(id=101)),
                            db.Property(value=db.Record(id=101)))
    assert "value" in diff[0]
    assert "value" in diff[1]
    assert empty_diff(db.Property(value=db.Record(id=101)),
                      db.Property(value=db.Record(id=101)),
                      compare_referenced_records=True)

    # Special cases
    # Files
    assert not empty_diff(db.File(path='ABC', file=StringIO("ABC")),
                          db.File(path='ABC', file=StringIO("Other")))
    # Importance
    assert empty_diff(db.Property().add_property(prop1),
                      db.Property().add_property(prop1))
    assert not empty_diff(db.Property().add_property(prop1, importance=db.SUGGESTED),
                          db.Property().add_property(prop1, importance=db.OBLIGATORY))
    # Mixed Lists
    assert empty_diff(db.Property(value=[1, 2, 'a', r1]),
                      db.Property(value=[1, 2, 'a', r1]))
    # entity_name_id_equivalency
    # (plain ids on one side vs. Records with those ids on the other side)
    assert not empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]),
                          db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4]))
    assert empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]),
                      db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4]),
                      entity_name_id_equivalency=True)
    assert empty_diff(db.Property(value=1), db.Property(value=db.Record(id=1)),
                      entity_name_id_equivalency=True)
    # entity_name_id_equivalency
    # (same flag, now also with string ids and within a nested list property)
    prop4 = db.Property(**prop_settings).add_parent(par1).add_property(prop2)
    prop4_c = db.Property(**prop_settings).add_parent(par1).add_property(prop2)
    prop4.value = db.Record(id=12)
    prop4_c.value = '12'
    prop4.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE),
                                   value=[12, db.Record(id=13), par1, "abc%"]))
    prop4_c.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE),
                                     value=[db.Record(id=12), "13", par1, "abc%"]))
    assert not empty_diff(prop4, prop4_c, entity_name_id_equivalency=False)
    assert empty_diff(prop4, prop4_c, entity_name_id_equivalency=True)

    # Order invariance
    # Swapping the arguments must only swap the two entries of the result.
    t7 = db.Property(**prop_settings).add_parent(par1).add_property(prop1)
    t8 = db.Property(**alt_settings).add_parent(par3).add_property(prop3)
    diffs_0 = compare_entities(t7, t8), compare_entities(t7, t8, True)
    diffs_1 = compare_entities(t8, t7)[::-1], compare_entities(t8, t7, True)[::-1]
    assert diffs_0 == diffs_1
    prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop",
                     "value": db.Record().add_parent(par3), "unit": '°'}
    # t1 and t2 now each get a second property named "description".  Whatever
    # compare_entities does in that situation (return a diff or raise), it must
    # do the same for swapped arguments; on an exception, the exception type
    # is compared instead of the diffs.
    t1.add_property(db.Property(name="description", **prop_settings))
    t2.add_property(db.Property(name="description", **prop_settings))
    try:
        diffs_0 = compare_entities(t1, t2), compare_entities(t1, t2, True)
    except Exception as e:
        diffs_0 = type(e)
    try:
        diffs_1 = compare_entities(t2, t1)[::-1], compare_entities(t2, t1, True)[::-1]
    except Exception as e:
        diffs_1 = type(e)
    assert diffs_0 == diffs_1
def test_compare_special_properties():
    """Check that ``compare_entities`` detects changes of special attributes.

    Every entry of ``SPECIAL_ATTRIBUTES`` is first set to the same value on
    two records (no diff expected) and then changed on the second record
    (diff expected on both sides).  Afterwards, two ``Property`` objects are
    compared with and without differing values.
    """
    int_valued = ("size", "id")
    # these attributes are set via their underscore-prefixed name
    hidden = ("checksum", "size")

    def assert_no_parent_or_property_diff(left, right):
        assert len(left["parents"]) == 0
        assert len(right["parents"]) == 0
        assert len(left["properties"]) == 0
        assert len(right["properties"]) == 0

    # Test for all known special properties:
    for key in SPECIAL_ATTRIBUTES:
        attr_name = "_" + key if key in hidden else key
        if key in int_valued:
            common_value, changed_value = 1, 2
        else:
            common_value, changed_value = "bla 1", "bla test"

        r1 = db.Record()
        r2 = db.Record()
        setattr(r1, attr_name, common_value)
        setattr(r2, attr_name, common_value)

        # identical values: the attribute must not appear in the diff
        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key not in diff_r1
        assert key not in diff_r2
        assert_no_parent_or_property_diff(diff_r1, diff_r2)

        # changed value on the second record: both sides report their value
        setattr(r2, attr_name, changed_value)
        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key in diff_r1
        assert key in diff_r2
        assert diff_r1[key] == common_value
        assert diff_r2[key] == changed_value
        assert len(diff_r1["properties"]) == 0
        assert len(diff_r2["properties"]) == 0

    # compare Property objects
    p1, p2 = db.Property(), db.Property()

    # the comparison of the unchanged properties is carried out twice in a row
    for _ in range(2):
        diff_r1, diff_r2 = compare_entities(p1, p2)
        assert_no_parent_or_property_diff(diff_r1, diff_r2)

    p1.value = 42
    p2.value = 4

    diff_r1, diff_r2 = compare_entities(p1, p2)
    assert_no_parent_or_property_diff(diff_r1, diff_r2)

    # differing values are reported under the "value" key of each diff
    assert "value" in diff_r1
    assert diff_r1["value"] == 42
    assert "value" in diff_r2
    assert diff_r2["value"] == 4
def test_copy_entities():
    """``copy()`` must yield an equal record that shares no parent/property objects."""
    original = db.Record(name="A")
    original.add_parent(name="B")
    original.add_property(name="C", value=4, importance="OBLIGATORY")
    original.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    original.description = "A fancy test record"

    duplicate = original.copy()

    assert duplicate is not original
    assert duplicate.name == "A"
    assert duplicate.role == original.role
    assert duplicate.parents[0].name == "B"
    # parent and property objects are not shared among copy and original:
    assert duplicate.parents[0] is not original.parents[0]

    for idx in range(2):
        copied_prop = duplicate.properties[idx]
        source_prop = original.properties[idx]
        assert copied_prop is not source_prop
        # all special attributes of the property must have been carried over
        for special in SPECIAL_ATTRIBUTES:
            assert getattr(copied_prop, special) == getattr(source_prop, special)
        assert (duplicate.get_importance(copied_prop)
                == original.get_importance(source_prop))
def test_merge_entities():
    """Merging adds the parent and properties of the second record to the first."""
    source = db.Record(name="A")
    source.add_parent(name="B")
    source.add_property(name="C", value=4, importance="OBLIGATORY")
    source.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    source.description = "A fancy test record"

    target = db.Record()
    target.add_property(name="F", value="text")

    merge_entities(target, source)

    assert target.get_parents()[0].name == "B"
    # "C" and "D" come from ``source``, "F" was already present on ``target``
    expected_properties = (("C", 4), ("D", [3, 4, 7]), ("F", "text"))
    for prop_name, prop_value in expected_properties:
        merged_prop = target.get_property(prop_name)
        assert merged_prop.name == prop_name
        assert merged_prop.value == prop_value
def test_merge_bug_conflict():
    """A missing datatype merges fine; two different datatypes are a conflict."""
    without_datatype = db.Record()
    without_datatype.add_property(name="C", value=4)
    text_typed = db.Record()
    text_typed.add_property(name="C", value=4, datatype="TEXT")

    # no datatype on the left-hand side: this merge must not raise
    merge_entities(without_datatype, text_typed)

    integer_typed = db.Record()
    integer_typed.add_property(name="C", value=4, datatype="INTEGER")

    # INTEGER vs. TEXT for the same property
    with pytest.raises(EntityMergeConflictError):
        merge_entities(integer_typed, text_typed)
def test_merge_bug_109():
    """Merging a list-valued property must not duplicate the list entries."""
    prop_name = "test_bug_property"
    rt = db.RecordType(name="TestBug")
    p = db.Property(name=prop_name, datatype=db.LIST(db.INTEGER))

    r_b = db.Record(name="TestRecord")
    r_b.add_parent(rt)
    r_b.add_property(p, value=[18, 19])

    r_a = db.Record(name="TestRecord")
    r_a.add_parent(rt)

    merge_entities(r_a, r_b)

    assert r_b.get_property(prop_name).value == [18, 19]
    assert r_a.get_property(prop_name).value == [18, 19]

    # NOTE(review): these literals depend on the exact whitespace of the
    # string representation of a record - confirm against actual output.
    values_once = "<Value>18</Value>\n <Value>19</Value>"
    values_twice = "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>"
    for record in (r_b, r_a):
        serialized = str(record)
        assert values_once in serialized
        assert values_twice not in serialized
@pytest.mark.xfail
def test_bug_109():
    """Re-adding a list-valued property of another record must not duplicate values.

    Marked as expected failure.
    """
    prop_name = "test_bug_property"
    rt = db.RecordType(name="TestBug")
    p = db.Property(name=prop_name, datatype=db.LIST(db.INTEGER))

    r_b = db.Record(name="TestRecord")
    r_b.add_parent(rt)
    r_b.add_property(p, value=[18, 19])

    r_a = db.Record(name="TestRecord")
    r_a.add_parent(rt)
    # hand the property object of r_b directly to r_a
    r_a.add_property(r_b.get_property(prop_name))

    assert r_b.get_property(prop_name).value == [18, 19]
    assert r_a.get_property(prop_name).value == [18, 19]

    values_once = "<Value>18</Value>\n <Value>19</Value>"
    values_twice = "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>"
    for record in (r_b, r_a):
        serialized = str(record)
        assert values_once in serialized
        assert values_twice not in serialized
@pytest.mark.xfail(reason="Issue https://gitlab.com/linkahead/linkahead-pylib/-/issues/111")
def test_failing_merge_entities_111():
    """Merge a fully specified property into an empty one (expected to fail).

    The trailing comments record which assertions hold and which do not.
    """
    prop_a = db.Property()
    prop_parent = db.Property(name="prop_parent")
    prop_b = db.Property(
        name="b", datatype=db.DOUBLE, unit="µs", value=1.1
    ).add_parent(prop_parent)
    print(prop_b)

    db.apiutils.merge_entities(prop_a, prop_b)

    assert prop_a.name == prop_b.name  # OK
    merged_parent_names = [par.name for par in prop_a.get_parents()]
    assert prop_parent.name in merged_parent_names  # OK
    assert prop_a.value == prop_b.value  # fails
    assert prop_a.datatype == db.DOUBLE  # fails
    assert prop_a.unit == prop_b.unit  # fails
def test_wrong_merge_conflict_reference():
    """Test a wrongly detected merge conflict in case of two records referencing
    two different, but identical objects.

    """

    # Two identical license records will be referenced from both records to be
    # merged
    license_rt = db.RecordType(name="license")
    license_rec_a = db.Record(name="CC-BY-3.0").add_parent(license_rt)
    license_rec_b = db.Record(name="CC-BY-3.0").add_parent(license_rt)

    # two referencing records
    dataset_rt = db.RecordType(name="Dataset")
    title_prop = db.Property(name="title", datatype=db.TEXT)
    doi_prop = db.Property(name="DOI", datatype=db.TEXT)

    def make_rec_a():
        """Return a fresh dataset record that references ``license_rec_a``."""
        rec = db.Record().add_parent(dataset_rt)
        rec.add_property(name=license_rt.name,
                         datatype=license_rt.name, value=license_rec_a)
        rec.add_property(name=title_prop.name, value="Some dataset title")
        return rec

    rec_a = make_rec_a()

    rec_b = db.Record().add_parent(dataset_rt)
    rec_b.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_b)
    rec_b.add_property(name=doi_prop.name, value="https://doi.org/12345.678")

    merge_entities(rec_a, rec_b)
    assert rec_a.get_property(license_rt.name) is not None
    assert rec_a.get_property(license_rt.name).value is not None
    assert isinstance(rec_a.get_property(license_rt.name).value, db.Record)
    assert rec_a.get_property(license_rt.name).value.name == license_rec_a.name
    assert rec_a.get_property(license_rt.name).value.name == license_rec_b.name
    assert rec_a.get_property("title").value == "Some dataset title"
    assert rec_a.get_property("doi").value == "https://doi.org/12345.678"

    # Reset rec_a
    rec_a = make_rec_a()

    # this does not compare referenced records, so it will fail
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b, merge_references_with_empty_diffs=False)

    # ... as should this, of course
    rec_b.get_property(license_rt.name).value.name = "Another license"
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b)
def test_empty_diff():
    """Walk ``empty_diff`` through names, parents, properties, references and ids.

    The two records are modified step by step; after each step the expected
    outcome of ``empty_diff`` is asserted.
    """
    left = db.Record(name="A")
    right = db.Record(name="B")

    # different names
    assert empty_diff(left, left)
    assert not empty_diff(left, right)

    # equal parents do not hide the differing names
    left.add_parent(name="RT")
    right.add_parent(name="RT")
    assert empty_diff(left, left)
    assert not empty_diff(left, right)

    right.name = "A"
    assert empty_diff(left, right)

    # a property that exists on one side only
    left.add_property(name="some_prop", value=1)
    assert not empty_diff(left, right)

    right.add_property(name="some_prop", value=1)
    assert empty_diff(left, right)

    # same property, different value
    right.get_property("some_prop").value = 2
    assert not empty_diff(left, right)

    right.get_property("some_prop").value = 1
    right.add_property(name="some_other_prop", value="Test")
    assert not empty_diff(left, right)

    left.add_property(name="some_other_prop", value="Test")
    assert empty_diff(left, right)

    # reference identical records, but different Python Record objects
    ref_left = db.Record(name="Ref").add_parent(name="RefType")
    ref_right = db.Record(name="Ref").add_parent(name="RefType")
    left.add_property(name="RefType", datatype="RefType", value=ref_left)
    right.add_property(name="RefType", datatype="RefType", value=ref_right)
    # the default is `compare_referenced_records=False`, so the diff shouldn't
    # be empty (different Python objects are referenced.)
    assert not empty_diff(left, right)
    # when looking into the referenced record, the diffs should be empty again
    assert empty_diff(left, right, compare_referenced_records=True)

    # The same for lists of references
    left.remove_property("RefType")
    right.remove_property("RefType")
    assert empty_diff(left, right)
    left.add_property(name="RefType", datatype=db.LIST("RefType"),
                      value=[ref_left, ref_left])
    right.add_property(name="RefType", datatype=db.LIST("RefType"),
                       value=[ref_right, ref_right])
    assert not empty_diff(left, right)
    assert empty_diff(left, right, compare_referenced_records=True)

    # special case of ids
    left = db.Record(id=12)
    right = db.Record()
    assert not empty_diff(left, right)
    right.id = 13
    assert not empty_diff(left, right)
    right.id = 12
    assert empty_diff(left, right)
def test_force_merge():
    """Test whether a forced merge overwrites existing properties correctly."""

    def conflict_then_force(target, source):
        # the plain merge must be rejected, the forced one must go through
        with pytest.raises(EntityMergeConflictError):
            merge_entities(target, source)
        merge_entities(target, source, force=True)

    # name overwrite
    target = db.Record(name="A")
    source = db.Record(name="B")
    conflict_then_force(target, source)
    assert "B" == target.name
    # unchanged
    assert "B" == source.name

    # description overwrite
    target = db.Record()
    target.description = "something"
    source = db.Record()
    source.description = "something else"
    with pytest.raises(EntityMergeConflictError) as emce:
        merge_entities(target, source)
    # the error message names the attribute and both conflicting values
    assert str(emce.value) == """Conflict in special attribute description:
A: something
B: something else"""
    merge_entities(target, source, force=True)
    assert target.description == "something else"
    # unchanged
    assert source.description == "something else"

    # property overwrite
    target = db.Record()
    target.add_property(name="propA", value="something")
    source = db.Record()
    source.add_property(name="propA", value="something else")
    conflict_then_force(target, source)
    assert target.get_property("propA").value == "something else"
    # unchanged
    assert source.get_property("propA").value == "something else"

    # don't remove a property that's not in the source record
    target = db.Record()
    target.add_property(name="propA", value="something")
    target.add_property(name="propB", value=5.0)
    source = db.Record()
    source.add_property(name="propA", value="something else")
    merge_entities(target, source, force=True)
    assert target.get_property("propA").value == "something else"
    assert target.get_property("propB").value == 5.0

    # also overwrite datatypes ...
    target_rt = db.RecordType()
    target_rt.add_property(name="propA", datatype=db.INTEGER)
    source_rt = db.RecordType()
    source_rt.add_property(name="propA", datatype=db.TEXT)
    conflict_then_force(target_rt, source_rt)
    assert target_rt.get_property("propA").datatype == db.TEXT
    # unchanged
    assert source_rt.get_property("propA").datatype == db.TEXT

    # ... and units
    target = db.Record()
    target.add_property(name="propA", value=5, unit="m")
    source = db.Record()
    source.add_property(name="propA", value=5, unit="cm")
    conflict_then_force(target, source)
    assert target.get_property("propA").unit == "cm"
    # unchanged
    assert source.get_property("propA").unit == "cm"
def test_merge_missing_list_datatype_82():
    """Merging two properties, where the list-valued one has no datatype."""
    list_of_double = f"LIST<{db.DOUBLE}>"

    # list-valued property with explicit datatype: forced merge takes it over
    target = db.Record().add_property("a", 5, datatype="B")
    source_with_dt = db.Record().add_property("a", [1, 2], datatype=list_of_double)
    merge_entities(target, source_with_dt, force=True)
    assert target.get_property("a").datatype == list_of_double

    # list-valued property without datatype: even the forced merge is refused
    target = db.Record().add_property("a", 5, datatype="B")
    source_without_dt = db.Record().add_property("a", [1, 2])
    with pytest.raises(TypeError) as te:
        merge_entities(target, source_without_dt, force=True)
    assert "Invalid datatype: List valued properties" in str(te.value)
def test_merge_id_with_resolved_entity():
    """Check the ``merge_id_with_resolved_entity`` option of ``merge_entities``.

    One record references another record as an object, the other one only by
    its id.
    """
    rtname = "TestRT"
    ref_id = 123
    other_id = ref_id * 2
    ref_rec = db.Record(id=ref_id).add_parent(name=rtname)

    # with_entity has the resolved referenced record as value, with_id its id.
    # Otherwise, they are identical.
    with_entity = db.Record().add_property(name=rtname, value=ref_rec)
    with_id = db.Record().add_property(name=rtname, value=ref_id)

    # default is strict: raise error since values are different
    with pytest.raises(EntityMergeConflictError):
        merge_entities(with_entity, with_id)

    # With the option enabled both directions merge; the left-hand record
    # keeps the value it had before
    merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)
    assert with_entity.get_property(rtname).value == ref_rec

    with_entity = db.Record().add_property(name=rtname, value=ref_rec)
    merge_entities(with_id, with_entity, merge_id_with_resolved_entity=True)
    assert with_id.get_property(rtname).value == ref_id
    assert with_entity.get_property(rtname).value == ref_rec

    # id mismatches
    with_id = db.Record().add_property(name=rtname, value=other_id)
    with pytest.raises(EntityMergeConflictError):
        merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)

    # referenced record without an id vs. a plain id
    other_rec = db.Record(id=None).add_parent(name=rtname)
    with_entity = db.Record().add_property(name=rtname, value=other_rec)
    with_id = db.Record().add_property(name=rtname, value=ref_id)
    with pytest.raises(EntityMergeConflictError):
        merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)

    # also works in lists:
    with_entity = db.Record().add_property(
        name=rtname, datatype=db.LIST(rtname), value=[ref_rec, other_id])
    with_id = db.Record().add_property(
        name=rtname, datatype=db.LIST(rtname), value=[ref_id, other_id])
    merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)
    assert with_entity.get_property(rtname).value == [ref_rec, other_id]
    assert with_id.get_property(rtname).value == [ref_id, other_id]