Skip to content
Snippets Groups Projects
Select Git revision
  • 2690236e46fb27fab4925148730bbac058810373
  • main default protected
  • f-prefill
  • dev protected
  • f-fix-accent-sensitivity
  • f-filesystem-import
  • f-update-acl
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-string-ids
  • f-filesystem-main
  • f-multipart-encoding
  • f-trigger-advanced-user-tools
  • f-real-rename-test-pylibsolo2
  • f-real-rename-test-pylibsolo
  • f-real-rename-test
  • f-linkahead-rename
  • f-reference-record
  • f-xml-serialization
  • linkahead-pylib-v0.18.0
  • linkahead-control-v0.16.0
  • linkahead-pylib-v0.17.0
  • linkahead-mariadbbackend-v8.0.0
  • linkahead-server-v0.13.0
  • caosdb-pylib-v0.15.0
  • caosdb-pylib-v0.14.0
  • caosdb-pylib-v0.13.2
  • caosdb-server-v0.12.1
  • caosdb-pylib-v0.13.1
  • caosdb-pylib-v0.12.0
  • caosdb-server-v0.10.0
  • caosdb-pylib-v0.11.1
  • caosdb-pylib-v0.11.0
  • caosdb-server-v0.9.0
  • caosdb-pylib-v0.10.0
  • caosdb-server-v0.8.1
  • caosdb-pylib-v0.8.0
  • caosdb-server-v0.8.0
  • caosdb-pylib-v0.7.2
41 results

test_file.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_file.py 24.26 KiB
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    # Copyright (C) 2019 IndiScale GmbH (info@indiscale.com)
    # Copyright (C) 2019 Daniel Hornung (d.hornung@indiscale.com)
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """Created on 18.09.2013.
    
    @author: Timm Fitschen
    """
    import os
    import shutil
    from random import randint
    from sys import maxsize as maxint
    
    from lxml import etree
    from nose.tools import (assert_equal, assert_false,  # @UnresolvedImport
                            assert_is_not_none, assert_raises, assert_true,
                            nottest, with_setup)
    
    from caosdb import Info
    from caosdb import administration as admin
    from caosdb import execute_query, get_config, get_connection
    from caosdb.common import models
    from caosdb.exceptions import EntityError
    from caosdb.utils.checkFileSystemConsistency import runCheck
    
    
    def setup_module():
        teardown()
    
    
    def setup():
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        os.makedirs("testfolder/subfolder")
        with open("testfolder/test1.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        with open("testfolder/subfolder/test2.dat", "w") as upload_file:
            upload_file.write("hello world\n")
    
    
    def teardown():
        try:
            execute_query("FIND ENTITY WHICH HAS AN ID >= 100").delete()
        except Exception as e:
            print(e)
        try:
            shutil.rmtree("testfolder")
        except Exception as e:
            print(e)
        try:
            os.remove("test.dat")
        except Exception as e:
            print(e)
        try:
            os.remove("test2.dat")
        except Exception as e:
            print(e)
    
    
    @with_setup(setup, teardown)
    def test_file_with_space():
        file_ = models.File(name="TestFile",
                            description="Testfile Desc",
                            path="testfiles/test file with spaces.dat",
                            file="test.dat")
        file_.insert()
        qfile = execute_query("FIND FILE TestFile", unique=True)
        assert_equal(qfile.id, file_.id)
        assert_equal(qfile.path, "/testfiles/test file with spaces.dat")
    
        qfile.download("test2.dat")
    
    
    @with_setup(setup, teardown)
    def test_pickup_file():
        try:
            d = models.DropOffBox()
            d.sync()
            try:
                pickup_file = open(os.path.join(d.path, "testpickup.dat"), "w")
            except BaseException:
                print("drop off box not on this system.")
            else:
                pickup_file.write("hello world\n")
                pickup_file.close()
                file_ = models.File(name="PickupTestfile",
                                    description="Pickup test file desc",
                                    path="testfiles/pickuptestfile.dat" +
                                    hex(randint(0, maxint)),
                                    pickup="testpickup.dat")
                file_.insert()
                assert_is_not_none(file_.id)
        finally:
            try:
                file_.delete()
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_pickup_folder():
        try:
            # pickup_folder
            d = models.DropOffBox()
            d.sync()
            try:
                os.mkdir(d.path + "/testfolder")
            except BaseException:
                print("drop off box not on this system.")
            else:
                os.mkdir(d.path + "/testfolder/subfolder")
                pickup_file = open(d.path + "/testfolder/testpickup1.dat", "w")
                pickup_file.write("hello world\n")
                pickup_file.close()
                pickup_file = open(
                    d.path + "/testfolder/subfolder/testpickup2.dat", "w")
                pickup_file.write("hello world\n")
                pickup_file.close()
    
                file_ = models.File(name="PickupTestfolder",
                                    description="Pickup test folder desc",
                                    path="testfiles/pickuptestfolder" +
                                    hex(randint(0, maxint)) + "/",
                                    pickup="testfolder/")
                file_.insert()
        finally:
            try:
                file_.delete()
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_file4():
        try:
            d = models.DropOffBox()
            d.sync()
            try:
                pickup_file = open(d.path + "/testpickup1.dat", "w")
            except BaseException:
                print("drop off box not on this system.")
            else:
                pickup_file.write("hello world\n")
                pickup_file.close()
                pickup_file = open(d.path + "/testpickup2.dat", "w")
                pickup_file.write("hello world\n")
                pickup_file.close()
                file1_ = models.File(
                    name="Testfile1", description="Testfile Desc",
                    path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
                    pickup="testpickup1.dat")
                file2_ = models.File(
                    name="Testfile2", description="Testfile Desc",
                    path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
                    pickup="testpickup2.dat")
                c = models.Container()
                c.extend([file1_, file2_])
                c.insert()
                assert_is_not_none(file1_.id)
                assert_is_not_none(file2_.id)
        finally:
            try:
                c.delete()
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_upload_complete_folder():
        file1_ = models.File(name="Testfile1",
                             description="Testfile Desc",
                             path="testfiles/testfile1.dat" + hex(randint(0,
                                                                          maxint)),
                             file="test.dat")
        file2_ = models.File(name="Testfile2",
                             description="Testfile Desc",
                             path="testfiles/testfile2.dat" + hex(randint(0,
                                                                          maxint)),
                             file="test.dat")
        c = models.Container()
        c.extend([file1_, file2_])
        c.insert()
        assert_is_not_none(file1_.id)
        assert_is_not_none(file2_.id)
        c.delete()
    
        c = models.Container()
        folder_ = models.File(name="Testfolder",
                              description="Testfolder Desc",
                              path="testfiles/testfolder" + hex(randint(0,
                                                                        maxint)),
                              file="testfolder")
        c.append(folder_)
        c.insert()
        assert_is_not_none(folder_.id)
        c.delete()
    
    
    @with_setup(setup, teardown)
    def test_file6():
        try:
            # upload file to testfiles2/testfile...
            upload_file = open("test.dat", "w")
            upload_file.write("hello world\n")
            upload_file.close()
            file_ = models.File(name="Testfidb.dble",
                                description="Testfile Desc",
                                path="testfiles2/testfile" +
                                hex(randint(0, maxint)) + ".dat",
                                file="test.dat")
            file_.insert()
    
            assert_is_not_none(file_.id)
            assert_true(file_.is_valid())
    
            # pickup_folder
            # and try to store it to testfiles2/
            d = models.DropOffBox()
            d.sync()
            path = d.path + "/testfolder"
    
            if not os.path.isdir(d.path):
                print("drop off box not on this system.")
            else:
                os.mkdir(path)
                pickup_file = open(path + "/testpickup1.dat", "w")
                pickup_file.write("hello world\n")
                pickup_file.close()
                os.mkdir(path + "/subfolder")
                pickup_file = open(path + "/subfolder/testpickup2.dat", "w")
                pickup_file.write("hello world\n")
                pickup_file.close()
    
                folder_ = models.File(
                    name="PickupTestfolder",
                    description="Pickup test folder desc",
                    path="testfiles2/",
                    pickup="path")
    
                assert_raises(EntityError, folder_.insert)
        finally:
            try:
                folder_.delete()
            except BaseException:
                pass
            try:
                file_.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_file7():
        try:
            # upload file to testfiles2/testsub/testfile...
            upload_file = open("test.dat", "w")
            upload_file.write("hello world\n")
            upload_file.close()
            file_ = models.File(name="Testfidb.dble",
                                description="Testfile Desc",
                                path="testfiles2/testsub/testfile" +
                                hex(randint(0, maxint)) + ".dat",
                                file="test.dat")
            file_.insert()
    
            assert_is_not_none(file_.id)
            assert_true(file_.is_valid())
    
            # pickup_folder
            # and try to store it to testfiles2/
            d = models.DropOffBox()
            d.sync()
            path = d.path + "/testfolder"
    
            if not os.path.isdir(d.path):
                print("drop off box not on this system.")
            else:
                os.mkdir(path)
                pickup_file = open(path + "/testpickup1.dat", "w")
                pickup_file.write("hello world\n")
                pickup_file.close()
                os.mkdir(path + "/subfolder")
                pickup_file = open(path + "/subfolder/testpickup2.dat", "w")
                pickup_file.write("hello world\n")
                pickup_file.close()
    
                folder_ = models.File(
                    name="PickupTestfolder",
                    description="Pickup test folder desc",
                    path="testfiles2/",
                    pickup="path")
    
                assert_raises(EntityError, folder_.insert)
        finally:
            try:
                folder_.delete()
            except BaseException:
                pass
            try:
                file_.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_consistency_file_was_modified():
        try:
    
            # insert new test file
            upload_file = open("test.dat", "w")
            upload_file.write("hello world\n")
            upload_file.close()
            file_ = models.File(name="TestConsistency1",
                                description="Testfile Desc",
                                path="debug/test_file_storage_consistency",
                                file="test.dat")
            file_.insert()
    
            assert_is_not_none(file_.id)
            assert_true(file_.is_valid())
    
            # run consistency check (no consistency warning)
            c = runCheck(None, None)
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0], "File system is consistent.")
    
            # run consistency check with modified warning)
            # TODO fix
            # This smells badly. The argument is meant to transport the location
            # that is checked. Understanding of the code is difficult.
            c = runCheck(None, "-c FILE_WAS_MODIFIED")
            assert_is_not_none(c.messages["Error", 0])
            assert_equal(
                c.messages["Error", 0][0],
                'debug/test_file_storage_consistency: File was modified.')
    
            # download file again and check if it is still the same (just to be
            # sure that the server only simulated the consistency breach
            d = open(file_.download(target="test.dat.tmp"), "r")
    
            r = d.read()
            assert_equal(r, "hello world\n")
    
            # run a passing check again
            c = runCheck(None, None)
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0], "File system is consistent.")
    
        finally:
            # clean up file record
            try:
                file_.delete()
            except BaseException:
                pass
            # clean up local files
            try:
                d.close()
            except BaseException:
                pass
            try:
                os.remove("test.dat")
            except BaseException:
                pass
            try:
                os.remove("test.dat.tmp")
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_consistency_file_does_not_exist():
        try:
            with open("test.dat", "w") as upload_file:
                upload_file.write("hello world\n")
            file_ = models.File(name="TestConsistency1",
                                description="Testfile Desc",
                                path="debug/test_file_storage_consistency",
                                file="test.dat")
            file_.insert()
    
            assert_is_not_none(file_.id)
            assert_true(file_.is_valid())
    
            c = runCheck(None, "/debug/")
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0],
                         "File system below debug/ is consistent.")
    
            # TODO fix
            # This smells badly. The argument is meant to transport the location
            # that is checked. Understanding of the code is difficult.
            c = runCheck(None, "-c FILE_DOES_NOT_EXIST")
            assert_is_not_none(c.messages["Error", 0])
            assert_equal(
                c.messages["Error", 0][0],
                'debug/test_file_storage_consistency: File does not exist.')
    
            d = open(file_.download(target="test.dat.tmp"), "r")
    
            r = d.read()
            assert_equal(r, "hello world\n")
    
            c = runCheck(None, None)
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0], "File system is consistent.")
    
        finally:
            try:
                file_.delete()
            except BaseException:
                pass
            try:
                d.close()
            except BaseException:
                pass
            try:
                os.remove("test.dat")
            except BaseException:
                pass
            try:
                os.remove("test.dat.tmp")
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_consistency_unknown_file():
        c = runCheck(None, None)
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0], "File system is consistent.")
    
        # TODO fix
        # This smells badly. The argument is meant to transport the location
        # that is checked. Understanding of the code is difficult.
        c = runCheck(None, "-c UNKNOWN_FILE")
        assert_is_not_none(c.messages["Warning", 0])
        assert_equal(c.messages["Warning", 0][0], 'debug/: Unknown file.')
    
        c = runCheck(None, None)
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0], "File system is consistent.")
    
    
    @with_setup(setup, teardown)
    def test_insert_files_in_dir_error1():
    
        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": "/root"})
        # If this fails, it is likely that the server runs with root permissions
        # which is discouraged.
        assert_is_not_none(c.messages["Error", 0])
        assert_true(c.messages["Error", 0][0].startswith(
            "Cannot read or enter the desired directory."))
    
        c = models.Container()
        c.retrieve(unique=False, raise_exception_on_error=False,
                   flags={"InsertFilesInDir": "non-existent"})
        assert_is_not_none(c.messages["Error", 0])
        assert_true(c.messages["Error", 0][0].startswith("No such directory"))
    
        c.insert(raise_exception_on_error=False, flags={
                 "InsertFilesInDir": "non-existent"})
        assert_is_not_none(c.messages["Error", 0])
        assert_true(c.messages["Error", 0][0].startswith("No such directory"))
    
        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": "."})
        assert_is_not_none(c.messages["Error", 0])
        assert_true(c.messages["Error", 0][0].startswith("Dir is not allowed"))
    
    
    @with_setup(setup, teardown)
    def test_insert_files_in_dir_with_symlink():
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        try:
    
            # create file in a server-readable directory
            os.makedirs(path)
            path = os.path.realpath(path) + "/"
            os.makedirs(path + "subfolder/")
    
            test_file = open(path + "subfolder/test2.dat", "w")
            test_file.write("hello world2\n")
            test_file.close()
    
            # create a symlink to the directory where the test_file resides
            os.symlink(path_on_server + "subfolder/", path + "linked_subfolder")
    
            # call insertFilesInDir job
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server +
                    "linked_subfolder"})
            assert_is_not_none(c.messages["Warning", 6])
            assert_equal(c.messages["Warning", 6][0],
                         "Directory or file is symbolic link: " + path_on_server +
                         "linked_subfolder")
    
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "--force-allow-symlinks " +
                    path_on_server +
                    "linked_subfolder"})
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0],
                         "Files count in linked_subfolder/: 1")
            assert_equal(c[0].name, "test2.dat")
            assert_equal(c[0].path, "/linked_subfolder/test2.dat")
    
            d = models.Container()
            d.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "--force-allow-symlinks " +
                    path_on_server +
                    "linked_subfolder"})
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(d.messages["Info", 0][0],
                         "Files count in linked_subfolder/: 1")
            assert_true(d[0].is_valid())
            assert_equal(d[0].name, "test2.dat")
            assert_equal(d[0].path, "/linked_subfolder/test2.dat")
    
        finally:
            try:
                d.delete()
            except BaseException:
                pass
            try:
                c.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    @with_setup(None, teardown)
    def test_insert_files_in_dir():
        """ test if files in directories can be inserted as symlinks via the
        InsertFilesInDir flag. This tests also verifies that the job can be
        executed for the root directory after the child directory and only the new
        files are being inserted.
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        try:
            os.makedirs(path)
            os.makedirs(path + "subfolder/")
            test_file1 = open(path + "subfolder/test.dat", "w")
            test_file1.write("hello world\n")
            test_file1.close()
    
            test_file2 = open(path + "subfolder/test2.dat", "w")
            test_file2.write("hello world2\n")
            test_file2.close()
    
            # DRY-RUN
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0],
                         "Files count in testfolder/: 2")
    
            # ACTUAL RUN
            d = models.Container()
            d.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(d.messages["Info", 0][0],
                         "Files count in testfolder/: 2")
            assert_true(d[0].is_valid())
    
            # create a new file and call insertFilesInDir again
            test_file3 = open(path + "/test3.dat", "w")
            test_file3.write("hello world3\n")
            test_file3.close()
    
            e = models.Container()
            e.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert_is_not_none(e.messages["Info", 0])
            assert_equal(e.messages["Info", 0][0],
                         "Files count in testfolder/: 3")
    
            # only the new file is given back...
            assert_equal(1, len(e))
            assert_false(e[0].is_valid())
            assert_equal(e[0].name, "test3.dat")
            assert_equal(e[0].path, "/testfolder/test3.dat")
    
            f = models.Container()
            f.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert_is_not_none(f.messages["Info", 0])
            assert_equal(f.messages["Info", 0][0],
                         "Files count in testfolder/: 3")
            # only the new file is given back...
            assert_equal(1, len(f))
            assert_true(f[0].is_valid())
            assert_equal(f[0].name, "test3.dat")
            assert_equal(f[0].path, "/testfolder/test3.dat")
    
        finally:
            try:
                f.delete()
            except BaseException:
                pass
            try:
                d.delete()
            except BaseException:
                pass
            try:
                c.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_insert_files_in_dir_regex():
        # TODO
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        try:
            os.makedirs(path)
            os.makedirs(path + "subfolder/")
            test_file1 = open(path + "subfolder/test.dat", "w")
            test_file1.write("hello world\n")
            test_file1.close()
    
            test_file2 = open(path + "subfolder/test2.dat", "w")
            test_file2.write("hello world2\n")
            test_file2.close()
    
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "-e test " +
                    path_on_server})
            assert c.messages["Warning", 2] is not None
            assert c.messages["Warning", 2][0] == "Explicitly excluded directory or file: {}".format(
                path_on_server[:-1])
            assert len(c) == 0
    
        finally:
            try:
                c.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    @with_setup(setup, teardown)
    def test_thumbnails():
        file_ = models.File(name="TestFile",
                            description="Testfile Desc",
                            path="testfiles/thumbnails_test.dat",
                            file="test.dat", thumbnail="testfolder/test1.dat")
        file_.insert()
    
        body = get_connection().retrieve(
            entity_uri_segments=[
                "FileSystem",
                "testfiles"],
            reconnect=True).read()
        print(body)
        xml = etree.fromstring(body)
        assert xml.xpath('/Response')
        assert xml.xpath('/Response/dir/file')
        # TODO find a better way to check this
        assert_equal(xml[1][0].get("thumbnail")[-41:],
                     "/Thumbnails/testfiles/thumbnails_test.dat")