# -*- coding: utf-8 -*- # # ** header v3.0 # This file is a part of the CaosDB Project. # # Copyright (C) 2018 Research Group Biomedical Physics, # Max-Planck-Institute for Dynamics and Self-Organization Göttingen # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # # ** end header # """test_server_side_scripting Integration tests for the implementation of the server-side-scripting api. """ from __future__ import print_function, unicode_literals import json import os import ssl import tempfile from http.client import HTTPSConnection from caosdb import Info, RecordType from caosdb import administration as admin from caosdb import execute_query, get_config, get_connection from caosdb.connection.encode import MultipartParam, multipart_encode from caosdb.connection.utils import urlencode, urlparse from caosdb.exceptions import HTTPClientError, HTTPResourceNotFoundError from caosdb.utils.server_side_scripting import run_server_side_script from lxml import etree from pytest import mark, raises _TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"] try: _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = get_config().get( "IntegrationTests", "test_server_side_scripting.bin_dir.local") except Exception: _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = tempfile.mkdtemp() try: _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = get_config().get( "IntegrationTests", "test_server_side_scripting.bin_dir.server") except Exception: _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = "" _TEST_SCRIPTS_DIR = "./resources/" _REMOVE_FILES_AFTERWARDS = [] _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = "" def clean_database(): admin._set_permissions("anonymous", []) d = execute_query("FIND ENTITY WITH ID > 99") if len(d) > 0: d.delete() def setup(): clean_database() def teardown(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _ORIGINAL_SERVER_SCRIPTING_BIN_DIR) clean_database() def setup_module(): global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = admin.get_server_property( "SERVER_SIDE_SCRIPTING_BIN_DIRS") clean_database() from os import makedirs from os.path import exists, isdir, join from shutil import copyfile, copymode print("bin dir (local): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL)) print("bin dir (server): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)) print("tests scripts: " + str(_TEST_SCRIPTS)) if not exists(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL): makedirs(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL) _REMOVE_FILES_AFTERWARDS.append(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL) assert isdir(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL) for script_file in _TEST_SCRIPTS: target = join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, script_file) src = join(_TEST_SCRIPTS_DIR, script_file) copyfile(src, target) copymode(src, target) _REMOVE_FILES_AFTERWARDS.append(target) def teardown_module(): from os import remove from os.path import exists, isdir from shutil import rmtree for obsolete in _REMOVE_FILES_AFTERWARDS: if exists(obsolete): if isdir(obsolete): rmtree(obsolete) else: remove(obsolete) clean_database() def test_call_script_non_existing(): form = dict() form["call"] = "non_existing_script" with raises(HTTPResourceNotFoundError): get_connection().post_form_data("scripting", form) @mark.local_server def test_call_script_not_executable(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "not_executable" with raises(HTTPClientError) as exc_info: get_connection().post_form_data("scripting", form) assert "not executable" in exc_info.value.body.decode("utf-8") @mark.local_server def test_call_ok(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "ok" r = get_connection().post_form_data("scripting", form) xml = etree.parse(r) assert xml.xpath("/Response/script/call")[0].text == "ok" assert xml.xpath("/Response/script/stdout")[0].text == "ok" assert xml.xpath("/Response/script/stderr")[0].text is None assert xml.xpath("/Response/script/@code")[0] == "0" @mark.local_server def test_call_err(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "err" r = get_connection().post_form_data("scripting", form) xml = etree.parse(r) assert xml.xpath("/Response/script/@code")[0] == "1" assert xml.xpath("/Response/script/call")[0].text == "err" assert xml.xpath("/Response/script/stdout")[0].text is None assert xml.xpath("/Response/script/stderr")[0].text == "err" def test_run_server_side_script_with_file_as_positional_param(): RecordType("TestRT").insert() _REMOVE_FILES_AFTERWARDS.append("test_file.txt") with open("test_file.txt", "w") as f: f.write("this is a test") response = run_server_side_script("administration/diagnostics.py", "pos0", "pos1", exit="123", query="COUNT TestRT", files={"-p2": "test_file.txt"}) assert response.stderr is None assert response.code == 123 assert response.call == ('administration/diagnostics.py ' '--exit=123 --query=COUNT TestRT ' 'pos0 pos1 .upload_files/test_file.txt') json_data = json.loads(response.stdout) assert "caosdb" in json_data assert "query" in json_data["caosdb"] assert json_data["caosdb"]["query"] == ["COUNT TestRT", "1"] assert "./.upload_files/test_file.txt" in json_data["files"] def test_run_server_side_script_with_additional_file(): RecordType("TestRT").insert() _REMOVE_FILES_AFTERWARDS.append("test_file.txt") with open("test_file.txt", "w") as f: f.write("this is a test") response = run_server_side_script("administration/diagnostics.py", "pos0", "pos1", exit="123", query="COUNT TestRT", files={"dummykey": "test_file.txt"}) assert response.stderr is None assert response.code == 123 assert response.call == ('administration/diagnostics.py ' '--exit=123 --query=COUNT TestRT ' 'pos0 pos1') json_data = json.loads(response.stdout) assert json_data["caosdb"]["query"] == ["COUNT TestRT", "1"] assert "./.upload_files/test_file.txt" in json_data["files"] def test_diagnostics_basic(): RecordType("TestRT").insert() form = dict() form["call"] = "administration/diagnostics.py" form["-Oexit"] = "123" form["-Oquery"] = "COUNT TestRT" response = get_connection().post_form_data("scripting", form) xml = etree.parse(response) print(etree.tostring(xml)) assert response.status == 200 # ok assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8' diagnostics = xml.xpath("/Response/script/stdout")[0].text assert diagnostics is not None diagnostics = json.loads(diagnostics) assert diagnostics["python_version"] is not None assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed" " in server's python path") assert diagnostics["auth_token"] is not None EXC_ERR = ("There shouldn't be any exception during the diagnostics of " "the interaction of the server-side script and the server.") assert "exception" not in diagnostics["caosdb"], EXC_ERR assert "query" in diagnostics["caosdb"] assert diagnostics["caosdb"]["query"][0] == "COUNT TestRT" assert diagnostics["caosdb"]["query"][1] == "1", ("The RecordType should " "have been found.") assert xml.xpath("/Response/script/@code")[0] == "123", ("The script " "should exit " "with code 123.") assert xml.xpath("/Response/script/call")[0].text.startswith( "administration/diagnostics.py") assert xml.xpath("/Response/script/stderr")[0].text is None def test_diagnostics_with_file_upload(): RecordType("TestRT").insert() _REMOVE_FILES_AFTERWARDS.append("test_file.txt") with open("test_file.txt", "w") as f: f.write("this is a test") parts = [] parts.append(MultipartParam.from_file(paramname="txt_file", filename="test_file.txt")) parts.append(MultipartParam("call", "administration/diagnostics.py")) body, headers = multipart_encode(parts) response = get_connection().insert( ["scripting"], body=body, headers=headers) assert response.status == 200 # ok assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8' xml = etree.parse(response) print(etree.tostring(xml)) assert xml.xpath("/Response/script/@code")[0] == "0" diagnostics = xml.xpath("/Response/script/stdout")[0].text assert diagnostics is not None diagnostics = json.loads(diagnostics) assert diagnostics["python_version"] is not None assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed" " in server's python path") assert diagnostics["auth_token"] is not None assert "exception" not in diagnostics["caosdb"], ("There shouldn't be any " "exception during the " "diagnostics of the " "interaction of the " "server-side " "script and the server.") assert xml.xpath("/Response/script/@code")[0] == "0", ("The script " "should exit " "with code 0.") assert xml.xpath("/Response/script/call")[0].text.startswith( "administration/diagnostics.py") assert xml.xpath("/Response/script/stderr")[0].text is None @mark.local_server def test_call_as_anonymous_with_administration_role(): assert Info().user_info.roles == ["administration"] # activate anonymous user admin.set_server_property("AUTH_OPTIONAL", "TRUE") # give permission to call diagnostics.py admin._set_permissions( "anonymous", [ admin.PermissionRule( "Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")]) form = dict() form["call"] = "administration/diagnostics.py" headers = {"Content-Type": "application/x-www-form-urlencoded"} response = request(method="POST", headers=headers, path="scripting", body=urlencode(form)) xml = etree.parse(response) assert response.getheader("Set-Cookie") is None # no auth token returned assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8' assert response.status == 200 # ok assert xml.xpath("/Response/script/@code")[0] == "0" diagnostics = xml.xpath("/Response/script/stdout")[0].text assert diagnostics is not None diagnostics = json.loads(diagnostics) assert diagnostics["python_version"] is not None assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed" " in server's python path") assert diagnostics["auth_token"] is not None assert "exception" not in diagnostics["caosdb"] def request(method, headers, path, body=None): """ Connect without auth-token. This is clumsy, bc the pylib is not intended to be used as anonymous user. """ context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.verify_mode = ssl.CERT_REQUIRED if hasattr(context, "check_hostname"): context.check_hostname = True context.load_verify_locations(get_config().get("Connection", "cacert")) url = get_config().get("Connection", "url") fullurl = urlparse(url) http_con = HTTPSConnection( str(fullurl.netloc), timeout=200, context=context) http_con.request(method=method, headers=headers, url=str(fullurl.path) + path, body=body) return http_con.getresponse() def test_anonymous_script_calling_not_permitted(): form = dict() form["call"] = "ok" # activate anonymous user admin.set_server_property("AUTH_OPTIONAL", "TRUE") # remove all anonymous permissions admin._set_permissions("anonymous", [ admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"), admin.PermissionRule("Deny", "*") ]) headers = {"Content-Type": "application/x-www-form-urlencoded"} response = request(method="POST", headers=headers, path="scripting", body=urlencode(form)) assert response.status == 403 # forbidden assert response.getheader("Set-Cookie") is None # no auth token returned @ mark.local_server def test_anonymous_script_calling_success(): admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER) form = dict() form["call"] = "ok_anonymous" # activate anonymous user admin.set_server_property("AUTH_OPTIONAL", "TRUE") # give permission to call ok_anonymous admin._set_permissions("anonymous", [admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok_anonymous")]) headers = {"Content-Type": "application/x-www-form-urlencoded"} response = request(method="POST", headers=headers, path="scripting", body=urlencode(form)) assert response.status == 200 # ok assert response.getheader("Set-Cookie") is None # no auth token returned assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8' body = response.read() xml = etree.fromstring(body) # verify unauthenticated call to script ok_anonymous assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous" assert xml.xpath("/Response/script/call")[0].text == "ok_anonymous" assert xml.xpath("/Response/script/stdout")[0].text == "ok_anonymous" assert xml.xpath("/Response/script/stderr")[0].text is None assert xml.xpath("/Response/script/@code")[0] == "0" @mark.local_server def test_evil_path(): subdir = os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, "subdir") if not os.path.exists(subdir): os.makedirs(subdir) _REMOVE_FILES_AFTERWARDS.append(subdir) admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS", os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER, "subdir")) # ok exists one level up from "subdir" assert os.path.exists( os.path.join( _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, "ok")) with raises(HTTPResourceNotFoundError): r = run_server_side_script("../ok")