diff options
| author | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
| commit | f61e1ef7589da893b9b54448224dc0961515eb40 (patch) | |
| tree | 258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/broker | |
| parent | c5294d471ade7a18c52ca7d4028a494011c82293 (diff) | |
| download | qpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz | |
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network
failure are automatically re-transmitted for transparent re-connection.
client::Session improvements:
- Locking to avoid races between network & user threads.
- Replaced client::StateManager with sys::StateMonitor - avoid heap allocation.
qpid::Exception clean up:
- use QPID_MSG consistently to format exception messages.
- throw typed exceptions (in reply_exceptions.h) for AMQP exceptions.
- re-throw correct typed exception on client for exceptions from broker.
- Removed QpidError.h
rubygen/templates/constants.rb:
- constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants.
- reply_constants.h: Added throwReplyException(code, text)
log::Logger:
- Fixed shutdown race in Statement::~Initializer()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
25 files changed, 222 insertions, 166 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e53774740a..b88f1c6c6a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -64,7 +64,8 @@ Broker::Options::Options(const std::string& name) : storeDir("/var"), storeAsync(false), enableMgmt(0), - mgmtPubInterval(10) + mgmtPubInterval(10), + ack(100) { addOptions() ("port,p", optValue(port,"PORT"), @@ -102,7 +103,8 @@ Broker::Broker(const Broker::Options& conf) : queues(store.get()), stagingThreshold(0), factory(*this), - dtxManager(store.get()) + dtxManager(store.get()), + sessionManager(conf.ack) { if(conf.enableMgmt){ managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 2018371624..817197a351 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -69,6 +69,7 @@ class Broker : public sys::Runnable, public Plugin::Target bool storeAsync; bool enableMgmt; uint16_t mgmtPubInterval; + uint32_t ack; }; virtual ~Broker(); diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 99b585406e..dad40868d6 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -21,6 +21,7 @@ #include "MessageDelivery.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace broker { @@ -75,8 +76,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri checkAlternate(response.first, alternate); } }catch(UnknownExchangeTypeException& e){ - throw ConnectionException( - 503, "Exchange type not implemented: " + type); + throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); } } } @@ -84,24 +84,23 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) { if (!type.empty() && exchange->getType() != type) { - throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type); + throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type)); } } void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) { - if (alternate && alternate != exchange->getAlternate()) { - throw ConnectionException(530, "Exchange declared with alternate-exchange " - + exchange->getAlternate()->getName() + ", requested " - + alternate->getName()); - } - + if (alternate && alternate != exchange->getAlternate()) + throw NotAllowedException( + QPID_MSG("Exchange declared with alternate-exchange " + << exchange->getAlternate()->getName() << ", requested " + << alternate->getName())); } void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); + if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); getBroker().getExchanges().destroy(name); @@ -292,7 +291,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, Queue::shared_ptr queue = state.getQueue(queueName); if(!consumerTag.empty() && state.exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); + throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); } string newTag = consumerTag; //need to generate name here, so we have it for the adapter (it is diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 5537dc67f5..706b42c080 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -82,7 +82,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations throw framing::NotImplementedException("Tunnel class not implemented"); } // Handlers no longer implemented in BrokerAdapter: -#define BADHANDLER() assert(0); throw framing::InternalErrorException() +#define BADHANDLER() assert(0); throw framing::NotImplementedException("") ExecutionHandler* getExecutionHandler() { BADHANDLER(); } ConnectionHandler* getConnectionHandler() { BADHANDLER(); } SessionHandler* getSessionHandler() { BADHANDLER(); } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ca0ca20849..f981d47ef7 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -28,9 +28,10 @@ #include "BrokerAdapter.h" #include "SemanticHandler.h" -#include <boost/utility/in_place_factory.hpp> #include <boost/bind.hpp> +#include <algorithm> + using namespace boost; using namespace qpid::sys; using namespace qpid::framing; @@ -61,6 +62,7 @@ void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { adapter.close(code, text, classId, methodId); + channels.clear(); getOutput().close(); } @@ -73,8 +75,11 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed(){ +void Connection::closed(){ // Physically closed, suspend open sessions. try { + std::for_each( + channels.begin(), channels.end(), + boost::bind(&SessionHandler::localSuspend, _1)); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index f697986194..dd645b595e 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -46,9 +46,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) AMQMethodBody* method=frame.getBody()->getMethod(); try{ if (!invoke(*handler.get(), *method)) - throw ConnectionException(503, "Class can't be accessed over channel 0"); + throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); }catch(ConnectionException& e){ - handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp index 0bb3449289..3fcc487324 100644 --- a/cpp/src/qpid/broker/Daemon.cpp +++ b/cpp/src/qpid/broker/Daemon.cpp @@ -17,7 +17,7 @@ */ #include "Daemon.h" #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include <boost/iostreams/stream.hpp> #include <boost/iostreams/device/file_descriptor.hpp> diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 5887d13f85..ec042ff56a 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -44,7 +44,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, if (fail) { state.endDtx(xid, true); if (suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); + throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { return DtxDemarcationEndResult(XA_RBROLLBACK); } @@ -67,7 +67,7 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, bool resume) { if (join && resume) { - throw ConnectionException(503, "Join and resume cannot both be set."); + throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); } try { if (resume) { @@ -161,7 +161,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, const string& xid) { //Currently no heuristic completion is supported, so this should never be used. - throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); + throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 0d211017de..0597b41f98 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -20,16 +20,20 @@ */ #include "DtxManager.h" #include "DtxTimeout.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include <boost/format.hpp> #include <iostream> using qpid::sys::Mutex; using namespace qpid::broker; +using namespace qpid::framing; DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {} -DtxManager::~DtxManager() {} +DtxManager::~DtxManager() { + // timer.stop(); // FIXME aconway 2007-10-23: leaking threads. +} void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops) { @@ -84,7 +88,7 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); } return i; } @@ -94,7 +98,7 @@ void DtxManager::remove(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); } else { work.erase(i); } @@ -105,7 +109,7 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i != work.end()) { - throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid); + throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); } else { return work.insert(xid, new DtxWorkRecord(xid, store)).first; } diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h index 33da62e7f4..7d0b8622d0 100644 --- a/cpp/src/qpid/broker/DtxTimeout.h +++ b/cpp/src/qpid/broker/DtxTimeout.h @@ -29,12 +29,7 @@ namespace broker { class DtxManager; - -struct DtxTimeoutException : public Exception -{ - DtxTimeoutException() {} -}; - +struct DtxTimeoutException : public Exception {}; struct DtxTimeout : public TimerTask { diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index f2f118c5e4..fe9e42ca32 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -19,12 +19,14 @@ * */ #include "DtxWorkRecord.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> #include <boost/mem_fn.hpp> using boost::mem_fn; using qpid::sys::Mutex; using namespace qpid::broker; +using namespace qpid::framing; DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} @@ -71,8 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase) if (prepared) { //already prepared i.e. 2pc if (onePhase) { - throw ConnectionException(503, - boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!")); } store->commit(*txn); @@ -83,8 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase) } else { //1pc commit optimisation, don't need a 2pc transaction context: if (!onePhase) { - throw ConnectionException(503, - boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); } std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { @@ -119,7 +119,7 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) throw DtxTimeoutException(); } if (completed) { - throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!")); } work.push_back(ops); } @@ -133,7 +133,7 @@ bool DtxWorkRecord::check() //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { if (!(*i)->isEnded()) { - throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!")); } else if ((*i)->isRollbackOnly()) { rolledback = true; } diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index ae1afe5abb..98e3cc7347 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,6 +24,7 @@ #include "HeadersExchange.h" #include "TopicExchange.h" #include "ManagementExchange.h" +#include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; using namespace qpid::sys; @@ -75,9 +76,8 @@ void ExchangeRegistry::destroy(const string& name){ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) { - throw ChannelException(404, "Exchange not found: " + name); - } + if (i == exchanges.end()) + throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name)); return i->second; } diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 215a002517..dd688cdfcf 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -20,7 +20,7 @@ */ #include "HeadersExchange.h" #include "qpid/framing/FieldValue.h" -#include "qpid/QpidError.h" +#include "qpid/framing/reply_exceptions.h" #include <algorithm> @@ -46,9 +46,8 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ RWlock::ScopedWlock locker(lock); FieldTable::ValuePtr what = args->get(x_match); - if (!what || (*what != all && *what != any)) { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); - } + if (!what || (*what != all && *what != any)) + throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); Binding binding(*args, queue); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), binding); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index b12910893a..834ce0a203 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -16,7 +16,7 @@ * */ -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "MessageHandlerImpl.h" #include "qpid/framing/FramingContent.h" @@ -56,39 +56,39 @@ MessageHandlerImpl::cancel(const string& destination ) void MessageHandlerImpl::open(const string& /*reference*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::close(const string& /*reference*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::resume(const string& /*reference*/, const string& /*identifier*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::offset(uint64_t /*value*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void @@ -97,19 +97,19 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& /*destination*/, bool /*noAck*/ ) { - throw ConnectionException(540, "get no longer supported"); + throw NotImplementedException("get no longer supported"); } void MessageHandlerImpl::empty() { - throw ConnectionException(540, "empty no longer supported"); + throw NotImplementedException("empty no longer supported"); } void MessageHandlerImpl::ok() { - throw ConnectionException(540, "Message.Ok no longer supported"); + throw NotImplementedException("Message.Ok no longer supported"); } void @@ -134,7 +134,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, { Queue::shared_ptr queue = state.getQueue(queueName); if(!destination.empty() && state.exists(destination)) - throw ConnectionException(530, "Consumer tags must be unique"); + throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); string tag = destination; state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), @@ -165,7 +165,7 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i state.addByteCredit(destination, value); } else { //unknown - throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); + throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit)); } } @@ -179,7 +179,7 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) //window state.setWindowMode(destination); } else{ - throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); + throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode)); } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 116e8d9431..18c1ab1056 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -19,9 +19,8 @@ * */ -#include <boost/format.hpp> - #include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #include "Broker.h" #include "Queue.h" #include "Exchange.h" @@ -37,7 +36,6 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; -using boost::format; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -269,17 +267,15 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { void Queue::consume(Consumer::ptr c, bool requestExclusive){ RWlock::ScopedWlock locker(consumerLock); if(exclusive) { - throw ChannelException( - 403, format("Queue '%s' has an exclusive consumer." - " No more consumers allowed.") % getName()); + throw AccessRefusedException( + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); } if(requestExclusive) { if(acquirers.empty() && browsers.empty()) { exclusive = c; } else { - throw ChannelException( - 403, format("Queue '%s' already has consumers." - "Exclusive access denied.") % getName()); + throw AccessRefusedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } } if (c->preAcquires()) { diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 8535dc6a60..e1a8ae470d 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -125,7 +125,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) incoming.complete(id); if (!invoker.wasHandled()) { - throw ConnectionException(540, "Not implemented"); + throw NotImplementedException("Not implemented"); } else if (invoker.hasResult()) { session.getProxy().getExecution().result(id.getValue(), invoker.getResult()); } @@ -139,7 +139,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) void SemanticHandler::handleL3(framing::AMQMethodBody* method) { if (!invoke(*this, *method)) - throw ConnectionException(540, "Not implemented"); + throw NotImplementedException("Not implemented"); } void SemanticHandler::handleContent(AMQFrame& frame) diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1f7436da94..e0e4315d03 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -31,7 +31,6 @@ #include "SessionHandler.h" #include "TxAck.h" #include "TxPublish.h" -#include "qpid/QpidError.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" @@ -116,7 +115,8 @@ void SemanticState::startTx() void SemanticState::commit(MessageStore* const store) { - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + if (!txBuffer) throw + CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); txBuffer->enlist(txAck); @@ -127,7 +127,8 @@ void SemanticState::commit(MessageStore* const store) void SemanticState::rollback() { - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + if (!txBuffer) + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); txBuffer->rollback(); accumulatedAck.clear(); @@ -141,7 +142,7 @@ void SemanticState::selectDtx() void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) { if (!dtxSelected) { - throw ConnectionException(503, "Session has not been selected for use with dtx"); + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); } dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); @@ -155,11 +156,12 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) void SemanticState::endDtx(const std::string& xid, bool fail) { if (!dtxBuffer) { - throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid); + throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session")); } if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end")); + } txBuffer.reset();//ops on this session no longer transactional @@ -176,8 +178,8 @@ void SemanticState::endDtx(const std::string& xid, bool fail) void SemanticState::suspendDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend")); } txBuffer.reset();//ops on this session no longer transactional @@ -188,11 +190,12 @@ void SemanticState::suspendDtx(const std::string& xid) void SemanticState::resumeDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume")); + } if (!dtxBuffer->isSuspended()) { - throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); + throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended")); } checkDtxTimeout(); diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index ed092d6a05..9b065be8af 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -26,6 +26,8 @@ #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" +#include <boost/bind.hpp> + namespace qpid { namespace broker { using namespace framing; @@ -33,7 +35,9 @@ using namespace std; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : InOutHandler(0, &c.getOutput()), - connection(c), channel(ch), proxy(out), + connection(c), channel(ch, &c.getOutput()), + proxy(out), // Via my own handleOut() for L2 data. + peerSession(channel), // Direct to channel for L2 commands. ignoring(false) {} SessionHandler::~SessionHandler() {} @@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) { try { if (m && invoke(*this, *m)) return; - else if (session.get()) - session->in(f); + else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->in.handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (!ignoring) throw ChannelErrorException( - QPID_MSG("Channel " << channel << " is not open")); + QPID_MSG("Channel " << channel.get() << " is not open")); } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. session.reset(); - getProxy().getSession().closed(e.code, e.toString()); + peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) { } void SessionHandler::handleOut(AMQFrame& f) { - f.setChannel(getChannel()); - out.next->handle(f); + channel.handle(f); // Send it. + if (session->sent(f)) + peerSession.solicitAck(); } -void SessionHandler::assertOpen(const char* method) { - if (!session.get()) +void SessionHandler::assertAttached(const char* method) const { + if (!session.get()) throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); } -void SessionHandler::assertClosed(const char* method) { +void SessionHandler::assertClosed(const char* method) const { if (session.get()) throw ChannelBusyException( - QPID_MSG(method << " failed: channel " << channel + QPID_MSG(method << " failed: channel " << channel.get() << " is already open.")); } @@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) { std::auto_ptr<SessionState> state( connection.broker.getSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + peerSession.attached(session->getId(), session->getTimeout()); } void SessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(*this, id); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + session = connection.broker.getSessionManager().resume(id); + session->attach(*this); + SequenceNumber seq = session->resuming(); + peerSession.attached(session->getId(), session->getTimeout()); + proxy.getSession().ack(seq, SequenceNumberSet()); } void SessionHandler::flow(bool /*active*/) { + assertAttached("flow"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flow"); } void SessionHandler::flowOk(bool /*active*/) { + assertAttached("flowOk"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flowOk"); } void SessionHandler::close() { + assertAttached("close"); QPID_LOG(info, "Received session.close"); ignoring=false; session.reset(); - getProxy().getSession().closed(REPLY_SUCCESS, "ok"); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + peerSession.closed(REPLY_SUCCESS, "ok"); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } void SessionHandler::closed(uint16_t replyCode, const string& replyText) { @@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) { session.reset(); } +void SessionHandler::localSuspend() { + if (session.get() && session->getState() == SessionState::ATTACHED) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + } +} + void SessionHandler::suspend() { - assertOpen("suspend"); - connection.broker.getSessionManager().suspend(session); - assert(!session.get()); - getProxy().getSession().detached(); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + assertAttached("suspend"); + localSuspend(); + peerSession.detached(); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } -void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/, - const SequenceNumberSet& /*seenFrameSet*/) { - assert(0); throw NotImplementedException(); +void SessionHandler::ack(uint32_t cumulativeSeenMark, + const SequenceNumberSet& /*seenFrameSet*/) +{ + assertAttached("ack"); + if (session->getState() == SessionState::RESUMING) { + session->receivedAck(cumulativeSeenMark); + framing::SessionState::Replay replay=session->replay(); + std::for_each(replay.begin(), replay.end(), + boost::bind(&SessionHandler::handleOut, this, _1)); + } + else + session->receivedAck(cumulativeSeenMark); } void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - assert(0); throw NotImplementedException(); + // FIXME aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.high-water-mark"); } void SessionHandler::solicitAck() { - assert(0); throw NotImplementedException(); + assertAttached("solicit-ack"); + peerSession.ack(session->sendingAck(), SequenceNumberSet()); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 51a65e3092..9a68ddb46f 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -26,6 +26,7 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/ChannelHandler.h" #include <boost/noncopyable.hpp> @@ -52,7 +53,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - framing::ChannelId getChannel() const { return channel; } + framing::ChannelId getChannel() const { return channel.get(); } Connection& getConnection() { return connection; } const Connection& getConnection() const { return connection; } @@ -60,6 +61,9 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } + // Called by closing connection. + void localSuspend(); + protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); @@ -79,12 +83,14 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, void solicitAck(); - void assertOpen(const char* method); - void assertClosed(const char* method); + void assertAttached(const char* method) const; + void assertActive(const char* method) const; + void assertClosed(const char* method) const; Connection& connection; - const framing::ChannelId channel; + framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; + framing::AMQP_ClientProxy::Session peerSession; bool ignoring; std::auto_ptr<SessionState> session; }; diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 303687c788..f12ebc6db1 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -39,7 +39,7 @@ namespace broker { using namespace sys; using namespace framing; -SessionManager::SessionManager() {} +SessionManager::SessionManager(uint32_t a) : ack(a) {} SessionManager::~SessionManager() {} @@ -47,7 +47,8 @@ std::auto_ptr<SessionState> SessionManager::open( SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); - std::auto_ptr<SessionState> session(new SessionState(*this, h, timeout_)); + std::auto_ptr<SessionState> session( + new SessionState(*this, h, timeout_, ack)); active.insert(session->getId()); return session; } @@ -55,14 +56,13 @@ std::auto_ptr<SessionState> SessionManager::open( void SessionManager::suspend(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); active.erase(session->getId()); + session->suspend(); session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); - session->handler = 0; suspended.push_back(session.release()); // In expiry order eraseExpired(); } -std::auto_ptr<SessionState> SessionManager::resume( - SessionHandler& sh, const Uuid& id) +std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) { Mutex::ScopedLock l(lock); eraseExpired(); @@ -78,7 +78,6 @@ std::auto_ptr<SessionState> SessionManager::resume( QPID_MSG("No suspended session with id=" << id)); active.insert(id); std::auto_ptr<SessionState> state(suspended.release(i).release()); - state->handler = &sh; return state; } @@ -94,8 +93,10 @@ void SessionManager::eraseExpired() { Suspended::iterator keep = std::lower_bound( suspended.begin(), suspended.end(), now(), boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); - QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); - suspended.erase(suspended.begin(), keep); + if (suspended.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } } } diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 58a7b3f01f..fa7262252d 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -44,7 +44,7 @@ class SessionHandler; */ class SessionManager : private boost::noncopyable { public: - SessionManager(); + SessionManager(uint32_t ack); ~SessionManager(); /** Open a new active session, caller takes ownership */ std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_); @@ -57,18 +57,20 @@ class SessionManager : private boost::noncopyable { /** Resume a suspended session. *@throw Exception if timed out or non-existant. */ - std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&); + std::auto_ptr<SessionState> resume(const framing::Uuid&); private: typedef boost::ptr_vector<SessionState> Suspended; typedef std::set<framing::Uuid> Active; + void erase(const framing::Uuid&); + void eraseExpired(); + sys::Mutex lock; Suspended suspended; Active active; - - void erase(const framing::Uuid&); - void eraseExpired(); + uint32_t ack; + friend class SessionState; // removes deleted sessions from active set. }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 17537e11be..45d78c9307 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -31,22 +31,25 @@ namespace broker { using namespace framing; -SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_) - : factory(f), handler(&h), id(true), timeout(timeout_), - broker(h.getConnection().broker), - version(h.getConnection().getVersion()) -{ - // FIXME aconway 2007-09-21: Break dependnecy - broker updates session. - chain.push_back(new SemanticHandler(*this)); - in = &chain[0]; // Incoming frame to handler chain. - out = &handler->out; // Outgoing frames to SessionHandler +void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } - // FIXME aconway 2007-09-20: use broker to add plugin - // handlers to the chain. - // FIXME aconway 2007-08-31: Shouldn't be passing channel ID. - broker.update(handler->getChannel(), *this); +void SessionState::handleOut(AMQFrame& f) { + assert(handler); + handler->out.handle(f); } +SessionState::SessionState( + SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) + : framing::SessionState(ack), + factory(f), handler(&h), id(true), timeout(timeout_), + broker(h.getConnection().broker), + version(h.getConnection().getVersion()), + semanticHandler(new SemanticHandler(*this)) +{ + // FIXME aconway 2007-09-20: SessionManager may add plugin + // handlers to the chain. + } + SessionState::~SessionState() { // Remove ID from active session list. factory.erase(getId()); @@ -65,4 +68,12 @@ Connection& SessionState::getConnection() { return getHandler().getConnection(); } +void SessionState::detach() { + handler = 0; +} + +void SessionState::attach(SessionHandler& h) { + handler = &h; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index d152937692..eed088af31 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -24,11 +24,12 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Time.h" -#include <boost/ptr_container/ptr_vector.hpp> #include <boost/noncopyable.hpp> +#include <boost/scoped_ptr.hpp> #include <set> #include <vector> @@ -42,31 +43,26 @@ class AMQP_ClientProxy; namespace broker { +class SemanticHandler; class SessionHandler; class SessionManager; class Broker; class Connection; /** - * State of a session. - * - * An attached session has a SessionHandler which is attached to a - * connection. A suspended session has no handler. - * - * A SessionState is always associated with an open session (attached or - * suspended) it is destroyed when the session is closed. - * - * The SessionState includes the sessions handler chains, which may - * themselves have state. The handlers will be preserved as long as - * the session is alive. + * Broker-side session state includes sessions handler chains, which may + * themselves have state. */ -class SessionState : public framing::FrameHandler::Chains, - private boost::noncopyable +class SessionState : public framing::SessionState, + public framing::FrameHandler::InOutHandler { public: ~SessionState(); bool isAttached() { return handler; } + void detach(); + void attach(SessionHandler& handler); + /** @pre isAttached() */ SessionHandler& getHandler(); @@ -76,23 +72,30 @@ class SessionState : public framing::FrameHandler::Chains, /** @pre isAttached() */ Connection& getConnection(); - const framing::Uuid& getId() const { return id; } uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } + + protected: + void handleIn(framing::AMQFrame&); + void handleOut(framing::AMQFrame&); private: - /** Only SessionManager can open sessions */ - SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_); - + // SessionManager creates sessions. + SessionState(SessionManager&, + SessionHandler& out, + uint32_t timeout, + uint32_t ackInterval); + SessionManager& factory; SessionHandler* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. Broker& broker; - boost::ptr_vector<framing::FrameHandler> chain; framing::ProtocolVersion version; + + boost::scoped_ptr<SemanticHandler> semanticHandler; friend class SessionManager; }; diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp index be75346578..14727b3b35 100644 --- a/cpp/src/qpid/broker/Timer.cpp +++ b/cpp/src/qpid/broker/Timer.cpp @@ -73,17 +73,14 @@ void Timer::start() Monitor::ScopedLock l(monitor); if (!active) { active = true; - runner = std::auto_ptr<Thread>(new Thread(this)); + runner = Thread(this); } } void Timer::stop() { signalStop(); - if (runner.get()) { - runner->join(); - runner.reset(); - } + runner.join(); } void Timer::signalStop() { diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h index c70ffeaedc..e89ae499b7 100644 --- a/cpp/src/qpid/broker/Timer.h +++ b/cpp/src/qpid/broker/Timer.h @@ -53,7 +53,7 @@ class Timer : private qpid::sys::Runnable { qpid::sys::Monitor monitor; std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks; - std::auto_ptr<qpid::sys::Thread> runner; + qpid::sys::Thread runner; bool active; void run(); |
