#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 GEOMAR
# Copyright (C) 2022 Jakob Eckstein
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
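"""Read a filled-in sample template (CSV), validate its contents and hand the
data over to ``crawl_sample_data_async.py``, which performs the actual upload.

The script runs either as a server-side script (with a ``form.json`` and an
auth token) or locally with the CSV file given directly on the command line.
"""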
import json
import logging
import os
import pandas as pd
import subprocess
import sys
from pathlib import Path
from tempfile import NamedTemporaryFile
import linkahead as db
from caosadvancedtools.datainconsistency import DataInconsistencyError
from caosadvancedtools.serverside import helper
from caosadvancedtools.table_importer import CSVImporter
from caoscrawler.logging import configure_server_side_logging
from bis_utils import (replace_entity_urls_by_ids,
whitespace_cleanup_in_df)
from sample_helpers.sample_upload_column_definitions import (
COLUMN_CONVERTER, DATATYPE_DEFINITIONS,
OBLIGATORY_COLUMNS, OBLIGATORY_COLUMNS_CHILD, SPECIAL_TREATMENT_SAMPLE)
from sample_helpers.utils import CONSTANTS, get_column_header_name
# Suppress warnings from the diff function in linkahead.apiutils
apilogger = logging.getLogger("linkahead.apiutils")
apilogger.setLevel(logging.ERROR)

ERROR_PREFIX = CONSTANTS["error_prefix"]
ERROR_SUFFIX = CONSTANTS["error_suffix"]

logger = logging.getLogger("caosadvancedtools")
def get_parser():
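    """Return the server-side-scripting helper's default argument parser."""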
par = helper.get_argument_parser()
return par
def _is_child_sample_table(filename):
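    """Return True if the table in ``filename`` has a parent-ID column with at
    least one non-empty entry, i.e., it defines child samples."""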
tmp_data = pd.read_csv(filename, sep=',')
    parent_col = get_column_header_name("Parent LinkAhead ID")
    if parent_col in tmp_data.columns:
        return not tmp_data[parent_col].isnull().all()
return False
def read_data_from_file(filename):
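    """Read the sample CSV into a pandas DataFrame.

    Uses the obligatory columns for child samples if a parent-ID column is
    filled; raises a DataInconsistencyError if column datatypes don't match.
    """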
if _is_child_sample_table(filename):
oblig = OBLIGATORY_COLUMNS_CHILD
else:
oblig = OBLIGATORY_COLUMNS
table_importer = CSVImporter(
converters=COLUMN_CONVERTER,
obligatory_columns=oblig,
unique_keys=None,
datatypes=DATATYPE_DEFINITIONS,
existing_columns=oblig,
)
try:
df = table_importer.read_file(filename, sep=",", comment="#")
except TypeError as te:
logger.error(
f"There was a wrong datatype detected in your CSV: \n{te}\n"
"Please verify that all entries have the correct type, e.g., no floating "
"point numbers in integer columns or text in numeric columns."
)
raise DataInconsistencyError("There was a problem with the CSV upload.")
# strip leading and trailing whitespaces
return whitespace_cleanup_in_df(df)
def _get_converter_from_property_datatype(dt):
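    """Return the Python type used to convert CSV values for the given
    LinkAhead property datatype; DATETIME columns are not supported here."""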
if dt == db.TEXT:
return str
elif dt == db.REFERENCE:
return int
elif dt == db.DOUBLE:
return float
elif dt == db.INTEGER:
return int
elif dt == db.FILE:
return int
elif dt == db.DATETIME:
raise NotImplementedError()
elif dt == db.BOOLEAN:
return bool
else:
raise ValueError(f"Property has unknown datatype {dt}")
def main():
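    """Parse the script arguments, read and validate the uploaded CSV, and
    delegate the upload to ``crawl_sample_data_async.py``: synchronously for
    small tables, asynchronously (with email notification) for larger ones."""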
parser = get_parser()
args = parser.parse_args()
    # Check whether this script runs locally or as a server-side script
    # (SSS), depending on the auth_token argument.
if hasattr(args, "auth_token") and args.auth_token:
db.configure_connection(auth_token=args.auth_token)
userlog_public, htmluserlog_public, debuglog_public = configure_server_side_logging()
else:
rootlogger = logging.getLogger()
rootlogger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.DEBUG)
rootlogger.addHandler(handler)
userlog_public = "/tmp/upload_sample_userlog.log"
htmluserlog_public = "/tmp/upload_sample_userlog.html"
debuglog_public = "/tmp/upload_sample_debuglog.html"
if hasattr(args, "filename") and args.filename:
if hasattr(args, "auth_token") and args.auth_token:
            upload_dir = os.path.dirname(args.filename)
# Read the input from the form (form.json)
with open(args.filename) as form_json:
form_data = json.load(form_json)
            # Read the content of the uploaded file
path = os.path.join(upload_dir, form_data["sample_template_file"])
else:
path = args.filename
    # Extend the datatype definitions (and obligatory columns) by those
    # properties of the Sample RecordType that are not yet known to this
    # script.
sample = db.execute_query("FIND RECORDTYPE Sample", unique=True)
for eprop in sample.properties:
property_name = eprop.name
if property_name in SPECIAL_TREATMENT_SAMPLE:
continue
if property_name not in DATATYPE_DEFINITIONS:
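            # For reference properties, derive the converter from the
            # referenced RecordType: use its single property's datatype,
            # fall back to str if it has no properties, and skip it if it
            # has several.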
if db.apiutils.is_reference(eprop):
rt = db.get_entity_by_id(eprop.id)
if len(rt.properties) == 1:
converter = _get_converter_from_property_datatype(rt.properties[0].datatype)
elif len(rt.properties) < 1:
converter = str
else:
converter = None
else:
converter = _get_converter_from_property_datatype(eprop.datatype)
if converter is None:
continue
DATATYPE_DEFINITIONS[property_name] = converter
if sample.get_importance(property_name) == db.OBLIGATORY:
# This is only needed if the sample is not a child sample
OBLIGATORY_COLUMNS.append(property_name)
try:
data = read_data_from_file(path)
data = replace_entity_urls_by_ids(data, [get_column_header_name(name) for name in [
"entity_id", "Storage ID", "Parent LinkAhead ID"]])
pickle_out = NamedTemporaryFile(delete=False, suffix=".pkl")
data.to_pickle(pickle_out.name)
except DataInconsistencyError as err:
# DataInconsistencyError is logged elsewhere
logger.error(f"Sample upload failed.\n{err}")
return 1
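    # Hand the validated table over to crawl_sample_data_async.py, forwarding
    # the current auth token so that the callee can authenticate against the
    # same LinkAhead instance.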
parent_path = Path(__file__).parent
db.Info() # call db.Info to refresh the auth_token, just to be sure.
conn = db.get_connection()
auth_token_callee = conn._authenticator.auth_token
cmds = [
str(parent_path / "crawl_sample_data_async.py"),
"--auth-token",
auth_token_callee,
args.filename,
pickle_out.name,
Path(path).name
]
myenv = os.environ.copy()
myenv["HOME"] = str(parent_path.parent / "home")
    # For a small number of samples we run the upload synchronously and show
    # its output to the user; larger uploads run asynchronously in the
    # background.
if data.shape[0] < 20:
p = subprocess.Popen(cmds, start_new_session=False, env=myenv,
cwd=parent_path, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode == 0:
# We know that the stdout will be formatted by the
# serverside logging helper, so we don't wrap it but
# just print it.
print(stdout.decode())
else:
logger.error(stderr.decode())
print(stdout.decode())
else:
logger.info(
"Starting sample upload in the background. This may take a while. "
"You will be notified by email when it has finished."
)
p = subprocess.Popen(cmds, start_new_session=True, env=myenv,
cwd=parent_path)
else:
msg = "{}upload_sample_template.py was called without the JSON file in args.{}".format(
ERROR_PREFIX, ERROR_SUFFIX)
logger.error(msg)
if __name__ == "__main__":
main()