diff options
| author | Charles E. Rolke <chug@apache.org> | 2013-02-09 00:56:42 +0000 |
|---|---|---|
| committer | Charles E. Rolke <chug@apache.org> | 2013-02-09 00:56:42 +0000 |
| commit | ea58b84cd08114053c009ddca9b3057a895d1b7d (patch) | |
| tree | 1425bbf938bed59b7798e2efe4c295c6be1a1fb2 /qpid/cpp/src/tests/acl.py | |
| parent | b111ea9e3690b34b47289a8f78cbaa0428f45442 (diff) | |
| download | qpid-python-ea58b84cd08114053c009ddca9b3057a895d1b7d.tar.gz | |
QPID-4054 C++ Broker connection limits per user
1. Constrain maximum limits to be a few ticks below Uint16_t max to avoid inadvertent wrapping and to allow room for some named constants such as UNLIMITED.
2. Add syntax to Acl rule file
quota connections N user|group [user|group]
3. Pseudo user 'all' receives value from command line switch or from Acl rule file.
4. Named constant strings used in comparisons instead of local strings.
5. Connection counts maintained all the time to support reolad of Acl rule file that may change limits.
6. Self tests exercise all the features.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1444302 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/acl.py')
| -rwxr-xr-x | qpid/cpp/src/tests/acl.py | 222 |
1 files changed, 215 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py index 1020a2eff6..48723bfde9 100755 --- a/qpid/cpp/src/tests/acl.py +++ b/qpid/cpp/src/tests/acl.py @@ -2065,36 +2065,242 @@ class ACLTests(TestBase010): # Connection limits #===================================== - def test_connection_limits(self): + def test_connection_limits_cli_sets_all(self): + + try: + sessiona1 = self.get_session_by_port('alice','alice', self.port_u()) + sessiona2 = self.get_session_by_port('alice','alice', self.port_u()) + except Exception, e: + self.fail("Could not create two connections for user alice: " + str(e)) + + # Third session should fail + try: + sessiona3 = self.get_session_by_port('alice','alice', self.port_u()) + self.fail("Should not be able to create third connection for user alice") + except Exception, e: + result = None + + + + def test_connection_limits_by_named_user(self): """ Test ACL control connection limits """ + aclf = self.get_acl_file() + aclf.write('quota connections 2 alice bob\n') + aclf.write('quota connections 0 evildude\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + # By username should be able to connect twice per user try: - sessiona1 = self.get_session_by_port('alice','alice', self.port_u()) - sessiona2 = self.get_session_by_port('alice','alice', self.port_u()) + sessiona1 = self.get_session('alice','alice') + sessiona2 = self.get_session('alice','alice') except Exception, e: self.fail("Could not create two connections for user alice: " + str(e)) # Third session should fail try: - sessiona3 = self.get_session_by_port('alice','alice', self.port_u()) + sessiona3 = self.get_session('alice','alice') + self.fail("Should not be able to create third connection for user alice") + except Exception, e: + result = None + + # Disconnecting should allow another session. + sessiona1.close() + try: + sessiona3 = self.get_session('alice','alice') + except Exception, e: + self.fail("Could not recreate second connection for user alice: " + str(e)) + + # By username should be able to connect twice per user + try: + sessionb1 = self.get_session('bob','bob') + sessionb2 = self.get_session('bob','bob') + except Exception, e: + self.fail("Could not create two connections for user bob: " + str(e)) + + # Third session should fail + try: + sessionb3 = self.get_session('bob','bob') + self.fail("Should not be able to create third connection for user bob") + except Exception, e: + result = None + + + # User with quota of 0 is denied + try: + sessione1 = self.get_session('evildude','evildude') + self.fail("Should not be able to create a connection for user evildude") + except Exception, e: + result = None + + + # User not named in quotas is denied + try: + sessionc1 = self.get_session('charlie','charlie') + self.fail("Should not be able to create a connection for user charlie") + except Exception, e: + result = None + + # Clean up the sessions + sessiona2.close() + sessiona3.close() + sessionb1.close() + sessionb2.close() + + + + def test_connection_limits_by_unnamed_all(self): + """ + Test ACL control connection limits + """ + aclf = self.get_acl_file() + aclf.write('quota connections 2 alice bob\n') + aclf.write('quota connections 1 all\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + # By username should be able to connect twice per user + try: + sessiona1 = self.get_session('alice','alice') + sessiona2 = self.get_session('alice','alice') + except Exception, e: + self.fail("Could not create two connections for user alice: " + str(e)) + + # Third session should fail + try: + sessiona3 = self.get_session('alice','alice') self.fail("Should not be able to create third connection for user alice") except Exception, e: result = None + # By username should be able to connect twice per user try: - sessionb1 = self.get_session_by_port('bob','bob', self.port_u()) - sessionb2 = self.get_session_by_port('bob','bob', self.port_u()) + sessionb1 = self.get_session('bob','bob') + sessionb2 = self.get_session('bob','bob') except Exception, e: self.fail("Could not create two connections for user bob: " + str(e)) + # Third session should fail try: - sessionb3 = self.get_session_by_port('bob','bob', self.port_u()) + sessionb3 = self.get_session('bob','bob') self.fail("Should not be able to create third connection for user bob") except Exception, e: result = None + # User not named in quotas gets 'all' quota + try: + sessionc1 = self.get_session('charlie','charlie') + except Exception, e: + self.fail("Could not create one connection for user charlie: " + str(e)) + + # Next session should fail + try: + sessionc2 = self.get_session('charlie','charlie') + self.fail("Should not be able to create second connection for user charlie") + except Exception, e: + result = None + + # Clean up the sessions + sessiona1.close() + sessiona2.close() + sessionb1.close() + sessionb2.close() + sessionc1.close() + + + def test_connection_limits_by_group(self): + """ + Test ACL control connection limits + """ + aclf = self.get_acl_file() + aclf.write('group stooges moe@QPID larry@QPID curly@QPID\n') + aclf.write('quota connections 2 alice bob\n') + aclf.write('quota connections 2 stooges charlie\n') + aclf.write('# user and groups may be overwritten. Should use last value\n') + aclf.write('quota connections 3 bob stooges\n') + aclf.write('acl allow all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + # Alice gets 2 + try: + sessiona1 = self.get_session('alice','alice') + sessiona2 = self.get_session('alice','alice') + except Exception, e: + self.fail("Could not create two connections for user alice: " + str(e)) + + # Third session should fail + try: + sessiona3 = self.get_session('alice','alice') + self.fail("Should not be able to create third connection for user alice") + except Exception, e: + result = None + + # Bob gets 3 + try: + sessionb1 = self.get_session('bob','bob') + sessionb2 = self.get_session('bob','bob') + sessionb3 = self.get_session('bob','bob') + except Exception, e: + self.fail("Could not create three connections for user bob: " + str(e)) + + # Fourth session should fail + try: + sessionb4 = self.get_session('bob','bob') + self.fail("Should not be able to create fourth connection for user bob") + except Exception, e: + result = None + + # Moe gets 3 + try: + sessionm1 = self.get_session('moe','moe') + sessionm2 = self.get_session('moe','moe') + sessionm3 = self.get_session('moe','moe') + except Exception, e: + self.fail("Could not create three connections for user moe: " + str(e)) + + # Fourth session should fail + try: + sessionb4 = self.get_session('moe','moe') + self.fail("Should not be able to create fourth connection for user ,pe") + except Exception, e: + result = None + + # User not named in quotas is denied + try: + sessions1 = self.get_session('shemp','shemp') + self.fail("Should not be able to create a connection for user shemp") + except Exception, e: + result = None + + # Clean up the sessions + sessiona1.close() + sessiona2.close() + sessionb1.close() + sessionb2.close() + sessionb3.close() + sessionm1.close() + sessionm2.close() + sessionm3.close() + + + def test_connection_limits_by_ip_address(self): + """ + Test ACL control connection limits by ip address + """ # By IP address should be able to connect twice per client address try: sessionb1 = self.get_session_by_port('alice','alice', self.port_i()) @@ -2109,6 +2315,8 @@ class ACLTests(TestBase010): except Exception, e: result = None + sessionb1.close() + sessionb2.close() #===================================== # User name substitution |
