# Skip to content
# Snippets Groups Projects
# test_sss_helper.py 4.11 KiB
# Newer Older
# Timm Fitschen's avatar
# Timm Fitschen committed
import subprocess
from email import message_from_file, policy
from os import listdir, remove
from os.path import abspath, dirname, exists, isfile, join

import caosdb as db
from caosadvancedtools.serverside.helper import (NameCollector, get_data,
                                                 get_file_via_download,
                                                 init_data_model,
                                                 parse_arguments, send_mail)
from caosdb import RecordType, configure_connection, get_config
from caosdb.connection.mockup import MockUpResponse, MockUpServerConnection
# Timm Fitschen's avatar
# Timm Fitschen committed
from pytest import mark, raises


def get_data_example():
    """Return the absolute path of the example JSON file.

    The file ``sss_helper_example_data.json`` is looked up in the directory
    that contains this test module.
    """
    here = dirname(__file__)
    example = join(here, "sss_helper_example_data.json")
    return abspath(example)


def setup_module():
    """Configure a mock-up server connection for all tests of this module.

    A single resource is registered on the mock-up connection; it answers
    every request with HTTP status 200 and an XML body that describes a
    RecordType named "Test" with id 1234.
    """
    mock_connection = configure_connection(
        url="unittests",
        username="testuser",
        password_method="plain",
        password="testpassword",
        timeout=200,
        implementation=MockUpServerConnection)
    response_xml = '<Response><RecordType name="Test" id="1234"/></Response>'

    def canned_response(**kwargs):
        # The request details are irrelevant: always reply with the same body.
        return MockUpResponse(200, {}, response_xml)

    mock_connection._delegate_connection.resources.append(canned_response)


def teardown_module():
    """Delete every mail file reported by ``get_tmp_mails``."""
    for mail_path in get_tmp_mails():
        remove(mail_path)


def get_tmp_mails(tmpmail="/tmp/mail"):
    """Return the paths of all regular files directly inside ``tmpmail``.

    Parameters
    ----------
    tmpmail : str, optional
        Directory to inspect. Defaults to ``/tmp/mail``, the location that
        was previously hard-coded here.

    Returns
    -------
    list of str
        ``tmpmail`` joined with each file name, sorted alphabetically.
        Sub-directories are skipped. An empty list is returned when
        ``tmpmail`` does not exist.
    """
    if not exists(tmpmail):
        return []
    candidates = (join(tmpmail, name) for name in listdir(tmpmail))
    # listdir() returns entries in arbitrary order; sort so that callers
    # which index into the result see a deterministic order.
    return sorted(path for path in candidates if isfile(path))


def test_parse_arguments():
    """Check that the auth token option and the positional filename are parsed."""
    argv = ["--auth-token", "1234ABCD", "test.json"]
    parsed = parse_arguments(argv)
    assert parsed.filename == "test.json"
    assert parsed.auth_token == "1234ABCD"


def test_get_data():
    """Check ``get_data`` on the example file, without and with defaults."""
    example_file = get_data_example()

    # Without defaults: the value stored in the example file is returned.
    loaded = get_data(filename=example_file)
    assert loaded["box"] == 2345

    # With defaults: "test" is expected to come from the defaults, while
    # "comment" is expected to carry the file's value, not the default one.
    fallback = {"test": "bla", "comment": "no comment"}
    loaded = get_data(filename=example_file, default=fallback)
    assert loaded["box"] == 2345
    assert loaded["test"] == "bla"
    assert loaded["comment"] == "this is a comment"


def test_init_data_model():
    """Check that ``init_data_model`` fills in the id of a RecordType.

    The expected id, 1234, is the one in the canned response registered by
    ``setup_module``.
    """
    record_type = RecordType(name="Test")
    # A freshly created entity has no id yet.
    assert record_type.id is None

    init_data_model([record_type])
    assert record_type.id == 1234


# Timm Fitschen's avatar
# Timm Fitschen committed
@mark.skipif("Misc" not in get_config() or
             "sendmail" not in get_config()["Misc"],
             reason="sendmail client not defined")
def test_send_mail():
    """Send one mail and verify the single file found by ``get_tmp_mails``.

    Skipped unless the configuration has a "sendmail" entry in its "Misc"
    section.
    """
    # Start from a clean state: no mail files may exist before sending.
    assert not get_tmp_mails()

    send_mail("me@example.com", "you@example.com", "the subject", "hello!")
    delivered = get_tmp_mails()
    assert len(delivered) == 1

    with open(delivered[0], "r") as mail_file:
        parsed = message_from_file(mail_file, policy=policy.SMTP)

    expected_headers = (
        ("From", "me@example.com"),
        ("To", "you@example.com"),
        ("Subject", "the subject"),
    )
    for header, value in expected_headers:
        assert parsed[header] == value
    assert parsed.get_content() == "hello!\n"
# Timm Fitschen's avatar
# Timm Fitschen committed


def test_send_mail_error():
    """Check that ``send_mail`` raises ``CalledProcessError``.

    ``/bin/cat`` is passed as the sendmail binary for this call.
    """
    mail_args = ("me@example.com", "you@example.com", "the subject", "hello!")
    with raises(subprocess.CalledProcessError):
        send_mail(*mail_args, send_mail_bin="/bin/cat")


def test_get_file_via_download():
    """Check ``get_file_via_download`` for an empty file and two failures.

    Three cases are covered:

    * a file object with ``size == 0`` must yield a ``str``;
    * a ``download`` raising ``db.ConsistencyError`` must propagate it;
    * a ``download`` raising ``db.CaosDBException`` must propagate it.
    """
    class DummyFile():
        size = 5
        id = 5

    empty = DummyFile()
    empty.size = 0
    assert isinstance(get_file_via_download(empty), str)

    # TODO test whether something ends up in the logger
    class Inconsistent(DummyFile):
        def download(*args, **kwargs):
            raise db.ConsistencyError()
    with raises(db.ConsistencyError):
        get_file_via_download(Inconsistent())

    # TODO test whether something ends up in the logger
    class NotThere(DummyFile):
        def download(*args, **kwargs):
            raise db.CaosDBException()
    with raises(db.CaosDBException):
        # Use NotThere here: the original passed Inconsistent() again, so
        # this class was never instantiated and the generic exception path
        # was never exercised.
        get_file_via_download(NotThere())


def test_get_unique_savename():
    """Check the numbering of repeated names and the reset on a new collector."""
    collector = NameCollector()
    # (requested name, expected unique name), in call order: a repeated
    # name gets a "_<n>" suffix counting its occurrences.
    expectations = (
        ("ha", "ha"),
        ("ho", "ho"),
        ("ho", "ho_2"),
        ("ha", "ha_2"),
        ("ha", "ha_3"),
        ("hi", "hi"),
    )
    for requested, unique in expectations:
        assert collector.get_unique_savename(requested) == unique

    # check reset: a new collector has no memory of earlier names
    fresh_collector = NameCollector()
    assert fresh_collector.get_unique_savename("ha") == "ha"