Skip to content
Snippets Groups Projects
Commit 198eecbf authored by I. Nüske's avatar I. Nüske
Browse files

ENH: XLSXConverter now checks the paths in the given workbook for validity:

- New method XLSXConverter._check_path_validity() called in __init__
- New test with test data containing various incorrect paths
- Updated tests and test data to reflect new behaviour
parent 59ddf680
Branches
Tags
2 merge requests!128MNT: Added a warning when column metadata is not configured, and a better...,!125XLSXConverter paths validation
Pipeline #58990 passed
...@@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ...@@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### ### Fixed ###
- Yaml data model parser adds data types of properties of record types and other attributes which fixes https://gitlab.indiscale.com/caosdb/customers/f-fit/management/-/issues/58 - Yaml data model parser adds data types of properties of record types and other attributes which fixes https://gitlab.indiscale.com/caosdb/customers/f-fit/management/-/issues/58
- `XLSXConverter` now checks path validity before parsing the worksheet.
### Security ### ### Security ###
......
...@@ -182,16 +182,107 @@ class XLSXConverter: ...@@ -182,16 +182,107 @@ class XLSXConverter:
self._workbook = load_workbook(xlsx) self._workbook = load_workbook(xlsx)
self._schema = read_or_dict(schema) self._schema = read_or_dict(schema)
self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook) self._defining_path_index = xlsx_utils.get_defining_paths(self._workbook)
try: self._check_path_validity()
self._check_columns(fail_fast=strict) self._check_columns(fail_fast=strict)
except KeyError as e:
raise jsonschema.ValidationError(f"Malformed metadata: Cannot parse paths. "
f"Unknown path: '{e.args[1]}' in sheet '{e.args[0]}'."
) from e
self._handled_sheets: set[str] = set() self._handled_sheets: set[str] = set()
self._result: dict = {} self._result: dict = {}
self._errors: dict = {} self._errors: dict = {}
def _check_path_validity(self):
"""
Method to check the workbook paths for completeness and correctness,
and raises a jsonschema.ValidationError containing information on all
faulty paths if any are found.
If this method does not raise an error, this does not mean the workbook
is formatted correctly, only that the contained paths are complete and
can be found in the schema.
"""
# Setup
error_message = ["There were errors during path validation:"]
only_warnings = True
for sheetname in self._workbook.sheetnames:
sheet = self._workbook[sheetname]
error_message.append(f"\nIn sheet {sheetname}:")
# Collect path information and filter out information column
row_i_col_type = xlsx_utils.get_column_type_row_index(sheet)
path_rows = xlsx_utils.get_path_rows(sheet)
paths = []
for col_i, col in enumerate(sheet.iter_cols()):
col_type = col[row_i_col_type].value
path = [col[row_i].value for row_i in path_rows
if col[row_i].value not in [None, '']]
if col_type == 'COL_TYPE':
continue
paths.append((col_type, path, col_i, col))
# Check paths
for col_type, path, col_i, col in paths:
# No column type set
if col_type in [None, '']:
if len(path) == 0: # Likely a comment column
# Check whether the column has any visible content
content_in_column = False
for cell in col:
visible_content = ''.join(str(cell.value)).split()
if cell.value is not None and visible_content != '':
content_in_column = True
# If yes - might be an error but is not forbidden, so warn
if content_in_column:
m = (f"Warning:\tIn column {_column_id_to_chars(col_i)} "
f"there is no column metadata set. This column "
f"will be ignored during parsing.")
error_message.append(m)
continue
else: # Path is set but no column type
only_warnings = False
m = (f"ERROR:\t\tIn column {_column_id_to_chars(col_i)} "
f"the column type is missing.")
error_message.append(m)
# No continue - even if column type is missing, we can check path
if len(path) == 0: # Column type is set but no path
only_warnings = False
m = (f"ERROR:\t\tIn column {_column_id_to_chars(col_i)} "
f"the path is missing.")
error_message.append(m)
continue
# Check path is in schema
try:
subschema = xlsx_utils.get_subschema(path, self._schema)
schema_type = subschema.get('type', None)
if schema_type is None and 'enum' in subschema:
schema_type = 'enum'
if schema_type is None and 'anyOf' in subschema:
schema_type = 'anyOf'
if schema_type == 'array': # Check item type instead
schema_type = subschema.get('items', {}).get('type', None)
if schema_type in ['object', 'array', None]:
m = (f"Warning:\tIn column {_column_id_to_chars(col_i)} "
f"the path may be incomplete.")
error_message.append(m)
except KeyError as e:
only_warnings = False
m = (f"ERROR:\t\tIn column {_column_id_to_chars(col_i)} "
f"parsing of the path '{'.'.join(path)}' fails "
f"on the path component {str(e)}.\n\t\t\t"
f"This likely means the path is incomplete or not "
f"present in the schema.")
error_message.append(m)
# Cleanup if no errors were found
if error_message[-1] == f"\nIn sheet {sheetname}:":
error_message.pop(-1)
# Determine whether error / warning / nothing should be raised
if error_message == ["There were errors during path validation:"]:
return
error_message = '\n'.join(error_message)
if only_warnings:
warn(error_message)
else:
raise jsonschema.ValidationError(error_message)
def to_dict(self, validate: bool = False, collect_errors: bool = True) -> dict: def to_dict(self, validate: bool = False, collect_errors: bool = True) -> dict:
"""Convert the xlsx contents to a dict. """Convert the xlsx contents to a dict.
...@@ -375,13 +466,6 @@ class XLSXConverter: ...@@ -375,13 +466,6 @@ class XLSXConverter:
value = self._validate_and_convert(value, path) value = self._validate_and_convert(value, path)
_set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1) _set_in_nested(mydict=data, path=path, value=value, prefix=parent, skip=1)
continue continue
elif sheet.cell(col_type_row+1, col_idx+1).value is None:
mess = (f"\nNo metadata configured for column "
f"'{_column_id_to_chars(col_idx)}' in worksheet "
f"'{sheet.title}'.\n")
if mess not in warns:
print(mess, file=sys.stderr)
warns.append(mess) # Prevent multiple instances of same warning
except (ValueError, KeyError, jsonschema.ValidationError) as e: except (ValueError, KeyError, jsonschema.ValidationError) as e:
# Append error for entire column only once # Append error for entire column only once
if isinstance(e, KeyError) and 'column' in str(e): if isinstance(e, KeyError) and 'column' in str(e):
...@@ -460,10 +544,7 @@ class XLSXConverter: ...@@ -460,10 +544,7 @@ class XLSXConverter:
""" """
if value is None: if value is None:
return value return value
try:
subschema = self._get_subschema(path) subschema = self._get_subschema(path)
except KeyError as e:
raise KeyError("There is no entry in the schema that corresponds to this column.") from e
# Array handling only if schema says it's an array. # Array handling only if schema says it's an array.
if subschema.get("type") == "array": if subschema.get("type") == "array":
array_type = subschema["items"]["type"] array_type = subschema["items"]["type"]
......
No preview for this file type
File added
...@@ -105,7 +105,8 @@ def test_missing_columns(): ...@@ -105,7 +105,8 @@ def test_missing_columns():
with pytest.warns(UserWarning) as caught: with pytest.warns(UserWarning) as caught:
convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"), convert.to_dict(xlsx=rfp("data/simple_data_missing.xlsx"),
schema=rfp("data/simple_schema.json")) schema=rfp("data/simple_schema.json"))
assert str(caught.pop().message) == "Missing column: Training.coach.given_name" messages = {str(w.message) for w in caught}
assert "Missing column: Training.coach.given_name" in messages
with pytest.warns(UserWarning) as caught: with pytest.warns(UserWarning) as caught:
convert.to_dict(xlsx=rfp("data/multiple_choice_data_missing.xlsx"), convert.to_dict(xlsx=rfp("data/multiple_choice_data_missing.xlsx"),
schema=rfp("data/multiple_choice_schema.json")) schema=rfp("data/multiple_choice_schema.json"))
...@@ -122,12 +123,10 @@ def test_error_table(): ...@@ -122,12 +123,10 @@ def test_error_table():
convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"), convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
schema=rfp("data/simple_schema.json")) schema=rfp("data/simple_schema.json"))
# Correct Errors # Correct Errors
assert "Malformed metadata: Cannot parse paths in worksheet 'Person'." in str(caught.value)
assert "'Not a num' is not of type 'number'" in str(caught.value) assert "'Not a num' is not of type 'number'" in str(caught.value)
assert "'Yes a number?' is not of type 'number'" in str(caught.value) assert "'Yes a number?' is not of type 'number'" in str(caught.value)
assert "1.5 is not of type 'integer'" in str(caught.value) assert "1.5 is not of type 'integer'" in str(caught.value)
assert "1.2345 is not of type 'integer'" in str(caught.value) assert "1.2345 is not of type 'integer'" in str(caught.value)
assert "'There is no entry in the schema" in str(caught.value)
assert "'Not an enum' is not one of [" in str(caught.value) assert "'Not an enum' is not one of [" in str(caught.value)
# Correct Locations # Correct Locations
matches = set() matches = set()
...@@ -144,9 +143,6 @@ def test_error_table(): ...@@ -144,9 +143,6 @@ def test_error_table():
if "1.2345 is not of type 'integer'" in line: if "1.2345 is not of type 'integer'" in line:
assert "K8" in line assert "K8" in line
matches.add("K8") matches.add("K8")
if "'There is no entry in the schema" in line:
assert "Column M" in line
matches.add("Col M")
if "'Not an enum' is not one of [" in line: if "'Not an enum' is not one of [" in line:
assert "G8" in line assert "G8" in line
matches.add("K8") matches.add("K8")
...@@ -157,33 +153,62 @@ def test_error_table(): ...@@ -157,33 +153,62 @@ def test_error_table():
if "'=NOT(TRUE())' is not of type 'boolean'" in line: if "'=NOT(TRUE())' is not of type 'boolean'" in line:
assert "L10" in line assert "L10" in line
matches.add("L10") matches.add("L10")
assert matches == {"J7", "J8", "K7", "K8", "Col M", "K8", "L9", "L10"} assert matches == {"J7", "J8", "K7", "K8", "K8", "L9", "L10"}
# No additional errors # No additional errors
assert str(caught.value).count("Malformed metadata: Cannot parse paths in worksheet") == 1
assert str(caught.value).count("There is no entry in the schema") == 1
assert str(caught.value).count("is not one of") == 1 assert str(caught.value).count("is not one of") == 1
assert str(caught.value).count("is not of type") == 6 assert str(caught.value).count("is not of type") == 6
# Check correct error message for completely unknown path
with pytest.raises(jsonschema.ValidationError) as caught:
convert.to_dict(xlsx=rfp("data/simple_data_broken_paths.xlsx"),
schema=rfp("data/simple_schema.json"))
assert ("Malformed metadata: Cannot parse paths. Unknown path: 'There' in sheet 'Person'."
== str(caught.value))
def test_additional_column(): def test_malformed_paths():
with pytest.raises(jsonschema.ValidationError) as caught: with pytest.raises(jsonschema.ValidationError) as caught:
convert.to_dict(xlsx=rfp("data/simple_data_broken_paths_2.xlsx"),
schema=rfp("data/simple_schema.json"))
message_lines = str(caught.value).lower().split('\n')
expected_errors = {
'person': {'c': "column type is missing",
'd': "parsing of the path",
'e': "path may be incomplete"},
'training': {'c': "path is missing",
'd': "column type is missing",
'e': "path may be incomplete",
'f': "parsing of the path",
'g': "path may be incomplete",
'h': "parsing of the path",
'i': "parsing of the path"},
'training.coach': {'f': "no column metadata set"}}
current_sheet = None
for line in message_lines:
if 'in sheet' in line:
current_sheet = line.replace('in sheet ', '').replace(':', '')
continue
if 'in column' in line:
for column, expected_error in expected_errors[current_sheet].items():
if f'in column {column}' in line:
assert expected_error in line
expected_errors[current_sheet].pop(column)
break
for _, errors_left in expected_errors.items():
assert len(errors_left) == 0
def test_empty_columns():
with pytest.warns(UserWarning) as caught:
try:
convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"), convert.to_dict(xlsx=rfp("data/simple_data_broken.xlsx"),
schema=rfp("data/simple_schema.json")) schema=rfp("data/simple_schema.json"))
# Correct Error except jsonschema.ValidationError:
assert "no entry in the schema that corresponds to this column" in str(caught.value) pass # Errors are checked in test_error_table
# Correct Location messages = {str(w.message).lower() for w in caught}
for line in str(caught.value).split('\n'): expected_warnings = {"column h": "no column metadata"}
if "no entry in the schema that corresponds to this column" in line: for message in messages:
assert " M " in line for column, warning in list(expected_warnings.items()):
# No additional column errors if column in message:
assert str(caught.value).count("no entry in the schema that corresponds to this column") == 1 assert warning in message
expected_warnings.pop(column)
else:
assert warning not in message
assert len(expected_warnings) == 0
def test_faulty_foreign(): def test_faulty_foreign():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment