Skip to content
Snippets Groups Projects
Verified Commit 9ed052b1 authored by Daniel Hornung's avatar Daniel Hornung
Browse files

ENH: XLSX reader works now.

Still TODO:

- Early: Foreign column checking
- Result validation with schema
- Warnings: Additional or missing columns
parent 8d187480
No related branches found
No related tags found
2 merge requests!107Release v0.11.0,!102ENH: XLSX reader
...@@ -131,9 +131,22 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na ...@@ -131,9 +131,22 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
continue continue
if col_idx in data_column_paths: if col_idx in data_column_paths:
value = self._validate_and_convert(value=value, path=data_column_paths[col_idx]) path = data_column_paths[col_idx]
_set_in_nested(mydict=data, path=data_column_paths[col_idx], value=value, if self._is_multiple_choice(path):
prefix=parent, skip=1) real_value = path.pop() # Last component is the enum value, insert above
# set up list
try:
_set_in_nested(mydict=data, path=path, value=[], prefix=parent, skip=1)
except ValueError as err:
if not str(err).startswith("There is already some value at"):
raise
if not xlsx_utils.parse_multiple_choice(value):
continue
_set_in_nested(mydict=data, path=path, value=real_value, prefix=parent,
skip=1, append_to_list=True)
else:
value = self._validate_and_convert(value, path)
_set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
continue continue
continue continue
...@@ -150,6 +163,17 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na ...@@ -150,6 +163,17 @@ Look at ``xlsx_utils.get_path_position`` for the specification of the "proper na
self._handled_sheets.add(sheet.title) self._handled_sheets.add(sheet.title)
# print(f"Added sheet: {sheet.title}") # print(f"Added sheet: {sheet.title}")
def _is_multiple_choice(self, path: list[str]) -> bool:
    """Test if the path belongs to a multiple choice section.

    The schema for ``path`` without its last component is looked up via
    ``self._get_subschema``.  The path counts as multiple choice if that schema is an array with
    ``uniqueItems`` set to ``True`` whose ``items`` define an ``enum``.

    Parameters
    ----------
    path: list[str]
      The column path to test.  It is not modified.

    Returns
    -------
    out: bool
      True if the parent schema describes a multiple choice array, False otherwise.  This includes
      an empty ``path`` and schemas which lack the ``type`` or ``items`` keywords.
    """
    if not path:
        return False
    subschema = self._get_subschema(path[:-1])
    if not isinstance(subschema, dict):
        return False
    # Use tolerant lookups: schemas without "type" (e.g. pure enums) or arrays without "items"
    # are valid, they are simply no multiple choice sections.
    items = subschema.get("items")
    return (subschema.get("type") == "array"
            and subschema.get("uniqueItems") is True
            and isinstance(items, dict)
            and "enum" in items)
def _get_parent_dict(self, parent_path: list[str], foreign: list[list]) -> dict: def _get_parent_dict(self, parent_path: list[str], foreign: list[list]) -> dict:
"""Return the dict into which values can be inserted. """Return the dict into which values can be inserted.
...@@ -211,7 +235,10 @@ This includes: ...@@ -211,7 +235,10 @@ This includes:
next_schema = schema["properties"][path[0]] next_schema = schema["properties"][path[0]]
return self._get_subschema(path=path[1:], schema=next_schema) return self._get_subschema(path=path[1:], schema=next_schema)
if schema["type"] == "array": if schema["type"] == "array":
next_schema = schema["items"]["properties"][path[0]] items = schema["items"]
if "enum" in items:
return schema
next_schema = items["properties"][path[0]]
return self._get_subschema(path=path[1:], schema=next_schema) return self._get_subschema(path=path[1:], schema=next_schema)
return schema return schema
...@@ -261,8 +288,7 @@ out: list[dict[str, list[list]]] ...@@ -261,8 +288,7 @@ out: list[dict[str, list[list]]]
results[stringpath].definitions.append(definition) results[stringpath].definitions.append(definition)
# Then sort by stringpath and calculate subpath. # Then sort by stringpath and calculate subpath.
stringpaths = list(results.keys()) stringpaths = sorted(results.keys())
stringpaths.sort()
resultlist = [] resultlist = []
last_level = 0 last_level = 0
...@@ -280,9 +306,9 @@ out: list[dict[str, list[list]]] ...@@ -280,9 +306,9 @@ out: list[dict[str, list[list]]]
return resultlist return resultlist
# pylint: disable-next=dangerous-default-value # pylint: disable-next=dangerous-default-value,too-many-arguments
def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], skip: int = 0, def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], skip: int = 0,
overwrite: bool = False) -> dict: overwrite: bool = False, append_to_list: bool = False) -> dict:
"""Set a value in a nested dict. """Set a value in a nested dict.
Parameters Parameters
...@@ -301,6 +327,10 @@ skip: int = 0 ...@@ -301,6 +327,10 @@ skip: int = 0
overwrite: bool = False overwrite: bool = False
If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values
leads to an exception. leads to an exception.
append_to_list: bool = False
If True, assume that the element at ``path`` is a list and append the value to it. If the list
does not exist, create it. If there is a non-list at ``path`` already, overwrite it with a new
list, if ``overwrite`` is True, otherwise raise a ValueError.
Returns Returns
------- -------
...@@ -327,6 +357,16 @@ mydict: dict ...@@ -327,6 +357,16 @@ mydict: dict
raise ValueError(f"There is already some value at {path}") raise ValueError(f"There is already some value at {path}")
tmp_dict = tmp_dict[key] tmp_dict = tmp_dict[key]
key = path.pop() key = path.pop()
if append_to_list:
if key not in tmp_dict:
tmp_dict[key] = []
if not isinstance(tmp_dict[key], list):
if overwrite:
tmp_dict[key] = []
else:
raise ValueError(f"There is already some non-list value at [{key}]")
tmp_dict[key].append(value)
else:
if key in tmp_dict and not overwrite: if key in tmp_dict and not overwrite:
raise ValueError(f"There is already some value at [{key}]") raise ValueError(f"There is already some value at [{key}]")
if key not in tmp_dict: if key not in tmp_dict:
......
...@@ -39,12 +39,16 @@ from collections import OrderedDict ...@@ -39,12 +39,16 @@ from collections import OrderedDict
from copy import deepcopy from copy import deepcopy
from enum import Enum from enum import Enum
from types import SimpleNamespace from types import SimpleNamespace
from typing import TextIO, Union from typing import Any, TextIO, Union
from openpyxl import Workbook from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet from openpyxl.worksheet.worksheet import Worksheet
# Lower-case strings which mark a multiple choice column as selected / not selected.
# The check mark "√" is a truthy marker; the empty string must never be one (empty cells are falsy).
TRUTHY = {"true", "wahr", "x", "√", "yes", "ja", "y", "j"}  # For multiple choice columns
FALSY = {"false", "falsch", "-", "no", "nein", "n"}  # For multiple choice columns
class ColumnType(Enum): class ColumnType(Enum):
""" column types enum """ """ column types enum """
SCALAR = 1 SCALAR = 1
...@@ -288,6 +292,48 @@ def p2s(path: list[str]) -> str: ...@@ -288,6 +292,48 @@ def p2s(path: list[str]) -> str:
return ".".join(path) return ".".join(path)
def parse_multiple_choice(value: Any) -> bool:
    """Interpret ``value`` as a multiple choice input.

    *Truthy* values are:

    - The boolean ``True``.
    - The number ``1``.
    - The (case-insensitive) strings ``true``, ``wahr``, ``x``, ``√``, ``yes``, ``ja``, ``y``, ``j``.

    *Falsy* values are:

    - The boolean ``False``.
    - ``None``, empty or whitespace-only strings, empty lists, empty dicts.
    - The number ``0``.
    - The (case-insensitive) strings ``false``, ``falsch``, ``-``, ``no``, ``nein``, ``n``.
    - Everything else.

    Strings are compared after removing surrounding whitespace, so that a cell content like
    ``"x "`` still counts as selected.

    Parameters
    ----------
    value: Any
      The cell content to interpret.

    Returns
    -------
    out: bool
      The interpretation result of ``value``.
    """
    if isinstance(value, str):
        text = value.strip().lower()
        # Explicit guard: nothing but whitespace is never a selection, whatever TRUTHY contains.
        if not text:
            return False
        # Everything which is not explicitly truthy is falsy, this includes all FALSY strings.
        return text in TRUTHY

    # Non-string cases:
    if value is None or value is False or value == 0 or value == [] or value == {}:
        return False
    return bool(value is True or value == 1)
def read_or_dict(data: Union[dict, str, TextIO]) -> dict: def read_or_dict(data: Union[dict, str, TextIO]) -> dict:
"""If data is a json file name or input stream, read data from there. """If data is a json file name or input stream, read data from there.
If it is a dict already, just return it.""" If it is a dict already, just return it."""
......
...@@ -38,7 +38,8 @@ def rfp(*pathcomponents): ...@@ -38,7 +38,8 @@ def rfp(*pathcomponents):
return os.path.join(os.path.dirname(__file__), *pathcomponents) return os.path.join(os.path.dirname(__file__), *pathcomponents)
def convert_and_compare(xlsx_file: str, schema_file: str, known_good_file: str) -> dict: def convert_and_compare(xlsx_file: str, schema_file: str, known_good_file: str,
strict: bool = False) -> dict:
"""Convert an XLSX file and compare to a known result. """Convert an XLSX file and compare to a known result.
Returns Returns
...@@ -49,7 +50,7 @@ json: dict ...@@ -49,7 +50,7 @@ json: dict
result = convert.to_dict(xlsx=xlsx_file, schema=schema_file) result = convert.to_dict(xlsx=xlsx_file, schema=schema_file)
with open(known_good_file, encoding="utf-8") as myfile: with open(known_good_file, encoding="utf-8") as myfile:
expected = json.load(myfile) expected = json.load(myfile)
assert_equal_jsons(result, expected) assert_equal_jsons(result, expected, allow_none=not strict, allow_empty=not strict)
return result return result
...@@ -66,7 +67,8 @@ def test_conversions(): ...@@ -66,7 +67,8 @@ def test_conversions():
known_good_file=rfp("data/indirect_data.json")) known_good_file=rfp("data/indirect_data.json"))
convert_and_compare(xlsx_file=rfp("data/multiple_choice_data.xlsx"), convert_and_compare(xlsx_file=rfp("data/multiple_choice_data.xlsx"),
schema_file=rfp("data/multiple_choice_schema.json"), schema_file=rfp("data/multiple_choice_schema.json"),
known_good_file=rfp("data/multiple_choice_data.json")) known_good_file=rfp("data/multiple_choice_data.json"),
strict=True)
# Data loss when saving as xlsx # Data loss when saving as xlsx
with pytest.raises(AssertionError) as err: with pytest.raises(AssertionError) as err:
......
...@@ -42,7 +42,7 @@ Raise an assertion exception if they are not equal.""" ...@@ -42,7 +42,7 @@ Raise an assertion exception if they are not equal."""
if key in json1 and key in json2: if key in json1 and key in json2:
el1 = json1[key] el1 = json1[key]
el2 = json2[key] el2 = json2[key]
assert type(el1) is type(el2), f"Type mismatch, path: {this_path}" assert isinstance(el1, type(el2)), f"Type mismatch, path: {this_path}"
if isinstance(el1, (dict, list)): if isinstance(el1, (dict, list)):
# Iterables: Recursion # Iterables: Recursion
assert_equal_jsons(el1, el2, allow_none=allow_none, allow_empty=allow_empty, assert_equal_jsons(el1, el2, allow_none=allow_none, allow_empty=allow_empty,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment