#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2021 Indiscale GmbH <info@indiscale.com>
#               2021 Henrik tom Wörden <h.tomwoerden@indiscale.com>
#               2021 Alexander Schlemmer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
"""
an integration test module that does basic integration tests
"""

import argparse
import os
import sys
from argparse import RawTextHelpFormatter
from pathlib import Path

import linkahead as db
import pytest
import yaml
from caosadvancedtools.crawler import Crawler as OldCrawler
from caosadvancedtools.models.parser import parse_model_from_yaml
from caoscrawler import Crawler, SecurityMode
from caoscrawler.debug_tree import DebugTree
from caoscrawler.identifiable import Identifiable
from caoscrawler.identifiable_adapters import CaosDBIdentifiableAdapter
from caoscrawler.scanner import scan_directory
from linkahead import EmptyUniqueQueryError
from linkahead.utils.register_tests import clear_database, set_test_key

set_test_key("10b128cf8a1372f30aa3697466bb55e76974e0c16a599bb44ace88f19c8f61e2")

# TODO move test related stuff here and remove it from unittests

UNITTESTDIR = Path(__file__).parent.parent.parent / "unittests"
BASICTESTDIR = Path(__file__).parent


@pytest.fixture
def usemodel():
    model = parse_model_from_yaml(BASICTESTDIR / "model.yml")
    model.sync_data_model(noquestion=True, verbose=False)


@pytest.fixture
def ident():
    ident = CaosDBIdentifiableAdapter()

    # TODO place this definition of identifiables elsewhere
    ident.register_identifiable(
        "Person", db.RecordType()
        .add_parent(name="Person")
        # .add_property(name="first_name")
        .add_property(name="last_name"))
    ident.register_identifiable(
        "Measurement", db.RecordType()
        .add_parent(name="Measurement")
        # .add_property(name="identifier")
        .add_property(name="date")
        .add_property(name="project"))
    ident.register_identifiable(
        "Project", db.RecordType()
        .add_parent(name="Project")
        .add_property(name="date")
        .add_property(name="identifier"))
    return ident


def crawl_standard_test_directory(subdir: str = "examples_article",
                                  cfood: str = "scifolder_cfood.yml",
                                  debug_tree=None):
    return scan_directory(UNITTESTDIR / "test_directories" / subdir,
                          UNITTESTDIR / cfood,
                          debug_tree=debug_tree)


@pytest.fixture
def crawler(ident):
    cr = Crawler(identifiableAdapter=ident)
    debug_tree = DebugTree()
    crawled_data = crawl_standard_test_directory(debug_tree=debug_tree)
    return cr, crawled_data, debug_tree


@pytest.fixture
def crawler_extended(ident):
    cr = Crawler(identifiableAdapter=ident)
    debug_tree = DebugTree()
    crawled_data = crawl_standard_test_directory(
        cfood="scifolder_extended.yml", debug_tree=debug_tree)
    # correct paths for current working directory
    file_list = [r for r in crawled_data if r.role == "File"]
    for f in file_list:
        f.file = UNITTESTDIR / "test_directories" / f.file
    return cr, crawled_data, debug_tree


def test_ambiguous_lookup(clear_database, usemodel, crawler, ident):
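    """An identifiable that does not identify a record uniquely must be rejected.

    The Identifiable constructed below only carries a ``project`` property, so the
    identifiable adapter cannot retrieve a single record unambiguously and is
    expected to raise a RuntimeError.
    """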
    ins, ups = crawler[0].synchronize(crawled_data=crawler[1])

    proj = db.execute_query("FIND Project WITH identifier='SpeedOfLight'", unique=True)
    with pytest.raises(RuntimeError, match=".*unambiguously.*"):
        print(crawler[0].identifiableAdapter.retrieve_identified_record_for_identifiable(
            Identifiable(properties={'project': proj.id})))


def test_single_insertion(clear_database, usemodel, crawler, ident):
    ins, ups = crawler[0].synchronize(crawled_data=crawler[1])

    # This test also generates the file records.xml used in some of the unittests:
    res = db.execute_query("FIND Record")
    for i in reversed(range(len(res))):
        if res[i].parents[0].name == "PyTestInfo":
            del res[i]
    # uncomment this to recreate the `records.xml` file
    # filename = UNITTESTDIR / "records.xml"
    # with open(filename, "w") as f:
    #     xml = res.to_xml()
    #     # Remove noscript and transaction benchmark:
    #     for tag in ("noscript", "TransactionBenchmark"):
    #         if xml.find(tag) is not None:
    #             xml.remove(xml.find(tag))
    #     f.write(db.common.utils.xml2str(xml))

    assert len(ins) == 18
    assert len(ups) == 0

    # Do a second run on the same data, there should be no changes:
    crawler = Crawler(identifiableAdapter=ident)
    crawled_data = scan_directory(UNITTESTDIR / "test_directories" / "examples_article",
                                  UNITTESTDIR / "scifolder_cfood.yml")
    ins, ups = crawler.synchronize(crawled_data=crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_multiple_insertions(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawled_data=crawler[1])

    # Do a second run on the same data, there should be no changes:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory()
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_insertion(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawled_data=crawler[1])

    # Do a second run on a variant of the data; there should be one new insert:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory("example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 1
    assert len(ups) == 0

    # Do it again to check whether nothing is changed:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory("example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_insert_auth(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawled_data=crawler[1])

    # Do a second run on a variant of the data; there should be one new insert:
    cr = Crawler(identifiableAdapter=ident, securityMode=SecurityMode.RETRIEVE)
    crawled_data = crawl_standard_test_directory("example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 1
    assert not ins[0].is_valid()
    nins, nups = OldCrawler.update_authorized_changes(cr.run_id)
    assert nins == 1

    # Do it again to check whether nothing is changed:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory("example_insert")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 0
    assert len(ups) == 0


def test_insertion_and_update(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawled_data=crawler[1])

    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory("example_insert")
    ins, ups = cr.synchronize(crawled_data=crawled_data)
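    # Crawl a directory in which one property value was overwritten; this should
    # result in exactly one update and no new insertions: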
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory("example_overwrite_1")
    assert len(crawled_data) == 3
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 0
    assert len(ups) == 1


def test_identifiable_update(clear_database, usemodel, ident, crawler):
    ins, ups = crawler[0].synchronize(crawled_data=crawler[1])

    # Do a second run on the same data with a change in one
    # of the identifiables:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory()

    # Test the addition of a single property:
    l = crawled_data
    for record in l:
        if (record.parents[0].name == "Measurement" and
                record.get_property("date").value == "2020-01-03"):
            # maybe a bit weird, but add an email address to a measurement
            record.add_property(
                name="email", value="testperson@testaccount.test")
            print("one change")
            break
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 0
    assert len(ups) == 1

    # Test the change within one property:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory()
    l = crawled_data
    for record in l:
        if (record.parents[0].name == "Measurement" and
                record.get_property("date").value == "2020-01-03"):
            record.add_property(name="email", value="testperson@coolmail.test")
            print("one change")
            break
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 0
    assert len(ups) == 1

    # Changing the date should result in a new insertion:
    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory()
    l = crawled_data
    for record in l:
        if (record.parents[0].name == "Measurement" and
                record.get_property("date").value == "2020-01-03"):
            record.add_property(name="email", value="testperson@coolmail.test")
            record.get_property("date").value = "2012-01-02"
            print("one change")
            break
    ins, ups = cr.synchronize(crawled_data=crawled_data)
    assert len(ins) == 1
    assert len(ups) == 0


def test_file_insertion_dry(clear_database, usemodel, ident):
    crawler_extended = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(
        cfood="scifolder_extended.yml")
    file_list = [r for r in crawled_data if r.role == "File"]
    assert len(file_list) == 11

    for f in file_list:
        assert f.path.endswith("README.md")
        assert f.path[1:] == f.file

    ins, ups = crawler_extended.synchronize(crawled_data=crawled_data,
                                            commit_changes=False)
    assert len(ups) == 0
    file_list_ins = [r for r in ins if r.role == "File"]
    assert len(file_list_ins) == 11


def test_file_insertion(clear_database, usemodel, ident, crawler_extended):
    ins, ups = crawler_extended[0].synchronize(
        crawled_data=crawler_extended[1], commit_changes=True)
    file_list_ins = [r for r in ins if r.role == "File"]
    assert len(file_list_ins) == 11

    assert db.execute_query("COUNT File") > 0

    # "FIND Record which references File" does not seem to be possible,
    # so retrieve the ids of the files first:
    files = db.execute_query("FIND File")
    for f in files:
        r = db.execute_query("FIND Record which references {}".format(f.id))
        assert len(r) == 1
        assert r[0].get_property("ReadmeFile").value == f.id


def test_file_update(clear_database, usemodel, ident, crawler_extended):
    ins1, ups1 = crawler_extended[0].synchronize(
        crawled_data=crawler_extended[1], commit_changes=True)
    file_list_ins = [r for r in ins1 if r.role == "File"]

    cr = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cfood="scifolder_extended.yml")

    # correct paths for current working directory
    file_list = [r for r in crawled_data if r.role == "File"]
    for f in file_list:
        f.file = UNITTESTDIR / "test_directories" / f.file
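    # Synchronizing the same, unchanged files a second time must not trigger any updates: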
    ins2, ups2 = cr.synchronize(crawled_data=crawled_data, commit_changes=True)
    assert len(ups1) == 0
    assert len(ups2) == 0

    # Try adding a parent:
    res = db.execute_query("Find File")
    assert len(res) == 11
    assert len(res[0].parents) == 0

    cr2 = Crawler(identifiableAdapter=ident)
    crawled_data = crawl_standard_test_directory(cfood="scifolder_extended2.yml")

    file_list = [r for r in crawled_data if r.role == "File"]
    for f in file_list:
        f.file = UNITTESTDIR / "test_directories" / f.file
    ins3, ups3 = cr2.synchronize(crawled_data=crawled_data, commit_changes=True)
    assert len(ups3) == 11

    res = db.execute_query("Find File")
    assert len(res) == 11
    assert res[0].parents[0].name == "ProjectMarkdownReadme"

    # TODO: Implement file update checks (based on checksum)
    # Add test with actual file update:
    # assert len(ins2) == 0
    # assert len(ups2) == len(file_list_ins)