Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_tool.py 28.22 KiB
#!/bin/python
# Tests for the tool using pytest
# Adapted from check-sfs
# A. Schlemmer, 06/2021
from caoscrawler.crawl import Crawler, SecurityMode
from caoscrawler.structure_elements import File, DictTextElement, DictListElement
from caoscrawler.identifiable_adapters import IdentifiableAdapter, LocalStorageIdentifiableAdapter
from simulated_server_data import full_data
from functools import partial
from copy import deepcopy
from unittest.mock import patch
import caosdb.common.models as dbmodels
from unittest.mock import MagicMock, Mock
from os.path import join, dirname, basename
import yaml
import caosdb as db
from caosdb.apiutils import compare_entities
import pytest
from pytest import raises
def rfp(*pathcomponents):
"""
Return full path.
Shorthand convenience function.
"""
return join(dirname(__file__), *pathcomponents)
ident = LocalStorageIdentifiableAdapter()
ident.restore_state(rfp("records.xml"))
full_data.update({el.name: el for el in ident._records if el.name is not None})
full_data.update({el.id: el for el in ident._records if el.name is None})
def dircheckstr(*pathcomponents):
"""
Return the debug tree identifier for a given path.
"""
return ("caoscrawler.structure_elements.Directory: " + basename(
join(*pathcomponents)) + ", " + rfp(
"test_directories", "examples_article", *pathcomponents))
@pytest.fixture
def crawler():
crawler = Crawler(debug=True)
crawler.crawl_directory(rfp("test_directories", "examples_article"),
rfp("scifolder_cfood.yml"))
return crawler
@pytest.fixture
def ident(crawler):
ident = LocalStorageIdentifiableAdapter()
crawler.identifiableAdapter = ident
# The records.xml file is constructed as follows:
# To a full run of the crawler, resolve all identifiables and insert all resulting entities.
# See: test-setup/datamodel/generate_test_data.py for details.
ident.restore_state(rfp("records.xml"))
ident.register_identifiable(
"Person", db.RecordType()
.add_parent(name="Person")
.add_property(name="first_name")
.add_property(name="last_name"))
ident.register_identifiable(
"Measurement", db.RecordType()
.add_parent(name="Measurement")
.add_property(name="identifier")
.add_property(name="date")
.add_property(name="project"))
ident.register_identifiable(
"Project", db.RecordType()
.add_parent(name="Project")
.add_property(name="date")
.add_property(name="identifier"))
return ident
def test_record_structure_generation(crawler):
subd = crawler.debug_tree[dircheckstr("DataAnalysis")]
subc = crawler.debug_metadata["copied"][dircheckstr("DataAnalysis")]
assert len(subd) == 2
# variables store on Data Analysis node of debug tree
assert len(subd[0]) == 2
# record store on Data Analysis node of debug tree
assert len(subd[1]) == 0
assert len(subc) == 2
assert len(subc[0]) == 2
assert len(subc[1]) == 0
# The data analysis node creates one variable for the node itself:
assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
assert subc[0]["DataAnalysis"] is False
subd = crawler.debug_tree[dircheckstr(
"DataAnalysis", "2020_climate-model-predict")]
subc = crawler.debug_metadata["copied"][dircheckstr(
"DataAnalysis", "2020_climate-model-predict")]
assert len(subd[1]) == 1
assert len(subd[1]["Project"].get_parents()) == 1
assert subd[1]["Project"].get_parents()[0].name == "Project"
assert subd[1]["Project"].get_property("date").value == "2020"
assert subd[1]["Project"].get_property(
"identifier").value == "climate-model-predict"
assert len(subd[0]) == 6
assert subd[0]["date"] == "2020"
assert subd[0]["identifier"] == "climate-model-predict"
assert subd[0]["Project"].__class__ == db.Record
assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
assert subc[0]["DataAnalysis"] is True
assert subd[0]["project_dir"] == "examples_article/DataAnalysis/2020_climate-model-predict"
assert subc[0]["project_dir"] is False
# Check the copy flags for the first level in the hierarchy:
assert len(subc[0]) == 6
assert len(subc[1]) == 1
assert subc[1]["Project"] is False
assert subc[0]["Project"] is False
assert subc[0]["date"] is False
assert subc[0]["identifier"] is False
subd = crawler.debug_tree[dircheckstr("DataAnalysis",
"2020_climate-model-predict",
"2020-02-08_prediction-errors")]
subc = crawler.debug_metadata["copied"][dircheckstr("DataAnalysis",
"2020_climate-model-predict",
"2020-02-08_prediction-errors")]
assert len(subd[0]) == 8
assert subd[0]["date"] == "2020-02-08"
assert subd[0]["identifier"] == "prediction-errors"
assert subd[0]["Project"].__class__ == db.Record
assert subd[0]["Measurement"].__class__ == db.Record
assert len(subd[1]) == 2
assert len(subd[1]["Project"].get_parents()) == 1
assert subd[1]["Project"].get_parents()[0].name == "Project"
assert subd[1]["Project"].get_property("date").value == "2020"
assert subd[1]["Project"].get_property(
"identifier").value == "climate-model-predict"
assert len(subd[1]["Measurement"].get_parents()) == 1
assert subd[1]["Measurement"].get_parents()[0].name == "Measurement"
assert subd[1]["Measurement"].get_property("date").value == "2020-02-08"
assert subd[1]["Measurement"].get_property(
"identifier").value == "prediction-errors"
assert subd[1]["Measurement"].get_property("project").value != "$Project"
assert subd[1]["Measurement"].get_property(
"project").value.__class__ == db.Record
assert subd[1]["Measurement"].get_property(
"project").value == subd[0]["Project"]
# Check the copy flags for the second level in the hierarchy:
assert subc[1]["Project"] is True
assert subc[0]["Project"] is True
assert subc[1]["Measurement"] is False
assert subc[0]["Measurement"] is False
assert subc[0]["date"] is False
assert subc[0]["identifier"] is False
# def prepare_test_record_file():
# ident = LocalStorageIdentifiableAdapter()
# crawler = Crawler(debug=True, identifiableAdapter=ident)
# crawler.crawl_directory(rfp("test_directories", "examples_article"),
# rfp("scifolder_cfood.yml"))
# # clean record list:
# recordlist = ident.get_records()
# for i in range(len(recordlist)-1, 1, -1):
# if recordlist[i].parents[0].name == "Person":
# del recordlist[i]
# ident.store_state(rfp("records.xml"))
def test_ambigious_records(crawler, ident):
ident.get_records().clear()
ident.get_records().extend(crawler.target_data)
r = ident.get_records()
id_r0 = ident.get_identifiable(r[0])
with raises(RuntimeError, match=".*unambigiously.*"):
ident.retrieve_identified_record_for_identifiable(id_r0)
def test_crawler_update_list(crawler, ident):
# If the following assertions fail, that is a hint, that the test file records.xml has changed
# and this needs to be updated:
assert len(ident.get_records()) == 18
assert len(
[r for r in ident.get_records() if r.parents[0].name == "Person"]
) == 5
assert len(
[r for r in ident.get_records() if r.parents[0].name == "Measurement"]
) == 11
assert len(
[r for r in ident.get_records() if r.parents[0].name == "Project"]
) == 2
# The crawler contains lots of duplicates, because identifiables have not been resolved yet:
assert len(ident.get_records()) != len(crawler.target_data)
# Check consistency:
# Check whether identifiables retrieved from current identifiable store return
# the same results.
# take the first person in the list of records:
for r in ident.get_records():
if r.parents[0].name == "Person":
r_cur = r
break
id_r0 = ident.get_identifiable(r_cur)
assert r_cur.parents[0].name == id_r0.parents[0].name
assert r_cur.get_property(
"first_name").value == id_r0.get_property("first_name").value
assert r_cur.get_property(
"last_name").value == id_r0.get_property("last_name").value
assert len(r_cur.parents) == 1
assert len(id_r0.parents) == 1
assert len(r_cur.properties) == 2
assert len(id_r0.properties) == 2
idr_r0_test = ident.retrieve_identified_record_for_identifiable(id_r0)
idr_r0 = ident.retrieve_identified_record_for_record(r_cur)
assert idr_r0 == idr_r0_test
# take the first measurement in the list of records:
for r in ident.get_records():
if r.parents[0].name == "Measurement":
r_cur = r
break
id_r1 = ident.get_identifiable(r_cur)
assert r_cur.parents[0].name == id_r1.parents[0].name
assert r_cur.get_property(
"identifier").value == id_r1.get_property("identifier").value
assert r_cur.get_property("date").value == id_r1.get_property("date").value
assert r_cur.get_property(
"project").value == id_r1.get_property("project").value
assert len(r_cur.parents) == 1
assert len(id_r1.parents) == 1
assert len(r_cur.properties) == 4
assert len(id_r1.properties) == 3
idr_r1_test = ident.retrieve_identified_record_for_identifiable(id_r1)
idr_r1 = ident.retrieve_identified_record_for_record(r_cur)
assert idr_r1 == idr_r1_test
assert idr_r1 != idr_r0
assert idr_r1_test != idr_r0_test
assert len(idr_r1.properties) == 4
assert r_cur.get_property(
"responsible").value == idr_r1.get_property("responsible").value
assert r_cur.description == idr_r1.description
# test whether compare_entites function works in this context:
comp = compare_entities(r_cur, id_r1)
assert len(comp[0]["parents"]) == 0
assert len(comp[1]["parents"]) == 0
assert len(comp[0]["properties"]) == 1
assert len(comp[1]["properties"]) == 0
assert "responsible" in comp[0]["properties"]
assert "description" in comp[0]
comp = compare_entities(r_cur, idr_r1)
assert len(comp[0]["parents"]) == 0
assert len(comp[1]["parents"]) == 0
assert len(comp[0]["properties"]) == 0
assert len(comp[1]["properties"]) == 0
def test_synchronization(crawler, ident):
insl, updl = crawler.synchronize(commit_changes=False)
assert len(insl) == 0
assert len(updl) == 0
def test_identifiable_adapter():
query = IdentifiableAdapter.create_query_for_identifiable(
db.Record().add_parent("Person")
.add_property("first_name", value="A")
.add_property("last_name", value="B"))
assert query.lower() == "find record person with 'first_name'='a' and 'last_name'='b' "
def test_remove_unnecessary_updates():
# test trvial case
upl = [db.Record().add_parent("A")]
irs = [db.Record().add_parent("A")]
Crawler.remove_unnecessary_updates(upl, irs)
assert len(upl) == 0
# test property difference case
# TODO this should work right?
# upl = [db.Record().add_parent("A").add_property("a", 3)]
# irs = [db.Record().add_parent("A")] # ID should be s
# Crawler.remove_unnecessary_updates(upl, irs)
# assert len(upl) == 1
# test value difference case
upl = [db.Record().add_parent("A").add_property("a", 5)]
irs = [db.Record().add_parent("A").add_property("a")]
Crawler.remove_unnecessary_updates(upl, irs)
assert len(upl) == 1
upl = [db.Record().add_parent("A").add_property("a", 5)]
irs = [db.Record().add_parent("A").add_property("a", 5)]
Crawler.remove_unnecessary_updates(upl, irs)
assert len(upl) == 0
# test unit difference case
upl = [db.Record().add_parent("A").add_property("a", unit='cm')]
irs = [db.Record().add_parent("A").add_property("a")]
Crawler.remove_unnecessary_updates(upl, irs)
assert len(upl) == 1
# test None difference case
upl = [db.Record().add_parent("A").add_property("a")]
irs = [db.Record().add_parent("A").add_property("a", 5)]
Crawler.remove_unnecessary_updates(upl, irs)
assert len(upl) == 1
# Current status:
# TODO: currently, this test fails, because non identifiable records cannot
# be inserted into the cache. Solution might be, just not to add them
# into the local cache. Probably in split_into_inserts_and_updates.
@pytest.mark.xfail
def test_identifiable_adapter_no_identifiable(crawler, ident):
del ident._registered_identifiables["Person"]
insl, updl = crawler.synchronize()
assert len(updl) == 0
pers = [r for r in crawler.target_data if r.parents[0].name == "Person"]
# All persons are inserted, because they are not identifiable:
assert len(insl) == len(pers)
def test_provenance_debug_data(crawler):
crawler.save_debug_data(rfp("provenance.yml"))
with open(rfp("provenance.yml"), "r") as f:
provenance = yaml.load(f, Loader=yaml.SafeLoader)
pr = provenance["provenance"]
def check_key_count(prefix):
return sum([1 for key in pr.keys() if key.startswith(prefix)])
assert check_key_count("Measurement") == 11
assert check_key_count("Project") == 5
assert check_key_count("Person") == 14
def basic_retrieve_by_name_mock_up(rec, known):
""" returns a stored Record if rec.name is an existing key, None otherwise """
if rec.name in known:
return known[rec.name]
else:
return None
@pytest.fixture
def crawler_mocked_identifiable_retrieve(crawler):
# mock retrieval of registered identifiabls: return Record with just a parent
crawler.identifiableAdapter.get_registered_identifiable = Mock(
side_effect=lambda x: db.Record().add_parent(x.parents[0].name))
# Simulate remote server content by using the names to identify records
# There is only a single known Record with name A
crawler.identifiableAdapter.retrieve_identified_record_for_record = Mock(side_effect=partial(
basic_retrieve_by_name_mock_up, known={"A": db.Record(id=1111, name="A")}))
return crawler
def test_split_into_inserts_and_updates_trivial(crawler):
# Try trivial argument
crawler.split_into_inserts_and_updates([])
def test_split_into_inserts_and_updates_single(crawler_mocked_identifiable_retrieve):
crawler = crawler_mocked_identifiable_retrieve
entlist = [db.Record(name="A").add_parent(
"C"), db.Record(name="B").add_parent("C")]
assert crawler.get_from_any_cache(entlist[0]) is None
assert crawler.get_from_any_cache(entlist[1]) is None
assert not crawler.has_reference_value_without_id(entlist[0])
assert not crawler.has_reference_value_without_id(entlist[1])
assert crawler.identifiableAdapter.retrieve_identified_record_for_record(
entlist[0]).id == 1111
assert crawler.identifiableAdapter.retrieve_identified_record_for_record(
entlist[1]) is None
insert, update = crawler.split_into_inserts_and_updates(deepcopy(entlist))
assert len(insert) == 1
assert insert[0].name == "B"
assert len(update) == 1
assert update[0].name == "A"
# if this ever fails, the mock up may be removed
crawler.identifiableAdapter.get_registered_identifiable.assert_called()
crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()
def test_split_into_inserts_and_updates_with_duplicate(crawler_mocked_identifiable_retrieve):
crawler = crawler_mocked_identifiable_retrieve
a = db.Record(name="A").add_parent("C")
b = db.Record(name="B").add_parent("C")
b.add_property("A", a)
# This is identical to a and should be removed
c = db.Record(name="A").add_parent("C")
entlist = [a, b, c]
insert, update = crawler.split_into_inserts_and_updates(deepcopy(entlist))
assert len(insert) == 1
assert insert[0].name == "B"
assert len(update) == 1
assert update[0].name == "A"
# if this ever fails, the mock up may be removed
crawler.identifiableAdapter.get_registered_identifiable.assert_called()
crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()
def test_split_into_inserts_and_updates_with_ref(crawler_mocked_identifiable_retrieve):
crawler = crawler_mocked_identifiable_retrieve
# try it with a reference
a = db.Record(name="A").add_parent("C")
b = db.Record(name="B").add_parent("C")
b.add_property("A", a)
entlist = [a, b]
insert, update = crawler.split_into_inserts_and_updates(entlist)
assert len(insert) == 1
assert insert[0].name == "B"
assert len(update) == 1
assert update[0].name == "A"
# if this ever fails, the mock up may be removed
crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()
crawler.identifiableAdapter.get_registered_identifiable.assert_called()
def test_split_into_inserts_and_updates_with_circ(crawler):
# try circular
a = db.Record(name="A").add_parent("C")
b = db.Record(name="B").add_parent("C")
b.add_property("A", a)
a.add_property("B", b)
entlist = [a, b]
# TODO this does not seem to be complete!
def test_split_into_inserts_and_updates_with_complex(crawler_mocked_identifiable_retrieve):
crawler = crawler_mocked_identifiable_retrieve
# A
# ^
# |
# F <- B <- G
a = db.Record(name="A").add_parent("C").add_property(
'd', 13).add_property('e', "lskdjlsfdj")
b = db.Record(name="B").add_parent("C")
g = db.Record(name="G").add_parent("C")
f = db.Record(name="F").add_parent("C")
g.add_property("A", a)
b.add_property("A", f)
b.add_property("A", a)
entlist = [a, b, g]
insert, update = crawler.split_into_inserts_and_updates(entlist)
assert len(insert) == 3
assert "B" in [el.name for el in insert]
assert len(update) == 1
assert update[0].name == "A"
# if this ever fails, the mock up may be removed
crawler.identifiableAdapter.get_registered_identifiable.assert_called()
crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()
# TODO write test where the unresoled entity is not part of the identifiable
def test_split_into_inserts_and_updates_with_copy_attr(crawler_mocked_identifiable_retrieve):
crawler = crawler_mocked_identifiable_retrieve
# assume identifiable is only the name
a = db.Record(name="A").add_parent("C")
a.add_property("foo", 1)
b = db.Record(name="A").add_parent("C")
b.add_property("bar", 2)
entlist = [a, b]
insert, update = crawler.split_into_inserts_and_updates(entlist)
assert update[0].get_property("bar").value == 2
assert update[0].get_property("foo").value == 1
# if this ever fails, the mock up may be removed
crawler.identifiableAdapter.get_registered_identifiable.assert_called()
crawler.identifiableAdapter.retrieve_identified_record_for_record.assert_called()
def test_has_missing_object_in_references(crawler):
# Simulate remote server content by using the names to identify records
# There are only two known Records with name A and B
crawler.identifiableAdapter.get_registered_identifiable = Mock(side_effect=partial(
basic_retrieve_by_name_mock_up, known={"C": db.Record(name="C").add_parent("RTC")
.add_property("d"),
"D": db.Record(name="D").add_parent("RTD")
.add_property("d").add_property("e"),
}))
# one reference with id -> check
assert not crawler.has_missing_object_in_references(db.Record(name="C")
.add_parent("RTC").add_property('d', 123))
# one ref with Entity with id -> check
assert not crawler.has_missing_object_in_references(db.Record(name="C")
.add_parent("RTC")
.add_property('d', db.Record(id=123)
.add_parent("C")))
# one ref with id one with Entity with id (mixed) -> check
assert not crawler.has_missing_object_in_references(db.Record(name="C").add_parent("RTD")
.add_property('d', 123)
.add_property('b', db.Record(id=123)
.add_parent("RTC")))
# entity to be referenced in the following
a = db.Record(name="C").add_parent("C").add_property("d", 12311)
# one ref with id one with Entity without id (but not identifying) -> fail
assert not crawler.has_missing_object_in_references(db.Record(name="C").add_parent("RTC")
.add_property('d', 123)
.add_property('e', a))
# one ref with id one with Entity without id (mixed) -> fail
assert not crawler.has_missing_object_in_references(db.Record(name="D").add_parent("RTD")
.add_property('d', 123)
.add_property('e', a))
crawler.add_to_remote_missing_cache(a)
# one ref with id one with Entity without id but in cache -> check
assert crawler.has_missing_object_in_references(db.Record(name="D").add_parent("RTD")
.add_property('d', 123)
.add_property('e', a))
# if this ever fails, the mock up may be removed
crawler.identifiableAdapter.get_registered_identifiable.assert_called()
def test_references_entities_without_ids(crawler, ident):
assert not crawler.has_reference_value_without_id(db.Record().add_parent("Person")
.add_property('last_name', 123)
.add_property('first_name', 123))
# id and rec with id
assert not crawler.has_reference_value_without_id(db.Record().add_parent("Person")
.add_property('first_name', 123)
.add_property('last_name', db.Record(id=123)))
# id and rec with id and one unneeded prop
assert crawler.has_reference_value_without_id(db.Record().add_parent("Person")
.add_property('first_name', 123)
.add_property('stuff', db.Record())
.add_property('last_name', db.Record(id=123)))
# one identifying prop is missing
assert crawler.has_reference_value_without_id(db.Record().add_parent("Person")
.add_property('first_name', 123)
.add_property('last_name', db.Record()))
def test_replace_entities_with_ids(crawler):
a = (db.Record().add_parent("B").add_property("A", 12345)
.add_property("B", db.Record(id=12345))
.add_property("C", [db.Record(id=12345), 233324]))
crawler.replace_entities_with_ids(a)
assert a.get_property("A").value == 12345
assert a.get_property("B").value == 12345
assert a.get_property("C").value == [12345, 233324]
def mock_get_entity_by_id(id):
candidates = [el for el in list(full_data.values()) if el.id == id]
if len(candidates) > 0:
return candidates[0]
else:
raise ValueError()
def mock_get_entity_by_name(name):
candidates = [el for el in full_data.values()
if (el.name is not None and el.name.lower() == name.lower())]
if len(candidates) > 0:
return candidates[0]
else:
raise ValueError()
def prepare_crawler_with_sec_mode(mode, ident):
crawler = Crawler(debug=True, securityMode=mode)
crawler.crawl_directory(rfp("test_directories", "examples_article"),
rfp("scifolder_cfood.yml"))
crawler.identifiableAdapter = ident
return crawler
def reset_mocks(mocks):
for mock in mocks:
mock.reset_mock()
def change_identifiable_prop(ident):
# the checks in here are only to make sure we change the record as we intend to
meas = ident._records[-2]
assert meas.parents[0].name == "Measurement"
resps = meas.properties[0]
assert resps.name == "date"
# change one element; This changes the date which is part of the identifiable
resps.value = "2022-01-04"
def change_non_identifiable_prop(ident):
# the checks in here are only to make sure we change the record as we intend to
meas = ident._records[-1]
assert meas.parents[0].name == "Measurement"
resps = meas.properties[-1]
assert resps.name == "responsible"
assert len(resps.value) == 2
# change one element; This removes a responsible which is not part of the identifiable
del resps.value[-1]
@patch("caoscrawler.crawl.Crawler._get_entity_by_id",
new=Mock(side_effect=mock_get_entity_by_id))
@patch("caoscrawler.crawl.Crawler._get_entity_by_name",
new=Mock(side_effect=mock_get_entity_by_name))
@patch("caoscrawler.crawl.db.Container.insert")
@patch("caoscrawler.crawl.db.Container.update")
@patch("caoscrawler.crawl.UpdateCache.insert")
def test_security_mode(updateCacheMock, upmock, insmock, ident):
records_backup = deepcopy(ident._records)
# trivial case: nothing to do
crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
crawler.synchronize(commit_changes=True)
assert crawler.run_id is not None
insmock.assert_not_called()
upmock.assert_not_called()
updateCacheMock.assert_not_called()
# RETRIEVE: insert only
crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
# remove one element
del ident._records[-1]
# insert forbidden
crawler.synchronize(commit_changes=True)
assert crawler.run_id is not None
insmock.assert_not_called()
upmock.assert_not_called()
assert updateCacheMock.call_count == 1
# reset counts
reset_mocks([updateCacheMock, insmock, upmock])
# restore original ident
ident._records = deepcopy(records_backup)
# RETRIEVE: update only
crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
# change one element
change_non_identifiable_prop(ident)
crawler.synchronize(commit_changes=True)
assert crawler.run_id is not None
insmock.assert_not_called()
upmock.assert_not_called()
assert updateCacheMock.call_count == 1
# reset counts
reset_mocks([updateCacheMock, insmock, upmock])
# restore original ident
ident._records = deepcopy(records_backup)
# INSERT: insert only
crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
# remove one element
del ident._records[-1]
crawler.synchronize(commit_changes=True)
assert crawler.run_id is not None
insmock.assert_called_once()
upmock.assert_not_called()
updateCacheMock.assert_not_called()
# reset counts
reset_mocks([updateCacheMock, insmock, upmock])
# restore original ident
ident._records = deepcopy(records_backup)
# INSERT: update only
crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
# change one element
change_non_identifiable_prop(ident)
crawler.synchronize(commit_changes=True)
assert crawler.run_id is not None
insmock.assert_not_called()
upmock.assert_not_called()
updateCacheMock.assert_called_once()
# reset counts
reset_mocks([updateCacheMock, insmock, upmock])
# restore original ident
ident._records = deepcopy(records_backup)
# INSERT: insert and update
crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
# change two elements
change_non_identifiable_prop(ident)
change_identifiable_prop(ident)
crawler.synchronize(commit_changes=True)
assert crawler.run_id is not None
insmock.asser_called_once()
upmock.assert_not_called()
updateCacheMock.assert_called_once()
# reset counts
reset_mocks([updateCacheMock, insmock, upmock])
# restore original ident
ident._records = deepcopy(records_backup)
def test_create_flat_list():
a = db.Record()
a.add_property(name="a", value=a)
Crawler.create_flat_list([a], [])