# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Convert XLSX files to JSON dictionaries."""

from __future__ import annotations

import datetime
import itertools
import sys
from functools import reduce
from operator import getitem
from types import SimpleNamespace
from typing import Any, BinaryIO, Callable, TextIO, Union, Optional
from warnings import warn

import jsonschema
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet

from caosadvancedtools.table_json_conversion import xlsx_utils
from caosadvancedtools.table_json_conversion.fill_xlsx import read_or_dict


def _strict_bool(value: Any) -> bool:
    """Convert value to bool, but only if it really is a valid XLSX bool."""
    if isinstance(value, bool):
        return value
    raise TypeError(f"Not a good boolean: {repr(value)}")


def format_exception_table(exceptions: list[tuple], worksheet_title: str,
                           column_names: Optional[Union[dict, list]] = None,
                           max_line_length: int = 120) -> str:
    """
    Given a list of tuples, each containing a row number, a column number and
    an exception (in that order), together with the title of the current
    worksheet, return a formatted table of the exceptions.

    Optionally takes a dict of column names; if given, a header is generated
    for each column and the exceptions are clustered by column.

    The default maximum line length is 120 and can be overridden via
    ``max_line_length``.

    Parameters
    ----------
    exceptions:         list of tuples containing row, column, and exception
                        Data to be formatted.
    worksheet_title:    str
                        Name of the current worksheet.
    column_names:       dict or list, optional
                        ``column_names[column_num]`` should return the name of
                        column number ``column_num``.
                        If given, exceptions will be clustered by column.
    max_line_length:    int
                        Soft cap for the line length of the resulting table.

    Returns
    -------
    string_rep:         str
                        Table containing the given exceptions
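
    Examples
    --------
    A minimal sketch with hypothetical data; the exact layout of the output
    table is abbreviated here:

    >>> errors = [(3, 0, ValueError("not a number")),
    ...           (None, 1, KeyError("no schema entry for this column"))]
    >>> print(format_exception_table(errors, "Sheet1",
    ...                              {0: "mass", 1: "speed"}))  # doctest: +SKIP
    There were errors during the validation of worksheet 'Sheet1':
    ...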
    """
    def to_char(num):
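        # Convert a 0-based column index to spreadsheet letters,
        # e.g. 0 -> "A", 25 -> "Z", 26 -> "AA".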
        if num < 0:
            return ""
        return to_char(int(num / 26) - 1) + chr(int(num % 26) + 65)
    max_line_length -= 40             # Estimate of Field + Type space use

    headers = {"loc": "Location", "type": "Error Type", "mess": ["Message"]}
    lengths = {key: len(val) if isinstance(val, str) else len(val[0])
               for key, val in headers.items()}
    new_data = []

    current_column = None
    exceptions.sort(key=lambda tup: tup[1])
    for row_i, col_i, excep in exceptions:
        if column_names is not None:
            # Update Names
            if current_column != col_i:
                current_column = col_i
                new_data.append({
                    "loc": f"\nErrors in column '{column_names[col_i]}':",
                    "type": "", "mess": [""]
                })
        # Setup
        row = {}
        new_data.append(row)
        # Location
        if isinstance(row_i, int):
            row["loc"] = f"Cell {to_char(col_i)}{row_i + 1}"
        else:
            row["loc"] = f"Column {to_char(col_i)}"
        lengths["loc"] = max(lengths["loc"], len(row["loc"]))
        # Error type
        row["type"] = type(excep).__name__
        lengths["type"] = max(lengths["type"], len(row["type"]))
        # Message
        lines = str(excep).split('\n')
        new_lines = []
        for line in lines:
            if len(line) > max_line_length:
                words = line.split(' ')
                current = ""
                for word, next_word in zip(words, words[1:] + [""]):
                    if current != "":
                        current += " "
                    current += word
                    if len(current + next_word) > max_line_length:
                        lengths["mess"] = max(lengths["mess"], len(current))
                        new_lines.append(current)
                        current = ""
                if current != "":
                    lengths["mess"] = max(lengths["mess"], len(current))
                    new_lines.append(current)
            elif len(line) > 0:
                lengths["mess"] = max(lengths["mess"], len(line))
                new_lines.append(line)
        if not new_lines:
            new_lines = [""]
        row["mess"] = new_lines

    dividers = {key: '-' * l for key, l in lengths.items()}
    dividers["mess"] = [dividers["mess"]]

    # Fill for the messages is set to 0; if we wanted another column after the
    # messages or right alignment, we would need to use lengths["mess"].
    string_rep = f"There were errors during the validation of worksheet '{worksheet_title}':\n\n"
    for row in [headers, dividers] + new_data:
        string_rep += ' {loc: <{fill}}  '.format(loc=row["loc"],
                                                 fill=lengths["loc"])
        string_rep += ' {typ: <{fill}}  '.format(typ=row["type"],
                                                 fill=lengths["type"])
        string_rep += ' {mes: <{fill}}\n'.format(mes=row["mess"][0], fill=0)
        for line in row["mess"][1:]:
            # Front padding
            string_rep += ' ' * (lengths["loc"] + lengths["type"] + 7)
            string_rep += ' {mes: <{fill}}\n'.format(mes=line, fill=0)
    return string_rep


class ForeignError(KeyError):
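    """A KeyError raised when a foreign-key reference cannot be resolved."""
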
    def __init__(self, *args, definitions: list, message: str = ""):
        super().__init__(message, *args)
        self.definitions = definitions


class XLSXConverter:
    """Class for conversion from XLSX to JSON.

    For a detailed description of the required formatting of the XLSX files, see ``specs.md`` in the
    documentation.
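
    Examples
    --------
    A minimal usage sketch; ``example.xlsx`` and ``schema.json`` are
    hypothetical files:

    >>> converter = XLSXConverter("example.xlsx", "schema.json")  # doctest: +SKIP
    >>> data = converter.to_dict(validate=True)                   # doctest: +SKIP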
    """
    PARSER: dict[str, Callable] = {
        "string": str,
        "number": float,
        "integer": int,
        "boolean": _strict_bool,
    }

    def __init__(self, xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
                 strict: bool = False):
        """
        Parameters
        ----------
        xlsx: Union[str, BinaryIO]
          Path to the XLSX file or opened file object.

        schema: Union[dict, str, TextIO]
          Schema for validation of XLSX content.

        strict: bool, optional
          If True, fail faster.
        """
        self._workbook = load_workbook(xlsx)
        self._schema = read_or_dict(schema)
        self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
        self._check_columns(fail_fast=strict)
        self._handled_sheets: set[str] = set()
        self._result: dict = {}
        self._errors: dict = {}

    def to_dict(self, validate: bool = False, collect_errors: bool = True) -> dict:
        """Convert the xlsx contents to a dict.

        Parameters
        ----------
        validate: bool, optional
          If True, validate the result against the schema.

        collect_errors: bool, optional
          If True, do not fail at the first error, but try to collect as many errors as possible.  After
          an exception is raised, the errors can be retrieved with ``get_errors()`` and printed with
          ``get_error_str()``.

        Returns
        -------
        out: dict
          A dict representing the JSON with the extracted data.
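
        Examples
        --------
        Collecting as many errors as possible instead of failing at the first
        one (a sketch; the ``converter`` object is assumed to exist):

        >>> try:  # doctest: +SKIP
        ...     result = converter.to_dict(collect_errors=True)
        ... except RuntimeError:
        ...     print(converter.get_error_str())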
        """
        self._handled_sheets = set()
        self._result = {}
        self._errors = {}
        for sheetname in self._workbook.sheetnames:
            if sheetname not in self._handled_sheets:
                self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
        if validate:
            jsonschema.validate(self._result, self._schema)
        if self._errors:
            raise RuntimeError("There were errors while handling the XLSX file.")
        return self._result

    def get_errors(self) -> dict:
        """Return a dict with collected errors."""
        return self._errors

    def get_error_str(self) -> str:
        """Return a beautiful string with the collected errors."""
        result = ""
        for loc, value in self._errors.items():
            result += f"Sheet: {loc[0]}\tRow: {loc[1] + 1}\n"
            for item in value:
                result += f"\t\t{item[:-1]}:\t{item[-1]}\n"
        return result

    def _check_columns(self, fail_fast: bool = False):
        """Check if the columns correspond to the schema."""
        def missing(path):
            message = f"Missing column: {xlsx_utils.p2s(path)}"
            if fail_fast:
                raise ValueError(message)
            else:
                warn(message)
        for sheetname in self._workbook.sheetnames:
            sheet = self._workbook[sheetname]
            parents: dict = {}
            col_paths = []
            for col in xlsx_utils.get_data_columns(sheet).values():
                parents[xlsx_utils.p2s(col.path[:-1])] = col.path[:-1]
                col_paths.append(col.path)
            for path in parents.values():
                subschema = xlsx_utils.get_subschema(path, self._schema)

                # Unfortunately, there are a lot of special cases to handle here.
                if subschema.get("type") == "array":
                    subschema = subschema["items"]
                if "enum" in subschema:  # Was handled in parent level already
                    continue
                for child, content in subschema["properties"].items():
                    child_path = path + [child]
                    if content == {'type': 'string', 'format': 'data-url'}:
                        continue  # skip files
                    if content.get("type") == "array" and (
                            content.get("items").get("type") == "object"):
                        if child_path not in itertools.chain(*self._defining_path_index.values()):
                            missing(child_path)
                    elif content.get("type") == "array" and "enum" in content.get("items", []) and (
                            content.get("uniqueItems") is True):
                        # multiple choice
                        for choice in content["items"]["enum"]:
                            if child_path + [choice] not in col_paths:
                                missing(child_path + [choice])
                    elif content.get("type") == "object":
                        pass
                    else:
                        if child_path not in col_paths:
                            missing(child_path)

    def _handle_sheet(self, sheet: Worksheet, fail_later: bool = False) -> None:
        """Add the contents of the sheet to the result (stored in ``self._result``).

        Each row in the sheet corresponds to one entry in an array in the result.  Which array exactly is
        defined by the sheet's "proper name" and the content of the foreign columns.

        Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".

        Parameters
        ----------
        sheet: Worksheet
          The worksheet to handle.

        fail_later: bool, optional
          If True, do not fail on unresolvable foreign definitions, but collect all errors.
        """
        row_type_column = xlsx_utils.get_row_type_column_index(sheet)
        foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
        foreign_column_paths = {col.index: col.path for col in foreign_columns.values()}
        data_columns = xlsx_utils.get_data_columns(sheet)
        data_column_paths = {col.index: col.path for col in data_columns.values()}
        # Parent path, insert in correct order.
        parent, proper_name = xlsx_utils.get_path_position(sheet)
        if parent:
            parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
            if parent_sheetname not in self._handled_sheets:
                self._handle_sheet(self._workbook[parent_sheetname], fail_later=fail_later)


        exceptions = []
        col_names = {}
        for row_idx, row in enumerate(sheet.iter_rows(values_only=True)):
            # Skip non-data rows
            if row[row_type_column] is not None:
                continue
            foreign_repr = ""
            foreign = []  # A list of lists, each of which is: [path1, path2, ..., leaf, value]
            data: dict = {}     # Local data dict
            # Collect data (in dict relative to current level) and foreign data information
            for col_idx, value in enumerate(row):
                if col_idx in foreign_column_paths:
                    foreign_repr += str(value)
                    foreign.append(foreign_column_paths[col_idx] + [value])
                    continue

                try:
                    if col_idx in data_column_paths:
                        # Use a copy: the multiple-choice branch pops from the path below, and the
                        # shared entry in ``data_column_paths`` must remain intact for later rows.
                        path = data_column_paths[col_idx].copy()
                        col_names[col_idx] = '.'.join(path)
                        if self._is_multiple_choice(path):
                            real_value = path.pop()  # Last component is the enum value, insert above
                            # set up list
                            try:
                                _set_in_nested(mydict=data, path=path, value=[], prefix=parent, skip=1)
                            except ValueError as err:
                                if not str(err).startswith("There is already some value at"):
                                    raise
                            if not xlsx_utils.parse_multiple_choice(value):
                                continue
                            _set_in_nested(mydict=data, path=path, value=real_value, prefix=parent,
                                           skip=1, append_to_list=True)
                        else:
                            value = self._validate_and_convert(value, path)
                            _set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
                        continue
                except (ValueError, KeyError, jsonschema.ValidationError) as e:
                    # Append error for entire column only once
                    if isinstance(e, KeyError) and 'column' in str(e):
                        if len([err for ri, ci, err in exceptions
                                if ci == col_idx and isinstance(err, KeyError)]) == 0:
                            exceptions.append((None, col_idx, e))
                    else:
                        exceptions.append((row_idx, col_idx, e))

            try:
                # Find current position in tree
                parent_dict = self._get_parent_dict(parent_path=parent, foreign=foreign)

                # Append data to current position's list
                if proper_name not in parent_dict:
                    parent_dict[proper_name] = []
                parent_dict[proper_name].append(data)
            except ForeignError as kerr:
                if not fail_later:
                    raise
                self._errors[(sheet.title, row_idx)] = kerr.definitions

        if exceptions:
            exception_table = format_exception_table(exceptions, sheet.title,
                                                     col_names)
            raise jsonschema.ValidationError(exception_table)

        self._handled_sheets.add(sheet.title)

    def _is_multiple_choice(self, path: list[str]) -> bool:
        """Test if the path belongs to a multiple choice section."""
        if not path:
            return False
        subschema = self._get_subschema(path[:-1])
        if (subschema["type"] == "array"
                and subschema.get("uniqueItems") is True
                and "enum" in subschema["items"]):
            return True
        return False

    def _get_parent_dict(self, parent_path: list[str], foreign: list[list]) -> dict:
        """Return the dict into which values can be inserted.

        This method returns, from the current result-in-making, the entry at ``parent_path`` which matches
        the values given in the ``foreign`` specification.
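
        For example (hypothetical data): with ``parent_path=["measurements"]``
        and ``foreign=[["measurements", "date", "2024-01-01"]]``, this returns
        the element of ``self._result["measurements"]`` whose ``"date"`` entry
        equals ``"2024-01-01"``.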
        """
        foreign_groups = _group_foreign_paths(foreign, common=parent_path)

        current_object = self._result
        for group in foreign_groups:
            # Find list for which foreign definitions are relevant.
            current_object = reduce(getitem, group.subpath, current_object)
            assert isinstance(current_object, list)
            # Test all candidates.
            for cand in current_object:
                if all(reduce(getitem, definition[:-1], cand) == definition[-1]
                       for definition in group.definitions):
                    current_object = cand
                    break
            else:
                message = f"Cannot find an element at {parent_path} for these foreign defs:\n"
                for name, value in group.definitions:
                    message += f"    {name}: {value}\n"
                print(message, file=sys.stderr)
                error = ForeignError(definitions=group.definitions, message=message)
                raise error
        assert isinstance(current_object, dict)
        return current_object

    def _validate_and_convert(self, value: Any, path: list[str]):
        """Apply some basic validation and conversion steps.

        This includes:
        - Validation against the type given in the schema
        - List typed values are split at semicolons and validated individually
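
        For example, with an array-of-number subschema, a cell containing the
        string ``"1.2; 3"`` would become ``[1.2, 3.0]`` (hypothetical value).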
        """
        if value is None:
            return value
        try:
            subschema = self._get_subschema(path)
        except KeyError as e:
            raise KeyError("There is no entry in the schema that corresponds to this column.") from e
        # Array handling only if schema says it's an array.
        if subschema.get("type") == "array":
            array_type = subschema["items"]["type"]
            if isinstance(value, str) and ";" in value:
                values = [self.PARSER[array_type](v) for v in value.split(";")]
                return values
        # special case: datetime or date
        if ("anyOf" in subschema):
            if isinstance(value, datetime.datetime) and (
                    {'type': 'string', 'format': 'date-time'} in subschema["anyOf"]):
                return value
            if isinstance(value, datetime.date) and (
                    {'type': 'string', 'format': 'date'} in subschema["anyOf"]):
                return value
        jsonschema.validate(value, subschema)

        # Finally: convert to target type
        return self.PARSER[subschema.get("type", "string")](value)

    def _get_subschema(self, path: list[str], schema: Optional[dict] = None) -> dict:
        """Return the sub schema at ``path``."""
        if schema is None:
            schema = self._schema
            assert schema is not None
        assert isinstance(schema, dict)

        return xlsx_utils.get_subschema(path, schema)


def _group_foreign_paths(foreign: list[list], common: list[str]) -> list[SimpleNamespace]:
    """Group the foreign keys by their base paths.

    Parameters
    ----------
    foreign: list[list]
      A list of foreign definitions, each consisting of path components, a property name and
      possibly a value.

    common: list[str]
      A common path which defines the final target of the foreign definitions.  This helps to
      understand where the ``foreign`` paths shall be split.

    Returns
    -------
    out: list[SimpleNamespace]

      A list of foreign path segments, grouped by their common segments.  Each element is a namespace
      with detailed information of all those elements which form the group.  The namespace has the
      following attributes:

      - ``path``: The full path to this path segment.  This is always the previous segment's ``path``
        plus this segment's ``subpath``.
      - ``stringpath``: The stringified ``path``, might be useful for comparison or sorting.
      - ``subpath``: The path, relative from the previous segment.
      - ``definitions``: A list of the foreign definitions for this segment, but stripped of the
        ``path`` components.
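
    Examples
    --------
    A sketch with hypothetical paths, assuming ``xlsx_utils.p2s`` joins path
    components with dots:

    >>> groups = _group_foreign_paths(  # doctest: +SKIP
    ...     [["A", "B", "prop1", 1], ["A", "B", "C", "prop2", 2]],
    ...     common=["A", "B", "C"])
    >>> [(g.stringpath, g.subpath, g.definitions) for g in groups]  # doctest: +SKIP
    [('A.B', ['A', 'B'], [['prop1', 1]]), ('A.B.C', ['C'], [['prop2', 2]])]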
    """
    # Build a simple dict first, without subpath.
    results = {}
    for f_path in foreign:
        path = []
        for component in f_path:
            path.append(component)
            if path != common[:len(path)]:
                break
        path.pop()
        definition = f_path[len(path):]
        stringpath = xlsx_utils.p2s(path)
        if stringpath not in results:
            results[stringpath] = SimpleNamespace(stringpath=stringpath, path=path,
                                                  definitions=[definition])
        else:
            results[stringpath].definitions.append(definition)

    # Then sort by stringpath and calculate subpath.
    stringpaths = sorted(results.keys())

    resultlist = []
    last_level = 0
    for stringpath in stringpaths:
        elem = results[stringpath]
        elem.subpath = elem.path[last_level:]
        last_level = len(elem.path)
        resultlist.append(elem)

    if last_level != len(common):
        raise ValueError("Foreign keys must cover the complete `common` depth.")
    return resultlist


# pylint: disable-next=dangerous-default-value,too-many-arguments
def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], skip: int = 0,
                   overwrite: bool = False, append_to_list: bool = False) -> dict:
    """Set a value in a nested dict.

    Parameters
    ----------
    mydict: dict
      The dict into which the ``value`` shall be inserted.
    path: list
      A list of keys, denoting the location of the value.
    value
      The value which shall be set inside the dict.
    prefix: list
      A list of keys which shall be removed from ``path``.  A KeyError is raised if ``path`` does not
      start with the elements of ``prefix``.
    skip: int = 0
      Remove this many additional levels from the path, *after* removing the prefix.
    overwrite: bool = False
      If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values
      leads to an exception.
    append_to_list: bool = False
      If True, assume that the element at ``path`` is a list and append the value to it.  If the list
      does not exist, create it.  If there is a non-list at ``path`` already, overwrite it with a new
      list, if ``overwrite`` is True, otherwise raise a ValueError.

    Returns
    -------
    mydict: dict
      The same dictionary that was given as a parameter, but modified.
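
    Examples
    --------
    A minimal sketch:

    >>> _set_in_nested({}, ["a", "b", "c"], 5)
    {'a': {'b': {'c': 5}}}
    >>> _set_in_nested({}, ["root", "skipped", "x"], 1, prefix=["root"], skip=1)
    {'x': 1}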
    """
    for idx, el in enumerate(prefix):
        if path[idx] != el:
            raise KeyError(f"Path does not start with prefix: {prefix} not in {path}")
    path = path[len(prefix):]
    if skip:
        assert len(path) > skip, f"Path must be long enough to remove skip={skip} elements."
        path = path[skip:]

    tmp_dict = mydict
    while len(path) > 1:
        key = path.pop(0)
        if key not in tmp_dict:
            tmp_dict[key] = {}
        if not isinstance(tmp_dict[key], dict):
            if overwrite:
                tmp_dict[key] = {}
            else:
                raise ValueError(f"There is already some value at {path}")
        tmp_dict = tmp_dict[key]
    key = path.pop()
    if append_to_list:
        if key not in tmp_dict:
            tmp_dict[key] = []
        if not isinstance(tmp_dict[key], list):
            if overwrite:
                tmp_dict[key] = []
            else:
                raise ValueError(f"There is already some non-list value at [{key}]")
        tmp_dict[key].append(value)
    else:
        if key in tmp_dict and not overwrite:
            raise ValueError(f"There is already some value at [{key}]")
        tmp_dict[key] = value
    return mydict


def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
            validate: bool = False, strict: bool = False) -> dict:
    """Convert the XLSX contents to a dict; the contents must follow the given schema.

    Parameters
    ----------
    xlsx: Union[str, BinaryIO]
      Path to the XLSX file or opened file object.

    schema: Union[dict, str, TextIO]
      Schema for validation of XLSX content.

    validate: bool, optional
      If True, validate the result against the schema.

    strict: bool, optional
      If True, fail faster.


    Returns
    -------
    out: dict
      A dict representing the JSON with the extracted data.
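
    Examples
    --------
    A minimal usage sketch; the file names are hypothetical:

    >>> data = to_dict("example.xlsx", "schema.json", validate=True)  # doctest: +SKIP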
    """
    converter = XLSXConverter(xlsx, schema, strict=strict)
    return converter.to_dict(validate=validate)