summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Mulder <dmulder@suse.com>2022-04-12 12:50:25 -0600
committerJeremy Allison <jra@samba.org>2022-05-03 21:48:57 +0000
commitddeedcb6b2a036e5d51be94c7f7215543ff3f163 (patch)
treee5c0e6024af962bf127bbcb2d2cc4a137d619385
parenta54d707435b1d5098028d72a2021dbb463ef1d03 (diff)
downloadsamba-ddeedcb6b2a036e5d51be94c7f7215543ff3f163.tar.gz
gpo: Add Cert Auto Enroll Advanced Config
Advanced configuration for Certifcate Auto Enrollment is stored on the sysvol, and needs to be parsed/used when provided. Signed-off-by: David Mulder <dmulder@suse.com> Reviewed-by: Jeremy Allison <jra@samba.org> Autobuild-User(master): Jeremy Allison <jra@samba.org> Autobuild-Date(master): Tue May 3 21:48:57 UTC 2022 on sn-devel-184
-rw-r--r--python/samba/gp_cert_auto_enroll_ext.py196
-rw-r--r--selftest/knownfail.d/gpo1
2 files changed, 183 insertions, 14 deletions
diff --git a/python/samba/gp_cert_auto_enroll_ext.py b/python/samba/gp_cert_auto_enroll_ext.py
index 45ef4de8047..b2903f3b7e0 100644
--- a/python/samba/gp_cert_auto_enroll_ext.py
+++ b/python/samba/gp_cert_auto_enroll_ext.py
@@ -15,9 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
+import operator
from samba.gpclass import gp_pol_ext
from samba import Ldb
-from ldb import SCOPE_SUBTREE
+from ldb import SCOPE_SUBTREE, SCOPE_BASE
from samba.auth import system_session
from samba.gpclass import get_dc_hostname
import base64
@@ -35,6 +36,8 @@ cert_wrap = b"""
%s
-----END CERTIFICATE-----"""
global_trust_dir = '/etc/pki/trust/anchors'
+endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \
+ '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'
def octet_string_to_objectGUID(data):
return '%s-%s-%s-%s-%s' % ('%02x' % struct.unpack('<L', data[0:4])[0],
@@ -44,6 +47,84 @@ def octet_string_to_objectGUID(data):
'%02x%02x' % struct.unpack('>HL', data[10:]))
'''
+Group and Sort End Point Information
+[MS-CAESO] 4.4.5.3.2.3
+In this step autoenrollment processes the end point information by grouping it
+by CEP ID and sorting in the order with which it will use the end point to
+access the CEP information.
+'''
+def group_and_sort_end_point_information(end_point_information):
+ # Create groups of the CertificateEnrollmentPolicyEndPoint instances that
+ # have the same value of the EndPoint.PolicyID datum.
+ end_point_groups = {}
+ for e in end_point_information:
+ if e['PolicyID'] not in end_point_groups.keys():
+ end_point_groups[e['PolicyID']] = []
+ end_point_groups[e['PolicyID']].append(e)
+
+ # Sort each group by following these rules:
+ for end_point_group in end_point_groups.values():
+ # Sort the CertificateEnrollmentPolicyEndPoint instances in ascending
+ # order based on the EndPoint.Cost value.
+ end_point_group.sort(key=lambda e: e['Cost'])
+
+ # For instances that have the same EndPoint.Cost:
+ cost_list = [e['Cost'] for e in end_point_group]
+ costs = set(cost_list)
+ for cost in costs:
+ i = cost_list.index(cost)
+ j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1
+ if i == j:
+ continue
+
+ # Sort those that have EndPoint.Authentication equal to Kerberos
+ # first. Then sort those that have EndPoint.Authentication equal to
+ # Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint
+ # instances follow in an arbitrary order.
+ def sort_auth(e):
+ # 0x2 - Kerberos
+ if e['AuthFlags'] == 0x2:
+ return 0
+ # 0x1 - Anonymous
+ elif e['AuthFlags'] == 0x1:
+ return 1
+ else:
+ return 2
+ end_point_group[i:j+1] = sorted(end_point_group[i:j+1],
+ key=sort_auth)
+ return list(end_point_groups.values())
+
+'''
+Obtaining End Point Information
+[MS-CAESO] 4.4.5.3.2.2
+In this step autoenrollment initializes the
+CertificateEnrollmentPolicyEndPoints table.
+'''
+def obtain_end_point_information(entries):
+ end_point_information = {}
+ section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
+ for e in entries:
+ if not e.keyname.startswith(section):
+ continue
+ name = e.keyname.replace(section, '')
+ if name not in end_point_information.keys():
+ end_point_information[name] = {}
+ end_point_information[name][e.valuename] = e.data
+ for ca in end_point_information.values():
+ m = re.match(endpoint_re, ca['URL'])
+ if m:
+ name = '%s-CA' % m.group('server').replace('.', '-')
+ ca['name'] = name
+ ca['hostname'] = m.group('server')
+ ca['auth'] = m.group('auth')
+ else:
+ edata = { 'endpoint': ca['URL'] }
+ log.error('Failed to parse the endpoint', edata)
+ end_point_information = \
+ group_and_sort_end_point_information(end_point_information.values())
+ return end_point_information
+
+'''
Initializing CAs
[MS-CAESO] 4.4.5.3.1.2
'''
@@ -228,18 +309,96 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
manage = e.data & 0x2 == 1
retrive_pending = e.data & 0x4 == 1
if enroll:
- url = 'ldap://%s' % get_dc_hostname(self.creds,
- self.lp)
- ldb = Ldb(url=url, session_info=system_session(),
- lp=self.lp, credentials=self.creds)
- cas = fetch_certification_authorities(ldb)
- for ca in cas:
- data = cert_enroll(ca, ldb, trust_dir, private_dir)
- self.gp_db.store(str(self),
- base64.b64encode(ca['name']).decode(),
- data)
+ self.__enroll(pol_conf.entries, trust_dir,
+ private_dir)
self.gp_db.commit()
+ '''
+ Read CEP Data
+ [MS-CAESO] 4.4.5.3.2.4
+ In this step autoenrollment initializes instances of the
+ CertificateEnrollmentPolicy by accessing end points associated with CEP
+ groups created in the previous step.
+ '''
+ def __read_cep_data(self, ldb, end_point_information,
+ trust_dir, private_dir):
+ # For each group created in the previous step:
+ for end_point_group in end_point_information:
+ # Pick an arbitrary instance of the
+ # CertificateEnrollmentPolicyEndPoint from the group
+ e = end_point_group[0]
+
+ # If this instance does not have the AutoEnrollmentEnabled flag set
+ # in the EndPoint.Flags, continue with the next group.
+ if not e['Flags'] & 0x10:
+ continue
+
+ # If the current group contains a
+ # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
+ # equal to "LDAP":
+ if any([e['URL'] == 'LDAP:' for e in end_point_group]):
+ # Perform an LDAP search to read the value of the objectGuid
+ # attribute of the root object of the forest root domain NC. If
+ # any errors are encountered, continue with the next group.
+ res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
+ ['rootDomainNamingContext'])
+ if len(res) != 1:
+ continue
+ res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
+ SCOPE_BASE, '(objectClass=*)',
+ ['objectGUID'])
+ if len(res2) != 1:
+ continue
+
+ # Compare the value read in the previous step to the
+ # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
+ # instance. If the values do not match, continue with the next
+ # group.
+ objectGUID = '{%s}' % \
+ octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
+ if objectGUID != e['PolicyID']:
+ continue
+
+ # For each CertificateEnrollmentPolicyEndPoint instance for that
+ # group:
+ for ca in end_point_group:
+ # If EndPoint.URI equals "LDAP":
+ if ca['URL'] == 'LDAP:':
+ # This is a basic configuration.
+ cas = fetch_certification_authorities(ldb)
+ for ca in cas:
+ data = cert_enroll(ca, ldb, trust_dir, private_dir)
+ self.gp_db.store(str(self),
+ base64.b64encode(ca['name']).decode(),
+ data)
+ # If EndPoint.URI starts with "HTTPS//":
+ elif ca['URL'].lower().startswith('https://'):
+ data = cert_enroll(ca, ldb, trust_dir,
+ private_dir, auth=ca['auth'])
+ self.gp_db.store(str(self),
+ base64.b64encode(ca['name'].encode()).decode(),
+ data)
+ else:
+ edata = { 'endpoint': ca['URL'] }
+ log.error('Unrecognized endpoint', edata)
+
+ def __enroll(self, entries, trust_dir, private_dir):
+ url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
+ ldb = Ldb(url=url, session_info=system_session(),
+ lp=self.lp, credentials=self.creds)
+
+ end_point_information = obtain_end_point_information(entries)
+ if len(end_point_information) > 0:
+ for end_point_group in end_point_information:
+ self.__read_cep_data(ldb, end_point_information,
+ trust_dir, private_dir)
+ else:
+ cas = fetch_certification_authorities(ldb)
+ for ca in cas:
+ data = cert_enroll(ca, ldb, trust_dir, private_dir)
+ self.gp_db.store(str(self),
+ base64.b64encode(ca['name']).decode(), data)
+
def rsop(self, gpo):
output = {}
pol_file = 'MACHINE/Registry.pol'
@@ -258,15 +417,26 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
ldb = Ldb(url=url, session_info=system_session(),
lp=self.lp, credentials=self.creds)
+ end_point_information = \
+ obtain_end_point_information(pol_conf.entries)
cas = fetch_certification_authorities(ldb)
+ if len(end_point_information) > 0:
+ cas2 = [ep for sl in end_point_information for ep in sl]
+ if any([ca['URL'] == 'LDAP:' for ca in cas2]):
+ cas.extend(cas2)
+ else:
+ cas = cas2
for ca in cas:
+ if 'URL' in ca and ca['URL'] == 'LDAP:':
+ continue
policy = 'Auto Enrollment Policy'
cn = ca['name']
if policy not in output:
output[policy] = {}
output[policy][cn] = {}
- output[policy][cn]['CA Certificate'] = \
- format_root_cert(ca['cACertificate']).decode()
+ if 'cACertificate' in ca:
+ output[policy][cn]['CA Certificate'] = \
+ format_root_cert(ca['cACertificate']).decode()
output[policy][cn]['Auto Enrollment Server'] = \
ca['hostname']
supported_templates = \
diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo
deleted file mode 100644
index 5961cac5d66..00000000000
--- a/selftest/knownfail.d/gpo
+++ /dev/null
@@ -1 +0,0 @@
-samba.tests.gpo.samba.tests.gpo.GPOTests.test_advanced_gp_cert_auto_enroll_ext