Select Git revision
test_records.py
-
Timm Fitschen authored
AGPLv3 Veröffentlichung gemäß Dienstanweisung vom 15. August 2018.
Timm Fitschen authoredAGPLv3 Veröffentlichung gemäß Dienstanweisung vom 15. August 2018.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_version.py 27.33 KiB
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
from pytest import mark, raises
from dateutil.parser import parse
import caosdb as c
def setup():
    """Delete all entities returned by the query ``FIND Test*``."""
    leftovers = c.execute_query("FIND Test*")
    if len(leftovers) == 0:
        # Nothing matched, so there is nothing to delete.
        return
    leftovers.delete()
def teardown():
    """Run the same cleanup as ``setup``."""
    setup()
def test_version_object():
    """Check that ``Version`` can be imported from ``caosdb.common.versioning``."""
    # The import itself is the test; the imported name is not used afterwards.
    from caosdb.common.versioning import Version as _Version  # noqa: F401
def insertion(name="TestRT"):
    """Insert a RecordType and check the version attached to it.

    The RecordType is created with the description ``"TestDescription1"``.
    After insertion its version must have an id and a date, and must have
    neither predecessors nor successors.

    Parameters
    ----------
    name : str
        Name of the RecordType to insert (default ``"TestRT"``).

    Returns
    -------
    The inserted RecordType.
    """
    record_type = c.RecordType(name, description="TestDescription1")
    record_type = record_type.insert()

    fresh = record_type.version
    assert fresh is not None
    assert fresh.id is not None
    assert fresh.date is not None
    assert len(fresh.predecessors) == 0
    assert len(fresh.successors) == 0
    return record_type
def test_retrieve():
    """A queried RecordType carries the same version as the inserted one."""
    expected = insertion().version

    fetched = c.execute_query("FIND RecordType TestRT", unique=True)
    assert parse(fetched.version.date) == parse(expected.date)
    assert fetched.version == expected
def test_update_description():
    """Update a RecordType's description and inspect the resulting versions.

    After the update the entity must carry a different, later-dated version.
    The head is checked through a query and through an id retrieval, and the
    original description must still be retrievable via
    ``<id>@<old version id>``.
    """

    def fetch(spec):
        # Retrieve one entity by "<id>" or "<id>@<version id>".
        return c.Container().retrieve(query=spec, sync=False)[0]

    rt = insertion()
    version_before, desc_before = rt.version, rt.description
    desc_after = "TestDescription2"

    rt.description = desc_after
    rt.update()

    assert rt.description == desc_after
    assert rt.version is not None
    assert rt.version.id is not None
    assert rt.version.date is not None
    assert rt.version != version_before
    assert rt.version.date != version_before.date
    assert parse(rt.version.date) > parse(version_before.date)

    # Head as seen by a query.
    by_query = c.execute_query("FIND RecordType TestRT", unique=True)
    assert by_query.version.id == rt.version.id
    assert by_query.version == rt.version
    assert by_query.description == desc_after

    # Head as seen by an id retrieval.
    by_id = fetch(str(rt.id))
    assert by_id.version.id == rt.version.id
    assert by_id.version == rt.version
    assert by_id.description == desc_after

    # The old version still has the old description.
    previous = fetch("{}@{}".format(rt.id, version_before.id))
    assert previous.version.id == version_before.id
    assert previous.description == desc_before
def test_update_parent():
    """Change the parent of a RecordType twice and check all three versions.

    Version 1 has no parent, version 2 has ``TestRTParent1`` and version 3
    has ``TestRTParent2``. Each version is then retrieved by its version id
    and its parents are checked.
    """

    def fetch(spec):
        # Retrieve one entity by "<id>" or "<id>@<version id>".
        return c.Container().retrieve(query=spec, sync=False)[0]

    parent_a = insertion("TestRTParent1")
    parent_b = insertion("TestRTParent2")
    child = insertion("TestRTChild")
    assert len(child.get_parents()) == 0
    v1 = child.version

    # Second version: attach the first parent.
    child.add_parent(parent_a)
    child.update()
    assert len(child.get_parents()) == 1
    assert child.get_parent("TestRTParent1") is not None
    v2 = child.version

    # Third version: replace the first parent by the second one.
    child.remove_parent(parent_a)
    assert len(child.get_parents()) == 0
    child.add_parent(parent_b)
    child.update()
    assert len(child.get_parents()) == 1
    assert child.get_parent("TestRTParent1") is None
    assert child.get_parent("TestRTParent2") is not None
    v3 = child.version

    # Look at the server side state.
    assert c.execute_query("FIND TestRTParent1", unique=True).id == parent_a.id
    assert len(c.execute_query("FIND TestRTParent2")) == 2
    assert c.execute_query("FIND TestRTChild", unique=True).id == child.id

    head = fetch(str(child.id))
    stored_v1 = fetch("{}@{}".format(child.id, v1.id))
    stored_v2 = fetch("{}@{}".format(child.id, v2.id))
    stored_v3 = fetch("{}@{}".format(child.id, v3.id))

    assert head.version == v3
    assert stored_v1.version.id == v1.id
    assert stored_v2.version.id == v2.id
    assert stored_v3.version.id == v3.id

    assert len(stored_v3.get_parents()) == 1
    assert stored_v3.get_parent("TestRTParent1") is None
    assert stored_v3.get_parent("TestRTParent2") is not None

    assert len(stored_v2.get_parents()) == 1
    assert stored_v2.get_parent("TestRTParent1") is not None
    assert stored_v2.get_parent("TestRTParent2") is None

    assert len(stored_v1.get_parents()) == 0
def test_retrieve_old_version():
    """The pre-update state stays retrievable via ``<id>@<old version id>``."""
    rt = insertion()
    version_before, desc_before = rt.version, rt.description

    rt.description = "TestDescription3"
    rt.update()

    # A plain query returns the updated entity.
    head = c.execute_query("FIND RecordType TestRT", unique=True)
    assert head.description == "TestDescription3"

    # The explicit version reference returns the original state.
    spec = "{}@{}".format(rt.id, version_before.id)
    previous = c.Container().retrieve(query=spec, sync=False)[0]
    assert previous.id == rt.id
    assert previous.description == desc_before
def test_successor():
    """A retrieved old version lists the new head as its first successor."""
    rt = insertion()
    version_before = rt.version

    rt.description = "TestDescription5"
    rt.update()

    head = c.execute_query("FIND RecordType TestRT", unique=True)
    spec = "{}@{}".format(rt.id, version_before.id)
    previous = c.Container().retrieve(query=spec, sync=False)[0]

    first_successor = previous.version.successors[0]
    assert first_successor.id == head.version.id, (
        "old version has successor after retrieval")
def test_predecessor():
    """The head version lists the pre-update version as its first predecessor.

    This is checked twice: on the local object directly after ``update()``
    and on a freshly queried copy of the entity.
    """
    rt = insertion()
    old_version = rt.version

    rt.description = "TestDescription6"
    rt.update()
    assert rt.version.predecessors[0].id == old_version.id, (
        "latest version has predecessor directly after update")

    rt2 = c.execute_query("FIND RecordType TestRT", unique=True)
    assert rt2.version.predecessors[0].id == old_version.id, (
        "latest version has predecessor after retrieval")
def test_retrieve_relative_to_head():
    """Retrieve versions through ``@HEAD`` and ``@HEAD~n`` references.

    Before any update, ``@HEAD`` is the first version and ``@HEAD~1`` does
    not exist. After one update, ``@HEAD`` is the new version, ``@HEAD~1``
    is the first version, and ``@HEAD~2`` does not exist.
    """
    rt = insertion()
    first_version = rt.version

    # retrieve HEAD, both explicitly and implicitly (plain id)
    rt_head = c.Container().retrieve(query=str(rt.id) + "@HEAD",
                                     sync=False)[0]
    rt_head2 = c.Container().retrieve(query=str(rt.id), sync=False)[0]
    # This must be a comparison. Assigning the version here would silently
    # skip the check that both retrieval styles yield the same version.
    assert rt_head2.version == rt_head.version
    assert first_version == rt_head.version, "head is first version"

    # no HEAD~1 before first update
    with raises(c.EntityDoesNotExistError):
        c.Container().retrieve(query=str(rt.id) + "@HEAD~1", sync=False)

    # update
    rt.description = "TestDescription4"
    rt.update()
    new_head_version = rt.version

    assert first_version != new_head_version, (
        "first version is not head anymore")
    assert new_head_version.predecessors[0] == first_version, (
        "first version is predessor of head")

    # retrieve HEAD (which should have changed after the update)
    rt_new_head = c.Container().retrieve(query=str(rt.id) + "@HEAD",
                                         sync=False)[0]
    rt_new_head2 = c.Container().retrieve(query=str(rt.id), sync=False)[0]
    assert rt_new_head2.version == rt_new_head.version
    assert rt_new_head.version == new_head_version, (
        "head is version after update")
    assert rt_new_head.version.predecessors[0] == first_version, (
        "predecessor of head is first version (after update)")

    # retrieve HEAD~1 (the pre-update version)
    rt_pre_head = c.Container().retrieve(query=str(rt.id) + "@HEAD~1",
                                         sync=False)[0]
    assert rt_pre_head.version.id == first_version.id, (
        "head~1 is first version (after update)")
    assert rt_pre_head.version.successors[0].id == rt_new_head.version.id, (
        "successor of head~1 is head")

    # no HEAD~2 after a single update
    with raises(c.EntityDoesNotExistError):
        c.Container().retrieve(query=str(rt.id) + "@HEAD~2", sync=False)
@mark.xfail(reason="bug fix needed")
def test_bug_cached_delete():
    """Expected failure: an old version must vanish with the deleted entity.

    The old version is retrieved once before the entity is deleted (this
    retrieval is what puts ``<id>@<old version>`` into the cache, according
    to the original author's note). Retrieving it again after the deletion
    is expected to raise ``EntityDoesNotExistError``.
    """
    rt = insertion()
    stale_version_id = rt.version.id

    rt.description = "UpdatedDesc"
    rt.update()

    stale_spec = "{}@{}".format(rt.id, stale_version_id)

    # This retrieval is needed for its side effect only; the result itself
    # is not inspected.
    c.Container().retrieve(query=stale_spec, sync=False)[0]

    c.execute_query("FIND RecordType TestRT").delete()

    with raises(c.EntityDoesNotExistError):
        c.Container().retrieve(query=stale_spec, sync=False)[0]
@mark.xfail(reason=("TODO: What is the desired behavior? "
                    "Resolve in versioning phase 10"))
def test_delete_property_used_in_old_version():
    """Expected failure: delete a property that only an old version uses.

    The property cannot be deleted while the head of ``TestRT`` uses it.
    Once the head no longer uses it the deletion goes through, and the old
    version is retrieved again to see what remains of the property.
    """

    def fetch_previous(**extra):
        # Retrieve the version right before the head of ``rt``.
        return c.Container().retrieve(str(rt.id) + "@HEAD~1",
                                      sync=False, **extra)[0]

    kept = c.Property(name="TestProp1", datatype=c.TEXT).insert()
    doomed = c.Property(name="TestProp2", datatype=c.TEXT).insert()

    rt = c.RecordType(name="TestRT")
    rt.add_property(doomed, "blubblub")
    rt.insert()

    # The head of rt uses the property, so it cannot be deleted yet.
    with raises(c.TransactionError) as exc:
        doomed.delete()
    assert "Entity is required by other entities" in str(exc.value)

    # Replace the property in a new version of rt.
    rt.remove_property(doomed)
    rt.add_property(kept, "blablabla")
    rt.update()

    # The old version still has the old property.
    previous = fetch_previous()
    assert previous.get_property(kept) is None
    assert previous.get_property(doomed).value == "blubblub"

    # Delete the property which is used by the old version only.
    doomed_id, doomed_name = doomed.id, doomed.name
    doomed.delete()

    # Retrieve the old version again, tolerating server-side errors.
    previous = fetch_previous(raise_exception_on_error=False)
    assert previous.get_property(kept) is None

    # the value is still there (and the property has nothing but an id)
    assert previous.get_property(doomed_name) is None
    assert previous.get_property(doomed_id).value == "blubblub"

    # TODO: What is the desired behavior?
    # Currently, the server throws an error.
    # Should we make this a warning?
    first_error = previous.get_errors()[0]
    assert "Entity has unqualified properties" in str(first_error)

    # fails until resolved!
    assert len(previous.get_errors()) == 0
@mark.xfail(reason=("TODO: What is the desired behavior? "
                    "Resolve in versioning phase 10"))
def test_delete_parent_used_in_old_version():
    """Expected failure: delete a parent that only an old version uses.

    The parent cannot be deleted while the head of ``TestRec`` has it. Once
    the head has a different parent the deletion goes through, and the old
    version is retrieved again to see what remains of the parent.
    """

    def fetch_previous(**extra):
        # Retrieve the version right before the head of ``rec``.
        return c.Container().retrieve(str(rec.id) + "@HEAD~1",
                                      sync=False, **extra)[0]

    doomed = c.RecordType(name="TestRT1").insert()
    kept = c.RecordType(name="TestRT2").insert()

    rec = c.Record(name="TestRec").add_parent(doomed)
    rec.insert()

    # The head of rec has the parent, so it cannot be deleted yet.
    with raises(c.TransactionError) as exc:
        doomed.delete()
    assert "Entity is required by other entities" in str(exc.value)

    # Switch the parent in a new version of rec.
    rec.remove_parent(doomed)
    rec.add_parent(kept)
    rec.update()

    # The old version still has the old parent.
    previous = fetch_previous()
    assert previous.get_parent(kept) is None
    assert previous.get_parent(doomed) is not None

    doomed_id, doomed_name = doomed.id, doomed.name
    doomed.delete()

    # Retrieve the old version again, tolerating server-side errors.
    previous = fetch_previous(raise_exception_on_error=False)
    assert previous.get_parent(kept) is None
    assert previous.get_parent(doomed_id) is not None

    # TODO: What is the desired behavior?
    # Currently, the server doesn't report anything
    # Should we issue a warning?
    assert len(previous.get_messages()) == 0

    # fail until resolved
    assert len(previous.get_errors()) > 0
    first_error = previous.get_errors()[0]
    assert "Entity has unqualified parents" in str(first_error)

    # Another problem is caching. This fails because the parent name is still
    # in the cache.
    assert previous.get_parent(doomed_name) is None
@mark.xfail(reason="bug fix needed")
def test_bug_cached_parent_name_in_old_version():
    """Expected failure: an old version still knows a deleted parent's name.

    The parent of ``TestRec`` is switched and the former parent is deleted.
    The old version of the record should then no longer resolve the deleted
    parent by name.
    """
    doomed = c.RecordType(name="TestRT1").insert()
    kept = c.RecordType(name="TestRT2").insert()

    rec = c.Record(name="TestRec").add_parent(doomed)
    rec.insert()

    # Switch the parent in a new version of rec.
    rec.remove_parent(doomed)
    rec.add_parent(kept)
    rec.update()

    # Delete the former parent, remembering its name for the final check.
    doomed_name = doomed.name
    doomed.delete()

    # Retrieve the old version, tolerating server-side errors.
    previous = c.Container().retrieve(str(rec.id) + "@HEAD~1",
                                      sync=False,
                                      raise_exception_on_error=False)[0]
    assert previous.get_parent(kept) is None

    # This fails because the parent name is still in the cache.
    # The name should be forgotten.
    assert previous.get_parent(doomed_name) is None
def test_reference_deleted_in_old_version():
    """Delete an entity that is referenced by an old version only.

    ``TestRec2`` references ``TestRec1`` in its first version and drops the
    reference in its second. The referenced record cannot be deleted while
    the head points at it, but it can be deleted afterwards. The old version
    keeps the reference value in either case.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")
    p = c.Property(name="TestProp", datatype=c.TEXT).insert()

    referenced_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    referenced_rec.insert()

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(p, "blablabla")
    rec.add_property(ref_rt, referenced_rec)
    rec.insert()
    old_version = rec.version.id

    def check_head():
        # The head has the text property but no reference, and it descends
        # from the first version.
        head = c.execute_query("FIND RECORD WITH TestProp = blablabla",
                               unique=True)
        assert head.get_property(p).value == "blablabla"
        assert head.get_property(ref_rt) is None
        assert head.version.predecessors[0].id == old_version
        return head

    def check_previous(head, expected_reference):
        # The version before the head still holds the reference value.
        previous = c.Container().retrieve(str(head.id) + "@HEAD~1",
                                          sync=False)[0]
        assert previous.version.id == old_version
        assert previous.version.successors[0].id == head.version.id
        assert previous.get_property(p).value == "blablabla"
        assert previous.get_property(ref_rt).value == expected_reference

    referencing_query = "FIND RECORD TestRec2 WHICH REFERENCES {}"

    test_rec = c.execute_query(referencing_query.format(referenced_rec.id),
                               unique=True)
    assert test_rec.get_property(p).value == "blablabla"
    assert test_rec.get_property(ref_rt).value == referenced_rec.id

    # deletion of the referenced_rec not possible because rec@HEAD is
    # still pointing at it
    with raises(c.TransactionError) as exc:
        referenced_rec.delete()
    assert "Entity is required by other entities" in str(exc.value)

    # Drop the reference in a new version of rec.
    rec.remove_property(ref_rt)
    rec.update()

    # The head is not found by the reference query anymore.
    with raises(c.EntityDoesNotExistError):
        c.execute_query(referencing_query.format(referenced_rec.id),
                        unique=True)

    test_rec = check_head()
    check_previous(test_rec, referenced_rec.id)

    # deletion of the referenced_rec now possible because rec@HEAD is not
    # pointing at it anymore
    referenced_id = referenced_rec.id
    referenced_rec.delete()

    # still everything ok
    test_rec = check_head()

    # retrieve old version again. the reference (to the now deleted entity)
    # is still there.
    check_previous(test_rec, referenced_id)

    # The referenced entity itself is gone.
    with raises(c.EntityDoesNotExistError):
        c.execute_query("FIND ENTITY WITH ID = {}".format(referenced_id),
                        unique=True)
    with raises(c.EntityDoesNotExistError):
        c.Record(id=referenced_id).retrieve()
def test_reference_version_head():
    """A reference inserted as ``<id>@HEAD`` is stored with a version id.

    The stored value is ``<id>@<version id at insertion time>``. It is
    expected to stay the same after the referenced record is updated.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.insert()
    version = versioned_rec.version.id

    def pinned(version_id):
        # Reference value as it is expected to be stored.
        return "{id}@{ver}".format(id=str(versioned_rec.id), ver=version_id)

    def query_referencing():
        return c.execute_query(
            "FIND TestRec2 WHICH HAS A TestReferencedObject", unique=True)

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id) + "@HEAD")
    rec = rec.insert(sync=False)
    assert rec.get_property(ref_rt).value == pinned(version)

    test_rec = query_referencing()
    assert test_rec.get_property(ref_rt).value == pinned(version)

    # Update the referenced record; the reference must not follow the head.
    old_head = versioned_rec.version.id
    versioned_rec.description = "new desc"
    versioned_rec.update()

    assert rec.get_property(ref_rt).value == pinned(old_head), (
        "after update still old head")

    test_rec = query_referencing()
    assert test_rec.get_property(ref_rt).value == pinned(old_head), (
        "after query old head")
def test_insert_reference_to_head_in_same_container():
    """Insert a record and an ``@HEAD`` reference to it in one container.

    Both records get negative ids before insertion, and the reference is
    written as ``<negative id>@HEAD``. After the insertion the reference must
    read ``<id>@<version id of the referenced record>``.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.id = -1

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.id = -2
    rec.add_property(ref_rt, "{}@HEAD".format(versioned_rec.id))

    both = c.Container()
    both.extend([versioned_rec, rec])
    both.insert()

    referenced = c.execute_query("FIND Record TestReferencedObject",
                                 unique=True)
    version_id = referenced.version.id

    query = "FIND Record TestRT WHICH REFERENCES {}".format(versioned_rec.id)
    test_rec = c.execute_query(query, unique=True)

    expected = "{id}@{ver}".format(id=str(versioned_rec.id), ver=version_id)
    assert test_rec.get_property(ref_rt).value == expected
def test_update_reference_to_head_minus_one_in_same_container():
    """Update a record and an ``@HEAD~1`` reference to it in one container.

    The referenced record gets a new description and the referencing record
    changes its reference to ``<id>@HEAD~1``; both are updated in the same
    container. Afterwards the reference must point to the version the
    referenced record had before the update. This is checked on the local
    object and on a freshly queried copy.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.insert()
    old_head = versioned_rec.version.id

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id))
    rec.insert()

    # now update both
    versioned_rec.description = "new description"
    assert rec.get_property(ref_rt).value == versioned_rec.id, "ref to entity"
    rec.get_property(ref_rt).value = str(versioned_rec.id) + "@HEAD~1"

    container = c.Container()
    container.extend([versioned_rec, rec])
    container.update()

    expected = "{id}@{ver}".format(id=versioned_rec.id, ver=old_head)
    assert rec.get_property(ref_rt).value == expected, "ref to old_head"

    test_rec = c.execute_query(
        "FIND RECORD TestRT WHICH REFERENCES {}".format(versioned_rec.id),
        unique=True)
    # Check the queried entity here, not the local one again; otherwise the
    # query result is never verified.
    assert test_rec.get_property(ref_rt).value == expected, (
        "after query ref to old_head")
def test_update_reference_to_head_minus_one_in_same_container_2():
    """ This is identical to the previous one with one exception: The
    referenced entity is not being updated during the transaction but it is
    included in the update container. This has caused some buggy behavior in
    the past.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    versioned_rec = c.Record(
        name="TestRec1",
        description="v1").add_parent(ref_rt)
    versioned_rec.insert()
    old_head = versioned_rec.version.id

    # update versioned
    versioned_rec.description = "v2"
    versioned_rec.update()

    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id))
    rec.insert()

    # now update only the referencing entity.
    assert rec.get_property(ref_rt).value == versioned_rec.id, "ref to entity"
    rec.get_property(ref_rt).value = str(versioned_rec.id) + "@HEAD~1"

    container = c.Container()
    container.extend([versioned_rec, rec])
    container.update()

    expected = "{id}@{ver}".format(id=versioned_rec.id, ver=old_head)
    assert rec.get_property(ref_rt).value == expected, "ref to old_head"

    test_rec = c.execute_query(
        "FIND RECORD TestRT WHICH REFERENCES {}".format(versioned_rec.id),
        unique=True)
    # Check the queried entity here, not the local one again; otherwise the
    # query result is never verified.
    assert test_rec.get_property(ref_rt).value == expected, (
        "after query ref to old_head")
def test_reference_version_old():
    """Insert a record that references an explicit old version.

    The referenced record is updated once, then a new record references its
    first version as ``<id>@<version id>``. The stored value must be exactly
    that string, both on the inserted object and on a queried copy.
    """
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
    versioned_rec.insert()

    # now update versioned_rec
    old_head = versioned_rec.version.id
    versioned_rec.description = "new desc"
    versioned_rec.update()

    # insert rec which references an old version of versioned_rec
    rec = c.Record(name="TestRec2").add_parent(rt)
    rec.add_property(ref_rt, str(versioned_rec.id) + "@" + old_head)
    rec = rec.insert(sync=False)

    assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=old_head)

    test_rec = c.execute_query(
        "FIND TestRec2 WHICH HAS A TestReferencedObject", unique=True)
    assert test_rec.get_property(ref_rt).value == "{id}@{ver}".format(
        id=str(versioned_rec.id), ver=old_head)
def test_reference_no_version():
    """A reference given as a plain id is stored as that plain id."""
    ref_rt = insertion("TestReferencedObject")
    rt = insertion("TestRT")

    target = c.Record(name="TestRec1").add_parent(ref_rt)
    target.insert()

    referencing = c.Record(name="TestRec2").add_parent(rt)
    referencing.add_property(ref_rt, target.id)
    referencing.insert()
    assert referencing.get_property(ref_rt).value == target.id

    # The queried copy must show the same plain id.
    queried = c.execute_query(
        "FIND TestRec2 WHICH HAS A TestReferencedObject", unique=True)
    assert queried.get_property(ref_rt).value == target.id
def test_reference_head_minus_in_separate_container():
    """Reference ``@HEAD``, ``@HEAD~1`` and ``@HEAD~2`` of an existing record.

    ``TestRecord1`` is brought to its third version first. ``TestRecord2`` is
    then inserted with a list of references to it: one without a version and
    one for each relative version. The queried list must contain the plain
    id followed by the three explicit version ids, newest first.
    """
    insertion("TestRT")

    rec1 = c.Record("TestRecord1-firstVersion").add_parent("TestRT")
    rec1.description = "This is the first version."
    rec1.insert()
    v1 = rec1.version.id

    rec1.name = "TestRecord1-secondVersion"
    rec1.description = "This is the second version."
    rec1.update()
    v2 = rec1.version.id

    rec1.name = "TestRecord1-thirdVersion"
    rec1.description = "This is the third version."
    rec1.update()
    v3 = rec1.version.id

    rec2 = c.Record("TestRecord2").add_parent("TestRT")
    rec2.description = ("This record has a list of references to several "
                        "versions of TestRecord1. The first references the "
                        "record without specifying the version, the other "
                        "each reference a different version of that record.")
    relative = [str(rec1.id) + suffix
                for suffix in ("@HEAD", "@HEAD~1", "@HEAD~2")]
    rec2.add_property("TestRT", datatype=c.LIST("TestRT"),
                      value=[rec1.id] + relative)
    rec2.insert()

    test_rec = c.execute_query("FIND TestRecord2", unique=True)
    resolved = [str(rec1.id) + "@" + version_id
                for version_id in (v3, v2, v1)]
    assert test_rec.get_property("TestRT").value == [rec1.id] + resolved
def test_properties_no_version():
    """A property attached to a retrieved RecordType has no version."""
    prop = c.Property("TestProperty", datatype=c.TEXT)
    prop.insert()

    record_type = c.RecordType("TestRT").add_property("TestProperty")
    record_type.insert()

    fetched = c.execute_query("FIND TestRT", unique=True)
    attached = fetched.get_property("TestProperty")
    assert attached.version is None
def test_update_name():
    """Rename a RecordType and inspect the resulting versions.

    Before the update only the old name is found. After the update only the
    new name is found, the version is different and later-dated, and the old
    name is still retrievable via ``<id>@<old version id>``.
    """

    def find(name, **options):
        return c.execute_query("FIND RecordType {}".format(name), **options)

    def fetch(spec):
        # Retrieve one entity by "<id>" or "<id>@<version id>".
        return c.Container().retrieve(query=spec, sync=False)[0]

    old_name, new_name = "TestRTOldName", "TestRTNewName"
    rt = insertion(old_name)
    version_before = rt.version

    # State before the update.
    assert len(find(new_name)) == 0
    by_query = find(old_name, unique=True)
    assert by_query.version.id == rt.version.id
    assert by_query.version == version_before
    assert by_query.name == old_name

    # do the update, run checks again
    rt.name = new_name
    rt.update()

    assert rt.name == new_name
    assert rt.version is not None
    assert rt.version.id is not None
    assert rt.version.date is not None
    assert rt.version != version_before
    assert rt.version.date != version_before.date
    assert parse(rt.version.date) > parse(version_before.date)

    assert len(find(old_name)) == 0
    by_query = find(new_name, unique=True)
    assert by_query.version.id == rt.version.id
    assert by_query.version == rt.version
    assert by_query.name == new_name

    # retrieve once again, via id
    by_id = fetch(str(rt.id))
    assert by_id.version.id == rt.version.id
    assert by_id.version == rt.version
    assert by_id.name == new_name

    # retrieve old version
    previous = fetch("{}@{}".format(rt.id, version_before.id))
    assert previous.version.id == version_before.id
    assert previous.name == old_name
def test_overridden_datatype():
    """A bug in the mysql backend resulted in a server error when the old
    version of an entity was retrieved where the datatype of a property has
    been overridden.

    Original error in the server logs:
    ```
    org.caosdb.server.database.exceptions.TransactionException: java.sql.SQLSyntaxErrorException: Unknown column 'datatypeID' in 'where clause'
         at org.caosdb.server.database.backend.implementation.MySQL.MySQLRetrieveProperties.execute(MySQLRetrieveProperties.java:70)
    ```
    """
    c.Property("TestProperty", datatype=c.TEXT).insert()

    # The RecordType overrides the property's datatype (TEXT -> LIST<TEXT>).
    rt = c.RecordType("TestRT")
    rt.add_property("TestProperty", datatype=c.LIST(c.TEXT))
    rt.insert()

    rt.description = "Updated TestRT"
    rt.update()

    # retrieve the old version (cache flag must be set to "false")
    rt_old = c.Container().retrieve(query=str(rt.id) + "@HEAD~1",
                                    flags={"cache": "false"}, sync=False)[0]

    assert rt.get_property("TestProperty").datatype == c.LIST(c.TEXT)
    # Also inspect the retrieved old version; without this the retrieval
    # result would never be looked at.
    assert rt_old.get_property("TestProperty").datatype == c.LIST(c.TEXT)
def test_delete_referenced_entity():
    """Delete an entity that only an old version of a record references.

    Before the deletion the old version shows the reference's id. After the
    deletion the reference value is ``None`` and the property carries a
    warning. The head never has the reference property.
    """

    def fetch_uncached(relative_version):
        # Retrieve ``rec`` at the given version with the cache switched off.
        return c.Container().retrieve(
            query=str(rec.id) + "@" + relative_version,
            flags={"cache": "false"}, sync=False)[0]

    ref_rt = c.RecordType("TestReferencedRT").insert()
    rt = c.RecordType("TestRT").insert()

    ref = c.Record("TestRef1").add_parent(ref_rt).insert()

    rec = c.Record("TestRec")
    rec = rec.add_parent(rt)
    rec = rec.add_property(ref_rt, ref)
    rec = rec.insert()

    # Second version: the reference is removed.
    rec.description = "v2"
    rec.remove_property(ref_rt)
    rec.update()

    rec_v1 = fetch_uncached("HEAD~1")
    assert rec_v1.get_property(ref_rt).value == ref.id

    rec_v2 = fetch_uncached("HEAD")
    assert rec_v2.get_property(ref_rt) is None

    ref.delete()

    rec_v1 = fetch_uncached("HEAD~1")
    dangling = rec_v1.get_property(ref_rt)
    assert dangling.value is None
    first_warning = dangling.get_warnings()[0]
    assert first_warning.description == "The referenced entity has been deleted in the mean time and is not longer available."

    rec_v2 = fetch_uncached("HEAD")
    assert rec_v2.get_property(ref_rt) is None