Select Git revision
test_file.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_file.py 23.53 KiB
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2019 IndiScale GmbH (info@indiscale.com)
# Copyright (C) 2019 Daniel Hornung (d.hornung@indiscale.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""Created on 18.09.2013.
@author: Timm Fitschen
"""
import os
import shutil
from random import randint
from sys import maxsize as maxint
from lxml import etree
from pytest import raises
from nose.tools import (assert_equal, assert_false, # @UnresolvedImport
assert_is_not_none, assert_raises, assert_true,
nottest)
from caosdb import Info
from caosdb import administration as admin
from caosdb import execute_query, get_config, get_connection
from caosdb.common import models
from caosdb.exceptions import EntityError
from caosdb.utils.checkFileSystemConsistency import runCheck
def setup_module():
    """Module-level setup hook; delegates to ``teardown`` for a clean start."""
    teardown()
def setup():
    """Clean up via ``teardown`` and create the local fixture files.

    Afterwards ``test.dat``, ``testfolder/test1.dat`` and
    ``testfolder/subfolder/test2.dat`` exist, each containing one line.
    """
    teardown()
    payload = "hello world\n"
    with open("test.dat", "w") as handle:
        handle.write(payload)
    os.makedirs("testfolder/subfolder")
    for target in ("testfolder/test1.dat", "testfolder/subfolder/test2.dat"):
        with open(target, "w") as handle:
            handle.write(payload)
def teardown():
    """Delete test entities on the server and remove local fixture files.

    All entities matched by the ``ID >= 100`` query are deleted. Errors while
    removing local files or folders are printed and otherwise ignored.
    """
    leftovers = execute_query("FIND ENTITY WHICH HAS AN ID >= 100")
    if len(leftovers) > 0:
        leftovers.delete()

    # (remover, target) pairs, processed in this order.
    cleanup_steps = (
        (shutil.rmtree, "testfolder"),
        (os.remove, "test.dat"),
        (os.remove, "test2.dat"),
    )
    for remover, target in cleanup_steps:
        try:
            remover(target)
        except Exception as err:
            print(err)
def test_file_with_space():
    """Insert a file whose server path contains spaces and fetch it again."""
    uploaded = models.File(
        name="TestFile",
        description="Testfile Desc",
        path="testfiles/test file with spaces.dat",
        file="test.dat",
    )
    uploaded.insert()

    retrieved = execute_query("FIND FILE TestFile", unique=True)
    assert_equal(retrieved.id, uploaded.id)
    # The retrieved path carries a leading slash.
    assert_equal(retrieved.path, "/testfiles/test file with spaces.dat")
    retrieved.download("test2.dat")
def test_pickup_file():
    """Insert a File entity whose content is picked up from the drop off box.

    If the pickup file cannot be created below the drop off box path, a note
    is printed and the rest of the test is skipped.
    """
    d = models.DropOffBox()
    d.sync()
    try:
        pickup_file = open(os.path.join(d.path, "testpickup.dat"), "w")
    except Exception:
        # Exception instead of BaseException: KeyboardInterrupt and
        # SystemExit must not be mistaken for a missing drop off box.
        print("drop off box not on this system.")
        return

    # The context manager closes the handle even if the write fails.
    with pickup_file:
        pickup_file.write("hello world\n")

    file_ = models.File(name="PickupTestfile",
                        description="Pickup test file desc",
                        path="testfiles/pickuptestfile.dat" +
                        hex(randint(0, maxint)),
                        pickup="testpickup.dat")
    file_.insert()
    assert_is_not_none(file_.id)
def test_pickup_folder():
    """Insert a File entity that picks up a whole folder from the drop off box.

    If ``testfolder`` cannot be created below the drop off box path, a note is
    printed and the rest of the test is skipped.
    """
    d = models.DropOffBox()
    d.sync()
    try:
        os.mkdir(d.path + "/testfolder")
    except Exception:
        # Exception instead of BaseException: KeyboardInterrupt and
        # SystemExit must not be mistaken for a missing drop off box.
        print("drop off box not on this system.")
        return

    os.mkdir(d.path + "/testfolder/subfolder")
    # Context managers close the handles even if a write fails.
    with open(d.path + "/testfolder/testpickup1.dat", "w") as pickup_file:
        pickup_file.write("hello world\n")
    with open(d.path + "/testfolder/subfolder/testpickup2.dat",
              "w") as pickup_file:
        pickup_file.write("hello world\n")

    file_ = models.File(name="PickupTestfolder",
                        description="Pickup test folder desc",
                        path="testfiles/pickuptestfolder" +
                        hex(randint(0, maxint)) + "/",
                        pickup="testfolder/")
    file_.insert()
def test_file4():
    """Insert two picked-up files in one container and check they get ids.

    If the first pickup file cannot be created below the drop off box path, a
    note is printed and nothing is inserted.
    """
    # Bound up front so the cleanup never touches an undefined name.
    c = None
    try:
        d = models.DropOffBox()
        d.sync()
        try:
            pickup_file = open(d.path + "/testpickup1.dat", "w")
        except Exception:
            print("drop off box not on this system.")
        else:
            with pickup_file:
                pickup_file.write("hello world\n")
            with open(d.path + "/testpickup2.dat", "w") as pickup_file:
                pickup_file.write("hello world\n")

            file1_ = models.File(
                name="Testfile1", description="Testfile Desc",
                path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
                pickup="testpickup1.dat")
            file2_ = models.File(
                name="Testfile2", description="Testfile Desc",
                path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
                pickup="testpickup2.dat")
            c = models.Container()
            c.extend([file1_, file2_])
            c.insert()
            assert_is_not_none(file1_.id)
            assert_is_not_none(file2_.id)
    finally:
        if c is not None:
            try:
                c.delete()
            except Exception:
                # Best-effort cleanup; a failed delete must not hide the
                # actual test result.
                pass
def test_upload_complete_folder():
    """Upload two single files, then a whole local folder, via containers.

    Each container is deleted again after the ids have been checked.
    """
    first = models.File(
        name="Testfile1",
        description="Testfile Desc",
        path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
        file="test.dat",
    )
    second = models.File(
        name="Testfile2",
        description="Testfile Desc",
        path="testfiles/testfile2.dat" + hex(randint(0, maxint)),
        file="test.dat",
    )
    file_batch = models.Container()
    file_batch.extend([first, second])
    file_batch.insert()
    assert_is_not_none(first.id)
    assert_is_not_none(second.id)
    file_batch.delete()

    # Second round: the local directory "testfolder" is given as the file.
    folder_batch = models.Container()
    folder = models.File(
        name="Testfolder",
        description="Testfolder Desc",
        path="testfiles/testfolder" + hex(randint(0, maxint)),
        file="testfolder",
    )
    folder_batch.append(folder)
    folder_batch.insert()
    assert_is_not_none(folder.id)
    folder_batch.delete()
def test_file6():
    """Picking up a folder into an already existing target path must fail.

    A file is first uploaded below ``testfiles2/``. Then a folder from the
    drop off box is inserted with target path ``testfiles2/`` and the test
    expects an ``EntityError`` whose first error description is
    'This target path does already exist.'. The pickup part is skipped with a
    printed note when the drop off box path is not a local directory.
    """
    # Bound up front so the cleanup never touches an undefined name.
    file_ = None
    folder_ = None
    path = None
    try:
        # upload file to testfiles2/testfile...
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="Testfidb.dble",
                            description="Testfile Desc",
                            path="testfiles2/testfile" +
                            hex(randint(0, maxint)) + ".dat",
                            file="test.dat")
        file_.insert()
        assert file_.id is not None
        assert file_.is_valid()

        # pickup_folder
        # and try to store it to testfiles2/
        d = models.DropOffBox()
        d.sync()
        path = d.path + "/testfolder"

        if not os.path.isdir(d.path):
            print("drop off box not on this system.")
        else:
            os.mkdir(path)
            with open(path + "/testpickup1.dat", "w") as pickup_file:
                pickup_file.write("hello world\n")
            os.mkdir(path + "/subfolder")
            with open(path + "/subfolder/testpickup2.dat",
                      "w") as pickup_file:
                pickup_file.write("hello world\n")

            folder_ = models.File(
                name="PickupTestfolder",
                description="Pickup test folder desc",
                path="testfiles2/",
                pickup=path)

            with raises(EntityError) as cm:
                folder_.insert()
            errors = cm.value.entity.get_errors()
            assert errors[0].description == 'This target path does already exist.'
    finally:
        # Best-effort cleanup: delete whatever was created, ignore failures.
        for entity in (folder_, file_):
            if entity is None:
                continue
            try:
                entity.delete()
            except Exception:
                pass
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
def test_file7():
    """Picking up a folder into a target path with existing content must fail.

    A file is first uploaded below ``testfiles2/testsub/``. Then a folder from
    the drop off box is inserted with target path ``testfiles2/`` and the test
    expects an ``EntityError``. The pickup part is skipped with a printed note
    when the drop off box path is not a local directory.
    """
    # Bound up front so the cleanup never touches an undefined name.
    file_ = None
    folder_ = None
    path = None
    try:
        # upload file to testfiles2/testsub/testfile...
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="Testfidb.dble",
                            description="Testfile Desc",
                            path="testfiles2/testsub/testfile" +
                            hex(randint(0, maxint)) + ".dat",
                            file="test.dat")
        file_.insert()
        assert_is_not_none(file_.id)
        assert_true(file_.is_valid())

        # pickup_folder
        # and try to store it to testfiles2/
        d = models.DropOffBox()
        d.sync()
        path = d.path + "/testfolder"

        if not os.path.isdir(d.path):
            print("drop off box not on this system.")
        else:
            os.mkdir(path)
            with open(path + "/testpickup1.dat", "w") as pickup_file:
                pickup_file.write("hello world\n")
            os.mkdir(path + "/subfolder")
            with open(path + "/subfolder/testpickup2.dat",
                      "w") as pickup_file:
                pickup_file.write("hello world\n")

            # Pass the prepared folder itself (as test_file6 does), not the
            # literal string "path"; otherwise the EntityError may stem from
            # a pickup source that does not exist.
            folder_ = models.File(
                name="PickupTestfolder",
                description="Pickup test folder desc",
                path="testfiles2/",
                pickup=path)

            assert_raises(EntityError, folder_.insert)
    finally:
        # Best-effort cleanup: delete whatever was created, ignore failures.
        for entity in (folder_, file_):
            if entity is None:
                continue
            try:
                entity.delete()
            except Exception:
                pass
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
def test_consistency_file_was_modified():
    """Check that a simulated file modification is reported as an error.

    The consistency check is run before, during and after the simulated
    breach; the stored file is downloaded in between to confirm that its
    content is unchanged.
    """
    # Bound up front so the cleanup never touches an undefined name.
    file_ = None
    try:
        # insert new test file
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="TestConsistency1",
                            description="Testfile Desc",
                            path="debug/test_file_storage_consistency",
                            file="test.dat")
        file_.insert()
        assert file_.id is not None
        assert file_.is_valid()

        # run consistency check (no consistency warning)
        c = runCheck(None, None)
        assert c.messages["Info", 0] is not None
        assert c.messages["Info", 0][0] == "File system is consistent."

        # when in debug mode, the server offers a special option
        #
        # '-c FILE_WAS_MODIFIED'
        #
        # which simulates a modified file.
        c = runCheck(None, "-c FILE_WAS_MODIFIED")
        assert c.messages["Error", 0] is not None
        assert c.messages["Error",
                          0][0] == 'debug/test_file_storage_consistency: File was modified.'

        # download the file again and check that it is still the same (just
        # to be sure that the server only simulated the consistency breach)
        with open(file_.download(target="test.dat.tmp"), "r") as d:
            r = d.read()
        assert r == "hello world\n"

        # run a passing check again
        c = runCheck(None, None)
        assert c.messages["Info", 0] is not None
        assert c.messages["Info", 0][0] == "File system is consistent."
    finally:
        # clean up file record (best effort)
        if file_ is not None:
            try:
                file_.delete()
            except Exception:
                pass

        # clean up local files (best effort)
        for leftover in ("test.dat", "test.dat.tmp"):
            try:
                os.remove(leftover)
            except OSError:
                pass
def test_consistency_file_does_not_exist():
    """Check that a simulated missing file is reported as an error.

    The consistency check is run before, during and after the simulated
    breach; the stored file is downloaded in between to confirm that it is
    still retrievable with unchanged content.
    """
    # Bound up front so the cleanup never touches an undefined name.
    file_ = None
    try:
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="TestConsistency1",
                            description="Testfile Desc",
                            path="debug/test_file_storage_consistency",
                            file="test.dat")
        file_.insert()
        assert file_.id is not None
        assert file_.is_valid()

        c = runCheck(None, "/debug/")
        assert c.messages["Info", 0] is not None
        assert c.messages["Info",
                          0][0] == "File system below debug/ is consistent."

        # when in debug mode, the server offers a special option
        #
        # '-c FILE_DOES_NOT_EXIST'
        #
        # which simulates an accidentally removed file.
        c = runCheck(None, "-c FILE_DOES_NOT_EXIST")
        assert c.messages["Error", 0] is not None
        assert c.messages["Error",
                          0][0] == 'debug/test_file_storage_consistency: File does not exist.'

        # The with-block closes the download handle; no extra close needed.
        with open(file_.download(target="test.dat.tmp"), "r") as d:
            r = d.read()
        assert r == "hello world\n"

        c = runCheck(None, None)
        assert c.messages["Info", 0] is not None
        assert c.messages["Info", 0][0] == "File system is consistent."
    finally:
        # clean up file record (best effort)
        if file_ is not None:
            try:
                file_.delete()
            except Exception:
                pass

        # clean up local files (best effort)
        for leftover in ("test.dat", "test.dat.tmp"):
            try:
                os.remove(leftover)
            except OSError:
                pass
def test_consistency_unknown_file():
    """Check that a simulated unknown file is reported as a warning.

    A plain consistency check is expected to pass both before and after the
    simulated run.
    """
    before = runCheck(None, None)
    assert before.messages["Info", 0] is not None
    assert before.messages["Info", 0][0] == "File system is consistent."

    # In debug mode the server offers the special option '-c UNKNOWN_FILE',
    # which simulates an unknown file.
    simulated = runCheck(None, "-c UNKNOWN_FILE")
    assert simulated.messages["Warning", 0] is not None
    assert simulated.messages["Warning", 0][0] == 'debug/: Unknown file.'

    after = runCheck(None, None)
    assert after.messages["Info", 0] is not None
    assert after.messages["Info", 0][0] == "File system is consistent."
def test_insert_files_in_dir_error1():
    """Check the error messages of InsertFilesInDir for unusable directories.

    Covered cases: a directory given as ``/root``, a directory named
    ``non-existent`` (both retrieve and insert) and the directory ``.``.
    """
    def expect_error(container, prefix):
        # The first error message must exist and start with ``prefix``.
        assert_is_not_none(container.messages["Error", 0])
        assert_true(container.messages["Error", 0][0].startswith(prefix))

    unreadable = models.Container()
    unreadable.retrieve(unique=False, raise_exception_on_error=False,
                        flags={"InsertFilesInDir": "/root"})
    # If this fails, it is likely that the server runs with root permissions
    # which is discouraged.
    expect_error(unreadable, "Cannot read or enter the desired directory.")

    # The same container is used for the retrieve and the insert call.
    missing = models.Container()
    missing.retrieve(unique=False, raise_exception_on_error=False,
                     flags={"InsertFilesInDir": "non-existent"})
    expect_error(missing, "No such directory")
    missing.insert(raise_exception_on_error=False,
                   flags={"InsertFilesInDir": "non-existent"})
    expect_error(missing, "No such directory")

    disallowed = models.Container()
    disallowed.retrieve(unique=False, raise_exception_on_error=False,
                        flags={"InsertFilesInDir": "."})
    expect_error(disallowed, "Dir is not allowed")
def test_insert_files_in_dir_with_symlink():
    """Check InsertFilesInDir on a symlinked directory.

    Without ``--force-allow-symlinks`` the test expects a warning about the
    symbolic link; with the option, the linked file is expected to be listed
    (retrieve) and inserted (insert).
    """
    path = get_config().get("IntegrationTests",
                            "test_files.test_insert_files_in_dir.local") + "testfolder/"
    path_on_server = get_config().get("IntegrationTests",
                                      "test_files.test_insert_files_in_dir.server") + "testfolder/"
    # Bound up front so the cleanup never touches an undefined name.
    c = None
    d = None
    try:
        # create file in a server-readable directory
        os.makedirs(path)
        path = os.path.realpath(path) + "/"
        os.makedirs(path + "subfolder/")
        with open(path + "subfolder/test2.dat", "w") as test_file:
            test_file.write("hello world2\n")

        # create a symlink to the directory where the test_file resides
        os.symlink(path_on_server + "subfolder/", path + "linked_subfolder")

        # call insertFilesInDir job
        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server +
                "linked_subfolder"})
        assert_is_not_none(c.messages["Warning", 6])
        assert_equal(c.messages["Warning", 6][0],
                     "Directory or file is symbolic link: " + path_on_server +
                     "linked_subfolder")

        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": "--force-allow-symlinks " +
                path_on_server +
                "linked_subfolder"})
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0],
                     "Files count in linked_subfolder/: 1")
        assert_equal(c[0].name, "test2.dat")
        assert_equal(c[0].path, "/linked_subfolder/test2.dat")

        d = models.Container()
        d.insert(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": "--force-allow-symlinks " +
                path_on_server +
                "linked_subfolder"})
        # Check the container that was just inserted (d), not the earlier
        # retrieve container (c).
        assert_is_not_none(d.messages["Info", 0])
        assert_equal(d.messages["Info", 0][0],
                     "Files count in linked_subfolder/: 1")
        assert_true(d[0].is_valid())
        assert_equal(d[0].name, "test2.dat")
        assert_equal(d[0].path, "/linked_subfolder/test2.dat")
    finally:
        # Best-effort cleanup: delete whatever was created, ignore failures.
        for container in (d, c):
            if container is None:
                continue
            try:
                container.delete()
            except Exception:
                pass
        shutil.rmtree(path, ignore_errors=True)
def test_insert_files_in_dir():
    """Test if files in directories can be inserted via InsertFilesInDir.

    This test also verifies that the job can be executed again for the same
    directory after a new file has been added, and that only the new file is
    returned and inserted then.
    """
    path = get_config().get("IntegrationTests",
                            "test_files.test_insert_files_in_dir.local") + "testfolder/"
    path_on_server = get_config().get("IntegrationTests",
                                      "test_files.test_insert_files_in_dir.server") + "testfolder/"
    # Bound up front so the cleanup never touches an undefined name.
    c = None
    d = None
    f = None
    try:
        os.makedirs(path)
        os.makedirs(path + "subfolder/")
        with open(path + "subfolder/test.dat", "w") as test_file1:
            test_file1.write("hello world\n")
        with open(path + "subfolder/test2.dat", "w") as test_file2:
            test_file2.write("hello world2\n")

        # DRY-RUN
        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0],
                     "Files count in testfolder/: 2")

        # ACTUAL RUN
        d = models.Container()
        d.insert(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        # Check the container that was just inserted (d), not the dry-run
        # container (c).
        assert_is_not_none(d.messages["Info", 0])
        assert_equal(d.messages["Info", 0][0],
                     "Files count in testfolder/: 2")
        assert_true(d[0].is_valid())

        # create a new file and call insertFilesInDir again
        # (path already ends with a slash)
        with open(path + "test3.dat", "w") as test_file3:
            test_file3.write("hello world3\n")

        e = models.Container()
        e.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        assert_is_not_none(e.messages["Info", 0])
        assert_equal(e.messages["Info", 0][0],
                     "Files count in testfolder/: 3")

        # only the new file is given back...
        assert_equal(1, len(e))
        assert_false(e[0].is_valid())
        assert_equal(e[0].name, "test3.dat")
        assert_equal(e[0].path, "/testfolder/test3.dat")

        f = models.Container()
        f.insert(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        assert_is_not_none(f.messages["Info", 0])
        assert_equal(f.messages["Info", 0][0],
                     "Files count in testfolder/: 3")

        # only the new file is given back...
        assert_equal(1, len(f))
        assert_true(f[0].is_valid())
        assert_equal(f[0].name, "test3.dat")
        assert_equal(f[0].path, "/testfolder/test3.dat")
    finally:
        # Best-effort cleanup: delete whatever was created, ignore failures.
        for container in (f, d, c):
            if container is None:
                continue
            try:
                container.delete()
            except Exception:
                pass
        shutil.rmtree(path, ignore_errors=True)
def test_insert_files_in_dir_regex():
    """Check that the ``-e`` option of InsertFilesInDir excludes files.

    Two files are created; the one whose name contains ``dont_insert`` is
    expected to be reported as explicitly excluded, the other one is expected
    to be the only entity in the returned container.
    """
    path = get_config().get("IntegrationTests",
                            "test_files.test_insert_files_in_dir.local") + "testfolder/"
    path_on_server = get_config().get("IntegrationTests",
                                      "test_files.test_insert_files_in_dir.server") + "testfolder/"
    # Bound up front so the cleanup never touches an undefined name.
    c = None
    try:
        os.makedirs(path)
        os.makedirs(path + "subfolder/")
        with open(path + "subfolder/test_dont_insert.dat", "w") as test_file1:
            test_file1.write("hello world\n")
        with open(path + "subfolder/test_insert.dat", "w") as test_file2:
            test_file2.write("hello world2\n")

        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": "-e dont_insert " +
                path_on_server})
        assert c.messages["Warning", 2] is not None
        assert c.messages["Warning", 2][0] == "Explicitly excluded file: {}".format(
            path_on_server + "subfolder/test_dont_insert.dat")

        # only the file that is not excluded is returned
        assert len(c) == 1
        assert c[0].name == "test_insert.dat"
    finally:
        # Best-effort cleanup: ignore failures.
        if c is not None:
            try:
                c.delete()
            except Exception:
                pass
        shutil.rmtree(path, ignore_errors=True)
def test_thumbnails():
    """Insert a file with a thumbnail and check the FileSystem listing.

    The ``thumbnail`` attribute of the first listed file must end with the
    expected thumbnail path.
    """
    with_thumbnail = models.File(
        name="TestFile",
        description="Testfile Desc",
        path="testfiles/thumbnails_test.dat",
        file="test.dat",
        thumbnail="testfolder/test1.dat",
    )
    with_thumbnail.insert()

    response = get_connection().retrieve(
        entity_uri_segments=["FileSystem", "testfiles"],
        reconnect=True)
    body = response.read()
    print(body)

    listing = etree.fromstring(body)
    first_file = listing.xpath('/Response/dir/file')[0]
    thumbnail_uri = first_file.get("thumbnail")
    # The compared suffix is 41 characters long.
    assert thumbnail_uri[-41:] == "/Thumbnails/testfiles/thumbnails_test.dat"