# This file is a part of the LinkAhead project.
#
# Copyright (C) 2023 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2023 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import unittest
from datetime import date
from tempfile import NamedTemporaryFile
from unittest.mock import Mock

import caosadvancedtools
import linkahead as db
from caosadvancedtools.models.parser import (TwiceDefinedException,
                                             YamlDefinitionError,
                                             parse_model_from_string,
                                             parse_model_from_yaml)
from linkahead.apiutils import compare_entities
from pytest import mark, raises


def to_file(string):
    """Write ``string`` to a new temporary file and return the file's path.

    The file is created with ``delete=False``, so it stays on disk after it
    has been closed and can be reopened by name.  Nothing in this helper
    removes it again.
    """
    # The context manager closes the handle even if the write fails.
    with NamedTemporaryFile(mode="w", delete=False) as tmp:
        tmp.write(string)

    return tmp.name

# TODO: check purpose of this function... add documentation

def parse_str(string):
    """Parse ``string`` as a YAML model via a temporary file.

    The text is first written to disk with ``to_file`` and the resulting path
    is handed to ``parse_model_from_yaml``.  The parsed model is not returned;
    callers use this only to check whether parsing raises.
    """
    yaml_path = to_file(string)
    parse_model_from_yaml(yaml_path)


def has_property(el, name):
    """Return True if one of the properties of ``el`` is called ``name``."""
    return any(prop.name == name for prop in el.get_properties())


def has_parent(el, name):
    """Return True if one of the parents of ``el`` is called ``name``."""
    return any(parent.name == name for parent in el.get_parents())


class TwiceTest(unittest.TestCase):
    """Parser tests for repeated definitions, keywords, inheritance and datatypes.

    Every test writes its YAML snippet to a temporary file with ``to_file`` and
    feeds that file to ``parse_model_from_yaml``.
    """

    def test_defined_once(self):
        """Mentioning an entity again without a body must not raise."""
        string = """
RT1:
  recommended_properties:
    a:
RT2:
  recommended_properties:
    RT1:
RT3:
  recommended_properties:
    RT4:
      recommended_properties:
        a:
RT4:
"""
        model = parse_model_from_yaml(to_file(string))
        assert has_property(model["RT1"], "a")
        assert has_property(model["RT4"], "a")

    def test_defined_twice(self):
        """Giving RT1 a body at top level and again inside RT2 must raise."""
        string = """
RT1:
  recommended_properties:
    a:
RT2:
  recommended_properties:
    RT1:
      recommended_properties:
        a:
"""

        with self.assertRaises(TwiceDefinedException):
            parse_model_from_yaml(to_file(string))

    def test_typical_case(self):
        """A model mixing properties, inheritance and bare entries parses."""
        string = """
RT1:
  recommended_properties:
    p1:
      datatype: TEXT
      description: shiet egal
  obligatory_properties:
    p2:
      datatype: TEXT
RT2:
  description: "This is awesome"
  inherit_from_suggested:
  - RT1
  - RT4
  obligatory_properties:
    RT1:
    p3:
      datatype: DATETIME
  recommended_properties:
    p4:
    RT4:
p1:
p5:
RT5:
  """
        # Only checks that parsing does not raise.
        parse_model_from_yaml(to_file(string))

    def test_wrong_kind(self):
        """A top-level YAML list instead of a mapping must raise ValueError."""
        string = """
- RT1:
- RT2:
"""
        with self.assertRaises(ValueError):
            parse_model_from_yaml(to_file(string))

    def test_unknown_kwarg(self):
        """An unknown keyword (``datetime``) inside an entity must raise ValueError."""
        string = """
RT1:
  datetime:
    p1:
"""
        with self.assertRaises(ValueError):
            parse_model_from_yaml(to_file(string))

    def test_definition_in_inheritance(self):
        """Defining an entity inside an inheritance list must raise ValueError."""
        string = """
RT2:
  description: "This is awesome"
  inherit_from_suggested:
  - RT1:
    description: "tach"
"""
        with self.assertRaises(ValueError):
            parse_model_from_yaml(to_file(string))

    def test_inheritance(self):
        """Each ``inherit_from_*`` keyword sets the matching inheritance flag."""
        string = """
RT1:
  description: "This is awesome"
  inherit_from_suggested:
  - RT2
  inherit_from_recommended:
  - RT3
  inherit_from_obligatory:
  - RT4
  - RT5
RT2:
RT3:
RT4:
RT5:
"""
        model = parse_model_from_yaml(to_file(string))
        rt1 = model["RT1"]
        expected = (
            ("RT2", db.SUGGESTED),
            ("RT3", db.RECOMMENDED),
            ("RT4", db.OBLIGATORY),
            ("RT5", db.OBLIGATORY),
        )
        for parent_name, inheritance in expected:
            assert has_parent(rt1, parent_name)
            assert rt1.get_parent(parent_name)._flags["inheritance"] == inheritance

    def test_properties(self):
        """Each ``*_properties`` keyword sets the matching importance."""
        string = """
RT1:
  description: "This is awesome"
  recommended_properties:
    RT2:
  suggested_properties:
    RT3:
  obligatory_properties:
    RT4:
      recommended_properties:
        RT2:
    RT5:
"""
        model = parse_model_from_yaml(to_file(string))
        expected = (
            ("RT1", "RT2", db.RECOMMENDED),
            ("RT1", "RT3", db.SUGGESTED),
            ("RT1", "RT4", db.OBLIGATORY),
            ("RT1", "RT5", db.OBLIGATORY),
            ("RT4", "RT2", db.RECOMMENDED),
        )
        for owner, prop_name, importance in expected:
            assert has_property(model[owner], prop_name)
            assert model[owner].get_importance(prop_name) == importance

    def test_datatype(self):
        """A known datatype parses; the misspelled ``TXT`` raises ValueError."""
        string = """
p1:
  datatype: TEXT
"""
        parse_model_from_yaml(to_file(string))
        string = """
p2:
  datatype: TXT
"""
        with self.assertRaises(ValueError):
            parse_model_from_yaml(to_file(string))


class ListTest(unittest.TestCase):
    """Tests for LIST datatypes in YAML model definitions."""

    def test_list(self):
        """Both ``LIST(...)`` and ``LIST<...>`` spellings yield list datatypes."""
        # NOTE(review): in this copy of the file the end of the following
        # string was lost (at least the closing quotes).  The bare `RT2:` entry
        # was added so that the type referenced by LIST(RT2) is defined --
        # confirm against version control.
        string = """
RT1:
  recommended_properties:
    a:
      datatype: LIST(RT2)
    b:
      datatype: LIST(TEXT)
    c:
      datatype: LIST<TEXT>
RT2:
"""
        model = parse_model_from_yaml(to_file(string))

        for prop_name in ("b", "c"):
            self.assertIsInstance(model[prop_name], db.Property)
            self.assertEqual(model[prop_name].datatype, db.LIST(db.TEXT))

        # This failed for an older version of caosdb-models
        string_list = """
A:
  obligatory_properties:
    B:
      datatype: LIST(B)
B:
  obligatory_properties:
    c:
      datatype: INTEGER
"""
        model = parse_model_from_yaml(to_file(string_list))
        self.assertIsInstance(model['A'], db.RecordType)
        self.assertEqual(model['A'].properties[0].datatype, db.LIST("B"))


class ParserTest(unittest.TestCase):
    """Generic tests for good and bad syntax."""

    def test_empty_property_list(self):
        """Empty property lists are allowed now."""
        empty = """
A:
  obligatory_properties:
"""
        parse_str(empty)

    def test_non_string_name(self):
        """Test for when the name does not look like a string to YAML."""
        name_int = """1:
  recommended_properties:
    1.2:
    Null:
    0x0:
    010:
"""
        model = parse_model_from_string(name_int)
        self.assertEqual(len(model), 5)
        for key in model.keys():
            self.assertIsInstance(key, str)

    def test_unexpected_keyword(self):
        """Test for when keywords happen at places where they should not be."""
        model_yaml = """A:
  obligatory_properties:
    recommended_properties:
"""
        with self.assertRaises(YamlDefinitionError) as yde:
            parse_model_from_string(model_yaml)
        message = yde.exception.args[0]
        self.assertIn("line 3", message)
        self.assertIn("recommended_properties", message)

    def test_parents_list(self):
        """Parents must be a list."""
        model_yaml = """A:
  inherit_from_obligatory:
    A:
"""
        with self.assertRaises(YamlDefinitionError) as yde:
            parse_model_from_string(model_yaml)
        self.assertIn("line 3", yde.exception.args[0])

    def test_reference_property(self):
        """Test correct creation of reference property using an RT."""
        # NOTE(review): the `modeldef = ...` opener (including the `A:` key)
        # and the `if key == "A":` branch were missing from this copy of the
        # file.  They were reconstructed from the `LIST<A>` datatype, the
        # expected model size of 2 and the orphaned `elif` -- confirm against
        # version control.
        modeldef = """A:
  recommended_properties:
    ref:
      datatype: LIST<A>
"""
        model = parse_model_from_string(modeldef)
        self.assertEqual(len(model), 2)
        for key, value in model.items():
            if key == "A":
                self.assertIsInstance(value, db.RecordType)
            elif key == "ref":
                self.assertIsInstance(value, db.Property)
                self.assertEqual(value.datatype, "LIST<A>")
                # NOTE(review): the YAML above sets no description for `ref`;
                # this assertion possibly belongs to a test whose other lines
                # were lost in this copy -- verify before relying on it.
                assert value.description == "new description"

class ExternTest(unittest.TestCase):
    """Placeholder (TODO) for tests of the "extern" keyword in the YAML."""

    @unittest.expectedFailure
    def test_extern(self):
        """Always raise; the expectedFailure marker keeps the suite green."""
        raise NotImplementedError("Extern testing is not implemented yet.")


class ErrorMessageTest(unittest.TestCase):
    """Tests for understandable error messages."""

    # Note: This was changed with implementation of role keyword
    @unittest.expectedFailure
    def test_non_dict(self):
        """When a value is given, where a list or mapping is expected."""
        recordtype_value = """
A: "some class"
"""
        recommended_value = """
A:
  recommended_properties: 23
"""
        property_value = """
prop:
  datatype: DOUBLE
A:
  recommended_properties:
  - prop: 3.14
"""
        # Failing strings and the lines where they fail
        failing = {
            recordtype_value: 2,
            recommended_value: 3,
            property_value: 6,
        }
        for string, line in failing.items():
            with self.assertRaises(YamlDefinitionError) as yde:
                parse_str(string)
            # The message has to name the offending line.
            assert "line {}".format(line) in yde.exception.args[0]
def test_existing_model():
    """Check that a parse can build on an already parsed model.

    A second YAML string is parsed with ``existing_model`` set to the result of
    the first parse, and the combined result must contain the entities of both.
    Afterwards a redefinition of the ``number`` property is parsed against the
    same existing model.
    """
    first_yaml = """
A:
  obligatory_properties:
    number:
      datatype: INTEGER
    """
    second_yaml = """
B:
  obligatory_properties:
    A:
    """
    base_model = parse_model_from_string(first_yaml)
    extended = parse_model_from_string(second_yaml, existing_model=base_model)
    assert all(name in extended for name in ("A", "B", "number"))

    redefine_yaml = """
number:
  datatype: DOUBLE
  description: Hello number!
    """
    redefined = parse_model_from_string(redefine_yaml, existing_model=base_model)
    print(redefined)
    number = redefined["number"]
    assert number.description == "Hello number!"
    assert number.datatype == db.INTEGER  # FIXME Shouldn't this be DOUBLE?


def test_define_role():
    """Check the ``role`` keyword and values given to properties.

    Covers a lone Record, a mixed model of Record, Property and RecordTypes, a
    property value set on a Record, and a value set on a Property itself.
    """
    # A single entity declared as Record.
    record_only = """
A:
  role: Record
"""
    parsed = parse_model_from_string(record_only)
    assert "A" in parsed
    record_a = parsed["A"]
    assert isinstance(record_a, db.Record)
    assert record_a.role == "Record"

    # Explicit and implicit roles side by side.
    mixed = """
A:
  role: Record
  inherit_from_obligatory:
  - C
  obligatory_properties:
    b:
b:
  datatype: INTEGER
C:
  obligatory_properties:
    b:
D:
  role: RecordType
"""
    parsed = parse_model_from_string(mixed)
    expected_roles = (("A", "Record"), ("b", "Property"),
                      ("C", "RecordType"), ("D", "RecordType"))
    for name, role in expected_roles:
        assert name in parsed
        entity = parsed[name]
        assert isinstance(entity, getattr(db, role))
        assert entity.role == role

    assert parsed["A"].parents[0].name == "C"
    assert parsed["A"].name == "A"

    # Neither A nor C assigns a value to b.
    for owner in ("A", "C"):
        first_prop = parsed[owner].properties[0]
        assert first_prop.name == "b"
        assert first_prop.value is None

    # A value given on the Record stays on the Record's property only.
    with_value = """
A:
  role: Record
  obligatory_properties:
    b: 42
b:
  datatype: INTEGER
"""

    parsed = parse_model_from_string(with_value)
    assert parsed["A"].get_property("b").value == 42
    assert parsed["b"].value is None

    # A value given on the Property definition itself.
    property_value = """
b:
  datatype: INTEGER
  value: 18
"""
    parsed = parse_model_from_string(property_value)
    assert parsed["b"].value == 18


def test_issue_72():
    """Tests for
    https://gitlab.indiscale.com/caosdb/src/caosdb-advanced-user-tools/-/issues/72

    In some cases, faulty values would be read in for properties without a
    specified value.

    """
    model = """
Experiment:
  obligatory_properties:
    date:
      datatype: DATETIME
      description: 'date of the experiment'
    identifier:
      datatype: TEXT
      description: 'identifier of the experiment'
    temperature:
      datatype: DOUBLE
TestExperiment:
  role: Record
  inherit_from_obligatory:
    - Experiment
  obligatory_properties:
    date: 2022-03-02
    identifier: Test
    temperature: 23
  recommended_properties:
    additional_prop:
      datatype: INTEGER
      value: 7
"""
    entities = parse_model_from_string(model)
    for name in ("Experiment", "date", "identifier", "temperature",
                 "TestExperiment", "additional_prop"):
        assert name in entities
    experiment = entities["Experiment"]
    assert isinstance(experiment, db.RecordType)

    for prop_name in ("date", "identifier", "temperature"):
        assert experiment.get_property(prop_name) is not None
        # No value is set, so this has to be None
        assert experiment.get_property(prop_name).value is None

    test_rec = entities["TestExperiment"]
    assert isinstance(test_rec, db.Record)
    assert test_rec.get_property("date").value == date(2022, 3, 2)
    assert test_rec.get_property("identifier").value == "Test"
    assert test_rec.get_property("temperature").value == 23
    assert test_rec.get_property("additional_prop").value == 7
    assert test_rec.name == "TestExperiment"


def test_file_role():
    """Not implemented for now, see
    https://gitlab.indiscale.com/caosdb/src/caosdb-advanced-user-tools/-/issues/74.

    """
    model = """
F:
  role: File
"""
    with raises(NotImplementedError):
        parse_model_from_string(model)


# NOTE(review): in this copy of the file the following test had lost its `def`
# line and was fused into test_file_role.  The name is derived from the issue
# number in its docstring -- confirm against version control.
def test_issue_36():
    """Test whether the `parent` keyword is removed.

    See https://gitlab.com/caosdb/caosdb-advanced-user-tools/-/issues/36.

    """
    model_string = """
R1:
  obligatory_properties:
    prop1:
      datatype: TEXT
R2:
  obligatory_properties:
    prop2:
      datatype: TEXT
  recommended_properties:
    prop3:
      datatype: TEXT
R3:
  parent:
  - R2
  inherit_from_obligatory:
  - R1
"""
    with raises(ValueError) as ve:
        # The keyword has been removed, so it should raise a regular ValueError.
        parse_model_from_string(model_string)

    message = str(ve.value)
    assert "invalid keyword" in message
    assert "parent" in message


def test_yaml_error():
    """Parsing the invalid model file must raise a ValueError naming line 2."""
    invalid_model = "unittests/models/model_invalid.yml"

    with raises(ValueError, match=r"line 2: .*"):
        parse_model_from_yaml(invalid_model)


def test_inherit_error():
    """Must fail with an understandable exception."""
    # The parent is given as a plain string instead of a list.
    model_string = """
prop1:
  inherit_from_obligatory: prop2
    """
    with raises(YamlDefinitionError,
                match=r"Parents must be a list but is given as string: prop1 > prop2"):
        parse_model_from_string(model_string)
@mark.xfail(reason="""Issue is
 https://gitlab.com/linkahead/linkahead-advanced-user-tools/-/issues/57""")
def test_inherit_properties():
    """Check the role of an entity that inherits from a Property.

    Marked as expected failure; the behavior is not even specified yet (TODO).
    """
    model_string = """
prop1:
  datatype: DOUBLE
prop2:
#  role: Property
  inherit_from_obligatory:
  - prop1
    """
    parsed = parse_model_from_string(model_string)
    assert parsed["prop2"].role == "Property"


def test_fancy_yaml():
    """Testing aliases and other fancy YAML features."""
    # Simple aliasing
    model_string = """
foo:
  datatype: INTEGER
RT1:
  obligatory_properties: &RT1_oblig
    foo:
RT2:
  obligatory_properties: *RT1_oblig
    """
    model = parse_model_from_string(model_string)

    assert len(model) == 3
    assert isinstance(model["foo"], db.Property)
    assert model["foo"].datatype == db.INTEGER
    for st in ("RT1", "RT2"):
        assert isinstance(model[st], db.RecordType)
        assert model[st].get_property("foo").datatype == db.INTEGER

    # Aliasing with override
    model_string = """
foo:
  datatype: INTEGER
RT1:
  obligatory_properties: &RT1_oblig
    foo:
RT2:
  obligatory_properties:
    <<: *RT1_oblig
    bar:
    """
    model = parse_model_from_string(model_string)

    assert len(model) == 4
    assert isinstance(model["bar"], db.RecordType)
    for st in ("RT1", "RT2"):
        assert isinstance(model[st], db.RecordType)
        assert model[st].get_property("foo").datatype == db.INTEGER
    assert model["RT2"].get_property("bar").datatype == "bar"


# NOTE(review): in this copy of the file the following test had lost several
# lines and was fused into test_fancy_yaml.  Reconstructed here, to be confirmed
# against version control:
#   * the `def` line (name is a guess; `capfd` is required by the body),
#   * the YAML lines for `RT2`/`test_reference` below `RT1` and the `RT2`
#     description, taken from the XML server response further down,
#   * the `for cs in (c1, c2, c3, c4):` loop header,
#   * the `model.sync_data_model(True, True)` call, mirroring test_sync_output.
def test_comparison_yaml_model(capfd):
    """
    Test for this issue:
    https://gitlab.indiscale.com/caosdb/src/caosdb-advanced-user-tools/-/issues/130
    """
    model_string = """
foo:
  datatype: INTEGER
  description: bla bla
  unit: m

RT1:
  obligatory_properties:
    foo:
    RT2:
      datatype: LIST<RT2>
    test_reference:

RT2:
  description: Describe RT2

test_reference:
  datatype: RT2
    """
    model = parse_model_from_string(model_string)

    # Without the fix, foo will have no datatype, description and no unit **as part of RT1**, so the
    # comparison with a version taken from a LinkAhead instance will have these attributes.
    # Furthermore, RT2 will be set as the datatype **in object version** in the yaml definition, while
    # it is an ID in case of the version from the LinkAhead instance.

    server_response = """
<Entities>
  <noscript>
    </noscript>
  <Property id="2272" name="foo" description="bla bla" datatype="INTEGER" unit="m">
    <Version id="7819eedaeba2aa7305e10c96e8cf7b9ac84aea4a" head="true"/>
  </Property>
  <RecordType id="2273" name="RT1">
    <Version id="0c1b9df6677ee40d1e1429b2123e078ee6c863e0" head="true"/>
    <Property id="2272" name="foo" description="bla bla" datatype="INTEGER" unit="m" importance="OBLIGATORY" flag="inheritance:FIX"/>
    <Property id="2274" name="RT2" description="Describe RT2" datatype="LIST&lt;RT2&gt;" importance="OBLIGATORY" flag="inheritance:FIX"/>
    <Property id="2275" name="test_reference" datatype="RT2" importance="OBLIGATORY" flag="inheritance:FIX"/>
  </RecordType>
  <RecordType id="2274" name="RT2" description="Describe RT2">
    <Version id="185940642680a7eba7f71914dd8dd7758dd13faa" head="true"/>
  </RecordType>
  <Property id="2275" name="test_reference" datatype="RT2">
    <Version id="03cf86061c78a079b376394dfecdf32566b72fb7" head="true"/>
  </Property>
</Entities>"""

    entities = db.Container.from_xml(server_response)

    c1 = compare_entities(model["foo"], entities[0])
    c2 = compare_entities(model["RT1"], entities[1])
    c3 = compare_entities(model["RT2"], entities[2])
    c4 = compare_entities(model["test_reference"], entities[3])
    # Make sure the mock response matches the datamodel definition
    # exactly, i.e., they only differ in ids which are None for all
    # entities from the datamodel and not None for the mocked
    # response.
    for cs in (c1, c2, c3, c4):
        assert "id" in cs[0]
        assert cs[0]["id"] is None
        assert cs[0]["parents"] == []
        for val in cs[0]["properties"].values():
            # Also properties differ in ids: The one from the
            # datamodel have None
            assert len(val) == 1
            assert "id" in val
            assert val["id"] is None
        assert cs[1]["id"] is not None
        assert cs[1]["parents"] == []
        for val in cs[1]["properties"].values():
            # Also properties differ in ids: The one from the
            # mock response have not None
            assert len(val) == 1
            assert "id" in val
            assert val["id"] is not None

    # The server response would be the same as the xml above:
    def get_existing_entities(ent_cont):
        return entities

    class MockQuery:
        """Stand-in for db.Query which looks up ids in ``entities``."""

        def __init__(self, q):
            self.q = q

        def execute(self, unique=True):
            # The id is taken from the text after the first "=" in the query.
            wanted_id = int(self.q.split("=")[1])
            for existing_ent in entities:
                if existing_ent.id == wanted_id:
                    return existing_ent
            return None

    model.get_existing_entities = get_existing_entities
    caosadvancedtools.models.parser.db.Query = MockQuery
    caosadvancedtools.models.parser.db.Container.update = Mock()
    caosadvancedtools.models.parser.db.Container.insert = Mock()

    model.sync_data_model(True, True)
    assert not caosadvancedtools.models.parser.db.Container.update.called
    assert not caosadvancedtools.models.parser.db.Container.insert.called
    output, err = capfd.readouterr()
    assert "No new entities." in output
    assert "No differences found. No update" in output
def test_sync_output(capfd):
    """Check what ``sync_data_model`` prints when a datatype differs.

    The YAML declares ``identifier`` as TEXT while the mocked existing entities
    carry INTEGER; an update (and no insert) is expected, and both datatypes
    must be named in the captured output.
    """
    model = parse_model_from_string("""
RT:
  obligatory_properties:
    identifier:
      datatype: TEXT
""")

    known_rt = db.RecordType(name="RT", id=25).add_property(
        name="identifier",
        datatype="INTEGER",
        importance="OBLIGATORY",
        id=24)
    known_prop = db.Property(name="identifier", datatype="INTEGER", id=24)
    existing_entities = [known_rt, known_prop]

    def get_existing_entities(ent_cont):
        return existing_entities

    class MockQuery:
        """Stand-in for db.Query which looks up ids in ``existing_entities``."""

        def __init__(self, q):
            self.q = q

        def execute(self, unique=True):
            # The id is taken from the text after the first "=" in the query.
            wanted_id = int(self.q.split("=")[1])
            matches = (ent for ent in existing_entities if ent.id == wanted_id)
            return next(matches, None)

    # Replace server access by the local stand-ins.
    parser_db = caosadvancedtools.models.parser.db
    model.get_existing_entities = get_existing_entities
    parser_db.Query = MockQuery
    parser_db.Container.update = Mock()
    parser_db.Container.insert = Mock()

    model.sync_data_model(True, True)
    assert parser_db.Container.update.called
    assert not parser_db.Container.insert.called
    output, err = capfd.readouterr()
    print(output)
    assert "version from the yaml file: TEXT" in output
    assert "version from LinkAhead: INTEGER" in output
def test_setting_values():
    """Check that a value given for a property of a Record is stored there."""
    model = parse_model_from_string("""
  parameter:
    datatype: INTEGER

  Simulation:
    role: Record
    obligatory_properties:
      parameter: 26
  """)

    assert len(model) == 2
    # The last character of the string form is cut off before comparing.
    parameter_xml = str(model["parameter"])[:-1]
    assert parameter_xml == '<Property name="parameter" datatype="INTEGER"/>'
    simulation = model["Simulation"]
    assert simulation.role == "Record"
    assert simulation.name == "Simulation"
    assert simulation.get_property("parameter").value == 26