Skip to content
Snippets Groups Projects
Select Git revision
  • 0305597404332f480cc7addde93f0e9da36ac6d3
  • main default protected
  • f-xlsx-list-file
  • f-yaml-parser-enums
  • dev protected
  • f-fix-paths
  • f-fix-validate-to-dict
  • f-labfolder-converter
  • f-state-machine-script
  • f-xlsx-converter-warnings-errors
  • f-rename
  • f-extra-deps
  • f-more-jsonschema-export
  • f-henrik
  • f-fix-89
  • f-trigger-advanced-user-tools
  • f-real-rename-test
  • f-linkahead-rename
  • f-register-integrationtests
  • f-fix-id
  • f-h5-files
  • v0.14.0
  • v0.13.0
  • v0.12.0
  • v0.11.0
  • v0.10.0-numpy2
  • v0.10.0
  • v0.9.0
  • v0.8.0
  • v0.7.0
  • v0.6.1
  • v0.6.0
  • v0.5.0
  • v0.4.1
  • v0.4.0
  • v0.3.1
  • v0.3.0
37 results

test_table_importer.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_table_importer.py 10.79 KiB
    #!/usr/bin/env python
    # encoding: utf-8
    #
    # Copyright (C) 2020 Henrik tom Wörden
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU Affero General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU Affero General Public License for more details.
    #
    # You should have received a copy of the GNU Affero General Public License
    # along with this program. If not, see <https://www.gnu.org/licenses/>.
    
    
    import datetime
    import os
    import unittest
    from functools import partial
    from tempfile import NamedTemporaryFile
    
    import caosdb as db
    import numpy as np
    import pandas as pd
    import pytest
    from caosadvancedtools.datainconsistency import DataInconsistencyError
    from caosadvancedtools.table_importer import (CSVImporter, TableImporter,
                                                  TSVImporter, XLSImporter,
                                                  assure_name_format,
                                                  check_reference_field,
                                                  date_converter,
                                                  datetime_converter,
                                                  incomplete_date_converter,
                                                  string_in_list,
                                                  win_path_converter,
                                                  win_path_list_converter,
                                                  yes_no_converter)
    
    from test_utils import BaseMockUpTest
    
    
    class ConverterTest(unittest.TestCase):
        def test_yes_no(self):
            self.assertTrue(yes_no_converter("YES"))
            self.assertTrue(yes_no_converter("Yes"))
            self.assertTrue(yes_no_converter("yes"))
            self.assertTrue(not yes_no_converter("No"))
            self.assertTrue(not yes_no_converter("no"))
            self.assertRaises(ValueError, yes_no_converter, "nope")
            self.assertRaises(ValueError, yes_no_converter, "FALSE")
            self.assertRaises(ValueError, yes_no_converter, "TRUE")
            self.assertRaises(ValueError, yes_no_converter, "True")
            self.assertRaises(ValueError, yes_no_converter, "true")
    
        def test_string_in_list(self):
            self.assertEqual("false", string_in_list("false",
                                                     ["FALSE", "TRUE"]))
            self.assertEqual("FALSE", string_in_list("FALSE",
                                                     ["FALSE", "TRUE"], False))
            self.assertRaises(ValueError, string_in_list, "FALSE", [])
            self.assertRaises(ValueError, string_in_list, "FALSE", ["fals"])
            self.assertRaises(ValueError, string_in_list,
                              "FALSE", ["false"], False)
    
        def test_assure_name_format(self):
            self.assertEqual(assure_name_format("Müstermann, Max"),
                             "Müstermann, Max")
            self.assertRaises(ValueError, assure_name_format, "Max Mustermann")
    
        def test_winpath(self):
            self.assertRaises(ValueError, win_path_converter, "/hallo/python")
            self.assertEqual(win_path_converter(r"\this\computer"),
                             "/this/computer")
            self.assertEqual(win_path_list_converter(r"\this\computer"),
                             ["/this/computer"])
            self.assertEqual(win_path_list_converter(
                r"\this\computer,\this\computer"),
                ["/this/computer", "/this/computer"])
    
        @pytest.mark.xfail(reason="To be fixed, see Issue #34")
        def test_datetime(self):
            test_file = os.path.join(os.path.dirname(__file__), "date.xlsx")
            importer = XLSImporter(converters={'d': datetime_converter,
                                               }, obligatory_columns=['d'])
    
            xls_file = pd.io.excel.ExcelFile(test_file)
            df = xls_file.parse()
            df = importer.read_xls(test_file)
            assert df.shape[0] == 2
            # TODO datatypes are different; fix it
            assert df.d.iloc[0] == datetime.datetime(1980, 12, 31, 13, 24, 23)
    
        def test_date_xlsx(self):
            """Test with .xlsx in order to check openpyxl engine."""
            test_file = os.path.join(os.path.dirname(__file__), "date.xlsx")
            importer = XLSImporter(converters={'a': date_converter,
                                               'b': date_converter,
                                               'c': partial(date_converter,
                                                            fmt="%d.%m.%y")
                                               }, obligatory_columns=['a'])
    
            xls_file = pd.io.excel.ExcelFile(test_file)
            df = xls_file.parse()
            df = importer.read_xls(test_file)
            assert df.shape[0] == 2
            assert df.a.iloc[0] == df.b.iloc[0] == df.c.iloc[0]
    
        def test_date_xls(self):
            """Test with .xls in order to check xlrd engine."""
            test_file = os.path.join(os.path.dirname(__file__), "date.xls")
            importer = XLSImporter(converters={'a': date_converter,
                                               'b': date_converter,
                                               'c': partial(date_converter,
                                                            fmt="%d.%m.%y")
                                               }, obligatory_columns=['a'])
    
            xls_file = pd.io.excel.ExcelFile(test_file)
            df = xls_file.parse()
            df = importer.read_xls(test_file)
            assert df.shape[0] == 2
            assert df.a.iloc[0] == df.b.iloc[0] == df.c.iloc[0]
    
        def test_inc_date(self):
            incomplete_date_converter("2020", fmts={"%Y": "%Y"}) == "2020"
            incomplete_date_converter("02/2020",
                                      fmts={"%Y": "%Y", "%Y-%m": "%m/%Y"}
                                      ) == "2020-02"
            incomplete_date_converter("02/02/2020",
                                      fmts={"%Y": "%Y", "%Y-%m": "%m/%Y",
                                            "%Y-%m-%d": "%d/%m/%Y"}
                                      ) == "2020-02-02"
            incomplete_date_converter("2020",
                                      fmts={"%Y": "%Y", "%Y-%m": "%m/%Y",
                                            "%Y-%m-%d": "%d/%m/%Y"}
                                      ) == "2020"
            self.assertRaises(RuntimeError,
                              incomplete_date_converter,
                              "2020e",
                              fmts={"%Y": "%Y"})
    
    
    class TableImporterTest(unittest.TestCase):
        def setUp(self):
            self.importer_kwargs = dict(
                converters={'c': float, 'd': yes_no_converter},
                datatypes={'a': str, 'b': int},
                obligatory_columns=['a', 'b'], unique_keys=[('a', 'b')])
            self.valid_df = pd.DataFrame(
                [['a', 1, 2.0, 'yes']], columns=['a', 'b', 'c', 'd'])
    
        def test_missing_col(self):
            # check missing from converters
            df = pd.DataFrame(columns=['a', 'b', 'c'])
            importer = TableImporter(**self.importer_kwargs)
            self.assertRaises(ValueError, importer.check_columns, df)
            # check missing from datatypes
            df = pd.DataFrame(columns=['a', 'd', 'c'])
            importer = TableImporter(**self.importer_kwargs)
            self.assertRaises(ValueError, importer.check_columns, df)
            # check valid
            importer.check_columns(self.valid_df)
    
        def test_missing_val(self):
            importer = TableImporter(**self.importer_kwargs)
            # check valid
            importer.check_missing(self.valid_df)
            # check invalid
            df = pd.DataFrame([[None, np.nan, 2.0, 'yes'],
                               [None, 1, 2.0, 'yes'],
                               ['a', np.nan, 2.0, 'yes'],
                               ['b', 5, 3.0, 'no']],
                              columns=['a', 'b', 'c', 'd'])
            df_new = importer.check_missing(df)
            self.assertEqual(df_new.shape[0], 1)
            self.assertEqual(df_new.shape[1], 4)
            self.assertEqual(df_new.iloc[0].b, 5)
    
        def test_wrong_datatype(self):
            importer = TableImporter(**self.importer_kwargs)
            df = pd.DataFrame([[None, np.nan, 2.0, 'yes'],
                               [5, 1, 2.0, 'yes']],
                              columns=['a', 'b', 'c', 'd'])
            self.assertRaises(DataInconsistencyError, importer.check_datatype, df)
    
        def test_unique(self):
            importer = TableImporter(**self.importer_kwargs)
            importer.check_missing(self.valid_df)
            df = pd.DataFrame([['b', 5, 3.0, 'no'], ['b', 5, 3.0, 'no']],
                              columns=['a', 'b', 'c', 'd'])
            df_new = importer.check_unique(df)
            self.assertEqual(df_new.shape[0], 1)
    
    
    class XLSImporterTest(TableImporterTest):
        def test_full(self):
            """ test full run with example data """
            tmp = NamedTemporaryFile(delete=False, suffix=".xlsx")
            tmp.close()
            self.valid_df.to_excel(tmp.name)
            importer = XLSImporter(**self.importer_kwargs)
            importer.read_file(tmp.name)
    
        def test_raise(self):
            importer = XLSImporter(**self.importer_kwargs)
            tmp = NamedTemporaryFile(delete=False, suffix=".lol")
            tmp.close()
            self.assertRaises(DataInconsistencyError, importer.read_xls,
                              tmp.name)
    
    
    class CSVImporterTest(TableImporterTest):
        def test_full(self):
            """ test full run with example data """
            tmp = NamedTemporaryFile(delete=False, suffix=".csv")
            tmp.close()
            self.valid_df.to_csv(tmp.name)
            importer = CSVImporter(**self.importer_kwargs)
            importer.read_file(tmp.name)
    
    
    class TSVImporterTest(TableImporterTest):
        def test_full(self):
            """ test full run with example data """
            tmp = NamedTemporaryFile(delete=False, suffix=".tsv")
            tmp.close()
            self.valid_df.to_csv(tmp.name, sep="\t")
            importer = TSVImporter(**self.importer_kwargs)
            importer.read_file(tmp.name)
    
    
    class CountQueryNoneConverterTest(BaseMockUpTest):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # simulate that 0 entity exists
            self.entities = (
                '<Response count="0">'
                '<Query string="count record" results="0">'
                '</Query>'
                '</Response>'
                )
    
        def test_check_reference_field(self):
            self.assertRaises(ValueError, check_reference_field, "1232",  "Max")
    
    
    class CountQuerySingleConverterTest(BaseMockUpTest):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # simulate that 1 entity exists
            self.entities = (
                '<Response count="1">'
                '<Query string="count record" results="1">'
                '</Query>'
                '</Response>'
                )
    
        def test_check_reference_field(self):
            self.assertEqual(check_reference_field("1232",  "Max"),
                             "1232")