Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_permissions.py 36.48 KiB
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""Created on 30.01.2017.
@author: tf
"""
from __future__ import absolute_import
import caosdb as db
from nose.tools import (assert_true, assert_equal, assert_raises, assert_false,
assert_is_none, assert_is_not_none, nottest,
with_setup) # @UnresolvedImport
from pytest import raises
from nose.tools import (assert_equal, assert_false, # @UnresolvedImport
assert_is_none, assert_is_not_none, assert_raises,
assert_true, nottest, with_setup)
from .test_misc import test_info
test_user = "test_user"
test_role = "test_role"
test_pw = "passphrase1P!"
easy_pw = "1234"
def setup_module():
insert_test_user()
def teardown_module():
switch_to_admin_user()
db.administration._delete_user(name="test_user")
db.administration._delete_role(name=test_role)
import os
try:
os.remove("test.dat")
except BaseException:
pass
try:
os.remove("test2.dat")
except BaseException:
pass
@nottest
def switch_to_test_user():
db.configure_connection(username=test_user, password=test_pw,
password_method="plain")
def switch_to_admin_user():
db.configure_connection()
def deny_permission(entity, permission, username=test_user, priority=False):
switch_to_admin_user()
entity.retrieve_acl()
entity.deny(
realm="CaosDB",
username=username,
permission=permission,
priority=priority)
entity.update_acl()
entity.acl = None
switch_to_test_user()
def grant_permission(entity, permission, username=test_user, priority=False,
switch=True):
if switch:
switch_to_admin_user()
entity.retrieve_acl()
entity.grant(
realm="CaosDB",
username=username,
permission=permission,
priority=priority)
ret = entity.update_acl()
entity.acl = None
if switch:
switch_to_test_user()
return ret
@nottest # No need to test manually, is called by setup()
def insert_test_user():
try:
db.administration._delete_user(name=test_user)
except BaseException:
pass
try:
db.administration._delete_role(name=test_role)
except BaseException:
pass
with assert_raises(db.ClientErrorException) as cee:
db.administration._insert_user(
name=test_user,
password=easy_pw,
status="ACTIVE")
assert_equal(cee.exception.status, 422,
"Easy password should raise a 422 error.")
db.administration._insert_user(
name=test_user,
password=test_pw,
status="ACTIVE")
db.administration._insert_role(name=test_role, description="A test role")
db.administration._set_roles(username=test_user, roles=[test_role])
set_transaction_permissions_test_role()
def set_transaction_permissions_test_role():
switch_to_admin_user()
db.administration._set_permissions(
role=test_role, permission_rules=[
db.administration.PermissionRule(
"Grant", "TRANSACTION:*")])
def revoke_permissions_test_role():
switch_to_admin_user()
db.administration._set_permissions(
role=test_role, permission_rules=[])
def teardown():
setup()
def setup():
switch_to_admin_user()
try:
db.execute_query("FIND Test*").delete()
except BaseException:
pass
test_info()
@with_setup(setup, teardown)
def test_basic_acl_stuff():
p = db.Property(
name="TestProperty",
datatype=db.TEXT).insert(
flags={
"ACL": None})
assert_true(p.is_valid())
assert_is_not_none(p.acl)
p.retrieve(flags={"ACL": None})
assert_true(p.is_valid())
assert_is_not_none(p.acl)
assert_true(isinstance(p.acl, db.ACL))
assert_false(p.acl.is_empty())
user_acl = p.acl.get_acl_for_user(
db.get_config().get("Connection", "username"))
print(user_acl)
assert_is_not_none(user_acl)
assert_true(isinstance(user_acl, db.ACL))
assert_false(user_acl.is_empty())
user_permissions = p.acl.get_permissions_for_user(
db.get_config().get("Connection", "username"))
assert_is_not_none(user_permissions)
assert_true(isinstance(user_permissions, set))
assert_true(len(user_permissions) > 0)
assert_true("DELETE" in user_permissions)
other_role_permissions = p.acl.get_permissions_for_role("other_role")
assert_false("DELETE" in other_role_permissions)
@with_setup(setup, teardown)
def test_query():
person = db.RecordType("TestPerson").insert()
db.Property("TestFirstName", datatype=db.TEXT).insert()
db.Property("TestConductor", datatype=person).insert()
dan = db.Record(
name="TestDaniel").add_property(
name="TestFirstName",
value="Daniel").add_parent(person).insert()
exp = db.RecordType(
name="TestExperiment").add_property(
name="TestConductor",
value=dan.id).insert()
assert_equal(
db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor->TestPerson",
unique=True).id,
exp.id)
assert_equal(
db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor=TestPerson",
unique=True).id,
exp.id)
assert_equal(
db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor=" + str(dan.id),
unique=True).id, exp.id)
assert_equal(
db.execute_query(
"FIND TestExperiment",
unique=True).id,
exp.id)
assert_equal(
db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor WHICH has a TestFirstName=Daniel",
unique=True).id,
exp.id)
'''success'''
grant_permission(person, "RETRIEVE:*")
grant_permission(dan, "RETRIEVE:*")
grant_permission(exp, "RETRIEVE:*")
switch_to_test_user()
assert_equal(
db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor WHICH has a TestFirstName=Daniel",
unique=True).id,
exp.id)
'''failure - dan'''
deny_permission(dan, "RETRIEVE:*")
switch_to_test_user()
# this fails if server is configured with
# QUERY_FILTER_ENTITIES_WITHOUT_RETRIEVE_PERMISSIONS = FALSE
with raises(db.TransactionError) as cm:
db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor WHICH has a TestFirstName=Daniel",
unique=True)
assert cm.value.has_error(db.EntityDoesNotExistError)
'''... but works without the which clause'''
assert db.execute_query("FIND TestExperiment", unique=True).id == exp.id
'''and with the id'''
assert db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor=" + str(dan.id),
unique=True).id == exp.id
'''failure - exp'''
grant_permission(dan, "RETRIEVE:*")
deny_permission(exp, "RETRIEVE:*")
switch_to_test_user()
with raises(db.TransactionError) as cm:
db.execute_query(
"FIND TestExperiment WHICH HAS A TestConductor=TestDaniel",
unique=True)
assert cm.value.has_error(db.EntityDoesNotExistError)
with raises(db.TransactionError) as cm:
db.execute_query("FIND TestExperiment", unique=True)
assert cm.value.has_error(db.EntityDoesNotExistError)
@with_setup(setup, teardown)
def test_update_acl():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid())
assert_is_none(p.acl)
p.retrieve_acl()
assert_is_not_none(p.acl)
assert_true(
"USE:AS_DATA_TYPE" in p.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
p.acl.deny(
username=db.get_config().get("Connection", "username"),
permission="USE:AS_DATA_TYPE")
assert_false(
"USE:AS_DATA_TYPE" in p.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
'''Success'''
p.update(flags={"ACL": None})
assert_true(p.is_valid())
assert_is_not_none(p.acl)
assert_false(
"USE:AS_DATA_TYPE" in p.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
p2 = db.execute_query(
"FIND TestProperty",
unique=True,
flags={
"ACL": None})
assert_true(p2.is_valid())
assert_is_not_none(p2.acl)
assert_true(
"USE:AS_PROPERTY" in p2.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
assert_false(
"USE:AS_DATA_TYPE" in p2.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
assert_true(
"EDIT:ACL" in p2.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
p2.acl.deny(
username=db.get_config().get("Connection", "username"),
permission="EDIT:ACL")
assert_false(
"EDIT:ACL" in p2.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
'''Success'''
p2.update()
p3 = db.execute_query(
"FIND TestProperty",
unique=True,
flags={
"ACL": None})
assert_true(p3.is_valid())
assert_is_not_none(p3.acl)
assert_true(
"USE:AS_PROPERTY" in p3.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
assert_false(
"USE:AS_DATA_TYPE" in p3.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
assert_false(
"EDIT:ACL" in p3.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
switch_to_test_user()
'''Failure'''
p.retrieve_acl()
p3.acl.deny(
username=db.get_config().get("Connection", "username"),
permission="USE:AS_PROPERTY")
with raises(db.TransactionError) as te:
p3.update_acl()
assert te.value.has_error(db.AuthorizationException)
p3 = db.execute_query(
"FIND TestProperty",
unique=True,
flags={
"ACL": None})
assert p3.is_valid()
assert p3.acl is not None
assert ("USE:AS_PROPERTY" in p3.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
assert not ("USE:AS_DATA_TYPE" in p3.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
assert not ("EDIT:ACL" in p3.acl.get_permissions_for_user(
db.get_config().get("Connection", "username")))
'''Failure'''
switch_to_test_user()
p3.acl.grant(username=test_user, permission="EDIT:ACL")
with raises(db.TransactionError) as te:
p3.update()
assert te.value.has_error(db.AuthorizationException)
@with_setup(setup, teardown)
def test_update_name():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid())
grant_permission(p, "RETRIEVE:*")
grant_permission(p, "UPDATE:*")
'''Success'''
p.name = "TestPropertyNew"
assert_is_none(p.acl)
p.update()
p2 = db.execute_query("FIND Test*", unique=True)
assert_true(p.is_valid())
assert_equal(p2.name, "TestPropertyNew")
'''Failure'''
deny_permission(p, "UPDATE:NAME")
p.retrieve()
assert_false("UPDATE:NAME" in p.permissions)
p.name = "TestPropertyEvenNewer"
with raises(db.TransactionError) as te:
p.update()
assert te.value.has_error(db.AuthorizationException)
p2 = db.execute_query("FIND Test*", unique=True)
assert p2.is_valid()
assert p2.name == "TestPropertyNew"
@with_setup(setup, teardown)
def test_update_desc():
p = db.Property(
name="TestProperty",
description="Description",
datatype=db.TEXT).insert()
assert_true(p.is_valid())
grant_permission(p, "RETRIEVE:*")
grant_permission(p, "UPDATE:DESCRIPTION")
'''Success'''
switch_to_test_user()
assert_equal(p.description, "Description")
p.description = "DescriptionNew"
p.update()
p2 = db.execute_query("FIND Test*", unique=True)
assert_true(p.is_valid())
assert_equal(p2.description, "DescriptionNew")
deny_permission(p, "UPDATE:DESCRIPTION")
'''Failure'''
p.description = "DescriptionEvenNewer"
with raises(db.TransactionError) as te:
p.update()
assert te.value.has_error(db.AuthorizationException)
p2 = db.execute_query("FIND Test*", unique=True)
assert p2.is_valid()
assert p2.description == "DescriptionNew"
@with_setup(setup, teardown)
def test_update_data_type():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid())
grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p, "UPDATE:DATA_TYPE")
'''Success'''
assert_equal(p.datatype, db.TEXT)
p.datatype = db.INTEGER
p.update()
p2 = db.execute_query("FIND Test*", unique=True)
assert_true(p.is_valid())
assert_equal(p2.datatype, db.INTEGER)
deny_permission(p, "UPDATE:DATA_TYPE")
'''Failure'''
p.datatype = db.DOUBLE
with raises(db.TransactionError) as te:
p.update()
assert te.value.has_error(db.AuthorizationException)
p2 = db.execute_query("FIND Test*", unique=True)
assert p2.is_valid()
assert p2.datatype == db.INTEGER
@with_setup(setup, teardown)
def test_update_role():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid())
grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p, "UPDATE:ROLE")
'''Success'''
rt = db.RecordType(name="TestProperty").retrieve()
rt.update()
rt2 = db.execute_query("FIND TestProperty", unique=True)
assert_true(isinstance(rt2, db.RecordType))
deny_permission(p, "UPDATE:ROLE")
'''Failure'''
rec = db.Record(name="TestProperty").retrieve()
with raises(db.TransactionError) as te:
rec.update()
assert te.value.has_error(db.AuthorizationException)
rt2 = db.execute_query("FIND Test*", unique=True)
assert rt2.is_valid()
assert isinstance(rt2, db.RecordType)
@with_setup(setup, teardown)
def test_update_move_file():
upload_file = open("test.dat", "w")
upload_file.write("hello world\n")
upload_file.close()
f = db.File(
name="TestFile",
path="/permissiontestfiles/test.dat",
file="test.dat").insert()
assert_true(f.is_valid())
grant_permission(f, "RETRIEVE:ENTITY")
grant_permission(f, "UPDATE:FILE:MOVE")
'''SUCCESS'''
f.path = "/otherpermissiontestfiles/test.dat"
f.update()
f2 = db.execute_query("FIND TestFile", unique=True)
assert_equal(f2.path, "/otherpermissiontestfiles/test.dat")
deny_permission(f, "UPDATE:FILE:MOVE")
'''FAILURE'''
f.path = "/againotherpermissiontestfiles/test.dat"
with raises(db.TransactionError) as te:
f.update()
assert te.value.has_error(db.AuthorizationException)
f2 = db.execute_query("FIND TestFile", unique=True)
assert f2.path == "/otherpermissiontestfiles/test.dat"
@with_setup(setup, teardown)
def test_update_add_file():
upload_file = open("test.dat", "w")
upload_file.write("test update add file: #here#\n")
upload_file.close()
f = db.File(name="TestFile").insert()
assert f.is_valid()
grant_permission(f, "RETRIEVE:ENTITY")
'''FAILURE'''
f.path = "/permissiontestfiles/newtest.dat"
f.file = upload_file
with raises(db.TransactionError) as te:
f.update()
assert te.value.has_error(db.AuthorizationException)
f2 = db.execute_query("FIND TestFile", unique=True)
assert f2.path is None
'''SUCCESS'''
grant_permission(f, "UPDATE:FILE:ADD")
f2 = db.execute_query("FIND TestFile", unique=True)
f2.path = "/permissiontestfiles/newtest.dat"
f2.file = upload_file
f2.update()
assert_true(f2.is_valid())
f2 = db.execute_query("FIND TestFile", unique=True)
assert_equal(f2.path, "/permissiontestfiles/newtest.dat")
@with_setup(setup, teardown)
def test_update_change_file():
upload_file = open("test.dat", "w")
upload_file.write("hello world\n")
upload_file.close()
upload_file2 = open("test2.dat", "w")
upload_file2.write("hello universe\n")
upload_file2.close()
f = db.File(
name="TestFile",
file=upload_file,
path="permissiontestfiles/test.dat").insert()
assert_true(f.is_valid())
grant_permission(f, "RETRIEVE:ENTITY")
grant_permission(f, "RETRIEVE:FILE")
'''FAILURE'''
deny_permission(f, "UPDATE:FILE:REMOVE")
f.file = upload_file2
with raises(db.TransactionError) as te:
f.update()
assert te.value.has_error(db.AuthorizationException)
f2 = db.execute_query("FIND TestFile", unique=True)
download_file = f2.download()
assert db.File._get_checksum(
download_file) == db.File._get_checksum(upload_file)
'''SUCCESS'''
print('#################################################################')
print('#################################################################')
print(f.acl)
grant_permission(f, "UPDATE:FILE:REMOVE")
print(f.acl)
grant_permission(f, "UPDATE:FILE:ADD")
print(f.acl)
f2 = db.execute_query("FIND TestFile", unique=True)
f2.file = upload_file2
f2.update()
assert f2.is_valid()
f2 = db.execute_query("FIND TestFile", unique=True)
download_file = f2.download()
assert db.File._get_checksum(
download_file) == db.File._get_checksum(upload_file2)
@with_setup(setup, teardown)
def test_update_add_property():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid())
p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert()
assert_true(p2.is_valid())
rt = db.RecordType(name="TestRecordType").insert()
assert_true(rt.is_valid())
rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_properties()), 0)
grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p2, "RETRIEVE:ENTITY")
grant_permission(rt, "RETRIEVE:ENTITY")
grant_permission(p, "USE:*")
grant_permission(p2, "USE:*")
grant_permission(rt, "UPDATE:PROPERTY:ADD")
'''Success - add p to rt'''
rt.add_property(name="TestProperty", id=p.id).update()
rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_properties()), 1)
assert_is_not_none(rt.get_property("TestProperty"))
deny_permission(rt, "UPDATE:PROPERTY:ADD")
'''Failure - add p to rt'''
rt.add_property(name="TestProperty", id=p.id)
with raises(db.TransactionError) as te:
rt.update()
assert te.value.has_error(db.AuthorizationException)
rt2 = db.execute_query("FIND TestRecordType", unique=True)
assert len(rt2.get_properties()) == 1
assert rt2.get_property("TestProperty") is not None
'''Failure - add p2 to rt'''
rt.add_property(name="TestProperty2", id=p2.id)
with raises(db.TransactionError) as te:
rt.update()
assert te.value.has_error(db.AuthorizationException)
rt2 = db.execute_query("FIND TestRecordType", unique=True)
assert len(rt2.get_properties()) == 1
assert rt2.get_property("TestProperty") is not None
@with_setup(setup, teardown)
def test_update_remove_property():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert_true(p.is_valid())
p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert()
assert_true(p2.is_valid())
rt = db.RecordType(
name="TestRecordType").add_property(
name="TestProperty").add_property(
name="TestProperty2").insert()
assert_true(rt.is_valid())
rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_properties()), 2)
assert_is_not_none(rt.get_property("TestProperty"))
assert_is_not_none(rt.get_property("TestProperty2"))
grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p2, "RETRIEVE:ENTITY")
grant_permission(rt, "RETRIEVE:ENTITY")
grant_permission(p, "USE:*")
grant_permission(p2, "USE:*")
grant_permission(rt, "UPDATE:PROPERTY:REMOVE")
'''Success - remove p2 from rt'''
rt.remove_property("TestProperty2").update()
rt2 = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt2.get_properties()), 1)
assert_is_not_none(rt2.get_property("TestProperty"))
assert_is_none(rt2.get_property("TestProperty2"))
deny_permission(rt, "UPDATE:PROPERTY:REMOVE")
'''Failure - remove p from rt'''
rt.remove_property("TestProperty")
with raises(db.TransactionError) as te:
rt.update()
assert te.value.has_error(db.AuthorizationException)
rt2 = db.execute_query("FIND TestRecordType", unique=True)
assert len(rt2.get_properties()) == 1
assert rt2.get_property("TestProperty") is not None
@with_setup(setup, teardown)
def test_update_add_parent():
rt = db.RecordType(name="TestRecordType").insert()
assert_true(rt.is_valid())
par1 = db.RecordType(name="TestRecordTypePar1").insert()
assert_true(par1.is_valid())
par2 = db.RecordType(name="TestRecordTypePar2").insert()
assert_true(par2.is_valid())
rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_parents()), 0)
grant_permission(rt, "RETRIEVE:ENTITY")
grant_permission(par1, "RETRIEVE:ENTITY")
grant_permission(par2, "RETRIEVE:ENTITY")
grant_permission(par1, "USE:*")
grant_permission(par2, "USE:*")
grant_permission(rt, "UPDATE:PARENT:ADD")
'''Success - add par1 to rt'''
rt.add_parent(name="TestRecordTypePar1", id=par1.id).update()
rt2 = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt2.get_parents()), 1)
assert_is_not_none(rt2.get_parent("TestRecordTypePar1"))
deny_permission(rt, "UPDATE:PARENT:ADD")
'''Failure - add par1 to rt'''
rt.add_parent(name="TestRecordTypePar1", id=par1.id)
with raises(db.TransactionError) as te:
rt.update()
assert te.value.has_error(db.AuthorizationException)
rt2 = db.execute_query("FIND TestRecordType", unique=True)
assert len(rt2.get_parents()) == 1
assert rt2.get_parent("TestRecordTypePar1") is not None
'''Failure - add par2 to rt'''
rt.add_parent(name="TestRecordTypePar2", id=par2.id)
with raises(db.TransactionError) as te:
rt.update()
assert te.value.has_error(db.AuthorizationException)
rt2 = db.execute_query("FIND TestRecordType", unique=True)
assert len(rt2.get_parents()) == 1
assert rt2.get_parent("TestRecordTypePar1") is not None
assert rt2.get_parent("TestRecordTypePar2") is None
@with_setup(setup, teardown)
def test_update_remove_parent():
par1 = db.RecordType(name="TestRecordTypePar1").insert()
assert_true(par1.is_valid())
par2 = db.RecordType(name="TestRecordTypePar2").insert()
assert_true(par2.is_valid())
rt = db.RecordType(name="TestRecordType").add_parent(
par1).add_parent(par2).insert()
assert_true(rt.is_valid())
rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_parents()), 2)
assert_is_not_none(rt.get_parent("TestRecordTypePar1"))
assert_is_not_none(rt.get_parent("TestRecordTypePar2"))
grant_permission(rt, "RETRIEVE:ENTITY")
grant_permission(par1, "RETRIEVE:ENTITY")
grant_permission(par2, "RETRIEVE:ENTITY")
grant_permission(par1, "USE:*")
grant_permission(par2, "USE:*")
grant_permission(rt, "UPDATE:PARENT:REMOVE")
'''Success'''
rt.remove_parent("TestRecordTypePar2").update()
rt = db.execute_query("FIND TestRecordType", unique=True)
assert_equal(len(rt.get_parents()), 1)
assert_is_not_none(rt.get_parent("TestRecordTypePar1"))
assert_is_none(rt.get_parent("TestRecordTypePar2"))
deny_permission(rt, "UPDATE:PARENT:REMOVE")
'''Failure'''
rt.remove_parent("TestRecordTypePar1")
with raises(db.TransactionError) as te:
rt.update()
assert te.value.has_error(db.AuthorizationException)
rt = db.execute_query("FIND TestRecordType", unique=True)
assert len(rt.get_parents()) == 1
assert rt.get_parent("TestRecordTypePar1") is not None
assert rt.get_parent("TestRecordTypePar2") is None
@with_setup(setup, teardown)
def test_update_value():
p = db.Property(
name="TestProperty",
datatype=db.TEXT,
value="Value").insert()
assert p.is_valid()
p = db.execute_query("FIND Test*", unique=True)
assert p.is_valid()
assert p.value == "Value"
grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p, "UPDATE:VALUE")
'''Success'''
p.value = "NewValue"
p.update()
p = db.execute_query("FIND Test*", unique=True)
assert p.is_valid()
assert p.value == "NewValue"
deny_permission(p, "UPDATE:VALUE")
'''Failure'''
p.value = "EvenNewerValue"
with raises(db.TransactionError) as te:
p.update()
assert te.value.has_error(db.AuthorizationException)
p2 = db.execute_query("FIND Test*", unique=True)
assert p2.is_valid()
assert p2.value == "NewValue"
@with_setup(setup, teardown)
def test_deletion():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert p.is_valid()
grant_permission(p, "RETRIEVE:ENTITY")
'''Failure'''
p = db.execute_query("FIND Test*", unique=True)
assert p.is_valid()
with raises(db.TransactionError) as te:
p.delete()
assert te.value.has_error(db.AuthorizationException)
'''Success'''
grant_permission(p, "DELETE")
p.delete()
assert p.get_messages()[
0].description == "This entity has been deleted successfully."
@with_setup(setup, teardown)
def test_retrieve_acl():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert p.is_valid()
'''Success'''
grant_permission(p, "RETRIEVE:*")
p.retrieve_acl()
'''Failure'''
deny_permission(p, "RETRIEVE:ACL")
with raises(db.TransactionError) as te:
p.retrieve_acl()
assert te.value.has_error(db.AuthorizationException)
@with_setup(setup, teardown)
def test_retrieve_history():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert p.is_valid()
grant_permission(p, "RETRIEVE:ENTITY")
'''Failure'''
with raises(db.TransactionError) as te:
p.retrieve(flags={"H": None})
assert te.value.has_error(db.AuthorizationException)
'''Success'''
grant_permission(p, "RETRIEVE:HISTORY")
p.retrieve(flags={"H": None})
assert p.messages["History"] == ('Insert', None)
@with_setup(setup, teardown)
def test_retrieve_entity():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert p.is_valid()
''' Success'''
grant_permission(p, "RETRIEVE:*")
db.Property(name="TestProperty").retrieve()
'''Failure'''
deny_permission(p, "RETRIEVE:ENTITY")
with raises(db.TransactionError) as te:
p.retrieve()
assert te.value.has_error(db.AuthorizationException)
@with_setup(setup, teardown)
def test_retrieve_owner():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert p.is_valid()
'''Failure'''
grant_permission(p, "RETRIEVE:ENTITY")
deny_permission(p, "RETRIEVE:OWNER")
with raises(db.TransactionError) as te:
p.retrieve(flags={"owner": None})
assert te.value.has_error(db.AuthorizationException)
'''Success'''
grant_permission(p, "RETRIEVE:OWNER")
p.retrieve(flags={"owner": None})
@with_setup(setup, teardown)
def test_download_file():
upload_file = open("test.dat", "w")
upload_file.write("hello world\n")
upload_file.close()
f = db.File(
name="TestFile",
file=upload_file,
path="permissiontestfiles/test.dat").insert()
assert f.is_valid()
'''FAILURE'''
f2 = db.execute_query("FIND TestFile", unique=True)
grant_permission(f2, "RETRIEVE:ENTITY")
deny_permission(f2, "RETRIEVE:FILE")
with raises(db.HTTPAuthorizationException):
f.download()
'''SUCCESS'''
grant_permission(f2, "RETRIEVE:FILE")
f2 = db.execute_query("FIND TestFile", unique=True)
download_file = f2.download()
assert db.File._get_checksum(
download_file) == db.File._get_checksum(upload_file)
@with_setup(setup, teardown)
def test_grant_priority_permission():
p = db.Property(name="TestProperty2", datatype=db.TEXT).insert()
switch_to_test_user()
with raises(db.TransactionError) as te:
# other user cannot delete this.
p.delete()
assert te.value.has_error(db.AuthorizationException)
deny_permission(p, "DELETE")
with raises(db.TransactionError) as te:
# still not working
p.delete()
assert te.value.has_error(db.AuthorizationException)
# now its working
grant_permission(p, "DELETE", priority=True)
p.delete()
@with_setup(setup, teardown)
def test_deny_priority_permission():
p = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
switch_to_test_user()
with raises(db.TransactionError) as te:
# other user cannot delete this.
p.delete()
assert te.value.has_error(db.AuthorizationException)
deny_permission(p, "DELETE")
with raises(db.TransactionError) as te:
# still not working
p.delete()
assert te.value.has_error(db.AuthorizationException)
# now it's working
grant_permission(p, "DELETE")
# now it's not
deny_permission(p, "DELETE", priority=True)
with raises(db.TransactionError) as te:
# still not working
p.delete()
assert te.value.has_error(db.AuthorizationException)
@with_setup(setup, teardown)
def test_change_priority_permission():
entity = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
grant_permission(entity, "*")
entity.retrieve(flags={"ACL": None})
# try and grant a priority permission with the test_user
entity.acl.grant(
username="test_user",
permission="USE:AS_PROPERTY",
priority=True)
with raises(db.TransactionError) as te:
entity.update_acl()
assert te.value.has_error(db.AuthorizationException)
@with_setup(setup, teardown)
def test_permissions_there():
entity = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
assert_is_not_none(entity.permissions)
entity2 = db.Property(id=entity.id).retrieve()
assert_is_not_none(entity2.permissions)
assert_true(entity2.is_permitted("RETRIEVE:ACL"))
@with_setup(setup, teardown)
def test_grant_nonsense_permission():
entity = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
entity.retrieve_acl()
entity.grant(username="someone", permission="PAINT:BLUE")
with raises(db.TransactionError):
entity.update_acl()
@with_setup(setup, teardown)
def test_global_acl_there():
assert_is_not_none(db.get_global_acl())
assert_true(isinstance(db.get_global_acl(), db.ACL))
assert_is_not_none(db.get_known_permissions())
assert_true(isinstance(db.get_known_permissions(), db.Permissions))
print(db.get_known_permissions())
print(db.get_global_acl())
@with_setup(setup, teardown)
def test_use_as_property():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert p.is_valid()
switch_to_test_user()
rt = db.RecordType(name="TestRecordType").add_property(name="TestProperty")
deny_permission(p, "USE:AS_PROPERTY")
'''Failure'''
deny_permission(p, "USE:AS_PROPERTY")
with raises(db.TransactionError) as cm:
rt.insert()
assert cm.value.has_error(db.UnqualifiedPropertiesError)
assert int(rt.get_property("TestProperty").get_errors()[0].code) == 403
'''Success'''
grant_permission(p, "USE:AS_PROPERTY")
rt2 = db.RecordType(
name="TestRecordType").add_property(
name="TestProperty").insert()
assert rt2.is_valid()
@with_setup(setup, teardown)
def test_use_as_data_type():
dt_name = "TestPersonRecordType"
dt = db.RecordType(name=dt_name).insert()
'''success'''
grant_permission(dt, "RETRIEVE:ENTITY")
grant_permission(dt, "USE:AS_DATA_TYPE")
db.Property(name="TestConductorProperty", datatype=dt_name).insert()
deny_permission(dt, "USE:AS_DATA_TYPE")
with raises(db.TransactionError) as cm:
db.Property(name="TestConductorProperty2", datatype=dt_name).insert()
assert cm.value.has_error(db.AuthorizationException)
@with_setup(setup, teardown)
def test_use_as_reference():
p = db.RecordType(name="TestRecordTypeForReference").insert()
rec = db.Record().add_parent(parent=p).insert()
grant_permission(p, "RETRIEVE:ENTITY")
grant_permission(p, "USE:*")
grant_permission(rec, "RETRIEVE:ENTITY")
grant_permission(rec, "USE:*")
rt = db.RecordType(
name="TestRecordType").add_property(
name="TestRecordTypeForReference",
value=rec).insert()
assert rt.is_valid()
deny_permission(rec, "USE:AS_REFERENCE")
rt2 = db.RecordType(
name="TestRecordType2").add_property(
name="TestRecordTypeForReference",
value=rec)
with raises(db.TransactionError) as cm:
rt2.insert()
assert cm.value.has_error(db.UnqualifiedPropertiesError)
assert int(rt2.get_property(p.name).get_errors()[0].code) == 403
@with_setup(setup, teardown)
def test_use_as_parent():
p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
assert p.is_valid()
switch_to_test_user()
p2 = db.Property(
name="TestPropertyChild",
datatype=db.TEXT).add_parent(
name="TestProperty")
deny_permission(p, "USE:AS_PARENT")
'''Failure'''
deny_permission(p, "USE:AS_PARENT")
with raises(db.TransactionError) as cm:
p2.insert()
assert cm.value.has_error(db.UnqualifiedParentsError)
assert int(p2.get_parent("TestProperty").get_errors()[0].code) == 403
'''Success'''
grant_permission(p, "USE:AS_PARENT")
p3 = db.Property(
name="TestPropertyChild").add_parent(
name="TestProperty").insert()
assert p3.is_valid()
@with_setup(setup, teardown)
def test_access_control_job_bug():
"""test_access_control_job_bug.
The AccessControl class attempts to retrieve an entity with an id < 0,
which causes a DataTruncation Error in the SQL backend.
This happens if a user does not have the permission "TRANSACTION:INSERT"
"""
revoke_permissions_test_role()
switch_to_test_user()
with raises(db.TransactionError) as te:
rt1 = db.RecordType(name="TestRT").add_parent(id=-5).insert()
assert te.value.has_error(db.AuthorizationException)
set_transaction_permissions_test_role()
@with_setup(setup, teardown)
def test_check_entity_acl_roles():
'''Test the CheckEntityACLRoles class.
Insert an entity.
With "CHECK_ENTITY_ACL_ROLES_MODE=MUST":
Add permissions for a non-existing user role -> error
Set "CHECK_ENTITY_ACL_ROLES_MODE=SHOULD":
Add permissions for a non-existing user role -> warning
reset to CHECK_ENTITY_ACL_ROLES_MODE=MUST
'''
reset = db.administration.get_server_property(
"CHECK_ENTITY_ACL_ROLES_MODE")
if reset == "SHOULD":
db.administration.set_server_property(
"CHECK_ENTITY_ACL_ROLES_MODE", "MUST")
p = db.Property(name="TestP", datatype=db.TEXT,
description="test_check_entity_acl_roles").insert()
# test failure with error
with raises(db.TransactionError) as cm:
grant_permission(p, "USE:AS_PARENT", username="asdf-non-existing",
switch=False)
errors = cm.value.get_entity().get_errors()
assert errors[0].description == "User Role does not exist."
db.administration.set_server_property(
"CHECK_ENTITY_ACL_ROLES_MODE", "SHOULD")
# test success with warning
ret = grant_permission(p, "USE:AS_PROPERTY",
username="asdf-non-existing", switch=False)
assert ret.get_warnings()[0].description == "User Role does not exist."
db.administration.set_server_property("CHECK_ENTITY_ACL_ROLES_MODE", reset)