#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2022 - 2024 GEOMAR
# Copyright (C) 2022 Jakob Eckstein
# Copyright (C) 2023 - 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023 - 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import json
import logging
import os

import linkahead as db
import pandas as pd
from caosadvancedtools.serverside import helper
from caoscrawler import Crawler, SecurityMode
from caoscrawler.crawl import _notify_about_inserts_and_updates
from caoscrawler.logging import configure_server_side_logging
from linkahead.cached import cached_query, cached_get_entity_by

from bis_utils import (get_do_not_insert_type_names,
                       replace_entity_urls_by_ids,
                       whitespace_cleanup_in_df)

# suppress warning of diff function
apilogger = logging.getLogger("linkahead.apiutils")
apilogger.setLevel(logging.ERROR)

ERROR_PREFIX = 'Something went wrong: '
# Fixed typo in user-facing message: "conatct" -> "contact".
ERROR_SUFFIX = ' Please contact <a href="mailto:biosamples@geomar.de">biosamples@geomar.de</a> if you encounter this issue.'
logger = logging.getLogger("caosadvancedtools")


def _value_in_row(key, row):
    """Return True iff ``row`` has a non-empty, non-null value for ``key``.

    Parameters
    ----------
    key : str
        Column name to look up.
    row : pandas.Series
        A row of the uploaded metadata table.

    Returns
    -------
    bool
        False if the column is absent, NaN/None, or renders as an empty
        string; True otherwise.
    """
    if key not in row:
        return False
    if pd.isnull(row[key]) or row[key] is None or f"{row[key]}" == "":
        return False
    return True


def _get_parent_by_identifier(parent_identifier):
    """Get parent specified either by BIS ID, name, or BIS label.

    If ``parent_identifier`` can be parsed as an int it is treated as a BIS
    ID, otherwise it is matched against name or 'BIS label'.
    """
    try:
        parent_identifier = int(parent_identifier)
        query = f"FIND Container WITH ID={parent_identifier}"
    except ValueError:
        query = (f"FIND Container WITH name='{parent_identifier}' "
                 f"OR WITH 'BIS label'='{parent_identifier}'")
    return cached_query(query)


def get_parser():
    """Return the server-side-scripting argument parser."""
    par = helper.get_argument_parser()
    return par


def main():
    """Update Container records from an uploaded CSV metadata file.

    Reads the form data (``form.json``) given on the command line, loads the
    referenced CSV, and updates child/parent container relationships, custom
    labels, PI, collections, contents, and attached PDF reports via the
    crawler.

    Returns
    -------
    int or None
        1 on a handled input error (after logging); None on success.

    Raises
    ------
    ValueError
        If the script is called without the mandatory form data json file.
    """
    userlog_public, htmluserlog_public, debuglog_public = configure_server_side_logging()
    logger = logging.getLogger("caosadvancedtools")

    parser = get_parser()
    args = parser.parse_args()

    # Check whether executed locally or as an SSS depending on
    # auth_token argument.
    if hasattr(args, "auth_token") and args.auth_token:
        db.configure_connection(auth_token=args.auth_token)

    if hasattr(args, "filename") and args.filename:
        upload_dir = os.path.dirname((args.filename))
        # Read the input from the form (form.json)
        with open(args.filename) as form_json:
            form_data = json.load(form_json)
        # Read content of the uploaded file
        path = os.path.join(upload_dir, form_data["container_metadata_file"])
        data = whitespace_cleanup_in_df(pd.read_csv(path, comment='#'))
    else:
        raise ValueError("This script was called without the mandatory form data json file.")

    data = replace_entity_urls_by_ids(data)

    # Get referenced container entities
    child_containers = db.Container()
    parent_containers = db.Container()
    for index, row in data.iterrows():
        if not _value_in_row("BIS ID", row):
            logger.error(f"BIS ID is missing in row {index+1}. Nothing was updated.")
            return 1
        try:
            child = db.Record(id=int(row["BIS ID"]))
        except ValueError:
            logger.error(
                f"Invalid BIS ID {row['BIS ID']} in row {index + 1}. Nothing was updated.")
            return 1
        child.add_parent("Container")
        child_containers.append(child)

        if _value_in_row("Parent container", row):
            parent_identifier = row["Parent container"]
            parent = _get_parent_by_identifier(parent_identifier)
            if len(parent) == 0:
                logger.error(
                    f"Couldn't find parent with identifier '{parent_identifier}' in row {index+1}.")
                return 1
            elif len(parent) > 1:
                logger.error(
                    f"Parent with identifier '{parent_identifier}' in row {index+1} was not unique. "
                    "Please specify with BIS ID instead.")
                return 1
            parent = parent[0]
            try:
                # Only register each parent once for the later update pass.
                parent_containers.get_entity_by_id(parent.id)
            except KeyError:
                parent_containers.append(parent)

    if not child_containers:
        # Nothing to update
        logger.error("There are no containers to be updated")
        return 1

    # Get IDs of properties
    child_container_prop = cached_get_entity_by(query="FIND Property WITH name = 'Child container'")
    custom_label_prop = cached_get_entity_by(query="FIND Property WITH name = 'Custom label'")
    pdf_rt = cached_get_entity_by(query="FIND RECORDTYPE WITH name=PDFReport")

    # Update (/create) container entities
    for index, row in data.iterrows():
        # Add child to parent
        parent = None
        if _value_in_row("Parent container", row):
            parent_identifier = row["Parent container"]
            # This has already been checked above for uniqueness
            candidate = _get_parent_by_identifier(parent_identifier)[0]
            # A bit redundant, but we need the exact Python object here that
            # is in the parent_containers list.
            parent = parent_containers.get_entity_by_id(candidate.id)
            if parent.get_property(child_container_prop.id) is None:
                parent.add_property(id=child_container_prop.id,
                                    name=child_container_prop.name,
                                    value=[int(row["BIS ID"])])
            else:
                if parent.get_property(child_container_prop.id).value is None:
                    parent.get_property(child_container_prop.id).value = [int(row["BIS ID"])]
                else:
                    if int(row["BIS ID"]) not in parent.get_property(child_container_prop.id).value:
                        parent.get_property(child_container_prop.id).value.append(
                            int(row["BIS ID"]))

        # remove the current child from all other parents (don't do anything
        # if the parent didn't change)
        old_parents = cached_query(f"FIND Container WHICH REFERENCES {int(row['BIS ID'])}")
        for old_parent in old_parents:
            if parent is not None and old_parent.id == parent.id:
                # old parent also is new parent
                continue
            try:
                # Has already been registered for updates
                old_parent = parent_containers.get_entity_by_id(old_parent.id)
            except KeyError:
                parent_containers.append(old_parent)
            old_parent.remove_value_from_property("Child container", int(row["BIS ID"]),
                                                  remove_if_empty_afterwards=False)
            if old_parent.get_property("Child container").value is None:
                old_parent.get_property("Child container").value = []

        # Add custom label to child
        child = child_containers.get_entity_by_id(id=int(row["BIS ID"]))
        if _value_in_row("Custom label", row):
            child.name = row["Custom label"]
            if child.get_property(custom_label_prop.id) is None:
                child.add_property(id=custom_label_prop.id,
                                   name=custom_label_prop.name,
                                   value=row["Custom label"])
            else:
                child.get_property(custom_label_prop.id).value = row["Custom label"]

        # Treat PI
        if _value_in_row("PI", row):
            pi = row["PI"]
            pi_prop = cached_get_entity_by(query="FIND PROPERTY Pi")
            try:
                # A numeric PI value is a BIS ID, anything else an abbreviation.
                query = f"FIND RECORD Person WITH ID={int(pi)}"
            except ValueError:
                query = f"FIND RECORD Person WITH AN Abbreviation='{pi}'"
            try:
                pi_rec = cached_get_entity_by(query=query)
                if child.get_property(pi_prop.name) is not None:
                    child.get_property(pi_prop.name).value = pi_rec.id
                else:
                    child.add_property(id=pi_prop.id, name=pi_prop.name, value=pi_rec.id)
            except db.EmptyUniqueQueryError:
                logger.warning(f"There is no PI with BIS ID or abbreviation {pi}. Skipping.")

        # Collection(s)
        if _value_in_row("Collection", row):
            collection_rt = cached_get_entity_by(query="FIND RECORDTYPE Collection")
            if ";" not in str(row["Collection"]):
                collections = [row["Collection"]]
            else:
                # Semicolon-separated list of collections.
                collections = [coll.strip() for coll in str(row["Collection"]).split(';')]
            prop_val = []
            for coll in collections:
                try:
                    query = f"FIND RECORD Collection WITH ID={int(coll)}"
                except ValueError:
                    query = f"FIND RECORD Collection WITH name='{coll}'"
                try:
                    coll_rec = cached_get_entity_by(query=query)
                    prop_val.append(coll_rec.id)
                except db.EmptyUniqueQueryError:
                    logger.warning(f"There is no collection with name or BIS ID {coll}. Skipping.")
                    continue
            if prop_val:
                if child.get_property("Collection") is not None:
                    child.get_property("Collection").datatype = db.LIST("Collection")
                    child.get_property("Collection").value = prop_val
                else:
                    child.add_property(id=collection_rt.id, name=collection_rt.name,
                                       datatype=db.LIST("Collection"), value=prop_val)

        # Treat Container Contents
        if _value_in_row("Container Contents", row):
            if not (_value_in_row("PI", row) and _value_in_row("Collection", row)):
                logger.error(
                    f"Container Contents are given for container {child.id} but it "
                    "is missing PI and/or Collection info. No updates have been performed."
                )
                return 1
            contents_prop = cached_get_entity_by(query="FIND PROPERTY 'Container Contents'")
            if child.get_property(contents_prop.name) is not None:
                child.get_property(contents_prop.name).value = row["Container Contents"]
            else:
                child.add_property(id=contents_prop.id, name=contents_prop.name,
                                   value=row["Container Contents"])

        # Treat PDF Report
        if _value_in_row("PDFReport", row):
            pdf_id = row["PDFReport"]
            try:
                pdf_id = int(pdf_id)
                pdf_rec = cached_query(f"FIND FILE PDFReport WITH ID={pdf_id}")
                if not pdf_rec:
                    logger.warning(
                        f"There is no PDFReport with Bis ID {pdf_id}, so no PDF is attached to container {child.id}.")
                else:
                    if child.get_property("PDFReport") is not None:
                        child.get_property("PDFReport").value = pdf_id
                    else:
                        child.add_property(id=pdf_rt.id, name=pdf_rt.name, value=pdf_id)
            except ValueError:
                # Fixed missing separator between the two concatenated
                # f-string fragments of this warning message.
                logger.warning(
                    f"There is no valid Bis ID provided for container {child.id}. "
                    f"Provided was {pdf_id}. Skipping")

    # This is a workaround for weird merging errors in the
    # crawler. TODO(fspreck): Remove after merge of sync_node and sync_graph and
    # following release.
    merged = []
    for par in parent_containers:
        if (data['BIS ID'] == par.id).any():
            # A container to be updated is used as another containers parent:
            child = child_containers.get_entity_by_id(par.id)
            # All parents have a child sample property with a value (which may
            # be empty). No child sample has this property, so the following is
            # okay without checks:
            prop = par.get_property("Child container")
            child.add_property(name=prop.name, id=prop.id, value=prop.value)
            merged.append(par)
    for par in merged:
        # All relevant information, i.e., the new children have been merged into
        # the corresponding child, so drop this.
        parent_containers.remove(par)

    # TODO Add notes as CommentAnnotation
    crawler = Crawler(securityMode=SecurityMode.UPDATE)
    to_be_synchronized = child_containers + parent_containers
    inserts, updates = crawler.synchronize(
        commit_changes=True, unique_names=False,
        crawled_data=to_be_synchronized,
        no_insert_RTs=get_do_not_insert_type_names()
    )
    if "SHARED_DIR" in os.environ:
        _notify_about_inserts_and_updates(len(inserts), len(updates),
                                          htmluserlog_public, crawler.run_id)

    # Grant the Stock Manager role ACL-edit rights on everything touched.
    for ent in inserts + updates:
        ent.retrieve_acl()
        ent.grant(role='Stock Manager', priority=False, permission="EDIT:ACL")
        ent.update_acl()

    logger.info(f"Successfully processed {len(child_containers)} containers and "
                f"{len(parent_containers)} parent containers.")

    # TODO Create new Spreadsheet for download


if __name__ == "__main__":
    main()