#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2023 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2023 Florian Spreckelsen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Export container records to a CSV file.

This script is usually executed as a server-side script: it reads container
IDs from a form, retrieves the matching container records, writes a CSV
export to the shared directory, and notifies the user by email.
"""
import json
import logging
import os
import sys

import linkahead as db

from caosadvancedtools.serverside import helper
from caosadvancedtools.table_export import BaseTableExporter
from caoscrawler.logging import configure_server_side_logging
from linkahead.cached import cached_get_entity_by, cached_query as cquery
from linkahead.exceptions import (EmptyUniqueQueryError,
                                  QueryNotUniqueError)

from bis_utils import (create_email_with_link_text, get_description_row,
                       get_email_from_username, get_options_row,
                       send_mail_with_defaults)
from sample_helpers.utils import (CONSTANTS, get_column_header_name,
                                  get_entity_name)

# Suppress warnings of the diff function.
apilogger = logging.getLogger("linkahead.apiutils")
apilogger.setLevel(logging.ERROR)

logger = logging.getLogger("caosadvancedtools")

ERROR_PREFIX = CONSTANTS["error_prefix"]
ERROR_SUFFIX = CONSTANTS["error_suffix"]


def cached_query(query, unique=False):
    """Wrapper for cached queries that may be unique."""
    if unique:
        return cached_get_entity_by(query=query)
    return cquery(query)


def reverse_semicolon_separated_list(value):
    """Join a list of values into a semicolon-separated string."""
    if isinstance(value, list):
        return ";".join([str(val) for val in value])
    return value


def generate_label_text(entity):
    """Return a preview text of the form ``"<id> <label>"``.

    The custom label property takes precedence over the container label
    property; if neither is set, the entity name or the bare ID is used.
    """
    custom_label_prop_name = get_entity_name("custom_label_prop")
    custom_label = entity.get_property(custom_label_prop_name)
    if custom_label is not None and custom_label.value:
        label = custom_label.value
    else:
        container_label_prop_name = get_entity_name("container_label_prop")
        container_label = entity.get_property(container_label_prop_name)
        label = container_label.value if container_label is not None else None
    if not label:
        if not entity.name:
            return entity.id
        return f"{entity.id} {entity.name}"
    if f"{label}" == f"{entity.id}":
        # Prevent the special case of label == ID resulting in a preview
        # that looks like "id, id".
        return entity.id
    return f"{entity.id} {label}"
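
# Illustration (hypothetical values, not from any real record): an entity
# with id=101 and custom label "Box A" is previewed as "101 Box A"; an
# entity with neither label nor name is previewed as the bare id 101; a
# label that merely repeats the id collapses to the bare id as well.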


def extract_parent_container(record, key):
    """Return the names (or IDs) of all containers referencing ``record``."""
    possible_parents = cached_query(
        f"FIND {get_entity_name('container_rt')} WHICH REFERENCES {record.id}"
    )
    values = []
    for par in possible_parents:
        if par.name:
            values.append(par.name)
        else:
            values.append(par.id)
    return reverse_semicolon_separated_list(values)


def extract_eid(record, key):
    return record.id


def extract_custom_label(record, key):
    custom_label_prop_name = get_entity_name("custom_label_prop")
    if record.get_property(custom_label_prop_name) is not None:
        return record.get_property(custom_label_prop_name).value
    return None


def extract_pi(record, key):
    """Return the PI's abbreviation if set, the PI's ID otherwise."""
    if record.get_property("PI") is not None and record.get_property("PI").value is not None:
        pi_id = record.get_property("PI").value
        pi_rec = cached_query(f"FIND RECORD Person WITH ID={pi_id}", unique=True)
        abbr_prop_name = get_entity_name("abbreviation_prop")
        if pi_rec.get_property(abbr_prop_name) is not None and pi_rec.get_property(abbr_prop_name).value is not None:
            return pi_rec.get_property(abbr_prop_name).value
        return pi_id
    return None


def extract_collection(record, key):
    """Return the names (or IDs) of all referenced collections."""
    if record.get_property("Collection") is not None and record.get_property("Collection").value is not None:
        collections = record.get_property("Collection").value
        if not isinstance(collections, list):
            collections = [collections]
        values = []
        for coll in collections:
            cr = cached_query(f"FIND RECORD Collection WITH ID={coll}", unique=True)
            if cr.name is not None and cr.name != "":
                values.append(cr.name)
            else:
                values.append(cr.id)
        return reverse_semicolon_separated_list(values)
    return None


def extract_container_type(record, key):
    """Return the abbreviation of the container type if set, its ID otherwise."""
    abbr_prop_name = get_entity_name("abbreviation_prop")
    containertype_rt_name = get_entity_name("containertype_rt")
    if record.get_property(containertype_rt_name) is not None and record.get_property(containertype_rt_name).value is not None:
        ct_id = record.get_property(containertype_rt_name).value
        ctr = cached_query(f"FIND {containertype_rt_name} WITH ID={ct_id}", unique=True)
        if ctr.get_property(abbr_prop_name) is not None and ctr.get_property(abbr_prop_name).value is not None:
            return ctr.get_property(abbr_prop_name).value
        return ct_id
    return None


def extract_storage_chain(record, key):
    """Return the chain of containers containing ``record``, outermost first."""

    def find_referencing_containers(eid):
        containing_containers = []
        candidates = cached_query(
            f"FIND {get_entity_name('container_rt')} WHICH REFERENCES {eid}")
        if len(candidates) > 1:
            # The chain is ambiguous, so it cannot be reconstructed.
            logger.debug(f"Entity {eid} is referenced by more than one container.")
            return []
        elif len(candidates) == 1:
            containing_containers.extend(find_referencing_containers(candidates[0].id))
        containing_containers.extend(candidates)
        return containing_containers

    containing_containers = find_referencing_containers(record.id)
    containing_containers.append(record)
    return " → ".join([str(generate_label_text(cont)) for cont in containing_containers])


def default_extractor(record, key):
    """Return the value of the property ``key``, or None if it is not set."""
    if record.get_property(key) is not None:
        return record.get_property(key).value
    return None


EXTRACTORS = {
    get_column_header_name("entity_id"): extract_eid,
    get_column_header_name("container_label_prop"): default_extractor,
    get_column_header_name("Collection"): extract_collection,
    get_column_header_name("container_size_prop"): default_extractor,
    get_column_header_name("containertype_rt"): extract_container_type,
    get_column_header_name("custom_label_prop"): extract_custom_label,
    get_column_header_name("PDFReport"): default_extractor,
    get_column_header_name("PI"): extract_pi,
    get_column_header_name("Parent container"): extract_parent_container,
    get_column_header_name("Storage chain"): extract_storage_chain,
}
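
# Sketch of how an additional column could be wired up (the "Notes" property
# used here is hypothetical and only serves as an example):
#
#     def extract_notes(record, key):
#         prop = record.get_property("Notes")
#         return prop.value if prop is not None else None
#
#     EXTRACTORS[get_column_header_name("Notes")] = extract_notes
#
# Columns without a dedicated entry in EXTRACTORS fall back to
# default_extractor (see extract_value below).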

# List of sample properties to be ignored because they are treated
# otherwise. Similar, but not identical to SPECIAL TREATMENT.
IGNORE_KEYS = [
    # To be changed by updating the child, not the parent.
    get_column_header_name("child_container_prop"),
    get_column_header_name("containertype_rt"),  # handled separately above
    get_column_header_name("Responsible"),  # Not to be changed via upload
]


def extract_value(r, e):
    """Extract the value for key ``e`` from record ``r`` and format it for CSV."""
    if e in EXTRACTORS:
        v = EXTRACTORS[e](r, e)
    else:
        v = default_extractor(r, e)
    if isinstance(v, str) and (',' in v or '\n' in v):
        # Quote text fields that contain commas or line breaks.
        v = f"\"{v}\""
    return v if v is not None else ""


class TableExporter(BaseTableExporter):
    pass


def retrieve_containers(data):
    """Retrieve all container records for the given IDs and ID ranges.

    Returns a tuple of the retrieved records and the IDs or ranges for
    which no container could be found.
    """
    containers = []
    not_found = []
    for eid in data:
        if isinstance(eid, int):
            try:
                containers.append(
                    cached_query(
                        f"FIND RECORD {get_entity_name('container_rt')} WITH id = '{eid}'",
                        unique=True))
            except EmptyUniqueQueryError:
                # We want to warn about these.
                not_found.append(eid)
        else:
            found_at_least_one_in_range = False
            for next_eid in eid:
                try:
                    containers.append(
                        cached_query(
                            f"FIND RECORD {get_entity_name('container_rt')} WITH id = '{next_eid}'",
                            unique=True))
                    found_at_least_one_in_range = True
                except EmptyUniqueQueryError:
                    pass
            if not found_at_least_one_in_range:
                not_found.append(f"{eid.start}-{eid.stop-1}")
    return containers, not_found


def to_csv(containers):
    """Export the given container records to a single CSV string."""
    export_dict = {}
    for key in EXTRACTORS:
        export_dict[key] = {}
    lower_case_keys = [e.lower() for e in export_dict]
    ignored_lower_case = [ign.lower() for ign in IGNORE_KEYS]
    for c in containers:
        # Collect all other properties appearing on the containers.
        for p in c.get_properties():
            if p.name.lower() not in lower_case_keys and p.name.lower() not in ignored_lower_case:
                export_dict[p.name] = {}
                lower_case_keys.append(p.name.lower())
    for e in export_dict:
        export_dict[e]["find_func"] = extract_value
        export_dict[e]["optional"] = True
    keys = list(export_dict.keys())
    csv = []
    for c in containers:
        table_exporter = TableExporter(export_dict, record=c)
        table_exporter.all_keys = keys
        table_exporter.collect_information()
        logger.debug('<code>' + str(table_exporter.info) + '</code>')
        csv.append(table_exporter.prepare_csv_export(print_header=False))
    header = ",".join(keys) + "\n"
    header += ",".join(get_description_row(keys)) + "\n"
    header += ",".join(get_options_row(keys)) + "\n"
    return header + "\n".join(csv)


def write_csv(file_name, csv):
    """Write the CSV string to a shared file and return its display path."""
    display_path, internal_path = helper.get_shared_filename(file_name)
    with open(internal_path, "w") as csv_file:
        csv_file.write(csv)
    return display_path
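
# Schematic layout of the generated file (values are hypothetical; the actual
# column headers come from get_column_header_name and therefore depend on the
# configuration):
#
#     BIS ID,Custom label,Collection,...   <- column headers (keys)
#     <description row>                    <- get_description_row(keys)
#     <options row>                        <- get_options_row(keys)
#     101,Box A,Reef2023,...               <- one row per container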

def main():
    parser = helper.get_argument_parser()
    args = parser.parse_args()
    # Check whether this is executed locally or as a server-side script
    # (SSS), depending on the auth_token argument.
    if hasattr(args, "auth_token") and args.auth_token:
        db.configure_connection(auth_token=args.auth_token)
        debug_file = configure_server_side_logging()
    else:
        rootlogger = logging.getLogger()
        rootlogger.setLevel(logging.INFO)
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setLevel(logging.DEBUG)
        rootlogger.addHandler(handler)
        debug_file = "/tmp/upload_sample_debug.log"

    if hasattr(args, "filename") and args.filename:
        # Read the input from the form (form.json).
        with open(args.filename) as form_json:
            form_data = json.load(form_json)
        tmp = form_data["container_ids"].split(",")
        data = []
        for d in tmp:
            if "-" in d:
                bound = [int(b) for b in d.split("-")]
                data.append(range(min(bound), max(bound) + 1))
            else:
                data.append(int(d.strip()))
        containers, not_found = retrieve_containers(data)
        if len(containers) == 0:
            logger.error("No containers in the given range.")
            return
        for s in containers:
            logger.debug("Found container " + str(s.id))
        for s in not_found:
            logger.warning("No containers found: " + str(s))
        csv = to_csv(containers)
        max_id = max([c.id for c in containers])
        min_id = min([c.id for c in containers])
        file_name = f"containers_export_(IDs_{min_id}_to_{max_id}).csv"
        display_path = write_csv(file_name, csv)
        logger.info(
            "Your CSV export has been prepared successfully.\n"
            f"Download the file <a href=/Shared/{display_path}>here</a>.")
        try:
            send_mail_with_defaults(
                to=get_email_from_username(),
                subject=f"BIS container export {file_name}",
                body=create_email_with_link_text("container export", display_path)
            )
        except KeyError as ke:
            logger.error(
                "There is a problem with the server's email configuration:\n\n"
                f"{ke}\n\nPlease contact your admin."
            )
    else:
        msg = "{} export_container_csv.py was called without the JSON file in args. {}".format(
            ERROR_PREFIX, ERROR_SUFFIX)
        logger.error(msg)


if __name__ == "__main__":
    main()
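
# Usage sketch for a local run (assuming the default argument parser accepts
# the form file as its "filename" argument; paths and IDs are hypothetical):
#
#     echo '{"container_ids": "101,103-105"}' > /tmp/form.json
#     python3 export_container_csv.py /tmp/form.json
#
# Ranges such as "103-105" are expanded inclusively; IDs that cannot be
# resolved to container records are reported as warnings instead of aborting
# the export.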