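# NOTE: The lines below are residue of the GitLab web view this file was
# copied from; they are not part of the original source file.
#   Select Git revision
#   test_table_converter.py
#   Henrik tom Wörden authored
#   Code owners: Assign users and groups as approvers for specific file
#   changes.
#   test_file.py 22.65 KiB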
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the CaosDB Project.
#
# Copyright (C) 2018 Research Group Biomedical Physics,
# Max-Planck-Institute for Dynamics and Self-Organization Göttingen
# Copyright (C) 2019 IndiScale GmbH (info@indiscale.com)
# Copyright (C) 2019 Daniel Hornung (d.hornung@indiscale.com)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header
#
"""Created on 18.09.2013.
@author: Timm Fitschen
"""
import os
import shutil
from random import randint
from sys import maxsize as maxint
from lxml import etree
from nose.tools import (assert_equal, assert_false, # @UnresolvedImport
assert_is_not_none, assert_raises, assert_true,
nottest, with_setup)
from caosdb import execute_query, get_config, get_connection, Info
from caosdb.common import models
from caosdb.exceptions import EntityError
from caosdb.utils.checkFileSystemConsistency import runCheck
from caosdb import administration as admin
def setup_module():
    """Run ``teardown`` once before the module's tests start."""
    teardown()
def setup():
    """Create the local fixture files used by the tests.

    Writes ``test.dat``, ``testfolder/test1.dat`` and
    ``testfolder/subfolder/test2.dat`` relative to the current working
    directory.  Every file contains the single line ``hello world``.

    Raises:
        OSError: if ``testfolder/subfolder`` already exists or a file cannot
            be written.
    """
    fixture_content = "hello world\n"

    # Context managers close the handle even if the write fails.
    with open("test.dat", "w") as upload_file:
        upload_file.write(fixture_content)

    os.makedirs("testfolder/subfolder")
    for fixture_path in ("testfolder/test1.dat",
                         "testfolder/subfolder/test2.dat"):
        with open(fixture_path, "w") as upload_file:
            upload_file.write(fixture_content)
def teardown():
    """Remove server entities and local files left behind by a test.

    Each cleanup step is attempted independently.  A failing step only prints
    its exception, so the remaining steps still run.
    """
    cleanup_steps = (
        lambda: execute_query("FIND ENTITY WHICH HAS AN ID >= 100").delete(),
        lambda: shutil.rmtree("testfolder"),
        lambda: os.remove("test.dat"),
        lambda: os.remove("test2.dat"),
    )
    for step in cleanup_steps:
        try:
            step()
        except Exception as err:
            print(err)
@with_setup(setup, teardown)
def test_file_with_space():
    """Insert a file whose server path contains spaces and fetch it back."""
    uploaded = models.File(
        name="TestFile",
        description="Testfile Desc",
        path="testfiles/test file with spaces.dat",
        file="test.dat",
    )
    uploaded.insert()

    # The entity must be found by name and report its path with a leading "/".
    retrieved = execute_query("FIND FILE TestFile", unique=True)
    assert_equal(retrieved.id, uploaded.id)
    assert_equal(retrieved.path, "/testfiles/test file with spaces.dat")

    # teardown() tries to remove the downloaded test2.dat again.
    retrieved.download("test2.dat")
@with_setup(setup, teardown)
def test_pickup_file():
    """Insert a file that is picked up from the drop off box.

    A file is written into the directory reported by ``DropOffBox.path`` and
    then inserted through the ``pickup`` argument of ``models.File``.  If the
    file cannot be created, a notice is printed and nothing is asserted.
    """
    file_ = None  # only bound once the pickup file could be written
    try:
        d = models.DropOffBox()
        d.sync()
        try:
            pickup_file = open(os.path.join(d.path, "testpickup.dat"), "w")
        except Exception:
            # Any failure to create the file is treated as "no drop off box
            # here".  Catching Exception instead of BaseException lets
            # KeyboardInterrupt and SystemExit propagate.
            print("drop off box not on this system.")
        else:
            # Close the handle even if the write fails.
            with pickup_file:
                pickup_file.write("hello world\n")
            file_ = models.File(name="PickupTestfile",
                                description="Pickup test file desc",
                                path="testfiles/pickuptestfile.dat" +
                                hex(randint(0, maxint)),
                                pickup="testpickup.dat")
            file_.insert()
            assert_is_not_none(file_.id)
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.
        if file_ is not None:
            try:
                file_.delete()
            except Exception:
                pass
@with_setup(setup, teardown)
def test_pickup_folder():
    """Insert a whole folder that is picked up from the drop off box.

    A ``testfolder`` with one file and a ``subfolder`` containing a second
    file is created below ``DropOffBox.path`` and inserted through the
    ``pickup`` argument of ``models.File``.  If the folder cannot be created,
    a notice is printed and nothing is inserted.
    """
    file_ = None  # only bound once the pickup folder could be created
    try:
        # pickup_folder
        d = models.DropOffBox()
        d.sync()
        try:
            os.mkdir(d.path + "/testfolder")
        except Exception:
            # Any failure to create the folder is treated as "no drop off box
            # here".  Catching Exception instead of BaseException lets
            # KeyboardInterrupt and SystemExit propagate.
            print("drop off box not on this system.")
        else:
            os.mkdir(d.path + "/testfolder/subfolder")
            # Context managers close the handles even if a write fails.
            with open(d.path + "/testfolder/testpickup1.dat",
                      "w") as pickup_file:
                pickup_file.write("hello world\n")
            with open(d.path + "/testfolder/subfolder/testpickup2.dat",
                      "w") as pickup_file:
                pickup_file.write("hello world\n")
            file_ = models.File(name="PickupTestfolder",
                                description="Pickup test folder desc",
                                path="testfiles/pickuptestfolder" +
                                hex(randint(0, maxint)) + "/",
                                pickup="testfolder/")
            file_.insert()
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.
        if file_ is not None:
            try:
                file_.delete()
            except Exception:
                pass
@with_setup(setup, teardown)
def test_file4():
    """Insert two picked-up files together in one container.

    Two files are written into the directory reported by ``DropOffBox.path``
    and inserted in a single ``Container``.  If the first file cannot be
    created, a notice is printed and nothing is asserted.
    """
    c = None  # only bound once both pickup files exist
    try:
        d = models.DropOffBox()
        d.sync()
        try:
            pickup_file = open(d.path + "/testpickup1.dat", "w")
        except Exception:
            # Any failure to create the file is treated as "no drop off box
            # here".  Catching Exception instead of BaseException lets
            # KeyboardInterrupt and SystemExit propagate.
            print("drop off box not on this system.")
        else:
            # Context managers close the handles even if a write fails.
            with pickup_file:
                pickup_file.write("hello world\n")
            with open(d.path + "/testpickup2.dat", "w") as pickup_file:
                pickup_file.write("hello world\n")
            file1_ = models.File(
                name="Testfile1", description="Testfile Desc",
                path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
                pickup="testpickup1.dat")
            # The second file gets its own path prefix so that the two target
            # paths can never collide, whatever the random suffixes are.
            file2_ = models.File(
                name="Testfile2", description="Testfile Desc",
                path="testfiles/testfile2.dat" + hex(randint(0, maxint)),
                pickup="testpickup2.dat")
            c = models.Container()
            c.extend([file1_, file2_])
            c.insert()
            assert_is_not_none(file1_.id)
            assert_is_not_none(file2_.id)
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.
        if c is not None:
            try:
                c.delete()
            except Exception:
                pass
@with_setup(setup, teardown)
def test_upload_complete_folder():
    """Upload two single files in one container, then a complete folder."""
    # Part 1: two plain files inserted and deleted together.
    first_file = models.File(
        name="Testfile1",
        description="Testfile Desc",
        path="testfiles/testfile1.dat" + hex(randint(0, maxint)),
        file="test.dat",
    )
    second_file = models.File(
        name="Testfile2",
        description="Testfile Desc",
        path="testfiles/testfile2.dat" + hex(randint(0, maxint)),
        file="test.dat",
    )
    file_batch = models.Container()
    file_batch.extend([first_file, second_file])
    file_batch.insert()
    for uploaded in (first_file, second_file):
        assert_is_not_none(uploaded.id)
    file_batch.delete()

    # Part 2: the local directory "testfolder" uploaded as one File entity.
    folder_batch = models.Container()
    folder_entity = models.File(
        name="Testfolder",
        description="Testfolder Desc",
        path="testfiles/testfolder" + hex(randint(0, maxint)),
        file="testfolder",
    )
    folder_batch.append(folder_entity)
    folder_batch.insert()
    assert_is_not_none(folder_entity.id)
    folder_batch.delete()
@with_setup(setup, teardown)
def test_file6():
    """Expect an EntityError when a folder is picked up into ``testfiles2/``.

    A file is first uploaded below ``testfiles2/``.  Then a folder is created
    in the drop off box and its insertion with the target path
    ``testfiles2/`` is expected to raise ``EntityError``.  If the drop off
    box directory does not exist locally, only the upload part is checked.
    """
    file_ = None
    folder_ = None
    path = None
    try:
        # upload file to testfiles2/testfile...
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="Testfidb.dble",
                            description="Testfile Desc",
                            path="testfiles2/testfile" +
                            hex(randint(0, maxint)) + ".dat",
                            file="test.dat")
        file_.insert()
        assert_is_not_none(file_.id)
        assert_true(file_.is_valid())

        # pickup_folder
        # and try to store it to testfiles2/
        d = models.DropOffBox()
        d.sync()
        path = d.path + "/testfolder"
        if not os.path.isdir(d.path):
            print("drop off box not on this system.")
        else:
            os.mkdir(path)
            # Context managers close the handles even if a write fails.
            with open(path + "/testpickup1.dat", "w") as pickup_file:
                pickup_file.write("hello world\n")
            os.mkdir(path + "/subfolder")
            with open(path + "/subfolder/testpickup2.dat",
                      "w") as pickup_file:
                pickup_file.write("hello world\n")
            folder_ = models.File(
                name="PickupTestfolder",
                description="Pickup test folder desc",
                path="testfiles2/",
                # NOTE(review): "path" is a string literal, not the local
                # variable ``path``; test_pickup_folder passes a name
                # relative to the drop off box ("testfolder/").  As written,
                # the expected EntityError may stem from a missing pickup
                # source rather than from the occupied target directory --
                # confirm the intent before changing it.
                pickup="path")
            assert_raises(EntityError, folder_.insert)
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.  Exception (not BaseException) keeps Ctrl-C working.
        for entity in (folder_, file_):
            if entity is not None:
                try:
                    entity.delete()
                except Exception:
                    pass
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
@with_setup(setup, teardown)
def test_file7():
    """Expect an EntityError when picking up into a parent of a stored file.

    A file is first uploaded below ``testfiles2/testsub/``.  Then a folder is
    created in the drop off box and its insertion with the target path
    ``testfiles2/`` is expected to raise ``EntityError``.  If the drop off
    box directory does not exist locally, only the upload part is checked.
    """
    file_ = None
    folder_ = None
    path = None
    try:
        # upload file to testfiles2/testsub/testfile...
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="Testfidb.dble",
                            description="Testfile Desc",
                            path="testfiles2/testsub/testfile" +
                            hex(randint(0, maxint)) + ".dat",
                            file="test.dat")
        file_.insert()
        assert_is_not_none(file_.id)
        assert_true(file_.is_valid())

        # pickup_folder
        # and try to store it to testfiles2/
        d = models.DropOffBox()
        d.sync()
        path = d.path + "/testfolder"
        if not os.path.isdir(d.path):
            print("drop off box not on this system.")
        else:
            os.mkdir(path)
            # Context managers close the handles even if a write fails.
            with open(path + "/testpickup1.dat", "w") as pickup_file:
                pickup_file.write("hello world\n")
            os.mkdir(path + "/subfolder")
            with open(path + "/subfolder/testpickup2.dat",
                      "w") as pickup_file:
                pickup_file.write("hello world\n")
            folder_ = models.File(
                name="PickupTestfolder",
                description="Pickup test folder desc",
                path="testfiles2/",
                # NOTE(review): "path" is a string literal, not the local
                # variable ``path``; test_pickup_folder passes a name
                # relative to the drop off box ("testfolder/").  As written,
                # the expected EntityError may stem from a missing pickup
                # source rather than from the occupied target directory --
                # confirm the intent before changing it.
                pickup="path")
            assert_raises(EntityError, folder_.insert)
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.  Exception (not BaseException) keeps Ctrl-C working.
        for entity in (folder_, file_):
            if entity is not None:
                try:
                    entity.delete()
                except Exception:
                    pass
        if path is not None:
            shutil.rmtree(path, ignore_errors=True)
@with_setup(setup, teardown)
def test_consistency_file_was_modified():
    """Check the consistency report for the ``FILE_WAS_MODIFIED`` case.

    After inserting a file, a plain ``runCheck`` must report a consistent
    file system, ``runCheck`` with ``-c FILE_WAS_MODIFIED`` must report the
    file as modified, and a final plain check must pass again.  The file is
    downloaded in between to verify that its content is unchanged.
    """
    file_ = None
    try:
        # insert new test file
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="TestConsistency1",
                            description="Testfile Desc",
                            path="debug/test_file_storage_consistency",
                            file="test.dat")
        file_.insert()
        assert_is_not_none(file_.id)
        assert_true(file_.is_valid())

        # run consistency check (no consistency warning)
        c = runCheck(None, None)
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0], "File system is consistent.")

        # run consistency check (with "file was modified" warning)
        c = runCheck(None, "-c FILE_WAS_MODIFIED")
        assert_is_not_none(c.messages["Error", 0])
        assert_equal(
            c.messages["Error", 0][0],
            'debug/test_file_storage_consistency: File was modified.')

        # download file again and check if it is still the same (just to be
        # sure that the server only simulated the consistency breach)
        with open(file_.download(target="test.dat.tmp"), "r") as d:
            r = d.read()
        assert_equal(r, "hello world\n")

        # run a passing check again
        c = runCheck(None, None)
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0], "File system is consistent.")
    finally:
        # clean up file record (best effort: teardown() also tries to delete
        # every entity with an ID >= 100, and an error raised here would mask
        # the real test failure)
        if file_ is not None:
            try:
                file_.delete()
            except Exception:
                pass

        # clean up local files; either may be missing already
        for local_file in ("test.dat", "test.dat.tmp"):
            try:
                os.remove(local_file)
            except OSError:
                pass
@with_setup(setup, teardown)
def test_consistency_file_does_not_exist():
    """Check the consistency report for the ``FILE_DOES_NOT_EXIST`` case.

    After inserting a file, ``runCheck`` on ``/debug/`` must report a
    consistent file system below ``debug/``, ``runCheck`` with
    ``-c FILE_DOES_NOT_EXIST`` must report the file as missing, and a final
    plain check must pass again.  The file is downloaded in between to verify
    that it is still retrievable with unchanged content.
    """
    file_ = None
    try:
        with open("test.dat", "w") as upload_file:
            upload_file.write("hello world\n")
        file_ = models.File(name="TestConsistency1",
                            description="Testfile Desc",
                            path="debug/test_file_storage_consistency",
                            file="test.dat")
        file_.insert()
        assert_is_not_none(file_.id)
        assert_true(file_.is_valid())

        # check restricted to the debug/ directory: no warning expected
        c = runCheck(None, "/debug/")
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0],
                     "File system below debug/ is consistent.")

        # check with the "file does not exist" error
        c = runCheck(None, "-c FILE_DOES_NOT_EXIST")
        assert_is_not_none(c.messages["Error", 0])
        assert_equal(
            c.messages["Error", 0][0],
            'debug/test_file_storage_consistency: File does not exist.')

        # the file must still be downloadable with its original content
        with open(file_.download(target="test.dat.tmp"), "r") as d:
            r = d.read()
        assert_equal(r, "hello world\n")

        # a plain check passes again
        c = runCheck(None, None)
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0], "File system is consistent.")
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.
        if file_ is not None:
            try:
                file_.delete()
            except Exception:
                pass

        # remove local files; either may be missing already
        for local_file in ("test.dat", "test.dat.tmp"):
            try:
                os.remove(local_file)
            except OSError:
                pass
@with_setup(setup, teardown)
def test_consistency_unknown_file():
    """Check the consistency report for the ``UNKNOWN_FILE`` case.

    A plain check must pass before and after the check that is run with
    ``-c UNKNOWN_FILE``, which must yield an "Unknown file" warning.
    """
    def assert_reports_consistent():
        # Plain check: the first Info message states overall consistency.
        report = runCheck(None, None)
        assert_is_not_none(report.messages["Info", 0])
        assert_equal(report.messages["Info", 0][0],
                     "File system is consistent.")

    assert_reports_consistent()

    flagged = runCheck(None, "-c UNKNOWN_FILE")
    first_warning = flagged.messages["Warning", 0]
    assert_is_not_none(first_warning)
    assert_equal(flagged.messages["Warning", 0][0], 'debug/: Unknown file.')

    assert_reports_consistent()
@with_setup(setup, teardown)
def test_insert_files_in_dir_error1():
    """Check the error messages of ``InsertFilesInDir`` for bad directories."""
    def assert_first_error_startswith(container, prefix):
        # The first Error message must exist and begin with ``prefix``.
        assert_is_not_none(container.messages["Error", 0])
        assert_true(container.messages["Error", 0][0].startswith(prefix))

    # Unreadable directory.
    unreadable = models.Container()
    unreadable.retrieve(unique=False, raise_exception_on_error=False,
                        flags={"InsertFilesInDir": "/root"})
    # If this fails, it is likely that the server runs with root permissions
    # which is discouraged.
    assert_first_error_startswith(
        unreadable, "Cannot read or enter the desired directory.")

    # Missing directory: the same container is used for retrieve and insert.
    missing = models.Container()
    missing.retrieve(unique=False, raise_exception_on_error=False,
                     flags={"InsertFilesInDir": "non-existent"})
    assert_first_error_startswith(missing, "No such directory")
    missing.insert(raise_exception_on_error=False,
                   flags={"InsertFilesInDir": "non-existent"})
    assert_first_error_startswith(missing, "No such directory")

    # Directory that is not allowed.
    forbidden = models.Container()
    forbidden.retrieve(unique=False, raise_exception_on_error=False,
                       flags={"InsertFilesInDir": "."})
    assert_first_error_startswith(forbidden, "Dir is not allowed")
@with_setup(setup, teardown)
def test_insert_files_in_dir_with_symlink():
    """Check how ``InsertFilesInDir`` treats a symlinked directory.

    Without ``--force-allow-symlinks`` a warning about the symbolic link is
    expected.  With the option, the linked file must be listed by
    ``retrieve`` and inserted by ``insert``.  The local and server-side base
    directories are read from the ``IntegrationTests`` config section.
    """
    path = get_config().get(
        "IntegrationTests",
        "test_files.test_insert_files_in_dir.local") + "testfolder/"
    path_on_server = get_config().get(
        "IntegrationTests",
        "test_files.test_insert_files_in_dir.server") + "testfolder/"
    c = None
    d = None
    try:
        # create file in a server-readable directory
        os.makedirs(path)
        path = os.path.realpath(path) + "/"
        os.makedirs(path + "subfolder/")
        with open(path + "subfolder/test2.dat", "w") as test_file:
            test_file.write("hello world2\n")

        # create a symlink to the directory where the test_file resides
        os.symlink(path_on_server + "subfolder/", path + "linked_subfolder")

        # call insertFilesInDir job
        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server +
                "linked_subfolder"})
        assert_is_not_none(c.messages["Warning", 6])
        assert_equal(c.messages["Warning", 6][0],
                     "Directory or file is symbolic link: " + path_on_server +
                     "linked_subfolder")

        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": "--force-allow-symlinks " +
                path_on_server +
                "linked_subfolder"})
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0],
                     "Files count in linked_subfolder/: 1")
        assert_equal(c[0].name, "test2.dat")
        assert_equal(c[0].path, "/linked_subfolder/test2.dat")

        d = models.Container()
        d.insert(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": "--force-allow-symlinks " +
                path_on_server +
                "linked_subfolder"})
        # Check the container that was just inserted (d), not the earlier
        # retrieve result (c).
        assert_is_not_none(d.messages["Info", 0])
        assert_equal(d.messages["Info", 0][0],
                     "Files count in linked_subfolder/: 1")
        assert_true(d[0].is_valid())
        assert_equal(d[0].name, "test2.dat")
        assert_equal(d[0].path, "/linked_subfolder/test2.dat")
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.  Exception (not BaseException) keeps Ctrl-C working.
        for container in (d, c):
            if container is not None:
                try:
                    container.delete()
                except Exception:
                    pass
        shutil.rmtree(path, ignore_errors=True)
@with_setup(None, teardown)
def test_insert_files_in_dir():
    """Check ``InsertFilesInDir`` for a directory that gains a file later.

    Two files are created and the directory is processed with ``retrieve``
    and ``insert``.  Then a third file is added and both calls are repeated;
    the reported count must be 3 while only the new file is returned.  The
    local and server-side base directories are read from the
    ``IntegrationTests`` config section.
    """
    path = get_config().get(
        "IntegrationTests",
        "test_files.test_insert_files_in_dir.local") + "testfolder/"
    path_on_server = get_config().get(
        "IntegrationTests",
        "test_files.test_insert_files_in_dir.server") + "testfolder/"
    c = None
    d = None
    f = None
    try:
        os.makedirs(path)
        os.makedirs(path + "subfolder/")
        # Context managers close the handles even if a write fails.
        with open(path + "subfolder/test.dat", "w") as test_file1:
            test_file1.write("hello world\n")
        with open(path + "subfolder/test2.dat", "w") as test_file2:
            test_file2.write("hello world2\n")

        c = models.Container()
        c.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        assert_is_not_none(c.messages["Info", 0])
        assert_equal(c.messages["Info", 0][0],
                     "Files count in testfolder/: 2")
        # NOTE(review): the per-entity checks below are disabled, presumably
        # because the order of the returned entities is not guaranteed --
        # confirm before re-enabling.
        #assert_equal(c[0].name, "test.dat")
        #assert_equal(c[0].path, "/testfolder/subfolder/test.dat")
        #assert_equal(c[1].name, "test2.dat")
        #assert_equal(c[1].path, "/testfolder/subfolder/test2.dat")

        d = models.Container()
        d.insert(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        # Check the container that was just inserted (d), not the earlier
        # retrieve result (c).
        assert_is_not_none(d.messages["Info", 0])
        assert_equal(d.messages["Info", 0][0],
                     "Files count in testfolder/: 2")
        assert_true(d[0].is_valid())
        #assert_equal(d[0].name, "test.dat")
        #assert_equal(d[0].path, "/testfolder/subfolder/test.dat")
        #assert_equal(d[1].name, "test2.dat")
        #assert_equal(d[1].path, "/testfolder/subfolder/test2.dat")

        # create a new file and call insertFilesInDir again
        # (path already ends with "/", so no extra separator is needed)
        with open(path + "test3.dat", "w") as test_file3:
            test_file3.write("hello world3\n")

        e = models.Container()
        e.retrieve(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        assert_is_not_none(e.messages["Info", 0])
        assert_equal(e.messages["Info", 0][0],
                     "Files count in testfolder/: 3")
        # only the new file is given back...
        assert_equal(1, len(e))
        assert_false(e[0].is_valid())
        assert_equal(e[0].name, "test3.dat")
        assert_equal(e[0].path, "/testfolder/test3.dat")

        f = models.Container()
        f.insert(
            unique=False,
            raise_exception_on_error=False,
            flags={
                "InsertFilesInDir": path_on_server})
        assert_is_not_none(f.messages["Info", 0])
        assert_equal(f.messages["Info", 0][0],
                     "Files count in testfolder/: 3")
        # only the new file is given back...
        assert_equal(1, len(f))
        assert_true(f[0].is_valid())
        assert_equal(f[0].name, "test3.dat")
        assert_equal(f[0].path, "/testfolder/test3.dat")
    finally:
        # Best-effort cleanup: teardown() also tries to delete every entity
        # with an ID >= 100, and an error raised here would mask the real
        # test failure.  Exception (not BaseException) keeps Ctrl-C working.
        for container in (f, d, c):
            if container is not None:
                try:
                    container.delete()
                except Exception:
                    pass
        shutil.rmtree(path, ignore_errors=True)
@with_setup(setup, teardown)
def test_thumbnails():
    """Insert a file with a thumbnail and look it up in the FileSystem view."""
    with_thumbnail = models.File(
        name="TestFile",
        description="Testfile Desc",
        path="testfiles/thumbnails_test.dat",
        file="test.dat",
        thumbnail="testfolder/test1.dat",
    )
    with_thumbnail.insert()

    # Fetch the directory listing of "testfiles" from the FileSystem resource.
    response = get_connection().retrieve(
        entity_uri_segments=["FileSystem", "testfiles"],
        reconnect=True)
    body = response.read()
    print(body)

    listing = etree.fromstring(body)
    assert listing.xpath('/Response')
    assert listing.xpath('/Response/dir/file')

    # TODO find a better way to check this
    thumbnail_uri = listing[1][0].get("thumbnail")
    # Only the last 41 characters are compared, i.e. the expected suffix.
    assert_equal(thumbnail_uri[-41:],
                 "/Thumbnails/testfiles/thumbnails_test.dat")