Skip to content
Snippets Groups Projects
Select Git revision
  • f49137e67a7bf52de90ffc01331a6a475df097d0
  • main default protected
  • dev
  • f-docs-pylib
  • f-parse-value
  • f-compare
  • f-string-ids
  • f-217-set-special-property
  • f-filesystem-import
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-check-merge-entities
  • f-compare-enid
  • f-select-subproperties
  • v0.18.0
  • v0.17.0
  • v0.16.0
  • v0.15.1
  • v0.15.0
  • v0.14.0
  • v0.13.2
  • v0.13.1
  • v0.13.0
  • linkahead-rename-step-2
  • linkahead-rename-step-1
  • v0.12.0
  • v0.11.2
  • v0.11.1
  • v0.11.0
  • v0.10.0
  • v0.9.0
  • v0.8.0
  • v0.7.4
  • v0.7.3
36 results

setup.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_server_side_scripting.py 16.13 KiB
    # -*- coding: utf-8 -*-
    #
    # ** header v3.0
    # This file is a part of the LinkAhead Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """test_server_side_scripting
    
    Integration tests for the implementation of the server-side-scripting api.
    """
    from __future__ import print_function, unicode_literals
    
    import json
    import os
    import ssl
    import tempfile
    from http.client import HTTPSConnection
    
    from linkahead import Info, RecordType
    from linkahead import administration as admin
    from linkahead import execute_query, get_config, get_connection
    from linkahead.connection.encode import MultipartParam, multipart_encode
    from linkahead.connection.utils import urlencode, urlparse
    from linkahead.exceptions import HTTPClientError, HTTPResourceNotFoundError
    from linkahead.utils.server_side_scripting import run_server_side_script
    from lxml import etree
    from pytest import mark, raises
    
    _TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"]
    
    try:
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = get_config().get(
            "IntegrationTests",
            "test_server_side_scripting.bin_dir.server")
    except Exception:
        _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = ""
    _TEST_SCRIPTS_DIR = "./resources/"
    _REMOVE_FILES_AFTERWARDS = []
    _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = ""
    
    
    def clean_database():
        admin._set_permissions("anonymous", [])
        d = execute_query("FIND ENTITY WITH ID > 99")
    
        if len(d) > 0:
            d.delete()
    
    
    def assert_stderr(stderr) -> None:
        """Assert that ``stderr`` is either None or contains only the pyarrow deprecation warning.
    
    This can probably removed with Pandas 3.0, to be replaced by ``assert stderr is None``.
    
    Parameters
    ----------
    stderr
      The object to be tested.
        """
        if stderr is None:
            return
        assert isinstance(stderr, str)
        assert stderr.split(sep="\n", maxsplit=1)[1] == """Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
    (to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
    but was not found to be installed on your system.
    If this would cause problems for you,
    please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
            
      i = __import__(m)"""
    
    
    def setup_function(function):
        clean_database()
    
    
    def teardown_function(function):
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _ORIGINAL_SERVER_SCRIPTING_BIN_DIR)
        clean_database()
    
    
    def setup_module():
        global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR
        _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = admin.get_server_property(
            "SERVER_SIDE_SCRIPTING_BIN_DIRS")
        clean_database()
    
    
    def teardown_module():
        from os import remove
        from os.path import exists, isdir
        from shutil import rmtree
    
        for obsolete in _REMOVE_FILES_AFTERWARDS:
            if exists(obsolete):
                if isdir(obsolete):
                    rmtree(obsolete)
                else:
                    remove(obsolete)
        clean_database()
    
    
    def request(method, headers, path, body=None):
        """Connect without auth-token.
    
        This is clumsy because the pylib is not intended to be used as anonymous user.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_REQUIRED
    
        if hasattr(context, "check_hostname"):
            context.check_hostname = True
        context.load_verify_locations(get_config().get("Connection", "cacert"))
    
        url = get_config().get("Connection", "url")
        fullurl = urlparse(url)
    
        http_con = HTTPSConnection(
            str(fullurl.netloc), timeout=200, context=context)
        http_con.request(method=method, headers=headers, url=str(fullurl.path) +
                         path, body=body)
    
        return http_con.getresponse()
    
    
    def test_call_script_non_existing():
        form = dict()
        form["call"] = "non_existing_script"
        with raises(HTTPResourceNotFoundError):
            get_connection().post_form_data("scripting", form)
    
    
    @mark.local_server
    def test_call_script_not_executable():
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
        form = dict()
        form["call"] = "not_executable"
        with raises(HTTPClientError) as exc_info:
            get_connection().post_form_data("scripting", form)
        assert "not executable" in exc_info.value.body.decode("utf-8")
    
    
    @mark.local_server
    def test_call_ok():
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
        form = dict()
        form["call"] = "ok"
        r = get_connection().post_form_data("scripting", form)
        xml = etree.parse(r)
        assert xml.xpath("/Response/script/call")[0].text == "ok"
        assert xml.xpath("/Response/script/stdout")[0].text == "ok"
        assert_stderr(xml.xpath("/Response/script/stderr")[0].text)
        assert xml.xpath("/Response/script/@code")[0] == "0"
    
    
    @mark.local_server
    def test_call_err():
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
        form = dict()
        form["call"] = "err"
        r = get_connection().post_form_data("scripting", form)
        xml = etree.parse(r)
        assert xml.xpath("/Response/script/@code")[0] == "1"
        assert xml.xpath("/Response/script/call")[0].text == "err"
        assert xml.xpath("/Response/script/stdout")[0].text is None
        assert xml.xpath("/Response/script/stderr")[0].text == "err"
    
    
    def test_run_server_side_script_with_file_as_positional_param():
        RecordType("TestRT").insert()
        _REMOVE_FILES_AFTERWARDS.append("test_file.txt")
        with open("test_file.txt", "w") as f:
            f.write("this is a test")
    
        response = run_server_side_script("administration/diagnostics.py",
                                          "pos0",
                                          "pos1",
                                          exit="123",
                                          query="COUNT ENTITY TestRT",
                                          files={"-p2": "test_file.txt"})
        assert_stderr(response.stderr)
        assert response.code == 123
        assert response.call == ('administration/diagnostics.py '
                                 '--exit=123 --query=COUNT ENTITY TestRT '
                                 'pos0 pos1 .upload_files/test_file.txt')
    
        json_data = json.loads(response.stdout)
        assert "caosdb" in json_data
        assert "query" in json_data["caosdb"]
        assert json_data["caosdb"]["query"] == ["COUNT ENTITY TestRT", "1"]
        assert "./.upload_files/test_file.txt" in json_data["files"]
    
    
    def test_run_server_side_script_with_additional_file():
        RecordType("TestRT").insert()
        _REMOVE_FILES_AFTERWARDS.append("test_file.txt")
        with open("test_file.txt", "w") as f:
            f.write("this is a test")
    
        response = run_server_side_script("administration/diagnostics.py",
                                          "pos0",
                                          "pos1",
                                          exit="123",
                                          query="COUNT ENTITY TestRT",
                                          files={"dummykey": "test_file.txt"})
        assert_stderr(response.stderr)
        assert response.code == 123
        assert response.call == ('administration/diagnostics.py '
                                 '--exit=123 --query=COUNT ENTITY TestRT '
                                 'pos0 pos1')
    
        json_data = json.loads(response.stdout)
        assert json_data["caosdb"]["query"] == ["COUNT ENTITY TestRT", "1"]
        assert "./.upload_files/test_file.txt" in json_data["files"]
    
    
    def test_diagnostics_basic():
        RecordType("TestRT").insert()
    
        form = dict()
        form["call"] = "administration/diagnostics.py"
        form["-Oexit"] = "123"
        form["-Oquery"] = "COUNT ENTITY TestRT"
    
        response = get_connection().post_form_data("scripting", form)
        xml = etree.parse(response)
        print(etree.tostring(xml))
    
        assert response.status == 200  # ok
        assert "text/xml" in response.getheader("Content-Type").lower()
        assert "charset=utf-8" in response.getheader("Content-Type").lower()
    
        diagnostics = xml.xpath("/Response/script/stdout")[0].text
        assert diagnostics is not None
        diagnostics = json.loads(diagnostics)
        assert diagnostics["python_version"] is not None
        assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
                                                            " in server's python path")
        assert diagnostics["auth_token"] is not None
        EXC_ERR = ("There shouldn't be any exception during the diagnostics of "
                   "the interaction of the server-side script and the server.")
        assert "exception" not in diagnostics["caosdb"], EXC_ERR
    
        assert "query" in diagnostics["caosdb"]
        assert diagnostics["caosdb"]["query"][0] == "COUNT ENTITY TestRT"
        assert diagnostics["caosdb"]["query"][1] == "1", ("The RecordType should "
                                                          "have been found.")
        assert xml.xpath("/Response/script/@code")[0] == "123", ("The script "
                                                                 "should exit "
                                                                 "with code 123.")
        assert xml.xpath("/Response/script/call")[0].text.startswith(
            "administration/diagnostics.py")
        assert_stderr(xml.xpath("/Response/script/stderr")[0].text)
    
    
    def test_diagnostics_with_file_upload():
        RecordType("TestRT").insert()
        _REMOVE_FILES_AFTERWARDS.append("test_file.txt")
        with open("test_file.txt", "w") as f:
            f.write("this is a test")
    
        parts = []
        parts.append(MultipartParam.from_file(paramname="txt_file",
                                              filename="test_file.txt"))
        parts.append(MultipartParam("call", "administration/diagnostics.py"))
        body, headers = multipart_encode(parts)
        response = get_connection().insert(
            ["scripting"], body=body, headers=headers)
        assert response.status == 200  # ok
        assert "text/xml" in response.getheader("Content-Type").lower()
        assert "charset=utf-8" in response.getheader("Content-Type").lower()
    
        xml = etree.parse(response)
        print(etree.tostring(xml))
        assert xml.xpath("/Response/script/@code")[0] == "0"
    
        diagnostics = xml.xpath("/Response/script/stdout")[0].text
        assert diagnostics is not None
        diagnostics = json.loads(diagnostics)
        assert diagnostics["python_version"] is not None
        assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
                                                            " in server's python path")
        assert diagnostics["auth_token"] is not None
        assert "exception" not in diagnostics["caosdb"], ("There shouldn't be any "
                                                          "exception during the "
                                                          "diagnostics of the "
                                                          "interaction of the "
                                                          "server-side "
                                                          "script and the server.")
    
        assert xml.xpath("/Response/script/@code")[0] == "0", ("The script "
                                                               "should exit "
                                                               "with code 0.")
        assert xml.xpath("/Response/script/call")[0].text.startswith(
            "administration/diagnostics.py")
        assert_stderr(xml.xpath("/Response/script/stderr")[0].text)
    
    
    @mark.local_server
    def test_call_as_anonymous_with_administration_role():
        assert Info().user_info.roles == ["administration"]
    
        # activate anonymous user
        admin.set_server_property("AUTH_OPTIONAL", "TRUE")
        # give permission to call diagnostics.py
        admin._set_permissions(
            "anonymous", [
                admin.PermissionRule(
                    "Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")])
    
        form = dict()
        form["call"] = "administration/diagnostics.py"
    
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = request(method="POST", headers=headers, path="scripting",
                           body=urlencode(form))
    
        xml = etree.parse(response)
    
        assert response.getheader("Set-Cookie") is None  # no auth token returned
        assert "text/xml" in response.getheader("Content-Type").lower()
        assert "charset=utf-8" in response.getheader("Content-Type").lower()
    
        assert response.status == 200  # ok
        assert xml.xpath("/Response/script/@code")[0] == "0"
        diagnostics = xml.xpath("/Response/script/stdout")[0].text
        assert diagnostics is not None
        diagnostics = json.loads(diagnostics)
        assert diagnostics["python_version"] is not None
        assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
                                                            " in server's python path")
        assert diagnostics["auth_token"] is not None
        assert "exception" not in diagnostics["caosdb"]
    
    
    def test_anonymous_script_calling_not_permitted():
        form = dict()
        form["call"] = "ok"
    
        # activate anonymous user
        admin.set_server_property("AUTH_OPTIONAL", "TRUE")
    
        # remove all anonymous permissions
        admin._set_permissions("anonymous", [
            admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"),
            admin.PermissionRule("Deny", "*")
        ])
    
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = request(method="POST", headers=headers, path="scripting",
                           body=urlencode(form))
        assert response.status == 403  # forbidden
        assert response.getheader("Set-Cookie") is None  # no auth token returned
    
    
    @ mark.local_server
    def test_anonymous_script_calling_success():
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
        form = dict()
        form["call"] = "ok_anonymous"
    
        # activate anonymous user
        admin.set_server_property("AUTH_OPTIONAL", "TRUE")
        # give permission to call ok_anonymous
        admin._set_permissions("anonymous",
                               [admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok_anonymous")])
    
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = request(method="POST", headers=headers, path="scripting",
                           body=urlencode(form))
        assert response.status == 200  # ok
        assert response.getheader("Set-Cookie") is None  # no auth token returned
        assert "text/xml" in response.getheader("Content-Type").lower()
        assert "charset=utf-8" in response.getheader("Content-Type").lower()
    
        body = response.read()
        xml = etree.fromstring(body)
    
        # verify unauthenticated call to script ok_anonymous
        assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous"
        assert xml.xpath("/Response/script/call")[0].text == "ok_anonymous"
        assert xml.xpath("/Response/script/stdout")[0].text == "ok_anonymous"
        assert_stderr(xml.xpath("/Response/script/stderr")[0].text)
        assert xml.xpath("/Response/script/@code")[0] == "0"
    
    
    @mark.local_server
    def test_evil_path():
        """Test that we can't "escape" from the defined bin dirs, i.e.,
        can't execute scripts in arbitrary locations.
    
        """
    
        # Set SSS bin dir to "subdir" sub directory
        admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                                  os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER,
                                               "subdir"))
        # The parent directory is not in the list of SSS bin dirs, so the
        # server must not allow the execution of ok.
        with raises(HTTPResourceNotFoundError):
            r = run_server_side_script("../ok")