summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/Acl.cpp14
-rwxr-xr-xqpid/cpp/src/tests/acl.py180
2 files changed, 191 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/Acl.cpp b/qpid/cpp/src/tests/Acl.cpp
index 75a52c8ca1..9c3de0de62 100644
--- a/qpid/cpp/src/tests/Acl.cpp
+++ b/qpid/cpp/src/tests/Acl.cpp
@@ -45,6 +45,13 @@ QPID_AUTO_TEST_CASE(TestLexerObjectEnums) {
OBJ_ENUMS(OBJ_METHOD, "method");
OBJ_ENUMS(OBJ_QUERY, "query");
OBJ_ENUMS(OBJ_CONNECTION, "connection");
+ int maxLen = 0;
+ for (int i=0; i<acl::OBJECTSIZE; i++) {
+ int thisLen = AclHelper::getObjectTypeStr( ObjectType(i) ).length();
+ if (thisLen > maxLen)
+ maxLen = thisLen;
+ }
+ BOOST_CHECK_EQUAL(maxLen, acl::OBJECTTYPE_STR_WIDTH);
}
#define ACT_ENUMS(e, s) \
@@ -65,6 +72,13 @@ QPID_AUTO_TEST_CASE(TestLexerActionEnums) {
ACT_ENUMS(ACT_MOVE, "move");
ACT_ENUMS(ACT_REDIRECT, "redirect");
ACT_ENUMS(ACT_REROUTE, "reroute");
+ int maxLen = 0;
+ for (int i=0; i<acl::ACTIONSIZE; i++) {
+ int thisLen = AclHelper::getActionStr( Action(i) ).length();
+ if (thisLen > maxLen)
+ maxLen = thisLen;
+ }
+ BOOST_CHECK_EQUAL(maxLen, acl::ACTION_STR_WIDTH);
}
#define PROP_ENUMS(e, s) \
diff --git a/qpid/cpp/src/tests/acl.py b/qpid/cpp/src/tests/acl.py
index 5f5d1e01fe..75aa39295a 100755
--- a/qpid/cpp/src/tests/acl.py
+++ b/qpid/cpp/src/tests/acl.py
@@ -46,10 +46,10 @@ class ACLTests(TestBase010):
parms = {'username':user, 'password':passwd, 'sasl_mechanisms':'PLAIN'}
brokerurl="%s:%s" %(self.broker.host, self.broker.port)
connection = qpid.messaging.Connection(brokerurl, **parms)
- connection.open()
+ connection.open()
return connection
- # For connection limit tests this function
+ # For connection limit tests this function
# throws if the connection won't start
# returns a connection that the caller can close if he likes.
def get_connection(self, user, passwd):
@@ -2115,6 +2115,7 @@ class ACLTests(TestBase010):
aclf.write('acl allow all access method\n')
# this should let bob access the timestamp configuration
aclf.write('acl allow bob@QPID access broker\n')
+ aclf.write('acl allow bob@QPID update broker\n')
aclf.write('acl allow admin@QPID all all\n')
aclf.write('acl deny all all')
aclf.close()
@@ -2239,7 +2240,7 @@ class ACLTests(TestBase010):
self.LookupPublish(u, "company.topic", "private.audit.This", "allow-log")
for u in uInTest:
- for a in action_all:
+ for a in ['bind', 'unbind', 'access', 'publish']:
self.Lookup(u, a, "exchange", "company.topic", {"routingkey":"private.audit.This"}, "allow-log")
for u in uOutTest:
@@ -3709,6 +3710,179 @@ class ACLTests(TestBase010):
self.assertEqual(403,e.args[0].error_code)
self.fail("ACL should allow exchange delete request for edae3h");
+ #=====================================
+ # 'create connection' tests
+ #=====================================
+# def test_connect_mode_file_rejects_two_defaults(self):
+# """
+# Should reject a file with two connect mode statements
+# """
+# aclf = self.get_acl_file()
+# aclf.write('acl allow all create connection host=all\n')
+# aclf.write('acl allow all create connection host=all\n')
+# aclf.close()
+#
+# result = self.reload_acl()
+# if (result):
+# pass
+# else:
+# self.fail(result)
+
+ def test_connect_mode_accepts_host_spec_formats(self):
+ """
+ Should accept host specs of various forms
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow bob@QPID create connection host=all\n')
+ aclf.write('acl allow bob@QPID create connection host=1.1.1.1\n')
+ aclf.write('acl allow bob@QPID create connection host=1.1.1.1,2.2.2.2\n')
+ aclf.write('acl allow bob@QPID create connection host=localhost\n')
+ aclf.write('acl allow all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ def test_connect_mode_allow_all_mode(self):
+ """
+ Should allow one 'all', 'all'
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all create connection host=all\n')
+ aclf.write('acl allow all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+
+ def test_connect_mode_allow_all_localhost(self):
+ """
+ Should allow 'all' 'localhost'
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all create connection host=localhost\n')
+ aclf.write('acl deny all create connection host=all\n')
+ aclf.write('acl allow all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+
+ def test_connect_mode_global_deny(self):
+ """
+ Should allow 'all' 'localhost'
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all create connection host=localhost\n')
+ aclf.write('acl deny all create connection host=all\n')
+ aclf.write('acl allow all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"127.0.0.1"}, "allow")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"127.0.0.2"}, "deny")
+
+
+ def test_connect_mode_global_range(self):
+ """
+ Should allow 'all' 'localhost'
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow all create connection host=10.0.0.0,10.255.255.255\n')
+ aclf.write('acl allow all create connection host=localhost\n')
+ aclf.write('acl deny all create connection host=all\n')
+ aclf.write('acl allow all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"9.255.255.255"}, "deny")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.0"}, "allow")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.255.255.255"}, "allow")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"11.0.0.0"}, "deny")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny")
+
+
+ def test_connect_mode_nested_ranges(self):
+ """
+ Tests nested ranges for single user
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl deny-log bob@QPID create connection host=10.0.1.0,10.0.1.255\n')
+ aclf.write('acl allow-log bob@QPID create connection host=10.0.0.0,10.255.255.255\n')
+ aclf.write('acl deny-log bob@QPID create connection host=all\n')
+ aclf.write('acl allow all create connection host=localhost\n')
+ aclf.write('acl allow all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"9.255.255.255"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.0"}, "allow-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.255"}, "allow-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.1.0"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.1.255"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.2.0"}, "allow-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.255.255.255"}, "allow-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"11.0.0.0"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny-log")
+
+
+ def test_connect_mode_user_ranges(self):
+ """
+ Two user ranges should not interfere with each other
+ """
+ aclf = self.get_acl_file()
+ aclf.write('acl allow-log bob@QPID create connection host=10.0.0.0,10.255.255.255\n')
+ aclf.write('acl deny-log bob@QPID create connection host=all\n')
+ aclf.write('acl allow-log cat@QPID create connection host=192.168.0.0,192.168.255.255\n')
+ aclf.write('acl deny-log cat@QPID create connection host=all\n')
+ aclf.write('acl allow all create connection host=localhost\n')
+ aclf.write('acl allow all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('bob','bob')
+
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"9.255.255.255"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.0.0.0"}, "allow-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"10.255.255.255"}, "allow-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"11.0.0.0"}, "deny-log")
+ self.Lookup("bob@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny-log")
+ self.Lookup("cat@QPID", "create", "connection", "", {"host":"0.0.0.0"}, "deny-log")
+ self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.167.255.255"},"deny-log")
+ self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.168.0.0"}, "allow-log")
+ self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.168.255.255"},"allow-log")
+ self.Lookup("cat@QPID", "create", "connection", "", {"host":"192.169.0.0"}, "deny-log")
+ self.Lookup("cat@QPID", "create", "connection", "", {"host":"255.255.255.255"},"deny-log")
class BrokerAdmin: