#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2022 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2022 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
An integration test module that runs a test against a (close to) real-world
example.
"""
import json
import logging
import os
import sys

import caosdb as db
import pytest
from caosadvancedtools.loadFiles import loadpath
from caosadvancedtools.models.parser import (parse_model_from_json_schema,
                                             parse_model_from_yaml)
from caosdb.utils.register_tests import clear_database, set_test_key
from caoscrawler.crawl import Crawler, crawler_main
from caoscrawler.identifiable_adapters import CaosDBIdentifiableAdapter
from caoscrawler.scanner import (create_converter_registry, load_definition,
                                 scan_structure_elements)
from caoscrawler.structure_elements import Directory

set_test_key("10b128cf8a1372f30aa3697466bb55e76974e0c16a599bb44ace88f19c8f61e2")


def rfp(*pathcomponents):
    """Return the full path. Shorthand convenience function."""
    return os.path.join(os.path.dirname(__file__), *pathcomponents)


DATADIR = rfp("test_data", "extroot", "realworld_example")


@pytest.fixture
def addfiles():
    loadpath(path='/opt/caosdb/mnt/extroot/',
             include=None,
             exclude=None,
             prefix="",
             dryrun=False,
             forceAllowSymlinks=True,
             )


@pytest.fixture
def usemodel():
    # First load the dataspace data model
    dataspace_definitions = parse_model_from_json_schema(
        os.path.join(DATADIR, "schema", "dataspace.schema.json"))
    dataspace_definitions.sync_data_model(noquestion=True)

    # Then the general dataset definitions
    dataset_definitions = parse_model_from_json_schema(
        os.path.join(DATADIR, "schema", "dataset.schema.json"))
    dataset_definitions.sync_data_model(noquestion=True)

    # Finally, add the inheritances as defined in the yaml file
    dataset_inherits = parse_model_from_yaml(
        os.path.join(DATADIR, "schema", "dataset-inheritance.yml"))
    dataset_inherits.sync_data_model(noquestion=True)


@pytest.fixture
def clear_database():
    # TODO(fspreck): Remove once the corresponding advancedtools function can
    # be used.
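    # Delete everything the tests may have created; presumably only entities
    # with IDs above 99 are removed because lower IDs are reserved for the
    # server's built-in entities.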
    ents = db.execute_query("FIND ENTITY WITH ID>99")
    if ents:
        ents.delete()


def create_identifiable_adapter():
    ident = CaosDBIdentifiableAdapter()
    ident.load_from_yaml_definition(os.path.join(DATADIR, "identifiables.yml"))
    return ident


def test_dataset(clear_database, usemodel, addfiles, caplog):
    caplog.set_level(logging.DEBUG, logger="caoscrawler")
    identifiable_path = os.path.join(DATADIR, "identifiables.yml")
    crawler_definition_path = os.path.join(DATADIR, "dataset_cfoods.yml")

    crawler_main(
        crawled_directory_path=os.path.join(DATADIR, 'data'),
        cfood_file_name=crawler_definition_path,
        identifiables_definition_file=identifiable_path,
        provenance_file=os.path.join(DATADIR, "provenance.yml"),
        dry_run=False,
        remove_prefix=DATADIR,
        # this test will fail without this prefix since the crawler would try
        # to create new files
        add_prefix="/extroot/realworld_example"
    )

    dataspace = db.execute_query(
        "FIND RECORD Dataspace WITH name=35 AND dataspace_id=20002 AND "
        "archived=FALSE AND url='https://datacloud.de/index.php/f/7679'"
        " AND Person", unique=True)
    assert dataspace.get_property("start_date").value == "2022-03-01"
    db.execute_query("FIND RECORD Person with full_name='Max Schmitt' AND"
                     " given_name='Max'", unique=True)

    dataset = db.execute_query(
        f"FIND RECORD Dataset with Dataspace={dataspace.id} AND title="
        "'Random numbers created on a random autumn day in a random person\\'s office'",
        unique=True)
    assert db.execute_query(
        f"COUNT RECORD with id={dataset.id} AND WHICH REFERENCES Person WITH full_name="
        "'Alexa Nozone' AND WHICH REFERENCES Person WITH full_name='Max Schmitt'") == 1
    assert db.execute_query(
        f"COUNT RECORD with id={dataset.id} AND WHICH REFERENCES Event WITH "
        "start_datetime='2022-02-10T16:36:48+01:00'") == 1
    assert db.execute_query("FIND Event WITH latitude=53", unique=True)

    # test logging
    assert "Executed inserts" in caplog.text
    assert "Going to insert" in caplog.text
    assert "Executed updates" in caplog.text


def test_event_update(clear_database, usemodel, addfiles):

    identifiable_path = os.path.join(DATADIR, "identifiables.yml")
    crawler_definition_path = os.path.join(DATADIR, "dataset_cfoods.yml")

    crawler_main(
        crawled_directory_path=os.path.join(DATADIR, 'data'),
        cfood_file_name=crawler_definition_path,
        identifiables_definition_file=identifiable_path,
        provenance_file=os.path.join(DATADIR, "provenance.yml"),
        dry_run=False,
        remove_prefix=DATADIR,
        # this test will fail without this prefix since the crawler would try
        # to create new files
        add_prefix="/extroot/realworld_example"
    )

    old_dataset_rec = db.execute_query(
        "FIND RECORD Dataset WHICH HAS AN EVENT WITH location='Bremen, Germany'")
    assert len(old_dataset_rec) == 1
    old_dataset_rec = old_dataset_rec[0]
    assert old_dataset_rec.get_property("Event").datatype == db.LIST("Event")
    assert len(old_dataset_rec.get_property("Event").value) == 1
    old_event_rec = db.Record(
        id=old_dataset_rec.get_property("Event").value[0]).retrieve()

    # TODO(fspreck): crawl again manually, edit the event records in the update
    # list, synchronize, and test whether the events have been updated.
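    # Second crawler run: scan the same directory again, but change the Event
    # coordinates and location in the scanned records before synchronizing, so
    # that the Dataset should end up referencing a new Event record while its
    # other properties stay unchanged (checked by the assertions below).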
    ident = CaosDBIdentifiableAdapter()
    ident.load_from_yaml_definition(identifiable_path)
    second_crawler = Crawler(identifiableAdapter=ident)
    second_crawler.generate_run_id()
    crawler_definition = load_definition(crawler_definition_path)
    converter_registry = create_converter_registry(crawler_definition)
    records = scan_structure_elements(
        Directory("data", os.path.join(DATADIR, "data")),
        crawler_definition,
        converter_registry
    )

    for rec in records:
        if rec.parents[0].name == "Event":
            rec.get_property("longitude").value = 0.0
            rec.get_property("latitude").value = 0.0
            rec.get_property("location").value = "Origin"
        elif rec.parents[0].name == "Dataset":
            rec.get_property("Event").value[0].get_property(
                "longitude").value = 0.0
            rec.get_property("Event").value[0].get_property(
                "latitude").value = 0.0
            rec.get_property("Event").value[0].get_property(
                "location").value = "Origin"
    second_crawler.synchronize(crawled_data=records)

    # Dataset is still the same Record, but with an updated event
    new_dataset_rec = db.Record(id=old_dataset_rec.id).retrieve()
    for prop in old_dataset_rec.get_properties():
        if prop.name != "Event":
            assert new_dataset_rec.get_property(
                prop.name).datatype == prop.datatype
            assert new_dataset_rec.get_property(
                prop.name).value == prop.value
    assert new_dataset_rec.get_property("Event").datatype == db.LIST("Event")
    assert new_dataset_rec.get_property("Event").value is not None
    assert len(new_dataset_rec.get_property("Event").value) == 1
    assert new_dataset_rec.get_property("Event").value[0] != old_event_rec.id

    # The event has new properties
    new_event_rec = db.Record(
        id=new_dataset_rec.get_property("Event").value[0]).retrieve()
    assert new_event_rec.get_property("longitude").value == 0.0
    assert new_event_rec.get_property("latitude").value == 0.0
    assert new_event_rec.get_property("location").value == "Origin"
    assert new_event_rec.get_property(
        "start_datetime").value == old_event_rec.get_property("start_datetime").value