#!/usr/bin/env python
# encoding: utf-8
#
# Copyright (C) 2020 Henrik tom Wörden
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Tests for the converters and importers of caosadvancedtools.table_importer."""

import datetime
import os
import unittest
from functools import partial
from tempfile import NamedTemporaryFile

import numpy as np
import pandas as pd
import pytest

from caosadvancedtools.datainconsistency import DataInconsistencyError
from caosadvancedtools.table_importer import (CSVImporter, TableImporter,
                                              TSVImporter, XLSImporter,
                                              assure_name_format,
                                              check_reference_field,
                                              date_converter,
                                              datetime_converter,
                                              incomplete_date_converter,
                                              string_in_list,
                                              win_path_converter,
                                              win_path_list_converter,
                                              yes_no_converter)

from test_utils import BaseMockUpTest


def _closed_temp_file(test_case, suffix):
    """Create an empty, already closed temporary file and return its path.

    The file is created with ``delete=False`` and closed right away so that
    it can be re-opened by name by the code under test.  Its removal is
    registered as a cleanup on ``test_case`` so that test runs do not leave
    stray files behind.

    Parameters
    ----------
    test_case : unittest.TestCase
        The running test; used to register the cleanup.
    suffix : str
        File name suffix (including the dot), e.g. ``".csv"``.

    Returns
    -------
    str
        Path of the temporary file.
    """
    tmp = NamedTemporaryFile(delete=False, suffix=suffix)
    tmp.close()
    test_case.addCleanup(os.remove, tmp.name)

    return tmp.name


class ConverterTest(unittest.TestCase):
    """Tests for the stand-alone cell converter functions."""

    def test_yes_no(self):
        """Accept yes/no in the tested spellings; reject everything else."""
        self.assertTrue(yes_no_converter("YES"))
        self.assertTrue(yes_no_converter("Yes"))
        self.assertTrue(yes_no_converter("yes"))
        self.assertFalse(yes_no_converter("No"))
        self.assertFalse(yes_no_converter("no"))
        self.assertRaises(ValueError, yes_no_converter, "nope")
        self.assertRaises(ValueError, yes_no_converter, "FALSE")
        self.assertRaises(ValueError, yes_no_converter, "TRUE")
        self.assertRaises(ValueError, yes_no_converter, "True")
        self.assertRaises(ValueError, yes_no_converter, "true")

    def test_string_in_list(self):
        """Check membership with the default and with ``False`` as third argument."""
        self.assertEqual("false",
                         string_in_list("false", ["FALSE", "TRUE"]))
        self.assertEqual("FALSE",
                         string_in_list("FALSE", ["FALSE", "TRUE"], False))
        self.assertRaises(ValueError, string_in_list, "FALSE", [])
        self.assertRaises(ValueError, string_in_list, "FALSE", ["fals"])
        self.assertRaises(ValueError, string_in_list, "FALSE", ["false"],
                          False)

    def test_assure_name_format(self):
        """'Last, First' passes through unchanged; 'First Last' is rejected."""
        self.assertEqual(assure_name_format("Müstermann, Max"),
                         "Müstermann, Max")
        self.assertRaises(ValueError, assure_name_format, "Max Mustermann")

    def test_winpath(self):
        """Backslash paths are converted; a forward-slash path is rejected."""
        self.assertRaises(ValueError, win_path_converter, "/hallo/python")
        self.assertEqual(win_path_converter(r"\this\computer"),
                         "/this/computer")
        self.assertEqual(win_path_list_converter(r"\this\computer"),
                         ["/this/computer"])
        self.assertEqual(
            win_path_list_converter(r"\this\computer,\this\computer"),
            ["/this/computer", "/this/computer"])

    def test_datetime(self):
        """Read column ``d`` of date.xlsx through ``datetime_converter``."""
        test_file = os.path.join(os.path.dirname(__file__), "date.xlsx")
        importer = XLSImporter(converters={'d': datetime_converter},
                               obligatory_columns=['d'])

        df = importer.read_xls(test_file)

        self.assertEqual(df.shape[0], 2)
        # TODO datatypes are different; fix it
        self.assertEqual(df.d.iloc[0],
                         datetime.datetime(1980, 12, 31, 13, 24, 23))

    def test_date_xlsx(self):
        """Test with .xlsx in order to check openpyxl engine."""
        test_file = os.path.join(os.path.dirname(__file__), "date.xlsx")
        importer = XLSImporter(
            converters={'a': date_converter,
                        'b': date_converter,
                        'c': partial(date_converter, fmt="%d.%m.%y")},
            obligatory_columns=['a'])

        df = importer.read_xls(test_file)

        self.assertEqual(df.shape[0], 2)
        # All three columns must yield the same value in the first row.
        self.assertEqual(df.a.iloc[0], df.b.iloc[0])
        self.assertEqual(df.b.iloc[0], df.c.iloc[0])

    def test_date_xls(self):
        """Test with .xls in order to check xlrd engine."""
        test_file = os.path.join(os.path.dirname(__file__), "date.xls")
        importer = XLSImporter(
            converters={'a': date_converter,
                        'b': date_converter,
                        'c': partial(date_converter, fmt="%d.%m.%y")},
            obligatory_columns=['a'])

        df = importer.read_xls(test_file)

        self.assertEqual(df.shape[0], 2)
        # All three columns must yield the same value in the first row.
        self.assertEqual(df.a.iloc[0], df.b.iloc[0])
        self.assertEqual(df.b.iloc[0], df.c.iloc[0])

    def test_inc_date(self):
        """Convert dates of varying completeness; reject unparsable input."""
        # NOTE: these comparisons used to be bare expressions whose results
        # were discarded; they are now real assertions.
        all_fmts = {"%Y": "%Y", "%Y-%m": "%m/%Y", "%Y-%m-%d": "%d/%m/%Y"}

        self.assertEqual(
            incomplete_date_converter("2020", fmts={"%Y": "%Y"}),
            "2020")
        self.assertEqual(
            incomplete_date_converter("02/2020",
                                      fmts={"%Y": "%Y", "%Y-%m": "%m/%Y"}),
            "2020-02")
        self.assertEqual(
            incomplete_date_converter("02/02/2020", fmts=dict(all_fmts)),
            "2020-02-02")
        self.assertEqual(
            incomplete_date_converter("2020", fmts=dict(all_fmts)),
            "2020")
        self.assertRaises(RuntimeError, incomplete_date_converter, "2020e",
                          fmts={"%Y": "%Y"})


class TableImporterTest(unittest.TestCase):
    """Tests for the individual checks of ``TableImporter``.

    The importer subclasses' test cases below inherit from this class and
    therefore re-run these checks in addition to their own tests.
    """

    def setUp(self):
        """Provide importer keyword arguments and a data frame that is valid for them."""
        self.importer_kwargs = dict(
            converters={'c': float, 'd': yes_no_converter,
                        'x': float},  # x does not exist
            datatypes={'a': str, 'b': int, 'x': int},  # x does not exist
            obligatory_columns=['a', 'b'],
            unique_keys=[('a', 'b')],
            existing_columns=['e'],
        )
        self.valid_df = pd.DataFrame(
            [['a', 1, 2.0, 'yes', np.nan]],
            columns=['a', 'b', 'c', 'd', 'e'])

    def test_missing_col(self):
        """``check_columns`` raises if a required column is absent."""
        # check missing from obligatory
        df = pd.DataFrame(columns=['a', 'e'])
        importer = TableImporter(**self.importer_kwargs)
        self.assertRaises(ValueError, importer.check_columns, df)

        # check missing from existing
        df = pd.DataFrame(columns=['a', 'b'])
        importer = TableImporter(**self.importer_kwargs)
        self.assertRaises(ValueError, importer.check_columns, df)

        # check valid
        importer.check_columns(self.valid_df)

    def test_missing_val(self):
        """``check_missing`` drops rows lacking a value in an obligatory column."""
        importer = TableImporter(**self.importer_kwargs)

        # check valid
        importer.check_missing(self.valid_df)

        # check invalid: only the last row has values for both 'a' and 'b'
        df = pd.DataFrame([[None, np.nan, 2.0, 'yes'],
                           [None, 1, 2.0, 'yes'],
                           ['a', np.nan, 2.0, 'yes'],
                           ['b', 5, 3.0, 'no']],
                          columns=['a', 'b', 'c', 'd'])
        df_new = importer.check_missing(df)

        self.assertEqual(df_new.shape[0], 1)
        self.assertEqual(df_new.shape[1], 4)
        self.assertEqual(df_new.iloc[0].b, 5)

    def test_wrong_datatype(self):
        """``check_datatype`` raises when a value has the wrong type."""
        importer = TableImporter(**self.importer_kwargs)
        # Column 'a' is declared as str but the second row holds an int.
        df = pd.DataFrame([[None, np.nan, 2.0, 'yes'],
                           [5, 1, 2.0, 'yes']],
                          columns=['a', 'b', 'c', 'd'])
        self.assertRaises(DataInconsistencyError,
                          importer.check_datatype, df)

    def test_unique(self):
        """``check_unique`` reduces rows with identical unique keys to one."""
        importer = TableImporter(**self.importer_kwargs)
        importer.check_missing(self.valid_df)

        df = pd.DataFrame([['b', 5, 3.0, 'no'],
                           ['b', 5, 3.0, 'no']],
                          columns=['a', 'b', 'c', 'd'])
        df_new = importer.check_unique(df)

        self.assertEqual(df_new.shape[0], 1)


class XLSImporterTest(TableImporterTest):
    """Tests for ``XLSImporter``."""

    def test_full(self):
        """Test full run with example data."""
        path = _closed_temp_file(self, ".xlsx")
        self.valid_df.to_excel(path)

        importer = XLSImporter(**self.importer_kwargs)
        importer.read_file(path)

    def test_raise(self):
        """Reading an empty file with an unknown suffix raises."""
        importer = XLSImporter(**self.importer_kwargs)
        path = _closed_temp_file(self, ".lol")

        self.assertRaises(DataInconsistencyError, importer.read_xls, path)

    def test_datatypes(self):
        """Test datatypes in columns."""
        importer = XLSImporter(
            converters={},
            obligatory_columns=["float_as_float"],
            datatypes={
                "float_as_float": float,
                "int_as_float": float,
                "int_as_int": int,
            })
        df = importer.read_xls(os.path.join(
            os.path.dirname(__file__), "data", "datatypes.xlsx"))

        self.assertTrue(np.issubdtype(df.loc[0, "int_as_float"], float))


class CSVImporterTest(TableImporterTest):
    """Tests for ``CSVImporter``."""

    def test_full(self):
        """Test full run with example data."""
        path = _closed_temp_file(self, ".csv")
        self.valid_df.to_csv(path)

        importer = CSVImporter(**self.importer_kwargs)
        importer.read_file(path)


class TSVImporterTest(TableImporterTest):
    """Tests for ``TSVImporter``."""

    def test_full(self):
        """Test full run with example data."""
        path = _closed_temp_file(self, ".tsv")
        self.valid_df.to_csv(path, sep="\t")

        importer = TSVImporter(**self.importer_kwargs)
        importer.read_file(path)


class CountQueryNoneConverterTest(BaseMockUpTest):
    """``check_reference_field`` against a mocked count response of 0."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # simulate that 0 entity exists
        self.entities = (
            '<Response count="0">'
            '<Query string="count record" results="0">'
            '</Query>'
            '</Response>'
        )

    def test_check_reference_field(self):
        """With zero results the reference is rejected."""
        self.assertRaises(ValueError, check_reference_field, "1232", "Max")


class CountQuerySingleConverterTest(BaseMockUpTest):
    """``check_reference_field`` against a mocked count response of 1."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # simulate that 1 entity exists
        self.entities = (
            '<Response count="1">'
            '<Query string="count record" results="1">'
            '</Query>'
            '</Response>'
        )

    def test_check_reference_field(self):
        """With exactly one result the given id is returned unchanged."""
        self.assertEqual(check_reference_field("1232", "Max"), "1232")