# vgp_access_ext samba group policy # Copyright (C) David Mulder 2020 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os, re from samba.gpclass import gp_xml_ext from hashlib import blake2b from tempfile import NamedTemporaryFile from samba.common import get_bytes intro = ''' ### autogenerated by samba # # This file is generated by the vgp_access_ext Group Policy # Client Side Extension. To modify the contents of this file, # modify the appropriate Group Policy objects which apply # to this machine. DO NOT MODIFY THIS FILE DIRECTLY. # ''' # Access files in /etc/security/access.d are read in the order of the system # locale. Here we number the conf files to ensure they are read in the correct # order. def select_next_conf(directory): configs = [re.match(r'(\d+)', f) for f in os.listdir(directory)] return max([int(m.group(1)) for m in configs if m]+[0])+1 class vgp_access_ext(gp_xml_ext): def __str__(self): return 'VGP/Unix Settings/Host Access' def process_group_policy(self, deleted_gpo_list, changed_gpo_list, access='/etc/security/access.d'): for guid, settings in deleted_gpo_list: self.gp_db.set_guid(guid) if str(self) in settings: for attribute, access_file in settings[str(self)].items(): if os.path.exists(access_file): os.unlink(access_file) self.gp_db.delete(str(self), attribute) self.gp_db.commit() for gpo in changed_gpo_list: if gpo.file_sys_path: self.gp_db.set_guid(gpo.name) allow = 'MACHINE/VGP/VTLA/VAS/HostAccessControl/Allow/manifest.xml' path = os.path.join(gpo.file_sys_path, allow) allow_conf = self.parse(path) deny = 'MACHINE/VGP/VTLA/VAS/HostAccessControl/Deny/manifest.xml' path = os.path.join(gpo.file_sys_path, deny) deny_conf = self.parse(path) entries = [] if allow_conf: policy = allow_conf.find('policysetting') data = policy.find('data') for listelement in data.findall('listelement'): adobject = listelement.find('adobject') name = adobject.find('name').text domain = adobject.find('domain').text entries.append('+:%s\\%s:ALL' % (domain, name)) if deny_conf: policy = deny_conf.find('policysetting') data = policy.find('data') for listelement in data.findall('listelement'): adobject = listelement.find('adobject') name = adobject.find('name').text domain = adobject.find('domain').text entries.append('-:%s\\%s:ALL' % (domain, name)) if len(entries) == 0: continue conf_id = select_next_conf(access) access_file = os.path.join(access, '%010d_gp.conf' % conf_id) access_contents = '\n'.join(entries) attribute = blake2b(get_bytes(access_contents)).hexdigest() old_val = self.gp_db.retrieve(str(self), attribute) if old_val is not None: continue if not os.path.isdir(access): os.mkdir(access, 0o644) with NamedTemporaryFile(delete=False, dir=access) as f: with open(f.name, 'w') as w: w.write(intro) w.write(access_contents) os.chmod(f.name, 0o644) os.rename(f.name, access_file) self.gp_db.store(str(self), attribute, access_file) self.gp_db.commit() def rsop(self, gpo): output = {} if gpo.file_sys_path: self.gp_db.set_guid(gpo.name) allow = 'MACHINE/VGP/VTLA/VAS/HostAccessControl/Allow/manifest.xml' path = os.path.join(gpo.file_sys_path, allow) allow_conf = self.parse(path) deny = 'MACHINE/VGP/VTLA/VAS/HostAccessControl/Deny/manifest.xml' path = os.path.join(gpo.file_sys_path, deny) deny_conf = self.parse(path) entries = [] if allow_conf: policy = allow_conf.find('policysetting') data = policy.find('data') for listelement in data.findall('listelement'): adobject = listelement.find('adobject') name = adobject.find('name').text domain = adobject.find('domain').text if str(self) not in output.keys(): output[str(self)] = [] output[str(self)].append('+:%s\\%s:ALL' % (name, domain)) if deny_conf: policy = deny_conf.find('policysetting') data = policy.find('data') for listelement in data.findall('listelement'): adobject = listelement.find('adobject') name = adobject.find('name').text domain = adobject.find('domain').text if str(self) not in output.keys(): output[str(self)] = [] output[str(self)].append('-:%s\\%s:ALL' % (name, domain)) return output