summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/acl.py')
-rwxr-xr-xqpid/cpp/src/tests/acl.py222
1 files changed, 215 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 1020a2eff6..48723bfde9 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -2065,36 +2065,242 @@ class ACLTests(TestBase010):
# Connection limits
#=====================================
- def test_connection_limits(self):
+ def test_connection_limits_cli_sets_all(self):
+
+ try:
+ sessiona1 = self.get_session_by_port('alice','alice', self.port_u())
+ sessiona2 = self.get_session_by_port('alice','alice', self.port_u())
+ except Exception, e:
+ self.fail("Could not create two connections for user alice: " + str(e))
+
+ # Third session should fail
+ try:
+ sessiona3 = self.get_session_by_port('alice','alice', self.port_u())
+ self.fail("Should not be able to create third connection for user alice")
+ except Exception, e:
+ result = None
+
+
+
+ def test_connection_limits_by_named_user(self):
"""
Test ACL control connection limits
"""
+ aclf = self.get_acl_file()
+ aclf.write('quota connections 2 alice bob\n')
+ aclf.write('quota connections 0 evildude\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
# By username should be able to connect twice per user
try:
- sessiona1 = self.get_session_by_port('alice','alice', self.port_u())
- sessiona2 = self.get_session_by_port('alice','alice', self.port_u())
+ sessiona1 = self.get_session('alice','alice')
+ sessiona2 = self.get_session('alice','alice')
except Exception, e:
self.fail("Could not create two connections for user alice: " + str(e))
# Third session should fail
try:
- sessiona3 = self.get_session_by_port('alice','alice', self.port_u())
+ sessiona3 = self.get_session('alice','alice')
+ self.fail("Should not be able to create third connection for user alice")
+ except Exception, e:
+ result = None
+
+ # Disconnecting should allow another session.
+ sessiona1.close()
+ try:
+ sessiona3 = self.get_session('alice','alice')
+ except Exception, e:
+ self.fail("Could not recreate second connection for user alice: " + str(e))
+
+ # By username should be able to connect twice per user
+ try:
+ sessionb1 = self.get_session('bob','bob')
+ sessionb2 = self.get_session('bob','bob')
+ except Exception, e:
+ self.fail("Could not create two connections for user bob: " + str(e))
+
+ # Third session should fail
+ try:
+ sessionb3 = self.get_session('bob','bob')
+ self.fail("Should not be able to create third connection for user bob")
+ except Exception, e:
+ result = None
+
+
+ # User with quota of 0 is denied
+ try:
+ sessione1 = self.get_session('evildude','evildude')
+ self.fail("Should not be able to create a connection for user evildude")
+ except Exception, e:
+ result = None
+
+
+ # User not named in quotas is denied
+ try:
+ sessionc1 = self.get_session('charlie','charlie')
+ self.fail("Should not be able to create a connection for user charlie")
+ except Exception, e:
+ result = None
+
+ # Clean up the sessions
+ sessiona2.close()
+ sessiona3.close()
+ sessionb1.close()
+ sessionb2.close()
+
+
+
+ def test_connection_limits_by_unnamed_all(self):
+ """
+ Test ACL control connection limits
+ """
+ aclf = self.get_acl_file()
+ aclf.write('quota connections 2 alice bob\n')
+ aclf.write('quota connections 1 all\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ # By username should be able to connect twice per user
+ try:
+ sessiona1 = self.get_session('alice','alice')
+ sessiona2 = self.get_session('alice','alice')
+ except Exception, e:
+ self.fail("Could not create two connections for user alice: " + str(e))
+
+ # Third session should fail
+ try:
+ sessiona3 = self.get_session('alice','alice')
self.fail("Should not be able to create third connection for user alice")
except Exception, e:
result = None
+ # By username should be able to connect twice per user
try:
- sessionb1 = self.get_session_by_port('bob','bob', self.port_u())
- sessionb2 = self.get_session_by_port('bob','bob', self.port_u())
+ sessionb1 = self.get_session('bob','bob')
+ sessionb2 = self.get_session('bob','bob')
except Exception, e:
self.fail("Could not create two connections for user bob: " + str(e))
+ # Third session should fail
try:
- sessionb3 = self.get_session_by_port('bob','bob', self.port_u())
+ sessionb3 = self.get_session('bob','bob')
self.fail("Should not be able to create third connection for user bob")
except Exception, e:
result = None
+ # User not named in quotas gets 'all' quota
+ try:
+ sessionc1 = self.get_session('charlie','charlie')
+ except Exception, e:
+ self.fail("Could not create one connection for user charlie: " + str(e))
+
+ # Next session should fail
+ try:
+ sessionc2 = self.get_session('charlie','charlie')
+ self.fail("Should not be able to create second connection for user charlie")
+ except Exception, e:
+ result = None
+
+ # Clean up the sessions
+ sessiona1.close()
+ sessiona2.close()
+ sessionb1.close()
+ sessionb2.close()
+ sessionc1.close()
+
+
+ def test_connection_limits_by_group(self):
+ """
+ Test ACL control connection limits
+ """
+ aclf = self.get_acl_file()
+ aclf.write('group stooges moe@QPID larry@QPID curly@QPID\n')
+ aclf.write('quota connections 2 alice bob\n')
+ aclf.write('quota connections 2 stooges charlie\n')
+ aclf.write('# user and groups may be overwritten. Should use last value\n')
+ aclf.write('quota connections 3 bob stooges\n')
+ aclf.write('acl allow all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ # Alice gets 2
+ try:
+ sessiona1 = self.get_session('alice','alice')
+ sessiona2 = self.get_session('alice','alice')
+ except Exception, e:
+ self.fail("Could not create two connections for user alice: " + str(e))
+
+ # Third session should fail
+ try:
+ sessiona3 = self.get_session('alice','alice')
+ self.fail("Should not be able to create third connection for user alice")
+ except Exception, e:
+ result = None
+
+ # Bob gets 3
+ try:
+ sessionb1 = self.get_session('bob','bob')
+ sessionb2 = self.get_session('bob','bob')
+ sessionb3 = self.get_session('bob','bob')
+ except Exception, e:
+ self.fail("Could not create three connections for user bob: " + str(e))
+
+ # Fourth session should fail
+ try:
+ sessionb4 = self.get_session('bob','bob')
+ self.fail("Should not be able to create fourth connection for user bob")
+ except Exception, e:
+ result = None
+
+ # Moe gets 3
+ try:
+ sessionm1 = self.get_session('moe','moe')
+ sessionm2 = self.get_session('moe','moe')
+ sessionm3 = self.get_session('moe','moe')
+ except Exception, e:
+ self.fail("Could not create three connections for user moe: " + str(e))
+
+ # Fourth session should fail
+ try:
+ sessionb4 = self.get_session('moe','moe')
+ self.fail("Should not be able to create fourth connection for user ,pe")
+ except Exception, e:
+ result = None
+
+ # User not named in quotas is denied
+ try:
+ sessions1 = self.get_session('shemp','shemp')
+ self.fail("Should not be able to create a connection for user shemp")
+ except Exception, e:
+ result = None
+
+ # Clean up the sessions
+ sessiona1.close()
+ sessiona2.close()
+ sessionb1.close()
+ sessionb2.close()
+ sessionb3.close()
+ sessionm1.close()
+ sessionm2.close()
+ sessionm3.close()
+
+
+ def test_connection_limits_by_ip_address(self):
+ """
+ Test ACL control connection limits by ip address
+ """
# By IP address should be able to connect twice per client address
try:
sessionb1 = self.get_session_by_port('alice','alice', self.port_i())
@@ -2109,6 +2315,8 @@ class ACLTests(TestBase010):
except Exception, e:
result = None
+ sessionb1.close()
+ sessionb2.close()
#=====================================
# User name substitution