Verified Commit 35e621d0 authored by Daniel Hornung

WIP: XLSX table reader

parent 03bd9f91
2 merge requests: !107 Release v0.11.0, !102 ENH: XLSX reader
@@ -21,6 +21,8 @@
"""Convert XLSX files to JSON dictionaries."""
from collections import OrderedDict
from functools import reduce
from operator import getitem
from types import SimpleNamespace
from typing import Any, BinaryIO, Dict, List, Optional, TextIO, Union
@@ -72,19 +74,23 @@ out: dict
return self._result
def _handle_sheet(self, sheet: Worksheet) -> None:
"""Add the contents of the sheet to the result.
"""Add the contents of the sheet to the result (stored in ``self._result``).
Each row in the sheet corresponds to one entry in an array in the result. Which array exactly is
defined by the sheet's "proper name" and the content of the foreign columns.
Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".
Each row in the sheet corresponds to one entry in an array in the result.
Which array exactly is defined by the content of the foreign columns.
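For example (hypothetical sheet layout): if the sheet's proper name is "Organisation" and it has a
foreign column for ``Training.date``, each data row is appended as a dict to
``result["Training"][i]["Organisation"]``, where ``i`` is the Training entry whose ``date`` matches
the row's foreign-key value.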
"""
path_rows = xlsx_utils.get_path_rows(sheet)
row_type_column = xlsx_utils.get_row_type_column_index(sheet)
foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
foreign_column_paths = {col.index: col.path for col in foreign_columns.values()}
data_columns = xlsx_utils.get_data_columns(sheet)
data_column_paths = {col.index: col.path for col in data_columns.values()}
# Parent path, insert in correct order.
parent = xlsx_utils.get_parent_path(sheet)
parent, proper_name = xlsx_utils.get_path_position(sheet)
if parent:
parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
if parent_sheetname not in self._handled_sheets:
@@ -96,8 +102,6 @@ Which array exactly is defined by the content of the foreign columns.
# - data: The actual data of this entry, a dict.
entries: dict[str, list[SimpleNamespace]] = {}
if len(parent) < 2:
return
for row in sheet.iter_rows(values_only=True):
# Skip non-data rows.
if row[row_type_column] is not None:
@@ -114,29 +118,116 @@ Which array exactly is defined by the content of the foreign columns.
if col_idx in data_column_paths:
_set_in_nested(mydict=data, path=data_column_paths[col_idx], value=value,
prefix=parent)
prefix=parent, skip=1)
continue
continue
# Find current position in tree
parent_list = self._get_parent_list(foreign)
parent_dict = self._get_parent_dict(parent_path=parent, foreign=foreign)
# Append data to current position's list
parent_list.append(data)
def _get_parent_list(self, parent_path: list[str], foreign: list[list]) -> list[dict]:
if proper_name not in parent_dict:
parent_dict[proper_name] = []
parent_dict[proper_name].append(data)
self._handled_sheets.add(sheet.title)
# print(f"Added sheet: {sheet.title}")
def _get_parent_dict(self, parent_path: list[str], foreign: list[list]) -> dict:
"""For a ``foreign`` specification, get the correct list from the current result-in-making.
"""
if not foreign:
# if not foreign:
# return self._result
foreign_groups = _group_foreign_paths(foreign, common=parent_path)
current_object = self._result
for group in foreign_groups:
# Find list for which foreign definitions are relevant.
current_object = reduce(getitem, group.subpath, current_object)
assert isinstance(current_object, list)
# Test all candidates.
for cand in current_object:
if all(reduce(getitem, definition[:-1], cand) == definition[-1]
for definition in group.definitions):
current_object = cand
break
else:
raise KeyError("Cannot find an element which matches the foreign definitions")
assert isinstance(current_object, dict)
return current_object
def _group_foreign_paths(foreign: list[list], common: list[str]) -> list[SimpleNamespace]:
"""Group the foreign keys by their base paths.
Parameters
----------
foreign: list[list]
A list of foreign definitions, each consisting of path components, a property name and possibly a value.
common: list[str]
A common path which defines the final target of the foreign definitions. This helps to understand
where the ``foreign`` paths shall be split.
Returns
-------
out: list[SimpleNamespace]
A list of foreign path segments, grouped by their common segments. Each element is a namespace
with detailed information of all those elements which form the group. The namespace has the
following attributes:
- ``path``: The full path to this path segment. This is always the previous segment's ``path``
plus this segment's ``subpath``.
- ``stringpath``: The stringified ``path``, might be useful for comparison or sorting.
- ``subpath``: The path, relative from the previous segment.
- ``definitions``: A list of the foreign definitions for this segment, but stripped of the
``path`` components.
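Example (values mirror the unit test and are purely illustrative):
``foreign = [["A", "x", 1.1], ["A", "y", "z", "some text"], ["A", "B", "CC", "x", 42]]`` with
``common = ["A", "B", "CC"]`` is grouped into two namespaces:
- ``stringpath="A"``, ``path=["A"]``, ``subpath=["A"]``, ``definitions=[["x", 1.1], ["y", "z", "some text"]]``
- ``stringpath="A.B.CC"``, ``path=["A", "B", "CC"]``, ``subpath=["B", "CC"]``, ``definitions=[["x", 42]]``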
"""
# Build a simple dict first, without subpath.
results = {}
for f_path in foreign:
path = []
for component in f_path:
path.append(component)
if path != common[:len(path)]:
break
path.pop()
definition = f_path[len(path):]
stringpath = xlsx_utils.p2s(path)
if stringpath not in results:
results[stringpath] = SimpleNamespace(stringpath=stringpath, path=path,
definitions=[definition])
else:
results[stringpath].definitions.append(definition)
# Then sort by stringpath and calculate subpath.
stringpaths = list(results.keys())
stringpaths.sort()
resultlist = []
last_level = 0
for stringpath in stringpaths:
elem = results[stringpath]
elem.subpath = elem.path[last_level:]
last_level = len(elem.path)
resultlist.append(elem)
if last_level != len(common):
raise ValueError("Foreign keys must cover the complete `common` depth.")
return resultlist
# pylint: disable-next=dangerous-default-value
def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], overwrite=False) -> (
dict):
def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], skip: int = 0,
overwrite: bool = False) -> dict:
"""Set a value in a nested dict.
Parameters
@@ -150,6 +241,8 @@ value
prefix: list
A list of keys which shall be removed from ``path``. A KeyError is raised if ``path`` does not
start with the elements of ``prefix``.
skip: int = 0
Remove this many additional levels from the path, *after* removing the prefix.
overwrite: bool = False
If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values
leads to an exception.
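Example (a minimal sketch of the intended semantics, hypothetical values):
``_set_in_nested({}, path=["pre", "skipped", "key"], value=1, prefix=["pre"], skip=1)`` first strips
the prefix ``["pre"]``, then drops one more level ("skipped"), and finally returns ``{"key": 1}``.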
@@ -163,6 +256,9 @@ mydict: dict
if path[idx] != el:
raise KeyError(f"Path does not start with prefix: {prefix} not in {path}")
path = path[len(prefix):]
if skip:
assert len(path) > skip, f"Path must be long enough to remove skip={skip} elements."
path = path[skip:]
tmp_dict = mydict
while len(path) > 1:
@@ -170,13 +170,26 @@ out: dict[str, SimpleNamespace]
return result
def get_parent_path(sheet: Worksheet) -> list[str]:
"""Return a path which represents the parent element.
def get_path_position(sheet: Worksheet) -> tuple[list[str], str]:
"""Return a path which represents the parent element, and the sheet's "proper name".
For top-level sheets / entries (those without foreign columns), this returns an empty list.
For top-level sheets / entries (those without foreign columns), the path is an empty list.
A sheet's "proper name" is detected from the data column paths: it is the first component after the
parent components.
Returns
-------
parent: list[str]
Path to the parent element. Note that there may be list elements on the path which are **not**
represented in this return value.
proper_name: str
The "proper name" of this sheet. This defines an array where all the data lives, relative to the
parent path.
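Example (hypothetical column paths): if all data column paths start with
``["Training", "Organisation", ...]`` and a foreign column has the path ``["Training", "date"]``,
the parent is ``["Training"]`` and the proper name is ``"Organisation"``.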
"""
# Parent element: longest common path shared among any foreign column and all the data columns
result: list[str] = []
parent: list[str] = []
# longest common path in data columns
data_paths = [el.path for el in get_data_columns(sheet).values()]
@@ -188,15 +201,18 @@ For top-level sheets / entries (those without foreign columns), this returns an
# longest common overall path
foreign_paths = [el.path for el in get_foreign_key_columns(sheet).values()]
ii = 0 # If no foreign_paths, proper name is the first element
for foreign_path in foreign_paths:
for ii in range(min([len(foreign_path), len(longest_data_path)])):
components_at_index = {foreign_path[ii], longest_data_path[ii]}
if len(components_at_index) > 1:
break
if ii > len(result):
result = foreign_path[:ii]
if ii > len(parent):
parent = foreign_path[:ii]
return result
return parent, data_paths[0][ii]
def get_path_rows(sheet: Worksheet):
@@ -17,12 +17,16 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Testing the conversion from XLSX to JSON"""
import json
import os
import re
import tempfile
from types import SimpleNamespace
import jsonschema.exceptions as schema_exc
import pytest
import caosadvancedtools.table_json_conversion.convert as convert
@@ -61,14 +65,58 @@ custom_output: str, optional
compare_workbooks(generated, known_good_wb)
def test_simple():
# convert.to_dict(xlsx=rfp("data/simple_data.xlsx"), schema=rfp("data/simple_schema.json"))
convert.to_dict(xlsx=rfp("data/multiple_refs_data.xlsx"), schema=rfp("data/multiple_refs_schema.json"))
def _assert_equal_jsons(json1, json2, allow_none: bool = True, allow_empty: bool = True,
path: list = None) -> None:
"""Compare two json objects for near equality.
Raise an assertion exception if they are not equal."""
if path is None:
path = []
assert isinstance(json1, dict) == isinstance(json2, dict), f"Type mismatch, path: {path}"
if isinstance(json1, dict):
keys = set(json1.keys()).union(json2.keys())
for key in keys:
this_path = path + [key]
# Case 1: both exist
if key in json1 and key in json2:
el1 = json1[key]
el2 = json2[key]
assert type(el1) is type(el2), f"Type mismatch, path: {this_path}"
if isinstance(el1, (dict, list)):
_assert_equal_jsons(el1, el2, allow_none=allow_none, allow_empty=allow_empty,
path=this_path)
assert el1 == el2, f"Values at path {this_path} are not equal:\n{el1},\n{el2}"
continue
# Case 2: only one exists
existing = json1.get(key, json2.get(key))
assert (allow_none and existing is None) or (allow_empty and existing == []), (
f"Element at path {this_path} is None or empty in one json and does not exist in "
"the other.")
assert isinstance(json1, list) and isinstance(json2, list), f"Type mismatch, path: {path}"
assert len(json1) == len(json2), f"Lists must have equal length, path: {path}"
for idx, (el1, el2) in enumerate(zip(json1, json2)):
this_path = path + [idx]
assert isinstance(el1, dict) and isinstance(el2, dict), (
f"List elements must be dicts: path: {this_path}")
_assert_equal_jsons(el1, el2, allow_none=allow_none, allow_empty=allow_empty,
path=this_path)
def test_conversions():
result = convert.to_dict(xlsx=rfp("data/simple_data.xlsx"), schema=rfp("data/simple_schema.json"))
expected = json.load(open(rfp("data/simple_data.json")))
# result = convert.to_dict(xlsx=rfp("data/multiple_refs_data.xlsx"),
# schema=rfp("data/multiple_refs_schema.json"))
# expected = json.load(open(rfp("data/multiple_refs_data.json")))
_assert_equal_jsons(result, expected)
# conv = XLSXConverter(schema=rfp("data/simple_schema.json"))
# result = conv.to_dict(rfp("data/simple_template.xlsx"))
def test_protected():
def test_set_in_nested():
set_in_nested = convert._set_in_nested # pylint: disable=protected-access
test_data_in = [
@@ -104,3 +152,27 @@ def test_protected():
for data_in, (exc_out, match) in zip(test_data_in, exceptions):
with pytest.raises(exc_out, match=match):
set_in_nested(**data_in)
def test_group_foreign_paths():
group = convert._group_foreign_paths # pylint: disable=protected-access
foreign = [
["A", "x", 1.1],
["A", "y", "z", "some text"],
["A", "B", "CC", "x", 42],
]
common = ["A", "B", "CC"]
common_wrong = ["A", "B", "C"]
expected = [
SimpleNamespace(stringpath="A", path=["A"], subpath=["A"],
definitions=[["x", 1.1], ["y", "z", "some text"]]),
SimpleNamespace(stringpath="A.B.CC", path=["A", "B", "CC"], subpath=["B", "CC"],
definitions=[["x", 42]]),
]
with pytest.raises(ValueError, match=re.escape(
"Foreign keys must cover the complete `common` depth.")):
result = group(foreign=foreign, common=common_wrong)
result = group(foreign=foreign, common=common)
assert result == expected