Select Git revision
test_misc.py
-
Timm Fitschen authoredTimm Fitschen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_misc.py 13.35 KiB
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2019-2021 IndiScale GmbH <indiscale@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Created on 23.07.2015.
@author: tf
"""
import caosdb as db
from caosdb import (Container, Info, Property, Record, RecordType,
execute_query)
from pytest import raises
from pytest import mark
def setup():
    """Delete leftover test entities and check that none remain.

    The deletion of everything matching ``Test*`` is best effort: any
    exception raised by it is printed instead of propagated.
    """
    # NOTE(review): the indentation of this file was lost. The final check is
    # assumed to run unconditionally after the cleanup attempt -- confirm.
    try:
        leftovers = db.execute_query("FIND Test*")
        leftovers.delete()
    except Exception as err:
        print(err)
    remaining = db.execute_query("FIND Entity with id>99")
    assert len(remaining) == 0
def test_file_system_returns_ids():
    """Check that the ``FileSystem/testfiles`` listing contains the file id.

    A small local file is written, inserted as a File entity stored at
    ``testfiles/test.dat``, and the response body of the ``FileSystem``
    resource is searched for ``id="<file id>"``.
    """
    # The context manager closes the handle even if the write fails.
    with open("test.dat", "w") as upload_file:
        upload_file.write("hello world\n")

    file_ = db.File(name="TestFileA",
                    description="Testfile Desc",
                    path="testfiles/test.dat",
                    file="test.dat")
    file_.insert()

    c = db.get_connection()
    resp = c.retrieve(
        entity_uri_segments=["FileSystem", "testfiles"],
        reconnect=True)
    body = resp.read()
    # Debug output, shown by pytest when the test fails.
    print(body)
    print(type(body))

    search = "id=\"" + str(file_.id) + "\""
    print(search)
    assert search in str(body)
def test_sat_query_with_leading_slash():
    """Check ``WHICH IS STORED`` queries with and without a leading slash.

    A file is inserted at ``testfiles/test.dat``. Every path pattern below,
    each in a variant with and without a leading ``/``, must uniquely find
    that file.
    """
    # The context manager closes the handle even if the write fails.
    with open("test.dat", "w") as upload_file:
        upload_file.write("hello world\n")

    file_ = db.File(name="TestFileA",
                    description="Testfile Desc",
                    path="testfiles/test.dat",
                    file="test.dat")
    file_.insert()

    # Same patterns and same order as the former one-assert-per-query version.
    locations = (
        "testfiles/test.dat",
        "/testfiles/test.dat",
        "testfiles/test.*",
        "/testfiles/test.*",
        "testfiles/*",
        "/testfiles/*",
        "*/test.dat",
        "/*/test.dat",
        "**/test.dat",
        "/**/test.dat",
    )
    for location in locations:
        query = "FIND FILE WHICH IS STORED " + location
        # The query string is the assertion message so that a failure names
        # the offending pattern.
        assert file_.id == db.execute_query(query, unique=True).id, query
def test_name_with_slash():
    """Insert a RecordType named ``Test/Name`` and retrieve it by that name."""
    inserted = db.RecordType("Test/Name")
    inserted = inserted.insert()
    assert inserted.is_valid() is True

    # Retrieval by the same name must yield the same id.
    retrieved = db.RecordType("Test/Name")
    retrieved = retrieved.retrieve()
    assert inserted.id == retrieved.id
def test_nonsense_flag():
    """Run a query with an unknown flag; passes if no exception is raised."""
    nonsense_flags = {"ThisIsUtterNonsense": None}
    db.execute_query("FIND Test", flags=nonsense_flags)
def test_error_no_such_role():
    """Post raw XML with invalid role elements and check the error message.

    For each request body the response is parsed into a container, and
    ``db.raise_errors`` must raise a ``TransactionError`` whose first error
    carries the expected message.
    """
    cases = (
        ("<Insert><Entity name='test'/></Insert>",
         "There is no such role 'Entity'."),
        ("<Insert><ASDF name='test'/></Insert>",
         "There is no such role 'ASDF'."),
    )
    for payload, expected_msg in cases:
        response = db.get_connection().insert(
            entity_uri_segment=["Entity"], body=payload)
        entities = Container._response_to_entities(response)
        with raises(db.TransactionError) as excinfo:
            db.raise_errors(entities)
        assert (excinfo.value.errors[0].msg == expected_msg)
def test_parent_duplicate_1():
    """Insert a RecordType that lists the same parent twice.

    Before the insert the entity has two parents; afterwards it has one
    parent and exactly one warning with the expected description.
    """
    expected_warning = ("This entity had parent duplicates. That is "
                        "meaningless and only one parent had been inserted.")

    db.RecordType(name="TestRT1").insert()
    db.Property(name="TestProperty", datatype=db.TEXT).insert()

    child = db.RecordType(name="TestRT2")
    # Add the identical parent twice on purpose.
    for _ in range(2):
        child = child.add_parent(name="TestRT1")
    child = child.add_property(name="TestProperty")
    assert len(child.get_parents()) == 2

    child.insert()
    assert len(child.get_parents()) == 1
    assert len(child.get_warnings()) == 1
    first_warning = child.get_warnings()[0]
    assert first_warning.description == expected_warning
def test_parent_duplicate_2():
    """Insert a RecordType with one parent given twice, inheritance differing.

    The parent is added once with ``db.ALL`` and once with ``db.NONE``.
    The insert must raise a ``TransactionError`` whose first error carries
    the expected message.
    """
    expected_msg = ("This entity had parent duplicates. Parent duplicates "
                    "are meaningless and would be ignored (and inserted "
                    "only once). But these parents had diverging "
                    "inheritance instructions which cannot be processed.")

    db.RecordType(name="TestRT1").insert()

    child = db.RecordType(name="TestRT2")
    for mode in (db.ALL, db.NONE):
        child = child.add_parent(name="TestRT1", inheritance=mode)
    assert len(child.get_parents()) == 2

    with raises(db.TransactionError) as excinfo:
        child.insert()
    assert (excinfo.value.errors[0].msg == expected_msg)
def test_server_error():
    """Request the ``debug=throwNullPointerException`` path and check the error.

    The request must raise ``HTTPServerError`` and the exception's ``msg``
    must contain ``"SRID = "``.
    """
    connection = db.get_connection()
    request = {
        "method": "GET",
        "path": "Entity?debug=throwNullPointerException",
    }
    with raises(db.HTTPServerError) as excinfo:
        connection._http_request(**request)
    assert "SRID = " in excinfo.value.msg
def test_annotation():
    """Build an annotation record and find it via a reference query.

    Four properties, a ``TestAnnotation`` RecordType requiring all of them,
    and a ``TestSimpleRecordType`` are inserted. A record of the annotation
    type referencing the simple RecordType is then inserted and must be the
    unique result of the ``WHICH REFERENCES ... AS AN TestEntity`` query.
    """
    # (name, datatype) of the properties, inserted in this order.
    property_specs = (
        ("TestUser", "TEXT"),
        ("TestComment", "TEXT"),
        ("TestDatetime", "Datetime"),
        ("TestEntity", "REFERENCE"),
    )
    for prop_name, prop_datatype in property_specs:
        inserted_prop = Property(name=prop_name, datatype=prop_datatype)
        inserted_prop = inserted_prop.insert()
        assert inserted_prop.is_valid() is True

    Property(name="REFERENCE").retrieve()

    annotation_rt = RecordType(name="TestAnnotation")
    for prop_name in ("TestDatetime", "TestComment", "TestUser",
                      "TestEntity"):
        annotation_rt = annotation_rt.add_property(
            name=prop_name, importance="OBLIGATORY")
    annotation_rt = annotation_rt.insert()
    assert annotation_rt.is_valid() is True

    simple_rt = RecordType(name="TestSimpleRecordType").insert()
    assert simple_rt.is_valid() is True

    record = Record(name="TestSpecialAnnotion")
    record = record.add_parent(annotation_rt)
    record = record.add_property(name="TestDatetime", value="NOW")
    # The user name comes from the client's connection configuration.
    username = db.get_config().get("Connection", "username")
    record = record.add_property(name="TestUser", value=username)
    record = record.add_property(name="TestEntity", value=simple_rt)
    record = record.add_property(name="TestComment", value="Veeeery nice!")
    record = record.insert()
    assert record.is_valid() is True

    found = execute_query(
        "FIND TestAnnotation WHICH REFERENCES TestSimpleRecordType AS AN TestEntity",
        unique=True)
    assert found.is_valid() is True
    assert record.id == found.id
def test_info():
    """Check the sections and counters reported by ``Info()``.

    The ``Flags``, ``Counts`` and ``TransactionBenchmark`` messages must be
    present, the entity counters must not be ``'-1'``, and the ``debug``
    entry of ``Counts`` must be ``'true'``.
    """
    info = Info()

    for section in ("Flags", "Counts", "TransactionBenchmark"):
        assert (info.messages[section]) is not None

    for counter in ("files", "records", "properties", "recordTypes"):
        assert '-1' != info.messages["Counts"][counter]

    assert 'true' == info.messages["Counts"]["debug"]
    # Not necessarily 0, all the temporary directories go here as well.
    # assert 0 == i.messages["Counts"]["tmpfiles"]
def test_long_description():
    """Insert a RecordType with a 10000-character description and read it back.

    The description must be unchanged both on the inserted entity and on a
    fresh retrieval by name.
    """
    # "desc_" padded on the right with "a" up to a total length of 10000.
    long_description = 'desc_'.ljust(10000, "a")

    inserted = RecordType(
        name="TestSimpleRecordTypeWithLongDesc",
        description=long_description)
    inserted = inserted.insert()
    assert inserted.is_valid() is True
    assert inserted.description == long_description

    retrieved = RecordType(name="TestSimpleRecordTypeWithLongDesc")
    retrieved = retrieved.retrieve()
    assert retrieved.is_valid() is True
    assert retrieved.description == long_description
def test_auto_importance_for_properties():
    """Insert a property that carries another property without an importance.

    ``TestProperty1`` is attached to ``TestProperty2`` via ``add_property``
    with no explicit importance; both inserts must yield valid entities.
    """
    inner = Property(name="TestProperty1", datatype=db.TEXT)
    inner = inner.insert()
    assert inner.is_valid() is True

    outer = Property(name="TestProperty2", datatype=db.TEXT)
    outer = outer.add_property(inner)
    outer = outer.insert()
    assert outer.is_valid() is True
def test_overrides_with_deletion_in_worst_case_order():
    """Override a property's name and description, then delete both entities.

    The RecordType uses ``TestProperty1`` under the overridden name
    ``nameOverride1``. The override must be visible on the inserted entity
    and on a fresh retrieval. Finally the property and the RecordType are
    deleted together in one container, with the property listed first.
    """
    base_prop = Property(
        name="TestProperty1",
        description="desc1",
        datatype=db.TEXT)
    base_prop = base_prop.insert()
    assert base_prop.is_valid() is True

    record_type = RecordType(name="TestRT1")
    record_type = record_type.add_property(
        id=base_prop.id,
        name="nameOverride1",
        description="descOverride1")
    record_type = record_type.insert()
    assert record_type.is_valid() is True
    assert base_prop.id == record_type.get_property("nameOverride1").id
    assert "descOverride1" == record_type.get_property(
        "nameOverride1").description

    # is persistent?
    reloaded = RecordType(name="TestRT1")
    reloaded = reloaded.retrieve()
    assert reloaded.is_valid() is True
    assert base_prop.id == reloaded.get_property("nameOverride1").id
    assert "descOverride1" == reloaded.get_property(
        "nameOverride1").description

    doomed = db.Container().extend([base_prop, record_type])
    doomed.delete()
def test_overrides_with_duplicates():
    """Use the same property twice on one RecordType with different overrides.

    Both overridden names must resolve to the id of ``TestProperty1`` and to
    their own overridden description, on the inserted entity and on a fresh
    retrieval.
    """
    # (overridden name, overridden description) of the two usages.
    overrides = (
        ("nameOverride1", "descOverride1"),
        ("nameOverride2", "descOverride2"),
    )

    base_prop = Property(
        name="TestProperty1",
        description="desc1",
        datatype=db.TEXT)
    base_prop = base_prop.insert()
    assert base_prop.is_valid() is True

    record_type = RecordType(name="TestRT1")
    for override_name, override_desc in overrides:
        record_type = record_type.add_property(
            id=base_prop.id,
            name=override_name,
            description=override_desc)
    record_type = record_type.insert()
    assert record_type.is_valid() is True
    for override_name, _ in overrides:
        assert base_prop.id == record_type.get_property(override_name).id
    for override_name, override_desc in overrides:
        assert override_desc == record_type.get_property(
            override_name).description

    # is persistent?
    reloaded = RecordType(name="TestRT1")
    reloaded = reloaded.retrieve()
    assert reloaded.is_valid() is True
    for override_name, _ in overrides:
        assert base_prop.id == reloaded.get_property(override_name).id
    for override_name, override_desc in overrides:
        assert override_desc == reloaded.get_property(
            override_name).description
def test_overrides_in_subdomains():
    """Insert a RecordType whose overridden properties nest other overrides.

    Three base properties are inserted. Local ``Property`` objects that
    reuse their ids with overridden names and descriptions are nested up to
    three levels deep and attached to ``TestRT1``, whose insert must yield a
    valid entity.
    """
    def override(base, suffix):
        # Local (not inserted) property reusing the id of `base` with an
        # overridden name and description derived from `suffix`.
        return Property(
            id=base.id,
            name="TestPropertyov" + suffix,
            description="desc1ov" + suffix)

    def attach(parent, children):
        # Chain add_property calls exactly like parent.add_property(a)
        # .add_property(b)... and return the result of the last call.
        target = parent
        for child in children:
            target = target.add_property(child)
        return target

    bases = []
    for number in ("1", "2", "3"):
        base = Property(
            name="TestProperty" + number,
            description="desc" + number,
            datatype=db.TEXT)
        base = base.insert()
        assert base.is_valid() is True
        bases.append(base)
    p1, p2, p3 = bases

    # First level: direct properties of the RecordType.
    pov1 = override(p1, "1")
    pov2 = override(p2, "2")
    pov3 = override(p3, "3")
    # Second level: sub-properties of pov2 and pov3.
    pov21 = override(p1, "21")
    pov22 = override(p2, "22")
    pov31 = override(p1, "31")
    pov32 = override(p2, "32")
    pov33 = override(p3, "33")
    # Third level: sub-properties of pov32.
    pov321 = override(p1, "321")
    pov322 = override(p2, "322")
    pov323 = override(p3, "323")

    attach(pov32, (pov321, pov322, pov323))
    attach(pov3, (pov31, pov32, pov33))
    attach(pov2, (pov21, pov22))

    record_type = attach(RecordType(name="TestRT1"), (pov1, pov2, pov3))
    record_type = record_type.insert()
    assert record_type.is_valid() is True
def test_role_after_retrieve():
    """A generic ``Entity`` has no role until it has been retrieved.

    After retrieval by id its role must equal the role of the inserted
    RecordType.
    """
    record_type = db.RecordType("TestRT")
    record_type = record_type.insert()

    generic = db.Entity(id=record_type.id)
    assert generic.role is None

    generic.retrieve()
    assert generic.role == record_type.role
def test_retrieve_wrong_role():
    """Retrieving a RecordType's id through a ``Record`` raises ``ValueError``.

    The first argument of the exception must equal the expected message.
    """
    # Kept verbatim, spelling included: it is compared against the message
    # raised by the client.
    expected_msg = ("The resulting entity had a different role "
                    "(RecordType) than the local one (Record). "
                    "This probably means, that the entity was "
                    "intialized with a wrong class by this client "
                    "or it has changed in the past and this "
                    "client did't know about it yet.")

    record_type = db.RecordType("TestRT")
    record_type = record_type.insert()

    wrong_role = db.Record(id=record_type.id)
    assert wrong_role.role == "Record"

    with raises(ValueError) as excinfo:
        wrong_role.retrieve()
    assert excinfo.value.args[0] == expected_msg