Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_table_importer.py 11.40 KiB
#!/usr/bin/env python
# encoding: utf-8
#
# Copyright (C) 2020 Henrik tom Wörden
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


import datetime
import os
import unittest
from functools import partial
from tempfile import NamedTemporaryFile

import numpy as np
import pandas as pd
import pytest
from caosadvancedtools.datainconsistency import DataInconsistencyError
from caosadvancedtools.table_importer import (CSVImporter, TableImporter,
                                              TSVImporter, XLSImporter,
                                              assure_name_format,
                                              check_reference_field,
                                              date_converter,
                                              datetime_converter,
                                              incomplete_date_converter,
                                              string_in_list,
                                              win_path_converter,
                                              win_path_list_converter,
                                              yes_no_converter)

from test_utils import BaseMockUpTest


class ConverterTest(unittest.TestCase):
    def test_yes_no(self):
        self.assertTrue(yes_no_converter("YES"))
        self.assertTrue(yes_no_converter("Yes"))
        self.assertTrue(yes_no_converter("yes"))
        self.assertTrue(not yes_no_converter("No"))
        self.assertTrue(not yes_no_converter("no"))
        self.assertRaises(ValueError, yes_no_converter, "nope")
        self.assertRaises(ValueError, yes_no_converter, "FALSE")
        self.assertRaises(ValueError, yes_no_converter, "TRUE")
        self.assertRaises(ValueError, yes_no_converter, "True")
        self.assertRaises(ValueError, yes_no_converter, "true")

    def test_string_in_list(self):
        self.assertEqual("false", string_in_list("false",
                                                 ["FALSE", "TRUE"]))
        self.assertEqual("FALSE", string_in_list("FALSE",
                                                 ["FALSE", "TRUE"], False))
        self.assertRaises(ValueError, string_in_list, "FALSE", [])
        self.assertRaises(ValueError, string_in_list, "FALSE", ["fals"])
        self.assertRaises(ValueError, string_in_list,
                          "FALSE", ["false"], False)

    def test_assure_name_format(self):
        self.assertEqual(assure_name_format("Müstermann, Max"),
                         "Müstermann, Max")
        self.assertRaises(ValueError, assure_name_format, "Max Mustermann")

    def test_winpath(self):
        self.assertRaises(ValueError, win_path_converter, "/hallo/python")
        self.assertEqual(win_path_converter(r"\this\computer"),
                         "/this/computer")
        self.assertEqual(win_path_list_converter(r"\this\computer"),
                         ["/this/computer"])
        self.assertEqual(win_path_list_converter(
            r"\this\computer,\this\computer"),
            ["/this/computer", "/this/computer"])

    def test_datetime(self):
        test_file = os.path.join(os.path.dirname(__file__), "date.xlsx")
        importer = XLSImporter(converters={'d': datetime_converter,
                                           }, obligatory_columns=['d'])

        xls_file = pd.io.excel.ExcelFile(test_file)
        df = xls_file.parse()
        df = importer.read_xls(test_file)
        assert df.shape[0] == 2
        # TODO datatypes are different; fix it
        assert df.d.iloc[0] == datetime.datetime(1980, 12, 31, 13, 24, 23)

    def test_date_xlsx(self):
        """Test with .xlsx in order to check openpyxl engine."""
        test_file = os.path.join(os.path.dirname(__file__), "date.xlsx")
        importer = XLSImporter(converters={'a': date_converter,
                                           'b': date_converter,
                                           'c': partial(date_converter,
                                                        fmt="%d.%m.%y")
                                           }, obligatory_columns=['a'])

        xls_file = pd.io.excel.ExcelFile(test_file)
        df = xls_file.parse()
        df = importer.read_xls(test_file)
        assert df.shape[0] == 2
        assert df.a.iloc[0] == df.b.iloc[0] == df.c.iloc[0]

    def test_date_xls(self):
        """Test with .xls in order to check xlrd engine."""
        test_file = os.path.join(os.path.dirname(__file__), "date.xls")
        importer = XLSImporter(converters={'a': date_converter,
                                           'b': date_converter,
                                           'c': partial(date_converter,
                                                        fmt="%d.%m.%y")
                                           }, obligatory_columns=['a'])

        xls_file = pd.io.excel.ExcelFile(test_file)
        df = xls_file.parse()
        df = importer.read_xls(test_file)
        assert df.shape[0] == 2
        assert df.a.iloc[0] == df.b.iloc[0] == df.c.iloc[0]

    def test_inc_date(self):
        incomplete_date_converter("2020", fmts={"%Y": "%Y"}) == "2020"
        incomplete_date_converter("02/2020",
                                  fmts={"%Y": "%Y", "%Y-%m": "%m/%Y"}
                                  ) == "2020-02"
        incomplete_date_converter("02/02/2020",
                                  fmts={"%Y": "%Y", "%Y-%m": "%m/%Y",
                                        "%Y-%m-%d": "%d/%m/%Y"}
                                  ) == "2020-02-02"
        incomplete_date_converter("2020",
                                  fmts={"%Y": "%Y", "%Y-%m": "%m/%Y",
                                        "%Y-%m-%d": "%d/%m/%Y"}
                                  ) == "2020"
        self.assertRaises(RuntimeError,
                          incomplete_date_converter,
                          "2020e",
                          fmts={"%Y": "%Y"})


class TableImporterTest(unittest.TestCase):
    def setUp(self):
        self.importer_kwargs = dict(
            converters={'c': float, 'd': yes_no_converter, 'x': float},  # x does not exist
            datatypes={'a': str, 'b': int, 'x': int},  # x does not exist
            obligatory_columns=['a', 'b'], unique_keys=[('a', 'b')],
            existing_columns=['e'],
        )
        self.valid_df = pd.DataFrame(
            [['a', 1, 2.0, 'yes', np.nan]], columns=['a', 'b', 'c', 'd', 'e'])

    def test_missing_col(self):
        # check missing from obligatory
        df = pd.DataFrame(columns=['a', 'e'])
        importer = TableImporter(**self.importer_kwargs)
        self.assertRaises(ValueError, importer.check_columns, df)
        # check missing from existing
        df = pd.DataFrame(columns=['a', 'b'])
        importer = TableImporter(**self.importer_kwargs)
        self.assertRaises(ValueError, importer.check_columns, df)
        # check valid
        importer.check_columns(self.valid_df)

    def test_missing_val(self):
        importer = TableImporter(**self.importer_kwargs)
        # check valid
        importer.check_missing(self.valid_df)
        # check invalid
        df = pd.DataFrame([[None, np.nan, 2.0, 'yes'],
                           [None, 1, 2.0, 'yes'],
                           ['a', np.nan, 2.0, 'yes'],
                           ['b', 5, 3.0, 'no']],
                          columns=['a', 'b', 'c', 'd'])
        df_new = importer.check_missing(df)
        self.assertEqual(df_new.shape[0], 1)
        self.assertEqual(df_new.shape[1], 4)
        self.assertEqual(df_new.iloc[0].b, 5)

    def test_wrong_datatype(self):
        importer = TableImporter(**self.importer_kwargs)
        df = pd.DataFrame([[None, np.nan, 2.0, 'yes'],
                           [5, 1, 2.0, 'yes']],
                          columns=['a', 'b', 'c', 'd'])
        self.assertRaises(DataInconsistencyError, importer.check_datatype, df)

    def test_unique(self):
        importer = TableImporter(**self.importer_kwargs)
        importer.check_missing(self.valid_df)
        df = pd.DataFrame([['b', 5, 3.0, 'no'], ['b', 5, 3.0, 'no']],
                          columns=['a', 'b', 'c', 'd'])
        df_new = importer.check_unique(df)
        self.assertEqual(df_new.shape[0], 1)


class XLSImporterTest(TableImporterTest):
    """XLS-specific tests; the checks of ``TableImporterTest`` are inherited."""

    def test_full(self):
        """ test full run with example data """
        tmp = NamedTemporaryFile(delete=False, suffix=".xlsx")
        tmp.close()
        # delete=False leaves the file on disk, so remove it explicitly
        # once the test has finished (also when it fails).
        self.addCleanup(os.remove, tmp.name)
        self.valid_df.to_excel(tmp.name)
        importer = XLSImporter(**self.importer_kwargs)
        importer.read_file(tmp.name)

    def test_raise(self):
        """``read_xls`` must raise for an empty file with a ``.lol`` suffix."""
        importer = XLSImporter(**self.importer_kwargs)
        tmp = NamedTemporaryFile(delete=False, suffix=".lol")
        tmp.close()
        # delete=False leaves the file on disk, so remove it explicitly
        # once the test has finished (also when it fails).
        self.addCleanup(os.remove, tmp.name)
        self.assertRaises(DataInconsistencyError, importer.read_xls,
                          tmp.name)

    def test_datatypes(self):
        """Test datatypes in columns."""
        importer = XLSImporter(
            converters={},
            obligatory_columns=["float_as_float"],
            datatypes={
                "float_as_float": float,
                "int_as_float": float,
                "int_as_int": int,
            },
        )
        df = importer.read_xls(os.path.join(
            os.path.dirname(__file__), "data", "datatypes.xlsx"))
        # The first value of "int_as_float" must come back as a float type.
        assert np.issubdtype(df.loc[0, "int_as_float"], float)


class CSVImporterTest(TableImporterTest):
    """CSV-specific test; the checks of ``TableImporterTest`` are inherited."""

    def test_full(self):
        """ test full run with example data """
        tmp = NamedTemporaryFile(delete=False, suffix=".csv")
        tmp.close()
        # delete=False leaves the file on disk, so remove it explicitly
        # once the test has finished (also when it fails).
        self.addCleanup(os.remove, tmp.name)
        self.valid_df.to_csv(tmp.name)
        importer = CSVImporter(**self.importer_kwargs)
        importer.read_file(tmp.name)


class TSVImporterTest(TableImporterTest):
    """TSV-specific test; the checks of ``TableImporterTest`` are inherited."""

    def test_full(self):
        """ test full run with example data """
        tmp = NamedTemporaryFile(delete=False, suffix=".tsv")
        tmp.close()
        # delete=False leaves the file on disk, so remove it explicitly
        # once the test has finished (also when it fails).
        self.addCleanup(os.remove, tmp.name)
        # Tab-separated output of the valid example table.
        self.valid_df.to_csv(tmp.name, sep="\t")
        importer = TSVImporter(**self.importer_kwargs)
        importer.read_file(tmp.name)


class CountQueryNoneConverterTest(BaseMockUpTest):
    """``check_reference_field`` when the count query reports no result."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Response body stating that zero entities exist; how it is served
        # is up to BaseMockUpTest (not visible here).
        response_parts = [
            '<Response count="0">',
            '<Query string="count record" results="0">',
            '</Query>',
            '</Response>',
        ]
        self.entities = "".join(response_parts)

    def test_check_reference_field(self):
        """A reference to a non-existing entity must raise ``ValueError``."""
        with self.assertRaises(ValueError):
            check_reference_field("1232", "Max")


class CountQuerySingleConverterTest(BaseMockUpTest):
    """``check_reference_field`` when the count query reports one result."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Response body stating that exactly one entity exists; how it is
        # served is up to BaseMockUpTest (not visible here).
        response_parts = [
            '<Response count="1">',
            '<Query string="count record" results="1">',
            '</Query>',
            '</Response>',
        ]
        self.entities = "".join(response_parts)

    def test_check_reference_field(self):
        """An existing reference is returned unchanged."""
        checked = check_reference_field("1232", "Max")
        self.assertEqual(checked, "1232")