Skip to content
Snippets Groups Projects
Verified commit b10f9376, authored by Daniel Hornung
Browse files

WIP: Filling XLSX

parent 54b402d1
No related branches found
No related tags found
2 merge requests: !100 "WIP: Filling XLSX: Seems to be working." and !93 "Filling XLSX: Everything except multiple choice."
Pipeline #48214 passed
......@@ -29,16 +29,7 @@ from openpyxl import load_workbook, Workbook
from openpyxl.worksheet.worksheet import Worksheet
from .table_generator import ColumnType, RowType
def _fill_leaves(json_doc: dict, workbook):
for key, value in json_doc:
if not isinstance(value, list):
value = [value]
for el in value:
if isinstance(el, dict):
_fill_leaves(el, workbook)
workbook.cell(1, 2, el)
from .utils import p2s
def _is_exploded_sheet(sheet: Worksheet) -> bool:
......@@ -48,44 +39,80 @@ def _is_exploded_sheet(sheet: Worksheet) -> bool:
sheet. A sheet is detected as exploded iff it has FOREIGN columns.
"""
column_types = _get_column_types(sheet)
return ColumnType.FOREIGN.value in column_types.values()
return ColumnType.FOREIGN.name in column_types.values()
def _get_column_types(sheet: Worksheet) -> OrderedDict:
    """Return an OrderedDict: column index -> column type for the sheet.

    Parameters
    ----------
    sheet: Worksheet
      The sheet whose columns shall be inspected.

    Returns
    -------
    out: OrderedDict
      Maps the 0-based column index to the raw value of that column's type cell.  This value is
      the name of a ``ColumnType`` member, ``RowType.COL_TYPE.name`` (for the column which defines
      the row types) or None for a cell without content.
    """
    result = OrderedDict()
    # NOTE(review): the index of the row type *column* is used as a *row* index below.  Presumably
    # the COL_TYPE marker is expected at the same position in both directions -- confirm.
    type_row_index = _get_row_type_column_index(sheet)
    for idx, col in enumerate(sheet.columns):
        value = col[type_row_index].value
        result[idx] = value
        # None must be tested before ``hasattr``, which raises a TypeError for attribute names
        # that are not strings.
        assert (value is None
                or value == RowType.COL_TYPE.name
                or (isinstance(value, str) and hasattr(ColumnType, value))), (
            f"Unexpected column type value: {value}")
    return result
def _get_row_type_column_index(worksheet):
"""Return the column index (1-indexed) of the column which defines the row types.
def _get_foreign_key_columns(sheet: Worksheet) -> Dict[str, SimpleNamespace]:
    """Return the foreign keys of the worksheet.

    Parameters
    ----------
    sheet: Worksheet
      The sheet to inspect.

    Returns
    -------
    out: dict[str, SimpleNamespace]
      The keys are the stringified paths.  The values are SimpleNamespace objects with ``index``,
      ``path`` and ``column`` attributes.
    """
    column_types = _get_column_types(sheet)
    path_rows = _get_path_rows(sheet)
    # Materialize the columns once instead of once per foreign column.
    columns = list(sheet.columns)
    result = OrderedDict()
    for for_idx, name in column_types.items():
        if name != ColumnType.FOREIGN.name:
            continue
        path = []
        for row in path_rows:
            # Indices are 0-based in this module, ``sheet.cell`` is addressed with an offset of 1.
            component = sheet.cell(row=row+1, column=for_idx+1).value
            if component is None:
                # The path ends at the first path cell without content.
                break
            assert isinstance(component, str), f"Expected string: {component}"
            path.append(component)
        result[p2s(path)] = SimpleNamespace(index=for_idx, path=path, column=columns[for_idx])
    return result
def _get_deep_value(data: Dict[str, Any], path: List[str]):
"""Return the value at ``path`` inside the dict ``data``.
"""
if len(path) > 1:
return _get_deep_value(data[path.pop(0)], path)
return data[path[0]]
def _get_row_type_column_index(sheet: Worksheet) -> int:
    """Return the column index (0-indexed) of the column which defines the row types.

    The sheet is searched column by column for the first cell whose value is
    ``RowType.COL_TYPE.name``.

    Raises
    ------
    ValueError
      If no cell of the sheet has this value.
    """
    for col in sheet.columns:
        for cell in col:
            if cell.value == RowType.COL_TYPE.name:
                # The offset converts ``cell.column`` to the 0-indexed value promised above.
                return cell.column - 1
    raise ValueError("The column which defines row types (COL_TYPE, PATH, ...) is missing")
def _get_path_rows(sheet: Worksheet) -> List[int]:
    """Return the 0-based indices of the rows which represent paths.

    These are the rows whose cell in the row type column has the value ``RowType.PATH.name``.
    """
    rt_col = _get_row_type_column_index(sheet)
    type_column = list(sheet.columns)[rt_col]
    # The offset converts ``cell.row`` to the 0-based index promised above.
    return [cell.row - 1 for cell in type_column if cell.value == RowType.PATH.name]
def _next_row_index(sheet) -> int:
def _next_row_index(sheet: Worksheet) -> int:
"""Return the index for the next data row.
This is defined as the first row without any content.
......@@ -94,13 +121,16 @@ def _next_row_index(sheet) -> int:
class TemplateFiller:
def __init__(self, workbook: Workbook):
    """Store the workbook and create the sheet index for it.

    Parameters
    ----------
    workbook: Workbook
      The workbook which shall be filled.
    """
    self._workbook = workbook
    self._create_index()
    # Holds the complete data while ``fill_data`` is running, None otherwise.
    self._context: Optional[dict] = None
def fill_data(self, data: dict):
    """Fill the data into the workbook.

    Parameters
    ----------
    data: dict
      The data to insert.  While the data is being handled, it is also available as
      ``self._context``.
    """
    self._context = data
    try:
        self._handle_data(data=data, current_path=[])
    finally:
        # Reset the context even if the handling failed, so that no stale data is kept around.
        self._context = None
def _create_index(self):
"""Create a sheet index for the workbook.
......@@ -113,7 +143,7 @@ class TemplateFiller:
for sheetname in self._workbook.sheetnames:
sheet = self._workbook[sheetname]
type_column = [x.value for x in list(sheet.columns)[
_get_row_type_column_index(sheet) - 1]]
_get_row_type_column_index(sheet)]]
# 0-indexed, as everything outside of sheet.cell(...):
coltype_idx = type_column.index(RowType.COL_TYPE.name)
path_indices = [i for i, typ in enumerate(type_column) if typ == RowType.PATH.name]
......@@ -127,12 +157,12 @@ class TemplateFiller:
for path_idx in path_indices:
if col[path_idx].value is not None:
path.append(col[path_idx].value)
# col_key = ".".join([col[coltype_idx].value] + path)
# col_key = p2s([col[coltype_idx].value] + path)
# col_index[col_key] = SimpleNamespace(column=col, col_index=col_idx)
if col[coltype_idx].value not in [ColumnType.SCALAR.name, ColumnType.LIST.name]:
continue
path_str = ".".join(path)
path_str = p2s(path)
assert path_str not in self._sheet_index
self._sheet_index[path_str] = SimpleNamespace(
sheetname=sheetname, sheet=sheet, col_index=col_idx,
......@@ -197,13 +227,13 @@ out: union[dict, None]
value = content[0]
else:
value = ";".join(content)
path_str = ".".join(path)
path_str = p2s(path)
assert path_str not in insertables
insertables[path_str] = value
if only_collect_insertables:
return insertables
if not current_path:
return
return None
# actual data insertion
insert_row = None
......@@ -219,6 +249,14 @@ out: union[dict, None]
sheet.cell(row=insert_row+1, column=col_index+1, value=value)
# self._handle_simple_data(data=content, current_path=path)
# Insert foreign keys
if insert_row is not None and sheet is not None and _is_exploded_sheet(sheet):
foreigns = _get_foreign_key_columns(sheet)
for index, path in ((f.index, f.path) for f in foreigns.values()):
value = _get_deep_value(self._context, path)
sheet.cell(row=insert_row+1, column=index+1, value=value)
return None
......
......@@ -5,6 +5,7 @@
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -31,6 +32,8 @@ from openpyxl import Workbook
from openpyxl.styles import PatternFill
from openpyxl.workbook.child import INVALID_TITLE_REGEX
from .utils import p2s
class ColumnType(Enum):
""" column types enum """
......@@ -187,7 +190,7 @@ class TableTemplateGenerator(ABC):
if (schema['items'].get('type') == 'object'
and len(path) > 1): # list of references; special treatment
# we add a new sheet with columns generated from the subtree of the schema
sheetname = ".".join(path)
sheetname = p2s(path)
if sheetname in sheets:
raise ValueError("The schema would lead to two sheets with the same name, "
f"which is forbidden: {sheetname}")
......@@ -200,11 +203,11 @@ class TableTemplateGenerator(ABC):
for array_path in array_paths:
foreigns = self._get_foreign_keys(foreign_keys, array_path)
for foreign in foreigns:
internal_key = ".".join(array_path + [foreign])
internal_key = p2s(array_path + [foreign])
if internal_key in sheets[sheetname]:
raise ValueError("The schema would lead to two columns with the same "
f"name, which is forbidden: {internal_key}")
ref_sheet = ".".join(array_path)
ref_sheet = p2s(array_path)
sheets[sheetname][internal_key] = (
ColumnType.FOREIGN, f"see sheet '{ref_sheet}'", array_path + [foreign])
# Columns are added to the new sheet, thus we do not return any columns for the
......@@ -237,7 +240,7 @@ class TableTemplateGenerator(ABC):
# The schema is a leaf.
description = schema['description'] if 'description' in schema else None
# definition of a single column
default_return = {".".join(path[level_in_sheet_name:]): (ctype, description, path)}
default_return = {p2s(path[level_in_sheet_name:]): (ctype, description, path)}
if 'type' not in schema and 'enum' in schema:
return default_return
if 'type' not in schema and 'anyOf' in schema:
......
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List
def p2s(path: List[str]) -> str:
    """Path to string: dot-separated.

    Parameters
    ----------
    path: List[str]
      The path components.

    Returns
    -------
    out: str
      The components joined by dots, for example ``["a", "b"]`` becomes ``"a.b"``.  An empty path
      results in the empty string.
    """
    return ".".join(path)
......@@ -37,8 +37,8 @@ def rfp(*pathcomponents):
def test_detect():
    """Test the detection of the row type column and of the path rows (both 0-based)."""
    example = load_workbook(rfp("example_template.xlsx"))
    assert 0 == _get_row_type_column_index(example['Person'])
    assert [1, 2] == _get_path_rows(example['Person'])
def test_fill_xlsx():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment