diff --git a/src/caosadvancedtools/table_json_conversion/convert.py b/src/caosadvancedtools/table_json_conversion/convert.py
index a19f10fbf1347d87d16bff111edae257846be352..e970d0397fe433475d9686664385119d09669911 100644
--- a/src/caosadvancedtools/table_json_conversion/convert.py
+++ b/src/caosadvancedtools/table_json_conversion/convert.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python3
 # encoding: utf-8
 #
 # This file is a part of the LinkAhead Project.
@@ -19,14 +18,20 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+"""Convert XLSX files to JSON dictionaries."""
+
+from collections import OrderedDict
+from types import SimpleNamespace
 from typing import Any, BinaryIO, Dict, List, Optional, TextIO, Union
 
 import caosadvancedtools.table_json_conversion.xlsx_utils as xlsx_utils
-from . import fill_xlsx
-from .fill_xlsx import read_or_dict
+
 from openpyxl import load_workbook, Workbook
 from openpyxl.worksheet.worksheet import Worksheet
 
+from . import fill_xlsx
+from .fill_xlsx import read_or_dict
+
 
 class XLSXConverter:
     """Class for conversion from XLSX to JSON.
@@ -47,8 +52,9 @@ schema: Union[dict, str, TextIO]
         """
         self._workbook = load_workbook(xlsx)
         self._schema = read_or_dict(schema)
-        self._handled_sheets = set()
-        self._result = {}
+        self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
+        self._handled_sheets: set[str] = set()
+        self._result: dict = {}
 
     def to_dict(self) -> dict:
         """Convert the xlsx contents to a dict.
@@ -63,17 +69,119 @@ out: dict
         for sheetname in self._workbook.sheetnames:
             if sheetname not in self._handled_sheets:
                 self._handle_sheet(self._workbook.get_sheet_by_name(sheetname))
+        return self._result
 
     def _handle_sheet(self, sheet: Worksheet) -> None:
         """Add the contents of the sheet to the result.
+
+Each row in the sheet corresponds to one entry in an array in the result.  Exactly which array is
+determined by the content of the foreign columns.
         """
         path_rows = xlsx_utils.get_path_rows(sheet)
         row_type_column = xlsx_utils.get_row_type_column_index(sheet)
         foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
-        # Parent element: longest common path shared among any foreign column and all the data columns
+        foreign_column_paths = {col.index: col.path for col in foreign_columns.values()}
+        data_columns = xlsx_utils.get_data_columns(sheet)
+        data_column_paths = {col.index: col.path for col in data_columns.values()}
+        # Parent path, insert in correct order.
         parent = xlsx_utils.get_parent_path(sheet)
-        # from IPython import embed
-        # embed()
+        if parent:
+            parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
+            if parent_sheetname not in self._handled_sheets:
+                self._handle_sheet(self._workbook.get_sheet_by_name(parent_sheetname))
+
+        # We save single entries in lists, indexed by their foreign key contents.  Each entry
+        # consists of:
+        # - foreign: Dict with path -> value for the foreign columns
+        # - data: The actual data of this entry, a dict.
+        entries: dict[str, list[SimpleNamespace]] = {}
+
+        if len(parent) < 2:
+            return
+        for row in sheet.iter_rows(values_only=True):
+            # Skip non-data rows.
+            if row[row_type_column] is not None:
+                continue
+            foreign_repr = ""
+            foreign = []  # A list of lists, each of which is: [path1, path2, ..., leaf, value]
+            data = {}     # Local data dict
+            # Collect data (in dict relative to current level) and foreign data information
+            for col_idx, value in enumerate(row):
+                if col_idx in foreign_column_paths:
+                    foreign_repr += str(value)
+                    foreign.append(foreign_column_paths[col_idx] + [value])
+                    continue
+
+                if col_idx in data_column_paths:
+                    _set_in_nested(mydict=data, path=data_column_paths[col_idx], value=value,
+                                   prefix=parent)
+                    continue
+                continue
+
+            # Find current position in tree
+            parent_list = self._get_parent_list(foreign)
+
+            # Append data to current position's list
+            parent_list.append(data)
+
+    def _get_parent_list(self, parent_path: list[str], foreign: list[list]) -> list[dict]:
+        """For a ``foreign`` specification, get the correct list from the current result-in-making.
+
+        """
+        if not foreign:
+            # WIP: drop into an interactive session for debugging; the actual lookup is not
+            # implemented yet.
+            from IPython import embed
+            embed()
+
+
+# pylint: disable-next=dangerous-default-value
+def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], overwrite=False) -> (
+        dict):
+    """Set a value in a nested dict.
+
+Parameters
+----------
+mydict: dict
+    The dict into which the ``value`` shall be inserted.
+path: list
+    A list of keys, denoting the location of the value.
+value
+    The value to be set inside the dict.
+prefix: list
+    A list of keys which shall be removed from ``path``.  A KeyError is raised if ``path`` does not
+    start with the elements of ``prefix``.
+overwrite: bool = False
+    If True, allow overwriting existing content.  Otherwise, attempting to overwrite existing
+    values leads to an exception.
+
+Returns
+-------
+mydict: dict
+    The same dictionary that was given as a parameter, modified in place.
+    """
+    for idx, el in enumerate(prefix):
+        if path[idx] != el:
+            raise KeyError(f"Path does not start with prefix: {prefix} not in {path}")
+    path = path[len(prefix):]
+
+    tmp_dict = mydict
+    while len(path) > 1:
+        key = path.pop(0)
+        if key not in tmp_dict:
+            tmp_dict[key] = {}
+        if not isinstance(tmp_dict[key], dict):
+            if overwrite:
+                tmp_dict[key] = {}
+            else:
+                raise ValueError(f"There is already some value at {path}")
+        tmp_dict = tmp_dict[key]
+    key = path.pop()
+    if key in tmp_dict and not overwrite:
+        raise ValueError(f"There is already some value at [{key}]")
+    tmp_dict[key] = value
+    return mydict
 
 
 def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO]) -> dict:
diff --git a/src/caosadvancedtools/table_json_conversion/xlsx_utils.py b/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
index 594f6ee42df30773055be512e3c412e6948609ea..5f53a5105fa220e6b01cfd117d1fc682da2a0156 100644
--- a/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
+++ b/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
@@ -29,6 +29,7 @@ from enum import Enum
 from types import SimpleNamespace
 from typing import Dict, List, TextIO, Union
 
+from openpyxl import Workbook
 from openpyxl.worksheet.worksheet import Worksheet
 
 
@@ -71,6 +72,46 @@ If it is a dict already, just return it."""
     return data
 
 
+def get_defining_paths(workbook: Workbook) -> dict[str, list[list[str]]]:
+    """For all sheets in ``workbook``, list the paths which they define.
+
+A sheet is said to define a path if it has data columns for properties inside that path.  For
+example, consider the following worksheet:
+
+| `COL_TYPE` | `SCALAR`       | `SCALAR`   | `LIST`       | `SCALAR`           |
+| `PATH`     | `Training`     | `Training` | `Training`   | `Training`         |
+| `PATH`     | `url`          | `date`     | `subjects`   | `supervisor`       |
+| `PATH`     |                |            |              | `email`            |
+|------------|----------------|------------|--------------|--------------------|
+|            | example.com/mp | 2024-02-27 | Math;Physics | steve@example.com  |
+|            | example.com/m  | 2024-02-27 | Math         | stella@example.com |
+
+This worksheet defines properties for the paths `["Training"]` and `["Training", "supervisor"]`,
+and thus these two path lists would be returned for the key with this sheet's sheetname.
+
+Parameters
+----------
+workbook: Workbook
+    The workbook to analyze.
+
+Returns
+-------
+out: dict[str, list[list[str]]]
+    A dict with worksheet names as keys and lists of paths (represented as string lists) as
+    values.
+"""
+    result: dict[str, list[list[str]]] = {}
+    for sheet in workbook.worksheets:
+        paths = []
+        added = set()
+        for col in get_data_columns(sheet).values():
+            rep = p2s(col.path[:-1])
+            if rep not in added:
+                paths.append(col.path[:-1])
+                added.add(rep)
+        result[sheet.title] = paths
+    return result
+
+
 def get_data_columns(sheet: Worksheet) -> Dict[str, SimpleNamespace]:
     """Return the data paths of the worksheet.
@@ -132,7 +173,7 @@ out: dict[str, SimpleNamespace]
 def get_parent_path(sheet: Worksheet) -> list[str]:
     """Return a path which represents the parent element.
 
-For top-level sheets / entries, this returns an empty list.
+For top-level sheets / entries (those without foreign columns), this returns an empty list.
     """
     # Parent element: longest common path shared among any foreign column and all the data columns
     result: list[str] = []
@@ -178,6 +219,14 @@ def get_row_type_column_index(sheet: Worksheet):
     raise ValueError("The column which defines row types (COL_TYPE, PATH, ...) is missing")
 
 
+def get_worksheet_for_path(path: list[str], defining_path_index: dict[str, list[list[str]]]) -> str:
+    """Find the sheet name which corresponds to the given path."""
+    for sheetname, paths in defining_path_index.items():
+        if path in paths:
+            return sheetname
+    raise KeyError(f"Could not find defining worksheet for path: {path}")
+
+
 def next_row_index(sheet: Worksheet) -> int:
     """Return the index for the next data row.
 
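For orientation when reviewing, this is roughly how the two new helpers in xlsx_utils.py are meant
to compose; a minimal sketch, not part of the patch, and the workbook file name is a made-up
placeholder:

    from openpyxl import load_workbook
    from caosadvancedtools.table_json_conversion import xlsx_utils

    # Index all sheets once, then resolve a path to the sheet that defines it.
    workbook = load_workbook("training_workbook.xlsx")  # hypothetical example file
    index = xlsx_utils.get_defining_paths(workbook)
    # e.g. {"Training": [["Training"], ["Training", "supervisor"]], ...}
    sheetname = xlsx_utils.get_worksheet_for_path(["Training", "supervisor"], index)

This is the same lookup that XLSXConverter._handle_sheet performs to make sure a parent sheet is
handled before the sheets that refer to it.
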
diff --git a/unittests/table_json_conversion/test_read_data.py b/unittests/table_json_conversion/test_read_data.py
index 1b1b49d023d00aff4627b19ec3494a08ffbbd9e8..f78e8a3729fa41ebe966ddac12a46b04f5e28a7d 100644
--- a/unittests/table_json_conversion/test_read_data.py
+++ b/unittests/table_json_conversion/test_read_data.py
@@ -66,3 +66,41 @@ def test_simple():
         convert.to_dict(xlsx=rfp("data/multiple_refs_data.xlsx"),
                         schema=rfp("data/multiple_refs_schema.json"))
     # conv = XLSXConverter(schema=rfp("data/simple_schema.json"))
     # result = conv.to_dict(rfp("data/simple_template.xlsx"))
+
+
+def test_protected():
+    set_in_nested = convert._set_in_nested  # pylint: disable=protected-access
+
+    test_data_in = [
+        {"mydict": {}, "path": ["a", 1], "value": 3},
+        {"mydict": {"a": 1}, "path": ["a"], "value": 3, "overwrite": True},
+        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3, "overwrite": True},
+        {"mydict": {"b": 2}, "path": ["a", 1, 3.141], "value": 3},
+        {"mydict": {}, "path": ["X", "Y", "a", 1], "value": 3, "prefix": ["X", "Y"]},
+    ]
+    test_data_out = [
+        {"a": {1: 3}},
+        {"a": 3},
+        {"a": {1: 3}},
+        {"a": {1: {3.141: 3}}, "b": 2},
+        {"a": {1: 3}},
+    ]
+
+    for data_in, data_out in zip(test_data_in, test_data_out):
+        assert set_in_nested(**data_in) == data_out
+
+    # Testing exceptions
+    test_data_in = [
+        {"mydict": {"a": 1}, "path": ["a"], "value": 3},
+        {"mydict": {"a": 1}, "path": ["a", 1], "value": 3},
+        {"mydict": {}, "path": ["a", 1], "value": 3, "prefix": ["X", "Y", "Z"]},
+    ]
+    exceptions = [
+        [ValueError, r"There is already some value at \[a\]"],
+        [ValueError, r"There is already some value at \[1\]"],
+        [KeyError, r"Path does not start with prefix: \['X', 'Y', 'Z'\] not in \['a', 1\]"],
+    ]
+
+    for data_in, (exc_out, match) in zip(test_data_in, exceptions):
+        with pytest.raises(exc_out, match=match):
+            set_in_nested(**data_in)
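
As a usage note for the new reading direction, the intended entry points look roughly like this;
a sketch based on the signatures in the diff, not a verified run.  Conversion still depends on the
unfinished _get_parent_list, the keyword arguments are those shown in the code above, and the file
names are the test fixtures referenced in test_read_data.py (paths here are illustrative):

    from caosadvancedtools.table_json_conversion import convert

    # One-shot conversion: read the XLSX file and build a dict, given the JSON schema.
    result = convert.to_dict(xlsx="data/multiple_refs_data.xlsx",
                             schema="data/multiple_refs_schema.json")

    # Equivalent, but keeps the converter object around for inspection.
    conv = convert.XLSXConverter(xlsx="data/multiple_refs_data.xlsx",
                                 schema="data/multiple_refs_schema.json")
    result = conv.to_dict()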