diff --git a/src/caosadvancedtools/table_json_conversion/fill_xlsx.py b/src/caosadvancedtools/table_json_conversion/fill_xlsx.py index 5ef95925ceca111c3cd1d8c15a19f36f0b1e8fe3..bf6f8ef044c04c12dc1c7a5821c8a26f6d812530 100644 --- a/src/caosadvancedtools/table_json_conversion/fill_xlsx.py +++ b/src/caosadvancedtools/table_json_conversion/fill_xlsx.py @@ -21,10 +21,12 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. import json +from collections import OrderedDict from types import SimpleNamespace -from typing import List, Union, TextIO +from typing import Any, Dict, List, Optional, Union, TextIO -from openpyxl import load_workbook +from openpyxl import load_workbook, Workbook +from openpyxl.worksheet.worksheet import Worksheet from .table_generator import ColumnType, RowType @@ -39,7 +41,32 @@ def _fill_leaves(json_doc: dict, workbook): workbook.cell(1, 2, el) +def _is_exploded_sheet(sheet: Worksheet) -> bool: + """Return True if this is an "exploded" sheet. + + An exploded sheet is a sheet whose data entries are LIST valued properties of entries in another + sheet. A sheet is detected as exploded iff it has FOREIGN columns. + """ + column_types = _get_column_types(sheet) + return ColumnType.FOREIGN.value in column_types.values() + + +def _get_column_types(sheet: Worksheet) -> OrderedDict: + """Return an OrderedDict: column index -> column type for the sheet. + """ + result = OrderedDict() + type_row_index = _get_row_type_column_index(sheet) - 1 + for idx, col in enumerate(sheet.columns): + type_cell = col[type_row_index] + result[idx] = type_cell.value + assert hasattr(ColumnType, type_cell.value) or type_cell.value is None, ( + f"Unexpected column type value: {type_cell.value}") + return result + + def _get_row_type_column_index(worksheet): + """Return the column index (1-indexed) of the column which defines the row types. 
+ """ for col in worksheet.columns: for cell in col: if cell.value == RowType.COL_TYPE.name: @@ -48,6 +75,7 @@ def _get_row_type_column_index(worksheet): def _get_path_rows(worksheet): + """Return the 1-based indices of the rows which represent paths.""" rows = [] rt_col = _get_row_type_column_index(worksheet) for cell in list(worksheet.columns)[rt_col-1]: @@ -60,8 +88,8 @@ def _get_path_rows(worksheet): def _next_row_index(sheet) -> int: """Return the index for the next data row. -This is defined as the first row without any content. -""" + This is defined as the first row without any content. + """ return sheet.max_row @@ -74,11 +102,11 @@ class TemplateFiller: """Fill the data into the workbook.""" self._handle_data(data=data, current_path=[]) - def _create_index(self, ): + def _create_index(self): """Create a sheet index for the workbook. - Index the sheets by their relevant path array. Also create a simple column index by column - type and path. + Index the sheets by all path arrays leading to them. Also create a simple column index by + column type and path. """ self._sheet_index = {} @@ -92,8 +120,6 @@ class TemplateFiller: # Get the paths, use without the leaf component for sheet indexing, with type prefix and # leaf for column indexing. 
- paths = [] - col_index = {} for col_idx, col in enumerate(sheet.columns): if col[coltype_idx].value == RowType.COL_TYPE.name: continue @@ -101,37 +127,47 @@ class TemplateFiller: for path_idx in path_indices: if col[path_idx].value is not None: path.append(col[path_idx].value) - col_key = ".".join([col[coltype_idx].value] + path) - col_index[col_key] = SimpleNamespace(column=col, col_index=col_idx) + # col_key = ".".join([col[coltype_idx].value] + path) + # col_index[col_key] = SimpleNamespace(column=col, col_index=col_idx) if col[coltype_idx].value not in [ColumnType.SCALAR.name, ColumnType.LIST.name]: continue - paths.append(path[:-1]) - - # Find common components: - common_path = [] - for idx, component in enumerate(paths[0]): - for path in paths: - if not path[idx] == component: - break - else: - common_path.append(component) - assert len(common_path) >= 1 - - self._sheet_index[".".join(common_path)] = SimpleNamespace( - common_path=common_path, sheetname=sheetname, sheet=sheet, col_index=col_index) - - def _handle_data(self, data: dict, current_path: List[str] = None): + + path_str = ".".join(path) + assert path_str not in self._sheet_index + self._sheet_index[path_str] = SimpleNamespace( + sheetname=sheetname, sheet=sheet, col_index=col_idx, + col_type=col[coltype_idx].value) + + def _handle_data(self, data: dict, current_path: List[str] = None, + only_collect_insertables: bool = False, + ) -> Optional[Dict[str, Any]]: """Handle the data and write it into ``workbook``. Parameters ---------- data: dict The data at the current path position. Elements may be dicts, lists or simple scalar values. + +current_path: list[str], optional + If this is None or empty, we are at the top level. This means that all children shall be entered + into their respective sheets and not into a sheet at this level. + +only_collect_insertables: bool, optional + If True, do not insert anything on this level, but return a dict with entries to be inserted. 
+ + +Returns +------- + +out: union[dict, None] + If ``only_collect_insertables`` is True, return a dict (path string -> value) """ if current_path is None: current_path = [] + insertables: Dict[str, Any] = {} for name, content in data.items(): path = current_path + [name] + # preprocessing if isinstance(content, list): if not content: continue @@ -142,34 +178,48 @@ data: dict for entry in content: self._handle_data(data=entry, current_path=path) continue - self._handle_simple_data(data=content, current_path=path) - - def _handle_simple_data(self, data, current_path: List[str]): - """Enter this single data item into the workbook. - -Parameters ----------- -data: dict - The data at the current path position. Must be single items (dict or simple scalar) or lists of - simple values. - """ - sheet_meta = self._sheet_index[".".join(current_path)] - sheet = sheet_meta.sheet - next_row = _next_row_index(sheet) - for name, content in data.items(): - if isinstance(content, list): - # TODO handle later - # scalar elements: semicolon separated - # nested dicts: recurse - pass elif isinstance(content, dict): - pass - # scalars + if not current_path: # Special handling for top level + self._handle_data(content, current_path=path) + continue + insert = self._handle_data(content, current_path=path, + only_collect_insertables=True) + assert isinstance(insert, dict) + assert not any(key in insertables for key in insert) + insertables.update(insert) + continue + else: # scalars + content = [content] + + # collecting the data + assert isinstance(content, list) + if len(content) == 1: + value = content[0] else: - path = current_path + [name] - path_str = ".".join([ColumnType.SCALAR.name] + path) - col_index = sheet_meta.col_index[path_str].col_index - sheet.cell(row=next_row+1, column=col_index+1, value=content) + value = ";".join(content) + path_str = ".".join(path) + assert path_str not in insertables + insertables[path_str] = value + if only_collect_insertables: + return 
insertables + if not current_path: + return + + # actual data insertion + insert_row = None + sheet = None + for path_str, value in insertables.items(): + sheet_meta = self._sheet_index[path_str] + if sheet is None: + sheet = sheet_meta.sheet + assert sheet is sheet_meta.sheet, "All entries must be in the same sheet." + col_index = sheet_meta.col_index + if insert_row is None: + insert_row = _next_row_index(sheet) + + sheet.cell(row=insert_row+1, column=col_index+1, value=value) + # self._handle_simple_data(data=content, current_path=path) + return None def fill_template(data: Union[dict, str, TextIO], template: str, result: str) -> None: diff --git a/unittests/table_json_conversion/example_template.xlsx b/unittests/table_json_conversion/example_template.xlsx index 68177385bcfda7c3cdcdeca24a94e4757240d793..1162965bf44642a4523123fa52c58dd240b25e5f 100644 Binary files a/unittests/table_json_conversion/example_template.xlsx and b/unittests/table_json_conversion/example_template.xlsx differ