Skip to content
Snippets Groups Projects
Select Git revision
  • 16ed23be423359430029f513a58959f03aa35506
  • main default protected
  • dev protected
  • f-linkahead-rename
  • f-real-id
  • f-filesystem-import
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-filesystem-main
  • f-name
  • keep_changes
  • f-permission-checks-2
  • f-mysql8-tests
  • f-retrieve-history
  • t-distinct-parents
  • v8.1.0
  • v8.0.0
  • v7.0.2
  • v7.0.1
  • v7.0.0
  • v6.0.1
  • v6.0.0
  • v5.0.0
  • v4.1.0
  • v4.0.0
  • v3.0
  • v2.0.30
29 results

patch.sql

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    update_containers.py 14.28 KiB
    #!/usr/bin/env python3
    # encoding: utf-8
    #
    # This file is a part of the LinkAhead Project.
    #
    # Copyright (C) 2022 - 2024 GEOMAR
    # Copyright (C) 2022 Jakob Eckstein
    # Copyright (C) 2023 - 2024 Indiscale GmbH <info@indiscale.com>
    # Copyright (C) 2023 - 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    
    import json
    import logging
    import os
    
    import linkahead as db
    import pandas as pd
    from caosadvancedtools.serverside import helper
    from caoscrawler import Crawler, SecurityMode
    from caoscrawler.crawl import _notify_about_inserts_and_updates
    from caoscrawler.logging import configure_server_side_logging
    from linkahead.cached import cached_query, cached_get_entity_by
    
    from bis_utils import (get_do_not_insert_type_names,
                           replace_entity_urls_by_ids, whitespace_cleanup_in_df)
    from sample_helpers.container_update_get_parent import get_container_by_identifier
    from sample_helpers.container_update_post_processing import (post_process_containers_before_sync, post_process_parent_containers_before_sync, post_process_inserts_and_updates)
    from sample_helpers.utils import (CONSTANTS, get_column_header_name, get_entity_name)
    
    # Suppress warnings emitted by linkahead.apiutils (e.g. from its diff
    # function); only real errors from that module are of interest here.
    apilogger = logging.getLogger("linkahead.apiutils")
    apilogger.setLevel(logging.ERROR)

    # Decorations for user-facing error messages; the concrete values come
    # from the project-wide CONSTANTS mapping.
    ERROR_PREFIX = CONSTANTS["error_prefix"]
    ERROR_SUFFIX = CONSTANTS["error_suffix"]
    # Logger whose output is surfaced to the user by the server-side
    # scripting framework (caosadvancedtools).
    logger = logging.getLogger("caosadvancedtools")
    
    
    def _value_in_row(key, row):
    
        if not key in row:
            return False
        if pd.isnull(row[key]) or row[key] is None or f"{row[key]}" == "":
            return False
        return True
    
    
    
    
    def get_parser():
        """Return the standard server-side-scripting argument parser.

        The parser is provided by ``caosadvancedtools.serverside.helper`` and
        already knows the common SSS arguments (``filename``, ``auth_token``).
        """
        return helper.get_argument_parser()
    
    
    def main():
        """Update container records from an uploaded metadata spreadsheet.

        Reads the SSS form data (JSON) given via ``--filename`` to locate the
        uploaded CSV, builds the child container entities described therein,
        links them to (possibly new) parent containers, applies custom labels,
        PI, collections, contents and PDF reports, and finally synchronizes
        everything with the LinkAhead server through the crawler.

        Returns
        -------
        int or None
            ``1`` on user-input errors (missing/invalid IDs, unknown or
            ambiguous parent identifiers, empty input); ``None`` on success.

        Raises
        ------
        ValueError
            If the script is called without the mandatory form-data JSON file.
        """
        userlog_public, htmluserlog_public, debuglog_public = configure_server_side_logging()
        logger = logging.getLogger("caosadvancedtools")
        parser = get_parser()
        args = parser.parse_args()
        # Check whether executed locally or as an SSS depending on
        # auth_token argument.
        if hasattr(args, "auth_token") and args.auth_token:
            db.configure_connection(auth_token=args.auth_token)

        if hasattr(args, "filename") and args.filename:
            upload_dir = os.path.dirname(args.filename)
            # Read the input from the form (form.json)
            with open(args.filename) as form_json:
                form_data = json.load(form_json)
            # Read content of the uploaded file
            path = os.path.join(upload_dir, form_data["container_metadata_file"])
            data = whitespace_cleanup_in_df(pd.read_csv(path, comment='#'))
        else:
            raise ValueError("This script was called without the mandatory form data json file.")
        data = replace_entity_urls_by_ids(data)

        # Get referenced container entities
        child_containers = db.Container()
        parent_containers = db.Container()
        id_column_name = get_column_header_name("entity_id")
        parent_column_name = get_column_header_name("Parent container")
        for index, row in data.iterrows():
            if not _value_in_row(id_column_name, row):
                logger.error(f"{id_column_name} is missing in row {index+1}. Nothing was updated.")
                return 1
            try:
                child = db.Record(id=int(row[id_column_name]))
            except ValueError:
                logger.error(
                    f"Invalid {id_column_name} {row[id_column_name]} in row {index + 1}. Nothing was updated.")
                return 1
            child.add_parent(get_entity_name("container_rt"))
            child_containers.append(child)

            if _value_in_row(parent_column_name, row):
                parent_identifier = row[parent_column_name]
                parent = get_container_by_identifier(parent_identifier)
                if len(parent) == 0:
                    logger.error(
                        f"Couldn't find parent with identifier '{parent_identifier}' "
                        f"in row {index+1}."
                    )
                    return 1
                elif len(parent) > 1:
                    logger.error(
                        f"Parent with identifier '{parent_identifier}' in row "
                        f"{index+1} was not unique. Please specify with "
                        f"{id_column_name} instead.")
                    return 1
                parent = parent[0]
                try:
                    # Deduplicate: register each parent container only once.
                    parent_containers.get_entity_by_id(parent.id)
                except KeyError:
                    parent_containers.append(parent)

        if not child_containers:
            # Nothing to update
            logger.error("There are no containers to be updated")
            return 1

        # Get IDs of properties
        child_container_prop = cached_get_entity_by(
            query=f"FIND Property WITH name = '{get_entity_name('child_container_prop')}'"
        )
        custom_label_prop = cached_get_entity_by(
            query=f"FIND Property WITH name = '{get_entity_name('custom_label_prop')}'"
        )
        pdf_rt = cached_get_entity_by(
            query=f"FIND RECORDTYPE WITH name='{get_entity_name('PDFReport')}'"
        )

        # Update (/create) container entities
        for index, row in data.iterrows():
            # Add child to parent
            parent = None
            if _value_in_row(parent_column_name, row):
                parent_identifier = row[parent_column_name]
                # This has already been checked above for uniqueness
                candidate = get_container_by_identifier(parent_identifier)[0]
                # A bit redundant, but we need the exact Python object here
                # that is in the parent_containers list.
                parent = parent_containers.get_entity_by_id(candidate.id)

                if parent.get_property(child_container_prop.id) is None:
                    parent.add_property(id=child_container_prop.id,
                                        name=child_container_prop.name, value=[int(row[id_column_name])])
                else:
                    if parent.get_property(child_container_prop.id).value is None:
                        parent.get_property(child_container_prop.id).value = [int(row[id_column_name])]
                    else:
                        if int(row[id_column_name]) not in parent.get_property(child_container_prop.id).value:
                            parent.get_property(child_container_prop.id).value.append(
                                int(row[id_column_name]))

                # remove the current child from all other parents (don't
                # do anything if the parent didn't change)
                old_parents = cached_query(
                    f"FIND {get_entity_name('container_rt')} WHICH REFERENCES "
                    f"{int(row[id_column_name])}"
                )
                for old_parent in old_parents:
                    if parent is not None and old_parent.id == parent.id:
                        # old parent also is new parent
                        continue
                    try:
                        # Has already been registered for updates
                        old_parent = parent_containers.get_entity_by_id(old_parent.id)
                    except KeyError:
                        parent_containers.append(old_parent)
                    old_parent.remove_value_from_property(child_container_prop.name, int(
                        row[id_column_name]), remove_if_empty_afterwards=False)
                    # Keep an (empty) list so the crawler updates the property
                    # instead of dropping it entirely.
                    if old_parent.get_property(child_container_prop.name).value is None:
                        old_parent.get_property(child_container_prop.name).value = []

            # Add custom label to child
            child = child_containers.get_entity_by_id(id=int(row[id_column_name]))
            if _value_in_row(get_column_header_name("custom_label_prop"), row):
                child.name = row[get_column_header_name("custom_label_prop")]
                if child.get_property(custom_label_prop.id) is None:
                    child.add_property(id=custom_label_prop.id,
                                       name=custom_label_prop.name, value=row[get_column_header_name("custom_label_prop")])
                else:
                    child.get_property(custom_label_prop.id).value = row[get_column_header_name("custom_label_prop")]

            # Treat PI
            if _value_in_row(get_column_header_name("PI"), row):
                pi = row[get_column_header_name("PI")]
                # NOTE: was a SyntaxError before (braces instead of parens and
                # an unterminated query string).
                pi_prop = cached_get_entity_by(
                    query=f"FIND PROPERTY '{get_entity_name('PI')}'")
                try:
                    # The PI may be given either by entity ID ...
                    query = f"FIND RECORD Person WITH ID={int(pi)}"
                except ValueError:
                    # ... or by abbreviation.
                    query = f"FIND RECORD Person WITH AN '{get_entity_name('abbreviation_prop')}'='{pi}'"
                try:
                    pi_rec = cached_get_entity_by(query=query)
                    if child.get_property(pi_prop.name) is not None:
                        child.get_property(pi_prop.name).value = pi_rec.id
                    else:
                        child.add_property(id=pi_prop.id, name=pi_prop.name, value=pi_rec.id)
                except db.EmptyUniqueQueryError:
                    logger.warning(f"There is no PI with ID or {get_entity_name('abbreviation_prop')} {pi}. Skipping.")

            # Collection(s)
            if _value_in_row(get_column_header_name("Collection"), row):
                collection_rt = cached_get_entity_by(query=f"FIND RECORDTYPE '{get_entity_name('Collection')}'")
                if ";" not in str(row[get_column_header_name("Collection")]):
                    collections = [row[get_column_header_name("Collection")]]
                else:
                    collections = [coll.strip() for coll in str(row[get_column_header_name("Collection")]).split(';')]
                prop_val = []
                for coll in collections:
                    try:
                        # Collections may be given by ID or by name.
                        query = f"FIND '{collection_rt.name}' WITH ID={int(coll)}"
                    except ValueError:
                        query = f"FIND RECORD '{collection_rt.name}' WITH name='{coll}'"
                    try:
                        coll_rec = cached_get_entity_by(query=query)
                        prop_val.append(coll_rec.id)
                    except db.EmptyUniqueQueryError:
                        logger.warning(f"There is no {collection_rt.name} with name or ID {coll}. Skipping.")
                        continue
                if prop_val:
                    if child.get_property(collection_rt.name) is not None:
                        child.get_property(collection_rt.name).datatype = db.LIST(collection_rt.name)
                        child.get_property(collection_rt.name).value = prop_val
                    else:
                        child.add_property(id=collection_rt.id, name=collection_rt.name, datatype=db.LIST(
                            collection_rt.name), value=prop_val)

            # Treat Container Contents
            if _value_in_row(get_column_header_name("Container Contents"), row):
                contents_prop = cached_get_entity_by(
                    query=f"FIND PROPERTY '{get_entity_name('Container Contents')}'"
                )
                if child.get_property(contents_prop.name) is not None:
                    child.get_property(contents_prop.name).value = row[get_column_header_name("Container Contents")]
                else:
                    child.add_property(id=contents_prop.id, name=contents_prop.name,
                                       value=row[get_column_header_name("Container Contents")])

            # Treat PDF Report
            if _value_in_row(get_column_header_name("PDFReport"), row):
                pdf_id = row[get_column_header_name("PDFReport")]
                try:
                    pdf_id = int(pdf_id)
                    pdf_rec = cached_query(f"FIND FILE {get_entity_name('PDFReport')} WITH ID={pdf_id}")
                    if not pdf_rec:
                        logger.warning(
                            f"There is no PDFReport with ID {pdf_id}, so no PDF is attached to container {child.id}.")
                    else:
                        # NOTE: was ``get_property("PDFReport")`` before, which
                        # is an undefined name (NameError at runtime).
                        if child.get_property(get_entity_name("PDFReport")) is not None:
                            child.get_property(get_entity_name("PDFReport")).value = pdf_id
                        else:
                            child.add_property(id=pdf_rt.id, name=pdf_rt.name, value=pdf_id)
                except ValueError:
                    logger.warning(
                        f"There is no valid ID provided for container {child.id}. "
                        f"Provided was {pdf_id}. Skipping")

        # This is a workaround for weird merging errors in the
        # crawler. TODO(fspreck): Remove after merge of sync_node and sync_graph and
        # following release.
        merged = []
        for par in parent_containers:
            if (data[id_column_name] == par.id).any():
                # A container to be updated is used as another containers parent:
                child = child_containers.get_entity_by_id(par.id)
                # All parents have a child sample property with a value (which may
                # be empty). No child sample has this property, so the following is
                # okay without checks:
                prop = par.get_property(child_container_prop.name)
                child.add_property(name=prop.name, id=prop.id, value=prop.value)
                merged.append(par)
        for par in merged:
            # All relevant information, i.e., the new children have been merged into
            # the corresponding child, so drop this.
            parent_containers.remove(par)
        # TODO Add notes as CommentAnnotation
        crawler = Crawler(securityMode=SecurityMode.UPDATE)
        child_containers = post_process_containers_before_sync(child_containers)
        parent_containers = post_process_parent_containers_before_sync(parent_containers)
        to_be_synchronized = child_containers + parent_containers

        inserts, updates = crawler.synchronize(
            commit_changes=True, unique_names=False, crawled_data=to_be_synchronized,
            no_insert_RTs=get_do_not_insert_type_names()
        )
        if "SHARED_DIR" in os.environ:
            _notify_about_inserts_and_updates(len(inserts), len(updates), htmluserlog_public,
                                              crawler.run_id)
        # NOTE: was ``isnerts`` before (NameError).
        inserts, updates = post_process_inserts_and_updates(inserts, updates)

        logger.info(f"Successfully processed {len(child_containers)} containers and "
                    f"{len(parent_containers)} parent containers.")

        # TODO Create new Spreadsheet for download
    
    # Entry point when executed directly (e.g. as a LinkAhead server-side script).
    if __name__ == "__main__":
        main()