Skip to content
Snippets Groups Projects
Select Git revision
  • 0305597404332f480cc7addde93f0e9da36ac6d3
  • main default protected
  • f-yaml-parser-enums
  • dev protected
  • f-fix-paths
  • f-fix-validate-to-dict
  • f-labfolder-converter
  • f-state-machine-script
  • f-xlsx-converter-warnings-errors
  • f-rename
  • f-extra-deps
  • f-more-jsonschema-export
  • f-henrik
  • f-fix-89
  • f-trigger-advanced-user-tools
  • f-real-rename-test
  • f-linkahead-rename
  • f-register-integrationtests
  • f-fix-id
  • f-h5-files
  • f-json-schema
  • v0.14.0
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.0-numpy2
  • v0.10.0
  • v0.9.0
  • v0.8.0
  • v0.7.0
  • v0.6.1
  • v0.6.0
  • v0.5.0
  • v0.4.1
  • v0.4.0
  • v0.3.1
  • v0.3.0
37 results

test_h5.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_h5.py 8.28 KiB
    import os
    import unittest
    from tempfile import NamedTemporaryFile
    
    import caosdb as db
    import caosdb.apiutils
    import h5py
    import numpy as np
    from caosadvancedtools.cfoods import h5
    from caosadvancedtools.cfoods.h5 import h5_attr_to_property
    
    from create_dummy_hdf5file import create_hdf5_file
    
    # Prepared entities, keyed by id, that dummy_get hands out in place of a
    # real retrieval.
    ENTS = {eid: db.Record(id=eid) for eid in (101, 102, 103)}
    # Record 103 references Record 101 through its "test" property, which
    # gives the tests one additional level of references to follow.
    ENTS[103].add_property("test", value=101, datatype=db.REFERENCE)
    
    
    def dummy_get(eid):
        """Return the prepared entity stored in ``ENTS`` under ``eid``.

        Raises ``KeyError`` if no entity was prepared for that id.
        """
        entity = ENTS[eid]
        return entity
    
    
    class H5CFoodTest(unittest.TestCase):
        def setUp(self):
            """Create a dummy HDF5 file and open it as ``self.h5obj``.

            Both the file on disk and the h5py handle are released again via
            ``addCleanup`` when the test is over, whether it passed or not.
            """
            # Only the path is needed here: the temporary file handle is closed
            # right away and the path is handed to create_hdf5_file.
            self.h5file = NamedTemporaryFile(delete=False, suffix=".h5")
            self.h5file.close()
            # delete=False leaves the file on disk, so remove it explicitly.
            self.addCleanup(os.remove, self.h5file.name)
            create_hdf5_file(self.h5file.name)
            self.h5obj = h5py.File(self.h5file.name, mode="a")
            # Cleanups run last-in-first-out: the handle registered here is
            # closed before the file registered above is removed.
            self.addCleanup(self.h5obj.close)
    
        def test_create_record_records(self):
            """Check the Records that ``H5CFood.create_structure`` builds.

            For the file root, every value of the resulting properties must
            have one of the expected top-level names as its first parent, and
            all expected names must occur.  For the two sub-groups, the parents
            of the returned entity must carry the group's own name.
            """
            result = h5.H5CFood.create_structure(self.h5obj)

            parents = ['group_level1_a', 'group_level1_b', 'group_level1_c', 'root_integers']
            # One reference Record per expected parent name, in the same order.
            record_list = [db.Record().add_parent(name=name) for name in parents]

            found_parents = []

            for prop in result.properties:
                ent = prop.value
                parent_name = ent.parents[0].name

                if parent_name in parents:
                    found_parents.append(parent_name)

                if parent_name == 'group_level1_a':
                    # The sub-group shows up as a property; the group does not
                    # reference itself.
                    self.assertIsNotNone(ent.get_property("group_level2_aa"))
                    self.assertIsNone(ent.get_property("group_level1_a"))

            for name in parents:
                self.assertIn(name, found_parents)

            # The comparison below is positional, so both sequences must have
            # the same length; otherwise zip would silently drop entries.
            self.assertEqual(len(result.properties), len(record_list))

            for prop, expected in zip(result.properties, record_list):
                for actual_parent in prop.value.get_parents():
                    for expected_parent in expected.get_parents():
                        self.assertEqual(actual_parent.name, expected_parent.name)

            result1 = h5.H5CFood.create_structure(self.h5obj["group_level1_a"])

            for parent in result1.get_parents():
                self.assertEqual(parent.name, "group_level1_a")

            result2 = h5.H5CFood.create_structure(self.h5obj["group_level1_a/group_level2_aa"])

            for parent in result2.get_parents():
                self.assertEqual(parent.name, "group_level2_aa")
    
        def test_collect_existing_structure(self):
            """Check how ``collect_existing_structure`` fills an ``EntityMapping``.

            ``caosdb.apiutils.retrieve_entity_with_id`` is replaced by
            ``dummy_get`` for the duration of the test, so referenced ids are
            resolved from ``ENTS``.  Each scenario builds an existing Record
            (with id) and a target Record (without id) and inspects the
            ``to_existing`` / ``to_target`` dictionaries of the mapping.
            """
            # Register the restoration before patching: addCleanup also runs
            # when an assertion below fails, so the replacement cannot leak
            # into other tests.
            # TODO: consider switching to the BaseMockUpTest.
            real_retrieve = caosdb.apiutils.retrieve_entity_with_id
            self.addCleanup(setattr, caosdb.apiutils, "retrieve_entity_with_id",
                            real_retrieve)
            caosdb.apiutils.retrieve_entity_with_id = dummy_get

            # should run without problem
            h5.collect_existing_structure(db.Record(), db.Record(id=234), h5.EntityMapping())

            # test with retrieval: both Records have one test Property with one
            # value -> The referenced Entities are matched
            r_exist = db.Record(id=234)
            r_exist.add_property("test", value=101, datatype=db.REFERENCE)
            r_target = db.Record()
            r_child = db.Record()
            r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
            em = h5.EntityMapping()
            h5.collect_existing_structure(r_target, r_exist, em)
            self.assertTrue(em.to_existing[r_child._cuid] is ENTS[101])
            self.assertTrue(em.to_target[101] is r_child)

            # test with retrieval: the existing Record has another Property
            # -> Nothing is matched; both mappings stay empty
            r_exist = db.Record(id=234)
            r_exist.add_property("test_other", value=101, datatype=db.REFERENCE)
            r_target = db.Record()
            r_child = db.Record()
            r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
            em = h5.EntityMapping()
            h5.collect_existing_structure(r_target, r_exist, em)
            self.assertEqual(em.to_existing, {})
            self.assertEqual(em.to_target, {})

            # test with retrieval: both Records have one test Property; the
            # existing is missing the value -> Nothing is matched; both
            # mappings stay empty
            r_exist = db.Record(id=234)
            r_exist.add_property("test", value=None, datatype=db.REFERENCE)
            r_target = db.Record()
            r_child = db.Record()
            r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
            em = h5.EntityMapping()
            h5.collect_existing_structure(r_target, r_exist, em)
            self.assertEqual(em.to_existing, {})
            self.assertEqual(em.to_target, {})

            # test with retrieval: both Records have one test Property with
            # multiple values -> The referenced Entities are matched
            r_exist = db.Record(id=234)
            r_exist.add_property("test", value=[101, 102], datatype=db.LIST(db.REFERENCE))
            r_target = db.Record()
            r_child = db.Record()
            r_child2 = db.Record()
            r_target.add_property("test", value=[r_child, r_child2],
                                  datatype=db.LIST(db.REFERENCE))
            em = h5.EntityMapping()
            h5.collect_existing_structure(r_target, r_exist, em)
            self.assertEqual(em.to_existing[r_child._cuid], ENTS[101])
            self.assertEqual(em.to_existing[r_child2._cuid], ENTS[102])
            self.assertEqual(em.to_target[101], r_child)
            self.assertEqual(em.to_target[102], r_child2)

            # test with retrieval: both Records have one test Property with one
            # value; Add another recursion level -> The referenced Entities are matched
            r_exist = db.Record(id=234)
            r_exist.add_property("test", value=103, datatype=db.REFERENCE)
            r_target = db.Record()
            r_child = db.Record()
            r_child2 = db.Record()
            r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
            r_child.add_property("test", value=r_child2, datatype=db.REFERENCE)
            em = h5.EntityMapping()
            h5.collect_existing_structure(r_target, r_exist, em)
            self.assertEqual(em.to_existing[r_child._cuid], ENTS[103])
            self.assertEqual(em.to_target[103], r_child)
            self.assertEqual(em.to_existing[r_child2._cuid], ENTS[101])
            self.assertEqual(em.to_target[101], r_child2)
    
        def test_h5_attr_to_property(self):
            """Check the (value, datatype) pairs returned by ``h5_attr_to_property``.

            Covers plain Python and numpy scalars, one-dimensional arrays,
            two-dimensional arrays (expected to yield ``(None, None)``) and
            scalars wrapped in zero-dimensional arrays.
            """
            test_int: int = 1
            test_integer = np.int_(1)
            # np.float_ was removed in NumPy 2.0; np.float64 is the type it
            # used to alias and exists in all NumPy versions.
            test_float = np.float64(1.0)
            test_str = "Test"
            test_complex: complex = 2+3j
            self.assertRaises(NotImplementedError, h5_attr_to_property, test_int)  # only numpy-integers processed?
            self.assertTupleEqual((1, db.INTEGER), h5_attr_to_property(test_integer))
            self.assertTupleEqual((1.0, db.DOUBLE), h5_attr_to_property(test_float))
            self.assertTupleEqual(("Test", db.TEXT), h5_attr_to_property(test_str))
            self.assertTupleEqual((2+3j, db.TEXT), h5_attr_to_property(test_complex))
            # strings are often represented using a binary format
            self.assertTupleEqual(("yeti", db.TEXT), h5_attr_to_property(
                np.array(["yeti"], dtype=h5py.string_dtype(r'utf-8', 8))[0]))

            # One-dimensional arrays: values are compared element-wise, the
            # datatype is expected to be the matching LIST type.
            test_integer_1d = np.arange(10)
            test_float_1d = np.arange(0, 1, 0.1)
            test_str_1d = np.array(["a", "b", "c"])
            self.assertTrue((np.arange(10) == h5_attr_to_property(test_integer_1d)[0]).all())
            self.assertEqual(db.LIST(db.INTEGER), h5_attr_to_property(test_integer_1d)[1])
            self.assertTrue((np.arange(0, 1, 0.1) == h5_attr_to_property(test_float_1d)[0]).all())
            self.assertEqual(db.LIST(db.DOUBLE), h5_attr_to_property(test_float_1d)[1])
            self.assertTrue((np.array(["a", "b", "c"]) == h5_attr_to_property(test_str_1d)[0]).all())
            self.assertEqual(db.LIST(db.TEXT), h5_attr_to_property(test_str_1d)[1])

            # Two-dimensional arrays are expected to yield (None, None).
            test_integers_2d = np.diag(np.arange(100))
            test_floats_2d = np.eye(100)
            self.assertTupleEqual((None, None), h5_attr_to_property(test_integers_2d))
            self.assertTupleEqual((None, None), h5_attr_to_property(test_floats_2d))

            # Test scalar values given as np.array
            self.assertTupleEqual((1, db.INTEGER), h5_attr_to_property(np.array(1)))
            self.assertTupleEqual((1.123, db.DOUBLE), h5_attr_to_property(np.array(1.123)))
            self.assertTupleEqual(('Hello World', db.TEXT), h5_attr_to_property(np.array("Hello World")))