Skip to content
Snippets Groups Projects
Select Git revision
  • d31c447ee87d90339359156347f741b0818a5c8e
  • main default protected
  • dev protected
  • f-fix-accent-sensitivity
  • f-filesystem-import
  • f-update-acl
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-string-ids
  • f-filesystem-main
  • f-multipart-encoding
  • f-trigger-advanced-user-tools
  • f-real-rename-test-pylibsolo2
  • f-real-rename-test-pylibsolo
  • f-real-rename-test
  • f-linkahead-rename
  • f-reference-record
  • f-xml-serialization
  • f-xfail-server-181
  • linkahead-pylib-v0.18.0
  • linkahead-control-v0.16.0
  • linkahead-pylib-v0.17.0
  • linkahead-mariadbbackend-v8.0.0
  • linkahead-server-v0.13.0
  • caosdb-pylib-v0.15.0
  • caosdb-pylib-v0.14.0
  • caosdb-pylib-v0.13.2
  • caosdb-server-v0.12.1
  • caosdb-pylib-v0.13.1
  • caosdb-pylib-v0.12.0
  • caosdb-server-v0.10.0
  • caosdb-pylib-v0.11.1
  • caosdb-pylib-v0.11.0
  • caosdb-server-v0.9.0
  • caosdb-pylib-v0.10.0
  • caosdb-server-v0.8.1
  • caosdb-pylib-v0.8.0
  • caosdb-server-v0.8.0
  • caosdb-pylib-v0.7.2
41 results

test_server_side_scripting.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_server_side_scripting.py 16.03 KiB
    # -*- coding: utf-8 -*-
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """test_server_side_scripting
    
    Integration tests for the implementation of the server-side-scripting api.
    """
    from __future__ import print_function, unicode_literals
    
    import json
    import os
    import ssl
    import tempfile
    from http.client import HTTPSConnection
    
    from caosdb import Info, RecordType
    from caosdb import administration as admin
    from caosdb import execute_query, get_config, get_connection
    from caosdb.connection.encode import MultipartParam, multipart_encode
    from caosdb.connection.utils import urlencode, urlparse
    from caosdb.exceptions import HTTPClientError, HTTPResourceNotFoundError
    from caosdb.utils.server_side_scripting import run_server_side_script
    from lxml import etree
    from pytest import mark, raises
    
    # Names of the script files that setup_module copies from
    # _TEST_SCRIPTS_DIR into the local scripting bin dir.
    _TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"]
    _TEST_SCRIPTS_DIR = "./resources/"

    # Paths created while the tests run; teardown_module removes them again.
    _REMOVE_FILES_AFTERWARDS = []

    # Placeholder; setup_module stores the server's original
    # SERVER_SIDE_SCRIPTING_BIN_DIRS value here so teardown can restore it.
    _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = ""

    # Bin dir as seen from the machine running the tests.  Any failure to read
    # the option falls back to a freshly created temporary directory.
    try:
        _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = get_config().get(
            "IntegrationTests", "test_server_side_scripting.bin_dir.local")
    except Exception:
        _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = tempfile.mkdtemp()

    # The same bin dir as it is passed to the server property.  Any failure to
    # read the option falls back to the empty string.
    try:
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = get_config().get(
            "IntegrationTests", "test_server_side_scripting.bin_dir.server")
    except Exception:
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = ""
    
    
    def clean_database():
        """Reset the server state shared by the tests of this module.

        Replaces the permission rules of the ``anonymous`` role with an empty
        list and deletes everything ``FIND ENTITY WITH ID > 99`` returns.
        """
        admin._set_permissions("anonymous", [])
        leftovers = execute_query("FIND ENTITY WITH ID > 99")

        if len(leftovers) == 0:
            # Nothing was found, so there is nothing to delete.
            return
        leftovers.delete()
    
    
    def setup():
        """Clean the database (nose-style hook name, kept for compatibility)."""
        clean_database()


    def setup_function():
        """Clean the database before every test function of this module.

        Recent pytest releases dropped the nose compatibility layer and no
        longer call a plain module-level ``setup``; ``setup_function`` is the
        hook name pytest itself looks for.  Runners that still call ``setup``
        merely clean the database twice.
        """
        setup()
    
    
    def teardown():
        """Restore the scripting bin dirs and clean the database.

        Sets the server property ``SERVER_SIDE_SCRIPTING_BIN_DIRS`` back to the
        value stored in ``_ORIGINAL_SERVER_SCRIPTING_BIN_DIR`` and then cleans
        the database.  (nose-style hook name, kept for compatibility.)
        """
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _ORIGINAL_SERVER_SCRIPTING_BIN_DIR)
        clean_database()


    def teardown_function():
        """Run :func:`teardown` after every test function of this module.

        Recent pytest releases dropped the nose compatibility layer and no
        longer call a plain module-level ``teardown``; ``teardown_function`` is
        the hook name pytest itself looks for.  Runners that still call
        ``teardown`` merely repeat the same reset.
        """
        teardown()
    
    
    def setup_module():
        """Prepare the server and the local bin dir for this test module.

        Stores the server's current ``SERVER_SIDE_SCRIPTING_BIN_DIRS`` value in
        ``_ORIGINAL_SERVER_SCRIPTING_BIN_DIR``, cleans the database and copies
        every file listed in ``_TEST_SCRIPTS`` (content and permission bits)
        from ``_TEST_SCRIPTS_DIR`` into the local bin dir.  Everything created
        here is recorded in ``_REMOVE_FILES_AFTERWARDS``.
        """
        import shutil

        global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR
        _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = admin.get_server_property(
            "SERVER_SIDE_SCRIPTING_BIN_DIRS")
        clean_database()

        local_dir = _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL
        print("bin dir (local): " + str(local_dir))
        print("bin dir (server): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER))
        print("tests scripts: " + str(_TEST_SCRIPTS))

        # Only a directory created here is scheduled for removal; a directory
        # that already existed is left in place.
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)
            _REMOVE_FILES_AFTERWARDS.append(local_dir)
        assert os.path.isdir(local_dir)

        for name in _TEST_SCRIPTS:
            source = os.path.join(_TEST_SCRIPTS_DIR, name)
            destination = os.path.join(local_dir, name)
            shutil.copyfile(source, destination)
            # copyfile does not transfer permission bits; copymode does.
            shutil.copymode(source, destination)
            _REMOVE_FILES_AFTERWARDS.append(destination)
    
    
    def teardown_module():
        """Delete the recorded files and directories, then clean the database.

        Walks ``_REMOVE_FILES_AFTERWARDS`` in order; entries that no longer
        exist are skipped, directories are removed recursively.
        """
        import shutil

        for path in _REMOVE_FILES_AFTERWARDS:
            if not os.path.exists(path):
                continue
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
        clean_database()
    
    
    def test_call_script_non_existing():
        """Posting a call to ``non_existing_script`` must raise
        ``HTTPResourceNotFoundError``."""
        form = {"call": "non_existing_script"}
        with raises(HTTPResourceNotFoundError):
            get_connection().post_form_data("scripting", form)
    
    
    @mark.local_server
    def test_call_script_not_executable():
        """Calling the ``not_executable`` script must raise ``HTTPClientError``
        whose body mentions "not executable"."""
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
        form = {"call": "not_executable"}
        with raises(HTTPClientError) as exc_info:
            get_connection().post_form_data("scripting", form)
        error_body = exc_info.value.body.decode("utf-8")
        assert "not executable" in error_body
    
    
    @mark.local_server
    def test_call_ok():
        """The ``ok`` script must report stdout "ok", an empty stderr and
        exit code 0."""
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
        response = get_connection().post_form_data("scripting", {"call": "ok"})
        xml = etree.parse(response)

        def first(path):
            # First match of an XPath expression in the response document.
            return xml.xpath(path)[0]

        assert first("/Response/script/call").text == "ok"
        assert first("/Response/script/stdout").text == "ok"
        assert first("/Response/script/stderr").text is None
        assert first("/Response/script/@code") == "0"
    
    
    @mark.local_server
    def test_call_err():
        """The ``err`` script must report exit code 1, an empty stdout and
        stderr "err"."""
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
        response = get_connection().post_form_data("scripting", {"call": "err"})
        xml = etree.parse(response)

        def first(path):
            # First match of an XPath expression in the response document.
            return xml.xpath(path)[0]

        assert first("/Response/script/@code") == "1"
        assert first("/Response/script/call").text == "err"
        assert first("/Response/script/stdout").text is None
        assert first("/Response/script/stderr").text == "err"
    
    
    def test_run_server_side_script_with_file_as_positional_param():
        """Upload a file under the key ``-p2`` and expect it to show up as the
        third positional argument of the reported call."""
        RecordType("TestRT").insert()

        upload_name = "test_file.txt"
        _REMOVE_FILES_AFTERWARDS.append(upload_name)
        with open(upload_name, "w") as upload:
            upload.write("this is a test")

        response = run_server_side_script(
            "administration/diagnostics.py",
            "pos0",
            "pos1",
            exit="123",
            query="COUNT TestRT",
            files={"-p2": upload_name})

        expected_call = ("administration/diagnostics.py "
                         "--exit=123 --query=COUNT TestRT "
                         "pos0 pos1 .upload_files/test_file.txt")
        assert response.stderr is None
        assert response.code == 123
        assert response.call == expected_call

        # The script prints its diagnostics as JSON on stdout.
        json_data = json.loads(response.stdout)
        assert "caosdb" in json_data
        caosdb_info = json_data["caosdb"]
        assert "query" in caosdb_info
        assert caosdb_info["query"] == ["COUNT TestRT", "1"]
        assert "./.upload_files/test_file.txt" in json_data["files"]
    
    
    def test_run_server_side_script_with_additional_file():
        """Upload a file under the key ``dummykey`` and expect it among the
        reported files but not in the reported call."""
        RecordType("TestRT").insert()

        upload_name = "test_file.txt"
        _REMOVE_FILES_AFTERWARDS.append(upload_name)
        with open(upload_name, "w") as upload:
            upload.write("this is a test")

        response = run_server_side_script(
            "administration/diagnostics.py",
            "pos0",
            "pos1",
            exit="123",
            query="COUNT TestRT",
            files={"dummykey": upload_name})

        expected_call = ("administration/diagnostics.py "
                         "--exit=123 --query=COUNT TestRT "
                         "pos0 pos1")
        assert response.stderr is None
        assert response.code == 123
        assert response.call == expected_call

        # The script prints its diagnostics as JSON on stdout.
        json_data = json.loads(response.stdout)
        assert json_data["caosdb"]["query"] == ["COUNT TestRT", "1"]
        assert "./.upload_files/test_file.txt" in json_data["files"]
    
    
    def test_diagnostics_basic():
        """Call ``administration/diagnostics.py`` through a form POST and check
        the diagnostics it prints as well as the reported exit code 123."""
        RecordType("TestRT").insert()

        form = {
            "call": "administration/diagnostics.py",
            "-Oexit": "123",
            "-Oquery": "COUNT TestRT",
        }

        response = get_connection().post_form_data("scripting", form)
        xml = etree.parse(response)
        print(etree.tostring(xml))

        assert response.status == 200  # ok
        assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

        # stdout of the script carries the diagnostics as a JSON document.
        stdout_text = xml.xpath("/Response/script/stdout")[0].text
        assert stdout_text is not None
        report = json.loads(stdout_text)
        assert report["python_version"] is not None
        not_installed = "caosdb not installed in server's python path"
        assert report["import"]["caosdb"][0] is True, not_installed
        assert report["auth_token"] is not None
        no_exception = ("There shouldn't be any exception during the "
                        "diagnostics of the interaction of the server-side "
                        "script and the server.")
        assert "exception" not in report["caosdb"], no_exception

        assert "query" in report["caosdb"]
        query_result = report["caosdb"]["query"]
        assert query_result[0] == "COUNT TestRT"
        assert query_result[1] == "1", "The RecordType should have been found."

        exit_code = xml.xpath("/Response/script/@code")[0]
        assert exit_code == "123", "The script should exit with code 123."
        call_text = xml.xpath("/Response/script/call")[0].text
        assert call_text.startswith("administration/diagnostics.py")
        assert xml.xpath("/Response/script/stderr")[0].text is None
    
    
    def test_diagnostics_with_file_upload():
        """Call ``administration/diagnostics.py`` through a multipart request
        that also carries a file, and check the diagnostics and exit code 0."""
        RecordType("TestRT").insert()

        upload_name = "test_file.txt"
        _REMOVE_FILES_AFTERWARDS.append(upload_name)
        with open(upload_name, "w") as upload:
            upload.write("this is a test")

        # The file part comes first, the name of the script to call second.
        parts = [
            MultipartParam.from_file(paramname="txt_file",
                                     filename=upload_name),
            MultipartParam("call", "administration/diagnostics.py"),
        ]
        body, headers = multipart_encode(parts)
        response = get_connection().insert(
            ["scripting"], body=body, headers=headers)
        assert response.status == 200  # ok
        assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

        xml = etree.parse(response)
        print(etree.tostring(xml))
        assert xml.xpath("/Response/script/@code")[0] == "0"

        # stdout of the script carries the diagnostics as a JSON document.
        stdout_text = xml.xpath("/Response/script/stdout")[0].text
        assert stdout_text is not None
        report = json.loads(stdout_text)
        assert report["python_version"] is not None
        not_installed = "caosdb not installed in server's python path"
        assert report["import"]["caosdb"][0] is True, not_installed
        assert report["auth_token"] is not None
        no_exception = ("There shouldn't be any exception during the "
                        "diagnostics of the interaction of the server-side "
                        "script and the server.")
        assert "exception" not in report["caosdb"], no_exception

        exit_code = xml.xpath("/Response/script/@code")[0]
        assert exit_code == "0", "The script should exit with code 0."
        call_text = xml.xpath("/Response/script/call")[0].text
        assert call_text.startswith("administration/diagnostics.py")
        assert xml.xpath("/Response/script/stderr")[0].text is None
    
    
    @mark.local_server
    def test_call_as_anonymous_with_administration_role():
        """Call ``administration/diagnostics.py`` without an auth token after
        granting the ``anonymous`` role the permission to execute it."""
        assert Info().user_info.roles == ["administration"]

        # Allow unauthenticated requests ...
        admin.set_server_property("AUTH_OPTIONAL", "TRUE")
        # ... and let the anonymous role execute diagnostics.py.
        grant_diagnostics = admin.PermissionRule(
            "Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")
        admin._set_permissions("anonymous", [grant_diagnostics])

        form = {"call": "administration/diagnostics.py"}
        response = request(
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            path="scripting",
            body=urlencode(form))

        xml = etree.parse(response)

        assert response.getheader("Set-Cookie") is None  # no auth token returned
        assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

        assert response.status == 200  # ok
        assert xml.xpath("/Response/script/@code")[0] == "0"

        # stdout of the script carries the diagnostics as a JSON document.
        stdout_text = xml.xpath("/Response/script/stdout")[0].text
        assert stdout_text is not None
        report = json.loads(stdout_text)
        assert report["python_version"] is not None
        not_installed = "caosdb not installed in server's python path"
        assert report["import"]["caosdb"][0] is True, not_installed
        assert report["auth_token"] is not None
        assert "exception" not in report["caosdb"]
    
    
    def request(method, headers, path, body=None):
        """Send an HTTPS request to the configured server without auth token.

        This is clumsy, because the pylib is not intended to be used as
        anonymous user.  Server certificate and host name are verified against
        the CA file configured as ``Connection.cacert``.

        Parameters
        ----------
        method : str
            HTTP method, e.g. ``"POST"``.
        headers : dict
            Headers sent with the request.
        path : str
            Resource path, relative to the path of the configured
            ``Connection.url``.
        body : optional
            Request body, handed to ``HTTPSConnection.request`` unchanged.

        Returns
        -------
        http.client.HTTPResponse
            The response object; reading it is left to the caller.
        """
        # PROTOCOL_TLS_CLIENT replaces the deprecated, version-pinned
        # PROTOCOL_TLSv1_2: it negotiates the highest version both sides
        # support instead of insisting on exactly TLS 1.2.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

        if hasattr(ssl, "TLSVersion"):
            # Keep TLS 1.2 as the lower bound, as the old constant implied.
            context.minimum_version = ssl.TLSVersion.TLSv1_2
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True
        context.load_verify_locations(get_config().get("Connection", "cacert"))

        url = get_config().get("Connection", "url")
        fullurl = urlparse(url)

        # Join with exactly one slash, so that a configured URL without a
        # trailing slash (or without any path) still gives a valid target.
        target = str(fullurl.path).rstrip("/") + "/" + path.lstrip("/")

        http_con = HTTPSConnection(
            str(fullurl.netloc), timeout=200, context=context)
        http_con.request(method=method, headers=headers, url=target, body=body)

        return http_con.getresponse()
    
    
    def test_anonymous_script_calling_not_permitted():
        """An unauthenticated call to ``ok`` must be answered with 403 when
        the ``anonymous`` role carries a blanket deny rule."""
        # activate anonymous user
        admin.set_server_property("AUTH_OPTIONAL", "TRUE")

        # The anonymous role gets a grant for the script together with a deny
        # rule for everything; the request below is expected to be refused.
        anonymous_rules = [
            admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"),
            admin.PermissionRule("Deny", "*"),
        ]
        admin._set_permissions("anonymous", anonymous_rules)

        form = {"call": "ok"}
        response = request(
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            path="scripting",
            body=urlencode(form))
        assert response.status == 403  # forbidden
        assert response.getheader("Set-Cookie") is None  # no auth token returned
    
    
    @mark.local_server
    def test_anonymous_script_calling_success():
        """An unauthenticated call to ``ok_anonymous`` must succeed once the
        ``anonymous`` role is granted the permission to execute it."""
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)

        # activate anonymous user
        admin.set_server_property("AUTH_OPTIONAL", "TRUE")
        # give permission to call ok_anonymous
        grant_rule = admin.PermissionRule("Grant",
                                          "SCRIPTING:EXECUTE:ok_anonymous")
        admin._set_permissions("anonymous", [grant_rule])

        form = {"call": "ok_anonymous"}
        response = request(
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            path="scripting",
            body=urlencode(form))
        assert response.status == 200  # ok
        assert response.getheader("Set-Cookie") is None  # no auth token returned
        assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

        xml = etree.fromstring(response.read())

        def first(path):
            # First match of an XPath expression in the response document.
            return xml.xpath(path)[0]

        # verify unauthenticated call to script ok_anonymous
        assert first("/Response/UserInfo/Roles/Role").text == "anonymous"
        assert first("/Response/script/call").text == "ok_anonymous"
        assert first("/Response/script/stdout").text == "ok_anonymous"
        assert first("/Response/script/stderr").text is None
        assert first("/Response/script/@code") == "0"
    
    
    @mark.local_server
    def test_evil_path():
        """A call that climbs out of the configured bin dir must not be found.

        The server's scripting bin dir is pointed at ``subdir`` inside the
        regular bin dir.  The ``ok`` script lives one level above ``subdir``,
        and calling it as ``../ok`` must raise ``HTTPResourceNotFoundError``.
        """
        subdir = os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, "subdir")

        if not os.path.exists(subdir):
            os.makedirs(subdir)
            _REMOVE_FILES_AFTERWARDS.append(subdir)
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER,
                                               "subdir"))

        # ok exists one level up from "subdir"
        assert os.path.exists(
            os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, "ok"))

        # The return value is irrelevant: the call itself has to raise.
        with raises(HTTPResourceNotFoundError):
            run_server_side_script("../ok")