Skip to content
Snippets Groups Projects
Select Git revision
  • c1574aa3c0c33786f791795076a4b91d9e13ad46
  • main default protected
  • dev protected
  • f-fix-accent-sensitivity
  • f-filesystem-import
  • f-update-acl
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-string-ids
  • f-filesystem-main
  • f-multipart-encoding
  • f-trigger-advanced-user-tools
  • f-real-rename-test-pylibsolo2
  • f-real-rename-test-pylibsolo
  • f-real-rename-test
  • f-linkahead-rename
  • f-reference-record
  • f-xml-serialization
  • f-xfail-server-181
  • linkahead-pylib-v0.18.0
  • linkahead-control-v0.16.0
  • linkahead-pylib-v0.17.0
  • linkahead-mariadbbackend-v8.0.0
  • linkahead-server-v0.13.0
  • caosdb-pylib-v0.15.0
  • caosdb-pylib-v0.14.0
  • caosdb-pylib-v0.13.2
  • caosdb-server-v0.12.1
  • caosdb-pylib-v0.13.1
  • caosdb-pylib-v0.12.0
  • caosdb-server-v0.10.0
  • caosdb-pylib-v0.11.1
  • caosdb-pylib-v0.11.0
  • caosdb-server-v0.9.0
  • caosdb-pylib-v0.10.0
  • caosdb-server-v0.8.1
  • caosdb-pylib-v0.8.0
  • caosdb-server-v0.8.0
  • caosdb-pylib-v0.7.2
41 results

test_file.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_file.py 23.53 KiB
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    # Copyright (C) 2019 IndiScale GmbH (info@indiscale.com)
    # Copyright (C) 2019 Daniel Hornung (d.hornung@indiscale.com)
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """Created on 18.09.2013.
    
    @author: Timm Fitschen
    """
    import os
    import shutil
    from random import randint
    from sys import maxsize as maxint
    
    from lxml import etree
    from pytest import raises
    from nose.tools import (assert_equal, assert_false,  # @UnresolvedImport
                            assert_is_not_none, assert_raises, assert_true,
                            nottest)
    
    from caosdb import Info
    from caosdb import administration as admin
    from caosdb import execute_query, get_config, get_connection
    from caosdb.common import models
    from caosdb.exceptions import EntityError
    from caosdb.utils.checkFileSystemConsistency import runCheck
    
    
    def setup_module():
        """Run the shared cleanup once before the tests of this module."""
        teardown()
    
    
    def setup():
        """Clean up, then create the local fixture files used by the tests.

        Creates ``test.dat`` in the working directory and a ``testfolder``
        tree with one file at the top level and one in ``subfolder``.
        """
        teardown()
        content = "hello world\n"
        with open("test.dat", "w") as handle:
            handle.write(content)
        os.makedirs("testfolder/subfolder")
        for target in ("testfolder/test1.dat",
                       "testfolder/subfolder/test2.dat"):
            with open(target, "w") as handle:
                handle.write(content)
    
    
    def teardown():
        """Delete test entities on the server and remove local fixture files.

        Every entity returned by the query for IDs >= 100 is deleted.  Local
        removal is best effort: a failure is printed and otherwise ignored.
        """
        leftovers = execute_query("FIND ENTITY WHICH HAS AN ID >= 100")
        if len(leftovers) > 0:
            leftovers.delete()

        # (removal function, target) pairs, processed in this order
        cleanup_steps = (
            (shutil.rmtree, "testfolder"),
            (os.remove, "test.dat"),
            (os.remove, "test2.dat"),
        )
        for remove, target in cleanup_steps:
            try:
                remove(target)
            except Exception as exc:
                print(exc)
    
    
    def test_file_with_space():
        """Insert a file whose target path contains spaces and fetch it again.

        The file is looked up by name, its id and path are compared with the
        inserted entity, and it is downloaded to ``test2.dat``.
        """
        uploaded = models.File(
            name="TestFile",
            description="Testfile Desc",
            path="testfiles/test file with spaces.dat",
            file="test.dat",
        )
        uploaded.insert()

        found = execute_query("FIND FILE TestFile", unique=True)
        assert_equal(found.id, uploaded.id)
        # the retrieved path carries a leading slash
        assert_equal(found.path, "/testfiles/test file with spaces.dat")

        found.download("test2.dat")
    
    
    def test_pickup_file():
        """Insert a single file that is picked up from the drop off box.

        If the file cannot be created in the drop off box directory, a note
        is printed and the rest of the test is skipped.
        """
        box = models.DropOffBox()
        box.sync()
        try:
            handle = open(os.path.join(box.path, "testpickup.dat"), "w")
        except BaseException:
            print("drop off box not on this system.")
            return

        handle.write("hello world\n")
        handle.close()

        # random suffix on the target path
        target = "testfiles/pickuptestfile.dat" + hex(randint(0, maxint))
        picked = models.File(name="PickupTestfile",
                             description="Pickup test file desc",
                             path=target,
                             pickup="testpickup.dat")
        picked.insert()
        assert_is_not_none(picked.id)
    
    
    def test_pickup_folder():
        """Insert a whole folder that is picked up from the drop off box.

        If the folder cannot be created in the drop off box directory, a note
        is printed and the rest of the test is skipped.
        """
        box = models.DropOffBox()
        box.sync()
        try:
            os.mkdir(box.path + "/testfolder")
        except BaseException:
            print("drop off box not on this system.")
            return

        os.mkdir(box.path + "/testfolder/subfolder")
        # one file directly in the folder, one in the sub folder
        for relative in ("/testfolder/testpickup1.dat",
                         "/testfolder/subfolder/testpickup2.dat"):
            with open(box.path + relative, "w") as handle:
                handle.write("hello world\n")

        target = "testfiles/pickuptestfolder" + hex(randint(0, maxint)) + "/"
        folder = models.File(name="PickupTestfolder",
                             description="Pickup test folder desc",
                             path=target,
                             pickup="testfolder/")
        folder.insert()
    
    
    def test_file4():
        """Insert two drop off box pickups together through one container.

        If the first pickup file cannot be created, a note is printed and
        nothing is inserted.  The container is deleted afterwards on a best
        effort basis.
        """
        try:
            box = models.DropOffBox()
            box.sync()
            try:
                handle = open(box.path + "/testpickup1.dat", "w")
            except BaseException:
                print("drop off box not on this system.")
            else:
                handle.write("hello world\n")
                handle.close()
                with open(box.path + "/testpickup2.dat", "w") as handle:
                    handle.write("hello world\n")

                first_path = "testfiles/testfile1.dat" + hex(randint(0, maxint))
                first = models.File(name="Testfile1",
                                    description="Testfile Desc",
                                    path=first_path,
                                    pickup="testpickup1.dat")
                second_path = "testfiles/testfile1.dat" + hex(randint(0, maxint))
                second = models.File(name="Testfile2",
                                     description="Testfile Desc",
                                     path=second_path,
                                     pickup="testpickup2.dat")

                batch = models.Container()
                batch.extend([first, second])
                batch.insert()
                assert_is_not_none(first.id)
                assert_is_not_none(second.id)
        finally:
            # best effort: the container may never have been created
            try:
                batch.delete()
            except BaseException:
                pass
    
    
    def test_upload_complete_folder():
        """Upload two single files, then a complete local folder.

        Both uploads go through a container which is deleted again after the
        ids of the inserted entities have been checked.
        """
        uploads = []
        for label, stem in (("Testfile1", "testfiles/testfile1.dat"),
                            ("Testfile2", "testfiles/testfile2.dat")):
            uploads.append(models.File(name=label,
                                       description="Testfile Desc",
                                       path=stem + hex(randint(0, maxint)),
                                       file="test.dat"))
        files = models.Container()
        files.extend(uploads)
        files.insert()
        for upload in uploads:
            assert_is_not_none(upload.id)
        files.delete()

        # now the same with the local directory "testfolder"
        folder_target = "testfiles/testfolder" + hex(randint(0, maxint))
        folder = models.File(name="Testfolder",
                             description="Testfolder Desc",
                             path=folder_target,
                             file="testfolder")
        folders = models.Container()
        folders.append(folder)
        folders.insert()
        assert_is_not_none(folder.id)
        folders.delete()
    
    
    def test_file6():
        """Check the error for picking up a folder into an occupied path.

        A file is uploaded below ``testfiles2/`` first.  If the drop off box
        directory exists locally, a folder is created in it and its insertion
        with the target path ``testfiles2/`` must fail with the error
        description 'This target path does already exist.'.
        """
        try:
            # put a file at testfiles2/testfile...
            with open("test.dat", "w") as handle:
                handle.write("hello world\n")
            target = "testfiles2/testfile" + hex(randint(0, maxint)) + ".dat"
            uploaded = models.File(name="Testfidb.dble",
                                   description="Testfile Desc",
                                   path=target,
                                   file="test.dat")
            uploaded.insert()

            assert uploaded.id is not None
            assert uploaded.is_valid()

            # prepare a folder in the drop off box and try to store it to
            # testfiles2/
            box = models.DropOffBox()
            box.sync()
            pickup_dir = box.path + "/testfolder"

            if os.path.isdir(box.path):
                os.mkdir(pickup_dir)
                with open(pickup_dir + "/testpickup1.dat", "w") as handle:
                    handle.write("hello world\n")
                os.mkdir(pickup_dir + "/subfolder")
                with open(pickup_dir + "/subfolder/testpickup2.dat", "w") as handle:
                    handle.write("hello world\n")

                folder = models.File(name="PickupTestfolder",
                                     description="Pickup test folder desc",
                                     path="testfiles2/",
                                     pickup=pickup_dir)

                with raises(EntityError) as caught:
                    folder.insert()
                errors = caught.value.entity.get_errors()
                assert errors[0].description == 'This target path does already exist.'
            else:
                print("drop off box not on this system.")
        finally:
            # best-effort cleanup; any of these names may be unset
            try:
                folder.delete()
            except BaseException:
                pass
            try:
                uploaded.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(pickup_dir)
            except BaseException:
                pass
    
    
    def test_file7():
        """Check that a folder pickup into an occupied target path is rejected.

        A file is uploaded below ``testfiles2/testsub/`` first.  If the drop
        off box directory exists locally, a folder with two files is created
        in it and its insertion with the target path ``testfiles2/`` is
        expected to raise an ``EntityError``.  Entities and the local pickup
        folder are removed afterwards on a best effort basis.
        """
        try:
            # upload file to testfiles2/testsub/testfile...
            with open("test.dat", "w") as upload_file:
                upload_file.write("hello world\n")
            file_ = models.File(name="Testfidb.dble",
                                description="Testfile Desc",
                                path="testfiles2/testsub/testfile" +
                                hex(randint(0, maxint)) + ".dat",
                                file="test.dat")
            file_.insert()

            assert_is_not_none(file_.id)
            assert_true(file_.is_valid())

            # pickup_folder
            # and try to store it to testfiles2/
            d = models.DropOffBox()
            d.sync()
            path = d.path + "/testfolder"

            if not os.path.isdir(d.path):
                print("drop off box not on this system.")
            else:
                os.mkdir(path)
                with open(path + "/testpickup1.dat", "w") as pickup_file:
                    pickup_file.write("hello world\n")
                os.mkdir(path + "/subfolder")
                with open(path + "/subfolder/testpickup2.dat", "w") as pickup_file:
                    pickup_file.write("hello world\n")

                # Hand over the folder prepared above (the variable, not the
                # literal string "path"), as test_file6 does, so that the
                # pickup source actually exists when the insert is rejected.
                folder_ = models.File(
                    name="PickupTestfolder",
                    description="Pickup test folder desc",
                    path="testfiles2/",
                    pickup=path)

                assert_raises(EntityError, folder_.insert)
        finally:
            # best-effort cleanup; any of these names may be unset
            try:
                folder_.delete()
            except BaseException:
                pass
            try:
                file_.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    def test_consistency_file_was_modified():
        """Run the file system consistency check with a simulated modification.

        The check is run three times: a passing run, a run with the debug
        option ``-c FILE_WAS_MODIFIED`` which must report an error for the
        test file, and another passing run after the file has been downloaded
        and compared with the uploaded content.
        """
        try:
            # insert new test file
            with open("test.dat", "w") as handle:
                handle.write("hello world\n")
            record = models.File(name="TestConsistency1",
                                 description="Testfile Desc",
                                 path="debug/test_file_storage_consistency",
                                 file="test.dat")
            record.insert()

            assert record.id is not None
            assert record.is_valid()

            # baseline: no consistency warning
            report = runCheck(None, None)
            assert report.messages["Info", 0] is not None
            assert report.messages["Info", 0][0] == "File system is consistent."

            # In debug mode the server offers the special option
            # '-c FILE_WAS_MODIFIED' which simulates a modified file.
            report = runCheck(None, "-c FILE_WAS_MODIFIED")

            assert report.messages["Error", 0] is not None
            expected = 'debug/test_file_storage_consistency: File was modified.'
            assert report.messages["Error", 0][0] == expected

            # Download the file again and compare it, just to be sure that
            # the server only simulated the consistency breach.
            downloaded = open(record.download(target="test.dat.tmp"), "r")

            content = downloaded.read()
            assert content == "hello world\n"

            # the check passes again
            report = runCheck(None, None)
            assert report.messages["Info", 0] is not None
            assert report.messages["Info", 0][0] == "File system is consistent."

        finally:
            # clean up file record
            try:
                record.delete()
            except BaseException:
                pass
            # clean up local files
            try:
                downloaded.close()
            except BaseException:
                pass
            try:
                os.remove("test.dat")
            except BaseException:
                pass
            try:
                os.remove("test.dat.tmp")
            except BaseException:
                pass
    
    
    def test_consistency_file_does_not_exist():
        """Run the file system consistency check with a simulated missing file.

        After a passing check below ``/debug/``, the debug option
        ``-c FILE_DOES_NOT_EXIST`` must report an error for the test file.
        The file is then downloaded and compared with the uploaded content,
        and a final check must pass again.
        """
        try:
            with open("test.dat", "w") as handle:
                handle.write("hello world\n")
            record = models.File(name="TestConsistency1",
                                 description="Testfile Desc",
                                 path="debug/test_file_storage_consistency",
                                 file="test.dat")
            record.insert()

            assert record.id is not None
            assert record.is_valid()

            # baseline, restricted to the debug directory
            report = runCheck(None, "/debug/")
            assert report.messages["Info", 0] is not None
            expected = "File system below debug/ is consistent."
            assert report.messages["Info", 0][0] == expected

            # In debug mode the server offers the special option
            # '-c FILE_DOES_NOT_EXIST' which simulates an accidentally
            # removed file.
            report = runCheck(None, "-c FILE_DOES_NOT_EXIST")
            assert report.messages["Error", 0] is not None
            expected = 'debug/test_file_storage_consistency: File does not exist.'
            assert report.messages["Error", 0][0] == expected

            with open(record.download(target="test.dat.tmp"), "r") as downloaded:
                content = downloaded.read()
            assert content == "hello world\n"

            report = runCheck(None, None)
            assert report.messages["Info", 0] is not None
            assert report.messages["Info", 0][0] == "File system is consistent."

        finally:
            # clean up file record
            try:
                record.delete()
            except BaseException:
                pass
            # clean up local files
            try:
                downloaded.close()
            except BaseException:
                pass
            try:
                os.remove("test.dat")
            except BaseException:
                pass
            try:
                os.remove("test.dat.tmp")
            except BaseException:
                pass
    
    
    def test_consistency_unknown_file():
        """Run the file system consistency check with a simulated unknown file.

        A passing check is expected before and after the run with the debug
        option ``-c UNKNOWN_FILE``, which must produce a warning.
        """
        def check_is_consistent():
            # plain run without options must report a consistent file system
            report = runCheck(None, None)
            assert report.messages["Info", 0] is not None
            assert report.messages["Info", 0][0] == "File system is consistent."

        check_is_consistent()

        # In debug mode the server offers the special option
        # '-c UNKNOWN_FILE' which simulates an unknown file.
        simulated = runCheck(None, "-c UNKNOWN_FILE")
        assert simulated.messages["Warning", 0] is not None
        assert simulated.messages["Warning", 0][0] == 'debug/: Unknown file.'

        check_is_consistent()
    
    
    def test_insert_files_in_dir_error1():
        """Check the error messages of the InsertFilesInDir flag.

        Covers an unreadable directory, a non-existent directory (for both
        retrieve and insert) and a directory that is not allowed.
        """
        def dry_run(directory):
            # retrieve with the InsertFilesInDir flag, errors are not raised
            container = models.Container()
            container.retrieve(unique=False,
                               raise_exception_on_error=False,
                               flags={"InsertFilesInDir": directory})
            return container

        # If this fails, it is likely that the server runs with root
        # permissions which is discouraged.
        unreadable = dry_run("/root")
        assert_is_not_none(unreadable.messages["Error", 0])
        assert_true(unreadable.messages["Error", 0][0].startswith(
            "Cannot read or enter the desired directory."))

        missing = dry_run("non-existent")
        assert_is_not_none(missing.messages["Error", 0])
        assert_true(missing.messages["Error", 0][0].startswith("No such directory"))

        # the same container is reused for the insert attempt
        missing.insert(raise_exception_on_error=False,
                       flags={"InsertFilesInDir": "non-existent"})
        assert_is_not_none(missing.messages["Error", 0])
        assert_true(missing.messages["Error", 0][0].startswith("No such directory"))

        not_allowed = dry_run(".")
        assert_is_not_none(not_allowed.messages["Error", 0])
        assert_true(not_allowed.messages["Error", 0][0].startswith("Dir is not allowed"))
    
    
    def test_insert_files_in_dir_with_symlink():
        """Check how the InsertFilesInDir flag treats a symlinked directory.

        A test folder with a sub folder and a symlink to that sub folder is
        created at the configured local path.  Without further options the
        job is expected to warn about the symbolic link; with
        ``--force-allow-symlinks`` the linked file is expected to be listed
        (retrieve) and inserted (insert).
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        try:

            # create file in a server-readable directory
            os.makedirs(path)
            path = os.path.realpath(path) + "/"
            os.makedirs(path + "subfolder/")

            with open(path + "subfolder/test2.dat", "w") as test_file:
                test_file.write("hello world2\n")

            # create a symlink to the directory where the test_file resides
            os.symlink(path_on_server + "subfolder/", path + "linked_subfolder")

            # call insertFilesInDir job
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server +
                    "linked_subfolder"})
            assert_is_not_none(c.messages["Warning", 6])
            assert_equal(c.messages["Warning", 6][0],
                         "Directory or file is symbolic link: " + path_on_server +
                         "linked_subfolder")

            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "--force-allow-symlinks " +
                    path_on_server +
                    "linked_subfolder"})
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0],
                         "Files count in linked_subfolder/: 1")
            assert_equal(c[0].name, "test2.dat")
            assert_equal(c[0].path, "/linked_subfolder/test2.dat")

            d = models.Container()
            d.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "--force-allow-symlinks " +
                    path_on_server +
                    "linked_subfolder"})
            # check the response of the insert itself (d), not of the
            # preceding retrieve (c)
            assert_is_not_none(d.messages["Info", 0])
            assert_equal(d.messages["Info", 0][0],
                         "Files count in linked_subfolder/: 1")
            assert_true(d[0].is_valid())
            assert_equal(d[0].name, "test2.dat")
            assert_equal(d[0].path, "/linked_subfolder/test2.dat")

        finally:
            # best-effort cleanup; any of these names may be unset
            try:
                d.delete()
            except BaseException:
                pass
            try:
                c.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    def test_insert_files_in_dir():
        """Test if files in directories can be inserted via InsertFilesInDir.

        Two files are created in a sub folder, listed with a dry run
        (retrieve) and then inserted.  Afterwards a third file is added to
        the parent folder and the job is run again for that folder; this
        verifies that the job can be executed for the root directory after
        the child directory and that only the new file is returned.
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        try:
            os.makedirs(path)
            os.makedirs(path + "subfolder/")
            with open(path + "subfolder/test.dat", "w") as test_file1:
                test_file1.write("hello world\n")

            with open(path + "subfolder/test2.dat", "w") as test_file2:
                test_file2.write("hello world2\n")

            # DRY-RUN
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert_is_not_none(c.messages["Info", 0])
            assert_equal(c.messages["Info", 0][0],
                         "Files count in testfolder/: 2")

            # ACTUAL RUN
            d = models.Container()
            d.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            # check the response of the insert itself (d), not of the dry
            # run (c)
            assert_is_not_none(d.messages["Info", 0])
            assert_equal(d.messages["Info", 0][0],
                         "Files count in testfolder/: 2")
            assert_true(d[0].is_valid())

            # create a new file and call insertFilesInDir again
            with open(path + "/test3.dat", "w") as test_file3:
                test_file3.write("hello world3\n")

            e = models.Container()
            e.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert_is_not_none(e.messages["Info", 0])
            assert_equal(e.messages["Info", 0][0],
                         "Files count in testfolder/: 3")

            # only the new file is given back...
            assert_equal(1, len(e))
            assert_false(e[0].is_valid())
            assert_equal(e[0].name, "test3.dat")
            assert_equal(e[0].path, "/testfolder/test3.dat")

            f = models.Container()
            f.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert_is_not_none(f.messages["Info", 0])
            assert_equal(f.messages["Info", 0][0],
                         "Files count in testfolder/: 3")
            # only the new file is given back...
            assert_equal(1, len(f))
            assert_true(f[0].is_valid())
            assert_equal(f[0].name, "test3.dat")
            assert_equal(f[0].path, "/testfolder/test3.dat")

        finally:
            # best-effort cleanup; any of these names may be unset
            try:
                f.delete()
            except BaseException:
                pass
            try:
                d.delete()
            except BaseException:
                pass
            try:
                c.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    def test_insert_files_in_dir_regex():
        """Check the ``-e`` exclude option of the InsertFilesInDir flag.

        Two files are created; the one matching ``dont_insert`` must be
        reported as explicitly excluded and only the other one is returned.
        """
        config = get_config()
        local_root = config.get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        server_root = config.get("IntegrationTests",
                                 "test_files.test_insert_files_in_dir.server") + "testfolder/"
        try:
            os.makedirs(local_root)
            os.makedirs(local_root + "subfolder/")
            for filename, content in (("test_dont_insert.dat", "hello world\n"),
                                      ("test_insert.dat", "hello world2\n")):
                with open(local_root + "subfolder/" + filename, "w") as handle:
                    handle.write(content)

            listing = models.Container()
            listing.retrieve(unique=False,
                             raise_exception_on_error=False,
                             flags={"InsertFilesInDir": "-e dont_insert " + server_root})

            excluded = server_root + "subfolder/test_dont_insert.dat"
            assert listing.messages["Warning", 2] is not None
            assert listing.messages["Warning", 2][0] == "Explicitly excluded file: {}".format(
                excluded)

            # only the file that is not excluded is returned
            assert len(listing) == 1
            assert listing[0].name == "test_insert.dat"

        finally:
            # best-effort cleanup
            try:
                listing.delete()
            except BaseException:
                pass
            try:
                shutil.rmtree(local_root)
            except BaseException:
                pass
    
    
    def test_thumbnails():
        """Insert a file with a thumbnail and look it up in the FileSystem listing.

        The ``thumbnail`` attribute of the first file element in the listing
        of ``testfiles`` must end with the expected thumbnail path.
        """
        with_thumbnail = models.File(name="TestFile",
                                     description="Testfile Desc",
                                     path="testfiles/thumbnails_test.dat",
                                     file="test.dat",
                                     thumbnail="testfolder/test1.dat")
        with_thumbnail.insert()

        response = get_connection().retrieve(
            entity_uri_segments=["FileSystem", "testfiles"],
            reconnect=True)
        body = response.read()
        print(body)

        listing = etree.fromstring(body)
        first_file = listing.xpath('/Response/dir/file')[0]
        # compare the last 41 characters, i.e. the length of the expected path
        thumbnail_tail = first_file.get("thumbnail")[-41:]
        assert thumbnail_tail == "/Thumbnails/testfiles/thumbnails_test.dat"