Commit 4916e4e9 authored by Alexander Schlemmer

Merge branch 'f-qualitycheck-trigger' into 'main'

F qualitycheck trigger

See merge request caosdb/customers/f-fit/ruqad!3
parents f3dd9f46 f7f85b58
Pipeline #58490 failed
Showing 766 additions and 16 deletions
+# -*- mode:conf; -*-
+# auto saves, caches
 *~
-.coverage
 __pycache__
+.coverage
 .tox
-*.egg-info
+# development artifacts
 venv/
-build
 /.env/
+# configurations
+qualitycheck_config.toml
+# build artifacts
+*.egg-info
+build
 /src/doc/_apidoc/
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added ###
+- LinkAhead crawler for the metadata check.
+- Triggers the quality checker.
 ### Changed ###
 ### Deprecated ###
...
File added
File added
# Copyright (C) 2024 IndiScale GmbH <info@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""
tests the crawling of ELN files
"""
import os
import zipfile
from datetime import datetime
from pathlib import Path
from tempfile import NamedTemporaryFile
from time import sleep
from uuid import uuid1
from ruqad.crawler import trigger_crawler
DATADIR = Path(__file__).parent / "data" / "crawler_data"
def test_crawl():
"""
crawl a directory as it would be created by export from kadi and running a data quality check
"""
print(os.listdir(DATADIR))
trigger_crawler(os.fspath(DATADIR))
klsdjf
Dataset:
  obligatory_properties:
    Author:
    Repository:
    dateModified:
      datatype: DATETIME
    dateCreated:
      datatype: DATETIME
  recommended_properties:
    MetaData:
      datatype: LIST<MetaData>
    notes:
      datatype: TEXT
    rating:
      datatype: INTEGER
    voltage:
      datatype: DOUBLE
      unit: V
MetaData:
  obligatory_properties:
    v:
      datatype: TEXT
Author:
  obligatory_properties:
    url:
      datatype: TEXT
  recommended_properties:
    nr:
      datatype: INTEGER
Repository:
  obligatory_properties:
    url:
ELNFile:
  recommended_properties:
    QualityReportFile:
QualityReportFile:
  recommended_properties:
    ELNFile:
#!/bin/bash
# Insert the data model used by the crawler
# A. Schlemmer, 02/2023
python3 -m caosadvancedtools.models.parser --noquestion --sync datamodel.yaml
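The same insertion can also be done from Python; a sketch, assuming the model parser API from `caosadvancedtools`:

```python
# Sketch: insert/update the data model without the command line wrapper.
from caosadvancedtools.models.parser import parse_model_from_yaml

model = parse_model_from_yaml("datamodel.yaml")
model.sync_data_model(noquestion=True)
```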
[Connection]
ssl_insecure=true
url=https://localhost:10443/
username=admin
password_method=input
timeout=10000
debug=0
[Container]
debug=0
File added
# Playground for S3 services #
## Start local service ##
```sh
# Start S3 server, local storage in /data inside the container, no SSL.
docker run -p 9000:9000 -p 9001:9001 quay.io/minio/minio server /data --console-address ":9001"
```
The **default credentials** for the web interface are `minioadmin:minioadmin`, but they can be
changed via the environment variables `MINIO_ROOT_USER` and `MINIO_ROOT_PASSWORD`.
## Create credentials ##
- Open http://localhost:9001/access-keys/new-account
- Set a custom access key if desired, then click "Create".
- Save the credentials into `qualitycheck_secrets.toml` as `s3_access_key` and `s3_secret_key` (see the sketch below).
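For later use from Python, these credentials can be loaded with the `toml` package; a minimal sketch, assuming the file layout described above:

```python
import toml

# Read the credentials saved in the previous step.
secrets = toml.load("qualitycheck_secrets.toml")
access_key = secrets["s3_access_key"]
secret_key = secrets["s3_secret_key"]
```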
## Create bucket ##
- Open http://localhost:9001/buckets/add-bucket
- Create a bucket with a custom name (or create it programmatically, as sketched below).
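Alternatively, the bucket can be created via the S3 API; a sketch using boto3, with the example credentials from above:

```python
import boto3

# Connect with the access key created above; no SSL for local testing.
session = boto3.Session(aws_access_key_id="testaccount", aws_secret_access_key="SECRET")
client = session.client("s3", endpoint_url="http://localhost:9000")
client.create_bucket(Bucket="testbucket")
```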
## Access with boto3 ##
```python
import boto3
# Verify connection
session = boto3.Session(aws_access_key_id="testaccount", aws_secret_access_key="SECRET")
client = session.client("s3", endpoint_url="http://localhost:9000") # no SSL for testing!
client.list_buckets()
# Add content to bucket
client.upload_file(file_name, "testbucket", "some_name")
client.list_objects_v2(Bucket="testbucket")["Contents"] # list contents
# Download object to filesystem
client.download_file("testbucket", "some_name", "./output_file")
# Delete objects in bucket
client.delete_object(Bucket="testbucket", Key="some_name")  # use the key from the upload
```
@@ -24,8 +24,10 @@ classifiers = [
 requires-python = ">= 3.8"
 dependencies = [
     "linkahead",
-    "kadi-apy"
+    "caoscrawler[rocrate]",
+    "kadi-apy",
+    "boto3>=1.35",
+    "toml>=0.10",
 ]

 [project.urls]
@@ -41,3 +43,6 @@ dev = [
 test = [
     "pytest",
 ]
+
+[tool.setuptools.package-data]
+ruqad = ["src/ruqad/resources/crawler-settings"]
s3_endpoint = "http://localhost:9000"
s3_bucket = "ruqad"
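A quick sanity check that this file parses and contains the required keys (mirroring the validation in `read_config()` in `qualitycheck.py`; a sketch):

```python
import toml

# Load and validate the quality check configuration.
config = toml.load("./qualitycheck_config.toml")
assert isinstance(config.get("s3_endpoint"), str)
assert isinstance(config.get("s3_bucket"), str)
```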
# Insert your secrets here, save as `secrets.sh`, then source the file with
# . secrets.sh
# Do not add this file to your version control!
## Kadi
# Host and token to retrieve data
export KADIHOST="https://demo-kadi4mat.iam.kit.edu"
export KADITOKEN="pat_KADI123456789"
## S3
# Key ID and secret to access the S3 bucket defined in `qualitycheck_config.toml`.
export S3_ACCESS_KEY_ID="456S3S3S3654"
export S3_SECRET_ACCESS_KEY="123S3S3S3987"
## Gitlab
# Tokens to trigger a pipeline run and to get pipeline status and result via the API.
export GITLAB_PIPELINE_TOKEN="glptt-123456789"
export GITLAB_API_TOKEN="glpat-987654321"
#!/usr/bin/env python3
# Small module wrapping the crawler run call
# A. Schlemmer, 11/2024

import os
from importlib import resources
from os import walk
from os.path import join

import linkahead as db
from caoscrawler.crawl import crawler_main

ruqad_crawler_settings = resources.files('ruqad').joinpath('resources/crawler-settings')


def trigger_crawler(target_dir: str):
    """
    Trigger a standard crawler run equivalent to the command line:

    ```
    caosdb-crawler -i crawler/identifiables.yaml -s update crawler/cfood.yaml <target_dir>
    ```
    """
    # Insert all .zip and .eln files, if they do not yet exist.
    for fp, ds, fs in walk(target_dir):
        for fn in fs:
            if fn.endswith(".eln") or fn.endswith(".zip"):
                file_path = join(target_dir, fp, fn)
                file_entity = join(fp[len(target_dir):], fn)
                file_ent = db.File(file=file_path,
                                   path=file_entity)
                print(f"retrieve {join(fp, fn)}")
                file_ent.retrieve()
                if file_ent.id is None:
                    print(f"insert {join(fp, fn)}")
                    file_ent.insert()
                else:
                    print(f"update {join(fp, fn)}")
                    file_ent.update()

    print("crawl", target_dir)
    crawler_main(crawled_directory_path=target_dir,
                 cfood_file_name=ruqad_crawler_settings.joinpath('cfood.yaml'),
                 identifiables_definition_file=ruqad_crawler_settings.joinpath(
                     'identifiables.yaml'),
                 remove_prefix="/" + os.path.basename(target_dir))
#!/usr/bin/env python3
# This file is a part of the RuQaD project.
#
# Copyright (C) 2024 IndiScale GmbH <www.indiscale.com>
# Copyright (C) 2024 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Daemon like script which monitors the Kadi4Mat server for new items.
""" """
monitor the kadi instance
""" import traceback
import shutil
import os import os
import argparse
import sys
from time import sleep from time import sleep
from tempfile import NamedTemporaryFile from tempfile import TemporaryDirectory
import traceback
from datetime import datetime, timezone from datetime import datetime, timezone
from kadi_apy import KadiManager
from .kadi import collect_records_created_after, download_eln_for sys.path.append(os.path.dirname(__file__))
from .qualitycheck import QualityChecker # NOQA
from .kadi import collect_records_created_after, download_eln_for # NOQA
from .crawler import trigger_crawler # NOQA
from kadi_apy import KadiManager # NOQA
KADIARGS = { KADIARGS = {
...@@ -23,6 +51,7 @@ if __name__ == "__main__": ...@@ -23,6 +51,7 @@ if __name__ == "__main__":
try: try:
timestamp = datetime.now(timezone.utc) timestamp = datetime.now(timezone.utc)
with KadiManager(**KADIARGS) as manager: with KadiManager(**KADIARGS) as manager:
qc = QualityChecker()
print(f"Checking for records created after {cut_off_date}...") print(f"Checking for records created after {cut_off_date}...")
rec_ids = collect_records_created_after(manager, cut_off_date) rec_ids = collect_records_created_after(manager, cut_off_date)
cut_off_date = timestamp cut_off_date = timestamp
...@@ -33,11 +62,21 @@ if __name__ == "__main__": ...@@ -33,11 +62,21 @@ if __name__ == "__main__":
if len(rec_ids) == 0: if len(rec_ids) == 0:
print("no new recs") print("no new recs")
for rid in rec_ids: for rid in rec_ids:
temp = NamedTemporaryFile(delete=False) with TemporaryDirectory() as cdir:
temp.close() eln_file = os.path.join(cdir, "export.eln")
download_eln_for(manager, rid, path=temp.name) download_eln_for(manager, rid, path=eln_file)
print(temp.name) print(f"Downlaoded {eln_file}")
sleep(5) qc.check(filename=eln_file, target_dir=cdir)
print(f"Quality check done. {os.listdir(cdir)}")
# trigger crawler on dir
remote_dir_path = os.path.join(cdir, "ruqad", str(rid))
os.makedirs(remote_dir_path)
shutil.move(os.path.join(cdir, "artifacts.zip"),
os.path.join(remote_dir_path, "report.zip"))
shutil.move(os.path.join(cdir, "export.eln"),
os.path.join(remote_dir_path, "export.eln"))
trigger_crawler(target_dir=cdir)
sleep(60)
except KeyboardInterrupt as e: except KeyboardInterrupt as e:
raise e raise e
......
#!/usr/bin/env python3
# This file is a part of the RuQaD project.
#
# Copyright (C) 2024 IndiScale GmbH <www.indiscale.com>
# Copyright (C) 2024 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
"""
# TODO Use Gitlab Python module instead of curl. Or at least the requests module.
import argparse
import json
import os
import time
from pathlib import Path
from subprocess import run
from tempfile import TemporaryDirectory
from typing import Optional
from zipfile import ZipFile
import boto3
import toml
def read_config() -> dict:
    """Read config from ``./qualitycheck_config.toml``.

    This config file must define the following:

    - ``s3_endpoint``: S3 endpoint to connect to.
    - ``s3_bucket``: Bucket in the S3 service.

    Returns
    -------
    out: dict
      The config.
    """
    config = toml.load("./qualitycheck_config.toml")
    assert "s3_endpoint" in config
    assert isinstance(config["s3_endpoint"], str)
    assert "s3_bucket" in config
    assert isinstance(config["s3_bucket"], str)

    return config
class QualityChecker:

    class CheckFailed(RuntimeError):
        pass

    def __init__(self):
        """The QualityChecker can do quality checks for content.
        """
        self._config = read_config()

        secret_vars = [
            "S3_ACCESS_KEY_ID",
            "S3_SECRET_ACCESS_KEY",
            "GITLAB_PIPELINE_TOKEN",
            "GITLAB_API_TOKEN",
        ]
        missing = False
        for varname in secret_vars:
            try:
                self._config[varname.lower()] = os.environ[varname]
            except KeyError:
                print(f"This environment variable is missing: {varname}")
                missing = True
        if missing:
            raise RuntimeError("Missing environment variables.")

        self._bucketname = self._config["s3_bucket"]
        session = boto3.Session(aws_access_key_id=self._config["s3_access_key_id"],
                                aws_secret_access_key=self._config["s3_secret_access_key"])
        self._s3_client = session.client("s3", endpoint_url=self._config["s3_endpoint"])
    def check(self, filename: str, target_dir: str = ".") -> bool:
        """Check for data quality.

        Parameters
        ----------
        filename : str
          The file to be checked.
        target_dir : str, default="."
          Download to this directory.

        Returns
        -------
        out : bool
          True if the checks passed, false otherwise.
        """
        # Prepare check
        self._upload(filename)

        # Actual check
        check_ok = True
        try:
            pipeline_id = self._trigger_check()
            job_id = self._wait_for_check(pipeline_id=pipeline_id)
            self._download_result(job_id=job_id, target_dir=target_dir)
        except self.CheckFailed as cfe:
            print(f"Check failed: {cfe}")
            check_ok = False

        # Cleanup
        self._cleanup()

        return check_ok
    def _cleanup(self):
        """Clean up the S3 bucket.

        This deletes all the objects in the bucket.
        """
        objects = self._s3_client.list_objects_v2(Bucket=self._bucketname).get("Contents")
        if objects is None:
            # Nothing to delete.
            return
        for obj in objects:
            self._s3_client.delete_object(Bucket=self._bucketname, Key=obj["Key"])
    def _extract_content(self, filename: str, upload: bool = False):
        """Extract content from the archive.  May also upload to S3.

        Parameters
        ----------
        filename : str
        upload : bool, default=False
        """
        with TemporaryDirectory() as tmp:
            if not tmp.endswith(os.path.sep):
                tmp = tmp + os.path.sep
            zipf = ZipFile(filename)
            zipf.extractall(path=tmp)  # TODO Zip bomb detection and prevention.
            for name in zipf.namelist():
                if name.endswith(".json"):
                    continue
                if upload:
                    self._upload(os.path.join(tmp, name), remove_prefix=tmp)
    def _upload(self, filename: str, remove_prefix: Optional[str] = None):
        """Upload the file to the S3 bucket.

        Compressed files (with suffix .zip or .eln) will be extracted first.

        Parameters
        ----------
        filename : str
          The file to be checked.
        remove_prefix : Optional[str]
          If given, remove this prefix from the filename when storing into the bucket.
        """
        # Check file type first.
        if Path(filename).suffix in [".eln", ".zip"]:
            self._extract_content(filename, upload=True)
            return

        target_filename = filename
        if remove_prefix:
            if not filename.startswith(remove_prefix):
                raise ValueError(f"{filename} was expected to start with {remove_prefix}")
            target_filename = filename[len(remove_prefix):]
        self._s3_client.upload_file(filename, self._bucketname,
                                    os.path.join("data", target_filename))
    def _trigger_check(self) -> str:
        """Trigger a new pipeline to start quality checks.

        Returns
        -------
        out: str
          The ID of the started pipeline.
        """
        cmd = ["curl",
               "-X", "POST",
               "--fail",
               "-F", f"token={self._config['gitlab_pipeline_token']}",
               "-F", "ref=ruqad",
               "https://gitlab.indiscale.com/api/v4/projects/268/trigger/pipeline"
               ]
        cmd_result = run(cmd, check=False, capture_output=True)
        result = json.loads(cmd_result.stdout)
        return str(result["id"])
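    # NOTE: As a sketch for the module-level TODO (avoid shelling out to curl),
    # the same trigger could be written with the `requests` package, e.g.:
    #
    #     import requests
    #     resp = requests.post(
    #         "https://gitlab.indiscale.com/api/v4/projects/268/trigger/pipeline",
    #         data={"token": self._config["gitlab_pipeline_token"], "ref": "ruqad"},
    #         timeout=30)
    #     resp.raise_for_status()
    #     return str(resp.json()["id"])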
    def _wait_for_check(self, pipeline_id: str) -> str:
        """Wait for the pipeline to finish successfully.

        Parameters
        ----------
        pipeline_id : str
          The pipeline ID to watch.

        Returns
        -------
        out : str
          The ID of the "report" job.  FIXME: or "pages"?
        """
        # Wait for pipeline to finish.
        cmd = [
            "curl",
            "--header", f"PRIVATE-TOKEN: {self._config['gitlab_api_token']}",
            f"https://gitlab.indiscale.com/api/v4/projects/268/pipelines/{pipeline_id}"
        ]
        while True:
            cmd_result = run(cmd, check=True, capture_output=True)
            result = json.loads(cmd_result.stdout)
            if result["finished_at"] is not None:
                break
            time.sleep(1)
        if not result["status"] == "success":
            print("Pipeline terminated unsuccessfully.")
            raise self.CheckFailed(result)

        # Get the jobs.  We expect that these jobs are run by the pipeline:
        # - evaluate: run the quality check
        # - report: build the report
        # - pages: publish the report (not relevant for us)
        cmd = [
            "curl",
            "--header", f"PRIVATE-TOKEN: {self._config['gitlab_api_token']}",
            f"https://gitlab.indiscale.com/api/v4/projects/268/pipelines/{pipeline_id}/jobs"
        ]
        cmd_result = run(cmd, check=False, capture_output=True)
        result = json.loads(cmd_result.stdout)
        evaluate_job = [job for job in result if job["name"] == "evaluate"][0]
        if not evaluate_job["status"] == "success":
            raise self.CheckFailed()
        report_job = [job for job in result if job["name"] == "report"][0]
        return report_job["id"]
    def _download_result(self, job_id: str, target_dir: str = "."):
        """Download the artifacts from the pipeline.

        Parameters
        ----------
        job_id : str
          The ID of the job with the artifacts.
        target_dir : str, default="."
          Download to this directory.
        """
        target = os.path.join(target_dir, "artifacts.zip")
        cmd = [
            "curl",
            "--location", "--fail",
            "--output", target,
            "--header", f"PRIVATE-TOKEN: {self._config['gitlab_api_token']}",
            f"https://gitlab.indiscale.com/api/v4/projects/268/jobs/{job_id}/artifacts"
        ]
        cmd_result = run(cmd, check=False, capture_output=True)
        assert cmd_result.returncode == 0
        print(f"Downloaded archive to: {target}")
def _parse_arguments():
    """Parse the arguments."""
    parser = argparse.ArgumentParser(description='Trigger quality checks for the given content')
    parser.add_argument('-f', '--file', required=True,
                        help="Check the quality for this file.",
                        )
    # FIXME needs both file and schema.
    return parser.parse_args()


def main():
    """The main function of this script."""
    args = _parse_arguments()
    qc = QualityChecker()
    qc.check(filename=args.file)


if __name__ == "__main__":
    main()
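For reference, a minimal usage sketch of this module (assumes `qualitycheck_config.toml` and the secret environment variables described above are present; the file name is a placeholder):

```python
# Sketch: run a quality check programmatically.
from ruqad.qualitycheck import QualityChecker

qc = QualityChecker()
ok = qc.check(filename="export.eln", target_dir=".")
print("passed" if ok else "failed")
```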
---
metadata:
  crawler-version: 0.9.2
  macros:
---
Converters:
  ELNFile:
    converter: ELNFileConverter
    package: caoscrawler.converters
  ROCrateEntity:
    converter: ROCrateEntityConverter
    package: caoscrawler.converters

DataDir:
  type: Directory
  match: .*
  subtree:
    DataDir:
      type: Directory
      match: ^ruqad$
      subtree:
        DataDir:
          type: Directory
          match: ^[0-9]+$
          subtree:
            QualityReportFile:
              type: SimpleFile
              match: ^report\.zip$
              transform:
                elnfilename:
                  in: $QualityReportFile
                  out: $ELNFile
                  functions:
                    - replace:
                        remove: report.zip
                        insert: export.eln
              records:
                ELNFileElement:
                  parents:
                    - ELNFile
                  role: File
                  file: $ELNFile
                  path: $ELNFile
                QualityReportFileElement:
                  parents:
                    - QualityReportFile
                  role: File
                  file: $QualityReportFile
                  path: $QualityReportFile
                  ELNFile: $ELNFileElement
            ELNFile:
              type: ELNFile
              transform:
                qualityfilename:
                  in: $ELNFile
                  out: $QualityReportFile
                  functions:
                    - replace:
                        insert: report.zip
                        remove: export.eln
              match: ^.*\.eln$
              records:
                QualityReportFileElement:
                  parents:
                    - QualityReportFile
                  role: File
                  file: $QualityReportFile
                  path: $QualityReportFile
                ELNFileElement:
                  parents:
                    - ELNFile
                  role: File
                  file: $ELNFile
                  path: $ELNFile
                  QualityReportFile: $QualityReportFileElement
              subtree:
                AuthorDataset:
                  type: ROCrateEntity
                  match_entity_type: Person
                  match_properties:
                    name: (?P<name>.*)$
                    "@id": (?P<url>.*)$
                  records:
                    Author:
                      url: $url
                      name: $name
                RootDataset:
                  type: ROCrateEntity
                  # match_entity_type: Dataset
                  match_properties:
                    "@id": \./$
                  subtree:
                    Dataset:
                      type: ROCrateEntity
                      match_entity_type: Dataset
                      match_properties:
                        "@id": .*/$
                        name: (?P<name>.*)
                        dateCreated: (?P<dateCreated>.*)$
                        dateModified: (?P<dateModified>.*)$
                      records:
                        Dataset:
                          name: $name
                          description: $description
                          dateModified: $dateModified
                          dateCreated: $dateCreated
                          ELNFile: $ELNFileElement
                          QualityReportFile: $QualityReportFileElement
                      subtree:
                        Description:
                          type: DictElement
                          match_name: description
                          subtree:
                            DescriptionString:
                              type: TextElement
                              match_name: text
                              match_value: (?P<text>.*)$
                              records:
                                Dataset:
                                  description: $text
                        VariableMeasured:
                          type: ListElement
                          match_name: variableMeasured
                          subtree:
                            MetaData:
                              type: DictElement
                              match_properties:
                                propertyID: (?P<propid>.*)$
                                value: (?P<propvalue>.*)$
                              records:
                                Dataset:
                                  $propid: $propvalue
                        # MetaData:
                        #   type: DictElement
                        #   records:
                        #     MetaData:
                        #     Dataset:
                        #       MetaData: +$MetaData
                        #   subtree:
                        #     PropertyID:
                        #       type: TextElement
                        #       match_name: propertyID
                        #       match_value: (?P<propid>.*)$
                        #       records:
                        #         MetaData:
                        #           name: $propid
                        #     PropertyValue:
                        #       type: TextElement
                        #       match_name: value
                        #       match_value: (?P<propvalue>.*)$
                        #       records:
                        #         MetaData:
                        #           v: $propvalue
                        Author:
                          # breakpoint: true
                          type: DictElement
                          match_name: author
                          match_value: .*
                          subtree:
                            AuthorID:
                              match_name: "@id"
                              type: TextElement
                              match_value: ^(?P<url>(?P<repo>https://.*?)(/users/)(?P<unr>[0-9]+))$
                              records:
                                Author:
                                  nr: $unr
                                  url: $url
                                Repository:
                                  url: $repo
                                Dataset:
                                  Author: $Author
                                  Repository: $Repository
Dataset:
  - name
MetaData:
  - name
  - v
Author:
  - url
Repository:
  - url