Select Git revision
test_server_side_scripting.py
-
Timm Fitschen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_server_side_scripting.py 16.03 KiB
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""test_server_side_scripting
Integration tests for the implementation of the server-side-scripting API.
"""
from __future__ import print_function, unicode_literals
import json
import os
import ssl
import tempfile
from http.client import HTTPSConnection
from caosdb import Info, RecordType
from caosdb import administration as admin
from caosdb import execute_query, get_config, get_connection
from caosdb.connection.encode import MultipartParam, multipart_encode
from caosdb.connection.utils import urlencode, urlparse
from caosdb.exceptions import HTTPClientError, HTTPResourceNotFoundError
from caosdb.utils.server_side_scripting import run_server_side_script
from lxml import etree
from pytest import mark, raises
# Names of the helper scripts that ``setup_module`` copies into the local
# bin dir.
_TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"]


def _integration_option(option, fallback):
    """Read ``option`` from the "IntegrationTests" section of the config.

    When the option cannot be read for whatever reason, ``fallback`` is
    called without arguments and its result is returned instead.
    """
    try:
        return get_config().get("IntegrationTests", option)
    except Exception:
        return fallback()


# Directory the test scripts are copied to; a fresh temporary directory is
# created when the configuration does not provide one.
_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = _integration_option(
    "test_server_side_scripting.bin_dir.local", tempfile.mkdtemp)

# Value the tests write into the server's SERVER_SIDE_SCRIPTING_BIN_DIRS
# property; empty when not configured.
_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = _integration_option(
    "test_server_side_scripting.bin_dir.server", lambda: "")

# Source directory of the test scripts.
_TEST_SCRIPTS_DIR = "./resources/"
# Paths that ``teardown_module`` deletes again.
_REMOVE_FILES_AFTERWARDS = []
# Overwritten by ``setup_module`` with the server's initial setting.
_ORIGINAL_SERVER_SCRIPTING_BIN_DIR = ""
def clean_database():
    """Set an empty permission list for "anonymous" and delete every entity
    found by the query "FIND ENTITY WITH ID > 99".
    """
    admin._set_permissions("anonymous", [])
    leftovers = execute_query("FIND ENTITY WITH ID > 99")
    if len(leftovers) == 0:
        # nothing to delete
        return
    leftovers.delete()
def setup():
    """Bring the database into a clean state (see ``clean_database``)."""
    clean_database()
def teardown():
    """Write the remembered SERVER_SIDE_SCRIPTING_BIN_DIRS value back to the
    server and clean the database.
    """
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS",
        _ORIGINAL_SERVER_SCRIPTING_BIN_DIR)
    clean_database()
def setup_module():
    """Remember the server's bin-dir setting and install the test scripts.

    The current SERVER_SIDE_SCRIPTING_BIN_DIRS property is stored in
    ``_ORIGINAL_SERVER_SCRIPTING_BIN_DIR``, the database is cleaned and each
    script from ``_TEST_SCRIPTS`` is copied (content and permission bits)
    from ``_TEST_SCRIPTS_DIR`` into the local bin dir. Everything created
    here is registered in ``_REMOVE_FILES_AFTERWARDS``.
    """
    global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR
    import shutil

    _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = admin.get_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS")
    clean_database()

    local_bin = _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL
    print("bin dir (local): " + str(local_bin))
    print("bin dir (server): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER))
    print("tests scripts: " + str(_TEST_SCRIPTS))

    if not os.path.exists(local_bin):
        os.makedirs(local_bin)
        # only a directory created here is registered for removal
        _REMOVE_FILES_AFTERWARDS.append(local_bin)
    assert os.path.isdir(local_bin)

    for name in _TEST_SCRIPTS:
        destination = os.path.join(local_bin, name)
        origin = os.path.join(_TEST_SCRIPTS_DIR, name)
        shutil.copyfile(origin, destination)
        # copyfile does not carry over the permission bits
        shutil.copymode(origin, destination)
        _REMOVE_FILES_AFTERWARDS.append(destination)
def teardown_module():
    """Delete every path registered in ``_REMOVE_FILES_AFTERWARDS`` that
    still exists, then clean the database.
    """
    import shutil

    for path in _REMOVE_FILES_AFTERWARDS:
        if not os.path.exists(path):
            # already gone, e.g. removed together with its parent directory
            continue
        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.remove(path)
    clean_database()
def test_call_script_non_existing():
    """Posting a call for "non_existing_script" must raise
    HTTPResourceNotFoundError.
    """
    payload = {"call": "non_existing_script"}
    with raises(HTTPResourceNotFoundError):
        get_connection().post_form_data("scripting", payload)
@mark.local_server
def test_call_script_not_executable():
    """Calling "not_executable" must raise HTTPClientError with a body that
    mentions "not executable".
    """
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS",
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    payload = {"call": "not_executable"}
    with raises(HTTPClientError) as exc_info:
        get_connection().post_form_data("scripting", payload)
    # inspect the error body only after the with-block has caught the error
    error_text = exc_info.value.body.decode("utf-8")
    assert "not executable" in error_text
@mark.local_server
def test_call_ok():
    """The "ok" script must report stdout "ok", no stderr and exit code 0."""
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS",
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    reply = get_connection().post_form_data("scripting", {"call": "ok"})
    tree = etree.parse(reply)

    call_node = tree.xpath("/Response/script/call")[0]
    assert call_node.text == "ok"
    stdout_node = tree.xpath("/Response/script/stdout")[0]
    assert stdout_node.text == "ok"
    stderr_node = tree.xpath("/Response/script/stderr")[0]
    assert stderr_node.text is None
    exit_code = tree.xpath("/Response/script/@code")[0]
    assert exit_code == "0"
@mark.local_server
def test_call_err():
    """The "err" script must report exit code 1, stderr "err" and no
    stdout.
    """
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS",
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    reply = get_connection().post_form_data("scripting", {"call": "err"})
    tree = etree.parse(reply)

    exit_code = tree.xpath("/Response/script/@code")[0]
    assert exit_code == "1"
    call_node = tree.xpath("/Response/script/call")[0]
    assert call_node.text == "err"
    stdout_node = tree.xpath("/Response/script/stdout")[0]
    assert stdout_node.text is None
    stderr_node = tree.xpath("/Response/script/stderr")[0]
    assert stderr_node.text == "err"
def test_run_server_side_script_with_file_as_positional_param():
    """A file passed under the key "-p2" must show up as the third
    positional argument of the reported call and in the reported files.
    """
    RecordType("TestRT").insert()

    upload = "test_file.txt"
    _REMOVE_FILES_AFTERWARDS.append(upload)
    with open(upload, "w") as handle:
        handle.write("this is a test")

    result = run_server_side_script(
        "administration/diagnostics.py", "pos0", "pos1",
        exit="123", query="COUNT TestRT", files={"-p2": upload})

    assert result.stderr is None
    assert result.code == 123
    expected_call = ("administration/diagnostics.py "
                     "--exit=123 --query=COUNT TestRT "
                     "pos0 pos1 .upload_files/test_file.txt")
    assert result.call == expected_call

    report = json.loads(result.stdout)
    assert "caosdb" in report
    assert "query" in report["caosdb"]
    assert report["caosdb"]["query"] == ["COUNT TestRT", "1"]
    assert "./.upload_files/test_file.txt" in report["files"]
def test_run_server_side_script_with_additional_file():
    """A file passed under the key "dummykey" must not appear in the
    reported call but must be listed in the reported files.
    """
    RecordType("TestRT").insert()

    upload = "test_file.txt"
    _REMOVE_FILES_AFTERWARDS.append(upload)
    with open(upload, "w") as handle:
        handle.write("this is a test")

    result = run_server_side_script(
        "administration/diagnostics.py", "pos0", "pos1",
        exit="123", query="COUNT TestRT", files={"dummykey": upload})

    assert result.stderr is None
    assert result.code == 123
    expected_call = ("administration/diagnostics.py "
                     "--exit=123 --query=COUNT TestRT "
                     "pos0 pos1")
    assert result.call == expected_call

    report = json.loads(result.stdout)
    assert report["caosdb"]["query"] == ["COUNT TestRT", "1"]
    assert "./.upload_files/test_file.txt" in report["files"]
def test_diagnostics_basic():
    """Call administration/diagnostics.py with the options exit=123 and
    query="COUNT TestRT" and check the JSON report on stdout, the exit
    code, the call string and the empty stderr.
    """
    RecordType("TestRT").insert()

    payload = {
        "call": "administration/diagnostics.py",
        "-Oexit": "123",
        "-Oquery": "COUNT TestRT",
    }
    response = get_connection().post_form_data("scripting", payload)
    xml = etree.parse(response)
    print(etree.tostring(xml))

    assert response.status == 200  # ok
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

    raw_report = xml.xpath("/Response/script/stdout")[0].text
    assert raw_report is not None
    report = json.loads(raw_report)

    assert report["python_version"] is not None
    assert report["import"]["caosdb"][0] is True, (
        "caosdb not installed in server's python path")
    assert report["auth_token"] is not None

    no_exception_msg = (
        "There shouldn't be any exception during the diagnostics of "
        "the interaction of the server-side script and the server.")
    assert "exception" not in report["caosdb"], no_exception_msg

    assert "query" in report["caosdb"]
    assert report["caosdb"]["query"][0] == "COUNT TestRT"
    assert report["caosdb"]["query"][1] == "1", (
        "The RecordType should have been found.")

    assert xml.xpath("/Response/script/@code")[0] == "123", (
        "The script should exit with code 123.")
    reported_call = xml.xpath("/Response/script/call")[0].text
    assert reported_call.startswith("administration/diagnostics.py")
    assert xml.xpath("/Response/script/stderr")[0].text is None
def test_diagnostics_with_file_upload():
    """Send a multipart request that carries a file and the call to
    administration/diagnostics.py, then check the JSON report on stdout,
    the exit code 0, the call string and the empty stderr.
    """
    RecordType("TestRT").insert()

    _REMOVE_FILES_AFTERWARDS.append("test_file.txt")
    with open("test_file.txt", "w") as handle:
        handle.write("this is a test")

    parts = [
        MultipartParam.from_file(paramname="txt_file",
                                 filename="test_file.txt"),
        MultipartParam("call", "administration/diagnostics.py"),
    ]
    body, headers = multipart_encode(parts)
    response = get_connection().insert(
        ["scripting"], body=body, headers=headers)

    assert response.status == 200  # ok
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

    xml = etree.parse(response)
    print(etree.tostring(xml))
    assert xml.xpath("/Response/script/@code")[0] == "0"

    raw_report = xml.xpath("/Response/script/stdout")[0].text
    assert raw_report is not None
    report = json.loads(raw_report)

    assert report["python_version"] is not None
    assert report["import"]["caosdb"][0] is True, (
        "caosdb not installed in server's python path")
    assert report["auth_token"] is not None
    assert "exception" not in report["caosdb"], (
        "There shouldn't be any exception during the diagnostics of the "
        "interaction of the server-side script and the server.")

    assert xml.xpath("/Response/script/@code")[0] == "0", (
        "The script should exit with code 0.")
    reported_call = xml.xpath("/Response/script/call")[0].text
    assert reported_call.startswith("administration/diagnostics.py")
    assert xml.xpath("/Response/script/stderr")[0].text is None
@mark.local_server
def test_call_as_anonymous_with_administration_role():
    """Call administration/diagnostics.py through ``request`` (no auth
    token) after granting "anonymous" the permission to execute it.
    """
    assert Info().user_info.roles == ["administration"]

    # activate anonymous user
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")

    # give permission to call diagnostics.py
    grant = admin.PermissionRule(
        "Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")
    admin._set_permissions("anonymous", [grant])

    payload = {"call": "administration/diagnostics.py"}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(payload))
    xml = etree.parse(response)

    assert response.getheader("Set-Cookie") is None  # no auth token returned
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'
    assert response.status == 200  # ok
    assert xml.xpath("/Response/script/@code")[0] == "0"

    raw_report = xml.xpath("/Response/script/stdout")[0].text
    assert raw_report is not None
    report = json.loads(raw_report)

    assert report["python_version"] is not None
    assert report["import"]["caosdb"][0] is True, (
        "caosdb not installed in server's python path")
    assert report["auth_token"] is not None
    assert "exception" not in report["caosdb"]
def request(method, headers, path, body=None):
    """Send an HTTPS request to the configured server without auth token.

    This is clumsy, because the pylib is not intended to be used as
    anonymous user: the connection is built by hand from the "url" and
    "cacert" options of the "Connection" section of the configuration.

    Parameters
    ----------
    method : str
        HTTP method, e.g. "POST".
    headers : dict
        Headers sent with the request.
    path : str
        Resource path; it is appended to the path component of the
        configured url.
    body : optional
        Request body, handed to ``HTTPSConnection.request`` unchanged.

    Returns
    -------
    The response object returned by ``HTTPSConnection.getresponse``.
    """
    # PROTOCOL_TLSv1_2 is deprecated since Python 3.10 and forbids TLS 1.3.
    # PROTOCOL_TLS_CLIENT negotiates the highest version both sides support.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    if hasattr(ssl, "TLSVersion"):
        # keep TLS 1.2 as the lower bound, as before
        context.minimum_version = ssl.TLSVersion.TLSv1_2
    # Both are the defaults for PROTOCOL_TLS_CLIENT; set them explicitly so
    # the certificate and host name checks cannot be lost by accident.
    context.check_hostname = True
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(get_config().get("Connection", "cacert"))

    fullurl = urlparse(get_config().get("Connection", "url"))
    http_con = HTTPSConnection(
        str(fullurl.netloc), timeout=200, context=context)
    http_con.request(method=method, headers=headers,
                     url=str(fullurl.path) + path, body=body)
    # The connection is left open on purpose: the caller still has to read
    # the returned response.
    return http_con.getresponse()
def test_anonymous_script_calling_not_permitted():
    """An unauthenticated call of "ok" must be answered with status 403 and
    without a Set-Cookie header when "anonymous" has a "Deny *" rule next to
    the grant for that script.
    """
    payload = {"call": "ok"}

    # activate anonymous user
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")

    # grant the execution of "ok", but deny everything as well
    rules = [
        admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"),
        admin.PermissionRule("Deny", "*"),
    ]
    admin._set_permissions("anonymous", rules)

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(payload))

    assert response.status == 403  # forbidden
    assert response.getheader("Set-Cookie") is None  # no auth token returned
@mark.local_server
def test_anonymous_script_calling_success():
    """An unauthenticated call of "ok_anonymous" must succeed once
    "anonymous" is granted the permission to execute that script.
    """
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS",
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    payload = {"call": "ok_anonymous"}

    # activate anonymous user
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")

    # give permission to call ok_anonymous
    grant = admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok_anonymous")
    admin._set_permissions("anonymous", [grant])

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(payload))

    assert response.status == 200  # ok
    assert response.getheader("Set-Cookie") is None  # no auth token returned
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

    xml = etree.fromstring(response.read())

    # verify unauthenticated call to script ok_anonymous
    role_node = xml.xpath("/Response/UserInfo/Roles/Role")[0]
    assert role_node.text == "anonymous"
    call_node = xml.xpath("/Response/script/call")[0]
    assert call_node.text == "ok_anonymous"
    stdout_node = xml.xpath("/Response/script/stdout")[0]
    assert stdout_node.text == "ok_anonymous"
    stderr_node = xml.xpath("/Response/script/stderr")[0]
    assert stderr_node.text is None
    exit_code = xml.xpath("/Response/script/@code")[0]
    assert exit_code == "0"
@mark.local_server
def test_evil_path():
    """With "subdir" configured as bin dir, calling "../ok" must raise
    HTTPResourceNotFoundError although "ok" exists one level above.
    """
    local_subdir = os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, "subdir")
    if not os.path.exists(local_subdir):
        os.makedirs(local_subdir)
        _REMOVE_FILES_AFTERWARDS.append(local_subdir)

    server_subdir = os.path.join(
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER, "subdir")
    admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", server_subdir)

    # ok exists one level up from "subdir"
    script_above = os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, "ok")
    assert os.path.exists(script_above)

    with raises(HTTPResourceNotFoundError):
        run_server_side_script("../ok")