From 4f616ada7e4de32f7b1429f55ddb2fbbe58c05ca Mon Sep 17 00:00:00 2001 From: Daniel <d.hornung@indiscale.com> Date: Fri, 19 Apr 2024 11:27:21 +0200 Subject: [PATCH] MAINT: Refactored helper functions. --- .../table_json_conversion/fill_xlsx.py | 127 ++--------- .../table_json_conversion/table_generator.py | 19 +- .../table_json_conversion/utils.py | 25 --- .../table_json_conversion/xlsx_utils.py | 210 ++++++++++++++++++ .../table_json_conversion/test_fill_xlsx.py | 11 +- .../test_table_template_generator.py | 4 +- 6 files changed, 240 insertions(+), 156 deletions(-) delete mode 100644 src/caosadvancedtools/table_json_conversion/utils.py create mode 100644 src/caosadvancedtools/table_json_conversion/xlsx_utils.py diff --git a/src/caosadvancedtools/table_json_conversion/fill_xlsx.py b/src/caosadvancedtools/table_json_conversion/fill_xlsx.py index e9a410db..45b571cb 100644 --- a/src/caosadvancedtools/table_json_conversion/fill_xlsx.py +++ b/src/caosadvancedtools/table_json_conversion/fill_xlsx.py @@ -22,115 +22,26 @@ from __future__ import annotations -import json import pathlib -from collections import OrderedDict from types import SimpleNamespace from typing import Any, Dict, List, Optional, TextIO, Union from warnings import warn from jsonschema import FormatChecker, validate from jsonschema.exceptions import ValidationError -from openpyxl import Workbook, load_workbook +from openpyxl import load_workbook, Workbook from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE -from openpyxl.worksheet.worksheet import Worksheet -from .table_generator import ColumnType, RowType -from .utils import p2s - - -def _is_exploded_sheet(sheet: Worksheet) -> bool: - """Return True if this is a an "exploded" sheet. - - An exploded sheet is a sheet whose data entries are LIST valued properties of entries in another - sheet. A sheet is detected as exploded iff it has FOREIGN columns. 
- """ - column_types = _get_column_types(sheet) - return ColumnType.FOREIGN.name in column_types.values() - - -def _get_column_types(sheet: Worksheet) -> OrderedDict: - """Return an OrderedDict: column index -> column type for the sheet. - """ - result = OrderedDict() - type_row_index = _get_row_type_column_index(sheet) - for idx, col in enumerate(sheet.columns): - type_cell = col[type_row_index] - result[idx] = type_cell.value if type_cell.value is not None else ColumnType.IGNORE.name - assert (hasattr(ColumnType, result[idx]) - or result[idx] == RowType.COL_TYPE.name), ( - f"Unexpected column type value ({idx}{type_row_index}): {type_cell.value}") - return result - - -def _get_foreign_key_columns(sheet: Worksheet) -> Dict[str, SimpleNamespace]: - """Return the foreign keys of the worksheet. - -Returns -------- -out: dict[str, SimpleNamespace] - The keys are the stringified paths. The values are SimpleNamespace objects with ``index``, - ``path`` and ``column`` attributes. - """ - column_types = _get_column_types(sheet) - path_rows = _get_path_rows(sheet) - result = OrderedDict() - for for_idx, name in column_types.items(): - if name != ColumnType.FOREIGN.name: - continue - path = [] - for row in path_rows: - component = sheet.cell(row=row+1, column=for_idx+1).value - if component is None: - break - assert isinstance(component, str), f"Expected string: {component}" - path.append(component) - result[p2s(path)] = SimpleNamespace(index=for_idx, path=path, - column=list(sheet.columns)[for_idx]) - return result - - -def _get_row_type_column_index(sheet: Worksheet): - """Return the column index (0-indexed) of the column which defines the row types. - """ - for col in sheet.columns: - for cell in col: - if cell.value == RowType.COL_TYPE.name: - return cell.column - 1 - raise ValueError("The column which defines row types (COL_TYPE, PATH, ...) 
is missing") - - -def _get_path_rows(sheet: Worksheet): - """Return the 0-based indices of the rows which represent paths.""" - rows = [] - rt_col = _get_row_type_column_index(sheet) - for cell in list(sheet.columns)[rt_col]: - if cell.value == RowType.PATH.name: - rows.append(cell.row-1) - return rows - - -def _next_row_index(sheet: Worksheet) -> int: - """Return the index for the next data row. - - This is defined as the first row without any content. - """ - return sheet.max_row - - -def read_or_dict(data: Union[dict, str, TextIO]) -> dict: - """If data is a json file name or input stream, read data from there.""" - if isinstance(data, dict): - pass - elif isinstance(data, str): - with open(data, encoding="utf-8") as infile: - data = json.load(infile) - elif hasattr(data, "read"): - data = json.load(data) - else: - raise ValueError(f"I don't know how to handle the datatype of `data`: {type(data)}") - assert isinstance(data, dict) - return data +from .xlsx_utils import ( + get_foreign_key_columns, + get_row_type_column_index, + is_exploded_sheet, + next_row_index, + p2s, + read_or_dict, + ColumnType, + RowType +) class TemplateFiller: @@ -143,6 +54,7 @@ class TemplateFiller: @property def workbook(self): + """Return the workbook of this TemplateFiller.""" return self._workbook def fill_data(self, data: dict): @@ -172,6 +84,7 @@ class TemplateFiller: return result def next_level(self, next_level: str) -> TemplateFiller.Context: + """Return a copy of this Context, with the path appended by ``next_level``.""" result = self.copy() result._current_path.append(next_level) # pylint: disable=protected-access return result @@ -212,7 +125,7 @@ class TemplateFiller: for sheetname in self._workbook.sheetnames: sheet = self._workbook[sheetname] type_column = [x.value for x in list(sheet.columns)[ - _get_row_type_column_index(sheet)]] + get_row_type_column_index(sheet)]] # 0-indexed, as everything outside of sheet.cell(...): coltype_idx = 
type_column.index(RowType.COL_TYPE.name) path_indices = [i for i, typ in enumerate(type_column) if typ == RowType.PATH.name] @@ -342,14 +255,14 @@ out: union[dict, None] assert sheet is sheet_meta.sheet, "All entries must be in the same sheet." col_index = sheet_meta.col_index if insert_row is None: - insert_row = _next_row_index(sheet) + insert_row = next_row_index(sheet) sheet.cell(row=insert_row+1, column=col_index+1, value=value) # Insert foreign keys - if insert_row is not None and sheet is not None and _is_exploded_sheet(sheet): + if insert_row is not None and sheet is not None and is_exploded_sheet(sheet): try: - foreigns = _get_foreign_key_columns(sheet) + foreigns = get_foreign_key_columns(sheet) except ValueError: print(f"Sheet: {sheet}") raise @@ -422,9 +335,9 @@ validation_schema: dict, optional validation_schema = read_or_dict(validation_schema) try: validate(data, validation_schema, format_checker=FormatChecker()) - except ValidationError as ve: - print(ve.message) - raise ve + except ValidationError as verr: + print(verr.message) + raise verr else: print("No validation schema given, continue at your own risk.") diff --git a/src/caosadvancedtools/table_json_conversion/table_generator.py b/src/caosadvancedtools/table_json_conversion/table_generator.py index 857100ef..851173e2 100644 --- a/src/caosadvancedtools/table_json_conversion/table_generator.py +++ b/src/caosadvancedtools/table_json_conversion/table_generator.py @@ -27,30 +27,13 @@ This module allows to generate template tables from JSON schemas. 
import pathlib import re from abc import ABC, abstractmethod -from enum import Enum from typing import Dict, List, Optional, Tuple from openpyxl import Workbook from openpyxl.styles import PatternFill from openpyxl.workbook.child import INVALID_TITLE_REGEX -from .utils import p2s - - -class ColumnType(Enum): - """ column types enum """ - SCALAR = 1 - LIST = 2 - FOREIGN = 3 - MULTIPLE_CHOICE = 4 - IGNORE = 5 - - -class RowType(Enum): - """ row types enum """ - COL_TYPE = 1 - PATH = 2 - IGNORE = 3 +from .xlsx_utils import p2s, ColumnType, RowType class TableTemplateGenerator(ABC): diff --git a/src/caosadvancedtools/table_json_conversion/utils.py b/src/caosadvancedtools/table_json_conversion/utils.py deleted file mode 100644 index 15ae488d..00000000 --- a/src/caosadvancedtools/table_json_conversion/utils.py +++ /dev/null @@ -1,25 +0,0 @@ -# This file is a part of the LinkAhead Project. -# -# Copyright (C) 2024 IndiScale GmbH <info@indiscale.com> -# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com> -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see <https://www.gnu.org/licenses/>. - -from typing import List - - -def p2s(path: List[str]): - """Path to string: dot-separated. 
- """ - return ".".join(path)
diff --git a/src/caosadvancedtools/table_json_conversion/xlsx_utils.py b/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
new file mode 100644
index 00000000..594f6ee4
--- /dev/null
+++ b/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
@@ -0,0 +1,242 @@
+# encoding: utf-8
+#
+# This file is a part of the LinkAhead Project.
+#
+# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
+# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""General utilities to work with XLSX files with (hidden) column and row annotations and typing."""
+
+from __future__ import annotations
+
+import json
+
+from collections import OrderedDict
+from enum import Enum
+from types import SimpleNamespace
+from typing import Dict, Iterable, List, TextIO, Union
+
+from openpyxl.worksheet.worksheet import Worksheet
+
+
+class ColumnType(Enum):
+    """ column types enum """
+    SCALAR = 1
+    LIST = 2
+    FOREIGN = 3
+    MULTIPLE_CHOICE = 4
+    IGNORE = 5
+
+
+class RowType(Enum):
+    """ row types enum """
+    COL_TYPE = 1
+    PATH = 2
+    IGNORE = 3
+
+
+def p2s(path: List[str]) -> str:
+    """Path to string: dot-separated.
+    """
+    return ".".join(path)
+
+
+def read_or_dict(data: Union[dict, str, TextIO]) -> dict:
+    """If data is a json file name or input stream, read data from there.
+If it is a dict already, just return it.
+
+Raises
+------
+ValueError
+  If ``data`` is neither a dict nor a str and has no ``read`` attribute.
+    """
+    if isinstance(data, dict):
+        return data
+
+    if isinstance(data, str):
+        with open(data, encoding="utf-8") as infile:
+            data = json.load(infile)
+    elif hasattr(data, "read"):
+        data = json.load(data)
+    else:
+        raise ValueError(f"I don't know how to handle the datatype of `data`: {type(data)}")
+    assert isinstance(data, dict)
+    return data
+
+
+def get_data_columns(sheet: Worksheet) -> Dict[str, SimpleNamespace]:
+    """Return the data paths of the worksheet.
+
+Data columns are those of type SCALAR, LIST or MULTIPLE_CHOICE.
+
+Returns
+-------
+out: dict[str, SimpleNamespace]
+  The keys are the stringified paths. The values are SimpleNamespace objects with ``index``,
+  ``path`` and ``column`` attributes.
+    """
+    return _get_columns_of_types(sheet, (
+        ColumnType.SCALAR.name,
+        ColumnType.LIST.name,
+        ColumnType.MULTIPLE_CHOICE.name,
+    ))
+
+
+def get_foreign_key_columns(sheet: Worksheet) -> Dict[str, SimpleNamespace]:
+    """Return the foreign keys of the worksheet.
+
+Returns
+-------
+out: dict[str, SimpleNamespace]
+  The keys are the stringified paths. The values are SimpleNamespace objects with ``index``,
+  ``path`` and ``column`` attributes.
+    """
+    return _get_columns_of_types(sheet, (ColumnType.FOREIGN.name,))
+
+
+def get_parent_path(sheet: Worksheet) -> list[str]:
+    """Return a path which represents the parent element.
+
+For top-level sheets / entries (those without foreign columns), this returns an empty list.
+    """
+    # Parent element: longest common path shared among any foreign column and all the data columns
+    result: list[str] = []
+    foreign_paths = [el.path for el in get_foreign_key_columns(sheet).values()]
+    data_paths = [el.path for el in get_data_columns(sheet).values()]
+    if not foreign_paths or not data_paths:
+        # No foreign columns: no parent. No data columns: nothing to compare against.
+        return result
+
+    # Longest common path in data columns. If no differing component is found, ``ii`` ends at
+    # the last index, so the final component of the shortest data path is excluded.
+    ii = 0  # Keeps ``ii`` defined if the shortest data path is empty.
+    for ii in range(min(len(path) for path in data_paths)):
+        if len({path[ii] for path in data_paths}) > 1:
+            break
+    longest_data_path = data_paths[0][:ii]
+
+    # longest common overall path
+    for foreign_path in foreign_paths:
+        ii = 0  # Do not carry over the index of a previous iteration if the range is empty.
+        for ii in range(min(len(foreign_path), len(longest_data_path))):
+            if foreign_path[ii] != longest_data_path[ii]:
+                break
+        if ii > len(result):
+            result = foreign_path[:ii]
+
+    return result
+
+
+def get_path_rows(sheet: Worksheet) -> List[int]:
+    """Return the 0-based indices of the rows which represent paths."""
+    row_type_column = list(sheet.columns)[get_row_type_column_index(sheet)]
+    return [cell.row - 1 for cell in row_type_column if cell.value == RowType.PATH.name]
+
+
+def get_row_type_column_index(sheet: Worksheet) -> int:
+    """Return the column index (0-indexed) of the column which defines the row types.
+
+Raises
+------
+ValueError
+  If no cell of the sheet contains the COL_TYPE marker.
+    """
+    return _find_col_type_cell(sheet).column - 1
+
+
+def get_column_type_row_index(sheet: Worksheet) -> int:
+    """Return the row index (0-indexed) of the row which defines the column types.
+
+Raises
+------
+ValueError
+  If no cell of the sheet contains the COL_TYPE marker.
+    """
+    return _find_col_type_cell(sheet).row - 1
+
+
+def next_row_index(sheet: Worksheet) -> int:
+    """Return the index for the next data row.
+
+    This is defined as the first row without any content.
+    """
+    return sheet.max_row
+
+
+def is_exploded_sheet(sheet: Worksheet) -> bool:
+    """Return True if this is an "exploded" sheet.
+
+    An exploded sheet is a sheet whose data entries are LIST valued properties of entries in another
+    sheet. A sheet is detected as exploded iff it has FOREIGN columns.
+    """
+    column_types = _get_column_types(sheet)
+    return ColumnType.FOREIGN.name in column_types.values()
+
+
+def _find_col_type_cell(sheet: Worksheet):
+    """Return the first cell (searching column by column) whose value is the COL_TYPE marker."""
+    for col in sheet.columns:
+        for cell in col:
+            if cell.value == RowType.COL_TYPE.name:
+                return cell
+    raise ValueError("The column which defines row types (COL_TYPE, PATH, ...) is missing")
+
+
+def _get_column_types(sheet: Worksheet) -> OrderedDict:
+    """Return an OrderedDict: column index -> column type for the sheet.
+
+Cells without a value in the column type row are reported as IGNORE.
+    """
+    result = OrderedDict()
+    # The types are stored in the *row* of the COL_TYPE marker. Its column index must not be used
+    # here: the two only coincide if the marker sits on the diagonal (e.g. in the top left cell).
+    type_row_index = get_column_type_row_index(sheet)
+    for idx, col in enumerate(sheet.columns):
+        type_cell = col[type_row_index]
+        result[idx] = type_cell.value if type_cell.value is not None else (
+            ColumnType.IGNORE.name)
+        assert (result[idx] in ColumnType.__members__
+                or result[idx] == RowType.COL_TYPE.name), (
+            f"Unexpected column type value ({type_cell.coordinate}): {type_cell.value}")
+    return result
+
+
+def _get_columns_of_types(sheet: Worksheet,
+                          type_names: Iterable[str]) -> Dict[str, SimpleNamespace]:
+    """Return the columns whose column type is in ``type_names``.
+
+Returns
+-------
+out: dict[str, SimpleNamespace]
+  The keys are the stringified paths. The values are SimpleNamespace objects with ``index``,
+  ``path`` and ``column`` attributes.
+    """
+    wanted = set(type_names)
+    path_rows = get_path_rows(sheet)
+    columns = list(sheet.columns)  # Materialize once, not once per matching column.
+    result: Dict[str, SimpleNamespace] = OrderedDict()
+    for col_idx, type_name in _get_column_types(sheet).items():
+        if type_name not in wanted:
+            continue
+        path = []
+        for row in path_rows:
+            component = sheet.cell(row=row+1, column=col_idx+1).value
+            if component is None:
+                break
+            assert isinstance(component, str), f"Expected string: {component}"
+            path.append(component)
+        result[p2s(path)] = SimpleNamespace(index=col_idx, path=path, column=columns[col_idx])
+    return result
diff --git a/unittests/table_json_conversion/test_fill_xlsx.py b/unittests/table_json_conversion/test_fill_xlsx.py index 1315bd9f..b2eaf042 100644 --- a/unittests/table_json_conversion/test_fill_xlsx.py +++ b/unittests/table_json_conversion/test_fill_xlsx.py @@ -26,9 +26,12 @@ import tempfile import jsonschema.exceptions as schema_exc import pytest -from caosadvancedtools.table_json_conversion.fill_xlsx import ( - _get_path_rows, _get_row_type_column_index, fill_template) from openpyxl import load_workbook +from caosadvancedtools.table_json_conversion.fill_xlsx import fill_template +from caosadvancedtools.table_json_conversion.xlsx_utils import ( +
get_row_type_column_index, + get_path_rows, +) from .utils import compare_workbooks @@ -67,8 +70,8 @@ custom_output: str, optional def test_detect(): example = load_workbook(rfp("data/simple_template.xlsx")) - assert 0 == _get_row_type_column_index(example['Person']) - assert [1, 2] == _get_path_rows(example['Person']) + assert 0 == get_row_type_column_index(example['Person']) + assert [1, 2] == get_path_rows(example['Person']) def test_temporary(): diff --git a/unittests/table_json_conversion/test_table_template_generator.py b/unittests/table_json_conversion/test_table_template_generator.py index 61da2142..070a7908 100644 --- a/unittests/table_json_conversion/test_table_template_generator.py +++ b/unittests/table_json_conversion/test_table_template_generator.py @@ -25,8 +25,8 @@ import tempfile from typing import Tuple import pytest -from caosadvancedtools.table_json_conversion.table_generator import ( - ColumnType, XLSXTemplateGenerator) +from caosadvancedtools.table_json_conversion.table_generator import XLSXTemplateGenerator +from caosadvancedtools.table_json_conversion.xlsx_utils import ColumnType from openpyxl import load_workbook from .utils import compare_workbooks -- GitLab