# (Web-UI residue from the GitLab file view — not part of the script.)
# Select Git revision
# update_containers.py
# -
# Florian Spreckelsen authored
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# update_containers.py 14.28 KiB
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2022 - 2024 GEOMAR
# Copyright (C) 2022 Jakob Eckstein
# Copyright (C) 2023 - 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023 - 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import json
import logging
import os
import linkahead as db
import pandas as pd
from caosadvancedtools.serverside import helper
from caoscrawler import Crawler, SecurityMode
from caoscrawler.crawl import _notify_about_inserts_and_updates
from caoscrawler.logging import configure_server_side_logging
from linkahead.cached import cached_query, cached_get_entity_by
from bis_utils import (get_do_not_insert_type_names,
replace_entity_urls_by_ids, whitespace_cleanup_in_df)
from sample_helpers.container_update_get_parent import get_container_by_identifier
from sample_helpers.container_update_post_processing import (post_process_containers_before_sync, post_process_parent_containers_before_sync, post_process_inserts_and_updates)
from sample_helpers.utils import (CONSTANTS, get_column_header_name, get_entity_name)
# suppress warning of diff function
# The linkahead apiutils logger emits warnings from the entity diff that are
# not relevant here, so only errors are shown.
apilogger = logging.getLogger("linkahead.apiutils")
apilogger.setLevel(logging.ERROR)
# Prefix/suffix used to format user-visible error messages consistently
# across the BIS server-side scripts.
ERROR_PREFIX = CONSTANTS["error_prefix"]
ERROR_SUFFIX = CONSTANTS["error_suffix"]
# Module-level logger; re-acquired inside main() after server-side logging
# has been configured.
logger = logging.getLogger("caosadvancedtools")
def _value_in_row(key, row):
if not key in row:
return False
if pd.isnull(row[key]) or row[key] is None or f"{row[key]}" == "":
return False
return True
def get_parser():
    """Return the standard server-side-scripting argument parser.

    Provided by ``caosadvancedtools.serverside.helper``; no extra arguments
    are registered here.
    """
    return helper.get_argument_parser()
def main():
    """Update container records from an uploaded CSV metadata file.

    Reads the form data (form.json) passed in by the server-side scripting
    framework, loads the CSV file referenced therein, and synchronizes the
    container records via the crawler. Per row, this handles: parent
    assignment (including removal from previous parents), custom label,
    PI, collection membership, container contents, and an attached PDF
    report.

    Returns
    -------
    int or None
        1 on invalid user input (an error is logged first); None on
        success.

    Raises
    ------
    ValueError
        If the script was called without the mandatory form-data json file.
    """
    userlog_public, htmluserlog_public, debuglog_public = configure_server_side_logging()
    logger = logging.getLogger("caosadvancedtools")

    parser = get_parser()
    args = parser.parse_args()

    # Check whether executed locally or as a server-side script depending
    # on the auth_token argument.
    if hasattr(args, "auth_token") and args.auth_token:
        db.configure_connection(auth_token=args.auth_token)

    if hasattr(args, "filename") and args.filename:
        upload_dir = os.path.dirname(args.filename)
        # Read the input from the form (form.json)
        with open(args.filename) as form_json:
            form_data = json.load(form_json)
        # Read the content of the uploaded file
        path = os.path.join(upload_dir, form_data["container_metadata_file"])
        data = whitespace_cleanup_in_df(pd.read_csv(path, comment='#'))
    else:
        raise ValueError("This script was called without the mandatory form data json file.")

    # Users may paste entity URLs instead of plain IDs; normalize them.
    data = replace_entity_urls_by_ids(data)

    # Collect referenced container entities. First pass: validate every row
    # and register the child records and (unique) parent records.
    child_containers = db.Container()
    parent_containers = db.Container()
    id_column_name = get_column_header_name("entity_id")
    parent_column_name = get_column_header_name("Parent container")
    for index, row in data.iterrows():
        if not _value_in_row(id_column_name, row):
            logger.error(f"{id_column_name} is missing in row {index+1}. Nothing was updated.")
            return 1
        try:
            child = db.Record(id=int(row[id_column_name]))
        except ValueError:
            logger.error(
                f"Invalid {id_column_name} {row[id_column_name]} in row {index + 1}. Nothing was updated.")
            return 1
        child.add_parent(get_entity_name("container_rt"))
        child_containers.append(child)
        if _value_in_row(parent_column_name, row):
            parent_identifier = row[parent_column_name]
            parent = get_container_by_identifier(parent_identifier)
            if len(parent) == 0:
                logger.error(
                    f"Couldn't find parent with identifier '{parent_identifier}' "
                    f"in row {index+1}."
                )
                return 1
            elif len(parent) > 1:
                logger.error(
                    f"Parent with identifier '{parent_identifier}' in row "
                    f"{index+1} was not unique. Please specify with "
                    f"{id_column_name} instead.")
                return 1
            parent = parent[0]
            try:
                # Only register each parent once.
                parent_containers.get_entity_by_id(parent.id)
            except KeyError:
                parent_containers.append(parent)

    if not child_containers:
        # Nothing to update
        logger.error("There are no containers to be updated")
        return 1

    # Get IDs of properties
    child_container_prop = cached_get_entity_by(
        query=f"FIND Property WITH name = '{get_entity_name('child_container_prop')}'"
    )
    custom_label_prop = cached_get_entity_by(
        query=f"FIND Property WITH name = '{get_entity_name('custom_label_prop')}'"
    )
    pdf_rt = cached_get_entity_by(
        query=f"FIND RECORDTYPE WITH name='{get_entity_name('PDFReport')}'"
    )

    # Update (/create) container entities. Second pass: apply the row's
    # metadata to the registered records.
    for index, row in data.iterrows():
        # Add child to parent
        parent = None
        if _value_in_row(parent_column_name, row):
            parent_identifier = row[parent_column_name]
            # This has already been checked above for uniqueness
            candidate = get_container_by_identifier(parent_identifier)[0]
            # A bit redundant, but we need the exact Python object here
            # that is in the parent_containers list.
            parent = parent_containers.get_entity_by_id(candidate.id)
            if parent.get_property(child_container_prop.id) is None:
                parent.add_property(id=child_container_prop.id,
                                    name=child_container_prop.name, value=[int(row[id_column_name])])
            else:
                if parent.get_property(child_container_prop.id).value is None:
                    parent.get_property(child_container_prop.id).value = [int(row[id_column_name])]
                else:
                    if int(row[id_column_name]) not in parent.get_property(child_container_prop.id).value:
                        parent.get_property(child_container_prop.id).value.append(
                            int(row[id_column_name]))
        # Remove the current child from all other parents (don't do
        # anything if the parent didn't change).
        old_parents = cached_query(
            f"FIND {get_entity_name('container_rt')} WHICH REFERENCES "
            f"{int(row[id_column_name])}"
        )
        for old_parent in old_parents:
            if parent is not None and old_parent.id == parent.id:
                # old parent also is new parent
                continue
            try:
                # Has already been registered for updates
                old_parent = parent_containers.get_entity_by_id(old_parent.id)
            except KeyError:
                parent_containers.append(old_parent)
            old_parent.remove_value_from_property(child_container_prop.name, int(
                row[id_column_name]), remove_if_empty_afterwards=False)
            # Keep an (empty) list value so the property survives the update.
            if old_parent.get_property(child_container_prop.name).value is None:
                old_parent.get_property(child_container_prop.name).value = []

        # Add custom label to child
        child = child_containers.get_entity_by_id(id=int(row[id_column_name]))
        if _value_in_row(get_column_header_name("custom_label_prop"), row):
            child.name = row[get_column_header_name("custom_label_prop")]
            if child.get_property(custom_label_prop.id) is None:
                child.add_property(id=custom_label_prop.id,
                                   name=custom_label_prop.name, value=row[get_column_header_name("custom_label_prop")])
            else:
                child.get_property(custom_label_prop.id).value = row[get_column_header_name("custom_label_prop")]

        # Treat PI
        if _value_in_row(get_column_header_name("PI"), row):
            pi = row[get_column_header_name("PI")]
            # BUGFIX: the original line used invalid syntax
            # ``get_entity_name{'PI'}`` and an unterminated f-string.
            pi_prop = cached_get_entity_by(query=f"FIND PROPERTY '{get_entity_name('PI')}'")
            try:
                # Numeric value -> look up the Person by ID ...
                query = f"FIND RECORD Person WITH ID={int(pi)}"
            except ValueError:
                # ... otherwise by abbreviation.
                query = f"FIND RECORD Person WITH AN '{get_entity_name('abbreviation_prop')}'='{pi}'"
            try:
                pi_rec = cached_get_entity_by(query=query)
                if child.get_property(pi_prop.name) is not None:
                    child.get_property(pi_prop.name).value = pi_rec.id
                else:
                    child.add_property(id=pi_prop.id, name=pi_prop.name, value=pi_rec.id)
            except db.EmptyUniqueQueryError:
                logger.warning(f"There is no PI with ID or {get_entity_name('abbreviation_prop')} {pi}. Skipping.")

        # Collection(s) -- possibly a ';'-separated list of IDs or names.
        if _value_in_row(get_column_header_name("Collection"), row):
            collection_rt = cached_get_entity_by(query=f"FIND RECORDTYPE '{get_entity_name('Collection')}'")
            if ";" not in str(row[get_column_header_name("Collection")]):
                collections = [row[get_column_header_name("Collection")]]
            else:
                collections = [coll.strip() for coll in str(row[get_column_header_name("Collection")]).split(';')]
            prop_val = []
            for coll in collections:
                try:
                    query = f"FIND '{collection_rt.name}' WITH ID={int(coll)}"
                except ValueError:
                    query = f"FIND RECORD '{collection_rt.name}' WITH name='{coll}'"
                try:
                    coll_rec = cached_get_entity_by(query=query)
                    prop_val.append(coll_rec.id)
                except db.EmptyUniqueQueryError:
                    logger.warning(f"There is no {collection_rt.name} with name or ID {coll}. Skipping.")
                    continue
            if prop_val:
                if child.get_property(collection_rt.name) is not None:
                    child.get_property(collection_rt.name).datatype = db.LIST(collection_rt.name)
                    child.get_property(collection_rt.name).value = prop_val
                else:
                    child.add_property(id=collection_rt.id, name=collection_rt.name, datatype=db.LIST(
                        collection_rt.name), value=prop_val)

        # Treat Container Contents
        if _value_in_row(get_column_header_name("Container Contents"), row):
            contents_prop = cached_get_entity_by(
                query=f"FIND PROPERTY '{get_entity_name('Container Contents')}'"
            )
            if child.get_property(contents_prop.name) is not None:
                child.get_property(contents_prop.name).value = row[get_column_header_name("Container Contents")]
            else:
                child.add_property(id=contents_prop.id, name=contents_prop.name,
                                   value=row[get_column_header_name("Container Contents")])

        # Treat PDF Report
        if _value_in_row(get_column_header_name("PDFReport"), row):
            pdf_id = row[get_column_header_name("PDFReport")]
            try:
                pdf_id = int(pdf_id)
                pdf_rec = cached_query(f"FIND FILE {get_entity_name('PDFReport')} WITH ID={pdf_id}")
                if not pdf_rec:
                    logger.warning(
                        f"There is no PDFReport with ID {pdf_id}, so no PDF is attached to container {child.id}.")
                else:
                    if child.get_property(get_entity_name("PDFReport")) is not None:
                        # BUGFIX: the original called the undefined name
                        # ``get_property("PDFReport")`` here instead of
                        # ``get_entity_name``.
                        child.get_property(get_entity_name("PDFReport")).value = pdf_id
                    else:
                        child.add_property(id=pdf_rt.id, name=pdf_rt.name, value=pdf_id)
            except ValueError:
                logger.warning(
                    f"There is no valid ID provided for container {child.id}."
                    f"Provided was {pdf_id}. Skipping")

    # This is a workaround for weird merging errors in the
    # crawler. TODO(fspreck): Remove after merge of sync_node and sync_graph and
    # following release.
    merged = []
    for par in parent_containers:
        if (data[id_column_name] == par.id).any():
            # A container to be updated is used as another container's parent:
            child = child_containers.get_entity_by_id(par.id)
            # All parents have a child sample property with a value (which may
            # be empty). No child sample has this property, so the following is
            # okay without checks:
            prop = par.get_property(child_container_prop.name)
            child.add_property(name=prop.name, id=prop.id, value=prop.value)
            merged.append(par)
    for par in merged:
        # All relevant information, i.e., the new children, has been merged
        # into the corresponding child, so drop this.
        parent_containers.remove(par)

    # TODO Add notes as CommentAnnotation
    crawler = Crawler(securityMode=SecurityMode.UPDATE)
    child_containers = post_process_containers_before_sync(child_containers)
    parent_containers = post_process_parent_containers_before_sync(parent_containers)
    to_be_synchronized = child_containers + parent_containers
    inserts, updates = crawler.synchronize(
        commit_changes=True, unique_names=False, crawled_data=to_be_synchronized,
        no_insert_RTs=get_do_not_insert_type_names()
    )
    if "SHARED_DIR" in os.environ:
        _notify_about_inserts_and_updates(len(inserts), len(updates), htmluserlog_public,
                                          crawler.run_id)
    # BUGFIX: was the typo ``isnerts``, which raised a NameError at runtime.
    inserts, updates = post_process_inserts_and_updates(inserts, updates)
    logger.info(f"Successfully processed {len(child_containers)} containers and "
                f"{len(parent_containers)} parent containers.")
    # TODO Create new Spreadsheet for download
# Script entry point: run the container update when executed directly
# (locally or as a LinkAhead server-side script).
if __name__ == "__main__":
    main()