#!/usr/bin/env python3 # encoding: utf-8 # # This file is a part of the LinkAhead Project. # # Copyright (C) 2022 - 2024 GEOMAR # Copyright (C) 2022 Jakob Eckstein # Copyright (C) 2023 - 2024 Indiscale GmbH <info@indiscale.com> # Copyright (C) 2023 - 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # import json import logging import os import linkahead as db import pandas as pd from caosadvancedtools.serverside import helper from caoscrawler import Crawler, SecurityMode from caoscrawler.crawl import _notify_about_inserts_and_updates from caoscrawler.logging import configure_server_side_logging from linkahead.cached import cached_query, cached_get_entity_by from bis_utils import (get_do_not_insert_type_names, replace_entity_urls_by_ids, whitespace_cleanup_in_df) from sample_helpers.container_update_get_parent import get_container_by_identifier from sample_helpers.container_update_post_processing import (post_process_containers_before_sync, post_process_parent_containers_before_sync, post_process_inserts_and_updates) from sample_helpers.utils import (CONSTANTS, get_column_header_name, get_entity_name) # suppress warning of diff function apilogger = logging.getLogger("linkahead.apiutils") apilogger.setLevel(logging.ERROR) ERROR_PREFIX = CONSTANTS["error_prefix"] ERROR_SUFFIX = CONSTANTS["error_suffix"] logger = logging.getLogger("caosadvancedtools") def _value_in_row(key, row): if not key in row: return False if pd.isnull(row[key]) or row[key] is None or f"{row[key]}" == "": return False return True def get_parser(): par = helper.get_argument_parser() return par def main(): userlog_public, htmluserlog_public, debuglog_public = configure_server_side_logging() logger = logging.getLogger("caosadvancedtools") parser = get_parser() args = parser.parse_args() # Check whether executed locally or as an SSS depending on # auth_token argument. if hasattr(args, "auth_token") and args.auth_token: db.configure_connection(auth_token=args.auth_token) if hasattr(args, "filename") and args.filename: upload_dir = os.path.dirname((args.filename)) # Read the input from the form (form.json) with open(args.filename) as form_json: form_data = json.load(form_json) # Read content of th uplaoded file path = os.path.join(upload_dir, form_data["container_metadata_file"]) data = whitespace_cleanup_in_df(pd.read_csv(path, comment='#')) else: raise ValueError("This script was called without the mandatory form data json file.") data = replace_entity_urls_by_ids(data) # Get referenced container entities child_containers = db.Container() parent_containers = db.Container() id_column_name = get_column_header_name("entity_id") parent_column_name = get_column_header_name("Parent container") for index, row in data.iterrows(): if not _value_in_row(id_column_name, row): logger.error(f"{id_column_name} is missing in row {index+1}. Nothing was updated.") return 1 try: child = db.Record(id=int(row[id_column_name])) except ValueError: logger.error( f"Invalid {id_column_name} {row[id_column_name]} in row {index + 1}. Nothing was updated.") return 1 child.add_parent(get_entity_name("container_rt")) child_containers.append(child) if _value_in_row(parent_column_name, row): parent_identifier = row[parent_column_name] parent = get_container_by_identifier(parent_identifier) if len(parent) == 0: logger.error( f"Couldn't find parent with identifier '{parent_identifier}' " f"in row {index+1}." ) return 1 elif len(parent) > 1: logger.error( f"Parent with identifier '{parent_identifier}' in row " f"{index+1} was not unique. Please specify with " f"{id_column_name} instead.") return 1 parent = parent[0] try: parent_containers.get_entity_by_id(parent.id) except KeyError: parent_containers.append(parent) if not child_containers: # Nothing to update logger.error("There are no containers to be updated") return 1 # Get IDs of proerperties child_container_prop = cached_get_entity_by( query=f"FIND Property WITH name = '{get_entity_name('child_container_prop')}'" ) custom_label_prop = cached_get_entity_by( query=f"FIND Property WITH name = '{get_entity_name('custom_label_prop')}'" ) pdf_rt = cached_get_entity_by( query=f"FIND RECORDTYPE WITH name='{get_entity_name('PDFReport')}'" ) # Update (/create) container entities for index, row in data.iterrows(): # Add child to parent parent = None if _value_in_row(parent_column_name, row): parent_identifier = row[parent_column_name] # This has already been checked above for uniqueness candidate = get_container_by_identifier(parent_identifier)[0] # A bit redundant, but we need the exact Python object here that is in the parent_containers list. parent = parent_containers.get_entity_by_id(candidate.id) if parent.get_property(child_container_prop.id) is None: parent.add_property(id=child_container_prop.id, name=child_container_prop.name, value=[int(row[id_column_name])]) else: if parent.get_property(child_container_prop.id).value is None: parent.get_property(child_container_prop.id).value = [int(row[id_column_name])] else: if int(row[id_column_name]) not in parent.get_property(child_container_prop.id).value: parent.get_property(child_container_prop.id).value.append( int(row[id_column_name])) # remove the current child from all other parents (don't # do anything if the parent didn't change) old_parents = cached_query( f"FIND {get_entity_name('container_rt')} WHICH REFERENCES " f"{int(row[id_column_name])}" ) for old_parent in old_parents: if parent is not None and old_parent.id == parent.id: # old parent also is new parent continue try: # Has already been registered for updates old_parent = parent_containers.get_entity_by_id(old_parent.id) except KeyError: parent_containers.append(old_parent) old_parent.remove_value_from_property(child_container_prop.name, int( row[id_column_name]), remove_if_empty_afterwards=False) if old_parent.get_property(child_container_prop.name).value is None: old_parent.get_property(child_container_prop.name).value = [] # Add custom label o child child = child_containers.get_entity_by_id(id=int(row[id_column_name])) if _value_in_row(get_column_header_name("custom_label_prop"), row): child.name = row[get_column_header_name("custom_label_prop")] if child.get_property(custom_label_prop.id) is None: child.add_property(id=custom_label_prop.id, name=custom_label_prop.name, value=row[get_column_header_name("custom_label_prop")]) else: child.get_property(custom_label_prop.id).value = row[get_column_header_name("custom_label_prop")] # Treat PI if _value_in_row(get_column_header_name("PI"), row): pi = row[get_column_header_name("PI")] pi_prop = cached_get_entity_by(query=f"FIND PROPERTY {get_entity_name{'PI'}") try: query = f"FIND RECORD Person WITH ID={int(pi)}" except ValueError: query = f"FIND RECORD Person WITH AN '{get_entity_name('abbreviation_prop')}'='{pi}'" try: pi_rec = cached_get_entity_by(query=query) if child.get_property(pi_prop.name) is not None: child.get_property(pi_prop.name).value = pi_rec.id else: child.add_property(id=pi_prop.id, name=pi_prop.name, value=pi_rec.id) except db.EmptyUniqueQueryError: logger.warning(f"There is no PI with ID or {get_entity_name('abbreviation_prop')} {pi}. Skipping.") # Collection(s) if _value_in_row(get_column_header_name("Collection"), row): collection_rt = cached_get_entity_by(query=f"FIND RECORDTYPE '{get_entity_name('Collection')}'") if not ";" in str(row[get_column_header_name("Collection")]): collections = [row[get_column_header_name("Collection")]] else: collections = [coll.strip() for coll in str(row[get_column_header_name("Collection")]).split(';')] prop_val = [] for coll in collections: try: query = f"FIND '{collection_rt.name}' WITH ID={int(coll)}" except ValueError: query = f"FIND RECORD '{collection_rt.name}' WITH name='{coll}'" try: coll_rec = cached_get_entity_by(query=query) prop_val.append(coll_rec.id) except db.EmptyUniqueQueryError: logger.warning(f"There is no {collection_rt.name} with name or ID {coll}. Skipping.") continue if prop_val: if child.get_property(collection_rt.name) is not None: child.get_property(collection_rt.name).datatype = db.LIST(collection_rt.name) child.get_property(collection_rt.name).value = prop_val else: child.add_property(id=collection_rt.id, name=collection_rt.name, datatype=db.LIST( collection_rt.name), value=prop_val) # Treat Container Contents if _value_in_row(get_column_header_name("Container Contents"), row): contents_prop = cached_get_entity_by( query=f"FIND PROPERTY '{get_entity_name('Container Contents')}'" ) if child.get_property(contents_prop.name) is not None: child.get_property(contents_prop.name).value = row[get_column_header_name("Container Contents")] else: child.add_property(id=contents_prop.id, name=contents_prop.name, value=row[get_column_header_name("Container Contents")]) # Treat PDF Report if _value_in_row(get_column_header_name("PDFReport"), row): pdf_id = row[get_column_header_name("PDFReport")] try: pdf_id = int(pdf_id) pdf_rec = cached_query(f"FIND FILE {get_entity_name('PDFReport')} WITH ID={pdf_id}") if not pdf_rec: logger.warning( f"There is no PDFReport with ID {pdf_id}, so no PDF is attached to container {child.id}.") else: if child.get_property(get_entity_name("PDFReport")) is not None: child.get_property(get_property("PDFReport")).value = pdf_id else: child.add_property(id=pdf_rt.id, name=pdf_rt.name, value=pdf_id) except ValueError: logger.warning( f"There is no valid ID provided for container {child.id}." f"Provided was {pdf_id}. Skipping") # This is a workaround for weird merging errors in the # crawler. TODO(fspreck): Remove after merge of sync_node and sync_graph and # following release. merged = [] for par in parent_containers: if (data[id_column_name] == par.id).any(): # A container to be updated is used as another containers parent: child = child_containers.get_entity_by_id(par.id) # All parents have a child sample property with a value (which may # be empty). No child sample has this property, so the following is # okay without checks: prop = par.get_property(child_container_prop.name) child.add_property(name=prop.name, id=prop.id, value=prop.value) merged.append(par) for par in merged: # All relevant information, i.e., the new children have been merged into # the corresponding child, so drop this. parent_containers.remove(par) # TODO Add notes as CommentAnnotation crawler = Crawler(securityMode=SecurityMode.UPDATE) child_containers = post_process_containers_before_sync(child_containers) parent_containers = post_process_parent_containers_before_sync(parent_containers) to_be_synchronized = child_containers + parent_containers inserts, updates = crawler.synchronize( commit_changes=True, unique_names=False, crawled_data=to_be_synchronized, no_insert_RTs=get_do_not_insert_type_names() ) if "SHARED_DIR" in os.environ: _notify_about_inserts_and_updates(len(inserts), len(updates), htmluserlog_public, crawler.run_id) inserts, updates = post_process_inserts_and_updates(isnerts, updates) logger.info(f"Successfully processed {len(child_containers)} containers and " f"{len(parent_containers)} parent containers.") # TODO Create new Spreadsheet for download if __name__ == "__main__": main()