Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_tool.py 38.80 KiB
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2021 Alexander Schlemmer
# Copyright (C) 2023 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2023 IndiScale GmbH <info@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
Tests for the tool using pytest
Adapted from check-sfs
"""
import logging
from caoscrawler.stores import GeneralStore, RecordStore
import os
from caoscrawler.crawl import Crawler, SecurityMode, split_restricted_path
from caoscrawler.identifiable import Identifiable
from caoscrawler.structure_elements import File, DictTextElement, DictListElement, DictElement
from caoscrawler.identifiable_adapters import IdentifiableAdapter, LocalStorageIdentifiableAdapter
from simulated_server_data import full_data
from functools import partial
from copy import deepcopy
from unittest.mock import patch
from caoscrawler.crawl import crawler_main
import caosdb.common.models as dbmodels
from unittest.mock import MagicMock, Mock
from os.path import join, dirname, basename
import yaml
import caosdb as db
from caosdb.apiutils import compare_entities
import pytest
from pytest import raises
def rfp(*pathcomponents):
    """
    Return the full path of something located relative to this test module.

    The given path components are appended to the directory that contains
    this file.
    """
    here = dirname(__file__)
    return join(here, *pathcomponents)
# Load the locally stored records once at import time and merge them into `full_data`,
# which the `mock_get_entity_by_id` / `mock_get_entity_by_name` helpers of this module search.
ident = LocalStorageIdentifiableAdapter()
ident.restore_state(rfp("records.xml"))
# Records that have a name are indexed by that name, ...
full_data.update({el.name: el for el in ident._records if el.name is not None})
# ... records without a name are indexed by their id.
full_data.update({el.id: el for el in ident._records if el.name is None})
def dircheckstr(*pathcomponents):
    """
    Return the debug tree identifier for a given path.

    The identifier consists of the name of the last path component and the
    full path of that directory below test_directories/examples_article.
    """
    dir_name = basename(join(*pathcomponents))
    full_path = rfp("test_directories", "examples_article", *pathcomponents)
    return "caoscrawler.structure_elements.Directory: " + dir_name + ", " + full_path
@pytest.fixture
def crawler():
    """Provide a debug-mode Crawler that has crawled the examples_article directory."""
    instance = Crawler(debug=True)
    data_dir = rfp("test_directories", "examples_article")
    cfood_file = rfp("scifolder_cfood.yml")
    instance.crawl_directory(data_dir, cfood_file)
    return instance
@pytest.fixture
def ident(crawler):
    """
    Provide a LocalStorageIdentifiableAdapter that is attached to the crawler fixture.

    The adapter is filled from records.xml and gets identifiables registered for
    the record types Person, Measurement and Project.
    """
    adapter = LocalStorageIdentifiableAdapter()
    crawler.identifiableAdapter = adapter

    # The records.xml file is constructed as follows:
    # Do a full run of the crawler, resolve all identifiables and insert all resulting entities.
    # See: test-setup/datamodel/generate_test_data.py for details.
    adapter.restore_state(rfp("records.xml"))

    # record type name -> names of the properties that make up its identifiable
    identifying_properties = (
        ("Person", ("first_name", "last_name")),
        ("Measurement", ("identifier", "date", "project")),
        ("Project", ("date", "identifier")),
    )
    for type_name, property_names in identifying_properties:
        definition = db.RecordType().add_parent(name=type_name)
        for property_name in property_names:
            definition = definition.add_property(name=property_name)
        adapter.register_identifiable(type_name, definition)
    return adapter
def test_record_structure_generation(crawler):
    """
    Check the debug tree and the "copied" debug metadata of the crawler for the
    DataAnalysis directory of the example data and for two directories nested below it.

    Every looked-up entry has two parts: index 0 is the variables store and index 1 the
    record store of the respective node.
    """
    # TODO How does this test relate to the test function in test_scalars_cfood with the same name?
    #      There seems to be code duplication
    subd = crawler.debug_tree[dircheckstr("DataAnalysis")]
    subc = crawler.debug_metadata["copied"][dircheckstr("DataAnalysis")]
    assert len(subd) == 2
    # variables store on Data Analysis node of debug tree
    assert len(subd[0]) == 4
    # record store on Data Analysis node of debug tree
    assert len(subd[1]) == 0
    assert len(subc) == 2
    assert len(subc[0]) == 4
    assert len(subc[1]) == 0

    # The data analysis node creates one variable for the node itself:
    assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
    assert subc[0]["DataAnalysis"] is False

    # Next level: the project directory below DataAnalysis
    subd = crawler.debug_tree[dircheckstr(
        "DataAnalysis", "2020_climate-model-predict")]
    subc = crawler.debug_metadata["copied"][dircheckstr(
        "DataAnalysis", "2020_climate-model-predict")]

    assert len(subd[1]) == 1
    assert len(subd[1]["Project"].get_parents()) == 1
    assert subd[1]["Project"].get_parents()[0].name == "Project"
    assert subd[1]["Project"].get_property("date").value == "2020"
    assert subd[1]["Project"].get_property(
        "identifier").value == "climate-model-predict"

    assert len(subd[0]) == 9
    assert subd[0]["date"] == "2020"
    assert subd[0]["identifier"] == "climate-model-predict"
    assert subd[0]["Project"].__class__ == db.Record

    # The DataAnalysis variable is flagged as copied on this level, project_dir is not
    assert subd[0]["DataAnalysis"] == "examples_article/DataAnalysis"
    assert subc[0]["DataAnalysis"] is True
    assert subd[0]["project_dir"] == "examples_article/DataAnalysis/2020_climate-model-predict"
    assert subc[0]["project_dir"] is False

    # Check the copy flags for the first level in the hierarchy:
    assert len(subc[0]) == 9
    assert len(subc[1]) == 1
    assert subc[1]["Project"] is False
    assert subc[0]["Project"] is False
    assert subc[0]["date"] is False
    assert subc[0]["identifier"] is False

    # Next level: the measurement directory below the project directory
    subd = crawler.debug_tree[dircheckstr("DataAnalysis",
                                          "2020_climate-model-predict",
                                          "2020-02-08_prediction-errors")]
    subc = crawler.debug_metadata["copied"][dircheckstr("DataAnalysis",
                                                        "2020_climate-model-predict",
                                                        "2020-02-08_prediction-errors")]
    assert len(subd[0]) == 12
    assert subd[0]["date"] == "2020-02-08"
    assert subd[0]["identifier"] == "prediction-errors"
    assert subd[0]["Project"].__class__ == db.Record
    assert subd[0]["Measurement"].__class__ == db.Record

    assert len(subd[1]) == 2

    assert len(subd[1]["Project"].get_parents()) == 1
    assert subd[1]["Project"].get_parents()[0].name == "Project"
    assert subd[1]["Project"].get_property("date").value == "2020"
    assert subd[1]["Project"].get_property(
        "identifier").value == "climate-model-predict"

    assert len(subd[1]["Measurement"].get_parents()) == 1
    assert subd[1]["Measurement"].get_parents()[0].name == "Measurement"
    assert subd[1]["Measurement"].get_property("date").value == "2020-02-08"
    assert subd[1]["Measurement"].get_property(
        "identifier").value == "prediction-errors"
    # The project reference must hold the Record itself, not the string "$Project"
    assert subd[1]["Measurement"].get_property("project").value != "$Project"
    assert subd[1]["Measurement"].get_property(
        "project").value.__class__ == db.Record
    assert subd[1]["Measurement"].get_property(
        "project").value == subd[0]["Project"]

    # Check the copy flags for the second level in the hierarchy:
    assert subc[1]["Project"] is True
    assert subc[0]["Project"] is True
    assert subc[1]["Measurement"] is False
    assert subc[0]["Measurement"] is False
    assert subc[0]["date"] is False
    assert subc[0]["identifier"] is False
# def prepare_test_record_file():
# ident = LocalStorageIdentifiableAdapter()
# crawler = Crawler(debug=True, identifiableAdapter=ident)
# crawler.crawl_directory(rfp("test_directories", "examples_article"),
# rfp("scifolder_cfood.yml"))
# # clean record list:
# recordlist = ident.get_records()
# for i in range(len(recordlist)-1, 1, -1):
# if recordlist[i].parents[0].name == "Person":
# del recordlist[i]
# ident.store_state(rfp("records.xml"))
def test_crawler_update_list(crawler, ident):
    """
    Check the content restored from records.xml and that records retrieved via an
    identifiable equal the records retrieved via the record itself.
    """
    # If the following assertions fail, that is a hint, that the test file records.xml has changed
    # and this needs to be updated:
    assert len(ident.get_records()) == 18
    assert len(
        [r for r in ident.get_records() if r.parents[0].name == "Person"]
    ) == 5
    assert len(
        [r for r in ident.get_records() if r.parents[0].name == "Measurement"]
    ) == 11
    assert len(
        [r for r in ident.get_records() if r.parents[0].name == "Project"]
    ) == 2

    # The crawler contains lots of duplicates, because identifiables have not been resolved yet:
    assert len(ident.get_records()) != len(crawler.crawled_data)

    # Check consistency:
    # Check whether identifiables retrieved from current identifiable store return
    # the same results.

    # take the first person in the list of records:
    for r in ident.get_records():
        if r.parents[0].name == "Person":
            r_cur = r
            break

    id_r0 = ident.get_identifiable(r_cur)
    assert r_cur.parents[0].name == id_r0.record_type
    assert r_cur.get_property(
        "first_name").value == id_r0.properties["first_name"]
    assert r_cur.get_property(
        "last_name").value == id_r0.properties["last_name"]
    assert len(r_cur.parents) == 1
    assert len(r_cur.properties) == 2
    assert len(id_r0.properties) == 2

    # Both retrieval paths have to yield equal records
    idr_r0_test = ident.retrieve_identified_record_for_identifiable(id_r0)
    idr_r0 = ident.retrieve_identified_record_for_record(r_cur)
    assert idr_r0 == idr_r0_test

    # take the first measurement in the list of records:
    for r in ident.get_records():
        if r.parents[0].name == "Measurement":
            r_cur = r
            break

    id_r1 = ident.get_identifiable(r_cur)
    assert r_cur.parents[0].name == id_r1.record_type
    assert r_cur.get_property(
        "identifier").value == id_r1.properties["identifier"]
    assert r_cur.get_property("date").value == id_r1.properties["date"]
    assert r_cur.get_property(
        "project").value == id_r1.properties["project"]
    assert len(r_cur.parents) == 1
    # The record has one property more than its identifiable
    assert len(r_cur.properties) == 4
    assert len(id_r1.properties) == 3

    idr_r1_test = ident.retrieve_identified_record_for_identifiable(id_r1)
    idr_r1 = ident.retrieve_identified_record_for_record(r_cur)
    assert idr_r1 == idr_r1_test
    # The measurement must differ from the person retrieved before
    assert idr_r1 != idr_r0
    assert idr_r1_test != idr_r0_test

    assert len(idr_r1.properties) == 4
    assert r_cur.get_property(
        "responsible").value == idr_r1.get_property("responsible").value
    assert r_cur.description == idr_r1.description
def test_synchronization(crawler, ident):
    """Synchronizing without committing must yield neither inserts nor updates."""
    to_insert, to_update = crawler.synchronize(commit_changes=False)
    assert len(to_insert) == 0
    assert len(to_update) == 0
def test_remove_unnecessary_updates():
    """
    Check how many update candidates remain after Crawler.remove_unnecessary_updates
    compared them with the corresponding identified records.
    """
    def rec_a():
        # A fresh Record with the single parent "A"
        return db.Record().add_parent("A")

    def remaining(update_candidates, identified_records):
        # Number of updates that are left after the clean-up
        return len(Crawler.remove_unnecessary_updates(update_candidates, identified_records))

    # test trivial case
    assert remaining([rec_a()], [rec_a()]) == 0

    # test property difference case
    # TODO this should work right?
    # upl = [db.Record().add_parent("A").add_property("a", 3)]
    # irs = [db.Record().add_parent("A")]  # ID should be s
    # Crawler.remove_unnecessary_updates(upl, irs)
    # assert len(upl) == 1

    # test value difference case
    assert remaining([rec_a().add_property("a", 5)],
                     [rec_a().add_property("a")]) == 1
    assert remaining([rec_a().add_property("a", 5)],
                     [rec_a().add_property("a", 5)]) == 0

    # test unit difference case
    assert remaining([rec_a().add_property("a", unit='cm')],
                     [rec_a().add_property("a")]) == 1

    # test None difference case
    assert remaining([rec_a().add_property("a")],
                     [rec_a().add_property("a", 5)]) == 1
# Current status:
# TODO: currently, this test fails, because non identifiable records cannot
# be inserted into the cache. Solution might be, just not to add them
# into the local cache. Probably in split_into_inserts_and_updates.
@pytest.mark.xfail
def test_identifiable_adapter_no_identifiable(crawler, ident):
    """Synchronize after the registered identifiable for "Person" has been removed."""
    del ident._registered_identifiables["Person"]
    to_insert, to_update = crawler.synchronize()
    assert len(to_update) == 0

    persons = [rec for rec in crawler.crawled_data if rec.parents[0].name == "Person"]
    # All persons are inserted, because they are not identifiable:
    assert len(to_insert) == len(persons)
def test_provenance_debug_data(crawler):
    """Save the debug data to provenance.yml and count its provenance keys by prefix."""
    crawler.save_debug_data(rfp("provenance.yml"))

    with open(rfp("provenance.yml"), "r") as stream:
        provenance = yaml.safe_load(stream)

    entries = provenance["provenance"]

    def check_key_count(prefix):
        # Number of provenance keys that start with the given prefix
        return sum(1 for key in entries if key.startswith(prefix))

    assert check_key_count("Measurement") == 11
    assert check_key_count("Project") == 5
    assert check_key_count("Person") == 14
def test_split_into_inserts_and_updates_trivial(crawler):
    """Splitting an empty list of records must not raise."""
    no_records = []
    crawler.split_into_inserts_and_updates(no_records)
def basic_retrieve_by_name_mock_up(rec, referencing_entities=None, known=None):
    """
    Return the stored Record if ``rec.name`` is an existing key of `known`, None otherwise.

    Parameters
    ----------
    rec : object with a ``name`` attribute
        The entity (or identifiable) that shall be looked up.
    referencing_entities : optional
        Accepted for signature compatibility with the mocked methods; not used.
    known : dict, optional
        Maps names to the Records that are considered to exist. If it is omitted,
        nothing is known and None is returned.
    """
    if known is None:
        # The default must not fail with a TypeError in the membership test below.
        return None
    return known[rec.name] if rec.name in known else None
@pytest.fixture
def crawler_mocked_identifiable_retrieve(crawler):
    """
    Provide the crawler fixture with mocked registered identifiables and mocked
    retrieval of identified records.
    """
    def remote_lookup():
        # Simulate remote server content by using the names to identify records
        # There is only a single known Record with name A
        return Mock(side_effect=partial(
            basic_retrieve_by_name_mock_up, known={"A": db.Record(id=1111, name="A")}))

    adapter = crawler.identifiableAdapter
    # mock retrieval of registered identifiables: return Record with just a parent
    adapter.get_registered_identifiable = Mock(
        side_effect=lambda x: db.Record().add_parent(x.parents[0].name))
    # Each mock gets its own dictionary and its own Record instance
    adapter.retrieve_identified_record_for_record = remote_lookup()
    adapter.retrieve_identified_record_for_identifiable = remote_lookup()
    return crawler
def test_split_into_inserts_and_updates_single(crawler_mocked_identifiable_retrieve):
    """Split two records of which only "A" is known to the mocked remote lookup."""
    crawler = crawler_mocked_identifiable_retrieve
    adapter = crawler.identifiableAdapter
    ident_a = Identifiable(name="A", record_type="C")
    ident_b = Identifiable(name="B", record_type="C")
    records = [db.Record(name="A").add_parent("C"),
               db.Record(name="B").add_parent("C")]

    # Nothing is cached yet and there are no references without id
    assert crawler.get_from_any_cache(ident_a) is None
    assert crawler.get_from_any_cache(ident_b) is None
    assert not crawler._has_reference_value_without_id(ident_a)
    assert not crawler._has_reference_value_without_id(ident_b)
    # Only "A" is found by the mocked retrieval
    assert adapter.retrieve_identified_record_for_record(ident_a).id == 1111
    assert adapter.retrieve_identified_record_for_record(ident_b) is None

    to_insert, to_update = crawler.split_into_inserts_and_updates(deepcopy(records))
    assert len(to_insert) == 1
    assert to_insert[0].name == "B"
    assert len(to_update) == 1
    assert to_update[0].name == "A"
    # if this ever fails, the mock up may be removed
    adapter.get_registered_identifiable.assert_called()
    adapter.retrieve_identified_record_for_identifiable.assert_called()
def test_split_into_inserts_and_updates_with_duplicate(crawler_mocked_identifiable_retrieve):
    """Split a list that contains a second record that is identical to "A"."""
    crawler = crawler_mocked_identifiable_retrieve
    adapter = crawler.identifiableAdapter
    rec_a = db.Record(name="A").add_parent("C")
    rec_b = db.Record(name="B").add_parent("C")
    rec_b.add_property("A", rec_a)
    # This is identical to rec_a and should be removed
    duplicate_of_a = db.Record(name="A").add_parent("C")

    records = [rec_a, rec_b, duplicate_of_a]
    to_insert, to_update = crawler.split_into_inserts_and_updates(deepcopy(records))
    assert len(to_insert) == 1
    assert to_insert[0].name == "B"
    assert len(to_update) == 1
    assert to_update[0].name == "A"
    # if this ever fails, the mock up may be removed
    adapter.get_registered_identifiable.assert_called()
    adapter.retrieve_identified_record_for_identifiable.assert_called()
def test_split_into_inserts_and_updates_with_ref(crawler_mocked_identifiable_retrieve):
    """Split two records where "B" references "A"."""
    crawler = crawler_mocked_identifiable_retrieve
    adapter = crawler.identifiableAdapter

    # try it with a reference
    rec_a = db.Record(name="A").add_parent("C")
    rec_b = db.Record(name="B").add_parent("C")
    rec_b.add_property("A", rec_a)

    to_insert, to_update = crawler.split_into_inserts_and_updates([rec_a, rec_b])
    assert len(to_insert) == 1
    assert to_insert[0].name == "B"
    assert len(to_update) == 1
    assert to_update[0].name == "A"
    # if this ever fails, the mock up may be removed
    adapter.get_registered_identifiable.assert_called()
    adapter.retrieve_identified_record_for_identifiable.assert_called()
def test_split_into_inserts_and_updates_with_circ(crawler):
    """Build two records that reference each other; nothing is asserted yet."""
    # try circular
    rec_a = db.Record(name="A").add_parent("C")
    rec_b = db.Record(name="B").add_parent("C")
    rec_b.add_property("A", rec_a)
    rec_a.add_property("B", rec_b)
    entlist = [rec_a, rec_b]
    # TODO this does not seem to be complete!
def test_split_into_inserts_and_updates_with_complex(crawler_mocked_identifiable_retrieve):
    """
    Split a small graph of records: G references A, and B references both F and A.
    F is not part of the list that is passed to the crawler.
    """
    crawler = crawler_mocked_identifiable_retrieve
    adapter = crawler.identifiableAdapter
    #      A
    #      ^
    #      |
    # F <- B <- G
    rec_a = db.Record(name="A").add_parent("C")
    rec_a = rec_a.add_property('d', 13).add_property('e', "lskdjlsfdj")
    rec_b = db.Record(name="B").add_parent("C")
    rec_g = db.Record(name="G").add_parent("C")
    rec_f = db.Record(name="F").add_parent("C")
    rec_g.add_property("A", rec_a)
    rec_b.add_property("A", rec_f)
    rec_b.add_property("A", rec_a)

    to_insert, to_update = crawler.split_into_inserts_and_updates([rec_a, rec_b, rec_g])
    assert len(to_insert) == 3
    assert "B" in [el.name for el in to_insert]
    assert len(to_update) == 1
    assert to_update[0].name == "A"
    # if this ever fails, the mock up may be removed
    adapter.get_registered_identifiable.assert_called()
    adapter.retrieve_identified_record_for_identifiable.assert_called()
# TODO write test where the unresolved entity is not part of the identifiable
def test_split_into_inserts_and_updates_with_copy_attr(crawler_mocked_identifiable_retrieve):
    """Two records with the same name must end up as one update that has both properties."""
    crawler = crawler_mocked_identifiable_retrieve
    adapter = crawler.identifiableAdapter
    # assume identifiable is only the name
    with_foo = db.Record(name="A").add_parent("C")
    with_foo.add_property("foo", 1)
    with_bar = db.Record(name="A").add_parent("C")
    with_bar.add_property("bar", 2)

    to_insert, to_update = crawler.split_into_inserts_and_updates([with_foo, with_bar])

    merged = to_update[0]
    assert merged.get_property("bar").value == 2
    assert merged.get_property("foo").value == 1
    # if this ever fails, the mock up may be removed
    adapter.get_registered_identifiable.assert_called()
    adapter.retrieve_identified_record_for_identifiable.assert_called()
def test_has_missing_object_in_references(crawler):
    """
    Check Crawler._has_missing_object_in_references for identifiables whose properties
    are plain ids, entities with an id and entities without an id.
    """
    # Simulate remote server content by using the names to identify records
    # There are only two known Records with name C and D
    crawler.identifiableAdapter.get_registered_identifiable = Mock(side_effect=partial(
        basic_retrieve_by_name_mock_up, known={"C": db.Record(name="C").add_parent("RTC")
                                               .add_property("d"),
                                               "D": db.Record(name="D").add_parent("RTD")
                                               .add_property("d").add_property("e"),
                                               }))

    # one reference with id -> check
    assert not crawler._has_missing_object_in_references(
        Identifiable(name="C", record_type="RTC", properties={'d': 123}), [])
    # one ref with Entity with id -> check
    assert not crawler._has_missing_object_in_references(
        Identifiable(name="C", record_type="RTC", properties={'d': db.Record(id=123)
                                                              .add_parent("C")}), [])
    # one ref with id one with Entity with id (mixed) -> check
    assert not crawler._has_missing_object_in_references(
        Identifiable(name="C", record_type="RTD",
                     properties={'d': 123, 'b': db.Record(id=123).add_parent("RTC")}), [])
    # entity to be referenced in the following
    a = db.Record(name="C").add_parent("C").add_property("d", 12311)
    # one ref with id one with Entity without id (but not identifying) -> fail
    assert not crawler._has_missing_object_in_references(
        Identifiable(name="C", record_type="RTC", properties={'d': 123, 'e': a}), [])
    # one ref with id one with Entity without id (mixed) -> fail
    assert not crawler._has_missing_object_in_references(
        Identifiable(name="D", record_type="RTD", properties={'d': 123, 'e': a}), [])

    # From here on, `a` is registered in the cache of remotely missing records
    crawler.add_to_remote_missing_cache(a, Identifiable(name="C", record_type="RTC",
                                                        properties={'d': 12311}))
    # one ref with id one with Entity without id but in cache -> check
    assert crawler._has_missing_object_in_references(
        Identifiable(name="D", record_type="RTD", properties={'d': 123, 'e': a}), [])

    # if this ever fails, the mock up may be removed
    crawler.identifiableAdapter.get_registered_identifiable.assert_called()
@pytest.mark.xfail()
def test_references_entities_without_ids(crawler, ident):
    """Check Crawler._has_reference_value_without_id for several Person records."""
    only_plain_values = (db.Record().add_parent("Person")
                         .add_property('last_name', 123)
                         .add_property('first_name', 123))
    assert not crawler._has_reference_value_without_id(only_plain_values)

    # id and rec with id
    value_and_rec_with_id = (db.Record().add_parent("Person")
                             .add_property('first_name', 123)
                             .add_property('last_name', db.Record(id=123)))
    assert not crawler._has_reference_value_without_id(value_and_rec_with_id)

    # id and rec with id and one unneeded prop
    with_unneeded_prop = (db.Record().add_parent("Person")
                          .add_property('first_name', 123)
                          .add_property('stuff', db.Record())
                          .add_property('last_name', db.Record(id=123)))
    assert crawler._has_reference_value_without_id(with_unneeded_prop)

    # one identifying prop is missing
    rec_without_id = (db.Record().add_parent("Person")
                      .add_property('first_name', 123)
                      .add_property('last_name', db.Record()))
    assert crawler._has_reference_value_without_id(rec_without_id)
def test_replace_entities_with_ids(crawler):
    """Records with an id that are used as property values must be replaced by that id."""
    record = db.Record().add_parent("B")
    record = record.add_property("A", 12345)
    record = record.add_property("B", db.Record(id=12345))
    record = record.add_property("C", [db.Record(id=12345), 233324])

    crawler.replace_entities_with_ids(record)

    assert record.get_property("A").value == 12345
    assert record.get_property("B").value == 12345
    assert record.get_property("C").value == [12345, 233324]
def mock_get_entity_by_id(id):
    """
    Return the first entity of `full_data` that has the given id.

    Raises a ValueError if there is no such entity.
    """
    for entity in full_data.values():
        if entity.id == id:
            return entity
    raise ValueError()
def mock_get_entity_by_name(name):
    """
    Return the first entity of `full_data` whose name equals the given one, ignoring case.

    Entities without a name are skipped. Raises a ValueError if there is no match.
    """
    for entity in full_data.values():
        if entity.name is None:
            continue
        if entity.name.lower() == name.lower():
            return entity
    raise ValueError()
def prepare_crawler_with_sec_mode(mode, ident):
    """
    Return a debug-mode Crawler with the given security mode that has crawled the
    examples_article directory and uses `ident` as its identifiable adapter.
    """
    prepared = Crawler(debug=True, securityMode=mode)
    data_dir = rfp("test_directories", "examples_article")
    cfood_file = rfp("scifolder_cfood.yml")
    prepared.crawl_directory(data_dir, cfood_file)
    # The adapter is attached only after crawling
    prepared.identifiableAdapter = ident
    return prepared
def reset_mocks(mocks):
    """Call ``reset_mock()`` on each of the given mocks."""
    for single_mock in mocks:
        single_mock.reset_mock()
def change_identifiable_prop(ident):
    """
    Set the "date" property of the first Measurement record of `ident` that has
    such a property to "2022-01-04".

    Raises a RuntimeError if no such property is found.
    """
    for entity in ident._records:
        is_measurement = len(entity.parents) > 0 and entity.parents[0].name == "Measurement"
        if not is_measurement:
            continue
        date_prop = next((p for p in entity.properties if p.name == "date"), None)
        if date_prop is not None:
            # change one element; only the first match is changed
            date_prop.value = "2022-01-04"
            return
    # If it does not work, this test is not implemented properly
    raise RuntimeError("Did not find the property that should be changed.")
def change_non_identifiable_prop(ident):
    """
    This function is supposed to change a non-identifying property.

    The last value of the first "responsible" property with at least two values that
    belongs to a Measurement record of `ident` is removed. Raises a RuntimeError if
    there is no such property.
    """
    for entity in ident._records:
        is_measurement = len(entity.parents) > 0 and entity.parents[0].name == "Measurement"
        if not is_measurement:
            continue
        responsible = next(
            (p for p in entity.properties if p.name == "responsible" and len(p.value) >= 2),
            None)
        if responsible is not None:
            # change one element; This removes a responsible which is not part of the identifiable
            del responsible.value[-1]
            return
    raise RuntimeError("Did not find the property that should be changed.")
@patch("caoscrawler.crawl.Crawler._get_entity_by_id",
       new=Mock(side_effect=mock_get_entity_by_id))
@patch("caoscrawler.crawl.Crawler._get_entity_by_name",
       new=Mock(side_effect=mock_get_entity_by_name))
@patch("caoscrawler.crawl.db.Container.insert")
@patch("caoscrawler.crawl.db.Container.update")
@patch("caoscrawler.crawl.UpdateCache.insert")
def test_security_mode(updateCacheMock, upmock, insmock, ident):
    """
    Check which of the mocked Container.insert, Container.update and UpdateCache.insert
    are called by Crawler.synchronize for the security modes RETRIEVE and INSERT.

    The patches without `new` inject their mocks bottom-up: `updateCacheMock` replaces
    UpdateCache.insert, `upmock` Container.update and `insmock` Container.insert.
    After each scenario the mocks are reset and the records of `ident` are restored.
    """
    records_backup = deepcopy(ident._records)

    # trivial case: nothing to do
    crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    updateCacheMock.assert_not_called()

    # RETRIEVE: insert only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
    # remove one element
    del ident._records[-1]
    # insert forbidden
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    assert updateCacheMock.call_count == 1
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # RETRIEVE: update only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.RETRIEVE, ident)
    # change one element
    change_non_identifiable_prop(ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    assert updateCacheMock.call_count == 1
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # INSERT: insert only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
    # remove one element
    del ident._records[-1]
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_called_once()
    upmock.assert_not_called()
    updateCacheMock.assert_not_called()
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # INSERT: update only
    crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
    # change one element
    change_non_identifiable_prop(ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    insmock.assert_not_called()
    upmock.assert_not_called()
    updateCacheMock.assert_called_once()
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)

    # INSERT: insert and update
    crawler = prepare_crawler_with_sec_mode(SecurityMode.INSERT, ident)
    # change two elements
    change_non_identifiable_prop(ident)
    change_identifiable_prop(ident)
    crawler.synchronize(commit_changes=True)
    assert crawler.run_id is not None
    # This was misspelled as `asser_called_once`, which a mock accepts silently as a
    # new attribute, so that nothing was checked.
    insmock.assert_called_once()
    upmock.assert_not_called()
    updateCacheMock.assert_called_once()
    # reset counts
    reset_mocks([updateCacheMock, insmock, upmock])
    # restore original ident
    ident._records = deepcopy(records_backup)
def test_create_reference_mapping():
    """The mapping must contain the referenced record and list the record that references it."""
    referenced = db.Record().add_parent("A")
    referencing = db.Record().add_parent("B").add_property('a', referenced)

    mapping = Crawler.create_reference_mapping([referenced, referencing])

    # Keys are the Python ids of the records that are referenced
    assert id(referenced) in mapping
    assert id(referencing) not in mapping
    assert "B" in mapping[id(referenced)]
    assert mapping[id(referenced)]["B"] == [referencing]
def test_create_flat_list():
    """The flat list must contain every referenced record once, also for circular references."""
    first = db.Record()
    second = db.Record()
    # `first` references itself and `second`
    first.add_property(name="a", value=first)
    first.add_property(name="b", value=second)

    flat = Crawler.create_flat_list([first])
    assert len(flat) == 2
    assert first in flat
    assert second in flat

    third = db.Record()
    third.add_property(name="a", value=first)
    # This would cause recursion if it is not dealt with properly.
    first.add_property(name="c", value=third)

    flat = Crawler.create_flat_list([third])
    assert len(flat) == 3
    assert first in flat
    assert second in flat
    assert third in flat
@pytest.fixture
def crawler_mocked_for_backref_test(crawler):
    """
    Provide the crawler fixture with mocked registered identifiables that use
    "is_referenced_by" and with mocked retrieval of identified records.
    """
    # parent name -> value of the "is_referenced_by" property of its registered identifiable
    referenced_by = {"C": ["BR"], "D": ["BR", "BR2"]}

    # mock retrieval of registered identifiables: return Record with just a parent
    # (plus "is_referenced_by" for the parents listed above)
    def get_reg_ident(x):
        parent_name = x.parents[0].name
        registered = db.Record().add_parent(parent_name)
        if parent_name in referenced_by:
            registered = registered.add_property(
                "is_referenced_by", value=list(referenced_by[parent_name]))
        return registered

    def remote_lookup():
        # Simulate remote server content by using the names to identify records
        # There is only a single known Record with name A
        return Mock(side_effect=partial(
            basic_retrieve_by_name_mock_up,
            known={"A": db.Record(id=1111, name="A").add_parent("BR")}))

    adapter = crawler.identifiableAdapter
    adapter.get_registered_identifiable = Mock(side_effect=get_reg_ident)
    # Each mock gets its own dictionary and its own Record instance
    adapter.retrieve_identified_record_for_record = remote_lookup()
    adapter.retrieve_identified_record_for_identifiable = remote_lookup()
    return crawler
def test_validation_error_print(caplog):
    """Running crawler_main on the failing_validation data must log "Couldn't validate"."""
    caplog.set_level(logging.DEBUG, logger="caoscrawler.converters")
    # there should be no server interaction since we only test the behavior if a validation error
    # occurs during the data collection stage
    data_dir = os.path.join(os.path.dirname(__file__), "test_data", "failing_validation")
    for cfood_name in ("cfood.yml", "cfood2.yml"):
        cfood_file = os.path.join(data_dir, cfood_name)
        identifiables_file = os.path.join(data_dir, "identifiables.yml")
        crawler_main(data_dir, cfood_file, identifiables_file, True, None, False)
        assert "Couldn't validate" in caplog.text
        # Start the next run with an empty log
        caplog.clear()
def test_split_into_inserts_and_updates_backref(crawler_mocked_for_backref_test):
    """Split records where the identifiable of "B" depends on a record that references it."""
    crawler = crawler_mocked_for_backref_test
    adapter = crawler.identifiableAdapter
    ident_a = Identifiable(name="A", record_type="BR")
    ident_b = Identifiable(name="B", record_type="C", backrefs=[db.Entity()])
    referenced = db.Record(name="B").add_parent("C")
    referencing = db.Record(name="A").add_parent("BR").add_property("ref", referenced)
    records = [referenced, referencing]

    # Test without referencing object
    # currently a NotImplementedError is raised if necessary properties are missing.
    with raises(NotImplementedError):
        crawler.split_into_inserts_and_updates([db.Record(name="B").add_parent("C")])

    # identifiables were not yet checked
    assert crawler.get_from_any_cache(ident_a) is None
    assert crawler.get_from_any_cache(ident_b) is None
    # one with reference, one without
    assert not crawler._has_reference_value_without_id(ident_a)
    assert crawler._has_reference_value_without_id(ident_b)
    # one can be found remotely, one not
    assert adapter.retrieve_identified_record_for_record(ident_a).id == 1111
    assert adapter.retrieve_identified_record_for_record(ident_b) is None

    # check the split...
    to_insert, to_update = crawler.split_into_inserts_and_updates(deepcopy(records))
    # A was found remotely and is therefore in the update list
    assert len(to_update) == 1
    assert to_update[0].name == "A"
    # B does not exist on the (simulated) remote server
    assert len(to_insert) == 1
    assert to_insert[0].name == "B"
def test_split_into_inserts_and_updates_mult_backref(crawler_mocked_for_backref_test):
    """Test whether multiple references of the same record type are correctly used."""
    crawler = crawler_mocked_for_backref_test
    referenced = db.Record(name="B").add_parent("C")
    first_referencing = db.Record(name="A").add_parent("BR").add_property("ref", referenced)
    second_referencing = db.Record(name="C").add_parent("BR").add_property("ref", referenced)
    records = [referenced, first_referencing, second_referencing]

    # test whether both entities are listed in the backref attribute of the identifiable
    reference_mapping = crawler.create_reference_mapping(records)
    identifiable = crawler.identifiableAdapter.get_identifiable(referenced, reference_mapping)
    assert len(identifiable.backrefs) == 2

    # check the split...
    to_insert, to_update = crawler.split_into_inserts_and_updates(deepcopy(records))
    assert len(to_update) == 1
    assert len(to_insert) == 2
def test_split_into_inserts_and_updates_diff_backref(crawler_mocked_for_backref_test):
    """Test whether multiple references of different record types are correctly used."""
    crawler = crawler_mocked_for_backref_test
    referenced = db.Record(name="B").add_parent("D")
    referencing_br = db.Record(name="A").add_parent("BR").add_property("ref", referenced)
    referencing_br2 = db.Record(name="A").add_parent("BR2").add_property("ref", referenced)
    records = [referenced, referencing_br, referencing_br2]

    # test whether both entities are listed in the backref attribute of the identifiable
    reference_mapping = crawler.create_reference_mapping(records)
    identifiable = crawler.identifiableAdapter.get_identifiable(referenced, reference_mapping)
    assert len(identifiable.backrefs) == 2

    # check the split...
    to_insert, to_update = crawler.split_into_inserts_and_updates(deepcopy(records))
    assert len(to_update) == 2
    assert len(to_insert) == 1
def mock_create_values(values, element):
    """Do nothing; both arguments are ignored and None is returned."""
    return None
@patch("caoscrawler.converters.IntegerElementConverter.create_values")
def test_restricted_path(create_mock):
    """
    The restricted_path argument allows to ignore part of the crawled data structure. Here, we
    make sure that, if that argument is provided, indeed only the given path of the tree is
    traversed.

    The check is done using the mock of the create_values function of the
    IntegerElementConverter. This function is only called if elements are being treated.
    """
    int_element_definition = {
        "type": "IntegerElement",
        "match_name": ".*",
        "match_value": "(?P<int_value>.*)",
        "records": {
            "Dataset": {
                "Subject": "$int_value"
            }
        }
    }
    crawler_definition = {
        "DictTest": {
            "type": "DictElement",
            "match": "(.*)",
            "subtree": {
                "nextdict": {
                    "type": "DictElement",
                    "match": "(.*)",
                    "subtree": {
                        "int_element": int_element_definition
                    }
                }
            }
        }
    }

    crawler = Crawler(debug=True)
    converter_registry = crawler.load_converters(crawler_definition)

    # This structure is crawled
    test_dict = {
        "v1": {
            "a": 1,
            "b": 2,
        },
        "v2": {
            "c": 3,
            "d": 4,
        }
    }

    def crawl(restricted_path):
        # Crawl a fresh root element with the given restriction
        return crawler.start_crawling(
            DictElement("TestDict", test_dict), crawler_definition, converter_registry,
            restricted_path
        )

    cases = (
        # first test without a restricted_path
        (None, 4),
        # test with a restricted_path but one that has no effect (single root element)
        # this also tests that the remainder of the tree is fully traversed
        (["TestDict"], 4),
        # test with a restricted_path that restricts the tree (single root element)
        (["TestDict", "v2"], 2),
    )
    for restricted_path, expected_calls in cases:
        crawl(restricted_path)
        assert create_mock.call_count == expected_calls
        create_mock.reset_mock()

    # test with a restricted_path that contains a bad element
    with raises(RuntimeError):
        crawl(["TestDict", "v3"])
def test_split_restricted_path():
    """Leading and trailing slashes must not produce empty path elements."""
    expected_elements = (
        ("/el", ["el"]),
        ("/el/", ["el"]),
        ("/el/el", ["el", "el"]),
    )
    for path, elements in expected_elements:
        assert elements == split_restricted_path(path)
def test_deprecated_prefix_option():
    """Test that calling the crawler's main function with the deprecated
    `prefix` option raises the correct errors and warnings.
    """
    # `prefix` alone only triggers a deprecation warning
    with pytest.deprecated_call():
        crawler_main("./", rfp("scifolder_cfood.yml"), prefix="to/be/removed")

    # `prefix` together with `remove_prefix` is an error
    with raises(ValueError) as excinfo:
        crawler_main("./", rfp("scifolder_cfood.yml"), prefix="to/be/removed",
                     remove_prefix="to/be/removed")

    message = str(excinfo.value)
    assert "(deprecated) `prefix` and the `remove_prefix`" in message
def test_wrong_remove_prefix_option():
    """Test that calling the crawler's main function with `remove_prefix` option
    with a bad value raises the correct error.
    """
    data_dir = rfp("test_directories", "examples_article")
    cfood_file = rfp("scifolder_extended.yml")

    with raises(RuntimeError) as excinfo:
        crawler_main(data_dir, cfood_file, remove_prefix="to/be/removed")

    message = str(excinfo.value)
    assert "path does not start with the prefix" in message