Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_scanner.py 14.31 KiB
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2023,2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023,2024 Henrik tom Wörden <h.tomwoerden@indiscale.com>
#               2021-2023 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Alexander Schlemmer <alexander.schlemmer@ds.mpg.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
Unit test functions for the scanner.
"""

from functools import partial
from pathlib import Path
from tempfile import NamedTemporaryFile
from unittest.mock import MagicMock, Mock, patch

import linkahead as db
import pytest
import yaml
from caoscrawler.crawl import Crawler
from caoscrawler.debug_tree import DebugTree
from caoscrawler.scanner import (create_converter_registry, load_definition,
                                 scan_directory, scan_structure_elements,
                                 _load_definition_from_yaml_dict)
from caoscrawler.structure_elements import (DictElement, DictListElement,
                                            DictTextElement, File)
from pytest import raises

from utils import dircheckstr as dircheck_base

# Directory containing this test module; the test data files used below are
# addressed relative to it.
UNITTESTDIR = Path(__file__).parent

# ``dircheck_base`` with its first positional argument pre-bound to the
# ``examples_article`` test directory.
dircheckstr = partial(dircheck_base, UNITTESTDIR / "test_directories" / "examples_article")


def test_scan_structure_elements():
    """Scan the example data structure with the example cfood.

    The YAML data is wrapped in a ``DictElement`` and scanned with the
    definition from ``example_cfood.yml``; the scan is expected to return
    four items.
    """
    # NOTE: no temporary file is created here; the scan result is checked
    # directly, so nothing needs to be written to disk.
    with open(UNITTESTDIR / "example_datastructure.yml", "r") as f:
        data = yaml.load(f, Loader=yaml.SafeLoader)

    crawler_definition = load_definition(UNITTESTDIR / "example_cfood.yml")
    converter_registry = create_converter_registry(crawler_definition)
    recs = scan_structure_elements(DictElement(name="", value=data), crawler_definition,
                                   converter_registry)
    assert len(recs) == 4


def test_provenance_debug_data():
    """Check the provenance section of the saved debug data.

    The example data structure is scanned with a ``DebugTree`` attached, the
    debug data is saved to a temporary file via ``Crawler.save_debug_data``
    and read back as YAML.  The ``provenance`` mapping is expected to contain
    exactly four keys starting with ``"Ent"``.
    """
    # TODO rewrite the test to use a smaller example setup
    debug_tree = DebugTree()
    with open(UNITTESTDIR / "example_datastructure.yml", "r") as f:
        data = yaml.load(f, Loader=yaml.SafeLoader)

    crawler_definition = load_definition(UNITTESTDIR / "example_cfood.yml")
    converter_registry = create_converter_registry(crawler_definition)
    # Only the side effect on ``debug_tree`` is of interest here.
    scan_structure_elements(DictElement(name="", value=data), crawler_definition,
                            converter_registry, debug_tree=debug_tree)

    # Only the file *name* is needed: close the handle immediately and remove
    # the file afterwards so that the test leaves nothing behind.
    tmpfi = NamedTemporaryFile(delete=False)
    tmpfi.close()
    try:
        crawler = Crawler()
        crawler.save_debug_data(tmpfi.name, debug_tree)
        with open(tmpfi.name, "r") as f:
            provenance = yaml.load(f, Loader=yaml.SafeLoader)
    finally:
        Path(tmpfi.name).unlink()

    pr = provenance["provenance"]

    def check_key_count(prefix):
        """Return the number of provenance keys that start with ``prefix``."""
        return sum(1 for key in pr if key.startswith(prefix))
    assert check_key_count("Ent") == 4


def test_record_structure_generation():
    """Inspect the debug tree produced by scanning the ``examples_article`` directory.

    Three nodes are checked: ``DataAnalysis``, the project directory below it
    and the measurement directory below that.  For each node the debug tree
    entry is indexed as ``[0]`` (variables) and ``[1]`` (records), and the
    matching entry of ``debug_metadata["copied"]`` holds the copy flags.
    """
    # TODO create a test from this that tests scan_structure
    # the cfood should be minimal but cover typical scenarios (e.g. children)
    # add also a minimal test for scan_directory; it can be very basic since the only difference
    # to scan_structure is the kind of starting structure_element (check this statement)
    # The test should not check debug tree output but actual created records

    # TODO test creation of debug information in a separate test

    def assert_climate_project(project):
        """Assert that ``project`` is the 2020 climate-model-predict Project record."""
        assert len(project.get_parents()) == 1
        assert project.get_parents()[0].name == "Project"
        assert project.get_property("date").value == "2020"
        assert project.get_property("identifier").value == "climate-model-predict"

    debug = DebugTree()
    scan_directory(UNITTESTDIR / "test_directories" / "examples_article",
                   UNITTESTDIR / "scifolder_cfood.yml",
                   debug_tree=debug)

    # ---- Level 0: the DataAnalysis node ---------------------------------
    analysis_key = dircheckstr("DataAnalysis")
    stores = debug.debug_tree[analysis_key]
    copied = debug.debug_metadata["copied"][analysis_key]
    assert len(stores) == 2
    # variables stored on the Data Analysis node of the debug tree
    assert len(stores[0]) == 4
    # record store on the Data Analysis node of the debug tree
    assert len(stores[1]) == 0
    assert len(copied) == 2
    assert len(copied[0]) == 4
    assert len(copied[1]) == 0

    # The data analysis node creates one variable for the node itself:
    assert stores[0]["DataAnalysis"] == "examples_article/DataAnalysis"
    assert copied[0]["DataAnalysis"] is False

    # ---- Level 1: the project directory ---------------------------------
    project_key = dircheckstr("DataAnalysis", "2020_climate-model-predict")
    stores = debug.debug_tree[project_key]
    copied = debug.debug_metadata["copied"][project_key]

    record_store = stores[1]
    assert len(record_store) == 1
    assert_climate_project(record_store["Project"])

    variable_store = stores[0]
    assert len(variable_store) == 9
    assert variable_store["date"] == "2020"
    assert variable_store["identifier"] == "climate-model-predict"
    assert variable_store["Project"].__class__ == db.Record

    assert variable_store["DataAnalysis"] == "examples_article/DataAnalysis"
    assert copied[0]["DataAnalysis"] is True
    assert variable_store["project_dir"] == (
        "examples_article/DataAnalysis/2020_climate-model-predict")
    assert copied[0]["project_dir"] is False

    # Check the copy flags for the first level in the hierarchy:
    assert len(copied[0]) == 9
    assert len(copied[1]) == 1
    assert copied[1]["Project"] is False
    assert copied[0]["Project"] is False
    assert copied[0]["date"] is False
    assert copied[0]["identifier"] is False

    # ---- Level 2: the measurement directory -----------------------------
    measurement_key = dircheckstr("DataAnalysis",
                                  "2020_climate-model-predict",
                                  "2020-02-08_prediction-errors")
    stores = debug.debug_tree[measurement_key]
    copied = debug.debug_metadata["copied"][measurement_key]

    variable_store = stores[0]
    assert len(variable_store) == 12
    assert variable_store["date"] == "2020-02-08"
    assert variable_store["identifier"] == "prediction-errors"
    assert variable_store["Project"].__class__ == db.Record
    assert variable_store["Measurement"].__class__ == db.Record

    record_store = stores[1]
    assert len(record_store) == 2

    assert_climate_project(record_store["Project"])

    measurement = record_store["Measurement"]
    assert len(measurement.get_parents()) == 1
    assert measurement.get_parents()[0].name == "Measurement"
    assert measurement.get_property("date").value == "2020-02-08"
    assert measurement.get_property("identifier").value == "prediction-errors"
    # The "$Project" reference must have been replaced by the actual record.
    linked_project = measurement.get_property("project").value
    assert linked_project != "$Project"
    assert linked_project.__class__ == db.Record
    assert linked_project == variable_store["Project"]

    # Check the copy flags for the second level in the hierarchy:
    assert copied[1]["Project"] is True
    assert copied[0]["Project"] is True
    assert copied[1]["Measurement"] is False
    assert copied[0]["Measurement"] is False
    assert copied[0]["date"] is False
    assert copied[0]["identifier"] is False


def test_record_generation():
    """
    Test the correct list of returned records by the scanner using the
    scifolder example from the article.

    Records are grouped by their single parent (Project, Measurement, Person)
    and, per group, it is counted how many records match each expected set of
    property values.
    """

    records = scan_directory(UNITTESTDIR / "test_directories" / "examples_article",
                             UNITTESTDIR / "scifolder_cfood.yml")

    def parent_filter(parent_name):
        """Return the scanned records whose only parent is named ``parent_name``."""
        return [p for p in records if len(p.parents) == 1 and p.parents[0].name == parent_name]

    def record_matches(rec, expected, check_additional):
        """Tell whether ``rec`` carries all properties given in ``expected``.

        An expected value of ``None`` requires the property to exist with a
        value that is ``None``.  With ``check_additional`` the record must not
        have any property whose name is missing from ``expected``.
        """
        for name, value in expected.items():
            prop = rec.get_property(name)
            if prop is None:
                return False
            if value is None:
                if prop.value is not None:
                    return False
            elif prop.value != value:
                return False
        if check_additional:
            # Verify that there are no additional props in the record
            return all(rpr.name in expected for rpr in rec.properties)
        return True

    def check_properties(candidates, check_props, check_additional=True):
        """Count, for every entry of ``check_props``, the matching ``candidates``.

        Returns a list of integers with the same length and order as
        ``check_props``.
        """
        return [
            sum(1 for rec in candidates if record_matches(rec, expected, check_additional))
            for expected in check_props
        ]

    # Check projects:
    # There are two projects in mixed categories: climate_model_predict and SpeedOfLight
    projects_found = check_properties(parent_filter("Project"), [
        {"identifier": "climate-model-predict", "date": "2020"},
        {"identifier": "SpeedOfLight", "date": "2020"}
    ])
    assert projects_found == [3, 2]

    measurements = parent_filter("Measurement")
    assert len(measurements) == 11
    expected_measurements = [
        {"identifier": "prediction-errors", "date": "2020-02-08"},
        {"identifier": "average-all-exp", "date": "2020-01-04"},
        {"identifier": "average-all-exp-corr", "date": "2020-01-05"},
        {"date": "1980-01-01", "identifier": None},
        {"date": "1990-01-01", "identifier": None},
        {"date": "2000-01-01", "identifier": None},
        {"date": "2010-01-01", "identifier": None},
        {"date": "2020-01-01", "identifier": "TimeOfFlight"},
        {"date": "2020-01-02", "identifier": "Cavity"},
        {"date": "2020-01-03", "identifier": None},
        {"date": "2020-02-01", "identifier": None},
    ]
    measurements_found = check_properties(measurements, expected_measurements, False)
    # Every expected measurement must be matched by exactly one record.
    assert measurements_found == [1] * len(expected_measurements)

    persons = parent_filter("Person")
    check_props = [
        {"first_name": None, "last_name": "Author" + letter} for letter in
        ("A", "B", "C", "D", "E")]
    persons_found = check_properties(persons, check_props)
    # Every author must occur at least once.
    assert all(count > 0 for count in persons_found)


def test_variable_deletion_problems():
    """Scan ``example_variable_deletion`` with two cfoods and check ``var1``/``var2``.

    Each scanned record must be named "Record from Data_1" or
    "Record from Data_2" (anything else raises ``RuntimeError``) and must carry
    the expected values for both properties.
    """
    example_dir = UNITTESTDIR / "test_directories" / "example_variable_deletion"

    def assert_variables(scanned, expected_by_name):
        """Compare ``var1`` and ``var2`` of each record with ``expected_by_name``."""
        for rec in scanned:
            if rec.name not in expected_by_name:
                raise RuntimeError("Wrong name")
            expected_var1, expected_var2 = expected_by_name[rec.name]
            assert rec.get_property("var1").value == expected_var1
            assert rec.get_property("var2").value == expected_var2

    scanned = scan_directory(example_dir, UNITTESTDIR / "cfood_variable_deletion.yml")
    assert_variables(scanned, {
        "Record from Data_1": ("bla", "$test_2"),
        "Record from Data_2": ("$test_1", "test"),
    })

    scanned = scan_directory(example_dir, UNITTESTDIR / "cfood_variable_deletion2.yml")

    # For the following test the order of records is actually important:
    assert scanned[0].name == "Record from Data_1"
    assert scanned[1].name == "Record from Data_2"
    assert_variables(scanned, {
        "Record from Data_1": ("bla", "$test_2"),
        "Record from Data_2": ("example_variable_deletion", "test"),
    })


def test_record_parents():
    """Check the parents of the records scanned with ``test_parent_cfood.yml``.

    Four records are expected; the records named 'e', 'c', 'p' and 's' must
    each have exactly one parent with the name listed below.
    """
    crawler_definition = load_definition(UNITTESTDIR / "test_parent_cfood.yml")
    converter_registry = create_converter_registry(crawler_definition)

    root = DictElement(name="", value={'Experiments': {}})
    records = scan_structure_elements(root, crawler_definition, converter_registry)
    assert len(records) == 4

    expected_parent = {
        'e': 'Exp',      # default parent was overwritten
        'c': 'Cap2',     # default parent was overwritten by second converter
        'p': 'Projekt',  # top level set parent was overwritten
        's': 'Stuff',    # default parent stays if no parent is given on lower levels
    }
    for rec in records:
        if rec.name not in expected_parent:
            # Records with other names are not checked.
            continue
        assert rec.parents[0].name == expected_parent[rec.name]
        assert len(rec.parents) == 1


def test_error_messages():
    """Check the error messages raised when scanning with broken definitions.

    Two broken cfood snippets are loaded: a converter without any content and
    a converter whose record definition is a string instead of a dict.  In
    both cases the scan must raise a ``RuntimeError`` with a matching message.
    """
    data = {
        'Experiments': {}
    }

    def load_broken(yaml_text):
        """Return the definition and converter registry built from ``yaml_text``."""
        definition = _load_definition_from_yaml_dict(
            [yaml.load(yaml_text, Loader=yaml.SafeLoader)])
        return definition, create_converter_registry(definition)

    empty_converter_yaml = """
EmptyConverter:
    """
    broken_definition, converter_registry = load_broken(empty_converter_yaml)

    with pytest.raises(RuntimeError,
                       match='Definition of converter "EmptyConverter" is empty'):
        scan_structure_elements(DictElement(name="", value=data),
                                broken_definition, converter_registry)

    string_record_yaml = """
Converter:
  type: DictElement
  records:
    TestRecord: "42"
    """
    broken_definition, converter_registry = load_broken(string_record_yaml)

    with pytest.raises(RuntimeError, match="dict expected, but found str: 42"):
        scan_structure_elements(DictElement(name="", value=data),
                                broken_definition, converter_registry)