diff --git a/tests/test_error_stuff.py b/tests/test_error_stuff.py index b43b53ee194a9ec8c598118861786d8e5fde77af..d975ef5dd30368df70e007f0585c7f97b09c9fbc 100644 --- a/tests/test_error_stuff.py +++ b/tests/test_error_stuff.py @@ -30,7 +30,7 @@ Created on 19.02.2015. @author: tf """ -import caosdb as h +import linkahead as db from caosdb.exceptions import (AmbiguousEntityError, EntityDoesNotExistError, EntityError, EntityHasNoDatatypeError, @@ -42,7 +42,7 @@ import pytest def setup_function(function): try: - h.execute_query("FIND ENTITY").delete() + db.execute_query("FIND ENTITY").delete() except BaseException: pass @@ -54,7 +54,7 @@ def teardown_function(function): def test_retrieval_no_exception_raised(): """Test whether retrieval fails but error is suppressed.""" - p = h.Property(name="TestNon-ExsistentProperty").retrieve( + p = db.Property(name="TestNon-ExsistentProperty").retrieve( unique=True, raise_exception_on_error=False) assert not p.is_valid() assert (p.id is None or p.id < 0) @@ -67,8 +67,8 @@ def test_retrieval_exception_raised(): """ propname = "TestNon-ExistentProperty" with pytest.raises(TransactionError) as te: - h.Property(name="TestNon-ExistentProperty").retrieve(unique=True, - raise_exception_on_error=True) + db.Property(name="TestNon-ExistentProperty").retrieve(unique=True, + raise_exception_on_error=True) assert len(te.value.errors) == 1 ee = te.value.errors[0] # Check for type incl. inheritance @@ -89,19 +89,19 @@ def test_ambiguous_retrieval(): raised correctly if there are two possible candidates. """ - h.RecordType(name="TestType").insert() - h.Record(name="TestRec").add_parent(name="TestType").insert() + db.RecordType(name="TestType").insert() + db.Record(name="TestRec").add_parent(name="TestType").insert() # Insert twice, so unique=False - h.Record(name="TestRec").add_parent(name="TestType").insert(unique=False) + db.Record(name="TestRec").add_parent(name="TestType").insert(unique=False) with pytest.raises(TransactionError) as te: - h.Record(name="TestRec").retrieve() + db.Record(name="TestRec").retrieve() assert te.value.has_error(AmbiguousEntityError) assert te.value.errors[0].entity.name == "TestRec" def test_insertion_no_exception_raised(): """Test whether insertion fails but no error is raised.""" - p = h.Property(name="TestNoTypeProperty").insert( + p = db.Property(name="TestNoTypeProperty").insert( raise_exception_on_error=False) assert not p.is_valid() assert (p.id is None or p.id < 0) @@ -109,7 +109,7 @@ def test_insertion_no_exception_raised(): def test_insertion_exception_raised(): """Test insertion of a property with missing datatype.""" - p = h.Property(name="TestNoTypeProperty") + p = db.Property(name="TestNoTypeProperty") with pytest.raises(TransactionError) as te: p.insert(raise_exception_on_error=True) assert te.value.has_error(EntityHasNoDatatypeError) @@ -117,7 +117,7 @@ def test_insertion_exception_raised(): def test_insertion_with_invalid_parents(): with pytest.raises(TransactionError) as te: - p = h.Property( + p = db.Property( name="TestNoTypeProperty", datatype="Text").add_parent( id=-1) @@ -136,7 +136,7 @@ def test_insertion_with_invalid_parents(): def test_insertion_with_invalid_properties(): with pytest.raises(TransactionError) as te: - p = h.Property( + p = db.Property( name="TestNoTypeProperty", datatype="Text").add_property( id=-1) @@ -157,11 +157,11 @@ def test_entity_does_not_exist(): EntityDoesNotExistErrors. """ - p1 = h.Property(name="TestNon-ExistentProperty1").retrieve( + p1 = db.Property(name="TestNon-ExistentProperty1").retrieve( raise_exception_on_error=False) - p2 = h.Property(name="TestNon-ExistentProperty2").retrieve( + p2 = db.Property(name="TestNon-ExistentProperty2").retrieve( raise_exception_on_error=False) - p3 = h.Property(name="TestNon-ExistentProperty3").retrieve( + p3 = db.Property(name="TestNon-ExistentProperty3").retrieve( raise_exception_on_error=False) # None of them should exist assert not p1.is_valid() @@ -171,17 +171,17 @@ def test_entity_does_not_exist(): assert not p3.is_valid() assert (p3.id is None or p3.id < 0) - pe = h.Property(name="TestExistentProperty", datatype="text").insert() + pe = db.Property(name="TestExistentProperty", datatype="text").insert() - c = h.Container().extend( + c = db.Container().extend( [ - h.Property( + db.Property( name="TestNon-ExistentProperty1"), - h.Property( + db.Property( name="TestNon-ExistentProperty2"), - h.Property( + db.Property( name="TestNon-ExistentProperty3"), - h.Property( + db.Property( name="TestExistentProperty")]) with pytest.raises(TransactionError) as te: @@ -199,11 +199,11 @@ def test_insert_existent_entity(): UniqueNamesError. """ - p1 = h.Property(name="TestNon-ExistentProperty1").retrieve( + p1 = db.Property(name="TestNon-ExistentProperty1").retrieve( raise_exception_on_error=False) - p2 = h.Property(name="TestNon-ExistentProperty2").retrieve( + p2 = db.Property(name="TestNon-ExistentProperty2").retrieve( raise_exception_on_error=False) - p3 = h.Property(name="TestNon-ExistentProperty3").retrieve( + p3 = db.Property(name="TestNon-ExistentProperty3").retrieve( raise_exception_on_error=False) # None of them should exist assert not p1.is_valid() @@ -213,21 +213,21 @@ def test_insert_existent_entity(): assert not p3.is_valid() assert (p3.id is None or p3.id < 0) - pe = h.Property(name="TestExistentProperty", datatype="text").insert() + pe = db.Property(name="TestExistentProperty", datatype="text").insert() assert pe.is_valid() - c = h.Container().extend( + c = db.Container().extend( [ - h.Property( + db.Property( name="TestNon-ExistentProperty1", datatype="text"), - h.Property( + db.Property( name="TestNon-ExistentProperty2", datatype="text"), - h.Property( + db.Property( name="TestNon-ExistentProperty3", datatype="text"), - h.Property( + db.Property( name="TestExistentProperty", datatype="text")]) @@ -243,31 +243,31 @@ def test_insert_existent_entity(): def test_double_insertion(): - c1 = h.Container() + c1 = db.Container() c1.append( - h.Property( + db.Property( name="TestSimpleTextProperty", description="simple text property (from test_error_stuff.py)", datatype='text')) c1.append( - h.Property( + db.Property( name="TestSimpleDoubleProperty", description="simple double property (from test_error_stuff.py)", datatype='double')) c1.append( - h.Property( + db.Property( name="TestSimpleIntegerProperty", description="simple integer property (from test_error_stuff.py)", datatype='integer')) c1.append( - h.Property( + db.Property( name="TestSimpleDatetimeProperty", description="simple datetime property (from test_error_stuff.py)", datatype='datetime')) c1.append( - h.RecordType( + db.RecordType( name="TestSimpleRecordType", description="simple recordType (from test_error_stuff.py)").add_property( name='TestSimpleTextProperty').add_property( @@ -277,30 +277,30 @@ def test_double_insertion(): c1.insert() - c2 = h.Container() + c2 = db.Container() c2.append( - h.Property( + db.Property( name="TestSimpleTextProperty", description="simple text property (from test_error_stuff.py)", datatype='text')) c2.append( - h.Property( + db.Property( name="TestSimpleDoubleProperty", description="simple double property (from test_error_stuff.py)", datatype='double')) c2.append( - h.Property( + db.Property( name="TestSimpleIntegerProperty", description="simple integer property (from test_error_stuff.py)", datatype='integer')) c2.append( - h.Property( + db.Property( name="TestSimpleDatetimeProperty", description="simple datetime property (from test_error_stuff.py)", datatype='datetime')) c2.append( - h.RecordType( + db.RecordType( name="TestSimpleRecordType", description="simple recordType (from test_error_stuff.py)").add_property( name='TestSimpleTextProperty').add_property( @@ -322,7 +322,7 @@ def test_update_acl_errors(): `Entity.update_acl` """ - rec_ne = h.Record("TestRecordNonExisting") + rec_ne = db.Record("TestRecordNonExisting") with pytest.raises(TransactionError) as te: @@ -331,12 +331,37 @@ def test_update_acl_errors(): assert te.value.has_error(EntityDoesNotExistError) assert te.value.errors[0].entity.name == rec_ne.name - rt = h.RecordType(name="TestType").insert() - rec = h.Record(name="TestRecord").add_parent(rt).insert() - h.Record(name=rec.name).add_parent(rt).insert(unique=False) + rt = db.RecordType(name="TestType").insert() + rec = db.Record(name="TestRecord").add_parent(rt).insert() + db.Record(name=rec.name).add_parent(rt).insert(unique=False) with pytest.raises(TransactionError) as te: - h.Record(name=rec.name).update_acl() + db.Record(name=rec.name).update_acl() assert te.value.has_error(AmbiguousEntityError) + + +@pytest.mark.xfail(reason="Waiting for pylib release") +def test_URI_too_long(): + """Some tests for variours URI too long commands. + + See for example https://gitlab.indiscale.com/caosdb/src/caosdb-pylib/-/issues/180 + + """ + + short = 100 + uri_long = 819 + header_long = 815 + + with pytest.raises(db.TransactionError) as excinfo: + db.execute_query("0123456789" * short) + assert "Parsing" in excinfo.value.msg + + with pytest.raises(db.HTTPURITooLongError) as excinfo: + db.execute_query("0123456789" * uri_long) + assert "414" in excinfo.value.msg + + with pytest.raises(db.HTTPURITooLongError) as excinfo: + db.execute_query("0123456789" * header_long) + assert "431" in excinfo.value.msg diff --git a/tests/test_permissions.py b/tests/test_permissions.py index 1b50060de9df368618e5e4966263dde30242de0d..fafc70db1a8fb97c63c2ba875e08f7eeb6768afa 100644 --- a/tests/test_permissions.py +++ b/tests/test_permissions.py @@ -36,6 +36,7 @@ from nose.tools import (assert_equal, assert_is_not_none) from pytest import raises, mark + test_user = "test_user" test_role = "test_role" test_pw = "passphrase1P!" @@ -187,6 +188,33 @@ def test_basic_acl_stuff(): assert_false("DELETE" in other_role_permissions) +@mark.xfail(reason="fix needed: https://gitlab.com/linkahead/linkahead-server/-/issues/247") +def test_server_issue_247(): + db.administration.set_server_property( + "QUERY_FILTER_ENTITIES_WITHOUT_RETRIEVE_PERMISSIONS", "TRUE") + person = db.RecordType("TestPerson").insert() + db.Property("TestFirstName", datatype=db.TEXT).insert() + db.Property("TestConductor", datatype=person).insert() + + dan = db.Record( + name="TestDaniel").add_property( + name="TestFirstName", + value="Daniel").add_parent(person).insert() + exp = db.RecordType( + name="TestExperiment").add_property( + name="TestConductor", + value=dan.id).insert() + + grant_permission(person, "RETRIEVE:*") + grant_permission(exp, "RETRIEVE:*") + deny_permission(dan, "RETRIEVE:*") + switch_to_test_user() + + assert db.execute_query( + "FIND ENTITY TestExperiment WHICH HAS A TestConductor=" + str(dan.id), + unique=True).id == exp.id + + def test_query(): db.administration.set_server_property( "QUERY_FILTER_ENTITIES_WITHOUT_RETRIEVE_PERMISSIONS", "TRUE") @@ -239,10 +267,11 @@ def test_query(): unique=True) '''... but works without the which clause''' assert db.execute_query("FIND ENTITY TestExperiment", unique=True).id == exp.id + '''and with the id''' - assert db.execute_query( - "FIND ENTITY TestExperiment WHICH HAS A TestConductor=" + str(dan.id), - unique=True).id == exp.id + # assert db.execute_query( + # "FIND ENTITY TestExperiment WHICH HAS A TestConductor=" + str(dan.id), + # unique=True).id == exp.id '''failure - exp''' grant_permission(dan, "RETRIEVE:*") @@ -1195,3 +1224,118 @@ def test_deny_update_role(): p.name = "TestPropertyEvenNewer" with raises(db.TransactionError) as te: p.update() + + +def test_query_with_invisible_reference(): + """ + Names of references that are not visible to the test user should not be usable as query + filters. + """ + db.administration.set_server_property( + "QUERY_FILTER_ENTITIES_WITHOUT_RETRIEVE_PERMISSIONS", "TRUE") + + rt = db.RecordType(name="TestRT").insert() + rec_invisible = db.Record(name="TestInvisible").add_parent(rt).insert() + rec_visible = db.Record(name="TestVisible").add_parent( + rt).add_property(name=rt.name, value=rec_invisible.id).insert() + # test user is only allowed to see rec_visible, not rec_invisble + grant_permission(rec_visible, "RETRIEVE:*") + deny_permission(rec_invisible, "RETRIEVE:*") + + # as admin, I'm allowed to filter this + switch_to_admin_user() + assert len(db.execute_query(f"FIND {rt.name} WITH {rt.name}={rec_invisible.name}")) == 1 + + switch_to_test_user() + + # Retrival is forbidden + with raises(db.TransactionError) as te: + retrieved_invisible = db.Record(id=rec_invisible.id).retrieve() + assert te.value.has_error(db.AuthorizationError) + + retrieved_visible = db.Record(id=rec_visible.id).retrieve() + assert retrieved_visible.name == rec_visible.name + assert retrieved_visible.get_property(rt.name) is not None + assert retrieved_visible.get_property(rt.name).value == rec_invisible.id + + # We cant see rec_invisible, so its name can't be used as a valid query filter. + assert len(db.execute_query( + f"FIND {rt.name} WITH {rt.name} WITH name={rec_invisible.name}")) == 0 + assert len(db.execute_query(f"FIND {rt.name} WITH {rt.name}={rec_invisible.name}")) == 0 + assert len(db.execute_query(f"FIND {rt.name} WITH {rt.name} LIKE '*invis*'")) == 0 + + +def test_select_query_with_invisible_reference(): + """SELECT queries must not leak property values of invisible referenced entities.""" + + visible_rt = db.RecordType(name="TestTypeVisible").insert() + invisible_rt = db.RecordType(name="TestTypeInvisible").insert() + other_rt = db.RecordType(name="TestTypeOther").insert() + prop = db.Property(name="TestProp", datatype=db.INTEGER).insert() + + other_rec = db.Record(name="TestOther").add_parent(other_rt).insert() + referenced_visible = db.Record(name="TestReferencedVisible").add_parent(visible_rt).insert() + # invisible rec will have one int property, one reference to the (invisible) + # other rt and one to the visible rt. + invisible_rec = db.Record(name="TestInvisible").add_parent(invisible_rt) + invisible_rec.add_property(name=prop.name, value=42) + invisible_rec.add_property(name=other_rt.name, value=other_rec.id) + invisible_rec.add_property(name=visible_rt.name, value=referenced_visible.id) + invisible_rec.insert() + visible_rec = db.Record(name="TestVisible").add_parent(visible_rt.name) + visible_rec.add_property(name=invisible_rt.name, value=invisible_rec.id) + visible_rec.insert() + + # Everything is there when queried as admin + select_query = ( + f"SELECT name, {invisible_rt.name}, {invisible_rt.name}.name, " + f"{invisible_rt.name}.{prop.name}, {invisible_rt.name}.{other_rt.name}, " + f"{invisible_rt.name}.{other_rt.name}.name FROM {visible_rec.id}") + select_results = db.execute_query(select_query) + value_args = ["name", f"{invisible_rt.name}", (invisible_rt.name, "name"), + (invisible_rt.name, prop.name), (invisible_rt.name, other_rt.name), + (invisible_rt.name, other_rt.name, "name")] + values = select_results.get_property_values(*value_args)[0] + assert values[0] == visible_rec.name + assert values[1] == invisible_rec.id + assert values[2] == invisible_rec.name + assert values[3] == invisible_rec.get_property(prop.name).value + assert values[4] == other_rec.id + assert values[5] == other_rec.name + + for rec in [referenced_visible, visible_rec]: + grant_permission(rec, "RETRIEVE:*") + for rec in [invisible_rec, other_rec]: + deny_permission(rec, "RETRIEVE:*") + + switch_to_test_user() + + select_results = db.execute_query(select_query) + values = select_results.get_property_values(*value_args)[0] + assert values[0] == visible_rec.name + assert values[1] == invisible_rec.id # id is ok + assert values[2] == invisible_rt.name # fall-back to property's name + assert values[3] is None # prop isn't either + assert values[4] is None # neither id ... + assert values[5] is None # ... nor name of other rec referenced by invisible + + # Special case of visible referencing invisible referencing visible + + switch_to_admin_user() + + select_query = ( + f"SELECT {invisible_rt.name}.{visible_rt.name}, " + f"{invisible_rt.name}.{visible_rt.name}.name FROM {visible_rec.id}") + value_args = [(invisible_rt.name, visible_rt.name), + (invisible_rt.name, visible_rt.name, "name")] + select_results = db.execute_query(select_query) + values = select_results.get_property_values(*value_args)[0] + assert values[0] == referenced_visible.id + assert values[1] == referenced_visible.name + + switch_to_test_user() + + select_results = db.execute_query(select_query) + values = select_results.get_property_values(*value_args)[0] + assert values[0] is None + assert values[1] is None diff --git a/tests/test_query.py b/tests/test_query.py index 72fc01cca086fd6e107307c23956bf64ced85896..87500c405966f1b07fab239ce679b72ce3b6f3f0 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1101,6 +1101,8 @@ def test_query_cache(): def test_query_cache_with_permissions(): + db.administration.set_server_property( + "QUERY_FILTER_ENTITIES_WITHOUT_RETRIEVE_PERMISSIONS", "TRUE") db.RecordType("TestRT").insert() db.RecordType("TestRT2").insert() public_record = db.Record().add_parent("TestRT").insert()