Skip to content
Snippets Groups Projects
Select Git revision
  • d8645b5845238f5487daf88fad15b301742d76cb
  • main default protected
  • dev
  • v0.1.0
  • GEOMAR
5 results

export_container_csv.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    export_container_csv.py 12.73 KiB
    #!/usr/bin/env python3
    # encoding: utf-8
    #
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2023 Indiscale GmbH <info@indiscale.com>
    # Copyright (C) 2023 Timm Fitschen <t.fitschen@indiscale.com>
    # Copyright (C) 2023 Florian Spreckelsen
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    import json
    import logging
    import os
    import sys
    
    import linkahead as db
    
    from caosadvancedtools.serverside import helper
    from caosadvancedtools.table_export import BaseTableExporter
    from caoscrawler.logging import configure_server_side_logging
    from linkahead.cached import cached_get_entity_by, cached_query as cquery
    from linkahead.exceptions import (EmptyUniqueQueryError,
                                      QueryNotUniqueError)
    
    from bis_utils import (create_email_with_link_text,
                           get_description_row, get_email_from_username,
                           get_options_row, send_mail_with_defaults)
    from sample_helpers.utils import (CONSTANTS, get_column_header_name,
                                      get_entity_name)
    
    # suppress warning of diff function: the linkahead.apiutils logger would
    # otherwise emit warnings that are irrelevant for this export script, so
    # only errors are let through.
    apilogger = logging.getLogger("linkahead.apiutils")
    apilogger.setLevel(logging.ERROR)

    # Module-level logger used throughout this script.
    logger = logging.getLogger("caosadvancedtools")

    # Shared error-message framing used when reporting failures (see main()).
    ERROR_PREFIX = CONSTANTS["error_prefix"]
    ERROR_SUFFIX = CONSTANTS["error_suffix"]
    
    
    def cached_query(query, unique=False):
        """Run ``query`` through the linkahead query cache.

        When ``unique`` is True, the query is expected to match exactly one
        entity and that single entity is returned; otherwise the full result
        container is returned.
        """
        return cached_get_entity_by(query=query) if unique else cquery(query)
    
    
    def reverse_semicolon_separated_list(value):
        """Collapse a list into one semicolon-separated string.

        Each element is stringified first; non-list values are passed through
        unchanged.
        """
        if not isinstance(value, list):
            return value
        return ";".join(str(item) for item in value)
    
    
    def generate_label_text(entity):
        """Build a short preview label for ``entity``.

        Prefers the custom label property, then the container label property,
        then the entity name.  The usual result is ``"<id> <label>"``; a bare
        id is returned when nothing usable exists or when the label would
        merely repeat the id.
        """

        def _prop_value(prop_name):
            # Value of the named property, or None when the property is absent.
            prop = entity.get_property(prop_name)
            return prop.value if prop is not None else None

        label = _prop_value(get_entity_name("custom_label_prop"))
        if not label:
            label = _prop_value(get_entity_name("container_label_prop"))

        if not label:
            return f"{entity.id} {entity.name}" if entity.name else entity.id
        if f"{label}" == f"{entity.id}":
            # prevent special case of Label = ID resulting in a preview
            # that looks like "id, id".
            return entity.id
        return f"{entity.id} {label}"
    
    
    def extract_parent_container(record, key):
        """Return the containers referencing ``record``, semicolon-joined.

        Each parent is represented by its name when it has one, otherwise by
        its id.  ``key`` is unused (extractor-signature convention).
        """
        possible_parents = cached_query(
            f"FIND {get_entity_name('container_rt')} WHICH REFERENCES {record.id}"
        )
        labels = [parent.name if parent.name else parent.id
                  for parent in possible_parents]
        return reverse_semicolon_separated_list(labels)
    
    
    def extract_eid(record, key):
        """Return the entity id of ``record``; ``key`` is ignored."""
        return record.id
    
    
    def extract_custom_label(record, key):
        """Return the custom label property value of ``record``, or None."""
        prop = record.get_property(get_entity_name("custom_label_prop"))
        return prop.value if prop is not None else None
    
    
    def extract_pi(record, key):
        """Return the PI's abbreviation for ``record``, or None.

        Looks up the Person record referenced by the "PI" property and prefers
        its abbreviation property; falls back to the raw PI id when the Person
        has no abbreviation.
        """
        pi_prop = record.get_property("PI")
        if pi_prop is None or pi_prop.value is None:
            return None

        pi_id = pi_prop.value
        pi_rec = cached_query(f"FIND RECORD Person WITH ID={pi_id}", unique=True)

        abbr = pi_rec.get_property(get_entity_name("abbreviation_prop"))
        if abbr is not None and abbr.value is not None:
            return abbr.value
        return pi_id
    
    
    def extract_collection(record, key):
        """Return a semicolon-separated list of the record's collections.

        Each referenced Collection is represented by its name when non-empty,
        otherwise by its id.  Returns None when no Collection is set.
        """
        coll_prop = record.get_property("Collection")
        if coll_prop is None or coll_prop.value is None:
            return None

        coll_ids = coll_prop.value
        if not isinstance(coll_ids, list):
            coll_ids = [coll_ids]

        labels = []
        for coll in coll_ids:
            coll_rec = cached_query(f"FIND RECORD Collection WITH ID={coll}", unique=True)
            labels.append(coll_rec.name if coll_rec.name else coll_rec.id)
        return reverse_semicolon_separated_list(labels)
    
    
    def extract_container_type(record, key):
        """Return the abbreviation of the record's container type, or None.

        Looks up the referenced ContainerType record and prefers its
        abbreviation property; falls back to the raw type id.
        """
        abbr_prop_name = get_entity_name("abbreviation_prop")
        containertype_rt_name = get_entity_name("containertype_rt")

        ct_prop = record.get_property(containertype_rt_name)
        if ct_prop is None or ct_prop.value is None:
            return None

        ct_id = ct_prop.value
        ctr = cached_query(f"FIND {containertype_rt_name} WITH ID={ct_id}", unique=True)
        abbr = ctr.get_property(abbr_prop_name)
        if abbr is not None and abbr.value is not None:
            return abbr.value
        return ct_id
    
    
    def extract_storage_chain(record, key):
        """Return the chain of containers that transitively contain ``record``.

        The chain is assembled outermost container first and ends with
        ``record`` itself; each element is rendered with generate_label_text().
        NOTE(review): the labels are joined with the empty string, so
        consecutive "<id> <label>" entries run together without a visible
        separator — confirm this is intended.
        """

        def find_referencing_containers(eid):
            # Walk "upwards": find the container referencing eid, then that
            # container's own parent, and so on.  Outer containers are placed
            # before the direct parent in the returned list.
            containing_containers = []
            candidates = cached_query(f"FIND {get_entity_name('container_rt')} WHICH REFERENCES {eid}")
            if len(candidates) > 1:
                # Ambiguous parentage: give up on this branch and return
                # nothing rather than guessing.
                logger.debug(f"Entity {eid} is referenced by more than one container.")
                return []
            elif len(candidates) == 1:
                containing_containers.extend(find_referencing_containers(candidates[0].id))
            containing_containers.extend(candidates)
            return containing_containers

        containing_containers = find_referencing_containers(record.id)
        containing_containers.append(record)

        return "".join([str(generate_label_text(cont)) for cont in containing_containers])
    
    
    def default_extractor(record, key):
        """Return the value of property ``key`` on ``record``, or None."""
        prop = record.get_property(key)
        return None if prop is None else prop.value
    
    
    # Maps CSV column header -> extractor function with signature
    # (record, key) -> value.  Columns not listed here fall back to
    # default_extractor (see extract_value).
    EXTRACTORS = {
        get_column_header_name("entity_id"): extract_eid,
        get_column_header_name("container_label_prop"): default_extractor,
        get_column_header_name("Collection"): extract_collection,
        get_column_header_name("container_size_prop"): default_extractor,
        get_column_header_name("containertype_rt"): extract_container_type,
        get_column_header_name("custom_label_prop"): extract_custom_label,
        get_column_header_name("PDFReport"): default_extractor,
        get_column_header_name("PI"): extract_pi,
        get_column_header_name("Parent container"): extract_parent_container,
        get_column_header_name("Storage chain"): extract_storage_chain,
    }

    # List of sample properties to be ignored because they are treated
    # otherwise. Similar, but not identical to SPECIAL TREATMENT.
    # Matched case-insensitively in to_csv().
    IGNORE_KEYS = [
        # To be changed by updating the child, not the parent.
        get_column_header_name("child_container_prop"),
        get_column_header_name("containertype_rt"),  # handled separately above
        get_column_header_name("Responsible"),  # Not to be changed via upload
    ]
    
    
    def extract_value(r, e):
        """Extract column ``e`` from record ``r`` and make it CSV-safe.

        Dispatches to the extractor registered for the column in EXTRACTORS,
        falling back to default_extractor.  String values containing a comma,
        newline or double quote are wrapped in quotes with embedded quotes
        doubled (RFC 4180); None becomes the empty string.
        """
        if e in EXTRACTORS:
            v = EXTRACTORS[e](r, e)
        else:
            v = default_extractor(r, e)
        if isinstance(v, str) and (',' in v or '\n' in v or '"' in v):
            # Quote text fields with commas in them.  Embedded double quotes
            # must be doubled, otherwise the emitted CSV row is malformed.
            escaped = v.replace('"', '""')
            v = f"\"{escaped}\""

        return v if v is not None else ""
    
    
    class TableExporter(BaseTableExporter):
        """Exporter used by this script; inherits all behavior unchanged
        from BaseTableExporter."""
        pass
    
    
    def retrieve_containers(data):
        """Resolve requested container ids to container records.

        Parameters
        ----------
        data : list
            Mixture of single ids (int) and id ranges (range objects).

        Returns
        -------
        tuple
            ``(container, not_found)`` — the retrieved records, plus the
            single ids and "start-stop" range strings that matched nothing.
        """

        def _find_container(eid):
            # Unique container record with this id, or None when it does not
            # exist.  Centralizes the query previously duplicated for single
            # ids and ranges.
            try:
                return cached_query(
                    f"FIND RECORD {get_entity_name('container_rt')} WITH id = '{eid}'",
                    unique=True)
            except EmptyUniqueQueryError:
                return None

        container = []
        not_found = []
        for eid in data:
            if isinstance(eid, int):
                rec = _find_container(eid)
                if rec is not None:
                    container.append(rec)
                else:
                    # we want to warn about these
                    not_found.append(eid)
            else:
                # eid is a range; the range counts as found when at least one
                # id inside it exists.
                found_at_least_one_in_range = False
                for next_eid in eid:
                    rec = _find_container(next_eid)
                    if rec is not None:
                        container.append(rec)
                        found_at_least_one_in_range = True
                if not found_at_least_one_in_range:
                    not_found.append(f"{eid.start}-{eid.stop-1}")
        return container, not_found
    
    
    def to_csv(containers):
        """Render ``containers`` as a CSV string.

        The output starts with three header rows (column names, descriptions,
        options) followed by one data row per container.  Columns are the
        fixed EXTRACTORS columns plus any additional property found on the
        records, deduplicated case-insensitively and filtered by IGNORE_KEYS.
        """
        # Fixed columns first; insertion order determines column order.
        export_dict = {column: {} for column in EXTRACTORS}

        ignored = [ign.lower() for ign in IGNORE_KEYS]
        seen = [column.lower() for column in export_dict]
        for container in containers:
            # collect other properties
            for prop in container.get_properties():
                lowered = prop.name.lower()
                if lowered not in seen and lowered not in ignored:
                    export_dict[prop.name] = {}
                    seen.append(lowered)

        for column in export_dict:
            export_dict[column]["find_func"] = extract_value
            export_dict[column]["optional"] = True

        keys = list(export_dict)
        rows = []
        for container in containers:
            table_exporter = TableExporter(export_dict, record=container)
            table_exporter.all_keys = keys
            table_exporter.collect_information()
            logger.debug('<code>' + str(table_exporter.info) + '</code>')
            rows.append(table_exporter.prepare_csv_export(print_header=False))

        header = ",".join(keys) + "\n"
        header += ",".join(get_description_row(keys)) + '\n'
        header += ",".join(get_options_row(keys)) + '\n'
        return header + "\n".join(rows)
    
    
    def write_csv(file_name, csv):
        """Write ``csv`` to the shared folder under ``file_name``.

        Returns the display path under which the file can be downloaded.
        """
        display_path, internal_path = helper.get_shared_filename(file_name)
        # Explicit encoding: the platform default may not be UTF-8 (PEP 597),
        # which would corrupt non-ASCII content in the export.
        with open(internal_path, "w", encoding="utf-8") as csv_file:
            csv_file.write(csv)
        return display_path
    
    
    def main():
        """Entry point: read the requested container ids from the form input,
        export the containers to CSV in the shared folder, and notify the
        user with a download link (log message and email)."""
        parser = helper.get_argument_parser()
        args = parser.parse_args()
        # Check whether executed locally or as an SSS depending on
        # auth_token argument.
        if hasattr(args, "auth_token") and args.auth_token:
            db.configure_connection(auth_token=args.auth_token)
            debug_file = configure_server_side_logging()
        else:
            # Local execution: send all log output to stdout instead.
            rootlogger = logging.getLogger()
            rootlogger.setLevel(logging.INFO)
            logger.setLevel(logging.DEBUG)
            handler = logging.StreamHandler(stream=sys.stdout)
            handler.setLevel(logging.DEBUG)
            rootlogger.addHandler(handler)
            # NOTE(review): debug_file is assigned but never used below.
            debug_file = "/tmp/upload_sample_debug.log"

        if hasattr(args, "filename") and args.filename:
            # Read the input from the form (form.json)
            with open(args.filename) as form_json:
                form_data = json.load(form_json)

                # "container_ids" is a comma-separated mix of single ids and
                # "low-high" spans; spans become range objects (inclusive).
                tmp = form_data["container_ids"].split(",")
                data = []
                for d in tmp:
                    if "-" in d:
                        bound = [int(b) for b in d.split("-")]
                        data.append(range(min(bound), max(bound) + 1))
                    else:
                        data.append(int(d.strip()))

            containers, not_found = retrieve_containers(data)

            if len(containers) == 0:
                logger.error("No containers in the given range.")
                return

            for s in containers:
                logger.debug("Found container " + str(s.id))
            for s in not_found:
                logger.warning("No containers found: " + str(s))

            csv = to_csv(containers)

            # File name encodes the exported id range.
            max_id = max([c.id for c in containers])
            min_id = min([c.id for c in containers])
            file_name = f"containers_export_(IDs_{min_id}_to_{max_id}).csv"
            display_path = write_csv(file_name, csv)
            logger.info("Your CSV-Export has been prepared successfully.\n" +
                        f"Download the file <a href=/Shared/{display_path}>here</a>.")
            try:
                send_mail_with_defaults(
                    to=get_email_from_username(),
                    subject=f"BIS container export {file_name}",
                    body=create_email_with_link_text("container export", display_path)
                )
            except KeyError as ke:
                # Raised when the server's email configuration is incomplete.
                logger.error(
                    "There is a problem with the server's email configuration:\n\n"
                    f"{ke}\n\nPlease contact your admin."
                )
        else:
            msg = "{} export_container_csv.py was called without the JSON file in args. {}".format(
                ERROR_PREFIX, ERROR_SUFFIX)
            logger.error(msg)
    
    
    # Standard script entry guard.
    if __name__ == "__main__":
        main()