summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-04-11 14:29:04 +0000
committerAlan Conway <aconway@apache.org>2009-04-11 14:29:04 +0000
commit769416f61343b6458529f023164b6ebb837eec3c (patch)
treed38809248e0d8814734fa89bd097774fa60cc5dd /cpp/src/qpid/broker
parent99d89b32f80599872df73a8f1999acd57aa37748 (diff)
downloadqpid-python-769416f61343b6458529f023164b6ebb837eec3c.tar.gz
Fix issues when cluster is run with persistence enabled.
- Handle partial failures (e.g. due to disk error): failing brokers shut down, others continue. - Enable persistence in cluster tests. - Correct message status in DeliveryRecord updates. - Remove qpid.update queue when update complete - avoid it becoming persistent git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@764204 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--cpp/src/qpid/broker/Connection.h17
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp3
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp14
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h3
5 files changed, 34 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index b06e06d353..365b3ccbeb 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -57,7 +57,8 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
mgmtObject(0),
links(broker_.getLinks()),
agent(0),
- timer(broker_.getTimer())
+ timer(broker_.getTimer()),
+ errorListener(0)
{
Manageable* parent = broker.GetVhostObject();
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index b659fe6468..e67cdce681 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -66,6 +66,17 @@ class Connection : public sys::ConnectionInputHandler,
public RefCounted
{
public:
+ /**
+ * Listener that can be registered with a Connection to be informed of errors.
+ */
+ class ErrorListener
+ {
+ public:
+ virtual ~ErrorListener() {}
+ virtual void sessionError(uint16_t channel, const std::string&) = 0;
+ virtual void connectionError(const std::string&) = 0;
+ };
+
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
~Connection ();
@@ -101,6 +112,9 @@ class Connection : public sys::ConnectionInputHandler,
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
void setFederationLink(bool b);
+ /** Connection does not delete the listener. 0 resets. */
+ void setErrorListener(ErrorListener* l) { errorListener=l; }
+ ErrorListener* getErrorListener() { return errorListener; }
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
@@ -112,6 +126,7 @@ class Connection : public sys::ConnectionInputHandler,
void sendClose();
void setSecureConnection(SecureConnection* secured);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -128,6 +143,8 @@ class Connection : public sys::ConnectionInputHandler,
management::ManagementAgent* agent;
Timer& timer;
boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ ErrorListener* errorListener;
+
public:
qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
};
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 63212c7794..8b70836da0 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -64,13 +64,16 @@ void ConnectionHandler::heartbeat()
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
+ Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
try{
if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
handler->connection.getChannel(frame.getChannel()).in(frame);
}
}catch(ConnectionException& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(e.code, e.what());
}catch(std::exception& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(541/*internal error*/, e.what());
}
}
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 442c3eb34b..ca1f875991 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -45,14 +45,20 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::channelException(framing::session::DetachCode, const std::string&) {
- handleDetach();
-}
-
void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ // NOTE: must tell the error listener _before_ calling connection.close()
+ if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
connection.close(code, msg);
}
+void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
+void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index ffc032f64c..ca6d6bb193 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -73,8 +73,9 @@ class SessionHandler : public amqp_0_10::SessionHandler {
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
virtual framing::FrameHandler* getInHandler();
- virtual void channelException(framing::session::DetachCode code, const std::string& msg);
virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ virtual void channelException(framing::session::DetachCode, const std::string& msg);
+ virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
virtual void detaching();
virtual void readyToSend();