Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_issues_pylib.py 8.35 KiB
# -*- coding: utf-8 -*-
# This file is a part of the LinkAhead Project.
#
# Copyright (c) 2023 IndiScale GmbH <info@indiscale.com>
# Copyright (c) 2023 Daniel Hornung <d.hornung@indiscale.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Tests for issues on gitlab.com, project linkahead-pylib.

Tests should be named:

    ``test_gitlab_com_{issue_id}``
"""

import math
import os
import tempfile
import time
import warnings
from pathlib import Path

import linkahead as db
import linkahead.common.utils
import pytest

from linkahead import administration as admin
from linkahead.exceptions import (TransactionError, HTTPClientError)
from linkahead.apiutils import compare_entities, empty_diff, merge_entities

CURATOR_ROLE = "curator"


def setup_module():
    """Bring the server into a clean state for this test module.

    Re-configures the connection, then makes a best-effort attempt to remove
    all entities with an ID above 99, the user ``TestUser`` and the role named
    by ``CURATOR_ROLE``.  Each cleanup step is independent: a failure is only
    printed and never aborts the remaining steps.
    """
    db.configure_connection()

    def _purge_entities():
        db.execute_query("FIND ENTITY WITH ID > 99").delete()

    cleanup_steps = (
        _purge_entities,
        lambda: admin._delete_user("TestUser"),
        lambda: admin._delete_role(CURATOR_ROLE),
    )
    for step in cleanup_steps:
        try:
            step()
        except Exception as cleanup_error:
            # Deliberately best-effort: the target may simply not exist.
            print(cleanup_error)


def setup_function(function):
    """Run the module-level cleanup before each test.

    Delegates to ``setup_module`` so that every test starts from the same
    cleaned-up state.  The ``function`` argument is not used.
    """
    setup_module()


def teardown_function(function):
    """Run the module-level cleanup again after each test.

    Delegates to ``setup_module``, which re-configures the connection and
    deletes test entities, the test user and the test role.  The ``function``
    argument is not used.
    """
    setup_module()

# ########################### Issue tests start here #####################


def test_gitlab_com_89():
    """
    Round-trip a server-generated container through XML without warnings.

    A container retrieved from the server is serialized to an XML string and
    parsed back into a container; neither step may emit a warning, and the
    result must match the original in length and first entity name.

    See https://gitlab.com/linkahead/linkahead-pylib/-/issues/89 and
    https://gitlab.indiscale.com/caosdb/customers/f-fit/management/-/issues/81
    """
    # The container must be built from server data, so insert something first
    # and query it back.
    record_type = db.RecordType(name="TestType")
    record_type.insert()
    server_container = db.execute_query("FIND RECORDTYPE *")

    # Escalate every warning to an exception for the whole round trip, so any
    # warning raised by to_xml, xml2str or from_xml fails the test.
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        serialized = linkahead.common.utils.xml2str(server_container.to_xml())
        restored = db.Container.from_xml(serialized)
        assert len(restored) == len(server_container)
        assert restored[0].name == record_type.name


def test_gitlab_com_103():
    """
    Test that after causing an UriTooLong error the used auth_token
    is still valid.

    The connection is re-configured to authenticate with the current auth
    token, then a retrieval with a very long URI is attempted.  The request
    must fail with a ``TransactionError`` whose message does not mention the
    authentication token.

    See https://gitlab.com/linkahead/linkahead-pylib/-/issues/103 and
    https://gitlab.indiscale.com/caosdb/customers/f-fit/management/-/issues/82
    """
    # Configure connection to use auth_token
    auth_token = db.get_connection()._authenticator.auth_token
    db.configure_connection(auth_token=auth_token)

    # Trigger UriTooLong error and check correct error thrown
    c = db.Container()
    c.extend([db.Record(id=i) for i in range(1000, 5000)])
    with pytest.raises(db.exceptions.TransactionError) as te:
        c.retrieve()
    # This check must live outside the ``with`` block: statements after the
    # raising call inside the block are never executed.  ``te`` is pytest's
    # ExceptionInfo wrapper, so inspect the actual exception via ``te.value``.
    assert "authentication token" not in str(te.value)


# @pytest.mark.xfail(reason="Entities with many, long, properties: "
#                    "https://gitlab.com/linkahead/linkahead-pylib/-/issues/108")
def test_gitlab_com_108():
    """Insert and retrieve a RecordType referencing many long-named RecordTypes.

    One RecordType gets 20 other RecordTypes with very long names as
    properties; everything is inserted in a single container and retrieved
    again.

    This is another instance of bugs caused by caosdb/src/caosdb-mysqlbackend#48, but was originally
    reported as https://gitlab.com/linkahead/linkahead-pylib/-/issues/108
    """
    long_part = "Long" * 50
    referencing_rt = db.RecordType(name="TestRecord_first")
    referenced_rts = [
        db.RecordType(name=f"TestRecord_{long_part}_{number:02d}")
        for number in range(20)
    ]
    for referenced in referenced_rts:
        referencing_rt.add_property(referenced)

    # The referenced RecordTypes go first, the referencing one last.
    batch = db.Container()
    batch.extend(referenced_rts)
    batch.append(referencing_rt)
    batch.insert()
    batch.retrieve()
    print("retrieved")

    # Incidentally, the following lines seem to trigger another, unrelated problem
    leftovers = db.execute_query("FIND ENTITY test*", cache=False)
    leftovers.delete()
    print("deleted")


def test_gitlab_com_119():
    """
    Check that merge_entities copes with properties given by id only.

    Two records share name and parent; only one carries a property, which was
    added by id without a name.  After merging, the other record must carry
    an equal property.

    See https://gitlab.com/linkahead/linkahead-pylib/-/issues/119 and
    https://gitlab.indiscale.com/caosdb/customers/f-fit/management/-/issues/94
    """
    text_prop = db.Property(name="Test", datatype=db.TEXT).insert()
    parent_rt = db.RecordType(name="TestRT").insert()

    # Source record: has the property, referenced by id only.
    source = db.Record(name="TestRec")
    source.add_parent(parent_rt)
    source.add_property(id=text_prop.id, value="something")

    # Target record: same name and parent, but no properties yet.
    target = db.Record(name="TestRec")
    target.add_parent(parent_rt)

    # Before merging, only the source knows the property
    assert not empty_diff(source, target)
    assert len(target.properties) == 0
    source_diff, target_diff = compare_entities(source, target)
    assert text_prop.id in source_diff["properties"]
    assert None not in source_diff["properties"]
    assert len(target_diff["properties"]) == 0

    # After merging, the target carries an equal property
    merge_entities(target, source)
    merged_prop = target.get_property(text_prop)
    assert merged_prop is not None
    assert empty_diff(source.get_property(text_prop), target.get_property(text_prop))


def test_gitlab_com_120():
    """Check that updating an entity does not introduce subproperties.

    A RecordType has another RecordType as property.  That property must have
    no subproperties of its own at any stage: before insertion, after
    retrieval, after adding a further property and after the update.

    See https://gitlab.com/linkahead/linkahead-pylib/-/issues/120.

    """
    rt1 = db.RecordType(name="TestType1")
    rt2 = db.RecordType(name="TestType2")
    prop = db.Property(name="TestProp", datatype=db.TEXT)
    rt2.add_property(prop)
    rt1.add_property(rt2)

    def subproperty_count(record_type):
        """Return how many subproperties the rt2 property of ``record_type`` has."""
        return len(record_type.get_property(rt2.name).properties)

    # Freshly built: rt1's rt2 property has no subproperties.
    assert subproperty_count(rt1) == 0

    entities = db.Container()
    entities.extend([rt1, rt2, prop])
    entities.insert()

    # Retrieval must not add any.
    rt1_retrieved = db.RecordType(id=rt1.id).retrieve()
    assert subproperty_count(rt1_retrieved) == 0

    # Neither must adding an unrelated new property ...
    new_prop = db.Property(name="TestPropNew", datatype=db.INTEGER).insert()
    rt1_retrieved.add_property(new_prop)
    assert subproperty_count(rt1_retrieved) == 0

    # ... nor sending the update to the server.
    rt1_retrieved.update()
    assert subproperty_count(rt1_retrieved) == 0


def test_gitlab_com_127():
    """
    Test that the timeout option in pylinkahead.ini accepts separate
    connect/read timeouts and timeout None.

    For each accepted spelling of the timeout option a temporary config file
    is written, read into the global configuration, and the connection is
    re-configured and checked.  The global configuration is restored
    afterwards, even when one of the checks fails, so that a failure here
    cannot leak a test timeout into later tests.

    See https://gitlab.com/linkahead/linkahead-pylib/-/issues/127 and
    https://gitlab.indiscale.com/caosdb/customers/f-fit/management/-/issues/93
    """
    # Setup paths and save previous timeout
    base_dir = Path(__file__).parent.parent
    temp_pylinkahead_path = base_dir/'.pyla-temp-test.ini'
    # Remove leftovers of an earlier, aborted run; the file is created with
    # mode "x" below, which fails if it already exists.
    temp_pylinkahead_path.unlink(missing_ok=True)
    prev_timeout = None
    if db.get_config().has_option('Connection', 'timeout'):
        prev_timeout = db.get_config().get('Connection', 'timeout')

    # Parse various timeout strings and check successful connect
    valid_timeout_strings = ["timeout = None", "timeout=null",
                             "timeout = (4, 40)", "timeout=(4,4)",
                             "timeout = (4, None)", "timeout= ( null , 4 )"]
    try:
        for timeout_string in valid_timeout_strings:
            try:
                # Create temporary config with timeout option
                with open(temp_pylinkahead_path, "x", encoding="utf-8") as temp_pylinkahead:
                    temp_pylinkahead.write('[Connection]\n')
                    temp_pylinkahead.write(timeout_string)
                # Parse temporary config and check successful connect
                db.get_config().read(str(temp_pylinkahead_path))
                db.configure_connection()
                assert 'Connection to' in str(db.Info())
            finally:
                # Delete temporary config; tolerate a missing file so that a
                # failure while creating it is not masked by FileNotFoundError.
                temp_pylinkahead_path.unlink(missing_ok=True)
    finally:
        # Reset configuration, regardless of whether the checks above passed.
        config = db.get_config()
        # Guarded: the option (or its section) is absent if the test failed
        # before the first temporary config was read.
        if config.has_option('Connection', 'timeout'):
            config.remove_option('Connection', 'timeout')
        if prev_timeout is not None:
            config.set('Connection', 'timeout', prev_timeout)
        db.configure_connection()