Skip to content
Snippets Groups Projects
Select Git revision
  • 40044dff3e1f0c0176657d220269e76a82738a1e
  • main default protected
  • f-yaml-parser-enums
  • dev protected
  • f-fix-paths
  • f-fix-validate-to-dict
  • f-labfolder-converter
  • f-state-machine-script
  • f-xlsx-converter-warnings-errors
  • f-rename
  • f-extra-deps
  • f-more-jsonschema-export
  • f-henrik
  • f-fix-89
  • f-trigger-advanced-user-tools
  • f-real-rename-test
  • f-linkahead-rename
  • f-register-integrationtests
  • f-fix-id
  • f-h5-files
  • f-json-schema
  • v0.14.0
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.0-numpy2
  • v0.10.0
  • v0.9.0
  • v0.8.0
  • v0.7.0
  • v0.6.1
  • v0.6.0
  • v0.5.0
  • v0.4.1
  • v0.4.0
  • v0.3.1
  • v0.3.0
37 results

test_cfood.py

  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_cfood.py 12.81 KiB
    #!/usr/bin/env python
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    import re
    import unittest
    
    import caosdb as db
    from caosadvancedtools.cfood import (AbstractCFood, AbstractFileCFood, CMeal,
                                         assure_has_parent, assure_has_property,
                                         assure_object_is_in_list,
                                         assure_parents_are, assure_property_is,
                                         get_entity_for_path)
    from caosadvancedtools.crawler import FileCrawler
    from caosadvancedtools.example_cfood import ExampleCFood
    from linkahead.common.models import _parse_single_xml_element
    from lxml import etree
    from datetime import datetime, timezone
    
    PATTERN = "h.*"
    
    
    class ExampleCFoodMeal(AbstractFileCFood, CMeal):
        matching_groups = ["test"]
    
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # add the constructor of CMeal
            CMeal.__init__(self)
    
        @classmethod
        def match_item(cls, path):
            """ standard match_match, but returns False if a suitable cfood exists """
    
            print(path)
            if cls.has_suitable_cfood(path):
                return False
    
            return re.match(cls.get_re(), path) is not None
    
        def looking_for(self, crawled_file):
            """ standard looking_for, but returns True if the file matches all
            groups"""
    
            if self.belongs_to_meal(crawled_file):
                return True
    
            return super().looking_for(crawled_file)
    
        @staticmethod
        def get_re():
            return r"/(?P<test>[a-z]*)/"
    
        def create_identifiables(self):
            pass
    
        def update_identifiables(self):
            pass
    
    
    class SimpleCFood(AbstractFileCFood):
    
        @staticmethod
        def get_re():
            return PATTERN
    
    
    class DependendCFood(AbstractCFood):
        existing = []
    
        @classmethod
        def match_item(cls, item):
            if len(cls.existing) == 0:
                return True
    
        def __init__(self, *args,  **kwargs):
            super().__init__(*args,  **kwargs)
            DependendCFood.existing.append(self)
    
        def create_identifiables(self):
            pass
    
        def update_identifiables(self):
            pass
    
    
    class CFoodReTest(unittest.TestCase):
        def test(self):
            self.assertEqual(SimpleCFood.get_re(), PATTERN)
            self.assertEqual(SimpleCFood._pattern, None)
            self.assertTrue(SimpleCFood.match_item("hallo"))
            # TODO the caching is of compiled re is disabled currently
            # self.assertIsNotNone(SimpleCFood._pattern)
            self.assertTrue(SimpleCFood.match_item("hallo"))
            self.assertFalse(SimpleCFood.match_item("allo"))
    
        def test_extensions(self):
            """Test the RE generation."""
            empty_extensions = []
            extensions = ["foo", "bar"]
    
            self.assertIsNone(AbstractFileCFood.re_from_extensions(empty_extensions))
            self.assertIsNotNone(SimpleCFood.re_from_extensions(extensions))
    
            class ExtCFood(AbstractFileCFood):
    
                @staticmethod
                def get_re():
                    return AbstractFileCFood.re_from_extensions(extensions)
                create_identifiables = None
                update_identifiables = None
    
            # test which paths are matched
            print(ExtCFood.re_from_extensions(extensions))
            self.assertTrue(ExtCFood.match_item("hello/world.foo"))
            self.assertTrue(ExtCFood.match_item("hello/world.bar"))
            self.assertFalse(ExtCFood.match_item("hello/world.baz"))
            self.assertFalse(ExtCFood.match_item("hello/world.foo "))  # Mind the space.
            self.assertFalse(ExtCFood.match_item("hello/world.foobar"))
            self.assertFalse(ExtCFood.match_item("hello/world.foo|bar"))
            self.assertFalse(ExtCFood.match_item("hello/world.fobar"))
            self.assertFalse(ExtCFood.match_item("hello/world.fooar"))
    
            # Test stored extension
            self.assertEqual(ExtCFood("hello/world.foo").match["ext"], "foo")
    
    
    class InsertionTest(unittest.TestCase):
        def test_contained_in_list(self):
            entity_with_list = db.Record()
            prop_name = "list_prop"
            to_be_updated = []
            assure_object_is_in_list(1005, entity_with_list, prop_name,
                                     to_be_updated)
            assert to_be_updated[0] is entity_with_list
            assert (to_be_updated[0].get_property(prop_name).value[0] == 1005)
            to_be_updated = []
            assure_object_is_in_list(1005, entity_with_list, prop_name,
                                     to_be_updated)
            assert len(to_be_updated) == 0
    
        def test_parent(self):
            entity = db.Record()
            to_be_updated = []
            assure_has_parent(entity, "parent", to_be_updated)
            assert to_be_updated[0] is entity
            assert (to_be_updated[0].get_parents()[0].name == "parent")
            to_be_updated = []
            assure_has_parent(entity, "parent", to_be_updated)
            assert len(to_be_updated) == 0
    
        def test_has_property(self):
            """Test properties with string, int, float, and Boolean values"""
            entity = db.Record()
            to_be_updated = []
            int_name = "Test int"
            types_and_values = {
                int_name: ("INT", 5),
                "Test float": (db.DOUBLE, 3.14),
                "Test bool": (db.BOOLEAN, True),
                "Test string": (db.TEXT, "bla")
            }
    
            for name, ty_val in types_and_values.items():
                entity.add_property(name=name, datatype=ty_val[0],
                                    value=ty_val[1])
                assure_has_property(entity=entity, name=name,
                                    value=ty_val[1], to_be_updated=to_be_updated)
                assert len(to_be_updated) == 0
            new_int = 6
            assure_has_property(entity=entity, name=int_name,
                                value=new_int, to_be_updated=to_be_updated)
            assert to_be_updated[0] is entity
    
            """Test properties with lists"""
            rec1 = db.Record(id=12345)
            rec1.add_property("Exp", value=[98765], datatype=db.LIST("Exp"))
            rec2 = db.Record(id=98765)
            update = []
            # compare Entity with id
            assure_has_property(rec1, "Exp", [rec2], to_be_updated=update)
            assert len(update) == 0
            update = []
            # compare id with id
            assure_has_property(rec1, "Exp", [98765], to_be_updated=update)
            assert len(update) == 0
            update = []
            # compare id with different list of ids
            assure_has_property(rec1, "Exp2", [98765, 444, 555],
                                to_be_updated=update)
            assert len(update) == 1
    
            rec = db.Record(id=666666)
            rec3 = db.Record(id=777777)
            rec.add_property("Exp", value=[888888, rec3], datatype=db.LIST("Exp"))
            rec2 = db.Record(id=888888)
            update = []
            # compare id and Entity with id and Entity
            # i.e. check that conversion from Entity to id works in both
            # directions.
            assure_has_property(rec, "Exp", [rec2, 777777], to_be_updated=update)
            assert len(update) == 0
    
        def test_property_is(self):
            """Test properties with string, int, float, and Boolean values"""
            entity = db.Record()
            to_be_updated = []
            int_name = "Test int"
            types_and_values = {
                int_name: ("INT", 5),
                "Test float": (db.DOUBLE, 3.14),
                "Test bool": (db.BOOLEAN, True),
                "Test string": (db.TEXT, "bla")
            }
    
            for name, ty_val in types_and_values.items():
                entity.add_property(name=name, datatype=ty_val[0],
                                    value=ty_val[1])
                assure_property_is(entity=entity, name=name,
                                   value=ty_val[1], to_be_updated=to_be_updated)
                assert len(to_be_updated) == 0
    
            # Test overwriting existing values
            types_and_values = {
                int_name: ("INT", 7),
                "Test float": (db.DOUBLE, 3.34),
                "Test bool": (db.BOOLEAN, False),
                "Test string": (db.TEXT, "blasdkfjlj")
            }
    
            for i, (name, ty_val) in enumerate(types_and_values.items()):
                assure_property_is(entity=entity, name=name,
                                   value=ty_val[1], to_be_updated=to_be_updated)
                assert len(to_be_updated) == i+1
                assert entity.get_property(name).value == ty_val[1]
            entity.add_property(name="Test float", datatype=db.DOUBLE, value=3.2)
            self.assertRaises(Exception, assure_property_is, entity=entity,
                              name="Test float",
                              value=4.2, to_be_updated=to_be_updated)
    
        def test_parents_are(self):
            entity = db.Record()
            to_be_updated = []
            assure_parents_are(entity, "parent", to_be_updated)
            assert to_be_updated[0] is entity
            assure_parents_are(entity, "parent", to_be_updated)
            assert len(to_be_updated) == 1
            assure_parents_are(entity, ["parent", "other_parent"], to_be_updated)
            assert len(to_be_updated) == 2
            ps = [p.name for p in entity.get_parents()]
            assert "parent" in ps
            assert "other_parent" in ps
            assure_parents_are(entity, ["parent", "other_parent"], to_be_updated)
            assert len(to_be_updated) == 2
            assure_parents_are(entity, "yet_another_parent", to_be_updated)
            assert len(to_be_updated) == 3
            ps = [p.name for p in entity.get_parents()]
            assert "yet_another_parent" in ps
            assert "parent" not in ps
            assert "other_parent" not in ps
    
        def test_assure_datetime(self):
            entity_xml = '<Record id="1234" name="Test_Record"><Property id="1233" name="TestDate" datatype="DATETIME">{}</Property></Record>'
            to_be_updated = []
            rec = _parse_single_xml_element(
                etree.fromstring(entity_xml.format("2020-01-01")))
            assure_has_property(entity=rec, name="TestDate", value=datetime(
                2020, 1, 1), to_be_updated=to_be_updated)
            assert not to_be_updated
            assure_has_property(entity=rec, name="TestDate", value=datetime(
                2020, 1, 1, 0, 0), to_be_updated=to_be_updated)
            assert not to_be_updated
            assure_has_property(entity=rec, name="TestDate", value=datetime(
                2020, 1, 1, 15, 0), to_be_updated=to_be_updated)
            assert len(to_be_updated) == 1
            rec = _parse_single_xml_element(etree.fromstring(
                entity_xml.format("2020-01-01T00:00:00.0")))
            assure_has_property(entity=rec, name="TestDate", value=datetime(
                2020, 1, 1), to_be_updated=to_be_updated)
            rec = _parse_single_xml_element(etree.fromstring(
                entity_xml.format("2020-01-01T00:00:00.000+0000")))
            assure_has_property(entity=rec, name="TestDate", value=datetime(
                2020, 1, 1, tzinfo=timezone.utc), to_be_updated=to_be_updated)
    
            assert len(to_be_updated) == 1
    
    
    class DependendTest(unittest.TestCase):
        def test(self):
            self.assertTrue(DependendCFood.match_item(None))
            cf = DependendCFood(None)
            self.assertFalse(DependendCFood.match_item(None))
    
    
    class ExampleTest(unittest.TestCase):
        def test(self):
            path = "/data/rabbit/2019-03-03/README.md"
            cf = ExampleCFood(crawled_path=path)
            self.assertIsNotNone(ExampleCFood.match_item(path))
            self.assertEqual(cf.match.group('species'), 'rabbit')
            self.assertEqual(cf.match.group('date'), '2019-03-03')
    
    
    class MealTest(unittest.TestCase):
        def test(self):
            # path should match
            self.assertTrue(ExampleCFoodMeal.match_item("/this/file"))
            # create an instance
            c = ExampleCFoodMeal("/this/file")
            # same prefix should no longer match
            self.assertFalse(ExampleCFoodMeal.match_item("/this/other"))
            # but instance should be looking for this prefix
            self.assertTrue(c.looking_for("/this/other"))
            # class should still match other prefixes
            self.assertTrue(ExampleCFoodMeal.match_item("/that/file"))
    
    
    class FileCacheTest(unittest.TestCase):
        def test(self):
            # if there is no connection to a server an exception should be raised.
            self.assertRaises(Exception, get_entity_for_path, "/lol")
            FileCrawler(cfood_types=[], files=[db.File(path="/lol")])
            get_entity_for_path("/lol")