summaryrefslogtreecommitdiff
path: root/python/samba
diff options
context:
space:
mode:
authorDavid Mulder <dmulder@samba.org>2022-12-06 08:56:24 -0700
committerJeremy Allison <jra@samba.org>2022-12-21 02:04:37 +0000
commitf36542b50c70de74144b9e2e94e91944e8536421 (patch)
tree1ee53cf486909674f3a859e5a50abe99d14c6a0d /python/samba
parentacdc7fbe8985a16d41075b72f54a96f217c3f884 (diff)
downloadsamba-f36542b50c70de74144b9e2e94e91944e8536421.tar.gz
gp: Modify Cert Auto Enroll CSE to use new applier
Signed-off-by: David Mulder <dmulder@samba.org> Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'python/samba')
-rw-r--r--python/samba/gp/gp_cert_auto_enroll_ext.py114
1 files changed, 73 insertions, 41 deletions
diff --git a/python/samba/gp/gp_cert_auto_enroll_ext.py b/python/samba/gp/gp_cert_auto_enroll_ext.py
index f1dcf6bbafb..3be8931bf1e 100644
--- a/python/samba/gp/gp_cert_auto_enroll_ext.py
+++ b/python/samba/gp/gp_cert_auto_enroll_ext.py
@@ -17,7 +17,7 @@
import os
import operator
import requests
-from samba.gp.gpclass import gp_pol_ext
+from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
from samba import Ldb
from ldb import SCOPE_SUBTREE, SCOPE_BASE
from samba.auth import system_session
@@ -39,6 +39,7 @@ except ModuleNotFoundError:
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509 import load_der_x509_certificate
from cryptography.hazmat.backends import default_backend
+from samba.common import get_string
cert_wrap = b"""
-----BEGIN CERTIFICATE-----
@@ -153,9 +154,9 @@ def fetch_certification_authorities(ldb):
if len(res) == 0:
return result
for es in res:
- data = { 'name': es['cn'][0],
- 'hostname': es['dNSHostName'][0],
- 'cACertificate': es['cACertificate'][0]
+ data = { 'name': get_string(es['cn'][0]),
+ 'hostname': get_string(es['dNSHostName'][0]),
+ 'cACertificate': get_string(es['cACertificate'][0])
}
result.append(data)
return result
@@ -171,7 +172,7 @@ def fetch_template_attrs(ldb, name, attrs=['msPKI-Minimal-Key-Size']):
return {'msPKI-Minimal-Key-Size': ['2048']}
def format_root_cert(cert):
- cert = base64.b64encode(cert)
+ cert = base64.b64encode(cert.encode())
return cert_wrap % re.sub(b"(.{64})", b"\\1\n", cert, 0, re.DOTALL)
def find_cepces_submit():
@@ -248,7 +249,7 @@ def getca(ca, url, trust_dir):
def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
"""Install the root certificate chain."""
- data = {'files': [], 'templates': []}
+ data = dict({'files': [], 'templates': []}, **ca)
url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
root_certs = getca(ca, url, trust_dir)
data['files'].extend(root_certs)
@@ -311,10 +312,42 @@ def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
'certificate auto enrollment to work')
return json.dumps(data)
-class gp_cert_auto_enroll_ext(gp_pol_ext):
+class gp_cert_auto_enroll_ext(gp_pol_ext, gp_applier):
def __str__(self):
return 'Cryptography\AutoEnrollment'
+ def unapply(self, guid, attribute, value):
+ ca_cn = base64.b64decode(attribute)
+ data = json.loads(value)
+ getcert = which('getcert')
+ if getcert is not None:
+ Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
+ for nickname in data['templates']:
+ Popen([getcert, 'stop-tracking', '-i', nickname]).wait()
+ for f in data['files']:
+ if os.path.exists(f):
+ if os.path.exists(f):
+ os.unlink(f)
+ self.cache_remove_attribute(guid, attribute)
+
+ def apply(self, guid, ca, applier_func, *args, **kwargs):
+ attribute = base64.b64encode(ca['name'].encode()).decode()
+ # If the policy has changed, unapply, then apply new policy
+ old_val = self.cache_get_attribute_value(guid, attribute)
+ old_data = json.loads(old_val) if old_val is not None else {}
+ if all([(ca[k] == old_data[k] if k in old_data else False) \
+ for k in ca.keys()]) or \
+ self.cache_get_apply_state() == GPOSTATE.ENFORCE:
+ self.unapply(guid, attribute, old_val)
+ # If policy is already applied, skip application
+ if old_val is not None and \
+ self.cache_get_apply_state() != GPOSTATE.ENFORCE:
+ return
+
+ # Apply the policy and log the changes
+ data = applier_func(*args, **kwargs)
+ self.cache_add_attribute(guid, attribute, data)
+
def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
trust_dir=None, private_dir=None):
if trust_dir is None:
@@ -327,27 +360,13 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
os.mkdir(private_dir, mode=0o700)
for guid, settings in deleted_gpo_list:
- self.gp_db.set_guid(guid)
if str(self) in settings:
for ca_cn_enc, data in settings[str(self)].items():
- ca_cn = base64.b64decode(ca_cn_enc)
- data = json.loads(data)
- getcert = which('getcert')
- if getcert is not None:
- Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
- for nickname in data['templates']:
- Popen([getcert, 'stop-tracking',
- '-i', nickname]).wait()
- for f in data['files']:
- if os.path.exists(f):
- os.unlink(f)
- self.gp_db.delete(str(self), ca_cn_enc)
- self.gp_db.commit()
+ self.unapply(guid, ca_cn_enc, data)
for gpo in changed_gpo_list:
if gpo.file_sys_path:
section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
- self.gp_db.set_guid(gpo.name)
pol_file = 'MACHINE/Registry.pol'
path = os.path.join(gpo.file_sys_path, pol_file)
pol_conf = self.parse(path)
@@ -362,11 +381,22 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
manage = e.data & 0x2 == 0x2
retrive_pending = e.data & 0x4 == 0x4
if enroll:
- self.__enroll(pol_conf.entries, trust_dir,
- private_dir)
- self.gp_db.commit()
+ ca_names = self.__enroll(gpo.name,
+ pol_conf.entries,
+ trust_dir, private_dir)
+
+ # Cleanup any old CAs that have been removed
+ ca_attrs = [base64.b64encode(n.encode()).decode() \
+ for n in ca_names]
+ self.clean(gpo.name, keep=ca_attrs)
+ else:
+ # If enrollment has been disabled for this GPO,
+ # remove any existing policy
+ ca_attrs = \
+ self.cache_get_all_attribute_values(gpo.name)
+ self.clean(gpo.name, remove=ca_attrs)
- def __read_cep_data(self, ldb, end_point_information,
+ def __read_cep_data(self, guid, ldb, end_point_information,
trust_dir, private_dir):
"""Read CEP Data.
@@ -414,42 +444,44 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
# For each CertificateEnrollmentPolicyEndPoint instance for that
# group:
+ ca_names = []
for ca in end_point_group:
# If EndPoint.URI equals "LDAP":
if ca['URL'] == 'LDAP:':
# This is a basic configuration.
cas = fetch_certification_authorities(ldb)
for _ca in cas:
- data = cert_enroll(_ca, ldb, trust_dir, private_dir)
- self.gp_db.store(str(self),
- base64.b64encode(_ca['name']).decode(),
- data)
+ self.apply(guid, _ca, cert_enroll, _ca, ldb, trust_dir,
+ private_dir)
+ ca_names.append(_ca['name'])
# If EndPoint.URI starts with "HTTPS//":
elif ca['URL'].lower().startswith('https://'):
- data = cert_enroll(ca, ldb, trust_dir,
- private_dir, auth=ca['auth'])
- self.gp_db.store(str(self),
- base64.b64encode(ca['name'].encode()).decode(),
- data)
+ self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
+ private_dir, auth=ca['auth'])
+ ca_names.append(ca['name'])
else:
edata = { 'endpoint': ca['URL'] }
log.error('Unrecognized endpoint', edata)
+ return ca_names
- def __enroll(self, entries, trust_dir, private_dir):
+ def __enroll(self, guid, entries, trust_dir, private_dir):
url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
ldb = Ldb(url=url, session_info=system_session(),
lp=self.lp, credentials=self.creds)
+ ca_names = []
end_point_information = obtain_end_point_information(entries)
if len(end_point_information) > 0:
- self.__read_cep_data(ldb, end_point_information,
- trust_dir, private_dir)
+ ca_names.extend(self.__read_cep_data(guid, ldb,
+ end_point_information,
+ trust_dir, private_dir))
else:
cas = fetch_certification_authorities(ldb)
for ca in cas:
- data = cert_enroll(ca, ldb, trust_dir, private_dir)
- self.gp_db.store(str(self),
- base64.b64encode(ca['name']).decode(), data)
+ self.apply(guid, ca, cert_enroll, ca, ldb, trust_dir,
+ private_dir)
+ ca_names.append(ca['name'])
+ return ca_names
def rsop(self, gpo):
output = {}