-
Florian Spreckelsen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
crawl_sample_data_async.py 11.25 KiB
#!/usr/bin/env python3
#
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2024 GEOMAR
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import logging
import os
import pandas as pd
import re
import sys
from dateutil import parser as dateparser
from pickle import UnpicklingError
import linkahead as db
from caosadvancedtools.datainconsistency import DataInconsistencyError
from caosadvancedtools.serverside import helper
from caoscrawler import Crawler, SecurityMode
from caoscrawler.crawl import ForbiddenTransaction, _notify_about_inserts_and_updates
from caoscrawler.exceptions import ImpossibleMergeError
from caoscrawler.identifiable_adapters import CaosDBIdentifiableAdapter
from caoscrawler.logging import configure_server_side_logging
from linkahead.cached import cached_get_entity_by
from linkahead.common.datatype import get_id_of_datatype
from bis_utils import (get_do_not_insert_type_names,
IGNORED_COLUMN_NAMES_SAMPLE,
return_value_if_not_none,
send_mail_with_defaults,
SPECIAL_TREATMENT_SAMPLE)
from sample_helpers.sample_upload_add_special_properties import add_special_properties
from sample_helpers.sample_upload_get_event import add_event_to_sample
from sample_helpers.sample_upload_get_person import get_person
from sample_helpers.sample_upload_post_processing import post_process_samples
from sample_helpers.utils import (get_column_header_name, get_entity_name, update_property)
# Logger that this script reports through.
logger = logging.getLogger("caosadvancedtools")

# Suppress warnings of the diff function: the "linkahead.apiutils" logger only
# lets errors (and above) through.
apilogger = logging.getLogger("linkahead.apiutils")
apilogger.setLevel(logging.ERROR)
def _notify_about_error(text, subject):
    """Log an error message and send it out by mail.

    Parameters
    ----------
    text
        The error description; it is logged at error level and used as the
        body of the mail.
    subject
        The subject line of the mail.
    """
    # Log first, so the message is recorded before the mail is attempted.
    logger.error(text)
    send_mail_with_defaults(body=text, subject=subject)
def _is_ignored_column_name(name, parent_suffix="_parent"):
    """Tell whether the column called ``name`` is to be skipped.

    A column is skipped if it is listed in ``IGNORED_COLUMN_NAMES_SAMPLE`` or
    if its name ends with ``parent_suffix``.
    """
    if name in IGNORED_COLUMN_NAMES_SAMPLE:
        return True
    return name.endswith(parent_suffix)
def synchroize(records, additional_property_ents, htmluserlog_public):
    """Push the given records to the server by means of the crawler.

    Parameters
    ----------
    records
        The records that are handed to ``Crawler.synchronize`` as crawled data.
    additional_property_ents
        Mapping of column name to entity. Every entity with role "RecordType"
        is registered as an identifiable on top of the definitions loaded from
        ``~/identifiables.yml``.
    htmluserlog_public
        Passed on to the insert/update notification, which is only issued if
        the environment variable ``SHARED_DIR`` is set.
    """
    crawler = Crawler(securityMode=SecurityMode.UPDATE)

    adapter = CaosDBIdentifiableAdapter()
    adapter.load_from_yaml_definition(os.path.expanduser("~/identifiables.yml"))

    for entity in additional_property_ents.values():
        if entity.role != "RecordType":
            continue
        # A RecordType without properties is identified by its name, any other
        # one by its first property.
        if len(entity.properties) > 0:
            identifying_name = entity.properties[0].name
        else:
            identifying_name = "name"
        definition = db.RecordType().add_parent(entity.name).add_property(
            name=identifying_name)
        adapter.register_identifiable(name=entity.name, definition=definition)

    crawler.identifiableAdapter = adapter

    inserts, updates = crawler.synchronize(
        commit_changes=True,
        unique_names=False,
        crawled_data=records,
        no_insert_RTs=get_do_not_insert_type_names(),
        no_update_RTs=None,
    )

    if "SHARED_DIR" not in os.environ:
        return
    _notify_about_inserts_and_updates(
        len(inserts), len(updates), htmluserlog_public, crawler.run_id)
def _find_additional_property_entity(property_name):
    """Look up the Property or RecordType that a data column refers to.

    A Property called ``property_name`` is searched first; only if there is
    none, a RecordType of that name is searched.

    Returns
    -------
    The entity that was found, or None (after logging an info message) if
    neither a Property nor a RecordType of that name exists or if the name is
    not unique.
    """
    # NOTE(review): the column name is interpolated into the query unescaped; a
    # name containing a single quote would break the query string.
    try:
        try:
            return cached_get_entity_by(query=f"FIND PROPERTY WITH name='{property_name}'")
        except db.exceptions.EmptyUniqueQueryError:
            return cached_get_entity_by(query=f"FIND RECORDTYPE WITH name='{property_name}'")
    except db.exceptions.EmptyUniqueQueryError:
        logger.info(f"Couldn't find (unique) Property or RecordType: '{property_name}'."
                    f"\nThe column '{property_name}' is not being used.")
    except db.QueryNotUniqueError:
        logger.info(f"Property or RecordType {property_name} was not unique. "
                    "Skipping this column.")
    return None


def _additional_property_value(ent, cell_value):
    """Build the value to be set on a sample for the additional property ``ent``.

    For a Property, the cell value is used as is. For a RecordType, a new Record
    with that parent is created; the cell value becomes its name if the
    RecordType has no properties, and the value of the RecordType's only
    property otherwise.

    Raises
    ------
    DataInconsistencyError
        If ``ent`` is a RecordType with more than one property, because it is
        then unclear which property the cell value belongs to.
    """
    if ent.role != "RecordType":
        return cell_value
    if len(ent.properties) > 1:
        raise DataInconsistencyError(
            f"Trying to add a {ent.name} to a sample. Cannot identify property to set "
            f"because RecordType with ID={ent.id} has more than one Property.")
    referenced = db.Record().add_parent(ent.name)
    if len(ent.properties) == 0:
        referenced.name = cell_value
    else:
        referenced.add_property(name=ent.properties[0].name, value=cell_value)
    return referenced


def update_sample_records(data, htmluserlog_public):
    """Update the existing Sample records described by the rows of ``data``.

    Every column that is neither specially treated nor ignored is resolved to a
    Property or RecordType; columns that cannot be resolved are skipped with a
    log message. Each row is then turned into an updated Sample (name, special
    properties, additional properties, event data), the samples are
    post-processed and finally synchronized with the server.

    Parameters
    ----------
    data
        Table of sample data with one row per sample; presumably a
        ``pandas.DataFrame`` (``keys().to_list()`` and ``iterrows()`` are used).
    htmluserlog_public
        Passed on to ``synchroize`` for the insert/update notification.

    Raises
    ------
    DataInconsistencyError
        If a row has no sample ID, if there is no Sample with the given ID, or
        if a RecordType column cannot be mapped to a single property.
    """
    logger.info("Starting sample updates...")

    # TODO Check data first and if there are Errors in the data: Provide the user with a download
    # link to a template with Error descriptions.
    additional_property_ents = {}  # column name -> Property or RecordType entity
    for property_name in data.keys().to_list():
        if property_name in SPECIAL_TREATMENT_SAMPLE or _is_ignored_column_name(property_name):
            continue
        ent = _find_additional_property_entity(property_name)
        if ent is not None:
            additional_property_ents[property_name] = ent

    # The column headers are the same for every row, so resolve them only once.
    entity_id_column = get_column_header_name("entity_id")
    sample_name_column = get_column_header_name("sample_name")
    has_sample_name_column = sample_name_column in data

    # Create everything needed to update the samples
    samples = []
    for index, row in data.iterrows():
        sample_id = row[entity_id_column]
        if pd.isnull(sample_id):
            raise DataInconsistencyError(f"Missing sample ID in row {index}")
        try:
            sample = db.execute_query(
                f"FIND RECORD Sample WITH id = {sample_id}", unique=True)
        except db.exceptions.EmptyUniqueQueryError as exc:
            # Keep the original error as the cause for easier debugging.
            raise DataInconsistencyError(
                f"There is no Sample with ID = {sample_id} in the system.") from exc

        if has_sample_name_column:
            # We want to allow to overwrite the name with empty
            # values. Note that this is currently broken in the
            # crawler and needs to be fixed there.
            sample.name = return_value_if_not_none(row[sample_name_column])

        # All special properties are added here
        sample = add_special_properties(sample, row)

        # Add additional properties
        for property_name, ent in additional_property_ents.items():
            raw_cell = row[property_name]
            cell_value = return_value_if_not_none(raw_cell)
            # Skip empty cells and empty lists: there is nothing to set.
            if cell_value is None or (isinstance(raw_cell, list) and len(raw_cell) == 0):
                continue
            sample = update_property(
                entity=sample, property_id=ent.id,
                value=_additional_property_value(ent, cell_value),
                property_name=property_name)

        # Now, treat events and event data
        sample = add_event_to_sample(sample, row)
        samples.append(sample)

    # Samples might need additional post processing
    samples = post_process_samples(samples, data)
    synchroize(samples, additional_property_ents, htmluserlog_public)
def main():
    """Load the pickled sample data and update the samples on the server.

    The command line takes the path of the pickled sample data and the name of
    the originally uploaded file (used in messages only). If an auth token is
    given, the server connection and the server-side logging are configured;
    otherwise everything is logged to stdout. Every handled failure is logged
    and reported by mail.

    Returns
    -------
    int
        Exit code: 0 on success, 2 if the pickled data could not be loaded, 3
        on transaction errors, 4 on inconsistent data, 5 on forbidden
        transactions and 6 on merge conflicts.
    """
    parser = helper.get_argument_parser()
    parser.add_argument(
        "pickled_sample_data",
        help="Dump of the cleaned and checked sample data for crawling."
    )
    parser.add_argument(
        "old_filename",
        help="Name of the file that was uploaded originally for logging purposes."
    )
    args = parser.parse_args()

    if getattr(args, "auth_token", None):
        db.configure_connection(
            auth_token=args.auth_token,
            timeout=(30, 60*60*24*7)  # Rather short connection timeout, one week for read.
        )
        # Only the HTML user log is needed below.
        _userlog, htmluserlog_public, _debuglog = configure_server_side_logging()
    else:
        # No auth token: log everything to stdout.
        rootlogger = logging.getLogger()
        rootlogger.setLevel(logging.INFO)
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setLevel(logging.DEBUG)
        rootlogger.addHandler(handler)
        htmluserlog_public = "/tmp/upload_sample_userlog.html"

    # NOTE(review): unpickling can execute arbitrary code, so the dump must only
    # ever come from a trusted source -- confirm that this holds for all callers.
    try:
        sample_data = pd.read_pickle(args.pickled_sample_data)
    except (FileNotFoundError, UnpicklingError, EOFError) as err:
        # EOFError is raised for an empty or truncated dump.
        email_body = f"""
Dear curator,
There were problems transferring the read-in CSV data from
{args.old_filename} to the asynchronous crawl script:
{err}
"""
        _notify_about_error(
            subject=f"Errors when loading {args.old_filename}",
            text=email_body
        )
        return 2

    try:
        update_sample_records(sample_data, htmluserlog_public)
    except db.TransactionError as te:
        email_body = f"""
Dear curator,
There were problems synchronizing the sample entities from {args.old_filename} to the LinkAhead server:
{te}
"""
        _notify_about_error(
            subject=f"Errors when synchronizing {args.old_filename}",
            text=email_body
        )
        return 3
    except DataInconsistencyError as die:
        email_body = f"""
Dear Curator,
There were problems with the data in {args.old_filename}:
{die}
Please check for mistakes like typos in names or ids, wrong data
types, or missing information.
"""
        _notify_about_error(
            subject=f"Parsing errors in {args.old_filename}",
            text=email_body
        )
        return 4
    except ForbiddenTransaction as fte:
        email_body = f"""
Dear Curator,
Crawling {args.old_filename} resulted in forbidden transactions:
{fte}
"""
        _notify_about_error(
            subject=f"Forbidden transactions in {args.old_filename}",
            text=email_body
        )
        return 5
    except ImpossibleMergeError as ime:
        email_body = f"""
Dear Curator,
There was a conflict when merging sample or event information in {args.old_filename}:
{ime}
Please verify that there is no contradictory
information belonging to a single entity.
"""
        _notify_about_error(
            subject=f"Merge conflict in {args.old_filename}",
            text=email_body
        )
        return 6

    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())