diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/broker/Exchange.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Exchange.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 83 |
1 files changed, 71 insertions, 12 deletions
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index d143471559..d68845062d 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -19,16 +19,18 @@ * */ +#include "qpid/broker/Broker.h" +#include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/FedOps.h" -#include "qpid/broker/Broker.h" -#include "qpid/management/ManagementAgent.h" #include "qpid/broker/Queue.h" -#include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" -#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/sys/ExceptionHolder.h" +#include <stdexcept> using namespace qpid::broker; using namespace qpid::framing; @@ -56,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) { if (parent->sequence){ parent->sequenceNo++; - msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo); + msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo); } if (parent->ive) { parent->lastMsg = &( msg.getMessage()); @@ -70,6 +72,36 @@ Exchange::PreRoute::~PreRoute(){ } } +namespace { +/** Store information about an exception to be thrown later. + * If multiple exceptions are stored, save the first of the "most severe" + * exceptions, SESSION is les sever than CONNECTION etc. + */ +class ExInfo { + public: + enum Type { NONE, SESSION, CONNECTION, OTHER }; + + ExInfo(string exchange) : type(NONE), exchange(exchange) {} + void store(Type type_, const qpid::sys::ExceptionHolder& exception_, const boost::shared_ptr<Queue>& queue) { + QPID_LOG(warning, "Exchange " << exchange << " cannot deliver to queue " + << queue->getName() << ": " << exception_.what()); + if (type < type_) { // Replace less severe exception + type = type_; + exception = exception_; + } + } + + void raise() { + exception.raise(); + } + + private: + Type type; + string exchange; + qpid::sys::ExceptionHolder exception; +}; +} + void Exchange::doRoute(Deliverable& msg, ConstBindingList b) { int count = 0; @@ -80,11 +112,25 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) msg.getMessage().blockContentRelease(); } + + ExInfo error(getName()); // Save exception to throw at the end. for(std::vector<Binding::shared_ptr>::const_iterator i = b->begin(); i != b->end(); i++, count++) { - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched(); + try { + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched(); + } + catch (const SessionException& e) { + error.store(ExInfo::SESSION, framing::createSessionException(e.code, e.what()),(*i)->queue); + } + catch (const ConnectionException& e) { + error.store(ExInfo::CONNECTION, framing::createConnectionException(e.code, e.what()), (*i)->queue); + } + catch (const std::exception& e) { + error.store(ExInfo::OTHER, qpid::sys::ExceptionHolder(new Exception(e.what())), (*i)->queue); + } } + error.raise(); } if (mgmtExchange != 0) @@ -115,7 +161,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0), broker(b) + sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { @@ -133,7 +179,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b) + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { @@ -155,7 +201,11 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } ive = _args.get(qpidIVE); - if (ive) QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); + if (ive) { + if (broker && broker->isInCluster()) + throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster"); + QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); + } } Exchange::~Exchange () @@ -340,5 +390,14 @@ bool Exchange::MatchQueue::operator()(Exchange::Binding::shared_ptr b) } void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) { - msg->getProperties<DeliveryProperties>()->setExchange(getName()); + msg->setExchange(getName()); +} + +bool Exchange::routeWithAlternate(Deliverable& msg) +{ + route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + if (!msg.delivered && alternate) { + alternate->route(msg, msg.getMessage().getRoutingKey(), msg.getMessage().getApplicationHeaders()); + } + return msg.delivered; } |