Select Git revision
-
Henrik tom Wörden authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_server_side_scripting.py 16.13 KiB
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the LinkAhead Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""test_server_side_scripting
Integration tests for the implementation of the server-side scripting API.
"""
from __future__ import print_function, unicode_literals
import json
import os
import ssl
import tempfile
from http.client import HTTPSConnection
from linkahead import Info, RecordType
from linkahead import administration as admin
from linkahead import execute_query, get_config, get_connection
from linkahead.connection.encode import MultipartParam, multipart_encode
from linkahead.connection.utils import urlencode, urlparse
from linkahead.exceptions import HTTPClientError, HTTPResourceNotFoundError
from linkahead.utils.server_side_scripting import run_server_side_script
from lxml import etree
from pytest import mark, raises
# Names of the scripts that the tests in this module pass as ``call`` targets.
_TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"]

# Value the tests assign to the SERVER_SIDE_SCRIPTING_BIN_DIRS server property.
# It is read from the ``IntegrationTests`` section of the client configuration
# and falls back to the empty string when that lookup raises.
try:
    _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = get_config().get(
        "IntegrationTests", "test_server_side_scripting.bin_dir.server"
    )
except Exception:
    _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = ""

_TEST_SCRIPTS_DIR = "./resources/"

# Paths registered by tests; ``teardown_module`` removes them again.
_REMOVE_FILES_AFTERWARDS = []

# Overwritten by ``setup_module`` with the server's original property value.
_ORIGINAL_SERVER_SCRIPTING_BIN_DIR = ""
def clean_database():
    """Clear the anonymous permissions and delete all entities with ID > 99."""
    admin._set_permissions("anonymous", [])
    leftovers = execute_query("FIND ENTITY WITH ID > 99")
    if len(leftovers) == 0:
        # Nothing was found, so there is nothing to delete.
        return
    leftovers.delete()
def assert_stderr(stderr) -> None:
    """Assert that ``stderr`` is either None or contains only the pyarrow deprecation warning.

    The first line of ``stderr`` is ignored; everything after the first newline
    must be exactly the expected warning text.

    This can probably be removed with Pandas 3.0, to be replaced by ``assert stderr is None``.

    Parameters
    ----------
    stderr
        The object to be tested.

    Raises
    ------
    AssertionError
        If ``stderr`` is neither None nor a string whose content after the
        first line equals the expected warning.
    """
    if stderr is None:
        return
    assert isinstance(stderr, str)

    expected = (
        "Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),\n"
        "(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)\n"
        "but was not found to be installed on your system.\n"
        "If this would cause problems for you,\n"
        "please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466\n"
        "i = __import__(m)"
    )

    # ``partition`` yields an empty remainder when there is no newline, so a
    # single-line stderr fails the assertion below instead of raising an
    # IndexError as ``split(...)[1]`` would.
    _, _, remainder = stderr.partition("\n")
    assert remainder == expected, f"Unexpected stderr content: {stderr!r}"
def setup_function(function):
    """Clean the database before a test function runs.

    ``function`` is accepted but not used.
    """
    clean_database()
def teardown_function(function):
    """Restore the original scripting bin dirs on the server and clean the database.

    ``function`` is accepted but not used.
    """
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS", _ORIGINAL_SERVER_SCRIPTING_BIN_DIR
    )
    clean_database()
def setup_module():
    """Remember the server's current scripting bin dirs, then clean the database."""
    global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR
    current_bin_dirs = admin.get_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS")
    _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = current_bin_dirs
    clean_database()
def teardown_module():
    """Remove the files and directories registered by the tests and clean the database."""
    from os import remove
    from os.path import exists, isdir
    from shutil import rmtree

    for path in _REMOVE_FILES_AFTERWARDS:
        if not exists(path):
            continue
        # Directories are removed recursively, everything else as a plain file.
        delete = rmtree if isdir(path) else remove
        delete(path)
    clean_database()
def request(method, headers, path, body=None):
    """Connect without auth-token.

    This is clumsy because the pylib is not intended to be used as anonymous user.

    Parameters
    ----------
    method
        HTTP method, e.g. ``"POST"``.
    headers
        Request headers handed to ``HTTPSConnection.request``.
    path
        Appended to the path component of the configured ``Connection.url``.
    body
        Optional request body.

    Returns
    -------
    The response object returned by ``HTTPSConnection.getresponse()``. The
    connection is not closed here; the callers read from the response after
    this function returns.
    """
    config = get_config()

    # PROTOCOL_TLSv1_2 is deprecated and pins the connection to exactly
    # TLS 1.2. PROTOCOL_TLS_CLIENT negotiates the best version both sides
    # support; TLS 1.2 is kept as the lower bound.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(config.get("Connection", "cacert"))

    fullurl = urlparse(config.get("Connection", "url"))
    http_con = HTTPSConnection(
        str(fullurl.netloc), timeout=200, context=context)
    http_con.request(method=method, headers=headers,
                     url=str(fullurl.path) + path, body=body)
    return http_con.getresponse()
def test_call_script_non_existing():
    """Calling a script name the server does not know raises HTTPResourceNotFoundError."""
    payload = {"call": "non_existing_script"}
    with raises(HTTPResourceNotFoundError):
        get_connection().post_form_data("scripting", payload)
@mark.local_server
def test_call_script_not_executable():
    """Calling the ``not_executable`` script raises an HTTPClientError that says so."""
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER
    )
    payload = {"call": "not_executable"}
    with raises(HTTPClientError) as exc_info:
        get_connection().post_form_data("scripting", payload)

    error_body = exc_info.value.body.decode("utf-8")
    assert "not executable" in error_body
@mark.local_server
def test_call_ok():
    """The ``ok`` script reports call, stdout, stderr and exit code 0 in the XML response."""
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER
    )
    payload = {"call": "ok"}
    response = get_connection().post_form_data("scripting", payload)
    tree = etree.parse(response)

    call_text = tree.xpath("/Response/script/call")[0].text
    assert call_text == "ok"
    stdout_text = tree.xpath("/Response/script/stdout")[0].text
    assert stdout_text == "ok"
    assert_stderr(tree.xpath("/Response/script/stderr")[0].text)
    exit_code = tree.xpath("/Response/script/@code")[0]
    assert exit_code == "0"
@mark.local_server
def test_call_err():
    """The ``err`` script reports exit code 1, no stdout and ``err`` on stderr."""
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER
    )
    payload = {"call": "err"}
    response = get_connection().post_form_data("scripting", payload)
    tree = etree.parse(response)

    exit_code = tree.xpath("/Response/script/@code")[0]
    assert exit_code == "1"
    call_text = tree.xpath("/Response/script/call")[0].text
    assert call_text == "err"
    stdout_text = tree.xpath("/Response/script/stdout")[0].text
    assert stdout_text is None
    stderr_text = tree.xpath("/Response/script/stderr")[0].text
    assert stderr_text == "err"
def test_run_server_side_script_with_file_as_positional_param():
    """A file passed under the ``-p2`` key shows up as third positional argument of the call."""
    RecordType("TestRT").insert()

    upload_name = "test_file.txt"
    _REMOVE_FILES_AFTERWARDS.append(upload_name)
    with open(upload_name, "w") as handle:
        handle.write("this is a test")

    response = run_server_side_script(
        "administration/diagnostics.py",
        "pos0",
        "pos1",
        exit="123",
        query="COUNT ENTITY TestRT",
        files={"-p2": "test_file.txt"},
    )

    assert_stderr(response.stderr)
    assert response.code == 123
    expected_call = (
        "administration/diagnostics.py"
        " --exit=123 --query=COUNT ENTITY TestRT"
        " pos0 pos1 .upload_files/test_file.txt"
    )
    assert response.call == expected_call

    json_data = json.loads(response.stdout)
    assert "caosdb" in json_data
    assert "query" in json_data["caosdb"]
    assert json_data["caosdb"]["query"] == ["COUNT ENTITY TestRT", "1"]
    assert "./.upload_files/test_file.txt" in json_data["files"]
def test_run_server_side_script_with_additional_file():
    """A file passed under an arbitrary key is uploaded but not added to the call string."""
    RecordType("TestRT").insert()

    upload_name = "test_file.txt"
    _REMOVE_FILES_AFTERWARDS.append(upload_name)
    with open(upload_name, "w") as handle:
        handle.write("this is a test")

    response = run_server_side_script(
        "administration/diagnostics.py",
        "pos0",
        "pos1",
        exit="123",
        query="COUNT ENTITY TestRT",
        files={"dummykey": "test_file.txt"},
    )

    assert_stderr(response.stderr)
    assert response.code == 123
    expected_call = (
        "administration/diagnostics.py"
        " --exit=123 --query=COUNT ENTITY TestRT"
        " pos0 pos1"
    )
    assert response.call == expected_call

    json_data = json.loads(response.stdout)
    assert json_data["caosdb"]["query"] == ["COUNT ENTITY TestRT", "1"]
    assert "./.upload_files/test_file.txt" in json_data["files"]
def test_diagnostics_basic():
    """Run ``administration/diagnostics.py`` with exit and query options and check its report."""
    RecordType("TestRT").insert()

    form = {
        "call": "administration/diagnostics.py",
        "-Oexit": "123",
        "-Oquery": "COUNT ENTITY TestRT",
    }
    response = get_connection().post_form_data("scripting", form)
    xml = etree.parse(response)
    print(etree.tostring(xml))

    assert response.status == 200  # ok
    content_type = response.getheader("Content-Type").lower()
    assert "text/xml" in content_type
    assert "charset=utf-8" in content_type

    # The script prints its report as JSON on stdout.
    raw_report = xml.xpath("/Response/script/stdout")[0].text
    assert raw_report is not None
    report = json.loads(raw_report)

    assert report["python_version"] is not None
    assert report["import"]["caosdb"][0] is True, (
        "caosdb not installed in server's python path")
    assert report["auth_token"] is not None
    assert "exception" not in report["caosdb"], (
        "There shouldn't be any exception during the diagnostics of "
        "the interaction of the server-side script and the server.")

    assert "query" in report["caosdb"]
    query_text, query_result = report["caosdb"]["query"][0], report["caosdb"]["query"][1]
    assert query_text == "COUNT ENTITY TestRT"
    assert query_result == "1", "The RecordType should have been found."

    assert xml.xpath("/Response/script/@code")[0] == "123", (
        "The script should exit with code 123.")
    assert xml.xpath("/Response/script/call")[0].text.startswith(
        "administration/diagnostics.py")
    assert_stderr(xml.xpath("/Response/script/stderr")[0].text)
def test_diagnostics_with_file_upload():
    """Call ``administration/diagnostics.py`` via a multipart request that carries a file."""
    RecordType("TestRT").insert()

    upload_name = "test_file.txt"
    _REMOVE_FILES_AFTERWARDS.append(upload_name)
    with open(upload_name, "w") as handle:
        handle.write("this is a test")

    # Multipart body: the uploaded file first, then the name of the script to call.
    parts = [
        MultipartParam.from_file(paramname="txt_file", filename="test_file.txt"),
        MultipartParam("call", "administration/diagnostics.py"),
    ]
    body, headers = multipart_encode(parts)
    response = get_connection().insert(
        ["scripting"], body=body, headers=headers)

    assert response.status == 200  # ok
    content_type = response.getheader("Content-Type").lower()
    assert "text/xml" in content_type
    assert "charset=utf-8" in content_type

    xml = etree.parse(response)
    print(etree.tostring(xml))
    assert xml.xpath("/Response/script/@code")[0] == "0"

    # The script prints its report as JSON on stdout.
    raw_report = xml.xpath("/Response/script/stdout")[0].text
    assert raw_report is not None
    report = json.loads(raw_report)

    assert report["python_version"] is not None
    assert report["import"]["caosdb"][0] is True, (
        "caosdb not installed in server's python path")
    assert report["auth_token"] is not None
    assert "exception" not in report["caosdb"], (
        "There shouldn't be any exception during the diagnostics of "
        "the interaction of the server-side script and the server.")

    assert xml.xpath("/Response/script/@code")[0] == "0", (
        "The script should exit with code 0.")
    assert xml.xpath("/Response/script/call")[0].text.startswith(
        "administration/diagnostics.py")
    assert_stderr(xml.xpath("/Response/script/stderr")[0].text)
@mark.local_server
def test_call_as_anonymous_with_administration_role():
    """An anonymous request may run diagnostics.py once the permission is granted."""
    assert Info().user_info.roles == ["administration"]

    # Activate the anonymous user.
    # NOTE(review): this test does not reset AUTH_OPTIONAL afterwards - confirm
    # that this is intended.
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")

    # Allow the anonymous role to call diagnostics.py.
    grant_diagnostics = admin.PermissionRule(
        "Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")
    admin._set_permissions("anonymous", [grant_diagnostics])

    form = {"call": "administration/diagnostics.py"}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(form))
    xml = etree.parse(response)

    assert response.getheader("Set-Cookie") is None  # no auth token returned
    content_type = response.getheader("Content-Type").lower()
    assert "text/xml" in content_type
    assert "charset=utf-8" in content_type
    assert response.status == 200  # ok
    assert xml.xpath("/Response/script/@code")[0] == "0"

    # The script prints its report as JSON on stdout.
    raw_report = xml.xpath("/Response/script/stdout")[0].text
    assert raw_report is not None
    report = json.loads(raw_report)

    assert report["python_version"] is not None
    assert report["import"]["caosdb"][0] is True, (
        "caosdb not installed in server's python path")
    assert report["auth_token"] is not None
    assert "exception" not in report["caosdb"]
def test_anonymous_script_calling_not_permitted():
    """An anonymous call of ``ok`` is answered with 403 and without an auth cookie."""
    payload = {"call": "ok"}

    # Activate the anonymous user.
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")

    # Grant execution of ``ok`` but additionally deny everything; the request
    # below is expected to be rejected.
    anonymous_rules = [
        admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"),
        admin.PermissionRule("Deny", "*"),
    ]
    admin._set_permissions("anonymous", anonymous_rules)

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(payload))

    assert response.status == 403  # forbidden
    assert response.getheader("Set-Cookie") is None  # no auth token returned
@mark.local_server
def test_anonymous_script_calling_success():
    """An anonymous call of ``ok_anonymous`` succeeds once the permission is granted."""
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER
    )
    payload = {"call": "ok_anonymous"}

    # Activate the anonymous user.
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")

    # Allow the anonymous role to call ok_anonymous.
    grant_ok_anonymous = admin.PermissionRule(
        "Grant", "SCRIPTING:EXECUTE:ok_anonymous")
    admin._set_permissions("anonymous", [grant_ok_anonymous])

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(payload))

    assert response.status == 200  # ok
    assert response.getheader("Set-Cookie") is None  # no auth token returned
    content_type = response.getheader("Content-Type").lower()
    assert "text/xml" in content_type
    assert "charset=utf-8" in content_type

    raw_body = response.read()
    xml = etree.fromstring(raw_body)

    # Verify the unauthenticated call to the script ok_anonymous.
    role_text = xml.xpath("/Response/UserInfo/Roles/Role")[0].text
    assert role_text == "anonymous"
    call_text = xml.xpath("/Response/script/call")[0].text
    assert call_text == "ok_anonymous"
    stdout_text = xml.xpath("/Response/script/stdout")[0].text
    assert stdout_text == "ok_anonymous"
    assert_stderr(xml.xpath("/Response/script/stderr")[0].text)
    assert xml.xpath("/Response/script/@code")[0] == "0"
@mark.local_server
def test_evil_path():
    """Test that we can't "escape" from the defined bin dirs, i.e.,
    can't execute scripts in arbitrary locations.
    """
    # Restrict the SSS bin dirs to the "subdir" sub directory.
    restricted_bin_dir = os.path.join(
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER, "subdir")
    admin.set_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS", restricted_bin_dir)

    # The parent directory is not in the list of SSS bin dirs, so the
    # server must not allow the execution of ok.
    with raises(HTTPResourceNotFoundError):
        run_server_side_script("../ok")