Skip to content
Snippets Groups Projects
Select Git revision
  • 17724e12220273e2824de5f068f57c0364117d30
  • main default protected
  • dev
  • f-spss-value-label-name
  • f-unmod
  • f-checkidentical
  • f-simple-breakpoint
  • f-new-debug-tree
  • f-existing-file-id
  • f-no-ident
  • f-collect-problems
  • f-refactor-debug-tree
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
  • v0.5.0
  • v0.4.0
  • v0.3.0
  • v0.2.0
  • v0.1.0
28 results

test_tool.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_permissions.py 36.47 KiB
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """Created on 30.01.2017.
    
    @author: tf
    """
    
    from __future__ import absolute_import
    
    import caosdb as db
    from nose.tools import (assert_true, assert_equal, assert_raises, assert_false,
                            assert_is_none, assert_is_not_none, nottest,
                            with_setup)  # @UnresolvedImport
    from pytest import raises
    from nose.tools import (assert_equal, assert_false,  # @UnresolvedImport
                            assert_is_none, assert_is_not_none, assert_raises,
                            assert_true, nottest, with_setup)
    
    from .test_misc import test_info
    
    # Name of the account the permission tests act as (see switch_to_test_user).
    test_user = "test_user"
    # Role that insert_test_user creates and assigns to the test user.
    test_role = "test_role"
    # Password the test user is inserted with and later connects with.
    test_pw = "passphrase1P!"
    # Password that insert_test_user expects the server to reject with a 422.
    easy_pw = "1234"
    
    
    def setup_module():
        """Insert the test user and the test role via ``insert_test_user``."""
        insert_test_user()
    
    
    def teardown_module():
        """Remove the test user, the test role and the scratch files.

        Switches to the admin connection, deletes the user and the role and
        then removes ``test.dat`` and ``test2.dat`` from the working directory.
        A file that cannot be removed (e.g. because it was never created) is
        ignored.
        """
        import os

        switch_to_admin_user()
        # Use the module constant so the deleted account is always the one
        # that insert_test_user() created.
        db.administration._delete_user(name=test_user)
        db.administration._delete_role(name=test_role)
        for scratch_file in ("test.dat", "test2.dat"):
            try:
                os.remove(scratch_file)
            except OSError:
                # Best effort: a missing or undeletable file must not fail the
                # teardown, but KeyboardInterrupt/SystemExit still propagate.
                pass
    
    
    @nottest
    def switch_to_test_user():
        """Reconfigure the connection to log in as the test user."""
        credentials = {
            "username": test_user,
            "password": test_pw,
            "password_method": "plain",
        }
        db.configure_connection(**credentials)
    
    
    def switch_to_admin_user():
        """Reconfigure the connection without any overrides.

        NOTE(review): presumably this falls back to the (admin) credentials of
        the client configuration -- confirm against the configuration in use.
        """
        db.configure_connection()
    
    
    def deny_permission(entity, permission, username=test_user, priority=False):
        """Deny ``permission`` on ``entity`` for a user, acting as admin.

        The entity's ACL is retrieved, a deny rule for ``username`` in the
        "CaosDB" realm is added and the ACL is written back.  Afterwards the
        ACL attribute of ``entity`` is reset to ``None`` and the connection is
        switched to the test user.

        :param entity: entity whose ACL is changed.
        :param permission: permission string to deny, e.g. ``"RETRIEVE:*"``.
        :param username: user the rule applies to.
        :param priority: passed through to ``entity.deny``.
        """
        switch_to_admin_user()
        entity.retrieve_acl()
        rule = dict(realm="CaosDB", username=username, permission=permission,
                    priority=priority)
        entity.deny(**rule)
        entity.update_acl()
        entity.acl = None
        switch_to_test_user()
    
    
    def grant_permission(entity, permission, username=test_user, priority=False,
                         switch=True):
        """Grant ``permission`` on ``entity`` to a user and return the update result.

        The entity's ACL is retrieved, a grant rule for ``username`` in the
        "CaosDB" realm is added and the ACL is written back.  The ACL attribute
        of ``entity`` is reset to ``None`` afterwards.

        :param entity: entity whose ACL is changed.
        :param permission: permission string to grant, e.g. ``"RETRIEVE:*"``.
        :param username: user the rule applies to.
        :param priority: passed through to ``entity.grant``.
        :param switch: when true, switch to the admin connection before the
            change and to the test user after it; when false, the current
            connection is used and left untouched.
        :return: whatever ``entity.update_acl()`` returns.
        """
        if switch:
            switch_to_admin_user()
        entity.retrieve_acl()
        rule = dict(realm="CaosDB", username=username, permission=permission,
                    priority=priority)
        entity.grant(**rule)
        update_result = entity.update_acl()
        entity.acl = None
        if switch:
            switch_to_test_user()
        return update_result
    
    
    @nottest  # Not a test itself; it is called by setup_module().
    def insert_test_user():
        """(Re-)create the test user and the test role.

        Leftovers of a previous run are deleted first.  The function then
        checks that inserting the user with the weak password ``easy_pw`` is
        rejected with status 422, inserts the user with ``test_pw``, creates
        the test role, assigns it to the user and finally sets the role's
        transaction permissions.
        """
        # Best-effort cleanup: ordinary errors (presumably "does not exist")
        # are ignored, while KeyboardInterrupt and SystemExit still propagate.
        try:
            db.administration._delete_user(name=test_user)
        except Exception:
            pass
        try:
            db.administration._delete_role(name=test_role)
        except Exception:
            pass

        with assert_raises(db.ClientErrorException) as cee:
            db.administration._insert_user(
                name=test_user,
                password=easy_pw,
                status="ACTIVE")
        assert_equal(cee.exception.status, 422,
                     "Easy password should raise a 422 error.")

        db.administration._insert_user(
            name=test_user,
            password=test_pw,
            status="ACTIVE")
        db.administration._insert_role(name=test_role, description="A test role")
        db.administration._set_roles(username=test_user, roles=[test_role])
        set_transaction_permissions_test_role()
    
    
    def set_transaction_permissions_test_role():
        """As admin, set the test role's rules to one Grant for "TRANSACTION:*"."""
        switch_to_admin_user()
        grant_all_transactions = db.administration.PermissionRule(
            "Grant", "TRANSACTION:*")
        db.administration._set_permissions(
            role=test_role, permission_rules=[grant_all_transactions])
    
    
    def revoke_permissions_test_role():
        """As admin, replace the test role's permission rules with an empty list."""
        switch_to_admin_user()
        no_rules = []
        db.administration._set_permissions(role=test_role,
                                           permission_rules=no_rules)
    
    
    def teardown():
        """Clean up after a test by running the same routine as ``setup``."""
        setup()
    
    
    def setup():
        """Reset the test data while connected as admin.

        Deletes everything found by the query "FIND Test*" and afterwards calls
        ``test_info`` (imported from ``test_misc``).
        """
        switch_to_admin_user()
        try:
            db.execute_query("FIND Test*").delete()
        except Exception:
            # Best effort: the deletion may fail (presumably when nothing
            # matches).  Ordinary errors are ignored; KeyboardInterrupt and
            # SystemExit are deliberately not swallowed.
            pass
        test_info()
    
    
    @with_setup(setup, teardown)
    def test_basic_acl_stuff():
        """Check that an inserted property exposes a non-empty ACL.

        The ACL is requested on insert and on retrieve; the configured
        connection user must hold DELETE, the role "other_role" must not.
        """

        def configured_username():
            # User name taken from the "Connection" section of the client config.
            return db.get_config().get("Connection", "username")

        new_property = db.Property(name="TestProperty", datatype=db.TEXT)
        p = new_property.insert(flags={"ACL": None})
        assert_true(p.is_valid())
        assert_is_not_none(p.acl)

        p.retrieve(flags={"ACL": None})
        assert_true(p.is_valid())
        assert_is_not_none(p.acl)
        assert_true(isinstance(p.acl, db.ACL))
        assert_false(p.acl.is_empty())

        # The part of the ACL that concerns the configured user.
        user_acl = p.acl.get_acl_for_user(configured_username())
        print(user_acl)

        assert_is_not_none(user_acl)
        assert_true(isinstance(user_acl, db.ACL))
        assert_false(user_acl.is_empty())

        # The effective permissions of the configured user.
        user_permissions = p.acl.get_permissions_for_user(configured_username())
        assert_is_not_none(user_permissions)
        assert_true(isinstance(user_permissions, set))
        assert_true(len(user_permissions) > 0)
        assert_true("DELETE" in user_permissions)

        other_role_permissions = p.acl.get_permissions_for_role("other_role")
        assert_false("DELETE" in other_role_permissions)
    
    
    @with_setup(setup, teardown)
    def test_query():
        """Check how RETRIEVE permissions affect query results.

        A TestExperiment record type references the record TestDaniel through
        the TestConductor property.  The queries run first on the connection
        established by ``setup`` and then as the test user, while RETRIEVE
        permissions on the involved entities are granted and denied.
        """
        person = db.RecordType("TestPerson").insert()
        db.Property("TestFirstName", datatype=db.TEXT).insert()
        db.Property("TestConductor", datatype=person).insert()

        dan = (db.Record(name="TestDaniel")
               .add_property(name="TestFirstName", value="Daniel")
               .add_parent(person)
               .insert())
        exp = (db.RecordType(name="TestExperiment")
               .add_property(name="TestConductor", value=dan.id)
               .insert())

        by_first_name = ("FIND TestExperiment WHICH HAS A TestConductor "
                         "WHICH has a TestFirstName=Daniel")

        def by_conductor_id():
            # Query that references the conductor through its numeric id.
            return "FIND TestExperiment WHICH HAS A TestConductor=" + str(dan.id)

        # Before any permission is touched, all query variants find exp.
        initial_queries = (
            "FIND TestExperiment WHICH HAS A TestConductor->TestPerson",
            "FIND TestExperiment WHICH HAS A TestConductor=TestPerson",
            by_conductor_id(),
            "FIND TestExperiment",
            by_first_name,
        )
        for query in initial_queries:
            assert_equal(db.execute_query(query, unique=True).id, exp.id)

        # success
        for entity in (person, dan, exp):
            grant_permission(entity, "RETRIEVE:*")

        switch_to_test_user()
        assert_equal(db.execute_query(by_first_name, unique=True).id, exp.id)

        # failure - dan
        deny_permission(dan, "RETRIEVE:*")
        switch_to_test_user()

        # this fails if server is configured with
        # QUERY_FILTER_ENTITIES_WITHOUT_RETRIEVE_PERMISSIONS = FALSE
        with raises(db.TransactionError) as cm:
            db.execute_query(by_first_name, unique=True)
        assert cm.value.has_error(db.EntityDoesNotExistError)
        # ... but works without the which clause
        found = db.execute_query("FIND TestExperiment", unique=True)
        assert found.id == exp.id
        # and with the id
        found = db.execute_query(by_conductor_id(), unique=True)
        assert found.id == exp.id

        # failure - exp
        grant_permission(dan, "RETRIEVE:*")
        deny_permission(exp, "RETRIEVE:*")
        switch_to_test_user()

        with raises(db.TransactionError) as cm:
            db.execute_query(
                "FIND TestExperiment WHICH HAS A TestConductor=TestDaniel",
                unique=True)
        assert cm.value.has_error(db.EntityDoesNotExistError)

        with raises(db.TransactionError) as cm:
            db.execute_query("FIND TestExperiment", unique=True)
        assert cm.value.has_error(db.EntityDoesNotExistError)
    
    
    @with_setup(setup, teardown)
    def test_update_acl():
        """Exercise ACL modifications on a property.

        On the connection established by ``setup`` the configured user denies
        itself USE:AS_DATA_TYPE and EDIT:ACL and both updates succeed.  After
        switching to the test user, ACL changes must be rejected with an
        AuthorizationException and the stored ACL must be unchanged.
        """

        def configured_username():
            # User name taken from the "Connection" section of the client config.
            return db.get_config().get("Connection", "username")

        def own_permissions(entity):
            # Permissions the configured user holds according to entity.acl.
            return entity.acl.get_permissions_for_user(configured_username())

        def find_property_with_acl():
            return db.execute_query(
                "FIND TestProperty", unique=True, flags={"ACL": None})

        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert_true(p.is_valid())
        assert_is_none(p.acl)
        p.retrieve_acl()
        assert_is_not_none(p.acl)

        # Deny USE:AS_DATA_TYPE locally; the local ACL reflects it at once.
        assert_true("USE:AS_DATA_TYPE" in own_permissions(p))
        p.acl.deny(username=configured_username(),
                   permission="USE:AS_DATA_TYPE")
        assert_false("USE:AS_DATA_TYPE" in own_permissions(p))

        # Success
        p.update(flags={"ACL": None})
        assert_true(p.is_valid())
        assert_is_not_none(p.acl)
        assert_false("USE:AS_DATA_TYPE" in own_permissions(p))

        p2 = find_property_with_acl()
        assert_true(p2.is_valid())
        assert_is_not_none(p2.acl)
        assert_true("USE:AS_PROPERTY" in own_permissions(p2))
        assert_false("USE:AS_DATA_TYPE" in own_permissions(p2))

        # Now deny EDIT:ACL as well.
        assert_true("EDIT:ACL" in own_permissions(p2))
        p2.acl.deny(username=configured_username(), permission="EDIT:ACL")
        assert_false("EDIT:ACL" in own_permissions(p2))

        # Success
        p2.update()

        p3 = find_property_with_acl()
        assert_true(p3.is_valid())
        assert_is_not_none(p3.acl)
        assert_true("USE:AS_PROPERTY" in own_permissions(p3))
        assert_false("USE:AS_DATA_TYPE" in own_permissions(p3))
        assert_false("EDIT:ACL" in own_permissions(p3))

        switch_to_test_user()
        # Failure
        p.retrieve_acl()
        p3.acl.deny(username=configured_username(),
                    permission="USE:AS_PROPERTY")
        with raises(db.TransactionError) as te:
            p3.update_acl()
        assert te.value.has_error(db.AuthorizationException)

        # The rejected change must not have altered the stored ACL.
        p3 = find_property_with_acl()
        assert p3.is_valid()
        assert p3.acl is not None
        assert "USE:AS_PROPERTY" in own_permissions(p3)
        assert "USE:AS_DATA_TYPE" not in own_permissions(p3)
        assert "EDIT:ACL" not in own_permissions(p3)

        # Failure
        switch_to_test_user()
        p3.acl.grant(username=test_user, permission="EDIT:ACL")
        with raises(db.TransactionError) as te:
            p3.update()
        assert te.value.has_error(db.AuthorizationException)
    
    
    @with_setup(setup, teardown)
    def test_update_name():
        """Renaming succeeds with UPDATE:* granted and fails once UPDATE:NAME is denied.

        After the rejected update the stored name must still be the one from
        the successful rename.
        """
        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert_true(p.is_valid())

        grant_permission(p, "RETRIEVE:*")
        grant_permission(p, "UPDATE:*")

        # Success (grant_permission left the connection on the test user)
        p.name = "TestPropertyNew"
        assert_is_none(p.acl)
        p.update()

        p2 = db.execute_query("FIND Test*", unique=True)
        assert_true(p.is_valid())
        # Also validate the freshly queried entity, as the failure branch does.
        assert_true(p2.is_valid())
        assert_equal(p2.name, "TestPropertyNew")

        # Failure
        deny_permission(p, "UPDATE:NAME")

        p.retrieve()
        assert_false("UPDATE:NAME" in p.permissions)

        p.name = "TestPropertyEvenNewer"
        with raises(db.TransactionError) as te:
            p.update()
        assert te.value.has_error(db.AuthorizationException)

        # The stored entity still carries the name of the successful update.
        p2 = db.execute_query("FIND Test*", unique=True)
        assert p2.is_valid()
        assert p2.name == "TestPropertyNew"
    
    
    @with_setup(setup, teardown)
    def test_update_desc():
        """Changing the description requires UPDATE:DESCRIPTION.

        The update succeeds while the permission is granted and raises an
        AuthorizationException after it has been denied; the stored
        description then remains the one from the successful update.
        """
        p = db.Property(
            name="TestProperty",
            description="Description",
            datatype=db.TEXT).insert()
        assert_true(p.is_valid())

        grant_permission(p, "RETRIEVE:*")
        grant_permission(p, "UPDATE:DESCRIPTION")

        # Success
        switch_to_test_user()
        assert_equal(p.description, "Description")
        p.description = "DescriptionNew"
        p.update()

        p2 = db.execute_query("FIND Test*", unique=True)
        assert_true(p.is_valid())
        # Also validate the freshly queried entity, as the failure branch does.
        assert_true(p2.is_valid())
        assert_equal(p2.description, "DescriptionNew")

        deny_permission(p, "UPDATE:DESCRIPTION")

        # Failure
        p.description = "DescriptionEvenNewer"
        with raises(db.TransactionError) as te:
            p.update()
        assert te.value.has_error(db.AuthorizationException)

        p2 = db.execute_query("FIND Test*", unique=True)
        assert p2.is_valid()
        assert p2.description == "DescriptionNew"
    
    
    @with_setup(setup, teardown)
    def test_update_data_type():
        """Changing the datatype requires UPDATE:DATA_TYPE.

        TEXT -> INTEGER succeeds while the permission is granted; the later
        change to DOUBLE is rejected with an AuthorizationException after the
        permission has been denied, and the stored datatype stays INTEGER.
        """
        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert_true(p.is_valid())

        grant_permission(p, "RETRIEVE:ENTITY")
        grant_permission(p, "UPDATE:DATA_TYPE")

        # Success
        assert_equal(p.datatype, db.TEXT)
        p.datatype = db.INTEGER
        p.update()

        p2 = db.execute_query("FIND Test*", unique=True)
        assert_true(p.is_valid())
        # Also validate the freshly queried entity, as the failure branch does.
        assert_true(p2.is_valid())
        assert_equal(p2.datatype, db.INTEGER)

        deny_permission(p, "UPDATE:DATA_TYPE")

        # Failure
        p.datatype = db.DOUBLE
        with raises(db.TransactionError) as te:
            p.update()
        assert te.value.has_error(db.AuthorizationException)

        p2 = db.execute_query("FIND Test*", unique=True)
        assert p2.is_valid()
        assert p2.datatype == db.INTEGER
    
    
    @with_setup(setup, teardown)
    def test_update_role():
        """Changing an entity's role requires UPDATE:ROLE.

        The property is updated as a RecordType while the permission is
        granted; updating it as a Record after the permission was denied must
        raise an AuthorizationException and leave it a RecordType.
        """
        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert_true(p.is_valid())

        for permission in ("RETRIEVE:ENTITY", "UPDATE:ROLE"):
            grant_permission(p, permission)

        # Success
        as_record_type = db.RecordType(name="TestProperty")
        rt = as_record_type.retrieve()
        rt.update()

        rt2 = db.execute_query("FIND TestProperty", unique=True)
        assert_true(isinstance(rt2, db.RecordType))

        deny_permission(p, "UPDATE:ROLE")

        # Failure
        as_record = db.Record(name="TestProperty")
        rec = as_record.retrieve()
        with raises(db.TransactionError) as te:
            rec.update()
        assert te.value.has_error(db.AuthorizationException)

        rt2 = db.execute_query("FIND Test*", unique=True)
        assert rt2.is_valid()
        assert isinstance(rt2, db.RecordType)
    
    
    @with_setup(setup, teardown)
    def test_update_move_file():
        """Changing a file's path requires UPDATE:FILE:MOVE.

        The move succeeds while the permission is granted and raises an
        AuthorizationException after it was denied; the stored path then is
        still the one of the successful move.
        """
        # The context manager closes the file even if writing fails.
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")

        f = db.File(
            name="TestFile",
            path="/permissiontestfiles/test.dat",
            file="test.dat").insert()
        assert_true(f.is_valid())

        grant_permission(f, "RETRIEVE:ENTITY")
        grant_permission(f, "UPDATE:FILE:MOVE")

        # SUCCESS
        f.path = "/otherpermissiontestfiles/test.dat"
        f.update()

        f2 = db.execute_query("FIND TestFile", unique=True)
        assert_equal(f2.path, "/otherpermissiontestfiles/test.dat")

        deny_permission(f, "UPDATE:FILE:MOVE")

        # FAILURE
        f.path = "/againotherpermissiontestfiles/test.dat"
        with raises(db.TransactionError) as te:
            f.update()
        assert te.value.has_error(db.AuthorizationException)

        f2 = db.execute_query("FIND TestFile", unique=True)
        assert f2.path == "/otherpermissiontestfiles/test.dat"
    
    
    @with_setup(setup, teardown)
    def test_update_add_file():
        """Attaching a file to a File entity requires UPDATE:FILE:ADD.

        Without the permission the update raises an AuthorizationException and
        the stored path stays ``None``; with it the update succeeds and the
        new path is stored.
        """
        # The context manager closes the file even if writing fails.
        with open("test.dat", "w") as upload_file:
            upload_file.write("test update add file: #here#\n")

        f = db.File(name="TestFile").insert()
        assert f.is_valid()

        grant_permission(f, "RETRIEVE:ENTITY")

        # FAILURE
        f.path = "/permissiontestfiles/newtest.dat"
        # NOTE(review): upload_file is already closed here; presumably db.File
        # only needs the object's name -- confirm.
        f.file = upload_file
        with raises(db.TransactionError) as te:
            f.update()
        assert te.value.has_error(db.AuthorizationException)

        f2 = db.execute_query("FIND TestFile", unique=True)
        assert f2.path is None

        # SUCCESS
        grant_permission(f, "UPDATE:FILE:ADD")

        f2 = db.execute_query("FIND TestFile", unique=True)
        f2.path = "/permissiontestfiles/newtest.dat"
        f2.file = upload_file
        f2.update()
        assert_true(f2.is_valid())

        f2 = db.execute_query("FIND TestFile", unique=True)
        assert_equal(f2.path, "/permissiontestfiles/newtest.dat")
    
    
    @with_setup(setup, teardown)
    def test_update_change_file():
        """Replacing an entity's file requires UPDATE:FILE:REMOVE and UPDATE:FILE:ADD.

        With UPDATE:FILE:REMOVE denied the update raises an
        AuthorizationException and the download still matches the first file.
        After both permissions are granted the update succeeds and the
        download matches the second file.
        """
        # The context managers close the files even if writing fails.
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")

        with open("test2.dat", "w") as upload_file2:
            upload_file2.write("hello universe\n")

        f = db.File(
            name="TestFile",
            file=upload_file,
            path="permissiontestfiles/test.dat").insert()
        assert_true(f.is_valid())
        grant_permission(f, "RETRIEVE:ENTITY")
        grant_permission(f, "RETRIEVE:FILE")

        # FAILURE
        deny_permission(f, "UPDATE:FILE:REMOVE")

        f.file = upload_file2
        with raises(db.TransactionError) as te:
            f.update()
        assert te.value.has_error(db.AuthorizationException)

        # The stored content is still that of the first upload.
        f2 = db.execute_query("FIND TestFile", unique=True)
        download_file = f2.download()
        assert db.File._get_checksum(
            download_file) == db.File._get_checksum(upload_file)

        # SUCCESS
        grant_permission(f, "UPDATE:FILE:REMOVE")
        grant_permission(f, "UPDATE:FILE:ADD")

        f2 = db.execute_query("FIND TestFile", unique=True)
        f2.file = upload_file2
        f2.update()
        assert f2.is_valid()

        # Now the stored content is that of the second upload.
        f2 = db.execute_query("FIND TestFile", unique=True)
        download_file = f2.download()
        assert db.File._get_checksum(
            download_file) == db.File._get_checksum(upload_file2)
    
    
    @with_setup(setup, teardown)
    def test_update_add_property():
        """Adding properties to a record type requires UPDATE:PROPERTY:ADD.

        The first property is added while the permission is granted.  After it
        was denied, further additions raise an AuthorizationException and the
        stored record type keeps exactly one property.
        """

        def find_record_type():
            return db.execute_query("FIND TestRecordType", unique=True)

        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert_true(p.is_valid())
        p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert()
        assert_true(p2.is_valid())
        rt = db.RecordType(name="TestRecordType").insert()
        assert_true(rt.is_valid())

        rt = find_record_type()
        assert_equal(len(rt.get_properties()), 0)

        for entity in (p, p2, rt):
            grant_permission(entity, "RETRIEVE:ENTITY")
        for prop in (p, p2):
            grant_permission(prop, "USE:*")
        grant_permission(rt, "UPDATE:PROPERTY:ADD")

        # Success - add p to rt
        rt.add_property(name="TestProperty", id=p.id).update()

        rt = find_record_type()
        assert_equal(len(rt.get_properties()), 1)
        assert_is_not_none(rt.get_property("TestProperty"))

        deny_permission(rt, "UPDATE:PROPERTY:ADD")

        # Failure - add p to rt
        rt.add_property(name="TestProperty", id=p.id)
        with raises(db.TransactionError) as te:
            rt.update()
        assert te.value.has_error(db.AuthorizationException)

        rt2 = find_record_type()
        assert len(rt2.get_properties()) == 1
        assert rt2.get_property("TestProperty") is not None

        # Failure - add p2 to rt
        rt.add_property(name="TestProperty2", id=p2.id)
        with raises(db.TransactionError) as te:
            rt.update()
        assert te.value.has_error(db.AuthorizationException)

        rt2 = find_record_type()
        assert len(rt2.get_properties()) == 1
        assert rt2.get_property("TestProperty") is not None
    
    
    @with_setup(setup, teardown)
    def test_update_remove_property():
        """Removing properties from a record type requires UPDATE:PROPERTY:REMOVE.

        TestProperty2 is removed while the permission is granted.  After it
        was denied, removing TestProperty raises an AuthorizationException and
        the stored record type still has that property.
        """

        def find_record_type():
            return db.execute_query("FIND TestRecordType", unique=True)

        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert_true(p.is_valid())
        p2 = db.Property(name="TestProperty2", datatype=db.TEXT).insert()
        assert_true(p2.is_valid())
        rt = (db.RecordType(name="TestRecordType")
              .add_property(name="TestProperty")
              .add_property(name="TestProperty2")
              .insert())
        assert_true(rt.is_valid())

        rt = find_record_type()
        assert_equal(len(rt.get_properties()), 2)
        assert_is_not_none(rt.get_property("TestProperty"))
        assert_is_not_none(rt.get_property("TestProperty2"))

        for entity in (p, p2, rt):
            grant_permission(entity, "RETRIEVE:ENTITY")
        for prop in (p, p2):
            grant_permission(prop, "USE:*")
        grant_permission(rt, "UPDATE:PROPERTY:REMOVE")

        # Success - remove p2 from rt
        rt.remove_property("TestProperty2").update()

        rt2 = find_record_type()
        assert_equal(len(rt2.get_properties()), 1)
        assert_is_not_none(rt2.get_property("TestProperty"))
        assert_is_none(rt2.get_property("TestProperty2"))

        deny_permission(rt, "UPDATE:PROPERTY:REMOVE")

        # Failure - remove p from rt
        rt.remove_property("TestProperty")
        with raises(db.TransactionError) as te:
            rt.update()
        assert te.value.has_error(db.AuthorizationException)

        rt2 = find_record_type()
        assert len(rt2.get_properties()) == 1
        assert rt2.get_property("TestProperty") is not None
    
    
    @with_setup(setup, teardown)
    def test_update_add_parent():
        """Adding parents to a record type requires UPDATE:PARENT:ADD.

        The first parent is added while the permission is granted.  After it
        was denied, further additions raise an AuthorizationException and the
        stored record type keeps exactly one parent.
        """

        def find_record_type():
            return db.execute_query("FIND TestRecordType", unique=True)

        rt = db.RecordType(name="TestRecordType").insert()
        assert_true(rt.is_valid())
        par1 = db.RecordType(name="TestRecordTypePar1").insert()
        assert_true(par1.is_valid())
        par2 = db.RecordType(name="TestRecordTypePar2").insert()
        assert_true(par2.is_valid())

        rt = find_record_type()
        assert_equal(len(rt.get_parents()), 0)

        for entity in (rt, par1, par2):
            grant_permission(entity, "RETRIEVE:ENTITY")
        for parent in (par1, par2):
            grant_permission(parent, "USE:*")
        grant_permission(rt, "UPDATE:PARENT:ADD")

        # Success - add par1 to rt
        rt.add_parent(name="TestRecordTypePar1", id=par1.id).update()

        rt2 = find_record_type()
        assert_equal(len(rt2.get_parents()), 1)
        assert_is_not_none(rt2.get_parent("TestRecordTypePar1"))

        deny_permission(rt, "UPDATE:PARENT:ADD")

        # Failure - add par1 to rt
        rt.add_parent(name="TestRecordTypePar1", id=par1.id)
        with raises(db.TransactionError) as te:
            rt.update()
        assert te.value.has_error(db.AuthorizationException)

        rt2 = find_record_type()
        assert len(rt2.get_parents()) == 1
        assert rt2.get_parent("TestRecordTypePar1") is not None

        # Failure - add par2 to rt
        rt.add_parent(name="TestRecordTypePar2", id=par2.id)
        with raises(db.TransactionError) as te:
            rt.update()
        assert te.value.has_error(db.AuthorizationException)

        rt2 = find_record_type()
        assert len(rt2.get_parents()) == 1
        assert rt2.get_parent("TestRecordTypePar1") is not None
        assert rt2.get_parent("TestRecordTypePar2") is None
    
    
    @with_setup(setup, teardown)
    def test_update_remove_parent():
        """Removing parents from a record type requires UPDATE:PARENT:REMOVE.

        The second parent is removed while the permission is granted.  After
        it was denied, removing the first parent raises an
        AuthorizationException and the stored record type keeps that parent.
        """

        def find_record_type():
            return db.execute_query("FIND TestRecordType", unique=True)

        par1 = db.RecordType(name="TestRecordTypePar1").insert()
        assert_true(par1.is_valid())
        par2 = db.RecordType(name="TestRecordTypePar2").insert()
        assert_true(par2.is_valid())
        rt = (db.RecordType(name="TestRecordType")
              .add_parent(par1)
              .add_parent(par2)
              .insert())
        assert_true(rt.is_valid())

        rt = find_record_type()
        assert_equal(len(rt.get_parents()), 2)
        assert_is_not_none(rt.get_parent("TestRecordTypePar1"))
        assert_is_not_none(rt.get_parent("TestRecordTypePar2"))

        for entity in (rt, par1, par2):
            grant_permission(entity, "RETRIEVE:ENTITY")
        for parent in (par1, par2):
            grant_permission(parent, "USE:*")
        grant_permission(rt, "UPDATE:PARENT:REMOVE")

        # Success
        rt.remove_parent("TestRecordTypePar2").update()

        rt = find_record_type()
        assert_equal(len(rt.get_parents()), 1)
        assert_is_not_none(rt.get_parent("TestRecordTypePar1"))
        assert_is_none(rt.get_parent("TestRecordTypePar2"))

        deny_permission(rt, "UPDATE:PARENT:REMOVE")

        # Failure
        rt.remove_parent("TestRecordTypePar1")
        with raises(db.TransactionError) as te:
            rt.update()
        assert te.value.has_error(db.AuthorizationException)

        rt = find_record_type()
        assert len(rt.get_parents()) == 1
        assert rt.get_parent("TestRecordTypePar1") is not None
        assert rt.get_parent("TestRecordTypePar2") is None
    
    
    @with_setup(setup, teardown)
    def test_update_value():
        """Changing a property's value requires UPDATE:VALUE.

        The update succeeds while the permission is granted and raises an
        AuthorizationException after it was denied; the stored value then is
        still the one of the successful update.
        """

        def find_test_entity():
            return db.execute_query("FIND Test*", unique=True)

        new_property = db.Property(
            name="TestProperty", datatype=db.TEXT, value="Value")
        p = new_property.insert()
        assert p.is_valid()

        p = find_test_entity()
        assert p.is_valid()
        assert p.value == "Value"

        for permission in ("RETRIEVE:ENTITY", "UPDATE:VALUE"):
            grant_permission(p, permission)

        # Success
        p.value = "NewValue"
        p.update()

        p = find_test_entity()
        assert p.is_valid()
        assert p.value == "NewValue"

        deny_permission(p, "UPDATE:VALUE")

        # Failure
        p.value = "EvenNewerValue"
        with raises(db.TransactionError) as te:
            p.update()
        assert te.value.has_error(db.AuthorizationException)

        p2 = find_test_entity()
        assert p2.is_valid()
        assert p2.value == "NewValue"
    
    
    @with_setup(setup, teardown)
    def test_deletion():
        """Deleting an entity requires the DELETE permission.

        With only RETRIEVE:ENTITY granted the deletion raises an
        AuthorizationException; after DELETE is granted it succeeds and the
        entity's first message confirms the deletion.
        """
        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert p.is_valid()

        grant_permission(p, "RETRIEVE:ENTITY")

        # Failure
        p = db.execute_query("FIND Test*", unique=True)
        assert p.is_valid()
        with raises(db.TransactionError) as te:
            p.delete()
        assert te.value.has_error(db.AuthorizationException)

        # Success
        grant_permission(p, "DELETE")

        p.delete()
        first_message = p.get_messages()[0]
        assert (first_message.description
                == "This entity has been deleted successfully.")
    
    
    @with_setup(setup, teardown)
    def test_retrieve_acl():
        """Retrieving the ACL works with RETRIEVE:* and fails once RETRIEVE:ACL is denied."""
        new_property = db.Property(name="TestProperty", datatype=db.TEXT)
        p = new_property.insert()
        assert p.is_valid()

        # Success
        grant_permission(p, "RETRIEVE:*")
        p.retrieve_acl()

        # Failure
        deny_permission(p, "RETRIEVE:ACL")

        with raises(db.TransactionError) as te:
            p.retrieve_acl()
        assert te.value.has_error(db.AuthorizationException)
    
    
    @with_setup(setup, teardown)
    def test_retrieve_history():
        """Retrieving with the "H" flag requires RETRIEVE:HISTORY.

        With only RETRIEVE:ENTITY granted the retrieval raises an
        AuthorizationException; with RETRIEVE:HISTORY it succeeds and the
        "History" message equals ``('Insert', None)``.
        """
        new_property = db.Property(name="TestProperty", datatype=db.TEXT)
        p = new_property.insert()
        assert p.is_valid()

        grant_permission(p, "RETRIEVE:ENTITY")

        # Failure
        with raises(db.TransactionError) as te:
            p.retrieve(flags={"H": None})
        assert te.value.has_error(db.AuthorizationException)

        # Success
        grant_permission(p, "RETRIEVE:HISTORY")

        p.retrieve(flags={"H": None})
        history_message = p.messages["History"]
        assert history_message == ('Insert', None)
    
    
    @with_setup(setup, teardown)
    def test_retrieve_entity():
        """Retrieving an entity is refused once RETRIEVE:ENTITY is denied."""
        prop = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert prop.is_valid()

        # Success: retrieval by name after granting RETRIEVE:*.
        grant_permission(prop, "RETRIEVE:*")
        db.Property(name="TestProperty").retrieve()

        # Failure: RETRIEVE:ENTITY is explicitly denied afterwards.
        deny_permission(prop, "RETRIEVE:ENTITY")

        with raises(db.TransactionError) as err:
            prop.retrieve()
        assert err.value.has_error(db.AuthorizationException)
    
    
    @with_setup(setup, teardown)
    def test_retrieve_owner():
        """Retrieval with the "owner" flag fails while RETRIEVE:OWNER is denied."""
        prop = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert prop.is_valid()

        # Failure: RETRIEVE:ENTITY is granted but RETRIEVE:OWNER is denied.
        grant_permission(prop, "RETRIEVE:ENTITY")
        deny_permission(prop, "RETRIEVE:OWNER")

        with raises(db.TransactionError) as err:
            prop.retrieve(flags={"owner": None})
        assert err.value.has_error(db.AuthorizationException)

        # Success: RETRIEVE:OWNER is granted again.
        grant_permission(prop, "RETRIEVE:OWNER")

        prop.retrieve(flags={"owner": None})
    
    
    @with_setup(setup, teardown)
    def test_download_file():
        """Downloading a File entity fails while RETRIEVE:FILE is denied."""
        # Write the fixture through a context manager so the handle is closed
        # even if the write raises.  As before, the (closed) file object is
        # what gets passed to db.File and db.File._get_checksum below.
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")

        f = db.File(
            name="TestFile",
            file=upload_file,
            path="permissiontestfiles/test.dat").insert()
        assert f.is_valid()

        # Failure: RETRIEVE:ENTITY is granted but RETRIEVE:FILE is denied.
        f2 = db.execute_query("FIND TestFile", unique=True)

        grant_permission(f2, "RETRIEVE:ENTITY")
        deny_permission(f2, "RETRIEVE:FILE")

        with raises(db.HTTPAuthorizationException):
            f.download()

        # Success: RETRIEVE:FILE is granted, the download must match the
        # uploaded content.
        grant_permission(f2, "RETRIEVE:FILE")

        f2 = db.execute_query("FIND TestFile", unique=True)
        download_file = f2.download()
        assert db.File._get_checksum(
            download_file) == db.File._get_checksum(upload_file)
    
    
    @with_setup(setup, teardown)
    def test_grant_priority_permission():
        """A priority grant of DELETE allows deletion despite a plain deny."""
        prop = db.Property(name="TestProperty2", datatype=db.TEXT).insert()

        switch_to_test_user()
        # The other user cannot delete this.
        with raises(db.TransactionError) as err:
            prop.delete()
        assert err.value.has_error(db.AuthorizationException)

        deny_permission(prop, "DELETE")

        # Still not working after the explicit deny.
        with raises(db.TransactionError) as err:
            prop.delete()
        assert err.value.has_error(db.AuthorizationException)

        # Now it's working: the grant is made with priority=True.
        grant_permission(prop, "DELETE", priority=True)
        prop.delete()
    
    
    @with_setup(setup, teardown)
    def test_deny_priority_permission():
        """A priority deny of DELETE blocks deletion despite a plain grant."""
        prop = db.Property(name="TestProperty1", datatype=db.TEXT).insert()

        switch_to_test_user()
        # The other user cannot delete this.
        with raises(db.TransactionError) as err:
            prop.delete()
        assert err.value.has_error(db.AuthorizationException)

        deny_permission(prop, "DELETE")

        # Still not working after the explicit deny.
        with raises(db.TransactionError) as err:
            prop.delete()
        assert err.value.has_error(db.AuthorizationException)

        # A plain grant follows (deletion is deliberately not attempted
        # here) ...
        grant_permission(prop, "DELETE")
        # ... and a priority deny is put on top of it: deletion must fail.
        deny_permission(prop, "DELETE", priority=True)
        with raises(db.TransactionError) as err:
            prop.delete()
        assert err.value.has_error(db.AuthorizationException)
    
    
    @with_setup(setup, teardown)
    def test_change_priority_permission():
        """Updating an ACL with a new priority rule is rejected."""
        prop = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
        grant_permission(prop, "*")

        prop.retrieve(flags={"ACL": None})
        # Try to grant a priority permission with the test_user.
        priority_rule = {
            "username": "test_user",
            "permission": "USE:AS_PROPERTY",
            "priority": True,
        }
        prop.acl.grant(**priority_rule)

        with raises(db.TransactionError) as err:
            prop.update_acl()
        assert err.value.has_error(db.AuthorizationException)
    
    
    @with_setup(setup, teardown)
    def test_permissions_there():
        """Inserted and retrieved entities both carry a permissions object."""
        inserted = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
        assert inserted.permissions is not None

        retrieved = db.Property(id=inserted.id).retrieve()
        assert retrieved.permissions is not None

        assert retrieved.is_permitted("RETRIEVE:ACL")
    
    
    @with_setup(setup, teardown)
    def test_grant_nonsense_permission():
        """Updating the ACL with the permission "PAINT:BLUE" must fail."""
        prop = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
        prop.retrieve_acl()
        prop.grant(username="someone", permission="PAINT:BLUE")
        # The grant is only applied locally; the error surfaces on update.
        with raises(db.TransactionError):
            prop.update_acl()
    
    
    @with_setup(setup, teardown)
    def test_global_acl_there():
        """The global ACL and the known permissions exist with expected types."""
        assert db.get_global_acl() is not None
        assert isinstance(db.get_global_acl(), db.ACL)
        assert db.get_known_permissions() is not None
        assert isinstance(db.get_known_permissions(), db.Permissions)
        # Printed for manual inspection of the test output.
        print(db.get_known_permissions())
        print(db.get_global_acl())
    
    
    @with_setup(setup, teardown)
    def test_use_as_property():
        """Using a Property in a RecordType requires USE:AS_PROPERTY.

        While the permission is denied, inserting the RecordType must fail
        with an UnqualifiedPropertiesError and a 403 error on the property;
        after granting it the insertion must succeed.
        """
        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert p.is_valid()
        switch_to_test_user()
        rt = db.RecordType(name="TestRecordType").add_property(name="TestProperty")

        # Failure (the permission is denied once; the former second,
        # identical deny_permission call was redundant).
        deny_permission(p, "USE:AS_PROPERTY")
        with raises(db.TransactionError) as cm:
            rt.insert()
        assert cm.value.has_error(db.UnqualifiedPropertiesError)
        assert int(rt.get_property("TestProperty").get_errors()[0].code) == 403

        # Success
        grant_permission(p, "USE:AS_PROPERTY")
        rt2 = db.RecordType(
            name="TestRecordType").add_property(
            name="TestProperty").insert()
        assert rt2.is_valid()
    
    
    @with_setup(setup, teardown)
    def test_use_as_data_type():
        """Using a RecordType as a datatype fails once USE:AS_DATA_TYPE is denied."""
        type_name = "TestPersonRecordType"
        record_type = db.RecordType(name=type_name).insert()

        # Success: both permissions are granted.
        for permission in ("RETRIEVE:ENTITY", "USE:AS_DATA_TYPE"):
            grant_permission(record_type, permission)
        db.Property(name="TestConductorProperty", datatype=type_name).insert()

        # Failure: USE:AS_DATA_TYPE is denied afterwards.
        deny_permission(record_type, "USE:AS_DATA_TYPE")
        with raises(db.TransactionError) as err:
            db.Property(name="TestConductorProperty2",
                        datatype=type_name).insert()
        assert err.value.has_error(db.AuthorizationException)
    
    
    @with_setup(setup, teardown)
    def test_use_as_reference():
        """Referencing a Record as a value fails once USE:AS_REFERENCE is denied."""
        ref_type = db.RecordType(name="TestRecordTypeForReference").insert()
        rec = db.Record().add_parent(parent=ref_type).insert()

        grant_permission(ref_type, "RETRIEVE:ENTITY")
        grant_permission(ref_type, "USE:*")
        grant_permission(rec, "RETRIEVE:ENTITY")
        grant_permission(rec, "USE:*")

        # Success: the record may be used as a reference value.
        referencing = db.RecordType(name="TestRecordType").add_property(
            name="TestRecordTypeForReference", value=rec)
        inserted = referencing.insert()
        assert inserted.is_valid()

        # Failure: USE:AS_REFERENCE is denied on the record.
        deny_permission(rec, "USE:AS_REFERENCE")
        rejected = db.RecordType(name="TestRecordType2").add_property(
            name="TestRecordTypeForReference", value=rec)
        with raises(db.TransactionError) as err:
            rejected.insert()
        assert err.value.has_error(db.UnqualifiedPropertiesError)
        property_errors = rejected.get_property(ref_type.name).get_errors()
        assert int(property_errors[0].code) == 403
    
    
    @with_setup(setup, teardown)
    def test_use_as_parent():
        """Using an entity as a parent requires USE:AS_PARENT.

        While the permission is denied, inserting the child must fail with an
        UnqualifiedParentsError and a 403 error on the parent; after granting
        it the insertion must succeed.
        """
        p = db.Property(name="TestProperty", datatype=db.TEXT).insert()
        assert p.is_valid()
        switch_to_test_user()

        p2 = db.Property(
            name="TestPropertyChild",
            datatype=db.TEXT).add_parent(
            name="TestProperty")

        # Failure (the permission is denied once; the former second,
        # identical deny_permission call was redundant).
        deny_permission(p, "USE:AS_PARENT")
        with raises(db.TransactionError) as cm:
            p2.insert()
        assert cm.value.has_error(db.UnqualifiedParentsError)
        assert int(p2.get_parent("TestProperty").get_errors()[0].code) == 403

        # Success
        grant_permission(p, "USE:AS_PARENT")
        p3 = db.Property(
            name="TestPropertyChild").add_parent(
            name="TestProperty").insert()
        assert p3.is_valid()
    
    
    @with_setup(setup, teardown)
    def test_access_control_job_bug():
        """test_access_control_job_bug.

        The AccessControl class attempts to retrieve an entity with an id < 0,
        which causes a DataTruncation Error in the SQL backend.

        This happens if a user does not have the permission "TRANSACTION:INSERT"

        The test role's transaction permissions are restored in a ``finally``
        clause so that a failing assertion does not leave them revoked.
        """
        revoke_permissions_test_role()
        try:
            switch_to_test_user()
            with raises(db.TransactionError) as te:
                # The return value is irrelevant; the insert must be rejected.
                db.RecordType(name="TestRT").add_parent(id=-5).insert()
            assert te.value.has_error(db.AuthorizationException)
        finally:
            set_transaction_permissions_test_role()
    
    
    @with_setup(setup, teardown)
    def test_check_entity_acl_roles():
        '''Test the CheckEntityACLRoles class.

        Insert an entity.
        With "CHECK_ENTITY_ACL_ROLES_MODE=MUST":
        Add permissions for a non-existing user role -> error
        Set "CHECK_ENTITY_ACL_ROLES_MODE=SHOULD":
        Add permissions for a non-existing user role -> warning
        Finally restore CHECK_ENTITY_ACL_ROLES_MODE to the value it had before
        the test; this happens in a ``finally`` clause so that a failing
        assertion does not leave the server property changed.
        '''

        reset = db.administration.get_server_property(
            "CHECK_ENTITY_ACL_ROLES_MODE")
        try:
            if reset == "SHOULD":
                db.administration.set_server_property(
                    "CHECK_ENTITY_ACL_ROLES_MODE", "MUST")

            p = db.Property(name="TestP", datatype=db.TEXT,
                            description="test_check_entity_acl_roles").insert()

            # test failure with error
            with raises(db.TransactionError) as cm:
                grant_permission(p, "USE:AS_PARENT",
                                 username="asdf-non-existing", switch=False)
            errors = cm.value.entity.errors
            assert errors[0].description == "User Role does not exist."
            db.administration.set_server_property(
                "CHECK_ENTITY_ACL_ROLES_MODE", "SHOULD")

            # test success with warning
            ret = grant_permission(p, "USE:AS_PROPERTY",
                                   username="asdf-non-existing", switch=False)
            assert ret.get_warnings()[0].description == (
                "User Role does not exist.")
        finally:
            # Always put the server back into its previous mode.
            db.administration.set_server_property(
                "CHECK_ENTITY_ACL_ROLES_MODE", reset)