Verified Commit 3d9adf32 authored by Daniel Hornung

ENH: checking of foreign keys and missing columns.

parent 57cd92c4
2 merge requests: !107 Release v0.11.0, !103 xlsx -> json conversion
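This commit teaches the XLSX-to-JSON converter to warn about columns that the schema requires but the workbook lacks, and to collect unresolvable foreign-key references instead of aborting at the first faulty row. A minimal usage sketch of the new API in the convert module (the import path and file names are assumptions, not part of this commit):

# Import path assumed; adjust to where convert.py lives in this repository.
from caosadvancedtools.table_json_conversion import convert

# Strict mode: a missing column raises a ValueError immediately.
converter = convert.XLSXConverter("data.xlsx", "schema.json", strict=True)

# Default mode: missing columns only emit warnings, and foreign-key problems
# are collected instead of aborting the conversion at the first faulty row.
converter = convert.XLSXConverter("data.xlsx", "schema.json")
try:
    result = converter.to_dict(validate=True, collect_errors=True)
except RuntimeError:
    # Maps (sheet title, row index) to the foreign-key definitions that
    # could not be resolved, e.g. [['date', ...], ['url', ...]].
    print(converter.get_errors())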
@@ -22,12 +22,16 @@
from __future__ import annotations
import datetime
import itertools
import sys
from functools import reduce
from operator import getitem
from types import SimpleNamespace
from typing import Any, BinaryIO, Callable, TextIO, Union
from warnings import warn
import jsonschema
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
@@ -42,6 +46,12 @@ def _strict_bool(value: Any) -> bool:
    raise TypeError(f"Not a good boolean: {repr(value)}")
class ForeignError(KeyError):
    def __init__(self, *args, definitions: list, message: str = ""):
        super().__init__(message, *args)
        self.definitions = definitions
class XLSXConverter:
    """Class for conversion from XLSX to JSON.
@@ -56,7 +66,8 @@ documentation.
        "boolean": _strict_bool,
    }

    def __init__(self, xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
                 strict: bool = False):
        """
        Parameters
        ----------
@@ -65,16 +76,29 @@ xlsx: Union[str, BinaryIO]
        schema: Union[dict, str, TextIO]
            Schema for validation of XLSX content.
        strict: bool, optional
            If True, fail fast: raise on problems (e.g. missing columns) instead of only warning.
        """
        self._workbook = load_workbook(xlsx)
        self._schema = read_or_dict(schema)
        self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
        self._check_columns(fail_fast=strict)
        self._handled_sheets: set[str] = set()
        self._result: dict = {}
        self._errors: dict = {}
    def to_dict(self, validate: bool = False, collect_errors: bool = True) -> dict:
        """Convert the xlsx contents to a dict.

        Parameters
        ----------
        validate: bool, optional
            If True, validate the result against the schema.
        collect_errors: bool, optional
            If True, do not fail at the first error, but try to collect as many errors as possible.

        Returns
        -------
        out: dict
@@ -82,12 +106,62 @@ out: dict
        """
        self._handled_sheets = set()
        self._result = {}
        self._errors = {}
        for sheetname in self._workbook.sheetnames:
            if sheetname not in self._handled_sheets:
                self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
        if validate:
            jsonschema.validate(self._result, self._schema)
        if self._errors:
            raise RuntimeError("There were errors while handling the XLSX file.")
        return self._result
    def get_errors(self) -> dict:
        """Return a dict with collected errors."""
        return self._errors
    def _check_columns(self, fail_fast: bool = False):
        """Check if the columns correspond to the schema."""
        def missing(path):
            message = f"Missing column: {xlsx_utils.p2s(path)}"
            if fail_fast:
                raise ValueError(message)
            else:
                warn(message)
        for sheetname in self._workbook.sheetnames:
            sheet = self._workbook[sheetname]
            parents: dict = {}
            col_paths = []
            for col in xlsx_utils.get_data_columns(sheet).values():
                parents[xlsx_utils.p2s(col.path[:-1])] = col.path[:-1]
                col_paths.append(col.path)
            for path in parents.values():
                subschema = xlsx_utils.get_subschema(path, self._schema)
                if subschema.get("type") == "array":
                    subschema = subschema["items"]
                if "enum" in subschema:  # Was handled in parent level already
                    continue
                for child, content in subschema["properties"].items():
                    child_path = path + [child]
                    if content == {'type': 'string', 'format': 'data-url'}:
                        continue  # skip files
                    if content.get("type") == "array" and (
                            content.get("items").get("type") == "object"):
                        if child_path not in itertools.chain(*self._defining_path_index.values()):
                            missing(child_path)
                    elif content.get("type") == "array" and "enum" in content.get("items", []) and (
                            content.get("uniqueItems") is True):
                        # multiple choice
                        for choice in content["items"]["enum"]:
                            if child_path + [choice] not in col_paths:
                                missing(child_path + [choice])
                    elif content.get("type") == "object":
                        pass
                    else:
                        if child_path not in col_paths:
                            missing(child_path)
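For orientation, a sketch (not part of the commit) of the kind of schema fragment that ``_check_columns`` walks and the dotted path it reports; the property name is taken from the test data below, while the exact schema wording is an assumption:

# Hypothetical subschema found for the parent path ["Training", "coach"];
# only the shape matters here.
subschema = {
    "type": "object",
    "properties": {
        "given_name": {"type": "string"},
    },
}
# If the workbook has no data column for ["Training", "coach", "given_name"],
# missing() reports: "Missing column: Training.coach.given_name"
# (a ValueError with fail_fast/strict, otherwise a UserWarning).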
    def _handle_sheet(self, sheet: Worksheet, fail_later: bool = False) -> None:
        """Add the contents of the sheet to the result (stored in ``self._result``).

        Each row in the sheet corresponds to one entry in an array in the result. Which array exactly is
@@ -95,6 +169,11 @@ defined by the sheet's "proper name" and the content of the foreign columns.
        Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".

        Parameters
        ----------
        fail_later: bool, optional
            If True, do not fail with unresolvable foreign definitions, but collect all errors.
        """
        row_type_column = xlsx_utils.get_row_type_column_index(sheet)
        foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
@@ -106,7 +185,7 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
        if parent:
            parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
            if parent_sheetname not in self._handled_sheets:
                self._handle_sheet(self._workbook[parent_sheetname], fail_later=fail_later)
        # # We save single entries in lists, indexed by their foreign key contents. Each entry
        # # consists of:
@@ -114,7 +193,7 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
        # # - data: The actual data of this entry, a dict.
        # entries: dict[str, list[SimpleNamespace]] = {}
        for row_idx, row in enumerate(sheet.iter_rows(values_only=True)):
            # Skip non-data rows.
            if row[row_type_column] is not None:
                continue
@@ -147,13 +226,18 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
                _set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
                continue
            try:
                # Find current position in tree
                parent_dict = self._get_parent_dict(parent_path=parent, foreign=foreign)

                # Append data to current position's list
                if proper_name not in parent_dict:
                    parent_dict[proper_name] = []
                parent_dict[proper_name].append(data)
            except ForeignError as kerr:
                if not fail_later:
                    raise
                self._errors[(sheet.title, row_idx)] = kerr.definitions
        self._handled_sheets.add(sheet.title)
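To illustrate the docstring above with names taken from the test data further down (the exact worksheet layout is an assumption): a data row in a sheet whose proper name is "coach", with foreign-key columns date and url, is appended to the "coach" list of the Training entry whose own date and url match, so the result grows roughly like this:

import datetime

# Hypothetical shape of self._result after handling one "Training.coach" row
# whose foreign keys are date=2023-01-02 and url="www.indiscale.com".
result = {
    "Training": [
        {
            "date": datetime.datetime(2023, 1, 2, 0, 0),
            "url": "www.indiscale.com",
            "coach": [
                {"given_name": "..."},  # data from the matching coach row
            ],
        },
    ],
}

If no Training entry matches those foreign values, ``_get_parent_dict`` raises ``ForeignError`` and, with ``fail_later=True``, the row is recorded in ``self._errors`` under ``(sheet.title, row_idx)``.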
    def _is_multiple_choice(self, path: list[str]) -> bool:
@@ -187,7 +271,12 @@ the values given in the ``foreign`` specification.
                    current_object = cand
                    break
            else:
                message = "Cannot find an element which matches these foreign definitions:\n\n"
                for name, value in group.definitions:
                    message += f"{name}: {value}\n"
                print(message, file=sys.stderr)
                error = ForeignError(definitions=group.definitions, message=message)
                raise error
        assert isinstance(current_object, dict)
        return current_object
@@ -208,8 +297,16 @@ This includes:
                values = [self.PARSER[array_type](v) for v in value.split(";")]
                return values
        try:
            # special case: datetime or date
            if ("anyOf" in subschema):
                if isinstance(value, datetime.datetime) and (
                        {'type': 'string', 'format': 'date-time'} in subschema["anyOf"]):
                    return value
                if isinstance(value, datetime.date) and (
                        {'type': 'string', 'format': 'date'} in subschema["anyOf"]):
                    return value
            jsonschema.validate(value, subschema)
        except jsonschema.ValidationError as verr:
            print(verr)
            print(path)
            raise
@@ -359,7 +456,8 @@ mydict: dict
    return mydict


def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
            validate: bool = None, strict: bool = False) -> dict:
    """Convert the xlsx contents to a dict; the contents must follow a schema.

    Parameters
@@ -370,10 +468,17 @@ xlsx: Union[str, BinaryIO]
    schema: Union[dict, str, TextIO]
        Schema for validation of XLSX content.
    validate: bool, optional
        If True, validate the result against the schema.
    strict: bool, optional
        If True, fail fast: raise on problems (e.g. missing columns) instead of only warning.

    Returns
    -------
    out: dict
        A dict representing the JSON with the extracted data.
    """
    converter = XLSXConverter(xlsx, schema, strict=strict)
    return converter.to_dict(validate=validate)
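As a quick end-to-end sketch of the module-level entry point after this change (import path and file names are placeholders, not taken from the commit):

from caosadvancedtools.table_json_conversion.convert import to_dict  # import path assumed

# Lenient: missing columns only warn; foreign-key errors are collected and
# reported via a RuntimeError at the end of the conversion.
data = to_dict(xlsx="training_data.xlsx", schema="training_schema.json")

# Strict: fail fast on the first missing column, and validate the result
# against the schema.
data = to_dict(xlsx="training_data.xlsx", schema="training_schema.json",
               validate=True, strict=True)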
4 files added (contents not shown)
@@ -20,6 +20,7 @@
"""Testing the conversion from XLSX to JSON"""

import datetime
import json
import os
import re
@@ -39,17 +40,23 @@ def rfp(*pathcomponents):
def convert_and_compare(xlsx_file: str, schema_file: str, known_good_file: str,
                        known_good_data: dict = None, strict: bool = False,
                        validate: bool = True) -> dict:
    """Convert an XLSX file and compare to a known result.

    Exactly one of ``known_good_file`` and ``known_good_data`` should be non-empty.

    Returns
    -------
    json: dict
        The result of the conversion.
    """
    result = convert.to_dict(xlsx=xlsx_file, schema=schema_file, validate=validate)
    if known_good_file:
        with open(known_good_file, encoding="utf-8") as myfile:
            expected = json.load(myfile)
    else:
        expected = known_good_data
    assert_equal_jsons(result, expected, allow_none=not strict, allow_empty=not strict)
    return result
@@ -70,6 +77,13 @@ def test_conversions():
                        known_good_file=rfp("data/multiple_choice_data.json"),
                        strict=True)

    with open(rfp("data/simple_data.json"), encoding="utf-8") as myfile:
        expected_datetime = json.load(myfile)
    expected_datetime["Training"][0]["date"] = datetime.datetime(2023, 1, 1, 0, 0)
    convert_and_compare(xlsx_file=rfp("data/simple_data_datetime.xlsx"),
                        schema_file=rfp("data/simple_schema.json"),
                        known_good_file="", known_good_data=expected_datetime)

    # Data loss when saving as xlsx
    with pytest.raises(AssertionError) as err:
        convert_and_compare(xlsx_file=rfp("data/simple_data_ascii_chars.xlsx"),
@@ -78,6 +92,57 @@ def test_conversions():
    assert str(err.value).startswith("Values at path ['Training', 0, ")
def test_missing_columns():
    with pytest.raises(ValueError) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                        schema=rfp("data/simple_schema.json"), strict=True)
    assert str(caught.value) == "Missing column: Training.coach.given_name"

    with pytest.warns(UserWarning) as caught:
        convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
                        schema=rfp("data/simple_schema.json"))
    assert str(caught.pop().message) == "Missing column: Training.coach.given_name"

    with pytest.warns(UserWarning) as caught:
        convert.to_dict(xlsx=rfp("data/multiple_choice_data_missing.xlsx"),
                        schema=rfp("data/multiple_choice_schema.json"))
    messages = {str(w.message) for w in caught}
    for expected in [
            "Missing column: Training.skills.Communication",
            "Missing column: Training.exam_types.Oral",
    ]:
        assert expected in messages


def test_faulty_foreign():
    # Simple wrong foreign key
    converter = convert.XLSXConverter(xlsx=rfp("data/simple_data_wrong_foreign.xlsx"),
                                      schema=rfp("data/simple_schema.json"))
    with pytest.raises(RuntimeError):
        converter.to_dict()
    errors = converter.get_errors()
    assert errors == {('Training.coach', 6): [['date', datetime.datetime(2023, 1, 2, 0, 0)],
                                              ['url', 'www.indiscale.com']]}

    # More extensive example
    converter = convert.XLSXConverter(xlsx=rfp("data/multiple_refs_data_wrong_foreign.xlsx"),
                                      schema=rfp("data/multiple_refs_schema.json"))
    with pytest.raises(RuntimeError):
        converter.to_dict()
    errors = converter.get_errors()
    assert errors == {
        ('Training.Organisation.Person', 8): [
            ['name', 'World Training Organization 2']],
        ('Training.Organisation.Person', 9): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', 'www.getlinkahead.com']],
        ('Training.participant', 6): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', None]],
        ('Training.participant', 7): [
            ['date', '2024-03-21T14:12:00.000Z'],
            ['url', None]],
    }


def test_set_in_nested():
    """Test the ``_set_in_nested`` function."""
    set_in_nested = convert._set_in_nested  # pylint: disable=protected-access
...