Skip to content
Snippets Groups Projects
Select Git revision
  • d83c300899a93d7f9399de77950fa373b3f99c3b
  • main default protected
  • dev
  • f-unmod
  • f-checkidentical
  • f-simple-breakpoint
  • f-new-debug-tree
  • f-existing-file-id
  • f-no-ident
  • f-collect-problems
  • f-refactor-debug-tree
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
  • v0.5.0
  • v0.4.0
  • v0.3.0
  • v0.2.0
  • v0.1.0
27 results

test_crawler.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_file.py 29.01 KiB
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    # Copyright (C) 2019-2021 IndiScale GmbH <info@indiscale.com>
    # Copyright (C) 2019 Daniel Hornung <d.hornung@indiscale.com>
    # Copyright (C) 2020-2021 Timm Fitschen <t.fitschen@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """Created on 18.09.2013.
    
    @author: Timm Fitschen
    """
    import os
    import shutil
    from random import randint
    from sys import maxsize as maxint
    
    from lxml import etree
    from pytest import raises, mark
    import caosdb as db
    from caosdb import execute_query, get_config, get_connection
    from caosdb.common import models
    from caosdb.exceptions import EntityError, TransactionError
    from caosdb.utils.checkFileSystemConsistency import runCheck
    
    
    def setup_module():
        """Start the module from a clean state by running the teardown once."""
        teardown_function(function=None)
    
    
    def setup_function(function):
        """Clean up and then create the local fixture files used by the tests.

        After the teardown has run, ``test.dat`` and the ``testfolder`` tree
        (with one file in the folder and one in its subfolder) are written, all
        with the same one-line content.
        """
        teardown_function(function)
        content = "hello world\n"
        with open("test.dat", "w") as handle:
            handle.write(content)
        os.makedirs("testfolder/subfolder")
        for target in ("testfolder/test1.dat",
                       "testfolder/subfolder/test2.dat"):
            with open(target, "w") as handle:
                handle.write(content)
    
    
    def teardown_function(function):
        """Delete the test entities on the server and the local fixture files.

        Local clean-up is best effort: a failure to remove a file or folder is
        printed (or, for ``test.dat.tmp``, ignored) and never aborts the
        teardown.
        """
        leftovers = execute_query("FIND ENTITY WHICH HAS AN ID >= 100")
        if len(leftovers) > 0:
            leftovers.delete()

        try:
            shutil.rmtree("testfolder")
        except Exception as err:
            print(err)
        for stale in ("test.dat", "test2.dat"):
            try:
                os.remove(stale)
            except Exception as err:
                print(err)
        try:
            os.remove("test.dat.tmp")
        except BaseException:
            pass

        # remove directories: they are read from the raw XML response and
        # deleted in one request, with their ids joined by "&".
        response = get_connection().retrieve(
            entity_uri_segments=["Entity"],
            query_dict={"query": "FIND ENTITY WITH ID>99"},
            reconnect=True)
        tree = etree.fromstring(response.read())
        directory_ids = [node.get("id")
                         for node in tree.xpath('/Response/Directory')]
        get_connection().delete(
            entity_uri_segments=["Entity", "&".join(directory_ids)],
            reconnect=True)
    
    
    def test_file_with_space():
        """Insert a file whose server path contains spaces and download it."""
        uploaded = models.File(
            name="TestFile",
            description="Testfile Desc",
            path="testfiles/test file with spaces.dat",
            file="test.dat",
        )
        uploaded.insert()

        found = execute_query("FIND FILE TestFile", unique=True)
        assert found.id == uploaded.id
        assert found.path == "/testfiles/test file with spaces.dat"

        found.download("test2.dat")
    
    
    @mark.local_server
    def test_pickup_file():
        """Insert a file which the server picks up from the drop off box.

        If the pickup file cannot be created below the drop off box path, a
        note is printed and the test ends without inserting anything.
        """
        d = models.DropOffBox()
        d.sync()
        try:
            pickup_file = open(os.path.join(d.path, "testpickup.dat"), "w")
        except Exception:
            # Only ordinary errors are treated as "no drop off box here";
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            print("drop off box not on this system.")
            return

        # The context manager closes the handle even if writing fails.
        with pickup_file:
            pickup_file.write("hello world\n")

        file_ = models.File(name="PickupTestfile",
                            description="Pickup test file desc",
                            path="testfiles/pickuptestfile.dat" +
                            hex(randint(0, maxint)),
                            pickup="testpickup.dat")
        file_.insert()
        assert file_.id is not None
    
    
    @mark.local_server
    def test_pickup_folder():
        """Insert a folder (with a subfolder) picked up from the drop off box.

        If the folder cannot be created below the drop off box path, a note is
        printed and the test ends without inserting anything.
        """
        d = models.DropOffBox()
        d.sync()
        try:
            os.mkdir(d.path + "/testfolder")
        except Exception:
            # Only ordinary errors are treated as "no drop off box here";
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            print("drop off box not on this system.")
            return

        os.mkdir(d.path + "/testfolder/subfolder")
        with open(d.path + "/testfolder/testpickup1.dat", "w") as pickup_file:
            pickup_file.write("hello world\n")
        with open(d.path + "/testfolder/subfolder/testpickup2.dat",
                  "w") as pickup_file:
            pickup_file.write("hello world\n")

        file_ = models.File(name="PickupTestfolder",
                            description="Pickup test folder desc",
                            path="testfiles/pickuptestfolder" +
                            hex(randint(0, maxint)) + "/",
                            pickup="testfolder/")
        file_.insert()
        # Same success check as the other insertion tests in this module.
        assert file_.id is not None
    
    
    @mark.local_server
    def test_file4():
        """Insert two files from the drop off box within a single container.

        If the first pickup file cannot be created below the drop off box path,
        a note is printed and nothing is inserted.
        """
        # Bound up front so that the clean-up can tell whether a container was
        # ever created instead of relying on a swallowed NameError.
        c = None
        try:
            d = models.DropOffBox()
            d.sync()
            try:
                pickup_file = open(d.path + "/testpickup1.dat", "w")
            except Exception:
                print("drop off box not on this system.")
            else:
                with pickup_file:
                    pickup_file.write("hello world\n")
                with open(d.path + "/testpickup2.dat", "w") as pickup_file:
                    pickup_file.write("hello world\n")
                file1_ = models.File(
                    name="Testfile1", description="Testfile Desc",
                    path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
                    pickup="testpickup1.dat")
                file2_ = models.File(
                    name="Testfile2", description="Testfile Desc",
                    path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
                    pickup="testpickup2.dat")
                c = models.Container()
                c.extend([file1_, file2_])
                c.insert()
                assert file1_.id is not None
                assert file2_.id is not None
        finally:
            if c is not None:
                try:
                    c.delete()
                except Exception as e:
                    # Best-effort clean-up: report the problem, do not mask the
                    # outcome of the test itself.
                    print(e)
    
    
    def test_upload_complete_folder():
        """Upload two single files and then a whole local folder.

        Both uploads go through a container which is deleted again once the
        inserted entities have been checked for an id.
        """
        first = models.File(
            name="Testfile1",
            description="Testfile Desc",
            path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
            file="test.dat",
        )
        second = models.File(
            name="Testfile2",
            description="Testfile Desc",
            path="testfiles/testfile2.dat" + hex(randint(0, maxint)),
            file="test.dat",
        )
        pair = models.Container()
        pair.extend([first, second])
        pair.insert()
        assert first.id is not None
        assert second.id is not None
        pair.delete()

        wrapper = models.Container()
        folder = models.File(
            name="Testfolder",
            description="Testfolder Desc",
            path="testfiles/testfolder" + hex(randint(0, maxint)),
            file="testfolder",
        )
        wrapper.append(folder)
        wrapper.insert()
        assert folder.id is not None
        wrapper.delete()
    
    
    def test_file6():
        """Picking up a folder into an already existing target path must fail.

        A file is first uploaded below ``testfiles2/``. Picking up a folder from
        the drop off box into ``testfiles2/`` is then expected to raise a
        TransactionError whose first entity error says that the target path
        already exists. The pickup part is skipped (with a printed note) when
        the drop off box directory does not exist on this machine.
        """
        # Bound up front so that the clean-up only touches what was created.
        file_ = None
        folder_ = None
        path = None
        try:
            # upload file to testfiles2/testfile...
            with open("test.dat", "w") as upload_file:
                upload_file.write("hello world\n")
            file_ = models.File(name="Testfidb.dble",
                                description="Testfile Desc",
                                path="testfiles2/testfile" +
                                hex(randint(0, maxint)) + ".dat",
                                file="test.dat")
            file_.insert()

            assert file_.id is not None
            assert file_.is_valid()

            # pickup_folder
            # and try to store it to testfiles2/
            d = models.DropOffBox()
            d.sync()
            path = d.path + "/testfolder"

            if not os.path.isdir(d.path):
                print("drop off box not on this system.")
            else:
                os.mkdir(path)
                with open(path + "/testpickup1.dat", "w") as pickup_file:
                    pickup_file.write("hello world\n")
                os.mkdir(path + "/subfolder")
                with open(path + "/subfolder/testpickup2.dat",
                          "w") as pickup_file:
                    pickup_file.write("hello world\n")

                folder_ = models.File(
                    name="PickupTestfolder",
                    description="Pickup test folder desc",
                    path="testfiles2/",
                    pickup="testfolder/")

                with raises(TransactionError) as te:
                    folder_.insert()
                cm = te.value.errors[0]
                errors = cm.entity.get_errors()
                assert errors[0].description == 'This target path does already exist.'
        finally:
            # Best-effort clean-up: problems are reported but never mask the
            # outcome of the test itself.
            for entity in (folder_, file_):
                if entity is None:
                    continue
                try:
                    entity.delete()
                except Exception as e:
                    print(e)
            if path is not None:
                shutil.rmtree(path, ignore_errors=True)
    
    
    def test_file7():
        """Picking up a folder into a non-empty target directory must fail.

        A file is first uploaded below ``testfiles2/testsub/``. Picking up a
        folder from the drop off box into ``testfiles2/`` is then expected to
        raise a TransactionError which carries an EntityError. The pickup part
        is skipped (with a printed note) when the drop off box directory does
        not exist on this machine.
        """
        # Bound up front so that the clean-up does not depend on a swallowed
        # NameError when the test fails before the path is known.
        path = None
        try:
            # upload file to testfiles2/testsub/testfile...
            with open("test.dat", "w") as upload_file:
                upload_file.write("hello world\n")
            file_ = models.File(name="Testfidb.dble",
                                description="Testfile Desc",
                                path="testfiles2/testsub/testfile" +
                                hex(randint(0, maxint)) + ".dat",
                                file="test.dat")
            file_.insert()

            assert file_.id is not None
            assert file_.is_valid() is True

            # pickup_folder
            # and try to store it to testfiles2/
            d = models.DropOffBox()
            d.sync()
            path = d.path + "/testfolder"

            if not os.path.isdir(d.path):
                print("drop off box not on this system.")
            else:
                os.mkdir(path)
                with open(path + "/testpickup1.dat", "w") as pickup_file:
                    pickup_file.write("hello world\n")
                os.mkdir(path + "/subfolder")
                with open(path + "/subfolder/testpickup2.dat", "w") as pickup_file:
                    pickup_file.write("hello world\n")

                folder_ = models.File(
                    name="PickupTestfolder",
                    description="Pickup test folder desc",
                    path="testfiles2/",
                    pickup="testfolder")

                with raises(TransactionError) as te:
                    folder_.insert()
                assert te.value.has_error(EntityError)
        finally:
            # Best-effort removal of the pickup folder.
            if path is not None:
                shutil.rmtree(path, ignore_errors=True)
    
    
    def test_consistency_file_was_modified():
        """Check the report of the consistency check for a modified file.

        The modification is only simulated via the '-c FILE_WAS_MODIFIED'
        option of the check; the stored file must still have its original
        content afterwards and a plain check must pass again.
        """
        def first_message(report, level):
            """Return the text of message 0 of `level`, which must exist."""
            entry = report.messages[level, 0]
            assert entry is not None
            return entry[0]

        # insert new test file
        with open("test.dat", "w") as handle:
            handle.write("hello world\n")
        stored = models.File(
            name="TestConsistency1",
            description="Testfile Desc",
            path="debug/test_file_storage_consistency",
            file="test.dat",
        )
        stored.insert()

        assert stored.id is not None
        assert stored.is_valid()

        # run consistency check (no consistency warning)
        assert first_message(runCheck(None, None),
                             "Info") == "File system is consistent."

        # when in debug mode, the server offers a special option
        #
        #     '-c FILE_WAS_MODIFIED'
        #
        # which simulates a modified file.
        assert first_message(
            runCheck(None, "-c FILE_WAS_MODIFIED"),
            "Error") == 'debug/test_file_storage_consistency: File was modified.'

        # download the file again and check that it is still the same (just to
        # be sure that the server only simulated the consistency breach)
        local_copy = stored.download(target="test.dat.tmp")
        with open(local_copy, "r") as handle:
            assert handle.read() == "hello world\n"

        # run a passing check again
        assert first_message(runCheck(None, None),
                             "Info") == "File system is consistent."
    
    
    def test_consistency_file_does_not_exist():
        """Check the report of the consistency check for a missing file.

        The removal is only simulated via the '-c FILE_DOES_NOT_EXIST' option
        of the check; the stored file must still be downloadable afterwards
        and a plain check must pass again.
        """
        def first_message(report, level):
            """Return the text of message 0 of `level`, which must exist."""
            entry = report.messages[level, 0]
            assert entry is not None
            return entry[0]

        with open("test.dat", "w") as handle:
            handle.write("hello world\n")
        stored = models.File(
            name="TestConsistency2",
            description="Testfile Desc",
            path="debug/test_file_storage_consistency",
            file="test.dat",
        )
        stored.insert()

        assert stored.id is not None
        assert stored.is_valid()

        # check restricted to the debug/ directory
        assert first_message(
            runCheck(None, "/debug/"),
            "Info") == "File system below debug/ is consistent."

        # when in debug mode, the server offers a special option
        #
        #     '-c FILE_DOES_NOT_EXIST'
        #
        # which simulates an accidentally removed file.
        assert first_message(
            runCheck(None, "-c FILE_DOES_NOT_EXIST"),
            "Error") == 'debug/test_file_storage_consistency: File does not exist.'

        local_copy = stored.download(target="test.dat.tmp")
        with open(local_copy, "r") as handle:
            assert handle.read() == "hello world\n"

        assert first_message(runCheck(None, None),
                             "Info") == "File system is consistent."
    
    
    def test_consistency_unknown_file():
        """Check the report of the consistency check for an unknown file.

        The unknown file is only simulated via the '-c UNKNOWN_FILE' option of
        the check; plain checks before and after must pass.
        """
        def first_message(report, level):
            """Return the text of message 0 of `level`, which must exist."""
            entry = report.messages[level, 0]
            assert entry is not None
            return entry[0]

        assert first_message(runCheck(None, None),
                             "Info") == "File system is consistent."

        # when in debug mode, the server offers a special option
        #
        #     '-c UNKNOWN_FILE'
        #
        # which simulates an unknown file.
        assert first_message(runCheck(None, "-c UNKNOWN_FILE"),
                             "Warning") == 'debug/: Unknown file.'

        assert first_message(runCheck(None, None),
                             "Info") == "File system is consistent."
    
    
    def test_insert_files_in_dir_error1():
        """Check the error messages of InsertFilesInDir for bad directories."""
        def error_text(container):
            """Return the text of error 0 of `container`, which must exist."""
            entry = container.messages["Error", 0]
            assert entry is not None
            return entry[0]

        unreadable = models.Container()
        unreadable.retrieve(unique=False, raise_exception_on_error=False,
                            flags={"InsertFilesInDir": "/root"})
        # If this fails, it is likely that the server runs with root permissions
        # which is discouraged.
        assert error_text(unreadable).startswith(
            "Cannot read or enter the desired directory.")

        missing = models.Container()
        missing.retrieve(unique=False, raise_exception_on_error=False,
                         flags={"InsertFilesInDir": "non-existent"})
        assert error_text(missing).startswith("No such directory")

        # the same container is used again for the actual insertion attempt
        missing.insert(raise_exception_on_error=False,
                       flags={"InsertFilesInDir": "non-existent"})
        assert error_text(missing).startswith("No such directory")

        forbidden = models.Container()
        forbidden.retrieve(unique=False, raise_exception_on_error=False,
                           flags={"InsertFilesInDir": "."})
        assert error_text(forbidden).startswith("Dir is not allowed")
    
    
    @mark.local_server
    def test_insert_files_in_dir_with_symlink():
        """Test InsertFilesInDir on a directory which is a symbolic link.

        Without further options the job is expected to answer with warning 6
        ("Directory or file is symbolic link"). With '--force-allow-symlinks'
        the file below the link is expected to be listed by `retrieve` and
        inserted by `insert`.
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        # Bound up front so that the clean-up only touches what was created.
        c = None
        d = None
        try:

            # create file in a server-readable directory
            os.makedirs(path)
            path = os.path.realpath(path) + "/"
            os.makedirs(path + "subfolder/")

            with open(path + "subfolder/test2.dat", "w") as test_file:
                test_file.write("hello world2\n")

            # create a symlink to the directory where the test_file resides
            os.symlink(path_on_server + "subfolder/", path + "linked_subfolder")

            # call insertFilesInDir job
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server +
                    "linked_subfolder"})
            assert c.messages["Warning", 6] is not None
            assert c.messages["Warning", 6][0] == ("Directory or file is symbolic "
                                                   "link: {}linked_subfolder"
                                                   ).format(path_on_server)

            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "--force-allow-symlinks " +
                    path_on_server +
                    "linked_subfolder"})
            assert c.messages["Info", 0] is not None
            assert c.messages["Info",
                              0][0] == "Files count in linked_subfolder/: 1"
            assert c[0].name == "test2.dat"
            assert c[0].path == "/linked_subfolder/test2.dat"

            d = models.Container()
            d.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "--force-allow-symlinks " +
                    path_on_server +
                    "linked_subfolder"})
            # check the container of the insertion, not the one of the retrieval
            assert d.messages["Info", 0] is not None
            assert d.messages["Info",
                              0][0] == "Files count in linked_subfolder/: 1"
            assert d[0].is_valid() is True
            assert d[0].name == "test2.dat"
            assert d[0].path == "/linked_subfolder/test2.dat"

        finally:
            # Best-effort clean-up: problems are reported but never mask the
            # outcome of the test itself.
            for container in (d, c):
                if container is None:
                    continue
                try:
                    container.delete()
                except Exception as e:
                    print(e)
            shutil.rmtree(path, ignore_errors=True)
    
    
    @mark.local_server
    def test_insert_files_in_dir():
        """Test if files in directories can be inserted as symlinks via the
        InsertFilesInDir flag.

        This test also verifies that the job can be executed for the root
        directory after the child directory and that only the new files are
        being inserted.
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        # Bound up front so that the clean-up only touches what was created.
        c = None
        d = None
        f = None
        try:
            os.makedirs(path)
            os.makedirs(path + "subfolder/")
            with open(path + "subfolder/test.dat", "w") as test_file1:
                test_file1.write("hello world\n")

            with open(path + "subfolder/test2.dat", "w") as test_file2:
                test_file2.write("hello world2\n")

            # DRY-RUN
            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert c.messages["Info", 0] is not None
            assert c.messages["Info", 0][0] == "Files count in testfolder/: 2"

            # ACTUAL RUN
            d = models.Container()
            d.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            # check the container of the actual run, not the one of the dry run
            assert d.messages["Info", 0] is not None
            assert d.messages["Info", 0][0] == "Files count in testfolder/: 2"
            assert d[0].is_valid() is True

            # create a new file and call insertFilesInDir again
            with open(os.path.join(path, "test3.dat"), "w") as test_file3:
                test_file3.write("hello world3\n")

            e = models.Container()
            e.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert e.messages["Info", 0] is not None
            assert e.messages["Info", 0][0] == "Files count in testfolder/: 3"

            # only the new file is given back...
            assert 1 == len(e)
            assert e[0].is_valid() is False
            assert e[0].name == "test3.dat"
            assert e[0].path == "/testfolder/test3.dat"

            f = models.Container()
            f.insert(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": path_on_server})
            assert f.messages["Info", 0] is not None
            assert f.messages["Info", 0][0] == "Files count in testfolder/: 3"
            # only the new file is given back...
            assert 1 == len(f)
            assert f[0].is_valid() is True
            assert f[0].name == "test3.dat"
            assert f[0].path == "/testfolder/test3.dat"

        finally:
            # Best-effort clean-up: problems are reported but never mask the
            # outcome of the test itself.
            for container in (f, d, c):
                if container is None:
                    continue
                try:
                    container.delete()
                except Exception as err:
                    print(err)
            shutil.rmtree(path, ignore_errors=True)
    
    
    @mark.local_server
    def test_insert_files_in_dir_regex():
        """Test the '-e' option of InsertFilesInDir.

        With '-e dont_insert', the file whose name contains "dont_insert" is
        expected to be reported by warning 2 as explicitly excluded, and only
        the other file is expected in the retrieved container.
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_insert_files_in_dir.local") + "testfolder/"
        path_on_server = get_config().get("IntegrationTests",
                                          "test_files.test_insert_files_in_dir.server") + "testfolder/"
        # Bound up front so that the clean-up only touches what was created.
        c = None
        try:
            os.makedirs(path)
            os.makedirs(path + "subfolder/")
            with open(path + "subfolder/test_dont_insert.dat", "w") as test_file1:
                test_file1.write("hello world\n")

            with open(path + "subfolder/test_insert.dat", "w") as test_file2:
                test_file2.write("hello world2\n")

            c = models.Container()
            c.retrieve(
                unique=False,
                raise_exception_on_error=False,
                flags={
                    "InsertFilesInDir": "-e dont_insert " +
                    path_on_server})
            assert c.messages["Warning", 2] is not None
            assert c.messages["Warning", 2][0] == "Explicitly excluded file: {}".format(
                path_on_server + "subfolder/test_dont_insert.dat")

            # only the other file is part of the result
            assert len(c) == 1
            assert c[0].name == "test_insert.dat"

        finally:
            # Best-effort clean-up: problems are reported but never mask the
            # outcome of the test itself.
            if c is not None:
                try:
                    c.delete()
                except Exception as e:
                    print(e)
            shutil.rmtree(path, ignore_errors=True)
    
    
    @mark.xfail(reason="Needs file system refactoring")
    def test_auto_create_parent_dirs():
        """Insert a file at A/B/C/test.dat with the autoCreateDirs flag set.

        Afterwards the FileSystem resource is walked level by level and every
        level is expected to list exactly one child: A/, B/, C/ and finally the
        file itself.
        """
        def fetch(*segments):
            """Return the raw response body of the FileSystem resource."""
            return get_connection().retrieve(
                entity_uri_segments=["FileSystem"] + list(segments),
                reconnect=True).read()

        def only_dir(raw):
            """Parse `raw` and return its single /Response/dir element."""
            found = etree.fromstring(raw).xpath('/Response/dir')
            assert len(found) == 1
            return found[0]

        uploaded = models.File(name="TestFile",
                               path="A/B/C/test.dat",
                               file="test.dat")
        uploaded.set_flag("autoCreateDirs", "true")
        uploaded.insert()

        # root level
        raw = fetch("")
        print(raw)
        root = only_dir(raw)
        assert root.get("name") == "/"
        assert root.get("path") == ""
        assert root.get("url").endswith("/FileSystem/")

        assert len(root.xpath('dir')) == 1
        assert len(root.xpath('file')) == 0

        listed_a = root.xpath('dir')[0]
        assert listed_a.get("name") == "A/"
        assert listed_a.get("url").endswith("/FileSystem/A/")

        # level A/
        level_a = only_dir(fetch("A", ""))

        assert level_a.get("name") == "A/"
        assert level_a.get("path") == "A"
        assert level_a.get("url").endswith("/FileSystem/A/")

        assert len(level_a.xpath('dir')) == 1
        assert len(level_a.xpath('file')) == 0

        listed_b = level_a.xpath('dir')[0]
        assert listed_b.get("name") == "B/"
        assert listed_b.get("url").endswith("/FileSystem/A/B/")

        # level A/B/
        level_b = only_dir(fetch("A", "B", ""))

        assert len(level_b.xpath('dir')) == 1
        assert len(level_b.xpath('file')) == 0

        listed_c = level_b.xpath('dir')[0]
        assert listed_c.get("name") == "C/"
        assert listed_c.get("url").endswith("/FileSystem/A/B/C/")

        # level A/B/C/
        level_c = only_dir(fetch("A", "B", "C", ""))

        assert len(level_c.xpath('dir')) == 0
        assert len(level_c.xpath('file')) == 1

        listed_file = level_c.xpath("file")[0]
        assert listed_file.get("name") == "test.dat"
        assert listed_file.get("url").endswith("/FileSystem/A/B/C/test.dat")
    
    
    @mark.xfail(reason="Needs file system refactoring")
    def test_import_file():
        """Insert the same local file twice with `import_file=True`.

        Only the second file entity is deleted again; the local test folder is
        removed in any case.
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_import_files_in_dir.local") + "testfolder/"

        def new_import():
            """Build a fresh file entity for the imported csv file."""
            return db.File(description="Testfile Desc",
                           import_file=True,
                           path="subfolder/some_data.csv")

        try:
            os.makedirs(path)
            os.makedirs(path + "subfolder/")
            with open(path + "subfolder/some_data.csv", "w") as handle:
                handle.write("hello, world\n")

            first = new_import()
            first.insert()

            second = new_import()
            second.insert()

            second.delete()
        finally:
            # we need to delete this one manually, bc deleting the entity will
            # not delete the file
            try:
                shutil.rmtree(path)
            except BaseException:
                pass
    
    
    @mark.xfail(reason="Needs file system refactoring")
    def test_import_directoy():
        """Prepare a local folder with one csv file and an empty container.

        NOTE(review): nothing is inserted or asserted yet; this looks like a
        placeholder for a directory import test - confirm.
        """
        path = get_config().get("IntegrationTests",
                                "test_files.test_import_files_in_dir.local") + "testfolder/"

        # Bound up front so that the clean-up does not depend on a swallowed
        # NameError when the set-up fails before the container exists.
        c = None
        try:
            os.makedirs(path)
            os.makedirs(path + "subfolder/")
            with open(path + "subfolder/some_data.csv", "w") as test_file1:
                test_file1.write("hello, world\n")

            c = models.Container()
        finally:
            # Best-effort clean-up: problems are reported but never mask the
            # outcome of the test itself.
            if c is not None:
                try:
                    c.delete()
                except Exception as e:
                    print(e)
            shutil.rmtree(path, ignore_errors=True)
    
    
    @mark.xfail(reason="Needs file system refactoring")
    def test_consistency_file_was_modified_2():
        """Check the handling of a file which was really modified on the server.

        This test is different from test_consistency_file_was_modified in that
        the server does actually corrupt the file. The stored checksum must
        stay the old one until the client updates the entity with the checksum
        of the modified file.
        """
        def assert_stored_checksum(expected):
            """Query the file entity and compare its id and checksum."""
            queried = execute_query("FIND FILE TestConsistency3", unique=True)
            assert queried.id == stored.id
            assert queried.checksum.lower() == expected

        def first_message(report, level):
            """Return the text of message 0 of `level`, which must exist."""
            entry = report.messages[level, 0]
            assert entry is not None
            return entry[0]

        # insert new test file
        with open("test.dat", "w") as handle:
            handle.write("hello world\n")
        stored = models.File(
            name="TestConsistency3",
            description="Testfile Desc",
            path="debug/test_file_storage_consistency",
            file="test.dat",
        )
        stored.insert()
        old_checksum = stored.checksum.lower()
        assert_stored_checksum(old_checksum)

        assert stored.id is not None
        assert stored.is_valid()

        # run consistency check (no consistency warning)
        assert first_message(runCheck(None, None),
                             "Info") == "File system is consistent."

        # when in debug mode, the server offers a special option
        #
        #     '-c FILE_WAS_MODIFIED_2'
        #
        # which simulates a modified file.
        assert first_message(
            runCheck(None, "-c FILE_WAS_MODIFIED_2"),
            "Error") == 'debug/test_file_storage_consistency: File was modified.'

        # old checksum is still stored
        assert_stored_checksum(old_checksum)

        # download fails because the checksum changed.
        with raises(db.exceptions.ConsistencyError):
            stored.download(target="test.dat.tmp")

        stored.download(target="test.dat.tmp", check_hash=False)
        assert stored.checksum.lower() == old_checksum
        new_checksum = db.File.sha512("test.dat.tmp").lower()
        stored.checksum = new_checksum
        assert new_checksum != old_checksum

        # old checksum is still stored
        assert_stored_checksum(old_checksum)

        # here the client may validate the new file
        stored.file = None
        stored.update()
        assert new_checksum == stored.checksum.lower()

        # new checksum is stored
        assert_stored_checksum(new_checksum)

        # no ConsistencyError anymore
        stored.download(target="test.dat.tmp")
        assert new_checksum == stored.checksum.lower()

        # new checksum is still stored
        assert_stored_checksum(new_checksum)