Skip to content
Snippets Groups Projects
Select Git revision
  • 03b1d0aab27e4d20e824f9b3c2de307c5145bca2
  • main default protected
  • dev
  • f-unmod
  • f-checkidentical
  • f-simple-breakpoint
  • f-new-debug-tree
  • f-existing-file-id
  • f-no-ident
  • f-collect-problems
  • f-refactor-debug-tree
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.1
  • v0.10.0
  • v0.9.1
  • v0.9.0
  • v0.8.0
  • v0.7.1
  • v0.7.0
  • v0.6.0
  • v0.5.0
  • v0.4.0
  • v0.3.0
  • v0.2.0
  • v0.1.0
27 results

test_identifiable_adapters.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_file.py 11.16 KiB
    # encoding: utf-8
    #
    # ** header v3.0
    # This file is a part of the CaosDB Project.
    #
    # Copyright (C) 2018 Research Group Biomedical Physics,
    # Max-Planck-Institute for Dynamics and Self-Organization Göttingen
    # Copyright (C) 2019-2021 IndiScale GmbH <info@indiscale.com>
    # Copyright (C) 2019 Daniel Hornung <d.hornung@indiscale.com>
    # Copyright (C) 2020-2021 Timm Fitschen <t.fitschen@indiscale.com>
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    #
    # ** end header
    #
    """Created on 18.09.2013.
    
    @author: Timm Fitschen
    """
    import os
    import shutil
    
    from lxml import etree
    from pytest import raises, mark
    import caosdb as db
    from caosdb import execute_query, get_config, get_connection
    from caosdb.common import models
    from caosdb.utils.checkFileSystemConsistency import runCheck
    
    
    def setup_module():
        """Run the per-test cleanup once before any test of this module."""
        teardown_function(function=None)
    
    
    def setup_function(function):
        """Clean up, then create the local fixture files used by the tests.

        Writes ``test.dat`` into the working directory and builds a
        ``testfolder`` tree containing ``test1.dat`` and
        ``subfolder/test2.dat``. Every file holds the single line
        ``hello world``.
        """
        teardown_function(function)
        content = "hello world\n"
        with open("test.dat", "w") as handle:
            handle.write(content)
        os.makedirs("testfolder/subfolder")
        for fixture_path in ("testfolder/test1.dat",
                             "testfolder/subfolder/test2.dat"):
            with open(fixture_path, "w") as handle:
                handle.write(content)
    
    
    def teardown_function(function):
        """Delete test entities and remove every local artefact of a test.

        All entities with an ID >= 100 are deleted from the server. Afterwards
        the test folders and files are removed on a best-effort basis: a
        failing removal is printed and does not stop the remaining cleanup.
        """
        stale = execute_query("FIND ENTITY WHICH HAS AN ID >= 100")
        if len(stale) > 0:
            stale.delete()

        # Folder below the directory configured for the insert-files-in-dir
        # test; the config lookup stays inside the ``try`` so that a missing
        # option is only printed, like any other cleanup failure.
        try:
            configured_dir = get_config().get(
                "IntegrationTests",
                "test_files.test_insert_files_in_dir.local")
            shutil.rmtree(configured_dir + "testfolder/")
        except Exception as err:
            print(err)

        # Folder created in the working directory by ``setup_function``.
        try:
            shutil.rmtree("testfolder")
        except Exception as err:
            print(err)

        # Single files written or downloaded by the tests.
        for leftover in ("test.dat", "test2.dat", "test.dat.tmp"):
            try:
                os.remove(leftover)
            except Exception as err:
                print(err)
    
    
    def test_file_with_space():
        """Insert, query and download a file whose path contains spaces."""
        uploaded = models.File(
            name="TestFile",
            description="Testfile Desc",
            path="testfiles/test file with spaces.dat",
            file="test.dat",
        )
        uploaded.insert()

        found = execute_query("FIND FILE TestFile", unique=True)
        assert found.id == uploaded.id
        # The queried entity carries the path with a leading slash.
        assert found.path == "/testfiles/test file with spaces.dat"

        # Only the download call itself is exercised; the downloaded content
        # is not compared here.
        found.download("test2.dat")
    
    
    def test_consistency_file_does_not_exist():
        """The consistency check reports a simulated missing file."""
        with open("test.dat", "w") as handle:
            handle.write("hello world\n")
        entity = models.File(
            name="TestConsistency1",
            description="Testfile Desc",
            path="debug/test_file_storage_consistency",
            file="test.dat",
        )
        entity.insert()

        assert entity.id is not None
        assert entity.is_valid()

        # Check restricted to the debug/ location: nothing to complain about.
        report = runCheck(None, "/debug/")
        assert report.messages["Info", 0] is not None
        assert report.messages["Info", 0][0] == (
            "File system below debug/ is consistent.")

        # In debug mode the server offers the special option
        #
        #     '-c FILE_DOES_NOT_EXIST'
        #
        # which simulates an accidentally removed file.
        report = runCheck(None, "-c FILE_DOES_NOT_EXIST")
        assert report.messages["Error", 0] is not None
        assert report.messages["Error", 0][0] == (
            'MISSING - DEFAULT:debug/test_file_storage_consistency')

        # The file can still be downloaded with its original content.
        downloaded_path = entity.download(target="test.dat.tmp")
        with open(downloaded_path, "r") as handle:
            content = handle.read()
        assert content == "hello world\n"

        # An unrestricted check passes again.
        report = runCheck(None, None)
        assert report.messages["Info", 0] is not None
        assert report.messages["Info", 0][0] == "File system is consistent."
    
    
    def test_consistency_file_was_modified():
        """The consistency check reports a simulated hash change."""
        # Upload a fresh test file.
        with open("test.dat", "w") as handle:
            handle.write("hello world\n")
        entity = models.File(
            name="TestConsistency1",
            description="Testfile Desc",
            path="debug/test_file_storage_consistency",
            file="test.dat",
        )
        entity.insert()

        assert entity.id is not None
        assert entity.is_valid()

        # Baseline: the check must not complain yet.
        report = runCheck(None, None)
        assert report.messages["Info", 0] is not None
        assert report.messages["Info", 0][0] == "File system is consistent."

        # In debug mode the server offers the special option
        #
        #     '-c FILE_WAS_MODIFIED'
        #
        # which simulates a modified file.
        report = runCheck(None, "-c FILE_WAS_MODIFIED")

        assert report.messages["Error", 0] is not None
        assert report.messages["Error", 0][0] == (
            'CHANGED:HASH - DEFAULT:debug/test_file_storage_consistency')

        # Download the file again and compare its content, to be sure the
        # server only simulated the consistency breach.
        with open(entity.download(target="test.dat.tmp"), "r") as handle:
            content = handle.read()
            assert content == "hello world\n"

        # A regular check passes again afterwards.
        report = runCheck(None, None)
        assert report.messages["Info", 0] is not None
        assert report.messages["Info", 0][0] == "File system is consistent."
    
    
    def test_consistency_unknown_file():
        """The consistency check warns about a simulated unknown file."""
        # Baseline: the file system is reported as consistent.
        report = runCheck(None, None)
        assert report.messages["Info", 0] is not None
        assert report.messages["Info", 0][0] == "File system is consistent."

        # In debug mode the server offers the special option
        #
        #     '-c UNKNOWN_FILE'
        #
        # which simulates an unknown file.
        report = runCheck(None, "-c UNKNOWN_FILE")
        assert report.messages["Warning", 0] is not None
        assert report.messages["Warning", 0][0] == (
            'UNKNOWN - DEFAULT:test_unknown')

        # A regular check passes again afterwards.
        report = runCheck(None, None)
        assert report.messages["Info", 0] is not None
        assert report.messages["Info", 0][0] == "File system is consistent."
    
    
    def _retrieve_dir_listing(*segments, echo=False):
        """Fetch the ``FileSystem`` listing of a directory and parse it.

        ``segments`` are the directory names below the root, in order; no
        segments means the root itself. A trailing empty segment is always
        appended to the request. When ``echo`` is true the raw response body
        is printed before it is parsed, so it is still visible if parsing
        fails.

        Asserts that the response holds exactly one ``/Response/dir`` element
        and returns that element.
        """
        body = get_connection().retrieve(
            entity_uri_segments=["FileSystem"] + list(segments) + [""],
            reconnect=True).read()
        if echo:
            print(body)
        xml = etree.fromstring(body)
        listings = xml.xpath('/Response/dir')
        assert len(listings) == 1
        return listings[0]


    def test_auto_create_parent_dirs():
        """Check the directory listings after inserting ``A/B/C/test.dat``.

        Walks the ``FileSystem`` resource level by level (root, A, A/B, A/B/C)
        and asserts that each level contains exactly the next directory, and
        the last one exactly the inserted file. URLs are compared by suffix
        only, because the part in front of ``/FileSystem/`` depends on the
        server the test runs against.
        """
        file_ = models.File(name="TestFile",
                            path="A/B/C/test.dat",
                            file="test.dat")
        file_.insert()

        # Root level: only the directory "A".
        root = _retrieve_dir_listing(echo=True)
        assert root.get("name") == "/"
        assert root.get("path") == "/"
        assert root.get("url").endswith("/FileSystem/")

        assert len(root.xpath('dir')) == 1
        assert len(root.xpath('file')) == 0

        dir_a = root.xpath('dir')[0]
        assert dir_a.get("name") == "A"
        assert dir_a.get("url").endswith("/FileSystem/A/")

        # Level A: only the directory "B".
        dir_a = _retrieve_dir_listing("A")

        assert dir_a.get("name") == "A"
        assert dir_a.get("url").endswith("/FileSystem/A/")

        assert len(dir_a.xpath('dir')) == 1
        assert len(dir_a.xpath('file')) == 0

        dir_b = dir_a.xpath('dir')[0]
        assert dir_b.get("name") == "B"
        assert dir_b.get("url").endswith("/FileSystem/A/B/")

        # Level A/B: only the directory "C".
        dir_b = _retrieve_dir_listing("A", "B")

        assert len(dir_b.xpath('dir')) == 1
        assert len(dir_b.xpath('file')) == 0

        dir_c = dir_b.xpath('dir')[0]
        assert dir_c.get("name") == "C"
        assert dir_c.get("url").endswith("/FileSystem/A/B/C/")

        # Level A/B/C: only the inserted file.
        dir_c = _retrieve_dir_listing("A", "B", "C")

        assert len(dir_c.xpath('dir')) == 0
        assert len(dir_c.xpath('file')) == 1

        file_1 = dir_c.xpath("file")[0]
        assert file_1.get("name") == "test.dat"
        assert file_1.get("url").endswith("/FileSystem/A/B/C/test.dat")
    
    
    def test_consistency_file_was_modified_2():
        """Recover from a file that the server actually corrupted.

        Unlike ``test_consistency_file_was_modified``, the server does corrupt
        the stored file here. The test checks that

        * the consistency check reports the hash change,
        * the stored checksum stays the old one until the entity is updated,
        * a download with hash checking raises ``ConsistencyError``,
        * after updating the entity with the checksum of the corrupted
          content, the new checksum is stored and the download succeeds.
        """
        # insert new test file
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="TestConsistency3",
                            description="Testfile Desc",
                            path="debug/test_file_storage_consistency",
                            file="test.dat")
        file_.insert()
        checksum1 = file_.checksum.lower()
        qfile = execute_query("FIND FILE TestConsistency3", unique=True)
        assert qfile.id == file_.id
        assert qfile.checksum.lower() == checksum1

        assert file_.id is not None
        assert file_.is_valid()

        # run consistency check (no consistency warning)
        c = runCheck(None, None)
        assert c.messages["Info", 0] is not None
        assert c.messages["Info", 0][0] == "File system is consistent."

        # when in debug mode, the server offers a special option
        #
        #     '-c FILE_WAS_MODIFIED_2'
        #
        # which, unlike '-c FILE_WAS_MODIFIED', really modifies the stored
        # file instead of only simulating the change.
        c = runCheck(None, "-c FILE_WAS_MODIFIED_2")

        assert c.messages["Error", 0] is not None
        assert c.messages["Error", 0][0] == (
            'CHANGED:HASH - DEFAULT:debug/test_file_storage_consistency')

        # old checksum is still stored
        qfile = execute_query("FIND FILE TestConsistency3", unique=True)
        assert qfile.id == file_.id
        assert qfile.checksum.lower() == checksum1

        # download fails because the checksum changed.
        with raises(db.exceptions.ConsistencyError):
            file_.download(target="test.dat.tmp")

        # without the hash check the corrupted content can be fetched; the
        # entity itself still carries the old checksum afterwards
        file_.download(target="test.dat.tmp", check_hash=False)
        assert file_.checksum.lower() == checksum1
        checksum2 = db.File._get_checksum("test.dat.tmp").lower()
        file_.checksum = checksum2
        assert checksum2 != checksum1

        # old checksum is still stored (nothing was sent to the server yet)
        qfile = execute_query("FIND FILE TestConsistency3", unique=True)
        assert qfile.id == file_.id
        assert qfile.checksum.lower() == checksum1

        # here the client may validate the new file
        file_.file = None
        file_.size = None
        file_.update()
        assert checksum2 == file_.checksum.lower()

        # new checksum is stored
        qfile = execute_query("FIND FILE TestConsistency3", unique=True)
        assert qfile.id == file_.id
        assert qfile.checksum.lower() == checksum2

        # no ConsistencyError anymore
        file_.download(target="test.dat.tmp")
        assert checksum2 == file_.checksum.lower()

        # new checksum is still stored
        qfile = execute_query("FIND FILE TestConsistency3", unique=True)
        assert qfile.id == file_.id
        assert qfile.checksum.lower() == checksum2