"""Unit tests for the HDF5 helpers in ``caosadvancedtools.cfoods.h5``."""

import os
import unittest
from tempfile import NamedTemporaryFile

import linkahead as db
import linkahead.apiutils
import h5py
import numpy as np
from caosadvancedtools.cfoods import h5
from caosadvancedtools.cfoods.h5 import h5_attr_to_property

from create_dummy_hdf5file import create_hdf5_file

# Canned entities returned by ``dummy_get`` in place of a server retrieval.
# Record 103 references Record 101 so that one level of recursion can be
# exercised.
ENTS = {
    101: db.Record(id=101),
    102: db.Record(id=102),
    103: db.Record(id=103).add_property("test", value=101,
                                        datatype=db.REFERENCE),
}


def dummy_get(eid):
    """Return the canned entity stored under ``eid`` in ``ENTS``.

    Raises ``KeyError`` if ``eid`` is not one of the prepared ids.
    """
    return ENTS[eid]


class H5CFoodTest(unittest.TestCase):
    """Tests for ``H5CFood.create_structure``, ``collect_existing_structure``
    and ``h5_attr_to_property``."""

    def setUp(self):
        """Create a temporary dummy HDF5 file and open it as ``self.h5obj``."""
        self.h5file = NamedTemporaryFile(delete=False, suffix=".h5")
        self.h5file.close()
        # ``delete=False`` leaves the file on disk, so remove it explicitly
        # once the test is done.
        self.addCleanup(os.unlink, self.h5file.name)
        create_hdf5_file(self.h5file.name)
        self.h5obj = h5py.File(self.h5file.name, mode="a")
        # Cleanups run in reverse order of registration: the handle is closed
        # before the file is unlinked.
        self.addCleanup(self.h5obj.close)

    def test_create_record_records(self):
        """The structure created from the file has the expected parents."""
        result = h5.H5CFood.create_structure(self.h5obj)

        parents = ['group_level1_a', 'group_level1_b', 'group_level1_c',
                   'root_integers']
        record_list = [db.Record().add_parent(name=name) for name in parents]

        found_parents = []
        for ent in [p.value for p in result.properties]:
            parent_name = ent.parents[0].name
            if parent_name in parents:
                found_parents.append(parent_name)
            if parent_name == 'group_level1_a':
                self.assertIsNotNone(ent.get_property("group_level2_aa"))
                self.assertIsNone(ent.get_property("group_level1_a"))

        for name in parents:
            self.assertIn(name, found_parents)

        # The i-th property value is compared against the i-th expected
        # record, i.e. the properties are expected in the order of ``parents``.
        for i, prop in enumerate(result.properties):
            for actual in prop.value.get_parents():
                for expected in record_list[i].get_parents():
                    self.assertEqual(actual.name, expected.name)

        result1 = h5.H5CFood.create_structure(self.h5obj["group_level1_a"])
        for parent in result1.get_parents():
            self.assertEqual(parent.name, "group_level1_a")

        result2 = h5.H5CFood.create_structure(
            self.h5obj["group_level1_a/group_level2_aa"])
        for parent in result2.get_parents():
            self.assertEqual(parent.name, "group_level2_aa")

    def test_collect_existing_structure(self):
        """``collect_existing_structure`` fills the ``EntityMapping``."""
        # Replace the retrieval function by the local lookup. The cleanup
        # restores the original even when an assertion below fails, so other
        # tests never see the patched function.
        real_retrieve = linkahead.apiutils.retrieve_entity_with_id
        self.addCleanup(setattr, linkahead.apiutils,
                        "retrieve_entity_with_id", real_retrieve)
        linkahead.apiutils.retrieve_entity_with_id = dummy_get

        # should run without problem
        h5.collect_existing_structure(db.Record(), db.Record(id=234),
                                      h5.EntityMapping())

        # test with retrieval: both Records have one test Property with one
        # value -> The referenced Entities are matched
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=101, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertIs(em.to_existing[r_child._cuid], ENTS[101])
        self.assertIs(em.to_target[101], r_child)

        # test with retrieval: the existing Record has another Property
        # -> Nothing is matched; both mappings stay empty
        r_exist = db.Record(id=234)
        r_exist.add_property("test_other", value=101, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing, {})
        self.assertEqual(em.to_target, {})

        # test with retrieval: both Records have one test Property; the
        # existing is missing the value
        # -> Nothing is matched; both mappings stay empty
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=None, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing, {})
        self.assertEqual(em.to_target, {})

        # test with retrieval: both Records have one test Property with
        # multiple values -> The referenced Entities are matched
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=[101, 102],
                             datatype=db.LIST(db.REFERENCE))
        r_target = db.Record()
        r_child = db.Record()
        r_child2 = db.Record()
        r_target.add_property("test", value=[r_child, r_child2],
                              datatype=db.LIST(db.REFERENCE))
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing[r_child._cuid], ENTS[101])
        self.assertEqual(em.to_existing[r_child2._cuid], ENTS[102])
        self.assertEqual(em.to_target[101], r_child)
        self.assertEqual(em.to_target[102], r_child2)

        # test with retrieval: both Records have one test Property with one
        # value; Add another recursion level
        # -> The referenced Entities are matched
        r_exist = db.Record(id=234)
        r_exist.add_property("test", value=103, datatype=db.REFERENCE)
        r_target = db.Record()
        r_child = db.Record()
        r_child2 = db.Record()
        r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
        r_child.add_property("test", value=r_child2, datatype=db.REFERENCE)
        em = h5.EntityMapping()
        h5.collect_existing_structure(r_target, r_exist, em)
        self.assertEqual(em.to_existing[r_child._cuid], ENTS[103])
        self.assertEqual(em.to_target[103], r_child)
        self.assertEqual(em.to_existing[r_child2._cuid], ENTS[101])
        self.assertEqual(em.to_target[101], r_child2)

    def test_h5_attr_to_property(self):
        """``h5_attr_to_property`` maps attribute values to (value, datatype)."""
        test_int: int = 1
        test_integer = np.int_(1)
        # ``np.float_`` was removed in NumPy 2.0; ``np.float64`` is the same
        # type and exists in all NumPy versions.
        test_float = np.float64(1.0)
        test_str = "Test"
        test_complex: complex = 2+3j
        # only numpy-integers processed?
        self.assertRaises(NotImplementedError, h5_attr_to_property, test_int)
        self.assertTupleEqual((1, db.INTEGER),
                              h5_attr_to_property(test_integer))
        self.assertTupleEqual((1.0, db.DOUBLE),
                              h5_attr_to_property(test_float))
        self.assertTupleEqual(("Test", db.TEXT),
                              h5_attr_to_property(test_str))
        self.assertTupleEqual((2+3j, db.TEXT),
                              h5_attr_to_property(test_complex))
        # strings are often represented using a binary format
        self.assertTupleEqual(("yeti", db.TEXT), h5_attr_to_property(
            np.array(["yeti"], dtype=h5py.string_dtype(r'utf-8', 8))[0]))

        test_integer_1d = np.arange(10)
        test_float_1d = np.arange(0, 1, 0.1)
        test_str_1d = np.array(["a", "b", "c"])

        # Element-wise comparison yields an array, hence the ``.all()``.
        self.assertTrue(
            (np.arange(10) == h5_attr_to_property(test_integer_1d)[0]).all())
        self.assertEqual(db.LIST(db.INTEGER),
                         h5_attr_to_property(test_integer_1d)[1])
        self.assertTrue(
            (np.arange(0, 1, 0.1)
             == h5_attr_to_property(test_float_1d)[0]).all())
        self.assertEqual(db.LIST(db.DOUBLE),
                         h5_attr_to_property(test_float_1d)[1])
        self.assertTrue(
            (np.array(["a", "b", "c"])
             == h5_attr_to_property(test_str_1d)[0]).all())
        self.assertEqual(db.LIST(db.TEXT),
                         h5_attr_to_property(test_str_1d)[1])

        # Two-dimensional arrays are expected to yield (None, None).
        test_integers_2d = np.diag(np.arange(100))
        test_floats_2d = np.eye(100)
        self.assertTupleEqual((None, None),
                              h5_attr_to_property(test_integers_2d))
        self.assertTupleEqual((None, None),
                              h5_attr_to_property(test_floats_2d))

        # Test scalar values given as np.array
        self.assertTupleEqual((1, db.INTEGER),
                              h5_attr_to_property(np.array(1)))
        self.assertTupleEqual((1.123, db.DOUBLE),
                              h5_attr_to_property(np.array(1.123)))
        self.assertTupleEqual(('Hello World', db.TEXT),
                              h5_attr_to_property(np.array("Hello World")))