Skip to content
Snippets Groups Projects
Verified Commit 84f13721 authored by Daniel Hornung's avatar Daniel Hornung
Browse files

WIP: XLSX reader

parent 2f20561b
No related branches found
No related tags found
2 merge requests!107Release v0.11.0,!102ENH: XLSX reader
Pipeline #50225 failed
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
......@@ -19,14 +18,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Convert XLSX files to JSON dictionaries."""
from collections import OrderedDict
from types import SimpleNamespace
from typing import Any, BinaryIO, Dict, List, Optional, TextIO, Union
import caosadvancedtools.table_json_conversion.xlsx_utils as xlsx_utils
from . import fill_xlsx
from .fill_xlsx import read_or_dict
from openpyxl import load_workbook, Workbook
from openpyxl.worksheet.worksheet import Worksheet
from . import fill_xlsx
from .fill_xlsx import read_or_dict
class XLSXConverter:
"""Class for conversion from XLSX to JSON.
......@@ -47,8 +52,9 @@ schema: Union[dict, str, TextIO]
"""
self._workbook = load_workbook(xlsx)
self._schema = read_or_dict(schema)
self._handled_sheets = set()
self._result = {}
self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
self._handled_sheets: set[str] = set()
self._result: dict = {}
def to_dict(self) -> dict:
"""Convert the xlsx contents to a dict.
......@@ -63,17 +69,119 @@ out: dict
for sheetname in self._workbook.sheetnames:
if sheetname not in self._handled_sheets:
self._handle_sheet(self._workbook.get_sheet_by_name(sheetname))
return self._result
def _handle_sheet(self, sheet: Worksheet) -> None:
    """Add the contents of the sheet to the result.

    Each row in the sheet corresponds to one entry in an array in the result.
    Which array exactly is defined by the content of the foreign columns.

    If the sheet has a parent path, the worksheet which defines that path is handled first
    (unless it was handled before), so that parent entries exist before their children are added.

    Parameters
    ----------
    sheet: Worksheet
        The worksheet whose data rows shall be added to the result.
    """
    # NOTE(review): ``path_rows`` is not used below.  The call is kept in case the helper validates
    # the sheet layout -- confirm and remove if it has no such effect.
    path_rows = xlsx_utils.get_path_rows(sheet)
    row_type_column = xlsx_utils.get_row_type_column_index(sheet)

    # Column index -> path, for the foreign key columns and for the data columns.
    foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
    foreign_column_paths = {col.index: col.path for col in foreign_columns.values()}
    data_columns = xlsx_utils.get_data_columns(sheet)
    data_column_paths = {col.index: col.path for col in data_columns.values()}

    # Parent path: make sure that the sheet defining the parent is handled before this one.
    parent = xlsx_utils.get_parent_path(sheet)
    if parent:
        parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
        if parent_sheetname not in self._handled_sheets:
            # ``workbook[name]`` replaces the deprecated ``get_sheet_by_name``.
            self._handle_sheet(self._workbook[parent_sheetname])

    # Remember this sheet, otherwise the "already handled" checks (here and in ``to_dict``) can
    # never succeed and sheets would be processed repeatedly.
    self._handled_sheets.add(sheet.title)

    # NOTE(review): sheets whose parent path has fewer than two elements contribute no rows yet.
    # This looks like work in progress -- confirm the intended handling of top-level sheets.
    if len(parent) < 2:
        return

    for row in sheet.iter_rows(values_only=True):
        # Skip non-data rows.
        if row[row_type_column] is not None:
            continue
        foreign = []  # A list of lists, each of which is: [path1, path2, ..., leaf, value]
        data: dict = {}  # Local data dict
        # Collect data (in dict relative to current level) and foreign data information
        for col_idx, value in enumerate(row):
            if col_idx in foreign_column_paths:
                foreign.append(foreign_column_paths[col_idx] + [value])
            elif col_idx in data_column_paths:
                _set_in_nested(mydict=data, path=data_column_paths[col_idx], value=value,
                               prefix=parent)

        # Find current position in tree.  ``_get_parent_list`` expects the parent path as well.
        parent_list = self._get_parent_list(parent, foreign)

        # Append data to current position's list
        parent_list.append(data)
def _get_parent_list(self, parent_path: list[str], foreign: list[list]) -> list[dict]:
    """For a ``foreign`` specification, get the correct list from the current result-in-making.

    NOTE(review): This looks like an unfinished stub.  No value is returned on any code path, so
    callers receive ``None`` in spite of the annotated return type, and ``parent_path`` is not
    used yet.

    Parameters
    ----------
    parent_path: list[str]
        Presumably the path of the parent element of the entries to be added -- TODO confirm.
    foreign: list[list]
        The foreign key specification.  The caller in ``_handle_sheet`` builds it as a list of
        lists, each of which is ``[path1, path2, ..., leaf, value]``.
    """
    if not foreign:
        # Debugging hook: drops into an interactive IPython shell if there is no foreign key
        # information.  NOTE(review): should presumably be removed before release.
        from IPython import embed
        embed()
# pylint: disable-next=dangerous-default-value
def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], overwrite=False) -> (
dict):
"""Set a value in a nested dict.
Parameters
----------
mydict: dict
The dict into which the ``value`` shall be inserted.
path: list
A list of keys, denoting the location of the value.
value
The value inside the dict.
prefix: list
A list of keys which shall be removed from ``path``. A KeyError is raised if ``path`` does not
start with the elements of ``prefix``.
overwrite: bool = False
If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values
leads to an exception.
Returns
-------
mydict: dict
The same dictionary that was given as a parameter, but modified.
"""
for idx, el in enumerate(prefix):
if path[idx] != el:
raise KeyError(f"Path does not start with prefix: {prefix} not in {path}")
path = path[len(prefix):]
tmp_dict = mydict
while len(path) > 1:
key = path.pop(0)
if key not in tmp_dict:
tmp_dict[key] = {}
if not isinstance(tmp_dict[key], dict):
if overwrite:
tmp_dict[key] = {}
else:
raise ValueError(f"There is already some value at {path}")
tmp_dict = tmp_dict[key]
key = path.pop()
if key in tmp_dict and not overwrite:
raise ValueError(f"There is already some value at [{key}]")
if key not in tmp_dict:
tmp_dict[key] = {}
tmp_dict[key] = value
return mydict
def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO]) -> dict:
......
......@@ -29,6 +29,7 @@ from enum import Enum
from types import SimpleNamespace
from typing import Dict, List, TextIO, Union
from openpyxl import Workbook
from openpyxl.worksheet.worksheet import Worksheet
......@@ -71,6 +72,46 @@ If it is a dict already, just return it."""
return data
def get_defining_paths(workbook: Workbook) -> dict[str, list[list[str]]]:
    """Collect, for each sheet of ``workbook``, the paths which that sheet defines.

    A sheet defines a path if it has data columns for properties inside that path.  Technically,
    the defined path of a data column is the column's path without its last element.  For example,
    consider the following worksheet:

    | `COL_TYPE` | `SCALAR`       | `SCALAR`      | `LIST`       | `SCALAR`           |
    | `PATH`     | `Training`     | `Training`    | `Training`   | `Training`         |
    | `PATH`     | `url`          | `date`        | `subjects`   | `supervisor`       |
    | `PATH`     |                |               |              | `email`            |
    |------------|----------------|---------------|--------------|--------------------|
    |            | example.com/mp | 2024-02-27    | Math;Physics | steve@example.com  |
    |            | example.com/m  | 2024-02-27    | Math         | stella@example.com |

    This worksheet defines properties for the paths `["Training"]` and
    `["Training", "supervisor"]`, so these two paths are listed under this sheet's name.

    Parameters
    ----------
    workbook: Workbook
        The workbook to analyze.

    Returns
    -------
    out: dict[str, list[list[str]]]
        A dict with worksheet names as keys and lists of paths (represented as string lists) as
        values.  Each path is listed once per sheet, in order of first appearance.
    """
    defined: dict[str, list[list[str]]] = {}
    for worksheet in workbook.worksheets:
        # Keyed by the string representation of the path: dicts keep insertion order, and
        # ``setdefault`` retains the first path for each representation.
        unique_paths: dict = {}
        for column in get_data_columns(worksheet).values():
            parent_path = column.path[:-1]
            unique_paths.setdefault(p2s(parent_path), parent_path)
        defined[worksheet.title] = list(unique_paths.values())
    return defined
def get_data_columns(sheet: Worksheet) -> Dict[str, SimpleNamespace]:
"""Return the data paths of the worksheet.
......@@ -132,7 +173,7 @@ out: dict[str, SimpleNamespace]
def get_parent_path(sheet: Worksheet) -> list[str]:
"""Return a path which represents the parent element.
For top-level sheets / entries, this returns an empty list.
For top-level sheets / entries (those without foreign columns), this returns an empty list.
"""
# Parent element: longest common path shared among any foreign column and all the data columns
result: list[str] = []
......@@ -178,6 +219,14 @@ def get_row_type_column_index(sheet: Worksheet):
raise ValueError("The column which defines row types (COL_TYPE, PATH, ...) is missing")
def get_worksheet_for_path(path: list[str], defining_path_index: dict[str, list[list[str]]]) -> str:
    """Return the name of the first sheet which defines ``path``.

    Parameters
    ----------
    path: list[str]
        The path to look for.
    defining_path_index: dict[str, list[list[str]]]
        A dict with sheet names as keys and the lists of paths defined by each sheet as values.

    Returns
    -------
    out: str
        The name of the first sheet (in the dict's iteration order) whose paths contain ``path``.

    Raises
    ------
    KeyError
        If no sheet defines ``path``.
    """
    not_found = object()
    candidates = (
        name for name, defined_paths in defining_path_index.items() if path in defined_paths
    )
    match = next(candidates, not_found)
    if match is not_found:
        raise KeyError(f"Could not find defining worksheet for path: {path}")
    return match
def next_row_index(sheet: Worksheet) -> int:
"""Return the index for the next data row.
......
......@@ -66,3 +66,41 @@ def test_simple():
convert.to_dict(xlsx=rfp("data/multiple_refs_data.xlsx"), schema=rfp("data/multiple_refs_schema.json"))
# conv = XLSXConverter(schema=rfp("data/simple_schema.json"))
# result = conv.to_dict(rfp("data/simple_template.xlsx"))
def test_protected():
    """Test the protected helper ``_set_in_nested``: regular results and expected exceptions."""
    set_in_nested = convert._set_in_nested  # pylint: disable=protected-access

    # Each case: (keyword arguments, expected resulting dict)
    valid_cases = [
        ({"mydict": {}, "path": ["a", 1], "value": 3},
         {"a": {1: 3}}),
        ({"mydict": {"a": 1}, "path": ["a"], "value": 3, "overwrite": True},
         {"a": 3}),
        ({"mydict": {"a": 1}, "path": ["a", 1], "value": 3, "overwrite": True},
         {"a": {1: 3}}),
        ({"mydict": {"b": 2}, "path": ["a", 1, 3.141], "value": 3},
         {"a": {1: {3.141: 3}}, "b": 2}),
        ({"mydict": {}, "path": ["X", "Y", "a", 1], "value": 3, "prefix": ["X", "Y"]},
         {"a": {1: 3}}),
    ]
    for kwargs, expected in valid_cases:
        assert set_in_nested(**kwargs) == expected

    # Testing exceptions
    # Each case: (keyword arguments, expected exception type, regex for the exception message)
    failing_cases = [
        ({"mydict": {"a": 1}, "path": ["a"], "value": 3},
         ValueError, r"There is already some value at \[a\]"),
        ({"mydict": {"a": 1}, "path": ["a", 1], "value": 3},
         ValueError, r"There is already some value at \[1\]"),
        ({"mydict": {}, "path": ["a", 1], "value": 3, "prefix": ["X", "Y", "Z"]},
         KeyError, r"Path does not start with prefix: \['X', 'Y', 'Z'\] not in \['a', 1\]"),
    ]
    for kwargs, exc_type, pattern in failing_cases:
        with pytest.raises(exc_type, match=pattern):
            set_in_nested(**kwargs)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment