diff options
| author | Alan Conway <aconway@apache.org> | 2011-12-06 15:56:40 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-12-06 15:56:40 +0000 |
| commit | ae0f67263950f41ce6078a9fde79be78d47f4a11 (patch) | |
| tree | c2f1105dc677a6739d3faca8e2bb860e12209329 /qpid/cpp/src/tests | |
| parent | 03d03c025427c234fedcfae3126f0092afa0e1e7 (diff) | |
| download | qpid-python-ae0f67263950f41ce6078a9fde79be78d47f4a11.tar.gz | |
QPID-3652: Fix cluster authentication.
Only allow brokers that authenticate as the cluster-username to join a cluster.
New broker first connects to a cluster broker authenticates as the cluster-username
and sends its CPG member ID to the qpid.cluster-credentials exchange.
The cluster broker that subsequently acts as updater verifies that the credentials are
valid before connecting to give the update.
NOTE 1: If you are using an ACL, the cluster-username must be allowed to
publish to the qpid.cluster-credentials exchange. E.g. in your ACL file:
acl allow foo@QPID publish exchange name=qpid.cluster-credentials
NOTE 2: This changes the cluster initialization protocol, you will
need to restart the cluster with all new version brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1210989 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/InitialStatusMap.cpp | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/cluster_authentication_soak.cpp | 2 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/cluster_tests.py | 50 |
4 files changed, 52 insertions, 26 deletions
diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp index ecbe2d4161..95806737e3 100644 --- a/qpid/cpp/src/tests/InitialStatusMap.cpp +++ b/qpid/cpp/src/tests/InitialStatusMap.cpp @@ -36,21 +36,25 @@ QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite) typedef InitialStatusMap::Status Status; -Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) { +Status activeStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(), + const framing::Array& urls=framing::Array()) +{ return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid(), - encodeMemberSet(ms)); + encodeMemberSet(ms), urls); } -Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet()) { +Status newcomerStatus(const Uuid& id=Uuid(), const MemberSet& ms=MemberSet(), + const framing::Array& urls=framing::Array()) +{ return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid(), - encodeMemberSet(ms)); + encodeMemberSet(ms), urls); } Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid(), - const MemberSet& ms=MemberSet()) + const MemberSet& ms=MemberSet(), const framing::Array& urls=framing::Array()) { - return Status(ProtocolVersion(), 0, active, start, state, stop, - encodeMemberSet(ms)); + return Status(ProtocolVersion(), 0, active, start, state, stop, + encodeMemberSet(ms), urls); } QPID_AUTO_TEST_CASE(testFirstInCluster) { diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 16d7fb0b78..12aed1f671 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -363,16 +363,20 @@ class Broker(Popen): def host_port(self): return "%s:%s" % (self.host(), self.port()) + def log_contains(self, str, timeout=1): + """Wait for str to appear in the log file up to timeout. Return true if found""" + return retry(lambda: find_in_file(str, self.log), timeout) + def log_ready(self): """Return true if the log file exists and contains a broker ready message""" if not self._log_ready: self._log_ready = find_in_file("notice Broker running", self.log) return self._log_ready - def ready(self, **kwargs): + def ready(self, timeout=5, **kwargs): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. - if not retry(self.log_ready, timeout=60): + if not retry(self.log_ready, timeout=timeout): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) # Create a connection and a session. For a cluster broker this will diff --git a/qpid/cpp/src/tests/cluster_authentication_soak.cpp b/qpid/cpp/src/tests/cluster_authentication_soak.cpp index b8e8a22693..a3271701c3 100644 --- a/qpid/cpp/src/tests/cluster_authentication_soak.cpp +++ b/qpid/cpp/src/tests/cluster_authentication_soak.cpp @@ -96,7 +96,7 @@ startBroker ( brokerVector & brokers , int brokerNumber, string const & clusterN argv.push_back (clusterArg.str()); argv.push_back ("--cluster-username=zig"); argv.push_back ("--cluster-password=zig"); - argv.push_back ("--cluster-mechanism=ANONYMOUS"); + argv.push_back ("--cluster-mechanism=PLAIN"); argv.push_back ("--sasl-config=./sasl_config"); argv.push_back ("--auth=yes"); argv.push_back ("--mgmt-enable=yes"); diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py index 0e80e06d34..2db2cdd433 100755 --- a/qpid/cpp/src/tests/cluster_tests.py +++ b/qpid/cpp/src/tests/cluster_tests.py @@ -114,7 +114,9 @@ class ShortTests(BrokerTest): sasl_config=os.path.join(self.rootdir, "sasl_config") acl=os.path.join(os.getcwd(), "policy.acl") aclf=file(acl,"w") + # Must allow cluster-user (zag) access to credentials exchange. aclf.write(""" +acl allow zag@QPID publish exchange name=qpid.cluster-credentials acl allow zig@QPID all all acl deny all all """) @@ -122,7 +124,11 @@ acl deny all all cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, "--load-module", os.getenv("ACL_LIB"), - "--acl-file", acl]) + "--acl-file", acl, + "--cluster-username=zag", + "--cluster-password=zag", + "--cluster-mechanism=PLAIN" + ]) # Valid user/password, ensure queue is created. c = cluster[0].connect(username="zig", password="zig") @@ -167,39 +173,51 @@ acl deny all all self.fail("Expected exception") except messaging.exceptions.NotFound: pass - def test_sasl_join(self): + def test_sasl_join_good(self): """Verify SASL authentication between brokers when joining a cluster.""" sasl_config=os.path.join(self.rootdir, "sasl_config") # Test with a valid username/password cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, - "--load-module", os.getenv("ACL_LIB"), "--cluster-username=zig", "--cluster-password=zig", "--cluster-mechanism=PLAIN" ]) cluster.start() - cluster.ready() - c = cluster[1].connect(username="zag", password="zag") + c = cluster[1].connect(username="zag", password="zag", mechanism="PLAIN") - # Test with an invalid username/password + def test_sasl_join_bad_password(self): + # Test with an invalid password cluster = self.cluster(1, args=["--auth", "yes", - "--sasl-config", sasl_config, - "--load-module", os.getenv("ACL_LIB"), - "--cluster-username=x", - "--cluster-password=y", + "--sasl-config", os.path.join(self.rootdir, "sasl_config"), + "--cluster-username=zig", + "--cluster-password=bad", "--cluster-mechanism=PLAIN" ]) - try: - cluster.start(expect=EXPECT_EXIT_OK) - cluster[1].ready() - self.fail("Expected exception") - except: pass + cluster.start(wait=False, expect=EXPECT_EXIT_FAIL) + assert cluster[1].log_contains("critical Unexpected error: connection-forced: Authentication failed") + + def test_sasl_join_wrong_user(self): + # Test with a valid user that is not the cluster user. + cluster = self.cluster(0, args=["--auth", "yes", + "--sasl-config", os.path.join(self.rootdir, "sasl_config")]) + cluster.start(args=["--cluster-username=zig", + "--cluster-password=zig", + "--cluster-mechanism=PLAIN" + ]) + + cluster.start(wait=False, expect=EXPECT_EXIT_FAIL, + args=["--cluster-username=zag", + "--cluster-password=zag", + "--cluster-mechanism=PLAIN" + ]) + assert cluster[1].log_contains("critical Unexpected error: unauthorized-access: unauthorized-access: Unauthorized user zag@QPID for qpid.cluster-credentials, should be zig") def test_user_id_update(self): """Ensure that user-id of an open session is updated to new cluster members""" sasl_config=os.path.join(self.rootdir, "sasl_config") - cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config,]) + cluster = self.cluster(1, args=["--auth", "yes", "--sasl-config", sasl_config, + "--cluster-mechanism=ANONYMOUS"]) c = cluster[0].connect(username="zig", password="zig") s = c.session().sender("q;{create:always}") s.send(Message("x", user_id="zig")) # Message sent before start new broker |
