Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_tickets_200.py 15.95 KiB
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""Created on 19.12.2015.
"""
from __future__ import print_function, unicode_literals

from nose.tools import (assert_equal, assert_is_none, assert_is_not_none,
                        assert_true)
from pytest import raises

import caosdb as h
from caosdb.common.models import Property, RecordType


def setup_module():
    try:
        h.execute_query("FIND Simple*").delete()
    except Exception as e:
        print(e)


def teardown_module():
    setup_module()


def test_ticket_208():

    try:
        c = h.Container()
        rt1 = h.RecordType(name="SimpleRecordType")
        c.append(rt1)

        for i in range(10):
            rec = h.Record(name="SimpleRecord" + str(i)).add_parent(rt1)
            c.append(rec)

        c.insert()
        assert_true(c.is_valid())

        q = h.Query("FIND SimpleRecordType")
        rs = q.execute()
        assert_true(rs.is_valid())
        assert_equal(len(rs), 11)

        q = h.Query("FIND Record SimpleRecordType")
        rs = q.execute()
        assert_true(rs.is_valid())
        assert_equal(len(rs), 10)

        q.putFlag("P", "0L3")
        rs = q.execute()
        assert_true(rs.is_valid())
        assert_equal(len(rs), 3)

        for i in range(3):
            assert_is_not_none(rs.get_entity_by_name("SimpleRecord" + str(i)))

        q.putFlag("P", "5L3")
        rs = q.execute()
        assert_true(rs.is_valid())
        assert_equal(len(rs), 3)

        for i in range(5, 8):
            assert_is_not_none(rs.get_entity_by_name("SimpleRecord" + str(i)))

    finally:
        try:
            c.delete()
        except BaseException:
            pass


def test_ticket_203():
    try:

        rt1 = h.RecordType(name="SimpleRecordType").add_property(
            name="File", datatype=h.REFERENCE).insert()
        assert_true(rt1.is_valid())

    finally:
        try:
            rt1.delete()
        except BaseException:
            pass


def test_ticket_202():
    try:

        p = h.Property(name="bla", datatype=h.TEXT).insert()
        assert_true(p.is_valid())

        rt1 = h.RecordType(name="SimpleRecordType").add_property(
            p, importance=h.OBLIGATORY).insert()
        assert_true(rt1.is_valid())

        rt2 = h.RecordType(name="SimpleRecordType2").add_parent(
            rt1, inheritance=h.OBLIGATORY).insert()
        assert_true(rt2.is_valid())
        rt2.add_property(id=p.id, name="bla2").update()
        assert_true(rt2.is_valid())
        assert_is_not_none(rt2.get_property("bla"))
        assert_is_not_none(rt2.get_property("bla2"))

    finally:
        try:
            rt2.delete()
        except BaseException:
            pass
        try:
            rt1.delete()
        except BaseException:
            pass
        try:
            p.delete()
        except BaseException:
            pass


def test_ticket_201():
    try:

        p1 = h.Property(name="FirstName", datatype=h.TEXT).insert()
        p2 = h.Property(name="LastName", datatype=h.TEXT).insert()
        assert_true(p1.is_valid())
        assert_true(p2.is_valid())

        rt1 = h.RecordType(name="SimpleRecordType").add_property(
            p1, importance=h.OBLIGATORY).add_property(
                p2, importance=h.OBLIGATORY).insert()
        assert_true(rt1.is_valid())

        rt2 = h.RecordType(name="SimpleRecordType2").add_parent(
            rt1, inheritance=h.OBLIGATORY).add_property(
                p2, importance=h.OBLIGATORY).insert()
        assert_true(rt2.is_valid())
        assert_equal(len(rt2.get_properties()), 2)
        assert_is_not_none(rt2.get_property("FirstName"))
        assert_is_not_none(rt2.get_property("LastName"))

    finally:
        try:
            rt2.delete()
        except BaseException:
            pass
        try:
            rt1.delete()
        except BaseException:
            pass
        try:
            p1.delete()
        except BaseException:
            pass
        try:
            p2.delete()
        except BaseException:
            pass


def test_ticket_204():

    RT1 = h.RecordType(name="one")
    RT2 = h.RecordType(name="two").add_property(RT1, name="one_and_only")
    assert_equal(
        RT2.__str__(),
        '<RecordType name="two">\n  <Property name="one_and_only" datatype="one" importance="RECOMMENDED" flag="inheritance:FIX"/>\n</RecordType>\n')
    assert_equal(RT2.get_properties()[0].__str__(),
                 '<Property name="one_and_only" datatype="one"/>\n')
    assert_equal(RT2.get_properties()[0].datatype.__str__(),
                 '<RecordType name="one"/>\n')


def test_ticket_232():

    uri = []

    for i in range(1000):
        uri.append("longname" + str(i))

    # with raises(h.HTTPURITooLongError) as exc_info:
    #    h.get_connection().retrieve(uri)

    c = h.Container().extend(uri).retrieve(raise_exception_on_error=False)
    assert_equal(len(c), 1000)


def test_ticket_216():
    """Tests creation and printing of a reference Property.
    """
    try:
        RT = RecordType(name="RT")
        p = Property(name="refProperty", datatype=RT)
        assert_equal(str(p), '<Property name="refProperty" datatype="RT"/>\n')

        RT = RecordType()
        p = Property(name="refProperty", datatype=RT)
        assert_is_not_none(str(p))
    finally:
        pass


def test_ticket_221():
    """Tries to create reference properties to RecordTypes without ID.
    """
    RT1 = h.RecordType(name="TestRT1")
    RT2 = h.RecordType(name="TestRT2")
    RT2.add_property(
        property=h.Property(name="test_property_with_RT1", datatype=RT1))

    assert_equal(
        repr(RT2),
        '<RecordType name="TestRT2">\n  <Property name="test_property_with_RT1" datatype="TestRT1" importance="RECOMMENDED" flag="inheritance:FIX"/>\n</RecordType>\n'
    )


def test_ticket_237():

    f1 = h.File(
        name="name1",
        path="path1",
        pickup="pickup1",
        file="file1",
        thumbnail="thumbnail1")
    assert_equal(f1.name, "name1")
    assert_equal(f1.path, "path1")
    assert_equal(f1.pickup, "pickup1")
    assert_equal(f1.file, "file1")
    assert_equal(f1.thumbnail, "thumbnail1")

    f2 = h.File(name="name2")
    assert_equal(f2.name, "name2")
    assert_is_none(f2.path)
    assert_is_none(f2.pickup)
    assert_is_none(f2.file)
    assert_is_none(f2.thumbnail)

    f2._wrap(f1)

    assert_equal(f2.name, "name2")
    assert_equal(f2.path, "path1")
    assert_equal(f2.pickup, "pickup1")
    assert_equal(f2.file, "file1")
    assert_equal(f2.thumbnail, "thumbnail1")

    f2.path = "path2"
    f2.pickup = "pickup2"
    f2.file = "file2"
    f2.thumbnail = "thumbnail2"

    assert_equal(f2.name, "name2")
    assert_equal(f2.path, "path2")
    assert_equal(f2.pickup, "pickup2")
    assert_equal(f2.file, "file2")
    assert_equal(f2.thumbnail, "thumbnail2")

    assert_equal(f1.name, "name1")
    assert_equal(f1.path, "path1")
    assert_equal(f1.pickup, "pickup1")
    assert_equal(f1.file, "file1")
    assert_equal(f1.thumbnail, "thumbnail1")


def test_ticket_238():
    c = h.Container()

    p1 = h.Property(name="bla")
    p2 = h.Property(id=-1)
    p3 = h.Property()
    c.append(p1)
    assert_equal(len(c), 1)

    c.remove(p1)
    assert_equal(len(c), 0)

    c.extend([p1, p2, p3])
    assert_equal(len(c), 3)

    c.remove(p3)
    assert_equal(len(c), 2)
    assert_is_not_none(c.get_entity_by_id(-1))
    assert_is_not_none(c.get_entity_by_name("bla"))


def test_ticket_239():
    try:
        h.configure_connection()
        p = h.Property(
            name="SimpleReferenceProperty", datatype=h.REFERENCE).insert()
        assert_true(p.is_valid())

        rt = h.RecordType(name="SimpleRecordType").add_property(p).insert()
        assert_true(rt.is_valid())

        rec1 = h.Record(name="SimpleRecord1").add_parent(rt)

        rec2 = h.Record(name="SimpleRecord2").add_parent(rt).add_property(
            p, value="SimpleRecord1")

        c = h.Container().extend([rec1, rec2]).insert()
        assert_true(c.is_valid())
        assert_true(rec1.is_valid())
        assert_true(rec2.is_valid())

        rec4 = h.Record(name="SimpleRecord4").add_parent(rt).add_property(
            p, value="SimpleRecord3")

        c = h.Container().extend([rec4])
        with raises(h.TransactionError) as te:
            c.insert()
        assert te.value.has_error(h.UnqualifiedPropertiesError)
        assert c[0].get_property("SimpleReferenceProperty").get_errors()[
            0].description == "Referenced entity does not exist."

        rec3 = h.Record(name="SimpleRecord3").add_parent(rt)
        c = h.Container().extend([rec3, rec4]).insert()
        assert_true(c.is_valid())
        assert_true(rec4.is_valid())
        assert_true(rec3.is_valid())

        rec5 = h.Record(name="SimpleRecord5").add_parent(rt).add_property(
            p, value="SimpleRecord3").insert()
        assert_true(rec5.is_valid())
        assert_equal(
            int(rec5.get_property("SimpleReferenceProperty").value), rec3.id)

    finally:
        try:
            rec5.delete()
        except BaseException:
            pass
        try:
            rec4.delete()
        except BaseException:
            pass
        try:
            rec3.delete()
        except BaseException:
            pass
        try:
            rec2.delete()
        except BaseException:
            pass
        try:
            rec1.delete()
        except BaseException:
            pass
        try:
            rt.delete()
        except BaseException:
            pass
        try:
            p.delete()
        except BaseException:
            pass


def test_ticket_240():
    try:
        #         try:
        #             h.execute_query("FIND Simple*").delete()
        #         except:
        #             pass
        p = h.Property(name="SimpleProperty", datatype=h.DOUBLE).insert()
        assert_true(p.is_valid())

        rt = h.RecordType(name="SimpleRecordType").add_property(p).insert()
        assert_true(rt.is_valid())

        rec1 = h.Record(name="SimpleRecord1").add_parent(rt).insert()

        rec2 = h.Record(name="SimpleRecord2").add_parent(rt).add_property(
            p, value=2.0).insert()

        rec3 = h.Record(name="SimpleRecord3").add_parent(rt).add_property(
            p, value=1.0).insert()

        c = h.execute_query(
            "FIND Record SimpleRecordType WHICH HAS A SimpleProperty='2.0'",
            unique=True)
        assert_equal(c.id, rec2.id)

        c = h.execute_query(
            "FIND Record SimpleRecordType WHICH HAS A SimpleProperty='1.0'",
            unique=True)
        assert_equal(c.id, rec3.id)

        c = h.execute_query(
            "FIND Record SimpleRecordType WHICH HAS A SimpleProperty!='1.0'",
            unique=True)
        assert_equal(c.id, rec2.id)

        c = h.execute_query(
            "FIND Record SimpleRecordType WHICH HAS A SimpleProperty!='2.0'",
            unique=True)
        assert_equal(c.id, rec3.id)

        c = h.execute_query(
            "FIND Record SimpleRecordType WHICH DOESN'T HAVE A SimpleProperty='1.0'"
        )
        assert_equal(len(c), 2)
        assert_is_not_none(c.get_entity_by_id(rec1.id))
        assert_is_not_none(c.get_entity_by_id(rec2.id))

    finally:
        try:
            rec3.delete()
        except BaseException:
            pass
        try:
            rec2.delete()
        except BaseException:
            pass
        try:
            rec1.delete()
        except BaseException:
            pass
        try:
            rt.delete()
        except BaseException:
            pass
        try:
            p.delete()
        except BaseException:
            pass


def test_ticket_247():
    try:

        p = h.Property(
            name="SimpleProperty", datatype=h.DOUBLE, unit="1").insert()
        assert_true(p.is_valid())

    finally:
        try:
            p.delete()
        except BaseException:
            pass


def test_ticket_243():
    h.execute_query("FIND RECORD WHICH HAS some-property = some-value")


def test_ticket_242():
    h.execute_query("FIND RECORD WHICH HAS been created by some.user")


def test_ticket_256():
    try:
        upload_file = open("test.dat", "w")
        upload_file.write("hello world\n")
        upload_file.close()
        file_ = h.File(
            name="Testfidb.dble",
            description="Testfile Desc",
            path="testfiles/testfilewithcolon:initsname.dat",
            file="test.dat")
        file_.insert()
        assert_true(file_.is_valid())

        ret = h.execute_query(
            "FIND FILE WHICH IS STORED AT testfiles/testfilewithcolon:initsname.dat",
            unique=True)
        assert_equal(ret.id, file_.id)

        ret = h.execute_query(
            "FIND FILE WHICH IS STORED AT *colon:initsname.dat", unique=True)
        assert_equal(ret.id, file_.id)
    finally:
        try:
            import os
            os.remove("test.dat")
        except BaseException:
            pass
        try:
            file_.delete()
        except BaseException:
            pass


def test_ticket_228():
    try:
        p1 = h.Property(name="SimpleProperty1", datatype=h.DOUBLE).insert()
        p2 = h.Property(name="SimpleProperty_2", datatype=h.DOUBLE).insert()
        p3 = h.Property(name="SimpleProperty.3", datatype=h.DOUBLE).insert()
        assert_true(p1.is_valid())
        assert_true(p2.is_valid())
        assert_true(p3.is_valid())

        assert_equal(h.execute_query("FIND *_*", unique=True).id, p2.id)
        assert_equal(
            len(h.execute_query("FIND *.*", raise_exception_on_error=False)),
            0)
        assert_equal(h.execute_query("FIND '*.*'", unique=True).id, p3.id)

        p4 = h.Property(name="^.^", datatype=h.DOUBLE).insert()
        assert_true(p4.is_valid())
        assert_equal(len(h.execute_query("FIND ^.^")), 0)
        assert_equal(h.execute_query("FIND '^.^'", unique=True).id, p4.id)

    finally:
        try:
            p4.delete()
        except BaseException:
            pass
        try:
            p3.delete()
        except BaseException:
            pass
        try:
            p2.delete()
        except BaseException:
            pass
        try:
            p1.delete()
        except BaseException:
            pass


def test_ticket_231():
    try:
        p1 = h.Property(name="Publication", datatype=h.TEXT).insert()

        assert_equal(
            h.execute_query("FIND publication", unique=True).id, p1.id)
        assert_equal(h.Property("publication").retrieve().id, p1.id)
        assert_equal(h.Entity("publication").retrieve().id, p1.id)

    finally:
        try:
            p1.delete()
        except BaseException:
            pass


def test_ticket_241():

    q = h.Query("FIND RECORD WHICH HAS been created by some*")
    q.execute(raise_exception_on_error=False)
    assert_is_not_none(q.messages["Info"])
    assert_equal(
        q.messages["Info"],
        ("Regular expression and like patterns are not implemented for transactor names yet.",
            None))


def test_ticket_233():
    e1 = h.Entity("sdlkavhjawriluvyruksvnkuhndb")
    e2 = h.Entity("ösdlkavhjawriluvyruksvnkuhndb")

    with raises(h.TransactionError) as te:
        e1.retrieve()
    assert te.value.has_error(h.EntityDoesNotExistError)
    with raises(h.TransactionError) as te:
        e2.retrieve()
    assert te.value.has_error(h.EntityDoesNotExistError)