# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""test_server_side_scripting

Integration tests for the implementation of the server-side-scripting api.
"""
from __future__ import print_function, unicode_literals

import json
import os
import ssl
import tempfile
from http.client import HTTPSConnection

from caosdb import Info, RecordType
from caosdb import administration as admin
from caosdb import execute_query, get_config, get_connection
from caosdb.connection.encode import MultipartParam, multipart_encode
from caosdb.connection.utils import urlencode, urlparse
from caosdb.exceptions import HTTPClientError, HTTPResourceNotFoundError
from caosdb.utils.server_side_scripting import run_server_side_script
from lxml import etree
from pytest import mark, raises

_TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"]

try:
    _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = get_config().get(
        "IntegrationTests",
        "test_server_side_scripting.bin_dir.local")
except Exception:
    _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = tempfile.mkdtemp()
try:
    _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = get_config().get(
        "IntegrationTests",
        "test_server_side_scripting.bin_dir.server")
except Exception:
    _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = ""
_TEST_SCRIPTS_DIR = "./resources/"
_REMOVE_FILES_AFTERWARDS = []
_ORIGINAL_SERVER_SCRIPTING_BIN_DIR = ""


def clean_database():
    admin._set_permissions("anonymous", [])
    d = execute_query("FIND ENTITY WITH ID > 99")

    if len(d) > 0:
        d.delete()


def setup():
    clean_database()


def teardown():
    admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                              _ORIGINAL_SERVER_SCRIPTING_BIN_DIR)
    clean_database()


def setup_module():
    global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR
    _ORIGINAL_SERVER_SCRIPTING_BIN_DIR = admin.get_server_property(
        "SERVER_SIDE_SCRIPTING_BIN_DIRS")
    clean_database()

    from os import makedirs
    from os.path import exists, isdir, join
    from shutil import copyfile, copymode
    print("bin dir (local): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL))
    print("bin dir (server): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER))
    print("tests scripts: " + str(_TEST_SCRIPTS))

    if not exists(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL):
        makedirs(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL)
        _REMOVE_FILES_AFTERWARDS.append(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL)
    assert isdir(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL)

    for script_file in _TEST_SCRIPTS:
        target = join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, script_file)
        src = join(_TEST_SCRIPTS_DIR, script_file)
        copyfile(src, target)
        copymode(src, target)
        _REMOVE_FILES_AFTERWARDS.append(target)


def teardown_module():
    from os import remove
    from os.path import exists, isdir
    from shutil import rmtree

    for obsolete in _REMOVE_FILES_AFTERWARDS:
        if exists(obsolete):
            if isdir(obsolete):
                rmtree(obsolete)
            else:
                remove(obsolete)
    clean_database()


def test_call_script_non_existing():
    form = dict()
    form["call"] = "non_existing_script"
    with raises(HTTPResourceNotFoundError):
        get_connection().post_form_data("scripting", form)


@mark.local_server
def test_call_script_not_executable():
    admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                              _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    form = dict()
    form["call"] = "not_executable"
    with raises(HTTPClientError) as exc_info:
        get_connection().post_form_data("scripting", form)
    assert "not executable" in exc_info.value.body.decode("utf-8")


@mark.local_server
def test_call_ok():
    admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                              _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    form = dict()
    form["call"] = "ok"
    r = get_connection().post_form_data("scripting", form)
    xml = etree.parse(r)
    assert xml.xpath("/Response/script/call")[0].text == "ok"
    assert xml.xpath("/Response/script/stdout")[0].text == "ok"
    assert xml.xpath("/Response/script/stderr")[0].text is None
    assert xml.xpath("/Response/script/@code")[0] == "0"


@mark.local_server
def test_call_err():
    admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                              _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    form = dict()
    form["call"] = "err"
    r = get_connection().post_form_data("scripting", form)
    xml = etree.parse(r)
    assert xml.xpath("/Response/script/@code")[0] == "1"
    assert xml.xpath("/Response/script/call")[0].text == "err"
    assert xml.xpath("/Response/script/stdout")[0].text is None
    assert xml.xpath("/Response/script/stderr")[0].text == "err"


def test_run_server_side_script_with_file_as_positional_param():
    RecordType("TestRT").insert()
    _REMOVE_FILES_AFTERWARDS.append("test_file.txt")
    with open("test_file.txt", "w") as f:
        f.write("this is a test")

    response = run_server_side_script("administration/diagnostics.py",
                                      "pos0",
                                      "pos1",
                                      exit="123",
                                      query="COUNT TestRT",
                                      files={"-p2": "test_file.txt"})
    assert response.stderr is None
    assert response.code == 123
    assert response.call == ('administration/diagnostics.py '
                             '--exit=123 --query=COUNT TestRT '
                             'pos0 pos1 .upload_files/test_file.txt')

    json_data = json.loads(response.stdout)
    assert "caosdb" in json_data
    assert "query" in json_data["caosdb"]
    assert json_data["caosdb"]["query"] == ["COUNT TestRT", "1"]
    assert "./.upload_files/test_file.txt" in json_data["files"]


def test_run_server_side_script_with_additional_file():
    RecordType("TestRT").insert()
    _REMOVE_FILES_AFTERWARDS.append("test_file.txt")
    with open("test_file.txt", "w") as f:
        f.write("this is a test")

    response = run_server_side_script("administration/diagnostics.py",
                                      "pos0",
                                      "pos1",
                                      exit="123",
                                      query="COUNT TestRT",
                                      files={"dummykey": "test_file.txt"})
    assert response.stderr is None
    assert response.code == 123
    assert response.call == ('administration/diagnostics.py '
                             '--exit=123 --query=COUNT TestRT '
                             'pos0 pos1')

    json_data = json.loads(response.stdout)
    assert json_data["caosdb"]["query"] == ["COUNT TestRT", "1"]
    assert "./.upload_files/test_file.txt" in json_data["files"]


def test_diagnostics_basic():
    RecordType("TestRT").insert()

    form = dict()
    form["call"] = "administration/diagnostics.py"
    form["-Oexit"] = "123"
    form["-Oquery"] = "COUNT TestRT"

    response = get_connection().post_form_data("scripting", form)
    xml = etree.parse(response)
    print(etree.tostring(xml))

    assert response.status == 200  # ok
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

    diagnostics = xml.xpath("/Response/script/stdout")[0].text
    assert diagnostics is not None
    diagnostics = json.loads(diagnostics)
    assert diagnostics["python_version"] is not None
    assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
                                                        " in server's python path")
    assert diagnostics["auth_token"] is not None
    EXC_ERR = ("There shouldn't be any exception during the diagnostics of "
               "the interaction of the server-side script and the server.")
    assert "exception" not in diagnostics["caosdb"], EXC_ERR

    assert "query" in diagnostics["caosdb"]
    assert diagnostics["caosdb"]["query"][0] == "COUNT TestRT"
    assert diagnostics["caosdb"]["query"][1] == "1", ("The RecordType should "
                                                      "have been found.")
    assert xml.xpath("/Response/script/@code")[0] == "123", ("The script "
                                                             "should exit "
                                                             "with code 123.")
    assert xml.xpath("/Response/script/call")[0].text.startswith(
        "administration/diagnostics.py")
    assert xml.xpath("/Response/script/stderr")[0].text is None


def test_diagnostics_with_file_upload():
    RecordType("TestRT").insert()
    _REMOVE_FILES_AFTERWARDS.append("test_file.txt")
    with open("test_file.txt", "w") as f:
        f.write("this is a test")

    parts = []
    parts.append(MultipartParam.from_file(paramname="txt_file",
                                          filename="test_file.txt"))
    parts.append(MultipartParam("call", "administration/diagnostics.py"))
    body, headers = multipart_encode(parts)
    response = get_connection().insert(
        ["scripting"], body=body, headers=headers)
    assert response.status == 200  # ok
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

    xml = etree.parse(response)
    print(etree.tostring(xml))
    assert xml.xpath("/Response/script/@code")[0] == "0"

    diagnostics = xml.xpath("/Response/script/stdout")[0].text
    assert diagnostics is not None
    diagnostics = json.loads(diagnostics)
    assert diagnostics["python_version"] is not None
    assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
                                                        " in server's python path")
    assert diagnostics["auth_token"] is not None
    assert "exception" not in diagnostics["caosdb"], ("There shouldn't be any "
                                                      "exception during the "
                                                      "diagnostics of the "
                                                      "interaction of the "
                                                      "server-side "
                                                      "script and the server.")

    assert xml.xpath("/Response/script/@code")[0] == "0", ("The script "
                                                           "should exit "
                                                           "with code 0.")
    assert xml.xpath("/Response/script/call")[0].text.startswith(
        "administration/diagnostics.py")
    assert xml.xpath("/Response/script/stderr")[0].text is None


@mark.local_server
def test_call_as_anonymous_with_administration_role():
    assert Info().user_info.roles == ["administration"]

    # activate anonymous user
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")
    # give permission to call diagnostics.py
    admin._set_permissions(
        "anonymous", [
            admin.PermissionRule(
                "Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")])

    form = dict()
    form["call"] = "administration/diagnostics.py"

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(form))

    xml = etree.parse(response)

    assert response.getheader("Set-Cookie") is None  # no auth token returned
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

    assert response.status == 200  # ok
    assert xml.xpath("/Response/script/@code")[0] == "0"
    diagnostics = xml.xpath("/Response/script/stdout")[0].text
    assert diagnostics is not None
    diagnostics = json.loads(diagnostics)
    assert diagnostics["python_version"] is not None
    assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
                                                        " in server's python path")
    assert diagnostics["auth_token"] is not None
    assert "exception" not in diagnostics["caosdb"]


def request(method, headers, path, body=None):
    """ Connect without auth-token. This is clumsy, bc the pylib is not
    intended to be used as anonymous user.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    context.verify_mode = ssl.CERT_REQUIRED

    if hasattr(context, "check_hostname"):
        context.check_hostname = True
    context.load_verify_locations(get_config().get("Connection", "cacert"))

    url = get_config().get("Connection", "url")
    fullurl = urlparse(url)

    http_con = HTTPSConnection(
        str(fullurl.netloc), timeout=200, context=context)
    http_con.request(method=method, headers=headers, url=str(fullurl.path) +
                     path, body=body)

    return http_con.getresponse()


def test_anonymous_script_calling_not_permitted():
    form = dict()
    form["call"] = "ok"

    # activate anonymous user
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")

    # remove all anonymous permissions
    admin._set_permissions("anonymous", [
        admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"),
        admin.PermissionRule("Deny", "*")
    ])

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(form))
    assert response.status == 403  # forbidden
    assert response.getheader("Set-Cookie") is None  # no auth token returned


@ mark.local_server
def test_anonymous_script_calling_success():
    admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                              _SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
    form = dict()
    form["call"] = "ok_anonymous"

    # activate anonymous user
    admin.set_server_property("AUTH_OPTIONAL", "TRUE")
    # give permission to call ok_anonymous
    admin._set_permissions("anonymous",
                           [admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok_anonymous")])

    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = request(method="POST", headers=headers, path="scripting",
                       body=urlencode(form))
    assert response.status == 200  # ok
    assert response.getheader("Set-Cookie") is None  # no auth token returned
    assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'

    body = response.read()
    xml = etree.fromstring(body)

    # verify unauthenticated call to script ok_anonymous
    assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous"
    assert xml.xpath("/Response/script/call")[0].text == "ok_anonymous"
    assert xml.xpath("/Response/script/stdout")[0].text == "ok_anonymous"
    assert xml.xpath("/Response/script/stderr")[0].text is None
    assert xml.xpath("/Response/script/@code")[0] == "0"


@mark.local_server
def test_evil_path():
    subdir = os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, "subdir")

    if not os.path.exists(subdir):
        os.makedirs(subdir)
        _REMOVE_FILES_AFTERWARDS.append(subdir)
    admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIRS",
                              os.path.join(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER,
                                           "subdir"))

    # ok exists one level up from "subdir"
    assert os.path.exists(
        os.path.join(
            _SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL,
            "ok"))

    with raises(HTTPResourceNotFoundError):
        r = run_server_side_script("../ok")