Skip to content
Snippets Groups Projects
test_kolla_docker.py 47.5 KiB
Newer Older
            'blkio_weight': 10, 'mem_limit': 30}
        container_info = dict()
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 0, 'CpuQuota': 0,
            'CpusetCpus': '', 'CpuShares': 0, 'BlkioWeight': 0,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 0,
            'Ulimits': []}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_dimensions_neg(self):
        self.fake_data['params']['dimensions'] = {
            'blkio_weight': 10}
        container_info = dict()
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 0, 'CpuQuota': 0,
            'CpusetCpus': '', 'CpuShares': 0, 'BlkioWeight': 10,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 0,
            'Ulimits': []}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.assertFalse(self.dw.compare_dimensions(container_info))

    def test_compare_wrong_dimensions(self):
        self.fake_data['params']['dimensions'] = {
            'blki_weight': 0}
        container_info = dict()
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 0, 'CpuQuota': 0,
            'CpusetCpus': '', 'CpuShares': 0, 'BlkioWeight': 0,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 0,
            'Ulimits': []}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.dw.compare_dimensions(container_info)
        self.dw.module.exit_json.assert_called_once_with(
            failed=True, msg=repr("Unsupported dimensions"),
            unsupported_dimensions=set(['blki_weight']))

    def test_compare_empty_dimensions(self):
        self.fake_data['params']['dimensions'] = dict()
        container_info = dict()
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 0, 'CpuQuota': 0,
            'CpusetCpus': '1', 'CpuShares': 0, 'BlkioWeight': 0,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 0,
            'Ulimits': []}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_dimensions_removed_and_changed(self):
        self.fake_data['params']['dimensions'] = {
            'mem_reservation': 10}
        container_info = dict()
        # Here mem_limit and mem_reservation are already present
        # Now we are updating only 'mem_reservation'.
        # Ideally it should return True stating that the docker
        # dimensions have been changed.
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 10, 'CpuQuota': 0,
            'CpusetCpus': '', 'CpuShares': 0, 'BlkioWeight': 0,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 10,
            'Ulimits': []}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_dimensions_explicit_default(self):
        self.fake_data['params']['dimensions'] = {
            'mem_reservation': 0}
        container_info = dict()
        # Here mem_limit and mem_reservation are already present
        # Now we are updating only 'mem_reservation'.
        # Ideally it should return True stating that the docker
        # dimensions have been changed.
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 0, 'CpuQuota': 0,
            'CpusetCpus': '', 'CpuShares': 0, 'BlkioWeight': 0,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 0,
            'Ulimits': []}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.assertFalse(self.dw.compare_dimensions(container_info))

    def test_compare_container_state_pos(self):
        container_info = {'State': dict(Status='running')}
        self.dw = get_DockerWorker({'state': 'exited'})
        self.assertTrue(self.dw.compare_container_state(container_info))

    def test_compare_ulimits_pos(self):
        self.fake_data['params']['dimensions'] = {
            'ulimits': {'nofile': {'soft': 131072, 'hard': 131072}}}
        container_info = dict()
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 0, 'CpuQuota': 0,
            'CpusetCpus': '', 'CpuShares': 0, 'BlkioWeight': 0,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 0,
            'Ulimits': []}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.assertTrue(self.dw.compare_dimensions(container_info))

    def test_compare_ulimits_neg(self):
        self.fake_data['params']['dimensions'] = {
            'ulimits': {'nofile': {'soft': 131072, 'hard': 131072}}}
        ulimits_nofile = Ulimit(name='nofile',
                                soft=131072, hard=131072)
        container_info = dict()
        container_info['HostConfig'] = {
            'CpuPeriod': 0, 'KernelMemory': 0, 'Memory': 0, 'CpuQuota': 0,
            'CpusetCpus': '', 'CpuShares': 0, 'BlkioWeight': 0,
            'CpusetMems': '', 'MemorySwap': 0, 'MemoryReservation': 0,
            'Ulimits': [ulimits_nofile]}
        self.dw = get_DockerWorker(self.fake_data['params'])
        self.assertFalse(self.dw.compare_dimensions(container_info))