# encoding: utf-8 # # ** header v3.0 # This file is a part of the CaosDB Project. # # Copyright (C) 2018 Research Group Biomedical Physics, # Max-Planck-Institute for Dynamics and Self-Organization Göttingen # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # ** end header # # Copyright (c) 2019 IndiScale GmbH """Created on 23.07.2015. @author: tf """ from nose.tools import (assert_equal, assert_is_not_none, # @UnresolvedImport assert_not_equal, assert_true, nottest, with_setup) import caosdb as db from caosdb import (Container, Info, Property, Record, RecordType, execute_query) from pytest import raises from pytest import mark def setup(): try: db.execute_query("FIND Test*").delete() except Exception as e: print(e) @with_setup(setup=setup, teardown=setup) def test_file_system_returns_ids(): upload_file = open("test.dat", "w") upload_file.write("hello world\n") upload_file.close() file_ = db.File(name="TestFileA", description="Testfile Desc", path="testfiles/test.dat", file="test.dat") file_.insert() c = db.get_connection() resp = c.retrieve( entity_uri_segments=[ "FileSystem", "testfiles"], reconnect=True) body = resp.read() print(body) print(type(body)) search = "id=\"" + str(file_.id) + "\"" print(search) assert_true(search in str(body)) def test_sat_query_with_leading_slash(): upload_file = open("test.dat", "w") upload_file.write("hello world\n") upload_file.close() file_ = db.File(name="TestFileA", description="Testfile Desc", path="testfiles/test.dat", file="test.dat") file_.insert() assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED testfiles/test.dat", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED /testfiles/test.dat", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED testfiles/test.*", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED /testfiles/test.*", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED testfiles/*", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED /testfiles/*", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED */test.dat", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED /*/test.dat", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED **/test.dat", unique=True).id) assert_equal( file_.id, db.execute_query( "FIND FILE WHICH IS STORED /**/test.dat", unique=True).id) def test_name_with_slash(): rt1 = db.RecordType("Test/Name").insert() assert_true(rt1.is_valid()) rt2 = db.RecordType("Test/Name").retrieve() assert_equal(rt1.id, rt2.id) def test_nonsense_flag(): db.execute_query("FIND Test", flags={"ThisIsUtterNonsense": None}) def test_error_no_such_role(): xml = "<Insert><Entity name='test'/></Insert>" r = db.get_connection().insert(entity_uri_segment=["Entity"], body=xml) c = Container._response_to_entities(r) with raises(db.TransactionError) as cm: db.raise_errors(c) assert (cm.value.errors[0].msg == "There is no such role 'Entity'.") xml = "<Insert><ASDF name='test'/></Insert>" r = db.get_connection().insert(entity_uri_segment=["Entity"], body=xml) c = Container._response_to_entities(r) with raises(db.TransactionError) as cm: db.raise_errors(c) assert (cm.value.errors[0].msg == "There is no such role 'ASDF'.") @with_setup(setup, setup) def test_parent_duplicate_1(): db.RecordType(name="TestRT1").insert() db.Property(name="TestProperty", datatype=db.TEXT).insert() rt2 = db.RecordType( name="TestRT2").add_parent( name="TestRT1").add_parent( name="TestRT1").add_property( name="TestProperty") assert_equal(len(rt2.get_parents()), 2) rt2.insert() assert_equal(len(rt2.get_parents()), 1) assert_equal(len(rt2.get_warnings()), 1) assert_equal( rt2.get_warnings()[0].description, "This entity had parent duplicates. That is meaningless and only one parent had been inserted.") @with_setup(setup, setup) def test_parent_duplicate_2(): db.RecordType(name="TestRT1").insert() rt2 = db.RecordType( name="TestRT2").add_parent( name="TestRT1", inheritance=db.ALL).add_parent( name="TestRT1", inheritance=db.NONE) assert len(rt2.get_parents()) == 2 with raises(db.TransactionError) as cm: rt2.insert() assert (cm.value.errors[0].msg == "This entity had parent duplicates. Parent duplicates are meaningless and would be ignored (and inserted only once). But these parents had diverging inheritance instructions which cannot be processed.") def test_server_error(): con = db.get_connection() con._login() with raises(db.HTTPServerError) as cm: con._http_request( method="GET", path="Entity?debug=throwNullPointerException") assert_true("SRID = " in cm.value.msg) def test_annotation(): try: p2 = Property(name="AnnotationTestUser", datatype="TEXT").insert() assert_true(p2.is_valid()) p_comment = Property( name="AnnotationTestComment", datatype="TEXT").insert() assert_true(p_comment.is_valid()) p_datetime = Property( name="AnnotationTestDatetime", datatype="Datetime").insert() assert_true(p_datetime.is_valid()) p_entity = Property( name="AnnotationTestEntity", datatype="REFERENCE").insert() assert_true(p_entity.is_valid()) Property(name="REFERENCE").retrieve() rt = RecordType( name="AnnotationTestAnnotation").add_property( name="AnnotationTestDatetime", importance="OBLIGATORY").add_property( name="AnnotationTestComment", importance="OBLIGATORY").add_property( name="AnnotationTestUser", importance="OBLIGATORY").add_property( name="AnnotationTestEntity", importance="OBLIGATORY").insert() assert_true(rt.is_valid()) rt2 = RecordType(name="AnnotationTestSimpleRecordType").insert() assert_true(rt2.is_valid()) rec = Record( name="AnnotationTestSpecialAnnotion").add_parent(rt).add_property( name="AnnotationTestDatetime", value="NOW").add_property( name="AnnotationTestUser", value=db.get_config().get( "Connection", "username")).add_property( name="AnnotationTestEntity", value=rt2).add_property( name="AnnotationTestComment", value="Veeeery nice!").insert() assert_true(rec.is_valid()) ann = execute_query( "FIND AnnotationTestAnnotation WHICH REFERENCES AnnotationTestSimpleRecordType AS AN AnnotationTestEntity", unique=True) assert_true(ann.is_valid()) assert_equal(rec.id, ann.id) finally: rt = execute_query("FIND AnnotationTest*").delete() def test_info(): i = Info() assert_is_not_none(i.messages["Flags"]) assert_is_not_none(i.messages["Counts"]) assert_is_not_none(i.messages["TransactionBenchmark"]) assert_not_equal('-1', i.messages["Counts"]["files"]) assert_not_equal('-1', i.messages["Counts"]["records"]) assert_not_equal('-1', i.messages["Counts"]["properties"]) assert_not_equal('-1', i.messages["Counts"]["recordTypes"]) assert_equal('true', i.messages["Counts"]["debug"]) # Not necessarily 0, all the temporary directories go here as well. # assert_equal('0', i.messages["Counts"]["tmpfiles"]) def test_long_description(): try: longstr = 'desc_' while len(longstr) < 10000: longstr += "a" rt = RecordType( name="SimpleRecordTypeWithLongDesc", description=longstr).insert() assert_true(rt.is_valid()) assert_equal(rt.description, longstr) rt2 = RecordType(name="SimpleRecordTypeWithLongDesc").retrieve() assert_true(rt2.is_valid()) assert_equal(rt2.description, longstr) finally: try: rt.delete() except BaseException: pass def test_auto_importance_for_properties(): try: p = Property(name="SimpleTestProperty1", datatype=db.TEXT).insert() assert_true(p.is_valid()) p2 = Property( name="SimpleTestProperty2", datatype=db.TEXT).add_property(p).insert() assert_true(p2.is_valid()) finally: try: p2.delete() except BaseException: pass try: p.delete() except BaseException: pass def test_overrides_with_deletion_in_worst_case_order(): try: try: db.execute_query("FIND Simple*").delete() except BaseException: pass p = Property( name="SimpleTestProperty1", description="desc1", datatype=db.TEXT).insert() assert_true(p.is_valid()) rt1 = RecordType( name="SimpleRT1").add_property( id=p.id, name="nameOverride1", description="descOverride1").insert() assert_true(rt1.is_valid()) assert_equal(p.id, rt1.get_property("nameOverride1").id) assert_equal( "descOverride1", rt1.get_property("nameOverride1").description) # is persistent? rt1c = RecordType(name="SimpleRT1").retrieve() assert_true(rt1c.is_valid()) assert_equal(p.id, rt1c.get_property("nameOverride1").id) assert_equal( "descOverride1", rt1c.get_property("nameOverride1").description) db.Container().extend([p, rt1]).delete() finally: try: rt1.delete() except BaseException: pass try: p.delete() except BaseException: pass def test_overrides_with_duplicates(): try: try: db.execute_query("FIND Simple*").delete() except BaseException: pass p = Property( name="SimpleTestProperty1", description="desc1", datatype=db.TEXT).insert() assert_true(p.is_valid()) rt1 = RecordType( name="SimpleRT1").add_property( id=p.id, name="nameOverride1", description="descOverride1").add_property( id=p.id, name="nameOverride2", description="descOverride2").insert() assert_true(rt1.is_valid()) assert_equal(p.id, rt1.get_property("nameOverride1").id) assert_equal(p.id, rt1.get_property("nameOverride2").id) assert_equal( "descOverride1", rt1.get_property("nameOverride1").description) assert_equal( "descOverride2", rt1.get_property("nameOverride2").description) # is persistent? rt1c = RecordType(name="SimpleRT1").retrieve() assert_true(rt1c.is_valid()) assert_equal(p.id, rt1c.get_property("nameOverride1").id) assert_equal(p.id, rt1c.get_property("nameOverride2").id) assert_equal( "descOverride1", rt1c.get_property("nameOverride1").description) assert_equal( "descOverride2", rt1c.get_property("nameOverride2").description) finally: try: rt1.delete() except BaseException: pass try: p.delete() except BaseException: pass def test_overrides_in_subdomains(): try: try: db.execute_query("FIND Simple*").delete() except BaseException: pass def insert_model(): p1 = Property( name="SimpleTestProperty1", description="desc1", datatype=db.TEXT).insert() assert_true(p1.is_valid()) p2 = Property( name="SimpleTestProperty2", description="desc2", datatype=db.TEXT).insert() assert_true(p2.is_valid()) p3 = Property( name="SimpleTestProperty3", description="desc3", datatype=db.TEXT).insert() assert_true(p3.is_valid()) pov1 = Property( id=p1.id, name="SimpleTestPropertyov1", description="desc1ov1") pov2 = Property( id=p2.id, name="SimpleTestPropertyov2", description="desc1ov2") pov3 = Property( id=p3.id, name="SimpleTestPropertyov3", description="desc1ov3") pov21 = Property( id=p1.id, name="SimpleTestPropertyov21", description="desc1ov21") pov22 = Property( id=p2.id, name="SimpleTestPropertyov22", description="desc1ov22") pov31 = Property( id=p1.id, name="SimpleTestPropertyov31", description="desc1ov31") pov32 = Property( id=p2.id, name="SimpleTestPropertyov32", description="desc1ov32") pov33 = Property( id=p3.id, name="SimpleTestPropertyov33", description="desc1ov33") pov321 = Property( id=p1.id, name="SimpleTestPropertyov321", description="desc1ov321") pov322 = Property( id=p2.id, name="SimpleTestPropertyov322", description="desc1ov322") pov323 = Property( id=p3.id, name="SimpleTestPropertyov323", description="desc1ov323") pov32.add_property(pov321).add_property( pov322).add_property(pov323) pov3.add_property(pov31).add_property(pov32).add_property(pov33) pov2.add_property(pov21).add_property(pov22) rt1 = RecordType(name="SimpleRT1").add_property( pov1).add_property(pov2).add_property(pov3).insert() assert_true(rt1.is_valid()) insert_model() finally: try: db.execute_query("FIND SimpleRT1").delete() except BaseException: pass try: db.execute_query("FIND SimpleTestProperty3").delete() except BaseException: pass try: db.execute_query("FIND SimpleTestProperty2").delete() except BaseException: pass try: db.execute_query("FIND SimpleTestProperty1").delete() except BaseException: pass @nottest def test_cache_performance(): import time as t q = db.Query("Count Record Simulation.ID>57500") q.execute() results = q.results print("\nFetching " + str(results) + " Entities (IdOnly)") t1 = t.time() q = db.Query("Find Record Simulation.ID>57500") q.putFlag("IdOnly") c = q.execute() t2 = t.time() - t1 print("Time [s]: " + str(t2)) t2 = 0.0 n = 10 while t2 < 30 and n <= results: fetch = Container() i = 0 for e in c: i += 1 fetch.append(Record(id=e.id)) if i >= n: break print("\nFetching " + str(n) + " Entities (w/o Cache)") t1 = t.time() fetch.retrieve(flags="disableCache") t2 = t.time() - t1 print("Time [s]: " + str(t2)) fetch = Container() i = 0 for e in c: i += 1 fetch.append(Record(id=e.id)) if i >= n: break print("Fetching " + str(n) + " Entities (with Cache)") t1 = t.time() fetch.retrieve() t2 = t.time() - t1 print("Time [s]: " + str(t2)) n += 10 @mark.xfail(reason="Waits for MR https://gitlab.indiscale.com/caosdb/src/caosdb-pylib/-/merge_requests/15") def test_role_after_retrieve(): rt = db.RecordType("TestRT").insert() entity = db.Entity(id=rt.id) assert entity.role is None entity.retrieve() assert entity.role == rt.role