Skip to content
Snippets Groups Projects

XLSX-Konverter: Bessere Fehlermeldung bei inkorrektem Typ in Spalte, zusätzlicher Spalte

Merged I. Nüske requested to merge f-xlsx-converter-error-messages into dev
Files
5
@@ -25,10 +25,11 @@ from __future__ import annotations
import datetime
import itertools
import sys
import textwrap
from functools import reduce
from operator import getitem
from types import SimpleNamespace
from typing import Any, BinaryIO, Callable, TextIO, Union
from typing import Any, BinaryIO, Callable, TextIO, Union, Optional
from warnings import warn
import jsonschema
@@ -46,6 +47,105 @@ def _strict_bool(value: Any) -> bool:
raise TypeError(f"Not a good boolean: {repr(value)}")
def _column_id_to_chars(num):
"""Converts a column id (zero based) to the corresponding string
representation, e.g. 0 -> 'A', 97 -> 'CT'"""
if num < 0:
return ""
return _column_id_to_chars(int(num / 26) - 1) + chr(int(num % 26) + 65)
def _format_exception_table(exceptions: list[tuple], worksheet_title: str,
                            column_names: Optional[Union[dict, list]] = None,
                            max_line_length: int = 120) -> str:
    """
    Given a list of tuples containing a row and column number as well as an
    exception in that order, and the title of the current worksheet, returns
    a formatted table of the exceptions.

    Optionally takes a dict of column names, if given a header will be
    generated for each column and exceptions will be clustered by column.
    Default line length is 120 and can be overwritten by max_line_length.

    Params
    ------
    exceptions: list of tuples containing row, column, and exception
        Data to be formatted.  The row entry may be None / non-int for
        errors that concern a whole column rather than a single cell.
        NOTE: the list is sorted in place by column index.
    worksheet_title: str
        Name of the current worksheet
    column_names: dict or list, optional
        column_names[column_num] should return the name of
        the column with that index.
        If given, exceptions will be clustered by column.
    max_line_length: int, default=120
        Soft cap for the line length of the resulting table

    Return
    ------
    string_rep: str
        Table containing the given exceptions
    """
    max_line_length -= 40  # Estimate of Field + Type space use
    headers = {"loc": "Location", "type": "Error Type", "mess": ["Message"]}
    # Column widths start at the width of the header text.  The "mess"
    # header is wrapped in a list (messages are rendered as lists of
    # lines), so its width must be taken from the string inside the list -
    # len(headers["mess"]) would be the list length (1), not the text width.
    lengths = {"loc": len(headers["loc"]), "type": len(headers["type"]),
               "mess": len(headers["mess"][0])}
    new_data = []
    current_column = None
    # Cluster errors of the same column together (sorts the caller's list).
    exceptions.sort(key=lambda tup: tup[1])
    for row_i, col_i, excep in exceptions:
        if column_names is not None:
            # Add a line with information about the current column
            if current_column != col_i:
                current_column = col_i
                new_data.append({
                    "loc": f"\nErrors in column '{column_names[col_i]}':",
                    "type": "", "mess": [""]
                })
        # Setup for current Exception
        curr_err_data = {}
        new_data.append(curr_err_data)
        # Get field: an int row means a single cell, anything else means
        # the error concerns the whole column.
        if isinstance(row_i, int):
            curr_err_data["loc"] = f"Cell {_column_id_to_chars(col_i)}{row_i + 1}"
        else:
            curr_err_data["loc"] = f"Column {_column_id_to_chars(col_i)}"
        lengths["loc"] = max(lengths["loc"], len(curr_err_data["loc"]))
        # Add error code
        curr_err_data["type"] = type(excep).__name__
        lengths["type"] = max(lengths["type"], len(curr_err_data["type"]))
        # Format message - split into lines and wrap each to the soft cap,
        # keeping long words intact.
        lines = str(excep).split('\n')
        new_lines = []
        for line in lines:
            new_lines += textwrap.wrap(line, max_line_length, break_long_words=False)
        for line in new_lines:
            lengths["mess"] = max(lengths["mess"], len(line))
        if new_lines == []:
            new_lines = [""]
        curr_err_data["mess"] = new_lines
    # Generate underline for each header.  Previously this multiplied the
    # empty string ('' * l), which always produced an empty divider row.
    dividers = {key: '-' * l for key, l in lengths.items()}
    dividers["mess"] = [dividers["mess"]]
    # Fill with spaces for alignment
    string_rep = f"There were errors during the validation of worksheet '{worksheet_title}':\n\n"
    for curr_err_data in [headers, dividers] + new_data:
        string_rep += ' {loc: <{fill}} '.format(loc=curr_err_data["loc"],
                                                fill=lengths["loc"])
        string_rep += ' {typ: <{fill}} '.format(typ=curr_err_data["type"],
                                                fill=lengths["type"])
        # Fill for the messages is set to 0, if we want another column or align
        # right we need to use lengths["mess"]
        string_rep += ' {mes: <{fill}}\n'.format(mes=curr_err_data["mess"][0], fill=0)
        for line in curr_err_data["mess"][1:]:
            # Front padding for lines without location and error type
            string_rep += ' ' * (lengths["loc"] + lengths["type"] + 6)
            string_rep += ' {mes: <{fill}}\n'.format(mes=line, fill=0)
    return string_rep
class ForeignError(KeyError):
def __init__(self, *args, definitions: list, message: str = ""):
super().__init__(message, *args)
@@ -55,10 +155,9 @@ class ForeignError(KeyError):
class XLSXConverter:
"""Class for conversion from XLSX to JSON.
For a detailed description of the required formatting of the XLSX files, see ``specs.md`` in the
documentation.
For a detailed description of the required formatting of the XLSX files, see ``specs.md`` in the
documentation.
"""
PARSER: dict[str, Callable] = {
"string": str,
"number": float,
@@ -69,21 +168,26 @@ documentation.
def __init__(self, xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
strict: bool = False):
"""
Parameters
----------
xlsx: Union[str, BinaryIO]
Path to the XLSX file or opened file object.
Parameters
----------
xlsx: Union[str, BinaryIO]
Path to the XLSX file or opened file object.
schema: Union[dict, str, TextIO]
Schema for validation of XLSX content.
schema: Union[dict, str, TextIO]
Schema for validation of XLSX content.
strict: bool, optional
If True, fail faster.
"""
strict: bool, optional
If True, fail faster.
"""
self._workbook = load_workbook(xlsx)
self._schema = read_or_dict(schema)
self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
self._check_columns(fail_fast=strict)
try:
self._check_columns(fail_fast=strict)
except KeyError as e:
raise jsonschema.ValidationError(f"Malformed metadata: Cannot parse paths. "
f"Unknown path: '{e.args[1]}' in sheet '{e.args[0]}'."
) from e
self._handled_sheets: set[str] = set()
self._result: dict = {}
self._errors: dict = {}
@@ -91,27 +195,47 @@ strict: bool, optional
def to_dict(self, validate: bool = False, collect_errors: bool = True) -> dict:
"""Convert the xlsx contents to a dict.
Parameters
----------
validate: bool, optional
If True, validate the result against the schema.
Parameters
----------
validate: bool, optional
If True, validate the result against the schema.
collect_errors: bool, optional
If True, do not fail at the first error, but try to collect as many errors as possible. After an
Exception is raised, the errors can be collected with ``get_errors()`` and printed with
``get_error_str()``.
collect_errors: bool, optional
If True, do not fail at the first error, but try to collect as many errors as possible. After an
Exception is raised, the errors can be collected with ``get_errors()`` and printed with
``get_error_str()``.
Returns
-------
out: dict
A dict representing the JSON with the extracted data.
Returns
-------
out: dict
A dict representing the JSON with the extracted data.
"""
self._handled_sheets = set()
self._result = {}
self._errors = {}
for sheetname in self._workbook.sheetnames:
if sheetname not in self._handled_sheets:
self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
if not collect_errors:
for sheetname in self._workbook.sheetnames:
if sheetname not in self._handled_sheets:
self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
else:
# Collect errors from converting
exceptions = []
for sheetname in self._workbook.sheetnames:
if sheetname not in self._handled_sheets:
try:
self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
except jsonschema.ValidationError as e:
exceptions.append(e)
# do not collect errors from sheet again
self._handled_sheets.add(sheetname)
if len(exceptions) == 1:
raise exceptions[0]
elif len(exceptions) > 1:
mess = "There were errors during the validation of several worksheets:\n\n"
mess += '\n\n'.join([str(e).replace("There were errors during the validation of worksheet",
"In worksheet")
for e in exceptions])
raise jsonschema.ValidationError(mess)
if validate:
jsonschema.validate(self._result, self._schema)
if self._errors:
@@ -147,8 +271,11 @@ out: dict
parents[xlsx_utils.p2s(col.path[:-1])] = col.path[:-1]
col_paths.append(col.path)
for path in parents.values():
subschema = xlsx_utils.get_subschema(path, self._schema)
try:
subschema = xlsx_utils.get_subschema(path, self._schema)
except KeyError as kerr:
kerr.args = (sheetname, *kerr.args)
raise
# Unfortunately, there are a lot of special cases to handle here.
if subschema.get("type") == "array":
subschema = subschema["items"]
@@ -177,24 +304,29 @@ out: dict
def _handle_sheet(self, sheet: Worksheet, fail_later: bool = False) -> None:
"""Add the contents of the sheet to the result (stored in ``self._result``).
Each row in the sheet corresponds to one entry in an array in the result. Which array exactly is
defined by the sheet's "proper name" and the content of the foreign columns.
Each row in the sheet corresponds to one entry in an array in the result. Which array exactly is
defined by the sheet's "proper name" and the content of the foreign columns.
Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".
Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".
Parameters
----------
fail_later: bool, optional
If True, do not fail with unresolvable foreign definitions, but collect all errors.
"""
Parameters
----------
fail_later: bool, optional
If True, do not fail with unresolvable foreign definitions, but collect all errors.
"""
row_type_column = xlsx_utils.get_row_type_column_index(sheet)
col_type_row = xlsx_utils.get_column_type_row_index(sheet)
foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
foreign_column_paths = {col.index: col.path for col in foreign_columns.values()}
data_columns = xlsx_utils.get_data_columns(sheet)
data_column_paths = {col.index: col.path for col in data_columns.values()}
# Parent path, insert in correct order.
parent, proper_name = xlsx_utils.get_path_position(sheet)
try:
parent, proper_name = xlsx_utils.get_path_position(sheet)
except UnboundLocalError as e:
raise jsonschema.ValidationError(f"Malformed metadata: Cannot parse "
f"paths in worksheet '{sheet.title}'.") from e
if parent:
parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
if parent_sheetname not in self._handled_sheets:
@@ -206,8 +338,11 @@ fail_later: bool, optional
# # - data: The actual data of this entry, a dict.
# entries: dict[str, list[SimpleNamespace]] = {}
exceptions = []
warns = []
col_names = {}
for row_idx, row in enumerate(sheet.iter_rows(values_only=True)):
# Skip non-data rows.
# Skip non-data rows
if row[row_type_column] is not None:
continue
foreign_repr = ""
@@ -220,24 +355,41 @@ fail_later: bool, optional
foreign.append(foreign_column_paths[col_idx] + [value])
continue
if col_idx in data_column_paths:
path = data_column_paths[col_idx]
if self._is_multiple_choice(path):
real_value = path.pop() # Last component is the enum value, insert above
# set up list
try:
_set_in_nested(mydict=data, path=path, value=[], prefix=parent, skip=1)
except ValueError as err:
if not str(err).startswith("There is already some value at"):
raise
if not xlsx_utils.parse_multiple_choice(value):
continue
_set_in_nested(mydict=data, path=path, value=real_value, prefix=parent,
skip=1, append_to_list=True)
try:
if col_idx in data_column_paths:
path = data_column_paths[col_idx]
col_names[col_idx] = '.'.join(path)
if self._is_multiple_choice(path):
real_value = path.pop() # Last component is the enum value, insert above
# set up list
try:
_set_in_nested(mydict=data, path=path, value=[], prefix=parent, skip=1)
except ValueError as err:
if not str(err).startswith("There is already some value at"):
raise
if not xlsx_utils.parse_multiple_choice(value):
continue
_set_in_nested(mydict=data, path=path, value=real_value, prefix=parent,
skip=1, append_to_list=True)
else:
value = self._validate_and_convert(value, path)
_set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
continue
elif sheet.cell(col_type_row+1, col_idx+1).value is None:
mess = (f"\nNo metadata configured for column "
f"'{_column_id_to_chars(col_idx)}' in worksheet "
f"'{sheet.title}'.\n")
if mess not in warns:
print(mess, file=sys.stderr)
warns.append(mess) # Prevent multiple instances of same warning
except (ValueError, KeyError, jsonschema.ValidationError) as e:
# Append error for entire column only once
if isinstance(e, KeyError) and 'column' in str(e):
if len([err for ri, ci, err in exceptions
if ci == col_idx and isinstance(err, KeyError)]) == 0:
exceptions.append((None, col_idx, e))
else:
value = self._validate_and_convert(value, path)
_set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
continue
exceptions.append((row_idx, col_idx, e))
try:
# Find current position in tree
@@ -251,6 +403,12 @@ fail_later: bool, optional
if not fail_later:
raise
self._errors[(sheet.title, row_idx)] = kerr.definitions
if exceptions:
exception_table = _format_exception_table(exceptions, sheet.title,
col_names)
raise jsonschema.ValidationError(exception_table)
self._handled_sheets.add(sheet.title)
def _is_multiple_choice(self, path: list[str]) -> bool:
@@ -267,9 +425,9 @@ fail_later: bool, optional
def _get_parent_dict(self, parent_path: list[str], foreign: list[list]) -> dict:
"""Return the dict into which values can be inserted.
This method returns, from the current result-in-making, the entry at ``parent_path`` which matches
the values given in the ``foreign`` specification.
"""
This method returns, from the current result-in-making, the entry at ``parent_path`` which matches
the values given in the ``foreign`` specification.
"""
foreign_groups = _group_foreign_paths(foreign, common=parent_path)
current_object = self._result
@@ -296,33 +454,31 @@ the values given in the ``foreign`` specification.
def _validate_and_convert(self, value: Any, path: list[str]):
"""Apply some basic validation and conversion steps.
This includes:
- Validation against the type given in the schema
- List typed values are split at semicolons and validated individually
This includes:
- Validation against the type given in the schema
- List typed values are split at semicolons and validated individually
"""
if value is None:
return value
subschema = self._get_subschema(path)
try:
subschema = self._get_subschema(path)
except KeyError as e:
raise KeyError("There is no entry in the schema that corresponds to this column.") from e
# Array handling only if schema says it's an array.
if subschema.get("type") == "array":
array_type = subschema["items"]["type"]
if isinstance(value, str) and ";" in value:
values = [self.PARSER[array_type](v) for v in value.split(";")]
return values
try:
# special case: datetime or date
if ("anyOf" in subschema):
if isinstance(value, datetime.datetime) and (
{'type': 'string', 'format': 'date-time'} in subschema["anyOf"]):
return value
if isinstance(value, datetime.date) and (
{'type': 'string', 'format': 'date'} in subschema["anyOf"]):
return value
jsonschema.validate(value, subschema)
except jsonschema.ValidationError as verr:
print(verr)
print(path)
raise
# special case: datetime or date
if ("anyOf" in subschema):
if isinstance(value, datetime.datetime) and (
{'type': 'string', 'format': 'date-time'} in subschema["anyOf"]):
return value
if isinstance(value, datetime.date) and (
{'type': 'string', 'format': 'date'} in subschema["anyOf"]):
return value
jsonschema.validate(value, subschema)
# Finally: convert to target type
return self.PARSER[subschema.get("type", "string")](value)
@@ -340,29 +496,29 @@ This includes:
def _group_foreign_paths(foreign: list[list], common: list[str]) -> list[SimpleNamespace]:
"""Group the foreign keys by their base paths.
Parameters
----------
foreign: list[list]
A list of foreign definitions, consisting of path components, property and possibly value.
common: list[list[str]]
A common path which defines the final target of the foreign definitions. This helps to understand
where the ``foreign`` paths shall be split.
Returns
-------
out: list[dict[str, list[list]]]
A list of foreign path segments, grouped by their common segments. Each element is a namespace
with detailed information of all those elements which form the group. The namespace has the
following attributes:
- ``path``: The full path to this path segment. This is always the previous segment's ``path``
plus this segment's ``subpath``.
- ``stringpath``: The stringified ``path``, might be useful for comparison or sorting.
- ``subpath``: The path, relative from the previous segment.
- ``definitions``: A list of the foreign definitions for this segment, but stripped of the
``path`` components.
Parameters
----------
foreign: list[list]
A list of foreign definitions, consisting of path components, property and possibly value.
common: list[list[str]]
A common path which defines the final target of the foreign definitions. This helps to understand
where the ``foreign`` paths shall be split.
Returns
-------
out: list[dict[str, list[list]]]
A list of foreign path segments, grouped by their common segments. Each element is a namespace
with detailed information of all those elements which form the group. The namespace has the
following attributes:
- ``path``: The full path to this path segment. This is always the previous segment's ``path``
plus this segment's ``subpath``.
- ``stringpath``: The stringified ``path``, might be useful for comparison or sorting.
- ``subpath``: The path, relative from the previous segment.
- ``definitions``: A list of the foreign definitions for this segment, but stripped of the
``path`` components.
"""
# Build a simple dict first, without subpath.
results = {}
@@ -392,9 +548,6 @@ out: list[dict[str, list[list]]]
last_level = len(elem.path)
resultlist.append(elem)
# from IPython import embed
# embed()
if last_level != len(common):
raise ValueError("Foreign keys must cover the complete `common` depth.")
return resultlist
@@ -405,31 +558,31 @@ def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], skip
overwrite: bool = False, append_to_list: bool = False) -> dict:
"""Set a value in a nested dict.
Parameters
----------
mydict: dict
The dict into which the ``value`` shall be inserted.
path: list
A list of keys, denoting the location of the value.
value
The value which shall be set inside the dict.
prefix: list
A list of keys which shall be removed from ``path``. A KeyError is raised if ``path`` does not
start with the elements of ``prefix``.
skip: int = 0
Remove this many additional levels from the path, *after* removing the prefix.
overwrite: bool = False
If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values
leads to an exception.
append_to_list: bool = False
If True, assume that the element at ``path`` is a list and append the value to it. If the list
does not exist, create it. If there is a non-list at ``path`` already, overwrite it with a new
list, if ``overwrite`` is True, otherwise raise a ValueError.
Returns
-------
mydict: dict
The same dictionary that was given as a parameter, but modified.
Parameters
----------
mydict: dict
The dict into which the ``value`` shall be inserted.
path: list
A list of keys, denoting the location of the value.
value
The value which shall be set inside the dict.
prefix: list
A list of keys which shall be removed from ``path``. A KeyError is raised if ``path`` does not
start with the elements of ``prefix``.
skip: int = 0
Remove this many additional levels from the path, *after* removing the prefix.
overwrite: bool = False
If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values
leads to an exception.
append_to_list: bool = False
If True, assume that the element at ``path`` is a list and append the value to it. If the list
does not exist, create it. If there is a non-list at ``path`` already, overwrite it with a new
list, if ``overwrite`` is True, otherwise raise a ValueError.
Returns
-------
mydict: dict
The same dictionary that was given as a parameter, but modified.
"""
for idx, el in enumerate(prefix):
if path[idx] != el:
@@ -473,25 +626,25 @@ def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
validate: bool = None, strict: bool = False) -> dict:
"""Convert the xlsx contents to a dict, it must follow a schema.
Parameters
----------
xlsx: Union[str, BinaryIO]
Path to the XLSX file or opened file object.
Parameters
----------
xlsx: Union[str, BinaryIO]
Path to the XLSX file or opened file object.
schema: Union[dict, str, TextIO]
Schema for validation of XLSX content.
schema: Union[dict, str, TextIO]
Schema for validation of XLSX content.
validate: bool, optional
If True, validate the result against the schema.
validate: bool, optional
If True, validate the result against the schema.
strict: bool, optional
If True, fail faster.
strict: bool, optional
If True, fail faster.
Returns
-------
out: dict
A dict representing the JSON with the extracted data.
Returns
-------
out: dict
A dict representing the JSON with the extracted data.
"""
converter = XLSXConverter(xlsx, schema, strict=strict)
return converter.to_dict()
Loading