Skip to content
Snippets Groups Projects

Filling XLSX: Everything except multiple choice.

Merged Daniel Hornung requested to merge f-json-table into dev
All threads resolved!
Compare and Show latest version
16 files
+ 356
− 77
Compare changes
  • Side-by-side
  • Inline
Files
16
@@ -23,12 +23,16 @@
from __future__ import annotations
import json
import pathlib
from collections import OrderedDict
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Union, TextIO
from typing import Any, Dict, List, Optional, TextIO, Union
from warnings import warn
from jsonschema import FormatChecker, validate
from openpyxl import load_workbook, Workbook
from jsonschema.exceptions import ValidationError
from openpyxl import Workbook, load_workbook
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
from openpyxl.worksheet.worksheet import Worksheet
from .table_generator import ColumnType, RowType
@@ -52,11 +56,10 @@ def _get_column_types(sheet: Worksheet) -> OrderedDict:
type_row_index = _get_row_type_column_index(sheet)
for idx, col in enumerate(sheet.columns):
type_cell = col[type_row_index]
result[idx] = type_cell.value
assert (hasattr(ColumnType, type_cell.value)
or type_cell.value == RowType.COL_TYPE.name
or type_cell.value is None), (
f"Unexpected column type value: {type_cell.value}")
result[idx] = type_cell.value if type_cell.value is not None else ColumnType.IGNORE.name
assert (hasattr(ColumnType, result[idx])
or result[idx] == RowType.COL_TYPE.name), (
f"Unexpected column type value ({idx}{type_row_index}): {type_cell.value}")
return result
@@ -133,8 +136,9 @@ def _read_or_dict(data: Union[dict, str, TextIO]) -> dict:
class TemplateFiller:
"""Class to fill XLSX templates. Has an index for all relevant columns."""
def __init__(self, workbook: Workbook):
def __init__(self, workbook: Workbook, graceful: bool = False):
self._workbook = workbook
self._graceful = graceful
self._create_index()
@property
@@ -182,10 +186,19 @@ class TemplateFiller:
self._props[fullpath] = value
def fill_from_data(self, data: Dict[str, Any]):
# TODO recursive for dicts and list?
"""Fill current level with all scalar elements of ``data``."""
for name, value in data.items():
if not isinstance(value, (dict, list)):
self[name] = value
elif isinstance(value, dict):
if not value or isinstance(list(value.items())[0], list):
continue
old_path = self._current_path
new_path = self._current_path.copy() + [name]
self._current_path = new_path
self.fill_from_data(data=value)
self._current_path = old_path
def _create_index(self):
"""Create a sheet index for the workbook.
@@ -263,20 +276,24 @@ out: union[dict, None]
insertables: Dict[str, Any] = {}
for name, content in data.items():
# TODO is this the best way to do it????
if name == "file":
continue
path = current_path + [name]
next_context = context.next_level(name)
# preprocessing
if isinstance(content, list):
if not content:
if not content: # empty list
continue
# Must be all of the same type.
# List elements must be all of the same type.
assert len(set(type(entry) for entry in content)) == 1
if isinstance(content[0], dict):
if isinstance(content[0], dict): # all elements are dicts
# An array of objects: must go into exploded sheet
for entry in content:
self._handle_data(data=entry, current_path=path, context=next_context)
continue
elif isinstance(content, dict):
elif isinstance(content, dict): # we recurse and simply use the result
if not current_path: # Special handling for top level
self._handle_data(content, current_path=path, context=next_context)
continue
@@ -287,26 +304,32 @@ out: union[dict, None]
insertables.update(insert)
continue
else: # scalars
content = [content]
content = [content] # make list for unified treatment below
# collecting the data
assert isinstance(content, list)
if len(content) == 1:
value = content[0]
if len(content) > 1:
content = [ILLEGAL_CHARACTERS_RE.sub("", str(x)) for x in content]
value = ";".join(content) # TODO we need escaping of values
else:
value = ";".join(content)
value = content[0]
if isinstance(value, str):
value = ILLEGAL_CHARACTERS_RE.sub("", value)
path_str = p2s(path)
assert path_str not in insertables
insertables[path_str] = value
if only_collect_insertables:
return insertables
if not current_path:
if not current_path: # Top level returns, because there are only sheets for the children.
return None
# actual data insertion
insert_row = None
sheet = None
for path_str, value in insertables.items():
if self._graceful and path_str not in self._sheet_index:
warn(f"Ignoring path with missing sheet index: {path_str}")
continue
sheet_meta = self._sheet_index[path_str]
if sheet is None:
sheet = sheet_meta.sheet
@@ -319,7 +342,11 @@ out: union[dict, None]
# Insert foreign keys
if insert_row is not None and sheet is not None and _is_exploded_sheet(sheet):
foreigns = _get_foreign_key_columns(sheet)
try:
foreigns = _get_foreign_key_columns(sheet)
except ValueError:
print(f"Sheet: {sheet}")
raise
for index, path in ((f.index, f.path) for f in foreigns.values()):
value = context[path]
sheet.cell(row=insert_row+1, column=index+1, value=value)
@@ -344,7 +371,8 @@ result: str
Path for the result XLSX.
validation_schema: dict, optional
If given, validate the data against this schema first. This raises an exception if the validation
fails.
fails. If no validation schema is given, try to ignore more errors in the data when filling the
XLSX template.
"""
data = _read_or_dict(data)
assert isinstance(data, dict)
@@ -352,10 +380,19 @@ validation_schema: dict, optional
# Validation
if validation_schema is not None:
validation_schema = _read_or_dict(validation_schema)
validate(data, validation_schema, format_checker=FormatChecker())
try:
validate(data, validation_schema, format_checker=FormatChecker())
except ValidationError as ve:
print(ve.message)
raise ve
else:
print("No validation schema given, continue at your own risk.")
# Filling the data
result_wb = load_workbook(template)
template_filler = TemplateFiller(result_wb)
template_filler = TemplateFiller(result_wb, graceful=(validation_schema is None))
template_filler.fill_data(data=data)
parentpath = pathlib.Path(result).parent
parentpath.mkdir(parents=True, exist_ok=True)
result_wb.save(result)
Loading