# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Testing the conversion from XLSX to JSON"""


import json
import os
import re
import tempfile

from types import SimpleNamespace

import jsonschema.exceptions as schema_exc
import pytest
import caosadvancedtools.table_json_conversion.convert as convert
# NOTE: This import is an assumption: ``fill_template`` (used by ``fill_and_compare`` below) is
# expected to be provided by the ``fill_xlsx`` module of this package.
from caosadvancedtools.table_json_conversion.fill_xlsx import fill_template
from openpyxl import load_workbook

from .utils import compare_workbooks


def rfp(*pathcomponents):
    """Return full path, a shorthand convenience function.
    """
    return os.path.join(os.path.dirname(__file__), *pathcomponents)


def fill_and_compare(json_file: str, template_file: str, known_good: str,
                     schema: str = None, custom_output: str = None):
    """Fill the data into a template and compare to a known good.

Parameters:
-----------
schema: str, optional,
  Json schema to validate against.
custom_output: str, optional
  If given, write to this file and drop into an IPython shell.  For development only.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        outfile = os.path.join(tmpdir, 'test.xlsx')
        assert not os.path.exists(outfile)
        if custom_output is not None:
            outfile = custom_output
        fill_template(data=json_file, template=template_file, result=outfile,
                      validation_schema=schema)
        assert os.path.exists(outfile)
        generated = load_workbook(outfile)  # workbook can be read
    known_good_wb = load_workbook(known_good)
    compare_workbooks(generated, known_good_wb)
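
# Usage sketch for ``fill_and_compare`` (for orientation only; the file names are merely examples
# in the style of the other tests in this directory):
# fill_and_compare(json_file=rfp("data/simple_data.json"),
#                  template_file=rfp("data/simple_template.xlsx"),
#                  known_good=rfp("data/simple_data.xlsx"))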


def _assert_equal_jsons(json1, json2, allow_none: bool = True, allow_empty: bool = True,
                        path: list = None) -> None:
    """Compare two json objects for near equality.

Raise an assertion exception if they are not equal."""
    if path is None:
        path = []
    assert isinstance(json1, dict) == isinstance(json2, dict), f"Type mismatch, path: {path}"
    if isinstance(json1, dict):
        keys = set(json1.keys()).union(json2.keys())
        for key in keys:
            this_path = path + [key]
            # Case 1: both exist
            if key in json1 and key in json2:
                el1 = json1[key]
                el2 = json2[key]
                assert type(el1) is type(el2), f"Type mismatch, path: {this_path}"
                if isinstance(el1, (dict, list)):
                    # Nested structures only need to be nearly equal, so recurse instead of
                    # comparing them directly.
                    _assert_equal_jsons(el1, el2, allow_none=allow_none, allow_empty=allow_empty,
                                        path=this_path)
                    continue
                assert el1 == el2, f"Values at path {this_path} are not equal:\n{el1},\n{el2}"
                continue
            # Case 2: only one exists
            existing = json1.get(key, json2.get(key))
            assert (allow_none and existing is None) or (allow_empty and existing == []), (
                f"Element at path {this_path} is None or empty in one json and does not exist in "
                "the other.")

    assert isinstance(json1, list) and isinstance(json2, list), f"Type mismatch, path: {path}"
    assert len(json1) == len(json2), f"Lists must have equal length, path: {path}"
    for idx, (el1, el2) in enumerate(zip(json1, json2)):
        this_path = path + [idx]
        assert isinstance(el1, dict) and isinstance(el2, dict), (
            f"List elements must be dicts: path: {this_path}")
        _assert_equal_jsons(el1, el2, allow_none=allow_none, allow_empty=allow_empty,
                            path=this_path)
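
# Illustration (not part of the test suite): with the default settings, a key whose value is None
# or [] may be missing from the other object, while genuinely different values still fail, e.g.
#   _assert_equal_jsons({"a": 1, "b": None}, {"a": 1})   # passes
#   _assert_equal_jsons({"a": 1}, {"a": 2})              # raises AssertionError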


def test_conversions():
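    # ``convert.to_dict`` reads a filled XLSX workbook, guided by the JSON schema, and returns the
    # data as a nested dict which should equal the contents of the corresponding JSON file.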
    result = convert.to_dict(xlsx=rfp("data/simple_data.xlsx"),
                             schema=rfp("data/simple_schema.json"))
    with open(rfp("data/simple_data.json"), encoding="utf-8") as json_file:
        expected = json.load(json_file)
    # result = convert.to_dict(xlsx=rfp("data/multiple_refs_data.xlsx"),
    #                          schema=rfp("data/multiple_refs_schema.json"))
    # expected = json.load(open(rfp("data/multiple_refs_data.json")))
    _assert_equal_jsons(result, expected)
    # conv = XLSXConverter(schema=rfp("data/simple_schema.json"))
    # result = conv.to_dict(rfp("data/simple_template.xlsx"))


def test_set_in_nested():
    set_in_nested = convert._set_in_nested  # pylint: disable=protected-access
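    # As exercised by the cases below: ``_set_in_nested`` writes ``value`` at the nested ``path``
    # inside ``mydict``; an optional ``prefix`` is stripped from the path first, and existing
    # entries are only replaced if ``overwrite`` is set.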

    test_data_in = [
        {"mydict": {}, "path": ["a", 1], "value": 3},
        {"mydict": {"a": 1}, "path": ["a"], "value": 3, "overwrite": True},
        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3, "overwrite": True},
        {"mydict": {"b": 2}, "path": ["a", 1, 3.141], "value": 3},
        {"mydict": {}, "path": ["X", "Y", "a", 1], "value": 3, "prefix": ["X", "Y"]},
    ]
    test_data_out = [
        {"a": {1: 3}},
        {"a": 3},
        {"a": {1: 3}},
        {"a": {1: {3.141: 3}}, "b": 2},
        {"a": {1: 3}},
    ]

    for data_in, data_out in zip(test_data_in, test_data_out):
        assert set_in_nested(**data_in) == data_out

    # Testing exceptions
    test_data_in = [
        {"mydict": {"a": 1}, "path": ["a"], "value": 3},
        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3},
        {"mydict": {}, "path": ["a", 1], "value": 3, "prefix": ["X", "Y", "Z"]},
    ]
    exceptions = [
        [ValueError, r"There is already some value at \[a\]"],
        [ValueError, r"There is already some value at \[1\]"],
        [KeyError, r"Path does not start with prefix: \['X', 'Y', 'Z'\] not in \['a', 1\]"],
    ]

    for data_in, (exc_out, match) in zip(test_data_in, exceptions):
        with pytest.raises(exc_out, match=match):
            set_in_nested(**data_in)


def test_group_foreign_paths():
    group = convert._group_foreign_paths  # pylint: disable=protected-access
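    # As exercised below: ``_group_foreign_paths`` groups the foreign-key definitions by the
    # prefix of ``common`` that they share; together, the foreign paths must cover the complete
    # ``common`` path, otherwise a ValueError is raised.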

    foreign = [
        ["A", "x", 1.1],
        ["A", "y", "z", "some text"],
        ["A", "B", "CC", "x", 42],
    ]
    common = ["A", "B", "CC"]
    common_wrong = ["A", "B", "C"]
    expected = [
        SimpleNamespace(stringpath="A", path=["A"], subpath=["A"],
                        definitions=[["x", 1.1], ["y", "z", "some text"]]),
        SimpleNamespace(stringpath="A.B.CC", path=["A", "B", "CC"], subpath=["B", "CC"],
                        definitions=[["x", 42]]),
    ]

    with pytest.raises(ValueError, match=re.escape(
            "Foreign keys must cover the complete `common` depth.")):
        group(foreign=foreign, common=common_wrong)
    result = group(foreign=foreign, common=common)
    assert result == expected