#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2023 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2023 Florian Spreckelsen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
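"""Server-side script for exporting BIS container records to CSV.

Looks up container records by entity id (or id range), extracts their
properties into a table, writes the result as a CSV file to the shared
directory, and notifies the user by email with a download link.
"""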
import json
import logging
import sys

import linkahead as db
from caosadvancedtools.serverside import helper
from caosadvancedtools.table_export import BaseTableExporter
from caoscrawler.logging import configure_server_side_logging
from linkahead.cached import cached_get_entity_by, cached_query as cquery
from linkahead.exceptions import EmptyUniqueQueryError
from bis_utils import (create_email_with_link_text,
get_description_row, get_email_from_username,
get_options_row, send_mail_with_defaults)
from sample_helpers.utils import (CONSTANTS, get_column_header_name,
get_entity_name)


# Suppress warnings from the diff function in linkahead.apiutils.
apilogger = logging.getLogger("linkahead.apiutils")
apilogger.setLevel(logging.ERROR)
logger = logging.getLogger("caosadvancedtools")

ERROR_PREFIX = CONSTANTS["error_prefix"]
ERROR_SUFFIX = CONSTANTS["error_suffix"]


def cached_query(query, unique=False):
"""Wrapper for cached queries that may be unique."""
if unique:
return cached_get_entity_by(query=query)
return cquery(query)


def reverse_semicolon_separated_list(value):
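    """Join list values into a single semicolon-separated string;
    return scalar values unchanged."""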
if isinstance(value, list):
return ";".join([str(val) for val in value])
else:
return value


def generate_label_text(entity):
    """Return a label of the form "<id> <label>", preferring the custom
    label property over the container label property; fall back to the
    entity's id and name if neither label is set."""
    custom_label_prop = entity.get_property(get_entity_name("custom_label_prop"))
    if custom_label_prop is not None and custom_label_prop.value:
        label = custom_label_prop.value
    else:
        container_label_prop = entity.get_property(get_entity_name("container_label_prop"))
        label = container_label_prop.value if container_label_prop is not None else None
if not label:
if not entity.name:
return entity.id
return f"{entity.id} {entity.name}"
if f"{label}" == f"{entity.id}":
# prevent special case of Label = ID resulting in a preview
# that looks like "id, id".
return entity.id
return f"{entity.id} {label}"


def extract_parent_container(record, key):
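    """Return the names (or ids) of all containers that reference this
    record, as a semicolon-separated string."""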
possible_parents = cached_query(
f"FIND {get_entity_name('container_rt')} WHICH REFERENCES {record.id}"
)
values = []
for par in possible_parents:
if par.name:
values.append(par.name)
else:
values.append(par.id)
return reverse_semicolon_separated_list(values)


def extract_eid(record, key):
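    """Return the entity id of the record."""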
return record.id


def extract_custom_label(record, key):
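    """Return the value of the custom label property, or None if it is
    not set."""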
custom_label_prop_name = get_entity_name("custom_label_prop")
if record.get_property(custom_label_prop_name) is not None:
return record.get_property(custom_label_prop_name).value
return None


def extract_pi(record, key):
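    """Return the abbreviation of the record's PI if it has one,
    otherwise the PI's entity id; None if no PI is set."""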
if record.get_property("PI") is not None and record.get_property("PI").value is not None:
pi_id = record.get_property("PI").value
pi_rec = cached_query(f"FIND RECORD Person WITH ID={pi_id}", unique=True)
abbr_prop_name = get_entity_name("abbreviation_prop")
if pi_rec.get_property(abbr_prop_name) is not None and pi_rec.get_property(abbr_prop_name).value is not None:
return pi_rec.get_property(abbr_prop_name).value
return pi_id
return None


def extract_collection(record, key):
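    """Return the names (or ids) of all Collection records referenced
    by this record, as a semicolon-separated string; None if no
    Collection is set."""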
if record.get_property("Collection") is not None and record.get_property("Collection").value is not None:
collections = record.get_property("Collection").value
if not isinstance(collections, list):
collections = [collections]
values = []
for coll in collections:
cr = cached_query(f"FIND RECORD Collection WITH ID={coll}", unique=True)
if cr.name is not None and cr.name != "":
values.append(cr.name)
else:
values.append(cr.id)
return reverse_semicolon_separated_list(values)
return None


def extract_container_type(record, key):
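    """Return the abbreviation of the referenced container type if it
    has one, otherwise the type's entity id; None if no container type
    is set."""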
abbr_prop_name = get_entity_name("abbreviation_prop")
containertype_rt_name = get_entity_name("containertype_rt")
if record.get_property(containertype_rt_name) is not None and record.get_property(containertype_rt_name).value is not None:
ct_id = record.get_property(containertype_rt_name).value
ctr = cached_query(f"FIND {containertype_rt_name} WITH ID={ct_id}", unique=True)
if ctr.get_property(abbr_prop_name) is not None and ctr.get_property(abbr_prop_name).value is not None:
return ctr.get_property(abbr_prop_name).value
return ct_id
return None


def extract_storage_chain(record, key):
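    """Return the chain of containers the record is stored in, from the
    outermost container down to the record itself, joined by arrows."""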
def find_referencing_containers(eid):
containing_containers = []
candidates = cached_query(f"FIND {get_entity_name('container_rt')} WHICH REFERENCES {eid}")
if len(candidates) > 1:
logger.debug(f"Entity {eid} is referenced by more than one container.")
return []
elif len(candidates) == 1:
containing_containers.extend(find_referencing_containers(candidates[0].id))
containing_containers.extend(candidates)
return containing_containers
containing_containers = find_referencing_containers(record.id)
containing_containers.append(record)
return " → ".join([str(generate_label_text(cont)) for cont in containing_containers])


def default_extractor(record, key):
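    """Return the value of the property named ``key``, or None if the
    record does not have such a property."""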
if record.get_property(key) is not None:
return record.get_property(key).value
else:
return None


# Map column headers to extractor functions; columns without an entry
# fall back to ``default_extractor``.
EXTRACTORS = {
get_column_header_name("entity_id"): extract_eid,
get_column_header_name("container_label_prop"): default_extractor,
get_column_header_name("Collection"): extract_collection,
get_column_header_name("container_size_prop"): default_extractor,
get_column_header_name("containertype_rt"): extract_container_type,
get_column_header_name("custom_label_prop"): extract_custom_label,
get_column_header_name("PDFReport"): default_extractor,
get_column_header_name("PI"): extract_pi,
get_column_header_name("Parent container"): extract_parent_container,
get_column_header_name("Storage chain"): extract_storage_chain,
}


# List of properties to be ignored because they are handled elsewhere.
# Similar, but not identical to SPECIAL TREATMENT.
IGNORE_KEYS = [
# To be changed by updating the child, not the parent.
get_column_header_name("child_container_prop"),
get_column_header_name("containertype_rt"), # handled separately above
get_column_header_name("Responsible"), # Not to be changed via upload
]


def extract_value(r, e):
    """Extract the value for column ``e`` from record ``r``, using the
    extractor registered for that column if there is one."""
    if e in EXTRACTORS:
        v = EXTRACTORS[e](r, e)
    else:
        v = default_extractor(r, e)
    if isinstance(v, str) and (',' in v or '\n' in v or '"' in v):
        # Quote text fields containing commas, newlines, or quotes, and
        # double any embedded quotes as required by RFC 4180.
        v = '"' + v.replace('"', '""') + '"'
    return v if v is not None else ""


class TableExporter(BaseTableExporter):
    """Subclass of BaseTableExporter used for the container CSV
    export."""


def retrieve_containers(data):
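    """Retrieve the container records for the given ids and id ranges.

    Returns a tuple of the list of retrieved records and a list of the
    ids or ranges for which no container was found.
    """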
container = []
not_found = []
for eid in data:
if isinstance(eid, int):
try:
container.append(
cached_query(
f"FIND RECORD {get_entity_name('container_rt')} WITH id = '{eid}'",
unique=True))
            except EmptyUniqueQueryError:
# we want to warn about these
not_found.append(eid)
else:
found_at_least_one_in_range = False
for next_eid in eid:
try:
container.append(
cached_query(
f"FIND RECORD {get_entity_name('container_rt')} WITH id = '{next_eid}'",
unique=True))
found_at_least_one_in_range = True
                except EmptyUniqueQueryError:
pass
if not found_at_least_one_in_range:
not_found.append(f"{eid.start}-{eid.stop-1}")
return container, not_found


def to_csv(containers):
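    """Serialize the container records into a single CSV string,
    prefixed by header, description, and options rows."""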
export_dict = {}
for key in EXTRACTORS:
export_dict[key] = {}
    lower_case_keys = [e.lower() for e in export_dict]
    ignored_keys_lower = [ign.lower() for ign in IGNORE_KEYS]
    for c in containers:
        # Collect all other properties present on the container records.
        for p in c.get_properties():
            if p.name.lower() not in lower_case_keys and p.name.lower() not in ignored_keys_lower:
                export_dict[p.name] = {}
                lower_case_keys.append(p.name.lower())
    for e in export_dict:
        export_dict[e]["find_func"] = extract_value
        export_dict[e]["optional"] = True
    keys = list(export_dict)
csv = []
for c in containers:
table_exporter = TableExporter(export_dict, record=c)
table_exporter.all_keys = keys
table_exporter.collect_information()
logger.debug('<code>' + str(table_exporter.info) + '</code>')
csv.append(table_exporter.prepare_csv_export(print_header=False))
header = ",".join(keys) + "\n"
header += ",".join(get_description_row(keys)) + '\n'
header += ",".join(get_options_row(keys)) + '\n'
return header + "\n".join(csv)


def write_csv(file_name, csv):
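    """Write the csv string to a file in the shared directory and
    return its display path for the download link."""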
display_path, internal_path = helper.get_shared_filename(file_name)
with open(internal_path, "w") as csv_file:
csv_file.write(csv)
return display_path


def main():
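    """Parse the command line arguments and the uploaded form data,
    export the requested containers to CSV, and notify the user by
    email with a download link."""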
parser = helper.get_argument_parser()
args = parser.parse_args()
# Check whether executed locally or as an SSS depending on
# auth_token argument.
if hasattr(args, "auth_token") and args.auth_token:
db.configure_connection(auth_token=args.auth_token)
debug_file = configure_server_side_logging()
else:
rootlogger = logging.getLogger()
rootlogger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.DEBUG)
rootlogger.addHandler(handler)
        debug_file = "/tmp/export_container_csv_debug.log"
if hasattr(args, "filename") and args.filename:
# Read the input from the form (form.json)
with open(args.filename) as form_json:
form_data = json.load(form_json)
tmp = form_data["container_ids"].split(",")
data = []
for d in tmp:
if "-" in d:
bound = [int(b) for b in d.split("-")]
data.append(range(min(bound), max(bound) + 1))
else:
data.append(int(d.strip()))
containers, not_found = retrieve_containers(data)
if len(containers) == 0:
            logger.error("No containers found for the given ID(s).")
return
for s in containers:
logger.debug("Found container " + str(s.id))
        for s in not_found:
            logger.warning("No container found for ID(s): " + str(s))
csv = to_csv(containers)
max_id = max([c.id for c in containers])
min_id = min([c.id for c in containers])
file_name = f"containers_export_(IDs_{min_id}_to_{max_id}).csv"
display_path = write_csv(file_name, csv)
        logger.info("Your CSV export has been prepared successfully.\n" +
                    f'Download the file <a href="/Shared/{display_path}">here</a>.')
try:
send_mail_with_defaults(
to=get_email_from_username(),
subject=f"BIS container export {file_name}",
body=create_email_with_link_text("container export", display_path)
)
except KeyError as ke:
logger.error(
"There is a problem with the server's email configuration:\n\n"
f"{ke}\n\nPlease contact your admin."
)
else:
msg = "{} export_container_csv.py was called without the JSON file in args. {}".format(
ERROR_PREFIX, ERROR_SUFFIX)
logger.error(msg)


if __name__ == "__main__":
main()