summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp206
-rw-r--r--cpp/src/qpid/cluster/Connection.h20
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp1
3 files changed, 95 insertions, 132 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9a8cab24a6..08e31c184a 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -39,7 +39,6 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
-#include "qpid/framing/ClusterConnectionSecureUserIdBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
@@ -48,15 +47,6 @@
#include <boost/current_function.hpp>
-typedef boost::function<void ( std::string& )> UserIdCallback;
-
-// TODO aconway 2008-11-03:
-//
-// Refactor code for receiving an update into a separate UpdateConnection
-// class.
-//
-
-
namespace qpid {
namespace cluster {
@@ -88,10 +78,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
- secureConnection(0),
- mcastSentButNotReceived(false),
- inConnectionNegotiation(true)
-{ }
+ secureConnection(0)
+{}
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
@@ -107,9 +95,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
- secureConnection(0),
- mcastSentButNotReceived(false),
- inConnectionNegotiation(true)
+ secureConnection(0)
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
@@ -117,11 +103,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
// and initialized when the announce is received.
QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax); // Flow control
- cluster.getMulticast().mcastControl(
- ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
- connectionCtor.external.ssf,
- connectionCtor.external.authid,
- connectionCtor.external.nodict), getId());
+ init();
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -135,7 +117,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
- secureConnection = sc;
+ secureConnection = sc;
+ if (connection.get()) connection->setSecureConnection(sc);
}
void Connection::init() {
@@ -155,30 +138,33 @@ void Connection::init() {
}
if (!isCatchUp())
connection->setErrorListener(this);
- UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 );
- connection->setUserIdCallback ( fn );
}
// Called when we have consumed a read buffer to give credit to the
// connection layer to continue reading.
void Connection::giveReadCredit(int credit) {
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if (inConnectionNegotiation) {
- mcastSentButNotReceived = false;
- connectionNegotiationMonitor.notify();
- }
- }
if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
-void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) {
+void Connection::announce(
+ const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict,
+ const std::string& username, const std::string& initialFrames)
+{
QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
QPID_ASSERT(ssf == connectionCtor.external.ssf);
QPID_ASSERT(authid == connectionCtor.external.authid);
QPID_ASSERT(nodict == connectionCtor.external.nodict);
- init();
+ // Local connections are already initialized.
+ if (isShadow()) {
+ init();
+ // Play initial frames into the connection.
+ Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
+ AMQFrame frame;
+ while (frame.decode(buf))
+ connection->received(frame);
+ connection->setUserId(username);
+ }
}
Connection::~Connection() {
@@ -201,7 +187,6 @@ void Connection::received(framing::AMQFrame& f) {
if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
-
connection->received(f);
}
else { // Shadow or updated catch-up connection.
@@ -235,7 +220,7 @@ struct GiveReadCreditOnExit {
int credit;
GiveReadCreditOnExit(Connection& connection_, int credit_) :
connection(connection_), credit(credit_) {}
- ~GiveReadCreditOnExit() { connection.giveReadCredit(credit); }
+ ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); }
};
void Connection::deliverDoOutput(uint32_t limit) {
@@ -307,57 +292,76 @@ void Connection::abort() {
}
// ConnectionCodec::decode receives read buffers from directly-connected clients.
-size_t Connection::decode(const char* buffer, size_t size) {
-
- if (catchUp) { // Handle catch-up locally.
- Buffer buf(const_cast<char*>(buffer), size);
+size_t Connection::decode(const char* data, size_t size) {
+ GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default.
+ const char* ptr = data;
+ const char* end = data + size;
+ if (catchUp) { // Handle catch-up locally.
+ Buffer buf(const_cast<char*>(ptr), size);
+ ptr += size;
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
- return buf.getPosition();
}
else { // Multicast local connections.
- assert(isLocal());
- const char* remainingData = buffer;
- size_t remainingSize = size;
-
- if (expectProtocolHeader) {
- //If this is an outgoing link, we will receive a protocol
- //header which needs to be decoded first
- framing::ProtocolInitiation pi;
- Buffer buf(const_cast<char*>(buffer), size);
- if (pi.decode(buf)) {
- //TODO: check the version is correct
- QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
- expectProtocolHeader = false;
- remainingData = buffer + pi.encodedSize();
- remainingSize = size - pi.encodedSize();
- } else {
- QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
- giveReadCredit(1); // We're not going to mcast so give read credit now.
- return 0;
- }
+ assert(isLocalClient());
+ assert(connection.get());
+ if (!checkProtocolHeader(ptr, size)) // Updates ptr
+ return 0; // Incomplete header
+
+ if (!connection->isOpen())
+ processInitialFrames(ptr, end-ptr); // Updates ptr
+
+ if (connection->isOpen() && end - ptr > 0) {
+ // We're multi-casting, we will give read credit on delivery.
+ grc.credit = 0;
+ cluster.getMulticast().mcastBuffer(ptr, end - ptr, self);
+ ptr = end;
}
-
- // During connection negotiation wait for each multicast to be
- // processed before sending the next, to ensure that the
- // security layer is activated before we attempt to decode
- // encrypted frames.
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if ( inConnectionNegotiation ) {
- assert(!mcastSentButNotReceived);
- mcastSentButNotReceived = true;
- }
- }
- cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if (inConnectionNegotiation)
- while (mcastSentButNotReceived)
- connectionNegotiationMonitor.wait();
- assert(!mcastSentButNotReceived);
+ }
+ return ptr - data;
+}
+
+// Decode the protocol header if needed. Updates data and size
+// returns true if the header is complete or already read.
+bool Connection::checkProtocolHeader(const char*& data, size_t size) {
+ if (expectProtocolHeader) {
+ //If this is an outgoing link, we will receive a protocol
+ //header which needs to be decoded first
+ framing::ProtocolInitiation pi;
+ Buffer buf(const_cast<char*&>(data), size);
+ if (pi.decode(buf)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+ expectProtocolHeader = false;
+ data += pi.encodedSize();
+ } else {
+ return false;
}
- return size;
+ }
+ return true;
+}
+
+void Connection::processInitialFrames(const char*& ptr, size_t size) {
+ // Process the initial negotiation locally and store it so
+ // it can be replayed on other brokers in announce()
+ Buffer buf(const_cast<char*>(ptr), size);
+ framing::AMQFrame frame;
+ while (!connection->isOpen() && frame.decode(buf))
+ received(frame);
+ initialFrames.append(ptr, buf.getPosition());
+ ptr += buf.getPosition();
+ if (connection->isOpen()) { // initial negotiation complete
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionAnnounceBody(
+ ProtocolVersion(),
+ connectionCtor.mgmtId,
+ connectionCtor.external.ssf,
+ connectionCtor.external.authid,
+ connectionCtor.external.nodict,
+ connection->getUserId(),
+ initialFrames),
+ getId());
+ initialFrames.clear();
}
}
@@ -574,21 +578,14 @@ void Connection::queue(const std::string& encoded) {
}
void Connection::sessionError(uint16_t , const std::string& msg) {
- // If we are negotiating the connection when it fails just close the connectoin.
- // If it fails after that then we have to flag the error to the cluster.
- if (inConnectionNegotiation)
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
- else
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
-
}
void Connection::connectionError(const std::string& msg) {
- // If we are negotiating the connection when it fails just close the connectoin.
- // If it fails after that then we have to flag the error to the cluster.
- if (inConnectionNegotiation)
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
- else
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
}
@@ -630,30 +627,5 @@ void Connection::managementAgents(const std::string& data) {
QPID_LOG(debug, cluster << " updated management agents");
}
-
-void Connection::mcastUserId ( std::string & id ) {
- // Only the directly connected broker will mcast the secure user id, and only
- // for client connections (not update connections)
- if (isLocalClient())
- cluster.getMulticast().mcastControl(
- ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() );
- {
- // This call signals the end of the connection negotiation phase.
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- inConnectionNegotiation = false;
- mcastSentButNotReceived = false;
- connectionNegotiationMonitor.notify();
- }
-}
-
-// All connections, shadow or not, get this call.
-void Connection::secureUserId(const std::string& id) {
- // Only set the user ID on shadow connections, and only if id is not the empty string.
- if ( isShadow() && !id.empty() )
- connection->setUserId ( id );
-}
-
-
-
}} // Namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 4f69bf7cf4..70c4d0e2a3 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -164,8 +164,9 @@ class Connection :
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
- void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict);
- void secureUserId(const std::string&);
+ void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid,
+ bool nodict, const std::string& username,
+ const std::string& initFrames);
void abort();
void deliverClose();
@@ -175,16 +176,8 @@ class Connection :
void managementSchema(const std::string& data);
void managementAgents(const std::string& data);
void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
-
- //uint32_t getSsf() const { return connectionCtor.external.ssf; }
-
void setSecureConnection ( broker::SecureConnection * sc );
- // This is a callback, registered with the broker connection.
- // It gives me the user ID, if one is negotiated through Sasl.
- void mcastUserId ( std::string & );
-
-
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -228,6 +221,8 @@ class Connection :
bool checkUnsupported(const framing::AMQBody& body);
void deliverDoOutput(uint32_t limit);
+ bool checkProtocolHeader(const char*& data, size_t size);
+ void processInitialFrames(const char*& data, size_t size);
boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
broker::SemanticState& semanticState();
@@ -247,13 +242,10 @@ class Connection :
McastFrameHandler mcastFrameHandler;
UpdateReceiver& updateIn;
qpid::broker::SecureConnection* secureConnection;
+ std::string initialFrames;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
- mutable sys::Monitor connectionNegotiationMonitor;
- bool mcastSentButNotReceived;
- bool inConnectionNegotiation;
-
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index d57ff76941..8916de9628 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -61,7 +61,6 @@ void Multicaster::mcast(const Event& e) {
QPID_LOG(trace, "MCAST " << e);
if (bypass) { // direct, don't queue
iovec iov = e.toIovec();
- // FIXME aconway 2010-03-10: should do limited retry.
while (!cpg.mcast(&iov, 1))
;
}