Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_misc.py 18.71 KiB
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2021-2022 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2019-2022 IndiScale GmbH <indiscale@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Created on 23.07.2015.

@author: tf
"""
import caosdb as db
from caosdb import (Container, Info, Property, Record, RecordType,
                    execute_query, administration as admin)
from caosdb.utils.register_tests import set_test_key, clear_database
from pytest import raises, mark, fixture

set_test_key("_CAOSDB_PYINTTEST_SUITE")


def setup_function(function):
    d = db.execute_query("SELECT id FROM ENTITY")
    if len(d) > 0:
        d.delete()


def teardown_function(function):
    assert len(db.execute_query("FIND Entity with id>99")) == 0


def test_file_system_returns_ids(clear_database):
    upload_file = open("test.dat", "w")
    upload_file.write("hello world\n")
    upload_file.close()
    file_ = db.File(name="TestFileA",
                    description="Testfile Desc",
                    path="testfiles/test.dat",
                    file="test.dat")
    file_.insert()
    c = db.get_connection()
    resp = c.retrieve(
        entity_uri_segments=[
            "FileSystem",
            "testfiles"],
        reconnect=True)
    body = resp.read()
    print(body)
    print(type(body))
    search = "id=\"" + str(file_.id) + "\""
    print(search)
    assert search in str(body)


def test_file_system_returns_correct_url_with_proxy(clear_database):
    c = db.get_connection()
    resp = c.retrieve(
        entity_uri_segments=["FileSystem"],
        headers={"X-Forwarded-Proto": "myscheme"},
        reconnect=True)
    body = resp.read()
    print(body)
    search = 'url="myscheme://'
    assert search in str(body)

    resp = c.retrieve(
        entity_uri_segments=["FileSystem"],
        headers={"Forwarded": "by=byid;for=forid;host=hostaddress;proto=myother"},
        reconnect=True)
    body = resp.read()
    print(body)
    search = 'url="myother://'
    assert search in str(body)


def test_sat_query_with_leading_slash(clear_database):
    upload_file = open("test.dat", "w")
    upload_file.write("hello world\n")
    upload_file.close()
    file_ = db.File(name="TestFileA",
                    description="Testfile Desc",
                    path="testfiles/test.dat",
                    file="test.dat")
    file_.insert()
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED testfiles/test.dat",
        unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED /testfiles/test.dat",
        unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED testfiles/test.*", unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED /testfiles/test.*",
        unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED testfiles/*", unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED /testfiles/*", unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED */test.dat", unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED /*/test.dat", unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED **/test.dat", unique=True).id
    assert file_.id == db.execute_query(
        "FIND FILE WHICH IS STORED /**/test.dat", unique=True).id


def test_name_with_slash(clear_database):
    rt1 = db.RecordType("Test/Name").insert()
    assert rt1.is_valid() is True
    rt2 = db.RecordType("Test/Name").retrieve()
    assert rt1.id == rt2.id


def test_nonsense_flag(clear_database):
    db.execute_query("FIND Test", flags={"ThisIsUtterNonsense": None})


def test_error_no_such_role(clear_database):
    xml = "<Insert><Entity name='test'/></Insert>"
    r = db.get_connection().insert(entity_uri_segment=["Entity"], body=xml)
    c = Container._response_to_entities(r)
    with raises(db.TransactionError) as cm:
        db.raise_errors(c)
    assert (cm.value.errors[0].msg ==
            "There is no such role 'Entity'.")

    xml = "<Insert><ASDF name='test'/></Insert>"
    r = db.get_connection().insert(entity_uri_segment=["Entity"], body=xml)
    c = Container._response_to_entities(r)
    with raises(db.TransactionError) as cm:
        db.raise_errors(c)
    assert (cm.value.errors[0].msg ==
            "There is no such role 'ASDF'.")


def test_parent_duplicate_1(clear_database):
    db.RecordType(name="TestRT1").insert()
    db.Property(name="TestProperty", datatype=db.TEXT).insert()
    rt2 = db.RecordType(
        name="TestRT2").add_parent(
        name="TestRT1").add_parent(
            name="TestRT1").add_property(
                name="TestProperty")
    assert len(rt2.get_parents()) == 2
    rt2.insert()
    assert len(rt2.get_parents()) == 1
    assert len(rt2.get_warnings()) == 1
    assert rt2.get_warnings()[
        0].description == ("This entity had parent duplicates. That is meaningless and only one "
                           "parent has been inserted.")


def test_parent_duplicate_2(clear_database):
    db.RecordType(name="TestRT1").insert()
    rt2 = db.RecordType(
        name="TestRT2").add_parent(
        name="TestRT1",
        inheritance=db.ALL).add_parent(
            name="TestRT1",
        inheritance=db.NONE)
    assert len(rt2.get_parents()) == 2
    with raises(db.TransactionError) as cm:
        rt2.insert()
    assert (cm.value.errors[0].msg ==
            "This entity had parent duplicates. Parent duplicates are meaningless and would be ignored (and inserted only once). But these parents had diverging inheritance instructions which cannot be processed.")


def test_server_error(clear_database):
    con = db.get_connection()
    with raises(db.HTTPServerError) as cm:
        con._http_request(
            method="GET",
            path="Entity?debug=throwNullPointerException")
    assert "SRID = " in cm.value.msg


def test_annotation(clear_database):
    p2 = Property(name="TestUser", datatype="TEXT").insert()
    assert p2.is_valid() is True

    p_comment = Property(
        name="TestComment",
        datatype="TEXT").insert()
    assert p_comment.is_valid() is True

    p_datetime = Property(
        name="TestDatetime",
        datatype="Datetime").insert()
    assert p_datetime.is_valid() is True

    p_entity = Property(
        name="TestEntity",
        datatype="REFERENCE").insert()
    assert p_entity.is_valid() is True

    Property(name="REFERENCE").retrieve()

    rt = RecordType(
        name="TestAnnotation").add_property(
        name="TestDatetime",
        importance="OBLIGATORY").add_property(
        name="TestComment",
        importance="OBLIGATORY").add_property(
            name="TestUser",
            importance="OBLIGATORY").add_property(
                name="TestEntity",
        importance="OBLIGATORY").insert()

    assert rt.is_valid() is True

    rt2 = RecordType(name="TestSimpleRecordType").insert()
    assert rt2.is_valid() is True

    rec = Record(
        name="TestSpecialAnnotion").add_parent(rt).add_property(
        name="TestDatetime",
        value="NOW").add_property(
        name="TestUser",
        value=db.get_config().get(
            "Connection",
            "username")).add_property(
                name="TestEntity",
                value=rt2).add_property(
                    name="TestComment",
        value="Veeeery nice!").insert()

    assert rec.is_valid() is True

    ann = execute_query(
        "FIND TestAnnotation WHICH REFERENCES TestSimpleRecordType AS AN TestEntity",
        unique=True)
    assert ann.is_valid() is True
    assert rec.id == ann.id


def test_info(clear_database):
    assert admin.get_server_property(
        "TRANSACTION_BENCHMARK_ENABLED").lower() == "true", "Please activate the transaction benchmark with the server option TRANSACTION_BENCHMARK_ENABLED=TRUE (Restart of the server needed)"
    i = Info()
    assert (i.messages["Flags"]) is not None
    assert (i.messages["Counts"]) is not None
    assert (i.messages["TransactionBenchmark"]) is not None
    assert '-1' != i.messages["Counts"]["files"]
    assert '-1' != i.messages["Counts"]["records"]
    assert '-1' != i.messages["Counts"]["properties"]
    assert '-1' != i.messages["Counts"]["recordTypes"]
    assert 'true' == i.messages["Counts"]["debug"]
    # Not necessarily 0, all the temporary directories go here as well.
    # assert 0 == i.messages["Counts"]["tmpfiles"]


def test_long_description(clear_database):
    longstr = 'desc_'
    while len(longstr) < 10000:
        longstr += "a"

    rt = RecordType(
        name="TestSimpleRecordTypeWithLongDesc",
        description=longstr).insert()
    assert rt.is_valid() is True
    assert rt.description == longstr

    rt2 = RecordType(name="TestSimpleRecordTypeWithLongDesc").retrieve()
    assert rt2.is_valid() is True
    assert rt2.description == longstr


def test_auto_importance_for_properties(clear_database):
    p = Property(name="TestProperty1", datatype=db.TEXT).insert()
    assert p.is_valid() is True

    p2 = Property(
        name="TestProperty2",
        datatype=db.TEXT).add_property(p).insert()
    assert p2.is_valid() is True


def test_overrides_with_deletion_in_worst_case_order(clear_database):
    p = Property(
        name="TestProperty1",
        description="desc1",
        datatype=db.TEXT).insert()
    assert p.is_valid() is True

    rt1 = RecordType(
        name="TestRT1").add_property(
        id=p.id,
        name="nameOverride1",
        description="descOverride1").insert()
    assert rt1.is_valid() is True

    assert p.id == rt1.get_property("nameOverride1").id
    assert "descOverride1" == rt1.get_property("nameOverride1").description

    # is persistent?
    rt1c = RecordType(name="TestRT1").retrieve()
    assert rt1c.is_valid() is True

    assert p.id == rt1c.get_property("nameOverride1").id
    assert "descOverride1" == rt1c.get_property("nameOverride1").description

    db.Container().extend([p, rt1]).delete()


def test_overrides_with_duplicates(clear_database):
    p = Property(
        name="TestProperty1",
        description="desc1",
        datatype=db.TEXT).insert()
    assert p.is_valid() is True

    rt1 = RecordType(
        name="TestRT1").add_property(
        id=p.id,
        name="nameOverride1",
        description="descOverride1").add_property(
        id=p.id,
        name="nameOverride2",
        description="descOverride2").insert()
    assert rt1.is_valid() is True

    assert p.id == rt1.get_property("nameOverride1").id
    assert p.id == rt1.get_property("nameOverride2").id
    assert "descOverride1" == rt1.get_property("nameOverride1").description
    assert "descOverride2" == rt1.get_property("nameOverride2").description

    # is persistent?
    rt1c = RecordType(name="TestRT1").retrieve()
    assert rt1c.is_valid() is True

    assert p.id == rt1c.get_property("nameOverride1").id
    assert p.id == rt1c.get_property("nameOverride2").id
    assert "descOverride1" == rt1c.get_property("nameOverride1").description
    assert "descOverride2" == rt1c.get_property("nameOverride2").description


def test_overrides_in_subdomains(clear_database):
    p1 = Property(
        name="TestProperty1",
        description="desc1",
        datatype=db.TEXT).insert()
    assert p1.is_valid() is True

    p2 = Property(
        name="TestProperty2",
        description="desc2",
        datatype=db.TEXT).insert()
    assert p2.is_valid() is True

    p3 = Property(
        name="TestProperty3",
        description="desc3",
        datatype=db.TEXT).insert()
    assert p3.is_valid() is True

    pov1 = Property(
        id=p1.id,
        value="pov1",
        name="TestPropertyov1",
        description="desc1ov1")
    pov2 = Property(
        id=p2.id,
        value="pov2",
        name="TestPropertyov2",
        description="desc2ov2")
    pov3 = Property(
        id=p3.id,
        value="pov3",
        name="TestPropertyov3",
        description="desc3ov3")

    pov21 = Property(
        id=p1.id,
        value="pov21",
        name="TestPropertyov21",
        description="desc1ov21")
    pov22 = Property(
        id=p2.id,
        value="pov22",
        name="TestPropertyov22",
        description="desc2ov22")

    pov31 = Property(
        id=p1.id,
        value="pov31",
        name="TestPropertyov31",
        description="desc1ov31")
    pov32 = Property(
        id=p2.id,
        value="pov32",
        name="TestPropertyov32",
        description="desc2ov32")
    pov33 = Property(
        id=p3.id,
        value="pov33",
        name="TestPropertyov33",
        description="desc3ov33")

    pov321 = Property(
        id=p1.id,
        value="pov321",
        name="TestPropertyov321",
        description="desc1ov321")
    pov322 = Property(
        id=p2.id,
        value="pov322",
        name="TestPropertyov322",
        description="desc2ov322")
    pov323 = Property(
        id=p3.id,
        value="pov323",
        name="TestPropertyov323",
        description="desc3ov323")

    pov32.add_property(pov321).add_property(
        pov322).add_property(pov323)
    pov3.add_property(pov31).add_property(pov32).add_property(pov33)
    pov2.add_property(pov21).add_property(pov22)

    rt1 = RecordType(name="TestRT1").add_property(
        pov1).add_property(pov2).add_property(pov3)

    rt1.insert()
    assert rt1.is_valid() is True
    rt1 = db.execute_query(f"FIND ENTITY WITH id={rt1.id}", unique=True)

    assert len(rt1.get_properties()) == 3
    assert rt1.get_property("TestPropertyov1").description == "desc1ov1"
    assert rt1.get_property("TestPropertyov1").value == "pov1"
    assert rt1.get_property("TestPropertyov2").description == "desc2ov2"
    assert rt1.get_property("TestPropertyov2").value == "pov2"
    assert rt1.get_property("TestPropertyov3").description == "desc3ov3"
    assert rt1.get_property("TestPropertyov3").value == "pov3"

    p1o = rt1.get_property("TestPropertyov1")
    assert len(p1o.get_properties()) == 0

    p2o = rt1.get_property("TestPropertyov2")
    assert len(p2o.get_properties()) == 2
    assert p2o.get_property("TestPropertyov21").description == "desc1ov21"
    assert p2o.get_property("TestPropertyov21").value == "pov21"
    assert p2o.get_property("TestPropertyov22").description == "desc2ov22"
    assert p2o.get_property("TestPropertyov22").value == "pov22"
    assert len(p2o.get_property("TestPropertyov21").get_properties()) == 0
    assert len(p2o.get_property("TestPropertyov22").get_properties()) == 0

    p3o = rt1.get_property("TestPropertyov3")
    assert len(p3o.get_properties()) == 3
    assert p3o.get_property("TestPropertyov31").description == "desc1ov31"
    assert p3o.get_property("TestPropertyov31").value == "pov31"
    assert p3o.get_property("TestPropertyov32").description == "desc2ov32"
    assert p3o.get_property("TestPropertyov32").value == "pov32"
    assert p3o.get_property("TestPropertyov33").description == "desc3ov33"
    assert p3o.get_property("TestPropertyov33").value == "pov33"
    assert len(p3o.get_property("TestPropertyov31").get_properties()) == 0
    assert len(p3o.get_property("TestPropertyov32").get_properties()) == 3
    assert len(p3o.get_property("TestPropertyov33").get_properties()) == 0

    p32o = p3o.get_property("TestPropertyov32")
    assert p32o.get_property("TestPropertyov321").description == "desc1ov321"
    assert p32o.get_property("TestPropertyov321").value == "pov321"
    assert p32o.get_property("TestPropertyov322").description == "desc2ov322"
    assert p32o.get_property("TestPropertyov322").value == "pov322"
    assert p32o.get_property("TestPropertyov323").description == "desc3ov323"
    assert p32o.get_property("TestPropertyov323").value == "pov323"
    assert len(p32o.get_property("TestPropertyov321").get_properties()) == 0
    assert len(p32o.get_property("TestPropertyov322").get_properties()) == 0
    assert len(p32o.get_property("TestPropertyov323").get_properties()) == 0


def test_role_after_retrieve(clear_database):
    rt = db.RecordType("TestRT").insert()
    entity = db.Entity(id=rt.id)
    assert entity.role is None
    entity.retrieve()
    assert entity.role == rt.role


def test_retrieve_wrong_role(clear_database):
    rt = db.RecordType("TestRT").insert()
    entity = db.Record(id=rt.id)
    assert entity.role == "Record"
    with raises(ValueError) as cm:
        entity.retrieve()
    assert cm.value.args[0].startswith("The resulting entity had a different role "
                                       "(RecordType) than the local one (Record). "
                                       "This probably means, that the entity was "
                                       "intialized with a wrong class by this client "
                                       "or it has changed in the past and this "
                                       "client did't know about it yet.")


@fixture
def reset_time_zone():
    yield None
    server_time_zone = db.get_config().get("IntegrationTests",
                                           "test_misc.test_time_zone.time_zone")
    if server_time_zone is not None:
        admin.set_server_property("TIMEZONE", server_time_zone)


def test_time_zone(reset_time_zone):
    server_time_zone = db.get_config().get("IntegrationTests",
                                           "test_misc.test_time_zone.time_zone")

    if server_time_zone is not None:
        assert admin.get_server_property(
            "TIMEZONE") == server_time_zone, "If this test fails locally, try to re-run it"

    admin.set_server_property("TIMEZONE", "UTC")
    assert db.Info().time_zone.offset == "+0000"
    admin.set_server_property("TIMEZONE", "Etc/GMT+5")  # Standard Time NY
    gmt_plus_5 = db.Info().time_zone.offset
    assert gmt_plus_5 == "-0500"

    # Today I learned: POSIX defines GMT+X as UTC-X (... no words...)
    admin.set_server_property("TIMEZONE", "EST")
    assert db.Info().time_zone.offset == gmt_plus_5

    admin.set_server_property("TIMEZONE", "Etc/GMT-1")  # Standard Time Berlin
    assert db.Info().time_zone.offset == "+0100"
    admin.set_server_property("TIMEZONE", "Etc/GMT-14")  # Kiritimati
    assert db.Info().time_zone.offset == "+1400"