Skip to content
Snippets Groups Projects
Commit fcc6489c authored by Timm Fitschen's avatar Timm Fitschen
Browse files

Merge branch 'f-one-time-tokens' into 'dev'

f-one-time-tokens -> dev

# Summary

One-time tokens are access tokens which can be used to authenticate and authorize a caosdb client. One-time tokens can be used once to sign in and create a session token for further communication with the server. After that they expire immediately. However, the one-time token if they are not used before a particular date they expire even before anyone has used them at all. Thus, one-time tokens work as if they were one-time passwords with the additional feature that they can specify a set of roles and explicit permissions of the resulting session.

One-time tokens have two major use-cases. The first is the authentication and authorization of sessions by clients (e.g. crawler). They can read out a one-time token that is being written to a file by the server. The authentication token can be fed into the `configure_connection` function of the pylib and the client can authenticate without a password.

The other one is the authentication and authorization of server-side scripts which are triggered by a user who does not necessarily have the permissions that the script needs for it's work. E.g the anonymous user is allowed to use scripts that need some specific update permissions. Effectively, one-time tokens can be used to promote a caller of a server-side script from a unprivileged client to a client with more permissions (for this one session). However, the script calling is encapsulated in such a way, that only the script runs with these promoted permissions. The caller does not gain any more permissions apart from that.

This MR also depends on https://gitlab.com/caosdb/caosdb-server/-/merge_requests/40 and https://gitlab.com/caosdb/caosdb-pylib/-/merge_requests/28

# Focus

The focus should be the new tests in `test_adminstration` which test the usage of one-time tokens in the client (1. use-case) and the new tests in `test_server_side_scripting` which test calling a server-side script as unprivileged user (2. use-case). 

# Test Environment

Just check the pipeline. You will notice, that the server-side scripting tests fail. However, that is due to a missing/wrong pycaosdb.ini in the home directory of the server-side scripting. Relevant for this MR is that the server-side scripts have been called with the correct authtoken, which seems to be the case.

# Check List for the Author

Please, prepare your MR for a review. Be sure to write a summary and a
focus and create gitlab comments for the reviewer. They should guide the
reviewer through the changes, explain your changes and also point out open
questions. For further good practices have a look at [our review
guidelines](https://gitlab.com/caosdb/caosdb/-/blob/dev/REVIEW_GUIDELINES.md)

- [x] All automated tests pass
- [x] Reference related Issues
- [x] Up-to-date CHANGELOG.md
- [x] Annotations in code (Gitlab comments)
  - Intent of new code
  - Problems with old code
  - Why this implementation?


# Check List for the Reviewer


- [x] I understand the intent of this MR
- [ ] All automated tests pass
- [x] Up-to-date CHANGELOG.md
- [x] The test environment setup works and the intended behavior is
  reproducible in the test environment
- [x] In-code documentation and comments are up-to-date.
- [x] Check: Are there spezifications? Are they satisfied?

For further good practices have a look at [our review guidelines](https://gitlab.com/caosdb/caosdb/-/blob/dev/REVIEW_GUIDELINES.md).

See merge request caosdb/caosdb-pyinttest!29
parents d707cf45 08881afc
No related branches found
No related tags found
No related merge requests found
......@@ -18,8 +18,3 @@ COPY wait-for-it.sh /opt/caosdb/wait-for-it.sh
WORKDIR /opt/caosdb
RUN mkdir -p /opt/caosdb/build_docker/
CMD /bin/bash
# python client
ADD https://gitlab.com/api/v4/projects/13656973/repository/branches/dev \
pylib_version.json
RUN pip3 install git+https://gitlab.com/caosdb/caosdb-pylib
......@@ -15,6 +15,18 @@ ADD https://gitlab.com/api/v4/projects/13656973/repository/commits/${PYLIB} \
RUN git clone https://gitlab.com/caosdb/caosdb-pylib.git && \
cd caosdb-pylib && git checkout ${PYLIB} && pip3 install .
COPY . /git
RUN rm -r /git/.git && mv /git/.docker/pycaosdb.ini /git
# Delete .git because it is huge.
RUN rm -r /git/.git
# Install pycaosdb.ini for the tests
RUN mv /git/.docker/tester_pycaosdb.ini /git/pycaosdb.ini
WORKDIR /git
CMD /wait-for-it.sh caosdb-server:10443 -t 500 -- tox
# wait for server,
CMD /wait-for-it.sh caosdb-server:10443 -t 500 -- \
# ... install pycaosdb.ini the server-side scripts
cp /git/.docker/sss_pycaosdb.ini /scripting/home/.pycaosdb.ini && \
# ... and run tests
tox
......@@ -22,16 +22,21 @@ services:
target: /opt/caosdb/mnt/extroot
- type: volume
source: scripting
target: /opt/caosdb/git/caosdb-server/scripting/bin
target: /opt/caosdb/git/caosdb-server/scripting
- type: volume
source: authtoken
target: /opt/caosdb/git/caosdb-server/authtoken
ports:
# - "from_outside:from_inside"
- "10443:10443"
- "10080:10080"
environment:
DEBUG: 1
CAOSDB_CONFIG_AUTHTOKEN_CONFIG: "conf/core/authtoken.example.yaml"
volumes:
scripting:
extroot:
authtoken:
networks:
caosnet:
driver: bridge
[IntegrationTests]
test_server_side_scripting.bin_dir=/scripting-bin/
# location of the files from the pyinttest perspective
test_files.test_insert_files_in_dir.local=/extroot/test_insert_files_in_dir/
# location of the files from the caosdb_servers perspective
test_files.test_insert_files_in_dir.server=/opt/caosdb/mnt/extroot/test_insert_files_in_dir/
[Connection]
url=https://caosdb-server:10443/
username=admin
cacert=/cert/caosdb.cert.pem
#cacert=/etc/ssl/cert.pem
debug=0
passwordmethod=plain
password=caosdb
ssl_insecure=True
timeout=500
[Container]
debug=0
; this is the pycaosdb.ini for the server-side-scripting home.
[Connection]
url = https://caosdb-server:10443
cacert = /opt/caosdb/cert/caosdb.cert.pem
debug = 0
timeout = 5000
[Misc]
sendmail = /usr/local/bin/sendmail_to_file
......@@ -13,10 +13,14 @@ services:
target: /extroot
- type: volume
source: scripting
target: /scripting-bin
target: /scripting
- type: volume
source: authtoken
target: /authtoken
networks:
docker_caosnet:
external: true
volumes:
scripting:
extroot:
authtoken:
; pycaosdb.ini for pytest test suites.
[IntegrationTests]
; location of the scripting bin dir which is used for the test scripts from the
; server's perspective.
test_server_side_scripting.bin_dir.server = scripting/bin-debug/
; location of the scripting bin dir which is used for the test scripts from the
; pyinttest's perspective.
test_server_side_scripting.bin_dir.local = /scripting/bin-debug/
; location of the files from the pyinttest perspective
test_files.test_insert_files_in_dir.local = /extroot/test_insert_files_in_dir/
; location of the files from the caosdb_servers perspective
test_files.test_insert_files_in_dir.server = /opt/caosdb/mnt/extroot/test_insert_files_in_dir/
; location of the one-time tokens from the pyinttest's perspective
test_authentication.admin_token_crud = /authtoken/admin_token_crud.txt
test_authentication.admin_token_expired = /authtoken/admin_token_expired.txt
test_authentication.admin_token_3_attempts = /authtoken/admin_token_3_attempts.txt
[Connection]
url = https://caosdb-server:10443/
username = admin
cacert = /cert/caosdb.cert.pem
debug = 0
passwordmethod = plain
password = caosdb
timeout = 500
......@@ -21,6 +21,7 @@
variables:
CI_REGISTRY_IMAGE: $CI_REGISTRY/caosdb/caosdb-pyinttest/testenv:latest
CI_REGISTRY_IMAGE_BASE: $CI_REGISTRY/caosdb/caosdb-pyinttest/base:latest
DEPLOY_REF: dev
stages:
- setup
......@@ -42,24 +43,27 @@ stages:
# | +-(caosdb-server)-------------------+ |
# | | | |
# | | /opt/caosdb | |
# | .---->| + /git/caosdb-server/scripting/ | |
# | | .--->| + /mnt/extroot | |
# | | | .->| + /cert | |
# | | | | | | |
# | | | | +-----------------------------------+ |
# | | | | |
# | | | | filesystem: |
# | | | *- /cert -----------. |
# | | | | |
# | | | volumes: | |
# | . *-- extroot ------. | |
# | *---- scripting --. | | |
# | | | | |
# | +-(caosdb-pyinttest)---+ | | | |
# | | | | | | |
# | | /scripting |<---* | | |
# | | /extroot |<----* . |
# | | /cert |<------* |
# | .------->| + /git/caosdb-server/scripting/ | |
# | | .----->| + /git/caosdb-server/authtoken/ | |
# | | | .--->| + /mnt/extroot | |
# | | | | .->| + /cert | |
# | | | | | | | |
# | | | | | +-----------------------------------+ |
# | | | | | |
# | | | | | filesystem: |
# | | | | *--- /cert -----------. |
# | | | | | |
# | | | | volumes: | |
# | | | *----- extroot ------. | |
# | | *------- scripting --. | | |
# | *--------- authtoken -. | | | |
# | | | | | |
# | +-(caosdb-pyinttest)---+ | | | | |
# | | | | | | | |
# | | /authtoken |<---* | | | |
# | | /scripting |<----* | | |
# | | /extroot |<------* | |
# | | /cert |<--------* |
# | | | |
# | +----------------------+ |
# +---------------------------------------------------+
......@@ -69,9 +73,8 @@ stages:
# pipeline and a certificate is created in there. The certificat is then
# available in mounted directories in the server and pyinttest containers.
#
# Two additional volumes in the root docker are shared by the caosdb-server and
# the caosdb-pyintest containers.
# These volumes are inteded to be used for testing server-side scripting and
# Additional volumes in the root docker are shared by the caosdb-server and the caosdb-pyintest
# containers. These volumes are intended to be used for testing server-side scripting and
# file-system features.
#
services:
......@@ -81,7 +84,9 @@ test:
tags: [docker]
stage: test
image: $CI_REGISTRY_IMAGE_BASE
needs: ["cert"]
script:
- ls /
- echo $F_BRANCH
- echo $CAOSDB_TAG
- echo $CI_COMMIT_REF_NAME
......@@ -92,23 +97,27 @@ test:
- if ! echo "$F_BRANCH" | grep -c "^f-" ; then
F_BRANCH=dev;
fi
- echo $F_BRANCH
- echo $CI_COMMIT_REF_NAME
- echo $CI_REGISTRY_IMAGE
- docker login -u gitlab+deploy-token-ci-pull -p $TOKEN_CI_PULL $CI_REGISTRY_INDISCALE
- if [[ "$CAOSDB_TAG" == "" ]]; then
if echo "$F_BRANCH" | grep -c "^f-" ; then
CAOSDB_TAG=dev_F_${F_BRANCH}-latest;
CAOSDB_TAG=${DEPLOY_REF}_F_${F_BRANCH}-latest;
docker pull $CI_REGISTRY_INDISCALE/caosdb/src/caosdb-deploy:$CAOSDB_TAG || CAOSDB_TAG=${DEPLOY_REF}-latest ;
else
CAOSDB_TAG=dev-latest;
CAOSDB_TAG=${DEPLOY_REF}-latest;
fi;
fi
# TODO default $CAOSDB_TAG to dev_F_$F_BRANCH-latest
- echo $F_BRANCH
- docker pull $CI_REGISTRY_INDISCALE/caosdb/src/caosdb-deploy:$CAOSDB_TAG || CAOSDB_TAG=dev-latest ;
- echo $CAOSDB_TAG
- echo $CI_COMMIT_REF_NAME
- time docker load < /image-cache/caosdb-pyint-testenv-${CI_COMMIT_REF_NAME}.tar || true
- time docker load < /image-cache/mariadb-${F_BRANCH}.tar || true
- time docker load < /image-cache/caosdb-${F_BRANCH}.tar || true
- docker login -u gitlab-ci-token -p $CI_JOB_TOKEN $CI_REGISTRY
- docker login -u gitlab+deploy-token-ci-pull -p $TOKEN_CI_PULL $CI_REGISTRY_INDISCALE
- docker pull $CI_REGISTRY_IMAGE
- cd .docker
# here the server and the mysql backend docker are being started
......@@ -134,6 +143,7 @@ build-testenv:
tags: [cached-dind]
image: docker:19.03
stage: setup
needs: []
script:
- df -h
- command -v wget
......@@ -165,6 +175,7 @@ cert:
tags: [docker]
stage: cert
image: $CI_REGISTRY_IMAGE
needs: ["build-testenv"]
artifacts:
paths:
- .docker/cert/
......@@ -175,6 +186,7 @@ cert:
style:
tags: [docker]
stage: style
needs: ["build-testenv"]
image: $CI_REGISTRY_IMAGE
script:
- autopep8 -r --diff --exit-code .
......
......@@ -8,7 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added (for new features)
- Tests for [#62](https://gitlab.com/caosdb/caosdb-server/-/issues/62)
* Tests for One-time Authentication Tokens
* Test for [caosdb-pylib#31](https://gitlab.com/caosdb/caosdb-pylib/-/issues/31)
* Tests for [caosdb-server#62](https://gitlab.com/caosdb/caosdb-server/-/issues/62)
in caosdb-server-project, i.e., renaming of a RecordType that should
be reflected in properties with that RT as datatype.
......@@ -20,7 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed (for any bug fixes)
- Tests for NaN Double Values (see https://gitlab.com/caosdb/caosdb-server/issues/41)
* Tests for NaN Double Values (see https://gitlab.com/caosdb/caosdb-server/issues/41)
* Tests for name queries. [caosdb-server#51](https://gitlab.com/caosdb/caosdb-server/-/issues/51)
### Security (in case of vulnerabilities)
......
#!/bin/bash
echo "ok_anonymous"
......@@ -27,7 +27,7 @@
"""
from caosdb import (administration as admin, get_config)
from nose.tools import assert_true, assert_equal, assert_is_not_none, with_setup, assert_raises
from nose.tools import assert_true, assert_equal, assert_is_not_none, assert_raises
from caosdb.exceptions import (ClientErrorException, TransactionError,
AuthorizationException, LoginFailedException)
from caosdb.connection.connection import get_connection, configure_connection
......@@ -96,12 +96,10 @@ def test_set_server_property():
assert admin.get_server_property("AUTH_OPTIONAL") == "FALSE"
@with_setup(setup, teardown)
def test_insert_role_success():
assert_true(admin._insert_role(name=test_role, description=test_role_desc))
assert admin._insert_role(name=test_role, description=test_role_desc)
@with_setup(setup, teardown)
def test_insert_role_failure_permission():
switch_to_normal_user()
with assert_raises(AuthorizationException) as cm:
......@@ -111,7 +109,6 @@ def test_insert_role_failure_permission():
"You are not permitted to insert a new role.")
@with_setup(setup, teardown)
def test_insert_role_failure_name_duplicates():
test_insert_role_success()
with assert_raises(ClientErrorException) as cm:
......@@ -121,7 +118,6 @@ def test_insert_role_failure_name_duplicates():
"Role name is already in use. Choose a different name.")
@with_setup(setup, teardown)
def test_update_role_success():
test_insert_role_success()
assert_is_not_none(
......@@ -131,7 +127,6 @@ def test_update_role_success():
"asdf"))
@with_setup(setup, teardown)
def test_update_role_failure_permissions():
test_insert_role_success()
switch_to_normal_user()
......@@ -142,20 +137,17 @@ def test_update_role_failure_permissions():
"You are not permitted to update this role.")
@with_setup(setup, teardown)
def test_update_role_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._update_role(name=test_role, description=test_role_desc + "asdf")
assert_equal(cm.exception.msg, "Role does not exist.")
@with_setup(setup, teardown)
def test_delete_role_success():
test_insert_role_success()
assert_true(admin._delete_role(name=test_role))
@with_setup(setup, teardown)
def test_delete_role_failure_permissions():
test_insert_role_success()
switch_to_normal_user()
......@@ -166,21 +158,18 @@ def test_delete_role_failure_permissions():
"You are not permitted to delete this role.")
@with_setup(setup, teardown)
def test_delete_role_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._delete_role(name=test_role)
assert_equal(cm.exception.msg, "Role does not exist.")
@with_setup(setup, teardown)
def test_retrieve_role_success():
test_insert_role_success()
r = admin._retrieve_role(test_role)
assert_is_not_none(r)
@with_setup(setup, teardown)
def test_retrieve_role_failure_permission():
test_insert_role_success()
switch_to_normal_user()
......@@ -191,14 +180,12 @@ def test_retrieve_role_failure_permission():
"You are not permitted to retrieve this role.")
@with_setup(setup, teardown)
def test_retrieve_role_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._retrieve_role(name=test_role)
assert_equal(cm.exception.msg, "Role does not exist.")
@with_setup(setup, teardown)
def test_set_permissions_success():
test_insert_role_success()
assert_true(
......@@ -210,7 +197,6 @@ def test_set_permissions_success():
"BLA:BLA:BLA")]))
@with_setup(setup, teardown)
def test_set_permissions_failure_permissions():
test_insert_role_success()
switch_to_normal_user()
......@@ -224,7 +210,6 @@ def test_set_permissions_failure_permissions():
"You are not permitted to set this role's permissions.")
@with_setup(setup, teardown)
def test_set_permissions_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._set_permissions(
......@@ -234,7 +219,6 @@ def test_set_permissions_failure_non_existing():
assert_equal(cm.exception.msg, "Role does not exist.")
@with_setup(setup, teardown)
def test_get_permissions_success():
test_set_permissions_success()
r = admin._get_permissions(role=test_role)
......@@ -242,7 +226,6 @@ def test_get_permissions_success():
assert_is_not_none(r)
@with_setup(setup, teardown)
def test_get_permissions_failure_permissions():
test_set_permissions_success()
switch_to_normal_user()
......@@ -253,14 +236,12 @@ def test_get_permissions_failure_permissions():
"You are not permitted to retrieve this role's permissions.")
@with_setup(setup, teardown)
def test_get_permissions_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._get_permissions(role="non-existing-role")
assert_equal(cm.exception.msg, "Role does not exist.")
@with_setup(setup, teardown)
def test_get_roles_success():
test_insert_role_success()
r = admin._get_roles(username=test_user)
......@@ -268,7 +249,6 @@ def test_get_roles_success():
return r
@with_setup(setup, teardown)
def test_get_roles_failure_permissions():
test_insert_role_success()
switch_to_normal_user()
......@@ -279,14 +259,12 @@ def test_get_roles_failure_permissions():
"You are not permitted to retrieve this user's roles.")
@with_setup(setup, teardown)
def test_get_roles_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._get_roles(username="non-existing-user")
assert_equal(cm.exception.msg, "User does not exist.")
@with_setup(setup, teardown)
def test_set_roles_success():
roles_old = test_get_roles_success()
roles = {test_role}
......@@ -296,7 +274,6 @@ def test_set_roles_success():
assert_is_not_none(admin._set_roles(username=test_user, roles=roles_old))
@with_setup(setup, teardown)
def test_set_roles_success_with_warning():
test_insert_role_success()
roles = {test_role}
......@@ -305,7 +282,6 @@ def test_set_roles_success_with_warning():
assert_is_not_none(admin._set_roles(username=test_user, roles=[]))
@with_setup(setup, teardown)
def test_set_roles_failure_permissions():
roles_old = test_get_roles_success()
roles = {test_role}
......@@ -333,7 +309,6 @@ def test_set_roles_failure_non_existing_user():
assert_equal(cm.exception.msg, "User does not exist.")
@with_setup(setup, teardown)
def test_insert_user_success():
admin._insert_user(
name=test_user + "2",
......@@ -343,7 +318,6 @@ def test_insert_user_success():
entity=None)
@with_setup(setup, teardown)
def test_insert_user_failure_permissions():
switch_to_normal_user()
with assert_raises(AuthorizationException) as cm:
......@@ -358,7 +332,6 @@ def test_insert_user_failure_permissions():
"You are not permitted to insert a new user.")
@with_setup(setup, teardown)
def test_insert_user_failure_name_in_use():
test_insert_user_success()
with assert_raises(ClientErrorException) as cm:
......@@ -366,13 +339,11 @@ def test_insert_user_failure_name_in_use():
assert_equal(cm.exception.msg, "User name is already in use.")
@with_setup(setup, teardown)
def test_delete_user_success():
test_insert_user_success()
assert_is_not_none(admin._delete_user(name=test_user + "2"))
@with_setup(setup, teardown)
def test_delete_user_failure_permissions():
test_insert_user_success()
switch_to_normal_user()
......@@ -383,14 +354,12 @@ def test_delete_user_failure_permissions():
"You are not permitted to delete this user.")
@with_setup(setup, teardown)
def test_delete_user_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._delete_user(name="non_existing_user")
assert_equal(cm.exception.msg, "User does not exist.")
@with_setup(setup, teardown)
def test_update_user_success_status():
assert_is_not_none(
admin._insert_user(
......@@ -408,7 +377,6 @@ def test_update_user_success_status():
entity=None)
@with_setup(setup, teardown)
def test_update_user_success_email():
assert_is_not_none(
admin._insert_user(
......@@ -426,7 +394,6 @@ def test_update_user_success_email():
entity=None)
@with_setup(setup, teardown)
def test_update_user_success_entity():
assert_is_not_none(
admin._insert_user(
......@@ -439,7 +406,6 @@ def test_update_user_success_entity():
status=None, email=None, entity="21")
@with_setup(setup, teardown)
def test_update_user_success_password():
assert_is_not_none(
admin._insert_user(
......@@ -457,7 +423,6 @@ def test_update_user_success_password():
entity=None)
@with_setup(setup, teardown)
def test_update_user_failure_permissions_status():
assert_is_not_none(
admin._insert_user(
......@@ -480,7 +445,6 @@ def test_update_user_failure_permissions_status():
"You are not permitted to update this user.")
@with_setup(setup, teardown)
def test_update_user_failure_permissions_email():
assert_is_not_none(
admin._insert_user(
......@@ -503,7 +467,6 @@ def test_update_user_failure_permissions_email():
"You are not permitted to update this user.")
@with_setup(setup, teardown)
def test_update_user_failure_permissions_entity():
assert_is_not_none(
admin._insert_user(
......@@ -526,7 +489,6 @@ def test_update_user_failure_permissions_entity():
"You are not permitted to update this user.")
@with_setup(setup, teardown)
def test_update_user_failure_permissions_password():
assert_is_not_none(
admin._insert_user(
......@@ -549,7 +511,6 @@ def test_update_user_failure_permissions_password():
"You are not permitted to update this user.")
@with_setup(setup, teardown)
def test_update_user_failure_non_existing_user():
with assert_raises(TransactionError) as cm:
admin._update_user(
......@@ -562,7 +523,6 @@ def test_update_user_failure_non_existing_user():
assert_equal(cm.exception.msg, "User does not exist.")
@with_setup(setup, teardown)
def test_update_user_failure_non_existing_entity():
assert_is_not_none(
admin._insert_user(
......@@ -582,13 +542,11 @@ def test_update_user_failure_non_existing_entity():
assert_equal(cm.exception.msg, "Entity does not exist.")
@with_setup(setup, teardown)
def test_retrieve_user_success():
test_insert_user_success()
assert_is_not_none(admin._retrieve_user(realm=None, name=test_user + "2"))
@with_setup(setup, teardown)
def test_retrieve_user_failure_permissions():
test_insert_user_success()
switch_to_normal_user()
......@@ -599,14 +557,12 @@ def test_retrieve_user_failure_permissions():
"You are not permitted to retrieve this user.")
@with_setup(setup, teardown)
def test_retrieve_user_failure_non_existing():
with assert_raises(TransactionError) as cm:
admin._retrieve_user(realm=None, name="non_existing")
assert_equal(cm.exception.msg, "User does not exist.")
@with_setup(setup, teardown)
def test_login_with_inactive_user_failure():
assert_is_not_none(
admin._insert_user(
......
......@@ -48,10 +48,9 @@ file_path = "testfile.dat"
def setup():
try:
db.execute_query("FIND Test*").delete()
except Exception as e:
print(e)
d = db.execute_query("FIND ENTITY WITH ID > 99")
if len(d) > 0:
d.delete()
db.RecordType(name=recty_name).insert()
db.Record(name=rec_name).add_parent(name=recty_name).insert()
db.File(name=file_name, file=file_path, path="testfile.dat").insert()
......
......@@ -5,6 +5,8 @@
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2020 IndiScale GmbH <info@indiscale.com>
# Copyright (C) 2020 Timm Fitschen <t.fitschen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -21,39 +23,44 @@
#
# ** end header
#
"""Created on 20.01.2015.
@author: tf
"""
import os
import time
from sys import hexversion
from urllib.parse import urlparse
from http.client import HTTPSConnection
import ssl
from subprocess import call, check_output
from subprocess import call
from lxml import etree
from pytest import skip
from pytest import raises, mark
from caosdb.exceptions import LoginFailedException
import caosdb as h
from nose.tools import (assert_false, assert_true, assert_is_none,
assert_raises, assert_equal, assert_is_not_none,
nottest, with_setup)
from caosdb.connection.connection import _Connection
import caosdb as db
from .test_server_side_scripting import request
_USED_OTA_TOKEN = set()
def setup():
try:
h.execute_query("FIND Test*").delete()
except Exception as e:
print(e)
db.configure_connection()
# deactivate anonymous user
db.administration.set_server_property("AUTH_OPTIONAL", "FALSE")
d = db.execute_query("FIND Test*")
if len(d) > 0:
d.delete()
def teardown():
setup()
@mark.skipif(
not db.get_config().has_option("Connection", "password_method")
or not db.get_config().get("Connection", "password_method") == "pass",
reason="password_method is not pass")
def test_pass():
if not h.get_config().has_option("Connection", "password_method") or not h.get_config(
).get("Connection", "password_method") == "pass":
skip()
assert call(["pass", h.get_config().get("Connection",
assert call(["pass", db.get_config().get("Connection",
"password_identifier")]) == 0
......@@ -65,9 +72,9 @@ def test_https_support():
context.verify_mode = ssl.CERT_REQUIRED
if hasattr(context, "check_hostname"):
context.check_hostname = True
context.load_verify_locations(h.get_config().get("Connection", "cacert"))
context.load_verify_locations(db.get_config().get("Connection", "cacert"))
url = h.get_config().get("Connection", "url")
url = db.get_config().get("Connection", "url")
fullurl = urlparse(url)
http_con = HTTPSConnection(
......@@ -79,27 +86,35 @@ def test_https_support():
def test_login_via_post_form_data_failure():
with assert_raises(LoginFailedException) as cm:
h.get_connection().post_form_data(
with raises(LoginFailedException):
db.get_connection().post_form_data(
"login", {
"username": h.get_config().get("Connection", "username"),
"username": db.get_config().get("Connection", "username"),
"password": "wrongpassphrase"
})
def test_anonymous_not_returning_auth_token():
# activate anonymous user
db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
response = request(method="GET", headers={}, path="Entity")
assert response.getheader("Set-Cookie") is None # no auth token returned
def test_anonymous_setter():
""" this test verifies that the "test_login_while_anonymous_is_active" is
"""This test verifies that the "test_login_while_anonymous_is_active" is
effective."""
# activate anonymous user
h.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
# connect without auth-token
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(h.get_config().get("Connection", "cacert"))
context.load_verify_locations(db.get_config().get("Connection", "cacert"))
url = h.get_config().get("Connection", "url")
url = db.get_config().get("Connection", "url")
fullurl = urlparse(url)
http_con = HTTPSConnection(
......@@ -113,15 +128,14 @@ def test_anonymous_setter():
assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous"
@with_setup(setup, setup)
def test_login_while_anonymous_is_active():
# activate anonymous user
h.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
# logout
h.get_connection()._logout()
db.get_connection()._logout()
body = h.get_connection().retrieve(
body = db.get_connection().retrieve(
entity_uri_segments=["Entity"],
reconnect=True).read()
xml = etree.fromstring(body)
......@@ -129,3 +143,176 @@ def test_login_while_anonymous_is_active():
# pylib did the login even though the anonymous user is active
assert xml.xpath(
"/Response/UserInfo/Roles/Role")[0].text == "administration"
def test_authtoken_config():
assert db.administration.get_server_property(
"AUTHTOKEN_CONFIG") == "conf/core/authtoken.example.yaml"
def get_one_time_token(testcase):
username = "anonymous"
realm = "OneTimeAuthenticationToken"
roles = ["administration"]
permissions = []
filename = db.get_config().get("IntegrationTests",
"test_authentication.{}".format(testcase))
assert os.path.isdir(os.path.split(filename)[0])
assert os.path.isfile(filename)
with open(filename, "r") as f:
auth_token = f.read()
while auth_token in _USED_OTA_TOKEN:
# wait until the server has renewed the token
time.sleep(1)
with open(filename, "r") as f:
auth_token = f.read()
else:
_USED_OTA_TOKEN.add(auth_token)
assert auth_token.startswith('["O","{re}","{us}",["{rol}"],{perm},'.format(
re=realm, us=username, rol=",".join(roles), perm=permissions))
return auth_token
def test_one_time_token():
assert db.Info().user_info.roles == ["administration"]
assert db.Info().user_info.name == db.get_config().get("Connection", "username")
auth_token = get_one_time_token("admin_token_crud")
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
assert db.Info().user_info.roles == ["administration"]
assert db.Info().user_info.name == "anonymous"
assert db.Info().user_info.realm == "OneTimeAuthenticationToken"
db.configure_connection()
assert db.Info().user_info.roles == ["administration"]
assert db.Info().user_info.name == db.get_config().get("Connection",
"username")
def test_one_time_token_invalid():
auth_token = get_one_time_token("admin_token_crud")
auth_token = auth_token.replace("[]", '["permission"]')
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
with raises(db.LoginFailedException) as lfe:
db.Info()
assert lfe.value.args[0] == (
"The authentication token is expired or you have been logged out otherwise. The auth_token "
"authenticator cannot log in again. You must provide a new authentication token.")
# also raises exception when anonymous is enabled
db.configure_connection()
db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
with raises(db.LoginFailedException) as lfe:
db.Info()
assert lfe.value.args[0] == (
"The authentication token is expired or you have been logged out otherwise. The auth_token "
"authenticator cannot log in again. You must provide a new authentication token.")
def test_one_time_token_expired():
auth_token = get_one_time_token("admin_token_expired")
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
with raises(db.LoginFailedException) as lfe:
db.Info()
assert lfe.value.args[0] == (
"The authentication token is expired or you have been logged out otherwise. The auth_token "
"authenticator cannot log in again. You must provide a new authentication token.")
# also raises exception when anonymous is enabled
db.configure_connection()
db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
with raises(db.LoginFailedException) as lfe:
db.Info()
assert lfe.value.args[0] == (
"The authentication token is expired or you have been logged out otherwise. The auth_token "
"authenticator cannot log in again. You must provide a new authentication token.")
def test_one_time_token_3_attempts():
auth_token = get_one_time_token("admin_token_3_attempts")
# 1st
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
assert db.get_connection()._authenticator.auth_token == auth_token
assert db.Info().user_info.roles == ["administration"]
assert db.get_connection()._authenticator.auth_token != auth_token
assert db.Info().user_info.name == "anonymous"
assert db.Info().user_info.realm == "OneTimeAuthenticationToken"
# 2nd
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
assert db.get_connection()._authenticator.auth_token == auth_token
assert db.Info().user_info.roles == ["administration"]
assert db.get_connection()._authenticator.auth_token != auth_token
assert db.Info().user_info.name == "anonymous"
assert db.Info().user_info.realm == "OneTimeAuthenticationToken"
# 3rd
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
assert db.get_connection()._authenticator.auth_token == auth_token
assert db.Info().user_info.roles == ["administration"]
assert db.get_connection()._authenticator.auth_token != auth_token
assert db.Info().user_info.name == "anonymous"
assert db.Info().user_info.realm == "OneTimeAuthenticationToken"
# 4th
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
assert db.get_connection()._authenticator.auth_token == auth_token
with raises(db.LoginFailedException) as lfe:
db.Info()
assert lfe.value.args[0] == (
"The authentication token is expired or you have been logged out otherwise. The auth_token "
"authenticator cannot log in again. You must provide a new authentication token.")
db.configure_connection()
db.administration.set_server_property("AUTH_OPTIONAL", "TRUE")
# 5th attempt, also raises error when anonymous user is enabled
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
assert db.get_connection()._authenticator.auth_token == auth_token
with raises(db.LoginFailedException) as lfe:
db.Info()
assert lfe.value.args[0] == (
"The authentication token is expired or you have been logged out otherwise. The auth_token "
"authenticator cannot log in again. You must provide a new authentication token.")
def test_crud_with_one_time_token():
auth_token = get_one_time_token("admin_token_crud")
db.configure_connection(password_method="auth_token",
auth_token=auth_token)
# CREATE
rt = db.RecordType(name="TestRT")
rt.insert()
assert rt.id == db.execute_query("FIND TestRT", unique=True).id
# UPDATE
assert db.execute_query("FIND TestRT", unique=True).description is None
rt.description = "new desc"
rt.update()
assert rt.description == db.execute_query(
"FIND TestRT", unique=True).description
# RETRIEVE
rt.retrieve()
assert rt.id == db.execute_query("FIND TestRT", unique=True).id
rt.delete()
assert len(db.execute_query("FIND TestRT")) == 0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (c) 2020 IndiScale GmbH <www.indiscale.com>
# Copyright (c) 2020 Timm Fitschen <t.fitschen@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
......@@ -21,60 +20,44 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""server_side_script.py.
An example which implements a minimal server-side script.
1) This script expects to find a *.txt file in the .upload_files dir which is
printed to stdout.
import pytest
2) It executes a "Count stars" query and prints the result to stdout.
import caosdb as db
from caosdb.common.utils import xml2str
3) It will return with code 0 if everything is ok, or with any code that is
specified with the commandline option --exit
"""
import sys
from os import listdir
from caosdb import configure_connection, execute_query
def setup_module():
d = db.execute_query("FIND Test*")
if len(d) > 0:
d.delete()
# parse --auth-token option and configure connection
CODE = 0
QUERY = "COUNT stars"
for arg in sys.argv:
if arg.startswith("--auth-token="):
auth_token = arg[13:]
configure_connection(auth_token=auth_token)
if arg.startswith("--exit="):
CODE = int(arg[7:])
if arg.startswith("--query="):
QUERY = arg[8:]
def setup():
setup_module()
############################################################
# 1 # find and print *.txt file ############################
############################################################
def teardown():
setup_module()
try:
for fname in listdir(".upload_files"):
if fname.endswith(".txt"):
with open(".upload_files/{}".format(fname)) as f:
print(f.read())
except FileNotFoundError:
pass
@pytest.mark.xfail(reason="""see pylib issue #31""")
def test_add_properties_with_wrong_role():
p = db.Property(name="TestProperty1", datatype=db.TEXT).insert()
rt = db.RecordType(name="TestRT1").add_property("TestProperty1").insert()
############################################################
# 2 # query "COUNT stars" ##################################
############################################################
wrong_role = db.Record(name="TestRT1").retrieve()
right_role = db.RecordType(name="TestRT1").retrieve()
rec = db.Record(name="fail").add_property(wrong_role)
rec2 = db.Record(name="ok").add_property(right_role)
RESULT = execute_query(QUERY)
print(RESULT)
assert wrong_role.get_property("TestProperty1") is not None
assert str(wrong_role.get_property("TestProperty1")) == str(
right_role.get_property("TestProperty1"))
############################################################
# 3 ########################################################
############################################################
xml = rec2.to_xml()
assert not xml.xpath("/Record/Property/Property")
sys.exit(CODE)
xml = rec.to_xml()
# TODO this fails because the Record is treated differently than the
# RecordType.
assert not xml.xpath("/Record/Property/Property")
......@@ -26,33 +26,66 @@
Integration tests for the implementation of the server-side-scripting api.
"""
from __future__ import print_function, unicode_literals
from pytest import raises, mark
import os
from pytest import raises
import json
from lxml import etree
from caosdb import get_connection, get_config
from http.client import HTTPSConnection
import ssl
from caosdb import get_connection, get_config, Info, execute_query, RecordType
from caosdb.exceptions import (EntityDoesNotExistError, ClientErrorException)
from caosdb.connection.encode import MultipartParam, multipart_encode
from caosdb.connection.utils import urlencode, urlparse
from caosdb import administration as admin
from caosdb.utils.server_side_scripting import run_server_side_script
_TEST_SCRIPTS = ["not_executable", "ok", "err", "simple_script.py"]
_SERVER_SIDE_SCRIPTING_BIN_DIR = get_config().get(
_TEST_SCRIPTS = ["not_executable", "ok", "err", "ok_anonymous"]
_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL = get_config().get(
"IntegrationTests",
"test_server_side_scripting.bin_dir.local")
_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER = get_config().get(
"IntegrationTests",
"test_server_side_scripting.bin_dir")
"test_server_side_scripting.bin_dir.server")
_TEST_SCRIPTS_DIR = "./resources/"
_REMOVE_FILES_AFTERWARDS = []
_ORIGINAL_SERVER_SCRIPTING_BIN_DIR = ""
def clean_database():
admin._set_permissions("anonymous", [])
d = execute_query("FIND ENTITY WITH ID > 99")
if len(d) > 0:
d.delete()
def setup():
clean_database()
def teardown():
admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIR",
_ORIGINAL_SERVER_SCRIPTING_BIN_DIR)
clean_database()
def setup_module():
global _ORIGINAL_SERVER_SCRIPTING_BIN_DIR
_ORIGINAL_SERVER_SCRIPTING_BIN_DIR = admin.get_server_property(
"SERVER_SIDE_SCRIPTING_BIN_DIR")
clean_database()
from os import makedirs
from os.path import join, isdir, exists
from shutil import copyfile, copymode
print("bin: " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR))
print("bin dir (local): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL))
print("bin dir (server): " + str(_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER))
print("tests scripts: " + str(_TEST_SCRIPTS))
if not exists(_SERVER_SIDE_SCRIPTING_BIN_DIR):
makedirs(_SERVER_SIDE_SCRIPTING_BIN_DIR)
_REMOVE_FILES_AFTERWARDS.append(_SERVER_SIDE_SCRIPTING_BIN_DIR)
assert isdir(_SERVER_SIDE_SCRIPTING_BIN_DIR)
if not exists(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL):
makedirs(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL)
_REMOVE_FILES_AFTERWARDS.append(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL)
assert isdir(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL)
for script_file in _TEST_SCRIPTS:
target = join(_SERVER_SIDE_SCRIPTING_BIN_DIR, script_file)
target = join(_SERVER_SIDE_SCRIPTING_BIN_DIR_LOCAL, script_file)
src = join(_TEST_SCRIPTS_DIR, script_file)
copyfile(src, target)
copymode(src, target)
......@@ -69,6 +102,7 @@ def teardown_module():
rmtree(obsolete)
else:
remove(obsolete)
clean_database()
def test_call_script_non_existing():
......@@ -79,15 +113,8 @@ def test_call_script_non_existing():
def test_call_script_not_executable():
props = admin.get_server_properties()
print(props)
import os
dirpath = os.getcwd()
print("PWD={}".format(dirpath))
print("ls ./\n{}".format(os.listdir("./")))
print("ls ../\n{}".format(os.listdir("../")))
admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIR",
_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
form = dict()
form["call"] = "not_executable"
with raises(ClientErrorException) as exc_info:
......@@ -96,6 +123,8 @@ def test_call_script_not_executable():
def test_call_ok():
admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIR",
_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
form = dict()
form["call"] = "ok"
r = get_connection().post_form_data("scripting", form)
......@@ -107,6 +136,8 @@ def test_call_ok():
def test_call_err():
admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIR",
_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
form = dict()
form["call"] = "err"
r = get_connection().post_form_data("scripting", form)
......@@ -117,29 +148,234 @@ def test_call_err():
assert xml.xpath("/Response/script/stderr")[0].text == "err"
@mark.skip(reason="need to setup .pycaosdb.ini in home dirs of sss within docker")
def test_simple_sss():
def test_run_server_side_script_with_file_as_positional_param():
RecordType("TestRT").insert()
_REMOVE_FILES_AFTERWARDS.append("test_file.txt")
with open("test_file.txt", "w") as f:
f.write("this is a test")
response = run_server_side_script("administration/diagnostics.py",
"pos0",
"pos1",
exit="123",
query="COUNT TestRT",
files={"-p2": "test_file.txt"})
assert response.stderr is None
assert response.code == 123
assert response.call == ('administration/diagnostics.py '
'--exit=123 --query=COUNT TestRT '
'pos0 pos1 .upload_files/test_file.txt')
json_data = json.loads(response.stdout)
assert "caosdb" in json_data
assert "query" in json_data["caosdb"]
assert json_data["caosdb"]["query"] == ["COUNT TestRT", "1"]
assert "./.upload_files/test_file.txt" in json_data["files"]
def test_run_server_side_script_with_additional_file():
RecordType("TestRT").insert()
_REMOVE_FILES_AFTERWARDS.append("test_file.txt")
with open("test_file.txt", "w") as f:
f.write("this is a test")
response = run_server_side_script("administration/diagnostics.py",
"pos0",
"pos1",
exit="123",
query="COUNT TestRT",
files={"dummykey": "test_file.txt"})
assert response.stderr is None
assert response.code == 123
assert response.call == ('administration/diagnostics.py '
'--exit=123 --query=COUNT TestRT '
'pos0 pos1')
json_data = json.loads(response.stdout)
assert json_data["caosdb"]["query"] == ["COUNT TestRT", "1"]
assert "./.upload_files/test_file.txt" in json_data["files"]
def test_diagnostics_basic():
RecordType("TestRT").insert()
form = dict()
form["call"] = "simple_script.py"
form["call"] = "administration/diagnostics.py"
form["-Oexit"] = "123"
r = get_connection().post_form_data("scripting", form)
xml = etree.parse(r)
form["-Oquery"] = "COUNT TestRT"
response = get_connection().post_form_data("scripting", form)
xml = etree.parse(response)
print(etree.tostring(xml))
assert xml.xpath("/Response/script/@code")[0] == "123"
assert response.status == 200 # ok
assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'
diagnostics = xml.xpath("/Response/script/stdout")[0].text
assert diagnostics is not None
diagnostics = json.loads(diagnostics)
assert diagnostics["python_version"] is not None
assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
" in server's python path")
assert diagnostics["auth_token"] is not None
EXC_ERR = ("There shouldn't be any exception during the diagnostics of "
"the interaction of the server-side script and the server.")
assert "exception" not in diagnostics["caosdb"], EXC_ERR
assert "query" in diagnostics["caosdb"]
assert diagnostics["caosdb"]["query"][0] == "COUNT TestRT"
assert diagnostics["caosdb"]["query"][1] == "1", ("The RecordType should "
"have been found.")
assert xml.xpath("/Response/script/@code")[0] == "123", ("The script "
"should exit "
"with code 123.")
assert xml.xpath("/Response/script/call")[0].text.startswith(
"administration/diagnostics.py")
assert xml.xpath("/Response/script/stderr")[0].text is None
def test_diagnostics_with_file_upload():
RecordType("TestRT").insert()
_REMOVE_FILES_AFTERWARDS.append("test_file.txt")
with open("test_file.txt", "w") as f:
f.write("this is a test")
parts = []
parts.append(MultipartParam.from_file(paramname="txt_file",
filename="test_file.txt"))
parts.append(MultipartParam("call", "simple_script.py"))
parts.append(MultipartParam("call", "administration/diagnostics.py"))
body, headers = multipart_encode(parts)
r = get_connection().insert(["scripting"], body=body, headers=headers)
xml = etree.parse(r)
print(etree.tostring(xml).decode("utf-8"))
response = get_connection().insert(
["scripting"], body=body, headers=headers)
assert response.status == 200 # ok
assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'
xml = etree.parse(response)
print(etree.tostring(xml))
assert xml.xpath("/Response/script/@code")[0] == "0"
diagnostics = xml.xpath("/Response/script/stdout")[0].text
assert diagnostics is not None
diagnostics = json.loads(diagnostics)
assert diagnostics["python_version"] is not None
assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
" in server's python path")
assert diagnostics["auth_token"] is not None
assert "exception" not in diagnostics["caosdb"], ("There shouldn't be any "
"exception during the "
"diagnostics of the "
"interaction of the "
"server-side "
"script and the server.")
assert xml.xpath("/Response/script/@code")[0] == "0", ("The script "
"should exit "
"with code 0.")
assert xml.xpath("/Response/script/call")[0].text.startswith(
"administration/diagnostics.py")
assert xml.xpath("/Response/script/stderr")[0].text is None
def test_call_as_anonymous_with_administration_role():
assert Info().user_info.roles == ["administration"]
# activate anonymous user
admin.set_server_property("AUTH_OPTIONAL", "TRUE")
# give permission to call diagnostics.py
admin._set_permissions(
"anonymous", [
admin.PermissionRule(
"Grant", "SCRIPTING:EXECUTE:administration:diagnostics.py")])
form = dict()
form["call"] = "administration/diagnostics.py"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = request(method="POST", headers=headers, path="scripting",
body=urlencode(form))
xml = etree.parse(response)
assert response.getheader("Set-Cookie") is None # no auth token returned
assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'
assert response.status == 200 # ok
assert xml.xpath("/Response/script/@code")[0] == "0"
assert xml.xpath(
"/Response/script/call")[0].text.startswith("simple_script.py")
assert "this is a test" in xml.xpath("/Response/script/stdout")[0].text
diagnostics = xml.xpath("/Response/script/stdout")[0].text
assert diagnostics is not None
diagnostics = json.loads(diagnostics)
assert diagnostics["python_version"] is not None
assert diagnostics["import"]["caosdb"][0] is True, ("caosdb not installed"
" in server's python path")
assert diagnostics["auth_token"] is not None
assert "exception" not in diagnostics["caosdb"]
def request(method, headers, path, body=None):
""" Connect without auth-token. This is clumsy, bc the pylib is not
intended to be used as anonymous user.
"""
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.verify_mode = ssl.CERT_REQUIRED
if hasattr(context, "check_hostname"):
context.check_hostname = True
context.load_verify_locations(get_config().get("Connection", "cacert"))
url = get_config().get("Connection", "url")
fullurl = urlparse(url)
http_con = HTTPSConnection(
str(fullurl.netloc), timeout=200, context=context)
http_con.request(method=method, headers=headers, url=str(fullurl.path) +
path, body=body)
return http_con.getresponse()
def test_anonymous_script_calling_not_permitted():
form = dict()
form["call"] = "ok"
# activate anonymous user
admin.set_server_property("AUTH_OPTIONAL", "TRUE")
# remove all anonymous permissions
admin._set_permissions("anonymous", [
admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok"),
admin.PermissionRule("Deny", "*")
])
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = request(method="POST", headers=headers, path="scripting",
body=urlencode(form))
assert response.status == 403 # forbidden
assert response.getheader("Set-Cookie") is None # no auth token returned
def test_anonymous_script_calling_success():
admin.set_server_property("SERVER_SIDE_SCRIPTING_BIN_DIR",
_SERVER_SIDE_SCRIPTING_BIN_DIR_SERVER)
form = dict()
form["call"] = "ok_anonymous"
# activate anonymous user
admin.set_server_property("AUTH_OPTIONAL", "TRUE")
# give permission to call ok_anonymous
admin._set_permissions("anonymous",
[admin.PermissionRule("Grant", "SCRIPTING:EXECUTE:ok_anonymous")])
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = request(method="POST", headers=headers, path="scripting",
body=urlencode(form))
assert response.status == 200 # ok
assert response.getheader("Set-Cookie") is None # no auth token returned
assert response.getheader("Content-Type") == 'text/xml; charset=UTF-8'
body = response.read()
xml = etree.fromstring(body)
# verify unauthenticated call to script ok_anonymous
assert xml.xpath("/Response/UserInfo/Roles/Role")[0].text == "anonymous"
assert xml.xpath("/Response/script/call")[0].text == "ok_anonymous"
assert xml.xpath("/Response/script/stdout")[0].text == "ok_anonymous"
assert xml.xpath("/Response/script/stderr")[0].text is None
assert xml.xpath("/Response/script/@code")[0] == "0"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment