Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_read_xlsx.py 11.61 KiB
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Testing the conversion from XLSX to JSON"""


import datetime
import json
import os
import re

from types import SimpleNamespace

import jsonschema
import pytest
from caosadvancedtools.table_json_conversion import convert

from .utils import assert_equal_jsons


def rfp(*pathcomponents):
    """Return full path, a shorthand convenience function.
    """
    return os.path.join(os.path.dirname(__file__), *pathcomponents)


def convert_and_compare(xlsx_file: str, schema_file: str, known_good_file: str,
                        known_good_data: dict = None, strict: bool = False,
                        validate: bool = True) -> dict:
    """Convert an XLSX file and compare to a known result.

Exactly one of ``known_good_file`` and ``known_good_data`` should be non-empty.

Returns
-------
json: dict
  The result of the conversion.
    """
    result = convert.to_dict(xlsx=xlsx_file, schema=schema_file, validate=validate)
    if known_good_file:
        with open(known_good_file, encoding="utf-8") as myfile:
            expected = json.load(myfile)
    else:
        expected = known_good_data
    assert_equal_jsons(result, expected, allow_none=not strict, allow_empty=not strict)
    return result


def test_conversions():
    """Test conversion from XLSX to JSON."""
    convert_and_compare(xlsx_file=rfp("data/simple_data.xlsx"),
                        schema_file=rfp("data/simple_schema.json"),
                        known_good_file=rfp("data/simple_data.json"))
    convert_and_compare(xlsx_file=rfp("data/multiple_refs_data.xlsx"),
                        schema_file=rfp("data/multiple_refs_schema.json"),
                        known_good_file=rfp("data/multiple_refs_data.json"))
    convert_and_compare(xlsx_file=rfp("data/indirect_data.xlsx"),
                        schema_file=rfp("data/indirect_schema.json"),
                        known_good_file=rfp("data/indirect_data.json"))
    convert_and_compare(xlsx_file=rfp("data/multiple_choice_data.xlsx"),
                        schema_file=rfp("data/multiple_choice_schema.json"),
                        known_good_file=rfp("data/multiple_choice_data.json"),
                        strict=True)

    with open(rfp("data/simple_data.json"), encoding="utf-8") as myfile:
        expected_datetime = json.load(myfile)
        expected_datetime["Training"][0]["date"] = datetime.datetime(2023, 1, 1, 0, 0)
    convert_and_compare(xlsx_file=rfp("data/simple_data_datetime.xlsx"),
                        schema_file=rfp("data/simple_schema.json"),
                        known_good_file="", known_good_data=expected_datetime)

    # Data loss when saving as xlsx
    with pytest.raises(AssertionError) as err:
        convert_and_compare(xlsx_file=rfp("data/simple_data_ascii_chars.xlsx"),
                            schema_file=rfp("data/simple_schema.json"),
                            known_good_file=rfp("data/simple_data_ascii_chars.json"))
    assert str(err.value).startswith("Values at path ['Training', 0, ")


def test_missing_columns():
    with pytest.raises(ValueError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                        schema=rfp("data/simple_schema.json"), strict=True)
    assert str(caught.value) == "Missing column: Training.coach.given_name"
    with pytest.warns(UserWarning) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    assert str(caught.pop().message) == "Missing column: Training.coach.given_name"
    with pytest.warns(UserWarning) as caught:
        convert.to_dict(xlsx=rfp("data/multiple_choice_data_missing.xlsx"),
                        schema=rfp("data/multiple_choice_schema.json"))
    messages = {str(w.message) for w in caught}
    for expected in [
            "Missing column: Training.skills.Communication",
            "Missing column: Training.exam_types.Oral",
    ]:
        assert expected in messages


def test_error_table():
    with pytest.raises(jsonschema.ValidationError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    # Correct Errors
    assert "Malformed metadata: Cannot parse paths in worksheet 'Person'." in str(caught.value)
    assert "'Not a num' is not of type 'number'" in str(caught.value)
    assert "'Yes a number?' is not of type 'number'" in str(caught.value)
    assert "1.5 is not of type 'integer'" in str(caught.value)
    assert "1.2345 is not of type 'integer'" in str(caught.value)
    assert "'There is no entry in the schema" in str(caught.value)
    assert "'Not an enum' is not one of [" in str(caught.value)
    # Correct Locations
    for line in str(caught.value).split('\n'):
        if "'Not a num' is not of type 'number'" in line:
            assert "J7" in line
        if "'Yes a number?' is not of type 'number'" in line:
            assert "J8" in line
        if "1.5 is not of type 'integer'" in line:
            assert "K7" in line
        if "1.2345 is not of type 'integer'" in line:
            assert "K8" in line
        if "'There is no entry in the schema" in line:
            assert "Column M" in line
        if "'Not an enum' is not one of [" in line:
            assert "G8" in line
    # No additional errors
    assert str(caught.value).count("Malformed metadata: Cannot parse paths in worksheet") == 1
    assert str(caught.value).count("There is no entry in the schema") == 1
    assert str(caught.value).count("is not one of") == 1
    # FIXME ToDo: Remove when boolean is fixed / when everything works as
    #             expected, set correct number.
    if "is not of type 'boolean'" in str(caught.value):
        assert str(caught.value).count("is not of type") == 6
    else:
        assert str(caught.value).count("is not of type") == 4
    # Check correct error message for completely unknown path
    with pytest.raises(jsonschema.ValidationError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_broken_paths.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    assert "Malformed metadata: Cannot parse paths" in str(caught.value)


def test_additional_column():
    with pytest.raises(jsonschema.ValidationError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    # Correct Error
    assert "no entry in the schema that corresponds to this column" in str(caught.value)
    # Correct Location
    for line in str(caught.value).split('\n'):
        if "no entry in the schema that corresponds to this column" in line:
            assert " M " in line
    # No additional column errors
    assert str(caught.value).count("no entry in the schema that corresponds to this column") == 1


def test_faulty_foreign():
    # Simple wrong foreign key
    converter = convert.XLSXConverter(xlsx=rfp("data/simple_data_wrong_foreign.xlsx"),
                                      schema=rfp("data/simple_schema.json"))
    with pytest.raises(RuntimeError):
        converter.to_dict()
    errors = converter.get_errors()
    assert errors == {('Training.coach', 6): [['date', datetime.datetime(2023, 1, 2, 0, 0)],
                                              ['url', 'www.indiscale.com']]}

    # More extensive example
    converter = convert.XLSXConverter(xlsx=rfp("data/multiple_refs_data_wrong_foreign.xlsx"),
                                      schema=rfp("data/multiple_refs_schema.json"))
    with pytest.raises(RuntimeError):
        converter.to_dict()
    errors = converter.get_errors()
    assert errors == {
        ('Training.Organisation.Person', 8): [
            ['name', 'World Training Organization 2']],
        ('Training.Organisation.Person', 9): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', 'www.getlinkahead.com']],
        ('Training.participant', 6): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', None]],
        ('Training.participant', 7): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', None]],
    }

    error_str = converter.get_error_str()
    assert error_str == """Sheet: Training.Organisation.Person\tRow: 9
\t\t['name']:\tWorld Training Organization 2
Sheet: Training.Organisation.Person\tRow: 10
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\twww.getlinkahead.com
Sheet: Training.participant\tRow: 7
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\tNone
Sheet: Training.participant\tRow: 8
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\tNone
"""


def test_set_in_nested():
    """Test the ``_set_in_nested`` function."""
    set_in_nested = convert._set_in_nested  # pylint: disable=protected-access

    test_data_in = [
        {"mydict": {}, "path": ["a", 1], "value": 3},
        {"mydict": {"a": 1}, "path": ["a"], "value": 3, "overwrite": True},
        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3, "overwrite": True},
        {"mydict": {"b": 2}, "path": ["a", 1, 3.141], "value": 3},
        {"mydict": {}, "path": ["X", "Y", "a", 1], "value": 3, "prefix": ["X", "Y"]},
    ]
    test_data_out = [
        {"a": {1: 3}},
        {"a": 3},
        {"a": {1: 3}},
        {"a": {1: {3.141: 3}}, "b": 2},
        {"a": {1: 3}},
    ]

    for data_in, data_out in zip(test_data_in, test_data_out):
        assert set_in_nested(**data_in) == data_out

    # Testing exceptions
    test_data_in = [
        {"mydict": {"a": 1}, "path": ["a"], "value": 3},
        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3},
        {"mydict": {}, "path": ["a", 1], "value": 3, "prefix": ["X", "Y", "Z"]},
    ]
    exceptions = [
        [ValueError, r"There is already some value at \[a\]"],
        [ValueError, r"There is already some value at \[1\]"],
        [KeyError, r"Path does not start with prefix: \['X', 'Y', 'Z'\] not in \['a', 1\]"],
    ]

    for data_in, (exc_out, match) in zip(test_data_in, exceptions):
        with pytest.raises(exc_out, match=match):
            set_in_nested(**data_in)


def test_group_foreign_paths():
    """Test the ``_group_foreign_paths`` function."""
    group = convert._group_foreign_paths  # pylint: disable=protected-access

    foreign = [
        ["A", "x", 1.1],
        ["A", "y", "z", "some text"],
        ["A", "B", "CC", "x", 42],
    ]
    common = ["A", "B", "CC"]
    common_wrong = ["A", "B", "C"]
    expected = [
        SimpleNamespace(stringpath="A", path=["A"], subpath=["A"],
                        definitions=[["x", 1.1], ["y", "z", "some text"]]),
        SimpleNamespace(stringpath="A.B.CC", path=["A", "B", "CC"], subpath=["B", "CC"],
                        definitions=[["x", 42]]),
    ]

    with pytest.raises(ValueError, match=re.escape(
            "Foreign keys must cover the complete `common` depth.")):
        result = group(foreign=foreign, common=common_wrong)
    result = group(foreign=foreign, common=common)
    assert result == expected