Select Git revision
test_async.cpp
-
Daniel Hornung authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_authentication.py 11.54 KiB
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
import os
import time
from sys import hexversion
from urllib.parse import urlparse
from http.client import HTTPSConnection
import ssl
from subprocess import call
from lxml import etree
from pytest import raises, mark
from caosdb.exceptions import LoginFailedException
import caosdb as db
from .test_server_side_scripting import request
# One-time authentication tokens that get_one_time_token() has already handed
# out; that function consults this set to detect a token file whose content
# has not changed since the last read.
_USED_OTA_TOKEN = set()
def setup():
    """Bring the connection and the server into a known state.

    Re-creates the default connection, sets the server property
    ``AUTH_OPTIONAL`` to ``FALSE`` and deletes every entity returned by the
    query ``FIND Test*``.
    """
    db.configure_connection()

    # deactivate anonymous user
    db.administration.set_server_property("AUTH_OPTIONAL", "FALSE")

    leftovers = db.execute_query("FIND Test*")
    if len(leftovers) == 0:
        # nothing to clean up
        return
    leftovers.delete()
def teardown():
    """Clean up by running the same steps as :func:`setup`."""
    setup()
@mark.skipif(
    not (db.get_config().has_option("Connection", "password_method")
         and db.get_config().get("Connection", "password_method") == "pass"),
    reason="password_method is not pass")
def test_pass():
    """Run ``pass`` for the configured identifier and expect exit status 0.

    Skipped unless the ``Connection`` section of the configuration sets
    ``password_method`` to ``pass``.
    """
    identifier = db.get_config().get("Connection", "password_identifier")
    exit_status = call(["pass", identifier])
    assert exit_status == 0
def test_https_support():
    """Request ``Info`` over a certificate-verified HTTPS connection.

    The TLS context requires a valid server certificate, checked against the
    ``cacert`` file from the ``Connection`` configuration section. The
    response body is only printed; the test fails if any step of the TLS
    handshake or the request raises.

    Raises
    ------
    Exception
        If the interpreter version lies between 3.0 and 3.2 (exclusive
        bounds as encoded in the ``hexversion`` comparison).
    """
    if 0x02999999 < hexversion < 0x03020000:
        raise Exception("version " + str(hex(hexversion)))

    # NOTE(review): ssl.PROTOCOL_TLSv1_2 is marked deprecated in the Python
    # ssl documentation; consider ssl.PROTOCOL_TLS_CLIENT once the supported
    # interpreter versions are confirmed.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    context.verify_mode = ssl.CERT_REQUIRED
    if hasattr(context, "check_hostname"):
        context.check_hostname = True
    context.load_verify_locations(db.get_config().get("Connection", "cacert"))

    url = db.get_config().get("Connection", "url")
    fullurl = urlparse(url)
    http_con = HTTPSConnection(
        str(fullurl.netloc), timeout=200, context=context)
    try:
        http_con.request(method="GET", headers={},
                         url=str(fullurl.path) + "Info")
        r = http_con.getresponse()
        print(r.read())
    finally:
        # Release the socket even when the request or the read fails.
        http_con.close()
def test_login_via_post_form_data_failure():
    """Posting a wrong password to ``login`` must raise LoginFailedException."""
    with raises(LoginFailedException):
        connection = db.get_connection()
        credentials = {
            "username": db.get_config().get("Connection", "username"),
            "password": "wrongpassphrase",
        }
        connection.post_form_data("login", credentials)
def test_anonymous_not_returning_auth_token():
    """A plain GET on ``Entity`` must not carry a ``Set-Cookie`` header.

    The request is sent with empty headers after ``AUTH_OPTIONAL`` has been
    switched to ``TRUE``.
    """
    # activate anonymous user
    db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

    reply = request(method="GET", headers={}, path="Entity")
    cookie = reply.getheader("Set-Cookie")
    assert cookie is None  # no auth token returned
def test_anonymous_setter():
    """This test verifies that the "test_login_while_anonymous_is_active" is
    effective.

    With ``AUTH_OPTIONAL`` set to ``TRUE``, a raw HTTPS request for ``Info``
    that sends no headers must report the role ``anonymous`` as the first
    role in the response XML.
    """
    # activate anonymous user
    db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

    # connect without auth-token
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(db.get_config().get("Connection", "cacert"))

    url = db.get_config().get("Connection", "url")
    fullurl = urlparse(url)
    http_con = HTTPSConnection(
        str(fullurl.netloc), timeout=200, context=context)
    try:
        http_con.request(method="GET", headers={},
                         url=str(fullurl.path) + "Info")
        body = http_con.getresponse().read()
    finally:
        # Release the socket even when the request or the read fails.
        http_con.close()

    xml = etree.fromstring(body)

    # verify unauthenticated
    assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous"
def test_login_while_anonymous_is_active():
    """After a logout, a retrieve with ``reconnect=True`` must log in again.

    ``AUTH_OPTIONAL`` is set to ``TRUE`` beforehand; the first role in the
    response is nevertheless expected to be ``administration``.
    """
    # activate anonymous user
    db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

    # logout
    db.get_connection()._logout()

    response = db.get_connection().retrieve(
        entity_uri_segments=["Entity"], reconnect=True)
    xml = etree.fromstring(response.read())

    # pylib did the login even though the anonymous user is active
    role_elements = xml.xpath("/Response/UserInfo/Roles/Role")
    assert role_elements[0].text == "administration"
def test_authtoken_config():
    """The server property ``AUTHTOKEN_CONFIG`` must name the example file."""
    configured = db.administration.get_server_property("AUTHTOKEN_CONFIG")
    assert configured == "conf/core/authtoken.example.yaml"
def get_one_time_token(testcase, timeout=None):
    """Return a one-time authentication token not yet handed out by this module.

    The token is read from the file configured as
    ``test_authentication.<testcase>`` in the ``IntegrationTests`` section.
    While the file still contains a token that was returned before, it is
    re-read once per second until its content changes.

    Parameters
    ----------
    testcase : str
        Suffix of the ``test_authentication.*`` option that names the token
        file.
    timeout : float, optional
        Maximum number of seconds to wait for a new token. ``None`` (the
        default) waits without limit, as before.

    Returns
    -------
    str
        The content of the token file.

    Raises
    ------
    TimeoutError
        If ``timeout`` is given and the file did not change in time.
    AssertionError
        If the token file or its directory does not exist, or if the token
        does not start with the expected realm/user/roles/permissions
        prefix.
    """
    username = "anonymous"
    realm = "OneTimeAuthenticationToken"
    roles = ["administration"]
    permissions = []
    filename = db.get_config().get("IntegrationTests",
                                   "test_authentication.{}".format(testcase))
    assert os.path.isdir(os.path.split(filename)[0])
    assert os.path.isfile(filename)

    def read_token():
        # Single place that reads the token file.
        with open(filename, "r") as f:
            return f.read()

    deadline = None if timeout is None else time.monotonic() + timeout

    auth_token = read_token()
    while auth_token in _USED_OTA_TOKEN:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                "No new one-time token appeared in {} within {} "
                "seconds.".format(filename, timeout))
        # wait until the server has renewed the token
        time.sleep(1)
        auth_token = read_token()

    # Remember the token so that a later call does not return it again.
    _USED_OTA_TOKEN.add(auth_token)

    assert auth_token.startswith('["O","{re}","{us}",["{rol}"],{perm},'.format(
        re=realm, us=username, rol=",".join(roles), perm=permissions))
    return auth_token
def test_one_time_token():
    """Log in with a one-time token and switch back to the default login.

    Before and after the token login the user must be the configured one
    with the ``administration`` role; during the token login the user must
    be ``anonymous`` in the realm ``OneTimeAuthenticationToken``, still
    with the ``administration`` role.
    """
    def current_user():
        # Each call asks the server again via a fresh db.Info().
        return db.Info().user_info

    configured_name = db.get_config().get("Connection", "username")

    assert current_user().roles == ["administration"]
    assert current_user().name == configured_name

    token = get_one_time_token("admin_token_crud")
    db.configure_connection(password_method="auth_token", auth_token=token)
    assert current_user().roles == ["administration"]
    assert current_user().name == "anonymous"
    assert current_user().realm == "OneTimeAuthenticationToken"

    db.configure_connection()
    assert current_user().roles == ["administration"]
    assert current_user().name == configured_name
def test_one_time_token_invalid():
    """A tampered one-time token must be rejected with LoginFailedException.

    The token is altered by replacing ``[]`` with ``["permission"]``. The
    login is attempted twice: first with the server state left as is, then
    after ``AUTH_OPTIONAL`` has been set to ``TRUE``.
    """
    expected_message = (
        "The authentication token is expired or you have been logged out otherwise. The auth_token "
        "authenticator cannot log in again. You must provide a new authentication token.")

    tampered_token = get_one_time_token("admin_token_crud").replace(
        "[]", '["permission"]')

    for anonymous_enabled in (False, True):
        if anonymous_enabled:
            # also raises exception when anonymous is enabled
            db.configure_connection()
            db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

        db.configure_connection(password_method="auth_token",
                                auth_token=tampered_token)
        with raises(db.LoginFailedException) as lfe:
            db.Info()
        assert lfe.value.args[0] == expected_message
def test_one_time_token_expired():
    """The ``admin_token_expired`` token must be rejected on login.

    The login is attempted twice: first with the server state left as is,
    then after ``AUTH_OPTIONAL`` has been set to ``TRUE``. Both attempts
    must raise LoginFailedException with the same message.
    """
    expected_message = (
        "The authentication token is expired or you have been logged out otherwise. The auth_token "
        "authenticator cannot log in again. You must provide a new authentication token.")

    expired_token = get_one_time_token("admin_token_expired")

    for anonymous_enabled in (False, True):
        if anonymous_enabled:
            # also raises exception when anonymous is enabled
            db.configure_connection()
            db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

        db.configure_connection(password_method="auth_token",
                                auth_token=expired_token)
        with raises(db.LoginFailedException) as lfe:
            db.Info()
        assert lfe.value.args[0] == expected_message
def test_one_time_token_3_attempts():
    """The ``admin_token_3_attempts`` token must work three times, then fail.

    For each of the first three logins the authenticator must start with
    the original token and hold a different one after the first
    ``db.Info()`` call. The fourth login, and a fifth one made after
    ``AUTH_OPTIONAL`` has been set to ``TRUE``, must raise
    LoginFailedException.
    """
    expected_message = (
        "The authentication token is expired or you have been logged out otherwise. The auth_token "
        "authenticator cannot log in again. You must provide a new authentication token.")

    auth_token = get_one_time_token("admin_token_3_attempts")

    def connect_with_token():
        # Fresh connection that starts out with the original token.
        db.configure_connection(password_method="auth_token",
                                auth_token=auth_token)
        assert db.get_connection()._authenticator.auth_token == auth_token

    def assert_login_rejected():
        with raises(db.LoginFailedException) as lfe:
            db.Info()
        assert lfe.value.args[0] == expected_message

    # 1st, 2nd and 3rd attempt succeed
    for _ in range(3):
        connect_with_token()
        assert db.Info().user_info.roles == ["administration"]
        assert db.get_connection()._authenticator.auth_token != auth_token
        assert db.Info().user_info.name == "anonymous"
        assert db.Info().user_info.realm == "OneTimeAuthenticationToken"

    # 4th
    connect_with_token()
    assert_login_rejected()

    db.configure_connection()
    db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")

    # 5th attempt, also raises error when anonymous user is enabled
    connect_with_token()
    assert_login_rejected()
def test_crud_with_one_time_token():
    """Insert, update, retrieve and delete a RecordType via a one-time token."""
    token = get_one_time_token("admin_token_crud")
    db.configure_connection(password_method="auth_token", auth_token=token)

    def stored_rt():
        # Look the record type up on the server; must match exactly one.
        return db.execute_query("FIND TestRT", unique=True)

    # CREATE
    rt = db.RecordType(name="TestRT")
    rt.insert()
    assert rt.id == stored_rt().id

    # UPDATE
    assert stored_rt().description is None
    rt.description = "new desc"
    rt.update()
    assert rt.description == stored_rt().description

    # RETRIEVE
    rt.retrieve()
    assert rt.id == stored_rt().id

    # DELETE
    rt.delete()
    remaining = db.execute_query("FIND TestRT")
    assert len(remaining) == 0