Select Git revision
test_creation.py
-
Timm Fitschen authored
AGPLv3 Veröffentlichung gemäß Dienstanweisung vom 15. August 2018.
Timm Fitschen authoredAGPLv3 Veröffentlichung gemäß Dienstanweisung vom 15. August 2018.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_apiutils.py 22.20 KiB
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com>
# Copyright (C) 2022 Daniel Hornung <d.hornung@indiscale.com>
# Copyright (C) 2020-2022 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# Test apiutils
# A. Schlemmer, 02/2018
import linkahead as db
import linkahead.apiutils
import pytest
from linkahead.apiutils import (EntityMergeConflictError, apply_to_ids,
compare_entities, create_id_query, empty_diff,
escape_quoted_text, merge_entities,
resolve_reference)
from linkahead.common.models import SPECIAL_ATTRIBUTES
def test_apply_to_ids():
    """Check that apply_to_ids maps a function over all ids of a record.

    The id of the record itself, of its parent and of its property must each
    be replaced by the result of the applied function.
    """
    def invert(id_):
        # Flip the sign of the given id.
        return id_ * -1

    record_type = db.RecordType(id=3456)
    record = db.Record(id=23)
    int_prop = db.Property(id=23345, datatype=db.INTEGER)
    record.add_parent(record_type)
    record.add_property(int_prop)

    apply_to_ids([record], invert)

    # Sanity check of the helper itself.
    assert invert(3456) == -3456
    assert record.parents[0].id == -3456
    assert record.properties[0].id == -23345
    assert record.id == -23
def test_id_query():
    """Check the query string that create_id_query builds from a list of ids."""
    wanted_ids = [1, 2, 3, 4, 5]
    expected_query = 'FIND ENTITY WITH ID=1 OR ID=2 OR ID=3 OR ID=4 OR ID=5'
    assert create_id_query(wanted_ids) == expected_query
def test_resolve_reference():
    """Check that resolve_reference replaces referenced ids by entities.

    ``linkahead.apiutils.retrieve_entity_with_id`` is temporarily replaced by
    a stub that simply returns ``db.Record(id=eid)``.  The original function
    is restored in a ``finally`` clause, so a failing assertion cannot leak
    the stub into tests that run afterwards.
    """
    original_retrieve_entity_with_id = linkahead.apiutils.retrieve_entity_with_id
    linkahead.apiutils.retrieve_entity_with_id = lambda eid: db.Record(id=eid)
    try:
        prop = db.Property(id=1, datatype=db.REFERENCE, value=100)
        prop.is_valid = lambda: True
        items = [200, 300, 400]
        # A list of plain ids ...
        prop_list = db.Property(datatype=db.LIST(db.REFERENCE),
                                value=items)
        # ... and a list that already holds an entity object.
        prop_list2 = db.Property(datatype=db.LIST(db.REFERENCE),
                                 value=[db.Record(id=500)])

        resolve_reference(prop)
        resolve_reference(prop_list)
        resolve_reference(prop_list2)

        assert prop.value.id == 100
        assert isinstance(prop.value, db.Entity)

        prop_list_ids = []
        for i in prop_list.value:
            prop_list_ids.append(i.id)
            assert isinstance(i, db.Entity)
        assert prop_list_ids == items

        for i in prop_list2.value:
            assert i.id == 500
            assert isinstance(i, db.Entity)

        # A non-reference property must be left untouched.
        no_reference = db.Property(id=5000, datatype=db.INTEGER, value=2)
        resolve_reference(no_reference)
        assert no_reference.value == 2
        assert no_reference.datatype is db.INTEGER
    finally:
        # restore retrieve_entity_with_id, even if an assertion above failed
        linkahead.apiutils.retrieve_entity_with_id = original_retrieve_entity_with_id
def test_compare_entities():
    """Compare two records that differ in one parent and several properties.

    The property "test" is identical on both sides and must not appear in
    either diff.  Properties with differing values, with a value on one side
    only, or present on one side only must be reported.
    """
    first = db.Record()
    second = db.Record()

    first.add_parent("bla")
    first.add_parent("lopp")
    first.add_property("test", value=2)
    first.add_property("tests", value=3)
    first.add_property("tester", value=3)
    first.add_property("tests_234234", value=45)

    second.add_parent("bla")
    second.add_property("test", value=2)
    second.add_property("tests", value=45)
    second.add_property("tester")
    second.add_property("tests_TT", value=45)

    diff_first, diff_second = compare_entities(first, second)

    assert len(diff_first["parents"]) == 1
    assert len(diff_second["parents"]) == 0
    assert len(diff_first["properties"]) == 3
    assert len(diff_second["properties"]) == 3

    # The identical property is absent from both diffs ...
    for diff in (diff_first, diff_second):
        assert "test" not in diff["properties"]

    # ... whereas the differing ones are listed on both sides.
    for differing in ("tests", "tester"):
        for diff in (diff_first, diff_second):
            assert differing in diff["properties"]

    # One-sided properties show up only in the diff of their own record.
    assert "tests_234234" in diff_first["properties"]
    assert "tests_TT" in diff_second["properties"]
def test_compare_entities_units():
    """Compare two records whose properties also carry units.

    In contrast to the unit-less comparison, the property "test" has equal
    values but different units here, so it must appear in both diffs with
    the unit of the respective record.
    """
    first = db.Record()
    second = db.Record()

    first.add_parent("bla")
    first.add_parent("lopp")
    first.add_property("test", value=2, unit="cm")
    first.add_property("tests", value=3, unit="cm")
    first.add_property("tester", value=3)
    first.add_property("tests_234234", value=45, unit="cm")

    second.add_parent("bla")
    second.add_property("test", value=2, unit="m")
    second.add_property("tests", value=45, unit="cm")
    second.add_property("tester")
    second.add_property("tests_TT", value=45, unit="cm")

    diff_first, diff_second = compare_entities(first, second)

    assert len(diff_first["parents"]) == 1
    assert len(diff_second["parents"]) == 0
    assert len(diff_first["properties"]) == 4
    assert len(diff_second["properties"]) == 4

    for differing in ("tests", "tester"):
        for diff in (diff_first, diff_second):
            assert differing in diff["properties"]

    assert "tests_234234" in diff_first["properties"]
    assert "tests_TT" in diff_second["properties"]

    # Same value, different unit: each diff reports its own unit.
    assert diff_first["properties"]["test"]["unit"] == "cm"
    assert diff_second["properties"]["test"]["unit"] == "m"
def test_compare_special_properties():
    """Check how compare_entities treats the special attributes of an entity.

    For every attribute both records first receive the same value, which must
    yield empty diffs.  Then the second record gets a different value and the
    attribute must show up in both diffs with the respective value.
    """
    # Test for all known special properties:
    SPECIAL_PROPERTIES = ("description", "name",
                          "checksum", "size", "path", "id")
    INTS = ("size", "id")
    HIDDEN = ("checksum", "size")

    for key in SPECIAL_PROPERTIES:
        # Attributes listed in HIDDEN are set through their underscore name.
        attr_name = "_" + key if key in HIDDEN else key
        # Attributes listed in INTS get integer values, all others strings.
        initial, changed = (1, 2) if key in INTS else ("bla 1", "bla test")

        r1 = db.Record()
        r2 = db.Record()
        setattr(r1, attr_name, initial)
        setattr(r2, attr_name, initial)

        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key not in diff_r1
        assert key not in diff_r2
        assert len(diff_r1["parents"]) == 0
        assert len(diff_r2["parents"]) == 0
        assert len(diff_r1["properties"]) == 0
        assert len(diff_r2["properties"]) == 0

        # Now let only the second record deviate.
        setattr(r2, attr_name, changed)

        diff_r1, diff_r2 = compare_entities(r1, r2)
        assert key in diff_r1
        assert key in diff_r2
        assert diff_r1[key] == initial
        assert diff_r2[key] == changed
        assert len(diff_r1["properties"]) == 0
        assert len(diff_r2["properties"]) == 0
@pytest.mark.xfail
def test_compare_properties():
    """Compare two Property entities by importance and value.

    The test is marked as an expected failure.
    """
    def assert_no_parent_or_property_diff(diff_a, diff_b):
        # Neither diff may list any parents or properties.
        assert len(diff_a["parents"]) == 0
        assert len(diff_b["parents"]) == 0
        assert len(diff_a["properties"]) == 0
        assert len(diff_b["properties"]) == 0

    p1 = db.Property()
    p2 = db.Property()

    # Two empty properties.
    diff_p1, diff_p2 = compare_entities(p1, p2)
    assert_no_parent_or_property_diff(diff_p1, diff_p2)

    # Importance set on the first property only.
    p1.importance = "SUGGESTED"
    diff_p1, diff_p2 = compare_entities(p1, p2)
    assert_no_parent_or_property_diff(diff_p1, diff_p2)

    assert "importance" in diff_p1
    assert diff_p1["importance"] == "SUGGESTED"

    # TODO: I'm not sure why it is not like this:
    # assert diff_p2["importance"] is None
    # ... but:
    assert "importance" not in diff_p2

    # Equal importance, different values.
    p2.importance = "SUGGESTED"
    p1.value = 42
    p2.value = 4

    diff_p1, diff_p2 = compare_entities(p1, p2)
    assert_no_parent_or_property_diff(diff_p1, diff_p2)

    # Comparing values currently does not seem to be implemented:
    assert "value" in diff_p1
    assert diff_p1["value"] == 42
    assert "value" in diff_p2
    assert diff_p2["value"] == 4
def test_copy_entities():
    """Check that Entity.copy yields an independent but equal record.

    Name, role and parent name must match, while the parent and property
    objects must be distinct objects with equal special attributes and equal
    importance.
    """
    original = db.Record(name="A")
    original.add_parent(name="B")
    original.add_property(name="C", value=4, importance="OBLIGATORY")
    original.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    original.description = "A fancy test record"

    duplicate = original.copy()

    assert duplicate is not original
    assert duplicate.name == "A"
    assert duplicate.role == original.role
    assert duplicate.parents[0].name == "B"
    # parent and property objects are not shared among copy and original:
    assert duplicate.parents[0] is not original.parents[0]

    for idx in range(2):
        assert duplicate.properties[idx] is not original.properties[idx]
        for attribute in SPECIAL_ATTRIBUTES:
            copied_value = getattr(duplicate.properties[idx], attribute)
            assert copied_value == getattr(original.properties[idx], attribute)
        copied_importance = duplicate.get_importance(duplicate.properties[idx])
        assert copied_importance == original.get_importance(
            original.properties[idx])
def test_merge_entities():
    """Merge a populated record into a nearly empty one.

    After the merge the first argument must carry the parent and the
    properties of the second one in addition to its own property.
    """
    source = db.Record(name="A")
    source.add_parent(name="B")
    source.add_property(name="C", value=4, importance="OBLIGATORY")
    source.add_property(name="D", value=[3, 4, 7], importance="OBLIGATORY")
    source.description = "A fancy test record"

    target = db.Record()
    target.add_property(name="F", value="text")

    merge_entities(target, source)

    assert target.get_parents()[0].name == "B"
    # Expected properties of the merge result, in the order they are checked.
    expected_properties = (("C", 4), ("D", [3, 4, 7]), ("F", "text"))
    for prop_name, prop_value in expected_properties:
        merged_prop = target.get_property(prop_name)
        assert merged_prop.name == prop_name
        assert merged_prop.value == prop_value
def test_merge_bug_conflict():
    """Check merge conflicts caused by property datatypes.

    Merging into a property without datatype works, whereas merging a TEXT
    property into an INTEGER property of the same name must raise.
    """
    text_typed = db.Record()
    untyped = db.Record()
    untyped.add_property(name="C", value=4)
    text_typed.add_property(name="C", value=4, datatype="TEXT")
    merge_entities(untyped, text_typed)

    integer_typed = db.Record()
    integer_typed.add_property(name="C", value=4, datatype="INTEGER")
    with pytest.raises(EntityMergeConflictError):
        merge_entities(integer_typed, text_typed)
def test_merge_bug_109():
    """Merging a list-valued property must not duplicate its values.

    Both the values of the property and the string representations of the
    two records are checked: the pair of values must occur, but not twice in
    a row.
    """
    # Expected single occurrence and the forbidden doubled occurrence.
    values_once = "<Value>18</Value>\n <Value>19</Value>"
    values_twice = "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>"

    record_type = db.RecordType(name="TestBug")
    list_prop = db.Property(name="test_bug_property",
                            datatype=db.LIST(db.INTEGER))

    with_values = db.Record(name="TestRecord")
    with_values.add_parent(record_type)
    with_values.add_property(list_prop, value=[18, 19])

    without_values = db.Record(name="TestRecord")
    without_values.add_parent(record_type)

    merge_entities(without_values, with_values)

    assert with_values.get_property("test_bug_property").value == [18, 19]
    assert without_values.get_property("test_bug_property").value == [18, 19]

    assert values_once in str(with_values)
    assert values_twice not in str(with_values)

    assert values_once in str(without_values)
    assert values_twice not in str(without_values)
@pytest.mark.xfail
def test_bug_109():
    """Adding another record's list property must not duplicate its values.

    Instead of merging, the property object of one record is added directly
    to a second record.  The test is marked as an expected failure.
    """
    # Expected single occurrence and the forbidden doubled occurrence.
    values_once = "<Value>18</Value>\n <Value>19</Value>"
    values_twice = "<Value>18</Value>\n <Value>19</Value>\n <Value>18</Value>\n <Value>19</Value>"

    record_type = db.RecordType(name="TestBug")
    list_prop = db.Property(name="test_bug_property",
                            datatype=db.LIST(db.INTEGER))

    donor = db.Record(name="TestRecord")
    donor.add_parent(record_type)
    donor.add_property(list_prop, value=[18, 19])

    receiver = db.Record(name="TestRecord")
    receiver.add_parent(record_type)
    receiver.add_property(donor.get_property("test_bug_property"))

    assert donor.get_property("test_bug_property").value == [18, 19]
    assert receiver.get_property("test_bug_property").value == [18, 19]

    assert values_once in str(donor)
    assert values_twice not in str(donor)

    assert values_once in str(receiver)
    assert values_twice not in str(receiver)
@pytest.mark.xfail(reason="Issue https://gitlab.com/linkahead/linkahead-pylib/-/issues/111")
def test_failing_merge_entities_111():
    """Merge a fully specified Property into an empty one.

    Marked as an expected failure; the trailing comments note which of the
    assertions hold and which do not.
    """
    empty_prop = db.Property()
    parent_prop = db.Property(name="prop_parent")
    full_prop = db.Property(
        name="b", datatype=db.DOUBLE, unit="µs", value=1.1
    ).add_parent(parent_prop)
    print(full_prop)

    db.apiutils.merge_entities(empty_prop, full_prop)

    merged_parent_names = [par.name for par in empty_prop.get_parents()]
    assert empty_prop.name == full_prop.name  # OK
    assert parent_prop.name in merged_parent_names  # OK
    assert empty_prop.value == full_prop.value  # fails
    assert empty_prop.datatype == db.DOUBLE  # fails
    assert empty_prop.unit == full_prop.unit  # fails
def test_wrong_merge_conflict_reference():
    """Test a wrongly detected merge conflict in case of two records referencing
    two different, but identical objects.

    Three situations are covered: the default merge succeeds, the merge with
    ``merge_references_with_empty_diffs=False`` raises, and the merge raises
    as well once the referenced records really differ.
    """

    # Two identical license records will be referenced from both records to be
    # merged
    license_rt = db.RecordType(name="license")
    license_rec_a = db.Record(name="CC-BY-3.0").add_parent(license_rt)
    license_rec_b = db.Record(name="CC-BY-3.0").add_parent(license_rt)

    # two referencing records
    dataset_rt = db.RecordType(name="Dataset")
    title_prop = db.Property(name="title", datatype=db.TEXT)
    doi_prop = db.Property(name="DOI", datatype=db.TEXT)
    rec_a = db.Record().add_parent(dataset_rt)
    rec_a.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_a)
    rec_a.add_property(name=title_prop.name, value="Some dataset title")

    rec_b = db.Record().add_parent(dataset_rt)
    rec_b.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_b)
    rec_b.add_property(name=doi_prop.name, value="https://doi.org/12345.678")

    merge_entities(rec_a, rec_b)

    assert rec_a.get_property(license_rt.name) is not None
    assert rec_a.get_property(license_rt.name).value is not None
    assert isinstance(rec_a.get_property(license_rt.name).value, db.Record)
    assert rec_a.get_property(license_rt.name).value.name == license_rec_a.name
    assert rec_a.get_property(license_rt.name).value.name == license_rec_b.name
    assert rec_a.get_property("title").value == "Some dataset title"
    # NOTE(review): the property was added as "DOI" but is looked up as "doi";
    # this presumably relies on get_property matching names
    # case-insensitively -- confirm.
    assert rec_a.get_property("doi").value == "https://doi.org/12345.678"

    # Reset rec_a
    rec_a = db.Record().add_parent(dataset_rt)
    rec_a.add_property(name=license_rt.name,
                       datatype=license_rt.name, value=license_rec_a)
    rec_a.add_property(name=title_prop.name, value="Some dataset title")

    # this does not compare referenced records, so it will fail
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b, merge_references_with_empty_diffs=False)

    # ... as should this, of course
    rec_b.get_property(license_rt.name).value.name = "Another license"
    # The exception info is not inspected, so it is not bound to a name.
    with pytest.raises(EntityMergeConflictError):
        merge_entities(rec_a, rec_b)
def test_empty_diff():
    """Exercise empty_diff step by step on two evolving records.

    Names, parents, properties, referenced records (single and in lists) and
    ids are changed one after another, checking after each step whether the
    two records are considered identical.
    """
    left = db.Record(name="A")
    right = db.Record(name="B")

    # Differing names.
    assert empty_diff(left, left)
    assert not empty_diff(left, right)

    # Equal parents do not hide the differing names.
    left.add_parent(name="RT")
    right.add_parent(name="RT")
    assert empty_diff(left, left)
    assert not empty_diff(left, right)

    right.name = "A"
    assert empty_diff(left, right)

    # A property on one side only, then on both sides.
    left.add_property(name="some_prop", value=1)
    assert not empty_diff(left, right)

    right.add_property(name="some_prop", value=1)
    assert empty_diff(left, right)

    # Differing property value.
    right.get_property("some_prop").value = 2
    assert not empty_diff(left, right)

    right.get_property("some_prop").value = 1
    right.add_property(name="some_other_prop", value="Test")
    assert not empty_diff(left, right)

    left.add_property(name="some_other_prop", value="Test")
    assert empty_diff(left, right)

    # reference identical records, but different Python Record objects
    referenced_left = db.Record(name="Ref").add_parent(name="RefType")
    referenced_right = db.Record(name="Ref").add_parent(name="RefType")
    left.add_property(name="RefType", datatype="RefType",
                      value=referenced_left)
    right.add_property(name="RefType", datatype="RefType",
                       value=referenced_right)
    # the default is `compare_referenced_records=False`, so the diff shouldn't
    # be empty (different Python objects are referenced.)
    assert not empty_diff(left, right)
    # when looking into the referenced record, the diffs should be empty again
    assert empty_diff(left, right, compare_referenced_records=True)

    # The same for lists of references
    left.remove_property("RefType")
    right.remove_property("RefType")
    assert empty_diff(left, right)
    left.add_property(name="RefType", datatype=db.LIST("RefType"),
                      value=[referenced_left, referenced_left])
    right.add_property(name="RefType", datatype=db.LIST("RefType"),
                       value=[referenced_right, referenced_right])
    assert not empty_diff(left, right)
    assert empty_diff(left, right, compare_referenced_records=True)

    # special case of ids
    left = db.Record(id=12)
    right = db.Record()
    assert not empty_diff(left, right)
    right.id = 13
    assert not empty_diff(left, right)
    right.id = 12
    assert empty_diff(left, right)
def test_force_merge():
    """Check that ``force=True`` lets merge_entities overwrite conflicts.

    Names, descriptions, property values, datatypes and units are covered.
    In the conflicting cases the plain merge must raise
    EntityMergeConflictError, while the forced merge copies the value of the
    second entity into the first one and leaves the second one untouched.
    """
    # name overwrite
    target = db.Record(name="A")
    source = db.Record(name="B")

    with pytest.raises(EntityMergeConflictError):
        merge_entities(target, source)

    merge_entities(target, source, force=True)
    assert "B" == target.name
    # unchanged
    assert "B" == source.name

    # description overwrite
    target = db.Record()
    target.description = "something"
    source = db.Record()
    source.description = "something else"

    expected_message = ("Conflict in special attribute description:\n"
                        "A: something\n"
                        "B: something else")
    with pytest.raises(EntityMergeConflictError) as emce:
        merge_entities(target, source)
    assert str(emce.value) == expected_message

    merge_entities(target, source, force=True)
    assert target.description == "something else"
    # unchanged
    assert source.description == "something else"

    # property overwrite
    target = db.Record()
    target.add_property(name="propA", value="something")
    source = db.Record()
    source.add_property(name="propA", value="something else")

    with pytest.raises(EntityMergeConflictError):
        merge_entities(target, source)

    merge_entities(target, source, force=True)
    assert target.get_property("propA").value == "something else"
    # unchanged
    assert source.get_property("propA").value == "something else"

    # don't remove a property that's not in the second record
    target = db.Record()
    target.add_property(name="propA", value="something")
    target.add_property(name="propB", value=5.0)
    source = db.Record()
    source.add_property(name="propA", value="something else")

    merge_entities(target, source, force=True)
    assert target.get_property("propA").value == "something else"
    assert target.get_property("propB").value == 5.0

    # also overwrite datatypes ...
    target_rt = db.RecordType()
    target_rt.add_property(name="propA", datatype=db.INTEGER)
    source_rt = db.RecordType()
    source_rt.add_property(name="propA", datatype=db.TEXT)

    with pytest.raises(EntityMergeConflictError):
        merge_entities(target_rt, source_rt)

    merge_entities(target_rt, source_rt, force=True)
    assert target_rt.get_property("propA").datatype == db.TEXT
    # unchanged
    assert source_rt.get_property("propA").datatype == db.TEXT

    # ... and units
    target = db.Record()
    target.add_property(name="propA", value=5, unit="m")
    source = db.Record()
    source.add_property(name="propA", value=5, unit="cm")

    with pytest.raises(EntityMergeConflictError):
        merge_entities(target, source)

    merge_entities(target, source, force=True)
    assert target.get_property("propA").unit == "cm"
    # unchanged
    assert source.get_property("propA").unit == "cm"
def test_merge_missing_list_datatype_82():
    """Force-merge a list-valued property into a scalar one.

    With an explicit list datatype the datatype is taken over; without a
    datatype on the list-valued property a TypeError is expected.
    """
    list_of_double = f"LIST<{db.DOUBLE}>"

    # List-valued property that declares its datatype.
    scalar_rec = db.Record().add_property("a", 5, datatype="B")
    typed_list_rec = db.Record().add_property("a", [1, 2],
                                              datatype=list_of_double)
    merge_entities(scalar_rec, typed_list_rec, force=True)
    assert scalar_rec.get_property("a").datatype == list_of_double

    # List-valued property without any datatype.
    scalar_rec = db.Record().add_property("a", 5, datatype="B")
    untyped_list_rec = db.Record().add_property("a", [1, 2])
    with pytest.raises(TypeError) as te:
        merge_entities(scalar_rec, untyped_list_rec, force=True)
    assert "Invalid datatype: List valued properties" in str(te.value)
def test_merge_id_with_resolved_entity():
    """Merge records that reference the same entity by object and by id.

    Without ``merge_id_with_resolved_entity=True`` this is a conflict.  With
    the flag the value of the second argument wins, unless the id and the id
    of the referenced entity do not match.
    """
    rtname = "TestRT"
    ref_id = 123
    mismatching_id = ref_id*2
    ref_rec = db.Record(id=ref_id).add_parent(name=rtname)

    # `left` has the resolved referenced record as value, `right` its id.
    # Otherwise, they are identical.
    left = db.Record().add_property(name=rtname, value=ref_rec)
    right = db.Record().add_property(name=rtname, value=ref_id)

    # default is strict: raise error since values are different
    with pytest.raises(EntityMergeConflictError):
        merge_entities(left, right)

    # Overwrite from right to left in both cases
    merge_entities(left, right, merge_id_with_resolved_entity=True)
    assert left.get_property(rtname).value == ref_id
    assert left.get_property(rtname).value == right.get_property(rtname).value

    left = db.Record().add_property(name=rtname, value=ref_rec)
    merge_entities(right, left, merge_id_with_resolved_entity=True)
    assert right.get_property(rtname).value == ref_rec
    assert left.get_property(rtname).value == right.get_property(rtname).value

    # id mismatches
    right = db.Record().add_property(name=rtname, value=mismatching_id)
    with pytest.raises(EntityMergeConflictError):
        merge_entities(left, right, merge_id_with_resolved_entity=True)

    # A referenced record without id cannot match an id either.
    other_rec = db.Record(id=None).add_parent(name=rtname)
    left = db.Record().add_property(name=rtname, value=other_rec)
    right = db.Record().add_property(name=rtname, value=ref_id)
    with pytest.raises(EntityMergeConflictError):
        merge_entities(left, right, merge_id_with_resolved_entity=True)

    # also works in lists:
    left = db.Record().add_property(
        name=rtname, datatype=db.LIST(rtname),
        value=[ref_rec, mismatching_id])
    right = db.Record().add_property(
        name=rtname, datatype=db.LIST(rtname),
        value=[ref_id, mismatching_id])
    merge_entities(left, right, merge_id_with_resolved_entity=True)
    assert left.get_property(rtname).value == [ref_id, mismatching_id]
    assert left.get_property(rtname).value == right.get_property(rtname).value
def test_escape_quoted_text():
    """Check that escape_quoted_text escapes backslashes and asterisks.

    Each case is a pair of raw input and expected escaped output.
    """
    cases = (
        ("bla", "bla"),
        ("bl\\a", "bl\\\\a"),
        ("bl*a", "bl\\*a"),
        ("bl*ab\\\\lab\\*labla", "bl\\*ab\\\\\\\\lab\\\\\\*labla"),
    )
    for raw_text, expected in cases:
        assert escape_quoted_text(raw_text) == expected