Select Git revision
insertEntity.sql
-
Daniel Hornung authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_table_importer.py 10.79 KiB
#!/usr/bin/env python
# encoding: utf-8
#
# Copyright (C) 2020 Henrik tom Wörden
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import datetime
import os
import unittest
from functools import partial
from tempfile import NamedTemporaryFile
import caosdb as db
import numpy as np
import pandas as pd
import pytest
from caosadvancedtools.datainconsistency import DataInconsistencyError
from caosadvancedtools.table_importer import (CSVImporter, TableImporter,
TSVImporter, XLSImporter,
assure_name_format,
check_reference_field,
date_converter,
datetime_converter,
incomplete_date_converter,
string_in_list,
win_path_converter,
win_path_list_converter,
yes_no_converter)
from test_utils import BaseMockUpTest
class ConverterTest(unittest.TestCase):
    """Tests for the stand-alone value converters of ``table_importer``."""

    def test_yes_no(self):
        """'yes' spellings are truthy, 'no' spellings falsy, others raise."""
        self.assertTrue(yes_no_converter("YES"))
        self.assertTrue(yes_no_converter("Yes"))
        self.assertTrue(yes_no_converter("yes"))
        self.assertFalse(yes_no_converter("No"))
        self.assertFalse(yes_no_converter("no"))
        # Anything that is not a yes/no spelling must be rejected, including
        # the usual boolean literals.
        for rejected in ("nope", "FALSE", "TRUE", "True", "true"):
            self.assertRaises(ValueError, yes_no_converter, rejected)

    def test_string_in_list(self):
        """Values must be contained in the option list, else ValueError."""
        # By default the lookup is expected to ignore case ...
        self.assertEqual("false", string_in_list("false",
                                                 ["FALSE", "TRUE"]))
        # ... while passing False as third argument is expected to make the
        # comparison case-sensitive (see the last assertion below).
        self.assertEqual("FALSE", string_in_list("FALSE",
                                                 ["FALSE", "TRUE"], False))
        self.assertRaises(ValueError, string_in_list, "FALSE", [])
        self.assertRaises(ValueError, string_in_list, "FALSE", ["fals"])
        self.assertRaises(ValueError, string_in_list,
                          "FALSE", ["false"], False)

    def test_assure_name_format(self):
        """'Last, First' names pass unchanged; 'First Last' is rejected."""
        self.assertEqual(assure_name_format("Müstermann, Max"),
                         "Müstermann, Max")
        self.assertRaises(ValueError, assure_name_format, "Max Mustermann")

    def test_winpath(self):
        """Backslash paths are converted to slashes; slash paths raise."""
        self.assertRaises(ValueError, win_path_converter, "/hallo/python")
        self.assertEqual(win_path_converter(r"\this\computer"),
                         "/this/computer")
        self.assertEqual(win_path_list_converter(r"\this\computer"),
                         ["/this/computer"])
        self.assertEqual(win_path_list_converter(
            r"\this\computer,\this\computer"),
            ["/this/computer", "/this/computer"])

    @pytest.mark.xfail(reason="To be fixed, see Issue #34")
    def test_datetime(self):
        """Read column ``d`` of date.xlsx through ``datetime_converter``."""
        test_file = os.path.join(os.path.dirname(__file__), "date.xlsx")
        importer = XLSImporter(converters={'d': datetime_converter},
                               obligatory_columns=['d'])
        df = importer.read_xls(test_file)
        assert df.shape[0] == 2
        # TODO datatypes are different; fix it
        assert df.d.iloc[0] == datetime.datetime(1980, 12, 31, 13, 24, 23)

    def _assert_date_columns_agree(self, filename):
        """Import ``filename`` (located next to this module) and compare dates.

        Columns ``a`` and ``b`` are converted with ``date_converter`` and
        column ``c`` with ``date_converter`` using ``fmt="%d.%m.%y"``.  The
        imported table must have two rows and the first row must hold equal
        values in all three columns.
        """
        test_file = os.path.join(os.path.dirname(__file__), filename)
        importer = XLSImporter(
            converters={
                'a': date_converter,
                'b': date_converter,
                'c': partial(date_converter, fmt="%d.%m.%y"),
            },
            obligatory_columns=['a'])
        df = importer.read_xls(test_file)
        assert df.shape[0] == 2
        assert df.a.iloc[0] == df.b.iloc[0] == df.c.iloc[0]

    def test_date_xlsx(self):
        """Test with .xlsx in order to check openpyxl engine."""
        self._assert_date_columns_agree("date.xlsx")

    def test_date_xls(self):
        """Test with .xls in order to check xlrd engine."""
        self._assert_date_columns_agree("date.xls")

    def test_inc_date(self):
        """Incomplete dates are normalised using the first matching format.

        ``fmts`` maps the output format (key) to the accepted input format
        (value), as exercised by the expectations below.
        """
        # The results used to be compared with ``==`` but never asserted, so
        # the test could not fail on a wrong conversion.
        self.assertEqual(
            incomplete_date_converter("2020", fmts={"%Y": "%Y"}),
            "2020")
        self.assertEqual(
            incomplete_date_converter("02/2020",
                                      fmts={"%Y": "%Y", "%Y-%m": "%m/%Y"}),
            "2020-02")
        self.assertEqual(
            incomplete_date_converter("02/02/2020",
                                      fmts={"%Y": "%Y", "%Y-%m": "%m/%Y",
                                            "%Y-%m-%d": "%d/%m/%Y"}),
            "2020-02-02")
        self.assertEqual(
            incomplete_date_converter("2020",
                                      fmts={"%Y": "%Y", "%Y-%m": "%m/%Y",
                                            "%Y-%m-%d": "%d/%m/%Y"}),
            "2020")
        self.assertRaises(RuntimeError,
                          incomplete_date_converter,
                          "2020e",
                          fmts={"%Y": "%Y"})
class TableImporterTest(unittest.TestCase):
    """Checks of the individual validation steps of ``TableImporter``."""

    def setUp(self):
        """Prepare importer keyword arguments and a one-row valid table."""
        self.importer_kwargs = {
            "converters": {'c': float, 'd': yes_no_converter},
            "datatypes": {'a': str, 'b': int},
            "obligatory_columns": ['a', 'b'],
            "unique_keys": [('a', 'b')],
        }
        self.valid_df = pd.DataFrame(
            [['a', 1, 2.0, 'yes']], columns=['a', 'b', 'c', 'd'])

    def test_missing_col(self):
        """A table lacking a configured column must be rejected."""
        incomplete_layouts = (
            ['a', 'b', 'c'],  # 'd' (listed in converters) is missing
            ['a', 'd', 'c'],  # 'b' (listed in datatypes) is missing
        )
        for column_names in incomplete_layouts:
            frame = pd.DataFrame(columns=column_names)
            importer = TableImporter(**self.importer_kwargs)
            with self.assertRaises(ValueError):
                importer.check_columns(frame)

        # A table with all four columns must pass without raising.
        importer.check_columns(self.valid_df)

    def test_missing_val(self):
        """Rows with missing obligatory values are dropped."""
        importer = TableImporter(**self.importer_kwargs)

        # The valid table must be accepted.
        importer.check_missing(self.valid_df)

        # Only the last row has values in both obligatory columns.
        rows = [
            [None, np.nan, 2.0, 'yes'],
            [None, 1, 2.0, 'yes'],
            ['a', np.nan, 2.0, 'yes'],
            ['b', 5, 3.0, 'no'],
        ]
        with_gaps = pd.DataFrame(rows, columns=['a', 'b', 'c', 'd'])
        cleaned = importer.check_missing(with_gaps)

        self.assertEqual(cleaned.shape[0], 1)
        self.assertEqual(cleaned.shape[1], 4)
        self.assertEqual(cleaned.iloc[0].b, 5)

    def test_wrong_datatype(self):
        """A value of the wrong type raises ``DataInconsistencyError``."""
        importer = TableImporter(**self.importer_kwargs)
        rows = [
            [None, np.nan, 2.0, 'yes'],
            [5, 1, 2.0, 'yes'],  # column 'a' is declared as str
        ]
        mistyped = pd.DataFrame(rows, columns=['a', 'b', 'c', 'd'])
        with self.assertRaises(DataInconsistencyError):
            importer.check_datatype(mistyped)

    def test_unique(self):
        """Rows repeating the unique key ('a', 'b') are reduced to one."""
        importer = TableImporter(**self.importer_kwargs)
        importer.check_missing(self.valid_df)

        duplicated_row = ['b', 5, 3.0, 'no']
        duplicates = pd.DataFrame([list(duplicated_row), list(duplicated_row)],
                                  columns=['a', 'b', 'c', 'd'])
        deduplicated = importer.check_unique(duplicates)
        self.assertEqual(deduplicated.shape[0], 1)
class XLSImporterTest(TableImporterTest):
    """Re-run the generic importer checks and add Excel-specific tests."""

    def test_full(self):
        """Test a full run with example data written to an .xlsx file."""
        tmp = NamedTemporaryFile(delete=False, suffix=".xlsx")
        tmp.close()
        # ``delete=False`` keeps the file so it can be reopened by name;
        # remove it explicitly once the test is over (pass or fail).
        self.addCleanup(os.remove, tmp.name)
        self.valid_df.to_excel(tmp.name)
        importer = XLSImporter(**self.importer_kwargs)
        importer.read_file(tmp.name)

    def test_raise(self):
        """Reading an empty file with an unknown suffix must raise."""
        importer = XLSImporter(**self.importer_kwargs)
        tmp = NamedTemporaryFile(delete=False, suffix=".lol")
        tmp.close()
        # See test_full: the temporary file is not removed automatically.
        self.addCleanup(os.remove, tmp.name)
        self.assertRaises(DataInconsistencyError, importer.read_xls,
                          tmp.name)
class CSVImporterTest(TableImporterTest):
    """Re-run the generic importer checks and add a CSV round trip."""

    def test_full(self):
        """Test a full run with example data written to a .csv file."""
        tmp = NamedTemporaryFile(delete=False, suffix=".csv")
        tmp.close()
        # ``delete=False`` keeps the file so it can be reopened by name;
        # remove it explicitly once the test is over (pass or fail).
        self.addCleanup(os.remove, tmp.name)
        self.valid_df.to_csv(tmp.name)
        importer = CSVImporter(**self.importer_kwargs)
        importer.read_file(tmp.name)
class TSVImporterTest(TableImporterTest):
    """Re-run the generic importer checks and add a TSV round trip."""

    def test_full(self):
        """Test a full run with example data written to a .tsv file."""
        tmp = NamedTemporaryFile(delete=False, suffix=".tsv")
        tmp.close()
        # ``delete=False`` keeps the file so it can be reopened by name;
        # remove it explicitly once the test is over (pass or fail).
        self.addCleanup(os.remove, tmp.name)
        self.valid_df.to_csv(tmp.name, sep="\t")
        importer = TSVImporter(**self.importer_kwargs)
        importer.read_file(tmp.name)
class CountQueryNoneConverterTest(BaseMockUpTest):
    """``check_reference_field`` when the count query reports no entity."""

    def __init__(self, *args, **kwargs):
        """Store a response body whose count query has zero results."""
        super().__init__(*args, **kwargs)
        # Simulate that no entity exists.
        # NOTE(review): presumably BaseMockUpTest serves ``self.entities`` as
        # the mocked server response -- confirm in test_utils.
        zero_hits_response = (
            '<Response count="0">'
            '<Query string="count record" results="0"></Query>'
            '</Response>'
        )
        self.entities = zero_hits_response

    def test_check_reference_field(self):
        """With zero matching entities the reference must be rejected."""
        with self.assertRaises(ValueError):
            check_reference_field("1232", "Max")
class CountQuerySingleConverterTest(BaseMockUpTest):
    """``check_reference_field`` when the count query reports one entity."""

    def __init__(self, *args, **kwargs):
        """Store a response body whose count query has exactly one result."""
        super().__init__(*args, **kwargs)
        # Simulate that exactly one entity exists.
        # NOTE(review): presumably BaseMockUpTest serves ``self.entities`` as
        # the mocked server response -- confirm in test_utils.
        single_hit_response = (
            '<Response count="1">'
            '<Query string="count record" results="1"></Query>'
            '</Response>'
        )
        self.entities = single_hit_response

    def test_check_reference_field(self):
        """With one matching entity the reference id is returned unchanged."""
        checked = check_reference_field("1232", "Max")
        self.assertEqual(checked, "1232")