#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2022 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2022 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2022 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
An integration test module that runs a test against a (close to) real-world example.
"""
import json
import logging
import os
import sys

import caosdb as db
import pytest
from caosadvancedtools.loadFiles import loadpath
from caosadvancedtools.models.parser import (
    parse_model_from_json_schema, parse_model_from_yaml)
from caoscrawler.crawl import Crawler, crawler_main
from caoscrawler.identifiable_adapters import CaosDBIdentifiableAdapter
from caoscrawler.scanner import (
    create_converter_registry, load_definition, scan_structure_elements)
from caoscrawler.structure_elements import Directory
from caosdb.utils.register_tests import clear_database, set_test_key
set_test_key("10b128cf8a1372f30aa3697466bb55e76974e0c16a599bb44ace88f19c8f61e2")


def rfp(*pathcomponents):
    """
    Return full path.
    Shorthand convenience function.
    """
    return os.path.join(os.path.dirname(__file__), *pathcomponents)


DATADIR = rfp("test_data", "extroot", "realworld_example")


@pytest.fixture
def addfiles():
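    """Load the test files from the extroot mount point into the database."""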
    loadpath(path='/opt/caosdb/mnt/extroot/',
             include=None,
             exclude=None,
             prefix="",
             dryrun=False,
             forceAllowSymlinks=True,
             )


@pytest.fixture
def usemodel():
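    """Create the data model from the JSON schema and YAML definitions in the test data."""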
    # First load dataspace data model
    dataspace_definitions = parse_model_from_json_schema(
        os.path.join(DATADIR, "schema", "dataspace.schema.json"))
    dataspace_definitions.sync_data_model(noquestion=True)

    # Then general dataset definitions
    dataset_definitions = parse_model_from_json_schema(
        os.path.join(DATADIR, "schema", "dataset.schema.json"))
    dataset_definitions.sync_data_model(noquestion=True)

    # Finally, add inheritances as defined in yaml
    dataset_inherits = parse_model_from_yaml(
        os.path.join(DATADIR, "schema", "dataset-inheritance.yml"))
    dataset_inherits.sync_data_model(noquestion=True)


@pytest.fixture
def clear_database():
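    """Remove all entities created by previous tests (IDs above 99)."""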
    # TODO(fspreck): Remove once the corresponding advancedtools function can
    # be used.
    ents = db.execute_query("FIND ENTITY WITH ID>99")
    if ents:
        ents.delete()

def create_identifiable_adapter():
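    """Return an identifiable adapter loaded from the identifiables definition in DATADIR."""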
    ident = CaosDBIdentifiableAdapter()
    ident.load_from_yaml_definition(os.path.join(DATADIR, "identifiables.yml"))
    return ident

def test_dataset(clear_database, usemodel, addfiles, caplog):
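    """Crawl the example directory and check that the expected Dataspace, Person, Dataset and
    Event records can be found afterwards."""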
    caplog.set_level(logging.DEBUG, logger="caoscrawler")
    identifiable_path = os.path.join(DATADIR, "identifiables.yml")
    crawler_definition_path = os.path.join(DATADIR, "dataset_cfoods.yml")

    crawler_main(
        crawled_directory_path=os.path.join(DATADIR, 'data'),
        cfood_file_name=crawler_definition_path,
        identifiables_definition_file=identifiable_path,
        provenance_file=os.path.join(DATADIR, "provenance.yml"),
        dry_run=False,
        remove_prefix=DATADIR,
        # this test will fail without this prefix since the crawler would try to create new files
        add_prefix="/extroot/realworld_example"
    )
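
    # Check that the expected records have been created.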
    dataspace = db.execute_query(
        "FIND RECORD Dataspace WITH name=35 AND dataspace_id=20002 AND "
        "archived=FALSE AND url='https://datacloud.de/index.php/f/7679' AND Person",
        unique=True)
    assert dataspace.get_property("start_date").value == "2022-03-01"
    db.execute_query(
        "FIND RECORD Person with full_name='Max Schmitt' AND given_name='Max'",
        unique=True)

    dataset = db.execute_query(
        f"FIND RECORD Dataset with Dataspace={dataspace.id} AND title="
        "'Random numbers created on a random autumn day in a random person\\'s office'",
        unique=True)
    assert db.execute_query(
        f"COUNT RECORD with id={dataset.id} AND WHICH REFERENCES Person WITH full_name="
        "'Alexa Nozone' AND WHICH REFERENCES Person WITH full_name='Max Schmitt'") == 1
    assert db.execute_query(
        f"COUNT RECORD with id={dataset.id} AND WHICH REFERENCES Event WITH "
        "start_datetime='2022-02-10T16:36:48+01:00'") == 1
    assert db.execute_query("FIND Event WITH latitude=53", unique=True)

    # test logging
    assert "Executed inserts" in caplog.text
    assert "Going to insert" in caplog.text
    assert "Executed updates" in caplog.text

def test_event_update(clear_database, usemodel, addfiles):
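    """Crawl the example directory twice; before the second synchronization, move all Events to a
    new location and check that the Dataset record is kept but now references the changed Event."""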
    identifiable_path = os.path.join(DATADIR, "identifiables.yml")
    crawler_definition_path = os.path.join(DATADIR, "dataset_cfoods.yml")

    crawler_main(
        crawled_directory_path=os.path.join(DATADIR, 'data'),
        cfood_file_name=crawler_definition_path,
        identifiables_definition_file=identifiable_path,
        provenance_file=os.path.join(DATADIR, "provenance.yml"),
        dry_run=False,
        remove_prefix=DATADIR,
        # this test will fail without this prefix since the crawler would try to create new files
        add_prefix="/extroot/realworld_example"
    )
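
    # Retrieve the Dataset and Event records created by the first crawler run.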
    old_dataset_rec = db.execute_query(
        "FIND RECORD Dataset WHICH HAS AN EVENT WITH location='Bremen, Germany'")
    assert len(old_dataset_rec) == 1
    old_dataset_rec = old_dataset_rec[0]
    assert old_dataset_rec.get_property("Event").datatype == db.LIST("Event")
    assert len(old_dataset_rec.get_property("Event").value) == 1
    old_event_rec = db.Record(
        id=old_dataset_rec.get_property("Event").value[0]).retrieve()

    # Crawl again manually, edit the Event records in the update list, synchronize, and check
    # whether the Events have been updated.
    ident = CaosDBIdentifiableAdapter()
    ident.load_from_yaml_definition(identifiable_path)
    second_crawler = Crawler(identifiableAdapter=ident)
    second_crawler.generate_run_id()

    crawler_definition = load_definition(crawler_definition_path)
    converter_registry = create_converter_registry(crawler_definition)
    records = scan_structure_elements(
        Directory("data", os.path.join(DATADIR, "data")),
        crawler_definition,
        converter_registry
    )
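
    # Move all Events (and the Events referenced by the Datasets) to location "Origin" so that
    # the following synchronization has to change them.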
    for rec in records:
        if rec.parents[0].name == "Event":
            rec.get_property("longitude").value = 0.0
            rec.get_property("latitude").value = 0.0
            rec.get_property("location").value = "Origin"
        elif rec.parents[0].name == "Dataset":
            rec.get_property("Event").value[0].get_property("longitude").value = 0.0
            rec.get_property("Event").value[0].get_property("latitude").value = 0.0
            rec.get_property("Event").value[0].get_property("location").value = "Origin"
    second_crawler.synchronize(crawled_data=records)

    # Dataset is still the same Record, but with an updated event
    new_dataset_rec = db.Record(id=old_dataset_rec.id).retrieve()
    for prop in old_dataset_rec.get_properties():
        if prop.name != "Event":
            assert new_dataset_rec.get_property(prop.name).datatype == prop.datatype
            assert new_dataset_rec.get_property(prop.name).value == prop.value
    assert new_dataset_rec.get_property("Event").datatype == db.LIST("Event")
    assert new_dataset_rec.get_property("Event").value is not None
    assert len(new_dataset_rec.get_property("Event").value) == 1
    assert new_dataset_rec.get_property("Event").value[0] != old_event_rec.id

    # The event has new properties
    new_event_rec = db.Record(
        id=new_dataset_rec.get_property("Event").value[0]).retrieve()
    assert new_event_rec.get_property("longitude").value == 0.0
    assert new_event_rec.get_property("latitude").value == 0.0
    assert new_event_rec.get_property("location").value == "Origin"
    assert new_event_rec.get_property(
        "start_datetime").value == old_event_rec.get_property("start_datetime").value