# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Testing the conversion from XLSX to JSON"""

import datetime
import json
import os
import re
from types import SimpleNamespace
from typing import Optional

import jsonschema
import pytest
from caosadvancedtools.table_json_conversion import convert

from .utils import assert_equal_jsons


def rfp(*pathcomponents):
    """Return full path, a shorthand convenience function.

    The given components are joined onto the directory that contains this file.
    """
    return os.path.join(os.path.dirname(__file__), *pathcomponents)


def convert_and_compare(xlsx_file: str, schema_file: str, known_good_file: str,
                        known_good_data: Optional[dict] = None, strict: bool = False,
                        validate: bool = True) -> dict:
    """Convert an XLSX file and compare to a known result.

    Exactly one of ``known_good_file`` and ``known_good_data`` should be non-empty.  If
    ``known_good_file`` is non-empty, it takes precedence and ``known_good_data`` is ignored.

    Parameters
    ----------
    xlsx_file: str
      Path of the XLSX file to convert.
    schema_file: str
      Path of the JSON schema which is passed to the converter.
    known_good_file: str
      Path of a JSON file with the expected result, or an empty string.
    known_good_data: dict, optional
      The expected result as an already loaded object.
    strict: bool, optional
      If False (the default), the comparison is called with ``allow_none`` and ``allow_empty``
      enabled; if True, both are disabled.
    validate: bool, optional
      Passed on to ``convert.to_dict``.

    Returns
    -------
    json: dict
      The result of the conversion.
    """
    result = convert.to_dict(xlsx=xlsx_file, schema=schema_file, validate=validate)
    if known_good_file:
        with open(known_good_file, encoding="utf-8") as myfile:
            expected = json.load(myfile)
    else:
        expected = known_good_data
    assert_equal_jsons(result, expected, allow_none=not strict, allow_empty=not strict)
    return result


def test_conversions():
    """Test conversion from XLSX to JSON."""
    convert_and_compare(xlsx_file=rfp("data/simple_data.xlsx"),
                        schema_file=rfp("data/simple_schema.json"),
                        known_good_file=rfp("data/simple_data.json"))
    convert_and_compare(xlsx_file=rfp("data/multiple_refs_data.xlsx"),
                        schema_file=rfp("data/multiple_refs_schema.json"),
                        known_good_file=rfp("data/multiple_refs_data.json"))
    convert_and_compare(xlsx_file=rfp("data/indirect_data.xlsx"),
                        schema_file=rfp("data/indirect_schema.json"),
                        known_good_file=rfp("data/indirect_data.json"))
    convert_and_compare(xlsx_file=rfp("data/multiple_choice_data.xlsx"),
                        schema_file=rfp("data/multiple_choice_schema.json"),
                        known_good_file=rfp("data/multiple_choice_data.json"),
                        strict=True)

    # Same expectation as for the simple data, except that the first training's date is expected
    # as a ``datetime`` object.
    with open(rfp("data/simple_data.json"), encoding="utf-8") as myfile:
        expected_datetime = json.load(myfile)
    expected_datetime["Training"][0]["date"] = datetime.datetime(2023, 1, 1, 0, 0)
    convert_and_compare(xlsx_file=rfp("data/simple_data_datetime.xlsx"),
                        schema_file=rfp("data/simple_schema.json"),
                        known_good_file="", known_good_data=expected_datetime)

    # Data loss when saving as xlsx
    with pytest.raises(AssertionError) as err:
        convert_and_compare(xlsx_file=rfp("data/simple_data_ascii_chars.xlsx"),
                            schema_file=rfp("data/simple_schema.json"),
                            known_good_file=rfp("data/simple_data_ascii_chars.json"))
    assert str(err.value).startswith("Values at path ['Training', 0, ")


def test_missing_columns():
    """Missing columns raise a ValueError with ``strict=True`` and a UserWarning otherwise."""
    with pytest.raises(ValueError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                        schema=rfp("data/simple_schema.json"), strict=True)
    assert str(caught.value) == "Missing column: Training.coach.given_name"

    with pytest.warns(UserWarning) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    assert str(caught.pop().message) == "Missing column: Training.coach.given_name"

    with pytest.warns(UserWarning) as caught:
        convert.to_dict(xlsx=rfp("data/multiple_choice_data_missing.xlsx"),
                        schema=rfp("data/multiple_choice_schema.json"))
    # Several warnings are recorded here, so only check that the expected ones are among them.
    messages = {str(w.message) for w in caught}
    for expected in [
            "Missing column: Training.skills.Communication",
            "Missing column: Training.exam_types.Oral",
    ]:
        assert expected in messages


def test_wrong_datatype():
    """Values of the wrong type raise a ValidationError which names the expected locations."""
    with pytest.raises(jsonschema.ValidationError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    message = str(caught.value)
    # Correct Errors
    assert "'Not a num' is not of type 'number'" in message
    assert "1.5 is not of type 'integer'" in message
    # Correct Locations
    for line in message.split('\n'):
        if "'Not a num' is not of type 'number'" in line:
            assert "J7" in line
        if "1.5 is not of type 'integer'" in line:
            assert "K7" in line
    # No additional type errors
    if "is not of type 'boolean'" in message:   # ToDo: Remove when boolean is fixed
        assert message.count("is not of type") == 3
    else:
        assert message.count("is not of type") == 2


def test_additional_column():
    """A column without a schema entry is reported exactly once, on a line containing `` M ``."""
    with pytest.raises(jsonschema.ValidationError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    message = str(caught.value)
    # Correct Error
    assert "no entry in the schema that corresponds to this column" in message
    # Correct Location
    for line in message.split('\n'):
        if "no entry in the schema that corresponds to this column" in line:
            assert " M " in line
    # No additional column errors
    assert message.count("no entry in the schema that corresponds to this column") == 1


def test_faulty_foreign():
    """Test the errors which are collected when converting files with wrong foreign keys."""
    # Simple wrong foreign key
    converter = convert.XLSXConverter(xlsx=rfp("data/simple_data_wrong_foreign.xlsx"),
                                      schema=rfp("data/simple_schema.json"))
    with pytest.raises(RuntimeError):
        converter.to_dict()
    errors = converter.get_errors()
    assert errors == {('Training.coach', 6): [['date', datetime.datetime(2023, 1, 2, 0, 0)],
                                              ['url', 'www.indiscale.com']]}

    # More extensive example
    converter = convert.XLSXConverter(xlsx=rfp("data/multiple_refs_data_wrong_foreign.xlsx"),
                                      schema=rfp("data/multiple_refs_schema.json"))
    with pytest.raises(RuntimeError):
        converter.to_dict()
    errors = converter.get_errors()
    assert errors == {
        ('Training.Organisation.Person', 8): [
            ['name', 'World Training Organization 2']],
        ('Training.Organisation.Person', 9): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', 'www.getlinkahead.com']],
        ('Training.participant', 6): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', None]],
        ('Training.participant', 7): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', None]],
    }

    # The row numbers in the string are one higher than the corresponding keys of ``get_errors()``
    # (presumably 1-based spreadsheet rows vs. 0-based indices -- TODO confirm).
    # NOTE(review): The line breaks inside this literal were reconstructed as one record per
    # line; confirm against the actual output of ``get_error_str()``.
    error_str = converter.get_error_str()
    assert error_str == """Sheet: Training.Organisation.Person\tRow: 9
\t\t['name']:\tWorld Training Organization 2
Sheet: Training.Organisation.Person\tRow: 10
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\twww.getlinkahead.com
Sheet: Training.participant\tRow: 7
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\tNone
Sheet: Training.participant\tRow: 8
\t\t['date']:\t2024-03-21T14:12:00.000Z
\t\t['url']:\tNone
"""


def test_set_in_nested():
    """Test the ``_set_in_nested`` function."""
    set_in_nested = convert._set_in_nested  # pylint: disable=protected-access

    # Each input is paired with the expected output at the same index.
    test_data_in = [
        {"mydict": {}, "path": ["a", 1], "value": 3},
        {"mydict": {"a": 1}, "path": ["a"], "value": 3, "overwrite": True},
        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3, "overwrite": True},
        {"mydict": {"b": 2}, "path": ["a", 1, 3.141], "value": 3},
        {"mydict": {}, "path": ["X", "Y", "a", 1], "value": 3, "prefix": ["X", "Y"]},
    ]
    test_data_out = [
        {"a": {1: 3}},
        {"a": 3},
        {"a": {1: 3}},
        {"a": {1: {3.141: 3}}, "b": 2},
        {"a": {1: 3}},
    ]

    for data_in, data_out in zip(test_data_in, test_data_out):
        assert set_in_nested(**data_in) == data_out

    # Testing exceptions
    test_data_in = [
        {"mydict": {"a": 1}, "path": ["a"], "value": 3},
        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3},
        {"mydict": {}, "path": ["a", 1], "value": 3, "prefix": ["X", "Y", "Z"]},
    ]
    exceptions = [
        [ValueError, r"There is already some value at \[a\]"],
        [ValueError, r"There is already some value at \[1\]"],
        [KeyError, r"Path does not start with prefix: \['X', 'Y', 'Z'\] not in \['a', 1\]"],
    ]

    for data_in, (exc_out, match) in zip(test_data_in, exceptions):
        with pytest.raises(exc_out, match=match):
            set_in_nested(**data_in)


def test_group_foreign_paths():
    """Test the ``_group_foreign_paths`` function."""
    group = convert._group_foreign_paths  # pylint: disable=protected-access

    foreign = [
        ["A", "x", 1.1],
        ["A", "y", "z", "some text"],
        ["A", "B", "CC", "x", 42],
    ]
    common = ["A", "B", "CC"]
    common_wrong = ["A", "B", "C"]
    expected = [
        SimpleNamespace(stringpath="A", path=["A"], subpath=["A"],
                        definitions=[["x", 1.1], ["y", "z", "some text"]]),
        SimpleNamespace(stringpath="A.B.CC", path=["A", "B", "CC"], subpath=["B", "CC"],
                        definitions=[["x", 42]]),
    ]

    # The call is expected to raise, so there is no return value worth binding.
    with pytest.raises(ValueError, match=re.escape(
            "Foreign keys must cover the complete `common` depth.")):
        group(foreign=foreign, common=common_wrong)
    result = group(foreign=foreign, common=common)
    assert result == expected