Skip to content
Snippets Groups Projects
Select Git revision
  • bec6c03c2d4b82f956edf2c5c347463190322114
  • main default protected
  • f-remove-deprecation-warning
  • dev
  • f-docs-pylib
  • f-parse-value
  • f-compare
  • f-string-ids
  • f-217-set-special-property
  • f-filesystem-import
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-check-merge-entities
  • f-compare-enid
  • f-select-subproperties
  • v0.18.0
  • v0.17.0
  • v0.16.0
  • v0.15.1
  • v0.15.0
  • v0.14.0
  • v0.13.2
  • v0.13.1
  • v0.13.0
  • linkahead-rename-step-2
  • linkahead-rename-step-1
  • v0.12.0
  • v0.11.2
  • v0.11.1
  • v0.11.0
  • v0.10.0
  • v0.9.0
  • v0.8.0
  • v0.7.4
  • v0.7.3
37 results

setup.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_read_xlsx.py 13.17 KiB
    # encoding: utf-8
    #
    # This file is a part of the LinkAhead Project.
    #
    # Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
    # Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    """Testing the conversion from XLSX to JSON"""
    
    
    import datetime
    import json
    import os
    import re
    
    from types import SimpleNamespace
    from typing import Optional
    
    import jsonschema
    import pytest
    from caosadvancedtools.table_json_conversion import convert
    
    from .utils import assert_equal_jsons
    
    
    def rfp(*pathcomponents):
        """Return full path, a shorthand convenience function.
        """
        return os.path.join(os.path.dirname(__file__), *pathcomponents)
    
    
    def convert_and_compare(xlsx_file: str, schema_file: str, known_good_file: str,
                            known_good_data: Optional[dict] = None, strict: bool = False,
                            validate: bool = False) -> dict:
        """Convert an XLSX file and compare to a known result.
    
    Exactly one of ``known_good_file`` and ``known_good_data`` should be non-empty.
    
    Returns
    -------
    json: dict
      The result of the conversion.
        """
        # FIXME Set default "validate" back to True, after implementation of
        # https://gitlab.indiscale.com/caosdb/src/caosdb-advanced-user-tools/-/issues/138
        result = convert.to_dict(xlsx=xlsx_file, schema=schema_file, validate=validate)
        if known_good_file:
            with open(known_good_file, encoding="utf-8") as myfile:
                expected = json.load(myfile)
        else:
            expected = known_good_data
        assert_equal_jsons(result, expected, allow_none=not strict, allow_empty=not strict)
        return result
    
    
    def test_conversions():
        """Test conversion from XLSX to JSON."""
        convert_and_compare(xlsx_file=rfp("data/simple_data.xlsx"),
                            schema_file=rfp("data/simple_schema.json"),
                            known_good_file=rfp("data/simple_data.json"))
        convert_and_compare(xlsx_file=rfp("data/multiple_refs_data.xlsx"),
                            schema_file=rfp("data/multiple_refs_schema.json"),
                            known_good_file=rfp("data/multiple_refs_data.json"))
        convert_and_compare(xlsx_file=rfp("data/indirect_data.xlsx"),
                            schema_file=rfp("data/indirect_schema.json"),
                            known_good_file=rfp("data/indirect_data.json"))
        convert_and_compare(xlsx_file=rfp("data/multiple_choice_data.xlsx"),
                            schema_file=rfp("data/multiple_choice_schema.json"),
                            known_good_file=rfp("data/multiple_choice_data.json"),
                            strict=True)
        convert_and_compare(xlsx_file=rfp("data/simple_data_booleans.xlsx"),
                            schema_file=rfp("data/simple_schema.json"),
                            known_good_file=rfp("data/simple_data_booleans.json"))
    
        with open(rfp("data/simple_data.json"), encoding="utf-8") as myfile:
            expected_datetime = json.load(myfile)
            expected_datetime["Training"][0]["date"] = datetime.datetime(2023, 1, 1, 0, 0)
        convert_and_compare(xlsx_file=rfp("data/simple_data_datetime.xlsx"),
                            schema_file=rfp("data/simple_schema.json"),
                            known_good_file="", known_good_data=expected_datetime)
    
        # Data loss when saving as xlsx
        with pytest.raises(AssertionError) as err:
            convert_and_compare(xlsx_file=rfp("data/simple_data_ascii_chars.xlsx"),
                                schema_file=rfp("data/simple_schema.json"),
                                known_good_file=rfp("data/simple_data_ascii_chars.json"))
        assert str(err.value).startswith("Values at path ['Training', 0, ")
    
    
    def test_missing_columns():
        with pytest.raises(ValueError) as caught:
            convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                            schema=rfp("data/simple_schema.json"), strict=True)
        assert str(caught.value) == "Missing column: Training.coach.given_name"
        with pytest.warns(UserWarning) as caught:
            convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                            schema=rfp("data/simple_schema.json"))
        messages = {str(w.message) for w in caught}
        assert "Missing column: Training.coach.given_name" in messages
        with pytest.warns(UserWarning) as caught:
            convert.to_dict(xlsx=rfp("data/multiple_choice_data_missing.xlsx"),
                            schema=rfp("data/multiple_choice_schema.json"))
        messages = {str(w.message) for w in caught}
        for expected in [
                "Missing column: Training.skills.Communication",
                "Missing column: Training.exam_types.Oral",
        ]:
            assert expected in messages
    
    
    def test_error_table():
        with pytest.raises(jsonschema.ValidationError) as caught:
            convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
                            schema=rfp("data/simple_schema.json"))
        # Correct Errors
        assert "'Not a num' is not of type 'number'" in str(caught.value)
        assert "'Yes a number?' is not of type 'number'" in str(caught.value)
        assert "1.5 is not of type 'integer'" in str(caught.value)
        assert "1.2345 is not of type 'integer'" in str(caught.value)
        assert "'Not an enum' is not one of [" in str(caught.value)
        # Correct Locations
        matches = set()
        for line in str(caught.value).split('\n'):
            if "'Not a num' is not of type 'number'" in line:
                assert "J7" in line
                matches.add("J7")
            if "'Yes a number?' is not of type 'number'" in line:
                assert "J8" in line
                matches.add("J8")
            if "1.5 is not of type 'integer'" in line:
                assert "K7" in line
                matches.add("K7")
            if "1.2345 is not of type 'integer'" in line:
                assert "K8" in line
                matches.add("K8")
            if "'Not an enum' is not one of [" in line:
                assert "G8" in line
                matches.add("K8")
            # The next two tests could potentially be removed in the future, once we evaluate formulas.
            if "'=NOT(FALSE())' is not of type 'boolean'" in line:
                assert "L9" in line
                matches.add("L9")
            if "'=NOT(TRUE())' is not of type 'boolean'" in line:
                assert "L10" in line
                matches.add("L10")
        assert matches == {"J7", "J8", "K7", "K8", "K8", "L9", "L10"}
    
        # No additional errors
        assert str(caught.value).count("is not one of") == 1
        assert str(caught.value).count("is not of type") == 6
    
    
    def test_malformed_paths():
        with pytest.raises(jsonschema.ValidationError) as caught:
            convert.to_dict(xlsx=rfp("data/simple_data_broken_paths_2.xlsx"),
                            schema=rfp("data/simple_schema.json"))
        message_lines = str(caught.value).lower().split('\n')
        expected_errors = {
            'person': {'c': "column type is missing",
                       'd': "parsing of the path",
                       'e': "path may be incomplete"},
            'training': {'c': "path is missing",
                         'd': "column type is missing",
                         'e': "path may be incomplete",
                         'f': "parsing of the path",
                         'g': "path may be incomplete",
                         'h': "parsing of the path",
                         'i': "parsing of the path"},
            'training.coach': {'f': "no column metadata set"}}
        current_sheet = None
        for line in message_lines:
            if 'in sheet' in line:
                current_sheet = line.replace('in sheet ', '').replace(':', '')
                continue
            if 'in column' in line:
                for column, expected_error in expected_errors[current_sheet].items():
                    if f'in column {column}' in line:
                        assert expected_error in line
                        expected_errors[current_sheet].pop(column)
                        break
        for _, errors_left in expected_errors.items():
            assert len(errors_left) == 0
    
    
    def test_empty_columns():
        with pytest.warns(UserWarning) as caught:
            try:
                convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
                                schema=rfp("data/simple_schema.json"))
            except jsonschema.ValidationError:
                pass                  # Errors are checked in test_error_table
        messages = {str(w.message).lower() for w in caught}
        expected_warnings = {"column h": "no column metadata"}
        for message in messages:
            for column, warning in list(expected_warnings.items()):
                if column in message:
                    assert warning in message
                    expected_warnings.pop(column)
                else:
                    assert warning not in message
        assert len(expected_warnings) == 0
    
    
    def test_faulty_foreign():
        # Simple wrong foreign key
        converter = convert.XLSXConverter(xlsx=rfp("data/simple_data_wrong_foreign.xlsx"),
                                          schema=rfp("data/simple_schema.json"))
        with pytest.raises(RuntimeError):
            converter.to_dict()
        errors = converter.get_errors()
        assert errors == {('Training.coach', 6): [['date', datetime.datetime(2023, 1, 2, 0, 0)],
                                                  ['url', 'www.indiscale.com']]}
    
        # More extensive example
        converter = convert.XLSXConverter(xlsx=rfp("data/multiple_refs_data_wrong_foreign.xlsx"),
                                          schema=rfp("data/multiple_refs_schema.json"))
        with pytest.raises(RuntimeError):
            converter.to_dict()
        errors = converter.get_errors()
        assert errors == {
            ('Training.Organisation.Person', 8): [
                ['name', 'World Training Organization 2']],
            ('Training.Organisation.Person', 9): [
                ['date', '2024-03-21T14:12:00.000Z'],
                ['url', 'www.getlinkahead.com']],
            ('Training.participant', 6): [
                ['date', '2024-03-21T14:12:00.000Z'],
                ['url', None]],
            ('Training.participant', 7): [
                ['date', '2024-03-21T14:12:00.000Z'],
                ['url', None]],
        }
    
        error_str = converter.get_error_str()
        assert error_str == """Sheet: Training.Organisation.Person\tRow: 9
    \t\t['name']:\tWorld Training Organization 2
    Sheet: Training.Organisation.Person\tRow: 10
    \t\t['date']:\t2024-03-21T14:12:00.000Z
    \t\t['url']:\twww.getlinkahead.com
    Sheet: Training.participant\tRow: 7
    \t\t['date']:\t2024-03-21T14:12:00.000Z
    \t\t['url']:\tNone
    Sheet: Training.participant\tRow: 8
    \t\t['date']:\t2024-03-21T14:12:00.000Z
    \t\t['url']:\tNone
    """
    
    
    def test_set_in_nested():
        """Test the ``_set_in_nested`` function."""
        set_in_nested = convert._set_in_nested  # pylint: disable=protected-access
    
        test_data_in = [
            {"mydict": {}, "path": ["a", 1], "value": 3},
            {"mydict": {"a": 1}, "path": ["a"], "value": 3, "overwrite": True},
            {"mydict": {"a": 1}, "path": ["a", 1], "value": 3, "overwrite": True},
            {"mydict": {"b": 2}, "path": ["a", 1, 3.141], "value": 3},
            {"mydict": {}, "path": ["X", "Y", "a", 1], "value": 3, "prefix": ["X", "Y"]},
        ]
        test_data_out = [
            {"a": {1: 3}},
            {"a": 3},
            {"a": {1: 3}},
            {"a": {1: {3.141: 3}}, "b": 2},
            {"a": {1: 3}},
        ]
    
        for data_in, data_out in zip(test_data_in, test_data_out):
            assert set_in_nested(**data_in) == data_out
    
        # Testing exceptions
        test_data_in = [
            {"mydict": {"a": 1}, "path": ["a"], "value": 3},
            {"mydict": {"a": 1}, "path": ["a", 1], "value": 3},
            {"mydict": {}, "path": ["a", 1], "value": 3, "prefix": ["X", "Y", "Z"]},
        ]
        exceptions = [
            [ValueError, r"There is already some value at \[a\]"],
            [ValueError, r"There is already some value at \[1\]"],
            [KeyError, r"Path does not start with prefix: \['X', 'Y', 'Z'\] not in \['a', 1\]"],
        ]
    
        for data_in, (exc_out, match) in zip(test_data_in, exceptions):
            with pytest.raises(exc_out, match=match):
                set_in_nested(**data_in)
    
    
    def test_group_foreign_paths():
        """Test the ``_group_foreign_paths`` function."""
        group = convert._group_foreign_paths  # pylint: disable=protected-access
    
        foreign = [
            ["A", "x", 1.1],
            ["A", "y", "z", "some text"],
            ["A", "B", "CC", "x", 42],
        ]
        common = ["A", "B", "CC"]
        common_wrong = ["A", "B", "C"]
        expected = [
            SimpleNamespace(stringpath="A", path=["A"], subpath=["A"],
                            definitions=[["x", 1.1], ["y", "z", "some text"]]),
            SimpleNamespace(stringpath="A.B.CC", path=["A", "B", "CC"], subpath=["B", "CC"],
                            definitions=[["x", 42]]),
        ]
    
        with pytest.raises(ValueError, match=re.escape(
                "Foreign keys must cover the complete `common` depth.")):
            result = group(foreign=foreign, common=common_wrong)
        result = group(foreign=foreign, common=common)
        assert result == expected