diff options
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/Acl.cpp | 14 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/acl.py | 180 |
2 files changed, 191 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/Acl.cpp b/qpid/cpp/src/tests/Acl.cpp index 75a52c8ca1..9c3de0de62 100644 --- a/qpid/cpp/src/tests/Acl.cpp +++ b/qpid/cpp/src/tests/Acl.cpp @@ -45,6 +45,13 @@ QPID_AUTO_TEST_CASE(TestLexerObjectEnums) { OBJ_ENUMS(OBJ_METHOD, "method"); OBJ_ENUMS(OBJ_QUERY, "query"); OBJ_ENUMS(OBJ_CONNECTION, "connection"); + int maxLen = 0; + for (int i=0; i<acl::OBJECTSIZE; i++) { + int thisLen = AclHelper::getObjectTypeStr( ObjectType(i) ).length(); + if (thisLen > maxLen) + maxLen = thisLen; + } + BOOST_CHECK_EQUAL(maxLen, acl::OBJECTTYPE_STR_WIDTH); } #define ACT_ENUMS(e, s) \ @@ -65,6 +72,13 @@ QPID_AUTO_TEST_CASE(TestLexerActionEnums) { ACT_ENUMS(ACT_MOVE, "move"); ACT_ENUMS(ACT_REDIRECT, "redirect"); ACT_ENUMS(ACT_REROUTE, "reroute"); + int maxLen = 0; + for (int i=0; i<acl::ACTIONSIZE; i++) { + int thisLen = AclHelper::getActionStr( Action(i) ).length(); + if (thisLen > maxLen) + maxLen = thisLen; + } + BOOST_CHECK_EQUAL(maxLen, acl::ACTION_STR_WIDTH); } #define PROP_ENUMS(e, s) \ diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py index 5f5d1e01fe..75aa39295a 100755 --- a/qpid/cpp/src/tests/acl.py +++ b/qpid/cpp/src/tests/acl.py @@ -46,10 +46,10 @@ class ACLTests(TestBase010): parms = {'username':user, 'password':passwd, 'sasl_mechanisms':'PLAIN'} brokerurl="%s:%s" %(self.broker.host, self.broker.port) connection = qpid.messaging.Connection(brokerurl, **parms) - connection.open() + connection.open() return connection - # For connection limit tests this function + # For connection limit tests this function # throws if the connection won't start # returns a connection that the caller can close if he likes. def get_connection(self, user, passwd): @@ -2115,6 +2115,7 @@ class ACLTests(TestBase010): aclf.write('acl allow all access method\n') # this should let bob access the timestamp configuration aclf.write('acl allow bob@QPID access broker\n') + aclf.write('acl allow bob@QPID update broker\n') aclf.write('acl allow admin@QPID all all\n') aclf.write('acl deny all all') aclf.close() @@ -2239,7 +2240,7 @@ class ACLTests(TestBase010): self.LookupPublish(u, "company.topic", "private.audit.This", "allow-log") for u in uInTest: - for a in action_all: + for a in ['bind', 'unbind', 'access', 'publish']: self.Lookup(u, a, "exchange", "company.topic", {"routingkey":"private.audit.This"}, "allow-log") for u in uOutTest: @@ -3709,6 +3710,179 @@ class ACLTests(TestBase010): self.assertEqual(403,e.args[0].error_code) self.fail("ACL should allow exchange delete request for edae3h"); + #===================================== + # 'create connection' tests + #===================================== +# def test_connect_mode_file_rejects_two_defaults(self): +# """ +# Should reject a file with two connect mode statements +# """ +# aclf = self.get_acl_file() +# aclf.write('acl allow all create connection host=all\n') +# aclf.write('acl allow all create connection host=all\n') +# aclf.close() +# +# result = self.reload_acl() +# if (result): +# pass +# else: +# self.fail(result) + + def test_connect_mode_accepts_host_spec_formats(self): + """ + Should accept host specs of various forms + """ + aclf = self.get_acl_file() + aclf.write('acl allow bob@QPID create connection host=all\n') + aclf.write('acl allow bob@QPID create connection host=1.1.1.1\n') + aclf.write('acl allow bob@QPID create connection host=1.1.1.1,2.2.2.2\n') + aclf.write('acl allow bob@QPID create connection host=localhost\n') + aclf.write('acl allow all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + def test_connect_mode_allow_all_mode(self): + """ + Should allow one 'all', 'all' + """ + aclf = self.get_acl_file() + aclf.write('acl allow all create connection host=all\n') + aclf.write('acl allow all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + + def test_connect_mode_allow_all_localhost(self): + """ + Should allow 'all' 'localhost' + """ + aclf = self.get_acl_file() + aclf.write('acl allow all create connection host=localhost\n') + aclf.write('acl deny all create connection host=all\n') + aclf.write('acl allow all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + + def test_connect_mode_global_deny(self): + """ + Should allow 'all' 'localhost' + """ + aclf = self.get_acl_file() + aclf.write('acl allow all create connection host=localhost\n') + aclf.write('acl deny all create connection host=all\n') + aclf.write('acl allow all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + self.Lookup("bob@QPID", "create", "connection", "", {"host":"127.0.0.1"}, "allow") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"127.0.0.2"}, "deny") + + + def test_connect_mode_global_range(self): + """ + Should allow 'all' 'localhost' + """ + aclf = self.get_acl_file() + aclf.write('acl allow all create connection host=10.0.0.0,10.255.255.255\n') + aclf.write('acl allow all create connection host=localhost\n') + aclf.write('acl deny all create connection host=all\n') + aclf.write('acl allow all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + self.Lookup("bob@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"9.255.255.255"}, "deny") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.0"}, "allow") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.255.255.255"}, "allow") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"11.0.0.0"}, "deny") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny") + + + def test_connect_mode_nested_ranges(self): + """ + Tests nested ranges for single user + """ + aclf = self.get_acl_file() + aclf.write('acl deny-log bob@QPID create connection host=10.0.1.0,10.0.1.255\n') + aclf.write('acl allow-log bob@QPID create connection host=10.0.0.0,10.255.255.255\n') + aclf.write('acl deny-log bob@QPID create connection host=all\n') + aclf.write('acl allow all create connection host=localhost\n') + aclf.write('acl allow all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + self.Lookup("bob@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"9.255.255.255"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.0"}, "allow-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.255"}, "allow-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.1.0"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.1.255"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.2.0"}, "allow-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.255.255.255"}, "allow-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"11.0.0.0"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny-log") + + + def test_connect_mode_user_ranges(self): + """ + Two user ranges should not interfere with each other + """ + aclf = self.get_acl_file() + aclf.write('acl allow-log bob@QPID create connection host=10.0.0.0,10.255.255.255\n') + aclf.write('acl deny-log bob@QPID create connection host=all\n') + aclf.write('acl allow-log cat@QPID create connection host=192.168.0.0,192.168.255.255\n') + aclf.write('acl deny-log cat@QPID create connection host=all\n') + aclf.write('acl allow all create connection host=localhost\n') + aclf.write('acl allow all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('bob','bob') + + self.Lookup("bob@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"9.255.255.255"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.0"}, "allow-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.255.255.255"}, "allow-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"11.0.0.0"}, "deny-log") + self.Lookup("bob@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny-log") + self.Lookup("cat@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny-log") + self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.167.255.255"},"deny-log") + self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.168.0.0"}, "allow-log") + self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.168.255.255"},"allow-log") + self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.169.0.0"}, "deny-log") + self.Lookup("cat@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny-log") class BrokerAdmin: |
