Source code for univention.management.console.acl

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
#  UMC ACL implementation
#
# Copyright 2006-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

r"""
UMC ACL implementation
======================

This module implements the UMC ACLs used to define the access rights for
users and groups to the UMC service.

UMC ACLs are defined by creating *UMC operation set* objects that are added
to *UMC policies*. These policies can be connected with users or
groups.

An *UMC operation set* consists of a list of UMC command patterns like

.. code-block:: none

	udm/* objectType=nagios/*

This specifies that all commands that match the pattern `udm/\*` can be
called if the option *objectType* is given and the value matches the
pattern `nagios/\*`.

Patterns for commands and options may just use the asterisk and know no
other wildcards. For options there is one additional format allowed to
specify that the option may not exist. Therefore the following format is
used

.. code-block:: none

	udm/* !objectType
"""

from __future__ import absolute_import

import os
import ldap
import json
import pickle
import itertools
import operator
import traceback
from fnmatch import fnmatch
from ldap.filter import filter_format
import six

from .config import ucr
from .log import ACL

import univention.admin.handlers.computers.domaincontroller_master as dc_master
import univention.admin.handlers.computers.domaincontroller_backup as dc_backup
import univention.admin.handlers.computers.domaincontroller_slave as dc_slave
import univention.admin.handlers.computers.memberserver as memberserver

import univention.admin.uexceptions as udm_errors


class Rule(dict):
    """A simple class representing one ACL rule in a form that can be simply serialized.

    A rule is a plain :class:`dict`; the properties below give read access
    to the known keys and supply a default when a key is missing.
    """

    # Rules are mutable dicts compared by value, so they must not be hashable.
    __hash__ = None

    @property
    def fromUser(self):
        """Returns *True* if the rule was connected with a user, otherwise False"""
        return self.get('fromUser', False)

    @property
    def host(self):
        """Returns a hostname pattern. If the pattern matches the hostname the command is allowed on the host"""
        return self.get('host', '*')

    @property
    def command(self):
        """Returns the command pattern this rule describes"""
        return self.get('command', '')

    @property
    def options(self):
        """Returns the option pattern for the rule"""
        return self.get('options', {})

    @property
    def flavor(self):
        """Returns the flavor if given otherwise *None*"""
        return self.get('flavor', None)

    def __eq__(self, other):
        """Compare two rules by their five ACL fields, applying the property defaults.

        A plain :class:`dict` is compared as if it were a rule. Any other
        type yields ``NotImplemented`` so that Python falls back to its
        default handling instead of raising :exc:`AttributeError`.
        """
        if not isinstance(other, Rule):
            if not isinstance(other, dict):
                return NotImplemented
            other = Rule(other)
        return (
            self.fromUser == other.fromUser
            and self.host == other.host
            and self.command == other.command
            and self.flavor == other.flavor
            and self.options == other.options
        )

    def __ne__(self, other):
        """Inverse of :meth:`__eq__` (Python 2 does not derive it automatically)."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
class ACLs(object):
    """Provides methods to determine the access rights of users to
    specific UMC commands. It defines a cache for ACLs, a parser for
    command definitions of ACLs and functions for comparison.
    """

    # constants
    (MATCH_NONE, MATCH_PART, MATCH_FULL) = range(3)

    #: defines the directory for the cache files
    CACHE_DIR = '/var/cache/univention-management-console/acls'

    #: list of all supported computer types for ACL rules
    _systemroles = (dc_master, dc_backup, dc_slave, memberserver)

    def __init__(self, ldap_base=None, acls=None):
        """
        :param ldap_base: LDAP base used for service lookups; read from UCR (``ldap/base``) on first use when *None*
        :param acls: optional iterable of rule dictionaries, each wrapped in a :class:`Rule`
        """
        self.__ldap_base = ldap_base
        # the main acl list
        if acls is None:
            self.acls = []
        else:
            self.acls = [Rule(x) for x in acls]

    def _expand_hostlist(self, hostlist):
        """Translate the host specifications of an operation set into host names.

        :param hostlist: iterable of UTF-8 encoded byte strings. Supported forms are
            ``systemrole:<role>``, ``service:<name>``, ``*`` and a :func:`fnmatch` pattern.
        :returns: list of host names the operation set applies to
        """
        hosts = []
        if self.__ldap_base is None:
            self.__ldap_base = ucr.get('ldap/base', None)

        for host in hostlist:
            host = host.decode('utf-8')
            if host.startswith('systemrole:'):
                role = host[len('systemrole:'):]
                # an unset server/role simply matches no system role
                if role.lower() == ucr.get('server/role', '').lower():
                    hosts.append(ucr['hostname'])
            elif host.startswith('service:'):
                service = host[len('service:'):]
                # NOTE: self.lo is not assigned by this class; LDAP_ACLs sets it
                for role in ACLs._systemroles:
                    servers = role.lookup(None, self.lo, filter_format('univentionService=%s', [service]), base=self.__ldap_base)
                    for server in servers:
                        if 'name' in server:
                            hosts.append(server['name'])
            elif host == '*':
                hosts.append(ucr['hostname'])
            elif fnmatch(ucr['hostname'], host):
                hosts.append(ucr['hostname'])

        return hosts

    def __parse_command(self, command):
        """Split ``<command>:<opt>=<pattern>,!<opt>,...`` into the command pattern and an option dict.

        Options given as ``key=value`` map the key to the value pattern. An
        element starting with ``!`` is stored under its full name (including
        the ``!``) with the value *None*, meaning the option must not exist.
        Other elements are ignored.

        :returns: tuple ``(command, options)``
        """
        data = ''
        if ':' in command:
            command, data = command.split(':', 1)

        options = {}
        if data:
            for elem in data.split(','):
                # strip first so that ' !key' is recognised just like '!key'
                elem = elem.strip()
                if '=' in elem:
                    key, value = elem.split('=', 1)
                    options[key.strip()] = value.strip()
                elif elem.startswith('!'):
                    # key without value allowed if starting with ! -> key may not exist
                    options[elem] = None

        return command, options

    def _append(self, fromUser, ldap_object):
        """Add one :class:`Rule` per (host, command) combination of the given operation set.

        :param bool fromUser: whether the operation set was connected with the user (otherwise a group)
        :param ldap_object: mapping of LDAP attribute names to lists of byte strings
        """
        flavor = ldap_object.get('umcOperationSetFlavor', [b'*'])
        for host in self._expand_hostlist(ldap_object.get('umcOperationSetHost', [b'*'])):
            for command in ldap_object.get('umcOperationSetCommand', []):
                command, options = self.__parse_command(command.decode('utf-8'))
                new_rule = Rule({
                    'fromUser': fromUser,
                    'host': host,
                    'command': command,
                    'options': options,
                    'flavor': flavor[0].decode('utf-8'),
                })
                self.acls.append(new_rule)

    def __compare_rules(self, rule1, rule2):
        """Hacky version of rule comparison

        Returns the preferred of both rules: a user rule wins over a group
        rule, otherwise the one with the longer command pattern (*rule1* on a tie).
        """
        if not rule1:
            return rule2
        if not rule2:
            return rule1

        if rule1.fromUser and not rule2.fromUser:
            return rule1
        if not rule1.fromUser and rule2.fromUser:
            return rule2
        if len(rule1.command) >= len(rule2.command):
            return rule1
        return rule2

    def __option_match(self, opt_pattern, opts):
        """Check the request options against the option patterns of a rule.

        :param dict opt_pattern: option patterns as produced by the command parser
        :param dict opts: options of the request
        :returns: ``MATCH_NONE`` if any pattern is violated, otherwise ``MATCH_FULL``
        """
        for key, value in opt_pattern.items():
            if key.startswith('!'):
                # a key starting with ! means it may not be available
                if key[1:] in opts:
                    return ACLs.MATCH_NONE
                # negative patterns carry no value pattern to compare against
                continue
            # else if key not in opts no rule available -> OK
            if key not in opts:
                continue

            if isinstance(opts[key], six.string_types):
                options = (opts[key], )
            else:
                options = opts[key]
            for option in options:
                if not value.endswith('*'):
                    if value != option:
                        return ACLs.MATCH_NONE
                elif not option.startswith(value[:-1]):
                    return ACLs.MATCH_NONE

        return ACLs.MATCH_FULL

    def __command_match(self, cmd1, cmd2):
        """
        if cmd1 == cmd2 return ACLs.MATCH_FULL
        if cmd1 ends with '*' and cmd2 starts with the part before it return ACLs.MATCH_PART
        if nothing matches return ACLs.MATCH_NONE
        """
        if cmd1 == cmd2:
            return ACLs.MATCH_FULL

        if cmd1 and cmd1.endswith('*') and cmd2.startswith(cmd1[:-1]):
            return ACLs.MATCH_PART

        return ACLs.MATCH_NONE

    def __flavor_match(self, flavor1, flavor2):
        """
        if flavor1 == flavor2 or flavor1 is None or the pattern '*' return ACLs.MATCH_FULL
        if flavor1 ends with '*' and flavor2 starts with the part before it return ACLs.MATCH_PART
        if nothing matches return ACLs.MATCH_NONE
        """
        if flavor1 == flavor2 or flavor1 is None or flavor1 == '*':
            return ACLs.MATCH_FULL

        if flavor1.endswith('*') and flavor2 and flavor2.startswith(flavor1[:-1]):
            return ACLs.MATCH_PART

        return ACLs.MATCH_NONE

    def _is_allowed(self, acls, command, hostname, options, flavor):
        """Return *True* if any rule in *acls* grants *command* with the given options and flavor on *hostname*."""
        matching = (ACLs.MATCH_PART, ACLs.MATCH_FULL)
        for rule in acls:
            if hostname and rule.host != '*' and rule.host != hostname:
                continue
            match = self.__command_match(rule.command, command)
            opt_match = self.__option_match(rule.options, options)
            flavor_match = self.__flavor_match(rule.flavor, flavor)
            if match in matching and opt_match == ACLs.MATCH_FULL and flavor_match in matching:
                return True

        # default is to prohibit the command execution
        return False

    def is_command_allowed(self, command, hostname=None, options=None, flavor=None):
        """This method verifies if the given command (with options and
        flavor) is on the named host allowed.

        :param str command: the command to check access for
        :param str hostname: host to check for; defaults to the UCR variable ``hostname``
        :param dict options: the command options given in the UMCP request; *None* means no options
        :param str flavor: the flavor given in the UMCP request
        :rtype: bool
        """
        if options is None:
            options = {}
        if not hostname:
            hostname = ucr['hostname']

        # Any matching rule grants access, so the order of evaluation does
        # not influence the result; user rules are simply checked first.
        user_rules = [x for x in self.acls if x.fromUser]
        group_rules = [x for x in self.acls if not x.fromUser]

        return self._is_allowed(user_rules + group_rules, command, hostname, options, flavor)

    def _dump(self):
        """Dumps the ACLs for the user"""
        ACL.info('Allowed UMC operations:')
        ACL.info(' %-5s | %-20s | %-15s | %-20s | %-20s' % ('User', 'Host', 'Flavor', 'Command', 'Options'))
        ACL.info('******************************************************************************')
        for rule in self.acls:
            ACL.info(' %-5s | %-20s | %-15s | %-20s | %-20s' % (rule.fromUser, rule.host, rule.flavor, rule.command, rule.options))
        ACL.info('')

    def _read_from_file(self, username):
        """Replace :attr:`acls` with the rules cached for *username*.

        :returns: *False* if the cache file could not be read, otherwise *None*
        :raises ValueError: on Python 3 when the cache file does not contain valid JSON
        """
        filename = os.path.join(ACLs.CACHE_DIR, username.replace('/', ''))

        try:
            try:
                with open(filename, 'r') as fd:
                    acls = json.load(fd)
            except (ValueError, TypeError):
                if six.PY3:
                    raise
                # Legacy (Python 2) cache format.
                # NOTE(review): pickle executes code from the file; this is only
                # safe while the cache directory is not writable by untrusted users.
                with open(filename, 'r') as fd:
                    acls = pickle.load(fd)
            else:
                acls = [Rule(x) for x in acls]
        except EnvironmentError as exc:
            ACL.process('Could not load ACLs of %r: %s' % (username, exc,))
            return False

        self.acls = []
        for rule in acls:
            if rule not in self.acls:
                # older cache files may lack these keys
                if 'flavor' not in rule:
                    rule['flavor'] = None
                if 'options' not in rule:
                    rule['options'] = {}
                self.acls.append(rule)

    def _write_to_file(self, username):
        """Store :attr:`acls` as JSON in the cache file of *username* (created with mode 0600).

        :returns: *False* if the file could not be written, otherwise *None*
        """
        filename = os.path.join(ACLs.CACHE_DIR, username.replace('/', ''))

        try:
            descriptor = os.open(filename, os.O_WRONLY | os.O_TRUNC | os.O_CREAT, 0o600)
            # the file object owns the descriptor and closes it even if writing fails
            with os.fdopen(descriptor, 'wb') as cache:
                cache.write(json.dumps(self.acls, ensure_ascii=True).encode('ASCII'))
        except EnvironmentError as exc:
            ACL.error('Could not write ACL file: %s' % (exc,))
            return False

    def json(self):
        """Returns the ACL definitions in a JSON compatible form."""
        return self.acls
class LDAP_ACLs(ACLs):
    """Reads ACLs from LDAP directory for the given username. By
    inheriting the class :class:`ACLs` the ACL definitions can be cached
    on the local system. If the LDAP server can not be reached the cache
    is used if available.
    """

    FROM_USER = True
    FROM_GROUP = False

    def __init__(self, lo, username, ldap_base):
        """
        :param lo: LDAP connection object; if false, the ACLs are read from the cache file only
        :param str username: name (uid) of the user to load the ACLs for
        :param ldap_base: LDAP base passed on to :class:`ACLs`
        """
        ACLs.__init__(self, ldap_base)
        self.lo = lo
        self.username = username

        if self.lo:
            self._read_from_ldap()
            self._write_to_file(self.username)
        else:
            # read ACLs from file
            self._read_from_file(self.username)

        self._dump()

    def _get_policy_for_dn(self, dn):
        """Return the ``umcPolicy`` entry of the policies resolved for *dn*, or *None*."""
        policy = self.lo.getPolicies(dn, policies=[], attrs={}, result={}, fixedattrs={})
        return policy.get('umcPolicy', None)

    def _append_granted_operation_sets(self, from_user, policy):
        """Append the rules of every operation set granted by *policy* (if any)."""
        if policy and 'umcPolicyGrantedOperationSet' in policy:
            for value in policy['umcPolicyGrantedOperationSet']['value']:
                self._append(from_user, self.lo.get(value.decode('UTF-8')))

    def _read_from_ldap(self):
        """Collect the rules granted to the user and to the user's groups.

        If the user or the user's policy cannot be read from LDAP, the
        cached ACLs are loaded instead.
        """
        # TODO: check for fixed attributes
        try:
            userdn = self.lo.searchDn(filter_format('(&(objectClass=person)(uid=%s))', [self.username]), unique=True)[0]
            policy = self._get_policy_for_dn(userdn)
        except (udm_errors.base, ldap.LDAPError, IndexError) as exc:
            # IndexError just means that no such user exists: not worth a warning
            if not isinstance(exc, IndexError):
                ACL.warn('Error reading credentials from LDAP for user %s: %s' % (self.username, traceback.format_exc(),))
            # read ACLs from file
            self._read_from_file(self.username)
            return

        self._append_granted_operation_sets(LDAP_ACLs.FROM_USER, policy)

        # TODO: check for nested groups
        groupDNs = self.lo.searchDn(filter=filter_format('uniqueMember=%s', [userdn]))
        for gDN in groupDNs:
            self._append_granted_operation_sets(LDAP_ACLs.FROM_GROUP, self._get_policy_for_dn(gDN))

        # make the ACLs unique
        # 'options' is a dict and cannot be part of the sort key, so equal
        # rules are not guaranteed to be adjacent after sorting. Grouping
        # adjacent elements would therefore miss duplicates; compare each
        # rule against the ones already kept instead.
        self.acls.sort(key=operator.itemgetter('fromUser', 'host', 'command', 'flavor'))
        unique = []
        for rule in self.acls:
            if rule not in unique:
                unique.append(rule)
        self.acls[:] = unique