-
Florian Spreckelsen authoredFlorian Spreckelsen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_h5.py 8.35 KiB
import unittest
from tempfile import NamedTemporaryFile
import linkahead as db
import linkahead.apiutils
import h5py
import numpy as np
from caosadvancedtools.cfoods import h5
from caosadvancedtools.cfoods.h5 import h5_attr_to_property
from create_dummy_hdf5file import create_hdf5_file
ENTS = {
101: db.Record(id=101),
102: db.Record(id=102),
103: db.Record(id=103).add_property("test", value=101,
datatype=db.REFERENCE),
}
def dummy_get(eid):
return ENTS[eid]
class H5CFoodTest(unittest.TestCase):
def setUp(self):
self.h5file = NamedTemporaryFile(delete=False, suffix=".h5")
self.h5file.close()
create_hdf5_file(self.h5file.name)
self.h5obj = h5py.File(self.h5file.name, mode="a")
def test_create_record_records(self):
result = h5.H5CFood.create_structure(self.h5obj)
record_list = []
parents = ['group_level1_a', 'group_level1_b', 'group_level1_c', 'root_integers']
for i in parents:
record_list.append(db.Record().add_parent(name=i))
found_parents = []
for ent in [p.value for p in result.properties]:
if ent.parents[0].name == 'group_level1_a':
found_parents.append('group_level1_a')
self.assertTrue(ent.get_property("group_level2_aa") is not None)
self.assertTrue(ent.get_property("group_level1_a") is None)
elif ent.parents[0].name == 'group_level1_b':
found_parents.append('group_level1_b')
pass
elif ent.parents[0].name == 'group_level1_c':
found_parents.append('group_level1_c')
pass
elif ent.parents[0].name == 'root_integers':
found_parents.append('root_integers')
pass
for p in parents:
self.assertTrue(p in found_parents)
for i in range(len(result.properties)):
for j in result.properties[i].value.get_parents():
for k in record_list[i].get_parents():
self.assertEqual(j.name, k.name)
result1 = h5.H5CFood.create_structure(self.h5obj["group_level1_a"])
for i in result1.get_parents():
self.assertEqual(i.name, "group_level1_a")
result2 = h5.H5CFood.create_structure(self.h5obj["group_level1_a/group_level2_aa"])
for i in result2.get_parents():
self.assertEqual(i.name, "group_level2_aa")
def test_collect_existing_structure(self):
# TODO this does probably break the code: The function will not be
# restored correctly.
# Change it to use the BaseMockUpTest
real_retrieve = linkahead.apiutils.retrieve_entity_with_id
linkahead.apiutils.retrieve_entity_with_id = dummy_get
# should run without problem
h5.collect_existing_structure(db.Record(), db.Record(id=234), h5.EntityMapping())
# test with retrieval: both Records have one test Property with one
# value -> The referenced Entities are matched
r_exist = db.Record(id=234)
r_exist.add_property("test", value=101, datatype=db.REFERENCE)
r_target = db.Record()
r_child = db.Record()
r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
em = h5.EntityMapping()
h5.collect_existing_structure(r_target, r_exist, em)
self.assertTrue(em.to_existing[r_child._cuid] is ENTS[101])
self.assertTrue(em.to_target[101] is r_child)
# test with retrieval: the existing Record has another Property
# -> The referenced Entities are matched
r_exist = db.Record(id=234)
r_exist.add_property("test_other", value=101, datatype=db.REFERENCE)
r_target = db.Record()
r_child = db.Record()
r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
em = h5.EntityMapping()
h5.collect_existing_structure(r_target, r_exist, em)
self.assertEqual(em.to_existing, {})
self.assertEqual(em.to_target, {})
# test with retrieval: both Records have one test Property; the
# existing is missing the value -> The referenced Entities are matched
r_exist = db.Record(id=234)
r_exist.add_property("test", value=None, datatype=db.REFERENCE)
r_target = db.Record()
r_child = db.Record()
r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
em = h5.EntityMapping()
h5.collect_existing_structure(r_target, r_exist, em)
self.assertEqual(em.to_existing, {})
self.assertEqual(em.to_target, {})
# test with retrieval: both Records have one test Property with
# multiple values -> The referenced Entities are matched
r_exist = db.Record(id=234)
r_exist.add_property("test", value=[101, 102], datatype=db.LIST(db.REFERENCE))
r_target = db.Record()
r_child = db.Record()
r_child2 = db.Record()
r_target.add_property("test", value=[r_child, r_child2],
datatype=db.LIST(db.REFERENCE))
em = h5.EntityMapping()
h5.collect_existing_structure(r_target, r_exist, em)
self.assertEqual(em.to_existing[r_child._cuid], ENTS[101])
self.assertEqual(em.to_existing[r_child2._cuid], ENTS[102])
self.assertEqual(em.to_target[101], r_child)
self.assertEqual(em.to_target[102], r_child2)
# test with retrieval: both Records have one test Property with one
# value; Add another recursion level -> The referenced Entities are matched
r_exist = db.Record(id=234)
r_exist.add_property("test", value=103, datatype=db.REFERENCE)
r_target = db.Record()
r_child = db.Record()
r_child2 = db.Record()
r_target.add_property("test", value=r_child, datatype=db.REFERENCE)
r_child.add_property("test", value=r_child2, datatype=db.REFERENCE)
em = h5.EntityMapping()
h5.collect_existing_structure(r_target, r_exist, em)
self.assertEqual(em.to_existing[r_child._cuid], ENTS[103])
self.assertEqual(em.to_target[103], r_child)
self.assertEqual(em.to_existing[r_child2._cuid], ENTS[101])
self.assertEqual(em.to_target[101], r_child2)
linkahead.apiutils.retrieve_entity_with_id = real_retrieve
def test_h5_attr_to_property(self):
test_int: int = 1
test_integer = np.int_(1)
test_float = np.float_(1.0)
test_str = "Test"
test_complex: complex = 2+3j
self.assertRaises(NotImplementedError, h5_attr_to_property,
test_int) # only numpy-integers processed?
self.assertTupleEqual((1, db.INTEGER), h5_attr_to_property(test_integer))
self.assertTupleEqual((1.0, db.DOUBLE), h5_attr_to_property(test_float))
self.assertTupleEqual(("Test", db.TEXT), h5_attr_to_property(test_str))
self.assertTupleEqual((2+3j, db.TEXT), h5_attr_to_property(test_complex))
# strings are often represented using a binary format
self.assertTupleEqual(("yeti", db.TEXT), h5_attr_to_property(
np.array(["yeti"], dtype=h5py.string_dtype(r'utf-8', 8))[0]))
test_integer_1d = np.arange(10)
test_float_1d = np.arange(0, 1, 0.1)
test_str_1d = np.array(["a", "b", "c"])
self.assertTrue((np.arange(10) == h5_attr_to_property(test_integer_1d)[0]).all())
self.assertTrue(db.LIST(db.INTEGER) == h5_attr_to_property(test_integer_1d)[1])
self.assertTrue((np.arange(0, 1, 0.1) == h5_attr_to_property(test_float_1d)[0]).all())
self.assertTrue(db.LIST(db.DOUBLE) == h5_attr_to_property(test_float_1d)[1])
self.assertTrue((np.array(["a", "b", "c"]) == h5_attr_to_property(test_str_1d)[0]).all())
self.assertTrue(db.LIST(db.TEXT) == h5_attr_to_property(test_str_1d)[1])
test_integers_2d = np.diag(np.arange(100))
test_floats_2d = np.eye(100)
self.assertTupleEqual((None, None), h5_attr_to_property(test_integers_2d))
self.assertTupleEqual((None, None), h5_attr_to_property(test_floats_2d))
# Test scalar values given as np.array
self.assertTupleEqual((1, db.INTEGER), h5_attr_to_property(np.array(1)))
self.assertTupleEqual((1.123, db.DOUBLE), h5_attr_to_property(np.array(1.123)))
self.assertTupleEqual(('Hello World', db.TEXT),
h5_attr_to_property(np.array("Hello World")))