diff --git a/src/caosadvancedtools/table_json_conversion/convert.py b/src/caosadvancedtools/table_json_conversion/convert.py
index 09882f963fd976583d4acbc8f2dd3b67ef510ac8..bffab78e175ae1291e7447186c356bfa39f2a5ef 100644
--- a/src/caosadvancedtools/table_json_conversion/convert.py
+++ b/src/caosadvancedtools/table_json_conversion/convert.py
@@ -25,10 +25,11 @@ from __future__ import annotations
 import datetime
 import itertools
 import sys
+import textwrap
 from functools import reduce
 from operator import getitem
 from types import SimpleNamespace
-from typing import Any, BinaryIO, Callable, TextIO, Union
+from typing import Any, BinaryIO, Callable, Optional, TextIO, Union
 from warnings import warn
 
 import jsonschema
@@ -46,6 +47,105 @@ def _strict_bool(value: Any) -> bool:
     raise TypeError(f"Not a good boolean: {repr(value)}")
 
 
+def _column_id_to_chars(num):
+    """Convert a column id (zero-based) to the corresponding string
+    representation, e.g. 0 -> 'A', 97 -> 'CT'."""
+    if num < 0:
+        return ""
+    return _column_id_to_chars(int(num / 26) - 1) + chr(int(num % 26) + 65)
+
+
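A quick sketch of what the recursive helper above yields (illustrative only, not part of the patch):

    # _column_id_to_chars maps zero-based column ids to spreadsheet letters
    assert _column_id_to_chars(0) == "A"    # first column
    assert _column_id_to_chars(25) == "Z"   # last single-letter column
    assert _column_id_to_chars(26) == "AA"  # rolls over to two letters
    assert _column_id_to_chars(97) == "CT"  # the docstring example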
+def _format_exception_table(exceptions: list[tuple], worksheet_title: str,
+                            column_names: Optional[Union[dict, list]] = None,
+                            max_line_length: int = 120) -> str:
+    """
+    Given a list of tuples, each containing a row number, a column number and
+    an exception (in that order), and the title of the current worksheet,
+    return a formatted table of the exceptions.
+
+    Optionally takes a dict of column names; if given, a header will be
+    generated for each column and exceptions will be clustered by column.
+
+    The default line length is 120 and can be overridden via max_line_length.
+
+    Parameters
+    ----------
+    exceptions: list of tuples containing row, column, and exception
+        Data to be formatted.
+    worksheet_title: str
+        Name of the current worksheet.
+    column_names: dict or list, optional
+        column_names[column_num] should return the name of
+        column column_num.
+        If given, exceptions will be clustered by column.
+    max_line_length: int, default=120
+        Soft cap for the line length of the resulting table.
+
+    Returns
+    -------
+    string_rep: str
+        Table containing the given exceptions.
+    """
+    max_line_length -= 40  # Estimate of Field + Type space use
+
+    headers = {"loc": "Location", "type": "Error Type", "mess": ["Message"]}
+    # headers["mess"] is a list, so measure its first element, not the list itself.
+    lengths = {"loc": len(headers["loc"]), "type": len(headers["type"]),
+               "mess": len(headers["mess"][0])}
+    new_data = []
+
+    current_column = None
+    exceptions.sort(key=lambda tup: tup[1])
+    for row_i, col_i, excep in exceptions:
+        if column_names is not None:
+            # Add a line with information about the current column
+            if current_column != col_i:
+                current_column = col_i
+                new_data.append({
+                    "loc": f"\nErrors in column '{column_names[col_i]}':",
+                    "type": "", "mess": [""]
+                })
+        # Set up the entry for the current exception
+        curr_err_data = {}
+        new_data.append(curr_err_data)
+        # Get the location field
+        if isinstance(row_i, int):
+            curr_err_data["loc"] = f"Cell {_column_id_to_chars(col_i)}{row_i + 1}"
+        else:
+            curr_err_data["loc"] = f"Column {_column_id_to_chars(col_i)}"
+        lengths["loc"] = max(lengths["loc"], len(curr_err_data["loc"]))
+        # Add the error type
+        curr_err_data["type"] = type(excep).__name__
+        lengths["type"] = max(lengths["type"], len(curr_err_data["type"]))
+        # Format the message: split it into lines
+        lines = str(excep).split('\n')
+        new_lines = []
+        for line in lines:
+            new_lines += textwrap.wrap(line, max_line_length, break_long_words=False)
+        for line in new_lines:
+            lengths["mess"] = max(lengths["mess"], len(line))
+        if new_lines == []:
+            new_lines = [""]
+        curr_err_data["mess"] = new_lines
+
+    # Generate an underline for each header
+    dividers = {key: '–' * length for key, length in lengths.items()}
+    dividers["mess"] = [dividers["mess"]]
+    # Fill with spaces for alignment
+    string_rep = f"There were errors during the validation of worksheet '{worksheet_title}':\n\n"
+    for curr_err_data in [headers, dividers] + new_data:
+        string_rep += ' {loc: <{fill}} '.format(loc=curr_err_data["loc"],
+                                                fill=lengths["loc"])
+        string_rep += ' {typ: <{fill}} '.format(typ=curr_err_data["type"],
+                                                fill=lengths["type"])
+        # Fill for the messages is set to 0; if we want another column or right
+        # alignment, we need to use lengths["mess"]
+        string_rep += ' {mes: <{fill}}\n'.format(mes=curr_err_data["mess"][0], fill=0)
+        for line in curr_err_data["mess"][1:]:
+            # Front padding for lines without location and error type
+            string_rep += ' ' * (lengths["loc"] + lengths["type"] + 6)
+            string_rep += ' {mes: <{fill}}\n'.format(mes=line, fill=0)
+    return string_rep
+
+
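A minimal sketch of how the formatter is fed, with hypothetical data; the tuples follow the (row, column, exception) convention described in the docstring:

    errors = [(6, 9, TypeError("'Not a num' is not of type 'number'")),
              (None, 12, KeyError("There is no entry in the schema for this column"))]
    table = _format_exception_table(errors, "Person", column_names={9: "height", 12: "extra"})
    # Produces a "Location / Error Type / Message" table in which the first
    # error is listed at cell J7 and the second for column M as a whole.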
 class ForeignError(KeyError):
     def __init__(self, *args, definitions: list, message: str = ""):
         super().__init__(message, *args)
@@ -55,10 +155,9 @@
 class XLSXConverter:
     """Class for conversion from XLSX to JSON.
 
-For a detailed description of the required formatting of the XLSX files, see ``specs.md`` in the
-documentation.
+    For a detailed description of the required formatting of the XLSX files, see ``specs.md`` in the
+    documentation.
     """
-
     PARSER: dict[str, Callable] = {
         "string": str,
         "number": float,
@@ -69,21 +168,26 @@
     def __init__(self, xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
                  strict: bool = False):
         """
-Parameters
-----------
-xlsx: Union[str, BinaryIO]
-    Path to the XLSX file or opened file object.
+        Parameters
+        ----------
+        xlsx: Union[str, BinaryIO]
+            Path to the XLSX file or opened file object.
 
-schema: Union[dict, str, TextIO]
-    Schema for validation of XLSX content.
+        schema: Union[dict, str, TextIO]
+            Schema for validation of XLSX content.
 
-strict: bool, optional
-    If True, fail faster.
-"""
+        strict: bool, optional
+            If True, fail faster.
+        """
         self._workbook = load_workbook(xlsx)
         self._schema = read_or_dict(schema)
         self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
-        self._check_columns(fail_fast=strict)
+        try:
+            self._check_columns(fail_fast=strict)
+        except KeyError as e:
+            raise jsonschema.ValidationError(f"Malformed metadata: Cannot parse paths. "
+                                             f"Unknown path: '{e.args[1]}' in sheet '{e.args[0]}'."
+                                             ) from e
         self._handled_sheets: set[str] = set()
         self._result: dict = {}
         self._errors: dict = {}
@@ -91,27 +195,47 @@
     def to_dict(self, validate: bool = False, collect_errors: bool = True) -> dict:
         """Convert the xlsx contents to a dict.
 
-Parameters
-----------
-validate: bool, optional
-    If True, validate the result against the schema.
+        Parameters
+        ----------
+        validate: bool, optional
+            If True, validate the result against the schema.
 
-collect_errors: bool, optional
-    If True, do not fail at the first error, but try to collect as many errors as possible. After an
-    Exception is raised, the errors can be collected with ``get_errors()`` and printed with
-    ``get_error_str()``.
+        collect_errors: bool, optional
+            If True, do not fail at the first error, but try to collect as many errors as possible.
+            After an exception is raised, the errors can be collected with ``get_errors()`` and
+            printed with ``get_error_str()``.
 
-Returns
--------
-out: dict
-    A dict representing the JSON with the extracted data.
+        Returns
+        -------
+        out: dict
+            A dict representing the JSON with the extracted data.
         """
         self._handled_sheets = set()
         self._result = {}
         self._errors = {}
-        for sheetname in self._workbook.sheetnames:
-            if sheetname not in self._handled_sheets:
-                self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
+        if not collect_errors:
+            for sheetname in self._workbook.sheetnames:
+                if sheetname not in self._handled_sheets:
+                    self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
+        else:
+            # Collect errors from the conversion
+            exceptions = []
+            for sheetname in self._workbook.sheetnames:
+                if sheetname not in self._handled_sheets:
+                    try:
+                        self._handle_sheet(self._workbook[sheetname], fail_later=collect_errors)
+                    except jsonschema.ValidationError as e:
+                        exceptions.append(e)
+                        # Do not collect errors from this sheet again
+                        self._handled_sheets.add(sheetname)
+            if len(exceptions) == 1:
+                raise exceptions[0]
+            elif len(exceptions) > 1:
+                mess = "There were errors during the validation of several worksheets:\n\n"
+                mess += '\n\n'.join([str(e).replace(
+                    "There were errors during the validation of worksheet", "In worksheet")
+                    for e in exceptions])
+                raise jsonschema.ValidationError(mess)
         if validate:
             jsonschema.validate(self._result, self._schema)
         if self._errors:
@@ -147,8 +271,11 @@
                     parents[xlsx_utils.p2s(col.path[:-1])] = col.path[:-1]
                     col_paths.append(col.path)
             for path in parents.values():
-                subschema = xlsx_utils.get_subschema(path, self._schema)
-
+                try:
+                    subschema = xlsx_utils.get_subschema(path, self._schema)
+                except KeyError as kerr:
+                    kerr.args = (sheetname, *kerr.args)
+                    raise
                 # Unfortunately, there are a lot of special cases to handle here.
                 if subschema.get("type") == "array":
                     subschema = subschema["items"]
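How the wrapped KeyError now surfaces to callers, as a rough sketch (the file names come from the unit tests at the end of this patch):

    import jsonschema
    from caosadvancedtools.table_json_conversion import convert

    try:
        convert.to_dict(xlsx="data/simple_data_broken_paths.xlsx",
                        schema="data/simple_schema.json")
    except jsonschema.ValidationError as e:
        # e.g. "Malformed metadata: Cannot parse paths. Unknown path: 'There' in sheet 'Person'."
        print(e.message)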
@@ -177,24 +304,29 @@
     def _handle_sheet(self, sheet: Worksheet, fail_later: bool = False) -> None:
         """Add the contents of the sheet to the result (stored in ``self._result``).
 
-Each row in the sheet corresponds to one entry in an array in the result. Which array exactly is
-defined by the sheet's "proper name" and the content of the foreign columns.
+        Each row in the sheet corresponds to one entry in an array in the result. Which array exactly is
+        defined by the sheet's "proper name" and the content of the foreign columns.
 
-Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".
+        Look at ``xlsx_utils.get_path_position`` for the specification of the "proper name".
 
-Parameters
-----------
-fail_later: bool, optional
-    If True, do not fail with unresolvable foreign definitions, but collect all errors.
-"""
+        Parameters
+        ----------
+        fail_later: bool, optional
+            If True, do not fail with unresolvable foreign definitions, but collect all errors.
+        """
         row_type_column = xlsx_utils.get_row_type_column_index(sheet)
+        col_type_row = xlsx_utils.get_column_type_row_index(sheet)
         foreign_columns = xlsx_utils.get_foreign_key_columns(sheet)
         foreign_column_paths = {col.index: col.path for col in foreign_columns.values()}
         data_columns = xlsx_utils.get_data_columns(sheet)
         data_column_paths = {col.index: col.path for col in data_columns.values()}
         # Parent path, insert in correct order.
-        parent, proper_name = xlsx_utils.get_path_position(sheet)
+        try:
+            parent, proper_name = xlsx_utils.get_path_position(sheet)
+        except UnboundLocalError as e:
+            raise jsonschema.ValidationError(f"Malformed metadata: Cannot parse "
+                                             f"paths in worksheet '{sheet.title}'.") from e
         if parent:
             parent_sheetname = xlsx_utils.get_worksheet_for_path(parent, self._defining_path_index)
             if parent_sheetname not in self._handled_sheets:
@@ -206,8 +338,11 @@
         #
         # - data: The actual data of this entry, a dict.
         #
         entries: dict[str, list[SimpleNamespace]] = {}
+        exceptions = []
+        warns = []
+        col_names = {}
         for row_idx, row in enumerate(sheet.iter_rows(values_only=True)):
-            # Skip non-data rows.
+            # Skip non-data rows
             if row[row_type_column] is not None:
                 continue
             foreign_repr = ""
@@ -220,24 +355,41 @@ fail_later: bool, optional
                     foreign.append(foreign_column_paths[col_idx] + [value])
                     continue
 
-                if col_idx in data_column_paths:
-                    path = data_column_paths[col_idx]
-                    if self._is_multiple_choice(path):
-                        real_value = path.pop()  # Last component is the enum value, insert above
-                        # set up list
-                        try:
-                            _set_in_nested(mydict=data, path=path, value=[], prefix=parent, skip=1)
-                        except ValueError as err:
-                            if not str(err).startswith("There is already some value at"):
-                                raise
-                        if not xlsx_utils.parse_multiple_choice(value):
-                            continue
-                        _set_in_nested(mydict=data, path=path, value=real_value, prefix=parent,
-                                       skip=1, append_to_list=True)
+                try:
+                    if col_idx in data_column_paths:
+                        path = data_column_paths[col_idx]
+                        col_names[col_idx] = '.'.join(path)
+                        if self._is_multiple_choice(path):
+                            real_value = path.pop()  # Last component is the enum value, insert above
+                            # Set up the list
+                            try:
+                                _set_in_nested(mydict=data, path=path, value=[], prefix=parent, skip=1)
+                            except ValueError as err:
+                                if not str(err).startswith("There is already some value at"):
+                                    raise
+                            if not xlsx_utils.parse_multiple_choice(value):
+                                continue
+                            _set_in_nested(mydict=data, path=path, value=real_value, prefix=parent,
+                                           skip=1, append_to_list=True)
+                        else:
+                            value = self._validate_and_convert(value, path)
+                            _set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
+                        continue
+                    elif sheet.cell(col_type_row+1, col_idx+1).value is None:
+                        mess = (f"\nNo metadata configured for column "
+                                f"'{_column_id_to_chars(col_idx)}' in worksheet "
+                                f"'{sheet.title}'.\n")
+                        if mess not in warns:
+                            print(mess, file=sys.stderr)
+                            warns.append(mess)  # Prevent multiple instances of the same warning
+                except (ValueError, KeyError, jsonschema.ValidationError) as e:
+                    # Append an error for the entire column only once
+                    if isinstance(e, KeyError) and 'column' in str(e):
+                        if len([err for ri, ci, err in exceptions
+                                if ci == col_idx and isinstance(err, KeyError)]) == 0:
+                            exceptions.append((None, col_idx, e))
                     else:
-                    value = self._validate_and_convert(value, path)
-                    _set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
-                continue
+                        exceptions.append((row_idx, col_idx, e))
 
             try:
                 # Find current position in tree
@@ -251,6 +403,12 @@ fail_later: bool, optional
                 if not fail_later:
                     raise
                 self._errors[(sheet.title, row_idx)] = kerr.definitions
+
+        if exceptions:
+            exception_table = _format_exception_table(exceptions, sheet.title,
+                                                      col_names)
+            raise jsonschema.ValidationError(exception_table)
+
         self._handled_sheets.add(sheet.title)
 
     def _is_multiple_choice(self, path: list[str]) -> bool:
@@ -267,9 +425,9 @@
     def _get_parent_dict(self, parent_path: list[str], foreign: list[list]) -> dict:
         """Return the dict into which values can be inserted.
 
-This method returns, from the current result-in-making, the entry at ``parent_path`` which matches
-the values given in the ``foreign`` specification.
-"""
+        This method returns, from the current result-in-making, the entry at ``parent_path`` which matches
+        the values given in the ``foreign`` specification.
+        """
         foreign_groups = _group_foreign_paths(foreign, common=parent_path)
 
         current_object = self._result
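For orientation, a hypothetical sketch of the ``foreign`` specification that `_get_parent_dict` resolves, reusing the Training example from the xlsx_utils docstrings:

    # The result-in-making, with two Training entries already inserted:
    result = {"Training": [{"url": "example.com/mp"}, {"url": "example.com/m"}]}
    # Each foreign definition is [path..., property, value]; this one pins the
    # parent down to the Training entry whose url is "example.com/mp":
    foreign = [["Training", "url", "example.com/mp"]]
    # _get_parent_dict(parent_path=["Training"], foreign=foreign) would then
    # return the first dict in result["Training"].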
@@ -296,33 +454,31 @@
     def _validate_and_convert(self, value: Any, path: list[str]):
         """Apply some basic validation and conversion steps.
 
-This includes:
-- Validation against the type given in the schema
-- List typed values are split at semicolons and validated individually
+        This includes:
+        - Validation against the type given in the schema
+        - List typed values are split at semicolons and validated individually
         """
         if value is None:
             return value
-        subschema = self._get_subschema(path)
+        try:
+            subschema = self._get_subschema(path)
+        except KeyError as e:
+            raise KeyError("There is no entry in the schema that corresponds to this column.") from e
         # Array handling only if schema says it's an array.
         if subschema.get("type") == "array":
             array_type = subschema["items"]["type"]
             if isinstance(value, str) and ";" in value:
                 values = [self.PARSER[array_type](v) for v in value.split(";")]
                 return values
-        try:
-            # special case: datetime or date
-            if ("anyOf" in subschema):
-                if isinstance(value, datetime.datetime) and (
-                        {'type': 'string', 'format': 'date-time'} in subschema["anyOf"]):
-                    return value
-                if isinstance(value, datetime.date) and (
-                        {'type': 'string', 'format': 'date'} in subschema["anyOf"]):
-                    return value
-            jsonschema.validate(value, subschema)
-        except jsonschema.ValidationError as verr:
-            print(verr)
-            print(path)
-            raise
+        # Special case: datetime or date
+        if ("anyOf" in subschema):
+            if isinstance(value, datetime.datetime) and (
+                    {'type': 'string', 'format': 'date-time'} in subschema["anyOf"]):
+                return value
+            if isinstance(value, datetime.date) and (
+                    {'type': 'string', 'format': 'date'} in subschema["anyOf"]):
+                return value
+        jsonschema.validate(value, subschema)
 
         # Finally: convert to target type
         return self.PARSER[subschema.get("type", "string")](value)
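Why the datetime special case exists, in a short runnable sketch (hypothetical subschema): openpyxl hands date cells over as ``datetime`` objects, which plain jsonschema validation would reject:

    import datetime
    import jsonschema

    subschema = {"anyOf": [{"type": "string", "format": "date"}]}
    try:
        jsonschema.validate(datetime.date(2024, 2, 27), subschema)
    except jsonschema.ValidationError:
        pass  # raised: a datetime.date is not a JSON string, hence the isinstance checks above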
@@ -340,29 +496,29 @@
 def _group_foreign_paths(foreign: list[list], common: list[str]) -> list[SimpleNamespace]:
     """Group the foreign keys by their base paths.
 
-Parameters
-----------
-foreign: list[list]
-    A list of foreign definitions, consisting of path components, property and possibly value.
-
-common: list[list[str]]
-    A common path which defines the final target of the foreign definitions. This helps to understand
-    where the ``foreign`` paths shall be split.
-
-Returns
--------
-out: list[dict[str, list[list]]]
-
-    A list of foreign path segments, grouped by their common segments. Each element is a namespace
-    with detailed information of all those elements which form the group. The namespace has the
-    following attributes:
-
-    - ``path``: The full path to this path segment. This is always the previous segment's ``path``
-      plus this segment's ``subpath``.
-    - ``stringpath``: The stringified ``path``, might be useful for comparison or sorting.
-    - ``subpath``: The path, relative from the previous segment.
-    - ``definitions``: A list of the foreign definitions for this segment, but stripped of the
-      ``path`` components.
+    Parameters
+    ----------
+    foreign: list[list]
+        A list of foreign definitions, consisting of path components, property and possibly value.
+
+    common: list[str]
+        A common path which defines the final target of the foreign definitions. This helps to
+        understand where the ``foreign`` paths shall be split.
+
+    Returns
+    -------
+    out: list[dict[str, list[list]]]
+
+        A list of foreign path segments, grouped by their common segments. Each element is a
+        namespace with detailed information about all the elements which form the group. The
+        namespace has the following attributes:
+
+        - ``path``: The full path to this path segment. This is always the previous segment's
+          ``path`` plus this segment's ``subpath``.
+        - ``stringpath``: The stringified ``path``, might be useful for comparison or sorting.
+        - ``subpath``: The path, relative to the previous segment.
+        - ``definitions``: A list of the foreign definitions for this segment, but stripped of the
+          ``path`` components.
     """
     # Build a simple dict first, without subpath.
     results = {}
@@ -392,9 +548,6 @@ out: list[dict[str, list[list]]]
         last_level = len(elem.path)
         resultlist.append(elem)
 
-    # from IPython import embed
-    # embed()
-
     if last_level != len(common):
         raise ValueError("Foreign keys must cover the complete `common` depth.")
     return resultlist
@@ -405,31 +558,31 @@ def _set_in_nested(mydict: dict, path: list, value: Any, prefix: list = [], skip
                    overwrite: bool = False, append_to_list: bool = False) -> dict:
     """Set a value in a nested dict.
 
-Parameters
-----------
-mydict: dict
-    The dict into which the ``value`` shall be inserted.
-path: list
-    A list of keys, denoting the location of the value.
-value
-    The value which shall be set inside the dict.
-prefix: list
-    A list of keys which shall be removed from ``path``. A KeyError is raised if ``path`` does not
-    start with the elements of ``prefix``.
-skip: int = 0
-    Remove this many additional levels from the path, *after* removing the prefix.
-overwrite: bool = False
-    If True, allow overwriting existing content. Otherwise, attempting to overwrite existing values
-    leads to an exception.
-append_to_list: bool = False
-    If True, assume that the element at ``path`` is a list and append the value to it. If the list
-    does not exist, create it. If there is a non-list at ``path`` already, overwrite it with a new
-    list, if ``overwrite`` is True, otherwise raise a ValueError.
-
-Returns
--------
-mydict: dict
-    The same dictionary that was given as a parameter, but modified.
+    Parameters
+    ----------
+    mydict: dict
+        The dict into which the ``value`` shall be inserted.
+    path: list
+        A list of keys, denoting the location of the value.
+    value
+        The value which shall be set inside the dict.
+    prefix: list
+        A list of keys which shall be removed from ``path``. A KeyError is raised if ``path`` does
+        not start with the elements of ``prefix``.
+    skip: int = 0
+        Remove this many additional levels from the path, *after* removing the prefix.
+    overwrite: bool = False
+        If True, allow overwriting existing content. Otherwise, attempting to overwrite existing
+        values leads to an exception.
+    append_to_list: bool = False
+        If True, assume that the element at ``path`` is a list and append the value to it. If the
+        list does not exist, create it. If there is a non-list at ``path`` already, overwrite it
+        with a new list if ``overwrite`` is True, otherwise raise a ValueError.
+
+    Returns
+    -------
+    mydict: dict
+        The same dictionary that was given as a parameter, but modified.
     """
     for idx, el in enumerate(prefix):
         if path[idx] != el:
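A small sketch of the contract described in the docstring (illustrative; the assertions assume the documented behavior):

    mydict: dict = {}
    _set_in_nested(mydict=mydict, path=["Training", "date"], value="2024-02-27")
    assert mydict == {"Training": {"date": "2024-02-27"}}
    # ``prefix`` is checked against the start of ``path`` and then stripped:
    inner: dict = {}
    _set_in_nested(mydict=inner, path=["Training", "subjects"], value=["Math"],
                   prefix=["Training"])
    assert inner == {"subjects": ["Math"]}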
@@ -473,25 +626,25 @@ def to_dict(xlsx: Union[str, BinaryIO], schema: Union[dict, str, TextIO],
             validate: bool = None, strict: bool = False) -> dict:
     """Convert the xlsx contents to a dict, it must follow a schema.
 
-Parameters
-----------
-xlsx: Union[str, BinaryIO]
-    Path to the XLSX file or opened file object.
+    Parameters
+    ----------
+    xlsx: Union[str, BinaryIO]
+        Path to the XLSX file or opened file object.
 
-schema: Union[dict, str, TextIO]
-    Schema for validation of XLSX content.
+    schema: Union[dict, str, TextIO]
+        Schema for validation of XLSX content.
 
-validate: bool, optional
-    If True, validate the result against the schema.
+    validate: bool, optional
+        If True, validate the result against the schema.
 
-strict: bool, optional
-    If True, fail faster.
+    strict: bool, optional
+        If True, fail faster.
 
-Returns
--------
-out: dict
-    A dict representing the JSON with the extracted data.
+    Returns
+    -------
+    out: dict
+        A dict representing the JSON with the extracted data.
     """
     converter = XLSXConverter(xlsx, schema, strict=strict)
     return converter.to_dict()
diff --git a/src/caosadvancedtools/table_json_conversion/xlsx_utils.py b/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
index 5002f3ac7fe4bd78accffe0697cd7ecc7273dc27..7770e3ecd372aa0bfb149659dc6397e3b675ff87 100644
--- a/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
+++ b/src/caosadvancedtools/table_json_conversion/xlsx_utils.py
@@ -68,22 +68,22 @@ class RowType(Enum):
 def array_schema_from_model_schema(model_schema: dict) -> dict:
     """Convert a *data model* schema to a *data array* schema.
 
-Practically, this means that the top level properties are converted into lists. In a simplified
-notation, this can be expressed as:
-
-``array_schema = { elem: [elem typed data...] for elem in model_schema }``
-
-Parameters
-----------
-model_schema: dict
-    The schema description of the data model. Must be a json schema *object*, with a number of
-    *object* typed properties.
-
-Returns
--------
-array_schema: dict
-    A corresponding json schema, where the properties are arrays with the types of the input's
-    top-level properties.
+    Practically, this means that the top level properties are converted into lists. In a simplified
+    notation, this can be expressed as:
+
+    ``array_schema = { elem: [elem typed data...] for elem in model_schema }``
+
+    Parameters
+    ----------
+    model_schema: dict
+        The schema description of the data model. Must be a json schema *object*, with a number of
+        *object* typed properties.
+
+    Returns
+    -------
+    array_schema: dict
+        A corresponding json schema, where the properties are arrays with the types of the input's
+        top-level properties.
     """
     assert model_schema["type"] == "object"
     result = deepcopy(model_schema)
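The docstring's simplified notation, spelled out on a tiny hypothetical model schema:

    model_schema = {"type": "object", "properties": {
        "Training": {"type": "object", "properties": {"date": {"type": "string"}}}}}
    array_schema = array_schema_from_model_schema(model_schema)
    # Each top-level property becomes an array of its former type:
    assert array_schema["properties"]["Training"]["type"] == "array"
    assert array_schema["properties"]["Training"]["items"]["type"] == "object"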
@@ -100,30 +100,30 @@ array_schema: dict
 def get_defining_paths(workbook: Workbook) -> dict[str, list[list[str]]]:
     """For all sheets in ``workbook``, list the paths which they define.
 
-A sheet is said to define a path, if it has data columns for properties inside that path. For
-example, consider the following worksheet:
-
-| `COL_TYPE` | `SCALAR`       | `SCALAR`      | `LIST`       | `SCALAR`           |
-| `PATH`     | `Training`     | `Training`    | `Training`   | `Training`         |
-| `PATH`     | `url`          | `date`        | `subjects`   | `supervisor`       |
-| `PATH`     |                |               |              | `email`            |
-|------------|----------------|---------------|--------------|--------------------|
-|            | example.com/mp | 2024-02-27    | Math;Physics | steve@example.com  |
-|            | example.com/m  | 2024-02-27    | Math         | stella@example.com |
-
-This worksheet defines properties for the paths `["Training"]` and `["Training", "supervisor"]`, and
-thus these two path lists would be returned for the key with this sheet's sheetname.
-
-Parameters
-----------
-workbook: Workbook
-    The workbook to analyze.
-
-Returns
--------
-out: dict[str, list[list[str]]
-    A dict with worksheet names as keys and lists of paths (represented as string lists) as values.
-"""
+    A sheet is said to define a path if it has data columns for properties inside that path. For
+    example, consider the following worksheet:
+
+    | `COL_TYPE` | `SCALAR`       | `SCALAR`      | `LIST`       | `SCALAR`           |
+    | `PATH`     | `Training`     | `Training`    | `Training`   | `Training`         |
+    | `PATH`     | `url`          | `date`        | `subjects`   | `supervisor`       |
+    | `PATH`     |                |               |              | `email`            |
+    |------------|----------------|---------------|--------------|--------------------|
+    |            | example.com/mp | 2024-02-27    | Math;Physics | steve@example.com  |
+    |            | example.com/m  | 2024-02-27    | Math         | stella@example.com |
+
+    This worksheet defines properties for the paths `["Training"]` and `["Training", "supervisor"]`,
+    and thus these two path lists would be returned for the key with this sheet's sheetname.
+
+    Parameters
+    ----------
+    workbook: Workbook
+        The workbook to analyze.
+
+    Returns
+    -------
+    out: dict[str, list[list[str]]]
+        A dict with worksheet names as keys and lists of paths (represented as string lists) as
+        values.
+    """
     result: dict[str, list[list[str]]] = {}
     for sheet in workbook.worksheets:
         paths = []
@@ -140,11 +140,11 @@
 def get_data_columns(sheet: Worksheet) -> dict[str, SimpleNamespace]:
     """Return the data paths of the worksheet.
 
-Returns
--------
-out: dict[str, SimpleNamespace]
-    The keys are the stringified paths. The values are SimpleNamespace objects with ``index``,
-    ``path`` and ``column`` attributes.
+    Returns
+    -------
+    out: dict[str, SimpleNamespace]
+        The keys are the stringified paths. The values are SimpleNamespace objects with ``index``,
+        ``path`` and ``column`` attributes.
     """
     column_types = _get_column_types(sheet)
     path_rows = get_path_rows(sheet)
@@ -171,11 +171,11 @@
 def get_foreign_key_columns(sheet: Worksheet) -> dict[str, SimpleNamespace]:
     """Return the foreign keys of the worksheet.
 
-Returns
--------
-out: dict[str, SimpleNamespace]
-    The keys are the stringified paths. The values are SimpleNamespace objects with ``index``,
-    ``path`` and ``column`` attributes.
+    Returns
+    -------
+    out: dict[str, SimpleNamespace]
+        The keys are the stringified paths. The values are SimpleNamespace objects with ``index``,
+        ``path`` and ``column`` attributes.
     """
     column_types = _get_column_types(sheet)
     path_rows = get_path_rows(sheet)
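For the Training sheet shown in the docstring above, a sketch of the expected result (hypothetical file name, sheet named "Training"):

    from openpyxl import load_workbook
    from caosadvancedtools.table_json_conversion import xlsx_utils

    workbook = load_workbook("training.xlsx")
    paths = xlsx_utils.get_defining_paths(workbook)
    # paths == {"Training": [["Training"], ["Training", "supervisor"]]}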
""" # Parent element: longest common path shared among any foreign column and all the data columns parent: list[str] = [] @@ -258,6 +258,16 @@ def get_row_type_column_index(sheet: Worksheet): raise ValueError("The column which defines row types (COL_TYPE, PATH, ...) is missing") +def get_column_type_row_index(sheet: Worksheet): + """Return the row index (0-indexed) of the row which defines the column types. + """ + for row in sheet.rows: + for cell in row: + if cell.value == RowType.COL_TYPE.name: + return cell.row - 1 + raise ValueError("The column which defines row types (COL_TYPE, SCALAR, ...) is missing") + + def get_subschema(path: list[str], schema: dict) -> dict: """Return the sub schema at ``path``.""" if path: @@ -285,7 +295,7 @@ def is_exploded_sheet(sheet: Worksheet) -> bool: """Return True if this is a an "exploded" sheet. An exploded sheet is a sheet whose data entries are LIST valued properties of entries in another - sheet. A sheet is detected as exploded iff it has FOREIGN columns. + sheet. A sheet is detected as exploded if and only if it has FOREIGN columns. """ column_types = _get_column_types(sheet) return ColumnType.FOREIGN.name in column_types.values() @@ -308,22 +318,22 @@ def p2s(path: list[str]) -> str: def parse_multiple_choice(value: Any) -> bool: """Interpret ``value`` as a multiple choice input. -*Truthy* values are: -- The boolean ``True``. -- The number "1". -- The (case-insensitive) strings ``true``, ``wahr``, ``x``, ``√``, ``yes``, ``ja``, ``y``, ``j``. - -*Falsy* values are: -- The boolean ``False``. -- ``None``, empty strings, lists, dicts. -- The number "0". -- The (case-insensitive) strings ``false``, ``falsch``, ``-``, ``no``, ``nein``, ``n``. -- Everything else. - -Returns -------- -out: bool - The interpretation result of ``value``. + *Truthy* values are: + - The boolean ``True``. + - The number "1". + - The (case-insensitive) strings ``true``, ``wahr``, ``x``, ``√``, ``yes``, ``ja``, ``y``, ``j``. + + *Falsy* values are: + - The boolean ``False``. + - ``None``, empty strings, lists, dicts. + - The number "0". + - The (case-insensitive) strings ``false``, ``falsch``, ``-``, ``no``, ``nein``, ``n``. + - Everything else. + + Returns + ------- + out: bool + The interpretation result of ``value``. """ # Non-string cases first: # pylint: disable-next=too-many-boolean-expressions @@ -349,7 +359,7 @@ out: bool def read_or_dict(data: Union[dict, str, TextIO]) -> dict: """If data is a json file name or input stream, read data from there. 
-If it is a dict already, just return it."""
+    If it is a dict already, just return it."""
     if isinstance(data, dict):
         return data
diff --git a/unittests/table_json_conversion/data/simple_data_broken.xlsx b/unittests/table_json_conversion/data/simple_data_broken.xlsx
new file mode 100644
index 0000000000000000000000000000000000000000..a65d464a53459de73e41fd20d807899c44728cda
Binary files /dev/null and b/unittests/table_json_conversion/data/simple_data_broken.xlsx differ
diff --git a/unittests/table_json_conversion/data/simple_data_broken_paths.xlsx b/unittests/table_json_conversion/data/simple_data_broken_paths.xlsx
new file mode 100644
index 0000000000000000000000000000000000000000..0221570c942fc28f2b59a282f751781ff4a504fa
Binary files /dev/null and b/unittests/table_json_conversion/data/simple_data_broken_paths.xlsx differ
diff --git a/unittests/table_json_conversion/test_read_xlsx.py b/unittests/table_json_conversion/test_read_xlsx.py
index 0eec2e9caa1f800ad86ab43057b8c512dc09881f..ac0a42b59478a57a1bf7ef53f4333e20c0358e76 100644
--- a/unittests/table_json_conversion/test_read_xlsx.py
+++ b/unittests/table_json_conversion/test_read_xlsx.py
@@ -27,6 +27,7 @@ import re
 
 from types import SimpleNamespace
 
+import jsonschema
 import pytest
 
 from caosadvancedtools.table_json_conversion import convert
@@ -112,6 +113,64 @@ def test_missing_columns():
         assert expected in messages
 
 
+def test_error_table():
+    with pytest.raises(jsonschema.ValidationError) as caught:
+        convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
+                        schema=rfp("data/simple_schema.json"))
+    # Correct errors
+    assert "Malformed metadata: Cannot parse paths in worksheet 'Person'." in str(caught.value)
+    assert "'Not a num' is not of type 'number'" in str(caught.value)
+    assert "'Yes a number?' is not of type 'number'" in str(caught.value)
+    assert "1.5 is not of type 'integer'" in str(caught.value)
+    assert "1.2345 is not of type 'integer'" in str(caught.value)
+    assert "'There is no entry in the schema" in str(caught.value)
+    assert "'Not an enum' is not one of [" in str(caught.value)
+    # Correct locations
+    for line in str(caught.value).split('\n'):
+        if "'Not a num' is not of type 'number'" in line:
+            assert "J7" in line
+        if "'Yes a number?' is not of type 'number'" in line:
+            assert "J8" in line
+        if "1.5 is not of type 'integer'" in line:
+            assert "K7" in line
+        if "1.2345 is not of type 'integer'" in line:
+            assert "K8" in line
+        if "'There is no entry in the schema" in line:
+            assert "Column M" in line
+        if "'Not an enum' is not one of [" in line:
+            assert "G8" in line
+    # No additional errors
+    assert str(caught.value).count("Malformed metadata: Cannot parse paths in worksheet") == 1
+    assert str(caught.value).count("There is no entry in the schema") == 1
+    assert str(caught.value).count("is not one of") == 1
+    # FIXME/TODO: Remove this once booleans are fixed; when everything works as
+    # expected, set the correct number.
+    if "is not of type 'boolean'" in str(caught.value):
+        assert str(caught.value).count("is not of type") == 6
+    else:
+        assert str(caught.value).count("is not of type") == 4
+    # Check the correct error message for a completely unknown path
+    with pytest.raises(jsonschema.ValidationError) as caught:
+        convert.to_dict(xlsx=rfp("data/simple_data_broken_paths.xlsx"),
+                        schema=rfp("data/simple_schema.json"))
+    assert ("Malformed metadata: Cannot parse paths. Unknown path: 'There' in sheet 'Person'."
+            == str(caught.value))
+
+
+def test_additional_column():
+    with pytest.raises(jsonschema.ValidationError) as caught:
+        convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
+                        schema=rfp("data/simple_schema.json"))
+    # Correct error
+    assert "no entry in the schema that corresponds to this column" in str(caught.value)
+    # Correct location
+    for line in str(caught.value).split('\n'):
+        if "no entry in the schema that corresponds to this column" in line:
+            assert " M " in line
+    # No additional column errors
+    assert str(caught.value).count("no entry in the schema that corresponds to this column") == 1
+
+
 def test_faulty_foreign():
     # Simple wrong foreign key
     converter = convert.XLSXConverter(xlsx=rfp("data/simple_data_wrong_foreign.xlsx"),