diff options
author | David Mulder <dmulder@samba.org> | 2022-11-28 13:37:52 -0700 |
---|---|---|
committer | Jeremy Allison <jra@samba.org> | 2022-12-21 02:04:36 +0000 |
commit | 2953329ba07cb7de6c8df1718779b7c9045d3910 (patch) | |
tree | 4f03af7a9ed45f718937c7c4ebf68b09d5cbaf80 /python/samba | |
parent | 81dbcae9dfba2f2dd7b5e7e04f9ababca02ed49b (diff) | |
download | samba-2953329ba07cb7de6c8df1718779b7c9045d3910.tar.gz |
gp: Modify Sudoers CSEs to use new files applier
Signed-off-by: David Mulder <dmulder@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'python/samba')
-rw-r--r-- | python/samba/gp/gp_centrify_sudoers_ext.py | 51 | ||||
-rw-r--r-- | python/samba/gp/gp_sudoers_ext.py | 69 | ||||
-rw-r--r-- | python/samba/gp/vgp_sudoers_ext.py | 51 |
3 files changed, 69 insertions, 102 deletions
diff --git a/python/samba/gp/gp_centrify_sudoers_ext.py b/python/samba/gp/gp_centrify_sudoers_ext.py index 2d03a4871a4..3de4fb871ea 100644 --- a/python/samba/gp/gp_centrify_sudoers_ext.py +++ b/python/samba/gp/gp_centrify_sudoers_ext.py @@ -15,11 +15,8 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os -from samba.gp.gpclass import gp_pol_ext -from base64 import b64encode -from tempfile import NamedTemporaryFile -from subprocess import Popen, PIPE -from samba.gp.gp_sudoers_ext import visudo, intro +from samba.gp.gpclass import gp_pol_ext, gp_file_applier +from samba.gp.gp_sudoers_ext import sudo_applier_func from samba.gp.util.logging import log def ext_enabled(entries): @@ -29,57 +26,41 @@ def ext_enabled(entries): return e.data == 1 return False -class gp_centrify_sudoers_ext(gp_pol_ext): +class gp_centrify_sudoers_ext(gp_pol_ext, gp_file_applier): def __str__(self): return 'Centrify/Sudo Rights' def process_group_policy(self, deleted_gpo_list, changed_gpo_list, sdir='/etc/sudoers.d'): for guid, settings in deleted_gpo_list: - self.gp_db.set_guid(guid) if str(self) in settings: for attribute, sudoers in settings[str(self)].items(): - if os.path.exists(sudoers): - os.unlink(sudoers) - self.gp_db.delete(str(self), attribute) - self.gp_db.commit() + self.unapply(guid, attribute, sudoers) for gpo in changed_gpo_list: if gpo.file_sys_path: section = 'Software\\Policies\\Centrify\\UnixSettings\\SuDo' - self.gp_db.set_guid(gpo.name) pol_file = 'MACHINE/Registry.pol' path = os.path.join(gpo.file_sys_path, pol_file) pol_conf = self.parse(path) if not pol_conf or not ext_enabled(pol_conf.entries): continue + sudo_entries = [] for e in pol_conf.entries: if e.keyname == section and e.data.strip(): if '**delvals.' in e.valuename: continue - attribute = b64encode(e.data.encode()).decode() - old_val = self.gp_db.retrieve(str(self), attribute) - if not old_val: - contents = intro - contents += '%s\n' % e.data - with NamedTemporaryFile() as f: - with open(f.name, 'w') as w: - w.write(contents) - sudo_validation = \ - Popen([visudo, '-c', '-f', f.name], - stdout=PIPE, stderr=PIPE).wait() - if sudo_validation == 0: - with NamedTemporaryFile(prefix='gp_', - delete=False, - dir=sdir) as f: - with open(f.name, 'w') as w: - w.write(contents) - self.gp_db.store(str(self), - attribute, - f.name) - else: - log.error('Sudoers apply failed', e.data) - self.gp_db.commit() + sudo_entries.append(e.data) + # Each GPO applies only one set of sudoers, in a + # set of files, so the attribute does not need uniqueness. + attribute = self.generate_attribute(gpo.name, *sudo_entries) + # The value hash is generated from the sudo_entries, ensuring + # any changes to this GPO will cause the files to be rewritten. + value_hash = self.generate_value_hash(*sudo_entries) + self.apply(gpo.name, attribute, value_hash, sudo_applier_func, + sdir, sudo_entries) + # Cleanup any old entries that are no longer part of the policy + self.clean(gpo.name, keep=[attribute]) def rsop(self, gpo): output = {} diff --git a/python/samba/gp/gp_sudoers_ext.py b/python/samba/gp/gp_sudoers_ext.py index b3a37efb61a..5607298eb19 100644 --- a/python/samba/gp/gp_sudoers_ext.py +++ b/python/samba/gp/gp_sudoers_ext.py @@ -15,8 +15,7 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os -from samba.gp.gpclass import gp_pol_ext -from base64 import b64encode +from samba.gp.gpclass import gp_pol_ext, gp_file_applier from tempfile import NamedTemporaryFile from subprocess import Popen, PIPE from samba.gp.util.logging import log @@ -42,55 +41,61 @@ intro = ''' visudo = find_executable('visudo', path='%s:%s' % (os.environ['PATH'], '/usr/sbin')) -class gp_sudoers_ext(gp_pol_ext): +def sudo_applier_func(sudo_dir, sudo_entries): + ret = [] + for p in sudo_entries: + contents = intro + contents += '%s\n' % p + with NamedTemporaryFile() as f: + with open(f.name, 'w') as w: + w.write(contents) + sudo_validation = \ + Popen([visudo, '-c', '-f', f.name], + stdout=PIPE, stderr=PIPE).wait() + if sudo_validation == 0: + with NamedTemporaryFile(prefix='gp_', + delete=False, + dir=sudo_dir) as f: + with open(f.name, 'w') as w: + w.write(contents) + ret.append(f.name) + else: + log.error('Sudoers apply failed', p) + return ret + +class gp_sudoers_ext(gp_pol_ext, gp_file_applier): def __str__(self): return 'Unix Settings/Sudo Rights' def process_group_policy(self, deleted_gpo_list, changed_gpo_list, sdir='/etc/sudoers.d'): for guid, settings in deleted_gpo_list: - self.gp_db.set_guid(guid) if str(self) in settings: for attribute, sudoers in settings[str(self)].items(): - if os.path.exists(sudoers): - os.unlink(sudoers) - self.gp_db.delete(str(self), attribute) - self.gp_db.commit() + self.unapply(guid, attribute, sudoers) for gpo in changed_gpo_list: if gpo.file_sys_path: section = 'Software\\Policies\\Samba\\Unix Settings\\Sudo Rights' - self.gp_db.set_guid(gpo.name) pol_file = 'MACHINE/Registry.pol' path = os.path.join(gpo.file_sys_path, pol_file) pol_conf = self.parse(path) if not pol_conf: continue + sudo_entries = [] for e in pol_conf.entries: if e.keyname == section and e.data.strip(): - attribute = b64encode(e.data.encode()).decode() - old_val = self.gp_db.retrieve(str(self), attribute) - if not old_val: - contents = intro - contents += '%s\n' % e.data - with NamedTemporaryFile() as f: - with open(f.name, 'w') as w: - w.write(contents) - sudo_validation = \ - Popen([visudo, '-c', '-f', f.name], - stdout=PIPE, stderr=PIPE).wait() - if sudo_validation == 0: - with NamedTemporaryFile(prefix='gp_', - delete=False, - dir=sdir) as f: - with open(f.name, 'w') as w: - w.write(contents) - self.gp_db.store(str(self), - attribute, - f.name) - else: - log.error('Sudoers apply failed', e.data) - self.gp_db.commit() + sudo_entries.append(e.data) + # Each GPO applies only one set of sudoers, in a + # set of files, so the attribute does not need uniqueness. + attribute = self.generate_attribute(gpo.name) + # The value hash is generated from the sudo_entries, ensuring + # any changes to this GPO will cause the files to be rewritten. + value_hash = self.generate_value_hash(*sudo_entries) + self.apply(gpo.name, attribute, value_hash, sudo_applier_func, + sdir, sudo_entries) + # Cleanup any old entries that are no longer part of the policy + self.clean(gpo.name, keep=[attribute]) def rsop(self, gpo): output = {} diff --git a/python/samba/gp/vgp_sudoers_ext.py b/python/samba/gp/vgp_sudoers_ext.py index c0137d5f49e..d11200869e7 100644 --- a/python/samba/gp/vgp_sudoers_ext.py +++ b/python/samba/gp/vgp_sudoers_ext.py @@ -15,31 +15,23 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os -from samba.gp.gpclass import gp_xml_ext -from base64 import b64encode -from tempfile import NamedTemporaryFile -from subprocess import Popen, PIPE -from samba.gp.gp_sudoers_ext import visudo, intro +from samba.gp.gpclass import gp_xml_ext, gp_file_applier +from samba.gp.gp_sudoers_ext import sudo_applier_func from samba.gp.util.logging import log -class vgp_sudoers_ext(gp_xml_ext): +class vgp_sudoers_ext(gp_xml_ext, gp_file_applier): def __str__(self): return 'VGP/Unix Settings/Sudo Rights' def process_group_policy(self, deleted_gpo_list, changed_gpo_list, sdir='/etc/sudoers.d'): for guid, settings in deleted_gpo_list: - self.gp_db.set_guid(guid) if str(self) in settings: for attribute, sudoers in settings[str(self)].items(): - if os.path.exists(sudoers): - os.unlink(sudoers) - self.gp_db.delete(str(self), attribute) - self.gp_db.commit() + self.unapply(guid, attribute, sudoers) for gpo in changed_gpo_list: if gpo.file_sys_path: - self.gp_db.set_guid(gpo.name) xml = 'MACHINE/VGP/VTLA/Sudo/SudoersConfiguration/manifest.xml' path = os.path.join(gpo.file_sys_path, xml) xml_conf = self.parse(path) @@ -47,6 +39,7 @@ class vgp_sudoers_ext(gp_xml_ext): continue policy = xml_conf.find('policysetting') data = policy.find('data') + sudo_entries = [] for entry in data.findall('sudoers_entry'): command = entry.find('command').text user = entry.find('user').text @@ -62,29 +55,17 @@ class vgp_sudoers_ext(gp_xml_ext): nopassword = entry.find('password') is None np_entry = ' NOPASSWD:' if nopassword else '' p = '%s ALL=(%s)%s %s' % (uname, user, np_entry, command) - attribute = b64encode(p.encode()).decode() - old_val = self.gp_db.retrieve(str(self), attribute) - if not old_val: - contents = intro - contents += '%s\n' % p - with NamedTemporaryFile() as f: - with open(f.name, 'w') as w: - w.write(contents) - sudo_validation = \ - Popen([visudo, '-c', '-f', f.name], - stdout=PIPE, stderr=PIPE).wait() - if sudo_validation == 0: - with NamedTemporaryFile(prefix='gp_', - delete=False, - dir=sdir) as f: - with open(f.name, 'w') as w: - w.write(contents) - self.gp_db.store(str(self), - attribute, - f.name) - else: - log.error('Sudoers apply failed', p) - self.gp_db.commit() + sudo_entries.append(p) + # Each GPO applies only one set of sudoers, in a + # set of files, so the attribute does not need uniqueness. + attribute = self.generate_attribute(gpo.name) + # The value hash is generated from the sudo_entries, ensuring + # any changes to this GPO will cause the files to be rewritten. + value_hash = self.generate_value_hash(*sudo_entries) + self.apply(gpo.name, attribute, value_hash, sudo_applier_func, + sdir, sudo_entries) + # Cleanup any old entries that are no longer part of the policy + self.clean(gpo.name, keep=[attribute]) def rsop(self, gpo): output = {} |