Newer
Older
#!/usr/bin/env python3
# Kayobe overcloud host configure tests.
# Uses py.test and TestInfra.
import ipaddress
import os
import distro
import pytest
info = distro.id()
return info == 'ubuntu'
info = distro.id()
return info in ['centos', 'rocky']
def _is_dnf_mirror():
info = distro.id()
return info == 'centos'
def _is_ubuntu_noble():
name = distro.name()
version = distro.version()
return name == 'Ubuntu' and version == '24.04'
def test_network_ethernet(host):
interface = host.interface('dummy2')
assert interface.exists
assert '192.168.34.1' in interface.addresses
routes = host.check_output('/sbin/ip route show dev dummy2')
assert '192.168.40.0/24 via 192.168.34.254' in routes
def test_network_ethernet_vlan(host):
interface = host.interface('dummy2.42')
assert interface.exists
assert '192.168.35.1' in interface.addresses
assert host.file('/sys/class/net/dummy2.42/lower_dummy2').exists
routes = host.check_output(
'/sbin/ip route show dev dummy2.42 table kayobe-test-route-table')
assert '192.168.40.0/24 via 192.168.35.254' in routes
rules = host.check_output(
'/sbin/ip rule show table kayobe-test-route-table')
expected_from = 'from 192.168.35.0/24 lookup kayobe-test-route-table'
expected_to = 'to 192.168.35.0/24 lookup kayobe-test-route-table'
assert expected_from in rules
assert expected_to in rules
def test_network_bridge(host):
interface = host.interface('br0')
assert interface.exists
assert '192.168.36.1' in interface.addresses
state_file = "/sys/class/net/br0/bridge/stp_state"
stp_status = host.file(state_file).content_string.strip()
ports = ['dummy3', 'dummy4']
sys_ports = host.check_output('ls -1 /sys/class/net/br0/brif')
assert sys_ports == "\n".join(ports)
for port in ports:
interface = host.interface(port)
assert interface.exists
v4_addresses = [a for a in interface.addresses
if ipaddress.ip_address(a).version == '4']
assert not v4_addresses
def test_network_bridge_vlan(host):
interface = host.interface('br0.43')
assert interface.exists
assert '192.168.37.1' in interface.addresses
assert host.file('/sys/class/net/br0.43/lower_br0').exists
def test_network_bond(host):
interface = host.interface('bond0')
assert interface.exists
assert '192.168.38.1' in interface.addresses
sys_slaves = host.check_output('cat /sys/class/net/bond0/bonding/slaves')
# Ordering is not guaranteed, so compare sets.
sys_slaves = set(sys_slaves.split())
slaves = set(['dummy5', 'dummy6'])
assert sys_slaves == slaves
interface = host.interface(slave)
assert interface.exists
assert not interface.addresses
def test_network_bond_vlan(host):
interface = host.interface('bond0.44')
assert interface.exists
assert '192.168.39.1' in interface.addresses
assert host.file('/sys/class/net/bond0.44/lower_bond0').exists
def test_network_bridge_no_ip(host):
interface = host.interface('br1')
assert interface.exists
assert '192.168.40.1' not in interface.addresses
state_file = "/sys/class/net/br1/bridge/stp_state"
stp_status = host.file(state_file).content_string.strip()
@pytest.mark.skipif(not _is_apt(),
reason="systemd-networkd VLANs only supported on Ubuntu")
def test_network_systemd_vlan(host):
interface = host.interface('vlan45')
assert interface.exists
assert '192.168.41.1' in interface.addresses
def test_additional_user_account(host):
user = host.user("kayobe-test-user")
assert user.name == "kayobe-test-user"
assert user.group == "kayobe-test-user"
assert set(user.groups) == {"kayobe-test-user", "stack"}
assert user.gecos == "Kayobe test user"
with host.sudo():
assert user.password == 'kayobe-test-user-password'
def test_software_RAID(host):
slaves = host.check_output("ls -1 /sys/class/block/md0/slaves/")
assert slaves == "loop0\nloop1"
def test_luks(host):
# blkid returns an emptry string without root permissions
with host.sudo():
blkid = host.check_output('blkid /dev/md0')
assert 'TYPE="crypto_LUKS"' in blkid
def test_sysctls(host):
assert host.sysctl("fs.mount-max") == 99999
def test_cloud_init_is_disabled(host):
assert host.file("/etc/cloud/cloud-init.disabled").exists
def test_docker_storage_driver_is_overlay2(host):
with host.sudo("stack"):
info = host.check_output("docker info")
@pytest.mark.parametrize('user', ['kolla', 'stack'])
def test_docker_image_download(host, user):
with host.sudo(user):
host.check_output("docker pull quay.io/podman/hello")
@pytest.mark.parametrize('user', ['kolla', 'stack'])
def test_docker_container_run(host, user):
with host.sudo(user):
host.check_output("docker run --rm quay.io/podman/hello")
def test_timezone(host):
status = host.check_output("timedatectl status")
assert "Pacific/Honolulu" in status
def test_ntp_alternative_services_disabled(host):
# Tests that we don't have any conflicting NTP servers running
# NOTE(wszumski): We always mask services even if they don't exist
ntpd_service = host.service("ntp")
assert ntpd_service.is_masked
assert not ntpd_service.is_running
timesyncd_service = host.service("systemd-timesyncd")
assert timesyncd_service.is_masked
assert not timesyncd_service.is_running
def test_ntp_running(host):
# Tests that NTP services are enabled and running
assert host.package("chrony").is_installed
assert host.service("chronyd").is_enabled
assert host.service("chronyd").is_running
def test_ntp_non_default_time_server(host):
# Tests that the NTP pool has been changed from pool.ntp.org to
# time.cloudflare.com
if ('centos' in host.system_info.distribution.lower() or
'rocky' in host.system_info.distribution.lower()):
chrony_config = host.file("/etc/chrony.conf")
else:
# Debian based distributions use the following path
chrony_config = host.file("/etc/chrony/chrony.conf")
assert chrony_config.exists
assert "time.cloudflare.com" in chrony_config.content_string
# TODO(priteau): Remove once we force time sync
@pytest.mark.skipif(_is_ubuntu_noble(),
reason="Clock often fails to synchronize on Ubuntu Noble")
def test_ntp_clock_synchronized(host):
# Tests that the clock is synchronized
status_output = host.check_output("timedatectl status")
assert "synchronized: yes" in status_output
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
def test_apt_config(host):
apt_config = host.file("/etc/apt/apt.conf.d/99retries")
assert apt_config.exists
assert apt_config.content_string == "Acquire::Retries 1;\n"
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
def test_apt_preferences(host):
apt_preferences = host.file("/etc/apt/preferences.d/99fakepackage")
assert apt_preferences.exists
assert apt_preferences.content_string == ("Package: fake-package\n"
"Pin: origin fake.repo.url\n"
"Pin-Priority: 1\n")
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
def test_apt_custom_package_repository_is_available(host):
with host.sudo():
host.check_output("apt -y install td-agent")
assert host.package("td-agent").is_installed
@pytest.mark.skipif(not _is_apt(), reason="Apt only supported on Ubuntu")
def test_apt_auth(host):
apt_auth = host.file("/etc/apt/auth.conf.d/test.conf")
assert apt_auth.exists
with host.sudo():
auth_lines = apt_auth.content_string.splitlines()
assert "machine https://apt.example.com" in auth_lines
assert "login foo" in auth_lines
assert "password bar" in auth_lines
@pytest.mark.parametrize('repo', ["appstream", "baseos", "extras", "epel"])
@pytest.mark.skipif(not _is_dnf_mirror(),
reason="DNF OpenDev mirror only for CentOS 8")
def test_dnf_local_package_mirrors(host, repo):
# Depends on SITE_MIRROR_FQDN environment variable.
assert os.getenv('SITE_MIRROR_FQDN')
# NOTE(mgoddard): Should not require sudo but some files
# (/var/cache/dnf/expired_repos.json) can have incorrect permissions.
# https://bugzilla.redhat.com/show_bug.cgi?id=1636909
with host.sudo():
info = host.check_output("dnf repoinfo %s", repo)
assert os.getenv('SITE_MIRROR_FQDN') in info
@pytest.mark.skipif(not _is_dnf(), reason="DNF only supported on CentOS/Rocky")
def test_dnf_custom_package_repository_is_available(host):
with host.sudo():
host.check_output("dnf -y install td-agent")
assert host.package("td-agent").is_installed
@pytest.mark.skipif(not _is_dnf(), reason="DNF only supported on CentOS/Rocky")
def test_dnf_automatic(host):
assert host.package("dnf-automatic").is_installed
assert host.service("dnf-automatic.timer").is_enabled
assert host.service("dnf-automatic.timer").is_running
@pytest.mark.skipif(not _is_dnf(),
reason="tuned profiles only supported on CentOS/Rocky")
def test_tuned_profile_is_active(host):
tuned_output = host.check_output("tuned-adm active")
assert "throughput-performance" in tuned_output
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def test_firewalld_running(host):
assert host.package("firewalld").is_installed
assert host.service("firewalld.service").is_enabled
assert host.service("firewalld.service").is_running
def test_firewalld_zones(host):
# Verify that interfaces are on correct zones.
expected_zones = {
'dummy2.42': 'test-zone1',
'br0': 'test-zone2',
'br0.43': 'test-zone3',
'bond0': 'test-zone3',
'bond0.44': 'public'
}
for interface, expected_zone in expected_zones.items():
with host.sudo():
zone = host.check_output(
"firewall-cmd --get-zone-of-interface %s", interface)
assert zone == expected_zone
zone = host.check_output(
"firewall-cmd --permanent --get-zone-of-interface %s",
interface)
assert zone == expected_zone
def test_firewalld_rules(host):
# Verify that expected rules are present.
expected_info = {
'test-zone1': [
' services: ',
' ports: 8080/tcp',
' icmp-blocks: ',
],
'test-zone2': [
' services: http',
' ports: ',
' icmp-blocks: ',
],
'test-zone3': [
' services: ',
' ports: ',
' icmp-blocks: echo-request',
],
'public': [
' services: dhcpv6-client ssh',
' ports: ',
' icmp-blocks: ',
],
}
for zone, expected_lines in expected_info.items():
with host.sudo():
info = host.check_output(
"firewall-cmd --info-zone %s", zone)
info = info.splitlines()
perm_info = host.check_output(
"firewall-cmd --permanent --info-zone %s", zone)
perm_info = perm_info.splitlines()
for expected_line in expected_lines:
assert expected_line in info
assert expected_line in perm_info
@pytest.mark.skipif(not _is_dnf(),
reason="SELinux only supported on CentOS/Rocky")
def test_selinux(host):
selinux = host.check_output("sestatus")
selinux = selinux.splitlines()
# Remove duplicate whitespace characters in output
selinux = [" ".join(x.split()) for x in selinux]
assert "SELinux status: enabled" in selinux
assert "Current mode: permissive" in selinux
assert "Mode from config file: permissive" in selinux
def test_swap(host):
swapon = host.check_output("swapon -s")
swapon = swapon.splitlines()
assert len(swapon) > 1
swap_devs = [swap.split()[0] for swap in swapon[1:]]
assert "/swapfile" in swap_devs