diff options
author | David Mulder <dmulder@samba.org> | 2022-12-06 08:56:24 -0700 |
---|---|---|
committer | Jeremy Allison <jra@samba.org> | 2022-12-21 02:04:37 +0000 |
commit | f36542b50c70de74144b9e2e94e91944e8536421 (patch) | |
tree | 1ee53cf486909674f3a859e5a50abe99d14c6a0d /python/samba | |
parent | acdc7fbe8985a16d41075b72f54a96f217c3f884 (diff) | |
download | samba-f36542b50c70de74144b9e2e94e91944e8536421.tar.gz |
gp: Modify Cert Auto Enroll CSE to use new applier
Signed-off-by: David Mulder <dmulder@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'python/samba')
-rw-r--r-- | python/samba/gp/gp_cert_auto_enroll_ext.py | 114 |
1 files changed, 73 insertions, 41 deletions
diff --git a/python/samba/gp/gp_cert_auto_enroll_ext.py b/python/samba/gp/gp_cert_auto_enroll_ext.py index f1dcf6bbafb..3be8931bf1e 100644 --- a/python/samba/gp/gp_cert_auto_enroll_ext.py +++ b/python/samba/gp/gp_cert_auto_enroll_ext.py @@ -17,7 +17,7 @@ import os import operator import requests -from samba.gp.gpclass import gp_pol_ext +from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE from samba import Ldb from ldb import SCOPE_SUBTREE, SCOPE_BASE from samba.auth import system_session @@ -39,6 +39,7 @@ except ModuleNotFoundError: from cryptography.hazmat.primitives.serialization import Encoding from cryptography.x509 import load_der_x509_certificate from cryptography.hazmat.backends import default_backend +from samba.common import get_string cert_wrap = b""" -----BEGIN CERTIFICATE----- @@ -153,9 +154,9 @@ def fetch_certification_authorities(ldb): if len(res) == 0: return result for es in res: - data = { 'name': es['cn'][0], - 'hostname': es['dNSHostName'][0], - 'cACertificate': es['cACertificate'][0] + data = { 'name': get_string(es['cn'][0]), + 'hostname': get_string(es['dNSHostName'][0]), + 'cACertificate': get_string(es['cACertificate'][0]) } result.append(data) return result @@ -171,7 +172,7 @@ def fetch_template_attrs(ldb, name, attrs=['msPKI-Minimal-Key-Size']): return {'msPKI-Minimal-Key-Size': ['2048']} def format_root_cert(cert): - cert = base64.b64encode(cert) + cert = base64.b64encode(cert.encode()) return cert_wrap % re.sub(b"(.{64})", b"\\1\n", cert, 0, re.DOTALL) def find_cepces_submit(): @@ -248,7 +249,7 @@ def getca(ca, url, trust_dir): def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'): """Install the root certificate chain.""" - data = {'files': [], 'templates': []} + data = dict({'files': [], 'templates': []}, **ca) url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname'] root_certs = getca(ca, url, trust_dir) data['files'].extend(root_certs) @@ -311,10 +312,42 @@ def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'): 'certificate auto enrollment to work') return json.dumps(data) -class gp_cert_auto_enroll_ext(gp_pol_ext): +class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier): def __str__(self): return 'Cryptography\AutoEnrollment' + def unapply(self, guid, attribute, value): + ca_cn = base64.b64decode(attribute) + data = json.loads(value) + getcert = which('getcert') + if getcert is not None: + Popen([getcert, 'remove-ca', '-c', ca_cn]).wait() + for nickname in data['templates']: + Popen([getcert, 'stop-tracking', '-i', nickname]).wait() + for f in data['files']: + if os.path.exists(f): + if os.path.exists(f): + os.unlink(f) + self.cache_remove_attribute(guid, attribute) + + def apply(self, guid, ca, applier_func, *args, **kwargs): + attribute = base64.b64encode(ca['name'].encode()).decode() + # If the policy has changed, unapply, then apply new policy + old_val = self.cache_get_attribute_value(guid, attribute) + old_data = json.loads(old_val) if old_val is not None else {} + if all([(ca[k] == old_data[k] if k in old_data else False) \ + for k in ca.keys()]) or \ + self.cache_get_apply_state() == GPOSTATE.ENFORCE: + self.unapply(guid, attribute, old_val) + # If policy is already applied, skip application + if old_val is not None and \ + self.cache_get_apply_state() != GPOSTATE.ENFORCE: + return + + # Apply the policy and log the changes + data = applier_func(*args, **kwargs) + self.cache_add_attribute(guid, attribute, data) + def process_group_policy(self, deleted_gpo_list, changed_gpo_list, trust_dir=None, private_dir=None): if trust_dir is None: @@ -327,27 +360,13 @@ class gp_cert_auto_enroll_ext(gp_pol_ext): os.mkdir(private_dir, mode=0o700) for guid, settings in deleted_gpo_list: - self.gp_db.set_guid(guid) if str(self) in settings: for ca_cn_enc, data in settings[str(self)].items(): - ca_cn = base64.b64decode(ca_cn_enc) - data = json.loads(data) - getcert = which('getcert') - if getcert is not None: - Popen([getcert, 'remove-ca', '-c', ca_cn]).wait() - for nickname in data['templates']: - Popen([getcert, 'stop-tracking', - '-i', nickname]).wait() - for f in data['files']: - if os.path.exists(f): - os.unlink(f) - self.gp_db.delete(str(self), ca_cn_enc) - self.gp_db.commit() + self.unapply(guid, ca_cn_enc, data) for gpo in changed_gpo_list: if gpo.file_sys_path: section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment' - self.gp_db.set_guid(gpo.name) pol_file = 'MACHINE/Registry.pol' path = os.path.join(gpo.file_sys_path, pol_file) pol_conf = self.parse(path) @@ -362,11 +381,22 @@ class gp_cert_auto_enroll_ext(gp_pol_ext): manage = e.data & 0x2 == 0x2 retrive_pending = e.data & 0x4 == 0x4 if enroll: - self.__enroll(pol_conf.entries, trust_dir, - private_dir) - self.gp_db.commit() + ca_names = self.__enroll(gpo.name, + pol_conf.entries, + trust_dir, private_dir) + + # Cleanup any old CAs that have been removed + ca_attrs = [base64.b64encode(n.encode()).decode() \ + for n in ca_names] + self.clean(gpo.name, keep=ca_attrs) + else: + # If enrollment has been disabled for this GPO, + # remove any existing policy + ca_attrs = \ + self.cache_get_all_attribute_values(gpo.name) + self.clean(gpo.name, remove=ca_attrs) - def __read_cep_data(self, ldb, end_point_information, + def __read_cep_data(self, guid, ldb, end_point_information, trust_dir, private_dir): """Read CEP Data. @@ -414,42 +444,44 @@ class gp_cert_auto_enroll_ext(gp_pol_ext): # For each CertificateEnrollmentPolicyEndPoint instance for that # group: + ca_names = [] for ca in end_point_group: # If EndPoint.URI equals "LDAP": if ca['URL'] == 'LDAP:': # This is a basic configuration. cas = fetch_certification_authorities(ldb) for _ca in cas: - data = cert_enroll(_ca, ldb, trust_dir, private_dir) - self.gp_db.store(str(self), - base64.b64encode(_ca['name']).decode(), - data) + self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir, + private_dir) + ca_names.append(_ca['name']) # If EndPoint.URI starts with "HTTPS//": elif ca['URL'].lower().startswith('https://'): - data = cert_enroll(ca, ldb, trust_dir, - private_dir, auth=ca['auth']) - self.gp_db.store(str(self), - base64.b64encode(ca['name'].encode()).decode(), - data) + self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir, + private_dir, auth=ca['auth']) + ca_names.append(ca['name']) else: edata = { 'endpoint': ca['URL'] } log.error('Unrecognized endpoint', edata) + return ca_names - def __enroll(self, entries, trust_dir, private_dir): + def __enroll(self, guid, entries, trust_dir, private_dir): url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp) ldb = Ldb(url=url, session_info=system_session(), lp=self.lp, credentials=self.creds) + ca_names = [] end_point_information = obtain_end_point_information(entries) if len(end_point_information) > 0: - self.__read_cep_data(ldb, end_point_information, - trust_dir, private_dir) + ca_names.extend(self.__read_cep_data(guid, ldb, + end_point_information, + trust_dir, private_dir)) else: cas = fetch_certification_authorities(ldb) for ca in cas: - data = cert_enroll(ca, ldb, trust_dir, private_dir) - self.gp_db.store(str(self), - base64.b64encode(ca['name']).decode(), data) + self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir, + private_dir) + ca_names.append(ca['name']) + return ca_names def rsop(self, gpo): output = {} |