# -*- coding: utf-8 -*- # # ** header v3.0 # This file is a part of the LinkAhead Project. # # Copyright (C) 2018 Research Group Biomedical Physics, # Max-Planck-Institute for Dynamics and Self-Organization Göttingen # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # ** end header # """test_server_side_scripting Integration tests for the implementation of the server-side-scripting api. """ from __future__ import print_function, unicode_literals import json import os import ssl import tempfile from http.client import HTTPSConnection from linkahead import Info, RecordType from linkahead import administration as admin from linkahead import execute_query, get_config, get_connection from linkahead.connection.encode import MultipartParam, multipart_encode from linkahead.connection.utils import urlencode, urlparse from linkahead.exceptions import HTTPClientError, HTTPResourceNotFoundError from linkahead.utils.server_side_scripting import run_server_side_script from lxml import etree from pytest import mark, raises _TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"] try: _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = get_config().get( "IntegrationTests", "test_server_side_scripting.bin_dir.server") except Exception: _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = "" _TEST_SCRIPTS_DIR = "./resources/" _REMOVE_FILES_AFTERWARDS = [] _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = "" def clean_database(): admin._set_permissions("anonymous", []) d = execute_query("FIND ENTITY WITH ID > 99") if len(d) > 0: d.delete() def assert_stderr(stderr) -> None: """Assert that ``stderr`` is either None or contains only the pyarrow deprecation warning. This can probably removed with Pandas 3.0, to be replaced by ``assert stderr is None``. Parameters ---------- stderr The object to be tested. """ if stderr is None: return assert isinstance(stderr, str) assert stderr.split(sep="\n", maxsplit=1)[1] == """Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0), (to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries) but was not found to be installed on your system. If this would cause problems for you, please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466 i = __import__(m)""" def setup_function(function): clean_database() def teardown_function(function): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _ORIGINAL_SERVER_SCRIPTING_BIN_DIR) clean_database() def setup_module(): global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = admin.get_server_property( "SERVER_SIDE_SCRIPTING_BIN_DIRS") clean_database() def teardown_module(): from os import remove from os.path import exists, isdir from shutil import rmtree for obsolete in _REMOVE_FILES_AFTERWARDS: if exists(obsolete): if isdir(obsolete): rmtree(obsolete) else: remove(obsolete) clean_database() def request(method, headers, path, body=None): """Connect without auth-token. This is clumsy because the pylib is not intended to be used as anonymous user. """ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.verify_mode = ssl.CERT_REQUIRED if hasattr(context, "check_hostname"): context.check_hostname = True context.load_verify_locations(get_config().get("Connection", "cacert")) url = get_config().get("Connection", "url") fullurl = urlparse(url) http_con = HTTPSConnection( str(fullurl.netloc), timeout=200, context=context) http_con.request(method=method, headers=headers, url=str(fullurl.path) + path, body=body) return http_con.getresponse() def test_call_script_non_existing(): form = dict() form["call"] = "non_existing_script" with raises(HTTPResourceNotFoundError): get_connection().post_form_data("scripting", form) @mark.local_server def test_call_script_not_executable(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "not_executable" with raises(HTTPClientError) as exc_info: get_connection().post_form_data("scripting", form) assert "not executable" in exc_info.value.body.decode("utf-8") @mark.local_server def test_call_ok(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "ok" r = get_connection().post_form_data("scripting", form) xml = etree.parse(r) assert xml.xpath("/Response/script/call")[0].text == "ok" assert xml.xpath("/Response/script/stdout")[0].text == "ok" assert_stderr(xml.xpath("/Response/script/stderr")[0].text) assert xml.xpath("/Response/script/@code")[0] == "0" @mark.local_server def test_call_err(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "err" r = get_connection().post_form_data("scripting", form) xml = etree.parse(r) assert xml.xpath("/Response/script/@code")[0] == "1" assert xml.xpath("/Response/script/call")[0].text == "err" assert xml.xpath("/Response/script/stdout")[0].text is None assert xml.xpath("/Response/script/stderr")[0].text == "err" def test_run_server_side_script_with_file_as_positional_param(): RecordType("TestRT").insert() _REMOVE_FILES_AFTERWARDS.append("test_file.txt") with open("test_file.txt", "w") as f: f.write("this is a test") response = run_server_side_script("administration/diagnostics.py", "pos0", "pos1", exit="123", query="COUNT ENTITY TestRT", files={"-p2": "test_file.txt"}) assert_stderr(response.stderr) assert response.code == 123 assert response.call == ('administration/diagnostics.py ' '--exit=123 --query=COUNT ENTITY TestRT ' 'pos0 pos1 .upload_files/test_file.txt') json_data = json.loads(response.stdout) assert "caosdb" in json_data assert "query" in json_data["caosdb"] assert json_data["caosdb"]["query"] == ["COUNT ENTITY TestRT", "1"] assert "./.upload_files/test_file.txt" in json_data["files"] def test_run_server_side_script_with_additional_file(): RecordType("TestRT").insert() _REMOVE_FILES_AFTERWARDS.append("test_file.txt") with open("test_file.txt", "w") as f: f.write("this is a test") response = run_server_side_script("administration/diagnostics.py", "pos0", "pos1", exit="123", query="COUNT ENTITY TestRT", files={"dummykey": "test_file.txt"}) assert_stderr(response.stderr) assert response.code == 123 assert response.call == ('administration/diagnostics.py ' '--exit=123 --query=COUNT ENTITY TestRT ' 'pos0 pos1') json_data = json.loads(response.stdout) assert json_data["caosdb"]["query"] == ["COUNT ENTITY TestRT", "1"] assert "./.upload_files/test_file.txt" in json_data["files"] def test_diagnostics_basic(): RecordType("TestRT").insert() form = dict() form["call"] = "administration/diagnostics.py" form["-Oexit"] = "123" form["-Oquery"] = "COUNT ENTITY TestRT" response = get_connection().post_form_data("scripting", form) xml = etree.parse(response) print(etree.tostring(xml)) assert response.status == 200 # ok assert "text/xml" in response.getheader("Content-Type").lower() assert "charset=utf-8" in response.getheader("Content-Type").lower() diagnostics = xml.xpath("/Response/script/stdout")[0].text assert diagnostics is not None diagnostics = json.loads(diagnostics) assert diagnostics["python_version"] is not None assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed" " in server's python path") assert diagnostics["auth_token"] is not None EXC_ERR = ("There shouldn't be any exception during the diagnostics of " "the interaction of the server-side script and the server.") assert "exception" not in diagnostics["caosdb"], EXC_ERR assert "query" in diagnostics["caosdb"] assert diagnostics["caosdb"]["query"][0] == "COUNT ENTITY TestRT" assert diagnostics["caosdb"]["query"][1] == "1", ("The RecordType should " "have been found.") assert xml.xpath("/Response/script/@code")[0] == "123", ("The script " "should exit " "with code 123.") assert xml.xpath("/Response/script/call")[0].text.startswith( "administration/diagnostics.py") assert_stderr(xml.xpath("/Response/script/stderr")[0].text) def test_diagnostics_with_file_upload(): RecordType("TestRT").insert() _REMOVE_FILES_AFTERWARDS.append("test_file.txt") with open("test_file.txt", "w") as f: f.write("this is a test") parts = [] parts.append(MultipartParam.from_file(paramname="txt_file", filename="test_file.txt")) parts.append(MultipartParam("call", "administration/diagnostics.py")) body, headers = multipart_encode(parts) response = get_connection().insert( ["scripting"], body=body, headers=headers) assert response.status == 200 # ok assert "text/xml" in response.getheader("Content-Type").lower() assert "charset=utf-8" in response.getheader("Content-Type").lower() xml = etree.parse(response) print(etree.tostring(xml)) assert xml.xpath("/Response/script/@code")[0] == "0" diagnostics = xml.xpath("/Response/script/stdout")[0].text assert diagnostics is not None diagnostics = json.loads(diagnostics) assert diagnostics["python_version"] is not None assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed" " in server's python path") assert diagnostics["auth_token"] is not None assert "exception" not in diagnostics["caosdb"], ("There shouldn't be any " "exception during the " "diagnostics of the " "interaction of the " "server-side " "script and the server.") assert xml.xpath("/Response/script/@code")[0] == "0", ("The script " "should exit " "with code 0.") assert xml.xpath("/Response/script/call")[0].text.startswith( "administration/diagnostics.py") assert_stderr(xml.xpath("/Response/script/stderr")[0].text) @mark.local_server def test_call_as_anonymous_with_administration_role(): assert Info().user_info.roles == ["administration"] # activate anonymous user admin.set_server_property("AUTH_OPTIONAL", "TRUE") # give permission to call diagnostics.py admin._set_permissions( "anonymous", [ admin.PermissionRule( "Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")]) form = dict() form["call"] = "administration/diagnostics.py" headers = {"Content-Type": "application/x-www-form-urlencoded"} response = request(method="POST", headers=headers, path="scripting", body=urlencode(form)) xml = etree.parse(response) assert response.getheader("Set-Cookie") is None # no auth token returned assert "text/xml" in response.getheader("Content-Type").lower() assert "charset=utf-8" in response.getheader("Content-Type").lower() assert response.status == 200 # ok assert xml.xpath("/Response/script/@code")[0] == "0" diagnostics = xml.xpath("/Response/script/stdout")[0].text assert diagnostics is not None diagnostics = json.loads(diagnostics) assert diagnostics["python_version"] is not None assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed" " in server's python path") assert diagnostics["auth_token"] is not None assert "exception" not in diagnostics["caosdb"] def test_anonymous_script_calling_not_permitted(): form = dict() form["call"] = "ok" # activate anonymous user admin.set_server_property("AUTH_OPTIONAL", "TRUE") # remove all anonymous permissions admin._set_permissions("anonymous", [ admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"), admin.PermissionRule("Deny", "*") ]) headers = {"Content-Type": "application/x-www-form-urlencoded"} response = request(method="POST", headers=headers, path="scripting", body=urlencode(form)) assert response.status == 403 # forbidden assert response.getheader("Set-Cookie") is None # no auth token returned @ mark.local_server def test_anonymous_script_calling_success(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "ok_anonymous" # activate anonymous user admin.set_server_property("AUTH_OPTIONAL", "TRUE") # give permission to call ok_anonymous admin._set_permissions("anonymous", [admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok_anonymous")]) headers = {"Content-Type": "application/x-www-form-urlencoded"} response = request(method="POST", headers=headers, path="scripting", body=urlencode(form)) assert response.status == 200 # ok assert response.getheader("Set-Cookie") is None # no auth token returned assert "text/xml" in response.getheader("Content-Type").lower() assert "charset=utf-8" in response.getheader("Content-Type").lower() body = response.read() xml = etree.fromstring(body) # verify unauthenticated call to script ok_anonymous assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous" assert xml.xpath("/Response/script/call")[0].text == "ok_anonymous" assert xml.xpath("/Response/script/stdout")[0].text == "ok_anonymous" assert_stderr(xml.xpath("/Response/script/stderr")[0].text) assert xml.xpath("/Response/script/@code")[0] == "0" @mark.local_server def test_evil_path(): """Test that we can't "escape" from the defined bin dirs, i.e., can't execute scripts in arbitrary locations. """ # Set SSS bin dir to "subdir" sub directory admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER, "subdir")) # The parent directory is not in the list of SSS bin dirs, so the # server must not allow the execution of ok. with raises(HTTPResourceNotFoundError): r = run_server_side_script("../ok")