#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2021 Indiscale GmbH <info@indiscale.com>
#               2021 Henrik tom Wörden <h.tomwoerden@indiscale.com>
#               2021 Alexander Schlemmer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#

"""
Integration tests for basic functionality of the CaosDB crawler.
"""

import os

import caosdb as db
import pytest

from caosadvancedtools.crawler import Crawler as OldCrawler
from caosadvancedtools.models.parser import parse_model_from_yaml
from caoscrawler import Crawler, SecurityMode
from caoscrawler.identifiable import Identifiable
from caoscrawler.identifiable_adapters import CaosDBIdentifiableAdapter
from caosdb.utils.register_tests import clear_database, set_test_key

set_test_key("10b128cf8a1372f30aa3697466bb55e76974e0c16a599bb44ace88f19c8f61e2")


def rfp(*pathcomponents):
    """
    Return full path.
    Shorthand convenience function.
    """
    return os.path.join(os.path.dirname(__file__), *pathcomponents)


@pytest.fixture
def usemodel():
    model = parse_model_from_yaml(rfp("model.yml"))
    model.sync_data_model(noquestion=True, verbose=False)


@pytest.fixture
def ident():
    ident = CaosDBIdentifiableAdapter()

    # TODO place this definition of identifiables elsewhere
    ident.register_identifiable(
        "Person", db.RecordType()
        .add_parent(name="Person")
        # .add_property(name="first_name")
        .add_property(name="last_name"))
    ident.register_identifiable(
        "Measurement", db.RecordType()
        .add_parent(name="Measurement")
        # .add_property(name="identifier")
        .add_property(name="date")
        .add_property(name="project"))
    ident.register_identifiable(
        "Project", db.RecordType()
        .add_parent(name="Project")
        .add_property(name="date")
        .add_property(name="identifier"))
    return ident

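# Note: the TODO in the `ident` fixture above could be addressed by keeping the
# identifiable definitions in a YAML file and loading them through the adapter.
# A minimal sketch (assuming a hypothetical `identifiables.yml` next to this
# test; `load_from_yaml_definition` is a method of CaosDBIdentifiableAdapter):
#
#     ident = CaosDBIdentifiableAdapter()
#     ident.load_from_yaml_definition(rfp("identifiables.yml"))
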

def crawl_standard_test_directory(cr: Crawler,
                                  subdir: str = "examples_article",
                                  cfood: str = "scifolder_cfood.yml"):
    return cr.crawl_directory(rfp("..", "..", "unittests", "test_directories", subdir),
                              rfp("..", "..", "unittests", cfood))


@pytest.fixture
def crawler(ident):
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr)
    return cr, crawled_data


@pytest.fixture
def crawler_extended(ident):
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr, cfood="scifolder_extended.yml")
    # correct paths for current working directory
    file_list = [r for r in crawled_data if r.role == "File"]
    for f in file_list:
        f.file = rfp("..", "..", "unittests", "test_directories", f.file)
    return cr, crawled_data


def test_ambiguous_lookup(clear_database, usemodel, crawler, ident):
    ins, ups = crawler[0].synchronize(crawler[1])

    proj = db.execute_query("FIND Project WITH identifier='SpeedOfLight'", unique=True)
    with pytest.raises(RuntimeError, match=".*unambigiously.*"):
        print(crawler[0].identifiableAdapter.retrieve_identified_record_for_identifiable(
            Identifiable(properties={'project': proj.id})))


def test_single_insertion(clear_database, usemodel, crawler, ident):
    ins, ups = crawler[0].synchronize(crawler[1])

    # This test also generates the file records.xml used in some of the unittests:
    res = db.execute_query("FIND Record")
    for i in reversed(range(len(res))):
        if res[i].parents[0].name == "PyTestInfo":
            del res[i]
    # uncomment this to recreate the `records.xml` file
    # filename = rfp("..", "..", "unittests", "records.xml")
    # with open(filename, "w") as f:
    #    xml = res.to_xml()
    #    # Remove noscript and transaction benchmark:
    #    for tag in ("noscript", "TransactionBenchmark"):
    #        if xml.find(tag) is not None:
    #            xml.remove(xml.find(tag))
    #    f.write(db.common.utils.xml2str(xml))

    assert len(ins) == 18
    assert len(ups) == 0

    # Do a second run on the same data, there should be no changes:
    crawler = Crawler(identifiableAdapter=ident)
    crawled_data = crawler.crawl_directory(
        rfp("..", "..", "unittests", "test_directories", "examples_article"),
        rfp("..", "..", "unittests", "scifolder_cfood.yml"))
    ins, ups = crawler.synchronize(crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_multiple_insertions(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawler[1])

    # Do a second run on the same data, there should be no changes:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr)
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_insertion(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawler[1])

    # Do a second run on the same data, there should be a new insert:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr, "example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 1
    assert len(ups) == 0

    # Do it again to check whether nothing is changed:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr, "example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_insert_auth(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawler[1])

    # Do a second run on the same data, there should be a new insert:
    cr = Crawler(identifiableAdapter=ident, securityMode=SecurityMode.RETRIEVE)
    crawled_data = crawl_standard_test_directory(cr, "example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 1
    assert not ins[0].is_valid()
    nins, nups = OldCrawler.update_authorized_changes(cr.run_id)
    assert nins == 1

    # Do it again to check whether nothing is changed:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr, "example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_insertion_and_update(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawler[1])

    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr, "example_insert")
    ins, ups = cr.synchronize(crawled_data)

    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr, "example_overwrite_1")
    # cr.save_debug_data(rfp("provenance.yml"))
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 0
    assert len(ups) == 1


def test_identifiable_update(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawler[1])

    # Do a second run on the same data with a change in one
    # of the identifiables:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr)

    # Test the addition of a single property:
    for record in crawled_data:
        if (record.parents[0].name == "Measurement" and
                record.get_property("date").value == "2020-01-03"):
            # maybe a bit weird, but add an email address to a measurement
            record.add_property(
                name="email", value="testperson@testaccount.test")
            print("one change")
            break
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 0
    assert len(ups) == 1

    # Test the change within one property:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr)
    for record in crawled_data:
        if (record.parents[0].name == "Measurement" and
                record.get_property("date").value == "2020-01-03"):
            record.add_property(name="email", value="testperson@coolmail.test")
            print("one change")
            break
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 0
    assert len(ups) == 1

    # Changing the date should result in a new insertion:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr)
    for record in crawled_data:
        if (record.parents[0].name == "Measurement" and
                record.get_property("date").value == "2020-01-03"):
            record.add_property(name="email", value="testperson@coolmail.test")
            record.get_property("date").value = "2012-01-02"
            print("one change")
            break
    ins, ups = cr.synchronize(crawled_data)
    assert len(ins) == 1
    assert len(ups) == 0


def test_file_insertion_dry(clear_database, usemodel, ident):
    crawler_extended = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(
        crawler_extended, cfood="scifolder_extended.yml")
    file_list = [r for r in crawled_data if r.role == "File"]
    assert len(file_list) == 11

    for f in file_list:
        assert f.path.endswith("README.md")
        assert f.path[1:] == f.file

    ins, ups = crawler_extended.synchronize(crawled_data, commit_changes=False)
    assert len(ups) == 0
    file_list_ins = [r for r in ins if r.role == "File"]
    assert len(file_list_ins) == 11


def test_file_insertion(clear_database, usemodel, ident, crawler_extended):
    ins, ups = crawler_extended[0].synchronize(crawler_extended[1], commit_changes=True)
    file_list_ins = [r for r in ins if r.role == "File"]
    assert len(file_list_ins) == 11

    assert db.execute_query("COUNT File") > 0

    # A query for records which reference a File does not seem to be possible
    # directly, so retrieve the ids of the files and query for each id:
    files = db.execute_query("FIND File")
    for f in files:
        r = db.execute_query("FIND Record which references {}".format(f.id))
        assert len(r) == 1
        assert r[0].get_property("ReadmeFile").value == f.id


def test_file_update(clear_database, usemodel, ident, crawler_extended):
    ins1, ups1 = crawler_extended[0].synchronize(crawler_extended[1], commit_changes=True)
    file_list_ins = [r for r in ins1 if r.role == "File"]

    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr, cfood="scifolder_extended.yml")

    file_list = [r for r in crawled_data if r.role == "File"]
    for f in file_list:
        f.file = rfp("..", "..", "unittests", "test_directories", f.file)
    ins2, ups2 = cr.synchronize(crawled_data, commit_changes=True)
    assert len(ups1) == 0
    assert len(ups2) == 0

    # Try adding a parent:
    res = db.execute_query("Find File")
    assert len(res) == 11
    assert len(res[0].parents) == 0

    cr2 = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cr2, cfood="scifolder_extended2.yml")

    file_list = [r for r in crawled_data if r.role == "File"]
    for f in file_list:
        f.file = rfp("..", "..", "unittests", "test_directories", f.file)
    ins3, ups3 = cr2.synchronize(crawled_data, commit_changes=True)
    assert len(ups3) == 11

    res = db.execute_query("Find File")
    assert len(res) == 11
    assert res[0].parents[0].name == "ProjectMarkdownReadme"

    # TODO: Implement file update checks (based on checksum)
    # Add test with actual file update:
    # assert len(ins2) == 0
    # assert len(ups2) == len(file_list_ins)
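    # A checksum-based check might look roughly like the following sketch
    # (assuming the server fills the `checksum` attribute of File entities
    # after insertion and that one of the crawled files was modified on disk
    # between the two synchronization runs):
    #
    #     old = {f.path: f.checksum for f in db.execute_query("FIND File")}
    #     # ... modify a file on disk, re-crawl and synchronize ...
    #     new = {f.path: f.checksum for f in db.execute_query("FIND File")}
    #     assert any(old[p] != new[p] for p in new)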