#!/usr/bin/env python3
# encoding: utf-8
#
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2022 - 2023 GEOMAR
# Copyright (C) 2022 Jakob Eckstein
# Copyright (C) 2023 Indiscale GmbH <info@indiscale.com>
# Copyright (C) 2023 Florian Spreckelsen <f.spreckelsen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import csv
import datetime
import json
import logging
import sys
import caosdb as db
from caosadvancedtools.serverside import helper
from caosadvancedtools.serverside.logging import configure_server_side_logging
from bis_utils import (create_email_with_link_text,
get_description_row, get_email_from_username,
get_options_row, send_mail_with_defaults)
from sample_helpers.container_name import get_container_name_prefix
from sample_helpers.container_registration_identifier import get_container_identifier
from sample_helpers.container_registration_post_processing import (
post_process_containers, post_process_parent_container)
from sample_helpers.utils import CONSTANTS, get_column_header_name, get_entity_name
ERROR_PREFIX = CONSTANTS["error_prefix"]
ERROR_SUFFIX = CONSTANTS["error_suffix"]
logger = logging.getLogger("caosadvancedtools")
def get_parser():
par = helper.get_argument_parser()
# par.add_argument(
# "ids", help="The id(s) of the sample record(s) to be exported.", type=int, metavar="id", nargs='*')
return par
def get_current_year():
currentDateTime = datetime.datetime.now()
date = currentDateTime.date()
year = date.strftime("%Y")
return year
def create_new_label_counter(container_type_id, value=1):
counter = db.Record()
counter.add_parent(name=get_entity_name("labelcounter_rt"))
counter.add_property(name=get_entity_name("labelcounter_prop"), value=value)
counter.add_property(name=get_entity_name("containertype_rt"), value=container_type_id)
counter.insert()
def get_label_counter(container_type_id):
counter = db.execute_query(
f"FIND '{get_entity_name('labelcounter_rt')}' WITH "
f"'{get_entity_name('containertype_rt')}' = {container_type_id}"
)
if len(counter) == 0:
create_new_label_counter(container_type_id)
return 1
else:
return counter[0].get_property(get_entity_name("labelcounter_prop")).value
def set_label_counter(container_type_id, value):
counter = db.execute_query(
f"FIND '{get_entity_name('labelcounter_rt')}' WITH "
f"'{get_entity_name('containertype_rt')}' = {container_type_id}"
)
if len(counter) == 0:
create_new_label_counter(container_type_id, value)
else:
try:
counter[0].get_property(get_entity_name("labelcounter_prop")).value = value
counter.update()
except:
logger.error('{}There was an error when updating the internal label counter {}.{}'.format(
ERROR_PREFIX, counter[0], ERROR_SUFFIX))
def create_container_entities(responsible_id, container_type_id, container_size, number_of_containers, parent_container):
"""Create the container entities specified via the "Register
Containers" form in the web interface.
Returns
-------
conts : db.Container
A container of the newly registered container entities.
"""
# Create Batch for insert
batch = db.Container()
# Get BIS Label for responsible person
responsible_entity = db.execute_query("FIND {}".format(responsible_id))[0]
bis_label_prefix = get_container_name_prefix(responsible_id, container_type_id)
bis_label_counter = get_label_counter(container_type_id)
for i in range(number_of_containers):
bis_label = f"{bis_label_prefix}_{bis_label_counter+i}"
container = db.Record(bis_label)
container.add_parent(name=get_entity_name("container_rt"))
container.add_property(name=get_entity_name("responsible_rt"), value=responsible_id)
container.add_property(name=get_entity_name("containertype_rt"), value=container_type_id)
container.add_property(name=get_entity_name("container_size_prop"), value=container_size)
container.add_property(name=get_entity_name("child_container_prop"), value=[])
batch += [container]
# Post process child containers
batch = post_process_containers(batch)
try:
batch.insert()
except:
logger.error("{}The registered containers could not be imported to the system.{}".format(
ERROR_PREFIX, ERROR_SUFFIX))
return None
# TODO: Reset the internal_container_label_counter of person to its previous value if nothing has be inserted
# Add to parent container
if parent_container is not None:
child_cont_prop = db.Property(name=get_entity_name("child_container_prop").retrieve()
new_childs=[]
for entity in batch:
new_childs += [entity.id]
# Check if property Child container exists:
if parent_container.get_property(child_cont_prop.name) is None:
parent_container.add_property(id=child_cont_prop.id, value=new_childs)
else:
# Check if the container allready has childs
if parent_container.get_property(child_cont_prop.name).value is None:
parent_container.get_property(child_cont_prop.name).value=new_childs
else:
parent_container.get_property(child_cont_prop.name).value += new_childs
parent_container=post_process_parent_container(parent)
parent_container.update()
set_label_counter(container_type_id=container_type_id,
value=bis_label_counter + number_of_containers)
return batch
def get_template_name(container_entities, file_format_extension):
first_id=container_entities[0].id
last_id=container_entities[-1].id
return "container_template_(IDs_{}_to_{}).{}".format(first_id, last_id, file_format_extension)
def get_parent_container(id):
try:
parent=db.execute_query("FIND {}".format(id))[0]
return parent
except:
logger.info(
"No parent container specified. The Contianer ID or label of the "
"parent container has to be entered in the template before it is uploaded."
)
return None
def get_container_type(id):
try:
type=db.execute_query("FIND {}".format(id))[0]
return type
except:
logger.error("{}The specified ContainerType could not be retrieved. {}".format(
ERROR_PREFIX, ERROR_SUFFIX))
def create_csv_template(template_internal_path, container_entities, container_type, container_size, parent_container):
if parent_container is None:
parent_container_label=""
else:
parent_container_label=get_container_identifier(parent_container)
with open(template_internal_path, 'w') as template_csv:
writer=csv.writer(template_csv)
# Write header
headers=[
"entity_id",
"container_type_rt",
"container_size_prop",
"container_label",
"PI",
"Collection",
"Parent container",
"Custom label",
"Container Contents",
"PDFReport"
]
headers=[get_column_header_name(header) for header in headers]
writer.writerow(headers)
# Write description with leading '#'
descriptions=get_description_row(headers)
if descriptions:
writer.writerow(descriptions)
options=get_options_row(headers)
if options:
writer.writerow(options)
# Write entity data to the lines
for entity in container_entities:
writer.writerow([
entity.id,
container_type.name,
container_size,
get_container_identifier(entity),
"",
"",
parent_container_label,
"",
""])
def main():
parser=get_parser()
args=parser.parse_args()
global logger
# Check whether executed locally or as an SSS depending on
# auth_token argument.
if hasattr(args, "auth_token") and args.auth_token:
db.configure_connection(auth_token=args.auth_token)
debug_file=configure_server_side_logging()
# logger = logging.getLogger("caosadvancedtools")
else:
# logging config for local execution
# logger = logging.getLogger("caosadvancedtools")
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)
debug_file=None
if hasattr(args, "filename") and args.filename:
# Read the input from the form (form.json)
with open(args.filename) as form_json:
form_data=json.load(form_json)
parent_container=get_parent_container(form_data["parent_id"])
container_type=get_container_type(form_data["container_type"])
container_size=form_data["container_size"]
# Create entities in BIS
container_entities=create_container_entities(
responsible_id=int(form_data["responsible_entity"]),
container_type_id=int(form_data["container_type"]),
container_size=container_size,
number_of_containers=int(form_data["number_of_containers"]),
parent_container=parent_container
)
# In case there has been an error upon container creation
if container_entities is None:
logger.error("The new containers could not be inserted.")
return 1
# Create template file on the server
file_format_extension=form_data["file_format"]
template_name=get_template_name(container_entities, file_format_extension)
template_display_path, template_internal_path=helper.get_shared_filename(
template_name)
if (file_format_extension == "csv"):
create_csv_template(
template_internal_path=template_internal_path,
container_entities=container_entities,
container_type=container_type,
container_size=container_size,
parent_container=parent_container)
logger.info("Your CSV-template has been prepared successfully.\n" +
"Download the file <a href=/Shared/{}>here</a>.".format(template_display_path))
try:
send_mail_with_defaults(
to=get_email_from_username(),
subject=f"BIS container registration {template_name}",
body=create_email_with_link_text(
"container registration template", template_display_path)
)
except KeyError as ke:
logger.error(
"There is a problem with the server's email configuration:\n\n"
f"{ke}\n\nPlease contact your admin."
)
else:
logger.error("{}There was a problem with the specified file format: {}.{}".format(
ERROR_PREFIX, file_format_extension, ERROR_SUFFIX))
else:
logger.error("{}register_new_contaners.py was called without the JSON file in args.{}".format(
ERROR_PREFIX, ERROR_SUFFIX))
if __name__ == "__main__":
main()