Select Git revision
test_apiutils.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_apiutils.py 39.49 KiB
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Alexander Schlemmer <a.schlemmer@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com>
# Copyright (C) 2022 Daniel Hornung <d.hornung@indiscale.com>
# Copyright (C) 2020-2022 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# Test apiutils
# A. Schlemmer, 02/2018
from io import StringIO
import linkahead as db
import linkahead.apiutils
import pytest
from linkahead.apiutils import (EntityMergeConflictError, apply_to_ids,
compare_entities, create_id_query, empty_diff,
merge_entities, resolve_reference,
describe_diff)
from linkahead.common.models import SPECIAL_ATTRIBUTES
def test_apply_to_ids():
    """`apply_to_ids` must apply the given function to the ids of a record,
    of its parents and of its properties.
    """
    def negate(id_):
        return -1 * id_

    record = db.Record(id=23)
    record.add_parent(db.RecordType(id=3456))
    record.add_property(db.Property(id=23345, datatype=db.INTEGER))

    apply_to_ids([record], negate)

    # Sanity check of the helper itself.
    assert negate(3456) == -3456

    # All three kinds of ids have been inverted in place.
    assert record.parents[0].id == -3456
    assert record.properties[0].id == -23345
    assert record.id == -23
def test_id_query():
    """`create_id_query` builds one FIND query that ORs all given ids."""
    expected_query = "FIND ENTITY WITH ID=1 OR ID=2 OR ID=3 OR ID=4 OR ID=5"

    assert create_id_query([1, 2, 3, 4, 5]) == expected_query
def test_resolve_reference():
    """Check that `resolve_reference` replaces referenced ids by entities.

    ``linkahead.apiutils.retrieve_entity_with_id`` is temporarily replaced by a
    stub that returns a new Record carrying the requested id.  The original
    function is restored in a ``finally`` clause so that a failing assertion
    cannot leak the stub into other tests.
    """
    original_retrieve_entity_with_id = linkahead.apiutils.retrieve_entity_with_id
    linkahead.apiutils.retrieve_entity_with_id = lambda eid: db.Record(id=eid)
    try:
        prop = db.Property(id=1, datatype=db.REFERENCE, value=100)
        prop.is_valid = lambda: True
        items = [200, 300, 400]
        prop_list = db.Property(datatype=db.LIST(db.REFERENCE),
                                value=items)
        prop_list2 = db.Property(datatype=db.LIST(db.REFERENCE),
                                 value=[db.Record(id=500)])

        resolve_reference(prop)
        resolve_reference(prop_list)
        resolve_reference(prop_list2)

        # Single reference: the id has been replaced by an entity with that id.
        assert prop.value.id == 100
        assert isinstance(prop.value, db.Entity)

        # List of ids: every element is an entity now, order is preserved.
        prop_list_ids = []
        for i in prop_list.value:
            prop_list_ids.append(i.id)
            assert isinstance(i, db.Entity)
        assert prop_list_ids == items

        # List that already contained an entity.
        for i in prop_list2.value:
            assert i.id == 500
            assert isinstance(i, db.Entity)

        # Non-reference properties must be left untouched.
        no_reference = db.Property(id=5000, datatype=db.INTEGER, value=2)
        resolve_reference(no_reference)
        assert no_reference.value == 2
        assert no_reference.datatype is db.INTEGER
    finally:
        # restore retrieve_entity_with_id
        linkahead.apiutils.retrieve_entity_with_id = original_retrieve_entity_with_id
def test_compare_entities():
    """Exercise `compare_entities` on records with differing parents and properties.

    The same pair of records is compared with the default options, with
    ``compare_referenced_records=True`` and additionally with
    ``entity_name_id_equivalency=True``.  Each time the number and the names
    of the reported property differences are checked.
    """
    def assert_sizes(diffs, n_properties):
        # r1 has one parent ("lopp") more than r2 in all comparisons below.
        first, second = diffs
        assert len(first["parents"]) == 1
        assert len(second["parents"]) == 0
        assert len(first["properties"]) == n_properties
        assert len(second["properties"]) == n_properties

    def assert_in_both(diffs, names):
        for name in names:
            for diff in diffs:
                assert name in diff["properties"]

    def assert_in_neither(diffs, names):
        for name in names:
            for diff in diffs:
                assert name not in diff["properties"]

    # test compare of parents, properties
    r1, r2 = db.Record(), db.Record()
    r1.add_parent("bla")
    r2.add_parent("bla")
    r1.add_parent("lopp")

    # Each entry: ((name, kwargs) for r1, (name, kwargs) for r2)
    property_pairs = [
        (("test", {"value": 2}), ("test", {"value": 2})),
        (("testi", {"importance": linkahead.SUGGESTED, "value": 2}),
         ("testi", {"importance": linkahead.RECOMMENDED, "value": 2})),
        (("tests", {"value": 3}), ("tests", {"value": 45})),
        (("tester", {"value": 3}), ("tester", {})),
        (("tests_234234", {"value": 45}), ("tests_TT", {"value": 45})),
        (("datatype", {"value": 45, "datatype": db.INTEGER}),
         ("datatype", {"value": 45})),
        (("entity_id", {"value": 2}), ("entity_id", {"value": 24})),
        (("entity_mix_e", {"value": 2}),
         ("entity_mix_e", {"value": db.Entity(id=2)})),
        (("entity_mix_d", {"value": 22}),
         ("entity_mix_d", {"value": db.Entity(id=2)})),
        (("entity_mix_w", {"value": 22}),
         ("entity_mix_w", {"value": db.Entity()})),
        (("entity_Ent_e", {"value": db.Entity(id=2)}),
         ("entity_Ent_e", {"value": db.Entity(id=2)})),
        (("entity_Ent_d", {"value": db.Entity(id=2)}),
         ("entity_Ent_d", {"value": db.Entity(id=22)})),
    ]
    for (name_1, kwargs_1), (name_2, kwargs_2) in property_pairs:
        r1.add_property(name_1, **kwargs_1)
        r2.add_property(name_2, **kwargs_2)

    # --- default options -------------------------------------------------
    diffs = compare_entities(r1, r2)
    diff_r1, diff_r2 = diffs
    assert_sizes(diffs, 11)
    assert_in_neither(diffs, ["test"])
    assert_in_both(diffs, ["tests", "testi", "tester"])
    assert "tests_234234" in diff_r1["properties"]
    assert "tests_TT" in diff_r2["properties"]
    assert "datatype" in diff_r1["properties"]
    assert "datatype" in diff_r1["properties"]["datatype"]
    assert "datatype" in diff_r2["properties"]
    assert "datatype" in diff_r2["properties"]["datatype"]
    assert_in_both(diffs, ["entity_id", "entity_mix_e", "entity_Ent_e",
                           "entity_mix_d", "entity_mix_w", "entity_Ent_d"])

    # --- referenced records are compared, too ----------------------------
    diffs = compare_entities(r1, r2, compare_referenced_records=True)
    assert_sizes(diffs, 10)
    assert_in_both(diffs, ["entity_id", "entity_mix_e", "entity_mix_w"])
    assert_in_neither(diffs, ["entity_Ent_e"])
    assert_in_both(diffs, ["entity_mix_d", "entity_Ent_d"])

    # --- additionally treat ids and entities with that id as equal -------
    diffs = compare_entities(r1, r2,
                             entity_name_id_equivalency=True,
                             compare_referenced_records=True)
    assert_sizes(diffs, 9)
    assert_in_both(diffs, ["entity_id"])
    assert_in_neither(diffs, ["entity_mix_e"])
    assert_in_both(diffs, ["entity_mix_w"])
    assert_in_neither(diffs, ["entity_Ent_e"])
    assert_in_both(diffs, ["entity_mix_d", "entity_Ent_d"])

    # --- fresh records: list datatype on one side, entity value on the other
    r1, r2 = db.Record(), db.Record()
    r1.add_property(id=20, name="entity_mix_d", value=2, datatype=db.LIST("B"))
    r2.add_property("entity_mix_d", value=db.Entity())

    diffs = compare_entities(r1, r2, compare_referenced_records=True)
    for diff in diffs:
        assert len(diff["properties"]) == 1
    assert_in_both(diffs, ["entity_mix_d"])
def test_compare_entities_units():
    """`compare_entities` must report properties that differ only in their unit."""
    r1, r2 = db.Record(), db.Record()

    # Each entry: ((name, kwargs) for r1, (name, kwargs) for r2)
    property_pairs = [
        (("test", {"value": 2, "unit": "cm"}),
         ("test", {"value": 2, "unit": "m"})),
        (("tests", {"value": 3, "unit": "cm"}),
         ("tests", {"value": 45, "unit": "cm"})),
        (("tester", {"value": 3}), ("tester", {})),
        (("tests_234234", {"value": 45, "unit": "cm"}),
         ("tests_TT", {"value": 45, "unit": "cm"})),
    ]
    for (name_1, kwargs_1), (name_2, kwargs_2) in property_pairs:
        r1.add_property(name_1, **kwargs_1)
        r2.add_property(name_2, **kwargs_2)

    diff_r1, diff_r2 = compare_entities(r1, r2)

    assert len(diff_r1["properties"]) == 4
    assert len(diff_r2["properties"]) == 4

    for shared_name in ("tests", "tester"):
        assert shared_name in diff_r1["properties"]
        assert shared_name in diff_r2["properties"]

    # Properties that exist on one side only.
    assert "tests_234234" in diff_r1["properties"]
    assert "tests_TT" in diff_r2["properties"]

    # Same value, different unit: each diff carries its own unit.
    assert diff_r1["properties"]["test"]["unit"] == "cm"
    assert diff_r2["properties"]["test"]["unit"] == "m"
def test_compare_entities_battery():
    """Battery of `compare_entities` / `empty_diff` checks.

    Covered in this order: attributes of properties (datatype, description,
    value, unit), parents, special attributes of the compared entity itself,
    entity types of values, files, importance, mixed lists,
    ``entity_name_id_equivalency``, order invariance, property datatypes and
    the different call variants of ``add_property``.

    Note that ``prop_settings`` is deliberately mutated between the
    ``add_property`` calls of the first section, so statement order matters.
    """
    par1, par3 = db.Record(name=""), db.RecordType(name="")
    r1 = db.Record()
    prop2 = db.Property(name="Property 2")
    prop3 = db.Property(name="")

    # Basic tests for Properties
    prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop",
                     "value": db.Record().add_parent(par3), "unit": '°'}
    t1 = db.Record().add_parent(db.RecordType(id=1))
    t2 = db.Record().add_parent(db.RecordType(id=1))
    # Change datatype
    t1.add_property(db.Property(name="datatype", **prop_settings))
    prop_settings["datatype"] = par3
    t2.add_property(db.Property(name="datatype", **prop_settings))
    # Change description
    t1.add_property(db.Property(name="description", **prop_settings))
    prop_settings["description"] = "diff desc"
    t2.add_property(db.Property(name="description", **prop_settings))
    # Change value to copy
    t1.add_property(db.Property(name="value copy", **prop_settings))
    prop_settings["value"] = db.Record().add_parent(par3)
    t2.add_property(db.Property(name="value copy", **prop_settings))
    # Change value to something different
    t1.add_property(db.Property(name="value", **prop_settings))
    prop_settings["value"] = db.Record(name="n").add_parent(par3)
    t2.add_property(db.Property(name="value", **prop_settings))
    # Change unit
    t1.add_property(db.Property(name="unit", **prop_settings))
    prop_settings["unit"] = db.Property(unit='°')
    t2.add_property(db.Property(name="unit", **prop_settings))
    # Change unit again
    t1.add_property(db.Property(name="unit 2", **prop_settings))
    prop_settings["unit"] = db.Property()
    t2.add_property(db.Property(name="unit 2", **prop_settings))
    # Compare
    diff_0 = compare_entities(t1, t2)
    diff_1 = compare_entities(t1, t2, compare_referenced_records=True)
    # Check correct detection of changes
    assert diff_0[0]["properties"]["datatype"] == {"datatype": db.REFERENCE}
    assert diff_0[1]["properties"]["datatype"] == {"datatype": par3}
    assert diff_0[0]["properties"]["description"] == {"description": "desc of prop"}
    assert diff_0[1]["properties"]["description"] == {"description": "diff desc"}
    assert "value" in diff_0[0]["properties"]["value copy"]
    assert "value" in diff_0[1]["properties"]["value copy"]
    assert "value" in diff_0[0]["properties"]["value"]
    assert "value" in diff_0[1]["properties"]["value"]
    assert "unit" in diff_0[0]["properties"]["unit"]
    assert "unit" in diff_0[1]["properties"]["unit"]
    assert "unit" in diff_0[0]["properties"]["unit 2"]
    assert "unit" in diff_0[1]["properties"]["unit 2"]
    # Check correct result for compare_referenced_records=True
    assert "value copy" not in diff_1[0]["properties"]
    assert "value copy" not in diff_1[1]["properties"]
    # Apart from "value copy" both comparison modes must agree.
    diff_0[0]["properties"].pop("value copy")
    diff_0[1]["properties"].pop("value copy")
    assert diff_0 == diff_1

    # Basic tests for Parents
    t3 = db.Record().add_parent(db.RecordType("A")).add_parent(db.Record("B"))
    t4 = db.Record().add_parent(db.RecordType("A"))
    assert compare_entities(t3, t4)[0]['parents'] == ['B']
    assert len(compare_entities(t3, t4)[1]['parents']) == 0
    t4.add_parent(db.Record("B"))
    assert empty_diff(t3, t4)
    # The two following assertions document current behaviour but do not make a
    # lot of sense
    t4.add_parent(db.Record("B"))
    assert empty_diff(t3, t4)
    t3.add_parent(db.RecordType("A")).add_parent(db.Record("B"))
    t4.add_parent(db.RecordType("B")).add_parent(db.Record("A"))
    assert empty_diff(t3, t4)

    # Basic tests for special attributes
    prop_settings = {"id": 42, "name": "Property",
                     "datatype": db.LIST(db.REFERENCE), "value": [db.Record(name="")],
                     "unit": '€', "description": "desc of prop"}
    alt_settings = {"id": 64, "name": "Property 2",
                    "datatype": db.LIST(db.TEXT), "value": [db.RecordType(name="")],
                    "unit": '€€', "description": " ę Ě ப ཾ ཿ ∛ ∜ ㅿ ㆀ 값 "}
    t5 = db.Property(**prop_settings)
    t6 = db.Property(**prop_settings)
    assert empty_diff(t5, t6)
    # ID
    t5.id = alt_settings['id']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'id': alt_settings['id']}
    assert diff[1] == {'properties': {}, 'parents': [], 'id': prop_settings['id']}
    t6.id = alt_settings['id']
    assert empty_diff(t5, t6)
    # Name
    t5.name = alt_settings['name']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'name': alt_settings['name']}
    assert diff[1] == {'properties': {}, 'parents': [], 'name': prop_settings['name']}
    t6.name = alt_settings['name']
    assert empty_diff(t5, t6)
    # Description
    t6.description = alt_settings['description']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'description': prop_settings['description']}
    assert diff[1] == {'properties': {}, 'parents': [], 'description': alt_settings['description']}
    t5.description = alt_settings['description']
    assert empty_diff(t5, t6)
    # Unit
    t5.unit = alt_settings['unit']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'unit': alt_settings['unit']}
    assert diff[1] == {'properties': {}, 'parents': [], 'unit': prop_settings['unit']}
    t6.unit = alt_settings['unit']
    assert empty_diff(t5, t6)
    # Value
    t6.value = alt_settings['value']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'value': prop_settings['value']}
    assert diff[1] == {'properties': {}, 'parents': [], 'value': alt_settings['value']}
    t5.value = alt_settings['value']
    assert empty_diff(t5, t6)
    # Datatype
    t6.datatype = alt_settings['datatype']
    diff = compare_entities(t5, t6)
    assert diff[0] == {'properties': {}, 'parents': [], 'datatype': prop_settings['datatype']}
    assert diff[1] == {'properties': {}, 'parents': [], 'datatype': alt_settings['datatype']}
    t5.datatype = alt_settings['datatype']
    assert empty_diff(t5, t6)
    # All at once
    diff = compare_entities(db.Property(**prop_settings), db.Property(**alt_settings))
    assert diff[0] == {'properties': {}, 'parents': [], **prop_settings}
    assert diff[1] == {'properties': {}, 'parents': [], **alt_settings}
    # Entity Type
    diff = compare_entities(db.Property(value=db.Property(id=101)),
                            db.Property(value=db.Record(id=101)))
    assert "value" in diff[0]
    assert "value" in diff[1]
    diff = compare_entities(db.Property(value=db.Record(id=101)),
                            db.Property(value=db.Record(id=101)))
    assert "value" in diff[0]
    assert "value" in diff[1]
    assert empty_diff(db.Property(value=db.Record(id=101)),
                      db.Property(value=db.Record(id=101)),
                      compare_referenced_records=True)

    # Special cases
    # Files
    assert not empty_diff(db.File(path='ABC', file=StringIO("ABC")),
                          db.File(path='ABC', file=StringIO("Other")))
    # Importance
    assert empty_diff(db.Property().add_property(prop2),
                      db.Property().add_property(prop2))
    assert not empty_diff(db.Property().add_property(prop2, importance=db.SUGGESTED),
                          db.Property().add_property(prop2, importance=db.OBLIGATORY))
    # Mixed Lists
    assert empty_diff(db.Property(value=[1, 2, 'a', r1]),
                      db.Property(value=[1, 2, 'a', r1]))
    # entity_name_id_equivalency
    assert not empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]),
                          db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4]))
    assert empty_diff(db.Property(value=[1, db.Record(id=2), 3, db.Record(id=4)]),
                      db.Property(value=[db.Record(id=1), 2, db.Record(id=3), 4]),
                      entity_name_id_equivalency=True)
    assert empty_diff(db.Property(value=1), db.Property(value=db.Record(id=1)),
                      entity_name_id_equivalency=True)
    # entity_name_id_equivalency on an entity with parent, sub-properties and
    # list-valued references
    prop4 = db.Property(**prop_settings).add_parent(par1).add_property(prop2)
    prop4_c = db.Property(**prop_settings).add_parent(par1).add_property(prop2)
    prop4.value = db.Record(id=12)
    prop4_c.value = '12'
    prop4.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE),
                                   value=[12, db.Record(id=13), par1, "abc%"]))
    prop4_c.add_property(db.Property(name="diff", datatype=db.LIST(db.REFERENCE),
                                     value=[db.Record(id=12), "13", par1, "abc%"]))
    assert not empty_diff(prop4, prop4_c, entity_name_id_equivalency=False)
    assert empty_diff(prop4, prop4_c, entity_name_id_equivalency=True)
    # Order invariance
    t7 = db.Property(**prop_settings).add_parent(par1).add_property(prop2)
    t8 = db.Property(**alt_settings).add_parent(par3).add_property(prop3)
    diffs_0 = compare_entities(t7, t8), compare_entities(t7, t8, True)
    diffs_1 = compare_entities(t8, t7)[::-1], compare_entities(t8, t7, True)[::-1]
    assert diffs_0 == diffs_1
    prop_settings = {"datatype": db.REFERENCE, "description": "desc of prop",
                     "value": db.Record().add_parent(par3), "unit": '°'}
    t1.add_property(db.Property(name="description", **prop_settings))
    t2.add_property(db.Property(name="description", **prop_settings))
    # Order invariance for multi-property - either both fail or same result
    try:
        diffs_0 = compare_entities(t1, t2), compare_entities(t1, t2, True)
    except Exception as e:
        # Only the exception type is compared between both orders.
        diffs_0 = type(e)
    try:
        diffs_1 = compare_entities(t2, t1)[::-1], compare_entities(t2, t1, True)[::-1]
    except Exception as e:
        diffs_1 = type(e)
    assert diffs_0 == diffs_1
    # Property types
    t09, t10 = db.RecordType(), db.RecordType()
    for t, ex in [(db.INTEGER, [-12, 0]), (db.DATETIME, ["2030-01-01", "1012-02-29"]),
                  (db.DOUBLE, [13.23, 7.1]), (db.BOOLEAN, [True, False])]:
        t09.add_property(db.Property(name=f"{t}:{ex[0]}", datatype=t, value=ex[0]))
        t10.add_property(db.Property(name=f"{t}:{ex[0]}", datatype=t, value=ex[0]))
        t09.add_property(name=f"{t}:{ex[1]}", datatype=t, value=ex[1])
        t10.add_property(name=f"{t}:{ex[1]}", datatype=t, value=ex[1])
    assert empty_diff(t09, t10)
    t09.add_property(name="diff", value=1)
    t10.add_property(name="diff", value=2)
    assert not empty_diff(t09, t10)
    # Default values
    t09, t10 = db.Record(), db.Record()
    t09.add_property(db.Property(name="A1"), value="A")
    t10.add_property(name="A1", value="A")
    t09.add_property(db.Property(id=12, name="A2"), value="A")
    t10.add_property(id=12, name="A2", value="A")
    t09.add_property(db.Property(id=15), value="A")
    t10.add_property(id=15, value="A")
    assert empty_diff(t09, t10)
    # ToDo: extended tests for references
def test_compare_special_properties():
    """Check how `compare_entities` treats the special attributes.

    For every name in ``SPECIAL_ATTRIBUTES`` two records are created with the
    same value for that attribute (no difference may be reported) and then
    one of them is changed (both diffs must contain the respective value).
    Afterwards the ``value`` of two Property objects is compared.
    """
    # Attributes that are set to integers instead of strings in this test.
    INTS = ("size", "id")
    # Attributes that are set via their underscore-prefixed attribute name.
    HIDDEN = ("checksum", "size")

    # Test for all known special properties:
    for key in SPECIAL_ATTRIBUTES:
        set_key = "_" + key if key in HIDDEN else key
        if key in INTS:
            initial, changed = 1, 2
        else:
            initial, changed = "bla 1", "bla test"

        r1 = db.Record()
        r2 = db.Record()
        setattr(r1, set_key, initial)
        setattr(r2, set_key, initial)

        # Identical attribute on both sides: nothing is reported.
        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key not in diff_r1
        assert key not in diff_r2
        assert len(diff_r1["parents"]) == 0
        assert len(diff_r2["parents"]) == 0
        assert len(diff_r1["properties"]) == 0
        assert len(diff_r2["properties"]) == 0

        # Differing attribute: each diff contains the value of its own side.
        setattr(r2, set_key, changed)
        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key in diff_r1
        assert key in diff_r2
        assert diff_r1[key] == initial
        assert diff_r2[key] == changed
        assert len(diff_r1["properties"]) == 0
        assert len(diff_r2["properties"]) == 0

    # compare Property objects
    p1 = db.Property()
    p2 = db.Property()

    diff_r1, diff_r2 = compare_entities(p1, p2)
    assert len(diff_r1["parents"]) == 0
    assert len(diff_r2["parents"]) == 0
    assert len(diff_r1["properties"]) == 0
    assert len(diff_r2["properties"]) == 0

    p1.value = 42
    p2.value = 4

    diff_r1, diff_r2 = compare_entities(p1, p2)
    assert len(diff_r1["parents"]) == 0
    assert len(diff_r2["parents"]) == 0
    assert len(diff_r1["properties"]) == 0
    assert len(diff_r2["properties"]) == 0

    # Differing values of the compared properties show up in the diffs:
    assert "value" in diff_r1
    assert diff_r1["value"] == 42
    assert "value" in diff_r2
    assert diff_r2["value"] == 4
def test_copy_entities():
    """`Entity.copy` must yield an equal record that shares no parent or
    property objects with the original.
    """
    original = db.Record(name="A")
    original.add_parent(name="B")
    original.add_property(name="C", value=4, importance="OBLIGATORY")
    original.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    original.description = "A fancy test record"

    duplicate = original.copy()

    assert duplicate is not original
    assert duplicate.name == "A"
    assert duplicate.role == original.role
    assert duplicate.parents[0].name == "B"
    # parent and property objects are not shared among copy and original:
    assert duplicate.parents[0] is not original.parents[0]

    for index in (0, 1):
        copied_prop = duplicate.properties[index]
        source_prop = original.properties[index]
        assert copied_prop is not source_prop
        # ... but all special attributes and the importance are the same.
        for special in SPECIAL_ATTRIBUTES:
            assert getattr(copied_prop, special) == getattr(source_prop, special)
        assert duplicate.get_importance(copied_prop) == original.get_importance(source_prop)
def test_merge_entities():
    """Merging a record into another one adds its parents and properties while
    keeping the properties that were already present.
    """
    source = db.Record(name="A")
    source.add_parent(name="B")
    source.add_property(name="C", value=4, importance="OBLIGATORY")
    source.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    source.description = "A fancy test record"

    target = db.Record()
    target.add_property(name="F", value="text")

    merge_entities(target, source)

    assert target.get_parents()[0].name == "B"
    # "C" and "D" come from `source`, "F" was there before the merge.
    for prop_name, prop_value in (("C", 4), ("D", [3, 4, 7]), ("F", "text")):
        assert target.get_property(prop_name).name == prop_name
        assert target.get_property(prop_name).value == prop_value
def test_merge_bug_conflict():
    """A property without datatype merges with a typed one, whereas two
    different explicit datatypes raise a merge conflict.
    """
    untyped = db.Record()
    untyped.add_property(name="C", value=4)

    text_typed = db.Record()
    text_typed.add_property(name="C", value=4, datatype="TEXT")

    # Must not raise.
    merge_entities(untyped, text_typed)

    integer_typed = db.Record()
    integer_typed.add_property(name="C", value=4, datatype="INTEGER")

    with pytest.raises(EntityMergeConflictError):
        merge_entities(integer_typed, text_typed)
def test_merge_bug_109():
    """Merging a list-valued property must not duplicate the list entries,
    neither in the source nor in the target record.
    """
    rt = db.RecordType(name="TestBug")
    list_prop = db.Property(name="test_bug_property", datatype=db.LIST(db.INTEGER))

    r_b = db.Record(name="TestRecord")
    r_b.add_parent(rt)
    r_b.add_property(list_prop, value=[18, 19])

    r_a = db.Record(name="TestRecord")
    r_a.add_parent(rt)

    merge_entities(r_a, r_b)

    for record in (r_b, r_a):
        assert record.get_property("test_bug_property").value == [18, 19]

    # NOTE(review): the whitespace after "\n" is taken verbatim from the
    # source; confirm that it matches the indentation of the XML output.
    values_once = "<Value>18</Value>\n <Value>19</Value>"
    values_twice = values_once + "\n " + values_once
    for record in (r_b, r_a):
        assert values_once in str(record)
        assert values_twice not in str(record)
@pytest.mark.xfail
def test_bug_109():
    """Adding an existing list-valued property object to a second record must
    not duplicate the list entries (expected to fail at the moment).
    """
    rt = db.RecordType(name="TestBug")
    list_prop = db.Property(name="test_bug_property", datatype=db.LIST(db.INTEGER))

    r_b = db.Record(name="TestRecord")
    r_b.add_parent(rt)
    r_b.add_property(list_prop, value=[18, 19])

    r_a = db.Record(name="TestRecord")
    r_a.add_parent(rt)
    # Re-use the very property object that is attached to r_b.
    r_a.add_property(r_b.get_property("test_bug_property"))

    for record in (r_b, r_a):
        assert record.get_property("test_bug_property").value == [18, 19]

    # NOTE(review): the whitespace after "\n" is taken verbatim from the
    # source; confirm that it matches the indentation of the XML output.
    values_once = "<Value>18</Value>\n <Value>19</Value>"
    values_twice = values_once + "\n " + values_once
    for record in (r_b, r_a):
        assert values_once in str(record)
        assert values_twice not in str(record)
@pytest.mark.xfail(reason="Issue https://gitlab.com/linkahead/linkahead-pylib/-/issues/111")
def test_failing_merge_entities_111():
    """Merging a fully specified Property into an empty one should also copy
    value, datatype and unit (currently expected to fail, see linked issue).
    """
    prop_a = db.Property()
    prop_parent = db.Property(name="prop_parent")
    prop_b = db.Property(name="b", datatype=db.DOUBLE, unit="µs", value=1.1).add_parent(prop_parent)

    merge_entities(prop_a, prop_b)

    assert prop_a.name == prop_b.name  # OK
    assert prop_parent.name in [par.name for par in prop_a.get_parents()]  # OK
    assert prop_a.value == prop_b.value  # fails
    assert prop_a.datatype == db.DOUBLE  # fails
    assert prop_a.unit == prop_b.unit  # fails
def test_wrong_merge_conflict_reference():
    """Test a wrongly detected merge conflict in case of two records referencing
    two different, but identical objects.
    """
    # Two identical license records will be referenced from both records to be
    # merged
    license_rt = db.RecordType(name="license")
    license_rec_a = db.Record(name="CC-BY-3.0").add_parent(license_rt)
    license_rec_b = db.Record(name="CC-BY-3.0").add_parent(license_rt)

    # two referencing records
    dataset_rt = db.RecordType(name="Dataset")
    title_prop = db.Property(name="title", datatype=db.TEXT)
    doi_prop = db.Property(name="DOI", datatype=db.TEXT)

    def new_rec_a():
        # Build a fresh dataset record that references license_rec_a and has
        # a title; needed twice because merging modifies the first argument.
        rec = db.Record().add_parent(dataset_rt)
        rec.add_property(name=license_rt.name,
                         datatype=license_rt.name, value=license_rec_a)
        rec.add_property(name=title_prop.name, value="Some dataset title")
        return rec

    rec_a = new_rec_a()

    rec_b = db.Record().add_parent(dataset_rt)
    rec_b.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_b)
    rec_b.add_property(name=doi_prop.name, value="https://doi.org/12345.678")

    merge_entities(rec_a, rec_b)

    license_value = rec_a.get_property(license_rt.name)
    assert license_value is not None
    assert license_value.value is not None
    assert isinstance(license_value.value, db.Record)
    assert license_value.value.name == license_rec_a.name
    assert license_value.value.name == license_rec_b.name
    assert rec_a.get_property("title").value == "Some dataset title"
    assert rec_a.get_property("doi").value == "https://doi.org/12345.678"

    # Reset rec_a
    rec_a = new_rec_a()

    # this does not compare referenced records, so it will fail
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b, merge_references_with_empty_diffs=False)

    # ... as should this, of course
    rec_b.get_property(license_rt.name).value.name = "Another license"
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b)
def test_empty_diff():
    """Walk through a sequence of modifications of two records and check after
    each step whether `empty_diff` considers them equal.
    """
    left = db.Record(name="A")
    right = db.Record(name="B")

    # Different names.
    assert empty_diff(left, left)
    assert not empty_diff(left, right)

    # Same parent, but the names still differ.
    left.add_parent(name="RT")
    right.add_parent(name="RT")
    assert empty_diff(left, left)
    assert not empty_diff(left, right)

    right.name = "A"
    assert empty_diff(left, right)

    # A property on one side only, then on both sides.
    left.add_property(name="some_prop", value=1)
    assert not empty_diff(left, right)

    right.add_property(name="some_prop", value=1)
    assert empty_diff(left, right)

    # Same property, different value.
    right.get_property("some_prop").value = 2
    assert not empty_diff(left, right)

    right.get_property("some_prop").value = 1
    right.add_property(name="some_other_prop", value="Test")
    assert not empty_diff(left, right)

    left.add_property(name="some_other_prop", value="Test")
    assert empty_diff(left, right)

    # reference identical records, but different Python Record objects
    ref_left = db.Record(name="Ref").add_parent(name="RefType")
    ref_right = db.Record(name="Ref").add_parent(name="RefType")
    left.add_property(name="RefType", datatype="RefType", value=ref_left)
    right.add_property(name="RefType", datatype="RefType", value=ref_right)
    # the default is `compare_referenced_records=False`, so the diff shouldn't
    # be empty (different Python objects are referenced.)
    assert not empty_diff(left, right)
    # when looking into the referenced record, the diffs should be empty again
    assert empty_diff(left, right, compare_referenced_records=True)

    # The same for lists of references
    left.remove_property("RefType")
    right.remove_property("RefType")
    assert empty_diff(left, right)
    left.add_property(name="RefType", datatype=db.LIST("RefType"),
                      value=[ref_left, ref_left])
    right.add_property(name="RefType", datatype=db.LIST("RefType"),
                       value=[ref_right, ref_right])
    assert not empty_diff(left, right)
    assert empty_diff(left, right, compare_referenced_records=True)

    # special case of ids
    left = db.Record(id=12)
    right = db.Record()
    assert not empty_diff(left, right)
    right.id = 13
    assert not empty_diff(left, right)
    right.id = 12
    assert empty_diff(left, right)
def test_force_merge():
    """Test whether a forced merge overwrites existing properties correctly."""

    def conflict_then_force(entity_a, entity_b):
        # A plain merge must raise a conflict; the forced merge afterwards
        # must succeed.  The exception info of the conflict is returned so
        # that callers can inspect the error message.
        with pytest.raises(EntityMergeConflictError) as excinfo:
            merge_entities(entity_a, entity_b)
        merge_entities(entity_a, entity_b, force=True)
        return excinfo

    # name overwrite
    recA = db.Record(name="A")
    recB = db.Record(name="B")
    conflict_then_force(recA, recB)
    assert "B" == recA.name
    # unchanged
    assert "B" == recB.name

    # description overwrite
    recA = db.Record()
    recA.description = "something"
    recB = db.Record()
    recB.description = "something else"
    emce = conflict_then_force(recA, recB)
    # The message is checked after the `pytest.raises` block has been left;
    # inside the block the assertion would never be executed.
    assert str(emce.value) == ("Conflict in special attribute description:\n"
                               "A: something\n"
                               "B: something else")
    assert recA.description == "something else"
    # unchanged
    assert recB.description == "something else"

    # property overwrite
    recA = db.Record()
    recA.add_property(name="propA", value="something")
    recB = db.Record()
    recB.add_property(name="propA", value="something else")
    conflict_then_force(recA, recB)
    assert recA.get_property("propA").value == "something else"
    # unchanged
    assert recB.get_property("propA").value == "something else"

    # don't remove a property that's not in recB
    recA = db.Record()
    recA.add_property(name="propA", value="something")
    recA.add_property(name="propB", value=5.0)
    recB = db.Record()
    recB.add_property(name="propA", value="something else")
    merge_entities(recA, recB, force=True)
    assert recA.get_property("propA").value == "something else"
    assert recA.get_property("propB").value == 5.0

    # also overwrite datatypes ...
    rtA = db.RecordType()
    rtA.add_property(name="propA", datatype=db.INTEGER)
    rtB = db.RecordType()
    rtB.add_property(name="propA", datatype=db.TEXT)
    conflict_then_force(rtA, rtB)
    assert rtA.get_property("propA").datatype == db.TEXT
    # unchanged
    assert rtB.get_property("propA").datatype == db.TEXT

    # ... and units
    recA = db.Record()
    recA.add_property(name="propA", value=5, unit="m")
    recB = db.Record()
    recB.add_property(name="propA", value=5, unit="cm")
    conflict_then_force(recA, recB)
    assert recA.get_property("propA").unit == "cm"
    # unchanged
    assert recB.get_property("propA").unit == "cm"

    # test whether an id is correctly overwritten by an entity without id
    recA = db.Record().add_parent("A").add_property(name="B", value=112)
    newRec = db.Record().add_parent("B").add_property("c")
    recB = db.Record().add_parent("A").add_property(name="B", value=newRec)
    merge_entities(recA, recB, force=True)
    assert recA.get_property("B").value == newRec

    # ... and the same for list-valued properties
    recA = db.Record().add_parent("A").add_property(name="B", value=[112],
                                                    datatype=db.LIST("B"))
    recB = db.Record().add_parent("A").add_property(name="B", value=[newRec],
                                                    datatype=db.LIST(db.REFERENCE))
    merge_entities(recA, recB, force=True)
    assert recA.get_property("B").value == [newRec]
def test_merge_missing_list_datatype_82():
    """Merging two properties, where the list-valued one has no datatype."""
    list_of_double = f"LIST<{db.DOUBLE}>"

    # With an explicit list datatype the forced merge takes over the datatype.
    scalar_rec = db.Record().add_property("a", 5, datatype="B")
    list_rec_with_dt = db.Record().add_property("a", [1, 2], datatype=list_of_double)
    merge_entities(scalar_rec, list_rec_with_dt, force=True)
    assert scalar_rec.get_property("a").datatype == list_of_double

    # Without a datatype on the list-valued property a TypeError is expected.
    scalar_rec = db.Record().add_property("a", 5, datatype="B")
    list_rec_without_dt = db.Record().add_property("a", [1, 2])
    with pytest.raises(TypeError) as type_error:
        merge_entities(scalar_rec, list_rec_without_dt, force=True)
    assert "Invalid datatype: List valued properties" in str(type_error.value)
def test_merge_id_with_resolved_entity():
    """Exercise ``merge_entities`` with ``merge_id_with_resolved_entity``.

    One record references an entity object, the other one only an id.  The
    test covers the strict default, matching ids in both merge directions,
    mismatching ids, a referenced entity without id, and list values.
    """
    type_name = "TestRT"
    known_id = 123
    different_id = known_id * 2
    resolved = db.Record(id=known_id).add_parent(name=type_name)

    def referencing(value):
        # Fresh record whose single property ``type_name`` holds ``value``.
        return db.Record().add_property(name=type_name, value=value)

    def referenced_value(record):
        return record.get_property(type_name).value

    # with_entity holds the resolved record as value, with_id only its id;
    # apart from that both records are identical.
    with_entity = referencing(resolved)
    with_id = referencing(known_id)

    # Strict by default: differing values are a conflict.
    with pytest.raises(EntityMergeConflictError):
        merge_entities(with_entity, with_id)

    # Overwrite from right to left in both cases
    merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)
    assert referenced_value(with_entity) == resolved

    with_entity = referencing(resolved)
    merge_entities(with_id, with_entity, merge_id_with_resolved_entity=True)
    assert referenced_value(with_id) == known_id
    assert referenced_value(with_entity) == resolved

    # A different id on the other side is still a conflict.
    with_id = referencing(different_id)
    with pytest.raises(EntityMergeConflictError):
        merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)

    # A referenced entity without id is a conflict with a plain id, too.
    without_id = db.Record(id=None).add_parent(name=type_name)
    with_entity = referencing(without_id)
    with_id = referencing(known_id)
    with pytest.raises(EntityMergeConflictError):
        merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)

    # The same logic applies to list-valued properties.
    entity_values = [resolved, different_id]
    id_values = [known_id, different_id]
    with_entity = db.Record().add_property(
        name=type_name, datatype=db.LIST(type_name), value=list(entity_values))
    with_id = db.Record().add_property(
        name=type_name, datatype=db.LIST(type_name), value=list(id_values))
    merge_entities(with_entity, with_id, merge_id_with_resolved_entity=True)
    assert referenced_value(with_entity) == entity_values
    assert referenced_value(with_id) == id_values
def test_describe_diff():
    """Check the text that ``describe_diff`` produces for two records.

    The records differ in the value of ``propB``, in the unit of ``propD``,
    and ``propC`` exists only in the second one.  The default header and
    labels are tested as well as a custom ``name`` and custom labels.
    """
    first = (db.Record()
             .add_property(name="propA", value=2)
             .add_property(name="propB", value=2)
             .add_property(name="propD", value=-273, unit="K"))
    second = (db.Record()
              .add_property(name="propA", value=2)
              .add_property(name="propB", value=12)
              .add_property(name="propC", value="cool 17")
              .add_property(name="propD", value=-273, unit="°C"))

    diff = compare_entities(first, second)

    # Default name and labels.
    default_text = describe_diff(*diff)
    assert default_text.startswith("## Difference between the first version and the second version of None")

    # The output of describe_diff is currently not ordered (e.g. by property
    # name), so only fragments are checked instead of one complete string.
    default_fragments = (
        "it does not exist in the first version:",
        "first version: {'value': 2}",
        "second version: {'value': 12}",
        "first version: {'unit': 'K'}",
        "second version: {'unit': '°C'}",
    )
    for fragment in default_fragments:
        assert fragment in default_text

    # A custom name shows up in the header.
    named_text = describe_diff(*diff, name="Entity")
    assert named_text.startswith("## Difference between the first version and the second version of Entity")

    # Custom labels replace the default "first"/"second" wording everywhere.
    labelled_text = describe_diff(*diff, label_e0="recA", label_e1="recB")
    labelled_fragments = (
        "recA: {'value': 2}",
        "recB: {'value': 12}",
        "recA: {'unit': 'K'}",
        "recB: {'unit': '°C'}",
        "it does not exist in the recA:",
    )
    for fragment in labelled_fragments:
        assert fragment in labelled_text
    for default_word in ("first", "second"):
        assert default_word not in labelled_text
def test_diff_without_names():
    """Test compare_entities in case of properties and parents with
    ids and without names
    (cf. https://gitlab.com/linkahead/linkahead-pylib/-/issues/119).

    Properties and parents that only have an id are expected to be keyed by
    that id in the resulting diffs, never by ``None``.
    """
    # A property that is only identified by its id.
    r1 = db.Record(name="Test").add_parent(name="TestType")
    r2 = db.Record(name="Test").add_parent(name="TestType")
    r2.add_property(id=123, value="Test")

    diff1, diff2 = compare_entities(r1, r2)
    assert len(diff1["properties"]) == 0
    assert len(diff2["properties"]) == 1
    assert 123 in diff2["properties"]
    assert None not in diff2["properties"]

    # Parents that are only identified by their ids.
    r3 = db.Record().add_parent(id=101)
    r4 = db.Record().add_parent(id=102)

    diff3, diff4 = compare_entities(r3, r4)
    assert len(diff3["parents"]) == 1
    assert 101 in diff3["parents"]
    assert None not in diff3["parents"]
    assert len(diff4["parents"]) == 1
    assert 102 in diff4["parents"]
    # Check the second diff here; the original re-checked diff3 by mistake.
    assert None not in diff4["parents"]