Skip to content
Snippets Groups Projects

Filling XLSX: Everything except multiple choice.

Merged Daniel Hornung requested to merge f-json-table into dev
Compare changes:
21 files changed
+ 565
− 100
Compare changes
  • Side-by-side
  • Inline
Files
21
@@ -5,6 +5,7 @@
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -19,71 +20,346 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

import json
import pathlib
from collections import OrderedDict
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, TextIO, Union

from jsonschema import FormatChecker, validate
from openpyxl import Workbook, load_workbook
from openpyxl.worksheet.worksheet import Worksheet

from .table_generator import ColumnType, RowType
from .utils import p2s
def _is_exploded_sheet(sheet: Worksheet) -> bool:
    """Return True if this is an "exploded" sheet.

    An exploded sheet is a sheet whose data entries are LIST valued properties of entries in
    another sheet.  A sheet is detected as exploded iff it has FOREIGN columns.
    """
    return ColumnType.FOREIGN.name in _get_column_types(sheet).values()
def _get_column_types(sheet: Worksheet) -> OrderedDict:
    """Return an OrderedDict: column index -> column type for the sheet.

    Parameters
    ----------
    sheet: Worksheet
      The worksheet whose type row shall be read.

    Returns
    -------
    out: OrderedDict
      0-based column index -> type name (a ``ColumnType`` name, ``COL_TYPE`` or None).
    """
    result = OrderedDict()
    # NOTE(review): the *column* index of the row-type column is reused here as the *row*
    # index of the type cells — this assumes the COL_TYPE marker sits on the "diagonal" of
    # the template header; verify against the template layout.
    type_row_index = _get_row_type_column_index(sheet)
    for idx, col in enumerate(sheet.columns):
        type_cell = col[type_row_index]
        result[idx] = type_cell.value
        # Check for None first: ``hasattr`` requires a string attribute name and would raise
        # TypeError (instead of a clean AssertionError) when the cell is empty.
        assert (type_cell.value is None
                or type_cell.value == RowType.COL_TYPE.name
                or hasattr(ColumnType, type_cell.value)), (
            f"Unexpected column type value: {type_cell.value}")
    return result
def _fill_leaves(json_doc: dict, workbook):
for key, value in json_doc:
if not isinstance(value, list):
value = [value]
for el in value:
if isinstance(el, dict):
_fill_leaves(el, workbook)
workbook.cell(1, 2, el)
def _get_foreign_key_columns(sheet: Worksheet) -> Dict[str, SimpleNamespace]:
    """Return the foreign keys of the worksheet.

    Returns
    -------
    out: dict[str, SimpleNamespace]
      The keys are the stringified paths.  The values are SimpleNamespace objects with
      ``index``, ``path`` and ``column`` attributes.
    """
    result = OrderedDict()
    all_columns = list(sheet.columns)
    path_row_indices = _get_path_rows(sheet)
    for col_idx, col_type in _get_column_types(sheet).items():
        if col_type != ColumnType.FOREIGN.name:
            continue
        # Collect the path components from top to bottom, stopping at the first empty cell.
        path: List[str] = []
        for row_idx in path_row_indices:
            component = sheet.cell(row=row_idx + 1, column=col_idx + 1).value
            if component is None:
                break
            assert isinstance(component, str), f"Expected string: {component}"
            path.append(component)
        result[p2s(path)] = SimpleNamespace(index=col_idx, path=path,
                                            column=all_columns[col_idx])
    return result
def _get_row_type_column_index(sheet: Worksheet):
    """Return the column index (0-indexed) of the column which defines the row types.

    Raises
    ------
    ValueError
      If no cell with the ``COL_TYPE`` marker exists in the sheet.
    """
    # Note: this text previously contained both the old 1-based ``_get_row_type_column`` and
    # this function; only the 0-based variant is kept.
    for col in sheet.columns:
        for cell in col:
            if cell.value == RowType.COL_TYPE.name:
                # openpyxl's ``cell.column`` is 1-based; convert to 0-based.
                return cell.column - 1
    raise ValueError("The column which defines row types (COL_TYPE, PATH, ...) is missing")
def _get_path_rows(sheet: Worksheet):
    """Return the 0-based indices of the rows which represent paths."""
    # Note: leftover pre-merge lines (old signature, a debug ``print`` and a 1-based append)
    # were removed; only the 0-based variant is kept.
    rows = []
    rt_col = _get_row_type_column_index(sheet)
    for cell in list(sheet.columns)[rt_col]:
        if cell.value == RowType.PATH.name:
            # ``cell.row`` is 1-based; store 0-based indices.
            rows.append(cell.row - 1)
    return rows
def _generate_path_col_mapping(workbook):
rt_col = _get_row_type_column(workbook)
def _next_row_index(sheet: Worksheet) -> int:
"""Return the index for the next data row.
This is defined as the first row without any content.
"""
return sheet.max_row
for col in workbook.columns:
def _read_or_dict(data: Union[dict, str, TextIO]) -> dict:
"""If data is a json file name or input stream, read data from there."""
if isinstance(data, dict):
pass
elif isinstance(data, str):
with open(data, encoding="utf-8") as infile:
data = json.load(infile)
elif hasattr(data, "read"):
data = json.load(data)
else:
raise ValueError(f"I don't know how to handle the datatype of `data`: {type(data)}")
assert isinstance(data, dict)
return data
def fill_template(template_path: str, json_path: str, result_path: str) -> None:
    """
    Fill the contents of the JSON document stored at ``json_path`` into the template stored at
    ``template_path`` and store the result under ``result_path``.
    """
    # NOTE(review): dead code — this definition is shadowed at import time by the later
    # ``fill_template`` defined at the bottom of this file (the last ``def`` with the same
    # name wins).  As written it only copies the template; ``json_path`` is never used.
    # Consider removing it once the planning notes below are no longer needed.
    template = load_workbook(template_path)
    # For each top level key in the json we iterate the values (if it is an array). Those are the
    # root elements that belong to a particular sheet.
    # After treating a root element, the row index for the corresponding sheet needs to be
    # increased
    # When we finished treating an object that goes into a lower ranked sheet (see below), we
    # increase the row index of that sheet.
    #
    # We can generate a hierarchy of sheets in the beginning (using the paths). The lower sheets
    # are for objects referenced by objects in higher ranked sheets.
    # We can detect the sheet corresponding to a root element by looking at the first path element:
    # The first path element must be the root element every where.
    # Suggestion:
    # row indices: Dict[str, int]  string is the sheet name
    # sheet_hirarchy: List[Tuple[str]]  elements are sheet names
    #
    # Question:
    # We can create an internal representation where we assign as sheet_names the same names that
    # are used in table generator. Or should we create another special row that contains this
    # somehow?
    template.save(result_path)
class TemplateFiller:
    """Class to fill XLSX templates.  Has an index for all relevant columns."""

    def __init__(self, workbook: Workbook):
        # The workbook is indexed once, up front; ``_sheet_index`` is created here.
        self._workbook = workbook
        self._create_index()

    @property
    def workbook(self):
        """The workbook which is being filled."""
        return self._workbook

    def fill_data(self, data: dict):
        """Fill the data into the workbook."""
        self._handle_data(data=data)

    class Context:
        """Context for an entry: simple properties of all ancestors, organized in a dict.

        This is similar to a dictionary with all scalar element properties at the tree nodes
        up to the root.  Siblings in lists and dicts are ignored.  Additionally the context
        knows where its current position is.

        Lookup of elements can easily be achieved by giving the path (as ``list[str]`` or
        stringified path).
        """

        def __init__(self, current_path: Optional[List[str]] = None,
                     props: Optional[Dict[str, Any]] = None):
            # ``_current_path``: position in the tree; ``_props``: stringified path -> scalar.
            self._current_path = current_path if current_path is not None else []
            self._props = props if props is not None else {}  # this is flat

        def copy(self) -> TemplateFiller.Context:
            """Deep copy."""
            result = TemplateFiller.Context(current_path=self._current_path.copy(),
                                            props=self._props.copy())
            return result

        def next_level(self, next_level: str) -> TemplateFiller.Context:
            """Return a copy of this context with ``next_level`` appended to the path."""
            result = self.copy()
            result._current_path.append(next_level)
            return result

        def __getitem__(self, path: Union[List[str], str], owner=None) -> Any:
            # Accept both a path list and an already-stringified path.
            # NOTE(review): ``owner`` is unused — presumably a leftover descriptor-style
            # parameter; confirm before removing.
            if isinstance(path, list):
                path = p2s(path)
            return self._props[path]

        def __setitem__(self, propname: str, value):
            # Store under the full stringified path from the root down to ``propname``.
            fullpath = p2s(self._current_path + [propname])
            self._props[fullpath] = value

        def fill_from_data(self, data: Dict[str, Any]):
            """Fill current level with all scalar elements of ``data``."""
            for name, value in data.items():
                if not isinstance(value, (dict, list)):
                    self[name] = value

    def _create_index(self):
        """Create a sheet index for the workbook.

        Index the sheets by all path arrays leading to them.  Also create a simple column
        index by column type and path.
        """
        self._sheet_index = {}
        for sheetname in self._workbook.sheetnames:
            sheet = self._workbook[sheetname]
            # The column holding the row-type markers (COL_TYPE, PATH, ...).
            type_column = [x.value for x in list(sheet.columns)[
                _get_row_type_column_index(sheet)]]
            # 0-indexed, as everything outside of sheet.cell(...):
            coltype_idx = type_column.index(RowType.COL_TYPE.name)
            path_indices = [i for i, typ in enumerate(type_column) if typ == RowType.PATH.name]

            # Get the paths, use without the leaf component for sheet indexing, with type prefix and
            # leaf for column indexing.
            for col_idx, col in enumerate(sheet.columns):
                if col[coltype_idx].value == RowType.COL_TYPE.name:
                    continue
                path = []
                for path_idx in path_indices:
                    if col[path_idx].value is not None:
                        path.append(col[path_idx].value)
                # col_key = p2s([col[coltype_idx].value] + path)
                # col_index[col_key] = SimpleNamespace(column=col, col_index=col_idx)
                # Only data columns are indexed; FOREIGN/IGNORE columns are skipped here.
                if col[coltype_idx].value not in [ColumnType.SCALAR.name, ColumnType.LIST.name]:
                    continue

                path_str = p2s(path)
                # Each path must map to exactly one column in one sheet.
                assert path_str not in self._sheet_index
                self._sheet_index[path_str] = SimpleNamespace(
                    sheetname=sheetname, sheet=sheet, col_index=col_idx,
                    col_type=col[coltype_idx].value)

    def _handle_data(self, data: dict, current_path: Optional[List[str]] = None,
                     context: Optional[TemplateFiller.Context] = None,
                     only_collect_insertables: bool = False,
                     ) -> Optional[Dict[str, Any]]:
        """Handle the data and write it into ``workbook``.

        Parameters
        ----------
        data: dict
          The data at the current path position.  Elements may be dicts, lists or simple
          scalar values.

        current_path: list[str], optional
          If this is None or empty, we are at the top level.  This means that all children
          shall be entered into their respective sheets and not into a sheet at this level.
          ``current_path`` and ``context`` must either both be given, or none of them.

        context: TemplateFiller.Context, optional
          Directory of scalar element properties at the tree nodes up to the root.  Siblings
          in lists and dicts are ignored.  ``context`` and ``current_path`` must either both
          be given, or none of them.

        only_collect_insertables: bool, optional
          If True, do not insert anything on this level, but return a dict with entries to
          be inserted.

        Returns
        -------
        out: union[dict, None]
          If ``only_collect_insertables`` is True, return a dict (path string -> value)
        """
        assert (current_path is None) is (context is None), (
            "`current_path` and `context` must either both be given, or none of them.")
        if current_path is None:
            current_path = []
        if context is None:
            context = TemplateFiller.Context()
        # Record this level's scalars so exploded sheets can later look up foreign keys.
        context.fill_from_data(data)

        insertables: Dict[str, Any] = {}
        for name, content in data.items():
            path = current_path + [name]
            next_context = context.next_level(name)
            # preprocessing
            if isinstance(content, list):
                if not content:  # empty lists insert nothing
                    continue
                # Must be all of the same type.
                assert len(set(type(entry) for entry in content)) == 1

                if isinstance(content[0], dict):
                    # An array of objects: must go into exploded sheet
                    for entry in content:
                        self._handle_data(data=entry, current_path=path, context=next_context)
                    continue
            elif isinstance(content, dict):
                if not current_path:   # Special handling for top level
                    self._handle_data(content, current_path=path, context=next_context)
                    continue
                # A nested object below top level: its scalars land in *this* row, so only
                # collect them here instead of inserting recursively.
                insert = self._handle_data(content, current_path=path,
                                           context=next_context.copy(),
                                           only_collect_insertables=True)
                assert isinstance(insert, dict)
                assert not any(key in insertables for key in insert)
                insertables.update(insert)
                continue
            else:  # scalars
                content = [content]  # wrap so the collection code below is uniform

            # collecting the data
            assert isinstance(content, list)
            if len(content) == 1:
                value = content[0]
            else:
                # NOTE(review): ``join`` assumes string entries — a list of numbers would
                # raise TypeError here; confirm upstream guarantees. TODO confirm
                value = ";".join(content)
            path_str = p2s(path)
            assert path_str not in insertables
            insertables[path_str] = value

        if only_collect_insertables:
            return insertables
        if not current_path:  # top level: children were handled recursively above
            return None

        # actual data insertion
        insert_row = None
        sheet = None
        for path_str, value in insertables.items():
            sheet_meta = self._sheet_index[path_str]
            if sheet is None:
                sheet = sheet_meta.sheet
            assert sheet is sheet_meta.sheet, "All entries must be in the same sheet."
            col_index = sheet_meta.col_index
            if insert_row is None:
                insert_row = _next_row_index(sheet)
            # 0-based indices -> openpyxl's 1-based cell coordinates.
            sheet.cell(row=insert_row+1, column=col_index+1, value=value)

        # Insert foreign keys
        if insert_row is not None and sheet is not None and _is_exploded_sheet(sheet):
            foreigns = _get_foreign_key_columns(sheet)
            for index, path in ((f.index, f.path) for f in foreigns.values()):
                value = context[path]
                sheet.cell(row=insert_row+1, column=index+1, value=value)

        return None
def fill_template(data: Union[dict, str, TextIO], template: str, result: str,
                  validation_schema: Optional[Union[dict, str, TextIO]] = None) -> None:
    """Insert json data into an xlsx file, according to a template.

    This function fills the json data into the template stored at ``template`` and stores
    the result as ``result``.

    Parameters
    ----------
    data: Union[dict, str, TextIO]
      The data, given as Python dict, path to a file or a file-like object.
    template: str
      Path to the XLSX template.
    result: str
      Path for the result XLSX.
    validation_schema: Union[dict, str, TextIO], optional
      If given, validate the data against this schema first.  This raises an exception if
      the validation fails.
    """
    data = _read_or_dict(data)
    assert isinstance(data, dict)

    # Validation
    if validation_schema is not None:
        validation_schema = _read_or_dict(validation_schema)
        validate(data, validation_schema, format_checker=FormatChecker())

    # Filling the data
    result_wb = load_workbook(template)
    template_filler = TemplateFiller(result_wb)
    template_filler.fill_data(data=data)

    # Make sure the target directory exists before saving.
    parentpath = pathlib.Path(result).parent
    parentpath.mkdir(parents=True, exist_ok=True)
    result_wb.save(result)
Loading