Skip to content
Snippets Groups Projects
test_kolla_docker.py 58.6 KiB
Newer Older
        self.assertFalse(self.dw.compare_volumes(container_info))

    def test_compare_volumes_pos(self):
        # The container binds the 'kolla_logs' volume while the requested
        # volume list only holds '/dev/'; compare_volumes must return True.
        requested = {'volumes': ['/dev/:/dev/:rw']}
        container_info = {
            'Config': {'Volumes': ['/var/log/kolla/']},
            'HostConfig': {'Binds': ['kolla_logs:/var/log/kolla/:rw']},
        }
        self.dw = get_DockerWorker(requested)

        volumes_differ = self.dw.compare_volumes(container_info)
        self.assertTrue(volumes_differ)

    def test_compare_environment_neg(self):
        # Every requested variable has an identical NAME=value entry in the
        # container's Env list; compare_environment must return False.
        requested_env = {
            'KOLLA_CONFIG_STRATEGY': 'COPY_ALWAYS',
            'KOLLA_BASE_DISTRO': 'ubuntu',
            'KOLLA_INSTALL_TYPE': 'binary',
        }
        container_info = {
            'Config': {
                'Env': [
                    'KOLLA_CONFIG_STRATEGY=COPY_ALWAYS',
                    'KOLLA_BASE_DISTRO=ubuntu',
                    'KOLLA_INSTALL_TYPE=binary',
                ],
            },
        }
        self.dw = get_DockerWorker({'environment': requested_env})

        env_differs = self.dw.compare_environment(container_info)
        self.assertFalse(env_differs)

    def test_compare_environment_pos(self):
        # KOLLA_BASE_DISTRO is requested as 'centos' but the container's Env
        # list carries 'ubuntu'; compare_environment must return True.
        requested_env = {
            'KOLLA_CONFIG_STRATEGY': 'COPY_ALWAYS',
            'KOLLA_BASE_DISTRO': 'centos',
            'KOLLA_INSTALL_TYPE': 'binary',
        }
        container_info = {
            'Config': {
                'Env': [
                    'KOLLA_CONFIG_STRATEGY=COPY_ALWAYS',
                    'KOLLA_BASE_DISTRO=ubuntu',
                    'KOLLA_INSTALL_TYPE=binary',
                ],
            },
        }
        self.dw = get_DockerWorker({'environment': requested_env})

        env_differs = self.dw.compare_environment(container_info)
        self.assertTrue(env_differs)

    def test_compare_container_state_neg(self):
        # Requested state and the container's reported status are both
        # 'running'; compare_container_state must return False.
        self.dw = get_DockerWorker({'state': 'running'})
        container_info = {'State': {'Status': 'running'}}

        state_differs = self.dw.compare_container_state(container_info)
        self.assertFalse(state_differs)

    def test_compare_dimensions_pos(self):
        # Non-zero blkio_weight and mem_limit are requested while every
        # numeric HostConfig value is 0; compare_dimensions must return True.
        params = self.fake_data['params']
        params['dimensions'] = {'blkio_weight': 10, 'mem_limit': 30}
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 0,
            'CpuQuota': 0,
            'CpusetCpus': '',
            'CpuShares': 0,
            'BlkioWeight': 0,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 0,
            'Ulimits': [],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_dimensions_neg(self):
        # blkio_weight 10 is requested and HostConfig already reports
        # BlkioWeight 10; compare_dimensions must return False.
        params = self.fake_data['params']
        params['dimensions'] = {'blkio_weight': 10}
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 0,
            'CpuQuota': 0,
            'CpusetCpus': '',
            'CpuShares': 0,
            'BlkioWeight': 10,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 0,
            'Ulimits': [],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.assertFalse(self.dw.compare_dimensions(container_info))

    def test_compare_wrong_dimensions(self):
        # 'blki_weight' is a misspelt dimension name; the module's exit_json
        # must be called exactly once, flagging it as unsupported.
        params = self.fake_data['params']
        params['dimensions'] = {'blki_weight': 0}
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 0,
            'CpuQuota': 0,
            'CpusetCpus': '',
            'CpuShares': 0,
            'BlkioWeight': 0,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 0,
            'Ulimits': [],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.dw.compare_dimensions(container_info)

        exit_json = self.dw.module.exit_json
        exit_json.assert_called_once_with(
            failed=True,
            msg=repr("Unsupported dimensions"),
            unsupported_dimensions={'blki_weight'})

    def test_compare_empty_dimensions(self):
        # No dimensions are requested but HostConfig reports CpusetCpus '1';
        # compare_dimensions must return True.
        params = self.fake_data['params']
        params['dimensions'] = {}
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 0,
            'CpuQuota': 0,
            'CpusetCpus': '1',
            'CpuShares': 0,
            'BlkioWeight': 0,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 0,
            'Ulimits': [],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_dimensions_removed_and_changed(self):
        # The container already has both Memory and MemoryReservation set
        # to 10, but only 'mem_reservation' is requested now. The call is
        # expected to return True, i.e. report that the docker dimensions
        # have been changed.
        params = self.fake_data['params']
        params['dimensions'] = {'mem_reservation': 10}
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 10,
            'CpuQuota': 0,
            'CpusetCpus': '',
            'CpuShares': 0,
            'BlkioWeight': 0,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 10,
            'Ulimits': [],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_dimensions_explicit_default(self):
        # 'mem_reservation' is requested explicitly with the value 0 and
        # the container's HostConfig already reports MemoryReservation 0
        # (every other numeric limit is 0 as well). Nothing differs, so
        # compare_dimensions must return False.
        params = self.fake_data['params']
        params['dimensions'] = {'mem_reservation': 0}
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 0,
            'CpuQuota': 0,
            'CpusetCpus': '',
            'CpuShares': 0,
            'BlkioWeight': 0,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 0,
            'Ulimits': [],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.assertFalse(self.dw.compare_dimensions(container_info))

    def test_compare_container_state_pos(self):
        # 'exited' is requested while the container reports 'running';
        # compare_container_state must return True.
        self.dw = get_DockerWorker({'state': 'exited'})
        container_info = {'State': {'Status': 'running'}}

        state_differs = self.dw.compare_container_state(container_info)
        self.assertTrue(state_differs)

    def test_compare_ulimits_pos(self):
        # A 'nofile' ulimit is requested while the container's Ulimits list
        # is empty; compare_dimensions must return True.
        params = self.fake_data['params']
        nofile = {'soft': 131072, 'hard': 131072}
        params['dimensions'] = {'ulimits': {'nofile': nofile}}
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 0,
            'CpuQuota': 0,
            'CpusetCpus': '',
            'CpuShares': 0,
            'BlkioWeight': 0,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 0,
            'Ulimits': [],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_ulimits_neg(self):
        # The container already carries a 'nofile' Ulimit with the same
        # soft/hard values as requested; compare_dimensions must return
        # False.
        params = self.fake_data['params']
        nofile = {'soft': 131072, 'hard': 131072}
        params['dimensions'] = {'ulimits': {'nofile': nofile}}
        current_nofile = Ulimit(name='nofile', soft=131072, hard=131072)
        host_config = {
            'CpuPeriod': 0,
            'KernelMemory': 0,
            'Memory': 0,
            'CpuQuota': 0,
            'CpusetCpus': '',
            'CpuShares': 0,
            'BlkioWeight': 0,
            'CpusetMems': '',
            'MemorySwap': 0,
            'MemoryReservation': 0,
            'Ulimits': [current_nofile],
        }
        container_info = {'HostConfig': host_config}
        self.dw = get_DockerWorker(params)

        self.assertFalse(self.dw.compare_dimensions(container_info))

    def test_compare_empty_new_healthcheck(self):
        # This test requests no healthcheck itself (presumably the shared
        # fake_data has none either - verify against setUp), while the
        # container reports one; compare_healthcheck must return True.
        current_healthcheck = {
            'Test': ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {'Healthcheck': current_healthcheck}}
        self.dw = get_DockerWorker(self.fake_data['params'])

        self.assertTrue(self.dw.compare_healthcheck(container_info))

    def test_compare_empty_current_healthcheck(self):
        # A full healthcheck is requested but the container's Config has
        # no Healthcheck entry; compare_healthcheck must return True.
        params = self.fake_data['params']
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'interval': 30,
            'timeout': 30,
            'start_period': 5,
            'retries': 3,
        }
        container_info = {'Config': {}}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_healthcheck(container_info))

    def test_compare_healthcheck_no_test(self):
        # The requested healthcheck lacks the 'test' option; the module's
        # exit_json must be called exactly once, reporting it as missing.
        params = self.fake_data['params']
        params['healthcheck'] = {
            'interval': 30,
            'timeout': 30,
            'start_period': 5,
            'retries': 3,
        }
        current_healthcheck = {
            'Test': ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {'Healthcheck': current_healthcheck}}
        self.dw = get_DockerWorker(params)

        self.dw.compare_healthcheck(container_info)

        exit_json = self.dw.module.exit_json
        exit_json.assert_called_once_with(
            failed=True,
            msg=repr("Missing healthcheck option"),
            missing_healthcheck={'test'})

    def test_compare_healthcheck_pos(self):
        # The requested test command (['CMD', '/bin/check']) differs from
        # the container's; compare_healthcheck must return True.
        params = self.fake_data['params']
        params['healthcheck'] = {'test': ['CMD', '/bin/check']}
        current_healthcheck = {
            'Test': ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {'Healthcheck': current_healthcheck}}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_healthcheck(container_info))

    def test_compare_healthcheck_neg(self):
        # The requested healthcheck corresponds to the one the container
        # reports (durations there are presumably in nanoseconds - confirm);
        # compare_healthcheck must return False.
        params = self.fake_data['params']
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'interval': 30,
            'timeout': 30,
            'start_period': 5,
            'retries': 3,
        }
        current_healthcheck = {
            "Test": ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {"Healthcheck": current_healthcheck}}
        self.dw = get_DockerWorker(params)

        self.assertFalse(self.dw.compare_healthcheck(container_info))

    def test_compare_healthcheck_time_zero(self):
        # 'interval' is requested as 0 while the container reports a
        # non-zero Interval; compare_healthcheck must return True.
        params = self.fake_data['params']
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'interval': 0,
            'timeout': 30,
            'start_period': 5,
            'retries': 3,
        }
        current_healthcheck = {
            "Test": ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {"Healthcheck": current_healthcheck}}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_healthcheck(container_info))

    def test_compare_healthcheck_time_wrong_type(self):
        # 'interval' is a nested dict rather than a number; the comparison
        # is expected to raise TypeError.
        params = self.fake_data['params']
        # 'interval' stays the last key, matching the original insertion
        # order of the requested healthcheck dict.
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'timeout': 30,
            'start_period': 5,
            'retries': 3,
            'interval': {"broken": {"interval": "True"}},
        }
        current_healthcheck = {
            "Test": ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {"Healthcheck": current_healthcheck}}
        self.dw = get_DockerWorker(params)

        with self.assertRaises(TypeError):
            self.dw.compare_healthcheck(container_info)

    def test_compare_healthcheck_time_wrong_value(self):
        # 'interval' is the non-numeric string "dog"; the comparison is
        # expected to raise ValueError.
        params = self.fake_data['params']
        # 'interval' stays the last key, matching the original insertion
        # order of the requested healthcheck dict.
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'timeout': 30,
            'start_period': 5,
            'retries': 3,
            'interval': "dog",
        }
        current_healthcheck = {
            "Test": ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {"Healthcheck": current_healthcheck}}
        self.dw = get_DockerWorker(params)

        with self.assertRaises(ValueError):
            self.dw.compare_healthcheck(container_info)

    def test_compare_healthcheck_opt_missing(self):
        # The requested healthcheck omits 'start_period'; the module's
        # exit_json must be called exactly once, reporting it as missing.
        params = self.fake_data['params']
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'interval': 30,
            'timeout': 30,
            'retries': 3,
        }
        current_healthcheck = {
            "Test": ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {"Healthcheck": current_healthcheck}}
        self.dw = get_DockerWorker(params)

        self.dw.compare_healthcheck(container_info)

        exit_json = self.dw.module.exit_json
        exit_json.assert_called_once_with(
            failed=True,
            msg=repr("Missing healthcheck option"),
            missing_healthcheck={'start_period'})

    def test_compare_healthcheck_opt_extra(self):
        # The requested healthcheck carries the unknown 'extra_option'; the
        # module's exit_json must be called exactly once, flagging it as
        # unsupported.
        params = self.fake_data['params']
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'interval': 30,
            'start_period': 5,
            'extra_option': 1,
            'timeout': 30,
            'retries': 3,
        }
        current_healthcheck = {
            "Test": ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {"Healthcheck": current_healthcheck}}
        self.dw = get_DockerWorker(params)

        self.dw.compare_healthcheck(container_info)

        exit_json = self.dw.module.exit_json
        exit_json.assert_called_once_with(
            failed=True,
            msg=repr("Unsupported healthcheck options"),
            unsupported_healthcheck={'extra_option'})

    def test_compare_healthcheck_value_false(self):
        # 'retries' is requested as False (and an 'extra_option' key is
        # present) while the container reports Retries 3;
        # compare_healthcheck must return True.
        params = self.fake_data['params']
        params['healthcheck'] = {
            'test': ['CMD-SHELL', '/bin/check.sh'],
            'interval': 30,
            'start_period': 5,
            'extra_option': 1,
            'timeout': 30,
            'retries': False,
        }
        current_healthcheck = {
            "Test": ["CMD-SHELL", "/bin/check.sh"],
            "Interval": 30000000000,
            "Timeout": 30000000000,
            "StartPeriod": 5000000000,
            "Retries": 3,
        }
        container_info = {'Config': {"Healthcheck": current_healthcheck}}
        self.dw = get_DockerWorker(params)

        self.assertTrue(self.dw.compare_healthcheck(container_info))

    def test_parse_healthcheck_empty(self):
        # Passes whatever fake_data holds under params/healthcheck (None if
        # the key is absent - presumably the case here) and expects
        # parse_healthcheck to return None.
        self.dw = get_DockerWorker(self.fake_data['params'])
        healthcheck = self.fake_data.get('params', {}).get('healthcheck')

        parsed = self.dw.parse_healthcheck(healthcheck)
        self.assertIsNone(parsed)

    def test_parse_healthcheck_test_none(self):
        # A healthcheck whose 'test' is the plain string 'NONE' must be
        # parsed to None.
        params = self.fake_data['params']
        params['healthcheck'] = {'test': 'NONE'}
        self.dw = get_DockerWorker(params)

        parsed = self.dw.parse_healthcheck(params['healthcheck'])
        self.assertIsNone(parsed)

    def test_parse_healthcheck_test_none_brackets(self):
        # A healthcheck whose 'test' is the one-element list ['NONE'] must
        # be parsed to None.
        params = self.fake_data['params']
        params['healthcheck'] = {'test': ['NONE']}
        self.dw = get_DockerWorker(params)

        parsed = self.dw.parse_healthcheck(params['healthcheck'])
        self.assertIsNone(parsed)