Skip to content
Snippets Groups Projects
Commit d7bf9f32 authored by Alexander Kreft's avatar Alexander Kreft
Browse files

Merge branch 'f-entity-role' into 'dev'

TST: Add more test for entity.role

See merge request !19
parents 183e1bd4 7c345ab2
Branches
Tags
1 merge request!19TST: Add more test for entity.role
Pipeline #14363 failed
# encoding: utf-8 # encoding: utf-8
# #
# ** header v3.0
# This file is a part of the CaosDB Project. # This file is a part of the CaosDB Project.
# #
# Copyright (C) 2018 Research Group Biomedical Physics, # Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2021 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2019-2021 IndiScale GmbH <indiscale@indiscale.com>
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as # it under the terms of the GNU Affero General Public License as
...@@ -18,18 +19,11 @@ ...@@ -18,18 +19,11 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
# Copyright (c) 2019 IndiScale GmbH
"""Created on 23.07.2015. """Created on 23.07.2015.
@author: tf @author: tf
""" """
from nose.tools import (assert_equal, assert_is_not_none, # @UnresolvedImport
assert_not_equal, assert_true, nottest,
with_setup)
import caosdb as db import caosdb as db
from caosdb import (Container, Info, Property, Record, RecordType, from caosdb import (Container, Info, Property, Record, RecordType,
execute_query) execute_query)
...@@ -42,9 +36,9 @@ def setup(): ...@@ -42,9 +36,9 @@ def setup():
db.execute_query("FIND Test*").delete() db.execute_query("FIND Test*").delete()
except Exception as e: except Exception as e:
print(e) print(e)
assert len(db.execute_query("FIND Entity with id>99")) == 0
@with_setup(setup=setup, teardown=setup)
def test_file_system_returns_ids(): def test_file_system_returns_ids():
upload_file = open("test.dat", "w") upload_file = open("test.dat", "w")
upload_file.write("hello world\n") upload_file.write("hello world\n")
...@@ -65,7 +59,7 @@ def test_file_system_returns_ids(): ...@@ -65,7 +59,7 @@ def test_file_system_returns_ids():
print(type(body)) print(type(body))
search = "id=\"" + str(file_.id) + "\"" search = "id=\"" + str(file_.id) + "\""
print(search) print(search)
assert_true(search in str(body)) assert search in str(body)
def test_sat_query_with_leading_slash(): def test_sat_query_with_leading_slash():
...@@ -77,63 +71,36 @@ def test_sat_query_with_leading_slash(): ...@@ -77,63 +71,36 @@ def test_sat_query_with_leading_slash():
path="testfiles/test.dat", path="testfiles/test.dat",
file="test.dat") file="test.dat")
file_.insert() file_.insert()
assert_equal( assert file_.id == db.execute_query(
file_.id, "FIND FILE WHICH IS STORED testfiles/test.dat",
db.execute_query( unique=True).id
"FIND FILE WHICH IS STORED testfiles/test.dat", assert file_.id == db.execute_query(
unique=True).id) "FIND FILE WHICH IS STORED /testfiles/test.dat",
assert_equal( unique=True).id
file_.id, assert file_.id == db.execute_query(
db.execute_query( "FIND FILE WHICH IS STORED testfiles/test.*", unique=True).id
"FIND FILE WHICH IS STORED /testfiles/test.dat", assert file_.id == db.execute_query(
unique=True).id) "FIND FILE WHICH IS STORED /testfiles/test.*",
assert_equal( unique=True).id
file_.id, assert file_.id == db.execute_query(
db.execute_query( "FIND FILE WHICH IS STORED testfiles/*", unique=True).id
"FIND FILE WHICH IS STORED testfiles/test.*", assert file_.id == db.execute_query(
unique=True).id) "FIND FILE WHICH IS STORED /testfiles/*", unique=True).id
assert_equal( assert file_.id == db.execute_query(
file_.id, "FIND FILE WHICH IS STORED */test.dat", unique=True).id
db.execute_query( assert file_.id == db.execute_query(
"FIND FILE WHICH IS STORED /testfiles/test.*", "FIND FILE WHICH IS STORED /*/test.dat", unique=True).id
unique=True).id) assert file_.id == db.execute_query(
assert_equal( "FIND FILE WHICH IS STORED **/test.dat", unique=True).id
file_.id, assert file_.id == db.execute_query(
db.execute_query( "FIND FILE WHICH IS STORED /**/test.dat", unique=True).id
"FIND FILE WHICH IS STORED testfiles/*",
unique=True).id)
assert_equal(
file_.id,
db.execute_query(
"FIND FILE WHICH IS STORED /testfiles/*",
unique=True).id)
assert_equal(
file_.id,
db.execute_query(
"FIND FILE WHICH IS STORED */test.dat",
unique=True).id)
assert_equal(
file_.id,
db.execute_query(
"FIND FILE WHICH IS STORED /*/test.dat",
unique=True).id)
assert_equal(
file_.id,
db.execute_query(
"FIND FILE WHICH IS STORED **/test.dat",
unique=True).id)
assert_equal(
file_.id,
db.execute_query(
"FIND FILE WHICH IS STORED /**/test.dat",
unique=True).id)
def test_name_with_slash(): def test_name_with_slash():
rt1 = db.RecordType("Test/Name").insert() rt1 = db.RecordType("Test/Name").insert()
assert_true(rt1.is_valid()) assert rt1.is_valid() is True
rt2 = db.RecordType("Test/Name").retrieve() rt2 = db.RecordType("Test/Name").retrieve()
assert_equal(rt1.id, rt2.id) assert rt1.id == rt2.id
def test_nonsense_flag(): def test_nonsense_flag():
...@@ -158,7 +125,6 @@ def test_error_no_such_role(): ...@@ -158,7 +125,6 @@ def test_error_no_such_role():
"There is no such role 'ASDF'.") "There is no such role 'ASDF'.")
@with_setup(setup, setup)
def test_parent_duplicate_1(): def test_parent_duplicate_1():
db.RecordType(name="TestRT1").insert() db.RecordType(name="TestRT1").insert()
db.Property(name="TestProperty", datatype=db.TEXT).insert() db.Property(name="TestProperty", datatype=db.TEXT).insert()
...@@ -167,16 +133,14 @@ def test_parent_duplicate_1(): ...@@ -167,16 +133,14 @@ def test_parent_duplicate_1():
name="TestRT1").add_parent( name="TestRT1").add_parent(
name="TestRT1").add_property( name="TestRT1").add_property(
name="TestProperty") name="TestProperty")
assert_equal(len(rt2.get_parents()), 2) assert len(rt2.get_parents()) == 2
rt2.insert() rt2.insert()
assert_equal(len(rt2.get_parents()), 1) assert len(rt2.get_parents()) == 1
assert_equal(len(rt2.get_warnings()), 1) assert len(rt2.get_warnings()) == 1
assert_equal( assert rt2.get_warnings()[
rt2.get_warnings()[0].description, 0].description == "This entity had parent duplicates. That is meaningless and only one parent had been inserted."
"This entity had parent duplicates. That is meaningless and only one parent had been inserted.")
@with_setup(setup, setup)
def test_parent_duplicate_2(): def test_parent_duplicate_2():
db.RecordType(name="TestRT1").insert() db.RecordType(name="TestRT1").insert()
rt2 = db.RecordType( rt2 = db.RecordType(
...@@ -199,338 +163,245 @@ def test_server_error(): ...@@ -199,338 +163,245 @@ def test_server_error():
con._http_request( con._http_request(
method="GET", method="GET",
path="Entity?debug=throwNullPointerException") path="Entity?debug=throwNullPointerException")
assert_true("SRID = " in cm.value.msg) assert "SRID = " in cm.value.msg
def test_annotation(): def test_annotation():
try: p2 = Property(name="TestUser", datatype="TEXT").insert()
p2 = Property(name="AnnotationTestUser", datatype="TEXT").insert() assert p2.is_valid() is True
assert_true(p2.is_valid())
p_comment = Property(
p_comment = Property( name="TestComment",
name="AnnotationTestComment", datatype="TEXT").insert()
datatype="TEXT").insert() assert p_comment.is_valid() is True
assert_true(p_comment.is_valid())
p_datetime = Property(
p_datetime = Property( name="TestDatetime",
name="AnnotationTestDatetime", datatype="Datetime").insert()
datatype="Datetime").insert() assert p_datetime.is_valid() is True
assert_true(p_datetime.is_valid())
p_entity = Property(
p_entity = Property( name="TestEntity",
name="AnnotationTestEntity", datatype="REFERENCE").insert()
datatype="REFERENCE").insert() assert p_entity.is_valid() is True
assert_true(p_entity.is_valid())
Property(name="REFERENCE").retrieve()
Property(name="REFERENCE").retrieve()
rt = RecordType(
rt = RecordType( name="TestAnnotation").add_property(
name="AnnotationTestAnnotation").add_property( name="TestDatetime",
name="AnnotationTestDatetime", importance="OBLIGATORY").add_property(
name="TestComment",
importance="OBLIGATORY").add_property(
name="TestUser",
importance="OBLIGATORY").add_property( importance="OBLIGATORY").add_property(
name="AnnotationTestComment", name="TestEntity",
importance="OBLIGATORY").add_property( importance="OBLIGATORY").insert()
name="AnnotationTestUser",
importance="OBLIGATORY").add_property(
name="AnnotationTestEntity",
importance="OBLIGATORY").insert()
assert_true(rt.is_valid())
rt2 = RecordType(name="AnnotationTestSimpleRecordType").insert() assert rt.is_valid() is True
assert_true(rt2.is_valid())
rec = Record( rt2 = RecordType(name="TestSimpleRecordType").insert()
name="AnnotationTestSpecialAnnotion").add_parent(rt).add_property( assert rt2.is_valid() is True
name="AnnotationTestDatetime",
value="NOW").add_property(
name="AnnotationTestUser",
value=db.get_config().get(
"Connection",
"username")).add_property(
name="AnnotationTestEntity",
value=rt2).add_property(
name="AnnotationTestComment",
value="Veeeery nice!").insert()
assert_true(rec.is_valid()) rec = Record(
name="TestSpecialAnnotion").add_parent(rt).add_property(
name="TestDatetime",
value="NOW").add_property(
name="TestUser",
value=db.get_config().get(
"Connection",
"username")).add_property(
name="TestEntity",
value=rt2).add_property(
name="TestComment",
value="Veeeery nice!").insert()
ann = execute_query( assert rec.is_valid() is True
"FIND AnnotationTestAnnotation WHICH REFERENCES AnnotationTestSimpleRecordType AS AN AnnotationTestEntity",
unique=True)
assert_true(ann.is_valid())
assert_equal(rec.id, ann.id)
finally: ann = execute_query(
rt = execute_query("FIND AnnotationTest*").delete() "FIND TestAnnotation WHICH REFERENCES TestSimpleRecordType AS AN TestEntity",
unique=True)
assert ann.is_valid() is True
assert rec.id == ann.id
def test_info(): def test_info():
i = Info() i = Info()
assert_is_not_none(i.messages["Flags"]) assert (i.messages["Flags"]) is not None
assert_is_not_none(i.messages["Counts"]) assert (i.messages["Counts"]) is not None
assert_is_not_none(i.messages["TransactionBenchmark"]) assert (i.messages["TransactionBenchmark"]) is not None
assert_not_equal('-1', i.messages["Counts"]["files"]) assert '-1' != i.messages["Counts"]["files"]
assert_not_equal('-1', i.messages["Counts"]["records"]) assert '-1' != i.messages["Counts"]["records"]
assert_not_equal('-1', i.messages["Counts"]["properties"]) assert '-1' != i.messages["Counts"]["properties"]
assert_not_equal('-1', i.messages["Counts"]["recordTypes"]) assert '-1' != i.messages["Counts"]["recordTypes"]
assert_equal('true', i.messages["Counts"]["debug"]) assert 'true' == i.messages["Counts"]["debug"]
# Not necessarily 0, all the temporary directories go here as well. # Not necessarily 0, all the temporary directories go here as well.
# assert_equal('0', i.messages["Counts"]["tmpfiles"]) # assert 0 == i.messages["Counts"]["tmpfiles"]
def test_long_description(): def test_long_description():
try: longstr = 'desc_'
while len(longstr) < 10000:
longstr += "a"
longstr = 'desc_' rt = RecordType(
while len(longstr) < 10000: name="TestSimpleRecordTypeWithLongDesc",
longstr += "a" description=longstr).insert()
assert rt.is_valid() is True
assert rt.description == longstr
rt = RecordType( rt2 = RecordType(name="TestSimpleRecordTypeWithLongDesc").retrieve()
name="SimpleRecordTypeWithLongDesc", assert rt2.is_valid() is True
description=longstr).insert() assert rt2.description == longstr
assert_true(rt.is_valid())
assert_equal(rt.description, longstr)
rt2 = RecordType(name="SimpleRecordTypeWithLongDesc").retrieve()
assert_true(rt2.is_valid())
assert_equal(rt2.description, longstr)
finally:
try:
rt.delete()
except BaseException:
pass
def test_auto_importance_for_properties(): def test_auto_importance_for_properties():
try: p = Property(name="TestProperty1", datatype=db.TEXT).insert()
p = Property(name="SimpleTestProperty1", datatype=db.TEXT).insert() assert p.is_valid() is True
assert_true(p.is_valid())
p2 = Property( p2 = Property(
name="SimpleTestProperty2", name="TestProperty2",
datatype=db.TEXT).add_property(p).insert() datatype=db.TEXT).add_property(p).insert()
assert_true(p2.is_valid()) assert p2.is_valid() is True
finally:
try:
p2.delete()
except BaseException:
pass
try:
p.delete()
except BaseException:
pass
def test_overrides_with_deletion_in_worst_case_order(): def test_overrides_with_deletion_in_worst_case_order():
try: p = Property(
try: name="TestProperty1",
db.execute_query("FIND Simple*").delete() description="desc1",
except BaseException: datatype=db.TEXT).insert()
pass assert p.is_valid() is True
p = Property( rt1 = RecordType(
name="SimpleTestProperty1", name="TestRT1").add_property(
description="desc1", id=p.id,
datatype=db.TEXT).insert() name="nameOverride1",
assert_true(p.is_valid()) description="descOverride1").insert()
assert rt1.is_valid() is True
rt1 = RecordType(
name="SimpleRT1").add_property( assert p.id == rt1.get_property("nameOverride1").id
id=p.id, assert "descOverride1" == rt1.get_property("nameOverride1").description
name="nameOverride1",
description="descOverride1").insert() # is persistent?
assert_true(rt1.is_valid()) rt1c = RecordType(name="TestRT1").retrieve()
assert rt1c.is_valid() is True
assert_equal(p.id, rt1.get_property("nameOverride1").id)
assert_equal( assert p.id == rt1c.get_property("nameOverride1").id
"descOverride1", assert "descOverride1" == rt1c.get_property("nameOverride1").description
rt1.get_property("nameOverride1").description)
db.Container().extend([p, rt1]).delete()
# is persistent?
rt1c = RecordType(name="SimpleRT1").retrieve()
assert_true(rt1c.is_valid())
assert_equal(p.id, rt1c.get_property("nameOverride1").id)
assert_equal(
"descOverride1",
rt1c.get_property("nameOverride1").description)
db.Container().extend([p, rt1]).delete()
finally:
try:
rt1.delete()
except BaseException:
pass
try:
p.delete()
except BaseException:
pass
def test_overrides_with_duplicates(): def test_overrides_with_duplicates():
try: p = Property(
try: name="TestProperty1",
db.execute_query("FIND Simple*").delete() description="desc1",
except BaseException: datatype=db.TEXT).insert()
pass assert p.is_valid() is True
p = Property( rt1 = RecordType(
name="SimpleTestProperty1", name="TestRT1").add_property(
description="desc1", id=p.id,
datatype=db.TEXT).insert() name="nameOverride1",
assert_true(p.is_valid()) description="descOverride1").add_property(
id=p.id,
rt1 = RecordType( name="nameOverride2",
name="SimpleRT1").add_property( description="descOverride2").insert()
id=p.id, assert rt1.is_valid() is True
name="nameOverride1",
description="descOverride1").add_property( assert p.id == rt1.get_property("nameOverride1").id
id=p.id, assert p.id == rt1.get_property("nameOverride2").id
name="nameOverride2", assert "descOverride1" == rt1.get_property("nameOverride1").description
description="descOverride2").insert() assert "descOverride2" == rt1.get_property("nameOverride2").description
assert_true(rt1.is_valid())
# is persistent?
assert_equal(p.id, rt1.get_property("nameOverride1").id) rt1c = RecordType(name="TestRT1").retrieve()
assert_equal(p.id, rt1.get_property("nameOverride2").id) assert rt1c.is_valid() is True
assert_equal(
"descOverride1", assert p.id == rt1c.get_property("nameOverride1").id
rt1.get_property("nameOverride1").description) assert p.id == rt1c.get_property("nameOverride2").id
assert_equal( assert "descOverride1" == rt1c.get_property("nameOverride1").description
"descOverride2", assert "descOverride2" == rt1c.get_property("nameOverride2").description
rt1.get_property("nameOverride2").description)
# is persistent?
rt1c = RecordType(name="SimpleRT1").retrieve()
assert_true(rt1c.is_valid())
assert_equal(p.id, rt1c.get_property("nameOverride1").id)
assert_equal(p.id, rt1c.get_property("nameOverride2").id)
assert_equal(
"descOverride1",
rt1c.get_property("nameOverride1").description)
assert_equal(
"descOverride2",
rt1c.get_property("nameOverride2").description)
finally:
try:
rt1.delete()
except BaseException:
pass
try:
p.delete()
except BaseException:
pass
def test_overrides_in_subdomains(): def test_overrides_in_subdomains():
try: p1 = Property(
try: name="TestProperty1",
db.execute_query("FIND Simple*").delete() description="desc1",
except BaseException: datatype=db.TEXT).insert()
pass assert p1.is_valid() is True
def insert_model(): p2 = Property(
p1 = Property( name="TestProperty2",
name="SimpleTestProperty1", description="desc2",
description="desc1", datatype=db.TEXT).insert()
datatype=db.TEXT).insert() assert p2.is_valid() is True
assert_true(p1.is_valid())
p3 = Property(
p2 = Property( name="TestProperty3",
name="SimpleTestProperty2", description="desc3",
description="desc2", datatype=db.TEXT).insert()
datatype=db.TEXT).insert() assert p3.is_valid() is True
assert_true(p2.is_valid())
pov1 = Property(
p3 = Property( id=p1.id,
name="SimpleTestProperty3", name="TestPropertyov1",
description="desc3", description="desc1ov1")
datatype=db.TEXT).insert() pov2 = Property(
assert_true(p3.is_valid()) id=p2.id,
name="TestPropertyov2",
pov1 = Property( description="desc1ov2")
id=p1.id, pov3 = Property(
name="SimpleTestPropertyov1", id=p3.id,
description="desc1ov1") name="TestPropertyov3",
pov2 = Property( description="desc1ov3")
id=p2.id,
name="SimpleTestPropertyov2", pov21 = Property(
description="desc1ov2") id=p1.id,
pov3 = Property( name="TestPropertyov21",
id=p3.id, description="desc1ov21")
name="SimpleTestPropertyov3", pov22 = Property(
description="desc1ov3") id=p2.id,
name="TestPropertyov22",
pov21 = Property( description="desc1ov22")
id=p1.id,
name="SimpleTestPropertyov21", pov31 = Property(
description="desc1ov21") id=p1.id,
pov22 = Property( name="TestPropertyov31",
id=p2.id, description="desc1ov31")
name="SimpleTestPropertyov22", pov32 = Property(
description="desc1ov22") id=p2.id,
name="TestPropertyov32",
pov31 = Property( description="desc1ov32")
id=p1.id, pov33 = Property(
name="SimpleTestPropertyov31", id=p3.id,
description="desc1ov31") name="TestPropertyov33",
pov32 = Property( description="desc1ov33")
id=p2.id,
name="SimpleTestPropertyov32", pov321 = Property(
description="desc1ov32") id=p1.id,
pov33 = Property( name="TestPropertyov321",
id=p3.id, description="desc1ov321")
name="SimpleTestPropertyov33", pov322 = Property(
description="desc1ov33") id=p2.id,
name="TestPropertyov322",
pov321 = Property( description="desc1ov322")
id=p1.id, pov323 = Property(
name="SimpleTestPropertyov321", id=p3.id,
description="desc1ov321") name="TestPropertyov323",
pov322 = Property( description="desc1ov323")
id=p2.id,
name="SimpleTestPropertyov322", pov32.add_property(pov321).add_property(
description="desc1ov322") pov322).add_property(pov323)
pov323 = Property( pov3.add_property(pov31).add_property(pov32).add_property(pov33)
id=p3.id, pov2.add_property(pov21).add_property(pov22)
name="SimpleTestPropertyov323",
description="desc1ov323") rt1 = RecordType(name="TestRT1").add_property(
pov1).add_property(pov2).add_property(pov3).insert()
pov32.add_property(pov321).add_property( assert rt1.is_valid() is True
pov322).add_property(pov323)
pov3.add_property(pov31).add_property(pov32).add_property(pov33)
pov2.add_property(pov21).add_property(pov22) @mark.skip()
rt1 = RecordType(name="SimpleRT1").add_property(
pov1).add_property(pov2).add_property(pov3).insert()
assert_true(rt1.is_valid())
insert_model()
finally:
try:
db.execute_query("FIND SimpleRT1").delete()
except BaseException:
pass
try:
db.execute_query("FIND SimpleTestProperty3").delete()
except BaseException:
pass
try:
db.execute_query("FIND SimpleTestProperty2").delete()
except BaseException:
pass
try:
db.execute_query("FIND SimpleTestProperty1").delete()
except BaseException:
pass
@nottest
def test_cache_performance(): def test_cache_performance():
import time as t import time as t
...@@ -584,10 +455,23 @@ def test_cache_performance(): ...@@ -584,10 +455,23 @@ def test_cache_performance():
n += 10 n += 10
@mark.xfail(reason="Waits for MR https://gitlab.indiscale.com/caosdb/src/caosdb-pylib/-/merge_requests/15")
def test_role_after_retrieve(): def test_role_after_retrieve():
rt = db.RecordType("TestRT").insert() rt = db.RecordType("TestRT").insert()
entity = db.Entity(id=rt.id) entity = db.Entity(id=rt.id)
assert entity.role is None assert entity.role is None
entity.retrieve() entity.retrieve()
assert entity.role == rt.role assert entity.role == rt.role
def test_retrieve_wrong_role():
rt = db.RecordType("TestRT").insert()
entity = db.Record(id=rt.id)
assert entity.role == "Record"
with raises(ValueError) as cm:
entity.retrieve()
assert cm.value.args[0] == ("The resulting entity had a different role "
"(RecordType) than the local one (Record). "
"This probably means, that the entity was "
"intialized with a wrong class by this client "
"or it has changed in the past and this "
"client did't know about it yet.")
...@@ -29,14 +29,12 @@ ...@@ -29,14 +29,12 @@
from __future__ import absolute_import from __future__ import absolute_import
import caosdb as db import caosdb as db
from nose.tools import (assert_true, from nose.tools import (assert_equal,
assert_equal,
assert_raises, assert_raises,
assert_false, assert_false,
assert_is_none, assert_is_none,
assert_is_not_none, assert_is_not_none)
nottest) from pytest import raises, mark
from pytest import raises
from .test_misc import test_info from .test_misc import test_info
...@@ -66,7 +64,7 @@ def teardown_module(): ...@@ -66,7 +64,7 @@ def teardown_module():
pass pass
@nottest @mark.skip
def switch_to_test_user(): def switch_to_test_user():
db.configure_connection(username=test_user, password=test_pw, db.configure_connection(username=test_user, password=test_pw,
password_method="plain") password_method="plain")
...@@ -106,7 +104,7 @@ def grant_permission(entity, permission, username=test_user, priority=False, ...@@ -106,7 +104,7 @@ def grant_permission(entity, permission, username=test_user, priority=False,
return ret return ret
@nottest # No need to test manually, is called by setup() @mark.skip
def insert_test_user(): def insert_test_user():
try: try:
db.administration._delete_user(name=test_user) db.administration._delete_user(name=test_user)
...@@ -167,13 +165,13 @@ def test_basic_acl_stuff(): ...@@ -167,13 +165,13 @@ def test_basic_acl_stuff():
datatype=db.TEXT).insert( datatype=db.TEXT).insert(
flags={ flags={
"ACL": None}) "ACL": None})
assert_true(p.is_valid()) assert p.is_valid()
assert_is_not_none(p.acl) assert_is_not_none(p.acl)
p.retrieve(flags={"ACL": None}) p.retrieve(flags={"ACL": None})
assert_true(p.is_valid()) assert p.is_valid()
assert_is_not_none(p.acl) assert_is_not_none(p.acl)
assert_true(isinstance(p.acl, db.ACL)) assert isinstance(p.acl, db.ACL)
assert_false(p.acl.is_empty()) assert_false(p.acl.is_empty())
user_acl = p.acl.get_acl_for_user( user_acl = p.acl.get_acl_for_user(
...@@ -181,16 +179,16 @@ def test_basic_acl_stuff(): ...@@ -181,16 +179,16 @@ def test_basic_acl_stuff():
print(user_acl) print(user_acl)
assert_is_not_none(user_acl) assert_is_not_none(user_acl)
assert_true(isinstance(user_acl, db.ACL)) assert isinstance(user_acl, db.ACL)
assert_false(user_acl.is_empty()) assert_false(user_acl.is_empty())
user_permissions = p.acl.get_permissions_for_user( user_permissions = p.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")) db.get_config().get("Connection", "username"))
assert_is_not_none(user_permissions) assert_is_not_none(user_permissions)
assert_true(isinstance(user_permissions, set)) assert isinstance(user_permissions, set)
assert_true(len(user_permissions) > 0) assert len(user_permissions) > 0
assert_true("DELETE" in user_permissions) assert "DELETE" in user_permissions
other_role_permissions = p.acl.get_permissions_for_role("other_role") other_role_permissions = p.acl.get_permissions_for_role("other_role")
assert_false("DELETE" in other_role_permissions) assert_false("DELETE" in other_role_permissions)
...@@ -269,14 +267,13 @@ def test_query(): ...@@ -269,14 +267,13 @@ def test_query():
def test_update_acl(): def test_update_acl():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert() p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid()) assert p.is_valid()
assert_is_none(p.acl) assert_is_none(p.acl)
p.retrieve_acl() p.retrieve_acl()
assert_is_not_none(p.acl) assert_is_not_none(p.acl)
assert_true( assert "USE:AS_DATA_TYPE" in p.acl.get_permissions_for_user(
"USE:AS_DATA_TYPE" in p.acl.get_permissions_for_user( db.get_config().get("Connection", "username"))
db.get_config().get("Connection", "username")))
p.acl.deny( p.acl.deny(
username=db.get_config().get("Connection", "username"), username=db.get_config().get("Connection", "username"),
permission="USE:AS_DATA_TYPE") permission="USE:AS_DATA_TYPE")
...@@ -286,7 +283,7 @@ def test_update_acl(): ...@@ -286,7 +283,7 @@ def test_update_acl():
'''Success''' '''Success'''
p.update(flags={"ACL": None}) p.update(flags={"ACL": None})
assert_true(p.is_valid()) assert p.is_valid()
assert_is_not_none(p.acl) assert_is_not_none(p.acl)
assert_false( assert_false(
"USE:AS_DATA_TYPE" in p.acl.get_permissions_for_user( "USE:AS_DATA_TYPE" in p.acl.get_permissions_for_user(
...@@ -297,18 +294,16 @@ def test_update_acl(): ...@@ -297,18 +294,16 @@ def test_update_acl():
unique=True, unique=True,
flags={ flags={
"ACL": None}) "ACL": None})
assert_true(p2.is_valid()) assert p2.is_valid()
assert_is_not_none(p2.acl) assert_is_not_none(p2.acl)
assert_true( assert "USE:AS_PROPERTY" in p2.acl.get_permissions_for_user(
"USE:AS_PROPERTY" in p2.acl.get_permissions_for_user( db.get_config().get("Connection", "username"))
db.get_config().get("Connection", "username")))
assert_false( assert_false(
"USE:AS_DATA_TYPE" in p2.acl.get_permissions_for_user( "USE:AS_DATA_TYPE" in p2.acl.get_permissions_for_user(
db.get_config().get("Connection", "username"))) db.get_config().get("Connection", "username")))
assert_true( assert "EDIT:ACL" in p2.acl.get_permissions_for_user(
"EDIT:ACL" in p2.acl.get_permissions_for_user( db.get_config().get("Connection", "username"))
db.get_config().get("Connection", "username")))
p2.acl.deny( p2.acl.deny(
username=db.get_config().get("Connection", "username"), username=db.get_config().get("Connection", "username"),
permission="EDIT:ACL") permission="EDIT:ACL")
...@@ -324,11 +319,10 @@ def test_update_acl(): ...@@ -324,11 +319,10 @@ def test_update_acl():
unique=True, unique=True,
flags={ flags={
"ACL": None}) "ACL": None})
assert_true(p3.is_valid()) assert p3.is_valid()
assert_is_not_none(p3.acl) assert_is_not_none(p3.acl)
assert_true( assert "USE:AS_PROPERTY" in p3.acl.get_permissions_for_user(
"USE:AS_PROPERTY" in p3.acl.get_permissions_for_user( db.get_config().get("Connection", "username"))
db.get_config().get("Connection", "username")))
assert_false( assert_false(
"USE:AS_DATA_TYPE" in p3.acl.get_permissions_for_user( "USE:AS_DATA_TYPE" in p3.acl.get_permissions_for_user(
db.get_config().get("Connection", "username"))) db.get_config().get("Connection", "username")))
...@@ -370,7 +364,7 @@ def test_update_acl(): ...@@ -370,7 +364,7 @@ def test_update_acl():
def test_update_name(): def test_update_name():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert() p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid()) assert p.is_valid()
grant_permission(p, "RETRIEVE:*") grant_permission(p, "RETRIEVE:*")
grant_permission(p, "UPDATE:*") grant_permission(p, "UPDATE:*")
...@@ -381,7 +375,7 @@ def test_update_name(): ...@@ -381,7 +375,7 @@ def test_update_name():
p.update() p.update()
p2 = db.execute_query("FIND Test*", unique=True) p2 = db.execute_query("FIND Test*", unique=True)
assert_true(p.is_valid()) assert p.is_valid()
assert_equal(p2.name, "TestPropertyNew") assert_equal(p2.name, "TestPropertyNew")
'''Failure''' '''Failure'''
...@@ -405,7 +399,7 @@ def test_update_desc(): ...@@ -405,7 +399,7 @@ def test_update_desc():
name="TestProperty", name="TestProperty",
description="Description", description="Description",
datatype=db.TEXT).insert() datatype=db.TEXT).insert()
assert_true(p.is_valid()) assert p.is_valid()
grant_permission(p, "RETRIEVE:*") grant_permission(p, "RETRIEVE:*")
grant_permission(p, "UPDATE:DESCRIPTION") grant_permission(p, "UPDATE:DESCRIPTION")
...@@ -417,7 +411,7 @@ def test_update_desc(): ...@@ -417,7 +411,7 @@ def test_update_desc():
p.update() p.update()
p2 = db.execute_query("FIND Test*", unique=True) p2 = db.execute_query("FIND Test*", unique=True)
assert_true(p.is_valid()) assert p.is_valid()
assert_equal(p2.description, "DescriptionNew") assert_equal(p2.description, "DescriptionNew")
deny_permission(p, "UPDATE:DESCRIPTION") deny_permission(p, "UPDATE:DESCRIPTION")
...@@ -435,7 +429,7 @@ def test_update_desc(): ...@@ -435,7 +429,7 @@ def test_update_desc():
def test_update_data_type(): def test_update_data_type():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert() p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid()) assert p.is_valid()
grant_permission(p, "RETRIEVE:ENTITY") grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p, "UPDATE:DATA_TYPE") grant_permission(p, "UPDATE:DATA_TYPE")
...@@ -446,7 +440,7 @@ def test_update_data_type(): ...@@ -446,7 +440,7 @@ def test_update_data_type():
p.update() p.update()
p2 = db.execute_query("FIND Test*", unique=True) p2 = db.execute_query("FIND Test*", unique=True)
assert_true(p.is_valid()) assert p.is_valid()
assert_equal(p2.datatype, db.INTEGER) assert_equal(p2.datatype, db.INTEGER)
deny_permission(p, "UPDATE:DATA_TYPE") deny_permission(p, "UPDATE:DATA_TYPE")
...@@ -464,22 +458,24 @@ def test_update_data_type(): ...@@ -464,22 +458,24 @@ def test_update_data_type():
def test_update_role(): def test_update_role():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert() p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid()) assert p.is_valid()
grant_permission(p, "RETRIEVE:ENTITY") grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p, "UPDATE:ROLE") grant_permission(p, "UPDATE:ROLE")
'''Success''' '''Success'''
rt = db.RecordType(name="TestProperty").retrieve() rt = db.Entity(name="TestProperty").retrieve()
rt.role = "RecordType"
rt.update() rt.update()
rt2 = db.execute_query("FIND TestProperty", unique=True) rt2 = db.execute_query("FIND TestProperty", unique=True)
assert_true(isinstance(rt2, db.RecordType)) assert isinstance(rt2, db.RecordType)
deny_permission(p, "UPDATE:ROLE") deny_permission(p, "UPDATE:ROLE")
'''Failure''' '''Failure'''
rec = db.Record(name="TestProperty").retrieve() rec = db.Entity(name="TestProperty").retrieve()
rec.role = "Record"
with raises(db.TransactionError) as te: with raises(db.TransactionError) as te:
rec.update() rec.update()
assert te.value.has_error(db.AuthorizationError) assert te.value.has_error(db.AuthorizationError)
...@@ -498,7 +494,7 @@ def test_update_move_file(): ...@@ -498,7 +494,7 @@ def test_update_move_file():
name="TestFile", name="TestFile",
path="/permissiontestfiles/test.dat", path="/permissiontestfiles/test.dat",
file="test.dat").insert() file="test.dat").insert()
assert_true(f.is_valid()) assert f.is_valid()
grant_permission(f, "RETRIEVE:ENTITY") grant_permission(f, "RETRIEVE:ENTITY")
grant_permission(f, "UPDATE:FILE:MOVE") grant_permission(f, "UPDATE:FILE:MOVE")
...@@ -549,7 +545,7 @@ def test_update_add_file(): ...@@ -549,7 +545,7 @@ def test_update_add_file():
f2.path = "/permissiontestfiles/newtest.dat" f2.path = "/permissiontestfiles/newtest.dat"
f2.file = upload_file f2.file = upload_file
f2.update() f2.update()
assert_true(f2.is_valid()) assert f2.is_valid()
f2 = db.execute_query("FIND TestFile", unique=True) f2 = db.execute_query("FIND TestFile", unique=True)
assert_equal(f2.path, "/permissiontestfiles/newtest.dat") assert_equal(f2.path, "/permissiontestfiles/newtest.dat")
...@@ -568,7 +564,7 @@ def test_update_change_file(): ...@@ -568,7 +564,7 @@ def test_update_change_file():
name="TestFile", name="TestFile",
file=upload_file, file=upload_file,
path="permissiontestfiles/test.dat").insert() path="permissiontestfiles/test.dat").insert()
assert_true(f.is_valid()) assert f.is_valid()
grant_permission(f, "RETRIEVE:ENTITY") grant_permission(f, "RETRIEVE:ENTITY")
grant_permission(f, "RETRIEVE:FILE") grant_permission(f, "RETRIEVE:FILE")
...@@ -607,11 +603,11 @@ def test_update_change_file(): ...@@ -607,11 +603,11 @@ def test_update_change_file():
def test_update_add_property(): def test_update_add_property():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert() p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid()) assert p.is_valid()
p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert() p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert()
assert_true(p2.is_valid()) assert p2.is_valid()
rt = db.RecordType(name="TestRecordType").insert() rt = db.RecordType(name="TestRecordType").insert()
assert_true(rt.is_valid()) assert rt.is_valid()
rt = db.execute_query("FIND TestRecordType", unique=True) rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_properties()), 0) assert_equal(len(rt.get_properties()), 0)
...@@ -656,14 +652,14 @@ def test_update_add_property(): ...@@ -656,14 +652,14 @@ def test_update_add_property():
def test_update_remove_property(): def test_update_remove_property():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert() p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid()) assert p.is_valid()
p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert() p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert()
assert_true(p2.is_valid()) assert p2.is_valid()
rt = db.RecordType( rt = db.RecordType(
name="TestRecordType").add_property( name="TestRecordType").add_property(
name="TestProperty").add_property( name="TestProperty").add_property(
name="TestProperty2").insert() name="TestProperty2").insert()
assert_true(rt.is_valid()) assert rt.is_valid()
rt = db.execute_query("FIND TestRecordType", unique=True) rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_properties()), 2) assert_equal(len(rt.get_properties()), 2)
...@@ -701,11 +697,11 @@ def test_update_remove_property(): ...@@ -701,11 +697,11 @@ def test_update_remove_property():
def test_update_add_parent(): def test_update_add_parent():
rt = db.RecordType(name="TestRecordType").insert() rt = db.RecordType(name="TestRecordType").insert()
assert_true(rt.is_valid()) assert rt.is_valid()
par1 = db.RecordType(name="TestRecordTypePar1").insert() par1 = db.RecordType(name="TestRecordTypePar1").insert()
assert_true(par1.is_valid()) assert par1.is_valid()
par2 = db.RecordType(name="TestRecordTypePar2").insert() par2 = db.RecordType(name="TestRecordTypePar2").insert()
assert_true(par2.is_valid()) assert par2.is_valid()
rt = db.execute_query("FIND TestRecordType", unique=True) rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_parents()), 0) assert_equal(len(rt.get_parents()), 0)
...@@ -750,12 +746,12 @@ def test_update_add_parent(): ...@@ -750,12 +746,12 @@ def test_update_add_parent():
def test_update_remove_parent(): def test_update_remove_parent():
par1 = db.RecordType(name="TestRecordTypePar1").insert() par1 = db.RecordType(name="TestRecordTypePar1").insert()
assert_true(par1.is_valid()) assert par1.is_valid()
par2 = db.RecordType(name="TestRecordTypePar2").insert() par2 = db.RecordType(name="TestRecordTypePar2").insert()
assert_true(par2.is_valid()) assert par2.is_valid()
rt = db.RecordType(name="TestRecordType").add_parent( rt = db.RecordType(name="TestRecordType").add_parent(
par1).add_parent(par2).insert() par1).add_parent(par2).insert()
assert_true(rt.is_valid()) assert rt.is_valid()
rt = db.execute_query("FIND TestRecordType", unique=True) rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_parents()), 2) assert_equal(len(rt.get_parents()), 2)
...@@ -1020,7 +1016,7 @@ def test_permissions_there(): ...@@ -1020,7 +1016,7 @@ def test_permissions_there():
entity2 = db.Property(id=entity.id).retrieve() entity2 = db.Property(id=entity.id).retrieve()
assert_is_not_none(entity2.permissions) assert_is_not_none(entity2.permissions)
assert_true(entity2.is_permitted("RETRIEVE:ACL")) assert entity2.is_permitted("RETRIEVE:ACL")
def test_grant_nonsense_permission(): def test_grant_nonsense_permission():
...@@ -1033,9 +1029,9 @@ def test_grant_nonsense_permission(): ...@@ -1033,9 +1029,9 @@ def test_grant_nonsense_permission():
def test_global_acl_there(): def test_global_acl_there():
assert_is_not_none(db.get_global_acl()) assert_is_not_none(db.get_global_acl())
assert_true(isinstance(db.get_global_acl(), db.ACL)) assert isinstance(db.get_global_acl(), db.ACL)
assert_is_not_none(db.get_known_permissions()) assert_is_not_none(db.get_known_permissions())
assert_true(isinstance(db.get_known_permissions(), db.Permissions)) assert isinstance(db.get_known_permissions(), db.Permissions)
print(db.get_known_permissions()) print(db.get_known_permissions())
print(db.get_global_acl()) print(db.get_global_acl())
......
...@@ -40,15 +40,20 @@ def teardown(): ...@@ -40,15 +40,20 @@ def teardown():
setup_module() setup_module()
@pytest.mark.xfail(reason="See caosdb-pylib issue #31 and #66")
def test_add_properties_with_wrong_role(): def test_add_properties_with_wrong_role():
p = db.Property(name="TestProperty1", datatype=db.TEXT).insert() p = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
rt = db.RecordType(name="TestRT1").add_property("TestProperty1").insert() rt = db.RecordType(name="TestRT1").add_property("TestProperty1").insert()
wrong_role = db.Record(name="TestRT1").retrieve() no_role = db.Entity(name="TestRT1").retrieve()
no_role.role = None
wrong_role = db.Entity(name="TestRT1").retrieve()
wrong_role.role = "Record"
right_role = db.RecordType(name="TestRT1").retrieve() right_role = db.RecordType(name="TestRT1").retrieve()
rec = db.Record(name="fail").add_property(wrong_role)
rec2 = db.Record(name="ok").add_property(right_role) with pytest.raises(ValueError):
rec = db.Record().add_parent("TestRT1").add_property(wrong_role)
rec2 = db.Record().add_parent("TestRT1").add_property(right_role)
rec3 = db.Record().add_parent("TestRT1").add_property(no_role)
assert wrong_role.get_property("TestProperty1") is not None assert wrong_role.get_property("TestProperty1") is not None
assert str(wrong_role.get_property("TestProperty1")) == str( assert str(wrong_role.get_property("TestProperty1")) == str(
...@@ -57,7 +62,13 @@ def test_add_properties_with_wrong_role(): ...@@ -57,7 +62,13 @@ def test_add_properties_with_wrong_role():
xml = rec2.to_xml() xml = rec2.to_xml()
assert not xml.xpath("/Record/Property/Property") assert not xml.xpath("/Record/Property/Property")
xml = rec.to_xml() xml = rec3.to_xml()
# TODO this fails because the Record is treated differently than the
# RecordType.
assert not xml.xpath("/Record/Property/Property") assert not xml.xpath("/Record/Property/Property")
rec2.insert()
rec3.insert()
for e in [rec2, rec3]:
assert e.id is not None
assert e.get_property("TestRT1") is not None
assert e.get_property("TestRT1").value is None
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment