Private
Server IP : 195.201.23.43  /  Your IP : 3.140.184.21
Web Server : Apache
System : Linux webserver2.vercom.be 5.4.0-192-generic #212-Ubuntu SMP Fri Jul 5 09:47:39 UTC 2024 x86_64
User : kdecoratie ( 1041)
PHP Version : 7.1.33-63+ubuntu20.04.1+deb.sury.org+1
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/certbot/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/certbot/plugins/common_test.py
"""Tests for certbot.plugins.common."""
import functools
import shutil
import unittest
import warnings

import josepy as jose
import mock

from acme import challenges

from certbot import achallenges
from certbot import crypto_util
from certbot import errors
from certbot.compat import os
from certbot.compat import filesystem
from certbot.tests import acme_util
from certbot.tests import util as test_util

# Account key for the tests in this module, loaded from the
# "rsa512_key.pem" test vector.
AUTH_KEY = jose.JWKRSA.load(
    test_util.load_vector("rsa512_key.pem"))

# Annotated HTTP-01 challenge in "pending" state, tied to AUTH_KEY; used by
# ChallengePerformerTest.test_add_chall.
ACHALL = achallenges.KeyAuthorizationAnnotatedChallenge(
    challb=acme_util.chall_to_challb(
        challenges.HTTP01(token=b'token1'), "pending"),
    domain="encryption-example.demo",
    account_key=AUTH_KEY)

class NamespaceFunctionsTest(unittest.TestCase):
    """Checks for the ``*_namespace`` functions in certbot.plugins.common."""

    def test_option_namespace(self):
        """option_namespace("foo") is expected to yield "foo-"."""
        from certbot.plugins.common import option_namespace
        actual = option_namespace("foo")
        self.assertEqual("foo-", actual)

    def test_dest_namespace(self):
        """dest_namespace("foo") is expected to yield "foo_"."""
        from certbot.plugins.common import dest_namespace
        actual = dest_namespace("foo")
        self.assertEqual("foo_", actual)

    def test_dest_namespace_with_dashes(self):
        """Dashes in the name are expected to come back as underscores."""
        from certbot.plugins.common import dest_namespace
        actual = dest_namespace("foo-bar")
        self.assertEqual("foo_bar_", actual)


class PluginTest(unittest.TestCase):
    """Exercises certbot.plugins.common.Plugin via a minimal subclass."""

    def setUp(self):
        from certbot.plugins.common import Plugin

        class MockPlugin(Plugin):  # pylint: disable=missing-docstring
            @classmethod
            def add_parser_arguments(cls, add):
                add("foo-bar", dest="different_to_foo_bar", x=1, y=None)

        # The plugin under test is named "mock" and backed by a MagicMock config.
        self.config = mock.MagicMock()
        self.plugin_cls = MockPlugin
        self.plugin = self.plugin_cls(config=self.config, name="mock")

    def test_init(self):
        plugin = self.plugin
        self.assertEqual("mock", plugin.name)
        self.assertEqual(self.config, plugin.config)

    def test_option_namespace(self):
        namespace = self.plugin.option_namespace
        self.assertEqual("mock-", namespace)

    def test_option_name(self):
        option = self.plugin.option_name("foo_bar")
        self.assertEqual("mock-foo_bar", option)

    def test_dest_namespace(self):
        namespace = self.plugin.dest_namespace
        self.assertEqual("mock_", namespace)

    def test_dest(self):
        # Dashed and underscored spellings are expected to give the same dest.
        for spelling in ("foo-bar", "foo_bar"):
            self.assertEqual("mock_foo_bar", self.plugin.dest(spelling))

    def test_conf(self):
        expected = self.config.mock_foo_bar
        self.assertEqual(expected, self.plugin.conf("foo-bar"))

    def test_inject_parser_options(self):
        argparser = mock.MagicMock()
        self.plugin_cls.inject_parser_options(argparser, "mock")
        # note that inject_parser_options doesn't check if dest has
        # correct prefix
        expected_kwargs = {"dest": "different_to_foo_bar", "x": 1, "y": None}
        argparser.add_argument.assert_called_once_with(
            "--mock-foo-bar", **expected_kwargs)


class InstallerTest(test_util.ConfigTestCase):
    """Tests for certbot.plugins.common.Installer."""

    def setUp(self):
        super(InstallerTest, self).setUp()
        filesystem.mkdir(self.config.config_dir)
        from certbot.plugins.common import Installer

        self.installer = Installer(config=self.config,
                                   name="Installer")
        # Direct handle on the installer's reverter so that its methods can
        # be patched by the helpers below.
        self.reverter = self.installer.reverter

    def test_add_to_real_checkpoint(self):
        files = {"foo.bar", "baz.qux"}
        save_notes = "foo bar baz qux"
        self._test_wrapped_method("add_to_checkpoint", files, save_notes)

    def test_add_to_real_checkpoint2(self):
        self._test_add_to_checkpoint_common(False)

    def test_add_to_temporary_checkpoint(self):
        self._test_add_to_checkpoint_common(True)

    def _test_add_to_checkpoint_common(self, temporary):
        """Test add_to_checkpoint with an explicit ``temporary`` keyword.

        :param bool temporary: value bound to the ``temporary`` keyword of
            the installer's add_to_checkpoint; it also selects which
            reverter method is expected to be called

        """
        files = {"foo.bar", "baz.qux"}
        save_notes = "foo bar baz qux"

        installer_func = functools.partial(self.installer.add_to_checkpoint,
                                           temporary=temporary)

        if temporary:
            reverter_func_name = "add_to_temp_checkpoint"
        else:
            reverter_func_name = "add_to_checkpoint"

        self._test_adapted_method(installer_func, reverter_func_name, files, save_notes)

    def test_finalize_checkpoint(self):
        self._test_wrapped_method("finalize_checkpoint", "foo")

    def test_recovery_routine(self):
        self._test_wrapped_method("recovery_routine")

    def test_revert_temporary_config(self):
        self._test_wrapped_method("revert_temporary_config")

    def test_rollback_checkpoints(self):
        self._test_wrapped_method("rollback_checkpoints", 42)

    def test_view_config_changes(self):
        self._test_wrapped_method("view_config_changes")

    def test_view_config_changes_warning_supression(self):
        with warnings.catch_warnings():
            # Without the catch_warnings() code in
            # common.Installer.view_config_changes, this would raise an
            # exception. The module parameter here is ".*common$" because the
            # stacklevel=2 parameter of warnings.warn causes the warning to
            # refer to the code in the caller rather than the call to
            # warnings.warn. This means the warning in common.Installer refers
            # to this module and the warning in the reverter refers to the
            # plugins.common module.
            warnings.filterwarnings("error", ".*view_config_changes", module=".*common$")
            self.installer.view_config_changes()

    def _test_wrapped_method(self, name, *args, **kwargs):
        """Test a wrapped reverter method.

        The installer method and the reverter method it is expected to
        call share the same name.

        :param str name: name of the method to test
        :param tuple args: position arguments to method
        :param dict kwargs: keyword arguments to method

        """
        installer_func = getattr(self.installer, name)
        self._test_adapted_method(installer_func, name, *args, **kwargs)

    def _test_adapted_method(self, installer_func,
                             reverter_func_name, *passed_args, **passed_kwargs):
        """Test an adapted reverter method

        :param callable installer_func: installer method to test
        :param str reverter_func_name: name of the method on the
            reverter that should be called
        :param tuple passed_args: positional arguments passed from
            installer method to the reverter method
        :param dict passed_kwargs: keyword arguments passed from
            installer method to the reverter method

        """
        with mock.patch.object(self.reverter, reverter_func_name) as reverter_func:
            # First call: the arguments must be forwarded unchanged.
            installer_func(*passed_args, **passed_kwargs)
            reverter_func.assert_called_once_with(*passed_args, **passed_kwargs)
            # Second call: a ReverterError from the reverter must surface
            # as a PluginError from the installer.
            reverter_func.side_effect = errors.ReverterError
            self.assertRaises(
                errors.PluginError, installer_func, *passed_args, **passed_kwargs)

    def test_install_ssl_dhparams(self):
        self.installer.install_ssl_dhparams()
        self.assertTrue(os.path.isfile(self.installer.ssl_dhparams))

    def _current_ssl_dhparams_hash(self):
        """Return the sha256 sum of the file at constants.SSL_DHPARAMS_SRC."""
        from certbot.constants import SSL_DHPARAMS_SRC
        return crypto_util.sha256sum(SSL_DHPARAMS_SRC)

    def test_current_file_hash_in_all_hashes(self):
        from certbot.constants import ALL_SSL_DHPARAMS_HASHES
        # assertIn reports both the hash and the container on failure,
        # unlike assertTrue(x in y).
        self.assertIn(
            self._current_ssl_dhparams_hash(), ALL_SSL_DHPARAMS_HASHES,
            "Constants.ALL_SSL_DHPARAMS_HASHES must be appended"
            " with the sha256 hash of self.config.ssl_dhparams when it is updated.")


class AddrTest(unittest.TestCase):
    """Tests for certbot.plugins.common.Addr."""

    def setUp(self):
        from certbot.plugins.common import Addr
        # IPv4 fixtures: no port, wildcard port, numeric port.
        self.addr1 = Addr.fromstring("192.168.1.1")
        self.addr2 = Addr.fromstring("192.168.1.1:*")
        self.addr3 = Addr.fromstring("192.168.1.1:80")
        # Bracketed IPv6 fixtures: no port, wildcard port, numeric ports.
        self.addr4 = Addr.fromstring("[fe00::1]")
        self.addr5 = Addr.fromstring("[fe00::1]:*")
        self.addr6 = Addr.fromstring("[fe00::1]:80")
        self.addr7 = Addr.fromstring("[fe00::1]:5")
        # Ten groups; test_fromstring expects only the first eight in the
        # exploded form.
        self.addr8 = Addr.fromstring("[fe00:1:2:3:4:5:6:7:8:9]:8080")

    def test_fromstring(self):
        self.assertEqual(self.addr1.get_addr(), "192.168.1.1")
        self.assertEqual(self.addr1.get_port(), "")
        self.assertEqual(self.addr2.get_addr(), "192.168.1.1")
        self.assertEqual(self.addr2.get_port(), "*")
        self.assertEqual(self.addr3.get_addr(), "192.168.1.1")
        self.assertEqual(self.addr3.get_port(), "80")
        self.assertEqual(self.addr4.get_addr(), "[fe00::1]")
        self.assertEqual(self.addr4.get_port(), "")
        self.assertEqual(self.addr5.get_addr(), "[fe00::1]")
        self.assertEqual(self.addr5.get_port(), "*")
        self.assertEqual(self.addr6.get_addr(), "[fe00::1]")
        self.assertEqual(self.addr6.get_port(), "80")
        self.assertEqual(self.addr6.get_ipv6_exploded(),
                         "fe00:0:0:0:0:0:0:1")
        # An IPv4 address has no exploded IPv6 form.
        self.assertEqual(self.addr1.get_ipv6_exploded(),
                         "")
        self.assertEqual(self.addr7.get_port(), "5")
        self.assertEqual(self.addr8.get_ipv6_exploded(),
                         "fe00:1:2:3:4:5:6:7")

    def test_str(self):
        self.assertEqual(str(self.addr1), "192.168.1.1")
        self.assertEqual(str(self.addr2), "192.168.1.1:*")
        self.assertEqual(str(self.addr3), "192.168.1.1:80")
        self.assertEqual(str(self.addr4), "[fe00::1]")
        self.assertEqual(str(self.addr5), "[fe00::1]:*")
        self.assertEqual(str(self.addr6), "[fe00::1]:80")

    def test_get_addr_obj(self):
        self.assertEqual(str(self.addr1.get_addr_obj("443")), "192.168.1.1:443")
        self.assertEqual(str(self.addr2.get_addr_obj("")), "192.168.1.1")
        self.assertEqual(str(self.addr1.get_addr_obj("*")), "192.168.1.1:*")
        self.assertEqual(str(self.addr4.get_addr_obj("443")), "[fe00::1]:443")
        self.assertEqual(str(self.addr5.get_addr_obj("")), "[fe00::1]")
        self.assertEqual(str(self.addr4.get_addr_obj("*")), "[fe00::1]:*")

    def test_eq(self):
        from certbot.plugins.common import Addr

        self.assertEqual(self.addr1, self.addr2.get_addr_obj(""))
        self.assertNotEqual(self.addr1, self.addr2)
        # Explicit == so the comparison against a non-Addr value is the
        # operation under test.
        self.assertFalse(self.addr1 == 3333)

        self.assertEqual(self.addr4, self.addr4.get_addr_obj(""))
        self.assertNotEqual(self.addr4, self.addr5)
        self.assertFalse(self.addr4 == 3333)
        # Differently abbreviated spellings of the same IPv6 address must
        # compare equal.
        self.assertEqual(self.addr4, Addr.fromstring("[fe00:0:0::1]"))
        self.assertEqual(self.addr4, Addr.fromstring("[fe00:0::0:0:1]"))

    def test_set_inclusion(self):
        from certbot.plugins.common import Addr
        # Sets built from separately parsed but identical strings must be
        # equal, which requires equal Addr objects to hash alike.
        set_a = {self.addr1, self.addr2}
        addr1b = Addr.fromstring("192.168.1.1")
        addr2b = Addr.fromstring("192.168.1.1:*")
        set_b = {addr1b, addr2b}

        self.assertEqual(set_a, set_b)

        set_c = {self.addr4, self.addr5}
        addr4b = Addr.fromstring("[fe00::1]")
        addr5b = Addr.fromstring("[fe00::1]:*")
        set_d = {addr4b, addr5b}

        self.assertEqual(set_c, set_d)


class ChallengePerformerTest(unittest.TestCase):
    """Exercises certbot.plugins.common.ChallengePerformer."""

    def setUp(self):
        from certbot.plugins.common import ChallengePerformer

        # The performer is built around a MagicMock standing in for the
        # configurator.
        self.performer = ChallengePerformer(mock.MagicMock())

    def test_add_chall(self):
        performer = self.performer
        performer.add_chall(ACHALL, 0)
        # One challenge recorded, along with the index passed in.
        self.assertEqual(1, len(performer.achalls))
        self.assertEqual([0], performer.indices)

    def test_perform(self):
        with self.assertRaises(NotImplementedError):
            self.performer.perform()


class InstallVersionControlledFileTest(test_util.TempDirTestCase):
    """Exercises certbot.plugins.common.install_version_controlled_file."""

    def setUp(self):
        super(InstallVersionControlledFileTest, self).setUp()
        tmp = self.tempdir
        self.dest_path = os.path.join(tmp, "options-ssl-dest.conf")
        self.hash_path = os.path.join(tmp, ".options-ssl-conf.txt")
        self.old_path = os.path.join(tmp, "options-ssl-old.conf")
        self.source_path = os.path.join(tmp, "options-ssl-src.conf")

        # Known hashes: a placeholder plus the hashes of the two fixture
        # files. Each fixture file contains its own path, so the two files
        # have different contents.
        self.hashes = ["someotherhash"]
        for fixture in (self.source_path, self.old_path):
            with open(fixture, "w") as handle:
                handle.write(fixture)
            self.hashes.append(crypto_util.sha256sum(fixture))

    def _call(self):
        """Run install_version_controlled_file with this test's paths and hashes."""
        from certbot.plugins.common import install_version_controlled_file
        install_version_controlled_file(
            self.dest_path, self.hash_path, self.source_path, self.hashes)

    def _current_file_hash(self):
        """Return the sha256 sum of the source file."""
        return crypto_util.sha256sum(self.source_path)

    def _assert_current_file(self):
        """Assert the destination exists and hashes the same as the source."""
        self.assertTrue(os.path.isfile(self.dest_path))
        installed_hash = crypto_util.sha256sum(self.dest_path)
        self.assertEqual(installed_hash, self._current_file_hash())

    def test_no_file(self):
        self.assertFalse(os.path.isfile(self.dest_path))
        self._call()
        self._assert_current_file()

    def test_current_file(self):
        # First pass installs the file.
        self._call()
        self._assert_current_file()
        # Second pass checks whether it needs updating.
        self._call()
        self._assert_current_file()

    def test_prev_file_updates_to_current(self):
        # Start from the old version of the file at the destination.
        shutil.copyfile(self.old_path, self.dest_path)
        self._call()
        self._assert_current_file()

    def test_manually_modified_current_file_does_not_update(self):
        self._call()
        with open(self.dest_path, "a") as installed:
            installed.write("a new line for the wrong hash\n")
        with mock.patch("certbot.plugins.common.logger") as patched_logger:
            self._call()
            self.assertFalse(patched_logger.warning.called)
        self.assertTrue(os.path.isfile(self.dest_path))
        # NOTE(review): _current_file_hash() also hashes source_path, so this
        # compares the source hash with itself -- confirm the intended check.
        source_hash = crypto_util.sha256sum(self.source_path)
        self.assertEqual(source_hash, self._current_file_hash())
        # The manual edit must still be present in the destination.
        dest_hash = crypto_util.sha256sum(self.dest_path)
        self.assertNotEqual(dest_hash, self._current_file_hash())

    def test_manually_modified_past_file_warns(self):
        with open(self.dest_path, "a") as installed:
            installed.write("a new line for the wrong hash\n")
        with open(self.hash_path, "w") as recorded:
            recorded.write("hashofanoldversion")
        with mock.patch("certbot.plugins.common.logger") as patched_logger:
            self._call()
            # First positional argument of the most recent warning() call.
            warning_format = patched_logger.warning.call_args[0][0]
            self.assertEqual(warning_format,
                "%s has been manually modified; updated file "
                "saved to %s. We recommend updating %s for security purposes.")
        # NOTE(review): same self-comparison as in
        # test_manually_modified_current_file_does_not_update.
        source_hash = crypto_util.sha256sum(self.source_path)
        self.assertEqual(source_hash, self._current_file_hash())
        # only print warning once
        with mock.patch("certbot.plugins.common.logger") as patched_logger:
            self._call()
            self.assertFalse(patched_logger.warning.called)

# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()  # pragma: no cover
Private