xlsx -> json conversion

Merged Daniel Hornung requested to merge f-convert-xlsx-to-json into dev
10 files changed: +293 −42
@@ -22,12 +22,16 @@
from __future__ import annotations
import datetime
import itertools
import sys
from functools import reduce
from operator import getitem
from types import SimpleNamespace
from typing import Any, BinaryIO, Callable, TextIO, Union
from warnings import warn
from jsonschema import validate, ValidationError
import jsonschema
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
@@ -42,6 +46,12 @@ def _strict_bool(value: Any) -> bool:
raise TypeError(f"Not a good boolean: {repr(value)}")
class ForeignError(KeyError):
"""KeyError which additionally stores the list of foreign definitions that could not be resolved."""
def __init__(self, *args, definitions: list, message: str = ""):
super().__init__(message, *args)
self.definitions = definitions
class XLSXConverter:
"""Class for conversion from XLSX to JSON.
@@ -56,7 +66,8 @@ documentation.
"boolean": _strict_bool,
}
def __init__(self, xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO]):
def __init__(self, xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
strict: bool = False):
"""
Parameters
----------
@@ -65,16 +76,31 @@ xlsx: Union[str, BinaryIO]
schema: Union[dict, str, TextIO]
Schema for validation of XLSX content.
strict: bool, optional
If True, fail faster: missing columns raise a ValueError immediately instead of only emitting a warning.
"""
self._workbook = load_workbook(xlsx)
self._schema = read_or_dict(schema)
self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
self._check_columns(fail_fast=strict)
self._handled_sheets: set[str] = set()
self._result: dict = {}
self._errors: dict = {}
def to_dict(self) -> dict:
def to_dict(self, validate: bool = False, collect_errors: bool = True) -> dict:
"""Convert the xlsx contents to a dict.
Parameters
----------
validate: bool, optional
If True, validate the result against the schema.
collect_errors: bool, optional
If True, do not fail at the first error, but collect as many errors as possible. After an
exception has been raised, the collected errors can be retrieved with ``get_errors()`` and
pretty-printed with ``get_error_str()``.
Returns
-------
out: dict
@@ -82,12 +108,73 @@ out: dict
"""
self._handled_sheets = set()
self._result = {}
self._errors = {}
for sheetname in self._workbook.sheetnames:
if sheetname not in self._handled_sheets:
self._handle_sheet(self._workbook[sheetname])
self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
if validate:
jsonschema.validate(self._result, self._schema)
if self._errors:
raise RuntimeError("There were errors while handling the XLSX file.")
return self._result
def _handle_sheet(self, sheet: Worksheet) -> None:
def get_errors(self) -> dict:
"""Return a dict with collected errors."""
return self._errors
def get_error_str(self) -> str:
"""Return a beautiful string with the collected errors."""
result = ""
for loc, value in self._errors.items():
result += f"Sheet: {loc[0]}\tRow: {loc[1] + 1}\n"
for item in value:
result += f"\t\t{item[:-1]}:\t{item[-1]}\n"
return result
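
For illustration, a minimal sketch of the error-collection workflow introduced above, using the class defined in this file (the file names ``example.xlsx`` and ``schema.json`` are hypothetical):

import sys

converter = XLSXConverter("example.xlsx", "schema.json")
try:
    result = converter.to_dict(collect_errors=True)
except RuntimeError:
    # One block per failing (sheet, row), listing the foreign definitions involved.
    print(converter.get_error_str(), file=sys.stderr)
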
def _check_columns(self, fail_fast: bool = False):
"""Check if the columns correspond to the schema."""
def missing(path):
message = f"Missing column: {xlsx_utils.p2s(path)}"
if fail_fast:
raise ValueError(message)
else:
warn(message)
for sheetname in self._workbook.sheetnames:
sheet = self._workbook[sheetname]
parents: dict = {}
col_paths = []
for col in xlsx_utils.get_data_columns(sheet).values():
parents[xlsx_utils.p2s(col.path[:-1])] = col.path[:-1]
col_paths.append(col.path)
for path in parents.values():
subschema = xlsx_utils.get_subschema(path, self._schema)
# Unfortunately, there are a lot of special cases to handle here.
if subschema.get("type") == "array":
subschema = subschema["items"]
if "enum" in subschema: # Was handled in parent level already
continue
for child, content in subschema["properties"].items():
child_path = path + [child]
if content == {'type': 'string', 'format': 'data-url'}:
continue # skip files
if content.get("type") == "array" and (
content.get("items").get("type") == "object"):
if child_path not in itertools.chain(*self._defining_path_index.values()):
missing(child_path)
elif content.get("type") == "array" and "enum" in content.get("items", []) and (
content.get("uniqueItems") is True):
# multiple choice
for choice in content["items"]["enum"]:
if child_path + [choice] not in col_paths:
missing(child_path + [choice])
elif content.get("type") == "object":
pass
else:
if child_path not in col_paths:
missing(child_path)
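
To illustrate the multiple-choice branch above, a hypothetical subschema (the property names are made up, not taken from this MR) and the columns which ``_check_columns`` expects for it:

content = {
    "type": "array",
    "uniqueItems": True,
    "items": {"enum": ["red", "green", "blue"]},
}
# For child_path == ["Training", "colors"], one column per choice must exist,
# i.e. all of these paths must appear in col_paths:
#   ["Training", "colors", "red"]
#   ["Training", "colors", "green"]
#   ["Training", "colors", "blue"]
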
def _handle_sheet(self, sheet: Worksheet, fail_later: bool = False) -> None:
"""Add the contents of the sheet to the result (stored in ``self._result``).
Each row in the sheet corresponds to one entry in an array in the result. Which array exactly is
@@ -95,6 +182,11 @@ defined by the sheet's "proper name" and the content of the foreign columns.
Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".
Parameters
----------
fail_later: bool, optional
If True, do not fail on unresolvable foreign definitions, but collect all errors.
"""
row_type_column = xlsx_utils.get_row_type_column_index(sheet)
foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
@@ -106,7 +198,7 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
if parent:
parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
if parent_sheetname not in self._handled_sheets:
self._handle_sheet(self._workbook[parent_sheetname])
self._handle_sheet(self._workbook[parent_sheetname], fail_later=fail_later)
# # We save single entries in lists, indexed by their foreign key contents. Each entry
# # consists of:
@@ -114,7 +206,7 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
# # - data: The actual data of this entry, a dict.
# entries: dict[str, list[SimpleNamespace]] = {}
for row in sheet.iter_rows(values_only=True):
for row_idx, row in enumerate(sheet.iter_rows(values_only=True)):
# Skip non-data rows.
if row[row_type_column] is not None:
continue
@@ -147,13 +239,18 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
_set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
continue
# Find current position in tree
parent_dict = self._get_parent_dict(parent_path=parent, foreign=foreign)
# Append data to current position's list
if proper_name not in parent_dict:
parent_dict[proper_name] = []
parent_dict[proper_name].append(data)
try:
# Find current position in tree
parent_dict = self._get_parent_dict(parent_path=parent, foreign=foreign)
# Append data to current position's list
if proper_name not in parent_dict:
parent_dict[proper_name] = []
parent_dict[proper_name].append(data)
except ForeignError as kerr:
if not fail_later:
raise
self._errors[(sheet.title, row_idx)] = kerr.definitions
self._handled_sheets.add(sheet.title)
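
As a sketch of the structure that ``_handle_sheet`` builds (all names and values hypothetical): a row on a sheet with proper name ``measurements``, whose foreign columns match the existing entry ``{"name": "A"}``, ends up appended so that ``self._result`` equals:

{
    "experiments": [
        {
            "name": "A",
            "measurements": [      # list created on demand above
                {"value": 1.2},    # the ``data`` dict built from the row's cells
            ],
        },
    ],
}
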
def _is_multiple_choice(self, path: list[str]) -> bool:
@@ -187,7 +284,12 @@ the values given in the ``foreign`` specification.
current_object = cand
break
else:
raise KeyError("Cannot find an element which matches the foreign definitions")
message = f"Cannot find an element at {parent_path} for these foreign defs:\n"
for name, value in group.definitions:
message += f" {name}: {value}\n"
print(message, file=sys.stderr)
error = ForeignError(definitions=group.definitions, message=message)
raise error
assert isinstance(current_object, dict)
return current_object
@@ -208,8 +310,16 @@ This includes:
values = [self.PARSER[array_type](v) for v in value.split(";")]
return values
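# For illustration (hypothetical cell content): with array_type == "integer",
# a cell holding "1;2;3" is split on semicolons and parsed into [1, 2, 3]:
#     assert [int(v) for v in "1;2;3".split(";")] == [1, 2, 3]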
try:
validate(value, subschema)
except ValidationError as verr:
# special case: datetime or date
if ("anyOf" in subschema):
if isinstance(value, datetime.datetime) and (
{'type': 'string', 'format': 'date-time'} in subschema["anyOf"]):
return value
if isinstance(value, datetime.date) and (
{'type': 'string', 'format': 'date'} in subschema["anyOf"]):
return value
jsonschema.validate(value, subschema)
except jsonschema.ValidationError as verr:
print(verr)
print(path)
raise
@@ -217,23 +327,14 @@ This includes:
# Finally: convert to target type
return self.PARSER[subschema.get("type", "string")](value)
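
Why the datetime special case above is needed, as a minimal standalone sketch: openpyxl yields ``datetime`` objects for date cells, but ``jsonschema`` only accepts actual strings for ``{"type": "string", ...}`` schemas, so validating the raw cell value would fail:

import datetime
import jsonschema

subschema = {"anyOf": [{"type": "string", "format": "date-time"}]}
value = datetime.datetime(2024, 1, 1, 12, 0)
# jsonschema.validate(value, subschema) would raise a ValidationError, because
# a datetime instance is not a string and no other "anyOf" branch matches.
jsonschema.validate(value.isoformat(), subschema)  # the string form passes
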
def _get_subschema(self, path: list[str], schema: Union[dict, list] = None) -> dict:
def _get_subschema(self, path: list[str], schema: dict = None) -> dict:
"""Return the sub schema at ``path``."""
if schema is None:
schema = self._schema
assert schema is not None
assert isinstance(schema, dict)
if path:
if schema["type"] == "object":
next_schema = schema["properties"][path[0]]
return self._get_subschema(path=path[1:], schema=next_schema)
if schema["type"] == "array":
items = schema["items"]
if "enum" in items:
return schema
next_schema = items["properties"][path[0]]
return self._get_subschema(path=path[1:], schema=next_schema)
return schema
return xlsx_utils.get_subschema(path, schema)
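
The delegated ``xlsx_utils.get_subschema`` performs the same descent as the inline code it replaces; as a standalone sketch (an illustration, not the actual utility implementation):

def get_subschema_sketch(path: list, schema: dict) -> dict:
    # Descend through "properties" for objects and through "items" for arrays;
    # enum arrays are returned whole, since they are handled at the parent level.
    if not path:
        return schema
    if schema["type"] == "object":
        return get_subschema_sketch(path[1:], schema["properties"][path[0]])
    if schema["type"] == "array":
        items = schema["items"]
        if "enum" in items:
            return schema
        return get_subschema_sketch(path[1:], items["properties"][path[0]])
    return schema
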
def _group_foreign_paths(foreign: list[list], common: list[str]) -> list[SimpleNamespace]:
@@ -368,7 +469,8 @@ mydict: dict
return mydict
def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO]) -> dict:
def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
validate: bool = False, strict: bool = False) -> dict:
"""Convert the xlsx contents to a dict, it must follow a schema.
Parameters
@@ -379,10 +481,17 @@ xlsx: Union[str, BinaryIO]
schema: Union[dict, str, TextIO]
Schema for validation of XLSX content.
validate: bool, optional
If True, validate the result against the schema.
strict: bool, optional
If True, fail faster: missing columns raise a ValueError immediately instead of only emitting a warning.
Returns
-------
out: dict
A dict representing the JSON with the extracted data.
"""
converter = XLSXConverter(xlsx, schema)
converter = XLSXConverter(xlsx, schema, strict=strict)
return converter.to_dict(validate=validate)
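
Finally, a short end-to-end sketch of the module-level entry point (the file names are hypothetical; ``default=str`` serializes the ``datetime`` values that date cells produce):

import json

result = to_dict("data.xlsx", "schema.json", validate=True, strict=True)
print(json.dumps(result, indent=2, default=str))
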