Select Git revision
-
Henrik tom Wörden authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
bis_utils.py 8.15 KiB
#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2024 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2024 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import os
import posixpath
import re
from typing import Optional
from urllib.parse import urljoin

import linkahead as db
import pandas as pd
from caosadvancedtools.serverside.helper import send_mail
from caoscrawler.config import get_config_setting
from linkahead import get_entity_by_name
from sample_helpers.sample_upload_column_definitions import (
    IGNORED_COLUMN_NAMES_SAMPLE, SPECIAL_TREATMENT_SAMPLE)
from sample_helpers.utils import CONSTANTS
# Hard-coded descriptions of CSV columns, keyed by column name.
# get_description_row consults this mapping first and only queries the
# server for names that are not listed here.
COLUMN_DESCRIPTIONS = CONSTANTS["csv_column_descriptions"]
def get_do_not_insert_type_names(override_names: Optional[list[str]] = None):
    """Return the names of all controlled RecordTypes that are not overridden.

    Queries the server for all RecordTypes with parent
    ``ControlledRecordType`` and drops those whose name is included in
    ``override_names``. Names are compared case-insensitively.

    Parameters
    ----------
    override_names : list[str], optional
        Names to exclude from the result. ``None`` (the default) or an empty
        list excludes nothing.

    Returns
    -------
    list[str]
        The remaining RecordType names, in the order returned by the query.
    """
    # ``None`` replaces the former shared mutable ``[]`` default. The
    # overrides are lower-cased for case-insensitivity and kept in a set so
    # that each membership test is O(1).
    overrides_lower = {name.lower() for name in override_names or ()}
    controlled_names = [rt.name for rt in db.execute_query(
        "SELECT name FROM RECORDTYPE ControlledRecordType")]
    return [name for name in controlled_names if name.lower() not in overrides_lower]
def get_email_from_username(suffix: str = "@geomar.de"):
    """Build an email address from the user name reported by the server.

    For now this is just the name found in ``db.Info().user_info`` followed
    by ``suffix``.

    Parameters
    ----------
    suffix : str, optional
        Text appended to the user name, ``"@geomar.de"`` by default.
    """
    # The name is read from the server info because
    # connection.get_username doesn't work with auth tokens.
    return f"{db.Info().user_info.name}{suffix}"
def create_email_with_link_text(ftype: str, fpath_within_shared: str):
    """Return a standard email body text stating the type of download and the
    link to the file.

    The link is the ``public_host_url`` config setting joined with
    ``/Shared/<fpath_within_shared>``.

    Parameters
    ----------
    ftype : str
        Type of the download, e.g., "Sample export" or "Sample registration".
    fpath_within_shared : str
        Relative path of the file to be downloaded w.r.t. the SHARED directory.

    Returns
    -------
    str
        The email body containing ``ftype`` and the download link.
    """
    public_host_url = get_config_setting("public_host_url")
    # URL paths always use forward slashes, so join with posixpath instead of
    # the platform-dependent os.path. Leading slashes are stripped because an
    # absolute second component would make the join discard "/Shared".
    relative_path = os.fspath(fpath_within_shared).lstrip("/")
    full_link = urljoin(public_host_url, posixpath.join("/Shared", relative_path))
    body_text = f"""
Hello,
Your {ftype} is ready for download. You can download it here:
{full_link}
Note that this link will become invalid after a BIS server restart.
"""
    return body_text
def send_mail_with_defaults(**kwargs):
    """Call caosadvancedtools.serverside.helper.send_mail with missing
    address arguments filled in from the crawler config.

    ``from_addr`` falls back to the ``sendmail_from_address`` setting and
    ``to`` to the ``sendmail_to_address`` setting. If no ``cc`` is given, the
    ``sendmail_to_address`` (the curator) is put in CC unless it already is
    the recipient or, when ``to`` is a list, one of the recipients. All
    keyword arguments are then forwarded to ``send_mail``.
    """
    if "from_addr" not in kwargs:
        kwargs["from_addr"] = get_config_setting("sendmail_from_address")
    if "to" not in kwargs:
        kwargs["to"] = get_config_setting("sendmail_to_address")

    if "cc" not in kwargs:
        # Only add the curator when they would not get the mail anyway.
        curator_addr = get_config_setting("sendmail_to_address")
        recipients = kwargs["to"]
        if isinstance(recipients, list):
            needs_cc = curator_addr not in recipients
        else:
            needs_cc = recipients != curator_addr
        if needs_cc:
            kwargs["cc"] = curator_addr

    send_mail(**kwargs)
def replace_entity_urls_by_ids(data: pd.DataFrame, eid_columns: Optional[list[str]] = None):
    """Replace all entity urls in the relevant columns by their entity id.

    A cell counts as an entity url if its string form looks like
    ``http(s)://<anything>/Entity/<eid>`` (``entity`` is accepted as well); it
    is then overwritten with ``<eid>``. Other cells are left untouched.

    Parameters
    ----------
    data : pd.DataFrame
        Table to clean up. It is modified in place.
    eid_columns : list[str], optional
        Names of the columns to inspect; columns missing from ``data`` are
        skipped. Defaults to ``["BIS ID", "Parent container"]``.

    Returns
    -------
    pd.DataFrame
        The same ``data`` object that was passed in.
    """
    # ``None`` replaces the former shared mutable list default.
    if eid_columns is None:
        eid_columns = ["BIS ID", "Parent container"]
    # Compile once instead of handing the pattern string to re.match per cell.
    entity_url_pattern = re.compile(r"^http(s)?:\/\/(.*)?\/(E|e)ntity\/(?P<eid>(.*))$")
    for cname in eid_columns:
        if cname not in data.columns:
            continue
        # Walk only the relevant column rather than building a Series for
        # every row; collect the hits first so that the column is not written
        # to while it is being iterated.
        replacements = []
        for index, value in data[cname].items():
            matches = entity_url_pattern.match(str(value))
            if matches:
                replacements.append((index, matches.group("eid")))
        for index, eid in replacements:
            data.at[index, cname] = eid
    return data
def return_value_if_not_none(val):
    """Workaround for somewhat weird pandas behavior: map the various
    "no value" representations to None and pass real values through.

    A scalar counts as "no value" if pandas considers it null, or if its
    string form is empty or equals "nan" (ignoring case). An empty list and a
    one-element list holding such a scalar give None as well. Everything
    else, including lists with more than one element, is returned unchanged.
    """
    def _is_missing(item):
        # Same checks, in the same order, for scalars and single list entries.
        if pd.isnull(item) or item is None:
            return True
        return f"{item}".lower() in ("nan", "")

    if isinstance(val, list):
        if not val:
            return None
        if len(val) == 1 and _is_missing(val[0]):
            return None
        return val

    return None if _is_missing(val) else val
def whitespace_cleanup_in_df(df: pd.DataFrame):
    """Strip all leading and trailing whitespaces from all str values in df.

    Only columns whose dtype passes ``pd.api.types.is_string_dtype`` are
    touched. Values that are not ``str`` are left as they are.

    Parameters
    ----------
    df : pd.DataFrame
        Table to clean up. Its columns are replaced in place.

    Returns
    -------
    pd.DataFrame
        The same ``df`` object that was passed in.
    """
    for col_name in df.columns:
        column = df[col_name]
        if not pd.api.types.is_string_dtype(column.dtype):
            continue
        if column.dtype == object:
            # object columns may mix strings with other values. The ``.str``
            # accessor would replace every non-str entry by NaN (or refuse to
            # work if there is no string at all), so strip element-wise and
            # keep the object dtype.
            stripped = [val.strip() if isinstance(val, str) else val for val in column]
            df[col_name] = pd.Series(stripped, index=column.index, dtype=object)
        else:
            # Dedicated string dtypes only hold strings and missing values.
            df[col_name] = column.str.strip()
    return df
def get_description_row(column_names: list[str]):
    """Return one description string per column name.

    A name listed in ``COLUMN_DESCRIPTIONS`` gets the hard-coded text.
    Otherwise the server is asked for a Property of that name and, if none
    exists, for a RecordType; the description is used when exactly one
    entity is found and it has one. Names without a description get an empty
    string.

    Descriptions containing a comma are wrapped in double quotes, and the
    first entry of a non-empty result is prefixed with ``#``.

    Parameters
    ----------
    column_names : list[str]
        Column names to describe.

    Returns
    -------
    list[str]
        The descriptions, in the order of ``column_names``.
    """
    def _lookup(column_name):
        if column_name in COLUMN_DESCRIPTIONS:
            # Simple: Hard coded
            return COLUMN_DESCRIPTIONS[column_name]
        # Properties take precedence over RecordTypes.
        hits = db.execute_query(f"FIND PROPERTY WITH name='{column_name}'")
        if len(hits) == 0:
            hits = db.execute_query(f"FIND RECORDTYPE WITH name='{column_name}'")
        if len(hits) == 1 and hits[0].description is not None:
            return hits[0].description
        return ""

    row = []
    for column_name in column_names:
        text = _lookup(column_name)
        row.append(f"\"{text}\"" if ',' in text else text)
    if row:
        row[0] = f"#{row[0]}"
    return row
def get_options_row(column_names: list[str], override_names: Optional[list[str]] = None):
    """Return a list of possible options for all column names which
    correspond to controlled RecordTypes.

    Each entry is a ``;``-separated string:

    - ``gear`` (any case): the names of the first parent of every Gear record.
    - ``gear configuration`` (any case): ``<gear name>:<configurations>`` for
      every gear name.
    - a controlled RecordType (see ``get_do_not_insert_type_names``): the
      names of its records if the RecordType has no properties, otherwise the
      values of its first property on those records.
    - anything else: the empty string.

    Entries containing a comma are wrapped in double quotes, and the first
    entry of a non-empty result is prefixed with ``#``.

    Parameters
    ----------
    column_names : list[str]
        Column names to look up options for.
    override_names : list[str], optional
        RecordType names that are not to be treated as controlled. ``None``
        (the default) or an empty list overrides nothing.

    Returns
    -------
    list[str]
        One options string per entry of ``column_names``, in the same order.
    """
    if not column_names:
        # Nothing to look up, so don't query the server at all.
        return []

    # ``None`` replaces the former shared mutable ``[]`` default; a real list
    # is handed on to the helper.
    overrides = list(override_names) if override_names is not None else []
    controlled_rts = set(get_do_not_insert_type_names(overrides))

    # We will need special treatment for gears, but only if they are in the
    # column_names. Compare lower-cased, like the branches below do, and also
    # fetch for "gear configuration", which reads the same mapping.
    lower_names = {name.lower() for name in column_names}
    gears_and_configs = {}
    if lower_names & {"gear", "gear configuration"}:
        gears_recs = db.execute_query("SELECT Parent, Configuration FROM RECORD Gear")
        for rec in gears_recs:
            # We ignore the case of multiple parents for gears for now.
            gear_name = rec.parents[0].name
            config_prop = rec.get_property("Configuration")
            config = config_prop.value if config_prop is not None else None
            gears_and_configs.setdefault(gear_name, []).append(config)

    options = []
    for name in column_names:
        lowered = name.lower()
        if lowered == "gear":
            option = ";".join(gears_and_configs)
        elif lowered == "gear configuration":
            # NOTE(review): ``val`` is the list of configurations, so this
            # renders its Python repr -- confirm that this is the intended
            # format.
            option = ";".join([f"{key}:{val}" for key, val in gears_and_configs.items()])
        elif name in controlled_rts:
            rt = get_entity_by_name(name, role="RECORDTYPE")
            if len(rt.properties) == 0:
                # Records are identified by name. Skip unnamed records (the
                # filter used to test the column name instead of rec.name).
                recs = db.execute_query(f"SELECT name FROM RECORD '{name}'")
                option = ";".join([rec.name for rec in recs if rec.name is not None])
            else:
                # We use the first property (should be only) as identifier.
                pname = rt.properties[0].name
                recs = db.execute_query(f"SELECT '{pname}' FROM RECORD '{name}'")
                non_empty_values = []
                for rec in recs:
                    prop = rec.get_property(pname)
                    # A record may lack the property altogether.
                    if prop is not None and prop.value is not None:
                        # str() so that non-text values don't break the join.
                        non_empty_values.append(str(prop.value))
                option = ";".join(non_empty_values)
        else:
            option = ""
        if ',' in option:
            option = f"\"{option}\""
        options.append(option)
    if options:
        options[0] = f"#{options[0]}"
    return options