Skip to content
Snippets Groups Projects
Select Git revision
  • d1cd32723f97023209915d357ec070ca668e0bb6
  • main default protected
  • dev protected
  • f-linkahead-rename
  • f-real-id
  • f-filesystem-import
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-filesystem-main
  • f-name
  • keep_changes
  • f-permission-checks-2
  • f-mysql8-tests
  • f-retrieve-history
  • t-distinct-parents
  • v8.1.0
  • v8.0.0
  • v7.0.2
  • v7.0.1
  • v7.0.0
  • v6.0.1
  • v6.0.0
  • v5.0.0
  • v4.1.0
  • v4.0.0
  • v3.0
  • v2.0.30
29 results

patch.sql

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    bis_utils.py 8.15 KiB
    #!/usr/bin/env python3
    # encoding: utf-8
    #
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
    # Copyright (C) 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    
    import os
    import pandas as pd
    import re
    
    from urllib.parse import urljoin
    
    import linkahead as db
    
    from caosadvancedtools.serverside.helper import send_mail
    from caoscrawler.config import get_config_setting
    from linkahead import get_entity_by_name
    
    from sample_helpers.sample_upload_column_definitions import (
        IGNORED_COLUMN_NAMES_SAMPLE, SPECIAL_TREATMENT_SAMPLE)
    from sample_helpers.utils import CONSTANTS
    
    
    # Hard-coded descriptions of CSV columns, looked up by column name in
    # get_description_row before any server query is attempted.
    COLUMN_DESCRIPTIONS = CONSTANTS["csv_column_descriptions"]
    
    
    def get_do_not_insert_type_names(override_names: list[str] = []):
        """Return the names of all RecordTypes with parent
        ``ControlledRecordType`` that are not listed in ``override_names``.

        The comparison against ``override_names`` ignores case.  The default
        list is only read, never modified.

        """
        # Lower-cased lookup set makes the exclusion case-insensitive.
        excluded = {override.lower() for override in override_names}
        controlled = db.execute_query("SELECT name FROM RECORDTYPE ControlledRecordType")

        kept = []
        for record_type in controlled:
            if record_type.name.lower() in excluded:
                continue
            kept.append(record_type.name)
        return kept
    
    
    def get_email_from_username(suffix: str = "@geomar.de"):
        """Return an email address built from the user name reported by
        ``db.Info()``. For now, this is just `username+suffix`.

        """
        # The name is read from the server info because
        # connection.get_username doesn't work with auth tokens.
        user_info = db.Info().user_info
        return f"{user_info.name}{suffix}"
    
    
    def create_email_with_link_text(ftype: str, fpath_within_shared: str):
        """Return a standard email body text stating the type of download and the
        link to the file.

        Parameters
        ----------
        ftype : str
            Type of the download, e.g., "Sample export" or "Sample registration".
        fpath_within_shared : str
            Relative path of the file to be downloaded w.r.t. the SHARED
            directory.  Leading slashes are ignored, so the link always points
            below ``/Shared``.

        Returns
        -------
        str
            The email body containing the download link, which is built from
            the ``public_host_url`` config setting.
        """

        public_host_url = get_config_setting("public_host_url")
        # Build the URL path explicitly with "/" instead of os.path.join:
        # os.path.join drops "/Shared" entirely when the second argument starts
        # with a slash, and it uses the OS path separator rather than the URL
        # one.
        shared_url_path = "/Shared/" + fpath_within_shared.lstrip("/")
        full_link = urljoin(public_host_url, shared_url_path)

        body_text = f"""
    Hello,
    
    Your {ftype} is ready for download.  You can download it here:
    
    {full_link}
    
    Note that this link will become invalid after a BIS server restart.
        """

        return body_text
    
    
    def send_mail_with_defaults(**kwargs):
        """Thin wrapper for caosadvancedtools.serverside.helper.send_mail that
        fills in missing arguments.

        `from_addr` and `to` default to the crawler's `sendmail_from_address`
        and `sendmail_to_address` settings.  If no `cc` is given, the
        `sendmail_to_address` (the curator) is put in CC unless it already is
        the recipient or one of the recipients.

        """
        if "from_addr" not in kwargs:
            kwargs["from_addr"] = get_config_setting("sendmail_from_address")
        if "to" not in kwargs:
            kwargs["to"] = get_config_setting("sendmail_to_address")

        if "cc" not in kwargs:
            curator_addr = get_config_setting("sendmail_to_address")
            recipients = kwargs["to"]
            # `to` may be a list of addresses or a single address.
            if isinstance(recipients, list):
                curator_missing = curator_addr not in recipients
            else:
                curator_missing = recipients != curator_addr
            if curator_missing:
                kwargs["cc"] = curator_addr

        send_mail(**kwargs)
    
    
    def replace_entity_urls_by_ids(data: pd.DataFrame, eid_columns: list[str] = ["BIS ID", "Parent container"]):
        """Replace entity URLs by their bare entity id, modifying ``data`` in
        place.

        Only the columns named in ``eid_columns`` (by default `BIS ID` and
        `Parent container`) are inspected; columns missing from ``data`` are
        skipped.  Values that do not look like an entity URL are left
        untouched.  The modified ``data`` is returned.

        """
        url_regex = re.compile(r"^http(s)?:\/\/(.*)?\/(E|e)ntity\/(?P<eid>(.*))$")
        # A row's labels are the frame's columns, so the presence check can be
        # done once up front.
        present_columns = [cname for cname in eid_columns if cname in data.columns]

        for index, row in data.iterrows():
            for cname in present_columns:
                found = url_regex.match(str(row[cname]))
                if found is None:
                    continue
                data.at[index, cname] = found.group("eid")

        return data
    
    
    def return_value_if_not_none(val):
        """Return ``val`` if it carries an actual value, ``None`` otherwise.

        Works around the various ways in which pandas represents missing
        data.  Treated as "no value" are: an empty list, and a scalar (or the
        sole element of a one-element list) that is null according to pandas,
        ``None``, the empty string, or whose string form is "nan" in any
        casing.  Lists with more than one element are returned unchanged.

        """
        if isinstance(val, list):
            if len(val) == 0:
                return None
            if len(val) != 1:
                return val
            # A one-element list is judged by its only element.
            candidate = val[0]
        else:
            candidate = val

        is_missing = (
            pd.isnull(candidate)
            or candidate is None
            or f"{candidate}".lower() == "nan"
            or f"{candidate}" == ""
        )
        return None if is_missing else val
    
    
    def whitespace_cleanup_in_df(df: pd.DataFrame):
        """Strip all leading and trailing whitespaces from all str values in df.

        The columns of ``df`` are replaced in place and ``df`` is returned.
        Values that are not ``str`` instances (numbers, ``None``, NaN, ...)
        are left untouched.

        """

        for col_name in df.columns:
            column = df[col_name]
            if not pd.api.types.is_string_dtype(column.dtype):
                continue
            if column.dtype == object:
                # Every object column counts as a "string dtype", but it may
                # hold arbitrary values.  The .str accessor would turn the
                # non-str ones into NaN (or refuse to work if there are no
                # strings at all), so strip element-wise and keep everything
                # else as is.
                stripped = [val.strip() if isinstance(val, str) else val for val in column]
                df[col_name] = pd.Series(stripped, index=column.index, dtype=object)
            else:
                # Dedicated string dtypes only contain strings or missing values.
                df[col_name] = column.str.strip()

        return df
    
    
    def get_description_row(column_names: list[str]):
        """Return a list with one description per entry of ``column_names``.

        A description is taken from ``COLUMN_DESCRIPTIONS`` if the column is
        listed there.  Otherwise the server is searched for a Property with
        that name and, if there is none, for a RecordType; the description is
        used only if exactly one entity was found and it has one.  Columns
        without a description get an empty string.

        Descriptions containing a comma are wrapped in double quotes, and the
        first entry of a non-empty result is prefixed with ``#``.

        """
        row = []
        for column in column_names:
            if column in COLUMN_DESCRIPTIONS:
                # Simple: Hard coded
                text = COLUMN_DESCRIPTIONS[column]
            else:
                # Properties take precedence over RecordTypes.
                found = db.execute_query(f"FIND PROPERTY WITH name='{column}'")
                if len(found) == 0:
                    found = db.execute_query(f"FIND RECORDTYPE WITH name='{column}'")
                has_unique_description = len(found) == 1 and found[0].description is not None
                text = found[0].description if has_unique_description else ""
            row.append(f"\"{text}\"" if ',' in text else text)

        if row:
            row[0] = f"#{row[0]}"

        return row
    
    
    def get_options_row(column_names: list[str], override_names: list[str] = []):
        """Return a list of possible options for all column names which
        correspond to controlled RecordTypes.

        Parameters
        ----------
        column_names : list[str]
            The column names for which options are collected.
        override_names : list[str], optional
            Names of controlled RecordTypes that are not to be treated as
            controlled; passed on to ``get_do_not_insert_type_names``.

        Returns
        -------
        list[str]
            One entry per column name, in the same order:

            - "Gear" (any casing): the ``;``-separated names of the first
              parent of every Gear record.
            - "Gear configuration" (any casing): ``;``-separated entries of
              the form ``<gear name>:<list of configurations>``.
            - a controlled RecordType: the ``;``-separated names of its
              records if the RecordType has no properties, else the
              ``;``-separated values of its first property.
            - anything else: the empty string.

            Entries containing a comma are wrapped in double quotes and the
            first entry of a non-empty result is prefixed with ``#``.

        """

        controlled_rts = set(get_do_not_insert_type_names(override_names))
        # We will need special treatment for gears, but only if they are in
        # the column_names.  Column names are matched case-insensitively
        # below, so the same has to hold here; the configurations column
        # needs the gear data as well.
        lowered_names = {name.lower() for name in column_names}
        gears_and_configs = {}
        if lowered_names & {"gear", "gear configuration"}:
            gears_recs = db.execute_query("SELECT Parent, Configuration FROM RECORD Gear")
            for rec in gears_recs:
                # We ignore the case of multiple parents for gears for now.
                gear_name = rec.parents[0].name
                config_prop = rec.get_property("Configuration")
                config = config_prop.value if config_prop is not None else None
                gears_and_configs.setdefault(gear_name, []).append(config)

        options = []
        for name in column_names:
            lowered = name.lower()
            if lowered == "gear":
                option = ";".join(gears_and_configs)
            elif lowered == "gear configuration":
                option = ";".join(f"{key}:{val}" for key, val in gears_and_configs.items())
            elif name in controlled_rts:
                rt = get_entity_by_name(name, role="RECORDTYPE")
                if len(rt.properties) == 0:
                    # Records are identified by name; skip nameless records
                    # since they cannot be joined into the options string.
                    recs = db.execute_query(f"SELECT name FROM RECORD '{name}'")
                    option = ";".join(rec.name for rec in recs if rec.name is not None)
                else:
                    # We use the first property (should be only) as identifier.
                    pname = rt.properties[0].name
                    recs = db.execute_query(f"SELECT '{pname}' FROM RECORD '{name}'")
                    non_empty_values = []
                    for rec in recs:
                        prop = rec.get_property(pname)
                        # Records may lack the property or have no value set.
                        if prop is not None and prop.value is not None:
                            # str() so that non-text values can be joined, too.
                            non_empty_values.append(str(prop.value))
                    option = ";".join(non_empty_values)
            else:
                option = ""
            if ',' in option:
                option = f"\"{option}\""
            options.append(option)

        if options:
            options[0] = f"#{options[0]}"

        return options