summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-22 19:08:47 +0000
committerAlan Conway <aconway@apache.org>2008-09-22 19:08:47 +0000
commit5028ba1a330f86f4f53fdeaa89d3564435086b29 (patch)
tree6ea93770958c647bc7c5abc200707b80905a7819 /cpp/src/qpid
parentf31ce3a04e59da295f397379025809c16ee1258d (diff)
downloadqpid-python-5028ba1a330f86f4f53fdeaa89d3564435086b29.tar.gz
Fixed error handling session-busy condition on broker.
Added accessors to iterate over broker::SemanticState consumers. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.cpp12
-rw-r--r--cpp/src/qpid/amqp_0_10/SessionHandler.h1
-rw-r--r--cpp/src/qpid/broker/SemanticState.h10
-rw-r--r--cpp/src/qpid/broker/SessionState.h2
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp8
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h4
-rw-r--r--cpp/src/qpid/sys/AggregateOutput.h9
7 files changed, 39 insertions, 7 deletions
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 8bf12d248a..c9bb57a13e 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -83,9 +83,7 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
- if (getState())
- peer.detached(getState()->getId().getName(), e.code);
- channelException(e.code, e.getMessage());
+ peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
QPID_LOG(error, "Connection exception: " << e.what());
@@ -126,11 +124,15 @@ void SessionHandler::checkName(const std::string& name) {
<< ", expecting: " << getState()->getId().getName()));
}
-void SessionHandler::attach(const std::string& name, bool force) {
+void SessionHandler::attach(const std::string& name_, bool force) {
+ // Save the name for possible session-busy exception. Session-busy
+ // can be thrown before we have attached the handler to a valid
+ // SessionState, and in that case we need the name to send peer.detached
+ name = name_;
if (getState() && name == getState()->getId().getName())
return; // Idempotent
if (getState())
- throw SessionBusyException(
+ throw TransportBusyException(
QPID_MSG("Channel " << channel.get() << " already attached to " << getState()->getId()));
setState(name, force);
QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId());
diff --git a/cpp/src/qpid/amqp_0_10/SessionHandler.h b/cpp/src/qpid/amqp_0_10/SessionHandler.h
index ccbe597bfc..684258bbae 100644
--- a/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -106,6 +106,7 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
Peer peer;
bool ignoring;
bool sendReady, receiveReady;
+ std::string name;
private:
void sendCommandPoint(const SessionPoint&);
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 94bd929adc..0c56885f8f 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -45,6 +45,7 @@
#include <vector>
#include <boost/intrusive_ptr.hpp>
+#include <boost/cast.hpp>
namespace qpid {
namespace broker {
@@ -58,6 +59,7 @@ class SessionContext;
class SemanticState : public sys::OutputTask,
private boost::noncopyable
{
+ public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>
{
@@ -106,8 +108,11 @@ class SemanticState : public sys::OutputTask,
bool hasOutput();
bool doOutput();
+
+ std::string getName() const { return name; }
};
+ private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
@@ -190,6 +195,11 @@ class SemanticState : public sys::OutputTask,
void attached();
void detached();
+
+ template <class F> void eachConsumer(const F& f) {
+ outputTasks.eachOutput(
+ boost::bind(f, boost::bind(&boost::polymorphic_downcast<ConsumerImpl*, OutputTask>, _1)));
+ }
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index 5dd57d2299..bdef894f9f 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -100,6 +100,8 @@ class SessionState : public qpid::SessionState,
void readyToSend();
+ template <class F> void eachConsumer(const F& f) { semanticState.eachConsumer(f); }
+
private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index c78859cc39..45ccec7166 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -186,8 +186,16 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
client::Session cs;
client::SessionBase_0_10Access(cs).set(simpl);
cs.sync();
+
+ broker::SessionState* ss = sh.getSession();
+ ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1));
+
// FIXME aconway 2008-09-19: remaining session state.
QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
}
+void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+ QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName());
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index 6cd382667a..6ce41a53a9 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -24,6 +24,7 @@
#include "qpid/client/Connection.h"
#include "qpid/client/AsyncSession.h"
+#include "qpid/broker/SemanticState.h"
#include "qpid/sys/Runnable.h"
#include <boost/shared_ptr.hpp>
@@ -69,7 +70,8 @@ class DumpClient : public sys::Runnable {
void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
void dumpSession(broker::SessionHandler& s);
-
+ void dumpConsumer(broker::SemanticState::ConsumerImpl*);
+
private:
Url receiver;
Cluster& donor;
diff --git a/cpp/src/qpid/sys/AggregateOutput.h b/cpp/src/qpid/sys/AggregateOutput.h
index 02a53ed50b..af26601f76 100644
--- a/cpp/src/qpid/sys/AggregateOutput.h
+++ b/cpp/src/qpid/sys/AggregateOutput.h
@@ -21,11 +21,13 @@
#ifndef _AggregateOutput_
#define _AggregateOutput_
-#include <vector>
#include "Mutex.h"
#include "OutputControl.h"
#include "OutputTask.h"
+#include <algorithm>
+#include <vector>
+
namespace qpid {
namespace sys {
@@ -46,6 +48,11 @@ namespace sys {
bool hasOutput();
void addOutputTask(OutputTask* t);
void removeOutputTask(OutputTask* t);
+
+ /** Apply f to each OutputTask* in the tasks list */
+ template <class F> void eachOutput(const F& f) {
+ std::for_each(tasks.begin(), tasks.end(), f);
+ }
};
}