#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2023,2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023,2024 Henrik tom Wörden <h.tomwoerden@indiscale.com>
#               2021-2023 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Alexander Schlemmer <alexander.schlemmer@ds.mpg.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

"""
Unit test functions for the scanner.
"""

from functools import partial
from pathlib import Path
from tempfile import NamedTemporaryFile
from unittest.mock import MagicMock, Mock, patch

import linkahead as db
import pytest
import yaml
from caoscrawler.crawl import Crawler
from caoscrawler.debug_tree import DebugTree
from caoscrawler.scanner import (create_converter_registry, load_definition,
                                 scan_directory, scan_structure_elements,
                                 _load_definition_from_yaml_dict)
from caoscrawler.structure_elements import (DictElement, DictListElement,
                                            DictTextElement, File)
from pytest import raises
from utils import dircheckstr as dircheck_base

# Directory containing this test module; all fixtures are resolved relative to it.
UNITTESTDIR = Path(__file__).parent

# Builds debug-tree keys for nodes below the "examples_article" example directory.
dircheckstr = partial(dircheck_base, UNITTESTDIR / "test_directories" / "examples_article")


def test_scan_structure_elements():
    """Scan the example data structure with the example cfood and check the record count."""
    with open(UNITTESTDIR / "example_datastructure.yml", "r") as f:
        data = yaml.load(f, Loader=yaml.SafeLoader)

    crawler_definition = load_definition(UNITTESTDIR / "example_cfood.yml")
    converter_registry = create_converter_registry(crawler_definition)
    recs = scan_structure_elements(DictElement(name="", value=data), crawler_definition,
                                   converter_registry)
    assert len(recs) == 4


def test_provenance_debug_data():
    """Save the debug tree of a scan to a file and check the stored provenance keys."""
    # TODO rewrite the test to use a smaller example setup
    debug_tree = DebugTree()
    with open(UNITTESTDIR / "example_datastructure.yml", "r") as f:
        data = yaml.load(f, Loader=yaml.SafeLoader)

    crawler_definition = load_definition(UNITTESTDIR / "example_cfood.yml")
    converter_registry = create_converter_registry(crawler_definition)
    # Only the side effect on ``debug_tree`` is of interest here, not the returned records.
    scan_structure_elements(DictElement(name="", value=data), crawler_definition,
                            converter_registry, debug_tree=debug_tree)
    crawler = Crawler()

    # The file is only needed as a named location: close the handle right away so it can be
    # reopened by name, and always remove it afterwards (``delete=False`` leaves that to us).
    tmpfi = NamedTemporaryFile(delete=False)
    tmpfi.close()
    try:
        crawler.save_debug_data(tmpfi.name, debug_tree)

        with open(tmpfi.name, "r") as f:
            provenance = yaml.load(f, Loader=yaml.SafeLoader)
    finally:
        Path(tmpfi.name).unlink()

    pr = provenance["provenance"]

    def check_key_count(prefix):
        """Return the number of provenance keys that start with ``prefix``."""
        return sum(1 for key in pr if key.startswith(prefix))

    assert check_key_count("Ent") == 4


def test_record_structure_generation():
    """Check variable/record stores and copy flags in the debug tree of a directory scan."""
    # TODO create a test from this that tests scan_structure
    # the cfood should be minimal but cover typical scenarios (e.g. children)
    # add also a minimal test for scan_directory; it can be very basic since the only difference
    # to scan_structure is the kind of starting structure_element (check this statement)
    # The test should not check debug tree output but actual created records

    # TODO test creation of debug information in a separate test

    dbt = DebugTree()
    scan_directory(UNITTESTDIR / "test_directories" / "examples_article",
                   UNITTESTDIR / "scifolder_cfood.yml",
                   debug_tree=dbt)

    node_key = dircheckstr("DataAnalysis")
    subd = dbt.debug_tree[node_key]
    subc = dbt.debug_metadata["copied"][node_key]
    assert len(subd) == 2
    # variables store on Data Analysis node of debug tree
    assert len(subd[0]) == 4
    # record store on Data Analysis node of debug tree
    assert len(subd[1]) == 0
    assert len(subc) == 2
    assert len(subc[0]) == 4
    assert len(subc[1]) == 0

    # The data analysis node creates one variable for the node itself:
    assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
    assert subc[0]["DataAnalysis"] is False

    node_key = dircheckstr("DataAnalysis", "2020_climate-model-predict")
    subd = dbt.debug_tree[node_key]
    subc = dbt.debug_metadata["copied"][node_key]

    assert len(subd[1]) == 1
    assert len(subd[1]["Project"].get_parents()) == 1
    assert subd[1]["Project"].get_parents()[0].name == "Project"
    assert subd[1]["Project"].get_property("date").value == "2020"
    assert subd[1]["Project"].get_property(
        "identifier").value == "climate-model-predict"

    assert len(subd[0]) == 9
    assert subd[0]["date"] == "2020"
    assert subd[0]["identifier"] == "climate-model-predict"
    assert subd[0]["Project"].__class__ == db.Record

    assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
    assert subc[0]["DataAnalysis"] is True
    assert subd[0]["project_dir"] == "examples_article/DataAnalysis/2020_climate-model-predict"
    assert subc[0]["project_dir"] is False

    # Check the copy flags for the first level in the hierarchy:
    assert len(subc[0]) == 9
    assert len(subc[1]) == 1
    assert subc[1]["Project"] is False
    assert subc[0]["Project"] is False
    assert subc[0]["date"] is False
    assert subc[0]["identifier"] is False

    node_key = dircheckstr("DataAnalysis",
                           "2020_climate-model-predict",
                           "2020-02-08_prediction-errors")
    subd = dbt.debug_tree[node_key]
    subc = dbt.debug_metadata["copied"][node_key]
    assert len(subd[0]) == 12
    assert subd[0]["date"] == "2020-02-08"
    assert subd[0]["identifier"] == "prediction-errors"
    assert subd[0]["Project"].__class__ == db.Record
    assert subd[0]["Measurement"].__class__ == db.Record

    assert len(subd[1]) == 2

    assert len(subd[1]["Project"].get_parents()) == 1
    assert subd[1]["Project"].get_parents()[0].name == "Project"
    assert subd[1]["Project"].get_property("date").value == "2020"
    assert subd[1]["Project"].get_property(
        "identifier").value == "climate-model-predict"

    assert len(subd[1]["Measurement"].get_parents()) == 1
    assert subd[1]["Measurement"].get_parents()[0].name == "Measurement"
    assert subd[1]["Measurement"].get_property("date").value == "2020-02-08"
    assert subd[1]["Measurement"].get_property(
        "identifier").value == "prediction-errors"
    # The "$Project" variable reference must have been replaced by the actual record.
    assert subd[1]["Measurement"].get_property("project").value != "$Project"
    assert subd[1]["Measurement"].get_property(
        "project").value.__class__ == db.Record
    assert subd[1]["Measurement"].get_property(
        "project").value == subd[0]["Project"]

    # Check the copy flags for the second level in the hierarchy:
    assert subc[1]["Project"] is True
    assert subc[0]["Project"] is True
    assert subc[1]["Measurement"] is False
    assert subc[0]["Measurement"] is False
    assert subc[0]["date"] is False
    assert subc[0]["identifier"] is False


def test_record_generation():
    """
    Test the correct list of returned records by the scanner using the
    scifolder example from the article.
    """

    records = scan_directory(UNITTESTDIR / "test_directories" / "examples_article",
                             UNITTESTDIR / "scifolder_cfood.yml")

    def parent_filter(parent_name):
        """Return the records that have exactly one parent, named ``parent_name``."""
        return [p for p in records if len(p.parents) == 1
                and p.parents[0].name == parent_name]

    def record_matches(rec, expected, check_additional):
        """Tell whether ``rec`` carries every property of ``expected`` with the right value.

        With ``check_additional`` set, the record must additionally not have any property
        whose name is missing from ``expected``.
        """
        # Verify that all props are in the record and have the right value
        for prop_name, expected_value in expected.items():
            prop = rec.get_property(prop_name)
            if prop is None:
                return False
            if expected_value is None:
                if prop.value is not None:
                    return False
            elif prop.value != expected_value:
                return False
        if check_additional:
            # Verify that there are no additional props in the record
            return all(rpr.name in expected for rpr in rec.properties)
        return True

    def check_properties(records, check_props, check_additional=True):
        """Count, for each dict in ``check_props``, how many of ``records`` match it.

        Returns a list of counts in the same order as ``check_props``.
        """
        records_found = [0] * len(check_props)
        for rec in records:
            # Try each record to check
            for i, check_prop in enumerate(check_props):
                if record_matches(rec, check_prop, check_additional):
                    records_found[i] += 1
        return records_found

    # Check projects:
    # There are two projects in mixed categories: climate_model_predict and SpeedOfLight
    projects_found = check_properties(parent_filter("Project"), [
        {"identifier": "climate-model-predict", "date": "2020"},
        {"identifier": "SpeedOfLight", "date": "2020"}
    ])
    assert projects_found == [3, 2]

    measurements = parent_filter("Measurement")
    assert len(measurements) == 11
    measurements_found = check_properties(measurements, [
        {"identifier": "prediction-errors", "date": "2020-02-08"},
        {"identifier": "average-all-exp", "date": "2020-01-04"},
        {"identifier": "average-all-exp-corr", "date": "2020-01-05"},
        {"date": "1980-01-01", "identifier": None},
        {"date": "1990-01-01", "identifier": None},
        {"date": "2000-01-01", "identifier": None},
        {"date": "2010-01-01", "identifier": None},
        {"date": "2020-01-01", "identifier": "TimeOfFlight"},
        {"date": "2020-01-02", "identifier": "Cavity"},
        {"date": "2020-01-03", "identifier": None},
        {"date": "2020-02-01", "identifier": None},
    ], False)
    for f in measurements_found:
        assert f == 1

    persons = parent_filter("Person")
    check_props = [
        {"first_name": None, "last_name": "Author" + letter} for letter in
        ("A", "B", "C", "D", "E")]
    persons_found = check_properties(persons, check_props)
    for f in persons_found:
        assert f > 0


def test_variable_deletion_problems():
    """Check the property values of the records scanned with the variable-deletion cfoods."""
    records = scan_directory(UNITTESTDIR / "test_directories" / "example_variable_deletion",
                             UNITTESTDIR / "cfood_variable_deletion.yml")

    for record in records:
        if record.name == "Record from Data_1":
            assert record.get_property("var1").value == "bla"
            assert record.get_property("var2").value == "$test_2"
        elif record.name == "Record from Data_2":
            assert record.get_property("var1").value == "$test_1"
            assert record.get_property("var2").value == "test"
        else:
            raise RuntimeError("Wrong name")

    records = scan_directory(UNITTESTDIR / "test_directories" / "example_variable_deletion",
                             UNITTESTDIR / "cfood_variable_deletion2.yml")

    # For the following test the order of records is actually important:
    assert records[0].name == "Record from Data_1"
    assert records[1].name == "Record from Data_2"
    for record in records:
        if record.name == "Record from Data_1":
            assert record.get_property("var1").value == "bla"
            assert record.get_property("var2").value == "$test_2"
        elif record.name == "Record from Data_2":
            assert record.get_property("var1").value == "example_variable_deletion"
            assert record.get_property("var2").value == "test"
        else:
            raise RuntimeError("Wrong name")


def test_record_parents():
    """
    Test the correct list of returned records by the scanner.

    Only records named 'e', 'c', 'p' or 's' have their parents checked.
    """

    data = {
        'Experiments': {}
    }

    crawler_definition = load_definition(UNITTESTDIR / "test_parent_cfood.yml")
    converter_registry = create_converter_registry(crawler_definition)

    records = scan_structure_elements(DictElement(name="", value=data), crawler_definition,
                                      converter_registry)
    assert len(records) == 4

    # Name of the single expected parent, keyed by record name.
    expected_parent = {
        'e': 'Exp',  # default parent was overwritten
        'c': 'Cap2',  # default parent was overwritten by second converter
        'p': 'Projekt',  # top level set parent was overwritten
        's': 'Stuff',  # default parent stays if no parent is given on lower levels
    }
    for rec in records:
        if rec.name in expected_parent:
            assert rec.parents[0].name == expected_parent[rec.name]
            assert len(rec.parents) == 1


def test_error_messages():
    """Check that broken cfood definitions raise ``RuntimeError`` with a helpful message."""
    data = {
        'Experiments': {}
    }

    # A converter without any definition at all.
    broken_yaml = """
EmptyConverter:
"""
    broken_definition = _load_definition_from_yaml_dict(
        [yaml.load(broken_yaml, Loader=yaml.SafeLoader)])

    converter_registry = create_converter_registry(broken_definition)

    with pytest.raises(RuntimeError, match="Definition of converter \"EmptyConverter\" is empty"):
        scan_structure_elements(DictElement(name="", value=data),
                                broken_definition, converter_registry)

    # A record definition that is a plain string instead of a dict.
    broken_yaml = """
Converter:
  type: DictElement
  records:
    TestRecord: "42"
"""

    broken_definition = _load_definition_from_yaml_dict(
        [yaml.load(broken_yaml, Loader=yaml.SafeLoader)])

    converter_registry = create_converter_registry(broken_definition)

    with pytest.raises(RuntimeError, match="dict expected, but found str: 42"):
        scan_structure_elements(DictElement(name="", value=data),
                                broken_definition, converter_registry)