Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_read_xlsx.py 9.65 KiB
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Testing the conversion from XLSX to JSON"""


import datetime
import json
import os
import re

from types import SimpleNamespace

import jsonschema
import pytest
from caosadvancedtools.table_json_conversion import convert

from .utils import assert_equal_jsons


def rfp(*pathcomponents):
    """Join the given path components onto the directory of this test module.

    Shorthand for building paths to files that live next to this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *pathcomponents)


def convert_and_compare(xlsx_file: str, schema_file: str, known_good_file: str,
                        known_good_data: dict = None, strict: bool = False,
                        validate: bool = True) -> dict:
    """Convert an XLSX file and check the outcome against a reference.

    The reference is read as JSON from ``known_good_file`` if that argument is
    non-empty; otherwise ``known_good_data`` is used as it is.  Exactly one of
    the two should be non-empty.

    Parameters
    ----------
    xlsx_file: str
      Path of the XLSX file to convert.
    schema_file: str
      Path of the schema file which is handed to the conversion.
    known_good_file: str
      Path of a JSON file with the expected result.  May be empty.
    known_good_data: dict, optional
      The expected result.  Only used if ``known_good_file`` is empty.
    strict: bool, optional
      If True, the comparison is called with ``allow_none=False`` and
      ``allow_empty=False``; if False (the default), both are True.
    validate: bool, optional
      Handed through to ``convert.to_dict``.

    Returns
    -------
    json: dict
      The result of the conversion.
    """
    converted = convert.to_dict(xlsx=xlsx_file, schema=schema_file, validate=validate)
    if not known_good_file:
        reference = known_good_data
    else:
        with open(known_good_file, encoding="utf-8") as infile:
            reference = json.load(infile)
    # Both tolerance switches of the comparison are tied to the same flag.
    lenient = not strict
    assert_equal_jsons(converted, reference, allow_none=lenient, allow_empty=lenient)
    return converted


def test_conversions():
    """Test conversion from XLSX to JSON against known good results."""
    # Round trips which are compared leniently: (xlsx, schema, known good json)
    lenient_cases = [
        ("data/simple_data.xlsx",
         "data/simple_schema.json",
         "data/simple_data.json"),
        ("data/multiple_refs_data.xlsx",
         "data/multiple_refs_schema.json",
         "data/multiple_refs_data.json"),
        ("data/indirect_data.xlsx",
         "data/indirect_schema.json",
         "data/indirect_data.json"),
    ]
    for xlsx, schema, known_good in lenient_cases:
        convert_and_compare(xlsx_file=rfp(xlsx),
                            schema_file=rfp(schema),
                            known_good_file=rfp(known_good))

    # The multiple choice data is compared strictly.
    convert_and_compare(xlsx_file=rfp("data/multiple_choice_data.xlsx"),
                        schema_file=rfp("data/multiple_choice_schema.json"),
                        known_good_file=rfp("data/multiple_choice_data.json"),
                        strict=True)

    # Datetime variant: reuse the simple reference data, but expect a datetime object
    # as the date of the first training.
    with open(rfp("data/simple_data.json"), encoding="utf-8") as infile:
        reference_datetime = json.load(infile)
    reference_datetime["Training"][0]["date"] = datetime.datetime(2023, 1, 1, 0, 0)
    convert_and_compare(xlsx_file=rfp("data/simple_data_datetime.xlsx"),
                        schema_file=rfp("data/simple_schema.json"),
                        known_good_file="", known_good_data=reference_datetime)

    # Data loss when saving as xlsx
    with pytest.raises(AssertionError) as exc_info:
        convert_and_compare(xlsx_file=rfp("data/simple_data_ascii_chars.xlsx"),
                            schema_file=rfp("data/simple_schema.json"),
                            known_good_file=rfp("data/simple_data_ascii_chars.json"))
    failure_text = str(exc_info.value)
    assert failure_text.startswith("Values at path ['Training', 0, ")


def test_missing_columns():
    """Test that missing columns raise in strict mode and warn otherwise."""
    simple_xlsx = rfp("data/simple_data_missing.xlsx")
    simple_schema = rfp("data/simple_schema.json")
    missing_coach = "Missing column: Training.coach.given_name"

    # With ``strict=True``, the missing column is a ValueError.
    with pytest.raises(ValueError) as exc_info:
        convert.to_dict(xlsx=simple_xlsx, schema=simple_schema, strict=True)
    assert str(exc_info.value) == missing_coach

    # Without ``strict``, the same message arrives as a UserWarning.
    with pytest.warns(UserWarning) as record:
        convert.to_dict(xlsx=simple_xlsx, schema=simple_schema)
    assert str(record.pop().message) == missing_coach

    # Multiple choice data: both missing columns must show up among the warnings.
    with pytest.warns(UserWarning) as record:
        convert.to_dict(xlsx=rfp("data/multiple_choice_data_missing.xlsx"),
                        schema=rfp("data/multiple_choice_schema.json"))
    seen = {str(warning.message) for warning in record}
    assert "Missing column: Training.skills.Communication" in seen
    assert "Missing column: Training.exam_types.Oral" in seen


def test_wrong_datatype():
    """Test that values of the wrong type fail validation with a located message."""
    with pytest.raises(jsonschema.ValidationError) as exc_info:
        convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    message = str(exc_info.value)
    not_a_number = "'Not a num' is not of type 'number'"
    not_an_integer = "1.5 is not of type 'integer'"

    # Correct Errors
    assert not_a_number in message
    assert not_an_integer in message

    # Correct Locations: the line reporting an error must also name its location
    # (J7 / K7 are presumably the cell coordinates in the sheet).
    for line in message.split('\n'):
        if not_a_number in line:
            assert "J7" in line
        if not_an_integer in line:
            assert "K7" in line

    # No additional type errors
    # ToDo: Remove when boolean is fixed
    expected_count = 3 if "is not of type 'boolean'" in message else 2
    assert message.count("is not of type") == expected_count


def test_faulty_foreign():
    """Test the error reporting of the converter for data with wrong foreign keys."""
    # Simple wrong foreign key
    simple = convert.XLSXConverter(xlsx=rfp("data/simple_data_wrong_foreign.xlsx"),
                                   schema=rfp("data/simple_schema.json"))
    with pytest.raises(RuntimeError):
        simple.to_dict()
    expected_simple = {
        ('Training.coach', 6): [
            ['date', datetime.datetime(2023, 1, 2, 0, 0)],
            ['url', 'www.indiscale.com']],
    }
    assert simple.get_errors() == expected_simple

    # More extensive example
    extensive = convert.XLSXConverter(xlsx=rfp("data/multiple_refs_data_wrong_foreign.xlsx"),
                                      schema=rfp("data/multiple_refs_schema.json"))
    with pytest.raises(RuntimeError):
        extensive.to_dict()
    timestamp = '2024-03-21T14:12:00.000Z'
    expected_extensive = {
        ('Training.Organisation.Person', 8): [
            ['name', 'World Training Organization 2']],
        ('Training.Organisation.Person', 9): [
            ['date', timestamp],
            ['url', 'www.getlinkahead.com']],
        ('Training.participant', 6): [
            ['date', timestamp],
            ['url', None]],
        ('Training.participant', 7): [
            ['date', timestamp],
            ['url', None]],
    }
    assert extensive.get_errors() == expected_extensive

    # Note that the row numbers in the error string are larger by one than the row
    # numbers in the keys of the error dict above.
    expected_str = """Sheet: Training.Organisation.Person\tRow: 9
\t\t['name']:\tWorld Training Organization 2
Sheet: Training.Organisation.Person\tRow: 10
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\twww.getlinkahead.com
Sheet: Training.participant\tRow: 7
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\tNone
Sheet: Training.participant\tRow: 8
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\tNone
"""
    assert extensive.get_error_str() == expected_str


def test_set_in_nested():
    """Test the ``_set_in_nested`` function."""
    set_in_nested = convert._set_in_nested  # pylint: disable=protected-access

    # Pairs of (keyword arguments, expected return value).
    good_cases = [
        ({"mydict": {}, "path": ["a", 1], "value": 3},
         {"a": {1: 3}}),
        ({"mydict": {"a": 1}, "path": ["a"], "value": 3, "overwrite": True},
         {"a": 3}),
        ({"mydict": {"a": 1}, "path": ["a", 1], "value": 3, "overwrite": True},
         {"a": {1: 3}}),
        ({"mydict": {"b": 2}, "path": ["a", 1, 3.141], "value": 3},
         {"a": {1: {3.141: 3}}, "b": 2}),
        ({"mydict": {}, "path": ["X", "Y", "a", 1], "value": 3, "prefix": ["X", "Y"]},
         {"a": {1: 3}}),
    ]
    for kwargs, expected in good_cases:
        assert set_in_nested(**kwargs) == expected

    # Testing exceptions: (keyword arguments, exception type, message pattern).
    bad_cases = [
        ({"mydict": {"a": 1}, "path": ["a"], "value": 3},
         ValueError, r"There is already some value at \[a\]"),
        ({"mydict": {"a": 1}, "path": ["a", 1], "value": 3},
         ValueError, r"There is already some value at \[1\]"),
        ({"mydict": {}, "path": ["a", 1], "value": 3, "prefix": ["X", "Y", "Z"]},
         KeyError, r"Path does not start with prefix: \['X', 'Y', 'Z'\] not in \['a', 1\]"),
    ]
    for kwargs, exc_type, pattern in bad_cases:
        with pytest.raises(exc_type, match=pattern):
            set_in_nested(**kwargs)


def test_group_foreign_paths():
    """Test the ``_group_foreign_paths`` function."""
    group = convert._group_foreign_paths  # pylint: disable=protected-access

    foreign = [
        ["A", "x", 1.1],
        ["A", "y", "z", "some text"],
        ["A", "B", "CC", "x", 42],
    ]

    # With ["A", "B", "C"] as ``common``, the call must fail with a ValueError.
    depth_error = re.escape("Foreign keys must cover the complete `common` depth.")
    with pytest.raises(ValueError, match=depth_error):
        group(foreign=foreign, common=["A", "B", "C"])

    # With ["A", "B", "CC"] as ``common``, the foreign paths are grouped by their path.
    grouped = group(foreign=foreign, common=["A", "B", "CC"])
    assert grouped == [
        SimpleNamespace(stringpath="A", path=["A"], subpath=["A"],
                        definitions=[["x", 1.1], ["y", "z", "some text"]]),
        SimpleNamespace(stringpath="A.B.CC", path=["A", "B", "CC"], subpath=["B", "CC"],
                        definitions=[["x", 42]]),
    ]