summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDavid Mulder <dmulder@samba.org>2023-03-14 11:21:02 -0600
committerAndrew Bartlett <abartlet@samba.org>2023-04-28 02:15:36 +0000
commita8bad5d5b859a2a76ce18919fbe2bf42f8ef7562 (patch)
tree4f57960e61470f6e38cba2aac5ee9fe1cbee18a0 /python
parent848bce061afa514a2cc340f1b8895f83129ebd1a (diff)
downloadsamba-a8bad5d5b859a2a76ce18919fbe2bf42f8ef7562.tar.gz
gpupdate: Implement get_gpo_list in python
The ADS code in libgpo is buggy. Rewrite get_gpo_list in python using SamDB. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15225 Signed-off-by: David Mulder <dmulder@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/gp/gpclass.py298
-rw-r--r--python/samba/tests/gpo.py142
2 files changed, 349 insertions, 91 deletions
diff --git a/python/samba/gp/gpclass.py b/python/samba/gp/gpclass.py
index 605f94f3317..c28de3f7134 100644
--- a/python/samba/gp/gpclass.py
+++ b/python/samba/gp/gpclass.py
@@ -44,6 +44,15 @@ from samba.gp.util.logging import log
from hashlib import blake2b
import numbers
from samba.common import get_string
+from samba.samdb import SamDB
+from samba.auth import system_session
+import ldb
+from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_SERVER_TRUST_ACCOUNT, GPLINK_OPT_ENFORCE, GPLINK_OPT_DISABLE, GPO_INHERIT, GPO_BLOCK_INHERITANCE
+from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
+from samba.dcerpc import security
+import samba.security
+from samba.dcerpc import netlogon
+
try:
from enum import Enum
@@ -577,13 +586,290 @@ def get_dc_hostname(creds, lp):
''' Fetch a list of GUIDs for applicable GPOs '''
+def get_gpo(samdb, gpo_dn):
+ g = gpo.GROUP_POLICY_OBJECT()
+ attrs = [
+ "cn",
+ "displayName",
+ "flags",
+ "gPCFileSysPath",
+ "gPCFunctionalityVersion",
+ "gPCMachineExtensionNames",
+ "gPCUserExtensionNames",
+ "gPCWQLFilter",
+ "name",
+ "nTSecurityDescriptor",
+ "versionNumber"
+ ]
+ if gpo_dn.startswith("LDAP://"):
+ gpo_dn = gpo_dn.lstrip("LDAP://")
+
+ sd_flags = (security.SECINFO_OWNER |
+ security.SECINFO_GROUP |
+ security.SECINFO_DACL)
+ try:
+ res = samdb.search(gpo_dn, ldb.SCOPE_BASE, "(objectclass=*)", attrs,
+ controls=['sd_flags:1:%d' % sd_flags])
+ except Exception:
+ log.error('Failed to fetch gpo object with nTSecurityDescriptor')
+ return
+ if res.count != 1:
+ raise ldb.LdbError(ldb.ERR_NO_SUCH_OBJECT,
+ 'get_gpo: search failed')
+
+ g.ds_path = gpo_dn
+ if 'versionNumber' in res.msgs[0].keys():
+ g.version = int(res.msgs[0]['versionNumber'][0])
+ if 'flags' in res.msgs[0].keys():
+ g.options = int(res.msgs[0]['flags'][0])
+ if 'gPCFileSysPath' in res.msgs[0].keys():
+ g.file_sys_path = res.msgs[0]['gPCFileSysPath'][0].decode()
+ if 'displayName' in res.msgs[0].keys():
+ g.display_name = res.msgs[0]['displayName'][0].decode()
+ if 'name' in res.msgs[0].keys():
+ g.name = res.msgs[0]['name'][0].decode()
+ if 'gPCMachineExtensionNames' in res.msgs[0].keys():
+ g.machine_extensions = str(res.msgs[0]['gPCMachineExtensionNames'][0])
+ if 'gPCUserExtensionNames' in res.msgs[0].keys():
+ g.user_extensions = str(res.msgs[0]['gPCUserExtensionNames'][0])
+ if 'nTSecurityDescriptor' in res.msgs[0].keys():
+ g.set_sec_desc(bytes(res.msgs[0]['nTSecurityDescriptor'][0]))
+ return g
+
+class GP_LINK:
+ def __init__(self, gPLink, gPOptions):
+ self.link_names = []
+ self.link_opts = []
+ self.gpo_parse_gplink(gPLink)
+ self.gp_opts = int(gPOptions)
+
+ def gpo_parse_gplink(self, gPLink):
+ for p in gPLink.decode().split(']'):
+ if not p:
+ continue
+ log.debug('gpo_parse_gplink: processing link')
+ p = p.lstrip('[')
+ link_name, link_opt = p.split(';')
+ log.debug('gpo_parse_gplink: link: {}'.format(link_name))
+ log.debug('gpo_parse_gplink: opt: {}'.format(link_opt))
+ self.link_names.append(link_name)
+ self.link_opts.append(int(link_opt))
+
+ def num_links(self):
+ if len(self.link_names) != len(self.link_opts):
+ raise RuntimeError('Link names and opts mismatch')
+ return len(self.link_names)
+
+def find_samaccount(samdb, samaccountname):
+ attrs = ['dn', 'userAccountControl']
+ res = samdb.search(samdb.get_default_basedn(), ldb.SCOPE_SUBTREE,
+ '(sAMAccountName={})'.format(samaccountname), attrs)
+ if res.count != 1:
+ raise ldb.LdbError(ldb.ERR_NO_SUCH_OBJECT,
+ "Failed to find samAccountName '{}'".format(samaccountname)
+ )
+ uac = int(res.msgs[0]['userAccountControl'][0])
+ dn = res.msgs[0]['dn']
+ log.info('Found dn {} for samaccountname {}'.format(dn, samaccountname))
+ return uac, dn
+
+def get_gpo_link(samdb, link_dn):
+ res = samdb.search(link_dn, ldb.SCOPE_BASE,
+ '(objectclass=*)', ['gPLink', 'gPOptions'])
+ if res.count != 1:
+ raise ldb.LdbError(ldb.ERR_NO_SUCH_OBJECT, 'get_gpo_link: no result')
+ if not 'gPLink' in res.msgs[0]:
+ raise ldb.LdbError(ldb.ERR_NO_SUCH_ATTRIBUTE,
+ "get_gpo_link: no 'gPLink' attribute found for '{}'".format(link_dn)
+ )
+ gPLink = res.msgs[0]['gPLink'][0]
+ gPOptions = 0
+ if 'gPOptions' in res.msgs[0]:
+ gPOptions = res.msgs[0]['gPOptions'][0]
+ else:
+ log.debug("get_gpo_link: no 'gPOptions' attribute found")
+ return GP_LINK(gPLink, gPOptions)
+
+def add_gplink_to_gpo_list(samdb, gpo_list, forced_gpo_list, link_dn, gp_link,
+ link_type, only_add_forced_gpos, token):
+ for i in range(gp_link.num_links()-1, -1, -1):
+ is_forced = (gp_link.link_opts[i] & GPLINK_OPT_ENFORCE) != 0
+ if gp_link.link_opts[i] & GPLINK_OPT_DISABLE:
+ log.debug('skipping disabled GPO')
+ continue
+
+ if only_add_forced_gpos:
+ if not is_forced:
+ log.debug("skipping nonenforced GPO link "
+ "because GPOPTIONS_BLOCK_INHERITANCE "
+ "has been set")
+ continue
+ else:
+ log.debug("adding enforced GPO link although "
+ "the GPOPTIONS_BLOCK_INHERITANCE "
+ "has been set")
+
+ try:
+ new_gpo = get_gpo(samdb, gp_link.link_names[i])
+ except ldb.LdbError as e:
+ (enum, estr) = e.args
+ log.debug("failed to get gpo: %s" % gp_link.link_names[i])
+ if enum == ldb.ERR_NO_SUCH_OBJECT:
+ log.debug("skipping empty gpo: %s" % gp_link.link_names[i])
+ continue
+ return
+ else:
+ try:
+ sec_desc = ndr_unpack(security.descriptor,
+ new_gpo.get_sec_desc_buf())
+ samba.security.access_check(sec_desc, token,
+ security.SEC_STD_READ_CONTROL |
+ security.SEC_ADS_LIST |
+ security.SEC_ADS_READ_PROP)
+ except Exception as e:
+ log.debug("skipping GPO \"%s\" as object "
+ "has no access to it" % new_gpo.display_name)
+ continue
+
+ new_gpo.link = str(link_dn)
+ new_gpo.link_type = link_type
+
+ if is_forced:
+ forced_gpo_list.insert(0, new_gpo)
+ else:
+ gpo_list.insert(0, new_gpo)
+
+ log.debug("add_gplink_to_gpo_list: added GPLINK #%d %s "
+ "to GPO list" % (i, gp_link.link_names[i]))
+
+def merge_nt_token(token_1, token_2):
+ sids = token_1.sids
+ sids.extend(token_2.sids)
+ token_1.sids = sids
+ token_1.rights_mask |= token_2.rights_mask
+ token_1.privilege_mask |= token_2.privilege_mask
+ return token_1
+
+def site_dn_for_machine(samdb, dc_hostname, lp, creds, hostname):
+ # [MS-GPOL] 3.2.5.1.4 Site Search
+ config_context = samdb.get_config_basedn()
+ c = netlogon.netlogon("ncacn_np:%s[seal]" % dc_hostname, lp, creds)
+ site_name = c.netr_DsRGetSiteName(hostname)
+ return 'CN={},CN=Sites,{}'.format(site_name, config_context)
+
def get_gpo_list(dc_hostname, creds, lp, username):
- gpos = []
- ads = gpo.ADS_STRUCT(dc_hostname, lp, creds)
- if ads.connect():
- # username is DOM\\SAM, but get_gpo_list expects SAM
- gpos = ads.get_gpo_list(username.split('\\')[-1])
- return gpos
+ '''Get the full list of GROUP_POLICY_OBJECTs for a given username.
+ Push GPOs to gpo_list so that the traversal order of the list matches
+ the order of application:
+ (L)ocal (S)ite (D)omain (O)rganizational(U)nit
+ For different domains and OUs: parent-to-child.
+ Within same level of domains and OUs: Link order.
+ Since GPOs are pushed to the front of gpo_list, GPOs have to be
+ pushed in the opposite order of application (OUs first, local last,
+ child-to-parent).
+ Forced GPOs are appended in the end since they override all others.
+ '''
+ gpo_list = []
+ forced_gpo_list = []
+ url = 'ldap://' + dc_hostname
+ samdb = SamDB(url=url,
+ session_info=system_session(),
+ credentials=creds, lp=lp)
+ # username is DOM\\SAM, but get_gpo_list expects SAM
+ uac, dn = find_samaccount(samdb, username.split('\\')[-1])
+ add_only_forced_gpos = False
+
+ # Fetch the security token
+ session_info_flags = (AUTH_SESSION_INFO_DEFAULT_GROUPS |
+ AUTH_SESSION_INFO_AUTHENTICATED)
+ if url.startswith('ldap'):
+ session_info_flags |= AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
+ session = samba.auth.user_session(samdb, lp_ctx=lp, dn=dn,
+ session_info_flags=session_info_flags)
+ gpo_list_machine = False
+ if uac & UF_WORKSTATION_TRUST_ACCOUNT or uac & UF_SERVER_TRUST_ACCOUNT:
+ gpo_list_machine = True
+ token = merge_nt_token(session.security_token,
+ system_session().security_token)
+ else:
+ token = session.security_token
+
+ # (O)rganizational(U)nit
+ parent_dn = dn.parent()
+ while True:
+ if str(parent_dn) == str(samdb.get_default_basedn().parent()):
+ break
+
+ # An account can be a member of more OUs
+ if parent_dn.get_component_name(0) == 'OU':
+ try:
+ log.debug("get_gpo_list: query OU: [%s] for GPOs" % parent_dn)
+ gp_link = get_gpo_link(samdb, parent_dn)
+ except ldb.LdbError as e:
+ (enum, estr) = e.args
+ log.debug(estr)
+ else:
+ add_gplink_to_gpo_list(samdb, gpo_list, forced_gpo_list,
+ parent_dn, gp_link,
+ gpo.GP_LINK_OU,
+ add_only_forced_gpos, token)
+
+ # block inheritance from now on
+ if gp_link.gp_opts & GPO_BLOCK_INHERITANCE:
+ add_only_forced_gpos = True
+
+ parent_dn = parent_dn.parent()
+
+ # (D)omain
+ parent_dn = dn.parent()
+ while True:
+ if str(parent_dn) == str(samdb.get_default_basedn().parent()):
+ break
+
+ # An account can just be a member of one domain
+ if parent_dn.get_component_name(0) == 'DC':
+ try:
+ log.debug("get_gpo_list: query DC: [%s] for GPOs" % parent_dn)
+ gp_link = get_gpo_link(samdb, parent_dn)
+ except ldb.LdbError as e:
+ (enum, estr) = e.args
+ log.debug(estr)
+ else:
+ add_gplink_to_gpo_list(samdb, gpo_list, forced_gpo_list,
+ parent_dn, gp_link,
+ gpo.GP_LINK_DOMAIN,
+ add_only_forced_gpos, token)
+
+ # block inheritance from now on
+ if gp_link.gp_opts & GPO_BLOCK_INHERITANCE:
+ add_only_forced_gpos = True
+
+ parent_dn = parent_dn.parent()
+
+ # (S)ite
+ if gpo_list_machine:
+ site_dn = site_dn_for_machine(samdb, dc_hostname, lp, creds, username)
+
+ try:
+ log.debug("get_gpo_list: query SITE: [%s] for GPOs" % site_dn)
+ gp_link = get_gpo_link(samdb, site_dn)
+ except ldb.LdbError as e:
+ (enum, estr) = e.args
+ log.debug(estr)
+ else:
+ add_gplink_to_gpo_list(samdb, gpo_list, forced_gpo_list,
+ site_dn, gp_link,
+ gpo.GP_LINK_SITE,
+ add_only_forced_gpos, token)
+
+ # (L)ocal
+ gpo_list.insert(0, gpo.GROUP_POLICY_OBJECT("Local Policy",
+ "Local Policy",
+ gpo.GP_LINK_LOCAL))
+
+ # Append |forced_gpo_list| at the end of |gpo_list|,
+ # so that forced GPOs are applied on top of non enforced GPOs.
+ return gpo_list+forced_gpo_list
def cache_gpo_dir(conn, cache, sub_dir):
diff --git a/python/samba/tests/gpo.py b/python/samba/tests/gpo.py
index 8aea59eb61a..580bf7450cb 100644
--- a/python/samba/tests/gpo.py
+++ b/python/samba/tests/gpo.py
@@ -18,7 +18,7 @@ import os, grp, pwd
import errno
from samba import gpo, tests
from samba.gp.gpclass import register_gp_extension, list_gp_extensions, \
- unregister_gp_extension, GPOStorage
+ unregister_gp_extension, GPOStorage, get_gpo_list
from samba.param import LoadParm
from samba.gp.gpclass import check_refresh_gpo_list, check_safe_path, \
check_guid, parse_gpext_conf, atomic_write_conf, get_deleted_gpos_list
@@ -5048,9 +5048,8 @@ class GPOTests(tests.TestCase):
def test_gpo_list(self):
global poldir, dspath
- ads = gpo.ADS_STRUCT(self.server, self.lp, self.creds)
- if ads.connect():
- gpos = ads.get_gpo_list(self.creds.get_username())
+ gpos = get_gpo_list(self.server, self.creds, self.lp,
+ self.creds.get_username())
guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
names = ['Local Policy', guid]
file_sys_paths = [None, '%s\\%s' % (poldir, guid)]
@@ -5088,9 +5087,8 @@ class GPOTests(tests.TestCase):
def test_check_refresh_gpo_list(self):
cache = self.lp.cache_path('gpo_cache')
- ads = gpo.ADS_STRUCT(self.server, self.lp, self.creds)
- if ads.connect():
- gpos = ads.get_gpo_list(self.creds.get_username())
+ gpos = get_gpo_list(self.server, self.creds, self.lp,
+ self.creds.get_username())
check_refresh_gpo_list(self.server, self.lp, self.creds, gpos)
self.assertTrue(os.path.exists(cache),
@@ -5207,9 +5205,8 @@ class GPOTests(tests.TestCase):
days2rel_nttime(998),
'minPwdAge policy not set')
- ads = gpo.ADS_STRUCT(self.server, self.lp, self.creds)
- if ads.connect():
- gpos = ads.get_gpo_list(self.dc_account)
+ gpos = get_gpo_list(self.server, self.creds, self.lp,
+ self.dc_account)
del_gpos = get_deleted_gpos_list(gp_db, gpos[:-1])
self.assertEqual(len(del_gpos), 1, 'Returned delete gpos is incorrect')
self.assertEqual(guids[-1], del_gpos[0][0],
@@ -5243,9 +5240,8 @@ class GPOTests(tests.TestCase):
ext = gp_krb_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Include MaxClockSkew to ensure we don't fail on a key we ignore
stage = '[Kerberos Policy]\nMaxTicketAge = %d\nMaxClockSkew = 5'
@@ -5298,9 +5294,8 @@ class GPOTests(tests.TestCase):
ext = gp_scripts_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
reg_key = b'Software\\Policies\\Samba\\Unix Settings'
sections = { b'%s\\Daily Scripts' % reg_key : '.cron.daily',
@@ -5360,9 +5355,8 @@ class GPOTests(tests.TestCase):
ext = gp_sudoers_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
stage = preg.file()
@@ -5415,9 +5409,8 @@ class GPOTests(tests.TestCase):
ext = vgp_sudoers_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the manifest.xml file with test data
stage = etree.Element('vgppolicy')
@@ -5543,9 +5536,8 @@ class GPOTests(tests.TestCase):
machine_creds.guess(self.lp)
machine_creds.set_machine_account()
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
gp_extensions = []
gp_extensions.append(gp_krb_ext)
@@ -5653,9 +5645,8 @@ class GPOTests(tests.TestCase):
machine_creds.guess(self.lp)
machine_creds.set_machine_account()
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
gp_extensions = []
gp_extensions.append(gp_krb_ext)
@@ -5736,9 +5727,8 @@ class GPOTests(tests.TestCase):
machine_creds.guess(self.lp)
machine_creds.set_machine_account()
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
entries = []
e = preg.entry()
@@ -5825,9 +5815,8 @@ class GPOTests(tests.TestCase):
ext = gp_msgs_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
stage = preg.file()
@@ -5892,9 +5881,8 @@ class GPOTests(tests.TestCase):
ext = vgp_symlink_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
with TemporaryDirectory() as dname:
test_source = os.path.join(dname, 'test.source')
@@ -5973,9 +5961,8 @@ class GPOTests(tests.TestCase):
ext = vgp_files_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the manifest.xml file with test data
with TemporaryDirectory() as dname:
@@ -6061,9 +6048,8 @@ class GPOTests(tests.TestCase):
ext = vgp_openssh_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the manifest.xml file with test data
stage = etree.Element('vgppolicy')
@@ -6135,9 +6121,8 @@ class GPOTests(tests.TestCase):
ext = vgp_startup_scripts_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the manifest.xml file with test data
stage = etree.Element('vgppolicy')
@@ -6310,9 +6295,8 @@ class GPOTests(tests.TestCase):
ext = vgp_motd_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the manifest.xml file with test data
stage = etree.Element('vgppolicy')
@@ -6363,9 +6347,8 @@ class GPOTests(tests.TestCase):
ext = vgp_issue_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the manifest.xml file with test data
stage = etree.Element('vgppolicy')
@@ -6421,9 +6404,8 @@ class GPOTests(tests.TestCase):
ext = vgp_access_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the manifest.xml allow file
stage = etree.Element('vgppolicy')
@@ -6556,9 +6538,8 @@ class GPOTests(tests.TestCase):
ext = gp_gnome_settings_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
parser = GPPolParser()
@@ -6782,9 +6763,8 @@ class GPOTests(tests.TestCase):
ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
parser = GPPolParser()
@@ -6884,9 +6864,8 @@ class GPOTests(tests.TestCase):
ext = gp_user_scripts_ext(self.lp, machine_creds,
os.environ.get('DC_USERNAME'), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
reg_key = b'Software\\Policies\\Samba\\Unix Settings'
sections = { b'%s\\Daily Scripts' % reg_key : b'@daily',
@@ -6948,9 +6927,8 @@ class GPOTests(tests.TestCase):
ext = gp_firefox_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
parser = GPPolParser()
@@ -7009,9 +6987,8 @@ class GPOTests(tests.TestCase):
ext = gp_chromium_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
parser = GPPolParser()
@@ -7121,9 +7098,8 @@ class GPOTests(tests.TestCase):
ext = gp_firewalld_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
parser = GPPolParser()
@@ -7196,9 +7172,8 @@ class GPOTests(tests.TestCase):
ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
admin_creds = Credentials()
admin_creds.set_username(os.environ.get('DC_USERNAME'))
@@ -7311,9 +7286,8 @@ class GPOTests(tests.TestCase):
ext = gp_centrify_sudoers_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
stage = preg.file()
@@ -7381,9 +7355,8 @@ class GPOTests(tests.TestCase):
ext = gp_centrify_crontab_ext(self.lp, machine_creds,
machine_creds.get_username(), store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
stage = preg.file()
@@ -7438,9 +7411,8 @@ class GPOTests(tests.TestCase):
os.environ.get('DC_USERNAME'),
store)
- ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
- if ads.connect():
- gpos = ads.get_gpo_list(machine_creds.get_username())
+ gpos = get_gpo_list(self.server, machine_creds, self.lp,
+ machine_creds.get_username())
# Stage the Registry.pol file with test data
stage = preg.file()