Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_h5.py 8.35 KiB
import unittest
from tempfile import NamedTemporaryFile

import linkahead as db
import linkahead.apiutils
import h5py
import numpy as np
from caosadvancedtools.cfoods import h5
from caosadvancedtools.cfoods.h5 import h5_attr_to_property

from create_dummy_hdf5file import create_hdf5_file

# In-memory stand-ins for entities, keyed by their id and served by
# ``dummy_get``.  Record 103 carries a "test" reference property whose value
# is the id 101.
ENTS = {eid: db.Record(id=eid) for eid in (101, 102)}
ENTS[103] = db.Record(id=103).add_property("test", value=101,
                                           datatype=db.REFERENCE)


def dummy_get(eid):
    """Return the entity stored under ``eid`` in the module-level ``ENTS``.

    Used by the tests as a replacement for
    ``linkahead.apiutils.retrieve_entity_with_id``.  Raises ``KeyError`` if
    ``eid`` is not a key of ``ENTS``.
    """
    entity = ENTS[eid]
    return entity


class H5CFoodTest(unittest.TestCase):
    def setUp(self):
        """Create a dummy HDF5 file and open it as ``self.h5obj``.

        The temporary file (created with ``delete=False``) and the open
        ``h5py.File`` handle are registered with ``addCleanup`` so that
        neither leaks once the test has finished, whether it passed or not.
        """
        import os  # local import: only needed for removing the temp file

        # Only the unique file name is needed here; the handle is closed
        # right away and the path is handed to create_hdf5_file.
        self.h5file = NamedTemporaryFile(delete=False, suffix=".h5")
        self.h5file.close()

        def _remove_tempfile(path=self.h5file.name):
            # Tolerate a file that has already been removed.
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

        # Cleanups run in reverse registration order, so the HDF5 handle
        # registered below is closed before the file is deleted.
        self.addCleanup(_remove_tempfile)

        create_hdf5_file(self.h5file.name)
        self.h5obj = h5py.File(self.h5file.name, mode="a")
        self.addCleanup(self.h5obj.close)

    def test_create_record_records(self):
        """Check the Records that ``H5CFood.create_structure`` builds.

        For the file root, every expected top-level name must appear as the
        first parent of one of the referenced Records, and the Records must
        come in the same order as ``parents``.  For sub-groups, the returned
        Record must have the group name as parent.
        """
        result = h5.H5CFood.create_structure(self.h5obj)

        parents = ['group_level1_a', 'group_level1_b', 'group_level1_c', 'root_integers']
        # One reference Record per expected parent, in the expected order.
        record_list = [db.Record().add_parent(name=name) for name in parents]

        found_parents = []

        for ent in [p.value for p in result.properties]:
            parent_name = ent.parents[0].name

            if parent_name in parents:
                found_parents.append(parent_name)

            if parent_name == 'group_level1_a':
                # The nested group shows up as a property; the group itself
                # must not reference itself.
                self.assertIsNotNone(ent.get_property("group_level2_aa"))
                self.assertIsNone(ent.get_property("group_level1_a"))

        for p in parents:
            self.assertIn(p, found_parents)

        # Explicit length check instead of an IndexError from indexing
        # record_list when the structure has unexpected extra properties.
        self.assertEqual(len(result.properties), len(record_list))

        for prop, expected in zip(result.properties, record_list):
            for j in prop.value.get_parents():
                for k in expected.get_parents():
                    self.assertEqual(j.name, k.name)

        result1 = h5.H5CFood.create_structure(self.h5obj["group_level1_a"])

        for i in result1.get_parents():
            self.assertEqual(i.name, "group_level1_a")

        result2 = h5.H5CFood.create_structure(self.h5obj["group_level1_a/group_level2_aa"])

        for i in result2.get_parents():
            self.assertEqual(i.name, "group_level2_aa")

    def test_collect_existing_structure(self):
        """Check how ``collect_existing_structure`` fills an ``EntityMapping``.

        ``linkahead.apiutils.retrieve_entity_with_id`` is replaced by
        ``dummy_get`` for the duration of the test, so referenced ids are
        looked up in the module-level ``ENTS`` table.
        """
        # Register the restore *before* patching so the original function is
        # put back even when one of the assertions below fails.
        # TODO Consider changing this to use the BaseMockUpTest.
        real_retrieve = linkahead.apiutils.retrieve_entity_with_id
        self.addCleanup(setattr, linkahead.apiutils,
                        "retrieve_entity_with_id", real_retrieve)
        linkahead.apiutils.retrieve_entity_with_id = dummy_get

        # should run without problem
        h5.collect_existing_structure(db.Record(), db.Record(id=234), h5.EntityMapping())

        # test with retrieval: both Records have one test Property with one
        # value -> The referenced Entities are matched
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=101, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertIs(em.to_existing[r_child._cuid], ENTS[101])
        self.assertIs(em.to_target[101], r_child)

        # test with retrieval: the existing Record has another Property
        # -> nothing is matched, both mappings stay empty
        r_exist = db.Record(id=234)
        r_exist.add_property("test_other", value=101, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing, {})
        self.assertEqual(em.to_target, {})

        # test with retrieval: both Records have one test Property; the
        # existing is missing the value -> nothing is matched, both mappings
        # stay empty
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=None, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing, {})
        self.assertEqual(em.to_target, {})

        # test with retrieval: both Records have one test Property with
        # multiple values -> The referenced Entities are matched
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=[101, 102], datatype=db.LIST(db.REFERENCE))
        r_target = db.Record()
        r_child = db.Record()
        r_child2 = db.Record()
        r_target.add_property("test", value=[r_child, r_child2],
                              datatype=db.LIST(db.REFERENCE))
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing[r_child._cuid], ENTS[101])
        self.assertEqual(em.to_existing[r_child2._cuid], ENTS[102])
        self.assertEqual(em.to_target[101], r_child)
        self.assertEqual(em.to_target[102], r_child2)

        # test with retrieval: both Records have one test Property with one
        # value; Add another recursion level -> The referenced Entities are matched
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=103, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_child2 = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        r_child.add_property("test", value=r_child2, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing[r_child._cuid], ENTS[103])
        self.assertEqual(em.to_target[103], r_child)
        self.assertEqual(em.to_existing[r_child2._cuid], ENTS[101])
        self.assertEqual(em.to_target[101], r_child2)

    def test_h5_attr_to_property(self):
        """Check the ``(value, datatype)`` pairs from ``h5_attr_to_property``.

        Covers plain Python scalars, numpy scalars, 1-d arrays (expected as
        LIST datatypes), 2-d arrays (expected as ``(None, None)``) and scalar
        values wrapped in 0-d numpy arrays.
        """
        test_int: int = 1
        test_integer = np.int_(1)
        # np.float_ was removed in NumPy 2.0; np.float64 is the type it
        # aliased and exists in all NumPy versions.
        test_float = np.float64(1.0)
        test_str = "Test"
        test_complex: complex = 2+3j
        self.assertRaises(NotImplementedError, h5_attr_to_property,
                          test_int)  # only numpy-integers processed?
        self.assertTupleEqual((1, db.INTEGER), h5_attr_to_property(test_integer))
        self.assertTupleEqual((1.0, db.DOUBLE), h5_attr_to_property(test_float))
        self.assertTupleEqual(("Test", db.TEXT), h5_attr_to_property(test_str))
        self.assertTupleEqual((2+3j, db.TEXT), h5_attr_to_property(test_complex))
        # strings are often represented using a binary format
        self.assertTupleEqual(("yeti", db.TEXT), h5_attr_to_property(
            np.array(["yeti"], dtype=h5py.string_dtype(r'utf-8', 8))[0]))

        # 1-d arrays: values are compared element-wise, datatype is a LIST.
        test_integer_1d = np.arange(10)
        test_float_1d = np.arange(0, 1, 0.1)
        test_str_1d = np.array(["a", "b", "c"])

        converted = h5_attr_to_property(test_integer_1d)
        self.assertTrue((np.arange(10) == converted[0]).all())
        self.assertEqual(db.LIST(db.INTEGER), converted[1])

        converted = h5_attr_to_property(test_float_1d)
        self.assertTrue((np.arange(0, 1, 0.1) == converted[0]).all())
        self.assertEqual(db.LIST(db.DOUBLE), converted[1])

        converted = h5_attr_to_property(test_str_1d)
        self.assertTrue((np.array(["a", "b", "c"]) == converted[0]).all())
        self.assertEqual(db.LIST(db.TEXT), converted[1])

        # 2-d arrays are expected to yield no value and no datatype.
        test_integers_2d = np.diag(np.arange(100))
        test_floats_2d = np.eye(100)
        self.assertTupleEqual((None, None), h5_attr_to_property(test_integers_2d))
        self.assertTupleEqual((None, None), h5_attr_to_property(test_floats_2d))

        # Test scalar values given as np.array
        self.assertTupleEqual((1, db.INTEGER), h5_attr_to_property(np.array(1)))
        self.assertTupleEqual((1.123, db.DOUBLE), h5_attr_to_property(np.array(1.123)))
        self.assertTupleEqual(('Hello World', db.TEXT),
                              h5_attr_to_property(np.array("Hello World")))