summaryrefslogtreecommitdiff
path: root/source4
diff options
context:
space:
mode:
authorTim Beale <timbeale@catalyst.net.nz>2018-07-27 14:34:16 +1200
committerAndrew Bartlett <abartlet@samba.org>2018-08-17 02:58:28 +0200
commit68f8a1c2747fd51a633b34dc4301b1f6acae5de6 (patch)
treef72534cc3e3dc0e75693a7ac79283f175f9d9836 /source4
parent7065f5299f04f4a7f766f97cf90cb3c913491005 (diff)
downloadsamba-68f8a1c2747fd51a633b34dc4301b1f6acae5de6.tar.gz
Fix PEP8 warning E501 line too long
Mostly involves splitting up long strings or comments so that they span multiple lines. Some place-holder variables have been added in a few places to avoid exceeding 80 chars. Signed-off-by: Tim Beale <timbeale@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Diffstat (limited to 'source4')
-rw-r--r--source4/dsdb/tests/python/password_settings.py53
-rw-r--r--source4/torture/drs/python/getncchanges.py137
-rw-r--r--source4/torture/drs/python/link_conflicts.py133
3 files changed, 201 insertions, 122 deletions
diff --git a/source4/dsdb/tests/python/password_settings.py b/source4/dsdb/tests/python/password_settings.py
index f0dde562ea2..88d7c3d38c1 100644
--- a/source4/dsdb/tests/python/password_settings.py
+++ b/source4/dsdb/tests/python/password_settings.py
@@ -26,7 +26,8 @@
# Usage:
# export SERVER_IP=target_dc
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
-# PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN password_settings -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+# PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN \
+# password_settings -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
import samba.tests
@@ -37,6 +38,7 @@ import time
from samba.tests.password_test import PasswordTestCase
from samba.tests.pso import TestUser
from samba.tests.pso import PasswordSettings
+from samba.tests import env_get_var_value
from samba.credentials import Credentials
from samba import gensec
import base64
@@ -46,7 +48,7 @@ class PasswordSettingsTestCase(PasswordTestCase):
def setUp(self):
super(PasswordSettingsTestCase, self).setUp()
- self.host_url = "ldap://%s" % samba.tests.env_get_var_value("SERVER_IP")
+ self.host_url = "ldap://%s" % env_get_var_value("SERVER_IP")
self.ldb = samba.tests.connect_samdb(self.host_url)
# create a temp OU to put this test's users into
@@ -65,7 +67,7 @@ class PasswordSettingsTestCase(PasswordTestCase):
# remove all objects under the top-level OU
self.ldb.delete(self.ou, ["tree_delete:1"])
- # PSOs can't reside within an OU so they need to be cleaned up separately
+ # PSOs can't reside within an OU so they get cleaned up separately
for obj in self.test_objs:
self.ldb.delete(obj)
@@ -79,7 +81,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.ldb.add({"dn": dn, "objectclass": "group"})
return dn
- def set_attribute(self, dn, attr, value, operation=FLAG_MOD_ADD, samdb=None):
+ def set_attribute(self, dn, attr, value, operation=FLAG_MOD_ADD,
+ samdb=None):
"""Modifies an attribute for an object"""
if samdb is None:
samdb = self.ldb
@@ -114,7 +117,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
except ldb.LdbError as e:
(num, msg) = e.args
# fail the test (rather than throw an error)
- self.fail("Password '%s' unexpectedly rejected: %s" % (password, msg))
+ self.fail("Password '%s' unexpectedly rejected: %s" % (password,
+ msg))
def assert_PSO_applied(self, user, pso):
"""
@@ -191,7 +195,7 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.assert_password_valid(user, password)
def test_pso_basics(self):
- """Simple tests that a PSO takes effect when applied to a group or user"""
+ """Simple tests that a PSO takes effect when applied to a group/user"""
# create some PSOs that vary in priority and basic password-len
best_pso = PasswordSettings("highest-priority-PSO", self.ldb,
@@ -237,7 +241,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.assert_PSO_applied(user, best_pso)
# delete a group membership and check the PSO changes
- self.set_attribute(group3, "member", user.dn, operation=FLAG_MOD_DELETE)
+ self.set_attribute(group3, "member", user.dn,
+ operation=FLAG_MOD_DELETE)
self.assert_PSO_applied(user, medium_pso)
# apply the low-precedence PSO directly to the user
@@ -311,7 +316,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.assert_PSO_applied(user, group2_pso)
def get_guid(self, dn):
- res = self.ldb.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE)
+ res = self.ldb.search(base=dn, attrs=["objectGUID"],
+ scope=ldb.SCOPE_BASE)
return res[0]['objectGUID'][0]
def guid_string(self, guid):
@@ -333,7 +339,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
best_guid = guid_list[0]
# sanity-check the mapping between GUID and DN is correct
- self.assertEqual(self.guid_string(self.get_guid(mapping[best_guid].dn)),
+ best_pso_dn = mapping[best_guid].dn
+ self.assertEqual(self.guid_string(self.get_guid(best_pso_dn)),
self.guid_string(best_guid))
# return the PSO that this GUID corresponds to
@@ -457,7 +464,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
pso.apply_to(user.dn)
self.assertTrue(user.get_resultant_PSO() == pso.dn)
- # changing the password immediately should fail, even if password is valid
+ # changing the password immediately should fail, even if the password
+ # is valid
valid_password = "min-age-passwd"
self.assert_password_invalid(user, valid_password)
# then trying the same password later should succeed
@@ -481,8 +489,9 @@ class PasswordSettingsTestCase(PasswordTestCase):
user = self.add_user("testuser")
- # we can't wait around long enough for the max-age to expire, so instead
- # just check the msDS-UserPasswordExpiryTimeComputed for the user
+ # we can't wait around long enough for the max-age to expire, so
+ # instead just check the msDS-UserPasswordExpiryTimeComputed for
+ # the user
attrs = ['msDS-UserPasswordExpiryTimeComputed']
res = self.ldb.search(user.dn, attrs=attrs)
domain_expiry = int(res[0]['msDS-UserPasswordExpiryTimeComputed'][0])
@@ -519,9 +528,10 @@ class PasswordSettingsTestCase(PasswordTestCase):
precedence=2, password_len=10)
self.add_obj_cleanup([default_pso.dn, guest_pso.dn, admin_pso.dn,
builtin_pso.dn])
- domain_users = "CN=Domain Users,CN=Users,%s" % self.ldb.domain_dn()
- domain_guests = "CN=Domain Guests,CN=Users,%s" % self.ldb.domain_dn()
- admin_users = "CN=Domain Admins,CN=Users,%s" % self.ldb.domain_dn()
+ base_dn = self.ldb.domain_dn()
+ domain_users = "CN=Domain Users,CN=Users,%s" % base_dn
+ domain_guests = "CN=Domain Guests,CN=Users,%s" % base_dn
+ admin_users = "CN=Domain Admins,CN=Users,%s" % base_dn
# if we apply a PSO to Domain Users (which all users are a member of)
# then that PSO should take effect on a new user
@@ -532,8 +542,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
# Apply a PSO to a builtin group. 'Domain Users' should be a member of
# Builtin/Users, but builtin groups should be excluded from the PSO
# calculation, so this should have no effect
- builtin_pso.apply_to("CN=Users,CN=Builtin,%s" % self.ldb.domain_dn())
- builtin_pso.apply_to("CN=Administrators,CN=Builtin,%s" % self.ldb.domain_dn())
+ builtin_pso.apply_to("CN=Users,CN=Builtin,%s" % base_dn)
+ builtin_pso.apply_to("CN=Administrators,CN=Builtin,%s" % base_dn)
self.assert_PSO_applied(user, default_pso)
# change the user's primary group to another group (the primaryGroupID
@@ -798,7 +808,9 @@ unicodePwd:: %s
def set_domain_pwdHistoryLength(self, value):
m = ldb.Message()
m.dn = ldb.Dn(self.ldb, self.ldb.domain_dn())
- m["pwdHistoryLength"] = ldb.MessageElement(value, ldb.FLAG_MOD_REPLACE, "pwdHistoryLength")
+ m["pwdHistoryLength"] = ldb.MessageElement(value,
+ ldb.FLAG_MOD_REPLACE,
+ "pwdHistoryLength")
self.ldb.modify(m)
def test_domain_pwd_history(self):
@@ -852,8 +864,3 @@ unicodePwd:: %s
# *next* time the user's password changes.
self.set_domain_pwdHistoryLength("1")
self.assert_password_invalid(user, "NewPwd12#")
-
-
-
-
-
diff --git a/source4/torture/drs/python/getncchanges.py b/source4/torture/drs/python/getncchanges.py
index 086dcd19d4d..097866d81fc 100644
--- a/source4/torture/drs/python/getncchanges.py
+++ b/source4/torture/drs/python/getncchanges.py
@@ -24,7 +24,8 @@
# export DC1=dc1_dns_name
# export DC2=dc2_dns_name
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
-# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
+# getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
from __future__ import print_function
@@ -101,7 +102,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def start_new_repl_cycle(self):
"""Resets enough state info to start a new replication cycle"""
# reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
- # whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve
+ # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
+ # resolve
self.rxd_links = []
self.used_get_tgt = False
@@ -159,13 +161,16 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# Note that with GET_ANC Windows can end up sending the same parent
# object multiple times, so this might be noteworthy but doesn't
# warrant failing the test
- if (len(received_list) != len(expected_list)):
- print("Note: received %d objects but expected %d" % (len(received_list),
- len(expected_list)))
+ num_received = len(received_list)
+ num_expected = len(expected_list)
+ if num_received != num_expected:
+ print("Note: received %d objects but expected %d" % (num_received,
+ num_expected))
# Check that we received every object that we were expecting
for dn in expected_list:
- self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn)
+ self.assertTrue(dn in received_list,
+ "DN '%s' missing from replication." % dn)
def test_repl_integrity(self):
"""
@@ -176,8 +181,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# The server behaviour differs between samba and Windows. Samba returns
# the objects in the original order (up to the pre-modify HWM). Windows
# incorporates the modified objects and returns them in the new order
- # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
- # docs state the Windows behaviour is optional.
+ # (i.e. modified objects last), up to the post-modify HWM. The
+ # Microsoft docs state the Windows behaviour is optional.
# Create a range of objects to replicate.
expected_dn_list = self.create_object_range(0, 400)
@@ -188,11 +193,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# so long as by the end we've received everything
self.repl_get_next()
- # Modify some of the second page of objects. This should bump the highwatermark
+ # Modify some of the second page of objects. This should bump the
+ # highwatermark
for x in range(100, 200):
self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
- (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
+ (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
# Get the remaining blocks of data
@@ -223,7 +229,9 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
return parent_dn == self.ou or parent_dn in known_dn_list
def _repl_send_request(self, get_anc=False, get_tgt=False):
- """Sends a GetNCChanges request for the next block of replication data."""
+ """
+ Sends a GetNCChanges request for the next block of replication data.
+ """
# we're just trying to mimic regular client behaviour here, so just
# use the highwatermark in the last response we received
@@ -240,7 +248,7 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
more_flags = 0
if get_anc:
- replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC
+ replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
self.used_get_anc = True
if get_tgt:
@@ -252,6 +260,7 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
max_objects=self.max_objects,
highwatermark=highwatermark,
uptodateness_vector=uptodateness_vector,
+
more_flags=more_flags)
def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
@@ -298,11 +307,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
assert_links=assert_links)
- # check we know about references to any objects in the linked attritbutes
+ # check we know about references to any objects in the linked attrs
received_links = self._get_ctr6_links(ctr6)
- # This is so that older versions of Samba fail - we want the links to be
- # sent roughly with the objects, rather than getting all links at the end
+ # This is so that older versions of Samba fail - we want the links to
+ # be sent roughly with the objects, rather than getting all links at
+ # the end
if assert_links:
self.assertTrue(len(received_links) > 0,
"Links were expected in the GetNCChanges response")
@@ -363,8 +373,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def test_repl_integrity_get_anc(self):
"""
- Modify the parent objects being replicated while the replication is still
- in progress (using GET_ANC) and check that no object loss occurs.
+ Modify the parent objects being replicated while the replication is
+ still in progress (using GET_ANC) and check that no object loss occurs.
"""
# Note that GET_ANC behaviour varies between Windows and Samba.
@@ -453,13 +463,15 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# Look up the link attribute in the DB
# The extended_dn option will dump the GUID info for the link
# attribute (as a hex blob)
- res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr],
- controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE)
+ res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
+ attrs=[link_attr],
+ controls=['extended_dn:1:0'],
+ scope=ldb.SCOPE_BASE)
# We didn't find the expected link attribute in the DB for the object.
# Something has gone wrong somewhere...
- self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s"
- % (dn, link_attr))
+ self.assertTrue(link_attr in res[0],
+ "%s in DB doesn't have attribute %s" % (dn, link_attr))
# find the received link in the list and assert that the target and
# source GUIDs match what's in the DB
@@ -481,7 +493,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
break
- self.assertTrue(found, "Did not receive expected link for DN %s" % dn)
+ self.assertTrue(found,
+ "Did not receive expected link for DN %s" % dn)
def test_repl_get_tgt(self):
"""
@@ -512,7 +525,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# with GET_TGT
while not self.replication_complete():
- # get the next block of replication data (this sets GET_TGT if needed)
+ # get the next block of replication data (this sets GET_TGT
+ # if needed)
self.repl_get_next(assert_links=links_expected)
links_expected = len(self.rxd_links) < len(expected_links)
@@ -550,10 +564,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
self.modify_object(objectsA[i], "managedBy", objectsB[i])
for i in range(0, 100):
- self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100])
+ self.modify_object(objectsB[i], "managedBy",
+ objectsC[(i + 1) % 100])
for i in range(0, 100):
- self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100])
+ self.modify_object(objectsC[i], "managedBy",
+ objectsB[(i + 1) % 100])
all_objects = objectsA + objectsB + objectsC
expected_links = all_objects
@@ -567,7 +583,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# with GET_TGT
while not self.replication_complete():
- # get the next block of replication data (this sets GET_TGT if needed)
+ # get the next block of replication data (this sets GET_TGT
+ # if needed)
self.repl_get_next(assert_links=links_expected)
links_expected = len(self.rxd_links) < len(expected_links)
@@ -602,8 +619,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
self.repl_get_next(get_tgt=True)
# create the target objects and add the links. These objects should be
- # outside the scope of the Samba replication cycle, but the links should
- # still get sent with the source object
+ # outside the scope of the Samba replication cycle, but the links
+ # should still get sent with the source object
managers = self.create_object_range(0, 100, prefix="manager")
for i in range(0, 100):
@@ -643,7 +660,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# Add links from the parents to the children
for x in range(0, 100):
- self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100])
+ self.modify_object(parent_dn_list[x], "managedBy",
+ expected_dn_list[x + 100])
# add some filler objects at the end. This allows us to easily see
# which chunk the links get sent in
@@ -706,7 +724,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# with GET_TGT and GET_ANC
while not self.replication_complete():
- # get the next block of replication data (this sets GET_TGT/GET_ANC)
+ # get the next block of replication data (this sets
+ # GET_TGT/GET_ANC)
self.repl_get_next(assert_links=links_expected)
links_expected = len(self.rxd_links) < len(expected_links)
@@ -842,23 +861,31 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def restore_deleted_object(self, guid, new_dn):
"""Re-animates a deleted object"""
- res = self.test_ldb_dc.search(base="<GUID=%s>" % self._GUID_string(guid), attrs=["isDeleted"],
- controls=['show_deleted:1'], scope=ldb.SCOPE_BASE)
+ guid_str = self._GUID_string(guid)
+ res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
+ attrs=["isDeleted"],
+ controls=['show_deleted:1'],
+ scope=ldb.SCOPE_BASE)
if len(res) != 1:
return
msg = ldb.Message()
msg.dn = res[0].dn
- msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
- msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
+ msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
+ "isDeleted")
+ msg["distinguishedName"] = ldb.MessageElement([new_dn],
+ ldb.FLAG_MOD_REPLACE,
+ "distinguishedName")
self.test_ldb_dc.modify(msg, ["show_deleted:1"])
def sync_DCs(self, nc_dn=None):
# make sure DC1 has all the changes we've made to DC2
- self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn)
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
+ nc_dn=nc_dn)
def get_object_guid(self, dn):
- res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE)
+ res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
+ scope=ldb.SCOPE_BASE)
return res[0]['objectGUID'][0]
def set_dc_connection(self, conn):
@@ -921,7 +948,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
la_sources = self.create_object_range(0, 100, prefix="la_src")
la_targets = self.create_object_range(0, 100, prefix="la_tgt")
- # store the target object's GUIDs (we need to know these to reanimate them)
+ # store the target object's GUIDs (we need to know these to
+ # reanimate them)
target_guids = []
for dn in la_targets:
@@ -984,15 +1012,16 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
self.add_object(la_source)
# create the link target (a server object) in the config NC
+ sites_dn = "CN=Sites,%s" % self.config_dn
+ servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn
rand = random.randint(1, 10000000)
- la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \
- "CN=Sites,%s" % (rand, self.config_dn)
+ la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn)
self.add_object(la_target, objectclass="server")
# add a cross-partition link between the two
self.modify_object(la_source, "managedBy", la_target)
- # First, sync across to the peer the NC containing the link source object
+ # First, sync to the peer the NC containing the link source object
self.sync_DCs()
# Now, before the peer has received the partition containing the target
@@ -1002,8 +1031,9 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
while not self.replication_complete():
# pretend we've received other link targets out of order and that's
- # forced us to use GET_TGT. This checks the peer doesn't fail trying
- # to fetch a cross-partition target object that doesn't exist
+ # forced us to use GET_TGT. This checks the peer doesn't fail
+ # trying to fetch a cross-partition target object that doesn't
+ # exist
self.repl_get_next(get_tgt=True)
self.set_dc_connection(self.default_conn)
@@ -1029,14 +1059,14 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
attrs=["managedBy"],
controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE)
- self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
- % la_source)
+ self.assertFalse("managedBy" in res[0],
+ "%s in DB still has managedBy attribute" % la_source)
res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
attrs=["managedBy"],
controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE)
- self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
- % la_source)
+ self.assertFalse("managedBy" in res[0],
+ "%s in DB still has managedBy attribute" % la_source)
# Check receiving a cross-partition link to a deleted target.
# Delete the target and make sure the deletion is sync'd between DCs
@@ -1056,8 +1086,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
attrs=["managedBy"],
controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE)
- self.assertTrue("managedBy" in res[0], "%s in DB missing managedBy attribute"
- % la_source)
+ self.assertTrue("managedBy" in res[0],
+ "%s in DB missing managedBy attribute" % la_source)
# cleanup the server object we created in the Configuration partition
self.test_ldb_dc.delete(la_target)
@@ -1094,7 +1124,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# check we received all the expected objects/links
self.assert_expected_data(expected_objects)
- self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
+ self.assert_expected_links([la_source], link_attr="addressBookRoots2",
+ num_expected=500)
# Do the replication again, forcing the use of GET_TGT this time
self.init_test_state()
@@ -1113,7 +1144,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# check we received all the expected objects/links
self.assert_expected_data(expected_objects)
- self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
+ self.assert_expected_links([la_source], link_attr="addressBookRoots2",
+ num_expected=500)
class DcConnection:
@@ -1122,6 +1154,5 @@ class DcConnection:
def __init__(self, drs_base, ldb_dc, dnsname_dc):
self.ldb_dc = ldb_dc
(self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
- (self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
-
-
+ (self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
+ self.default_utdv = utdv
diff --git a/source4/torture/drs/python/link_conflicts.py b/source4/torture/drs/python/link_conflicts.py
index 6e74a56316d..4d6f05f41c1 100644
--- a/source4/torture/drs/python/link_conflicts.py
+++ b/source4/torture/drs/python/link_conflicts.py
@@ -25,7 +25,8 @@
# export DC1=dc1_dns_name
# export DC2=dc2_dns_name
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
-# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN link_conflicts -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
+# link_conflicts -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
#
import drs_base
@@ -37,6 +38,7 @@ import time
from drs_base import AbstractLink
from samba.dcerpc import drsuapi, misc
+from samba.dcerpc.drsuapi import DRSUAPI_EXOP_ERR_SUCCESS
# specifies the order to sync DCs in
DC1_TO_DC2 = 1
@@ -47,7 +49,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
super(DrsReplicaLinkConflictTestCase, self).setUp()
- self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_link_conflict")
+ self.ou = samba.tests.create_test_ou(self.ldb_dc1,
+ "test_link_conflict")
self.base_dn = self.ldb_dc1.get_default_basedn()
(self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
@@ -97,12 +100,16 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
"""Manually syncs the 2 DCs to ensure they're in sync"""
if sync_order == DC1_TO_DC2:
# sync DC1-->DC2, then DC2-->DC1
- self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1)
- self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2)
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1)
+ self._net_drs_replicate(DC=self.dnsname_dc1,
+ fromDC=self.dnsname_dc2)
else:
# sync DC2-->DC1, then DC1-->DC2
- self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2)
- self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1)
+ self._net_drs_replicate(DC=self.dnsname_dc1,
+ fromDC=self.dnsname_dc2)
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1)
def ensure_unique_timestamp(self):
"""Waits a second to ensure a unique timestamp between 2 objects"""
@@ -123,12 +130,14 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
"""
actual_len = len(res1[0][attr])
self.assertTrue(actual_len == expected_count,
- "Expected %u %s attributes, but got %u" % (expected_count,
- attr, actual_len))
+ "Expected %u %s attributes, got %u" % (expected_count,
+ attr,
+ actual_len))
actual_len = len(res2[0][attr])
self.assertTrue(actual_len == expected_count,
- "Expected %u %s attributes, but got %u" % (expected_count,
- attr, actual_len))
+ "Expected %u %s attributes, got %u" % (expected_count,
+ attr,
+ actual_len))
# check DCs both agree on the same linked attributes
for val in res1[0][attr]:
@@ -214,7 +223,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self._check_replicated_links(src_ou, [link1, link2])
def test_conflict_single_valued_link(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_conflict_single_valued_link(sync_order=DC1_TO_DC2)
self._test_conflict_single_valued_link(sync_order=DC2_TO_DC1)
@@ -248,7 +258,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assert_attrs_match(res1, res2, "managedBy", 1)
def test_duplicate_single_valued_link(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_duplicate_single_valued_link(sync_order=DC1_TO_DC2)
self._test_duplicate_single_valued_link(sync_order=DC2_TO_DC1)
@@ -267,9 +278,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# create the same user (link target) on each DC.
# Note that the GUIDs will differ between the DCs
target_dn = self.unique_dn("CN=target")
- target1_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user")
+ target1_guid = self.add_object(self.ldb_dc1, target_dn,
+ objectclass="user")
self.ensure_unique_timestamp()
- target2_guid = self.add_object(self.ldb_dc2, target_dn, objectclass="user")
+ target2_guid = self.add_object(self.ldb_dc2, target_dn,
+ objectclass="user")
# link the src group to the respective target created
self.add_link_attr(self.ldb_dc1, src_dn, "member", target_dn)
@@ -298,7 +311,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
"Expected link to conflicting target object not found")
def test_conflict_multi_valued_link(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_conflict_multi_valued_link(sync_order=DC1_TO_DC2)
self._test_conflict_multi_valued_link(sync_order=DC2_TO_DC1)
@@ -331,7 +345,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assert_attrs_match(res1, res2, "member", 1)
def test_duplicate_multi_valued_link(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_duplicate_multi_valued_link(sync_order=DC1_TO_DC2)
self._test_duplicate_multi_valued_link(sync_order=DC2_TO_DC1)
@@ -343,7 +358,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# create a common link target
target_dn = self.unique_dn("CN=target")
- target_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user")
+ target_guid = self.add_object(self.ldb_dc1, target_dn,
+ objectclass="user")
self.sync_DCs()
# create the same group (link source) on each DC.
@@ -367,7 +383,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
scope=SCOPE_BASE, attrs=["memberOf"])
src1_backlink = False
- # our test user should still be a member of 2 groups (check both DCs agree)
+ # our test user should still be a member of 2 groups (check both
+ # DCs agree)
self.assert_attrs_match(res1, res2, "memberOf", 2)
for val in res1[0]["memberOf"]:
@@ -377,10 +394,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
src1_backlink = True
self.assertTrue(src1_backlink,
- "Expected backlink to conflicting source object not found")
+ "Backlink to conflicting source object not found")
def test_conflict_backlinks(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_conflict_backlinks(sync_order=DC1_TO_DC2)
self._test_conflict_backlinks(sync_order=DC2_TO_DC1)
@@ -424,12 +442,15 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
res2 = self.ldb_dc2.search(base="<GUID=%s>" % src_guid,
scope=SCOPE_BASE, attrs=["member"])
- # our test user should still be a member of the group (check both DCs agree)
- self.assertTrue("member" in res1[0], "Expected member attribute missing")
+ # our test user should still be a member of the group (check both
+ # DCs agree)
+ self.assertTrue("member" in res1[0],
+ "Expected member attribute missing")
self.assert_attrs_match(res1, res2, "member", 1)
def test_link_deletion_conflict(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_link_deletion_conflict(sync_order=DC1_TO_DC2)
self._test_link_deletion_conflict(sync_order=DC2_TO_DC1)
@@ -440,7 +461,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
"""
target_dn = self.unique_dn("CN=target")
- target_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user")
+ target_guid = self.add_object(self.ldb_dc1, target_dn,
+ objectclass="user")
src_dn = self.unique_dn("CN=src")
src_guid = self.add_object(self.ldb_dc1, src_dn, objectclass="group")
@@ -463,9 +485,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# the object deletion should trump the link addition.
# Check the link no longer exists on the remaining object
res1 = self.ldb_dc1.search(base="<GUID=%s>" % search_guid,
- scope=SCOPE_BASE, attrs=["member", "memberOf"])
+ scope=SCOPE_BASE,
+ attrs=["member", "memberOf"])
res2 = self.ldb_dc2.search(base="<GUID=%s>" % search_guid,
- scope=SCOPE_BASE, attrs=["member", "memberOf"])
+ scope=SCOPE_BASE,
+ attrs=["member", "memberOf"])
self.assertFalse("member" in res1[0], "member attr shouldn't exist")
self.assertFalse("member" in res2[0], "member attr shouldn't exist")
@@ -473,13 +497,18 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assertFalse("memberOf" in res2[0], "member attr shouldn't exist")
def test_obj_deletion_conflict(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
- self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, del_target=True)
- self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, del_target=True)
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
+ self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2,
+ del_target=True)
+ self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1,
+ del_target=True)
# and also try deleting the source object instead of the link target
- self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, del_target=False)
- self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, del_target=False)
+ self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2,
+ del_target=False)
+ self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1,
+ del_target=False)
def _test_full_sync_link_conflict(self, sync_order):
"""
@@ -501,11 +530,19 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# Do a couple of full syncs which should resolve the conflict
# (but only for one DC)
if sync_order == DC1_TO_DC2:
- self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, full_sync=True)
- self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, full_sync=True)
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ full_sync=True)
+ self._net_drs_replicate(DC=self.dnsname_dc2,
+ fromDC=self.dnsname_dc1,
+ full_sync=True)
else:
- self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, full_sync=True)
- self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, full_sync=True)
+ self._net_drs_replicate(DC=self.dnsname_dc1,
+ fromDC=self.dnsname_dc2,
+ full_sync=True)
+ self._net_drs_replicate(DC=self.dnsname_dc1,
+ fromDC=self.dnsname_dc2,
+ full_sync=True)
# delete and re-add the link on one DC
self.del_link_attr(self.ldb_dc1, src_dn, "member", target_dn)
@@ -525,11 +562,13 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
scope=SCOPE_BASE, attrs=["member"])
# check the membership still exits (and both DCs agree)
- self.assertTrue("member" in res1[0], "Expected member attribute missing")
+ self.assertTrue("member" in res1[0],
+ "Expected member attribute missing")
self.assert_attrs_match(res1, res2, "member", 1)
def test_full_sync_link_conflict(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_full_sync_link_conflict(sync_order=DC1_TO_DC2)
self._test_full_sync_link_conflict(sync_order=DC2_TO_DC1)
@@ -586,7 +625,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
def _test_conflict_single_valued_link_deleted_loser(self, sync_order):
"""
- Tests a single-valued link conflict, where the losing link value is deleted.
+ Tests a single-valued link conflict, where the losing link value is
+ deleted.
"""
src_ou = self.unique_dn("OU=src")
src_guid = self.add_object(self.ldb_dc1, src_ou)
@@ -599,9 +639,9 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
target1_guid = self.add_object(self.ldb_dc1, target1_ou)
target2_guid = self.add_object(self.ldb_dc2, target2_ou)
- # add the links - we want the link to end up deleted on DC2, but active on
- # DC1. DC1 has the better version and DC2 has the better timestamp - the
- # better version should win
+ # add the links - we want the link to end up deleted on DC2, but active
+ # on DC1. DC1 has the better version and DC2 has the better timestamp -
+ # the better version should win
self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou)
self.del_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou)
self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou)
@@ -689,7 +729,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self._check_replicated_links(src_ou, [link1, link2])
def test_conflict_existing_single_valued_link(self):
- # repeat the test twice, to give each DC a chance to resolve the conflict
+ # repeat the test twice, to give each DC a chance to resolve
+ # the conflict
self._test_conflict_existing_single_valued_link(sync_order=DC1_TO_DC2)
self._test_conflict_existing_single_valued_link(sync_order=DC2_TO_DC1)
@@ -707,7 +748,7 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# get the link info via replication
ctr6 = self._get_replication(drsuapi.DRSUAPI_DRS_WRIT_REP,
dest_dsa=None,
- drs_error=drsuapi.DRSUAPI_EXOP_ERR_SUCCESS,
+ drs_error=DRSUAPI_EXOP_ERR_SUCCESS,
exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
highwatermark=self.zero_highwatermark(),
nc_dn_str=src_ou)
@@ -715,6 +756,6 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assertTrue(ctr6.linked_attributes_count == 1,
"DRS didn't return a link")
link = ctr6.linked_attributes[0]
- self.assertTrue(link.meta_data.version == 1,
- "Link version started from %u, not 1" % link.meta_data.version)
-
+ rcvd_version = link.meta_data.version
+ self.assertTrue(rcvd_version == 1,
+ "Link version started from %u, not 1" % rcvd_version)