diff --git a/src/caoscrawler/sync_graph.py b/src/caoscrawler/sync_graph.py index a4d321681473f02e12b79960e09ed41bec4aa986..0bd038abe6884fe93215bfec6fb12b25882e03ad 100644 --- a/src/caoscrawler/sync_graph.py +++ b/src/caoscrawler/sync_graph.py @@ -41,6 +41,30 @@ from .sync_node import SyncNode logger = logging.getLogger(__name__) +def _for_each_scalar_value(node, condition, kind, value=None): + for p in node.properties: + if isinstance(p.value, list): + for ii, el in enumerate(p.value): + if condition(el): + if kind == "remove": + p.value.remove(el) + elif kind == "set": + p.value[ii] = value(el) + elif condition(p.value): + if kind == "remove": + node.properties.remove(p) + elif kind == "set": + p.value = value(p.value) + + +def _remove_each_scalar_value(node, condition): + _for_each_scalar_value(node, condition, "remove") + + +def _set_each_scalar_value(node, condition, value): + _for_each_scalar_value(node, condition, "set", value=value) + + class SyncGraph(): """ combines nodes in the graph based on their identity in order to create a graph of objects that can either be inserted or update in(to) the remote server. @@ -75,6 +99,7 @@ class SyncGraph(): self._existing: Dict[int, SyncNode] = {} # entities that are missing get negative IDs to allow identifiable creation self._remote_missing_counter = -1 + self._replacements = {} self.problems = set() self.raise_problems = raise_problems @@ -114,7 +139,7 @@ class SyncGraph(): if self.raise_problems: raise else: - remove, rmrefs = self.remove_failed(node) + remove = self.remove_failed(node) logger.error(exc) logger.error(f"{len(remove)}, {len(rmrefs)}") for node in list(self.nodes): @@ -125,6 +150,11 @@ class SyncGraph(): except Exception as es: pass + def _get_replacement(self, uuid): + while uuid in self._replacements: + uuid = self._replacements[uuid] + return uuid + def remove_unidentifiable(self, node): self.unchecked.remove(node) # TODO remove from identifying references! @@ -138,8 +168,6 @@ class SyncGraph(): if node_id is None: node_id = self._get_new_id() node.id = node_id - for el in node.other: - el.id = node_id if node_id in self._id_look_up: self._merge_into(node, self._id_look_up[node.id]) else: @@ -197,36 +225,27 @@ class SyncGraph(): if self.raise_problems: raise else: - remove, rmrefs = self.remove_failed(el) + remove = self.remove_failed(el) logger.error(exc) logger.error(f"{len(remove)}, {len(rmrefs)}") for el in self.nodes: try: entities.append(el.export_entity()) node_map[id(el)] = entities[-1] - for oel in el.other: - node_map[id(oel)] = entities[-1] except ImpossibleMergeError as exc: if self.raise_problems: raise else: - remove, rmrefs = self.remove_failed(el) + remove = self.remove_failed(el) logger.error(exc) logger.error(f"{len(remove)}, {len(rmrefs)}") if len(self.unchecked) > 1: self.unchecked_contains_circular_dependency() for ent in entities: - for p in ent.properties: - if isinstance(p.value, list): - for ii, el in enumerate(p.value): - if isinstance(el, SyncNode): - # TODO how is it possible that id(el) is not in node_map - # probably because el was removed from nodes. - p.value[ii] = node_map[id(el)] - - elif isinstance(p.value, SyncNode): - p.value = node_map[id(p.value)] + _set_each_scalar_value(ent, + condition=lambda val: isinstance(val, SyncNode), + value=lambda val: node_map[id(val)]) missing = [el for el in entities if el.id < 0] existing = [el for el in entities if el.id > 0] @@ -354,24 +373,14 @@ class SyncGraph(): def remove_failed(self, node): if node not in self.nodes: - return [], [] + return [] self.nodes.remove(node) if node in self.unchecked: self.unchecked.remove(node) - refs2 = [] # remove reference property or value from referencing nodes for referencing in self.backward_references[node.uuid]: - for p in referencing.properties: - v = p.value - if not isinstance(p.value, list): - v = [v] - for vv in v: - if vv is node: - if not isinstance(p.value, list): - referencing.properties.remove(p) - else: - p.value.remove(node) - refs2.append(referencing) + _remove_each_scalar_value(referencing, + lambda val: val is node) remove = [] remove.extend(self.forward_id_referenced_by[node.uuid]) @@ -396,8 +405,7 @@ class SyncGraph(): for el in remove: rm, rf = self.remove_failed(el) remove.extend(rm) - refs2.extend(rf) - return remove, refs2 + return remove @staticmethod def _create_reference_mapping(flat: List[SyncNode]): @@ -500,10 +508,7 @@ class SyncGraph(): """ assert source is not target target.update(source) - target.other.append(source) - target.other.extend(source.other) - for el in target.other: - el.id = target.id + self._replacements[source.uuid] = target.uuid # update reference mappings for node in self.forward_references.pop(source.uuid): @@ -511,6 +516,10 @@ class SyncGraph(): self.backward_references[node.uuid].remove(source) self.backward_references[node.uuid].add(target) for node in self.backward_references.pop(source.uuid): + # replace actual reference property values + _set_each_scalar_value(node, + condition=lambda val: val is source, + value=lambda val: target) self.backward_references[target.uuid].add(node) self.forward_references[node.uuid].remove(source) self.forward_references[node.uuid].add(target) @@ -569,14 +578,9 @@ class SyncGraph(): self.identifiableAdapter.get_registered_identifiable(el))) se_lookup[id(el)] = self.nodes[-1] for node in self.nodes: - for p in node.properties: - if isinstance(p.value, list): - for index, val in enumerate(p.value): - if id(val) in se_lookup: - p.value[index] = se_lookup[id(val)] - else: - if id(p.value) in se_lookup: - p.value = se_lookup[id(p.value)] + _set_each_scalar_value(node, + condition=lambda val: id(val) in se_lookup, + value=lambda val: se_lookup[id(val)]) def _treat_missing(self, node): self._missing[id(node)] = node diff --git a/src/caoscrawler/sync_node.py b/src/caoscrawler/sync_node.py index 0617c1781c8e759622c4c7068eb817be0099a24d..3ccff28d4507e3cc628cc32ad1b4b023446cb4f5 100644 --- a/src/caoscrawler/sync_node.py +++ b/src/caoscrawler/sync_node.py @@ -128,7 +128,7 @@ class SyncNode(): res += f"user: {self._metadata['user']}\n" res += f"json: {self._metadata['json']}\n" res += "---------------------------------------------------\n" - res += yaml.dump({"id": self.id, "name": self.name, + res += yaml.dump({"uuid": self.uuid.hex, "id": self.id, "name": self.name, "parents": [el.name for el in self.parents]}, allow_unicode=True) res += "---------------------------------------------------\n" res += "properties:\n" diff --git a/unittests/test_sync_graph.py b/unittests/test_sync_graph.py index 0b58cd837fa9a541ea44d2538e1ea20dc5dd329e..d969ddcf27335f0f0a37a0852cb42810db90f4c3 100644 --- a/unittests/test_sync_graph.py +++ b/unittests/test_sync_graph.py @@ -23,6 +23,7 @@ from unittest.mock import MagicMock, Mock, patch import linkahead as db import pytest +from caoscrawler.exceptions import ImpossibleMergeError from caoscrawler.identifiable import Identifiable from caoscrawler.identifiable_adapters import CaosDBIdentifiableAdapter from caoscrawler.sync_graph import SyncGraph @@ -170,6 +171,8 @@ def test_merge_into_trivial(simple_adapter): st._merge_into(se_a, se_b) + assert st._get_replacement(se_a.uuid) == se_b.uuid + # CHECK REFERENCE MAP (after merge): # c is now referenced by b assert se_a.uuid not in st.forward_references @@ -199,6 +202,11 @@ def test_merge_into_trivial(simple_adapter): assert len(st.backward_id_referenced_by[se_c.uuid]) == 1 se_b in st.backward_id_referenced_by[se_c.uuid] + # the following is only to test multiple replacements + st._merge_into(se_b, se_c) + + assert st._get_replacement(se_a.uuid) == se_c.uuid + def test_merge_into_simple(simple_adapter): # simple case: a -> c <- b (a & b reference c; a & b have the same target record) @@ -289,16 +297,16 @@ def test_backward_id_referenced_by(): ident_adapter.register_identifiable("C", ident_b) referenced = db.Record(name="B").add_parent("C") - entlist = [referenced, db.Record(name="A").add_parent("BR").add_property("ref", referenced), ] + ent_list = [referenced, db.Record(name="A").add_parent("BR").add_property("ref", referenced), ] - st = SyncGraph(entlist, ident_adapter) + st = SyncGraph(ent_list, ident_adapter) assert st.nodes[1] in st.backward_id_referenced_by[st.nodes[0].uuid] def test_set_id_of_node(simple_adapter): # setting the id should lead to the node being marked as existing - entlist = [db.Record().add_parent("RT1")] - st = SyncGraph(entlist, simple_adapter) + ent_list = [db.Record().add_parent("RT1")] + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 1 assert len(st.unchecked) == 1 st.set_id_of_node(st.unchecked[0], 101) @@ -307,8 +315,8 @@ def test_set_id_of_node(simple_adapter): assert id(st.nodes[0]) in st._existing # setting the id with None should lead to the node being marked as missing - entlist = [db.Record().add_parent("RT1").add_property(name="RT2", value=1)] - st = SyncGraph(entlist, simple_adapter) + ent_list = [db.Record().add_parent("RT1").add_property(name="RT2", value=1)] + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 1 assert len(st.unchecked) == 1 # is automatically set in during initialization of graph @@ -319,10 +327,10 @@ def test_set_id_of_node(simple_adapter): assert id(st.nodes[0]) in st._missing # setting the id to one that already exists should lead to a merge - entlist = [ + ent_list = [ db.Record(id=101).add_parent("RT1"), db.Record().add_parent("RT1").add_property(name="a", value=1)] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 2 assert len(st.unchecked) == 1 st.set_id_of_node(st.unchecked[0], 101) @@ -331,11 +339,11 @@ def test_set_id_of_node(simple_adapter): assert st.nodes[0].properties[0].name == "a" # setting the id to None should lead to depending nodes marked as missing - entlist = [ + ent_list = [ db.Record().add_parent("RT3").add_property(name="a", value=1).add_property( name="RT2", value=db.Record().add_parent("RT2")), ] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 2 assert len(st.unchecked) == 2 st.set_id_of_node(st.unchecked[0]) @@ -345,13 +353,13 @@ def test_set_id_of_node(simple_adapter): assert id(st.nodes[1]) in st._missing # same as above but with backref - entlist = [ + ent_list = [ db.Record() .add_parent("RT4") .add_property(name="RT3", value=db.Record().add_parent("RT3").add_property(name="a", value=1)), ] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 2 assert len(st.unchecked) == 2 assert st.unchecked[1].identifiable is not None @@ -362,13 +370,13 @@ def test_set_id_of_node(simple_adapter): assert id(st.nodes[1]) in st._missing # setting an id might allow to check another node that depends on the former - entlist = [ + ent_list = [ db.Record() .add_parent("RT4") .add_property(name="RT3", value=db.Record().add_parent("RT3").add_property(name="a", value=1)), ] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert st.nodes[0].identifiable is None assert st.nodes[1].identifiable is not None st.set_id_of_node(st.unchecked[1], 111) @@ -376,7 +384,7 @@ def test_set_id_of_node(simple_adapter): assert st.nodes[1].identifiable is not None # same as above but going one step further: the new identifiable allows to merge that node - entlist = [ + ent_list = [ (db.Record() .add_parent("RT4") .add_property(name="RT3", @@ -386,7 +394,7 @@ def test_set_id_of_node(simple_adapter): .add_parent("RT4") .add_property(name="RT3", value=111)) ] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert st.nodes[0].identifiable is None assert st.nodes[1].identifiable is not None assert st.nodes[2].identifiable is not None @@ -395,65 +403,44 @@ def test_set_id_of_node(simple_adapter): assert st.nodes[0].identifiable is not None assert len(st.nodes) == 2 - # Test for meaningful exception when referencing a list of unmergeable entities. - # - # Datamodel - # --------- - # A: - # B: LIST<B> - # prop_ident: INTEGER - # - # B: - # prop_ident: - # - # - # Identifiables - # ------------- - # - # id_A: [prop_ident] - # id_B: [prop_ident, "is_referenced_by: A"] - # - # Data - # ---- - # - # - # b1: ("same", c1) - # b2: ("same", c2) - # - # a: ([b1, b2]) - - prop_ident = db.Property("prop_ident", datatype=db.INTEGER) - prop_other = db.Property("prop_ident", datatype=db.INTEGER) - # Somehow it is necessary that `B` has a reference property. Dunno if C must have an - # identifiable as well. - rt_b = db.RecordType("B").add_property(prop_ident).add_property("C") - rt_a = db.RecordType("A").add_property(prop_ident).add_property("LIST<B>") - - ident_a = db.RecordType().add_parent("A").add_property("prop_ident") - ident_b = db.RecordType().add_parent("B").add_property("prop_ident").add_property( - "is_referenced_by", value="A") + # Test for meaningful exception when having unmergeable properties. + ent_list = [ + db.Record().add_parent("RT3").add_property('a', value=1).add_property('b', value=1), + db.Record().add_parent("RT3").add_property('a', value=1).add_property('b', value=2), + ] - rec_a = db.Record("a").add_parent(rt_a).add_property("prop_ident", value=1234) - rec_b = [] - for value in [23, 42]: - rec_b.append(db.Record().add_parent(rt_b).add_property("prop_ident", value=2020)) - rec_a.add_property("B", rec_b) + st = SyncGraph(ent_list, simple_adapter) + with pytest.raises(ImpossibleMergeError): + st.export_record_lists() + ent_list = [ + db.Record().add_parent("RT3").add_property('a', value=1) + .add_property('b', value=db.Record().add_parent("RT5")), + db.Record().add_parent("RT3").add_property('a', value=1) + .add_property('b', value=db.Record().add_parent("RT5")), + ] - ident_adapter = CaosDBIdentifiableAdapter() - ident_adapter.register_identifiable("A", ident_a) - ident_adapter.register_identifiable("B", ident_b) + st = SyncGraph(ent_list, simple_adapter) + with pytest.raises(ImpossibleMergeError): + st.export_record_lists() + ent_list = [ + db.Record(id=101).add_parent("RT3") + .add_property('b', value=db.Record().add_parent("RT5")), + db.Record().add_parent("RT3") + .add_property('b', value=db.Record().add_parent("RT5")), + ] - st = SyncGraph([rec_a, *rec_b], ident_adapter) - for node in st.nodes: - print(node.id, node.parents) - assert st.identity_relies_on_unchecked_entity(st.nodes[0]) is False - assert st.identity_relies_on_unchecked_entity(st.nodes[1]) - assert st.identity_relies_on_unchecked_entity(st.nodes[2]) + st = SyncGraph(ent_list, simple_adapter) + assert st.nodes[2].is_unidentifiable() + assert st.nodes[3].is_unidentifiable() + assert len(st.nodes) == 4 + assert len(st.unchecked) == 1 + st.set_id_of_node(st.nodes[1], 101) assert len(st.nodes) == 3 - assert len(st.unchecked) == 3 - st.set_id_of_node(st.nodes[0]) - assert len(st.nodes) == 2 assert len(st.unchecked) == 0 + # until implementation of it ... + with pytest.raises(NotImplementedError): + # with pytest.raises(ImpossibleMergeError): + st.export_record_lists() @patch("caoscrawler.sync_graph.cached_get_entity_by", @@ -469,30 +456,30 @@ def test_merging(simple_adapter): basic_retrieve_by_name_mock_up, known={"A": db.Record(id=1111, name="A")})) # merging based on id - entlist = [ + ent_list = [ db.Record(id=101).add_parent("A"), db.Record(id=101).add_parent("A")] - st = SyncGraph(entlist, ident_adapter) + st = SyncGraph(ent_list, ident_adapter) assert len(st.nodes) == 1 assert len(st.unchecked) == 0 assert 101 == st.nodes[0].id assert "A" == st.nodes[0].parents[0].name # merging based on path - entlist = [ + ent_list = [ db.File(path='101').add_parent("A"), db.File(path='101').add_parent("A")] - st = SyncGraph(entlist, ident_adapter) + st = SyncGraph(ent_list, ident_adapter) assert len(st.nodes) == 1 assert len(st.unchecked) == 0 assert '101' == st.nodes[0].path assert "A" == st.nodes[0].parents[0].name # merging based on identifiable - entlist = [ + ent_list = [ db.File(name='101').add_parent("A").add_property('a', value=1), db.File(name='101').add_parent("A").add_property('a', value=1)] - st = SyncGraph(entlist, ident_adapter) + st = SyncGraph(ent_list, ident_adapter) assert len(st.nodes) == 1 assert st.nodes[0].id is None assert '101' == st.nodes[0].name @@ -502,13 +489,13 @@ def test_merging(simple_adapter): # Merging a mix. One Record needs the identifiable to be merged. But the identifying # information is scattered in the other case. - entlist = [ + ent_list = [ db.Record(id=101).add_parent("A"), db.Record(id=101, name='a').add_parent("A"), db.Record(id=101).add_parent("A").add_property('a', value=1), db.Record(name='a').add_parent("A").add_property('a', value=1)] - st = SyncGraph(entlist, ident_adapter) + st = SyncGraph(ent_list, ident_adapter) assert len(st.nodes) == 1 assert len(st.unchecked) == 0 assert 'a' == st.nodes[0].name @@ -520,7 +507,7 @@ def test_merging(simple_adapter): def test_something(simple_adapter): a = db.Record().add_parent("RT3").add_property('a', value=1) - entlist = [ + ent_list = [ a, db.Record().add_parent("RT3").add_property('a', value=1), db.Record().add_parent("RT3").add_property('a', value=1), @@ -529,7 +516,7 @@ def test_something(simple_adapter): db.Record().add_parent("RT4").add_property('RT3', value=a), db.Record().add_parent("RT3").add_property('a', value=1), db.Record().add_parent("RT3").add_property('a', value=1)] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 2 assert len(st.unchecked) == 2 assert 'RT4' == st.nodes[1].parents[0].name @@ -539,10 +526,10 @@ def test_something(simple_adapter): def test_no_ident(simple_adapter): - entlist = [ + ent_list = [ db.Record(name='el').add_parent("RT5"), db.Record(name='101').add_parent("RT3").add_property('a', value=1)] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 2 assert len(st.unchecked) == 1 # the non-identifiable was removed from unchecked @@ -555,12 +542,12 @@ def test_no_ident(simple_adapter): assert len(updates) == 1 el = db.Record(name='el').add_parent("RT5") - entlist = [ + ent_list = [ db.Record(name='101').add_parent("RT3").add_property('a', value=1).add_property('b', value=el), el, db.Record(name='101').add_parent("RT3").add_property('a', value=1)] - st = SyncGraph(entlist, simple_adapter) + st = SyncGraph(ent_list, simple_adapter) assert len(st.nodes) == 2 assert len(st.unchecked) == 1 # the non-identifiable was removed from unchecked @@ -719,3 +706,26 @@ def test_export_node(): assert len(p.value) == len(exp.get_property(p.name).value) assert len(exp.properties) == len(rec_a.properties) assert len(exp.parents) == len(rec_a.parents) + + +def test_remove_merged(simple_adapter): + # We reference an entity that is merged into another node and then remove the merged node + # This should result in the reference being removed + b = db.Record().add_parent("RT3").add_property('a', value=1) + ent_list = [ + db.Record().add_parent("RT3").add_property('a', value=1), + b, + db.Record().add_parent("RT3").add_property('a', value=3).add_property('RT3', value=b), + ] + + st = SyncGraph(ent_list, simple_adapter) + se_a = st.nodes[0] + se_c = st.nodes[1] + for node in st.nodes: + print(node) + assert len(st.nodes) == 2 + assert len(st.unchecked) == 2 + st.remove_failed(se_a) + assert len(st.nodes) == 1 + assert len(st.unchecked) == 1 + assert "RT3" not in [p.name for p in se_c.properties]