Skip to content
Snippets Groups Projects
test_suppressKnown.py 4.32 KiB
Newer Older
#!/usr/bin/env python
# encoding: utf-8
#
# ** header v3.0
# This file is a part of the LinkAhead project.
#
# Copyright (C) 2020 IndiScale GmbH
# Copyright (C) 2020 Henrik tom Wörden
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# ** end header

import logging
import os
import unittest
from tempfile import NamedTemporaryFile, gettempdir
from caosadvancedtools.suppressKnown import SuppressKnown


class Record(object):
    identifier = "5"
    category = "test"

    def getMessage(self):
        return "hi"


class SupTestBasic(unittest.TestCase):
    def setUp(self):
        self.db_file = os.path.join(gettempdir(), "test_suppress_msg_db_file_basic.db")
        self.basic = SuppressKnown(db_file=self.db_file)

    def test_msg(self):
        # testing basic setup
        self.basic.filter(r)
        digest = self.basic.hash("hi", 5)
        assert self.basic.was_tagged(digest)
        self.basic.filter(r)

    def tearDown(self):


class SupTestAdvanced(SupTestBasic):
    def setUp(self):
        self.db_file = os.path.join(gettempdir(), "test_suppress_msg_db_file_advanced.db")
        self.basic = SuppressKnown(db_file=self.db_file)

    def test_logger(self):
        """
        The logging output is directed to a file which is then checked whether
        the output is as expected.
        """
        logfile = NamedTemporaryFile(delete=False, mode="w")
        logger = logging.getLogger()
        logger.addHandler(logging.FileHandler(logfile.name))
        logger.setLevel(logging.DEBUG)
        sup = SuppressKnown(db_file=self.db_file)
        logger.addFilter(sup)
        logger.info("hi", extra={"identifier": "5", 'category': "test"})
        with open(logfile.name) as lf:
            log = lf.read()
            # assert that the log was written
            assert "hi" in log
            # there should be one line so far
            assert log.count("\n") == 1

        # the following is unchanged and should be suppressed
        logger.info("hi", extra={"identifier": "5", 'category': "test"})
        with open(logfile.name) as lf:
            log = lf.read()
            # one line with one hi
            assert log.count("hi") == 1
            assert log.count("\n") == 1

        # the following is a new message and should thus not be suppressed
        logger.info("ha", extra={"identifier": "5", 'category': "new"})
        with open(logfile.name) as lf:
            log = lf.read()
            assert log.count("ha") == 1
            assert log.count("\n") == 2

        # the following has a identifier and should thus not be suppressed
        logger.info("hi", extra={"identifier": "6", 'category': "test"})
        with open(logfile.name) as lf:
            log = lf.read()
            assert log.count("hi") == 2
            assert log.count("\n") == 3

        # the following should be suppressed again
        logger.info("ha", extra={"identifier": "5", 'category': "new"})
        with open(logfile.name) as lf:
            log = lf.read()
            assert log.count("ha") == 1
            assert log.count("hi") == 2
            assert log.count("\n") == 3

        # resetting test category; hi should be removed
        sup.reset("test")

        # hi should not be suppressed
        logger.info("hi", extra={"identifier": "5", 'category': "test"})
        with open(logfile.name) as lf:
            log = lf.read()
            assert log.count("hi") == 3
            assert log.count("\n") == 4

        # the following should be suppressed still
        logger.info("ha", extra={"identifier": "5", 'category': "new"})
        with open(logfile.name) as lf:
            log = lf.read()
            assert log.count("ha") == 1
            assert log.count("hi") == 3
            assert log.count("\n") == 4