Select Git revision
test_version.py
-
Timm Fitschen authoredTimm Fitschen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_version.py 27.33 KiB
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
from pytest import mark, raises
from dateutil.parser import parse
import caosdb as c
def setup():
    """Delete every entity matched by the query ``FIND Test*``."""
    leftovers = c.execute_query("FIND Test*")
    if len(leftovers) == 0:
        # nothing to clean up
        return
    leftovers.delete()
def teardown():
    """Run the same clean-up as ``setup``."""
    setup()
def test_version_object():
    """Check that ``Version`` is importable from ``caosdb.common.versioning``."""
    # The import is the only statement of this test; the imported name is
    # not used any further.
    from caosdb.common.versioning import Version
def insertion(name="TestRT"):
    """Insert a RecordType called ``name`` and check its initial version.

    The inserted entity must carry a version with an id and a date and with
    neither predecessors nor successors.

    Returns the inserted RecordType.
    """
    record_type = c.RecordType(name, description="TestDescription1")
    record_type = record_type.insert()

    initial_version = record_type.version
    assert initial_version is not None
    assert initial_version.id is not None
    assert initial_version.date is not None
    assert len(initial_version.predecessors) == 0
    assert len(initial_version.successors) == 0
    return record_type
def test_retrieve():
    """A queried entity reports the same version as the inserted one."""
    inserted = insertion()
    expected_version = inserted.version

    queried = c.execute_query("FIND RecordType TestRT", unique=True)
    queried_version = queried.version

    assert parse(queried_version.date) == parse(expected_version.date)
    assert queried_version == expected_version
def test_update_description():
    """Update the description and check the resulting new version.

    The new version is checked on the updated object, on a query result and
    on a retrieval by id. The old description must remain retrievable via
    the old version id.
    """
    def retrieve(query):
        # retrieve a single entity with sync=False
        return c.Container().retrieve(query=query, sync=False)[0]

    rt = insertion()
    version_before = rt.version
    desc_before = rt.description
    desc_after = "TestDescription2"

    rt.description = desc_after
    rt.update()
    version_after = rt.version

    # checks on the updated object itself
    assert rt.description == desc_after
    assert version_after is not None
    assert version_after.id is not None
    assert version_after.date is not None
    assert version_after != version_before
    assert version_after.date != version_before.date
    assert parse(version_after.date) > parse(version_before.date)

    # the same via query
    queried = c.execute_query("FIND RecordType TestRT", unique=True)
    assert queried.version.id == version_after.id
    assert queried.version == version_after
    assert queried.description == desc_after

    # the same via retrieval by id
    by_id = retrieve(str(rt.id))
    assert by_id.version.id == version_after.id
    assert by_id.version == version_after
    assert by_id.description == desc_after

    # retrieve old version
    old_reference = str(rt.id) + "@" + version_before.id
    rt_old = retrieve(old_reference)
    assert rt_old.version.id == version_before.id
    assert rt_old.description == desc_before
def test_update_parent():
    """Swap the parent of a RecordType twice and inspect every version.

    Each update is expected to yield its own version, and retrieving a
    version by its id must show the parents the entity had at that time.
    """
    def retrieve(query):
        # retrieve a single entity with sync=False
        return c.Container().retrieve(query=query, sync=False)[0]

    parent_a = insertion("TestRTParent1")
    parent_b = insertion("TestRTParent2")
    child = insertion("TestRTChild")
    assert len(child.get_parents()) == 0
    first_version = child.version

    # first update: attach the first parent
    child.add_parent(parent_a)
    child.update()
    assert len(child.get_parents()) == 1
    assert child.get_parent("TestRTParent1") is not None
    second_version = child.version

    # second update: replace the first parent by the second one
    child.remove_parent(parent_a)
    assert len(child.get_parents()) == 0
    child.add_parent(parent_b)
    child.update()
    assert len(child.get_parents()) == 1
    assert child.get_parent("TestRTParent1") is None
    assert child.get_parent("TestRTParent2") is not None
    third_version = child.version

    # now retrieve and look again
    assert c.execute_query("FIND TestRTParent1", unique=True).id == parent_a.id
    # presumably two hits: the parent itself and its child - TODO confirm
    assert len(c.execute_query("FIND TestRTParent2")) == 2
    assert c.execute_query("FIND TestRTChild", unique=True).id == child.id

    child_id = str(child.id)
    at_head = retrieve(child_id)
    at_v1 = retrieve(child_id + "@" + first_version.id)
    at_v2 = retrieve(child_id + "@" + second_version.id)
    at_v3 = retrieve(child_id + "@" + third_version.id)

    assert at_head.version == third_version
    assert at_v1.version.id == first_version.id
    assert at_v2.version.id == second_version.id
    assert at_v3.version.id == third_version.id

    # third version: only the second parent
    assert len(at_v3.get_parents()) == 1
    assert at_v3.get_parent("TestRTParent1") is None
    assert at_v3.get_parent("TestRTParent2") is not None

    # second version: only the first parent
    assert len(at_v2.get_parents()) == 1
    assert at_v2.get_parent("TestRTParent1") is not None
    assert at_v2.get_parent("TestRTParent2") is None

    # first version: no parents at all
    assert len(at_v1.get_parents()) == 0
def test_retrieve_old_version():
    """An old version stays retrievable by ``<id>@<version id>`` after an update."""
    rt = insertion()
    version_before = rt.version
    desc_before = rt.description

    rt.description = "TestDescription3"
    rt.update()

    # the current state carries the new description
    current = c.execute_query("FIND RecordType TestRT", unique=True)
    assert current.description == "TestDescription3"

    # the old version still carries the old one
    old_reference = str(rt.id) + "@" + version_before.id
    rt_old = c.Container().retrieve(query=old_reference, sync=False)[0]
    assert rt_old.id == rt.id
    assert rt_old.description == desc_before
def test_successor():
    """After an update, the retrieved old version lists the new one as successor."""
    rt = insertion()
    version_before = rt.version

    rt.description = "TestDescription5"
    rt.update()

    current = c.execute_query("FIND RecordType TestRT", unique=True)
    old_reference = str(rt.id) + "@" + version_before.id
    rt_old = c.Container().retrieve(query=old_reference, sync=False)[0]

    first_successor = rt_old.version.successors[0]
    assert first_successor.id == current.version.id, (
        "old version has successor after retrieval")
def test_predecessor():
    """After an update, the new version lists the old one as predecessor.

    This is checked on the updated object itself and on a freshly queried
    copy of the entity.
    """
    rt = insertion()
    old_version = rt.version

    rt.description = "TestDescription6"
    rt.update()
    assert rt.version.predecessors[0].id == old_version.id, (
        "latest version has predecessor directly after update")

    rt2 = c.execute_query("FIND RecordType TestRT", unique=True)
    assert rt2.version.predecessors[0].id == old_version.id, (
        "latest version has predecessor after retrieval")
def test_retrieve_relative_to_head():
    """Retrieve versions addressed relative to HEAD (``@HEAD``, ``@HEAD~n``).

    Before the first update only ``@HEAD`` exists. After one update
    ``@HEAD`` is the new version, ``@HEAD~1`` is the first version and
    ``@HEAD~2`` does not exist.
    """
    rt = insertion()
    first_version = rt.version

    # retrieve HEAD, once explicitly and once via the plain id
    rt_head = c.Container().retrieve(query=str(rt.id) + "@HEAD",
                                     sync=False)[0]
    rt_head2 = c.Container().retrieve(query=str(rt.id), sync=False)[0]
    # Both retrievals must agree. This has to be a comparison; assigning the
    # version to rt_head2 would check nothing.
    assert rt_head2.version == rt_head.version
    assert first_version == rt_head.version, "head is first version"

    # no HEAD~1 before first update
    with raises(c.EntityDoesNotExistError):
        c.Container().retrieve(query=str(rt.id) + "@HEAD~1", sync=False)

    # update
    rt.description = "TestDescription4"
    rt.update()
    new_head_version = rt.version
    assert first_version != new_head_version, (
        "first version is not head anymore")
    assert new_head_version.predecessors[0] == first_version, (
        "first version is predessor of head")

    # retrieve HEAD (which should have changed after the update)
    rt_new_head = c.Container().retrieve(query=str(rt.id) + "@HEAD",
                                         sync=False)[0]
    rt_new_head2 = c.Container().retrieve(query=str(rt.id), sync=False)[0]
    assert rt_new_head2.version == rt_new_head.version
    assert rt_new_head.version == new_head_version, (
        "head is version after update")
    assert rt_new_head.version.predecessors[0] == first_version, (
        "predecessor of head is first version (after update)")

    # retrieve HEAD~1 (the pre-update version)
    rt_pre_head = c.Container().retrieve(query=str(rt.id) + "@HEAD~1",
                                         sync=False)[0]
    assert rt_pre_head.version.id == first_version.id, (
        "head~1 is first version (after update)")
    assert rt_pre_head.version.successors[0].id == rt_new_head.version.id, (
        "successor of head~1 is head")

    # no HEAD~2 after a single update
    with raises(c.EntityDoesNotExistError):
        c.Container().retrieve(query=str(rt.id) + "@HEAD~2", sync=False)
@mark.xfail(reason="bug fix needed")
def test_bug_cached_delete():
    """Retrieving an old version of a deleted entity must fail.

    Marked xfail: the old version is retrieved once before the deletion and
    is then expected to be no longer available afterwards.
    """
    rt = insertion()
    old_version_id = rt.version.id

    rt.description = "UpdatedDesc"
    rt.update()

    old_reference = str(rt.id) + "@" + old_version_id

    # now id@old_version is cached...
    c.Container().retrieve(query=old_reference, sync=False)[0]

    c.execute_query("FIND RecordType TestRT").delete()

    with raises(c.EntityDoesNotExistError):
        c.Container().retrieve(query=str(rt.id) + "@" + old_version_id,
                               sync=False)[0]
@mark.xfail(reason=("TODO: What is the desired behavior? "
                    "Resolve in versioning phase 10"))
def test_delete_property_used_in_old_version():
    """Delete a property which only an old version of an entity still uses.

    Marked xfail because the desired server behavior is still undecided,
    see the TODO near the end of the test.
    """
    def retrieve_previous(**kwargs):
        # retrieve rt@HEAD~1 with sync=False plus any extra keyword arguments
        return c.Container().retrieve(str(rt.id) + "@HEAD~1",
                                      sync=False, **kwargs)[0]

    kept_prop = c.Property(name="TestProp1", datatype=c.TEXT).insert()
    doomed_prop = c.Property(name="TestProp2", datatype=c.TEXT).insert()
    rt = c.RecordType(name="TestRT")
    rt.add_property(doomed_prop, "blubblub")
    rt.insert()

    # can't delete the property used by rt@HEAD
    with raises(c.TransactionError) as exc:
        doomed_prop.delete()
    assert "Entity is required by other entities" in str(exc.value)

    # now update rt and remove the property which is to be deleted
    rt.remove_property(doomed_prop)
    rt.add_property(kept_prop, "blablabla")
    rt.update()

    # retrieve and check old version
    old_rt = retrieve_previous()
    assert old_rt.get_property(kept_prop) is None
    assert old_rt.get_property(doomed_prop).value == "blubblub"

    # delete the property used by old_rt; remember id and name beforehand
    doomed_id = doomed_prop.id
    doomed_name = doomed_prop.name
    doomed_prop.delete()

    # retrieve old version again
    old_rt = retrieve_previous(raise_exception_on_error=False)
    assert old_rt.get_property(kept_prop) is None

    # the value is still there (and the property has nothing but an id)
    assert old_rt.get_property(doomed_name) is None
    assert old_rt.get_property(doomed_id).value == "blubblub"

    # TODO: What is the desired behavior?
    # Currently, the server throws an error.
    # Should we make this a warning?
    first_error = old_rt.get_errors()[0]
    assert "Entity has unqualified properties" in str(first_error)

    # fails until resolved!
    assert len(old_rt.get_errors()) == 0
@mark.xfail(reason=("TODO: What is the desired behavior? "
                    "Resolve in versioning phase 10"))
def test_delete_parent_used_in_old_version():
    """Delete a RecordType which only an old version of a record has as parent.

    Marked xfail because the desired server behavior is still undecided,
    see the TODO near the end of the test.
    """
    def retrieve_previous(**kwargs):
        # retrieve rec@HEAD~1 with sync=False plus any extra keyword arguments
        return c.Container().retrieve(str(rec.id) + "@HEAD~1",
                                      sync=False, **kwargs)[0]

    doomed_rt = c.RecordType(name="TestRT1").insert()
    kept_rt = c.RecordType(name="TestRT2").insert()
    rec = c.Record(name="TestRec").add_parent(doomed_rt)
    rec.insert()

    # can't delete the parent used by rec@HEAD
    with raises(c.TransactionError) as exc:
        doomed_rt.delete()
    assert "Entity is required by other entities" in str(exc.value)

    # update rec and change parent
    rec.remove_parent(doomed_rt)
    rec.add_parent(kept_rt)
    rec.update()

    # retrieve old version
    old_rec = retrieve_previous()
    assert old_rec.get_parent(kept_rt) is None
    assert old_rec.get_parent(doomed_rt) is not None

    # remember id and name, then delete the old parent
    doomed_id = doomed_rt.id
    doomed_name = doomed_rt.name
    doomed_rt.delete()

    # retrieve old version, again
    old_rec = retrieve_previous(raise_exception_on_error=False)
    assert old_rec.get_parent(kept_rt) is None
    assert old_rec.get_parent(doomed_id) is not None

    # TODO: What is the desired behavior?
    # Currently, the server doesn't report anything
    # Should we issue a warning?
    assert len(old_rec.get_messages()) == 0

    # fail until resolved
    assert len(old_rec.get_errors()) > 0
    first_error = old_rec.get_errors()[0]
    assert "Entity has unqualified parents" in str(first_error)

    # Another problem is caching. This fails because the parent name is still
    # in the cache.
    assert old_rec.get_parent(doomed_name) is None
@mark.xfail(reason="bug fix needed")
def test_bug_cached_parent_name_in_old_version():
    """The name of a deleted parent must not show up in an old version.

    Marked xfail: per the comment at the final assertion, the parent name
    is still in the cache.
    """
    doomed_rt = c.RecordType(name="TestRT1").insert()
    kept_rt = c.RecordType(name="TestRT2").insert()
    rec = c.Record(name="TestRec").add_parent(doomed_rt)
    rec.insert()

    # update rec and change parent
    rec.remove_parent(doomed_rt)
    rec.add_parent(kept_rt)
    rec.update()

    # delete old parent (keep its name for the lookup below)
    doomed_name = doomed_rt.name
    doomed_rt.delete()

    # retrieve old version
    previous_reference = str(rec.id) + "@HEAD~1"
    old_rec = c.Container().retrieve(previous_reference,
                                     sync=False,
                                     raise_exception_on_error=False)[0]
    assert old_rec.get_parent(kept_rt) is None

    # This fails because the parent name is still in the cache.
    # The name should be forgotten.
    assert old_rec.get_parent(doomed_name) is None
def test_reference_deleted_in_old_version():
    """An old version keeps its reference after the referenced entity is deleted.

    The referenced record can only be deleted once the HEAD version of the
    referencing record no longer points at it. Afterwards, the old version
    still carries the id of the deleted record as property value.
    """
    def find_referencing():
        return c.execute_query(
            "FIND RECORD TestRec2 WHICH REFERENCES {}".format(
                referenced_rec.id),
            unique=True)

    def find_by_text_property():
        return c.execute_query("FIND RECORD WITH TestProp = blablabla",
                               unique=True)

    def retrieve_previous(entity):
        return c.Container().retrieve(str(entity.id) + "@HEAD~1",
                                      sync=False)[0]

    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")
    text_prop = c.Property(name="TestProp", datatype=c.TEXT).insert()

    referenced_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    referenced_rec.insert()

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(text_prop, "blablabla")
    rec.add_property(ref_rt, referenced_rec)
    rec.insert()
    old_version_id = rec.version.id

    head_rec = find_referencing()
    assert head_rec.get_property(text_prop).value == "blablabla"
    assert head_rec.get_property(ref_rt).value == referenced_rec.id

    # deletion of the referenced_rec not possible because rec@HEAD is
    # still pointing at it
    with raises(c.TransactionError) as exc:
        referenced_rec.delete()
    assert "Entity is required by other entities" in str(exc.value)

    # update rec
    rec.remove_property(ref_rt)
    rec.update()

    # the HEAD version does not reference the record anymore
    with raises(c.EntityDoesNotExistError):
        find_referencing()

    head_rec = find_by_text_property()
    assert head_rec.get_property(text_prop).value == "blablabla"
    assert head_rec.get_property(ref_rt) is None
    assert head_rec.version.predecessors[0].id == old_version_id

    # retrieve old version
    old_rec = retrieve_previous(head_rec)
    assert old_rec.version.id == old_version_id
    assert old_rec.version.successors[0].id == head_rec.version.id
    assert old_rec.get_property(text_prop).value == "blablabla"
    assert old_rec.get_property(ref_rt).value == referenced_rec.id

    # deletion of the referenced_rec now possible because rec@HEAD is not
    # pointing at it anymore
    referenced_id = referenced_rec.id
    referenced_rec.delete()

    # still everything ok
    head_rec = find_by_text_property()
    assert head_rec.get_property(text_prop).value == "blablabla"
    assert head_rec.get_property(ref_rt) is None
    assert head_rec.version.predecessors[0].id == old_version_id

    # retrieve old version again. the reference (to the now deleted entity)
    # is still there.
    old_rec = retrieve_previous(head_rec)
    assert old_rec.version.id == old_version_id
    assert old_rec.version.successors[0].id == head_rec.version.id
    assert old_rec.get_property(text_prop).value == "blablabla"
    assert old_rec.get_property(ref_rt).value == referenced_id

    # the referenced entity itself is gone
    with raises(c.EntityDoesNotExistError):
        c.execute_query("FIND ENTITY WITH ID = {}".format(referenced_id),
                        unique=True)

    with raises(c.EntityDoesNotExistError):
        c.Record(id=referenced_id).retrieve()
def test_reference_version_head():
    """A reference to ``<id>@HEAD`` is stored as a reference to a fixed version.

    The stored value names the version which was HEAD at insertion time and
    still names that version after the referenced record has been updated.
    """
    def find_referencing():
        return c.execute_query(
            "FIND TestRec2 WHICH HAS A TestReferencedObject", unique=True)

    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.insert()
    version = versioned_rec.version.id

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id) + "@HEAD")
    rec = rec.insert(sync=False)

    # directly after insertion and after a query: HEAD has been resolved
    assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=version)
    queried = find_referencing()
    assert queried.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=version)

    # now update versioned_rec
    old_head = versioned_rec.version.id
    versioned_rec.description = "new desc"
    versioned_rec.update()

    assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=old_head), "after update still old head"
    queried = find_referencing()
    assert queried.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=old_head), "after query old head"
def test_insert_reference_to_head_in_same_container():
    """Insert a record and a ``@HEAD`` reference to it in a single container.

    Both records get negative ids before insertion, and the reference is
    built from the negative id of the referenced record.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.id = -1

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.id = -2
    rec.add_property(ref_rt, str(versioned_rec.id) + "@HEAD")

    both = c.Container()
    both.extend([versioned_rec, rec])
    both.insert()

    referenced = c.execute_query("FIND Record TestReferencedObject",
                                 unique=True)
    version_id = referenced.version.id

    referencing_query = "FIND Record TestRT WHICH REFERENCES {}".format(
        versioned_rec.id)
    queried = c.execute_query(referencing_query, unique=True)
    assert queried.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=version_id)
def test_update_reference_to_head_minus_one_in_same_container():
    """Update a record and a ``@HEAD~1`` reference to it in one container.

    The referenced record gets a new version within the same transaction,
    so ``@HEAD~1`` has to resolve to the version which was HEAD before the
    update.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")
    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.insert()
    old_head = versioned_rec.version.id

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id))
    rec.insert()

    # now update both
    versioned_rec.description = "new description"
    assert rec.get_property(ref_rt).value == versioned_rec.id, "ref to entity"
    rec.get_property(ref_rt).value = str(versioned_rec.id) + "@HEAD~1"

    container = c.Container()
    container.extend([versioned_rec, rec])
    container.update()

    assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=versioned_rec.id, ver=old_head), "ref to old_head"

    test_rec = c.execute_query(
        "FIND RECORD TestRT WHICH REFERENCES {}".format(versioned_rec.id),
        unique=True)
    # Check the queried entity here, not the local `rec` again; otherwise
    # the query result is never verified.
    assert test_rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=versioned_rec.id, ver=old_head), "after query ref to old_head"
def test_update_reference_to_head_minus_one_in_same_container_2():
    """ This is identical to the previous one with one exception: The
    referenced entity is not being updated during the transaction but it is
    included in the update container. This has caused some buggy behavior in
    the past.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")
    versioned_rec = c.Record(
        name="TestRec1",
        description="v1").add_parent(ref_rt)
    versioned_rec.insert()
    old_head = versioned_rec.version.id

    # update versioned
    versioned_rec.description = "v2"
    versioned_rec.update()

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id))
    rec.insert()

    # now update only the referencing entity.
    assert rec.get_property(ref_rt).value == versioned_rec.id, "ref to entity"
    rec.get_property(ref_rt).value = str(versioned_rec.id) + "@HEAD~1"

    container = c.Container()
    container.extend([versioned_rec, rec])
    container.update()

    assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=versioned_rec.id, ver=old_head), "ref to old_head"

    test_rec = c.execute_query(
        "FIND RECORD TestRT WHICH REFERENCES {}".format(versioned_rec.id),
        unique=True)
    # Check the queried entity here, not the local `rec` again; otherwise
    # the query result is never verified.
    assert test_rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=versioned_rec.id, ver=old_head), "after query ref to old_head"
def test_reference_version_old():
    """Insert a record which references an explicit old version of another.

    The reference ``<id>@<old version id>`` must be kept as it is, both on
    the inserted object and on a queried copy.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")
    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.insert()

    # remember the current head, then update versioned_rec
    old_head = versioned_rec.version.id
    versioned_rec.description = "new desc"
    versioned_rec.update()

    # insert rec which references an old version of versioned_rec
    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id) + "@" + old_head)
    rec = rec.insert(sync=False)
    assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=old_head)

    test_rec = c.execute_query(
        "FIND TestRec2 WHICH HAS A TestReferencedObject", unique=True)
    assert test_rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=old_head)
def test_reference_no_version():
    """A reference given as plain id comes back as that plain id."""
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    referenced = c.Record(name="TestRec1").add_parent(ref_rt)
    referenced.insert()

    referencing = c.Record(name="TestRec2").add_parent(rt)
    referencing.add_property(ref_rt, referenced.id)
    referencing.insert()
    assert referencing.get_property(ref_rt).value == referenced.id

    queried = c.execute_query(
        "FIND TestRec2 WHICH HAS A TestReferencedObject", unique=True)
    assert queried.get_property(ref_rt).value == referenced.id
def test_reference_head_minus_in_separate_container():
    """Reference several versions of one record from a list property.

    A record with three versions is referenced by plain id, ``@HEAD``,
    ``@HEAD~1`` and ``@HEAD~2``. The queried list must contain the plain id
    followed by references to the third, second and first version id.
    """
    insertion("TestRT")

    # create a record with three versions and remember each version id
    rec1 = c.Record("TestRecord1-firstVersion").add_parent("TestRT")
    rec1.description = "This is the first version."
    rec1.insert()
    v1 = rec1.version.id

    rec1.name = "TestRecord1-secondVersion"
    rec1.description = "This is the second version."
    rec1.update()
    v2 = rec1.version.id

    rec1.name = "TestRecord1-thirdVersion"
    rec1.description = "This is the third version."
    rec1.update()
    v3 = rec1.version.id

    rec2 = c.Record("TestRecord2").add_parent("TestRT")
    rec2.description = ("This record has a list of references to several "
                        "versions of TestRecord1. The first references the "
                        "record without specifying the version, the other "
                        "each reference a different version of that record.")
    references = [rec1.id,
                  str(rec1.id) + "@HEAD",
                  str(rec1.id) + "@HEAD~1",
                  str(rec1.id) + "@HEAD~2"]
    rec2.add_property("TestRT", datatype=c.LIST("TestRT"), value=references)
    rec2.insert()

    queried = c.execute_query("FIND TestRecord2", unique=True)
    expected = [rec1.id,
                str(rec1.id) + "@" + v3,
                str(rec1.id) + "@" + v2,
                str(rec1.id) + "@" + v1]
    assert queried.get_property("TestRT").value == expected
def test_properties_no_version():
    """A property attached to a queried RecordType has no version."""
    c.Property("TestProperty", datatype=c.TEXT).insert()
    c.RecordType("TestRT").add_property("TestProperty").insert()

    queried_rt = c.execute_query("FIND TestRT", unique=True)
    attached_property = queried_rt.get_property("TestProperty")
    assert attached_property.version is None
def test_update_name():
    """Rename a RecordType and check the resulting new version.

    After the update only the new name is found by query, and the old name
    remains available through the old version id.
    """
    def retrieve(query):
        # retrieve a single entity with sync=False
        return c.Container().retrieve(query=query, sync=False)[0]

    old_name = "TestRTOldName"
    new_name = "TestRTNewName"
    rt = insertion(old_name)
    version_before = rt.version

    # before the update: only the old name exists
    assert len(c.execute_query("FIND RecordType {}".format(new_name))) == 0
    queried = c.execute_query("FIND RecordType {}".format(old_name),
                              unique=True)
    assert queried.version.id == rt.version.id
    assert queried.version == version_before
    assert queried.name == old_name

    # do the update, run checks again
    rt.name = new_name
    rt.update()
    version_after = rt.version

    assert rt.name == new_name
    assert version_after is not None
    assert version_after.id is not None
    assert version_after.date is not None
    assert version_after != version_before
    assert version_after.date != version_before.date
    assert parse(version_after.date) > parse(version_before.date)

    assert len(c.execute_query("FIND RecordType {}".format(old_name))) == 0
    queried = c.execute_query("FIND RecordType {}".format(new_name),
                              unique=True)
    assert queried.version.id == version_after.id
    assert queried.version == version_after
    assert queried.name == new_name

    # retrieve once again, via id
    by_id = retrieve(str(rt.id))
    assert by_id.version.id == version_after.id
    assert by_id.version == version_after
    assert by_id.name == new_name

    # retrieve old version
    old_reference = str(rt.id) + "@" + version_before.id
    rt_old = retrieve(old_reference)
    assert rt_old.version.id == version_before.id
    assert rt_old.name == old_name
def test_overridden_datatype():
    """A bug in the mysql backend resulted in a server error when the old
    version of an entity was retrieved where the datatype of a property has
    been overridden.

    Original error in the server logs:
    ```
    org.caosdb.server.database.exceptions.TransactionException: java.sql.SQLSyntaxErrorException: Unknown column 'datatypeID' in 'where clause'
        at org.caosdb.server.database.backend.implementation.MySQL.MySQLRetrieveProperties.execute(MySQLRetrieveProperties.java:70)
    ```
    """
    c.Property("TestProperty", datatype=c.TEXT).insert()
    rt = c.RecordType("TestRT")
    # override the TEXT datatype of the property for this RecordType
    rt.add_property("TestProperty", datatype=c.LIST(c.TEXT))
    rt.insert()

    rt.description = "Updated TestRT"
    rt.update()

    # retrieve the old version (cache flag must be set to "false")
    rt_old = c.Container().retrieve(query=str(rt.id) + "@HEAD~1",
                                    flags={"cache": "false"}, sync=False)[0]
    assert rt.get_property("TestProperty").datatype == c.LIST(c.TEXT)
    # Also check the retrieved old version; the assertion above only looks
    # at the local object.
    assert rt_old.get_property("TestProperty").datatype == c.LIST(c.TEXT)
def test_delete_referenced_entity():
    """Delete an entity which only an old version of a record references.

    After the deletion the old version is expected to carry an empty
    reference value together with a warning, while HEAD stays without the
    property.
    """
    def retrieve_uncached(version_suffix):
        # retrieve one version of rec with the cache flag set to "false"
        return c.Container().retrieve(query=str(rec.id) + version_suffix,
                                      flags={"cache": "false"},
                                      sync=False)[0]

    ref_rt = c.RecordType("TestReferencedRT").insert()
    rt = c.RecordType("TestRT").insert()
    ref = c.Record("TestRef1").add_parent(ref_rt).insert()
    rec = c.Record("TestRec").add_parent(rt).add_property(ref_rt, ref).insert()

    # second version of rec: the reference is removed
    rec.description = "v2"
    rec.remove_property(ref_rt)
    rec.update()

    rec_v1 = retrieve_uncached("@HEAD~1")
    assert rec_v1.get_property(ref_rt).value == ref.id

    rec_v2 = retrieve_uncached("@HEAD")
    assert rec_v2.get_property(ref_rt) is None

    ref.delete()

    # the old version lost the value and got a warning instead
    rec_v1 = retrieve_uncached("@HEAD~1")
    old_reference = rec_v1.get_property(ref_rt)
    assert old_reference.value is None
    first_warning = rec_v1.get_property(ref_rt).get_warnings()[0]
    assert first_warning.description == "The referenced entity has been deleted in the mean time and is not longer available."

    rec_v2 = retrieve_uncached("@HEAD")
    assert rec_v2.get_property(ref_rt) is None