Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_converters.py 20.14 KiB
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2021,2022 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2021,2022 Henrik tom Wörden <h.tomwoerden@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

"""
test the converters module
"""
import json
import yaml
import importlib
import os
from itertools import product
import datetime
import pytest
import yaml

from caoscrawler.converters import (Converter, ConverterValidationError, DictElementConverter,
                                    DirectoryConverter, DictIntegerElementConverter,
                                    handle_value, MarkdownFileConverter, DateElementConverter,
                                    FloatElementConverter, IntegerElementConverter,
                                    JSONFileConverter, YAMLFileConverter)
from caoscrawler.converters import _AbstractScalarValueElementConverter
from caoscrawler.crawl import Crawler
from caoscrawler.stores import GeneralStore
from caoscrawler.structure_elements import (File, TextElement, ListElement, DictElement,
                                            BooleanElement, IntegerElement,
                                            FloatElement, Directory)

from test_tool import rfp


@pytest.fixture
def converter_registry():
    """Provide the converter registry shared by the tests in this module.

    Returns
    -------
    dict
        Maps each converter type name to a dict with the keys "converter"
        (class name), "package" (module the class is imported from) and
        "class" (the class object resolved from that module).
    """
    package_name = "caoscrawler.converters"
    class_names = {
        "Directory": "DirectoryConverter",
        "MarkdownFile": "MarkdownFileConverter",
        "Date": "DateElementConverter",
        "DictElement": "DictElementConverter",
        "TextElement": "TextElementConverter",
        "ListElement": "ListElementConverter",
        "JSONFile": "JSONFileConverter",
    }

    registry = {}
    for type_name, class_name in class_names.items():
        entry = {"converter": class_name, "package": package_name}
        # Resolve the class object from the package named in the entry itself.
        package = importlib.import_module(entry["package"])
        entry["class"] = getattr(package, entry["converter"])
        registry[type_name] = entry
    return registry


def testConverterTrivial(converter_registry):
    """Smoke test: the converter factory accepts definitions that only name a type."""
    # NOTE(review): "TextElement" appears twice in this list -- possibly a
    # different type was intended for the last entry; confirm.
    type_names = ("Directory", "MarkdownFile", "TextElement", "ListElement", "TextElement")

    for type_name in type_names:
        minimal_definition = {"type": type_name}
        Converter.converter_factory(
            definition=minimal_definition,
            name="Test",
            converter_registry=converter_registry,
        )


def testDirectoryConverter(converter_registry):
    """Create the children of the "test_directories" folder with a Directory converter.

    Every child must be a ``Directory`` and a few known example folders must
    be among them.
    """
    dc = Converter.converter_factory(
        definition={"type": "Directory"},
        name="Test",
        converter_registry=converter_registry,
    )
    store = GeneralStore()
    root = Directory("test_directories", rfp("test_directories"))
    elements = dc.create_children(store, root)

    # The exact number of children is deliberately not checked; only the type
    # of every child and the presence of some known folders are verified.
    found_names = []
    for child in elements:
        assert isinstance(child, Directory)
        found_names.append(child.name)

    for expected in ("examples_article", "example_overwrite_1", "example_insert"):
        assert expected in found_names


def test_markdown_converter(converter_registry):
    """Match README.md files and inspect the children the Markdown converter creates."""

    def assert_child(child, element_cls, name, value_cls):
        # Exact class comparison (not isinstance) for the element and its value.
        assert child.__class__ == element_cls
        assert child.name == name
        assert child.value.__class__ == value_cls

    test_readme = File(
        "README.md",
        rfp("test_directories", "examples_article", "DataAnalysis",
            "2020_climate-model-predict", "2020-02-08_prediction-errors", "README.md"),
    )

    converter = MarkdownFileConverter(
        {"match": "(.*)"}, "TestMarkdownFileConverter", converter_registry)

    # test_tool.py is expected not to match although the pattern is a catch-all.
    python_file = File("test_tool.py", rfp("test_tool.py"))
    assert converter.match(python_file) is None

    # The README matches; the pattern has no named groups, so the result is an empty dict.
    m = converter.match(test_readme)
    assert m is not None
    assert m.__class__ == dict
    assert len(m) == 0

    converter = MarkdownFileConverter(
        {"match": "README.md"}, "TestMarkdownFileConverter", converter_registry)

    m = converter.match(test_readme)
    assert m is not None
    assert len(m) == 0

    # First README: "responsible" is a single text value.
    children = converter.create_children(None, test_readme)
    assert len(children) == 5
    assert_child(children[1], TextElement, "description", str)
    assert_child(children[0], TextElement, "responsible", str)

    test_readme2 = File(
        "README.md",
        rfp("test_directories", "examples_article", "ExperimentalData",
            "2020_SpeedOfLight", "2020-01-01_TimeOfFlight", "README.md"),
    )

    m = converter.match(test_readme2)
    assert m is not None
    assert len(m) == 0

    # Second README: "responsible" is a list.
    children = converter.create_children(None, test_readme2)
    assert len(children) == 2
    assert_child(children[1], TextElement, "description", str)
    assert_child(children[0], ListElement, "responsible", list)


def test_json_converter(converter_registry):
    """Convert a JSON file, check the created elements and the error handling.

    Covers three inputs from ``test_directories/examples_json``:

    - ``testjson.json``: validates against the schema; the typed children of
      the resulting dict element are checked.
    - ``invalidjson.json``: must raise ``ConverterValidationError``.
    - ``brokenjson.json``: must raise ``json.decoder.JSONDecodeError``.
    """
    test_json = File("testjson.json", rfp(
        "test_directories", "examples_json", "testjson.json"))

    schema_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               "test_directories", "examples_json", "testjson.schema.json")
    jsonconverter = JSONFileConverter(
        definition={"match": "(.*)", "validate": schema_path},
        name="TestJSONFileConverter",
        converter_registry=converter_registry)

    m = jsonconverter.match(test_json)
    assert m is not None
    assert len(m) == 0

    dict_el = jsonconverter.create_children(None, test_json)
    assert len(dict_el) == 1

    dictconverter = DictElementConverter(
        definition={"match_name": "(.*)"},
        name="dictconv",
        converter_registry=converter_registry)
    children = dictconverter.create_children(None, dict_el[0])
    for child in children:
        if child.name == "name":
            assert isinstance(child, TextElement)
            assert isinstance(child.value, str)
            assert child.value == "DEMO"
        elif child.name == "projectId":
            assert isinstance(child, IntegerElement)
            assert isinstance(child.value, int)
            assert child.value == 10002
        elif child.name == "archived":
            assert isinstance(child, BooleanElement)
            assert isinstance(child.value, bool)
            assert child.value is False
        elif child.name == "Person":
            assert isinstance(child, ListElement)
            assert isinstance(child.value, list)
            assert len(child.value) == 2
        elif child.name == "start_date":
            assert isinstance(child, TextElement)
            assert isinstance(child.value, str)
            assert child.value == '2022-03-01'
        elif child.name == "candidates":
            assert isinstance(child, ListElement)
            assert isinstance(child.value, list)
            assert child.value == ["Mouse", "Penguine"]
        elif child.name == "rvalue":
            assert isinstance(child, FloatElement)
            assert isinstance(child.value, float)
        elif child.name == "url":
            assert isinstance(child, TextElement)
            assert isinstance(child.value, str)
        else:
            # Any child name not listed above is unexpected in the test file.
            raise ValueError()

    invalid_json = File(
        "invalidjson.json",
        rfp("test_directories", "examples_json", "invalidjson.json")
    )
    # Doesn't validate because of missing required 'name' property.
    # The message is checked via ``match`` (a regex search on the string form
    # of the exception): an assert placed inside the ``with`` block after the
    # raising call would never be executed.
    with pytest.raises(ConverterValidationError, match="Couldn't validate"):
        jsonconverter.create_children(None, invalid_json)

    broken_json = File(
        "brokenjson.json",
        rfp("test_directories", "examples_json", "brokenjson.json")
    )
    with pytest.raises(json.decoder.JSONDecodeError):
        jsonconverter.create_children(None, broken_json)


def test_yaml_converter(converter_registry):
    """Convert a YAML file, check the created elements and the error handling.

    Covers three inputs from ``test_directories/test_yamls``:

    - ``testyaml.yml``: validates against the schema; the typed children of
      the resulting dict element are checked.
    - ``invalidyaml.yml``: must raise ``ConverterValidationError``.
    - ``brokenyaml.yml``: must raise ``yaml.parser.ParserError``.
    """
    test_yaml = File("testyaml.yml", rfp(
        "test_directories", "test_yamls", "testyaml.yml"))

    schema_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               "test_directories", "test_yamls", "testyaml.schema.json")
    yamlconverter = YAMLFileConverter(
        definition={"match": "(.*)", "validate": schema_path},
        name="TestYAMLFileConverter",
        converter_registry=converter_registry)

    m = yamlconverter.match(test_yaml)
    assert m is not None
    assert len(m) == 0

    dict_el = yamlconverter.create_children(None, test_yaml)
    assert len(dict_el) == 1

    dictconverter = DictElementConverter(
        definition={"match_name": "(.*)"},
        name="dictconv",
        converter_registry=converter_registry)
    children = dictconverter.create_children(None, dict_el[0])
    for child in children:
        if child.name == "name":
            assert isinstance(child, TextElement)
            assert isinstance(child.value, str)
            assert child.value == "DEMO"
        elif child.name == "projectId":
            assert isinstance(child, IntegerElement)
            assert isinstance(child.value, int)
            assert child.value == 10002
        elif child.name == "archived":
            assert isinstance(child, BooleanElement)
            assert isinstance(child.value, bool)
            assert child.value is False
        elif child.name == "Person":
            assert isinstance(child, ListElement)
            assert isinstance(child.value, list)
            assert len(child.value) == 2
        elif child.name == "start_date":
            assert isinstance(child, TextElement)
            assert isinstance(child.value, str)
            assert child.value == '2022-03-01'
        elif child.name == "candidates":
            assert isinstance(child, ListElement)
            assert isinstance(child.value, list)
            assert child.value == ["Mouse", "Penguine"]
        elif child.name == "rvalue":
            assert isinstance(child, FloatElement)
            assert isinstance(child.value, float)
        elif child.name == "url":
            assert isinstance(child, TextElement)
            assert isinstance(child.value, str)
        else:
            # Any child name not listed above is unexpected in the test file.
            raise ValueError()

    invalid_yaml = File(
        "invalidyaml.yml",
        rfp("test_directories", "test_yamls", "invalidyaml.yml")
    )

    # Doesn't validate because of missing required 'name' property.
    # The message is checked via ``match`` (a regex search on the string form
    # of the exception): an assert placed inside the ``with`` block after the
    # raising call would never be executed.
    with pytest.raises(ConverterValidationError, match="Couldn't validate"):
        yamlconverter.create_children(None, invalid_yaml)

    broken_yaml = File(
        "brokenyaml.yml",
        rfp("test_directories", "test_yamls", "brokenyaml.yml")
    )
    with pytest.raises(yaml.parser.ParserError):
        yamlconverter.create_children(None, broken_yaml)


def test_variable_replacement():
    """Check how ``handle_value`` resolves variables and collection modes.

    Values may be given as plain strings (optionally prefixed with "+" or "*"),
    as dicts with explicit "value" and "collection_mode" keys, or as lists.
    """
    store = GeneralStore()
    store["a"] = 4
    store["b"] = "68"

    # (prefix used in the string notation, expected collection mode)
    modes = (("", "single"), ("+", "list"), ("*", "multiproperty"))
    # (raw value, expected result): "b" is taken literally, "$b" is looked up in the store.
    cases = (("b", "b"), ("$b", "68"))

    # String notation: the prefix selects the collection mode.
    for raw, resolved in cases:
        for prefix, mode in modes:
            assert handle_value(prefix + raw, store) == (resolved, mode)

    # Dict notation: the collection mode is given explicitly.
    for raw, resolved in cases:
        for _, mode in modes:
            spec = {"value": raw, "collection_mode": mode}
            assert handle_value(spec, store) == (resolved, mode)

    # List notation: each item is handled on its own; the mode is "single".
    assert handle_value(["a", "b"], store) == (["a", "b"], "single")
    assert handle_value(["$a", "$b"], store) == (["4", "68"], "single")


def test_filter_children_of_directory(converter_registry, capsys):
    """Verify that children (i.e., files) in a directory are filtered or sorted
    correctly.

    Three filter rules are exercised on the "examples_filter_children"
    directory: "only_max", "only_min" and an unknown rule, which must raise a
    ``RuntimeError`` when the children are created. The first converter also
    enables "debug_match" and the printed debug output is inspected.
    """

    def make_converter(rule, name, debug_match=False):
        # Build a DirectoryConverter whose filter uses the given rule on the
        # "date" group of the file name pattern.
        definition = {"match": "(.*)"}
        if debug_match:
            definition["debug_match"] = True
        definition["filter"] = {
            "expr": "test_(?P<date>[0-9]{4,4}-[0-9]{2,2}-[0-9]{2,2}).json",
            "group": "date",
            "rule": rule,
        }
        return DirectoryConverter(
            definition=definition,
            name=name,
            converter_registry=converter_registry,
        )

    test_dir = Directory("examples_filter_children", rfp(
        "test_directories", "examples_filter_children"))

    dc = make_converter("only_max", "TestOnlyMaxDirectoryConverter", debug_match=True)

    m = dc.match(test_dir)
    assert m is not None
    # checking debug output
    captured = capsys.readouterr()
    # the name
    assert "examples_filter_children" in captured.out
    # the regexp
    assert "(.*)" in captured.out
    # the empty result set
    assert "{}" in captured.out

    # This should only contain the youngest json and the csv that doesn't match
    # the above filter expression.
    children = dc.create_children(None, test_dir)
    assert len(children) == 2
    assert children[0].__class__ == File
    assert children[0].name == "test_2022-02-02.json"
    assert children[1].__class__ == File
    assert children[1].name == "some_other_file.csv"

    dc = make_converter("only_min", "TestOnlyMinDirectoryConverter")

    m = dc.match(test_dir)
    assert m is not None

    # This should only contain the oldest json and the csv that doesn't match
    # the above filter expression.
    children = dc.create_children(None, test_dir)
    assert len(children) == 2
    assert children[0].__class__ == File
    assert children[0].name == "test_2022-01-01.json"
    assert children[1].__class__ == File
    assert children[1].name == "some_other_file.csv"

    dc = make_converter("does_not_exist", "TestBrokenDirectoryConverter")

    # Matching still works; the unknown rule only fails once children are created.
    m = dc.match(test_dir)
    assert m is not None

    with pytest.raises(RuntimeError):
        dc.create_children(None, test_dir)


def test_validate_custom_converters():
    """Load custom converter declarations from one- and two-document YAML.

    The "Converters" section may be given at the top level of a single
    document or under "metadata" in the first of two documents. Both variants
    must yield the same type for "MyElement".
    """
    one_doc_yaml = """
Converters:
  MyNewType:
    converter: MyNewTypeConverter
    package: some_package.my_converters
MyElement:
  type: MyNewType
  match: something
    """
    single_doc_crawler = Crawler()
    single_doc = yaml.safe_load(one_doc_yaml)
    one_doc_definitions = single_doc_crawler._load_definition_from_yaml_dict([single_doc])
    assert "MyElement" in one_doc_definitions
    assert one_doc_definitions["MyElement"]["type"] == "MyNewType"

    # Same content, split into a metadata document and a definition document.
    two_doc_yaml = """
---
metadata:
  Converters:
    MyNewType:
      converter: MyNewTypeConverter
      package: some_package.my_converters
---
MyElement:
  type: MyNewType
  match: something
    """
    multi_doc_crawler = Crawler()
    documents = list(yaml.safe_load_all(two_doc_yaml))
    two_doc_definitions = multi_doc_crawler._load_definition_from_yaml_dict(documents)
    assert "MyElement" in two_doc_definitions
    expected_type = one_doc_definitions["MyElement"]["type"]
    assert two_doc_definitions["MyElement"]["type"] == expected_type


def test_abstract_dict_element_converter():
    """Match a multi-line text value and extract the part between "begin" and "end"."""
    definition_yaml = """
match_name: text
match_value: .*begin(?P<text>.*)end
accept_text: True
    """
    definition = yaml.safe_load(definition_yaml)
    # No converter registry is needed because the definition has no "subtree".
    converter = _AbstractScalarValueElementConverter(definition, "test_converter", None)

    multiline_text = "\nbegin\nbla\nend"
    val = converter.match(TextElement("text", multiline_text))
    assert val is not None
    # The captured group spans the line breaks around "bla".
    assert val["text"] == "\nbla\n"


def test_converter_value_match(converter_registry):
    """Exercise the "accept_int" / "accept_float" options of the numeric converters."""
    catch_all = {"match_name": "(.*)", "match_value": "(.*)"}

    # Without further options a float converter matches an integer element.
    float_conv = FloatElementConverter(
        definition=dict(catch_all),
        name="Test",
        converter_registry=converter_registry,
    )
    assert float_conv.match(IntegerElement(name="a", value=4)) is not None

    # With "accept_int" switched off the type check rejects the integer element.
    strict_float_conv = FloatElementConverter(
        definition={**catch_all, "accept_int": False},
        name="Test",
        converter_registry=converter_registry,
    )
    assert strict_float_conv.typecheck(IntegerElement(name="a", value=4)) is False

    # With "accept_float" switched on an integer converter matches a float element.
    lenient_int_conv = IntegerElementConverter(
        definition={**catch_all, "accept_float": True},
        name="Test",
        converter_registry=converter_registry,
    )
    assert lenient_int_conv.match(FloatElement(name="a", value=4.0)) is not None


def test_match_debug(converter_registry, capsys):
    """Check the "debug_match" output for every combination of match options.

    Each of "match", "match_name" and "match_value" is either ".*" or absent.
    Giving both "match" and "match_name" must raise a ``RuntimeError``.
    """
    options = [".*", None]
    for match, match_name, match_value in product(options, options, options):
        definition = {"debug_match": True}
        for key, pattern in (("match", match),
                             ("match_name", match_name),
                             ("match_value", match_value)):
            if pattern:
                definition[key] = pattern

        dc = FloatElementConverter(
            definition=definition,
            name="Test",
            converter_registry=converter_registry,
        )

        if match and match_name:
            with pytest.raises(RuntimeError):
                dc.match(IntegerElement(name="a", value=4))
            continue

        result = dc.match(IntegerElement(name="a", value=4))
        if match is not None or match_name is not None or match_value is not None:
            assert result is not None
            # Inspect the debug output printed while matching.
            printed = capsys.readouterr().out
            # the name
            assert "a" in printed
            # the regexp
            assert ".*" in printed
            # the empty result set
            assert "{}" in printed


def test_date_converter(converter_registry):
    """Parse dates from text elements with the default and a custom date format.

    The ``converter_registry`` fixture is requested as an argument so that the
    converters receive the registry dict. Without the parameter the name would
    refer to the module-level fixture function instead.
    """
    # Default date format.
    date_converter = DateElementConverter(
        definition={"match_value": "(?P<date>.*)"},
        name="conv",
        converter_registry=converter_registry)
    matches = date_converter.match(TextElement("text", "2022-11-11"))
    assert "date" in matches
    assert isinstance(matches["date"], datetime.date)
    assert matches["date"].year == 2022

    # Custom format with a two-digit year.
    date_converter = DateElementConverter(
        definition={"match_value": r"(?P<date>(\d|-)+)",
                    "date_format": "%y-%m-%d"},
        name="conv",
        converter_registry=converter_registry)
    matches = date_converter.match(TextElement("text", "22-11-11"))
    assert "date" in matches
    assert isinstance(matches["date"], datetime.date)
    assert matches["date"].year == 2022

    # Text that does not fit the value pattern yields no match.
    matches = date_converter.match(TextElement("text", "alve"))
    assert matches is None