Skip to content
Snippets Groups Projects
Select Git revision
  • deaa3317df6bf3233090cfe2d6b4a40044417f98
  • main default protected
  • dev protected
  • f-fix-accent-sensitivity
  • f-filesystem-import
  • f-update-acl
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-string-ids
  • f-filesystem-main
  • f-multipart-encoding
  • f-trigger-advanced-user-tools
  • f-real-rename-test-pylibsolo2
  • f-real-rename-test-pylibsolo
  • f-real-rename-test
  • f-linkahead-rename
  • f-reference-record
  • f-xml-serialization
  • f-xfail-server-181
  • linkahead-pylib-v0.18.0
  • linkahead-control-v0.16.0
  • linkahead-pylib-v0.17.0
  • linkahead-mariadbbackend-v8.0.0
  • linkahead-server-v0.13.0
  • caosdb-pylib-v0.15.0
  • caosdb-pylib-v0.14.0
  • caosdb-pylib-v0.13.2
  • caosdb-server-v0.12.1
  • caosdb-pylib-v0.13.1
  • caosdb-pylib-v0.12.0
  • caosdb-server-v0.10.0
  • caosdb-pylib-v0.11.1
  • caosdb-pylib-v0.11.0
  • caosdb-server-v0.9.0
  • caosdb-pylib-v0.10.0
  • caosdb-server-v0.8.1
  • caosdb-pylib-v0.8.0
  • caosdb-server-v0.8.0
  • caosdb-pylib-v0.7.2
41 results

test_version.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_version.py 27.33 KiB
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
    # Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    from pytest import mark, raises
    from dateutil.parser import parse
    import caosdb as c
    
    
    def setup():
        """Delete every entity matched by ``FIND Test*`` from the server.

        Does nothing when the query returns no entities.
        """
        # NOTE(review): plain ``setup``/``teardown`` module functions look like
        # nose-style hooks; confirm that the pytest version in use still
        # calls them.
        leftovers = c.execute_query("FIND Test*")
        if len(leftovers) == 0:
            return
        leftovers.delete()
    
    
    def teardown():
        """Clean up by running the same purge as ``setup``."""
        setup()
    
    
    def test_version_object():
        """Check that ``Version`` can be imported from the versioning module."""
        from caosdb.common.versioning import Version
    
    
    def insertion(name="TestRT"):
        """Insert a RecordType called ``name`` and check its initial version.

        The inserted entity must carry a version with an id and a date, and
        that version must have neither predecessors nor successors.

        Returns the inserted RecordType.
        """
        inserted = c.RecordType(name, description="TestDescription1").insert()
        initial = inserted.version
        assert initial is not None
        assert initial.id is not None
        assert initial.date is not None
        assert len(initial.predecessors) == 0
        assert len(initial.successors) == 0
        return inserted
    
    
    def test_retrieve():
        """A queried entity carries the same version as the inserted one."""
        inserted_version = insertion().version

        retrieved = c.execute_query("FIND RecordType TestRT", unique=True)
        retrieved_version = retrieved.version
        assert parse(retrieved_version.date) == parse(inserted_version.date)
        assert retrieved_version == inserted_version
    
    
    def test_update_description():
        """Updating the description yields a new, later version.

        The new version is checked on the updated object, after a query and
        after a retrieval by id. The old version must still be retrievable
        with the old description.
        """
        rt = insertion()
        version_before = rt.version
        desc_before = rt.description
        desc_after = "TestDescription2"

        rt.description = desc_after
        rt.update()

        version_after = rt.version
        assert rt.description == desc_after
        assert version_after is not None
        assert version_after.id is not None
        assert version_after.date is not None
        assert version_after != version_before
        assert version_after.date != version_before.date
        assert parse(version_after.date) > parse(version_before.date)

        # retrieve via query
        by_query = c.execute_query("FIND RecordType TestRT", unique=True)
        assert by_query.version.id == version_after.id
        assert by_query.version == version_after
        assert by_query.description == desc_after

        # retrieve via id
        by_id = c.Container().retrieve(query=str(rt.id), sync=False)[0]
        assert by_id.version.id == version_after.id
        assert by_id.version == version_after
        assert by_id.description == desc_after

        # retrieve old version
        old_version_query = str(rt.id) + "@" + version_before.id
        rt_old = c.Container().retrieve(query=old_version_query,
                                        sync=False)[0]
        assert rt_old.version.id == version_before.id
        assert rt_old.description == desc_before
    
    
    def test_update_parent():
        """Each parent change yields a version with its own set of parents.

        The child starts without parents, then gets ``TestRTParent1``, which
        is then replaced by ``TestRTParent2``. Afterwards HEAD and all three
        versions are retrieved and their parents are checked.
        """
        par1 = insertion("TestRTParent1")
        par2 = insertion("TestRTParent2")
        child = insertion("TestRTChild")

        def at_version(version):
            # retrieve ``child`` at the given version, without syncing
            versioned_id = str(child.id) + "@" + version.id
            return c.Container().retrieve(query=versioned_id, sync=False)[0]

        # first version: no parents
        assert len(child.get_parents()) == 0
        first_version = child.version

        # second version: parent 1 only
        child.add_parent(par1)
        child.update()
        assert len(child.get_parents()) == 1
        assert child.get_parent("TestRTParent1") is not None
        second_version = child.version

        # third version: parent 2 only
        child.remove_parent(par1)
        assert len(child.get_parents()) == 0
        child.add_parent(par2)
        child.update()
        assert len(child.get_parents()) == 1
        assert child.get_parent("TestRTParent1") is None
        assert child.get_parent("TestRTParent2") is not None
        third_version = child.version

        # now retrieve and look again
        assert c.execute_query("FIND TestRTParent1", unique=True).id == par1.id
        assert len(c.execute_query("FIND TestRTParent2")) == 2
        assert c.execute_query("FIND TestRTChild", unique=True).id == child.id

        rt_head = c.Container().retrieve(query=str(child.id), sync=False)[0]
        rt_v1, rt_v2, rt_v3 = [
            at_version(version)
            for version in (first_version, second_version, third_version)
        ]

        assert rt_head.version == third_version
        assert rt_v1.version.id == first_version.id
        assert rt_v2.version.id == second_version.id
        assert rt_v3.version.id == third_version.id

        # third version has parent 2 only
        assert len(rt_v3.get_parents()) == 1
        assert rt_v3.get_parent("TestRTParent1") is None
        assert rt_v3.get_parent("TestRTParent2") is not None

        # second version has parent 1 only
        assert len(rt_v2.get_parents()) == 1
        assert rt_v2.get_parent("TestRTParent1") is not None
        assert rt_v2.get_parent("TestRTParent2") is None

        # first version has no parents
        assert len(rt_v1.get_parents()) == 0
    
    
    def test_retrieve_old_version():
        """After an update the first version keeps its original description."""
        rt = insertion()
        first_version_id = rt.version.id
        first_description = rt.description

        rt.description = "TestDescription3"
        rt.update()

        # HEAD carries the new description
        head = c.execute_query("FIND RecordType TestRT", unique=True)
        assert head.description == "TestDescription3"

        # the first version is still retrievable via id@version
        versioned_id = str(rt.id) + "@" + first_version_id
        rt_old = c.Container().retrieve(query=versioned_id, sync=False)[0]
        assert rt_old.id == rt.id
        assert rt_old.description == first_description
    
    
    def test_successor():
        """The retrieved old version lists the new HEAD as its successor."""
        rt = insertion()
        first_version_id = rt.version.id

        rt.description = "TestDescription5"
        rt.update()

        head = c.execute_query("FIND RecordType TestRT", unique=True)

        versioned_id = str(rt.id) + "@" + first_version_id
        rt_old = c.Container().retrieve(query=versioned_id, sync=False)[0]

        successor = rt_old.version.successors[0]
        assert successor.id == head.version.id, (
            "old version has successor after retrieval")
    
    
    def test_predecessor():
        """The HEAD version lists the previous version as its predecessor.

        Checked both on the locally updated object and on a freshly queried
        one.
        """
        rt = insertion()
        old_version = rt.version

        rt.description = "TestDescription6"
        rt.update()

        assert rt.version.predecessors[0].id == old_version.id, (
            "latest version has predecessor directly after update")

        rt2 = c.execute_query("FIND RecordType TestRT", unique=True)

        assert rt2.version.predecessors[0].id == old_version.id, (
            "latest version has predecessor after retrieval")
    
    
    def test_retrieve_relative_to_head():
        """Check retrieval relative to HEAD: ``@HEAD``, ``@HEAD~1``, ``@HEAD~2``.

        Before the first update only ``@HEAD`` exists and equals the first
        version. After one update ``@HEAD`` is the new version, ``@HEAD~1`` is
        the first version, and ``@HEAD~2`` does not exist.
        """
        rt = insertion()
        first_version = rt.version

        # retrieve HEAD, explicitly and implicitly (plain id); both must agree
        rt_head = c.Container().retrieve(query=str(rt.id) + "@HEAD",
                                         sync=False)[0]
        rt_head2 = c.Container().retrieve(query=str(rt.id), sync=False)[0]
        assert rt_head2.version == rt_head.version
        assert first_version == rt_head.version, "head is first version"

        # no HEAD~1 before first update
        with raises(c.EntityDoesNotExistError):
            c.Container().retrieve(query=str(rt.id) + "@HEAD~1", sync=False)

        # update
        rt.description = "TestDescription4"
        rt.update()

        new_head_version = rt.version
        assert first_version != new_head_version, (
            "first version is not head anymore")
        assert new_head_version.predecessors[0] == first_version, (
            "first version is predessor of head")

        # retrieve HEAD (which should have changed after the update)
        rt_new_head = c.Container().retrieve(query=str(rt.id) + "@HEAD",
                                             sync=False)[0]
        rt_new_head2 = c.Container().retrieve(query=str(rt.id), sync=False)[0]
        assert rt_new_head2.version == rt_new_head.version
        assert rt_new_head.version == new_head_version, (
            "head is version after update")
        assert rt_new_head.version.predecessors[0] == first_version, (
            "predecessor of head is first version (after update)")

        # retrieve HEAD~1 (the pre-update version)
        rt_pre_head = c.Container().retrieve(query=str(rt.id) + "@HEAD~1",
                                             sync=False)[0]
        assert rt_pre_head.version.id == first_version.id, (
            "head~1 is first version (after update)")
        assert rt_pre_head.version.successors[0].id == rt_new_head.version.id, (
            "successor of head~1 is head")

        # no HEAD~2 after a single update
        with raises(c.EntityDoesNotExistError):
            c.Container().retrieve(query=str(rt.id) + "@HEAD~2", sync=False)
    
    
    @mark.xfail(reason="bug fix needed")
    def test_bug_cached_delete():
        """Retrieving an old version of a deleted entity must fail.

        The old version is retrieved once before the entity is deleted, and
        the same retrieval is expected to raise afterwards.
        """
        rt = insertion()
        first_version_id = rt.version.id

        rt.description = "UpdatedDesc"
        rt.update()

        versioned_id = str(rt.id) + "@" + first_version_id

        # now id@old_version is cached...
        cached = c.Container().retrieve(query=versioned_id, sync=False)[0]

        c.execute_query("FIND RecordType TestRT").delete()

        with raises(c.EntityDoesNotExistError) as exc:
            c.Container().retrieve(query=versioned_id, sync=False)[0]
    
    
    @mark.xfail(reason=("TODO: What is the desired behavior? "
                        "Resolve in versioning phase 10"))
    def test_delete_property_used_in_old_version():
        """Delete a property that is only used by an old version of an entity.

        The property cannot be deleted while HEAD uses it. After HEAD has been
        updated to drop it, the deletion goes through and the old version is
        retrieved again. The final assertions contradict each other on
        purpose, so the test is marked xfail until the behavior is defined.
        """
        p = c.Property(name="TestProp1", datatype=c.TEXT).insert()
        del_p = c.Property(name="TestProp2", datatype=c.TEXT).insert()
        rt = c.RecordType(name="TestRT")
        rt.add_property(del_p, "blubblub")
        rt.insert()
    
        # can't delete the property used by rt@HEAD
        with raises(c.TransactionError) as exc:
            del_p.delete()
        assert "Entity is required by other entities" in str(exc.value)
    
        # now update rt and remove the property which is to be deleted
        rt.remove_property(del_p)
        rt.add_property(p, "blablabla")
        rt.update()
    
        # retrieve and check old version
        old_rt = c.Container().retrieve(str(rt.id) + "@HEAD~1",
                                        sync=False)[0]
        assert old_rt.get_property(p) is None
        assert old_rt.get_property(del_p).value == "blubblub"
    
        # delete the property used by old_rt; keep its id and name for the
        # look-ups below
        del_p_id = del_p.id
        del_p_name = del_p.name
        del_p.delete()
    
        # retrieve old version again
        old_rt = c.Container().retrieve(str(rt.id) + "@HEAD~1",
                                        sync=False,
                                        raise_exception_on_error=False)[0]
        assert old_rt.get_property(p) is None
    
        # the value is still there (and the property has nothing but an id)
        assert old_rt.get_property(del_p_name) is None
        assert old_rt.get_property(del_p_id).value == "blubblub"
    
        # TODO: What is the desired behavior?
        # Currently, the server throws an error.
        # Should we make this a warning?
        assert "Entity has unqualified properties" in str(old_rt.get_errors()[0])
    
        # fails until resolved!
        assert len(old_rt.get_errors()) == 0
    
    
    @mark.xfail(reason=("TODO: What is the desired behavior? "
                        "Resolve in versioning phase 10"))
    def test_delete_parent_used_in_old_version():
        """Delete a parent that is only used by an old version of a record.

        The parent cannot be deleted while HEAD uses it. After HEAD has been
        moved to another parent, the deletion goes through and the old version
        is retrieved again. Marked xfail until the behavior is defined.
        """
        del_rt = c.RecordType(name="TestRT1").insert()
        rt = c.RecordType(name="TestRT2").insert()
    
        rec = c.Record(name="TestRec").add_parent(del_rt)
        rec.insert()
    
        # can't delete the parent used by rec@HEAD
        with raises(c.TransactionError) as exc:
            del_rt.delete()
        assert "Entity is required by other entities" in str(exc.value)
    
        # update rec and change parent
        rec.remove_parent(del_rt)
        rec.add_parent(rt)
        rec.update()
    
        # retrieve old version
        old_rec = c.Container().retrieve(str(rec.id) + "@HEAD~1",
                                         sync=False)[0]
        assert old_rec.get_parent(rt) is None
        assert old_rec.get_parent(del_rt) is not None
    
        # keep id and name of the parent for the look-ups after its deletion
        del_rt_id = del_rt.id
        del_rt_name = del_rt.name
        del_rt.delete()
    
        # retrieve old version, again
        old_rec = c.Container().retrieve(str(rec.id) + "@HEAD~1",
                                         sync=False,
                                         raise_exception_on_error=False)[0]
    
        assert old_rec.get_parent(rt) is None
        assert old_rec.get_parent(del_rt_id) is not None
    
        # TODO: What is the desired behavior?
        # Currently, the server doesn't report anything
        # Should we issue a warning?
        assert len(old_rec.get_messages()) == 0
    
        # fail until resolved
        assert len(old_rec.get_errors()) > 0
        assert "Entity has unqualified parents" in str(old_rec.get_errors()[0])
    
        # Another problem is caching. This fails because the parent name is still
        # in the cache.
        assert old_rec.get_parent(del_rt_name) is None
    
    
    @mark.xfail(reason="bug fix needed")
    def test_bug_cached_parent_name_in_old_version():
        """The old version must not report the name of a deleted parent."""
        deleted_parent = c.RecordType(name="TestRT1").insert()
        new_parent = c.RecordType(name="TestRT2").insert()

        rec = c.Record(name="TestRec").add_parent(deleted_parent)
        rec.insert()

        # update rec and change parent
        rec.remove_parent(deleted_parent)
        rec.add_parent(new_parent)
        rec.update()

        # delete old parent, keeping its name for the look-up below
        deleted_parent_name = deleted_parent.name
        deleted_parent.delete()

        # retrieve old version
        previous_version_id = str(rec.id) + "@HEAD~1"
        old_rec = c.Container().retrieve(previous_version_id,
                                         sync=False,
                                         raise_exception_on_error=False)[0]

        assert old_rec.get_parent(new_parent) is None

        # This fails because the parent name is still in the cache.
        # The name should be forgotten.
        assert old_rec.get_parent(deleted_parent_name) is None
    
    
    def test_reference_deleted_in_old_version():
        """Delete an entity that is only referenced by an old version.

        While HEAD references the entity, deleting it raises a
        TransactionError. After the reference has been removed from HEAD the
        deletion goes through, and the old version still carries the id of
        the deleted entity as the property value.
        """
        ref_rt = insertion("TestReferencedObject")
        rt = insertion("TestRT")
        p = c.Property(name="TestProp", datatype=c.TEXT).insert()
    
        referenced_rec = c.Record(name="TestRec1").add_parent(ref_rt)
        referenced_rec.insert()
    
        rec = c.Record(name="TestRec2").add_parent(rt)
        rec.add_property(p, "blablabla")
        rec.add_property(ref_rt, referenced_rec)
        rec.insert()
        old_version = rec.version.id
    
        test_rec = c.execute_query(
            "FIND RECORD TestRec2 WHICH REFERENCES {}".format(referenced_rec.id),
            unique=True)
        assert test_rec.get_property(p).value == "blablabla"
        assert test_rec.get_property(ref_rt).value == referenced_rec.id
    
        # deletion of the referenced_rec not possible because rec@HEAD is
        # still pointing at it
        with raises(c.TransactionError) as exc:
            referenced_rec.delete()
        assert "Entity is required by other entities" in str(exc.value)
    
        # update rec
        rec.remove_property(ref_rt)
        rec.update()
    
        # HEAD no longer references referenced_rec
        with raises(c.EntityDoesNotExistError) as exc:
            c.execute_query(
                "FIND RECORD TestRec2 WHICH REFERENCES {}".format(
                    referenced_rec.id),
                unique=True)
    
        test_rec = c.execute_query("FIND RECORD WITH TestProp = blablabla",
                                   unique=True)
        assert test_rec.get_property(p).value == "blablabla"
        assert test_rec.get_property(ref_rt) is None
        assert test_rec.version.predecessors[0].id == old_version
    
        # retrieve old version
        old_rec = c.Container().retrieve(str(test_rec.id) + "@HEAD~1",
                                         sync=False)[0]
        assert old_rec.version.id == old_version
        assert old_rec.version.successors[0].id == test_rec.version.id
        assert old_rec.get_property(p).value == "blablabla"
        assert old_rec.get_property(ref_rt).value == referenced_rec.id
    
        # deletion of the referenced_rec now possible because rec@HEAD is not
        # pointing at it anymore
        referenced_id = referenced_rec.id
        referenced_rec.delete()
    
        # still everything ok
        test_rec = c.execute_query("FIND RECORD WITH TestProp = blablabla",
                                   unique=True)
        assert test_rec.get_property(p).value == "blablabla"
        assert test_rec.get_property(ref_rt) is None
        assert test_rec.version.predecessors[0].id == old_version
    
        # retrieve old version again. the reference (to the now deleted entity)
        # is still there.
        old_rec = c.Container().retrieve(str(test_rec.id) + "@HEAD~1",
                                         sync=False)[0]
        assert old_rec.version.id == old_version
        assert old_rec.version.successors[0].id == test_rec.version.id
        assert old_rec.get_property(p).value == "blablabla"
        assert old_rec.get_property(ref_rt).value == referenced_id
    
        # the deleted entity can neither be queried nor retrieved by id
        with raises(c.EntityDoesNotExistError) as exc:
            c.execute_query("FIND ENTITY WITH ID = {}".format(referenced_id),
                            unique=True)
    
        with raises(c.EntityDoesNotExistError) as exc:
            c.Record(id=referenced_id).retrieve()
    
    
    def test_reference_version_head():
        """A reference inserted as ``id@HEAD`` is stored as ``id@<version>``.

        The stored reference keeps pointing at that version after the
        referenced record has been updated.
        """
        ref_rt = insertion("TestReferencedObject")
        rt = insertion("TestRT")
        find_referencing = "FIND TestRec2 WHICH HAS A TestReferencedObject"

        versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
        versioned_rec.insert()
        version = versioned_rec.version.id

        rec = c.Record(name="TestRec2").add_parent(rt)
        rec.add_property(ref_rt, str(versioned_rec.id) + "@HEAD")
        rec = rec.insert(sync=False)

        ref_to_version = "{id}@{ver}".format(
            id=str(versioned_rec.id), ver=version)
        assert rec.get_property(ref_rt).value == ref_to_version

        test_rec = c.execute_query(find_referencing, unique=True)
        assert test_rec.get_property(ref_rt).value == ref_to_version

        # now update versioned_rec
        old_head = versioned_rec.version.id
        versioned_rec.description = "new desc"
        versioned_rec.update()

        ref_to_old_head = "{id}@{ver}".format(
            id=str(versioned_rec.id), ver=old_head)
        assert rec.get_property(ref_rt).value == ref_to_old_head, (
            "after update still old head")

        test_rec = c.execute_query(find_referencing, unique=True)
        assert test_rec.get_property(ref_rt).value == ref_to_old_head, (
            "after query old head")
    
    
    def test_insert_reference_to_head_in_same_container():
        """Insert a record and a ``@HEAD`` reference to it in one container.

        Both records get negative placeholder ids before the insertion.
        """
        ref_rt = insertion("TestReferencedObject")
        rt = insertion("TestRT")

        versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
        versioned_rec.id = -1

        rec = c.Record(name="TestRec2").add_parent(rt)
        rec.id = -2
        rec.add_property(ref_rt, str(versioned_rec.id) + "@HEAD")

        both = c.Container()
        both.extend([versioned_rec, rec])
        both.insert()

        referenced = c.execute_query("FIND Record TestReferencedObject",
                                     unique=True)
        version_id = referenced.version.id

        find_referencing = "FIND Record TestRT WHICH REFERENCES {}".format(
            versioned_rec.id)
        test_rec = c.execute_query(find_referencing, unique=True)

        expected_ref = "{id}@{ver}".format(
            id=str(versioned_rec.id), ver=version_id)
        assert test_rec.get_property(ref_rt).value == expected_ref
    
    
    def test_update_reference_to_head_minus_one_in_same_container():
        """Update a record and a ``@HEAD~1`` reference to it in one container.

        After the update the reference must point at the version that was
        HEAD before the transaction. This is checked on the local object and
        on the record returned by a fresh query.
        """
        ref_rt = insertion("TestReferencedObject")
        rt = insertion("TestRT")

        versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
        versioned_rec.insert()
        old_head = versioned_rec.version.id

        rec = c.Record(name="TestRec2").add_parent(rt)
        rec.add_property(ref_rt, str(versioned_rec.id))
        rec.insert()

        # now update both
        versioned_rec.description = "new description"

        assert rec.get_property(ref_rt).value == versioned_rec.id, "ref to entity"
        rec.get_property(ref_rt).value = str(versioned_rec.id) + "@HEAD~1"

        container = c.Container()
        container.extend([versioned_rec, rec])
        container.update()
        assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
            id=versioned_rec.id, ver=old_head), "ref to old_head"

        # check the queried record, not the local object, so that the
        # server-side state is verified
        test_rec = c.execute_query(
            "FIND RECORD TestRT WHICH REFERENCES {}".format(versioned_rec.id),
            unique=True)
        assert test_rec.get_property(ref_rt).value == "{id}@{ver}".format(
            id=versioned_rec.id, ver=old_head), "after query ref to old_head"
    
    
    def test_update_reference_to_head_minus_one_in_same_container_2():
        """ This is identical to the previous one with one exception: The
        referenced entity is not being updated during the transaction but it is
        included in the update container. This has caused some buggy behavior in
        the past.

        The reference is checked on the local object and on the record returned
        by a fresh query.
        """
        ref_rt = insertion("TestReferencedObject")
        rt = insertion("TestRT")

        versioned_rec = c.Record(
            name="TestRec1",
            description="v1").add_parent(ref_rt)
        versioned_rec.insert()
        old_head = versioned_rec.version.id

        # update versioned
        versioned_rec.description = "v2"
        versioned_rec.update()

        rec = c.Record(name="TestRec2").add_parent(rt)
        rec.add_property(ref_rt, str(versioned_rec.id))
        rec.insert()

        # now update only the referencing entity.
        assert rec.get_property(ref_rt).value == versioned_rec.id, "ref to entity"
        rec.get_property(ref_rt).value = str(versioned_rec.id) + "@HEAD~1"

        container = c.Container()
        container.extend([versioned_rec, rec])
        container.update()
        assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
            id=versioned_rec.id, ver=old_head), "ref to old_head"

        # check the queried record, not the local object, so that the
        # server-side state is verified
        test_rec = c.execute_query(
            "FIND RECORD TestRT WHICH REFERENCES {}".format(versioned_rec.id),
            unique=True)
        assert test_rec.get_property(ref_rt).value == "{id}@{ver}".format(
            id=versioned_rec.id, ver=old_head), "after query ref to old_head"
    
    
    def test_reference_version_old():
        """A reference to an explicit old version is stored unchanged.

        Checked on the object returned by the insertion and on a freshly
        queried one.
        """
        ref_rt = insertion("TestReferencedObject")
        rt = insertion("TestRT")

        versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
        versioned_rec.insert()

        # remember the first version, then update versioned_rec
        old_head = versioned_rec.version.id
        versioned_rec.description = "new desc"
        versioned_rec.update()

        # insert rec which references an old version of versioned_rec
        rec = c.Record(name="TestRec2").add_parent(rt)
        rec.add_property(ref_rt, str(versioned_rec.id) + "@" + old_head)
        rec = rec.insert(sync=False)

        assert rec.get_property(ref_rt).value == "{id}@{ver}".format(
            id=str(versioned_rec.id), ver=old_head)

        test_rec = c.execute_query(
            "FIND TestRec2 WHICH HAS A TestReferencedObject", unique=True)
        assert test_rec.get_property(ref_rt).value == "{id}@{ver}".format(
            id=str(versioned_rec.id), ver=old_head)
    
    
    def test_reference_no_version():
        """A reference given as a plain id is stored as that plain id."""
        ref_rt = insertion("TestReferencedObject")
        rt = insertion("TestRT")

        versioned_rec = c.Record(name="TestRec1").add_parent(ref_rt)
        versioned_rec.insert()
        referenced_id = versioned_rec.id

        rec = c.Record(name="TestRec2").add_parent(rt)
        rec.add_property(ref_rt, referenced_id)
        rec.insert()
        assert rec.get_property(ref_rt).value == versioned_rec.id

        find_referencing = "FIND TestRec2 WHICH HAS A TestReferencedObject"
        test_rec = c.execute_query(find_referencing, unique=True)
        assert test_rec.get_property(ref_rt).value == versioned_rec.id
    
    
    def test_reference_head_minus_in_separate_container():
        """A list of ``@HEAD~n`` references is stored as explicit version ids.

        A record with three versions is referenced four times: by plain id,
        ``@HEAD``, ``@HEAD~1`` and ``@HEAD~2``.
        """
        ref_rt = insertion("TestRT")

        rec1 = c.Record("TestRecord1-firstVersion").add_parent("TestRT")
        rec1.description = "This is the first version."
        rec1.insert()
        v1 = rec1.version.id

        rec1.name = "TestRecord1-secondVersion"
        rec1.description = "This is the second version."
        rec1.update()
        v2 = rec1.version.id

        rec1.name = "TestRecord1-thirdVersion"
        rec1.description = "This is the third version."
        rec1.update()
        v3 = rec1.version.id

        relative_refs = [rec1.id,
                         str(rec1.id) + "@HEAD",
                         str(rec1.id) + "@HEAD~1",
                         str(rec1.id) + "@HEAD~2"]

        rec2 = c.Record("TestRecord2").add_parent("TestRT")
        rec2.description = ("This record has a list of references to several "
                            "versions of TestRecord1. The first references the "
                            "record without specifying the version, the other "
                            "each reference a different version of that record.")
        rec2.add_property("TestRT", datatype=c.LIST("TestRT"),
                          value=relative_refs)
        rec2.insert()

        # the relative references come back as explicit versions, newest first
        resolved_refs = [rec1.id] + [str(rec1.id) + "@" + version_id
                                     for version_id in (v3, v2, v1)]
        test_rec = c.execute_query("FIND TestRecord2", unique=True)
        assert test_rec.get_property("TestRT").value == resolved_refs
    
    
    def test_properties_no_version():
        """A property attached to a retrieved RecordType has no version."""
        c.Property("TestProperty", datatype=c.TEXT).insert()

        record_type = c.RecordType("TestRT")
        record_type.add_property("TestProperty").insert()

        retrieved = c.execute_query("FIND TestRT", unique=True)
        assert retrieved.get_property("TestProperty").version is None
    
    
    def test_update_name():
        """Renaming an entity yields a new, later version.

        The entity is found under the new name only, while the old version is
        still retrievable with the old name.
        """
        old_name = "TestRTOldName"
        new_name = "TestRTNewName"
        find_old = "FIND RecordType {}".format(old_name)
        find_new = "FIND RecordType {}".format(new_name)

        rt = insertion(old_name)
        version_before = rt.version

        # before the update: only the old name is known
        assert len(c.execute_query(find_new)) == 0
        by_old_name = c.execute_query(find_old, unique=True)
        assert by_old_name.version.id == rt.version.id
        assert by_old_name.version == version_before
        assert by_old_name.name == old_name

        # do the update, run checks again
        rt.name = new_name
        rt.update()

        version_after = rt.version
        assert rt.name == new_name
        assert version_after is not None
        assert version_after.id is not None
        assert version_after.date is not None
        assert version_after != version_before
        assert version_after.date != version_before.date
        assert parse(version_after.date) > parse(version_before.date)

        # after the update: only the new name is known
        assert len(c.execute_query(find_old)) == 0
        by_new_name = c.execute_query(find_new, unique=True)
        assert by_new_name.version.id == version_after.id
        assert by_new_name.version == version_after
        assert by_new_name.name == new_name

        # retrieve once again, via id
        by_id = c.Container().retrieve(query=str(rt.id), sync=False)[0]
        assert by_id.version.id == version_after.id
        assert by_id.version == version_after
        assert by_id.name == new_name

        # retrieve old version
        old_version_query = str(rt.id) + "@" + version_before.id
        rt_old = c.Container().retrieve(query=old_version_query,
                                        sync=False)[0]
        assert rt_old.version.id == version_before.id
        assert rt_old.name == old_name
    
    
    def test_overridden_datatype():
        """A bug in the mysql backend resulted in a server error when the old
        version of an entity was retrieved where the datatype of a property has
        been overridden.

        Original error in the server logs:
        ```
        org.caosdb.server.database.exceptions.TransactionException: java.sql.SQLSyntaxErrorException: Unknown column 'datatypeID' in 'where clause'
            at org.caosdb.server.database.backend.implementation.MySQL.MySQLRetrieveProperties.execute(MySQLRetrieveProperties.java:70)
        ```
        """
        p = c.Property("TestProperty", datatype=c.TEXT).insert()
        rt = c.RecordType("TestRT")
        rt.add_property("TestProperty", datatype=c.LIST(c.TEXT))
        rt.insert()

        rt.description = "Updated TestRT"
        rt.update()

        # retrieve the old version (cache flag must be set to "false")
        rt_old = c.Container().retrieve(query=str(rt.id) + "@HEAD~1",
                                        flags={"cache": "false"}, sync=False)[0]
        assert rt.get_property("TestProperty").datatype == c.LIST(c.TEXT)

        # the retrieved old version must carry the overridden datatype as well
        assert rt_old.get_property("TestProperty").datatype == c.LIST(c.TEXT)
    
    
    def test_delete_referenced_entity():
        """Delete an entity that is referenced by an old version only.

        Before the deletion the old version carries the reference. Afterwards
        the property value is None and a warning is attached to the property.
        HEAD has no such property either way.
        """
        ref_rt = c.RecordType("TestReferencedRT").insert()
        rt = c.RecordType("TestRT").insert()
        ref = c.Record("TestRef1").add_parent(ref_rt).insert()

        rec = c.Record("TestRec").add_parent(rt).add_property(ref_rt, ref).insert()

        def retrieve_uncached(versioned_id):
            # retrieve with the server cache switched off
            return c.Container().retrieve(query=versioned_id,
                                          flags={"cache": "false"},
                                          sync=False)[0]

        # second version: drop the reference
        rec.description = "v2"
        rec.remove_property(ref_rt)
        rec.update()

        rec_v1 = retrieve_uncached(str(rec.id) + "@HEAD~1")
        assert rec_v1.get_property(ref_rt).value == ref.id

        rec_v2 = retrieve_uncached(str(rec.id) + "@HEAD")
        assert rec_v2.get_property(ref_rt) is None

        ref.delete()

        rec_v1 = retrieve_uncached(str(rec.id) + "@HEAD~1")
        old_reference = rec_v1.get_property(ref_rt)
        assert old_reference.value is None
        first_warning = rec_v1.get_property(ref_rt).get_warnings()[0]
        assert first_warning.description == "The referenced entity has been deleted in the mean time and is not longer available."

        rec_v2 = retrieve_uncached(str(rec.id) + "@HEAD")
        assert rec_v2.get_property(ref_rt) is None