diff --git a/src/caosadvancedtools/json_schema_exporter.py b/src/caosadvancedtools/json_schema_exporter.py index 4f331585e3bbbfd5eed3b2d22b573f2d3d60cb56..172cc1c089c56d092a61feb705e15c6cc1aa2e83 100644 --- a/src/caosadvancedtools/json_schema_exporter.py +++ b/src/caosadvancedtools/json_schema_exporter.py @@ -20,9 +20,12 @@ # with this program. If not, see <https://www.gnu.org/licenses/>. # """Module for converting a data model into a json schema compatible dictionary. + +The scope of this json schema is the automatic generation of user interfaces. """ -from typing import Any, List, Optional +from collections import OrderedDict +from typing import Any, Dict, Iterable, List, Optional, Union import linkahead as db from linkahead.common.datatype import get_list_datatype, is_list_datatype @@ -46,8 +49,8 @@ class JsonSchemaExporter: Whether additional properties will be admitted in the resulting schema. Optional, default is True. name_and_description_in_properties : bool, optional - Whether to include name and description in the `properties` section of - the schema to be exported. Optional, default is False. + Whether objects that are generated from reference properties shall have a `name` and + `description` property in the generated schema. Optional, default is False. additional_options_for_text_props : dict, optional Dictionary containing additional "pattern" or "format" options for string-typed properties. Optional, default is empty. 
@@ -113,7 +116,7 @@ class JsonSchemaExporter: return self._make_text_property(prop.description, text_format, text_pattern) - json_prop = {} + json_prop = OrderedDict() if prop.description: json_prop["description"] = prop.description if self._units_in_description and prop.unit: @@ -142,8 +145,8 @@ class JsonSchemaExporter: values = self._retrieve_enum_values("RECORD") + self._retrieve_enum_values("FILE") json_prop["enum"] = values elif prop.datatype == db.FILE: - # TODO: different issue - raise NotImplementedError("Files have not been implemented yet.") + json_prop["type"] = "string" + json_prop["format"] = "data-url" else: prop_name = prop.datatype if isinstance(prop.datatype, db.Entity): @@ -224,7 +227,7 @@ class JsonSchemaExporter: schema["required"] = self._make_required_list(rt) schema["additionalProperties"] = self._additional_properties - props = {} + props = OrderedDict() if self._name_and_description_in_properties: props["name"] = self._make_text_property("The name of the Record to be created") props["description"] = self._make_text_property( @@ -257,7 +260,9 @@ class JsonSchemaExporter: schema : dict A dict containing the json schema created from the given RecordType's properties. """ - + if rt is None: + raise ValueError( + "recordtype_to_json_schema(...) cannot be called with a `None` RecordType.") schema = self._make_segment_from_recordtype(rt) schema["$schema"] = "https://json-schema.org/draft/2019-09/schema" if rt.name: @@ -287,8 +292,8 @@ def recordtype_to_json_schema(rt: db.RecordType, additional_properties: bool = T Whether additional properties will be admitted in the resulting schema. Optional, default is True. name_and_description_in_properties : bool, optional - Whether to include name and description in the `properties` section of - the schema to be exported. Optional, default is False. + Whether objects that are generated from reference properties shall have a `name` and + `description` property in the generated schema. 
Optional, default is False. additional_options_for_text_props : dict, optional Dictionary containing additional "pattern" or "format" options for string-typed properties. Optional, default is empty. @@ -317,3 +322,93 @@ def recordtype_to_json_schema(rt: db.RecordType, additional_properties: bool = T do_not_create=do_not_create, ) return exporter.recordtype_to_json_schema(rt) + + +def make_array(schema: dict) -> dict: + """Create an array of the given schema. + +The result will look like this: + +.. code:: js + + { "type": "array", + "items": { + // the schema + } + } + +Parameters +---------- + +schema : dict + The JSON schema which shall be packed into an array. + +Returns +------- + +out : dict + A JSON schema dict with a top-level array which contains instances of the given schema. + """ + result = { + "type": "array", + "items": schema, + "$schema": "https://json-schema.org/draft/2019-09/schema", + } + return result + + +def merge_schemas(schemas: Union[Dict[str, dict], Iterable[dict]]) -> dict: + """Merge the given schemata into a single schema. + +The result will look like this: + +.. code:: js + + { + "type": "object", + "properties": { + // A, B, C + }, + "required": [ + // "A", "B", "C" + ], + "additionalProperties": false + } + + +Parameters +---------- + +schemas : dict[str, dict] | Iterable[dict] + A dict or iterable of schemata which shall be merged together. If this is a dict, the keys will + be used as property names, otherwise the titles of the submitted schemata. If they have no title, + numbers will be used as a fallback. Note that even with a dict, the original schema's "title" is + not changed. + +Returns +------- + +out : dict + A JSON schema dict with a top-level object which contains the given schemata as properties. 
+ """ + sub_schemas: dict[str, dict] = OrderedDict() + required = [] + + if isinstance(schemas, dict): + sub_schemas = schemas + required = [str(k) for k in schemas.keys()] + else: + for i, schema in enumerate(schemas, start=1): + title = schema.get("title", str(i)) + sub_schemas[title] = schema + required.append(title) + + result = { + "type": "object", + "properties": sub_schemas, + "required": required, + "additionalProperties": False, + "$schema": "https://json-schema.org/draft/2019-09/schema", + } + + return result diff --git a/src/caosadvancedtools/models/data_model.py b/src/caosadvancedtools/models/data_model.py index bb40939a52a0700883f119ff03ddf499c2589845..27f60b5ec877c2ad7646fffd4ef735ebb62c1694 100644 --- a/src/caosadvancedtools/models/data_model.py +++ b/src/caosadvancedtools/models/data_model.py @@ -29,8 +29,9 @@ from copy import deepcopy # remove this, when we drop support for old Python versions. from typing import List -import caosdb as db -from caosdb.apiutils import compare_entities, describe_diff +import linkahead as db +import linkahead.common.models as models +from linkahead.apiutils import compare_entities, describe_diff CAOSDB_INTERNAL_PROPERTIES = [ @@ -263,28 +264,56 @@ class DataModel(dict): return list(all_ents.values()) - def get_deep(self, name: str, visited: set = None): + def get_deep(self, name: str, visited_props: set = None, visited_parents: set = None): """Attempt to resolve references for the given ``name``. - This methods only uses data which is available in this datamodel, which acts kind of like a - cache pool. + The returned entity has all the properties it inherits from its ancestry and all properties + have the correct descriptions and datatypes. This methods only uses data which is available + in this DataModel, which acts kind of like a cache pool. + + Note that this may change this data model (subsequent "get" like calls may also return + deeper content.) 
- Note that this may change this data model (subsequent "get" like calls may also return deep - content.) """ entity = self.get(name) if not entity: return entity - if not visited: - visited = set() + if not visited_props: + visited_props = set() + if not visited_parents: + visited_parents = set() + + importances = { + models.OBLIGATORY: 0, + models.RECOMMENDED: 1, + models.SUGGESTED: 2, + } + + for parent in list(entity.get_parents()): # Make a change-resistant list copy. + if parent.name in visited_parents: + continue + visited_parents.add(parent.name) + parent_importance = importances.get(parent._flags.get("inheritance"), 999) + if parent.name in self: + deep_parent = self.get_deep(parent.name, # visited_props=visited_props, + visited_parents=visited_parents + ) + + for prop in deep_parent.properties: + importance = importances[deep_parent.get_importance(prop.name)] + if (importance <= parent_importance + and prop.name not in [prop.name for prop in entity.properties]): + entity.add_property(prop) + else: + print(f"Referenced parent \"{parent.name}\" not found in data model.") - # new_props = [] for prop in list(entity.get_properties()): # Make a change-resistant list copy. 
- if prop.name in visited: + if prop.name in visited_props: continue - visited.add(prop.name) + visited_props.add(prop.name) if prop.name in self: - deep_prop = self.get_deep(prop.name, visited=visited) + deep_prop = self.get_deep(prop.name, visited_props=visited_props, + visited_parents=visited_parents) linked_prop = entity.get_property(prop) if not linked_prop.datatype: if deep_prop.role == "Property": @@ -295,4 +324,5 @@ class DataModel(dict): linked_prop.description = deep_prop.description else: print(f"Referenced property \"{prop.name}\" not found in data model.") + return entity diff --git a/src/caosadvancedtools/models/parser.py b/src/caosadvancedtools/models/parser.py index 25b5727c2674e0fbfa58f31a595da91aebfc806a..ba63c5cd77217352144e4ec26e052cc2772339ce 100644 --- a/src/caosadvancedtools/models/parser.py +++ b/src/caosadvancedtools/models/parser.py @@ -141,7 +141,11 @@ class JsonSchemaDefinitionError(RuntimeError): def parse_model_from_yaml(filename, existing_model: Optional[dict] = None): - """Shortcut if the Parser object is not needed. + """Parse a data model from a YAML file. + +This is a convenience function if the Parser object is not needed, it calls +``Parser.parse_model_from_yaml(...)`` internally. + Parameters ---------- @@ -155,7 +159,10 @@ existing_model : dict, optional def parse_model_from_string(string, existing_model: Optional[dict] = None): - """Shortcut if the Parser object is not needed. + """Parse a data model from a YAML string. + +This is a convenience function if the Parser object is not needed, it calls +``Parser.parse_model_from_string(...)`` internally. 
Parameters ---------- diff --git a/unittests/test_json_schema_exporter.py b/unittests/test_json_schema_exporter.py index 597c86a9a375e05fdc6b85fad4e0cb1a44b125e9..18a375363c175dd6d2fd1736023ef29afa110bfd 100644 --- a/unittests/test_json_schema_exporter.py +++ b/unittests/test_json_schema_exporter.py @@ -25,6 +25,7 @@ import json import linkahead as db +import caosadvancedtools.json_schema_exporter as jsex from jsonschema import FormatChecker, validate, ValidationError from pytest import raises @@ -533,17 +534,21 @@ def test_rt_with_references(): } validate(example, schema) + # Single file and multiple files rt = db.RecordType() rt.add_property(name="FileProp", datatype=db.FILE) - with raises(NotImplementedError): - schema = rtjs(rt) + schema = rtjs(rt) + assert schema["properties"]["FileProp"]["type"] == "string" + assert schema["properties"]["FileProp"]["format"] == "data-url" rt = db.RecordType() rt.add_property(name="FileProp", datatype=db.LIST(db.FILE)) - with raises(NotImplementedError): - schema = rtjs(rt) + schema = rtjs(rt) + assert schema["properties"]["FileProp"]["type"] == "array" + assert schema["properties"]["FileProp"]["items"]["type"] == "string" + assert schema["properties"]["FileProp"]["items"]["format"] == "data-url" def test_broken(): @@ -681,3 +686,90 @@ RT2: "$schema": "https://json-schema.org/draft/2019-09/schema", "title": "RT2" }""" + + +def test_schema_modification(): + """Testing functions which modify json schema dicts: + +- make_array() +- merge_schemas(). 
+ """ + + model_str = """ +some_date: + datatype: DATETIME +RT1: + obligatory_properties: + some_date: + +some_text: + datatype: TEXT +RT2: + obligatory_properties: + some_text: + """ + model = parse_model_from_string(model_str) + schema_RT1 = rtjs(model.get_deep("RT1"), additional_properties=False) + schema_RT2 = rtjs(model.get_deep("RT2"), additional_properties=False) + + # Merge the schemata + merged_list = jsex.merge_schemas([schema_RT1, schema_RT2]) + assert merged_list["type"] == "object" + assert merged_list["properties"]["RT1"]["title"] == "RT1" + assert merged_list["properties"]["RT2"]["properties"]["some_text"]["type"] == "string" + + merged_dict = jsex.merge_schemas({"schema1": schema_RT1, "schema2": schema_RT2}) + assert merged_dict["type"] == "object" + assert merged_dict["properties"]["schema1"]["title"] == "RT1" + assert merged_dict["properties"]["schema2"]["properties"]["some_text"]["type"] == "string" + + # Make an array + array = jsex.make_array(schema_RT1) + assert array["type"] == "array" + assert array["items"] == schema_RT1 + + +def test_inheritance(): + """Test data models with inherited properties.""" + model_str = """ +some_date: + datatype: DATETIME +RT1: + obligatory_properties: + some_date: +RT2: + inherit_from_suggested: + - RT1 + """ + model = parse_model_from_string(model_str) + rt2_deep = model.get_deep("RT2") + assert "some_date" in [prop.name for prop in rt2_deep.properties] + + model_str = """ +RT1: + obligatory_properties: + RT2: +RT2: + inherit_from_suggested: + - RT1 +RT3: + inherit_from_suggested: + - RT4 +RT4: + inherit_from_suggested: + - RT3 +RT5: + inherit_from_suggested: + - RT5 + """ + model = parse_model_from_string(model_str) + # This must not lead to an infinite recursion + rt1_deep = model.get_deep("RT1") + rt2_deep = model.get_deep("RT2") + assert rt2_deep.get_property("RT2").name == rt1_deep.get_property("RT2").name + rt3_deep = model.get_deep("RT3") + assert rt3_deep.get_parents()[0].name == "RT4" + rt4_deep = 
model.get_deep("RT4") + assert rt4_deep.get_parents()[0].name == "RT3" + rt5_deep = model.get_deep("RT5") + assert rt5_deep.get_parents()[0].name == "RT5"