#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 GEOMAR
# Copyright (C) 2022 Jakob Eckstein
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023 Henrik tom Wörden <h.tomwoerden@indiscale.com>
# Copyright (C) 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Upload filled sample template CSV files, either locally or as a LinkAhead
server-side script (SSS)."""

import json
import logging
import os
import subprocess
import sys
from pathlib import Path
from tempfile import NamedTemporaryFile

import linkahead as db
import pandas as pd
from caosadvancedtools.datainconsistency import DataInconsistencyError
from caosadvancedtools.serverside import helper
from caosadvancedtools.table_importer import CSVImporter
from caoscrawler.logging import configure_server_side_logging

from bis_utils import (replace_entity_urls_by_ids,
                       whitespace_cleanup_in_df)
from sample_helpers.sample_upload_column_definitions import (
    COLUMN_CONVERTER, DATATYPE_DEFINITIONS, OBLIGATORY_COLUMNS,
    OBLIGATORY_COLUMNS_CHILD, SPECIAL_TREATMENT_SAMPLE)
from sample_helpers.utils import CONSTANTS, get_column_header_name

# Suppress warnings emitted by the entity diff function.
apilogger = logging.getLogger("linkahead.apiutils")
apilogger.setLevel(logging.ERROR)

ERROR_PREFIX = CONSTANTS["error_prefix"]
ERROR_SUFFIX = CONSTANTS["error_suffix"]

logger = logging.getLogger("caosadvancedtools")


def get_parser():
    par = helper.get_argument_parser()
    return par


def _is_child_sample_table(filename):
    """Return True if the CSV table contains at least one row with a
    non-empty parent ID column, i.e., the table describes child samples."""
    tmp_data = pd.read_csv(filename, sep=',')
    parent_column = get_column_header_name("Parent LinkAhead ID")
    if parent_column in tmp_data.columns:
        return not tmp_data[parent_column].isnull().all()
    return False


def read_data_from_file(filename):
    if _is_child_sample_table(filename):
        oblig = OBLIGATORY_COLUMNS_CHILD
    else:
        oblig = OBLIGATORY_COLUMNS

    table_importer = CSVImporter(
        converters=COLUMN_CONVERTER,
        obligatory_columns=oblig,
        unique_keys=None,
        datatypes=DATATYPE_DEFINITIONS,
        existing_columns=oblig,
    )
    try:
        df = table_importer.read_file(filename, sep=",", comment="#")
    except TypeError as te:
        logger.error(
            f"There was a wrong datatype detected in your CSV: \n{te}\n"
            "Please verify that all entries have the correct type, e.g., no floating "
            "point numbers in integer columns or text in numeric columns."
        )
        raise DataInconsistencyError("There was a problem with the CSV upload.") from te

    # Strip leading and trailing whitespace.
    return whitespace_cleanup_in_df(df)


def _get_converter_from_property_datatype(dt):
    if dt == db.TEXT:
        return str
    elif dt == db.REFERENCE:
        return int
    elif dt == db.DOUBLE:
        return float
    elif dt == db.INTEGER:
        return int
    elif dt == db.FILE:
        return int
    elif dt == db.DATETIME:
        raise NotImplementedError()
    elif dt == db.BOOLEAN:
        return bool
    else:
        raise ValueError(f"Property has unknown datatype {dt}")


def main():
    parser = get_parser()
    args = parser.parse_args()

    # Check whether this runs locally or as a server-side script (SSS),
    # depending on the auth_token argument.
    if hasattr(args, "auth_token") and args.auth_token:
        db.configure_connection(auth_token=args.auth_token)
        userlog_public, htmluserlog_public, debuglog_public = configure_server_side_logging()
    else:
        rootlogger = logging.getLogger()
        rootlogger.setLevel(logging.INFO)
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setLevel(logging.DEBUG)
        rootlogger.addHandler(handler)
        userlog_public = "/tmp/upload_sample_userlog.log"
        htmluserlog_public = "/tmp/upload_sample_userlog.html"
        debuglog_public = "/tmp/upload_sample_debuglog.html"

    if hasattr(args, "filename") and args.filename:
        if hasattr(args, "auth_token") and args.auth_token:
            upload_dir = os.path.dirname(args.filename)
            # Read the input from the form (form.json).
            with open(args.filename) as form_json:
                form_data = json.load(form_json)
            # Determine the path of the uploaded template file.
            path = os.path.join(upload_dir, form_data["sample_template_file"])
        else:
            path = args.filename

        # Extend the converter and datatype definitions by those properties
        # that are unknown to this script but are properties of Sample.
        sample = db.execute_query("FIND RECORDTYPE Sample", unique=True)
        for eprop in sample.properties:
            property_name = eprop.name
            if property_name in SPECIAL_TREATMENT_SAMPLE:
                continue
            if property_name not in DATATYPE_DEFINITIONS:
                if db.apiutils.is_reference(eprop):
                    rt = db.get_entity_by_id(eprop.id)
                    if len(rt.properties) == 1:
                        converter = _get_converter_from_property_datatype(rt.properties[0].datatype)
                    elif len(rt.properties) < 1:
                        converter = str
                    else:
                        converter = None
                else:
                    converter = _get_converter_from_property_datatype(eprop.datatype)
                if converter is None:
                    continue
                DATATYPE_DEFINITIONS[property_name] = converter
                if sample.get_importance(property_name) == db.OBLIGATORY:
                    # This is only needed if the sample is not a child sample.
                    OBLIGATORY_COLUMNS.append(property_name)

        try:
            data = read_data_from_file(path)
            data = replace_entity_urls_by_ids(
                data, [get_column_header_name(name) for name in [
                    "entity_id", "Storage ID", "Parent LinkAhead ID"]])
            pickle_out = NamedTemporaryFile(delete=False, suffix=".pkl")
            data.to_pickle(pickle_out.name)
        except DataInconsistencyError as err:
            # DataInconsistencyError is logged elsewhere.
            logger.error(f"Sample upload failed.\n{err}")
            return 1

        parent_path = Path(__file__).parent
        db.Info()  # Call db.Info to refresh the auth_token, just to be sure.
        conn = db.get_connection()
        auth_token_callee = conn._authenticator.auth_token
        cmds = [
            str(parent_path / "crawl_sample_data_async.py"),
            "--auth-token", auth_token_callee,
            args.filename, pickle_out.name, Path(path).name
        ]
        myenv = os.environ.copy()
        myenv["HOME"] = str(parent_path.parent / "home")
        # For a few samples, we can run the upload directly and give user
        # output. Larger uploads are run asynchronously.
        if data.shape[0] < 20:
            p = subprocess.Popen(cmds, start_new_session=False, env=myenv,
                                 cwd=parent_path, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            if p.returncode == 0:
                # We know that the stdout will be formatted by the serverside
                # logging helper, so we don't wrap it but just print it.
                print(stdout.decode())
            else:
                logger.error(stderr.decode())
                print(stdout.decode())
        else:
            logger.info(
                "Starting sample upload in the background. This may take a while. "
                "You will be notified by email when it has finished."
            )
            p = subprocess.Popen(cmds, start_new_session=True, env=myenv,
                                 cwd=parent_path)
    else:
        msg = "{}upload_sample_template.py was called without the JSON file in args.{}".format(
            ERROR_PREFIX, ERROR_SUFFIX)
        logger.error(msg)


if __name__ == "__main__":
    main()