diff options
author | David Mulder <dmulder@suse.com> | 2022-04-12 12:50:25 -0600 |
---|---|---|
committer | Jeremy Allison <jra@samba.org> | 2022-05-03 21:48:57 +0000 |
commit | ddeedcb6b2a036e5d51be94c7f7215543ff3f163 (patch) | |
tree | e5c0e6024af962bf127bbcb2d2cc4a137d619385 | |
parent | a54d707435b1d5098028d72a2021dbb463ef1d03 (diff) | |
download | samba-ddeedcb6b2a036e5d51be94c7f7215543ff3f163.tar.gz |
gpo: Add Cert Auto Enroll Advanced Config
Advanced configuration for Certifcate Auto
Enrollment is stored on the sysvol, and needs
to be parsed/used when provided.
Signed-off-by: David Mulder <dmulder@suse.com>
Reviewed-by: Jeremy Allison <jra@samba.org>
Autobuild-User(master): Jeremy Allison <jra@samba.org>
Autobuild-Date(master): Tue May 3 21:48:57 UTC 2022 on sn-devel-184
-rw-r--r-- | python/samba/gp_cert_auto_enroll_ext.py | 196 | ||||
-rw-r--r-- | selftest/knownfail.d/gpo | 1 |
2 files changed, 183 insertions, 14 deletions
diff --git a/python/samba/gp_cert_auto_enroll_ext.py b/python/samba/gp_cert_auto_enroll_ext.py index 45ef4de8047..b2903f3b7e0 100644 --- a/python/samba/gp_cert_auto_enroll_ext.py +++ b/python/samba/gp_cert_auto_enroll_ext.py @@ -15,9 +15,10 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os +import operator from samba.gpclass import gp_pol_ext from samba import Ldb -from ldb import SCOPE_SUBTREE +from ldb import SCOPE_SUBTREE, SCOPE_BASE from samba.auth import system_session from samba.gpclass import get_dc_hostname import base64 @@ -35,6 +36,8 @@ cert_wrap = b""" %s -----END CERTIFICATE-----""" global_trust_dir = '/etc/pki/trust/anchors' +endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \ + '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP' def octet_string_to_objectGUID(data): return '%s-%s-%s-%s-%s' % ('%02x' % struct.unpack('<L', data[0:4])[0], @@ -44,6 +47,84 @@ def octet_string_to_objectGUID(data): '%02x%02x' % struct.unpack('>HL', data[10:])) ''' +Group and Sort End Point Information +[MS-CAESO] 4.4.5.3.2.3 +In this step autoenrollment processes the end point information by grouping it +by CEP ID and sorting in the order with which it will use the end point to +access the CEP information. +''' +def group_and_sort_end_point_information(end_point_information): + # Create groups of the CertificateEnrollmentPolicyEndPoint instances that + # have the same value of the EndPoint.PolicyID datum. + end_point_groups = {} + for e in end_point_information: + if e['PolicyID'] not in end_point_groups.keys(): + end_point_groups[e['PolicyID']] = [] + end_point_groups[e['PolicyID']].append(e) + + # Sort each group by following these rules: + for end_point_group in end_point_groups.values(): + # Sort the CertificateEnrollmentPolicyEndPoint instances in ascending + # order based on the EndPoint.Cost value. + end_point_group.sort(key=lambda e: e['Cost']) + + # For instances that have the same EndPoint.Cost: + cost_list = [e['Cost'] for e in end_point_group] + costs = set(cost_list) + for cost in costs: + i = cost_list.index(cost) + j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1 + if i == j: + continue + + # Sort those that have EndPoint.Authentication equal to Kerberos + # first. Then sort those that have EndPoint.Authentication equal to + # Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint + # instances follow in an arbitrary order. + def sort_auth(e): + # 0x2 - Kerberos + if e['AuthFlags'] == 0x2: + return 0 + # 0x1 - Anonymous + elif e['AuthFlags'] == 0x1: + return 1 + else: + return 2 + end_point_group[i:j+1] = sorted(end_point_group[i:j+1], + key=sort_auth) + return list(end_point_groups.values()) + +''' +Obtaining End Point Information +[MS-CAESO] 4.4.5.3.2.2 +In this step autoenrollment initializes the +CertificateEnrollmentPolicyEndPoints table. +''' +def obtain_end_point_information(entries): + end_point_information = {} + section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\' + for e in entries: + if not e.keyname.startswith(section): + continue + name = e.keyname.replace(section, '') + if name not in end_point_information.keys(): + end_point_information[name] = {} + end_point_information[name][e.valuename] = e.data + for ca in end_point_information.values(): + m = re.match(endpoint_re, ca['URL']) + if m: + name = '%s-CA' % m.group('server').replace('.', '-') + ca['name'] = name + ca['hostname'] = m.group('server') + ca['auth'] = m.group('auth') + else: + edata = { 'endpoint': ca['URL'] } + log.error('Failed to parse the endpoint', edata) + end_point_information = \ + group_and_sort_end_point_information(end_point_information.values()) + return end_point_information + +''' Initializing CAs [MS-CAESO] 4.4.5.3.1.2 ''' @@ -228,18 +309,96 @@ class gp_cert_auto_enroll_ext(gp_pol_ext): manage = e.data & 0x2 == 1 retrive_pending = e.data & 0x4 == 1 if enroll: - url = 'ldap://%s' % get_dc_hostname(self.creds, - self.lp) - ldb = Ldb(url=url, session_info=system_session(), - lp=self.lp, credentials=self.creds) - cas = fetch_certification_authorities(ldb) - for ca in cas: - data = cert_enroll(ca, ldb, trust_dir, private_dir) - self.gp_db.store(str(self), - base64.b64encode(ca['name']).decode(), - data) + self.__enroll(pol_conf.entries, trust_dir, + private_dir) self.gp_db.commit() + ''' + Read CEP Data + [MS-CAESO] 4.4.5.3.2.4 + In this step autoenrollment initializes instances of the + CertificateEnrollmentPolicy by accessing end points associated with CEP + groups created in the previous step. + ''' + def __read_cep_data(self, ldb, end_point_information, + trust_dir, private_dir): + # For each group created in the previous step: + for end_point_group in end_point_information: + # Pick an arbitrary instance of the + # CertificateEnrollmentPolicyEndPoint from the group + e = end_point_group[0] + + # If this instance does not have the AutoEnrollmentEnabled flag set + # in the EndPoint.Flags, continue with the next group. + if not e['Flags'] & 0x10: + continue + + # If the current group contains a + # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI + # equal to "LDAP": + if any([e['URL'] == 'LDAP:' for e in end_point_group]): + # Perform an LDAP search to read the value of the objectGuid + # attribute of the root object of the forest root domain NC. If + # any errors are encountered, continue with the next group. + res = ldb.search('', SCOPE_BASE, '(objectClass=*)', + ['rootDomainNamingContext']) + if len(res) != 1: + continue + res2 = ldb.search(res[0]['rootDomainNamingContext'][0], + SCOPE_BASE, '(objectClass=*)', + ['objectGUID']) + if len(res2) != 1: + continue + + # Compare the value read in the previous step to the + # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint + # instance. If the values do not match, continue with the next + # group. + objectGUID = '{%s}' % \ + octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper() + if objectGUID != e['PolicyID']: + continue + + # For each CertificateEnrollmentPolicyEndPoint instance for that + # group: + for ca in end_point_group: + # If EndPoint.URI equals "LDAP": + if ca['URL'] == 'LDAP:': + # This is a basic configuration. + cas = fetch_certification_authorities(ldb) + for ca in cas: + data = cert_enroll(ca, ldb, trust_dir, private_dir) + self.gp_db.store(str(self), + base64.b64encode(ca['name']).decode(), + data) + # If EndPoint.URI starts with "HTTPS//": + elif ca['URL'].lower().startswith('https://'): + data = cert_enroll(ca, ldb, trust_dir, + private_dir, auth=ca['auth']) + self.gp_db.store(str(self), + base64.b64encode(ca['name'].encode()).decode(), + data) + else: + edata = { 'endpoint': ca['URL'] } + log.error('Unrecognized endpoint', edata) + + def __enroll(self, entries, trust_dir, private_dir): + url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp) + ldb = Ldb(url=url, session_info=system_session(), + lp=self.lp, credentials=self.creds) + + end_point_information = obtain_end_point_information(entries) + if len(end_point_information) > 0: + for end_point_group in end_point_information: + self.__read_cep_data(ldb, end_point_information, + trust_dir, private_dir) + else: + cas = fetch_certification_authorities(ldb) + for ca in cas: + data = cert_enroll(ca, ldb, trust_dir, private_dir) + self.gp_db.store(str(self), + base64.b64encode(ca['name']).decode(), data) + def rsop(self, gpo): output = {} pol_file = 'MACHINE/Registry.pol' @@ -258,15 +417,26 @@ class gp_cert_auto_enroll_ext(gp_pol_ext): url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp) ldb = Ldb(url=url, session_info=system_session(), lp=self.lp, credentials=self.creds) + end_point_information = \ + obtain_end_point_information(pol_conf.entries) cas = fetch_certification_authorities(ldb) + if len(end_point_information) > 0: + cas2 = [ep for sl in end_point_information for ep in sl] + if any([ca['URL'] == 'LDAP:' for ca in cas2]): + cas.extend(cas2) + else: + cas = cas2 for ca in cas: + if 'URL' in ca and ca['URL'] == 'LDAP:': + continue policy = 'Auto Enrollment Policy' cn = ca['name'] if policy not in output: output[policy] = {} output[policy][cn] = {} - output[policy][cn]['CA Certificate'] = \ - format_root_cert(ca['cACertificate']).decode() + if 'cACertificate' in ca: + output[policy][cn]['CA Certificate'] = \ + format_root_cert(ca['cACertificate']).decode() output[policy][cn]['Auto Enrollment Server'] = \ ca['hostname'] supported_templates = \ diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo deleted file mode 100644 index 5961cac5d66..00000000000 --- a/selftest/knownfail.d/gpo +++ /dev/null @@ -1 +0,0 @@ -samba.tests.gpo.samba.tests.gpo.GPOTests.test_advanced_gp_cert_auto_enroll_ext |