#!/bin/python
# Tests for the tool using pytest
# Adapted from check-sfs
# A. Schlemmer, 06/2021

from caoscrawler.crawl import Crawler, SecurityMode
from caoscrawler.structure_elements import File, DictTextElement, DictListElement
from caoscrawler.identifiable_adapters import IdentifiableAdapter, LocalStorageIdentifiableAdapter
from simulated_server_data import full_data
from functools import partial
from copy import deepcopy
from unittest.mock import patch
import caosdb.common.models as dbmodels
from unittest.mock import MagicMock, Mock
from os.path import join, dirname, basename
import yaml
import caosdb as db
from caosdb.apiutils import compare_entities

import pytest
from pytest import raises


def rfp(*pathcomponents):
    """
    Return full path.
    Shorthand convenience function.
    """
    return join(dirname(__file__), *pathcomponents)


# Module-level adapter used only to feed the simulated server data; note that
# the name is shadowed by the `ident` fixture below for the actual tests.
ident = LocalStorageIdentifiableAdapter()
ident.restore_state(rfp("records.xml"))
full_data.update({el.name: el for el in ident._records if el.name is not None})
full_data.update({el.id: el for el in ident._records if el.name is None})


def dircheckstr(*pathcomponents):
    """
    Return the debug tree identifier for a given path.
    """
    return ("caoscrawler.structure_elements.Directory: " + basename(
        join(*pathcomponents)) + ", " + rfp(
            "test_directories", "examples_article", *pathcomponents))


@pytest.fixture
def crawler():
    """Crawler that has already crawled the example article directory."""
    crawler = Crawler(debug=True)
    crawler.crawl_directory(rfp("test_directories", "examples_article"),
                            rfp("scifolder_cfood.yml"))
    return crawler


@pytest.fixture
def ident(crawler):
    """Local storage identifiable adapter, attached to the crawler fixture.

    Registers identifiables for Person, Measurement and Project and loads the
    reference records from records.xml.
    """
    ident = LocalStorageIdentifiableAdapter()
    crawler.identifiableAdapter = ident

    # The records.xml file is constructed as follows:
    # Do a full run of the crawler, resolve all identifiables and insert all
    # resulting entities.
    # See: test-setup/datamodel/generate_test_data.py for details.
    ident.restore_state(rfp("records.xml"))

    ident.register_identifiable(
        "Person", db.RecordType()
        .add_parent(name="Person")
        .add_property(name="first_name")
        .add_property(name="last_name"))
    ident.register_identifiable(
        "Measurement", db.RecordType()
        .add_parent(name="Measurement")
        .add_property(name="identifier")
        .add_property(name="date")
        .add_property(name="project"))
    ident.register_identifiable(
        "Project", db.RecordType()
        .add_parent(name="Project")
        .add_property(name="date")
        .add_property(name="identifier"))
    return ident


def test_record_structure_generation(crawler):
    subd = crawler.debug_tree[dircheckstr("DataAnalysis")]
    subc = crawler.debug_metadata["copied"][dircheckstr("DataAnalysis")]
    assert len(subd) == 2
    # variables store on Data Analysis node of debug tree
    assert len(subd[0]) == 2
    # record store on Data Analysis node of debug tree
    assert len(subd[1]) == 0
    assert len(subc) == 2
    assert len(subc[0]) == 2
    assert len(subc[1]) == 0

    # The data analysis node creates one variable for the node itself:
    assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
    assert subc[0]["DataAnalysis"] is False

    subd = crawler.debug_tree[dircheckstr(
        "DataAnalysis", "2020_climate-model-predict")]
    subc = crawler.debug_metadata["copied"][dircheckstr(
        "DataAnalysis", "2020_climate-model-predict")]

    assert len(subd[1]) == 1
    assert len(subd[1]["Project"].get_parents()) == 1
    assert subd[1]["Project"].get_parents()[0].name == "Project"
    assert subd[1]["Project"].get_property("date").value == "2020"
    assert subd[1]["Project"].get_property(
        "identifier").value == "climate-model-predict"

    assert len(subd[0]) == 6
    assert subd[0]["date"] == "2020"
    assert subd[0]["identifier"] == "climate-model-predict"
    assert subd[0]["Project"].__class__ == db.Record

    assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
    assert subc[0]["DataAnalysis"] is True
    assert subd[0]["project_dir"] == "examples_article/DataAnalysis/2020_climate-model-predict"
    assert subc[0]["project_dir"] is False

    # Check the copy flags for the first level in the hierarchy:
    assert len(subc[0]) == 6
    assert len(subc[1]) == 1
    assert subc[1]["Project"] is False
    assert subc[0]["Project"] is False
    assert subc[0]["date"] is False
    assert subc[0]["identifier"] is False

    subd = crawler.debug_tree[dircheckstr("DataAnalysis",
                                          "2020_climate-model-predict",
                                          "2020-02-08_prediction-errors")]
    subc = crawler.debug_metadata["copied"][dircheckstr(
        "DataAnalysis",
        "2020_climate-model-predict",
        "2020-02-08_prediction-errors")]

    assert len(subd[0]) == 8
    assert subd[0]["date"] == "2020-02-08"
    assert subd[0]["identifier"] == "prediction-errors"
    assert subd[0]["Project"].__class__ == db.Record
    assert subd[0]["Measurement"].__class__ == db.Record

    assert len(subd[1]) == 2
    assert len(subd[1]["Project"].get_parents()) == 1
    assert subd[1]["Project"].get_parents()[0].name == "Project"
    assert subd[1]["Project"].get_property("date").value == "2020"
    assert subd[1]["Project"].get_property(
        "identifier").value == "climate-model-predict"
    assert len(subd[1]["Measurement"].get_parents()) == 1
    assert subd[1]["Measurement"].get_parents()[0].name == "Measurement"
    assert subd[1]["Measurement"].get_property("date").value == "2020-02-08"
    assert subd[1]["Measurement"].get_property(
        "identifier").value == "prediction-errors"
    # The "$Project" variable must have been substituted by the actual record:
    assert subd[1]["Measurement"].get_property("project").value != "$Project"
    assert subd[1]["Measurement"].get_property(
        "project").value.__class__ == db.Record
    assert subd[1]["Measurement"].get_property(
        "project").value == subd[0]["Project"]

    # Check the copy flags for the second level in the hierarchy:
    assert subc[1]["Project"] is True
    assert subc[0]["Project"] is True
    assert subc[1]["Measurement"] is False
    assert subc[0]["Measurement"] is False
    assert subc[0]["date"] is False
    assert subc[0]["identifier"] is False


# Kept for reference: helper used to (re)generate the records.xml test fixture.
# def prepare_test_record_file():
#     ident = LocalStorageIdentifiableAdapter()
#     crawler = Crawler(debug=True, identifiableAdapter=ident)
#     crawler.crawl_directory(rfp("test_directories", "examples_article"),
#                             rfp("scifolder_cfood.yml"))
#
#     # clean record list:
#     recordlist = ident.get_records()
#     for i in range(len(recordlist)-1, 1, -1):
#         if recordlist[i].parents[0].name == "Person":
#             del recordlist[i]
#
#     ident.store_state(rfp("records.xml"))


def test_ambigious_records(crawler, ident):
    ident.get_records().clear()
    ident.get_records().extend(crawler.target_data)
    r = ident.get_records()
    id_r0 = ident.get_identifiable(r[0])
    # Duplicate records make the identifiable ambiguous; match string mirrors
    # the (misspelled) library error message.
    with raises(RuntimeError, match=".*unambigiously.*"):
        ident.retrieve_identified_record_for_identifiable(id_r0)


def test_crawler_update_list(crawler, ident):
    # If the following assertions fail, that is a hint, that the test file
    # records.xml has changed and this needs to be updated:
    assert len(ident.get_records()) == 18
    assert len(
        [r for r in ident.get_records() if r.parents[0].name == "Person"]
    ) == 5
    assert len(
        [r for r in ident.get_records() if r.parents[0].name == "Measurement"]
    ) == 11
    assert len(
        [r for r in ident.get_records() if r.parents[0].name == "Project"]
    ) == 2

    # The crawler contains lots of duplicates, because identifiables have not
    # been resolved yet:
    assert len(ident.get_records()) != len(crawler.target_data)

    # Check consistency:
    # Check whether identifiables retrieved from current identifiable store
    # return the same results.

    # take the first person in the list of records:
    for r in ident.get_records():
        if r.parents[0].name == "Person":
            r_cur = r
            break

    id_r0 = ident.get_identifiable(r_cur)
    assert r_cur.parents[0].name == id_r0.parents[0].name
    assert r_cur.get_property(
        "first_name").value == id_r0.get_property("first_name").value
    assert r_cur.get_property(
        "last_name").value == id_r0.get_property("last_name").value
    assert len(r_cur.parents) == 1
    assert len(id_r0.parents) == 1
    assert len(r_cur.properties) == 2
    assert len(id_r0.properties) == 2

    idr_r0_test = ident.retrieve_identified_record_for_identifiable(id_r0)
    idr_r0 = ident.retrieve_identified_record_for_record(r_cur)
    assert idr_r0 == idr_r0_test

    # take the first measurement in the list of records:
    for r in ident.get_records():
        if r.parents[0].name == "Measurement":
            r_cur = r
            break

    id_r1 = ident.get_identifiable(r_cur)
    assert r_cur.parents[0].name == id_r1.parents[0].name
    assert r_cur.get_property(
        "identifier").value == id_r1.get_property("identifier").value
    assert r_cur.get_property("date").value == id_r1.get_property("date").value
    assert r_cur.get_property(
        "project").value == id_r1.get_property("project").value
    assert len(r_cur.parents) == 1
    assert len(id_r1.parents) == 1
    # The identifiable lacks the non-identifying "responsible" property:
    assert len(r_cur.properties) == 4
    assert len(id_r1.properties) == 3

    idr_r1_test = ident.retrieve_identified_record_for_identifiable(id_r1)
    idr_r1 = ident.retrieve_identified_record_for_record(r_cur)
    assert idr_r1 == idr_r1_test
    assert idr_r1 != idr_r0
    assert idr_r1_test != idr_r0_test

    assert len(idr_r1.properties) == 4
    assert r_cur.get_property(
        "responsible").value == idr_r1.get_property("responsible").value
    assert r_cur.description == idr_r1.description

    # test whether compare_entities function works in this context:
    comp = compare_entities(r_cur, id_r1)
    assert len(comp[0]["parents"]) == 0
    assert len(comp[1]["parents"]) == 0
    assert len(comp[0]["properties"]) == 1
    assert len(comp[1]["properties"]) == 0
    assert "responsible" in comp[0]["properties"]
    assert "description" in comp[0]

    comp = compare_entities(r_cur, idr_r1)
    assert len(comp[0]["parents"]) == 0
    assert len(comp[1]["parents"]) == 0
    assert len(comp[0]["properties"]) == 0
    assert len(comp[1]["properties"]) == 0


def test_synchronization(crawler, ident):
    insl, updl = crawler.synchronize(commit_changes=False)
    assert len(insl) == 0
    assert len(updl) == 0


def test_identifiable_adapter():
    query = IdentifiableAdapter.create_query_for_identifiable(
        db.Record().add_parent("Person")
        .add_property("first_name", value="A")
        .add_property("last_name", value="B"))
    assert query.lower() == "find record person with 'first_name'='a' and 'last_name'='b' "


def test_remove_unnecessary_updates():
    # test trivial case
    upl = [db.Record().add_parent("A")]
    irs = [db.Record().add_parent("A")]
    Crawler.remove_unnecessary_updates(upl, irs)
    assert len(upl) == 0

    # test property difference case
    # TODO this should work right?
    # upl = [db.Record().add_parent("A").add_property("a", 3)]
    # irs = [db.Record().add_parent("A")]  # ID should be s
    # Crawler.remove_unnecessary_updates(upl, irs)
    # assert len(upl) == 1

    # test value difference case
    upl = [db.Record().add_parent("A").add_property("a", 5)]
    irs = [db.Record().add_parent("A").add_property("a")]
    Crawler.remove_unnecessary_updates(upl, irs)
    assert len(upl) == 1
    upl = [db.Record().add_parent("A").add_property("a", 5)]
    irs = [db.Record().add_parent("A").add_property("a", 5)]
    Crawler.remove_unnecessary_updates(upl, irs)
    assert len(upl) == 0

    # test unit difference case
    upl = [db.Record().add_parent("A").add_property("a", unit='cm')]
    irs = [db.Record().add_parent("A").add_property("a")]
    Crawler.remove_unnecessary_updates(upl, irs)
    assert len(upl) == 1

    # test None difference case
    upl = [db.Record().add_parent("A").add_property("a")]
    irs = [db.Record().add_parent("A").add_property("a", 5)]
    Crawler.remove_unnecessary_updates(upl, irs)
    assert len(upl) == 1


# Current status:
# TODO: currently, this test fails, because non identifiable records cannot
# be inserted into the cache. Solution might be, just not to add them
# into the local cache. Probably in split_into_inserts_and_updates.
@pytest.mark.xfail
def test_identifiable_adapter_no_identifiable(crawler, ident):
    del ident._registered_identifiables["Person"]
    insl, updl = crawler.synchronize()
    assert len(updl) == 0
    pers = [r for r in crawler.target_data if r.parents[0].name == "Person"]
    # All persons are inserted, because they are not identifiable:
    assert len(insl) == len(pers)


def test_provenance_debug_data(crawler):
    crawler.save_debug_data(rfp("provenance.yml"))

    with open(rfp("provenance.yml"), "r") as f:
        provenance = yaml.load(f, Loader=yaml.SafeLoader)

    pr = provenance["provenance"]

    def check_key_count(prefix):
        # Count provenance entries whose key starts with the given prefix.
        return sum(1 for key in pr.keys() if key.startswith(prefix))
    assert check_key_count("Measurement") == 11
    assert check_key_count("Project") == 5
    assert check_key_count("Person") == 14


def basic_retrieve_by_name_mock_up(rec, known):
    """ returns a stored Record if rec.name is an existing key, None otherwise """
    if rec.name in known:
        return known[rec.name]
    else:
        return None


@pytest.fixture
def crawler_mocked_identifiable_retrieve(crawler):
    # mock retrieval of registered identifiables: return Record with just a parent
    crawler.identifiableAdapter.get_registered_identifiable = Mock(
        side_effect=lambda x: db.Record().add_parent(x.parents[0].name))

    # Simulate remote server content by using the names to identify records
    # There is only a single known Record with name A
    crawler.identifiableAdapter.retrieve_identified_record_for_record = Mock(side_effect=partial(
        basic_retrieve_by_name_mock_up, known={"A": db.Record(id=1111, name="A")}))
    return crawler


def test_split_into_inserts_and_updates_trivial(crawler):
    # Try trivial argument
    crawler.split_into_inserts_and_updates([])


def test_split_into_inserts_and_updates_single(crawler_mocked_identifiable_retrieve):
    crawler = crawler_mocked_identifiable_retrieve
    entlist = [db.Record(name="A").add_parent(
        "C"), db.Record(name="B").add_parent("C")]

    assert crawler.get_identified_record_from_local_cache(entlist[0]) is None
    assert crawler.get_identified_record_from_local_cache(entlist[1]) is None
    assert crawler.can_be_checked_externally(entlist[0])
    assert crawler.can_be_checked_externally(entlist[1])
    assert crawler.identifiableAdapter.retrieve_identified_record_for_record(
        entlist[0]).id == 1111
    assert crawler.identifiableAdapter.retrieve_identified_record_for_record(
        entlist[1]) is None

    insert, update = crawler.split_into_inserts_and_updates(deepcopy(entlist))
    assert len(insert) == 1
    assert insert[0].name == "B"
    assert len(update) == 1
    assert update[0].name == "A"
    # if this ever fails, the mock up may be removed
    crawler.identifiableAdapter.get_registered_identifiable.assert_called()
    crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()


def test_split_into_inserts_and_updates_with_duplicate(crawler_mocked_identifiable_retrieve):
    crawler = crawler_mocked_identifiable_retrieve
    a = db.Record(name="A").add_parent("C")
    b = db.Record(name="B").add_parent("C")
    b.add_property("A", a)
    # This is identical to a and should be removed
    c = db.Record(name="A").add_parent("C")
    entlist = [a, b, c]
    insert, update = crawler.split_into_inserts_and_updates(deepcopy(entlist))
    assert len(insert) == 1
    assert insert[0].name == "B"
    assert len(update) == 1
    assert update[0].name == "A"
    # if this ever fails, the mock up may be removed
    crawler.identifiableAdapter.get_registered_identifiable.assert_called()
    crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()


def test_split_into_inserts_and_updates_with_ref(crawler_mocked_identifiable_retrieve):
    crawler = crawler_mocked_identifiable_retrieve
    # try it with a reference
    a = db.Record(name="A").add_parent("C")
    b = db.Record(name="B").add_parent("C")
    b.add_property("A", a)
    entlist = [a, b]
    insert, update = crawler.split_into_inserts_and_updates(entlist)
    assert len(insert) == 1
    assert insert[0].name == "B"
    assert len(update) == 1
    assert update[0].name == "A"
    # if this ever fails, the mock up may be removed
    crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()
    crawler.identifiableAdapter.get_registered_identifiable.assert_called()


def test_split_into_inserts_and_updates_with_circ(crawler):
    # try circular
    a = db.Record(name="A").add_parent("C")
    b = db.Record(name="B").add_parent("C")
    b.add_property("A", a)
    a.add_property("B", b)
    entlist = [a, b]
    # TODO this does not seem to be complete!


def test_split_into_inserts_and_updates_with_complex(crawler_mocked_identifiable_retrieve):
    crawler = crawler_mocked_identifiable_retrieve
    #      A
    #      ^
    #      |
    # F <- B <- G
    a = db.Record(name="A").add_parent("C").add_property(
        'd', 13).add_property('e', "lskdjlsfdj")
    b = db.Record(name="B").add_parent("C")
    g = db.Record(name="G").add_parent("C")
    f = db.Record(name="F").add_parent("C")
    g.add_property("A", a)
    b.add_property("A", f)
    b.add_property("A", a)
    entlist = [a, b, g]
    insert, update = crawler.split_into_inserts_and_updates(entlist)
    assert len(insert) == 3
    assert "B" in [el.name for el in insert]
    assert len(update) == 1
    assert update[0].name == "A"
    # if this ever fails, the mock up may be removed
    crawler.identifiableAdapter.get_registered_identifiable.assert_called()
    crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()

    # TODO write test where the unresolved entity is not part of the identifiable


def test_split_into_inserts_and_updates_with_copy_attr(crawler_mocked_identifiable_retrieve):
    crawler = crawler_mocked_identifiable_retrieve
    # assume identifiable is only the name
    a = db.Record(name="A").add_parent("C")
    a.add_property("foo", 1)
    b = db.Record(name="A").add_parent("C")
    b.add_property("bar", 2)
    entlist = [a, b]
    insert, update = crawler.split_into_inserts_and_updates(entlist)

    assert update[0].get_property("bar").value == 2
    assert update[0].get_property("foo").value == 1
    # if this ever fails, the mock up may be removed
    crawler.identifiableAdapter.get_registered_identifiable.assert_called()
    crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()


def test_all_references_are_existing_already(crawler):
    # Simulate remote server content by using the names to identify records
    # There are only two known Records with name A and B
    crawler.identifiableAdapter.get_registered_identifiable = Mock(side_effect=partial(
        basic_retrieve_by_name_mock_up, known={"A": db.Record(name="A").add_parent("C"),
                                               "B": db.Record(name="B").add_parent("C")}))

    assert crawler.all_references_are_existing_already(
        db.Record().add_property('a', 123))
    assert crawler.all_references_are_existing_already(db.Record()
                                                       .add_property('a', db.Record(id=123)))
    assert crawler.all_references_are_existing_already(db.Record()
                                                       .add_property('a', 123)
                                                       .add_property('b', db.Record(id=123)))
    assert not crawler.all_references_are_existing_already(db.Record()
                                                           .add_property('a', 123)
                                                           .add_property('b', db.Record(name="A")
                                                                         .add_parent("C")))
    a = db.Record(name="A").add_parent("C")
    crawler.add_identified_record_to_local_cache(a)
    assert crawler.all_references_are_existing_already(db.Record()
                                                       .add_property('a', 123)
                                                       .add_property('b', a))
    # if this ever fails, the mock up may be removed
    crawler.identifiableAdapter.get_registered_identifiable.assert_called()


def test_can_be_checked_externally(crawler):
    assert crawler.can_be_checked_externally(
        db.Record().add_property('a', 123))
    assert crawler.can_be_checked_externally(db.Record()
                                             .add_property('a', db.Record(id=123)))
    assert crawler.can_be_checked_externally(db.Record()
                                             .add_property('a', 123)
                                             .add_property('b', db.Record(id=123)))
    assert not crawler.can_be_checked_externally(db.Record()
                                                 .add_property('a', 123)
                                                 .add_property('b', db.Record()))


def test_replace_entities_with_ids(crawler):
    a = (db.Record().add_parent("B").add_property("A", 12345)
         .add_property("B", db.Record(id=12345))
         .add_property("C", [db.Record(id=12345), 233324]))
    crawler.replace_entities_with_ids(a)
    assert a.get_property("A").value == 12345
    assert a.get_property("B").value == 12345
    assert a.get_property("C").value == [12345, 233324]


def mock_get_entity_by_id(id):
    """Look up an entity by id in the simulated server data."""
    candidates = [el for el in list(full_data.values()) if el.id == id]
    if len(candidates) > 0:
        return candidates[0]
    else:
        raise ValueError()


def mock_get_entity_by_name(name):
    """Look up an entity by name (case-insensitive) in the simulated server data."""
    candidates = [el for el in full_data.values()
                  if (el.name is not None and el.name.lower() == name.lower())]
    if len(candidates) > 0:
        return candidates[0]
    else:
        raise ValueError()


def prepare_crawler_with_sec_mode(mode, ident):
    """Create a crawler with the given security mode over the example data."""
    crawler = Crawler(debug=True, securityMode=mode)
    crawler.crawl_directory(rfp("test_directories", "examples_article"),
                            rfp("scifolder_cfood.yml"))
    crawler.identifiableAdapter = ident
    return crawler


def reset_mocks(mocks):
    for mock in mocks:
        mock.reset_mock()


def change_identifiable_prop(ident):
    # the checks in here are only to make sure we change the record as we
    # intend to
    meas = ident._records[-2]
    assert meas.parents[0].name == "Measurement"
    resps = meas.properties[0]
    assert resps.name == "date"
    # change one element; This changes the date which is part of the identifiable
    resps.value = "2022-01-04"


def change_non_identifiable_prop(ident):
    # the checks in here are only to make sure we change the record as we
    # intend to
    meas = ident._records[-1]
    assert meas.parents[0].name == "Measurement"
    resps = meas.properties[-1]
    assert resps.name == "responsible"
    assert len(resps.value) == 2
    # change one element; This removes a responsible which is not part of the
    # identifiable
    del resps.value[-1]


@patch("caoscrawler.crawl.Crawler._get_entity_by_id",
       new=Mock(side_effect=mock_get_entity_by_id))
@patch("caoscrawler.crawl.Crawler._get_entity_by_name",
       new=Mock(side_effect=mock_get_entity_by_name))
@patch("caoscrawler.crawl.db.Container.insert")
@patch("caoscrawler.crawl.db.Container.update")
@patch("caoscrawler.crawl.UpdateCache.insert")
def test_security_mode(updateCacheMock, upmock, insmock, ident):
    records_backup = deepcopy(ident._records)

    # trivial case: nothing to do
    crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    updateCacheMock.assert_not_called()

    # RETRIEVE: insert only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
    # remove one element
    del ident._records[-1]
    # insert forbidden
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    assert updateCacheMock.call_count == 1
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # RETRIEVE: update only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
    # change one element
    change_non_identifiable_prop(ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    assert updateCacheMock.call_count == 1
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # INSERT: insert only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
    # remove one element
    del ident._records[-1]
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_called_once()
    upmock.assert_not_called()
    updateCacheMock.assert_not_called()
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # INSERT: update only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
    # change one element
    change_non_identifiable_prop(ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    updateCacheMock.assert_called_once()
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # INSERT: insert and update
    crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
    # change two elements
    change_non_identifiable_prop(ident)
    change_identifiable_prop(ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    # BUGFIX: was `insmock.asser_called_once()` (a typo); Mock auto-created
    # that attribute so the assertion silently never ran.
    insmock.assert_called_once()
    upmock.assert_not_called()
    updateCacheMock.assert_called_once()
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)