Source code for univention.lib.share_restrictions

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Samba
#  this script creates samba configurations from ucr values
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

from __future__ import print_function
from univention.config_registry import ConfigRegistry

from six.moves.configparser import ConfigParser
from six.moves.urllib_parse import quote
import os
import re
import shlex

# defaults
ucr = ConfigRegistry()

# global hashes
shares = {}
globals = {}
printers = {}

ucr.load()


[docs]class Restrictions(dict): INVALID_USERS = 'invalid users' VALID_USERS = 'valid users' HOSTS_DENY = 'hosts deny' HOSTS_ALLOW = 'hosts allow' def __init__(self, name): dict.__init__(self, { Restrictions.INVALID_USERS: None, Restrictions.VALID_USERS: None, Restrictions.HOSTS_DENY: None, Restrictions.HOSTS_ALLOW: None }) self.name = name self.ucr = False def _add(self, key, value): if not isinstance(value, (tuple, list, set)): value = [value] value = ['"%s"' % x if ' ' in x else x for x in value] if self[key] is None: self[key] = set(value) else: self[key].update(value) @property def invalid_users(self): return self[Restrictions.INVALID_USERS] @invalid_users.setter def invalid_users(self, value): self._add(Restrictions.INVALID_USERS, value) @property def valid_users(self): return self[Restrictions.VALID_USERS] @valid_users.setter def valid_users(self, value): self._add(Restrictions.VALID_USERS, value) @property def hosts_deny(self): return self[Restrictions.HOSTS_DENY] @hosts_deny.setter def hosts_deny(self, value): self._add(Restrictions.HOSTS_DENY, value) @property def hosts_allow(self): return self[Restrictions.HOSTS_ALLOW] @hosts_allow.setter def hosts_allow(self, value): self._add(Restrictions.HOSTS_ALLOW, value)
[docs]class Share(Restrictions): pass
[docs]class Printer(Restrictions): def __init__(self, name): Restrictions.__init__(self, name) self['smbname'] = None @property def smbname(self): return self['smbname'] @smbname.setter def smbname(self, name): self['smbname'] = name
[docs]class ShareConfiguration(object): SHARES_DIR = '/etc/samba/local.config.d' SHARES_UDM_DIR = '/etc/samba/shares.conf.d' PRINTERS_UDM_DIR = '/etc/samba/printers.conf.d' POSTFIX = '.local.config.conf' PREFIX = 'printer.' INCLUDE_CONF = '/etc/samba/local.config.conf' GLOBAL_CONF = '/etc/samba/local.config.d/global.local.config.conf' CUPS_CONF = '/etc/cups/printers.conf' def __init__(self): self._shares = {} self._globals = {} self._printers = {}
[docs] def delete(self): """delete all conf's in SHARES_DIR and INCLUDE_CONF""" if not os.path.isdir(ShareConfiguration.SHARES_DIR): os.makedirs(ShareConfiguration.SHARES_DIR) if os.path.isfile(ShareConfiguration.INCLUDE_CONF): os.remove(ShareConfiguration.INCLUDE_CONF) if os.path.isfile(ShareConfiguration.GLOBAL_CONF): os.remove(ShareConfiguration.GLOBAL_CONF) for item in os.listdir(ShareConfiguration.SHARES_DIR): filename = os.path.join(ShareConfiguration.SHARES_DIR, item) if os.path.isfile(filename) and filename.endswith(ShareConfiguration.POSTFIX): os.remove(filename)
[docs] def read_shares(self): """get invalid user from samba share conf""" if not os.path.isdir(ShareConfiguration.SHARES_UDM_DIR): return for filename in os.listdir(ShareConfiguration.SHARES_UDM_DIR): filename = os.path.join(ShareConfiguration.SHARES_UDM_DIR, filename) cfg = ConfigParser() cfg.read(filename) try: share = Share(cfg.sections()[0]) except IndexError: continue if cfg.has_option(share.name, Restrictions.INVALID_USERS): share.invalid_users = shlex.split(cfg.get(share.name, Restrictions.INVALID_USERS)) if cfg.has_option(share.name, Restrictions.HOSTS_DENY): share.hosts_deny = shlex.split(cfg.get(share.name, Restrictions.HOSTS_DENY)) self._shares[share.name] = share
[docs] def read_printers(self): """get invalid/valid users from cups and samba config""" # read CUPS configuration if os.path.isfile(ShareConfiguration.CUPS_CONF): reg_cups = re.compile(r'\s*<Printer\s+([^>]+)>') with open("/etc/cups/printers.conf") as fd: for line in fd.readlines(): m_cups = reg_cups.match(line) if m_cups: prt = Printer(m_cups.group(1).strip()) self._printers[prt.name] = prt # samba if not os.path.exists(ShareConfiguration.PRINTERS_UDM_DIR): return for filename in os.listdir(ShareConfiguration.PRINTERS_UDM_DIR): cfg = ConfigParser() cfg.read(os.path.join(ShareConfiguration.PRINTERS_UDM_DIR, filename)) try: prt_name = cfg.sections()[0] except IndexError: continue prt = None if prt_name in self._printers: prt = self._printers[prt_name] else: if cfg.has_option(prt_name, 'printer name'): cups_name = cfg.get(prt_name, 'printer name') if cups_name in self._printers: prt = self._printers[cups_name] prt.smbname = prt_name if prt is None: continue if cfg.has_option(prt_name, Restrictions.INVALID_USERS): prt.invalid_users = shlex.split(cfg.get(prt_name, Restrictions.INVALID_USERS)) if cfg.has_option(prt_name, Restrictions.VALID_USERS): prt.valid_users = shlex.split(cfg.get(prt_name, Restrictions.VALID_USERS)) if cfg.has_option(prt_name, Restrictions.HOSTS_DENY): prt.hosts_deny = shlex.split(cfg.get(prt_name, Restrictions.HOSTS_DENY))
def _set_invalids(self, value, share, group): if share and group and value.lower() in ('true', 'yes', '1'): if share not in self._shares: self._shares[share] = Share(share) self._shares[share].ucr = True self._shares[share].invalid_users = '@' + group def _set_denied_hosts(self, value, share): if share not in self._shares: self._shares[share] = Share(share) self._shares[share].ucr = True self._shares[share].hosts_deny = shlex.split(value) def _set_printmode_group(self, mode, group): if mode not in ('none', 'all'): return group = "@" + group for prt in self._printers.values(): prt.ucr = True if mode == 'none': prt.invalid_users = group else: prt.valid_users = group def _set_printmode_hosts(self, hosts, mode): if mode not in ('none', 'all'): return hosts = hosts.split(' ') for prt in self._printers.values(): prt.ucr = True if mode == "none": prt.hosts_deny = hosts else: prt.hosts_allow = hosts def _set_othershares(self, value, group): """append group to invalid users for all shares, except shares group (the groupname) and marktplatz""" if not group or not value.lower() in ('true', 'yes', '1'): return for share in self._shares.values(): if share.name in (group, 'marktplatz', 'homes'): continue share.invalid_users = '@' + group share.ucr = True def _set_othershares_hosts(self, value): if not value: return for share in self._shares.values(): if share.name in ('marktplatz', 'homes'): continue share.hosts_deny = shlex.split(value) share.ucr = True # set global options to -> globals def _set_globals(self, value, option): if option and value: self._globals[option] = value # set share options to -> shares def _set_options(self, value, share, option): if share and option and value: if share not in self._shares: return if option not in self._shares[share]: self._shares[share][option] = set() self._shares[share].ucr = True self._shares[share][option].add(value) # parse ucr
[docs] def read_ucr(self): _map = dict( options=(re.compile(r'samba/share/([^\/]+)/options/(.*)'), self._set_options), globals=(re.compile('samba/global/options/(.*)'), self._set_globals), hosts=(re.compile(r'samba/share/([^\/]+)/hosts/deny'), self._set_denied_hosts), users=(re.compile(r'samba/share/([^\/]+)/usergroup/([^\/]+)/invalid'), self._set_invalids), printmode_groups=(re.compile(r'samba/printmode/usergroup/(.*)'), self._set_printmode_group), printmode_hosts=(re.compile('samba/printmode/hosts/(.*)'), self._set_printmode_hosts), othershares=(re.compile(r'samba/othershares/usergroup/([^\/]+)/invalid'), self._set_othershares), othershares_hosts=(re.compile('samba/othershares/hosts/deny'), self._set_othershares_hosts) ) for key in ucr.keys(): for regex, func in _map.values(): match = regex.match(key) if match: func(ucr[key], *match.groups())
[docs] def read(self): # get available cups samba printers and valid/invalid users self.read_printers() # get invalid users for shares from samba config self.read_shares() # get ucr options self.read_ucr()
[docs] def write(self): includes = set() self.delete() # write conf file with global options if self.globals: with open(ShareConfiguration.GLOBAL_CONF, 'w') as fd: fd.write("[global]\n") fd.write(''.join('%s = %s\n' % item for item in self.globals.items())) includes.add('include = %s' % ShareConfiguration.GLOBAL_CONF) # write share configs files with options and invalid users for share in self._shares.values(): # write share conf only if we have ucr settings if not share.ucr: continue share_filename = os.path.join(ShareConfiguration.SHARES_DIR, quote(share.name, safe='') + ShareConfiguration.POSTFIX) with open(share_filename, "w") as fd: fd.write("[" + share.name + "]\n") for option in share: if share[option] is None: continue fd.write('%s = ' % option) fd.write(' '.join(share[option])) fd.write('\n') includes.add('include = %s' % share_filename) # write print share configs for prt in self._printers.values(): # write prin share conf only if we have a proper ucr setting if not prt.ucr: continue filename = os.path.join(ShareConfiguration.SHARES_DIR, ShareConfiguration.PREFIX + prt.name + ShareConfiguration.POSTFIX) includes.add('include = %s' % filename) with open(filename, 'w') as fd: if not prt.smbname: fd.write('[%s]\n' % prt.name) else: fd.write('[%s]\n' % prt.smbname) fd.write('printer name = %s\n' % prt.name) fd.write('path = /tmp\n') fd.write('guest ok = yes\n') fd.write('printable = yes\n') for option in (Restrictions.VALID_USERS, Restrictions.INVALID_USERS, Restrictions.HOSTS_DENY, Restrictions.HOSTS_ALLOW): if option in prt and prt[option] is not None: fd.write('%s = ' % option) fd.write(' '.join(prt[option])) fd.write('\n') # all include statements go to this file (create file een if there is no include with open(ShareConfiguration.INCLUDE_CONF, 'w') as f: f.write('\n'.join(includes) + '\n')
@property def globals(self): return self._globals @property def shares(self): return self._shares @property def printers(self): return self._printers
if __name__ == '__main__': cfg = ShareConfiguration() cfg.read() print(cfg.globals) print(cfg.shares) print(cfg.printers) cfg.write()