summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
committerAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
commitf61e1ef7589da893b9b54448224dc0961515eb40 (patch)
tree258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid/broker
parentc5294d471ade7a18c52ca7d4028a494011c82293 (diff)
downloadqpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network failure are automatically re-transmitted for transparent re-connection. client::Session improvements: - Locking to avoid races between network & user threads. - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation. qpid::Exception clean up: - use QPID_MSG consistently to format exception messages. - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions. - re-throw correct typed exception on client for exceptions from broker. - Removed QpidError.h rubygen/templates/constants.rb: - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants. - reply_constants.h: Added throwReplyException(code, text) log::Logger: - Fixed shutdown race in Statement::~Initializer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp21
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp9
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/Daemon.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp6
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp12
-rw-r--r--cpp/src/qpid/broker/DtxTimeout.h7
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp12
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp7
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp26
-rw-r--r--cpp/src/qpid/broker/Queue.cpp14
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp27
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp92
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h14
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionManager.h12
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp37
-rw-r--r--cpp/src/qpid/broker/SessionState.h41
-rw-r--r--cpp/src/qpid/broker/Timer.cpp7
-rw-r--r--cpp/src/qpid/broker/Timer.h2
25 files changed, 222 insertions, 166 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e53774740a..b88f1c6c6a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -64,7 +64,8 @@ Broker::Options::Options(const std::string& name) :
storeDir("/var"),
storeAsync(false),
enableMgmt(0),
- mgmtPubInterval(10)
+ mgmtPubInterval(10),
+ ack(100)
{
addOptions()
("port,p", optValue(port,"PORT"),
@@ -102,7 +103,8 @@ Broker::Broker(const Broker::Options& conf) :
queues(store.get()),
stagingThreshold(0),
factory(*this),
- dtxManager(store.get())
+ dtxManager(store.get()),
+ sessionManager(conf.ack)
{
if(conf.enableMgmt){
managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 2018371624..817197a351 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -69,6 +69,7 @@ class Broker : public sys::Runnable, public Plugin::Target
bool storeAsync;
bool enableMgmt;
uint16_t mgmtPubInterval;
+ uint32_t ack;
};
virtual ~Broker();
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 99b585406e..dad40868d6 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -21,6 +21,7 @@
#include "MessageDelivery.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace broker {
@@ -75,8 +76,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
checkAlternate(response.first, alternate);
}
}catch(UnknownExchangeTypeException& e){
- throw ConnectionException(
- 503, "Exchange type not implemented: " + type);
+ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
}
}
@@ -84,24 +84,23 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
{
if (!type.empty() && exchange->getType() != type) {
- throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type);
+ throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
}
}
void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
{
- if (alternate && alternate != exchange->getAlternate()) {
- throw ConnectionException(530, "Exchange declared with alternate-exchange "
- + exchange->getAlternate()->getName() + ", requested "
- + alternate->getName());
- }
-
+ if (alternate && alternate != exchange->getAlternate())
+ throw NotAllowedException(
+ QPID_MSG("Exchange declared with alternate-exchange "
+ << exchange->getAlternate()->getName() << ", requested "
+ << alternate->getName()));
}
void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
+ if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
getBroker().getExchanges().destroy(name);
@@ -292,7 +291,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
Queue::shared_ptr queue = state.getQueue(queueName);
if(!consumerTag.empty() && state.exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
+ throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
}
string newTag = consumerTag;
//need to generate name here, so we have it for the adapter (it is
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 5537dc67f5..706b42c080 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -82,7 +82,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
throw framing::NotImplementedException("Tunnel class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
-#define BADHANDLER() assert(0); throw framing::InternalErrorException()
+#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
SessionHandler* getSessionHandler() { BADHANDLER(); }
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ca0ca20849..f981d47ef7 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -28,9 +28,10 @@
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
-#include <boost/utility/in_place_factory.hpp>
#include <boost/bind.hpp>
+#include <algorithm>
+
using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -61,6 +62,7 @@ void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
adapter.close(code, text, classId, methodId);
+ channels.clear();
getOutput().close();
}
@@ -73,8 +75,11 @@ void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed(){
+void Connection::closed(){ // Physically closed, suspend open sessions.
try {
+ std::for_each(
+ channels.begin(), channels.end(),
+ boost::bind(&SessionHandler::localSuspend, _1));
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index f697986194..dd645b595e 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -46,9 +46,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
AMQMethodBody* method=frame.getBody()->getMethod();
try{
if (!invoke(*handler.get(), *method))
- throw ConnectionException(503, "Class can't be accessed over channel 0");
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
}catch(ConnectionException& e){
- handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp
index 0bb3449289..3fcc487324 100644
--- a/cpp/src/qpid/broker/Daemon.cpp
+++ b/cpp/src/qpid/broker/Daemon.cpp
@@ -17,7 +17,7 @@
*/
#include "Daemon.h"
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include <boost/iostreams/stream.hpp>
#include <boost/iostreams/device/file_descriptor.hpp>
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 5887d13f85..ec042ff56a 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -44,7 +44,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (fail) {
state.endDtx(xid, true);
if (suspend) {
- throw ConnectionException(503, "End and suspend cannot both be set.");
+ throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
return DtxDemarcationEndResult(XA_RBROLLBACK);
}
@@ -67,7 +67,7 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
bool resume)
{
if (join && resume) {
- throw ConnectionException(503, "Join and resume cannot both be set.");
+ throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
}
try {
if (resume) {
@@ -161,7 +161,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
const string& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
- throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 0d211017de..0597b41f98 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -20,16 +20,20 @@
*/
#include "DtxManager.h"
#include "DtxTimeout.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <iostream>
using qpid::sys::Mutex;
using namespace qpid::broker;
+using namespace qpid::framing;
DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
-DtxManager::~DtxManager() {}
+DtxManager::~DtxManager() {
+ // timer.stop(); // FIXME aconway 2007-10-23: leaking threads.
+}
void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
{
@@ -84,7 +88,7 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
}
return i;
}
@@ -94,7 +98,7 @@ void DtxManager::remove(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
} else {
work.erase(i);
}
@@ -105,7 +109,7 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i != work.end()) {
- throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid);
+ throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
} else {
return work.insert(xid, new DtxWorkRecord(xid, store)).first;
}
diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h
index 33da62e7f4..7d0b8622d0 100644
--- a/cpp/src/qpid/broker/DtxTimeout.h
+++ b/cpp/src/qpid/broker/DtxTimeout.h
@@ -29,12 +29,7 @@ namespace broker {
class DtxManager;
-
-struct DtxTimeoutException : public Exception
-{
- DtxTimeoutException() {}
-};
-
+struct DtxTimeoutException : public Exception {};
struct DtxTimeout : public TimerTask
{
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index f2f118c5e4..fe9e42ca32 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -19,12 +19,14 @@
*
*/
#include "DtxWorkRecord.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
#include <boost/mem_fn.hpp>
using boost::mem_fn;
using qpid::sys::Mutex;
using namespace qpid::broker;
+using namespace qpid::framing;
DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
@@ -71,8 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase)
if (prepared) {
//already prepared i.e. 2pc
if (onePhase) {
- throw ConnectionException(503,
- boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
}
store->commit(*txn);
@@ -83,8 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase)
} else {
//1pc commit optimisation, don't need a 2pc transaction context:
if (!onePhase) {
- throw ConnectionException(503,
- boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
}
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
@@ -119,7 +119,7 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
throw DtxTimeoutException();
}
if (completed) {
- throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
}
work.push_back(ops);
}
@@ -133,7 +133,7 @@ bool DtxWorkRecord::check()
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
- throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!"));
} else if ((*i)->isRollbackOnly()) {
rolledback = true;
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index ae1afe5abb..98e3cc7347 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -24,6 +24,7 @@
#include "HeadersExchange.h"
#include "TopicExchange.h"
#include "ManagementExchange.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -75,9 +76,8 @@ void ExchangeRegistry::destroy(const string& name){
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end()) {
- throw ChannelException(404, "Exchange not found: " + name);
- }
+ if (i == exchanges.end())
+ throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
return i->second;
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 215a002517..dd688cdfcf 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -20,7 +20,7 @@
*/
#include "HeadersExchange.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
#include <algorithm>
@@ -46,9 +46,8 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
FieldTable::ValuePtr what = args->get(x_match);
- if (!what || (*what != all && *what != any)) {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
- }
+ if (!what || (*what != all && *what != any))
+ throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
Binding binding(*args, queue);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), binding);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index b12910893a..834ce0a203 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -16,7 +16,7 @@
*
*/
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "MessageHandlerImpl.h"
#include "qpid/framing/FramingContent.h"
@@ -56,39 +56,39 @@ MessageHandlerImpl::cancel(const string& destination )
void
MessageHandlerImpl::open(const string& /*reference*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::close(const string& /*reference*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::resume(const string& /*reference*/,
const string& /*identifier*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::offset(uint64_t /*value*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
@@ -97,19 +97,19 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& /*destination*/,
bool /*noAck*/ )
{
- throw ConnectionException(540, "get no longer supported");
+ throw NotImplementedException("get no longer supported");
}
void
MessageHandlerImpl::empty()
{
- throw ConnectionException(540, "empty no longer supported");
+ throw NotImplementedException("empty no longer supported");
}
void
MessageHandlerImpl::ok()
{
- throw ConnectionException(540, "Message.Ok no longer supported");
+ throw NotImplementedException("Message.Ok no longer supported");
}
void
@@ -134,7 +134,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = state.getQueue(queueName);
if(!destination.empty() && state.exists(destination))
- throw ConnectionException(530, "Consumer tags must be unique");
+ throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
string tag = destination;
state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
@@ -165,7 +165,7 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i
state.addByteCredit(destination, value);
} else {
//unknown
- throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
+ throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
}
}
@@ -179,7 +179,7 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
//window
state.setWindowMode(destination);
} else{
- throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
+ throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 116e8d9431..18c1ab1056 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -19,9 +19,8 @@
*
*/
-#include <boost/format.hpp>
-
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#include "Broker.h"
#include "Queue.h"
#include "Exchange.h"
@@ -37,7 +36,6 @@
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
-using boost::format;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -269,17 +267,15 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
void Queue::consume(Consumer::ptr c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
if(exclusive) {
- throw ChannelException(
- 403, format("Queue '%s' has an exclusive consumer."
- " No more consumers allowed.") % getName());
+ throw AccessRefusedException(
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
}
if(requestExclusive) {
if(acquirers.empty() && browsers.empty()) {
exclusive = c;
} else {
- throw ChannelException(
- 403, format("Queue '%s' already has consumers."
- "Exclusive access denied.") % getName());
+ throw AccessRefusedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
}
}
if (c->preAcquires()) {
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 8535dc6a60..e1a8ae470d 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -125,7 +125,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
incoming.complete(id);
if (!invoker.wasHandled()) {
- throw ConnectionException(540, "Not implemented");
+ throw NotImplementedException("Not implemented");
} else if (invoker.hasResult()) {
session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
}
@@ -139,7 +139,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
if (!invoke(*this, *method))
- throw ConnectionException(540, "Not implemented");
+ throw NotImplementedException("Not implemented");
}
void SemanticHandler::handleContent(AMQFrame& frame)
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1f7436da94..e0e4315d03 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -31,7 +31,6 @@
#include "SessionHandler.h"
#include "TxAck.h"
#include "TxPublish.h"
-#include "qpid/QpidError.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -116,7 +115,8 @@ void SemanticState::startTx()
void SemanticState::commit(MessageStore* const store)
{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+ if (!txBuffer) throw
+ CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
txBuffer->enlist(txAck);
@@ -127,7 +127,8 @@ void SemanticState::commit(MessageStore* const store)
void SemanticState::rollback()
{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+ if (!txBuffer)
+ throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
txBuffer->rollback();
accumulatedAck.clear();
@@ -141,7 +142,7 @@ void SemanticState::selectDtx()
void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
{
if (!dtxSelected) {
- throw ConnectionException(503, "Session has not been selected for use with dtx");
+ throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
}
dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
@@ -155,11 +156,12 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
void SemanticState::endDtx(const std::string& xid, bool fail)
{
if (!dtxBuffer) {
- throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+ throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session"));
}
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end"));
+
}
txBuffer.reset();//ops on this session no longer transactional
@@ -176,8 +178,8 @@ void SemanticState::endDtx(const std::string& xid, bool fail)
void SemanticState::suspendDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend"));
}
txBuffer.reset();//ops on this session no longer transactional
@@ -188,11 +190,12 @@ void SemanticState::suspendDtx(const std::string& xid)
void SemanticState::resumeDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume"));
+
}
if (!dtxBuffer->isSuspended()) {
- throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+ throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended"));
}
checkDtxTimeout();
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index ed092d6a05..9b065be8af 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -26,6 +26,8 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+
namespace qpid {
namespace broker {
using namespace framing;
@@ -33,7 +35,9 @@ using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
- connection(c), channel(ch), proxy(out),
+ connection(c), channel(ch, &c.getOutput()),
+ proxy(out), // Via my own handleOut() for L2 data.
+ peerSession(channel), // Direct to channel for L2 commands.
ignoring(false) {}
SessionHandler::~SessionHandler() {}
@@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) {
try {
if (m && invoke(*this, *m))
return;
- else if (session.get())
- session->in(f);
+ else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->in.handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ }
else if (!ignoring)
throw ChannelErrorException(
- QPID_MSG("Channel " << channel << " is not open"));
+ QPID_MSG("Channel " << channel.get() << " is not open"));
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session.reset();
- getProxy().getSession().closed(e.code, e.toString());
+ peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
void SessionHandler::handleOut(AMQFrame& f) {
- f.setChannel(getChannel());
- out.next->handle(f);
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
}
-void SessionHandler::assertOpen(const char* method) {
- if (!session.get())
+void SessionHandler::assertAttached(const char* method) const {
+ if (!session.get())
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
}
-void SessionHandler::assertClosed(const char* method) {
+void SessionHandler::assertClosed(const char* method) const {
if (session.get())
throw ChannelBusyException(
- QPID_MSG(method << " failed: channel " << channel
+ QPID_MSG(method << " failed: channel " << channel.get()
<< " is already open."));
}
@@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) {
std::auto_ptr<SessionState> state(
connection.broker.getSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ peerSession.attached(session->getId(), session->getTimeout());
}
void SessionHandler::resume(const Uuid& id) {
assertClosed("resume");
- session = connection.broker.getSessionManager().resume(*this, id);
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ session = connection.broker.getSessionManager().resume(id);
+ session->attach(*this);
+ SequenceNumber seq = session->resuming();
+ peerSession.attached(session->getId(), session->getTimeout());
+ proxy.getSession().ack(seq, SequenceNumberSet());
}
void SessionHandler::flow(bool /*active*/) {
+ assertAttached("flow");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flow");
}
void SessionHandler::flowOk(bool /*active*/) {
+ assertAttached("flowOk");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flowOk");
}
void SessionHandler::close() {
+ assertAttached("close");
QPID_LOG(info, "Received session.close");
ignoring=false;
session.reset();
- getProxy().getSession().closed(REPLY_SUCCESS, "ok");
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ peerSession.closed(REPLY_SUCCESS, "ok");
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
@@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
session.reset();
}
+void SessionHandler::localSuspend() {
+ if (session.get() && session->getState() == SessionState::ATTACHED) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ }
+}
+
void SessionHandler::suspend() {
- assertOpen("suspend");
- connection.broker.getSessionManager().suspend(session);
- assert(!session.get());
- getProxy().getSession().detached();
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ assertAttached("suspend");
+ localSuspend();
+ peerSession.detached();
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
-void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
- const SequenceNumberSet& /*seenFrameSet*/) {
- assert(0); throw NotImplementedException();
+void SessionHandler::ack(uint32_t cumulativeSeenMark,
+ const SequenceNumberSet& /*seenFrameSet*/)
+{
+ assertAttached("ack");
+ if (session->getState() == SessionState::RESUMING) {
+ session->receivedAck(cumulativeSeenMark);
+ framing::SessionState::Replay replay=session->replay();
+ std::for_each(replay.begin(), replay.end(),
+ boost::bind(&SessionHandler::handleOut, this, _1));
+ }
+ else
+ session->receivedAck(cumulativeSeenMark);
}
void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- assert(0); throw NotImplementedException();
+ // FIXME aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.high-water-mark");
}
void SessionHandler::solicitAck() {
- assert(0); throw NotImplementedException();
+ assertAttached("solicit-ack");
+ peerSession.ack(session->sendingAck(), SequenceNumberSet());
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 51a65e3092..9a68ddb46f 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -26,6 +26,7 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/ChannelHandler.h"
#include <boost/noncopyable.hpp>
@@ -52,7 +53,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- framing::ChannelId getChannel() const { return channel; }
+ framing::ChannelId getChannel() const { return channel.get(); }
Connection& getConnection() { return connection; }
const Connection& getConnection() const { return connection; }
@@ -60,6 +61,9 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ // Called by closing connection.
+ void localSuspend();
+
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
@@ -79,12 +83,14 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
void solicitAck();
- void assertOpen(const char* method);
- void assertClosed(const char* method);
+ void assertAttached(const char* method) const;
+ void assertActive(const char* method) const;
+ void assertClosed(const char* method) const;
Connection& connection;
- const framing::ChannelId channel;
+ framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
+ framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
};
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index 303687c788..f12ebc6db1 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -39,7 +39,7 @@ namespace broker {
using namespace sys;
using namespace framing;
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(uint32_t a) : ack(a) {}
SessionManager::~SessionManager() {}
@@ -47,7 +47,8 @@ std::auto_ptr<SessionState> SessionManager::open(
SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
- std::auto_ptr<SessionState> session(new SessionState(*this, h, timeout_));
+ std::auto_ptr<SessionState> session(
+ new SessionState(*this, h, timeout_, ack));
active.insert(session->getId());
return session;
}
@@ -55,14 +56,13 @@ std::auto_ptr<SessionState> SessionManager::open(
void SessionManager::suspend(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
active.erase(session->getId());
+ session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
- session->handler = 0;
suspended.push_back(session.release()); // In expiry order
eraseExpired();
}
-std::auto_ptr<SessionState> SessionManager::resume(
- SessionHandler& sh, const Uuid& id)
+std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id)
{
Mutex::ScopedLock l(lock);
eraseExpired();
@@ -78,7 +78,6 @@ std::auto_ptr<SessionState> SessionManager::resume(
QPID_MSG("No suspended session with id=" << id));
active.insert(id);
std::auto_ptr<SessionState> state(suspended.release(i).release());
- state->handler = &sh;
return state;
}
@@ -94,8 +93,10 @@ void SessionManager::eraseExpired() {
Suspended::iterator keep = std::lower_bound(
suspended.begin(), suspended.end(), now(),
boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
- QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
- suspended.erase(suspended.begin(), keep);
+ if (suspended.begin() != keep) {
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+ suspended.erase(suspended.begin(), keep);
+ }
}
}
diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h
index 58a7b3f01f..fa7262252d 100644
--- a/cpp/src/qpid/broker/SessionManager.h
+++ b/cpp/src/qpid/broker/SessionManager.h
@@ -44,7 +44,7 @@ class SessionHandler;
*/
class SessionManager : private boost::noncopyable {
public:
- SessionManager();
+ SessionManager(uint32_t ack);
~SessionManager();
/** Open a new active session, caller takes ownership */
std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
@@ -57,18 +57,20 @@ class SessionManager : private boost::noncopyable {
/** Resume a suspended session.
*@throw Exception if timed out or non-existant.
*/
- std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&);
+ std::auto_ptr<SessionState> resume(const framing::Uuid&);
private:
typedef boost::ptr_vector<SessionState> Suspended;
typedef std::set<framing::Uuid> Active;
+ void erase(const framing::Uuid&);
+ void eraseExpired();
+
sys::Mutex lock;
Suspended suspended;
Active active;
-
- void erase(const framing::Uuid&);
- void eraseExpired();
+ uint32_t ack;
+
friend class SessionState; // removes deleted sessions from active set.
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 17537e11be..45d78c9307 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -31,22 +31,25 @@ namespace broker {
using namespace framing;
-SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_)
- : factory(f), handler(&h), id(true), timeout(timeout_),
- broker(h.getConnection().broker),
- version(h.getConnection().getVersion())
-{
- // FIXME aconway 2007-09-21: Break dependnecy - broker updates session.
- chain.push_back(new SemanticHandler(*this));
- in = &chain[0]; // Incoming frame to handler chain.
- out = &handler->out; // Outgoing frames to SessionHandler
+void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
- // FIXME aconway 2007-09-20: use broker to add plugin
- // handlers to the chain.
- // FIXME aconway 2007-08-31: Shouldn't be passing channel ID.
- broker.update(handler->getChannel(), *this);
+void SessionState::handleOut(AMQFrame& f) {
+ assert(handler);
+ handler->out.handle(f);
}
+SessionState::SessionState(
+ SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack)
+ : framing::SessionState(ack),
+ factory(f), handler(&h), id(true), timeout(timeout_),
+ broker(h.getConnection().broker),
+ version(h.getConnection().getVersion()),
+ semanticHandler(new SemanticHandler(*this))
+{
+ // FIXME aconway 2007-09-20: SessionManager may add plugin
+ // handlers to the chain.
+ }
+
SessionState::~SessionState() {
// Remove ID from active session list.
factory.erase(getId());
@@ -65,4 +68,12 @@ Connection& SessionState::getConnection() {
return getHandler().getConnection();
}
+void SessionState::detach() {
+ handler = 0;
+}
+
+void SessionState::attach(SessionHandler& h) {
+ handler = &h;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index d152937692..eed088af31 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -24,11 +24,12 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Time.h"
-#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
#include <set>
#include <vector>
@@ -42,31 +43,26 @@ class AMQP_ClientProxy;
namespace broker {
+class SemanticHandler;
class SessionHandler;
class SessionManager;
class Broker;
class Connection;
/**
- * State of a session.
- *
- * An attached session has a SessionHandler which is attached to a
- * connection. A suspended session has no handler.
- *
- * A SessionState is always associated with an open session (attached or
- * suspended) it is destroyed when the session is closed.
- *
- * The SessionState includes the sessions handler chains, which may
- * themselves have state. The handlers will be preserved as long as
- * the session is alive.
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state.
*/
-class SessionState : public framing::FrameHandler::Chains,
- private boost::noncopyable
+class SessionState : public framing::SessionState,
+ public framing::FrameHandler::InOutHandler
{
public:
~SessionState();
bool isAttached() { return handler; }
+ void detach();
+ void attach(SessionHandler& handler);
+
/** @pre isAttached() */
SessionHandler& getHandler();
@@ -76,23 +72,30 @@ class SessionState : public framing::FrameHandler::Chains,
/** @pre isAttached() */
Connection& getConnection();
- const framing::Uuid& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
+
+ protected:
+ void handleIn(framing::AMQFrame&);
+ void handleOut(framing::AMQFrame&);
private:
- /** Only SessionManager can open sessions */
- SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_);
-
+ // SessionManager creates sessions.
+ SessionState(SessionManager&,
+ SessionHandler& out,
+ uint32_t timeout,
+ uint32_t ackInterval);
+
SessionManager& factory;
SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
Broker& broker;
- boost::ptr_vector<framing::FrameHandler> chain;
framing::ProtocolVersion version;
+
+ boost::scoped_ptr<SemanticHandler> semanticHandler;
friend class SessionManager;
};
diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp
index be75346578..14727b3b35 100644
--- a/cpp/src/qpid/broker/Timer.cpp
+++ b/cpp/src/qpid/broker/Timer.cpp
@@ -73,17 +73,14 @@ void Timer::start()
Monitor::ScopedLock l(monitor);
if (!active) {
active = true;
- runner = std::auto_ptr<Thread>(new Thread(this));
+ runner = Thread(this);
}
}
void Timer::stop()
{
signalStop();
- if (runner.get()) {
- runner->join();
- runner.reset();
- }
+ runner.join();
}
void Timer::signalStop()
{
diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h
index c70ffeaedc..e89ae499b7 100644
--- a/cpp/src/qpid/broker/Timer.h
+++ b/cpp/src/qpid/broker/Timer.h
@@ -53,7 +53,7 @@ class Timer : private qpid::sys::Runnable
{
qpid::sys::Monitor monitor;
std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks;
- std::auto_ptr<qpid::sys::Thread> runner;
+ qpid::sys::Thread runner;
bool active;
void run();