Skip to content
Snippets Groups Projects
Commit 2a2de269 authored by Henrik tom Wörden's avatar Henrik tom Wörden
Browse files

Helper tools for server side scripting

parent f8a8397d
No related branches found
No related tags found
1 merge request!22Release 0.3
# Summary
Insert a meaningful description for this merge request here. What is the
new/changed behavior? Which bug has been fixed? Are there related Issues?
# Focus
Point the reviewer to the core of the code change. Where should they start
reading? What should they focus on (e.g. security, performance,
maintainability, user-friendliness, compliance with the specs, finding more
corner cases, concrete questions)?
# Test Environment
How to set up a test environment for manual testing?
# Check List for the Author
Please, prepare your MR for a review. Be sure to write a summary and a
focus and create gitlab comments for the reviewer. They should guide the
reviewer through the changes, explain your changes and also point out open
questions. For further good practices have a look at [our review
guidelines](https://gitlab.com/caosdb/caosdb/-/blob/dev/REVIEW_GUIDELINES.md)
- [ ] All automated tests pass
- [ ] Reference related Issues
- [ ] Up-to-date CHANGELOG.md
- [ ] Annotations in code (Gitlab comments)
- Intent of new code
- Problems with old code
- Why this implementation?
# Check List for the Reviewer
- [ ] I understand the intent of this MR
- [ ] All automated tests pass
- [ ] Up-to-date CHANGELOG.md
- [ ] The test environment setup works and the intended behavior is
reproducible in the test environment
- [ ] In-code documentation and comments are up-to-date.
- [ ] Check: Are there spezifications? Are they satisfied?
For further good practices have a look at [our review guidelines](https://gitlab.com/caosdb/caosdb/-/blob/dev/REVIEW_GUIDELINES.md).
/assign me
# encoding: utf-8
#
# Copyright (C) 2019, 2020 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
# Copyright (C) 2019, 2020 Henrik tom Wörden <h.tomwoerden@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
from __future__ import absolute_import
import argparse
import datetime
import json
import logging
import sys
import caosdb as db
def wrap_bootstrap_alert(text, kind):
""" Wrap a text into a Bootstrap (3.3.7) DIV.alert.
Parameters
----------
text : str
The text body of the bootstrap alert.
kind : str
One of ["success", "info", "warning", "danger"]
Returns
-------
alert : str
A HTML str of a Bootstrap DIV.alert
"""
return ('<div class="alert alert-{kind} alert-dismissible" '
'role="alert">{text}</div>').format(kind=kind, text=text)
def print_bootstrap(text, kind, file=sys.stdout):
""" Wrap a text into a Bootstrap (3.3.7) DIV.alert and print it to a file.
Parameters
----------
text : str
The text body of the bootstrap alert.
kind : str
One of ["success", "info", "warning", "danger"]
file : file, optional
Print the alert to this file. Default: sys.stdout.
Returns
-------
None
"""
print(wrap_bootstrap_alert(text, kind), file=file)
def print_success(text):
"""Shortcut for print_bootstrap(text, kine="success")
The text body is also prefixed with "<b>Success:</b> ".
Parameters
----------
text : str
The text body of the bootstrap alert.
Returns
-------
None
"""
print_bootstrap("<b>Success:</b> " + text, kind="success")
def print_info(text):
"""Shortcut for print_bootstrap(text, kine="info")
The text body is also prefixed with "<b>Info:</b> ".
Parameters
----------
text : str
The text body of the bootstrap alert.
Returns
-------
None
"""
print_bootstrap("<b>Info:</b> " + text, kind="info")
def print_warning(text):
"""Shortcut for print_bootstrap(text, kine="warning")
The text body is also prefixed with "<b>Warning:</b> ".
Parameters
----------
text : str
The text body of the bootstrap alert.
Returns
-------
None
"""
print_bootstrap("<b>Warning:</b> " + text, kind="warning")
def print_error(text):
"""Shortcut for print_bootstrap(text, kine="danger")
The text body is also prefixed with "<b>ERROR:</b> ".
Parameters
----------
text : str
The text body of the bootstrap alert.
Returns
-------
None
"""
print_bootstrap("<b>ERROR:</b> " + text, kind="danger", file=sys.stderr)
class WebUI_Handler(logging.StreamHandler):
""" allows to make logging to be nicely displayed in the WebUI
You can enable this as follows:
logger = logging.getLogger("<LoggerName>")
logger.addHandler(WebUI_Handler())
"""
def __init__(self, *args, full_file=None, **kwargs):
super().__init__(*args, **kwargs)
self.max_elements = 100
self.counter = 0
self.full_file = full_file
def format(self, record):
""" Return the HTML formatted log record for display on a website.
Implementation of the format function as defined by
logging.StreamHandler.
Parameters
----------
record :
Raises
------
RuntimeError
If the log level of the record is not supported. Supported log
levels include logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, and logging.CRITICAL.
Returns
-------
str
The formatted log record.
"""
self.counter += 1
if self.counter == self.max_elements:
return wrap_bootstrap_alert(
"<b>Warning:</b> Due to the large number of messages, the "
"output is stopped here. You can see the full log "
" <a href='{}'>here</a>.".format(self.full_file),
kind="warning")
if self.counter > self.max_elements:
return ""
text = str(record.getMessage()).replace("\n", r"</br>")
text = text.replace("\t", r"&nbsp;"*4)
if record.levelno == logging.DEBUG:
return wrap_bootstrap_alert(str(record.getMessage()), kind="info")
elif record.levelno == logging.INFO:
return wrap_bootstrap_alert("<b>Info:</b> " + text, kind="info")
elif record.levelno == logging.WARNING:
return wrap_bootstrap_alert("<b>Warning:</b> " + text, kind="warning")
elif record.levelno == logging.ERROR:
return wrap_bootstrap_alert("<b>ERROR:</b> " + text, kind="danger")
elif record.levelno == logging.CRITICAL:
return wrap_bootstrap_alert("<b>CRITICAL ERROR:</b> " + text, kind="danger")
else:
raise RuntimeError("Unsupported log level: {}".format(record.levelno))
class DataModelError(RuntimeError):
"""DataModelError indicates that the server-side script cannot work as
intended due to missing datat model entities or an otherwise incompatible
data model."""
def __init__(self, rt):
super().__init__(
"This script expects certain RecordTypes and Properties to exist "
"in the data model. There is a problem with {} .".format(rt))
def recordtype_is_child_of(rt, parent):
"""Return True iff the RecordType is a child of another Entity.
The parent Entity can be a direct or indirect parent.
Parameters
----------
rt : caosdb.Entity
The child RecordType.
parent : str or int
The parent's name or id.
Returns
-------
bool
True iff `rt` is a child of `parent`
"""
query = "COUNT RecordType {} with id={}".format(parent, rt.id)
if db.execute_query(query) > 0:
return True
else:
return False
def init_data_model(entities):
"""Return True if all entities exist.
Parameters
----------
entities : iterable of caosdb.Entity
The data model entities which are to be checked for existence.
Raises
------
DataModelError
If any entity in `entities` does not exists.
Returns
-------
bool
True if all entities exist.
"""
try:
for e in entities:
e.retrieve()
except db.exceptions.EntityDoesNotExistError:
raise DataModelError(e.name)
return True
def get_data(filename, default=None):
"""Load data from a json file as a dict.
Parameters
----------
filename : str
The file's path, relative or absolute.
default : dict
Default data, which is overridden by the data in the file, if the keys
are defined in the file.
Returns
-------
dict
Data from the given file.
"""
result = default.copy() if default is not None else {}
with open(filename, 'r') as fi:
data = json.load(fi)
result.update(data)
return result
def get_timestamp():
"""Return a ISO 8601 compliante timestamp (second precision)"""
return datetime.datetime.utcnow().isoformat(timespec='seconds')
def get_argument_parser():
"""Return a argparse.ArgumentParser for typical use-cases.
The parser expects a file name as data input ('filename') and and an
optional auth-token ('--auth-token').
The parser can also be augmented for other use cases.
Returns
-------
argparse.ArgumentParser
"""
p = argparse.ArgumentParser()
# TODO: add better description. I do not know what json file is meant.
# TODO: use a prefix for this argument? using this in another parser is
# difficult otherwise
p.add_argument("filename", help="The json filename")
p.add_argument("--auth-token")
return p
def parse_arguments(args):
"""Use the standard parser and parse the arguments.
Call with `parse_arguments(args=sys.argv)` to parse the command line
arguments.
Parameters
----------
args : list of str
Arguments to parse.
Returns
-------
dict
Parsed arguments.
"""
p = get_argument_parser()
return p.parse_args(args)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment