Skip to content
Snippets Groups Projects
Select Git revision
  • 1ebdcbe0ac02981ef49da81674d45ea3e316e25d
  • main default protected
  • dev
  • f-unmod
  • f-checkidentical
  • f-simple-breakpoint
  • f-new-debug-tree
  • f-existing-file-id
  • f-no-ident
  • f-collect-problems
  • f-refactor-debug-tree
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
  • v0.5.0
  • v0.4.0
  • v0.3.0
  • v0.2.0
  • v0.1.0
27 results

scanner.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_tool.py 20.34 KiB
    #!/bin/python
    # Tests for the tool using pytest
    # Adapted from check-sfs
    # A. Schlemmer, 06/2021
    
    from newcrawler import Crawler
    from newcrawler.converters import MarkdownFileConverter
    from newcrawler.structure_elements import File, DictTextElement, DictListElement
    from newcrawler.identifiable_adapters import IdentifiableAdapter, LocalStorageIdentifiableAdapter
    from functools import partial
    from copy import deepcopy
    from unittest.mock import MagicMock, Mock
    from os.path import join, dirname, basename
    import yaml
    import caosdb as db
    from caosdb.apiutils import compare_entities
    
    import pytest
    from pytest import raises
    
    
    def rfp(*pathcomponents):
        """
        Resolve path components relative to the directory of this test module.

        Short for "return full path": the components are joined onto the
        directory that contains this file.
        """
        module_dir = dirname(__file__)
        return join(module_dir, *pathcomponents)
    
    
    def dircheckstr(*pathcomponents):
        """
        Build the debug tree identifier of a directory inside the example tree.

        The identifier is composed of the Directory structure element class path,
        the base name of the given path and its full path below
        ``test_directories/examples_article``.
        """
        directory_name = basename(join(*pathcomponents))
        full_path = rfp("test_directories", "examples_article", *pathcomponents)
        prefix = "newcrawler.structure_elements.Directory: "
        return prefix + directory_name + ", " + full_path
    
    
    @pytest.fixture
    def crawler():
        """
        Provide a debug-enabled Crawler that has crawled the example article tree.

        The crawl uses the ``scifolder_cfood.yml`` definition located next to this file.
        """
        instance = Crawler(debug=True)
        example_tree = rfp("test_directories", "examples_article")
        cfood_definition = rfp("scifolder_cfood.yml")
        instance.crawl_directory(example_tree, cfood_definition)
        return instance
    
    @pytest.fixture
    def ident(crawler):
        """
        Provide a LocalStorageIdentifiableAdapter that is attached to the crawler.

        The adapter is loaded from ``records.xml`` and gets identifiable definitions
        registered for Person, Measurement and Project.
        """
        adapter = LocalStorageIdentifiableAdapter()
        crawler.identifiableAdapter = adapter

        # The records.xml file is constructed as follows:
        # Do a full run of the crawler, resolve all identifiables and insert all resulting
        # entities.
        # See: test-setup/datamodel/generate_test_data.py for details.
        adapter.restore_state(rfp("records.xml"))

        # Record type name -> names of the properties that identify such a record.
        identifying_properties = (
            ("Person", ("first_name", "last_name")),
            ("Measurement", ("identifier", "date", "project")),
            ("Project", ("date", "identifier")),
        )
        for type_name, property_names in identifying_properties:
            definition = db.RecordType().add_parent(name=type_name)
            for property_name in property_names:
                definition = definition.add_property(name=property_name)
            adapter.register_identifiable(type_name, definition)
        return adapter
    
    def test_crawler(crawler):
        """
        Verify the debug data for three levels of the DataAnalysis example hierarchy.

        For every level the debug tree entry and the matching "copied" metadata entry
        are inspected. Judging by the assertions below, index 0 of an entry holds plain
        values together with records, while index 1 holds records only.
        """
        # Directory level: nothing is defined here, so both stores are empty.
        key = dircheckstr("DataAnalysis")
        tree_entry = crawler.debug_tree[key]
        copy_flags = crawler.debug_metadata["copied"][key]
        for entry in (tree_entry, copy_flags):
            assert len(entry) == 2
            assert len(entry[0]) == 0
            assert len(entry[1]) == 0

        # Project level.
        key = dircheckstr("DataAnalysis", "2020_climate-model-predict")
        tree_entry = crawler.debug_tree[key]
        copy_flags = crawler.debug_metadata["copied"][key]

        assert len(tree_entry[1]) == 1
        project = tree_entry[1]["Project"]
        assert len(project.get_parents()) == 1
        assert project.get_parents()[0].name == "Project"
        assert project.get_property("date").value == "2020"
        assert project.get_property("identifier").value == "climate-model-predict"

        assert len(tree_entry[0]) == 3
        assert tree_entry[0]["date"] == "2020"
        assert tree_entry[0]["identifier"] == "climate-model-predict"
        assert tree_entry[0]["Project"].__class__ == db.Record

        # Check the copy flags for the first level in the hierarchy:
        assert len(copy_flags[0]) == 3
        assert len(copy_flags[1]) == 1
        assert copy_flags[1]["Project"] is False
        for name in ("Project", "date", "identifier"):
            assert copy_flags[0][name] is False

        # Measurement level.
        key = dircheckstr("DataAnalysis",
                          "2020_climate-model-predict",
                          "2020-02-08_prediction-errors")
        tree_entry = crawler.debug_tree[key]
        copy_flags = crawler.debug_metadata["copied"][key]

        assert len(tree_entry[0]) == 4
        assert tree_entry[0]["date"] == "2020-02-08"
        assert tree_entry[0]["identifier"] == "prediction-errors"
        assert tree_entry[0]["Project"].__class__ == db.Record
        assert tree_entry[0]["Measurement"].__class__ == db.Record

        assert len(tree_entry[1]) == 2

        project = tree_entry[1]["Project"]
        assert len(project.get_parents()) == 1
        assert project.get_parents()[0].name == "Project"
        assert project.get_property("date").value == "2020"
        assert project.get_property("identifier").value == "climate-model-predict"

        measurement = tree_entry[1]["Measurement"]
        assert len(measurement.get_parents()) == 1
        assert measurement.get_parents()[0].name == "Measurement"
        assert measurement.get_property("date").value == "2020-02-08"
        assert measurement.get_property("identifier").value == "prediction-errors"
        # The "$Project" placeholder must have been replaced by the actual record.
        assert measurement.get_property("project").value != "$Project"
        assert measurement.get_property("project").value.__class__ == db.Record
        assert measurement.get_property("project").value == tree_entry[0]["Project"]

        # Check the copy flags for the second level in the hierarchy:
        assert copy_flags[1]["Project"] is True
        assert copy_flags[0]["Project"] is True
        assert copy_flags[1]["Measurement"] is False
        assert copy_flags[0]["Measurement"] is False
        assert copy_flags[0]["date"] is False
        assert copy_flags[0]["identifier"] is False
    
    
    def test_markdown_converter():
        """
        Exercise matching and child creation of MarkdownFileConverter.

        Two README files of the example tree are used: one whose "responsible" entry
        is converted to a text element and one where it is converted to a list element.
        """
        article_root = ("test_directories", "examples_article")
        analysis_readme = File("README.md", rfp(
            *article_root, "DataAnalysis",
            "2020_climate-model-predict", "2020-02-08_prediction-errors", "README.md"))

        catch_all = MarkdownFileConverter({
            "match": "(.*)"
        }, "TestMarkdownFileConverter")

        # This python file is not matched, although the pattern accepts any name.
        python_file = File("test_tool.py", rfp("test_tool.py"))
        result = catch_all.match(python_file)
        assert result is None

        result = catch_all.match(analysis_readme)
        assert result is not None
        assert result.__class__ == dict
        assert len(result) == 0

        readme_only = MarkdownFileConverter({
            "match": "README.md"
        }, "TestMarkdownFileConverter")

        result = readme_only.match(analysis_readme)
        assert result is not None
        assert len(result) == 0

        elements = readme_only.create_children(None, analysis_readme)
        assert len(elements) == 5
        description = elements[1]
        assert description.__class__ == DictTextElement
        assert description.name == "description"
        assert description.value.__class__ == str

        responsible = elements[0]
        assert responsible.__class__ == DictTextElement
        assert responsible.name == "responsible"
        assert responsible.value.__class__ == str

        experiment_readme = File("README.md", rfp(
            *article_root, "ExperimentalData", "2020_SpeedOfLight",
            "2020-01-01_TimeOfFlight", "README.md"))

        result = readme_only.match(experiment_readme)
        assert result is not None
        assert len(result) == 0

        elements = readme_only.create_children(None, experiment_readme)
        assert len(elements) == 2
        description = elements[1]
        assert description.__class__ == DictTextElement
        assert description.name == "description"
        assert description.value.__class__ == str

        # Here "responsible" is a list instead of a single string.
        responsible = elements[0]
        assert responsible.__class__ == DictListElement
        assert responsible.name == "responsible"
        assert responsible.value.__class__ == list
    
    # def prepare_test_record_file():
    #     ident = LocalStorageIdentifiableAdapter()
    #     crawler = Crawler(debug=True, identifiableAdapter=ident)
    #     crawler.crawl_directory(rfp("test_directories", "examples_article"),
    #                             rfp("scifolder_cfood.yml"))
    
    #     # clean record list:
    #     recordlist = ident.get_records()
    #     for i in range(len(recordlist)-1, 1, -1):
    #         if recordlist[i].parents[0].name == "Person":
    #             del recordlist[i]
    
    #     ident.store_state(rfp("records.xml"))
    
    
    def test_ambigious_records(crawler, ident):
        """
        Retrieving a record must fail if the store contains unresolved duplicates.

        The adapter's record list is replaced by the crawler's raw update list; looking
        up the identifiable of the first record is then expected to raise a RuntimeError.
        """
        ident.get_records().clear()
        ident.get_records().extend(crawler.updateList)
        first_record = ident.get_records()[0]
        first_identifiable = ident.get_identifiable(first_record)
        with raises(RuntimeError, match=".*unambigiously.*"):
            ident.retrieve_identified_record(first_identifiable)
    
    
    def test_crawler_update_list(crawler, ident):
        """
        Check that identifiables resolve consistently against the stored records.

        After validating the content of ``records.xml``, the first Person and the first
        Measurement are looked up via their identifiables, compared with
        ``compare_entities`` and finally a synchronization is expected to yield neither
        inserts nor updates.
        """
        crawler.copy_attributes = Mock()

        def stored_records_of(type_name):
            # All stored records whose first parent has the given name.
            return [rec for rec in ident.get_records() if rec.parents[0].name == type_name]

        # If the following assertions fail, that is a hint, that the test file records.xml is
        # incorrect:
        assert len(ident.get_records()) == 18
        assert len(stored_records_of("Person")) == 5
        assert len(stored_records_of("Measurement")) == 11
        assert len(stored_records_of("Project")) == 2

        # The crawler contains lots of duplicates, because identifiables have not been resolved yet:
        assert len(ident.get_records()) != len(crawler.updateList)

        # Check consistency:
        # Check whether identifiables retrieved from current identifiable store return the same
        # results.

        # take the first person in the list of records:
        person = stored_records_of("Person")[0]

        person_identifiable = ident.get_identifiable(person)
        assert person.parents[0].name == person_identifiable.parents[0].name
        for prop_name in ("first_name", "last_name"):
            expected_value = person_identifiable.get_property(prop_name).value
            assert person.get_property(prop_name).value == expected_value
        assert len(person.parents) == 1
        assert len(person_identifiable.parents) == 1
        assert len(person.properties) == 2
        assert len(person_identifiable.properties) == 2

        person_via_identifiable = ident.retrieve_identified_record(person_identifiable)
        person_via_record = ident.retrieve_identifiable(person)
        assert person_via_record == person_via_identifiable

        # take the first measurement in the list of records:
        measurement = stored_records_of("Measurement")[0]

        measurement_identifiable = ident.get_identifiable(measurement)
        assert measurement.parents[0].name == measurement_identifiable.parents[0].name
        for prop_name in ("identifier", "date", "project"):
            expected_value = measurement_identifiable.get_property(prop_name).value
            assert measurement.get_property(prop_name).value == expected_value
        assert len(measurement.parents) == 1
        assert len(measurement_identifiable.parents) == 1
        # The record carries two properties that are not part of the identifiable.
        assert len(measurement.properties) == 5
        assert len(measurement_identifiable.properties) == 3

        measurement_via_identifiable = ident.retrieve_identified_record(measurement_identifiable)
        measurement_via_record = ident.retrieve_identifiable(measurement)
        assert measurement_via_record == measurement_via_identifiable
        assert measurement_via_record != person_via_record
        assert measurement_via_identifiable != person_via_identifiable

        assert len(measurement_via_record.properties) == 5
        for prop_name in ("responsible", "description"):
            expected_value = measurement_via_record.get_property(prop_name).value
            assert measurement.get_property(prop_name).value == expected_value

        # test whether the compare_entities function works in this context:
        diff = compare_entities(measurement, measurement_identifiable)
        assert len(diff[0]["parents"]) == 0
        assert len(diff[1]["parents"]) == 0
        assert len(diff[0]["properties"]) == 2
        assert len(diff[1]["properties"]) == 0
        assert "responsible" in diff[0]["properties"]
        assert "description" in diff[0]["properties"]

        diff = compare_entities(measurement, measurement_via_record)
        assert len(diff[0]["parents"]) == 0
        assert len(diff[1]["parents"]) == 0
        assert len(diff[0]["properties"]) == 0
        assert len(diff[1]["properties"]) == 0

        to_insert, to_update = crawler.synchronize()
        assert len(to_insert) == 0
        assert len(to_update) == 0
    
    
    def test_remove_unnecessary_updates():
        """
        Check which updates Crawler.remove_unnecessary_updates drops from the update list.

        Every case passes a single update candidate and a single identified record; the
        update list is modified in place, so its remaining length is what gets asserted.
        """
        def updates_left(update, identified):
            # Run the filter on one-element lists and report how many updates survive.
            update_list = [update]
            identified_list = [identified]
            Crawler.remove_unnecessary_updates(update_list, identified_list)
            return len(update_list)

        def record_a():
            # Fresh record with parent "A" for every use.
            return db.Record().add_parent("A")

        # test trivial case
        assert updates_left(record_a(), record_a()) == 0

        # test property difference case
        # TODO this should work right?
        #upl = [db.Record().add_parent("A").add_property("a", 3)]
        # irs = [db.Record().add_parent("A")]  # ID should be s
        #Crawler.remove_unnecessary_updates(upl, irs)
        #assert len(upl) == 1

        # test value difference case
        assert updates_left(record_a().add_property("a", 5),
                            record_a().add_property("a")) == 1
        assert updates_left(record_a().add_property("a", 5),
                            record_a().add_property("a", 5)) == 0

        # test unit difference case
        assert updates_left(record_a().add_property("a", unit='cm'),
                            record_a().add_property("a")) == 1

        # test None difference case
        assert updates_left(record_a().add_property("a"),
                            record_a().add_property("a", 5)) == 1
    
    
    def test_identifiable_adapter():
        """
        Check the query string generated for a Person identifiable.

        The comparison is done on the lower-cased query.
        """
        person = db.Record().add_parent("Person")
        person = person.add_property("first_name", value="A")
        person = person.add_property("last_name", value="B")

        query = IdentifiableAdapter.create_query_for_identifiable(person)

        expected = "find record person with 'first_name'='a' and 'last_name'='b' "
        assert query.lower() == expected
    
    
    @pytest.mark.xfail
    def test_identifiable_adapter_no_identifiable(crawler, ident):
        """
        Synchronize after the Person identifiable definition has been removed.

        Marked as an expected failure.
        """
        ident._registered_identifiables.pop("Person")
        to_insert, to_update = crawler.synchronize()
        assert len(to_update) == 0

        persons = [rec for rec in crawler.updateList if rec.parents[0].name == "Person"]
        # All persons are inserted, because they are not identifiable:
        assert len(to_insert) == len(persons)
    
    
    def test_provenance_debug_data(crawler):
        """
        Save the crawler's debug data to ``provenance.yml`` and check the key counts.

        The file is written next to this test module and read back with the safe
        YAML loader.
        """
        target = rfp("provenance.yml")
        crawler.save_debug_data(target)

        with open(target, "r") as stream:
            provenance = yaml.load(stream, Loader=yaml.SafeLoader)

        entries = provenance["provenance"]

        def check_key_count(prefix):
            # Number of provenance keys that start with the given prefix.
            return sum(1 for key in entries.keys() if key.startswith(prefix))

        assert check_key_count("Measurement") == 11
        assert check_key_count("Project") == 5
        assert check_key_count("Person") == 14
    
    
    @pytest.fixture
    def mock_retrieve(crawler):
        """
        Provide the crawler with a mocked identifiable adapter.

        Remote server content is simulated by identifying records by their names:
        only a record named "A" is "found" (with id 1111), every other lookup
        returns None. ``copy_attributes`` is replaced by a Mock.
        """
        crawler.copy_attributes = Mock()

        # a record that is found remotely and should be added to the update list and one that
        # is not found and should be added to the insert one
        remote_known = {"A": db.Record(id=1111, name="A")}

        def lookup_by_name(rec):
            # Known record for this name, or None if the "server" does not have it.
            return remote_known.get(rec.name)

        def registered_identifiable_for(rec):
            # The identifiable consists only of the record's first parent.
            return db.Record().add_parent(rec.parents[0].name)

        adapter = crawler.identifiableAdapter
        adapter.retrieve_identifiable = Mock(side_effect=lookup_by_name)
        adapter.get_registered_identifiable = registered_identifiable_for
        return crawler
    
    
    def test_split_into_inserts_and_updates_trivial(crawler):
        """
        Splitting an empty entity list must not raise.
        """
        no_entities = []
        crawler.split_into_inserts_and_updates(no_entities)
    
    
    def test_split_into_inserts_and_updates_single(mock_retrieve):
        """
        Split two independent records: "A" exists remotely, "B" does not.

        Expects "B" in the insert list and "A" in the update list.
        """
        crawler = mock_retrieve

        rec_a = db.Record(name="A").add_parent("C")
        rec_b = db.Record(name="B").add_parent("C")
        entities = [rec_a, rec_b]

        # Preconditions: nothing cached locally, both records can be checked remotely.
        for entity in entities:
            assert crawler.get_identified_record_from_local_cache(entity) is None
        for entity in entities:
            assert crawler.can_be_checked_externally(entity)
        assert crawler.identifiableAdapter.retrieve_identifiable(rec_a).id == 1111
        assert crawler.identifiableAdapter.retrieve_identifiable(rec_b) is None

        to_insert, to_update = crawler.split_into_inserts_and_updates(deepcopy(entities))
        assert len(to_insert) == 1
        assert to_insert[0].name == "B"
        assert len(to_update) == 1
        assert to_update[0].name == "A"
    
    
    def test_split_into_inserts_and_updates_with_duplicate(mock_retrieve):
        """
        Split a list that contains the record "A" twice plus "B" referencing "A".

        Expects the duplicate to collapse: one insert ("B") and one update ("A").
        """
        crawler = mock_retrieve
        # two separate record objects with the same name and parent
        first_a = db.Record(name="A").add_parent("C")
        rec_b = db.Record(name="B").add_parent("C")
        rec_b.add_property("A", first_a)
        second_a = db.Record(name="A").add_parent("C")

        to_insert, to_update = crawler.split_into_inserts_and_updates(
            [first_a, rec_b, second_a])
        assert len(to_insert) == 1
        assert to_insert[0].name == "B"
        assert len(to_update) == 1
        assert to_update[0].name == "A"
    
    
    def test_split_into_inserts_and_updates_with_ref(mock_retrieve):
        """
        Split two records where "B" references "A".

        Expects "B" in the insert list and "A" in the update list.
        """
        crawler = mock_retrieve
        # "B" carries a reference property pointing to "A"
        referenced = db.Record(name="A").add_parent("C")
        referencing = db.Record(name="B").add_parent("C")
        referencing.add_property("A", referenced)

        to_insert, to_update = crawler.split_into_inserts_and_updates(
            [referenced, referencing])
        assert len(to_insert) == 1
        assert to_insert[0].name == "B"
        assert len(to_update) == 1
        assert to_update[0].name == "A"
    
    
    def test_split_into_inserts_and_updates_with_circ(mock_retrieve):
        """
        Set up two records that reference each other.

        NOTE(review): the entity list is only built; the crawler is never called and
        nothing is asserted, so this test looks incomplete. Confirm the intended
        behaviour for circular references before extending it.
        """
        # try circular
        crawler = mock_retrieve
        rec_a = db.Record(name="A").add_parent("C")
        rec_b = db.Record(name="B").add_parent("C")
        rec_b.add_property("A", rec_a)
        rec_a.add_property("B", rec_b)
        entlist = [rec_a, rec_b]
    
    
    def test_split_into_inserts_and_updates_with_complex(mock_retrieve):
        """
        Split a small reference graph consisting of the records A, B, F and G.

        Only A, B and G are passed to the crawler; three inserts and one update
        (of "A") are expected.
        """
        crawler = mock_retrieve
        #      A
        #      ^
        #      |
        # F <- B <- G
        # NOTE(review): the code below links G -> A, B -> F and B -> A, which differs from
        # the diagram above (G -> B). F is not part of the list handed to the crawler;
        # presumably it is picked up through B's reference. Please confirm.
        rec_a = db.Record(name="A").add_parent("C")
        rec_a = rec_a.add_property('d', 13)
        rec_a = rec_a.add_property('e', "lskdjlsfdj")
        rec_b = db.Record(name="B").add_parent("C")
        rec_g = db.Record(name="G").add_parent("C")
        rec_f = db.Record(name="F").add_parent("C")
        rec_g.add_property("A", rec_a)
        rec_b.add_property("A", rec_f)
        rec_b.add_property("A", rec_a)

        to_insert, to_update = crawler.split_into_inserts_and_updates([rec_a, rec_b, rec_g])
        assert len(to_insert) == 3
        inserted_names = [entity.name for entity in to_insert]
        assert "B" in inserted_names
        assert len(to_update) == 1
        assert to_update[0].name == "A"

        # TODO write test where the unresolved entity is not part of the identifiable
    
    
    def test_split_into_inserts_and_updates_with_copy_attr(mock_retrieve):
        """
        Split two records named "A" that carry different properties.

        The first update is expected to expose both properties, "foo" and "bar".
        """
        crawler = mock_retrieve
        # assume identifiable is only the name
        with_foo = db.Record(name="A").add_parent("C")
        with_foo.add_property("foo", 1)
        with_bar = db.Record(name="A").add_parent("C")
        with_bar.add_property("bar", 2)

        to_insert, to_update = crawler.split_into_inserts_and_updates([with_foo, with_bar])

        # expected TODO
        merged = to_update[0]
        assert merged.get_property("bar").value == 2
        assert merged.get_property("foo").value == 1
    
    
    def test_all_references_are_existing_already(crawler):
        """
        Check Crawler.all_references_are_existing_already for several property setups.

        Plain values and referenced records with an id pass; a referenced record
        without id only passes after it has been added to the local cache.
        """
        registered = {
            "A": db.Record(name="A").add_parent("C"),
            "B": db.Record(name="B").add_parent("C"),
        }

        def lookup_by_name(rec):
            # Registered identifiable for this name, or None if there is none.
            return registered.get(rec.name)

        crawler.identifiableAdapter.get_registered_identifiable = Mock(
            side_effect=lookup_by_name)

        only_value = db.Record().add_property('a', 123)
        assert crawler.all_references_are_existing_already(only_value)

        only_reference = db.Record().add_property('a', db.Record(id=123))
        assert crawler.all_references_are_existing_already(only_reference)

        value_and_reference = (db.Record()
                               .add_property('a', 123)
                               .add_property('b', db.Record(id=123)))
        assert crawler.all_references_are_existing_already(value_and_reference)

        unresolved_reference = (db.Record()
                                .add_property('a', 123)
                                .add_property('b', db.Record(name="A").add_parent("C")))
        assert not crawler.all_references_are_existing_already(unresolved_reference)

        cached = db.Record(name="A").add_parent("C")
        crawler.add_identified_record_to_local_cache(cached)
        cached_reference = (db.Record()
                            .add_property('a', 123)
                            .add_property('b', cached))
        assert crawler.all_references_are_existing_already(cached_reference)
    
    
    def test_can_be_checked_externally(crawler):
        """
        Check Crawler.can_be_checked_externally for several property setups.

        Records with plain values or references that carry an id pass; a reference
        to a record without id does not.
        """
        only_value = db.Record().add_property('a', 123)
        assert crawler.can_be_checked_externally(only_value)

        only_reference = db.Record().add_property('a', db.Record(id=123))
        assert crawler.can_be_checked_externally(only_reference)

        value_and_reference = (db.Record()
                               .add_property('a', 123)
                               .add_property('b', db.Record(id=123)))
        assert crawler.can_be_checked_externally(value_and_reference)

        reference_without_id = (db.Record()
                                .add_property('a', 123)
                                .add_property('b', db.Record()))
        assert not crawler.can_be_checked_externally(reference_without_id)
    
    
    def test_replace_entities_by_ids(crawler):
        """
        Check that referenced records are replaced by their ids in place.

        Covers a plain integer value, a single referenced record and a list that
        mixes a referenced record with an integer.
        """
        record = db.Record().add_parent("B")
        record = record.add_property("A", 12345)
        record = record.add_property("B", db.Record(id=12345))
        record = record.add_property("C", [db.Record(id=12345), 233324])

        crawler.replace_entities_by_ids(record)

        expected_values = (
            ("A", 12345),
            ("B", 12345),
            ("C", [12345, 233324]),
        )
        for prop_name, expected in expected_values:
            assert record.get_property(prop_name).value == expected