diff options
Diffstat (limited to 'qpid/cpp/src/tests/acl.py')
| -rwxr-xr-x | qpid/cpp/src/tests/acl.py | 222 |
1 files changed, 215 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py index 1020a2eff6..48723bfde9 100755 --- a/qpid/cpp/src/tests/acl.py +++ b/qpid/cpp/src/tests/acl.py @@ -2065,36 +2065,242 @@ class ACLTests(TestBase010): # Connection limits #===================================== - def test_connection_limits(self): + def test_connection_limits_cli_sets_all(self): + + try: + sessiona1 = self.get_session_by_port('alice','alice', self.port_u()) + sessiona2 = self.get_session_by_port('alice','alice', self.port_u()) + except Exception, e: + self.fail("Could not create two connections for user alice: " + str(e)) + + # Third session should fail + try: + sessiona3 = self.get_session_by_port('alice','alice', self.port_u()) + self.fail("Should not be able to create third connection for user alice") + except Exception, e: + result = None + + + + def test_connection_limits_by_named_user(self): """ Test ACL control connection limits """ + aclf = self.get_acl_file() + aclf.write('quota connections 2 alice bob\n') + aclf.write('quota connections 0 evildude\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + # By username should be able to connect twice per user try: - sessiona1 = self.get_session_by_port('alice','alice', self.port_u()) - sessiona2 = self.get_session_by_port('alice','alice', self.port_u()) + sessiona1 = self.get_session('alice','alice') + sessiona2 = self.get_session('alice','alice') except Exception, e: self.fail("Could not create two connections for user alice: " + str(e)) # Third session should fail try: - sessiona3 = self.get_session_by_port('alice','alice', self.port_u()) + sessiona3 = self.get_session('alice','alice') + self.fail("Should not be able to create third connection for user alice") + except Exception, e: + result = None + + # Disconnecting should allow another session. + sessiona1.close() + try: + sessiona3 = self.get_session('alice','alice') + except Exception, e: + self.fail("Could not recreate second connection for user alice: " + str(e)) + + # By username should be able to connect twice per user + try: + sessionb1 = self.get_session('bob','bob') + sessionb2 = self.get_session('bob','bob') + except Exception, e: + self.fail("Could not create two connections for user bob: " + str(e)) + + # Third session should fail + try: + sessionb3 = self.get_session('bob','bob') + self.fail("Should not be able to create third connection for user bob") + except Exception, e: + result = None + + + # User with quota of 0 is denied + try: + sessione1 = self.get_session('evildude','evildude') + self.fail("Should not be able to create a connection for user evildude") + except Exception, e: + result = None + + + # User not named in quotas is denied + try: + sessionc1 = self.get_session('charlie','charlie') + self.fail("Should not be able to create a connection for user charlie") + except Exception, e: + result = None + + # Clean up the sessions + sessiona2.close() + sessiona3.close() + sessionb1.close() + sessionb2.close() + + + + def test_connection_limits_by_unnamed_all(self): + """ + Test ACL control connection limits + """ + aclf = self.get_acl_file() + aclf.write('quota connections 2 alice bob\n') + aclf.write('quota connections 1 all\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + # By username should be able to connect twice per user + try: + sessiona1 = self.get_session('alice','alice') + sessiona2 = self.get_session('alice','alice') + except Exception, e: + self.fail("Could not create two connections for user alice: " + str(e)) + + # Third session should fail + try: + sessiona3 = self.get_session('alice','alice') self.fail("Should not be able to create third connection for user alice") except Exception, e: result = None + # By username should be able to connect twice per user try: - sessionb1 = self.get_session_by_port('bob','bob', self.port_u()) - sessionb2 = self.get_session_by_port('bob','bob', self.port_u()) + sessionb1 = self.get_session('bob','bob') + sessionb2 = self.get_session('bob','bob') except Exception, e: self.fail("Could not create two connections for user bob: " + str(e)) + # Third session should fail try: - sessionb3 = self.get_session_by_port('bob','bob', self.port_u()) + sessionb3 = self.get_session('bob','bob') self.fail("Should not be able to create third connection for user bob") except Exception, e: result = None + # User not named in quotas gets 'all' quota + try: + sessionc1 = self.get_session('charlie','charlie') + except Exception, e: + self.fail("Could not create one connection for user charlie: " + str(e)) + + # Next session should fail + try: + sessionc2 = self.get_session('charlie','charlie') + self.fail("Should not be able to create second connection for user charlie") + except Exception, e: + result = None + + # Clean up the sessions + sessiona1.close() + sessiona2.close() + sessionb1.close() + sessionb2.close() + sessionc1.close() + + + def test_connection_limits_by_group(self): + """ + Test ACL control connection limits + """ + aclf = self.get_acl_file() + aclf.write('group stooges moe@QPID larry@QPID curly@QPID\n') + aclf.write('quota connections 2 alice bob\n') + aclf.write('quota connections 2 stooges charlie\n') + aclf.write('# user and groups may be overwritten. Should use last value\n') + aclf.write('quota connections 3 bob stooges\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + # Alice gets 2 + try: + sessiona1 = self.get_session('alice','alice') + sessiona2 = self.get_session('alice','alice') + except Exception, e: + self.fail("Could not create two connections for user alice: " + str(e)) + + # Third session should fail + try: + sessiona3 = self.get_session('alice','alice') + self.fail("Should not be able to create third connection for user alice") + except Exception, e: + result = None + + # Bob gets 3 + try: + sessionb1 = self.get_session('bob','bob') + sessionb2 = self.get_session('bob','bob') + sessionb3 = self.get_session('bob','bob') + except Exception, e: + self.fail("Could not create three connections for user bob: " + str(e)) + + # Fourth session should fail + try: + sessionb4 = self.get_session('bob','bob') + self.fail("Should not be able to create fourth connection for user bob") + except Exception, e: + result = None + + # Moe gets 3 + try: + sessionm1 = self.get_session('moe','moe') + sessionm2 = self.get_session('moe','moe') + sessionm3 = self.get_session('moe','moe') + except Exception, e: + self.fail("Could not create three connections for user moe: " + str(e)) + + # Fourth session should fail + try: + sessionb4 = self.get_session('moe','moe') + self.fail("Should not be able to create fourth connection for user ,pe") + except Exception, e: + result = None + + # User not named in quotas is denied + try: + sessions1 = self.get_session('shemp','shemp') + self.fail("Should not be able to create a connection for user shemp") + except Exception, e: + result = None + + # Clean up the sessions + sessiona1.close() + sessiona2.close() + sessionb1.close() + sessionb2.close() + sessionb3.close() + sessionm1.close() + sessionm2.close() + sessionm3.close() + + + def test_connection_limits_by_ip_address(self): + """ + Test ACL control connection limits by ip address + """ # By IP address should be able to connect twice per client address try: sessionb1 = self.get_session_by_port('alice','alice', self.port_i()) @@ -2109,6 +2315,8 @@ class ACLTests(TestBase010): except Exception, e: result = None + sessionb1.close() + sessionb2.close() #===================================== # User name substitution |
