Source code for univention.testing.utils

# Copyright 2013-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
Common functions used by tests.
"""

from __future__ import print_function

import functools
import inspect
import os
import socket
import subprocess
import sys
import time
import traceback
from itertools import chain
from enum import IntEnum
from types import TracebackType  # noqa: F401
from typing import IO, Any, Callable, Dict, Iterable, List, NoReturn, Optional, Sequence, Text, Tuple, Type, TypeVar, Union  # noqa: F401

import ldap
import six

import univention.uldap as uldap
from univention.config_registry import ConfigRegistry

try:
	from univention.admin.uldap import access
except ImportError:
	access = None

S4CONNECTOR_INIT_SCRIPT = '/etc/init.d/univention-s4-connector'
FIREWALL_INIT_SCRIPT = '/etc/init.d/univention-firewall'
SLAPD_INIT_SCRIPT = '/etc/init.d/slapd'

UCR = ConfigRegistry()


_T = TypeVar("_T")


[docs]class LDAPError(Exception): pass
[docs]class LDAPReplicationFailed(LDAPError): pass
[docs]class LDAPObjectNotFound(LDAPError): pass
[docs]class LDAPUnexpectedObjectFound(LDAPError): pass
[docs]class LDAPObjectValueMissing(LDAPError): pass
[docs]class LDAPObjectUnexpectedValue(LDAPError): pass
[docs]class UCSTestDomainAdminCredentials(object): """ This class fetches the username, the LDAP bind DN and the password for a domain admin user account from UCR. The account may be used for testing. >>> dummy_ucr = {'ldap/base': 'dc=example,dc=com', 'tests/domainadmin/pwdfile': '/dev/null'} >>> account = UCSTestDomainAdminCredentials(ucr=dummy_ucr) >>> account.username 'Administrator' >>> account.binddn 'uid=Administrator,cn=users,dc=example,dc=com' >>> account.bindpw '' """ def __init__(self, ucr=None): # type: (Optional[ConfigRegistry]) -> None if ucr is None: ucr = UCR ucr.load() self.binddn = ucr.get('tests/domainadmin/account', 'uid=Administrator,cn=users,%(ldap/base)s' % ucr) self.pwdfile = ucr.get('tests/domainadmin/pwdfile') if self.pwdfile: with open(self.pwdfile, 'r') as f: self.bindpw = f.read().strip('\n\r') else: self.bindpw = ucr.get('tests/domainadmin/pwd', 'univention') if self.binddn: self.username = uldap.explodeDn(self.binddn, 1)[0] # type: Optional[Text] else: self.username = None
[docs]def get_ldap_connection(admin_uldap=False, primary=False): # type: (bool, bool) -> access ucr = UCR ucr.load() if primary: port = int(ucr.get('ldap/master/port', 7389)) ldap_servers = [ucr['ldap/master']] else: port = int(ucr.get('ldap/server/port', 7389)) ldap_servers = [] if ucr['ldap/server/name']: ldap_servers.append(ucr['ldap/server/name']) if ucr['ldap/servers/addition']: ldap_servers.extend(ucr['ldap/server/addition'].split()) creds = UCSTestDomainAdminCredentials() for ldap_server in ldap_servers: try: lo = uldap.access(host=ldap_server, port=port, base=ucr['ldap/base'], binddn=creds.binddn, bindpw=creds.bindpw, start_tls=2, decode_ignorelist=[], follow_referral=True) if admin_uldap: lo = access(lo=lo) return lo except ldap.SERVER_DOWN: pass raise ldap.SERVER_DOWN()
[docs]def retry_on_error(func, exceptions=(Exception,), retry_count=20, delay=10): # type: (Callable[..., _T], Tuple[Type[Exception], ...], int, float) -> _T """ This function calls the given function `func`. If one of the specified `exceptions` is caught, `func` is called again until the retry count is reached or any unspecified exception is caught. Between two calls of `func` retry_on_error waits for `delay` seconds. :param func: function to be called :param exceptions: tuple of exception classes, that cause a rerun of `func` :param retry_count: retry the execution of `func` max `retry_count` times :param delay: waiting time in seconds between two calls of `func` :returns: return value of `func` """ for i in range(retry_count + 1): try: return func() except exceptions: exc_info = sys.exc_info() if i != retry_count: print('Exception occurred: %s (%s). Retrying in %.2f seconds (retry %d/%d).\n' % (exc_info[0], exc_info[1], delay, i, retry_count)) time.sleep(delay) else: print('Exception occurred: %s (%s). This was the last retry (retry %d/%d).\n' % (exc_info[0], exc_info[1], i, retry_count)) else: six.reraise(*exc_info)
[docs]def verify_ldap_object( baseDn, # type: str expected_attr=None, # type: Optional[Dict[str, str]] strict=True, # type: bool should_exist=True, # type: bool retry_count=20, # type: int delay=10, # type: float primary=False, # type: bool pre_check=None, # type: Optional[Callable[..., None]] pre_check_kwargs=None, # type: Optional[Dict[str, Any]] not_expected_attr=None, # type: Optional[Dict[str, str]] ): # type: (...) -> None """ Verify [non]existence and attributes of LDAP object. :param str baseDn: DN of object to check :param dict expected_attr: attributes and their values that the LDAP object is expected to have :param bool strict: value lists of multi-value attributes must be complete :param bool should_exist: whether the object is expected to exist :param int retry_count: how often to retry the verification if it fails before raising an exception :param float delay: waiting time in seconds between retries on verification failures :param bool primary: whether to connect to the primary (DC master) instead of local LDAP (to be exact: ucr[ldap/server/name], ucr['ldap/server/addition']) :param pre_check: function to execute before starting verification. Value should be a function object like `utils.wait_for_replication`. :param dict pre_check_kwargs: dict with kwargs to pass to `pre_check()` call :param dict not_expected_attr: attributes and their values that the LDAP object is NOT expected to have :return: None :raises LDAPObjectNotFound: when no object was found at `baseDn` :raises LDAPUnexpectedObjectFound: when an object was found at `baseDn`, but `should_exist=False` :raises LDAPObjectValueMissing: when a value listed in `expected_attr` is missing in the LDAP object :raises LDAPObjectUnexpectedValue: if `strict=True` and a multi-value attribute of the LDAP object has more values than were listed in `expected_attr` or an `not_expected_attr` was found :raises TypeError: if the value of `pre_check` is not a function object """ ucr = UCR ucr.load() retry_count = int(ucr.get("tests/verify_ldap_object/retry_count", retry_count)) delay = int(ucr.get("tests/verify_ldap_object/delay", delay)) if pre_check: if not inspect.isfunction(pre_check) or inspect.ismethod(pre_check): raise TypeError("Value of argument 'pre_check' is not a function: {!r}".format(pre_check)) pre_check(**(pre_check_kwargs or {})) return retry_on_error( functools.partial(__verify_ldap_object, baseDn, expected_attr, strict, should_exist, primary, not_expected_attr), (LDAPUnexpectedObjectFound, LDAPObjectNotFound, LDAPObjectValueMissing, LDAPObjectUnexpectedValue), retry_count, delay)
def __verify_ldap_object(baseDn, expected_attr=None, strict=True, should_exist=True, primary=False, not_expected_attr=None): # type: (str, Optional[Dict[str, str]], bool, bool, bool) -> None if expected_attr is None: expected_attr = {} if not_expected_attr is None: not_expected_attr = {} try: dn, attr = get_ldap_connection(primary=primary).search( filter='(objectClass=*)', base=baseDn, scope=ldap.SCOPE_BASE, attr=set(chain(expected_attr.keys(), not_expected_attr.keys())) )[0] except (ldap.NO_SUCH_OBJECT, IndexError): if should_exist: raise LDAPObjectNotFound('DN: %s' % baseDn) return if not should_exist: raise LDAPUnexpectedObjectFound('DN: %s' % baseDn) values_missing = {} unexpected_values = {} for attribute, expected_values_ in expected_attr.items(): found_values = set(attr.get(attribute, [])) expected_values = {x if isinstance(x, bytes) else x.encode('UTF-8') for x in expected_values_} difference = expected_values - found_values if difference: values_missing[attribute] = difference if strict: difference = found_values - expected_values if difference: unexpected_values[attribute] = difference for attribute, not_expected_values_ in not_expected_attr.items(): if strict and attribute in expected_attr.keys(): continue found_values = set(attr.get(attribute, [])) not_expected_values = {x if isinstance(x, bytes) else x.encode('UTF-8') for x in not_expected_values_} intersection = found_values.intersection(not_expected_values) if intersection: unexpected_values[attribute] = intersection mixed = dict((key, (values_missing.get(key), unexpected_values.get(key))) for key in list(values_missing) + list(unexpected_values)) msg = u'DN: %s\n%s\n' % ( baseDn, u'\n'.join( u"%s: %r, %s %s" % ( attribute, attr.get(attribute), ('missing: %r;' % u"', ".join(x.decode('UTF-8', 'replace') for x in difference_missing)) if difference_missing else '', ('unexpected: %r' % u"', ".join(x.decode('UTF-8', 'replace') for x in difference_unexpected)) if difference_unexpected else '', ) for attribute, (difference_missing, difference_unexpected) in mixed.items()) ) if values_missing: raise LDAPObjectValueMissing(msg) if unexpected_values: raise LDAPObjectUnexpectedValue(msg)
[docs]def s4connector_present(): # type: () -> bool ucr = ConfigRegistry() ucr.load() if ucr.is_true('directory/manager/samba3/legacy', False): return False if ucr.is_false('directory/manager/samba3/legacy', False): return True for dn, attr in get_ldap_connection().search( filter='(&(|(objectClass=univentionDomainController)(objectClass=univentionMemberServer))(univentionService=S4 Connector))', attr=['aRecord'] ): if 'aRecord' in attr: return True return False
[docs]def stop_s4connector(): # type: () -> None if package_installed('univention-s4-connector'): subprocess.call((S4CONNECTOR_INIT_SCRIPT, 'stop'))
[docs]def start_s4connector(): # type: () -> None if package_installed('univention-s4-connector'): subprocess.call((S4CONNECTOR_INIT_SCRIPT, 'start'))
[docs]def restart_s4connector(): # type: () -> None stop_s4connector() start_s4connector()
[docs]def stop_slapd(): # type: () -> None subprocess.call((SLAPD_INIT_SCRIPT, 'stop'))
[docs]def start_slapd(): # type: () -> None subprocess.call((SLAPD_INIT_SCRIPT, 'start'))
[docs]def restart_slapd(): # type: () -> None subprocess.call((SLAPD_INIT_SCRIPT, 'restart'))
[docs]def stop_listener(): # type: () -> None subprocess.call(('systemctl', 'stop', 'univention-directory-listener'))
[docs]def start_listener(): # type: () -> None subprocess.call(('systemctl', 'start', 'univention-directory-listener'))
[docs]def restart_listener(): # type: () -> None subprocess.call(('systemctl', 'restart', 'univention-directory-listener'))
[docs]def restart_firewall(): # type: () -> None subprocess.call((FIREWALL_INIT_SCRIPT, 'restart'))
[docs]class AutomaticListenerRestart(object): """ Automatically restart Univention Directory Listener when leaving the "with" block:: with AutomaticListenerRestart() as alr: with ucr_test.UCSTestConfigRegistry() as ucr: # set some ucr variables, that influence the Univention Directory Listener univention.config_registry.handler_set(['foo/bar=ding/dong']) """ # noqa: E101 def __enter__(self): # type: () -> AutomaticListenerRestart return self def __exit__(self, exc_type, exc_value, traceback): # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None restart_listener()
[docs]class AutoCallCommand(object): """ Automatically call the given commands when entering/leaving the "with" block. The keyword arguments enter_cmd and exit_cmd are optional:: with AutoCallCommand( enter_cmd=['/etc/init.d/dovecot', 'reload'], exit_cmd=['/etc/init.d/dovecot', 'restart']) as acc: with ucr_test.UCSTestConfigRegistry() as ucr: # set some ucr variables, that influence the Univention Directory Listener univention.config_registry.handler_set(['foo/bar=ding/dong']) In case some filedescriptors for stdout/stderr have to be passed to the executed command, they may be passed as kwarg:: with AutoCallCommand( enter_cmd=['/etc/init.d/dovecot', 'reload'], exit_cmd=['/etc/init.d/dovecot', 'restart'], stderr=open('/dev/zero', 'w')) as acc: pass """ # noqa: E101 def __init__(self, enter_cmd=None, exit_cmd=None, stdout=None, stderr=None): # type: (Optional[Sequence[str]], Optional[Sequence[str]], Optional[IO[str]], Optional[IO[str]]) -> None self.enter_cmd = None if type(enter_cmd) in (list, tuple): self.enter_cmd = enter_cmd self.exit_cmd = None if type(exit_cmd) in (list, tuple): self.exit_cmd = exit_cmd self.pipe_stdout = stdout self.pipe_stderr = stderr def __enter__(self): # type: () -> AutoCallCommand if self.enter_cmd: subprocess.call(self.enter_cmd, stdout=self.pipe_stdout, stderr=self.pipe_stderr) return self def __exit__(self, exc_type, exc_value, traceback): # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None if self.exit_cmd: subprocess.call(self.exit_cmd, stdout=self.pipe_stdout, stderr=self.pipe_stderr)
[docs]class FollowLogfile(object): """ Prints the contents of the listed files on exit of the with block if an exception occurred. Set always=True to also print them without exception. You may wish to make the server flush its logs before existing the with block. Use AutoCallCommand inside the block for that:: cmd = ('doveadm', 'log', 'reopen') with FollowLogfile(logfiles=['/var/log/syslog', '/var/log/mail.log']): with utils.AutoCallCommand(enter_cmd=cmd, exit_cmd=cmd): pass with FollowLogfile(logfiles=['/var/log/syslog'], always=True): with utils.AutoCallCommand(enter_cmd=cmd, exit_cmd=cmd): pass """ # noqa: E101 def __init__(self, logfiles, always=False): # type: (Iterable[str], bool) -> None """ :param logfiles: list of absolute filenames to read from :param always: bool, if True: print logfile change also if no error occurred (default=False) """ assert isinstance(always, bool) self.always = always self.logfile_pos = dict.fromkeys(logfiles, 0) # type: Dict[str, int] def __enter__(self): # type: () -> FollowLogfile self.logfile_pos.update((logfile, os.path.getsize(logfile)) for logfile in self.logfile_pos) return self def __exit__(self, exc_type, exc_value, traceback): # type: (Optional[Type[BaseException]], Optional[BaseException], Optional[TracebackType]) -> None if self.always or exc_type: for logfile, pos in self.logfile_pos.items(): with open(logfile, "r") as log: log.seek(pos, 0) print(logfile.center(79, "=")) sys.stdout.writelines(log) print("=" * 79)
[docs]class ReplicationType(IntEnum): LISTENER = 1 POSTRUN = 2 S4C_FROM_UCS = 3 S4C_TO_UCS = 4 DRS = 5
[docs]def wait_for_replication_from_master_openldap_to_local_samba(replication_postrun=False, ldap_filter=None, verbose=True): # type: (bool, Optional[str], bool) -> None """Wait for all kind of replications""" # the order matters! conditions = [(ReplicationType.LISTENER, 'postrun' if replication_postrun else True)] # type: List[Tuple[ReplicationType, Any]] ucr = UCR ucr.load() if ucr.get('samba4/ldap/base'): conditions.append((ReplicationType.S4C_FROM_UCS, ldap_filter)) if ucr.get('server/role') in ('domaincontroller_backup', 'domaincontroller_slave'): conditions.append((ReplicationType.DRS, ldap_filter)) wait_for(conditions, verbose=True)
[docs]def wait_for_replication_from_local_samba_to_local_openldap(replication_postrun=False, ldap_filter=None, verbose=True): # type: (bool, Optional[str], bool) -> None """Wait for all kind of replications""" conditions = [] # the order matters! ucr = UCR ucr.load() if ucr.get('server/role') in ('domaincontroller_backup', 'domaincontroller_slave'): conditions.append((ReplicationType.DRS, ldap_filter)) if ucr.get('samba4/ldap/base'): conditions.append((ReplicationType.S4C_FROM_UCS, ldap_filter)) if replication_postrun: conditions.append((ReplicationType.LISTENER, 'postrun')) else: conditions.append((ReplicationType.LISTENER, None)) wait_for(conditions, verbose=True)
[docs]def wait_for(conditions=None, verbose=True): # type: (Optional[List[Tuple[ReplicationType, Any]]], bool) -> None """Wait for all kind of replications""" for replicationtype, detail in conditions or []: if replicationtype == ReplicationType.LISTENER: if detail == 'postrun': wait_for_listener_replication_and_postrun(verbose) else: wait_for_listener_replication(verbose) elif replicationtype == ReplicationType.S4C_FROM_UCS: wait_for_s4connector_replication(verbose) if detail: # TODO: search in Samba/AD with filter=detail pass elif replicationtype == ReplicationType.S4C_TO_UCS: wait_for_s4connector_replication(verbose) if detail: # TODO: search in OpenLDAP with filter=detail pass elif replicationtype == ReplicationType.DRS: if not isinstance(detail, dict): detail = {'ldap_filter': detail} wait_for_drs_replication(verbose=verbose, **detail)
[docs]def wait_for_drs_replication(*args, **kwargs): # type: (*Any, **Any) -> None from univention.testing.ucs_samba import wait_for_drs_replication return wait_for_drs_replication(*args, **kwargs)
[docs]def wait_for_listener_replication(verbose=True): # type: (bool) -> None sys.stdout.flush() time.sleep(1) # Give the notifier some time to increase its transaction id if verbose: print('Waiting for replication...') for _ in range(300): # The "-c 1" option ensures listener and notifier id are equal. # Otherwise the check is successful as long as the listener id changed since the last check. cmd = ('/usr/lib/nagios/plugins/check_univention_replication', '-c', '1') proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, _stderr = proc.communicate() if proc.returncode == 0: if verbose: print('Done: replication complete.') return print('.', end=' ') time.sleep(1) print('Error: replication incomplete.') raise LDAPReplicationFailed()
[docs]def get_lid(): # type: () -> int """ get_lid() returns the last processed notifier ID of univention-directory-listener. """ with open("/var/lib/univention-directory-listener/notifier_id", "r") as notifier_id: return int(notifier_id.readline())
[docs]def wait_for_listener_replication_and_postrun(verbose=True): # type: (bool) -> None # Postrun function in listener modules are called after 15 seconds without any events wait_for_listener_replication(verbose=verbose) if verbose: print("Waiting for postrun...") lid = get_lid() seconds_since_last_change = 0 for _ in range(300): time.sleep(1) print('.', end=' ') if lid == get_lid(): seconds_since_last_change += 1 else: seconds_since_last_change = 0 if seconds_since_last_change > 12: # Less than 15 sec because a postrun function can potentially make ldap changes, # which would result in a loop here. time.sleep(10) # Give the postrun function some time if verbose: print("Postrun should have run") return lid = get_lid() print("Postrun was probably never called in the last 300 seconds") raise LDAPReplicationFailed
[docs]def wait_for_s4connector_replication(verbose=True): # type: (bool) -> None if verbose: print('Waiting for connector replication') import univention.testing.ucs_samba try: univention.testing.ucs_samba.wait_for_s4connector(17) except OSError as exc: # nagios not installed if verbose: print('Nagios not installed: %s' % (exc,), file=sys.stderr) time.sleep(16) except univention.testing.ucs_samba.WaitForS4ConnectorTimeout: if verbose: print('Warning: S4 Connector replication was not finished after 17 seconds', file=sys.stderr)
# backwards compatibility wait_for_replication = wait_for_listener_replication wait_for_replication_and_postrun = wait_for_listener_replication_and_postrun wait_for_connector_replication = wait_for_s4connector_replication
[docs]def package_installed(package): # type: (str) -> bool sys.stdout.flush() with open('/dev/null', 'w') as null: return (subprocess.call("dpkg-query -W -f '${Status}' %s | grep -q ^install" % package, stderr=null, shell=True) == 0)
[docs]def fail(log_message=None, returncode=1): # type: (Optional[str], int) -> NoReturn print('### FAIL ###') if log_message: print('%s\n### ###' % log_message) if sys.exc_info()[-1]: print(traceback.format_exc(), file=sys.stderr) sys.exit(returncode)
[docs]def uppercase_in_ldap_base(): # type: () -> bool ucr = ConfigRegistry() ucr.load() return not ucr.get('ldap/base').islower()
[docs]def is_udp_port_open(port, ip=None): # type: (int, Optional[str]) -> bool if ip is None: ip = '127.0.0.1' try: udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_sock.connect((ip, int(port))) os.write(udp_sock.fileno(), b'X') os.write(udp_sock.fileno(), b'X') os.write(udp_sock.fileno(), b'X') return True except OSError as ex: print('is_udp_port_open({0}) failed: {1}'.format(port, ex)) return False
[docs]def is_port_open(port, hosts=None, timeout=60): # type: (int, Optional[Iterable[str]], float) -> bool ''' check if port is open, if host == None check hostname and 127.0.0.1 :param int port: TCP port number :param hosts: list of hostnames or localhost if hosts is None. :type hosts: list[str] or None :return: True if at least on host is reachable, False otherwise. :rtype: boolean ''' if hosts is None: hosts = (socket.gethostname(), '127.0.0.1', '::1') for host in hosts: address = (host, int(port)) try: connection = socket.create_connection(address, timeout) connection.close() return True except EnvironmentError as ex: print('is_port_open({0}) failed: {1}'.format(port, ex)) return False
if __name__ == '__main__': import doctest doctest.testmod() # vim: set fileencoding=utf-8 ft=python sw=4 ts=4 :