Skip to content
Snippets Groups Projects
Select Git revision
  • fc7ca36332f8e7b915288eff52483e268150be4c
  • main default protected
  • dev protected
  • f-string-ids
  • f-empty
  • caosdb-cpplib-v0.2.2
  • caosdb-server-v0.9.0
  • caosdb-cpplib-v0.2.1
  • caosdb-cpplib-v0.2.0
  • caosdb-server-v0.8.0
  • caosdb-cpplib-v0.1.2
  • caosdb-server-v0.7.3
  • caosdb-cpplib-v0.1
  • caosdb-server-v0.7.2
14 results

test_async.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_authentication.py 11.54 KiB
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    # Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
    # Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    
    import os
    import time
    from sys import hexversion
    from urllib.parse import urlparse
    from http.client import HTTPSConnection
    import ssl
    from subprocess import call
    from lxml import etree
    from pytest import raises, mark
    from caosdb.exceptions import LoginFailedException
    import caosdb as db
    from .test_server_side_scripting import request
    
    
    # One-time tokens that get_one_time_token() has already returned in this
    # process.  The helper compares every token it reads against this set and
    # keeps re-reading the token file until it finds one that is not in here.
    _USED_OTA_TOKEN = set()
    
    
    def setup():
        """Reset the connection configuration and remove left-over test data.

        Calls ``db.configure_connection()`` without arguments, sets the server
        property ``AUTH_OPTIONAL`` to ``FALSE`` and deletes everything returned
        by the query ``FIND Test*``.
        """
        # NOTE(review): a plain module-level ``setup``/``teardown`` pair is a
        # nose-style hook name; newer pytest releases dropped nose support, so
        # confirm that the test runner in use still calls these.
        db.configure_connection()

        # deactivate anonymous user
        db.administration.set_server_property("AUTH_OPTIONAL", "FALSE")

        leftovers = db.execute_query("FIND Test*")
        if len(leftovers) != 0:
            leftovers.delete()
    
    
    def teardown():
        """Clean up by running exactly the same steps as ``setup()``."""
        setup()
    
    
    @mark.skipif(
        not (db.get_config().has_option("Connection", "password_method")
             and db.get_config().get("Connection", "password_method") == "pass"),
        reason="password_method is not pass")
    def test_pass():
        """The ``pass`` command must exit with 0 for the configured identifier.

        Skipped unless the ``Connection`` section sets ``password_method`` to
        ``pass``.
        """
        identifier = db.get_config().get("Connection", "password_identifier")
        exit_code = call(["pass", identifier])
        assert exit_code == 0
    
    
    def test_https_support():
        """Open a certificate-verified TLS 1.2 connection and GET ``Info``.

        The test passes when the handshake and the request complete without an
        exception; the response body is only printed, not inspected.

        Raises
        ------
        Exception
            If ``sys.hexversion`` lies strictly between 0x02999999 and
            0x03020000.
        """
        if 0x02999999 < hexversion < 0x03020000:
            raise Exception("version " + str(hex(hexversion)))

        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_REQUIRED
        if hasattr(context, "check_hostname"):
            context.check_hostname = True
        context.load_verify_locations(db.get_config().get("Connection", "cacert"))

        url = db.get_config().get("Connection", "url")
        fullurl = urlparse(url)

        http_con = HTTPSConnection(
            str(fullurl.netloc), timeout=200, context=context)
        try:
            http_con.request(
                method="GET", headers={}, url=str(fullurl.path) + "Info")
            r = http_con.getresponse()
            print(r.read())
        finally:
            # Always release the socket, even when the request fails.
            http_con.close()
    
    
    def test_login_via_post_form_data_failure():
        """Posting a wrong password to ``login`` raises LoginFailedException."""
        with raises(LoginFailedException):
            post_form = db.get_connection().post_form_data
            credentials = {
                "username": db.get_config().get("Connection", "username"),
                "password": "wrongpassphrase",
            }
            post_form("login", credentials)
    
    
    def test_anonymous_not_returning_auth_token():
        """With ``AUTH_OPTIONAL`` enabled, a ``GET Entity`` request sent with empty
        headers must be answered without a ``Set-Cookie`` header."""
        # activate anonymous user
        db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

        entity_response = request(method="GET", headers={}, path="Entity")
        cookie_header = entity_response.getheader("Set-Cookie")

        # a missing Set-Cookie header means that no auth token was returned
        assert cookie_header is None
    
    
    def test_anonymous_setter():
        """Check that enabling ``AUTH_OPTIONAL`` really yields an anonymous user.

        This is the sanity check behind "test_login_while_anonymous_is_active":
        an ``Info`` request sent over a plain HTTPS connection with empty
        headers must report ``anonymous`` as the first role of the user.
        """
        # activate anonymous user
        db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

        # connect without auth-token
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(db.get_config().get("Connection", "cacert"))

        url = db.get_config().get("Connection", "url")
        fullurl = urlparse(url)

        http_con = HTTPSConnection(
            str(fullurl.netloc), timeout=200, context=context)
        try:
            http_con.request(
                method="GET", headers={}, url=str(fullurl.path) + "Info")
            body = http_con.getresponse().read()
        finally:
            # Always release the socket, even when the request fails.
            http_con.close()

        xml = etree.fromstring(body)
        # verify unauthenticated
        assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous"
    
    
    def test_login_while_anonymous_is_active():
        """After a logout, a retrieve with ``reconnect=True`` must be answered for
        the ``administration`` role although the anonymous user is enabled."""
        # activate anonymous user
        db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

        # logout
        db.get_connection()._logout()

        response = db.get_connection().retrieve(
            entity_uri_segments=["Entity"], reconnect=True)
        document = etree.fromstring(response.read())

        # pylib did the login even though the anonymous user is active
        role_nodes = document.xpath("/Response/UserInfo/Roles/Role")
        assert role_nodes[0].text == "administration"
    
    
    def test_authtoken_config():
        """The server property ``AUTHTOKEN_CONFIG`` must name the example file."""
        configured_file = db.administration.get_server_property(
            "AUTHTOKEN_CONFIG")
        assert configured_file == "conf/core/authtoken.example.yaml"
    
    
    def get_one_time_token(testcase):
        """Return a one-time authentication token not handed out before.

        The token is read from the file configured under the option
        ``test_authentication.<testcase>`` in the ``IntegrationTests`` section of
        the client configuration.  While the file still holds a token that is
        already in ``_USED_OTA_TOKEN``, it is re-read once per second.  The
        returned token is recorded in ``_USED_OTA_TOKEN``.

        Parameters
        ----------
        testcase : str
            Suffix of the configuration option that names the token file.

        Returns
        -------
        str
            The complete content of the token file.

        Raises
        ------
        AssertionError
            If the token file or its directory does not exist, or if the token
            does not start with the expected prefix.
        """
        username = "anonymous"
        realm = "OneTimeAuthenticationToken"
        roles = ["administration"]
        permissions = []
        filename = db.get_config().get("IntegrationTests",
                                       "test_authentication.{}".format(testcase))
        assert os.path.isdir(os.path.split(filename)[0])
        assert os.path.isfile(filename)

        def read_token():
            # Single place that reads the current content of the token file.
            with open(filename, "r") as f:
                return f.read()

        auth_token = read_token()
        while auth_token in _USED_OTA_TOKEN:
            # wait until the server has renewed the token
            time.sleep(1)
            auth_token = read_token()
        # The loop has no ``break``, so this always runs once a fresh token has
        # been read (the former ``while ... else`` was equivalent but misleading).
        _USED_OTA_TOKEN.add(auth_token)

        assert auth_token.startswith('["O","{re}","{us}",["{rol}"],{perm},'.format(
            re=realm, us=username, rol=",".join(roles), perm=permissions))
        return auth_token
    
    
    def test_one_time_token():
        """Switch to a one-time token login and back to the configured login.

        Checks roles and user name before the switch, roles, name and realm
        while the token is in use, and roles and user name after reconfiguring
        the connection without arguments.
        """
        def current_user():
            # every call issues its own db.Info() request
            return db.Info().user_info

        def configured_username():
            return db.get_config().get("Connection", "username")

        # initially: the user from the configuration
        assert current_user().roles == ["administration"]
        assert current_user().name == configured_username()

        token = get_one_time_token("admin_token_crud")
        db.configure_connection(password_method="auth_token", auth_token=token)

        # now: authenticated via the one-time token
        assert current_user().roles == ["administration"]
        assert current_user().name == "anonymous"
        assert current_user().realm == "OneTimeAuthenticationToken"

        # finally: back to the configured connection
        db.configure_connection()

        assert current_user().roles == ["administration"]
        assert current_user().name == configured_username()
    
    
    def test_one_time_token_invalid():
        """A modified one-time token is rejected with LoginFailedException.

        The token's ``[]`` is replaced by ``["permission"]`` before use.  The
        rejection is checked twice: first as is, then after ``AUTH_OPTIONAL``
        has been set to ``TRUE``.
        """
        expected_message = (
            "The authentication token is expired or you have been logged out otherwise. The auth_token "
            "authenticator cannot log in again. You must provide a new authentication token.")

        tampered_token = get_one_time_token("admin_token_crud").replace(
            "[]", '["permission"]')

        def assert_login_rejected():
            db.configure_connection(password_method="auth_token",
                                    auth_token=tampered_token)
            with raises(db.LoginFailedException) as failure:
                db.Info()
            assert failure.value.args[0] == expected_message

        assert_login_rejected()

        # also raises exception when anonymous is enabled
        db.configure_connection()
        db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
        assert_login_rejected()
    
    
    def test_one_time_token_expired():
        """The ``admin_token_expired`` token is rejected with LoginFailedException.

        The rejection is checked twice: first as is, then after
        ``AUTH_OPTIONAL`` has been set to ``TRUE``.
        """
        expected_message = (
            "The authentication token is expired or you have been logged out otherwise. The auth_token "
            "authenticator cannot log in again. You must provide a new authentication token.")

        expired_token = get_one_time_token("admin_token_expired")

        def assert_login_rejected():
            db.configure_connection(password_method="auth_token",
                                    auth_token=expired_token)
            with raises(db.LoginFailedException) as failure:
                db.Info()
            assert failure.value.args[0] == expected_message

        assert_login_rejected()

        # also raises exception when anonymous is enabled
        db.configure_connection()
        db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
        assert_login_rejected()
    
    
    def test_one_time_token_3_attempts():
        """The ``admin_token_3_attempts`` token works three times, then fails.

        For each of the first three attempts the connection is reconfigured
        with the same token; the authenticator starts out holding that token
        and holds a different one after the first ``db.Info()`` request.  The
        fourth attempt, and a fifth one made with ``AUTH_OPTIONAL`` set to
        ``TRUE``, must raise LoginFailedException.
        """
        expected_message = (
            "The authentication token is expired or you have been logged out otherwise. The auth_token "
            "authenticator cannot log in again. You must provide a new authentication token.")

        auth_token = get_one_time_token("admin_token_3_attempts")

        def connect_with_token():
            db.configure_connection(password_method="auth_token",
                                    auth_token=auth_token)

        def current_token():
            return db.get_connection()._authenticator.auth_token

        def assert_login_rejected():
            assert current_token() == auth_token
            with raises(db.LoginFailedException) as failure:
                db.Info()
            assert failure.value.args[0] == expected_message

        # 1st, 2nd and 3rd attempt succeed
        for _attempt in range(3):
            connect_with_token()
            assert current_token() == auth_token
            assert db.Info().user_info.roles == ["administration"]
            assert current_token() != auth_token
            assert db.Info().user_info.name == "anonymous"
            assert db.Info().user_info.realm == "OneTimeAuthenticationToken"

        # 4th
        connect_with_token()
        assert_login_rejected()

        db.configure_connection()
        db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

        # 5th attempt, also raises error when anonymous user is enabled
        connect_with_token()
        assert_login_rejected()
    
    
    def test_crud_with_one_time_token():
        """Insert, update, retrieve and delete a RecordType named ``TestRT`` while
        the connection is configured with a one-time token."""
        token = get_one_time_token("admin_token_crud")
        db.configure_connection(password_method="auth_token", auth_token=token)

        def stored():
            # fresh lookup of the single entity matching "FIND TestRT"
            return db.execute_query("FIND TestRT", unique=True)

        # CREATE
        record_type = db.RecordType(name="TestRT")
        record_type.insert()
        assert record_type.id == stored().id

        # UPDATE
        assert stored().description is None
        record_type.description = "new desc"
        record_type.update()
        assert record_type.description == stored().description

        # RETRIEVE
        record_type.retrieve()
        assert record_type.id == stored().id

        # DELETE
        record_type.delete()
        assert len(db.execute_query("FIND TestRT")) == 0