Source code for univention.admin.handlers

# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
This module is the base for all Univention Directory Management handler modules.
A UDM handler represents an abstraction of an LDAP object.

.. seealso:: :mod:`univention.admin.uldap`
.. seealso:: :mod:`univention.admin.modules`
.. seealso:: :mod:`univention.admin.objects`
.. seealso:: :mod:`univention.admin.mapping`
.. seealso:: :mod:`univention.admin.syntax`
.. seealso:: :mod:`univention.admin.uexceptions`
"""

from __future__ import absolute_import

import copy
import re
import time
import sys
import inspect
import traceback
from typing import Any, Dict, Iterable, List, Optional, Set, Text, Tuple, Union  # noqa: F401

import six
from ipaddress import ip_address, ip_network, IPv4Address, IPv6Address
import ldap
from ldap.filter import filter_format
from ldap.dn import explode_rdn, escape_dn_chars, str2dn, dn2str
from ldap.controls.readentry import PostReadControl

import univention.debug as ud

from univention.admindiary.client import write_event
from univention.admindiary.events import DiaryEvent

import univention.admin.filter
import univention.admin.uldap
import univention.admin.mapping
import univention.admin.modules
import univention.admin.uexceptions
import univention.admin.localization
import univention.admin.syntax
from univention.admin import configRegistry
from univention.admin.uldap import DN
try:
	import univention.lib.admember
	_prevent_to_change_ad_properties = univention.lib.admember.is_localhost_in_admember_mode()
except ImportError:
	ud.debug(ud.ADMIN, ud.WARN, "Failed to import univention.lib.admember")
	_prevent_to_change_ad_properties = False

_Attributes = Dict[Text, Union[bytes, List[bytes]]]
_Properties = Dict[Text, Union[Text, List[Text]]]

translation = univention.admin.localization.translation('univention/admin/handlers')
_ = translation.translate

# global caching variable
if configRegistry.is_true('directory/manager/samba3/legacy', False):
	s4connector_present = False  # type: Optional[bool]
elif configRegistry.is_false('directory/manager/samba3/legacy', False):
	s4connector_present = True
else:
	s4connector_present = None


[docs]def disable_ad_restrictions(disable=True): # type: (bool) -> None global _prevent_to_change_ad_properties _prevent_to_change_ad_properties = disable
[docs]class simpleLdap(object): """The base class for all UDM handler modules. :param co: *deprecated* parameter for a config. Please pass `None`. :type co: None :param lo: A required LDAP connection object which is used for all LDAP operations (search, create, modify). It should be bound to a user which has the LDAP permissions to do the required operations. :type lo: :class:`univention.admin.uldap.access` :param position: The LDAP container where a new object should be created in, or `None` for existing objects. :type position: :class:`univention.admin.uldap.position` or `None` :param dn: The DN of an existing LDAP object. If a object should be created the DN must not be passed here! :type dn: str or None :param superordinate: The superordinate object of this object. Can be omitted. It is automatically searched by the given DN or position. :type superordinate: :class:`univention.admin.handlers.simpleLdap` or `None`. :param attributes: The LDAP attributes of the LDAP object as dict. This should by default be omitted. To save performance when an LDAP search is done this can be used, e.g. by the lookup() method. :type attributes: None or dict The following attributes hold information about the state of this object: :ivar str dn: A LDAP distinguished name (DN) of this object (if exists, otherwise None) :ivar str module: the UDM handlers name (e.g. users/user) :ivar dict oldattr: The LDAP attributes of this object as dict. If the object does not exists the dict is empty. :ivar dict info: A internal dictionary which holds the values for every property. :ivar list options: A list of UDM options which are enabled on this object. Enabling options causes specific object classes and attributes to be added to the object. :ivar list policies: A list of DNs containing references to assigned policies. :ivar dict properties: a dict which maps all UDM properties to :class:`univention.admin.property` instances. :ivar univention.admin.mapping.mapping mapping: A :class:`univention.admin.mapping.mapping` instance containing a mapping of UDM property names to LDAP attribute names. :ivar dict oldinfo: A private copy of :attr:`info` containing the original properties which were set during object loading. This is only set by :func:`univention.admin.handlers.simpleLdap.save`. :ivar list old_options: A private copy of :attr:`options` containing the original options which were set during object loading. This is only set by :func:`univention.admin.handlers.simpleLdap.save`. :ivar list oldpolicies: A private copy of :attr:`policies` containing the original policies which were set during object loading. This is only set by :func:`univention.admin.handlers.simpleLdap.save`. .. caution:: Do not operate on :attr:`info` directly because this would bypass syntax validations. This object should be used like a dict. Properties should be assigned in the following way: obj['name'] = 'value' """ module = '' # the name of the module use_performant_ldap_search_filter = False def __init__(self, co, lo, position, dn=u'', superordinate=None, attributes=None): # type: (None, univention.admin.uldap.access, univention.admin.uldap.position, Text, simpleLdap, _Attributes) -> None self._exists = False self.co = None if isinstance(lo, univention.admin.uldap.access): self.lo = lo # type: univention.admin.uldap.access elif isinstance(lo, univention.uldap.access): ud.debug(ud.ADMIN, ud.ERROR, 'using univention.uldap.access instance is deprecated. Use univention.admin.uldap.access instead.') self.lo = univention.admin.uldap.access(lo=lo) else: raise TypeError('lo must be instance of univention.admin.uldap.access.') self.dn = dn.decode('utf-8') if isinstance(dn, bytes) else dn # type: Optional[Text] self.old_dn = self.dn # type: Optional[Text] self.superordinate = superordinate # type: Optional[univention.admin.handlers.simpleLdap] self.set_defaults = not self.dn # this object is newly created and so we can use the default values self.position = position or univention.admin.uldap.position(lo.base) # type: univention.admin.uldap.position if not position and self.dn: self.position.setDn(self.dn) self.info = {} # type: _Properties self.oldinfo = {} # type: _Properties self.policies = [] # type: List[Text] self.oldpolicies = [] # type: List[Text] self.policyObjects = {} # type: Dict[Text, simplePolicy] self.__no_default = [] # type: List[Text] self._open = False self.options = [] # type: List[Text] self.old_options = [] # type: List[Text] self.alloc = [] # type: List[Union[Tuple[str, str], Tuple[str, str, bool]]] # name,value,updateLastUsedValue # s4connector_present is a global caching variable than can be # None ==> ldap has not been checked for servers with service "S4 Connector" # True ==> at least one server with IP address (aRecord) is present # False ==> no server is present global s4connector_present if s4connector_present is None: s4connector_present = False searchResult = self.lo.searchDn(u'(&(|(objectClass=univentionDomainController)(objectClass=univentionMemberServer))(univentionService=S4 Connector)(|(aRecord=*)(aAAARecord=*)))') s4connector_present = bool(searchResult) self.s4connector_present = s4connector_present if not univention.admin.modules.modules: ud.debug(ud.ADMIN, ud.WARN, 'univention.admin.modules.update() was not called') univention.admin.modules.update() m = univention.admin.modules.get(self.module) if not hasattr(self, 'mapping'): self.mapping = getattr(m, 'mapping', None) self.oldattr = {} # type: _Attributes if attributes: self.oldattr = attributes elif self.dn: try: attr = self._ldap_attributes() self.oldattr = self.lo.get(self.dn, attr=attr, required=True) except ldap.NO_SUCH_OBJECT: raise univention.admin.uexceptions.noObject(self.dn) if self.oldattr: self._exists = True if not univention.admin.modules.virtual(self.module) and not univention.admin.modules.recognize(self.module, self.dn, self.oldattr): raise univention.admin.uexceptions.wrongObjectType('%s is not recognized as %s.' % (self.dn, self.module)) oldinfo = self.mapping.unmapValues(self.oldattr) oldinfo = self._post_unmap(oldinfo, self.oldattr) oldinfo = self._falsy_boolean_extended_attributes(oldinfo) self.info.update(oldinfo) self.policies = [x.decode('utf-8') for x in self.oldattr.get('univentionPolicyReference', [])] self.__set_options() self.save() self._validate_superordinate(False) @property def descriptions(self): # type: () -> Dict[Text, univention.admin.property] return univention.admin.modules.get(self.module).property_descriptions
[docs] def save(self): # type: () -> None """Saves the current internal object state as old state for later comparison when e.g. modifying this object. .. seealso:: This method should be called by :func:`univention.admin.handlers.simpleLdap.open` and after further modifications in modify() / create(). .. note:: self.oldattr is not set and must be set manually """ self.oldinfo = copy.deepcopy(self.info) self.old_dn = self.dn self.oldpolicies = copy.deepcopy(self.policies) self.options = list(set(self.options)) self.old_options = [] if self.exists(): self.old_options = copy.deepcopy(self.options)
[docs] def diff(self): # type: () -> List[Tuple[str, Any, Any]] """ Returns the difference between old and current state as a UDM modlist. :returns: A list of 3-tuples (udm-property-name, old-property-value, new-property-values). :rtype: list """ changes = [] # type: List[Tuple[str, Any, Any]] for key, prop in self.descriptions.items(): null = [] if prop.multivalue else None # type: Union[List, None] # remove properties which are disabled by options if prop.options and not set(prop.options) & set(self.options): if self.oldinfo.get(key, null) not in (null, None): ud.debug(ud.ADMIN, ud.INFO, "simpleLdap.diff: key %s not valid (option not set)" % key) changes.append((key, self.oldinfo[key], null)) continue if (self.oldinfo.get(key) or self.info.get(key)) and self.oldinfo.get(key, null) != self.info.get(key, null): changes.append((key, self.oldinfo.get(key, null), self.info.get(key, null))) return changes
[docs] def hasChanged(self, key): # type: (Union[str, List[str], Tuple[str]]) -> bool """ Checks if the given attribute(s) was (were) changed. :param key: The name of a property. :type key: str or list[str] or tuple[str] :returns: True if the property changed, False otherwise. :rtype: bool """ # FIXME: key can even be nested if not isinstance(key, six.string_types): return any(self.hasChanged(i) for i in key) if (not self.oldinfo.get(key, '') or self.oldinfo[key] == ['']) and (not self.info.get(key, '') or self.info[key] == ['']): return False return not univention.admin.mapping.mapCmp(self.mapping, key, self.oldinfo.get(key, ''), self.info.get(key, ''))
[docs] def ready(self): # type: () -> bool """Makes sure all preconditions are met before creating or modifying this object. It checks if all properties marked required are set. It checks if the superordinate is valid. :returns: True :rtype: bool :raises: :class:`univention.admin.uexceptions.insufficientInformation` """ missing = [] for name, p in self.descriptions.items(): # skip if this property is not present in the current option set if p.options and not set(p.options) & set(self.options): continue if p.required and (not self[name] or (isinstance(self[name], list) and self[name] == [u''])): ud.debug(ud.ADMIN, ud.INFO, "property %s is required but not set." % name) missing.append(name) if missing: raise univention.admin.uexceptions.insufficientInformation(_('The following properties are missing:\n%s') % ('\n'.join(missing),)) # when creating a object make sure that its position is underneath of its superordinate if not self.exists() and self.position and self.superordinate: if not self._ensure_dn_in_subtree(self.superordinate.dn, self.position.getDn()): raise univention.admin.uexceptions.insufficientInformation(_('The position must be in the subtree of the superordinate.')) self._validate_superordinate(True) return True
if six.PY2: def has_key(self, key): # type: (str) -> bool """ Checks if the property exists in this module and if it is enabled in the set UDM options. :param str key: The name of a property. :returns: True if the property exists and is enabled, False otherwise. :rtype: bool .. deprecated:: 4.4 Use :func:`univention.admin.handlers.simpleLdap.has_property` instead! """ return self.has_property(key)
[docs] def has_property(self, key): # type: (str) -> bool """ Checks if the property exists in this module and if it is enabled in the set UDM options. :param str key: The name of a property. :returns: True if the property exists and is enabled, False otherwise. :rtype: bool """ try: p = self.descriptions[key] except KeyError: return False if p.options: return bool(set(p.options) & set(self.options)) return True
def __setitem__(self, key, value): # type: (str, Any) -> None """Sets or unsets the property to the given value. :param str key: The name of a property. :param value: The value to set. :raises KeyError: if the property belongs to an option, which is currently not enabled. :raises: :class:`univention.admin.uexceptions.noProperty` or :class:`KeyError` if the property does not exists or is not enabled by the UDM options. :raises: :class:`univention.admin.uexceptions.valueRequired` if the value is unset but required. :raises: :class:`univention.admin.uexceptions.valueMayNotChange` if the values cannot be modified. :raises: :class:`univention.admin.uexceptions.valueInvalidSyntax` if the value is invalid. """ def _changeable(): yield self.descriptions[key].editable if not self.descriptions[key].may_change: yield key not in self.oldinfo or self.oldinfo[key] == value # if _prevent_to_change_ad_properties: # FIXME: users.user.object.__init__ modifies firstname and lastname by hand # yield not (self.descriptions[key].readonly_when_synced and self._is_synced_object() and self.exists()) # property does not exist if not self.has_property(key): # don't set value if the option is not enabled ud.debug(ud.ADMIN, ud.WARN, '__setitem__: Ignoring property %s' % key) try: self.descriptions[key] except KeyError: # raise univention.admin.uexceptions.noProperty(key) raise return # attribute may not be changed elif not all(_changeable()): raise univention.admin.uexceptions.valueMayNotChange(_('key=%(key)s old=%(old)s new=%(new)s') % {'key': key, 'old': self[key], 'new': value}, property=key) # required attribute may not be removed elif self.descriptions[key].required and not value: raise univention.admin.uexceptions.valueRequired(_('The property %s is required') % self.descriptions[key].short_description, property=key) # do nothing if self.info.get(key, None) == value: ud.debug(ud.ADMIN, ud.INFO, 'values are identical: %s:%s' % (key, value)) return if self.info.get(key, None) == self.descriptions[key].default(self): self.__no_default.append(key) if self.descriptions[key].multivalue: # make sure value is list if isinstance(value, six.string_types): value = [value] elif not isinstance(value, list): raise univention.admin.uexceptions.valueInvalidSyntax(_('The property %s must be a list') % (self.descriptions[key].short_description,), property=key) self.info[key] = [] for v in value: if not v: continue err = "" p = None try: s = self.descriptions[key].syntax p = s.parse(v) except univention.admin.uexceptions.valueError as emsg: err = emsg if not p: if not err: err = "" try: raise univention.admin.uexceptions.valueInvalidSyntax("%s: %s" % (key, err), property=key) except UnicodeEncodeError: # raise fails if err contains umlauts or other non-ASCII-characters raise univention.admin.uexceptions.valueInvalidSyntax(self.descriptions[key].short_description, property=key) self.info[key].append(p) elif not value and key in self.info: del self.info[key] elif value: err = "" p = None try: s = self.descriptions[key].syntax p = s.parse(value) except univention.admin.uexceptions.valueError as e: err = e if not p: if not err: err = "" try: raise univention.admin.uexceptions.valueInvalidSyntax("%s: %s" % (self.descriptions[key].short_description, err), property=key) except UnicodeEncodeError: # raise fails if err contains umlauts or other non-ASCII-characters raise univention.admin.uexceptions.valueInvalidSyntax("%s" % self.descriptions[key].short_description, property=key) self.info[key] = p def __getitem__(self, key): # type: (str) -> Any """ Get the currently set value of the given property. :param str key: The name of a property. :returns: The currently set value. If the value is not set the default value is returned. .. warning:: this method changes the set value to the default if it is unset. For a side effect free retrieval of the value use :func:`univention.admin.handlers.simpleLdap.get`. """ if not key: return None if key in self.info: if self.descriptions[key].multivalue and not isinstance(self.info[key], list): # why isn't this correct in the first place? ud.debug(ud.ADMIN, ud.WARN, 'The mapping for %s in %s is broken!' % (key, self.module)) self.info[key] = [self.info[key]] return self.info[key] elif key not in self.__no_default and self.descriptions[key].editable: self.info[key] = self.descriptions[key].default(self) return self.info[key] elif self.descriptions[key].multivalue: return [] else: return None
[docs] def get(self, key, default=None): # type: (str, Any) -> Any """ Return the currently set value of the given property. :param str key: The name of a property. :param default: The default to return if the property is not set. :returns: The currently set value. If the value is not set :attr:`default` is returned. """ return self.info.get(key, default)
def __contains__(self, key): # type: (str) -> bool """ Checks if the property exists in this module. :param key: The name of a property. :returns: True if the property exists, False otherwise. :rtype: bool .. warning:: This does not check if the property is also enabled by the UDM options. Use :func:`univention.admin.handlers.simpleLdap.has_property` instead. """ return key in self.descriptions
[docs] def keys(self): # type: () -> Iterable[str] """ Returns the names of all properties this module has. :returns: The list of property names. :rtype: list[str] """ return self.descriptions.keys()
[docs] def items(self): # type: () -> Iterable[Tuple[str, Any]] """ Return all items which belong to the current options - even if they are empty. :returns: a list of 2-tuples (udm-property-name, property-value). :rtype: list[tuple] .. warning:: In certain circumstances this sets the default value for every property (e.g. when having a new object). """ return [(key, self[key]) for key in self.keys() if self.has_property(key)]
[docs] def create(self, serverctrls=None, response=None): # type: (List[ldap.controls.LDAPControl], Dict[Text, Any]) -> Text """ Creates the LDAP object if it does not exists by building the list of attributes (addlist) and write it to LDAP. If this call raises an exception it is necessary to instantiate a new object before trying to create it again. :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be created. :raises: :class:`univention.admin.uexceptions.objectExists` if the object already exists. :raises: :class:`univention.admin.uexceptions.insufficientInformation` :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request. :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :returns: The DN of the created object. :rtype: str """ if not univention.admin.modules.supports(self.module, 'add'): raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be created.') % (self.module,)) if self.exists(): raise univention.admin.uexceptions.objectExists(self.dn) if not isinstance(response, dict): response = {} try: self._ldap_pre_ready() self.ready() dn = self._create(response=response, serverctrls=serverctrls) except Exception: self._safe_cancel() raise for c in response.get('ctrls', []): if c.controlType == PostReadControl.controlType: self.oldattr.update(c.entry) self._write_admin_diary_create() return dn
def _get_admin_diary_event(self, event_name): name = self.module.replace('/', '_').upper() return DiaryEvent.get('UDM_%s_%s' % (name, event_name)) or DiaryEvent.get('UDM_GENERIC_%s' % event_name) def _get_admin_diary_args_names(self, event): return [ name for name in self.descriptions if name in event.args ] def _get_admin_diary_args(self, event): args = {'module': self.module} if event.name.startswith('UDM_GENERIC_'): value = self.dn for k, v in self.descriptions.items(): if v.identifies: value = self[k] break args['id'] = value else: for name in self._get_admin_diary_args_names(event): args[name] = str(self[name]) return args def _get_admin_diary_username(self): username = ldap.dn.explode_rdn(self.lo.binddn)[0] if username != 'cn=admin': username = username.rsplit('=', 1)[1] return username def _write_admin_diary_event(self, event, additional_args=None): try: event = self._get_admin_diary_event(event) if not event: return args = self._get_admin_diary_args(event) if args: if additional_args: args.update(additional_args) username = self._get_admin_diary_username() write_event(event, args, username=username) except Exception as exc: ud.debug(ud.ADMIN, ud.WARN, "Failed to write Admin Diary entry: %s" % exc) def _write_admin_diary_create(self): self._write_admin_diary_event('CREATED')
[docs] def modify(self, modify_childs=True, ignore_license=False, serverctrls=None, response=None): # type: (bool, bool, List[ldap.controls.LDAPControl], Dict[Text, Any]) -> Text """Modifies the LDAP object by building the difference between the current state and the old state of this object and write this modlist to LDAP. :param modify_childs: Specifies if child objects should be modified as well. :type modify_childs: bool :param ignore_license: If the license is exceeded the modification may fail. Setting this to True causes license checks to be disabled :type ignore_license: bool :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be modified. :raises: :class:`univention.admin.uexceptions.noObject` if the object does not exists. :raises: :class:`univention.admin.uexceptions.insufficientInformation` :returns: The DN of the modified object. :rtype: str """ if not univention.admin.modules.supports(self.module, 'edit'): # if the licence is exceeded 'edit' is removed from the modules operations. Nevertheless we need a way to make modifications then. if not ignore_license: raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be modified.') % (self.module,)) if not self.exists(): raise univention.admin.uexceptions.noObject(self.dn) if not isinstance(response, dict): response = {} try: self._ldap_pre_ready() self.ready() dn = self._modify(modify_childs, ignore_license=ignore_license, response=response) except Exception: self._safe_cancel() raise for c in response.get('ctrls', []): if c.controlType == PostReadControl.controlType: self.oldattr.update(c.entry) return dn
def _write_admin_diary_modify(self): self._write_admin_diary_event('MODIFIED') def _create_temporary_ou(self): # type: () -> Text name = u'temporary_move_container_%s' % time.time() module = univention.admin.modules.get('container/ou') position = univention.admin.uldap.position(u'%s' % self.lo.base) temporary_object = module.object(None, self.lo, position) temporary_object.open() temporary_object['name'] = name temporary_object.create() return u'ou=%s' % ldap.dn.escape_dn_chars(name) def _delete_temporary_ou_if_empty(self, temporary_ou): # type: (str) -> None """ Try to delete the organizational unit entry if it is empty. :param str temporary_ou: The distinguished name of the container. """ if not temporary_ou: return dn = u'%s,%s' % (temporary_ou, self.lo.base) module = univention.admin.modules.get('container/ou') temporary_object = univention.admin.modules.lookup(module, None, self.lo, scope='base', base=dn, required=True, unique=True)[0] temporary_object.open() try: temporary_object.remove() except (univention.admin.uexceptions.ldapError, ldap.NOT_ALLOWED_ON_NONLEAF): pass
[docs] def move(self, newdn, ignore_license=False, temporary_ou=None): # type: (str, bool, str) -> str """Moves the LDAP object to the target position. :param str newdn: The DN of the target position. :param bool ignore_license: If the license is exceeded the modification may fail. Setting this to True causes license checks to be disabled. :param str temporary_ou: The distiguished name of a temporary container which is used to rename the object if only is letter casing changes. :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be moved. :raises: :class:`univention.admin.uexceptions.noObject` if the object does not exists. :returns: The new DN of the moved object :rtype: str """ ud.debug(ud.ADMIN, ud.INFO, 'move: called for %s to %s' % (self.dn, newdn)) if not (univention.admin.modules.supports(self.module, 'move') or univention.admin.modules.supports(self.module, 'subtree_move')): raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be moved.') % (self.module,)) if self.lo.compare_dn(self.dn, self.lo.whoami()): raise univention.admin.uexceptions.invalidOperation(_('The own object cannot be moved.')) if not self.exists(): raise univention.admin.uexceptions.noObject(self.dn) if _prevent_to_change_ad_properties and self._is_synced_object(): raise univention.admin.uexceptions.invalidOperation(_('Objects from Active Directory can not be moved.')) def n(x): return dn2str(str2dn(x)) newdn = n(newdn) self.dn = n(self.dn) goaldn = self.lo.parentDn(newdn) goalmodule = univention.admin.modules.identifyOne(goaldn, self.lo.get(goaldn)) goalmodule = univention.admin.modules.get(goalmodule) if not goalmodule or not hasattr(goalmodule, 'childs') or not goalmodule.childs == 1: raise univention.admin.uexceptions.invalidOperation(_("Destination object can't have sub objects.")) if self.lo.compare_dn(self.dn.lower(), newdn.lower()): if self.dn == newdn: raise univention.admin.uexceptions.ldapError(_('Moving not possible: old and new DN are identical.')) else: # We must use a temporary folder because OpenLDAP does not allow a rename of an container with subobjects temporary_ou = self._create_temporary_ou() temp_dn = dn2str(str2dn(newdn)[:1] + str2dn(temporary_ou) + str2dn(self.lo.base)) self.dn = n(self.move(temp_dn, ignore_license, temporary_ou)) if newdn.lower().endswith(self.dn.lower()): raise univention.admin.uexceptions.ldapError(_("Moving into one's own sub container not allowed.")) if univention.admin.modules.supports(self.module, 'subtree_move'): # check if is subtree: subelements = self.lo.search(base=self.dn, scope='one', attr=[]) if subelements: olddn = self.dn ud.debug(ud.ADMIN, ud.INFO, 'move: found subelements, do subtree move: newdn: %s' % newdn) # create copy of myself module = univention.admin.modules.get(self.module) position = univention.admin.uldap.position(self.lo.base) position.setDn(self.lo.parentDn(newdn)) copyobject = module.object(None, self.lo, position) copyobject.options = self.options[:] copyobject.open() for key in self.keys(): copyobject[key] = self[key] copyobject.policies = self.policies copyobject.create() to_be_moved = [] moved = [] pattern = re.compile(u'%s$' % (re.escape(self.dn),), flags=re.I) try: for subolddn, suboldattrs in subelements: # Convert the DNs to lowercase before the replacement. The cases might be mixed up if the python lib is # used by the connector, for example: # subolddn: uid=user_test_h80,ou=TEST_H81,$LDAP_BASE # self.dn: ou=test_h81,$LDAP_BASE # newdn: OU=TEST_H81,ou=test_h82,$LDAP_BASE # -> subnewdn: uid=user_test_h80,OU=TEST_H81,ou=test_h82,$LDAP_BASE subnew_position = pattern.sub(dn2str(str2dn(self.lo.parentDn(subolddn))), newdn) subnewdn = dn2str(str2dn(subolddn)[:1] + str2dn(subnew_position)) ud.debug(ud.ADMIN, ud.INFO, 'move: subelement %r to %r' % (subolddn, subnewdn)) submodule = univention.admin.modules.identifyOne(subolddn, suboldattrs) submodule = univention.admin.modules.get(submodule) subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subolddn) if not subobject or not (univention.admin.modules.supports(submodule, 'move') or univention.admin.modules.supports(submodule, 'subtree_move')): subold_rdn = u'+'.join(explode_rdn(subolddn, 1)) type_ = univention.admin.modules.identifyOne(subolddn, suboldattrs) raise univention.admin.uexceptions.invalidOperation(_('Unable to move object %(name)s (%(type)s) in subtree, trying to revert changes.') % { 'name': subold_rdn, 'type': type_ and type_.module, }) to_be_moved.append((subobject, subolddn, subnewdn)) for subobject, subolddn, subnewdn in to_be_moved: subobject.open() subobject.move(subnewdn) moved.append((subolddn, subnewdn)) univention.admin.objects.get(univention.admin.modules.get(self.module), None, self.lo, position='', dn=self.dn).remove() self._delete_temporary_ou_if_empty(temporary_ou) except BaseException: ud.debug(ud.ADMIN, ud.ERROR, 'move: subtree move failed, trying to move back.') position = univention.admin.uldap.position(self.lo.base) position.setDn(self.lo.parentDn(olddn)) for subolddn, subnewdn in moved: submodule = univention.admin.modules.identifyOne(subnewdn, self.lo.get(subnewdn)) submodule = univention.admin.modules.get(submodule) subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subnewdn) subobject.open() subobject.move(subolddn) copyobject.remove() self._delete_temporary_ou_if_empty(temporary_ou) raise self.dn = newdn return newdn else: # normal move, fails on subtrees res = n(self._move(newdn, ignore_license=ignore_license)) self._delete_temporary_ou_if_empty(temporary_ou) return res else: res = n(self._move(newdn, ignore_license=ignore_license)) self._delete_temporary_ou_if_empty(temporary_ou) return res
[docs] def move_subelements(self, olddn, newdn, subelements, ignore_license=False): # type: (str, str, List[Tuple[str, Dict]], bool) -> Optional[List[Tuple[str, str]]] """ Internal function to move all children of a container. :param str olddn: The old distinguished name of the parent container. :param str newdn: The new distinguished name of the parent container. :param subelements: A list of 2-tuples (old-dn, old-attrs) for each child of the parent container. :type subelements: tuple[str, dict] :param bool ignore_license: If the license is exceeded the modification may fail. Setting this to True causes license checks to be disabled. :returns: A list of 2-tuples (old-dn, new-dn) :rtype: list[tuple[str, str]] """ if subelements: ud.debug(ud.ADMIN, ud.INFO, 'move: found subelements, do subtree move') moved = [] try: for subolddn, suboldattrs in subelements: ud.debug(ud.ADMIN, ud.INFO, 'move: subelement %s' % subolddn) subnewdn = re.sub(u'%s$' % (re.escape(olddn),), newdn, subolddn) # FIXME: looks broken submodule = univention.admin.modules.identifyOne(subolddn, suboldattrs) submodule = univention.admin.modules.get(submodule) subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subolddn) if not subobject or not (univention.admin.modules.supports(submodule, 'move') or univention.admin.modules.supports(submodule, 'subtree_move')): subold_rdn = u'+'.join(explode_rdn(subolddn, 1)) raise univention.admin.uexceptions.invalidOperation(_('Unable to move object %(name)s (%(type)s) in subtree, trying to revert changes.') % {'name': subold_rdn, 'type': univention.admin.modules.identifyOne(subolddn, suboldattrs)}) subobject.open() subobject._move(subnewdn) moved.append((subolddn, subnewdn)) return moved except Exception: ud.debug(ud.ADMIN, ud.ERROR, 'move: subtree move failed, try to move back') for subolddn, subnewdn in moved: submodule = univention.admin.modules.identifyOne(subnewdn, self.lo.get(subnewdn)) submodule = univention.admin.modules.get(submodule) subobject = univention.admin.objects.get(submodule, None, self.lo, position='', dn=subnewdn) subobject.open() subobject.move(subolddn) raise return None # FIXME
[docs] def remove(self, remove_childs=False): # type: (bool) -> None """ Removes this LDAP object. :param bool remove_childs: Specifies to remove children objects before removing this object. :raises: :class:`univention.admin.uexceptions.ldapError` (Operation not allowed on non-leaf: subordinate objects must be deleted first) if the object contains childrens and *remove_childs* is False. :raises: :class:`univention.admin.uexceptions.invalidOperation` if objects of this type do not support to be removed. :raises: :class:`univention.admin.uexceptions.noObject` if the object does not exists. """ if not univention.admin.modules.supports(self.module, 'remove'): raise univention.admin.uexceptions.invalidOperation(_('Objects of the "%s" object type can not be removed.') % (self.module,)) if not self.dn or not self.lo.get(self.dn): raise univention.admin.uexceptions.noObject(self.dn) if self.lo.compare_dn(self.dn, self.lo.whoami()): raise univention.admin.uexceptions.invalidOperation(_('The own object cannot be removed.')) return self._remove(remove_childs)
[docs] def get_gid_for_primary_group(self): # type: () -> str """ Return the numerical group ID of the primary group. :returns: The numerical group ID as a string or "99999" if no primary group is declared. :rtype: str :raises univention.admin.uexceptions.primaryGroup: if the object has no primary group. """ gidNum = u'99999' if self['primaryGroup']: try: gidNum = self.lo.getAttr(self['primaryGroup'], 'gidNumber', required=True)[0].decode('ASCII') except ldap.NO_SUCH_OBJECT: raise univention.admin.uexceptions.primaryGroup(self['primaryGroup']) return gidNum
[docs] def get_sid_for_primary_group(self): # type: () -> str """ Return the Windows security ID for the primary group. :returns: The security identifier of the primary group. :rtype: str :raises univention.admin.uexceptions.primaryGroup: if the object has no primary group. """ try: sidNum = self.lo.getAttr(self['primaryGroup'], 'sambaSID', required=True)[0].decode('ASCII') except ldap.NO_SUCH_OBJECT: raise univention.admin.uexceptions.primaryGroupWithoutSamba(self['primaryGroup']) return sidNum
def _ldap_pre_ready(self): # type: () -> None """Hook which is called before :func:`univention.admin.handlers.simpleLdap.ready`.""" pass def _ldap_pre_create(self): # type: () -> None """Hook which is called before the object creation.""" self.dn = self._ldap_dn() self.request_lock('cn-uid-position', self.dn) def _ldap_dn(self): # type: () -> Text """ Builds the LDAP DN of the object before creation by using the identifying properties to build the RDN. :returns: the distringuised name. :rtype: str """ identifier = [ (self.mapping.mapName(name), self.mapping.mapValueDecoded(name, self.info[name]), 2) for name, prop in self.descriptions.items() if prop.identifies ] return u'%s,%s' % (dn2str([identifier]), dn2str(str2dn(self.dn)[1:]) if self.exists() else self.position.getDn()) def _ldap_post_create(self): # type: () -> None """Hook which is called after the object creation.""" self._confirm_locks() def _ldap_pre_modify(self): # type: () -> None """Hook which is called before the object modification.""" pass def _ldap_post_modify(self): # type: () -> None """Hook which is called after the object modification.""" self._confirm_locks() def _ldap_pre_rename(self, newdn): # type: (str) -> None """ Hook which is called before renaming the object. :param str newdn: The new distiguished name the object will be renamed to. """ self.request_lock('cn-uid-position', newdn) def _ldap_post_rename(self, olddn): # type: (str) -> None """ Hook which is called after renaming the object. :param str olddn: The old distiguished name the object was renamed from. """ pass def _ldap_pre_move(self, newdn): # type: (str) -> None """ Hook which is called before the object moving. :param str newdn: The new distiguished name the object will be moved to. """ self.request_lock('cn-uid-position', newdn) def _ldap_post_move(self, olddn): # type: (str) -> None """ Hook which is called after the object moving. :param str olddn: The old distiguished name the object was moved from. """ pass def _ldap_pre_remove(self): # type: () -> None """Hook which is called before the object removal.""" pass def _ldap_post_remove(self): # type: () -> None """Hook which is called after the object removal.""" self._release_locks() def _safe_cancel(self): # type: () -> None try: self.cancel() except (KeyboardInterrupt, SystemExit, SyntaxError): raise except Exception: ud.debug(ud.ADMIN, ud.ERROR, "cancel() failed: %s" % (traceback.format_exc(),)) def _falsy_boolean_extended_attributes(self, info): # type: (_Properties) -> _Properties m = univention.admin.modules.get(self.module) for prop in getattr(m, 'extended_udm_attributes', []): if prop.syntax == 'boolean' and not info.get(prop.name): info[prop.name] = u'0' return info
[docs] def exists(self): # type: () -> bool """ Indicates that this object exists in LDAP. :returns: True if the object exists in LDAP, False otherwise. :rtype: bool """ return self._exists
def _validate_superordinate(self, must_exists=True): # type: (bool) -> None """Checks if the superordinate is set to a valid :class:`univention.admin.handlers.simpleLdap` object if this module requires a superordinate. It is ensured that the object type of the superordinate is correct. It is ensured that the object lies underneath of the superordinate position. :raises: :class:`univention.admin.uexceptions.insufficientInformation` :raises: :class:`univention.admin.uexceptions.noSuperordinate` """ superordinate_names = set(univention.admin.modules.superordinate_names(self.module)) if not superordinate_names: return # module has no superodinates if not self.dn and not self.position: # this check existed in all modules with superordinates, so still check it here, too raise univention.admin.uexceptions.insufficientInformation(_('Neither DN nor position given.')) if not self.superordinate: self.superordinate = univention.admin.objects.get_superordinate(self.module, None, self.lo, self.dn or self.position.getDn()) if not self.superordinate: if superordinate_names == {'settings/cn'}: ud.debug(ud.ADMIN, ud.WARN, 'No settings/cn superordinate was given.') return # settings/cn might be misued as superordinate, don't risk currently if not must_exists: return raise univention.admin.uexceptions.noSuperordinate(_('No superordinate object given')) # check if the superordinate is of the correct object type if not {self.superordinate.module} & superordinate_names: raise univention.admin.uexceptions.insufficientInformation(_('The given %r superordinate is expected to be of type %s.') % (self.superordinate.module, ', '.join(superordinate_names))) if self.dn and not self._ensure_dn_in_subtree(self.superordinate.dn, self.lo.parentDn(self.dn)): raise univention.admin.uexceptions.insufficientInformation(_('The DN must be underneath of the superordinate.')) def _ensure_dn_in_subtree(self, parent, dn): # type: (Text, Text) -> bool """ Checks if the given DN is underneath of the subtree of the given parent DN. :param str parent: The distiguished name of the parent container. :param str dn: The distinguished name to check. :returns: True if `dn` is underneath of `parent`, False otherwise. :rtype: bool """ while dn: if self.lo.compare_dn(dn, parent): return True dn = self.lo.parentDn(dn) return False
[docs] def call_udm_property_hook(self, hookname, module, changes=None): # types: (Text, Text, Dict[str, Tuple]) -> Dict[str, Tuple] """ Internal method to call a hook scripts of extended attributes. :param str hookname: The name of the hook function to call. :param str module: The name of the UDM module. :param dict changes: A list of changes. :returns: The (modified) list of changes. :rtype: dict or None """ m = univention.admin.modules.get(module.module) if hasattr(m, 'extended_udm_attributes'): for prop in m.extended_udm_attributes: if prop.hook is not None: func = getattr(prop.hook, hookname, None) if changes is None: func(module) else: changes = func(module, changes) return changes
[docs] def open(self): # type: () -> None """Opens this object. During the initialization of this object the current set LDAP attributes are mapped into :py:attr:`info`. This method makes it possible to e.g. resolve external references to other objects which are not represented in the raw LDAP attributes of this object, for example the group memberships of a user. By default only the `open` hook for extended attributes is called. This method can be subclassed. .. warning:: If this method changes anything in self.info it *must* call :py:meth:`save` afterwards. .. warning:: If your are going to do any modifications (such as creating, modifying, moving, removing this object) this method must be called directly after the constructor and before modifying any property. """ self._open = True self.call_udm_property_hook('hook_open', self) self.save()
def _remove_option(self, name): # type: (str) -> None """ Removes the UDM option if it is set. :param str name: The name of the option to remove. """ if name in self.options: self.options.remove(name) def __set_options(self): # type: () -> None """Enables the UDM options of this object by evaluating the currently set LDAP object classes. If the object does not exists yet the default options are enabled.""" options = univention.admin.modules.options(self.module) if 'objectClass' in self.oldattr: ocs = {x.decode('UTF-8') for x in self.oldattr['objectClass']} self.options = [ opt for opt, option in options.items() if not option.disabled and option.matches(ocs) and self.__app_option_enabled(opt, option) ] else: ud.debug(ud.ADMIN, ud.INFO, 'reset options to default by _define_options') self.options = [] self._define_options(options) def _define_options(self, module_options): # type: (Dict[str, Any]) -> None """ Enables all UDM options which are enabled by default. :param dict module_options: A mapping of option-name to option. """ ud.debug(ud.ADMIN, ud.INFO, 'modules/__init__.py _define_options: reset to default options') self.options.extend( name for name, opt in module_options.items() if not opt.disabled and opt.default )
[docs] def option_toggled(self, option): # type: (str) -> bool """ Checks if an UDM option was changed. :param str option: The name of the option to check. :returns: True if the option was changed, False otherwise. :rtype: bool .. warning:: This does not work for not yet existing objects. """ return option in set(self.options) ^ set(self.old_options)
[docs] def policy_reference(self, *policies): for policy in policies: if not ldap.dn.is_dn(policy): raise univention.admin.uexceptions.valueInvalidSyntax(policy) try: if b'univentionPolicy' not in self.lo.getAttr(policy, 'objectClass', required=True): raise univention.admin.uexceptions.valueError('Object is not a policy', policy) except ldap.NO_SUCH_OBJECT: raise univention.admin.uexceptions.noObject('Policy does not exists', policy) self.policies.extend(policy for policy in policies if not any(self.lo.compare_dn(pol, policy) for pol in self.policies))
[docs] def policy_dereference(self, *policies): for policy in policies: if not ldap.dn.is_dn(policy): raise univention.admin.uexceptions.valueInvalidSyntax(policy) self.policies = [policy for policy in self.policies if not any(self.lo.compare_dn(pol, policy) for pol in policies)]
[docs] def policiesChanged(self): # type: () -> bool return set(self.oldpolicies) != set(self.policies)
def __app_option_enabled(self, name, option): if option.is_app_option: return all(self[pname] in ('TRUE', '1', 'OK') for pname, prop in self.descriptions.items() if name in prop.options and prop.syntax.name in ('AppActivatedBoolean', 'AppActivatedTrue', 'AppActivatedOK')) return True
[docs] def description(self): # type: () -> str """ Return a descriptive string for the object. By default the relative distinguished name is returned. :returns: A descriptive string or `none` if no :py:attr:`dn` is not yet set. :rtype: str """ if self.dn: return u'+'.join(explode_rdn(self.dn, 1)) return u'none'
def _post_unmap(self, info, values): """ This method can be overwritten to define special un-map methods to map back from LDAP to UDM that can not be done with the default mapping API. :param info: The list of UDM properties. :param values: The list of LDAP attributes. :returns: The (modified) list of UDM properties. :rtype: """ return info def _post_map(self, modlist, diff): """ This method can be overwritten to define special map methods to map from UDM to LDAP that can not be done with the default mapping API. :param modlist: The list of LDAP modifications. :param list diff: A list of modified UDM properties. :returns: The (modified) list of LDAP modifications. :rtype: """ return modlist def _ldap_addlist(self): # type: () -> List[Tuple[Text, Any]] return [] def _ldap_modlist(self): """Builds the list of modifications when creating and modifying this object. It compares the old properties (:py:attr:`oldinfo`) with the new properties (:py:attr:`info`) and applies the LDAP mapping. Differences are added to the modlist which consists of a tuple with three items: ("LDAP attribute-name", [old, values], [new, values]) ("LDAP attribute-name", old_value, new_value) ("LDAP attribute-name", None, added_value) .. seealso:: :mod:`univention.uldap` for further information about the format of the modlist. This method can be overridden in a subclass to add special behavior, e.g. for properties which have no mapping defined. .. caution:: The final modlist used for creation of objects is mixed with the :func:`univention.admin.handlers.simpleLdap._ldap_addlist`. Make sure this method don't add attributes which are already set. :rtype: list of tuples """ diff_ml = self.diff() ml = univention.admin.mapping.mapDiff(self.mapping, diff_ml) ml = self._post_map(ml, diff_ml) if self.policiesChanged(): policy_ocs_set = b'univentionPolicyReference' in self.oldattr.get('objectClass', []) if self.policies and not policy_ocs_set: ml.append(('objectClass', b'', [b'univentionPolicyReference'])) elif not self.policies and policy_ocs_set: ml.append(('objectClass', b'univentionPolicyReference', b'')) ml.append(('univentionPolicyReference', [x.encode('UTF-8') for x in self.oldpolicies], [x.encode('UTF-8') for x in self.policies])) return ml def _create(self, response=None, serverctrls=None): """Create the object. Should only be called by :func:`univention.admin.handlers.simpleLdap.create`.""" self._ldap_pre_create() self._update_policies() self.call_udm_property_hook('hook_ldap_pre_create', self) self.set_default_values() # iterate over all properties and call checkLdap() of corresponding syntax self._call_checkLdap_on_all_property_syntaxes() al = self._ldap_addlist() al.extend(self._ldap_modlist()) al = self._ldap_object_classes_add(al) al = self.call_udm_property_hook('hook_ldap_addlist', self, al) # ensure univentionObject is set al.append(('objectClass', [b'univentionObject', ])) al.append(('univentionObjectType', [self.module.encode('utf-8'), ])) ud.debug(ud.ADMIN, ud.INFO, "create object with dn: %s" % (self.dn,)) ud.debug(ud.ADMIN, 99, 'Create dn=%r;\naddlist=%r;' % (self.dn, al)) # if anything goes wrong we need to remove the already created object, otherwise we run into 'already exists' errors try: self.lo.add(self.dn, al, serverctrls=serverctrls, response=response) self._exists = True self._ldap_post_create() except Exception: # ensure that there is no lock left exc = sys.exc_info() ud.debug(ud.ADMIN, ud.PROCESS, "Creating %r failed: %r" % (self.dn, exc[1])) try: self.cancel() except Exception: ud.debug(ud.ADMIN, ud.ERROR, "Post-create: cancel() failed: %s" % (traceback.format_exc(),)) try: if self._exists: # add succeeded but _ldap_post_create failed! obj = univention.admin.objects.get(univention.admin.modules.get(self.module), None, self.lo, self.position, self.dn) obj.open() obj.remove() except Exception: ud.debug(ud.ADMIN, ud.ERROR, "Post-create: remove() failed: %s" % (traceback.format_exc(),)) six.reraise(exc[0], exc[1], exc[2]) self.call_udm_property_hook('hook_ldap_post_create', self) self.save() return self.dn def _ldap_object_classes_add(self, al): m = univention.admin.modules.get(self.module) # evaluate extended attributes ocs = set() # type: Set[str] for prop in getattr(m, 'extended_udm_attributes', []): ud.debug(ud.ADMIN, ud.INFO, 'simpleLdap._create: info[%s]:%r = %r' % (prop.name, self.has_property(prop.name), self.info.get(prop.name))) if prop.syntax == 'boolean' and self.info.get(prop.name) == u'0': continue if self.has_property(prop.name) and self.info.get(prop.name): ocs.add(prop.objClass) module_options = univention.admin.modules.options(self.module) # add object classes of (especially extended) options for option in ['default'] + self.options: try: opt = module_options[option] except KeyError: ud.debug(ud.ADMIN, ud.INFO, '%r does not specify option %r' % (m.module, option)) continue ocs |= set(opt.objectClasses) # remove duplicated object classes for i in al: key, val = i[0], i[-1] # might be a triple if val and key.lower() == 'objectclass': val_list = [val] if not isinstance(val, (tuple, list)) else val val_unicode = [x.decode('UTF-8') if isinstance(x, bytes) else x for x in val_list] ocs -= set(val_unicode) # TODO: check six.string_types vs bytes everywhere for ocs calculations if ocs: al.append(('objectClass', [x.encode('UTF-8') for x in ocs])) return al def _modify(self, modify_childs=True, ignore_license=False, response=None, serverctrls=None): """Modify the object. Should only be called by :func:`univention.admin.handlers.simpleLdap.modify`.""" self.__prevent_ad_property_change() self._ldap_pre_modify() self._update_policies() self.call_udm_property_hook('hook_ldap_pre_modify', self) self.set_default_values() self._fix_app_options() # iterate over all properties and call checkLdap() of corresponding syntax self._call_checkLdap_on_all_property_syntaxes() ml = self._ldap_modlist() ml = self.call_udm_property_hook('hook_ldap_modlist', self, ml) ml = self._ldap_object_classes(ml) class wouldRename(Exception): @classmethod def on_rename(cls, dn, new_dn, ml): raise cls(dn, new_dn) # FIXME: timeout without exception if objectClass of Object is not exsistant !! ud.debug(ud.ADMIN, 99, 'Modify dn=%r;\nmodlist=%r;\noldattr=%r;' % (self.dn, ml, self.oldattr)) try: self.dn = self.lo.modify(self.dn, ml, ignore_license=ignore_license, serverctrls=serverctrls, response=response, rename_callback=wouldRename.on_rename) except wouldRename as exc: self._ldap_pre_rename(exc.args[1]) self.dn = self.lo.modify(self.dn, ml, ignore_license=ignore_license, serverctrls=serverctrls, response=response) self._ldap_post_rename(exc.args[0]) if ml: self._write_admin_diary_modify() self._ldap_post_modify() self.call_udm_property_hook('hook_ldap_post_modify', self) self.save() return self.dn
[docs] def set_default_values(self): # type: () -> None """Sets all the default values of all properties.""" # Make sure all default values are set... for name, p in self.descriptions.items(): # ... if property has no option or any required option is currently enabled if not self.has_property(name): continue set_defaults = self.set_defaults if not self.set_defaults and p.options and not set(self.old_options) & set(p.options): # set default values of properties which depend on an option but weren't activated prior modifying self.set_defaults = True try: if p.default(self): self[name] # __getitem__ sets default value finally: self.set_defaults = set_defaults
def _fix_app_options(self): # type: () -> None # for objects with objectClass=appObject and appObjectActivated=0 we must set appObjectActivated=1 for option, opt in getattr(univention.admin.modules.get(self.module), 'options', {}).items(): if not opt.is_app_option or not self.option_toggled(option) or option not in self.options: continue for pname, prop in self.descriptions.items(): if option in prop.options and prop.syntax.name in ('AppActivatedBoolean', 'AppActivatedTrue', 'AppActivatedOK'): self[pname] = True def _ldap_object_classes(self, ml): # type: (list) -> list """Detects the attributes changed in the given modlist, calculates the changes of the object class and appends it to the modlist.""" m = univention.admin.modules.get(self.module) def lowerset(vals): # type: (Iterable[str]) -> Set[str] return {x.lower() for x in vals} ocs = lowerset(x.decode('UTF-8') for x in _MergedAttributes(self, ml).get_attribute('objectClass')) unneeded_ocs = set() # type: Set[Text] required_ocs = set() # type: Set[Text] # evaluate (extended) options module_options = univention.admin.modules.options(self.module) available_options = set(module_options) options = set(self.options) if 'default' in available_options: options |= {'default', } old_options = set(self.old_options) if options != old_options: ud.debug(ud.ADMIN, ud.INFO, 'options=%r; old_options=%r' % (options, old_options)) unavailable_options = (options - available_options) | (old_options - available_options) if unavailable_options: # Bug #46586: as we simulate legacy options, this is no longer an error ud.debug(ud.ADMIN, ud.INFO, '%r does not provide options: %r' % (self.module, unavailable_options)) added_options = options - old_options - unavailable_options removed_options = old_options - options - unavailable_options # evaluate extended attributes for prop in getattr(m, 'extended_udm_attributes', []): ud.debug(ud.ADMIN, ud.INFO, 'simpleLdap._modify: extended attribute=%r oc=%r' % (prop.name, prop.objClass)) if self.has_property(prop.name) and self.info.get(prop.name) and (True if prop.syntax != 'boolean' else self.info.get(prop.name) != '0'): required_ocs |= {prop.objClass} continue if prop.deleteObjClass: unneeded_ocs |= {prop.objClass} # if the value is unset we need to remove the attribute completely if self.oldattr.get(prop.ldapMapping): ml = [x for x in ml if x[0].lower() != prop.ldapMapping.lower()] ml.append((prop.ldapMapping, self.oldattr.get(prop.ldapMapping), b'')) unneeded_ocs |= {oc for option in removed_options for oc in module_options[option].objectClasses} required_ocs |= {oc for option in added_options for oc in module_options[option].objectClasses} ocs -= lowerset(unneeded_ocs) ocs |= lowerset(required_ocs) if lowerset(x.decode('utf-8') for x in self.oldattr.get('objectClass', [])) == ocs: return ml ud.debug(ud.ADMIN, ud.INFO, 'OCS=%r; required=%r; removed: %r' % (ocs, required_ocs, unneeded_ocs)) # case normalize object class names schema = self.lo.get_schema() ocs = {x.names[0] for x in (schema.get_obj(ldap.schema.models.ObjectClass, x) for x in ocs) if x} # make sure we still have a structural object class if not schema.get_structural_oc(ocs): structural_ocs = schema.get_structural_oc(unneeded_ocs) if not structural_ocs: ud.debug(ud.ADMIN, ud.ERROR, 'missing structural object class. Modify will fail.') return ml ud.debug(ud.ADMIN, ud.WARN, 'Preventing to remove last structural object class %r' % (structural_ocs,)) ocs -= set(schema.get_obj(ldap.schema.models.ObjectClass, structural_ocs).names) # validate removal of object classes must, may = schema.attribute_types(ocs) allowed = {name.lower() for attr in may.values() for name in attr.names} | {name.lower() for attr in must.values() for name in attr.names} ml = [x for x in ml if x[0].lower() != 'objectclass'] ml.append(('objectClass', self.oldattr.get('objectClass', []), [x.encode('utf-8') for x in ocs])) newattr = ldap.cidict.cidict(_MergedAttributes(self, ml).get_attributes()) # make sure only attributes known by the object classes are set for attr, val in newattr.items(): if not val: continue if re.sub(u';binary$', u'', attr.lower()) not in allowed: ud.debug(ud.ADMIN, ud.WARN, 'The attribute %r is not allowed by any object class.' % (attr,)) # ml.append((attr, val, [])) # TODO: Remove the now invalid attribute instead return ml # require all MUST attributes to be set for attr in must.values(): if not any(newattr.get(name) or newattr.get(u'%s;binary' % (name,)) for name in attr.names): ud.debug(ud.ADMIN, ud.WARN, 'The attribute %r is required by the current object classes.' % (attr.names,)) return ml ml = [x for x in ml if x[0].lower() != 'objectclass'] ml.append(('objectClass', self.oldattr.get('objectClass', []), [x.encode('utf-8') for x in ocs])) return ml def _move_in_subordinates(self, olddn): result = self.lo.searchDn(base=self.lo.base, filter=filter_format(u'(&(objectclass=person)(secretary=%s))', [olddn])) for subordinate in result: self.lo.modify(subordinate, [('secretary', olddn.encode('utf-8'), self.dn.encode('utf-8'))]) def _move_in_groups(self, olddn): for group in self.oldinfo.get('groups', []) + [self.oldinfo.get('machineAccountGroup', '')]: if group != '': members = self.lo.getAttr(group, 'uniqueMember') newmembers = [ member for member in members if dn2str(str2dn(member)).lower() not in (dn2str(str2dn(olddn)).lower(), dn2str(str2dn(self.dn)).lower(), ) ] newmembers.append(self.dn.encode('UTF-8')) self.lo.modify(group, [('uniqueMember', members, newmembers)]) def _move(self, newdn, modify_childs=True, ignore_license=False): # type: (str, bool, bool) -> str """Moves this object to the new DN. Should only be called by :func:`univention.admin.handlers.simpleLdap.move`.""" self._ldap_pre_move(newdn) olddn = self.dn self.lo.rename(self.dn, newdn) self.dn = newdn try: self._move_in_groups(olddn) # can be done always, will do nothing if oldinfo has no attribute 'groups' self._move_in_subordinates(olddn) self._ldap_post_move(olddn) except Exception: # move back ud.debug(ud.ADMIN, ud.WARN, 'simpleLdap._move: self._ldap_post_move failed, move object back to %s' % olddn) self.lo.rename(self.dn, olddn) self.dn = olddn raise self._write_admin_diary_move(newdn) return self.dn def _write_admin_diary_move(self, position): self._write_admin_diary_event('MOVED', {'position': position}) def _remove(self, remove_childs=False): # type: (bool) -> None """Removes this object. Should only be called by :func:`univention.admin.handlers.simpleLdap.remove`.""" ud.debug(ud.ADMIN, ud.INFO, 'handlers/__init__._remove() called for %r with remove_childs=%r' % (self.dn, remove_childs)) if _prevent_to_change_ad_properties and self._is_synced_object(): raise univention.admin.uexceptions.invalidOperation(_('Objects from Active Directory can not be removed.')) self._ldap_pre_remove() self.call_udm_property_hook('hook_ldap_pre_remove', self) if remove_childs: subelements = [] # type: List[Tuple[str, Dict[str, List[str]]]] if b'FALSE' not in self.lo.getAttr(self.dn, 'hasSubordinates'): ud.debug(ud.ADMIN, ud.INFO, 'handlers/__init__._remove() children of base dn %s' % (self.dn,)) subelements = self.lo.search(base=self.dn, scope='one', attr=[]) for subolddn, suboldattrs in subelements: ud.debug(ud.ADMIN, ud.INFO, 'remove: subelement %s' % (subolddn,)) for submodule in univention.admin.modules.identify(subolddn, suboldattrs): subobject = submodule.object(None, self.lo, None, dn=subolddn, attributes=suboldattrs) subobject.open() try: subobject.remove(remove_childs) except univention.admin.uexceptions.base as exc: ud.debug(ud.ADMIN, ud.ERROR, 'remove: could not remove %r: %s: %s' % (subolddn, type(exc).__name__, exc)) break else: ud.debug(ud.ADMIN, ud.WARN, 'remove: could not identify UDM module of %r' % (subolddn,)) self.lo.delete(self.dn) self._exists = False self._ldap_post_remove() self.call_udm_property_hook('hook_ldap_post_remove', self) self.oldattr = {} self._write_admin_diary_remove() self.save() def _write_admin_diary_remove(self): # type: () -> None self._write_admin_diary_event('REMOVED')
[docs] def loadPolicyObject(self, policy_type, reset=0): # type: (str, int) -> simplePolicy pathlist = [] ud.debug(ud.ADMIN, ud.INFO, "loadPolicyObject: policy_type: %s" % policy_type) policy_module = univention.admin.modules.get(policy_type) # overwrite property descriptions univention.admin.ucr_overwrite_properties(policy_module, self.lo) # re-build layout if there any overwrites defined univention.admin.ucr_overwrite_module_layout(policy_module) # retrieve path info from 'cn=directory,cn=univention,<current domain>' object pathResult = self.lo.get('cn=directory,cn=univention,' + self.position.getDomain()) if not pathResult: pathResult = self.lo.get('cn=default containers,cn=univention,' + self.position.getDomain()) for i in pathResult.get('univentionPolicyObject', []): i = i.decode('utf-8') try: self.lo.searchDn(base=i, scope='base') pathlist.append(i) ud.debug(ud.ADMIN, ud.INFO, "loadPolicyObject: added path %s" % i) except Exception: ud.debug(ud.ADMIN, ud.INFO, "loadPolicyObject: invalid path setting: %s does not exist in LDAP" % i) continue # looking for next policy container break # at least one item has been found; so we can stop here since only pathlist[0] is used if not pathlist: policy_position = self.position else: policy_position = univention.admin.uldap.position(self.position.getBase()) policy_path = pathlist[0] try: prefix = univention.admin.modules.policyPositionDnPrefix(policy_module) self.lo.searchDn(base=u"%s,%s" % (prefix, policy_path), scope='base') policy_position.setDn(u"%s,%s" % (prefix, policy_path)) except Exception: policy_position.setDn(policy_path) for dn in self.policies: if univention.admin.modules.recognize(policy_module, dn, self.lo.get(dn)) and self.policyObjects.get(policy_type, None) and self.policyObjects[policy_type].cloned == dn and not reset: return self.policyObjects[policy_type] for dn in self.policies: modules = univention.admin.modules.identify(dn, self.lo.get(dn)) for module in modules: if univention.admin.modules.name(module) == policy_type: self.policyObjects[policy_type] = univention.admin.objects.get(module, None, self.lo, policy_position, dn=dn) self.policyObjects[policy_type].clone(self) self._init_ldap_search(self.policyObjects[policy_type]) return self.policyObjects[policy_type] if not modules: self.policies.remove(dn) if not self.policyObjects.get(policy_type, None) or reset: self.policyObjects[policy_type] = univention.admin.objects.get(policy_module, None, self.lo, policy_position) self.policyObjects[policy_type].copyIdentifier(self) self._init_ldap_search(self.policyObjects[policy_type]) return self.policyObjects[policy_type]
def _init_ldap_search(self, policy): # type: (simplePolicy) -> None properties = {} # type: Dict[str, univention.admin.property] if hasattr(policy, 'property_descriptions'): properties = policy.property_descriptions elif hasattr(policy, 'descriptions'): properties = policy.descriptions for pname, prop in properties.items(): if prop.syntax.name == 'LDAP_Search': prop.syntax._load(self.lo) if prop.syntax.viewonly: policy.mapping.unregister(pname, False) def _update_policies(self): # type: () -> None for policy_type, policy_object in self.policyObjects.items(): ud.debug(ud.ADMIN, ud.INFO, "simpleLdap._update_policies: processing policy of type: %s" % policy_type) if policy_object.changes: ud.debug(ud.ADMIN, ud.INFO, "simpleLdap._update_policies: trying to create policy of type: %s" % policy_type) ud.debug(ud.ADMIN, ud.INFO, "simpleLdap._update_policies: policy_object.info=%s" % policy_object.info) policy_object.create() univention.admin.objects.replacePolicyReference(self, policy_type, policy_object.dn)
[docs] def closePolicyObjects(self): # type: () -> None self.policyObjects = {}
[docs] def savePolicyObjects(self): # type: () -> None self._update_policies() self.closePolicyObjects()
[docs] def cancel(self): # type: () -> None """Cancels the object creation or modification. This method can be subclassed to revert changes for example releasing locks.""" self._release_locks()
def _release_locks(self): # type: () -> None """Release all temporary done locks""" while self.alloc: name, value = self.alloc.pop()[0:2] univention.debug.debug(univention.debug.ADMIN, univention.debug.INFO, 'release_lock(%s): %r' % (name, value)) univention.admin.allocators.release(self.lo, self.position, name, value) def _confirm_locks(self): # type: () -> None """ Confirm all temporary done locks. self.alloc should contain a 2-tuple or 3-tuple: (name:str, value:str) or (name:str, value:str, updateLastUsedValue:bool) """ while self.alloc: item = self.alloc.pop() name, value = item[0:2] updateLastUsedValue = True if len(item) > 2: updateLastUsedValue = item[2] univention.admin.allocators.confirm(self.lo, self.position, name, value, updateLastUsedValue=updateLastUsedValue)
[docs] def request_lock(self, name, value=None, updateLastUsedValue=True): """Request a lock for the given value""" try: if name == 'sid+user': value = univention.admin.allocators.requestUserSid(self.lo, self.position, value) name = 'sid' else: value = univention.admin.allocators.request(self.lo, self.position, name, value) except univention.admin.uexceptions.noLock: self._release_locks() raise if not updateLastUsedValue: # backwards compatibility: 2er-tuples required! self.alloc.append((name, value, updateLastUsedValue)) else: self.alloc.append((name, value)) return value
def _call_checkLdap_on_all_property_syntaxes(self): # type: () -> None """Calls checkLdap() method on every property if present. checkLdap() may raise an exception if the value does not match the constraints of the underlying syntax. """ properties = {} # type: Dict[str, univention.admin.property] if hasattr(self, 'descriptions'): properties = self.descriptions for pname, prop in properties.items(): if hasattr(prop.syntax, 'checkLdap'): if not self.exists() or self.hasChanged(pname): prop.syntax.checkLdap(self.lo, self.info.get(pname)) def __prevent_ad_property_change(self): # type: () -> None if not _prevent_to_change_ad_properties or not self._is_synced_object(): return for key in self.descriptions: if self.descriptions[key].readonly_when_synced: value = self.info.get(key) oldval = self.oldinfo.get(key) if oldval != value: raise univention.admin.uexceptions.valueMayNotChange(_('key=%(key)s old=%(old)s new=%(new)s') % {'key': key, 'old': oldval, 'new': value}, property=key) def _is_synced_object(self): # type: () -> bool """Checks whether this object was synchronized from Active Directory to UCS.""" flags = self.oldattr.get('univentionObjectFlag', []) return b'synced' in flags and b'docker' not in flags
[docs] @classmethod def get_default_containers(cls, lo): """ Returns list of default containers for this module. :param univention.admin.uldap.access lo: UDM LDAP access object. """ containers = univention.admin.modules.defaultContainers(univention.admin.modules.get_module(cls.module)) settings_directory = univention.admin.modules.get_module('settings/directory') try: default_containers = settings_directory.lookup(None, lo, '', required=True)[0] except univention.admin.uexceptions.noObject: return containers base = cls.module.split('/', 1)[0] if cls.module in ('shares/print', 'shares/printer', 'shares/printergroup'): base = 'printers' elif cls.module in ('computers/domaincontroller_master', 'computers/domaincontroller_backup', 'computers/domaincontroller_slave', 'computers/windows_domaincontroller'): base = 'domaincontroller' containers.extend(default_containers.info.get(base, [])) return containers
[docs] @classmethod def lookup(cls, co, lo, filter_s, base='', superordinate=None, scope='sub', unique=False, required=False, timeout=-1, sizelimit=0, serverctrls=None, response=None): # type: (None, univention.admin.uldap.access, str, str, Optional[str], str, bool, bool, int, int, Optional[List], Optional[Dict]) -> List[simpleLdap] """ Perform a LDAP search and return a list of instances. :param None co: obsolete config :param univention.admin.uldap.access lo: UDM LDAP access object. :param str filter_s: LDAP filter string. :param str base: LDAP search base distinguished name. :param str superordinate: Distinguished name of a superordinate object. :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search. :param bool unique: Raise an exception if more than one object matches. :param bool required: Raise an exception instead of returning an empty dictionary. :param int timeout: wait at most `timeout` seconds for a search to complete. `-1` for no limit. :param int sizelimit: retrieve at most `sizelimit` entries for a search. `0` for no limit. :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request. :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :return: A list of UDM objects. :rtype: list[simpleLdap] """ filter_s = cls.lookup_filter(filter_s, lo) if superordinate: filter_s = cls.lookup_filter_superordinate(filter_s, superordinate) filter_str = six.text_type(filter_s or u'') attr = cls._ldap_attributes() result = [] for dn, attrs in lo.search(filter_str, base, scope, attr, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response): try: result.append(cls(co, lo, None, dn=dn, superordinate=superordinate, attributes=attrs)) except univention.admin.uexceptions.base as exc: ud.debug(ud.ADMIN, ud.ERROR, 'lookup() of object %r failed: %s' % (dn, exc)) if required and not result: raise univention.admin.uexceptions.noObject('lookup(base=%r, filter_s=%r)' % (base, filter_s)) return result
[docs] @classmethod def lookup_filter(cls, filter_s=None, lo=None): # type: (Optional[str], Optional[univention.admin.uldap.access]) -> univention.admin.filter.conjunction """ Return a LDAP filter as a UDM filter expression. :param str filter_s: LDAP filter string. :param univention.admin.uldap.access lo: UDM LDAP access object. :returns: A LDAP filter expression. :rtype: univention.admin.filter.conjunction See :py:meth:`lookup`. """ filter_p = cls.unmapped_lookup_filter() # there are instances where the lookup/lookup_filter method of an module handler is called before # univention.admin.modules.update() was performed. (e.g. management/univention-directory-manager-modules/univention-dnsedit) module = univention.admin.modules.get_module(cls.module) filter_p.append_unmapped_filter_string(filter_s, cls.rewrite_filter, module.mapping) return filter_p
[docs] @classmethod def lookup_filter_superordinate(cls, filter, superordinate): return filter
[docs] @classmethod def unmapped_lookup_filter(cls): # type: () -> univention.admin.filter.conjunction """ Return a LDAP filter UDM filter expression. :returns: A LDAP filter expression. :rtype: univention.admin.filter.conjunction See :py:meth:`lookup_filter`. """ filter_conditions = [] if cls.use_performant_ldap_search_filter: filter_conditions.append(univention.admin.filter.expression(u'univentionObjectType', cls.module, escape=True)) else: object_classes = univention.admin.modules.options(cls.module).get(u'default', univention.admin.option()).objectClasses - {u'top', u'univentionPolicy', u'univentionObjectMetadata', u'person'} filter_conditions.extend(univention.admin.filter.expression(u'objectClass', ocs) for ocs in object_classes) return univention.admin.filter.conjunction(u'&', filter_conditions)
[docs] @classmethod def rewrite_filter(cls, filter, mapping): key = filter.variable # management/univention-management-console/src/univention/management/console/acl.py does not call univention.admin.modules.update() mod = univention.admin.modules.get_module(cls.module) property_ = mod.property_descriptions.get(key) if property_ and not isinstance(filter.value, (list, tuple)): if property_.multivalue: # special case: mutlivalue properties need to be a list when map()-ing filter.value = [filter.value] if issubclass(property_.syntax if inspect.isclass(property_.syntax) else type(property_.syntax), univention.admin.syntax.complex): # special case: complex syntax properties need to be a list (of lists, if multivalue) filter.value = [filter.value] elif not property_ and key == 'options' and filter.value in getattr(mod, 'options', {}): ocs = mod.options[filter.value] filter.variable = u'objectClass' if len(ocs.objectClasses) > 1: con = univention.admin.filter.conjunction(u'&', [univention.admin.filter.expression(u'objectClass', oc, escape=True) for oc in ocs.objectClasses]) filter.transform_to_conjunction(con) elif ocs.objectClasses: filter.value = list(ocs.objectClasses)[0] return try: if not mapping.shouldMap(filter.variable): return except KeyError: return filter.variable = mapping.mapName(key) if filter.value == '*' and property_ and issubclass(property_.syntax if inspect.isclass(property_.syntax) else type(property_.syntax), (univention.admin.syntax.IStates, univention.admin.syntax.boolean)): # special case: properties that are represented as Checkboxes in the # frontend should include '(!(propertyName=*))' in the ldap filter # if the Checkboxe is set to False to also find objects where the property # is not set. In that case we don't want to map the '*' to a different value. pass else: filter.value = mapping.mapValueDecoded(key, filter.value, encoding_errors='ignore') if isinstance(filter.value, (list, tuple)) and filter.value: # complex syntax filter.value = filter.value[0]
[docs] @classmethod def identify(cls, dn, attr, canonical=False): ocs = {x.decode('utf-8') for x in attr.get('objectClass', [])} required_object_classes = univention.admin.modules.options(cls.module).get('default', univention.admin.option()).objectClasses - {'top', 'univentionPolicy', 'univentionObjectMetadata', 'person'} return (ocs & required_object_classes) == required_object_classes
@classmethod def _ldap_attributes(cls): return []
[docs]class simpleComputer(simpleLdap): def __init__(self, co, lo, position, dn='', superordinate=None, attributes=[]): simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes) self.newPrimaryGroupDn = 0 self.oldPrimaryGroupDn = 0 self.ip = [] self.network_object = False self.old_network = 'None' self.__saved_dhcp_entry = None # read-only attribute containing the FQDN of the host self.descriptions['fqdn'] = univention.admin.property( short_description='FQDN', long_description='', syntax=univention.admin.syntax.string, may_change=False, ) self['dnsAlias'] = [] # defined here to avoid pseudo non-None value of [''] in modwizard search self.oldinfo['ip'] = [] self.info['ip'] = [] if self.exists(): ips = [ip_address(addr.decode('ASCII')).exploded for key in ('aRecord', 'aAAARecord') for addr in self.oldattr.get(key, [])] self.oldinfo['ip'] += ips self.info['ip'] += ips
[docs] def getMachineSid(self, lo, position, uidNum, rid=None): # if rid is given, use it regardless of s4 connector if rid: searchResult = self.lo.search(filter='objectClass=sambaDomain', attr=['sambaSID']) domainsid = searchResult[0][1]['sambaSID'][0].decode('ASCII') sid = domainsid + u'-' + rid return self.request_lock('sid', sid) else: # if no rid is given, create a domain sid or local sid if connector is present if self.s4connector_present: return u'S-1-4-%s' % uidNum else: num = uidNum while True: try: return self.request_lock('sid+user', num) except univention.admin.uexceptions.noLock: num = str(int(num) + 1)
# HELPER @classmethod def _ip_from_ptr(cls, zoneName, relativeDomainName): # type: (str, str) -> str """ Extract IP address from reverse DNS record. >>> simpleComputer._ip_from_ptr("2.1.in-addr.arpa", "4.3") '1.2.3.4' >>> simpleComputer._ip_from_ptr("0.0.0.0.0.0.0.0.0.8.b.d.1.0.0.2.ip6.arpa", "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0") '2001:db80:0000:0000:0000:0000:0000:0001' """ if 'ip6' in zoneName: return cls._ipv6_from_ptr(zoneName, relativeDomainName) else: return cls._ipv4_from_ptr(zoneName, relativeDomainName) @staticmethod def _ipv4_from_ptr(zoneName, relativeDomainName): # type: (str, str) -> str """ Extract IPv4 address from reverse DNS record. >>> simpleComputer._ipv4_from_ptr("2.1.in-addr.arpa", "4.3") '1.2.3.4' """ return '%s.%s' % ( '.'.join(reversed(zoneName.replace('.in-addr.arpa', '').split('.'))), '.'.join(reversed(relativeDomainName.split('.')))) @staticmethod def _ipv6_from_ptr(zoneName, relativeDomainName): # type: (str, str) -> str """ Extract IPv6 address from reverse DNS record. >>> simpleComputer._ipv6_from_ptr("0.0.0.0.0.0.0.0.0.8.b.d.1.0.0.2.ip6.arpa", "1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0") '2001:db80:0000:0000:0000:0000:0000:0001' """ fullName = relativeDomainName + '.' + zoneName.replace('.ip6.arpa', '') digits = fullName.split('.') blocks = [''.join(reversed(digits[i:i + 4])) for i in range(0, len(digits), 4)] return ':'.join(reversed(blocks)) @staticmethod def _is_ip(ip): # type: (str) -> bool """ Check if valid IPv4 (0.0.0.0 is allowed) or IPv6 address. :param ip: string. :returns: `True` if it is a valid IPv4 or IPv6 address., `False` otherwise. >>> simpleComputer._is_ip('192.0.2.0') True >>> simpleComputer._is_ip('::1') True >>> simpleComputer._is_ip('') False """ try: ip_address(u'%s' % (ip,)) ud.debug(ud.ADMIN, ud.INFO, 'IP[%s]? -> Yes' % ip) return True except ValueError: ud.debug(ud.ADMIN, ud.INFO, 'IP[%s]? -> No' % ip) return False
[docs] def open(self): """ Load the computer object from LDAP. """ simpleLdap.open(self) self.newPrimaryGroupDn = 0 self.oldPrimaryGroupDn = 0 self.ip_alredy_requested = 0 self.ip_freshly_set = False self.__multiip = len(self['mac']) > 1 or len(self['ip']) > 1 self['dnsEntryZoneForward'] = [] self['dnsEntryZoneReverse'] = [] self['dhcpEntryZone'] = [] self['groups'] = [] self['dnsEntryZoneAlias'] = [] # search forward zone and insert into the object if self['name']: tmppos = univention.admin.uldap.position(self.position.getDomain()) zones = [] searchFilter = filter_format('(&(objectClass=dNSZone)(relativeDomainName=%s)(!(cNAMERecord=*)))', [self['name']]) try: result = self.lo.search(base=tmppos.getBase(), scope='domain', filter=searchFilter, attr=['zoneName', 'aRecord', 'aAAARecord'], unique=False) for dn, attr in result: zoneName = attr['zoneName'][0].decode('UTF-8') for key in ('aRecord', 'aAAARecord'): if key in attr: zones.append((zoneName, [ip_address(x.decode('ASCII')).exploded for x in attr[key]])) ud.debug(ud.ADMIN, ud.INFO, 'zoneNames: %s' % zones) for zoneName, ips in zones: searchFilter = filter_format('(&(objectClass=dNSZone)(zoneName=%s)(relativeDomainName=@))', [zoneName]) results = self.lo.searchDn(base=tmppos.getBase(), scope='domain', filter=searchFilter, unique=False) for dn in results: for ip in ips: self['dnsEntryZoneForward'].append([dn, ip]) ud.debug(ud.ADMIN, ud.INFO, 'dnsEntryZoneForward: %s' % (self['dnsEntryZoneForward'],)) except univention.admin.uexceptions.insufficientInformation: self['dnsEntryZoneForward'] = [] raise for zoneName, ips in zones: searchFilter = filter_format('(&(objectClass=dNSZone)(|(PTRRecord=%s)(PTRRecord=%s.%s.)))', (self['name'], self['name'], zoneName)) try: results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['relativeDomainName', 'zoneName'], filter=searchFilter, unique=False) for dn, attr in results: ip = self._ip_from_ptr(attr['zoneName'][0].decode('UTF-8'), attr['relativeDomainName'][0].decode('UTF-8')) if not self._is_ip(ip): ud.debug(ud.ADMIN, ud.WARN, 'simpleComputer: dnsEntryZoneReverse: invalid IP address generated: %r' % (ip,)) continue entry = [self.lo.parentDn(dn), ip] if entry not in self['dnsEntryZoneReverse']: self['dnsEntryZoneReverse'].append(entry) except univention.admin.uexceptions.insufficientInformation: self['dnsEntryZoneReverse'] = [] raise ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: dnsEntryZoneReverse: %s' % self['dnsEntryZoneReverse']) for zoneName, ips in zones: searchFilter = filter_format('(&(objectClass=dNSZone)(|(cNAMERecord=%s)(cNAMERecord=%s.%s.)))', (self['name'], self['name'], zoneName)) try: results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['relativeDomainName', 'cNAMERecord', 'zoneName'], filter=searchFilter, unique=False) for dn, attr in results: dnsAlias = attr['relativeDomainName'][0].decode('UTF-8') self['dnsAlias'].append(dnsAlias) dnsAliasZoneContainer = self.lo.parentDn(dn) if attr['cNAMERecord'][0].decode('UTF-8') == self['name']: dnsForwardZone = attr['zoneName'][0].decode('UTF-8') else: dnsForwardZone = zoneName entry = [dnsForwardZone, dnsAliasZoneContainer, dnsAlias] if entry not in self['dnsEntryZoneAlias']: self['dnsEntryZoneAlias'].append(entry) except univention.admin.uexceptions.insufficientInformation: self['dnsEntryZoneAlias'] = [] raise ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: dnsEntryZoneAlias: %s' % self['dnsEntryZoneAlias']) for macAddress in self['mac']: # mac address may be an empty string (Bug #21958) if not macAddress: continue ud.debug(ud.ADMIN, ud.INFO, 'open: DHCP; we have a mac address: %s' % macAddress) ethernet = 'ethernet ' + macAddress searchFilter = filter_format('(&(dhcpHWAddress=%s)(objectClass=univentionDhcpHost))', (ethernet,)) ud.debug(ud.ADMIN, ud.INFO, 'open: DHCP; we search for "%s"' % searchFilter) try: results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['univentionDhcpFixedAddress'], filter=searchFilter, unique=False) ud.debug(ud.ADMIN, ud.INFO, 'open: DHCP; the result: "%s"' % results) for dn, attr in results: service = self.lo.parentDn(dn) if 'univentionDhcpFixedAddress' in attr: for ip in attr['univentionDhcpFixedAddress']: entry = (service, ip.decode('ASCII'), macAddress) if entry not in self['dhcpEntryZone']: self['dhcpEntryZone'].append(entry) else: entry = (service, '', macAddress) if entry not in self['dhcpEntryZone']: self['dhcpEntryZone'].append(entry) ud.debug(ud.ADMIN, ud.INFO, 'open: DHCP; self[ dhcpEntryZone ] = "%s"' % self['dhcpEntryZone']) except univention.admin.uexceptions.insufficientInformation: raise if self.exists(): if self.has_property('network'): self.old_network = self['network'] # get groupmembership self['groups'] = self.lo.searchDn(base=self.lo.base, filter=filter_format('(&(objectclass=univentionGroup)(uniqueMember=%s))', [self.dn])) if 'name' in self.info and 'domain' in self.info: self.info['fqdn'] = '%s.%s' % (self['name'], self['domain'])
def __modify_dhcp_object(self, position, mac, ip=None): # identify the dhcp object with the mac address name = self['name'] ud.debug(ud.ADMIN, ud.INFO, '__modify_dhcp_object: position: "%s"; name: "%s"; mac: "%s"; ip: "%s"' % (position, name, mac, ip)) if not all((name, mac)): return ethernet = 'ethernet %s' % mac bip = ip.encode('ASCII') if ip else b'' tmppos = univention.admin.uldap.position(self.position.getDomain()) if not position: ud.debug(ud.ADMIN, ud.WARN, 'could not access network object and given position is "None", using LDAP root as position for DHCP entry') position = tmppos.getBase() results = self.lo.search(base=position, scope='domain', attr=['univentionDhcpFixedAddress'], filter=filter_format('dhcpHWAddress=%s', [ethernet]), unique=False) if not results: # if the dhcp object doesn't exists, then we create it # but it is possible, that the hostname for the dhcp object is already used, so we use the _uv$NUM extension ud.debug(ud.ADMIN, ud.INFO, 'the dhcp object with the mac address "%s" does not exists, we create one' % ethernet) results = self.lo.searchDn(base=position, scope='domain', filter=filter_format('(&(objectClass=univentionDhcpHost)(|(cn=%s)(cn=%s_uv*)))', (name, name)), unique=False) if results: ud.debug(ud.ADMIN, ud.INFO, 'the host "%s" already has a dhcp object, so we search for the next free uv name' % (name)) RE = re.compile(r'cn=[^,]+_uv(\d+),') taken = {int(m.group(1)) for m in (RE.match(dn) for dn in results) if m} n = min(set(range(max(taken) + 2)) - taken) if taken else 0 name = '%s_uv%d' % (name, n) dn = 'cn=%s,%s' % (escape_dn_chars(name), position) ml = [ ('objectClass', [b'top', b'univentionObject', b'univentionDhcpHost']), ('univentionObjectType', [b'dhcp/host']), ('cn', [name.encode('UTF-8')]), ('dhcpHWAddress', [ethernet.encode('ASCII')]), ] if ip: ml.append(('univentionDhcpFixedAddress', [bip])) self.lo.add(dn, ml) ud.debug(ud.ADMIN, ud.INFO, 'we just added the object "%s"' % (dn,)) elif ip: # if the object already exists, we append or remove the ip address ud.debug(ud.ADMIN, ud.INFO, 'the dhcp object with the mac address "%s" exists, we change the ip' % ethernet) for dn, attr in results: if bip in attr.get('univentionDhcpFixedAddress', []): continue self.lo.modify(dn, [('univentionDhcpFixedAddress', b'', bip)]) ud.debug(ud.ADMIN, ud.INFO, 'we added the ip "%s"' % ip) def __rename_dns_object(self, position=None, old_name=None, new_name=None): for dns_line in self['dnsEntryZoneForward']: # dns_line may be the empty string if not dns_line: continue dn, ip = self.__split_dns_line(dns_line) if ':' in ip: # IPv6 results = self.lo.searchDn(base=dn, scope='domain', filter=filter_format('(&(relativeDomainName=%s)(aAAARecord=%s))', (old_name, ip)), unique=False) else: results = self.lo.searchDn(base=dn, scope='domain', filter=filter_format('(&(relativeDomainName=%s)(aRecord=%s))', (old_name, ip)), unique=False) for result in results: object = univention.admin.objects.get(univention.admin.modules.get('dns/host_record'), self.co, self.lo, position=self.position, dn=result) object.open() object['name'] = new_name object.modify() for dns_line in self['dnsEntryZoneReverse']: # dns_line may be the empty string if not dns_line: continue dn, ip = self.__split_dns_line(dns_line) results = self.lo.searchDn(base=dn, scope='domain', filter=filter_format('(|(pTRRecord=%s)(pTRRecord=%s.*))', (old_name, old_name)), unique=False) for result in results: object = univention.admin.objects.get(univention.admin.modules.get('dns/ptr_record'), self.co, self.lo, position=self.position, dn=result) object.open() object['ptr_record'] = [ptr_record.replace(old_name, new_name) for ptr_record in object.get('ptr_record', [])] object.modify() for entry in self['dnsEntryZoneAlias']: # entry may be the empty string if not entry: continue dnsforwardzone, dnsaliaszonecontainer, alias = entry results = self.lo.searchDn(base=dnsaliaszonecontainer, scope='domain', filter=filter_format('relativedomainname=%s', [alias]), unique=False) for result in results: object = univention.admin.objects.get(univention.admin.modules.get('dns/alias'), self.co, self.lo, position=self.position, dn=result) object.open() object['cname'] = '%s.%s.' % (new_name, dnsforwardzone) object.modify() def __rename_dhcp_object(self, old_name, new_name): module = univention.admin.modules.get('dhcp/host') tmppos = univention.admin.uldap.position(self.position.getDomain()) for mac in self['mac']: # mac may be the empty string if not mac: continue ethernet = 'ethernet %s' % mac results = self.lo.searchDn(base=tmppos.getBase(), scope='domain', filter=filter_format('dhcpHWAddress=%s', [ethernet]), unique=False) if not results: continue ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: filter [ dhcpHWAddress = %s ]; results: %s' % (ethernet, results)) for result in results: object = univention.admin.objects.get(module, self.co, self.lo, position=self.position, dn=result) object.open() object['host'] = object['host'].replace(old_name, new_name) object.modify() def __remove_from_dhcp_object(self, mac=None, ip=None): # if we got the mac address, then we remove the object # if we only got the ip address, we remove the ip address ud.debug(ud.ADMIN, ud.INFO, 'we should remove a dhcp object: mac="%s", ip="%s"' % (mac, ip)) dn = None tmppos = univention.admin.uldap.position(self.position.getDomain()) if ip and mac: ethernet = 'ethernet %s' % mac ud.debug(ud.ADMIN, ud.INFO, 'we only remove the ip "%s" from the dhcp object' % ip) results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['univentionDhcpFixedAddress'], filter=filter_format('(&(dhcpHWAddress=%s)(univentionDhcpFixedAddress=%s))', (ethernet, ip)), unique=False) for dn, attr in results: object = univention.admin.objects.get(univention.admin.modules.get('dhcp/host'), self.co, self.lo, position=self.position, dn=dn) object.open() if ip in object['fixedaddress']: ud.debug(ud.ADMIN, ud.INFO, 'fixedaddress: "%s"' % object['fixedaddress']) object['fixedaddress'].remove(ip) if len(object['fixedaddress']) == 0: object.remove() else: object.modify() dn = object.dn elif mac: ethernet = 'ethernet %s' % mac ud.debug(ud.ADMIN, ud.INFO, 'Remove the following mac: ethernet: "%s"' % ethernet) results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['univentionDhcpFixedAddress'], filter=filter_format('dhcpHWAddress=%s', [ethernet]), unique=False) for dn, attr in results: ud.debug(ud.ADMIN, ud.INFO, '... done') object = univention.admin.objects.get(univention.admin.modules.get('dhcp/host'), self.co, self.lo, position=self.position, dn=dn) object.remove() dn = object.dn elif ip: ud.debug(ud.ADMIN, ud.INFO, 'Remove the following ip: "%s"' % ip) results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['univentionDhcpFixedAddress'], filter=filter_format('univentionDhcpFixedAddress=%s', [ip]), unique=False) for dn, attr in results: ud.debug(ud.ADMIN, ud.INFO, '... done') object = univention.admin.objects.get(univention.admin.modules.get('dhcp/host'), self.co, self.lo, position=self.position, dn=dn) object.remove() dn = object.dn return dn def __split_dhcp_line(self, entry): service = entry[0] ip = '' try: # sanitize mac address # 0011.2233.4455 -> 00:11:22:33:44:55 -> is guaranteed to work together with our DHCP server # __split_dhcp_line may be used outside of UDM which means that MAC_Address.parse may not be called. mac = univention.admin.syntax.MAC_Address.parse(entry[-1]) if self._is_ip(entry[-2]): ip = entry[-2] except univention.admin.uexceptions.valueError: mac = '' return (service, ip, mac) def __split_dns_line(self, entry): zone = entry[0] if len(entry) > 1: ip = self._is_ip(entry[1]) and entry[1] or None else: ip = None ud.debug(ud.ADMIN, ud.INFO, 'Split entry %s into zone %s and ip %s' % (entry, zone, ip)) return (zone, ip) def __remove_dns_reverse_object(self, name, dnsEntryZoneReverse, ip): # type: (str, str, str) -> None def modify(rdn, zoneDN): # type: (Text, str) -> None zone_name = explode_rdn(zoneDN, True)[0] for dn, attributes in self.lo.search(scope='domain', attr=['pTRRecord'], filter=filter_format('(&(relativeDomainName=%s)(zoneName=%s))', (rdn, zone_name))): ptr_records = attributes.get('pTRRecord', []) removals = [] if len(ptr_records) > 1: removals = [b'%s.%s.' % (name.encode('UTF-8'), attributes2['zoneName'][0]) for dn2, attributes2 in self.lo.search(scope='domain', attr=['zoneName'], filter=filter_format('(&(relativeDomainName=%s)(objectClass=dNSZone))', [name]), unique=False)] if len(ptr_records) <= 1 or set(ptr_records) == set(removals): self.lo.delete('relativeDomainName=%s,%s' % (escape_dn_chars(rdn), zoneDN)) else: self.lo.modify(dn, [('pTRRecord', removals, b'')]) zone = univention.admin.handlers.dns.reverse_zone.object(self.co, self.lo, self.position, zoneDN) zone.open() zone.modify() ud.debug(ud.ADMIN, ud.INFO, 'we should remove a dns reverse object: dnsEntryZoneReverse="%s", name="%s", ip="%s"' % (dnsEntryZoneReverse, name, ip)) if dnsEntryZoneReverse: try: rdn = self.calc_dns_reverse_entry_name(ip, dnsEntryZoneReverse) except ValueError: pass else: modify(rdn, dnsEntryZoneReverse) elif ip: tmppos = univention.admin.uldap.position(self.position.getDomain()) results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['zoneDn'], filter=filter_format('(&(objectClass=dNSZone)(|(pTRRecord=%s)(pTRRecord=%s.*)))', (name, name)), unique=False) for dn, attr in results: ud.debug(ud.ADMIN, ud.INFO, 'DEBUG: dn: "%s"' % dn) zone = self.lo.parentDn(dn) ud.debug(ud.ADMIN, ud.INFO, 'DEBUG: zone: "%s"' % zone) try: rdn = self.calc_dns_reverse_entry_name(ip, zone) ud.debug(ud.ADMIN, ud.INFO, 'DEBUG: rdn: "%s"' % rdn) modify(rdn, zone) except ValueError as ex: ud.debug(ud.ADMIN, ud.INFO, 'DEBUG: rdn: "%s"' % ex) except univention.admin.uexceptions.noObject: pass def __add_dns_reverse_object(self, name, zoneDn, ip): # type: (str, str, str) -> None ud.debug(ud.ADMIN, ud.INFO, 'we should create a dns reverse object: zoneDn="%s", name="%s", ip="%s"' % (zoneDn, name, ip)) if not all((name, zoneDn, ip)): return addr, attr = self._ip2dns(ip) try: ipPart = self.calc_dns_reverse_entry_name(ip, zoneDn) except ValueError: raise univention.admin.uexceptions.missingInformation(_('Reverse zone and IP address are incompatible.')) tmppos = univention.admin.uldap.position(self.position.getDomain()) results = self.lo.search(base=tmppos.getBase(), scope='domain', attr=['zoneName'], filter=filter_format('(&(relativeDomainName=%s)(zoneName=*)(%s=%s))', (name, attr, addr.exploded)), unique=False) hostname_list = { u'%s.%s.' % (name, attr['zoneName'][0].decode('UTF-8')) for dn, attr in results } if not hostname_list: ud.debug(ud.ADMIN, ud.ERROR, 'Could not determine host record for name=%r, ip=%r. Not creating pointer record.' % (name, ip)) return results = self.lo.searchDn(base=tmppos.getBase(), scope='domain', filter=filter_format('(&(relativeDomainName=%s)(%s=%s))', [ipPart] + list(str2dn(zoneDn)[0][0][:2])), unique=False) if not results: self.lo.add('relativeDomainName=%s,%s' % (escape_dn_chars(ipPart), zoneDn), [ ('objectClass', [b'top', b'dNSZone', b'univentionObject']), ('univentionObjectType', [b'dns/ptr_record']), ('zoneName', [explode_rdn(zoneDn, True)[0].encode('UTF-8')]), ('relativeDomainName', [ipPart.encode('ASCII')]), ('PTRRecord', [x.encode('UTF-8') for x in hostname_list]) ]) # update Serial zone = univention.admin.handlers.dns.reverse_zone.object(self.co, self.lo, self.position, zoneDn) zone.open() zone.modify() def __remove_dns_forward_object(self, name, zoneDn, ip=None): # type: (str, str, str) -> None ud.debug(ud.ADMIN, ud.INFO, 'we should remove a dns forward object: zoneDn="%s", name="%s", ip="%s"' % (zoneDn, name, ip)) if name: # check if dns forward object has more than one ip address if not ip: if zoneDn: self.lo.delete('relativeDomainName=%s,%s' % (escape_dn_chars(name), zoneDn)) zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn) zone.open() zone.modify() else: if zoneDn: base = zoneDn else: tmppos = univention.admin.uldap.position(self.position.getDomain()) base = tmppos.getBase() ud.debug(ud.ADMIN, ud.INFO, 'search base="%s"' % base) if ':' in ip: ip = IPv6Address(u'%s' % (ip,)).exploded (attrEdit, attrOther, ) = ('aAAARecord', 'aRecord', ) else: (attrEdit, attrOther, ) = ('aRecord', 'aAAARecord', ) results = self.lo.search(base=base, scope='domain', attr=['aRecord', 'aAAARecord', ], filter=filter_format('(&(relativeDomainName=%s)(%s=%s))', (name, attrEdit, ip)), unique=False, required=False) for dn, attr in results: if [x.decode('ASCII') for x in attr[attrEdit]] == [ip, ] and not attr.get(attrOther): # the <ip> to be removed is the last on the object # remove the object self.lo.delete(dn) else: # remove only the ip address attribute new_ip_list = copy.deepcopy(attr[attrEdit]) new_ip_list.remove(ip.encode('ASCII')) self.lo.modify(dn, [(attrEdit, attr[attrEdit], new_ip_list, ), ]) zone = zoneDn or self.lo.parentDn(dn) zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zone) zone.open() zone.modify() def __add_related_ptrrecords(self, zoneDN, ip): # type: (str, str) -> None if not all((zoneDN, ip)): return ptrrecord = '%s.%s.' % (self.info['name'], explode_rdn(zoneDN, True)[0]) ip_split = ip.split('.') ip_split.reverse() search_filter = filter_format('(|(relativeDomainName=%s)(relativeDomainName=%s)(relativeDomainName=%s))', (ip_split[0], '.'.join(ip_split[:1]), '.'.join(ip_split[:2]))) for dn, attributes in self.lo.search(base=zoneDN, scope='domain', attr=['pTRRecord'], filter=search_filter): self.lo.modify(dn, [('pTRRecord', '', ptrrecord)]) def __remove_related_ptrrecords(self, zoneDN, ip): # type: (str, str) -> None ptrrecord = '%s.%s.' % (self.info['name'], explode_rdn(zoneDN, True)[0]) ip_split = ip.split('.') ip_split.reverse() search_filter = filter_format('(|(relativeDomainName=%s)(relativeDomainName=%s)(relativeDomainName=%s))', (ip_split[0], '.'.join(ip_split[:1]), '.'.join(ip_split[:2]))) for dn, attributes in self.lo.search(base=zoneDN, scope='domain', attr=['pTRRecord'], filter=search_filter): if ptrrecord in attributes['pTRRecord']: self.lo.modify(dn, [('pTRRecord', ptrrecord, '')])
[docs] def check_common_name_length(self): # type: () -> None ud.debug(ud.ADMIN, ud.INFO, 'check_common_name_length with self["ip"] = %r and self["dnsEntryZoneForward"] = %r' % (self['ip'], self['dnsEntryZoneForward'], )) if len(self['ip']) > 0 and len(self['dnsEntryZoneForward']) > 0: for zone in self['dnsEntryZoneForward']: if zone == '': continue zoneName = explode_rdn(zone[0], True)[0] if len(zoneName) + len(self['name']) >= 63: ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: length of Common Name is too long: %d' % (len(zoneName) + len(self['name']) + 1)) raise univention.admin.uexceptions.commonNameTooLong()
@staticmethod def _ip2dns(addr): # type: (str) -> Tuple[Union[IPv4Address, IPv6Address], str] """ Convert IP address string to 2-tuple (IPAddress, LdapAttributeName). :param addr: an IPv4 or IPv6 address. :returns: 2-tuple (IPAddress, LdapAttributeName) >>> simpleComputer._ip2dns('127.0.0.1') (IPv4Address(u'127.0.0.1'), 'aRecord') >>> simpleComputer._ip2dns('::1') (IPv6Address(u'::1'), 'aAAARecord') """ ip = ip_address(u'%s' % (addr, )) return (ip, 'aAAARecord' if isinstance(ip, IPv6Address) else 'aRecord') def __modify_dns_forward_object(self, name, zoneDn, new_ip, old_ip): # type: (str, str, str, str) -> None ud.debug(ud.ADMIN, ud.INFO, 'we should modify a dns forward object: zoneDn="%s", name="%s", new_ip="%s", old_ip="%s"' % (zoneDn, name, new_ip, old_ip)) zone = None if old_ip and new_ip: if not zoneDn: tmppos = univention.admin.uldap.position(self.position.getDomain()) base = tmppos.getBase() else: base = zoneDn naddr, nattr = self._ip2dns(new_ip) oaddr, oattr = self._ip2dns(old_ip) results = self.lo.search(base=base, scope='domain', attr=['aRecord', 'aAAARecord'], filter=filter_format('(&(relativeDomainName=%s)(%s=%s))', (name, oattr, old_ip)), unique=False) for dn, attr in results: old_aRecord = attr.get('aRecord', []) new_aRecord = copy.deepcopy(old_aRecord) old_aAAARecord = attr.get('aAAARecord', []) new_aAAARecord = copy.deepcopy(old_aAAARecord) if isinstance(oaddr, IPv6Address): new_aAAARecord.remove(old_ip.encode('ASCII')) else: new_aRecord.remove(old_ip.encode('ASCII')) new_ip = naddr.exploded.encode('ASCII') if isinstance(naddr, IPv6Address): if new_ip not in new_aAAARecord: new_aAAARecord.append(new_ip) else: if new_ip not in new_aRecord: new_aRecord.append(new_ip) modlist = [] if old_aAAARecord != new_aAAARecord: modlist.append(('aAAARecord', old_aAAARecord, new_aAAARecord, )) if old_aRecord != new_aRecord: modlist.append(('aRecord', old_aRecord, new_aRecord, )) self.lo.modify(dn, modlist) if not zoneDn: zone = self.lo.parentDn(dn) if zoneDn: zone = zoneDn if zone: ud.debug(ud.ADMIN, ud.INFO, 'update the zon sOARecord for the zone: %s' % zone) zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zone) zone.open() zone.modify() def __add_dns_forward_object(self, name, zoneDn, ip): # type: (str, str, str) -> None ud.debug(ud.ADMIN, ud.INFO, 'we should add a dns forward object: zoneDn="%s", name="%s", ip="%s"' % (zoneDn, name, ip)) if not all((name, ip, zoneDn)): return addr = ip_address(u'%s' % (ip,)) if isinstance(addr, IPv6Address): self.__add_dns_forward_object_ipv6(name, zoneDn, addr) elif isinstance(addr, IPv4Address): self.__add_dns_forward_object_ipv4(name, zoneDn, addr) def __add_dns_forward_object_ipv6(self, name, zoneDn, addr): # type: (str, str, IPv6Address) -> None ip = addr.exploded.encode('ASCII') results = self.lo.search(base=zoneDn, scope='domain', attr=['aAAARecord'], filter=filter_format('(&(relativeDomainName=%s)(!(cNAMERecord=*)))', (name,)), unique=False) if not results: try: self.lo.add('relativeDomainName=%s,%s' % (escape_dn_chars(name), zoneDn), [ ('objectClass', [b'top', b'dNSZone', b'univentionObject']), ('univentionObjectType', [b'dns/host_record']), ('zoneName', explode_rdn(zoneDn, True)[0].encode('UTF-8')), ('aAAARecord', [ip]), ('relativeDomainName', [name.encode('UTF-8')]) ]) except univention.admin.uexceptions.objectExists as ex: raise univention.admin.uexceptions.dnsAliasRecordExists(ex) # TODO: check if zoneDn really a forwardZone, maybe it is a container under a zone zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn) zone.open() zone.modify() else: for dn, attr in results: if 'aAAARecord' in attr: new_ip_list = copy.deepcopy(attr['aAAARecord']) if ip not in new_ip_list: new_ip_list.append(ip) self.lo.modify(dn, [('aAAARecord', attr['aAAARecord'], new_ip_list)]) else: self.lo.modify(dn, [('aAAARecord', b'', ip)]) def __add_dns_forward_object_ipv4(self, name, zoneDn, addr): # type: (str, str, IPv4Address) -> None ip = addr.exploded.encode('ASCII') results = self.lo.search(base=zoneDn, scope='domain', attr=['aRecord'], filter=filter_format('(&(relativeDomainName=%s)(!(cNAMERecord=*)))', (name,)), unique=False) if not results: try: self.lo.add('relativeDomainName=%s,%s' % (escape_dn_chars(name), zoneDn), [ ('objectClass', [b'top', b'dNSZone', b'univentionObject']), ('univentionObjectType', [b'dns/host_record']), ('zoneName', explode_rdn(zoneDn, True)[0].encode('UTF-8')), ('ARecord', [ip]), ('relativeDomainName', [name.encode('UTF-8')]) ]) except univention.admin.uexceptions.objectExists as ex: raise univention.admin.uexceptions.dnsAliasRecordExists(ex) # TODO: check if zoneDn really a forwardZone, maybe it is a container under a zone zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn) zone.open() zone.modify() else: for dn, attr in results: if 'aRecord' in attr: new_ip_list = copy.deepcopy(attr['aRecord']) if ip not in new_ip_list: new_ip_list.append(ip) self.lo.modify(dn, [('aRecord', attr['aRecord'], new_ip_list)]) else: self.lo.modify(dn, [('aRecord', b'', ip)]) def __add_dns_alias_object(self, name, dnsForwardZone, dnsAliasZoneContainer, alias): # type: (str, str, str, str) -> None ud.debug(ud.ADMIN, ud.INFO, 'add a dns alias object: name="%s", dnsForwardZone="%s", dnsAliasZoneContainer="%s", alias="%s"' % (name, dnsForwardZone, dnsAliasZoneContainer, alias)) alias = alias.rstrip('.') if name and dnsForwardZone and dnsAliasZoneContainer and alias: results = self.lo.search(base=dnsAliasZoneContainer, scope='domain', attr=['cNAMERecord'], filter=filter_format('relativeDomainName=%s', (alias,)), unique=False) if not results: self.lo.add('relativeDomainName=%s,%s' % (escape_dn_chars(alias), dnsAliasZoneContainer), [ ('objectClass', [b'top', b'dNSZone', b'univentionObject']), ('univentionObjectType', [b'dns/alias']), ('zoneName', explode_rdn(dnsAliasZoneContainer, True)[0].encode('UTF-8')), ('cNAMERecord', [b"%s.%s." % (name.encode('UTF-8'), dnsForwardZone.encode('UTF-8'))]), ('relativeDomainName', [alias.encode('UTF-8')]) ]) # TODO: check if dnsAliasZoneContainer really is a forwardZone, maybe it is a container under a zone zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, dnsAliasZoneContainer) zone.open() zone.modify() else: # throw exception, cNAMERecord is single value raise univention.admin.uexceptions.dnsAliasAlreadyUsed(_('DNS alias is already in use.')) def __remove_dns_alias_object(self, name, dnsForwardZone, dnsAliasZoneContainer, alias=None): # type: (str, str, str, str) -> None ud.debug(ud.ADMIN, ud.INFO, 'remove a dns alias object: name="%s", dnsForwardZone="%s", dnsAliasZoneContainer="%s", alias="%s"' % (name, dnsForwardZone, dnsAliasZoneContainer, alias)) if name: if alias: if dnsAliasZoneContainer: self.lo.delete('relativeDomainName=%s,%s' % (escape_dn_chars(alias), dnsAliasZoneContainer)) zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, dnsAliasZoneContainer) zone.open() zone.modify() elif dnsForwardZone: tmppos = univention.admin.uldap.position(self.position.getDomain()) base = tmppos.getBase() ud.debug(ud.ADMIN, ud.INFO, 'search base="%s"' % base) results = self.lo.search(base=base, scope='domain', attr=['zoneName'], filter=filter_format('(&(objectClass=dNSZone)(relativeDomainName=%s)(cNAMERecord=%s.%s.))', (alias, name, dnsForwardZone)), unique=False, required=False) for dn, attr in results: # remove the object self.lo.delete(dn) # and update the SOA version number for the zone results = self.lo.searchDn(base=tmppos.getBase(), scope='domain', filter=filter_format('(&(objectClass=dNSZone)(zoneName=%s)(relativeDomainName=@))', (attr['zoneName'][0].decode('UTF-8'),)), unique=False) for zoneDn in results: zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn) zone.open() zone.modify() else: # could throw some exception pass else: if dnsForwardZone: tmppos = univention.admin.uldap.position(self.position.getDomain()) base = tmppos.getBase() ud.debug(ud.ADMIN, ud.INFO, 'search base="%s"' % base) results = self.lo.search(base=base, scope='domain', attr=['zoneName'], filter=filter_format('(&(objectClass=dNSZone)(&(cNAMERecord=%s)(cNAMERecord=%s.%s.))', (name, name, dnsForwardZone)), unique=False, required=False) for dn, attr in results: # remove the object self.lo.delete(dn) # and update the SOA version number for the zone results = self.lo.searchDn(base=tmppos.getBase(), scope='domain', filter=filter_format('(&(objectClass=dNSZone)(zoneName=%s)(relativeDomainName=@))', (attr['zoneName'][0].decode('UTF-8'),)), unique=False) for zoneDn in results: zone = univention.admin.handlers.dns.forward_zone.object(self.co, self.lo, self.position, zoneDn) zone.open() zone.modify() else: # not enough info to remove alias entries pass def _ldap_post_modify(self): super(simpleComputer, self)._ldap_post_modify() self.__multiip |= len(self['mac']) > 1 or len(self['ip']) > 1 for entry in self.__changes['dhcpEntryZone']['remove']: ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: dhcp check: removed: %s' % (entry,)) dn, ip, mac = self.__split_dhcp_line(entry) if not ip and not mac and not self.__multiip: mac = '' if self['mac']: mac = self['mac'][0] self.__remove_from_dhcp_object(mac=mac) else: self.__remove_from_dhcp_object(ip=ip, mac=mac) for entry in self.__changes['dhcpEntryZone']['add']: ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: dhcp check: added: %s' % (entry,)) dn, ip, mac = self.__split_dhcp_line(entry) if not ip and not mac and not self.__multiip: ip, mac = ('', '') if self['ip']: ip = self['ip'][0] if self['mac']: mac = self['mac'][0] self.__modify_dhcp_object(dn, mac, ip=ip) for entry in self.__changes['dnsEntryZoneForward']['remove']: dn, ip = self.__split_dns_line(entry) if not ip and not self.__multiip: ip = '' if self['ip']: ip = self['ip'][0] self.__remove_dns_forward_object(self['name'], dn, ip) self.__remove_related_ptrrecords(dn, ip) else: self.__remove_dns_forward_object(self['name'], dn, ip) self.__remove_related_ptrrecords(dn, ip) for entry in self.__changes['dnsEntryZoneForward']['add']: ud.debug(ud.ADMIN, ud.INFO, 'we should add a dns forward object "%s"' % (entry,)) dn, ip = self.__split_dns_line(entry) ud.debug(ud.ADMIN, ud.INFO, 'changed the object to dn="%s" and ip="%s"' % (dn, ip)) if not ip and not self.__multiip: ud.debug(ud.ADMIN, ud.INFO, 'no multiip environment') ip = '' if self['ip']: ip = self['ip'][0] self.__add_dns_forward_object(self['name'], dn, ip) self.__add_related_ptrrecords(dn, ip) else: self.__add_dns_forward_object(self['name'], dn, ip) self.__add_related_ptrrecords(dn, ip) for entry in self.__changes['dnsEntryZoneReverse']['remove']: dn, ip = self.__split_dns_line(entry) if not ip and not self.__multiip: ip = '' if self['ip']: ip = self['ip'][0] self.__remove_dns_reverse_object(self['name'], dn, ip) else: self.__remove_dns_reverse_object(self['name'], dn, ip) for entry in self.__changes['dnsEntryZoneReverse']['add']: dn, ip = self.__split_dns_line(entry) if not ip and not self.__multiip: ip = '' if self['ip']: ip = self['ip'][0] self.__add_dns_reverse_object(self['name'], dn, ip) else: self.__add_dns_reverse_object(self['name'], dn, ip) for entry in self.__changes['dnsEntryZoneAlias']['remove']: dnsForwardZone, dnsAliasZoneContainer, alias = entry if not alias: # nonfunctional code since self[ 'alias' ] should be self[ 'dnsAlias' ], but this case does not seem to occur self.__remove_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, self['alias'][0]) else: self.__remove_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias) for entry in self.__changes['dnsEntryZoneAlias']['add']: ud.debug(ud.ADMIN, ud.INFO, 'we should add a dns alias object "%s"' % (entry,)) dnsForwardZone, dnsAliasZoneContainer, alias = entry ud.debug(ud.ADMIN, ud.INFO, 'changed the object to dnsForwardZone [%s], dnsAliasZoneContainer [%s] and alias [%s]' % (dnsForwardZone, dnsAliasZoneContainer, alias)) if not alias: self.__add_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, self['alias'][0]) else: self.__add_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias) for entry in self.__changes['mac']['remove']: self.__remove_from_dhcp_object(mac=entry) changed_ip = False for entry in self.__changes['ip']['remove']: # self.__remove_from_dhcp_object(ip=entry) if not self.__multiip: if len(self.__changes['ip']['add']) > 0: # we change single_ip = self.__changes['ip']['add'][0] self.__modify_dns_forward_object(self['name'], None, single_ip, entry) changed_ip = True for mac in self['mac']: dn = self.__remove_from_dhcp_object(ip=entry, mac=mac) try: dn = self.lo.parentDn(dn) self.__modify_dhcp_object(dn, mac, ip=single_ip) except Exception: pass else: # remove the dns objects self.__remove_dns_forward_object(self['name'], None, entry) else: self.__remove_dns_forward_object(self['name'], None, entry) self.__remove_from_dhcp_object(ip=entry) self.__remove_dns_reverse_object(self['name'], None, entry) for entry in self.__changes['ip']['add']: if not self.__multiip: if self.get('dnsEntryZoneForward', []) and not changed_ip: self.__add_dns_forward_object(self['name'], self['dnsEntryZoneForward'][0][0], entry) for dnsEntryZoneReverse in self.get('dnsEntryZoneReverse', []): x, ip = self.__split_dns_line(dnsEntryZoneReverse) zoneIsV6 = explode_rdn(x, True)[0].endswith('.ip6.arpa') entryIsV6 = ':' in entry if zoneIsV6 == entryIsV6: self.__add_dns_reverse_object(self['name'], x, entry) if self.__changes['name']: ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: name has changed') self.__update_groups_after_namechange() self.__rename_dhcp_object(old_name=self.__changes['name'][0], new_name=self.__changes['name'][1]) self.__rename_dns_object(position=None, old_name=self.__changes['name'][0], new_name=self.__changes['name'][1]) self.update_groups() def __remove_associated_domain(self, entry): dn, ip = self.__split_dns_line(entry) domain = explode_rdn(dn, 1)[0] if self.info.get('domain', None) == domain: self.info['domain'] = None def __set_associated_domain(self, entry): dn, ip = self.__split_dns_line(entry) domain = explode_rdn(dn, 1)[0] if not self.info.get('domain', None): self.info['domain'] = domain def _ldap_modlist(self): self.__changes = { 'mac': {'remove': [], 'add': []}, 'ip': {'remove': [], 'add': []}, 'name': None, 'dnsEntryZoneForward': {'remove': [], 'add': []}, 'dnsEntryZoneReverse': {'remove': [], 'add': []}, 'dnsEntryZoneAlias': {'remove': [], 'add': []}, 'dhcpEntryZone': {'remove': [], 'add': []} } ml = [] if self.hasChanged('mac'): for macAddress in self.info.get('mac', []): if macAddress in self.oldinfo.get('mac', []): continue try: self.__changes['mac']['add'].append(self.request_lock('mac', macAddress)) except univention.admin.uexceptions.noLock: raise univention.admin.uexceptions.macAlreadyUsed(macAddress) for macAddress in self.oldinfo.get('mac', []): if macAddress in self.info.get('mac', []): continue self.__changes['mac']['remove'].append(macAddress) oldAddresses = self.oldinfo.get('ip') or () newAddresses = self.info.get('ip') or () if oldAddresses != newAddresses: old_addr = [ip_address(u'%s' % addr) for addr in oldAddresses] old_ipv4 = [addr.exploded.encode('ASCII') for addr in old_addr if isinstance(addr, IPv4Address)] old_ipv6 = [addr.exploded.encode('ASCII') for addr in old_addr if isinstance(addr, IPv6Address)] new_addr = [ip_address(u'%s' % addr) for addr in newAddresses] new_ipv4 = [addr.exploded.encode('ASCII') for addr in new_addr if isinstance(addr, IPv4Address)] new_ipv6 = [addr.exploded.encode('ASCII') for addr in new_addr if isinstance(addr, IPv6Address)] ml.append(('aRecord', old_ipv4, new_ipv4)) ml.append(('aAAARecord', old_ipv6, new_ipv6)) if self.hasChanged('ip'): for ipAddress in self['ip']: if not ipAddress: continue if ipAddress in self.oldinfo.get('ip'): continue if not self.ip_alredy_requested: try: ipAddress = self.request_lock('aRecord', ipAddress) except univention.admin.uexceptions.noLock: self.ip_alredy_requested = 0 raise univention.admin.uexceptions.ipAlreadyUsed(ipAddress) self.__changes['ip']['add'].append(ipAddress) for ipAddress in self.oldinfo.get('ip', []): if ipAddress in self.info['ip']: continue self.__changes['ip']['remove'].append(ipAddress) if self.hasChanged('name'): ml.append(('sn', self.oldattr.get('sn', [None])[0], self['name'].encode('UTF-8'))) self.__changes['name'] = (self.oldattr.get('sn', [b''])[0].decode("UTF-8") or None, self['name']) if self.hasChanged('ip') or self.hasChanged('mac'): dhcp = [self.__split_dhcp_line(entry) for entry in self.info.get('dhcpEntryZone', [])] if len(newAddresses) <= 1 and len(self.info.get('mac', [])) == 1 and dhcp: # In this special case, we assume the mapping between ip/mac address to be # unique. The dhcp entry needs to contain the mac address (as specified by # the ldap search for dhcp entries), the ip address may not correspond to # the ip address associated with the computer ldap object, but this would # be erroneous anyway. We therefore update the dhcp entry to correspond to # the current ip and mac address. (Bug #20315) self.info['dhcpEntryZone'] = [ (dn, newAddresses[0] if newAddresses else '', self.info['mac'][0]) for (dn, ip, _mac) in dhcp ] else: # in all other cases, we remove old dhcp entries that do not match ip or # mac addresses (Bug #18966) removedIPs = set(self.oldinfo.get('ip', [])) - set(self['ip']) removedMACs = set(self.oldinfo.get('mac', [])) - set(self['mac']) self.info['dhcpEntryZone'] = [ (dn, ip, _mac) for (dn, ip, _mac) in dhcp if not (ip in removedIPs or _mac in removedMACs) ] if self.hasChanged('dhcpEntryZone'): if 'dhcpEntryZone' in self.oldinfo: if 'dhcpEntryZone' in self.info: for entry in self.oldinfo['dhcpEntryZone']: if entry not in self.info['dhcpEntryZone']: self.__changes['dhcpEntryZone']['remove'].append(entry) else: for entry in self.oldinfo['dhcpEntryZone']: self.__changes['dhcpEntryZone']['remove'].append(entry) if 'dhcpEntryZone' in self.info: for entry in self.info['dhcpEntryZone']: # check if line is valid dn, ip, mac = self.__split_dhcp_line(entry) if dn and mac: if entry not in self.oldinfo.get('dhcpEntryZone', []): self.__changes['dhcpEntryZone']['add'].append(entry) else: raise univention.admin.uexceptions.invalidDhcpEntry(_('The DHCP entry for this host should contain the zone LDAP-DN, the IP address and the MAC address.')) if self.hasChanged('dnsEntryZoneForward'): for entry in self.oldinfo.get('dnsEntryZoneForward', []): if entry not in self.info.get('dnsEntryZoneForward', []): self.__changes['dnsEntryZoneForward']['remove'].append(entry) self.__remove_associated_domain(entry) for entry in self.info.get('dnsEntryZoneForward', []): if entry == '': continue if entry not in self.oldinfo.get('dnsEntryZoneForward', []): self.__changes['dnsEntryZoneForward']['add'].append(entry) self.__set_associated_domain(entry) if self.hasChanged('dnsEntryZoneReverse'): for entry in self.oldinfo.get('dnsEntryZoneReverse', []): if entry not in self.info.get('dnsEntryZoneReverse', []): self.__changes['dnsEntryZoneReverse']['remove'].append(entry) for entry in self.info.get('dnsEntryZoneReverse', []): if entry not in self.oldinfo.get('dnsEntryZoneReverse', []): self.__changes['dnsEntryZoneReverse']['add'].append(entry) if self.hasChanged('dnsEntryZoneAlias'): for entry in self.oldinfo.get('dnsEntryZoneAlias', []): if entry not in self.info.get('dnsEntryZoneAlias', []): self.__changes['dnsEntryZoneAlias']['remove'].append(entry) for entry in self.info.get('dnsEntryZoneAlias', []): # check if line is valid dnsForwardZone, dnsAliasZoneContainer, alias = entry if dnsForwardZone and dnsAliasZoneContainer and alias: if entry not in self.oldinfo.get('dnsEntryZoneAlias', []): self.__changes['dnsEntryZoneAlias']['add'].append(entry) else: raise univention.admin.uexceptions.invalidDNSAliasEntry(_('The DNS alias entry for this host should contain the zone name, the alias zone container LDAP-DN and the alias.')) self.__multiip = len(self['mac']) > 1 or len(self['ip']) > 1 ml += super(simpleComputer, self)._ldap_modlist() return ml
[docs] @classmethod def calc_dns_reverse_entry_name(cls, sip, reverseDN): # type: (Text, Text) -> Text """ >>> simpleComputer.calc_dns_reverse_entry_name('10.200.2.5', 'subnet=2.200.10.in-addr.arpa') u'5' >>> simpleComputer.calc_dns_reverse_entry_name('10.200.2.5', 'subnet=200.10.in-addr.arpa') u'5.2' >>> simpleComputer.calc_dns_reverse_entry_name('2001:db8::3', 'subnet=0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa') u'3.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0' >>> simpleComputer.calc_dns_reverse_entry_name('1.2.3.4', 'subnet=2.in-addr.arpa') Traceback (most recent call last): ... ValueError: 4.3.2.1.in-addr.arpa not in .2.in-addr.arpa """ addr = ip_address(u'%s' % (sip,)) rev = addr.reverse_pointer subnet = u".%s" % (explode_rdn(reverseDN, True)[0],) if not rev.endswith(subnet): raise ValueError("%s not in %s" % (rev, subnet)) return rev[:-len(subnet)]
def _ldap_pre_create(self): super(simpleComputer, self)._ldap_pre_create() self.check_common_name_length() def _ldap_pre_modify(self): super(simpleComputer, self)._ldap_pre_modify() self.check_common_name_length() def _ldap_post_create(self): super(simpleComputer, self)._ldap_post_create() for entry in self.__changes['dhcpEntryZone']['remove']: ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: dhcp check: removed: %s' % (entry,)) dn, ip, mac = self.__split_dhcp_line(entry) if not ip and not mac and not self.__multiip: mac = '' if self['mac']: mac = self['mac'][0] self.__remove_from_dhcp_object(mac=mac) else: self.__remove_from_dhcp_object(ip=ip, mac=mac) for entry in self.__changes['dhcpEntryZone']['add']: ud.debug(ud.ADMIN, ud.INFO, 'simpleComputer: dhcp check: added: %s' % (entry,)) dn, ip, mac = self.__split_dhcp_line(entry) if not ip and not mac and not self.__multiip: if len(self['ip']) > 0 and len(self['mac']) > 0: self.__modify_dhcp_object(dn, self['mac'][0], ip=self['ip'][0]) else: self.__modify_dhcp_object(dn, mac, ip=ip) for entry in self.__changes['dnsEntryZoneForward']['remove']: dn, ip = self.__split_dns_line(entry) if not ip and not self.__multiip: ip = '' if self['ip']: ip = self['ip'][0] self.__remove_dns_forward_object(self['name'], dn, ip) else: self.__remove_dns_forward_object(self['name'], dn, ip) for entry in self.__changes['dnsEntryZoneForward']['add']: ud.debug(ud.ADMIN, ud.INFO, 'we should add a dns forward object "%s"' % (entry,)) dn, ip = self.__split_dns_line(entry) ud.debug(ud.ADMIN, ud.INFO, 'changed the object to dn="%s" and ip="%s"' % (dn, ip)) if not ip and not self.__multiip: ud.debug(ud.ADMIN, ud.INFO, 'no multiip environment') ip = '' if self['ip']: ip = self['ip'][0] self.__add_dns_forward_object(self['name'], dn, ip) else: self.__add_dns_forward_object(self['name'], dn, ip) for entry in self.__changes['dnsEntryZoneReverse']['remove']: dn, ip = self.__split_dns_line(entry) if not ip and not self.__multiip: ip = '' if self['ip']: ip = self['ip'][0] self.__remove_dns_reverse_object(self['name'], dn, ip) else: self.__remove_dns_reverse_object(self['name'], dn, ip) for entry in self.__changes['dnsEntryZoneReverse']['add']: dn, ip = self.__split_dns_line(entry) if not ip and not self.__multiip: ip = '' if self['ip']: ip = self['ip'][0] self.__add_dns_reverse_object(self['name'], dn, ip) else: self.__add_dns_reverse_object(self['name'], dn, ip) if not self.__multiip: if len(self.get('dhcpEntryZone', [])) > 0: dn, ip, mac = self['dhcpEntryZone'][0] for entry in self.__changes['mac']['add']: if len(self['ip']) > 0: self.__modify_dhcp_object(dn, entry, ip=self['ip'][0]) else: self.__modify_dhcp_object(dn, entry) for entry in self.__changes['ip']['add']: if len(self['mac']) > 0: self.__modify_dhcp_object(dn, self['mac'][0], ip=entry) for entry in self.__changes['dnsEntryZoneAlias']['remove']: dnsForwardZone, dnsAliasZoneContainer, alias = entry if not alias: # nonfunctional code since self[ 'alias' ] should be self[ 'dnsAlias' ], but this case does not seem to occur self.__remove_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, self['alias'][0]) else: self.__remove_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias) for entry in self.__changes['dnsEntryZoneAlias']['add']: ud.debug(ud.ADMIN, ud.INFO, 'we should add a dns alias object "%s"' % (entry,)) dnsForwardZone, dnsAliasZoneContainer, alias = entry ud.debug(ud.ADMIN, ud.INFO, 'changed the object to dnsForwardZone [%s], dnsAliasZoneContainer [%s] and alias [%s]' % (dnsForwardZone, dnsAliasZoneContainer, alias)) if not alias: self.__add_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, self['alias'][0]) else: self.__add_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias) self.update_groups() def _ldap_post_remove(self): if self['mac']: for macAddress in self['mac']: if macAddress: self.alloc.append(('mac', macAddress)) if self['ip']: for ipAddress in self['ip']: if ipAddress: self.alloc.append(('aRecord', ipAddress)) super(simpleComputer, self)._ldap_post_remove() # remove computer from groups groups = copy.deepcopy(self['groups']) if self.oldinfo.get('primaryGroup'): groups.append(self.oldinfo.get('primaryGroup')) for group in groups: groupObject = univention.admin.objects.get(univention.admin.modules.get('groups/group'), self.co, self.lo, self.position, group) groupObject.fast_member_remove([self.dn], [x.decode('UTF-8') for x in self.oldattr.get('uid', [])], ignore_license=True) def __update_groups_after_namechange(self): oldname = self.oldinfo.get('name') newname = self.info.get('name') if not oldname: ud.debug(ud.ADMIN, ud.ERROR, '__update_groups_after_namechange: oldname is empty') return olddn = self.old_dn.encode('UTF-8') newdn = self.dn.encode('UTF-8') oldUid = b'%s$' % oldname.encode('UTF-8') newUid = b'%s$' % newname.encode('UTF-8') ud.debug(ud.ADMIN, ud.INFO, '__update_groups_after_namechange: olddn=%s' % olddn) ud.debug(ud.ADMIN, ud.INFO, '__update_groups_after_namechange: newdn=%s' % newdn) new_groups = set(self.info.get('groups', [])) old_groups = set(self.oldinfo.get('groups', [])) for group in new_groups | old_groups: # Using the UDM groups/group object does not work at this point. The computer object has already been renamed. # During open() of groups/group each member is checked if it exists. Because the computer object with "olddn" is missing, # it won't show up in groupobj['hosts']. That's why the uniqueMember/memberUid updates is done directly via # self.lo.modify() oldMemberUids = self.lo.getAttr(group, 'memberUid') newMemberUids = copy.deepcopy(oldMemberUids) if group in new_groups: ud.debug(ud.ADMIN, ud.INFO, '__update_groups_after_namechange: changing memberUid in grp=%s' % (group,)) if oldUid in newMemberUids: newMemberUids.remove(oldUid) if newUid not in newMemberUids: newMemberUids.append(newUid) self.lo.modify(group, [('memberUid', oldMemberUids, newMemberUids)]) else: ud.debug(ud.ADMIN, ud.INFO, '__update_groups_after_namechange: removing memberUid from grp=%s' % (group,)) if oldUid in oldMemberUids: oldMemberUids = oldUid newMemberUids = b'' self.lo.modify(group, [('memberUid', oldMemberUids, newMemberUids)]) # we are doing the uniqueMember seperately because of a potential refint overlay that already changed the dn for us oldUniqueMembers = self.lo.getAttr(group, 'uniqueMember') newUniqueMembers = copy.deepcopy(oldUniqueMembers) if group in new_groups: ud.debug(ud.ADMIN, ud.INFO, '__update_groups_after_namechange: changing uniqueMember in grp=%s' % (group,)) if olddn in newUniqueMembers: newUniqueMembers.remove(olddn) if newdn not in newUniqueMembers: newUniqueMembers.append(newdn) self.lo.modify(group, [('uniqueMember', oldUniqueMembers, newUniqueMembers)]) else: if olddn in oldUniqueMembers: ud.debug(ud.ADMIN, ud.INFO, '__update_groups_after_namechange: removing uniqueMember from grp=%s' % (group,)) oldUniqueMembers = olddn newUniqueMembers = b'' self.lo.modify(group, [('uniqueMember', oldUniqueMembers, newUniqueMembers)]) if newdn in oldUniqueMembers: ud.debug(ud.ADMIN, ud.INFO, '__update_groups_after_namechange: removing uniqueMember from grp=%s' % (group,)) oldUniqueMembers = newdn newUniqueMembers = b'' self.lo.modify(group, [('uniqueMember', oldUniqueMembers, newUniqueMembers)])
[docs] def update_groups(self): # type: () -> None if not self.hasChanged('groups') and not self.oldPrimaryGroupDn and not self.newPrimaryGroupDn: return ud.debug(ud.ADMIN, ud.INFO, 'updating groups') old_groups = DN.set(self.oldinfo.get('groups', [])) new_groups = DN.set(self.info.get('groups', [])) if self.oldPrimaryGroupDn: old_groups += DN.set([self.oldPrimaryGroupDn]) if self.newPrimaryGroupDn: new_groups.add(DN(self.newPrimaryGroupDn)) # prevent machineAccountGroup from being removed if self.has_property('machineAccountGroup'): machine_account_group = DN.set([self['machineAccountGroup']]) new_groups += machine_account_group old_groups -= machine_account_group for group in old_groups ^ new_groups: groupdn = str(group) groupObject = univention.admin.objects.get(univention.admin.modules.get('groups/group'), self.co, self.lo, self.position, groupdn) groupObject.open() # add this computer to the group hosts = DN.set(groupObject['hosts']) if group not in new_groups: # remove this computer from the group hosts.discard(DN(self.old_dn)) else: hosts.add(DN(self.dn)) groupObject['hosts'] = list(DN.values(hosts)) groupObject.modify(ignore_license=True)
[docs] def primary_group(self): # type: () -> None if not self.hasChanged('primaryGroup'): return ud.debug(ud.ADMIN, ud.INFO, 'updating primary groups') primaryGroupNumber = self.lo.getAttr(self['primaryGroup'], 'gidNumber', required=True) self.newPrimaryGroupDn = self['primaryGroup'] self.lo.modify(self.dn, [('gidNumber', b'None', primaryGroupNumber[0])]) if 'samba' in self.options: primaryGroupSambaNumber = self.lo.getAttr(self['primaryGroup'], 'sambaSID', required=True) self.lo.modify(self.dn, [('sambaPrimaryGroupSID', b'None', primaryGroupSambaNumber[0])])
[docs] def cleanup(self): # type: () -> None self.open() if self['dnsEntryZoneForward']: for dnsEntryZoneForward in self['dnsEntryZoneForward']: dn, ip = self.__split_dns_line(dnsEntryZoneForward) try: self.__remove_dns_forward_object(self['name'], dn, None) except Exception as e: ud.debug(ud.ADMIN, ud.WARN, 'dnsEntryZoneForward.delete(%s): %s' % (dnsEntryZoneForward, e)) if self['dnsEntryZoneReverse']: for dnsEntryZoneReverse in self['dnsEntryZoneReverse']: dn, ip = self.__split_dns_line(dnsEntryZoneReverse) try: self.__remove_dns_reverse_object(self['name'], dn, ip) except Exception as e: ud.debug(ud.ADMIN, ud.WARN, 'dnsEntryZoneReverse.delete(%s): %s' % (dnsEntryZoneReverse, e)) if self['dhcpEntryZone']: for dhcpEntryZone in self['dhcpEntryZone']: dn, ip, mac = self.__split_dhcp_line(dhcpEntryZone) try: self.__remove_from_dhcp_object(mac=mac) except Exception as e: ud.debug(ud.ADMIN, ud.WARN, 'dhcpEntryZone.delete(%s): %s' % (dhcpEntryZone, e)) if self['dnsEntryZoneAlias']: for entry in self['dnsEntryZoneAlias']: dnsForwardZone, dnsAliasZoneContainer, alias = entry try: self.__remove_dns_alias_object(self['name'], dnsForwardZone, dnsAliasZoneContainer, alias) except Exception as e: ud.debug(ud.ADMIN, ud.WARN, 'dnsEntryZoneAlias.delete(%s): %s' % (entry, e)) # remove service record entries (see Bug #26400) ud.debug(ud.ADMIN, ud.INFO, '_ldap_post_remove: clean up service records, host records, and IP address saved at the forward zone') ips = set(self['ip'] or []) fqdn = self['fqdn'] fqdnDot = '%s.' % fqdn # we might have entries w/ or w/out trailing '.' # iterate over all reverse zones for zone in self['dnsEntryZoneReverse'] or []: # load zone object ud.debug(ud.ADMIN, ud.INFO, 'clean up entries for zone: %s' % zone) if len(zone) < 1: continue zoneObj = univention.admin.objects.get( univention.admin.modules.get('dns/reverse_zone'), self.co, self.lo, self.position, dn=zone[0]) zoneObj.open() # clean up nameserver records if 'nameserver' in zoneObj: if fqdnDot in zoneObj['nameserver']: ud.debug( ud.ADMIN, ud.INFO, 'removing %s from dns zone %s' % (fqdnDot, zone[0])) # nameserver is required in reverse zone if len(zoneObj['nameserver']) > 1: zoneObj['nameserver'].remove(fqdnDot) zoneObj.modify() # iterate over all forward zones for zone in self['dnsEntryZoneForward'] or []: # load zone object ud.debug(ud.ADMIN, ud.INFO, 'clean up entries for zone: %s' % zone) if len(zone) < 1: continue zoneObj = univention.admin.objects.get( univention.admin.modules.get('dns/forward_zone'), self.co, self.lo, self.position, dn=zone[0]) zoneObj.open() ud.debug(ud.ADMIN, ud.INFO, 'zone aRecords: %s' % zoneObj['a']) zone_obj_modified = False # clean up nameserver records if 'nameserver' in zoneObj: if fqdnDot in zoneObj['nameserver']: ud.debug( ud.ADMIN, ud.INFO, 'removing %s from dns zone %s' % (fqdnDot, zone)) # nameserver is required in forward zone if len(zoneObj['nameserver']) > 1: zoneObj['nameserver'].remove(fqdnDot) zone_obj_modified = True # clean up aRecords of zone itself new_entries = list(set(zoneObj['a']) - ips) if len(new_entries) != len(zoneObj['a']): ud.debug( ud.ADMIN, ud.INFO, 'Clean up zone records:\n%s ==> %s' % (zoneObj['a'], new_entries)) zoneObj['a'] = new_entries zone_obj_modified = True if zone_obj_modified: zoneObj.modify() # clean up service records for irecord in univention.admin.modules.lookup('dns/srv_record', self.co, self.lo, base=self.lo.base, scope='sub', superordinate=zoneObj): irecord.open() new_entries = [j for j in irecord['location'] if fqdn not in j and fqdnDot not in j] if len(new_entries) != len(irecord['location']): ud.debug(ud.ADMIN, ud.INFO, 'Entry found in "%s":\n%s ==> %s' % (irecord.dn, irecord['location'], new_entries)) irecord['location'] = new_entries irecord.modify() # clean up host records (that should probably be done correctly by Samba4) for irecord in univention.admin.modules.lookup('dns/host_record', self.co, self.lo, base=self.lo.base, scope='sub', superordinate=zoneObj): irecord.open() new_entries = list(set(irecord['a']) - ips) if len(new_entries) != len(irecord['a']): ud.debug(ud.ADMIN, ud.INFO, 'Entry found in "%s":\n%s ==> %s' % (irecord.dn, irecord['a'], new_entries)) irecord['a'] = new_entries irecord.modify()
def __setitem__(self, key, value): raise_after = None ips = [ip for ip in self['ip'] if ip] if self.has_property('ip') and self['ip'] else [] ip1 = self['ip'][0] if len(ips) == 1 else '' macs = [mac for mac in self['mac'] if mac] if self.has_property('mac') and self['mac'] else [] mac1 = self['mac'][0] if len(macs) == 1 else '' if key == 'network': if self.old_network != value: if value and value != 'None': network_object = univention.admin.handlers.networks.network.object(self.co, self.lo, self.position, value) network_object.open() subnet = ip_network(u"%(network)s/%(netmask)s" % network_object, strict=False) if not ips or ip_address(u'%s' % (ip1,)) not in subnet: if self.ip_freshly_set: raise_after = univention.admin.uexceptions.ipOverridesNetwork else: # get next IP network_object.refreshNextIp() self['ip'] = network_object['nextIp'] ips = [ip for ip in self['ip'] if ip] if self.has_property('ip') and self['ip'] else [] ip1 = self['ip'][0] if len(ips) == 1 else '' try: self.ip = self.request_lock('aRecord', self['ip'][0]) self.ip_alredy_requested = True except univention.admin.uexceptions.noLock: pass self.network_object = network_object if network_object['dnsEntryZoneForward'] and ip1: self['dnsEntryZoneForward'] = [[network_object['dnsEntryZoneForward'], ip1]] if network_object['dnsEntryZoneReverse'] and ip1: self['dnsEntryZoneReverse'] = [[network_object['dnsEntryZoneReverse'], ip1]] if network_object['dhcpEntryZone']: if ip1 and mac1: self['dhcpEntryZone'] = [(network_object['dhcpEntryZone'], ip1, mac1)] else: self.__saved_dhcp_entry = network_object['dhcpEntryZone'] self.old_network = value elif key == 'ip': self.ip_freshly_set = True if not self.ip or self.ip != value: if self.ip_alredy_requested: univention.admin.allocators.release(self.lo, self.position, 'aRecord', self.ip) self.ip_alredy_requested = 0 if value and self.network_object: if self.network_object['dnsEntryZoneForward'] and ip1: self['dnsEntryZoneForward'] = [[self.network_object['dnsEntryZoneForward'], ip1]] if self.network_object['dnsEntryZoneReverse'] and ip1: self['dnsEntryZoneReverse'] = [[self.network_object['dnsEntryZoneReverse'], ip1]] if self.network_object['dhcpEntryZone']: if ip1 and macs: self['dhcpEntryZone'] = [(self.network_object['dhcpEntryZone'], ip1, mac1)] else: self.__saved_dhcp_entry = self.network_object['dhcpEntryZone'] if not self.ip: self.ip_freshly_set = False elif key == 'mac' and self.__saved_dhcp_entry: if ip1 and macs: if isinstance(value, list): self['dhcpEntryZone'] = [(self.__saved_dhcp_entry, ip1, value[0])] else: self['dhcpEntryZone'] = [(self.__saved_dhcp_entry, ip1, value)] super(simpleComputer, self).__setitem__(key, value) if raise_after: raise raise_after
[docs]class simplePolicy(simpleLdap): def __init__(self, co, lo, position, dn='', superordinate=None, attributes=[]): self.resultmode = 0 if not hasattr(self, 'cloned'): self.cloned = None if not hasattr(self, 'changes'): self.changes = 0 if not hasattr(self, 'policy_attrs'): self.policy_attrs = {} if not hasattr(self, 'referring_object_dn'): self.referring_object_dn = None simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes) def _ldap_post_remove(self): super(simplePolicy, self)._ldap_post_remove() for object_dn in self.lo.searchDn(filter_format('univentionPolicyReference=%s', [self.dn])): try: self.lo.modify(object_dn, [('univentionPolicyReference', self.dn.encode('UTF-8'), None)]) except (univention.admin.uexceptions.base, ldap.LDAPError) as exc: univention.debug.debug(univention.debug.ADMIN, univention.debug.ERROR, 'Could not remove policy reference %r from %r: %s' % (self.dn, object_dn, exc))
[docs] def copyIdentifier(self, from_object): """Activate the result mode and set the referring object""" self.resultmode = 1 for key, property in from_object.descriptions.items(): if property.identifies: for key2, property2 in self.descriptions.items(): if property2.identifies: self.info[key2] = from_object.info[key] self.referring_object_dn = from_object.dn if not self.referring_object_dn: self.referring_object_dn = from_object.position.getDn() self.referring_object_position_dn = from_object.position.getDn()
[docs] def clone(self, referring_object): """Marks the object as a not existing one containing values retrieved by evaluating the policies for the given object""" self.cloned = self.dn self.dn = '' self.copyIdentifier(referring_object)
[docs] def getIdentifier(self): # type: () -> str for key, property in self.descriptions.items(): if property.identifies and key in self.info and self.info[key]: return key raise ValueError()
def __makeUnique(self): identifier = self.getIdentifier() components = self.info[identifier].split("_uv") if len(components) > 1: try: n = int(components[1]) n += 1 except ValueError: n = 1 else: n = 0 self.info[identifier] = "%s_uv%d" % (components[0], n) ud.debug(ud.ADMIN, ud.INFO, 'simplePolicy.__makeUnique: result: %s' % self.info[identifier])
[docs] def create(self, serverctrls=None, response=None): if not self.resultmode: return super(simplePolicy, self).create(serverctrls=serverctrls, response=response) self._exists = False try: self.oldinfo = {} dn = super(simplePolicy, self).create(serverctrls=serverctrls, response=response) ud.debug(ud.ADMIN, ud.INFO, 'simplePolicy.create: created object: info=%s' % (self.info)) except univention.admin.uexceptions.objectExists: self.__makeUnique() dn = self.create() return dn
[docs] def policy_result(self, faked_policy_reference=None): """This method retrieves the policy values currently effective for this object. If the 'resultmode' is not active the evaluation is cancelled. If faked_policy_reference is given at the top object (referring_object_dn) this policy object temporarily referenced. faked_policy_reference can be a string or a list of strings.""" if not self.resultmode: return self.polinfo_more = {} if not self.policy_attrs: policies = [] if isinstance(faked_policy_reference, (list, tuple)): policies.extend(faked_policy_reference) elif faked_policy_reference: policies.append(faked_policy_reference) self.__load_policies(policies) if hasattr(self, '_custom_policy_result_map'): self._custom_policy_result_map() else: values = {} for attr_name, value_dict in self.policy_attrs.items(): value_dict = copy.deepcopy(value_dict) values[attr_name] = copy.copy(value_dict['value']) value_dict['value'] = [x.decode('UTF-8') for x in value_dict['value']] self.polinfo_more[self.mapping.unmapName(attr_name)] = value_dict self.polinfo = univention.admin.mapping.mapDict(self.mapping, values) self.polinfo = self._post_unmap(self.polinfo, values)
def __load_policies(self, policies=None): if not self.policy_attrs: # the referring object does not exist yet if not self.referring_object_dn == self.referring_object_position_dn: result = self.lo.getPolicies(self.lo.parentDn(self.referring_object_dn), policies=policies) else: result = self.lo.getPolicies(self.referring_object_position_dn, policies=policies) for policy_oc, attrs in result.items(): if univention.admin.objects.ocToType(policy_oc) == self.module: self.policy_attrs = attrs def __getitem__(self, key): if not self.resultmode: if self.has_property('emptyAttributes') and self.mapping.mapName(key) and self.mapping.mapName(key) in simpleLdap.__getitem__(self, 'emptyAttributes'): ud.debug(ud.ADMIN, ud.INFO, 'simplePolicy.__getitem__: empty Attribute %s' % key) if self.descriptions[key].multivalue: return [] else: return '' return simpleLdap.__getitem__(self, key) self.policy_result() if (key in self.polinfo and not (key in self.info or key in self.oldinfo)) or (key in self.polinfo_more and 'fixed' in self.polinfo_more[key] and self.polinfo_more[key]['fixed']): if self.descriptions[key].multivalue and not isinstance(self.polinfo[key], list): # why isn't this correct in the first place? self.polinfo[key] = [self.polinfo[key]] ud.debug(ud.ADMIN, ud.INFO, 'simplePolicy.__getitem__: presult: %s=%s' % (key, self.polinfo[key])) return self.polinfo[key] result = simpleLdap.__getitem__(self, key) ud.debug(ud.ADMIN, ud.INFO, 'simplePolicy.__getitem__: result: %s=%s' % (key, result)) return result
[docs] def fixedAttributes(self): # type: () -> Dict[str, bool] """ Return effectively fixed attributes. """ if not self.resultmode: return {} self.__load_policies(None) return { self.mapping.unmapName(attr_name): value_dict.get('fixed', False) for attr_name, value_dict in self.policy_attrs.items() }
[docs] def emptyAttributes(self): # type: () -> Dict[str, bool] """ return effectively empty attributes. """ if not self.has_property('emptyAttributes'): return {} return { self.mapping.unmapName(attrib): True for attrib in simpleLdap.__getitem__(self, 'emptyAttributes') or () }
def __setitem__(self, key, newvalue): if not self.resultmode: simpleLdap.__setitem__(self, key, newvalue) return self.policy_result() if key in self.polinfo: if self.polinfo[key] != newvalue or self.polinfo_more[key]['policy'] == self.cloned or (key in self.info and self.info[key] != newvalue): if self.polinfo_more[key]['fixed'] and self.polinfo_more[key]['policy'] != self.cloned: raise univention.admin.uexceptions.policyFixedAttribute(key) simpleLdap.__setitem__(self, key, newvalue) ud.debug(ud.ADMIN, ud.INFO, 'polinfo: set key %s to newvalue %s' % (key, newvalue)) if self.hasChanged(key): ud.debug(ud.ADMIN, ud.INFO, 'polinfo: key:%s hasChanged' % (key)) self.changes = 1 return # this object did not exist before if not self.oldinfo: # if this attribute is of type boolean and the new value is equal to the default, than ignore this "change" if isinstance(self.descriptions[key].syntax, univention.admin.syntax.boolean): default = self.descriptions[key].base_default if type(self.descriptions[key].base_default) in (tuple, list): default = self.descriptions[key].base_default[0] if (not default and newvalue == '0') or default == newvalue: return simpleLdap.__setitem__(self, key, newvalue) if self.hasChanged(key): self.changes = 1
class _MergedAttributes(object): """Evaluates old attributes and the modlist to get a new representation of the object.""" def __init__(self, obj, modlist): self.obj = obj self.modlist = [x if len(x) == 3 else (x[0], None, x[-1]) for x in modlist] self.case_insensitive_attributes = ['objectClass'] def get_attributes(self): attributes = set(self.obj.oldattr.keys()) | {x[0] for x in self.modlist} return {attr: self.get_attribute(attr) for attr in attributes} def get_attribute(self, attr): values = set(self.obj.oldattr.get(attr, [])) # evaluate the modlist and apply all changes to the current values for (att, old, new) in self.modlist: if att.lower() != attr.lower(): continue new = [] if not new else [new] if isinstance(new, bytes) else new old = [] if not old else [old] if isinstance(old, bytes) else old if not old and new: # MOD_ADD values |= set(new) elif not new and old: # MOD_DELETE values -= set(old) elif old and new: # MOD_REPLACE values = set(new) return list(values)