Select Git revision
test_state.py
-
Henrik tom Wörden authoredHenrik tom Wörden authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_sync_graph.py 24.29 KiB
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
from functools import partial
from unittest.mock import MagicMock, Mock, patch
import linkahead as db
import pytest
from caoscrawler.exceptions import ImpossibleMergeError, MissingIdentifyingProperty
from caoscrawler.identifiable import Identifiable
from caoscrawler.identifiable_adapters import CaosDBIdentifiableAdapter
from caoscrawler.sync_graph import SyncGraph
from caoscrawler.sync_node import SyncNode, parent_in_list, property_in_list
from test_crawler import basic_retrieve_by_name_mock_up, mock_get_entity_by
@pytest.fixture
def simple_adapter():
    """Return a ``CaosDBIdentifiableAdapter`` with registrations for RT1 to RT5.

    Registered identifying properties:

    - RT1: property ``RT2``
    - RT2: ``is_referenced_by`` with the value ``["RT1", "RT3"]``
    - RT3: property ``a``
    - RT4: property ``RT3``
    - RT5: property ``name``
    """
    # We use the reference as identifying reference in both directions. Thus the map is the same
    # for all three categories: references, id_references and id_referenced_by
    adapter = CaosDBIdentifiableAdapter()
    registrations = (
        ("RT1", db.RecordType().add_parent("RT1").add_property("RT2")),
        ("RT2", db.RecordType().add_parent("RT2").add_property("is_referenced_by",
                                                               ["RT1", "RT3"])),
        ("RT3", db.RecordType().add_parent("RT3").add_property("a")),
        ("RT4", db.RecordType().add_parent("RT4").add_property("RT3")),
        ("RT5", db.RecordType().add_parent("RT5").add_property("name")),
    )
    for type_name, definition in registrations:
        adapter.register_identifiable(type_name, definition)
    return adapter
def test_create_flat_list():
    """Check that ``SyncGraph._create_flat_list`` collects referenced records.

    Covers a record that references itself and a reference cycle between two records.
    """
    rec_a = db.Record()
    rec_b = db.Record()
    # rec_a references itself as well as rec_b
    rec_a.add_property(name="a", value=rec_a)
    rec_a.add_property(name="b", value=rec_b)
    collected = SyncGraph._create_flat_list([rec_a])
    assert len(collected) == 2
    for record in (rec_a, rec_b):
        assert record in collected

    rec_c = db.Record()
    rec_c.add_property(name="a", value=rec_a)
    # This would cause recursion if it is not dealt with properly.
    rec_a.add_property(name="c", value=rec_c)
    collected = SyncGraph._create_flat_list([rec_c])
    assert len(collected) == 3
    for record in (rec_a, rec_b, rec_c):
        assert record in collected
def test_create_reference_mapping():
    """Check the six maps returned by ``SyncGraph._create_reference_mapping``.

    Setup: node ``b`` (parent RT2) references node ``a`` (parent RT1) via its property ``a``.
    ``b`` is registered with the identifying property ``a``; ``a`` is registered with
    ``is_referenced_by`` ``["RT2"]``. All maps are keyed by ``id(node)``.
    """
    a = SyncNode(db.Record().add_parent("RT1"),
                 db.RecordType().add_property("is_referenced_by", ["RT2"]))
    b = SyncNode(db.Record(id=132).add_parent("RT2").add_property('a', a),
                 db.RecordType().add_property("a"))
    ses = [a, b]
    (forward_references, backward_references, forward_id_references,
     backward_id_references, forward_id_referenced_by,
     backward_id_referenced_by) = SyncGraph._create_reference_mapping(ses)

    # test initialization: every node has an entry in every map
    all_maps = (forward_references, backward_references, forward_id_references,
                backward_id_references, forward_id_referenced_by, backward_id_referenced_by)
    for mapping in all_maps:
        for node in ses:
            assert id(node) in mapping

    # a has no ref, but is referenced by b
    assert len(forward_references[id(a)]) == 0
    assert backward_references[id(a)] == {b}
    # b references a and is not referenced itself
    assert forward_references[id(b)] == {a}
    assert backward_references[id(b)] == set()

    # a has no identifying reference, but is the target of b's identifying reference.
    # (The original re-checked ``backward_references`` here, which left
    # ``backward_id_references`` untested.)
    assert forward_id_references[id(a)] == set()
    assert backward_id_references[id(a)] == {b}
    # b has an identifying reference and is not the target of one
    assert forward_id_references[id(b)] == {a}
    assert backward_id_references[id(b)] == set()

    # a has an identifying back reference (from b)
    assert forward_id_referenced_by[id(a)] == set()
    assert backward_id_referenced_by[id(a)] == {b}
    # b is the referencing side of that back reference and has no back reference itself
    assert forward_id_referenced_by[id(b)] == {a}
    assert backward_id_referenced_by[id(b)] == set()
@patch("caoscrawler.sync_graph.cached_get_entity_by",
       new=Mock(side_effect=mock_get_entity_by))
def test_SyncGraph_init():
    """Check the state of a ``SyncGraph`` directly after construction.

    Covers the trivial cases, the exception for a record without identifying properties and the
    merges / look-up tables that result from a list with duplicate IDs, paths and identifiables.
    """
    # trivial case
    adapter = CaosDBIdentifiableAdapter()
    adapter.register_identifiable(
        "A", db.RecordType().add_parent("A").add_property("prop_ident"))
    SyncGraph([db.Record(id=101).add_parent("A")], adapter)
    SyncGraph([], adapter)  # should not fail either...

    # test whether missing identifying properties cause an exception
    with pytest.raises(MissingIdentifyingProperty):
        SyncGraph([db.Record().add_parent("A")], adapter)

    nested_other = db.Record().add_parent("A").add_property(name='prop_ident', value="other")
    records = [
        db.Record(id=101).add_parent("A"),
        db.Record(id=102).add_parent("A"),
        db.File(path='a').add_parent("A"),
        db.File(path='b').add_parent("A"),
        db.Record(id=103).add_parent("A"),
        db.Record(id=104).add_parent("A").add_property(name='prop_ident', value="MERGEME"),
        db.Record().add_parent("A").add_property(name='prop_ident', value="MERGEME"),
        db.File(path='a', file='b').add_parent("A"),
        db.Record(id=101).add_parent("A"),
        db.Record().add_parent("A").add_property(name='prop_ident', value="other"),
        db.Record().add_parent("A").add_property(name='prop_ident', value=nested_other),
        db.File(path='a', file='b').add_parent("A"),
        db.Record(id=101).add_parent("A"),
    ]
    graph = SyncGraph(records, adapter)

    # all nodes with ID=101 have been merged
    assert sum(1 for node in graph.nodes if node.id == 101) == 1
    # all nodes with path='a' have been merged
    assert sum(1 for node in graph.nodes if node.path == 'a') == 1

    for node in graph.nodes:
        # all nodes with ID or path were removed from unchecked
        if node.id is not None or node.path is not None:
            assert node not in graph.unchecked
        # all nodes with ID are in the ID lookup
        if node.id is not None:
            assert graph._id_look_up[node.id] is node
        # all nodes with path are in the path lookup
        if node.path is not None:
            assert graph._path_look_up[node.path] is node
        # all nodes with identifiable are in the identifiable lookup
        if node.identifiable is not None:
            assert graph._identifiable_look_up[node.identifiable.get_representation()] is node

    # node without ID but with identifiable was merged with other node with ID
    merge_me = [node for node in graph.nodes
                if len(node.properties) > 0 and node.properties[0].value == "MERGEME"]
    assert len(merge_me) == 1

    # every node that does not rely on something unchecked has an identifiable or an ID
    for node in graph.nodes:
        if graph._identity_relies_on_unchecked_entity(node):
            continue
        assert node.identifiable is not None or node.id is not None
def test_merge_into_trivial(simple_adapter):
    """Check that the reference maps are updated when ``a`` is merged into ``b``.

    Simplest case::

        a -> c
        b

    (a references c; b does not reference anything; a & b have the same target record)

    With ``simple_adapter`` the reference is identifying in both directions, so the same
    expectations are checked for the plain, the ``id_references`` and the ``id_referenced_by``
    maps.
    """
    c = db.Record(name='c').add_parent("RT2")
    a = db.Record(name='a').add_parent("RT1").add_property('RT2', c)
    b = db.Record(id=101).add_parent("RT1")
    st = SyncGraph([a, b], simple_adapter)
    se_a = st.nodes[0]
    se_b = st.nodes[1]
    se_c = st.nodes[2]
    # Compare by value: ``is`` against a literal tests object identity, which only passes by
    # accident of interning.
    assert se_a.name == 'a'
    assert se_b.id == 101
    assert se_c.name == 'c'

    def reference_maps():
        # Read from ``st`` on every call so the maps are taken from the graph's current state.
        return (
            (st.forward_references, st.backward_references),
            (st.forward_id_references, st.backward_id_references),
            (st.forward_id_referenced_by, st.backward_id_referenced_by),
        )

    # CHECK REFERENCE MAP (before merge):
    # c is referenced by a
    for forward, backward in reference_maps():
        assert len(forward[id(se_a)]) == 1
        assert se_c in forward[id(se_a)]
        assert len(forward[id(se_b)]) == 0
        assert len(forward[id(se_c)]) == 0
        assert len(backward[id(se_a)]) == 0
        assert len(backward[id(se_b)]) == 0
        assert len(backward[id(se_c)]) == 1
        assert se_a in backward[id(se_c)]

    # b already has the ID 101, so giving a the same ID merges a into b
    st.set_id_of_node(se_a, 101)

    # CHECK REFERENCE MAP (after merge):
    # c is now referenced by b
    for forward, backward in reference_maps():
        assert id(se_a) not in forward
        assert len(forward[id(se_b)]) == 1
        assert se_c in forward[id(se_b)]
        assert len(forward[id(se_c)]) == 0
        assert id(se_a) not in backward
        assert len(backward[id(se_b)]) == 0
        assert len(backward[id(se_c)]) == 1
        assert se_b in backward[id(se_c)]
def test_merge_into_simple(simple_adapter):
    """Check that the reference maps are updated by ``SyncGraph._merge_into``.

    Simple case: ``a -> c <- b`` (a & b reference c; a & b have the same target record)

    With ``simple_adapter`` the reference is identifying in both directions, so the same
    expectations are checked for the plain, the ``id_references`` and the ``id_referenced_by``
    maps.
    """
    c = db.Record(name='c').add_parent("RT2")
    a = db.Record().add_parent("RT1").add_property('RT2', c)
    b = db.Record().add_parent("RT1").add_property('RT2', c)
    st = SyncGraph([a, b], simple_adapter)
    se_a = st.nodes[0]
    se_b = st.nodes[1]
    se_c = st.nodes[2]

    def reference_maps():
        # Read from ``st`` on every call so the maps are taken from the graph's current state.
        return (
            (st.forward_references, st.backward_references),
            (st.forward_id_references, st.backward_id_references),
            (st.forward_id_referenced_by, st.backward_id_referenced_by),
        )

    # CHECK REFERENCE MAP:
    # c is referenced by a & b
    for forward, backward in reference_maps():
        assert len(forward[id(se_a)]) == 1
        assert se_c in forward[id(se_a)]
        assert len(forward[id(se_b)]) == 1
        assert se_c in forward[id(se_b)]
        assert len(forward[id(se_c)]) == 0
        assert len(backward[id(se_a)]) == 0
        assert len(backward[id(se_b)]) == 0
        assert len(backward[id(se_c)]) == 2
        assert se_a in backward[id(se_c)]
        assert se_b in backward[id(se_c)]

    st._merge_into(se_a, se_b)

    # CHECK REFERENCE MAP (after merge):
    # c is now referenced by b only; a is gone from all maps
    for forward, backward in reference_maps():
        assert id(se_a) not in forward
        assert len(forward[id(se_b)]) == 1
        assert se_c in forward[id(se_b)]
        assert len(forward[id(se_c)]) == 0
        assert id(se_a) not in backward
        assert len(backward[id(se_b)]) == 0
        assert len(backward[id(se_c)]) == 1
        assert se_b in backward[id(se_c)]
def test_backward_id_referenced_by():
    """Check that a referencing node shows up in ``backward_id_referenced_by`` of its target.

    "BR" is registered with the identifying property ``name``; "C" is registered with
    ``is_referenced_by`` ``["BR"]``. A "BR" record references a "C" record via property ``ref``.
    """
    adapter = CaosDBIdentifiableAdapter()
    adapter.register_identifiable(
        "BR", db.RecordType().add_parent("BR").add_property("name"))
    adapter.register_identifiable(
        "C", db.RecordType().add_parent("C").add_property("is_referenced_by", ["BR"]))

    target = db.Record(name="B").add_parent("C")
    referencing = db.Record(name="A").add_parent("BR").add_property("ref", target)
    graph = SyncGraph([target, referencing], adapter)

    target_node, referencing_node = graph.nodes[0], graph.nodes[1]
    assert referencing_node in graph.backward_id_referenced_by[id(target_node)]
def test_set_id_of_node(simple_adapter):
    """Check the effects of ``SyncGraph.set_id_of_node`` in several scenarios.

    Each scenario builds a fresh graph with ``simple_adapter`` and then sets an ID (node exists)
    or no ID (node is missing) on one unchecked node.
    """
    def sizes(graph):
        # (number of nodes, number of unchecked nodes)
        return len(graph.nodes), len(graph.unchecked)

    # setting the id should lead to the node being marked as existing
    graph = SyncGraph([db.Record(name='a').add_parent("RT5")], simple_adapter)
    assert sizes(graph) == (1, 1)
    graph.set_id_of_node(graph.unchecked[0], 101)
    assert sizes(graph) == (1, 0)
    assert id(graph.nodes[0]) in graph._existing

    # setting the id with None should lead to the node being marked as missing
    graph = SyncGraph(
        [db.Record().add_parent("RT1").add_property(name="RT2", value=1)], simple_adapter)
    assert sizes(graph) == (1, 1)
    # is automatically set in during initialization of graph
    assert graph.nodes[0].identifiable is not None
    graph.set_id_of_node(graph.unchecked[0])
    assert sizes(graph) == (1, 0)
    assert id(graph.nodes[0]) in graph._missing

    # setting the id to one that already exists should lead to a merge
    with_id = db.Record(id=101).add_parent("RT5")
    without_id = db.Record(name='a').add_parent("RT5").add_property(name="RT2", value=1)
    graph = SyncGraph([with_id, without_id], simple_adapter)
    assert sizes(graph) == (2, 1)
    graph.set_id_of_node(graph.unchecked[0], 101)
    assert sizes(graph) == (1, 0)
    assert graph.nodes[0].properties[0].name == "RT2"

    # setting the id to None should lead to depending nodes marked as missing
    rt2_record = db.Record().add_parent("RT2")
    rt3_record = (db.Record().add_parent("RT3")
                  .add_property(name="a", value=1)
                  .add_property(name="RT2", value=rt2_record))
    graph = SyncGraph([rt3_record], simple_adapter)
    assert sizes(graph) == (2, 2)
    graph.set_id_of_node(graph.unchecked[0])
    assert sizes(graph) == (2, 0)
    for node in graph.nodes[:2]:
        assert id(node) in graph._missing

    # same as above but with backref
    rt3_record = db.Record().add_parent("RT3").add_property(name="a", value=1)
    rt4_record = db.Record().add_parent("RT4").add_property(name="RT3", value=rt3_record)
    graph = SyncGraph([rt4_record], simple_adapter)
    assert sizes(graph) == (2, 2)
    assert graph.unchecked[1].identifiable is not None
    graph.set_id_of_node(graph.unchecked[1])
    assert sizes(graph) == (2, 0)
    for node in graph.nodes[:2]:
        assert id(node) in graph._missing

    # setting an id might allow to check another node that depends on the former
    rt3_record = db.Record().add_parent("RT3").add_property(name="a", value=1)
    rt4_record = db.Record().add_parent("RT4").add_property(name="RT3", value=rt3_record)
    graph = SyncGraph([rt4_record], simple_adapter)
    assert graph.nodes[0].identifiable is None
    assert graph.nodes[1].identifiable is not None
    graph.set_id_of_node(graph.unchecked[1], 111)
    assert graph.nodes[0].identifiable is not None
    assert graph.nodes[1].identifiable is not None

    # same as above but going one step further: the new identifiable allows to merge that node
    rt3_record = db.Record().add_parent("RT3").add_property(name="a", value=1)
    rt4_by_record = db.Record().add_parent("RT4").add_property(name="RT3", value=rt3_record)
    rt4_by_id = db.Record().add_parent("RT4").add_property(name="RT3", value=111)
    graph = SyncGraph([rt4_by_record, rt4_by_id], simple_adapter)
    assert graph.nodes[0].identifiable is None
    assert graph.nodes[1].identifiable is not None
    assert graph.nodes[2].identifiable is not None
    assert len(graph.nodes) == 3
    graph.set_id_of_node(graph.unchecked[2], 111)
    assert graph.nodes[0].identifiable is not None
    assert len(graph.nodes) == 2
@patch("caoscrawler.sync_graph.cached_get_entity_by",
       new=Mock(side_effect=mock_get_entity_by))
def test_merging(simple_adapter):
    """Check that nodes are merged based on ID, path and identifiable.

    Identifying information can be given at various locations in the hierarchical tree; this
    tests whether an object is correctly combined for all cases.
    """
    # NOTE: the ``simple_adapter`` fixture is requested but not used; this test sets up its own
    # adapter for the record type "A" (identified by ``name`` and property ``a``).
    adapter = CaosDBIdentifiableAdapter()
    adapter.register_identifiable(
        "A", db.RecordType().add_parent("A").add_property("name").add_property("a"))
    retrieve_by_name = partial(
        basic_retrieve_by_name_mock_up, known={"A": db.Record(id=1111, name="A")})
    adapter.retrieve_identified_record_for_identifiable = Mock(side_effect=retrieve_by_name)

    # merging based on id
    graph = SyncGraph(
        [db.Record(id=101).add_parent("A"),
         db.Record(id=101).add_parent("A")],
        adapter)
    assert len(graph.nodes) == 1
    assert len(graph.unchecked) == 0
    merged = graph.nodes[0]
    assert merged.id == 101
    assert merged.parents[0].name == "A"

    # merging based on path
    graph = SyncGraph(
        [db.File(path='101').add_parent("A"),
         db.File(path='101').add_parent("A")],
        adapter)
    assert len(graph.nodes) == 1
    assert len(graph.unchecked) == 0
    merged = graph.nodes[0]
    assert merged.path == '101'
    assert merged.parents[0].name == "A"

    # merging based on identifiable (non identifying properties are ignored)
    graph = SyncGraph(
        [db.File(name='101').add_parent("A").add_property('a', value=1).add_property('b', value=1),
         db.File(name='101').add_parent("A").add_property('a', value=1).add_property('b', value=2)],
        adapter)
    assert len(graph.nodes) == 1
    merged = graph.nodes[0]
    assert merged.id is None
    assert merged.name == '101'
    assert merged.parents[0].name == "A"
    assert merged.properties[0].value == 1
    assert merged.properties[0].name == "a"

    # Merging a mix. One Record needs the identifiable to be merged. But the identifying
    # information is scattered in the other case.
    graph = SyncGraph(
        [db.Record(id=101).add_parent("A"),
         db.Record(id=101, name='a').add_parent("A"),
         db.Record(id=101).add_parent("A").add_property('a', value=1),
         db.Record(name='a').add_parent("A").add_property('a', value=1)],
        adapter)
    assert len(graph.nodes) == 1
    assert len(graph.unchecked) == 0
    merged = graph.nodes[0]
    assert merged.name == 'a'
    assert merged.parents[0].name == "A"
    assert merged.properties[0].value == 1
    assert merged.properties[0].name == "a"
    assert merged.id == 101

    # test that adding an ID can lead to a cascade of merges
    # This also tests whether setting something to missing allows to create an identifiable
    # and thus allows a merge
    level_c = db.Record(name='c').add_parent("A").add_property('a', value="missing")
    level_b = db.Record(name='b').add_parent("A").add_property('a', value=level_c)
    subtree = db.Record(name='a').add_parent("A").add_property('a', value=level_b)
    graph = SyncGraph(
        [db.Record(id=101).add_parent("A"),
         db.Record(id=101, name='z').add_parent("A"),
         db.Record(id=101).add_parent("A").add_property('a', value=subtree),
         db.Record(name='z').add_parent("A").add_property('a', value=subtree)],
        adapter)
    assert len(graph.nodes) == 5
    assert len(graph.unchecked) == 4
    node_c = [node for node in graph.nodes if node.name == 'c'][0]
    graph.set_id_of_node(node_c)
    # setting c to missing means that b cannot exist which means that a cannot exist, this allows
    # to merge the two z nodes
    assert len(graph.nodes) == 4
    assert len(graph.unchecked) == 0
def test_update_of_reference_values(simple_adapter):
    """Check that a referencing property is updated after its target was merged.

    Multiple nodes are merged including one that is referenced. Assure that this still leads to
    the value of the property of the referencing node to be updated when the id is set (the
    value object is replaced appropriately).
    """
    def new_rt3():
        # All RT3 records carry the same identifying property value.
        return db.Record().add_parent("RT3").add_property('a', value=1)

    referenced = new_rt3()
    referencing = db.Record().add_parent("RT4").add_property('RT3', value=referenced)
    records = [referenced]
    records.extend(new_rt3() for _ in range(4))
    records.append(referencing)
    records.extend(new_rt3() for _ in range(2))

    graph = SyncGraph(records, simple_adapter)
    assert len(graph.nodes) == 2
    assert len(graph.unchecked) == 2
    rt3_node, rt4_node = graph.nodes[0], graph.nodes[1]
    assert rt4_node.parents[0].name == 'RT4'

    graph.set_id_of_node(rt3_node, 101)
    referenced_value = graph.nodes[1].properties[0].value
    assert referenced_value.id == 101
def test_ignoring_irrelevant_references(simple_adapter):
    """Check that a reference cycle is no problem if one of the references is not identifying.

    ``a`` (RT3) references ``b`` via its identifying property ``a``; ``b`` (RT5, registered with
    ``name`` only) references ``a`` back via a property named ``a``.
    """
    rec_b = db.Record(name='b').add_parent("RT5")
    rec_a = db.Record().add_parent("RT3").add_property('a', value=rec_b)
    rec_b.add_property('a', value=rec_a)

    graph = SyncGraph([rec_a, rec_b], simple_adapter)
    assert len(graph.nodes) == 2
    assert len(graph.unchecked) == 2
    node_a, node_b = graph.nodes[0], graph.nodes[1]
    assert node_b.name == 'b'

    # a relies on b
    assert graph._identity_relies_on_unchecked_entity(node_a)
    # b relies on nothing
    assert not graph._identity_relies_on_unchecked_entity(node_b)

    # set ID of b
    graph.set_id_of_node(node_b, 101)
    assert len(graph.unchecked) == 1
    # now a no longer relies on unchecked
    assert not graph._identity_relies_on_unchecked_entity(graph.nodes[0])