Skip to content
Snippets Groups Projects
Commit 55e6f39b authored by Alexander Schlemmer's avatar Alexander Schlemmer
Browse files

API(validator): new validator behavior

parent 17017e1a
No related branches found
No related tags found
2 merge requests!217TST: Make NamedTemporaryFiles Windows-compatible,!201Validator that checks created records using a json schema
Pipeline #58338 failed
...@@ -41,7 +41,7 @@ from caoscrawler import scanner ...@@ -41,7 +41,7 @@ from caoscrawler import scanner
# from collections import OrderedDict # from collections import OrderedDict
def load_json_schema_from_datamodel_yaml(filename: str) -> list: def load_json_schema_from_datamodel_yaml(filename: str) -> dict[str, dict]:
""" """
Load a data model yaml file (using caosadvancedtools) and convert Load a data model yaml file (using caosadvancedtools) and convert
all record types into a json schema using the json_schema_exporter module. all record types into a json schema using the json_schema_exporter module.
...@@ -53,15 +53,16 @@ def load_json_schema_from_datamodel_yaml(filename: str) -> list: ...@@ -53,15 +53,16 @@ def load_json_schema_from_datamodel_yaml(filename: str) -> list:
Returns Returns
------- -------
A list of json schema objects. A dict of json schema objects. The keys are the record types for which the schemas
are generated.
""" """
model = parse_model_from_yaml(filename) model = parse_model_from_yaml(filename)
rt_schemas = [] rt_schemas = {}
for el_key, el in model.items(): for el_key, el in model.items():
if isinstance(el, db.RecordType): if isinstance(el, db.RecordType):
rt_schemas.append(recordtype_to_json_schema(el)) rt_schemas[el_key] = recordtype_to_json_schema(el)
return rt_schemas return rt_schemas
...@@ -119,10 +120,10 @@ def convert_record(record: db.Record): ...@@ -119,10 +120,10 @@ def convert_record(record: db.Record):
The record that is supposed to be converted. The record that is supposed to be converted.
""" """
pobj = convert_to_python_object(record).serialize() pobj = convert_to_python_object(record).serialize()
return apply_schema_patches(pobj) return _apply_schema_patches(pobj)
def validate(records: list[db.Record], schemas: list[dict]) -> list[tuple[bool, list]]: def validate(records: list[db.Record], schemas: dict[str, dict]) -> list[tuple[bool, list]]:
""" """
Validate a list of records against a list of possible JSON schemas. Validate a list of records against a list of possible JSON schemas.
...@@ -146,21 +147,17 @@ def validate(records: list[db.Record], schemas: list[dict]) -> list[tuple[bool, ...@@ -146,21 +147,17 @@ def validate(records: list[db.Record], schemas: list[dict]) -> list[tuple[bool,
- Index 1: A list of schemas matching the record at this position of the list `records`. - Index 1: A list of schemas matching the record at this position of the list `records`.
""" """
# TODO:
# I think it makes sense to change the behavior as follows:
# - Only validate the schema that was generated for a specific record type that matches the parent
# record that is validated.
# - With this behavior for each record a single schema is matched, and if it does not match the
# validation error can be returned.
retval = [] retval = []
for r in records: for r in records:
matching_schemas = [] if len(r.parents) != 0:
for schema in schemas: raise RuntimeError(
try: "Schema validation is only supported if records have exactly one parent.")
jsonschema.validate(convert_record(r), schema) if r.parents[0] not in schemas:
matching_schemas.append(schema) raise RuntimeError(
except ValidationError: "No schema for record type {} in schema dictionary.".format(r.parents[0]))
pass try:
retval.append((len(matching_schemas) > 0, matching_schemas)) jsonschema.validate(convert_record(r), schemas[r.parents[0]])
retval.append((True, None))
except ValidationError as ex:
retval.append((False, ex))
return retval return retval
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment