summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
committerAlan Conway <aconway@apache.org>2007-10-26 19:48:31 +0000
commitf61e1ef7589da893b9b54448224dc0961515eb40 (patch)
tree258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid
parentc5294d471ade7a18c52ca7d4028a494011c82293 (diff)
downloadqpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network failure are automatically re-transmitted for transparent re-connection. client::Session improvements: - Locking to avoid races between network & user threads. - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation. qpid::Exception clean up: - use QPID_MSG consistently to format exception messages. - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions. - re-throw correct typed exception on client for exceptions from broker. - Removed QpidError.h rubygen/templates/constants.rb: - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants. - reply_constants.h: Added throwReplyException(code, text) log::Logger: - Fixed shutdown race in Statement::~Initializer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/Exception.cpp42
-rw-r--r--cpp/src/qpid/Exception.h125
-rw-r--r--cpp/src/qpid/Msg.h2
-rw-r--r--cpp/src/qpid/QpidError.cpp41
-rw-r--r--cpp/src/qpid/QpidError.h79
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp21
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp9
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/Daemon.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp6
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp12
-rw-r--r--cpp/src/qpid/broker/DtxTimeout.h7
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp12
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp6
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp7
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp26
-rw-r--r--cpp/src/qpid/broker/Queue.cpp14
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp4
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp27
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp92
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h14
-rw-r--r--cpp/src/qpid/broker/SessionManager.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionManager.h12
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp37
-rw-r--r--cpp/src/qpid/broker/SessionState.h41
-rw-r--r--cpp/src/qpid/broker/Timer.cpp7
-rw-r--r--cpp/src/qpid/broker/Timer.h2
-rw-r--r--cpp/src/qpid/client/Channel.cpp5
-rw-r--r--cpp/src/qpid/client/Connection.cpp26
-rw-r--r--cpp/src/qpid/client/Connection.h7
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp4
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp58
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h14
-rw-r--r--cpp/src/qpid/client/Connector.cpp2
-rw-r--r--cpp/src/qpid/client/Connector.h1
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/Future.h2
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp2
-rw-r--r--cpp/src/qpid/client/FutureResult.cpp2
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp366
-rw-r--r--cpp/src/qpid/client/SessionCore.h124
-rw-r--r--cpp/src/qpid/client/SessionHandler.cpp132
-rw-r--r--cpp/src/qpid/client/SessionHandler.h63
-rw-r--r--cpp/src/qpid/client/StateManager.cpp2
-rw-r--r--cpp/src/qpid/client/StateManager.h4
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp14
-rw-r--r--cpp/src/qpid/framing/BodyHandler.cpp6
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp16
-rw-r--r--cpp/src/qpid/framing/ChannelHandler.h (renamed from cpp/src/qpid/framing/ProtocolVersionException.h)47
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp5
-rw-r--r--cpp/src/qpid/framing/FieldValue.cpp7
-rw-r--r--cpp/src/qpid/framing/FramingContent.cpp14
-rw-r--r--cpp/src/qpid/framing/Handler.h3
-rw-r--r--cpp/src/qpid/framing/ProtocolVersionException.cpp33
-rw-r--r--cpp/src/qpid/framing/ResumeHandler.cpp56
-rw-r--r--cpp/src/qpid/framing/ResumeHandler.h69
-rw-r--r--cpp/src/qpid/framing/SequenceNumber.h1
-rw-r--r--cpp/src/qpid/framing/SessionState.cpp120
-rw-r--r--cpp/src/qpid/framing/SessionState.h127
-rw-r--r--cpp/src/qpid/framing/TemplateVisitor.h89
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp12
-rw-r--r--cpp/src/qpid/framing/TransferContent.h7
-rw-r--r--cpp/src/qpid/framing/Uuid.cpp12
-rw-r--r--cpp/src/qpid/framing/Uuid.h3
-rw-r--r--cpp/src/qpid/framing/amqp_framing.h1
-rw-r--r--cpp/src/qpid/framing/amqp_types.h6
-rw-r--r--cpp/src/qpid/framing/variant.h3
-rw-r--r--cpp/src/qpid/log/Logger.cpp6
-rw-r--r--cpp/src/qpid/log/Logger.h5
-rw-r--r--cpp/src/qpid/log/Statement.cpp4
-rw-r--r--cpp/src/qpid/log/Statement.h24
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp3
-rw-r--r--cpp/src/qpid/sys/ConcurrentQueue.h90
-rw-r--r--cpp/src/qpid/sys/ScopedIncrement.h6
-rw-r--r--cpp/src/qpid/sys/Serializer.h4
-rw-r--r--cpp/src/qpid/sys/StateMonitor.h78
-rw-r--r--cpp/src/qpid/sys/Waitable.h73
-rw-r--r--cpp/src/qpid/sys/apr/APRBase.cpp1
-rw-r--r--cpp/src/qpid/sys/apr/APRBase.h8
-rw-r--r--cpp/src/qpid/sys/posix/Shlib.cpp9
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp4
-rw-r--r--cpp/src/qpid/sys/posix/check.cpp39
-rw-r--r--cpp/src/qpid/sys/posix/check.h36
86 files changed, 1343 insertions, 1191 deletions
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp
index 11051d1a2e..e0747df49e 100644
--- a/cpp/src/qpid/Exception.cpp
+++ b/cpp/src/qpid/Exception.cpp
@@ -23,6 +23,7 @@
#include "Exception.h"
#include <typeinfo>
#include <errno.h>
+#include <assert.h>
namespace qpid {
@@ -31,45 +32,22 @@ std::string strError(int err) {
return std::string(strerror_r(err, buf, sizeof(buf)));
}
-static void ctorLog(const std::exception* e) {
- QPID_LOG(trace, "Exception: " << e->what());
+Exception::Exception(const std::string& s) throw() : msg(s) {
+ QPID_LOG(warning, "Exception: " << msg);
}
-
-Exception::Exception() throw() { ctorLog(this); }
-
-Exception::Exception(const std::string& str) throw()
- : whatStr(str) { ctorLog(this); }
-
-Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); }
-
-Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {}
Exception::~Exception() throw() {}
-const char* Exception::what() const throw() { return whatStr.c_str(); }
-
-std::string Exception::toString() const throw() { return whatStr; }
-
-Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); }
-
-void Exception::throwSelf() const { throw *this; }
-
-ShutdownException::ShutdownException() : Exception("Shut down.") {}
-
-EmptyException::EmptyException() : Exception("Empty.") {}
-
-const char* Exception::defaultMessage = "Unexpected exception";
-
-void Exception::log(const char* what, const char* message) {
- QPID_LOG(error, message << ": " << what);
+std::string Exception::str() const throw() {
+ if (msg.empty())
+ const_cast<std::string&>(msg).assign(typeid(*this).name());
+ return msg;
}
-void Exception::log(const std::exception& e, const char* message) {
- log(e.what(), message);
-}
+const char* Exception::what() const throw() { return str().c_str(); }
-void Exception::logUnknown(const char* message) {
- log("unknown exception.", message);
+std::auto_ptr<Exception> Exception::clone() const throw() {
+ return std::auto_ptr<Exception>(new Exception(*this));
}
} // namespace qpid
diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h
index a7ab1fa8aa..bf6c1fb872 100644
--- a/cpp/src/qpid/Exception.h
+++ b/cpp/src/qpid/Exception.h
@@ -24,135 +24,52 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/Msg.h"
-#include <exception>
-#include <string>
+
#include <memory>
-#include <boost/shared_ptr.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/function.hpp>
+#include <string>
namespace qpid
{
-/** Get the error message for error number err. */
+/** Get the error message for a system number err, e.g. errno. */
std::string strError(int err);
/**
- * Exception base class for all Qpid exceptions.
+ * Base class for Qpid runtime exceptions.
*/
class Exception : public std::exception
{
- protected:
- std::string whatStr;
-
public:
- typedef boost::shared_ptr<Exception> shared_ptr;
- typedef boost::shared_ptr<const Exception> shared_ptr_const;
- typedef std::auto_ptr<Exception> auto_ptr;
-
- Exception() throw();
- Exception(const std::string& str) throw();
- Exception(const char* str) throw();
- Exception(const std::exception&) throw();
-
- /** Allow any type that has ostream operator<< to act as message */
- template <class T>
- Exception(const T& message)
- : whatStr(boost::lexical_cast<std::string>(message)) {}
-
+ explicit Exception(const std::string& str=std::string()) throw();
virtual ~Exception() throw();
-
- virtual const char* what() const throw();
- virtual std::string toString() const throw();
-
- virtual auto_ptr clone() const throw();
- virtual void throwSelf() const;
-
- /** Default message: "Unknown exception" or something like it. */
- static const char* defaultMessage;
-
- /**
- * Log a message of the form "message: what"
- *@param what Exception's what() message.
- *@param message Prefix message.
- */
- static void log(const char* what, const char* message = defaultMessage);
-
- /**
- * Log an exception.
- *@param e Exception to log.
-
- */
- static void log(
- const std::exception& e, const char* message = defaultMessage);
-
- /**
- * Log an unknown exception - use in catch(...)
- *@param message Prefix message.
- */
- static void logUnknown(const char* message = defaultMessage);
-
- /**
- * Wrapper template function to call another function inside
- * try/catch and log any exception. Use boost::bind to wrap
- * member function calls or functions with arguments.
- *
- *@param f Function to call in try block.
- *@param retrhow If true the exception is rethrown.
- *@param message Prefix message.
- */
- template <class T>
- static T tryCatchLog(boost::function0<T> f, bool rethrow=true,
- const char* message=defaultMessage)
- {
- try {
- return f();
- }
- catch (const std::exception& e) {
- log(e, message);
- if (rethrow)
- throw;
- }
- catch (...) {
- logUnknown(message);
- if (rethrow)
- throw;
- }
- }
+ virtual const char *what() const throw();
+ virtual std::auto_ptr<Exception> clone() const throw();
+ virtual std::string str() const throw();
+ private:
+ std::string msg;
};
struct ChannelException : public Exception {
- framing::ReplyCode code;
- template <class T>
- ChannelException(framing::ReplyCode code_, const T& message)
+ const framing::ReplyCode code;
+ ChannelException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
- void throwSelf() const { throw *this; }
};
struct ConnectionException : public Exception {
- framing::ReplyCode code;
- template <class T>
- ConnectionException(framing::ReplyCode code_, const T& message)
+ const framing::ReplyCode code;
+ ConnectionException(framing::ReplyCode code_, const std::string& message)
: Exception(message), code(code_) {}
- void throwSelf() const { throw *this; }
};
-/**
- * Exception used to indicate that a thread should shut down.
- * Does not indicate an error that should be signalled to the user.
+/** Clone an exception.
+ * For qpid::Exception this calls the clone member function.
+ * For standard exceptions, uses the copy constructor.
+ * For unknown exception types creates a std::exception
+ * with the same what() string.
*/
-struct ShutdownException : public Exception {
- ShutdownException();
- void throwSelf() const { throw *this; }
-};
-
-/** Exception to indicate empty queue or other empty state */
-struct EmptyException : public Exception {
- EmptyException();
- void throwSelf() const { throw *this; }
-};
+std::auto_ptr<std::exception> clone(const std::exception&);
-}
+} // namespace qpid
#endif /*!_Exception_*/
diff --git a/cpp/src/qpid/Msg.h b/cpp/src/qpid/Msg.h
index c1a6b54d05..7214db611f 100644
--- a/cpp/src/qpid/Msg.h
+++ b/cpp/src/qpid/Msg.h
@@ -54,7 +54,7 @@ inline std::ostream& operator<<(std::ostream& o, const Msg& m) {
}
/** Construct a message using operator << and append (file:line) */
-#define QPID_MSG(message) Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")"
+#define QPID_MSG(message) ::qpid::Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")"
} // namespace qpid
diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp
deleted file mode 100644
index 740ec24e54..0000000000
--- a/cpp/src/qpid/QpidError.cpp
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/format.hpp>
-
-#include "QpidError.h"
-#include <sstream>
-
-using namespace qpid;
-
-QpidError::QpidError() : code(0) {}
-
-QpidError::~QpidError() throw() {}
-
-Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_ptr(new QpidError(*this)); }
-
-void QpidError::throwSelf() const { throw *this; }
-
-std::string QpidError::message(int code, const std::string& msg, const char* file, int line) {
- return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str();
-}
-
-
diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h
deleted file mode 100644
index 2ff6571365..0000000000
--- a/cpp/src/qpid/QpidError.h
+++ /dev/null
@@ -1,79 +0,0 @@
-#ifndef __QpidError__
-#define __QpidError__
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <memory>
-#include <ostream>
-
-#include "Exception.h"
-
-namespace qpid {
-
-struct SrcLine {
- public:
- SrcLine(const std::string& file_="", int line_=0) :
- file(file_), line(line_) {}
-
- std::string file;
- int line;
-};
-
-class QpidError : public Exception
-{
- public:
- const int code;
- SrcLine loc;
- std::string msg;
-
- QpidError();
-
- template <class T>
- QpidError(int code_, const T& msg_, const SrcLine& loc_) throw()
- : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)),
- code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {}
-
- ~QpidError() throw();
- Exception::auto_ptr clone() const throw();
- void throwSelf() const;
-
- /** Format message for exception. */
- static std::string message(int code, const std::string& msg, const char* file, int line);
-};
-
-
-} // namespace qpid
-
-#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__)
-
-#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE)
-
-#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE)
-
-#define THROW_QPID_ERRNO_IF(cond) if (cond) QPID_ERROR(INTERNAL, strError(errno));
-
-const int PROTOCOL_ERROR = 10000;
-const int APR_ERROR = 20000;
-const int FRAMING_ERROR = 30000;
-const int CLIENT_ERROR = 40000;
-const int INTERNAL_ERROR = 50000;
-
-#endif
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e53774740a..b88f1c6c6a 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -64,7 +64,8 @@ Broker::Options::Options(const std::string& name) :
storeDir("/var"),
storeAsync(false),
enableMgmt(0),
- mgmtPubInterval(10)
+ mgmtPubInterval(10),
+ ack(100)
{
addOptions()
("port,p", optValue(port,"PORT"),
@@ -102,7 +103,8 @@ Broker::Broker(const Broker::Options& conf) :
queues(store.get()),
stagingThreshold(0),
factory(*this),
- dtxManager(store.get())
+ dtxManager(store.get()),
+ sessionManager(conf.ack)
{
if(conf.enableMgmt){
managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 2018371624..817197a351 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -69,6 +69,7 @@ class Broker : public sys::Runnable, public Plugin::Target
bool storeAsync;
bool enableMgmt;
uint16_t mgmtPubInterval;
+ uint32_t ack;
};
virtual ~Broker();
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index 99b585406e..dad40868d6 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -21,6 +21,7 @@
#include "MessageDelivery.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace broker {
@@ -75,8 +76,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
checkAlternate(response.first, alternate);
}
}catch(UnknownExchangeTypeException& e){
- throw ConnectionException(
- 503, "Exchange type not implemented: " + type);
+ throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
}
}
}
@@ -84,24 +84,23 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri
void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
{
if (!type.empty() && exchange->getType() != type) {
- throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type);
+ throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
}
}
void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
{
- if (alternate && alternate != exchange->getAlternate()) {
- throw ConnectionException(530, "Exchange declared with alternate-exchange "
- + exchange->getAlternate()->getName() + ", requested "
- + alternate->getName());
- }
-
+ if (alternate && alternate != exchange->getAlternate())
+ throw NotAllowedException(
+ QPID_MSG("Exchange declared with alternate-exchange "
+ << exchange->getAlternate()->getName() << ", requested "
+ << alternate->getName()));
}
void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
//TODO: implement unused
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
+ if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
getBroker().getExchanges().destroy(name);
@@ -292,7 +291,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/,
Queue::shared_ptr queue = state.getQueue(queueName);
if(!consumerTag.empty() && state.exists(consumerTag)){
- throw ConnectionException(530, "Consumer tags must be unique");
+ throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
}
string newTag = consumerTag;
//need to generate name here, so we have it for the adapter (it is
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index 5537dc67f5..706b42c080 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -82,7 +82,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
throw framing::NotImplementedException("Tunnel class not implemented"); }
// Handlers no longer implemented in BrokerAdapter:
-#define BADHANDLER() assert(0); throw framing::InternalErrorException()
+#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
SessionHandler* getSessionHandler() { BADHANDLER(); }
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index ca0ca20849..f981d47ef7 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -28,9 +28,10 @@
#include "BrokerAdapter.h"
#include "SemanticHandler.h"
-#include <boost/utility/in_place_factory.hpp>
#include <boost/bind.hpp>
+#include <algorithm>
+
using namespace boost;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -61,6 +62,7 @@ void Connection::close(
ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
adapter.close(code, text, classId, methodId);
+ channels.clear();
getOutput().close();
}
@@ -73,8 +75,11 @@ void Connection::idleOut(){}
void Connection::idleIn(){}
-void Connection::closed(){
+void Connection::closed(){ // Physically closed, suspend open sessions.
try {
+ std::for_each(
+ channels.begin(), channels.end(),
+ boost::bind(&SessionHandler::localSuspend, _1));
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index f697986194..dd645b595e 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -46,9 +46,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
AMQMethodBody* method=frame.getBody()->getMethod();
try{
if (!invoke(*handler.get(), *method))
- throw ConnectionException(503, "Class can't be accessed over channel 0");
+ throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
}catch(ConnectionException& e){
- handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+ handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
}catch(std::exception& e){
handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
}
diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp
index 0bb3449289..3fcc487324 100644
--- a/cpp/src/qpid/broker/Daemon.cpp
+++ b/cpp/src/qpid/broker/Daemon.cpp
@@ -17,7 +17,7 @@
*/
#include "Daemon.h"
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include <boost/iostreams/stream.hpp>
#include <boost/iostreams/device/file_descriptor.hpp>
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 5887d13f85..ec042ff56a 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -44,7 +44,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (fail) {
state.endDtx(xid, true);
if (suspend) {
- throw ConnectionException(503, "End and suspend cannot both be set.");
+ throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
return DtxDemarcationEndResult(XA_RBROLLBACK);
}
@@ -67,7 +67,7 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
bool resume)
{
if (join && resume) {
- throw ConnectionException(503, "Join and resume cannot both be set.");
+ throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
}
try {
if (resume) {
@@ -161,7 +161,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
const string& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
- throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 0d211017de..0597b41f98 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -20,16 +20,20 @@
*/
#include "DtxManager.h"
#include "DtxTimeout.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <iostream>
using qpid::sys::Mutex;
using namespace qpid::broker;
+using namespace qpid::framing;
DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
-DtxManager::~DtxManager() {}
+DtxManager::~DtxManager() {
+ // timer.stop(); // FIXME aconway 2007-10-23: leaking threads.
+}
void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
{
@@ -84,7 +88,7 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
}
return i;
}
@@ -94,7 +98,7 @@ void DtxManager::remove(const std::string& xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i == work.end()) {
- throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+ throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
} else {
work.erase(i);
}
@@ -105,7 +109,7 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid)
Mutex::ScopedLock locker(lock);
WorkMap::iterator i = work.find(xid);
if (i != work.end()) {
- throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid);
+ throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
} else {
return work.insert(xid, new DtxWorkRecord(xid, store)).first;
}
diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h
index 33da62e7f4..7d0b8622d0 100644
--- a/cpp/src/qpid/broker/DtxTimeout.h
+++ b/cpp/src/qpid/broker/DtxTimeout.h
@@ -29,12 +29,7 @@ namespace broker {
class DtxManager;
-
-struct DtxTimeoutException : public Exception
-{
- DtxTimeoutException() {}
-};
-
+struct DtxTimeoutException : public Exception {};
struct DtxTimeout : public TimerTask
{
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index f2f118c5e4..fe9e42ca32 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -19,12 +19,14 @@
*
*/
#include "DtxWorkRecord.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
#include <boost/mem_fn.hpp>
using boost::mem_fn;
using qpid::sys::Mutex;
using namespace qpid::broker;
+using namespace qpid::framing;
DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
@@ -71,8 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase)
if (prepared) {
//already prepared i.e. 2pc
if (onePhase) {
- throw ConnectionException(503,
- boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
}
store->commit(*txn);
@@ -83,8 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase)
} else {
//1pc commit optimisation, don't need a 2pc transaction context:
if (!onePhase) {
- throw ConnectionException(503,
- boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
}
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
@@ -119,7 +119,7 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
throw DtxTimeoutException();
}
if (completed) {
- throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
}
work.push_back(ops);
}
@@ -133,7 +133,7 @@ bool DtxWorkRecord::check()
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
- throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+ throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!"));
} else if ((*i)->isRollbackOnly()) {
rolledback = true;
}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index ae1afe5abb..98e3cc7347 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -24,6 +24,7 @@
#include "HeadersExchange.h"
#include "TopicExchange.h"
#include "ManagementExchange.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
using namespace qpid::sys;
@@ -75,9 +76,8 @@ void ExchangeRegistry::destroy(const string& name){
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end()) {
- throw ChannelException(404, "Exchange not found: " + name);
- }
+ if (i == exchanges.end())
+ throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
return i->second;
}
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 215a002517..dd688cdfcf 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -20,7 +20,7 @@
*/
#include "HeadersExchange.h"
#include "qpid/framing/FieldValue.h"
-#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
#include <algorithm>
@@ -46,9 +46,8 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const
bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
RWlock::ScopedWlock locker(lock);
FieldTable::ValuePtr what = args->get(x_match);
- if (!what || (*what != all && *what != any)) {
- THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
- }
+ if (!what || (*what != all && *what != any))
+ throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
Binding binding(*args, queue);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), binding);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index b12910893a..834ce0a203 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -16,7 +16,7 @@
*
*/
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
#include "MessageHandlerImpl.h"
#include "qpid/framing/FramingContent.h"
@@ -56,39 +56,39 @@ MessageHandlerImpl::cancel(const string& destination )
void
MessageHandlerImpl::open(const string& /*reference*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::close(const string& /*reference*/)
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::checkpoint(const string& /*reference*/,
const string& /*identifier*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::resume(const string& /*reference*/,
const string& /*identifier*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
MessageHandlerImpl::offset(uint64_t /*value*/ )
{
- throw ConnectionException(540, "References no longer supported");
+ throw NotImplementedException("References no longer supported");
}
void
@@ -97,19 +97,19 @@ MessageHandlerImpl::get(uint16_t /*ticket*/,
const string& /*destination*/,
bool /*noAck*/ )
{
- throw ConnectionException(540, "get no longer supported");
+ throw NotImplementedException("get no longer supported");
}
void
MessageHandlerImpl::empty()
{
- throw ConnectionException(540, "empty no longer supported");
+ throw NotImplementedException("empty no longer supported");
}
void
MessageHandlerImpl::ok()
{
- throw ConnectionException(540, "Message.Ok no longer supported");
+ throw NotImplementedException("Message.Ok no longer supported");
}
void
@@ -134,7 +134,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/,
{
Queue::shared_ptr queue = state.getQueue(queueName);
if(!destination.empty() && state.exists(destination))
- throw ConnectionException(530, "Consumer tags must be unique");
+ throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
string tag = destination;
state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode),
@@ -165,7 +165,7 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i
state.addByteCredit(destination, value);
} else {
//unknown
- throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
+ throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
}
}
@@ -179,7 +179,7 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode)
//window
state.setWindowMode(destination);
} else{
- throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);
+ throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 116e8d9431..18c1ab1056 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -19,9 +19,8 @@
*
*/
-#include <boost/format.hpp>
-
#include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
#include "Broker.h"
#include "Queue.h"
#include "Exchange.h"
@@ -37,7 +36,6 @@
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
-using boost::format;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
@@ -269,17 +267,15 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
void Queue::consume(Consumer::ptr c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
if(exclusive) {
- throw ChannelException(
- 403, format("Queue '%s' has an exclusive consumer."
- " No more consumers allowed.") % getName());
+ throw AccessRefusedException(
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
}
if(requestExclusive) {
if(acquirers.empty() && browsers.empty()) {
exclusive = c;
} else {
- throw ChannelException(
- 403, format("Queue '%s' already has consumers."
- "Exclusive access denied.") % getName());
+ throw AccessRefusedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
}
}
if (c->preAcquires()) {
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index 8535dc6a60..e1a8ae470d 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -125,7 +125,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
incoming.complete(id);
if (!invoker.wasHandled()) {
- throw ConnectionException(540, "Not implemented");
+ throw NotImplementedException("Not implemented");
} else if (invoker.hasResult()) {
session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
}
@@ -139,7 +139,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
{
if (!invoke(*this, *method))
- throw ConnectionException(540, "Not implemented");
+ throw NotImplementedException("Not implemented");
}
void SemanticHandler::handleContent(AMQFrame& frame)
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 1f7436da94..e0e4315d03 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -31,7 +31,6 @@
#include "SessionHandler.h"
#include "TxAck.h"
#include "TxPublish.h"
-#include "qpid/QpidError.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
@@ -116,7 +115,8 @@ void SemanticState::startTx()
void SemanticState::commit(MessageStore* const store)
{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+ if (!txBuffer) throw
+ CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
txBuffer->enlist(txAck);
@@ -127,7 +127,8 @@ void SemanticState::commit(MessageStore* const store)
void SemanticState::rollback()
{
- if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+ if (!txBuffer)
+ throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
txBuffer->rollback();
accumulatedAck.clear();
@@ -141,7 +142,7 @@ void SemanticState::selectDtx()
void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
{
if (!dtxSelected) {
- throw ConnectionException(503, "Session has not been selected for use with dtx");
+ throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
}
dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
@@ -155,11 +156,12 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
void SemanticState::endDtx(const std::string& xid, bool fail)
{
if (!dtxBuffer) {
- throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+ throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session"));
}
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end"));
+
}
txBuffer.reset();//ops on this session no longer transactional
@@ -176,8 +178,8 @@ void SemanticState::endDtx(const std::string& xid, bool fail)
void SemanticState::suspendDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend"));
}
txBuffer.reset();//ops on this session no longer transactional
@@ -188,11 +190,12 @@ void SemanticState::suspendDtx(const std::string& xid)
void SemanticState::resumeDtx(const std::string& xid)
{
if (dtxBuffer->getXid() != xid) {
- throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
- % dtxBuffer->getXid() % xid);
+ throw CommandInvalidException(
+ QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume"));
+
}
if (!dtxBuffer->isSuspended()) {
- throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+ throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended"));
}
checkDtxTimeout();
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index ed092d6a05..9b065be8af 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -26,6 +26,8 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
+
namespace qpid {
namespace broker {
using namespace framing;
@@ -33,7 +35,9 @@ using namespace std;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: InOutHandler(0, &c.getOutput()),
- connection(c), channel(ch), proxy(out),
+ connection(c), channel(ch, &c.getOutput()),
+ proxy(out), // Via my own handleOut() for L2 data.
+ peerSession(channel), // Direct to channel for L2 commands.
ignoring(false) {}
SessionHandler::~SessionHandler() {}
@@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) {
try {
if (m && invoke(*this, *m))
return;
- else if (session.get())
- session->in(f);
+ else if (session.get()) {
+ boost::optional<SequenceNumber> ack=session->received(f);
+ session->in.handle(f);
+ if (ack)
+ peerSession.ack(*ack, SequenceNumberSet());
+ }
else if (!ignoring)
throw ChannelErrorException(
- QPID_MSG("Channel " << channel << " is not open"));
+ QPID_MSG("Channel " << channel.get() << " is not open"));
} catch(const ChannelException& e) {
ignoring=true; // Ignore trailing frames sent by client.
session.reset();
- getProxy().getSession().closed(e.code, e.toString());
+ peerSession.closed(e.code, e.what());
}catch(const ConnectionException& e){
connection.close(e.code, e.what(), classId(m), methodId(m));
}catch(const std::exception& e){
@@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
void SessionHandler::handleOut(AMQFrame& f) {
- f.setChannel(getChannel());
- out.next->handle(f);
+ channel.handle(f); // Send it.
+ if (session->sent(f))
+ peerSession.solicitAck();
}
-void SessionHandler::assertOpen(const char* method) {
- if (!session.get())
+void SessionHandler::assertAttached(const char* method) const {
+ if (!session.get())
throw ChannelErrorException(
QPID_MSG(method << " failed: No session for channel "
<< getChannel()));
}
-void SessionHandler::assertClosed(const char* method) {
+void SessionHandler::assertClosed(const char* method) const {
if (session.get())
throw ChannelBusyException(
- QPID_MSG(method << " failed: channel " << channel
+ QPID_MSG(method << " failed: channel " << channel.get()
<< " is already open."));
}
@@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) {
std::auto_ptr<SessionState> state(
connection.broker.getSessionManager().open(*this, detachedLifetime));
session.reset(state.release());
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ peerSession.attached(session->getId(), session->getTimeout());
}
void SessionHandler::resume(const Uuid& id) {
assertClosed("resume");
- session = connection.broker.getSessionManager().resume(*this, id);
- getProxy().getSession().attached(session->getId(), session->getTimeout());
+ session = connection.broker.getSessionManager().resume(id);
+ session->attach(*this);
+ SequenceNumber seq = session->resuming();
+ peerSession.attached(session->getId(), session->getTimeout());
+ proxy.getSession().ack(seq, SequenceNumberSet());
}
void SessionHandler::flow(bool /*active*/) {
+ assertAttached("flow");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flow");
}
void SessionHandler::flowOk(bool /*active*/) {
+ assertAttached("flowOk");
// FIXME aconway 2007-09-19: Removed in 0-10, remove
- assert(0); throw NotImplementedException();
+ assert(0); throw NotImplementedException("session.flowOk");
}
void SessionHandler::close() {
+ assertAttached("close");
QPID_LOG(info, "Received session.close");
ignoring=false;
session.reset();
- getProxy().getSession().closed(REPLY_SUCCESS, "ok");
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ peerSession.closed(REPLY_SUCCESS, "ok");
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
@@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) {
session.reset();
}
+void SessionHandler::localSuspend() {
+ if (session.get() && session->getState() == SessionState::ATTACHED) {
+ session->detach();
+ connection.broker.getSessionManager().suspend(session);
+ }
+}
+
void SessionHandler::suspend() {
- assertOpen("suspend");
- connection.broker.getSessionManager().suspend(session);
- assert(!session.get());
- getProxy().getSession().detached();
- assert(&connection.getChannel(channel) == this);
- connection.closeChannel(channel);
+ assertAttached("suspend");
+ localSuspend();
+ peerSession.detached();
+ assert(&connection.getChannel(channel.get()) == this);
+ connection.closeChannel(channel.get());
}
-void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/,
- const SequenceNumberSet& /*seenFrameSet*/) {
- assert(0); throw NotImplementedException();
+void SessionHandler::ack(uint32_t cumulativeSeenMark,
+ const SequenceNumberSet& /*seenFrameSet*/)
+{
+ assertAttached("ack");
+ if (session->getState() == SessionState::RESUMING) {
+ session->receivedAck(cumulativeSeenMark);
+ framing::SessionState::Replay replay=session->replay();
+ std::for_each(replay.begin(), replay.end(),
+ boost::bind(&SessionHandler::handleOut, this, _1));
+ }
+ else
+ session->receivedAck(cumulativeSeenMark);
}
void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
- assert(0); throw NotImplementedException();
+ // FIXME aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.high-water-mark");
}
void SessionHandler::solicitAck() {
- assert(0); throw NotImplementedException();
+ assertAttached("solicit-ack");
+ peerSession.ack(session->sendingAck(), SequenceNumberSet());
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 51a65e3092..9a68ddb46f 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -26,6 +26,7 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/framing/ChannelHandler.h"
#include <boost/noncopyable.hpp>
@@ -52,7 +53,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- framing::ChannelId getChannel() const { return channel; }
+ framing::ChannelId getChannel() const { return channel.get(); }
Connection& getConnection() { return connection; }
const Connection& getConnection() const { return connection; }
@@ -60,6 +61,9 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ // Called by closing connection.
+ void localSuspend();
+
protected:
void handleIn(framing::AMQFrame&);
void handleOut(framing::AMQFrame&);
@@ -79,12 +83,14 @@ class SessionHandler : public framing::FrameHandler::InOutHandler,
void solicitAck();
- void assertOpen(const char* method);
- void assertClosed(const char* method);
+ void assertAttached(const char* method) const;
+ void assertActive(const char* method) const;
+ void assertClosed(const char* method) const;
Connection& connection;
- const framing::ChannelId channel;
+ framing::ChannelHandler channel;
framing::AMQP_ClientProxy proxy;
+ framing::AMQP_ClientProxy::Session peerSession;
bool ignoring;
std::auto_ptr<SessionState> session;
};
diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp
index 303687c788..f12ebc6db1 100644
--- a/cpp/src/qpid/broker/SessionManager.cpp
+++ b/cpp/src/qpid/broker/SessionManager.cpp
@@ -39,7 +39,7 @@ namespace broker {
using namespace sys;
using namespace framing;
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(uint32_t a) : ack(a) {}
SessionManager::~SessionManager() {}
@@ -47,7 +47,8 @@ std::auto_ptr<SessionState> SessionManager::open(
SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
- std::auto_ptr<SessionState> session(new SessionState(*this, h, timeout_));
+ std::auto_ptr<SessionState> session(
+ new SessionState(*this, h, timeout_, ack));
active.insert(session->getId());
return session;
}
@@ -55,14 +56,13 @@ std::auto_ptr<SessionState> SessionManager::open(
void SessionManager::suspend(std::auto_ptr<SessionState> session) {
Mutex::ScopedLock l(lock);
active.erase(session->getId());
+ session->suspend();
session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
- session->handler = 0;
suspended.push_back(session.release()); // In expiry order
eraseExpired();
}
-std::auto_ptr<SessionState> SessionManager::resume(
- SessionHandler& sh, const Uuid& id)
+std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id)
{
Mutex::ScopedLock l(lock);
eraseExpired();
@@ -78,7 +78,6 @@ std::auto_ptr<SessionState> SessionManager::resume(
QPID_MSG("No suspended session with id=" << id));
active.insert(id);
std::auto_ptr<SessionState> state(suspended.release(i).release());
- state->handler = &sh;
return state;
}
@@ -94,8 +93,10 @@ void SessionManager::eraseExpired() {
Suspended::iterator keep = std::lower_bound(
suspended.begin(), suspended.end(), now(),
boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
- QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
- suspended.erase(suspended.begin(), keep);
+ if (suspended.begin() != keep) {
+ QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+ suspended.erase(suspended.begin(), keep);
+ }
}
}
diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h
index 58a7b3f01f..fa7262252d 100644
--- a/cpp/src/qpid/broker/SessionManager.h
+++ b/cpp/src/qpid/broker/SessionManager.h
@@ -44,7 +44,7 @@ class SessionHandler;
*/
class SessionManager : private boost::noncopyable {
public:
- SessionManager();
+ SessionManager(uint32_t ack);
~SessionManager();
/** Open a new active session, caller takes ownership */
std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
@@ -57,18 +57,20 @@ class SessionManager : private boost::noncopyable {
/** Resume a suspended session.
*@throw Exception if timed out or non-existant.
*/
- std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&);
+ std::auto_ptr<SessionState> resume(const framing::Uuid&);
private:
typedef boost::ptr_vector<SessionState> Suspended;
typedef std::set<framing::Uuid> Active;
+ void erase(const framing::Uuid&);
+ void eraseExpired();
+
sys::Mutex lock;
Suspended suspended;
Active active;
-
- void erase(const framing::Uuid&);
- void eraseExpired();
+ uint32_t ack;
+
friend class SessionState; // removes deleted sessions from active set.
};
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 17537e11be..45d78c9307 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -31,22 +31,25 @@ namespace broker {
using namespace framing;
-SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_)
- : factory(f), handler(&h), id(true), timeout(timeout_),
- broker(h.getConnection().broker),
- version(h.getConnection().getVersion())
-{
- // FIXME aconway 2007-09-21: Break dependnecy - broker updates session.
- chain.push_back(new SemanticHandler(*this));
- in = &chain[0]; // Incoming frame to handler chain.
- out = &handler->out; // Outgoing frames to SessionHandler
+void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
- // FIXME aconway 2007-09-20: use broker to add plugin
- // handlers to the chain.
- // FIXME aconway 2007-08-31: Shouldn't be passing channel ID.
- broker.update(handler->getChannel(), *this);
+void SessionState::handleOut(AMQFrame& f) {
+ assert(handler);
+ handler->out.handle(f);
}
+SessionState::SessionState(
+ SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack)
+ : framing::SessionState(ack),
+ factory(f), handler(&h), id(true), timeout(timeout_),
+ broker(h.getConnection().broker),
+ version(h.getConnection().getVersion()),
+ semanticHandler(new SemanticHandler(*this))
+{
+ // FIXME aconway 2007-09-20: SessionManager may add plugin
+ // handlers to the chain.
+ }
+
SessionState::~SessionState() {
// Remove ID from active session list.
factory.erase(getId());
@@ -65,4 +68,12 @@ Connection& SessionState::getConnection() {
return getHandler().getConnection();
}
+void SessionState::detach() {
+ handler = 0;
+}
+
+void SessionState::attach(SessionHandler& h) {
+ handler = &h;
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index d152937692..eed088af31 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -24,11 +24,12 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/sys/Time.h"
-#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
#include <set>
#include <vector>
@@ -42,31 +43,26 @@ class AMQP_ClientProxy;
namespace broker {
+class SemanticHandler;
class SessionHandler;
class SessionManager;
class Broker;
class Connection;
/**
- * State of a session.
- *
- * An attached session has a SessionHandler which is attached to a
- * connection. A suspended session has no handler.
- *
- * A SessionState is always associated with an open session (attached or
- * suspended) it is destroyed when the session is closed.
- *
- * The SessionState includes the sessions handler chains, which may
- * themselves have state. The handlers will be preserved as long as
- * the session is alive.
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state.
*/
-class SessionState : public framing::FrameHandler::Chains,
- private boost::noncopyable
+class SessionState : public framing::SessionState,
+ public framing::FrameHandler::InOutHandler
{
public:
~SessionState();
bool isAttached() { return handler; }
+ void detach();
+ void attach(SessionHandler& handler);
+
/** @pre isAttached() */
SessionHandler& getHandler();
@@ -76,23 +72,30 @@ class SessionState : public framing::FrameHandler::Chains,
/** @pre isAttached() */
Connection& getConnection();
- const framing::Uuid& getId() const { return id; }
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
framing::ProtocolVersion getVersion() const { return version; }
+
+ protected:
+ void handleIn(framing::AMQFrame&);
+ void handleOut(framing::AMQFrame&);
private:
- /** Only SessionManager can open sessions */
- SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_);
-
+ // SessionManager creates sessions.
+ SessionState(SessionManager&,
+ SessionHandler& out,
+ uint32_t timeout,
+ uint32_t ackInterval);
+
SessionManager& factory;
SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
sys::AbsTime expiry; // Used by SessionManager.
Broker& broker;
- boost::ptr_vector<framing::FrameHandler> chain;
framing::ProtocolVersion version;
+
+ boost::scoped_ptr<SemanticHandler> semanticHandler;
friend class SessionManager;
};
diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp
index be75346578..14727b3b35 100644
--- a/cpp/src/qpid/broker/Timer.cpp
+++ b/cpp/src/qpid/broker/Timer.cpp
@@ -73,17 +73,14 @@ void Timer::start()
Monitor::ScopedLock l(monitor);
if (!active) {
active = true;
- runner = std::auto_ptr<Thread>(new Thread(this));
+ runner = Thread(this);
}
}
void Timer::stop()
{
signalStop();
- if (runner.get()) {
- runner->join();
- runner.reset();
- }
+ runner.join();
}
void Timer::signalStop()
{
diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h
index c70ffeaedc..e89ae499b7 100644
--- a/cpp/src/qpid/broker/Timer.h
+++ b/cpp/src/qpid/broker/Timer.h
@@ -53,7 +53,7 @@ class Timer : private qpid::sys::Runnable
{
qpid::sys::Monitor monitor;
std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks;
- std::auto_ptr<qpid::sys::Thread> runner;
+ qpid::sys::Thread runner;
bool active;
void run();
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index cef34630db..16e0428a56 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -24,7 +24,6 @@
#include "Channel.h"
#include "qpid/sys/Monitor.h"
#include "Message.h"
-#include "qpid/QpidError.h"
#include "Connection.h"
#include "Demux.h"
#include "FutureResponse.h"
@@ -71,7 +70,7 @@ void Channel::open(const Session& s)
{
Mutex::ScopedLock l(stopLock);
if (isOpen())
- THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
+ throw ChannelBusyException();
active = true;
session = s;
if(isTransactional()) {
@@ -142,7 +141,7 @@ void Channel::consume(
Mutex::ScopedLock l(lock);
ConsumerMap::iterator i = consumers.find(tag);
if (i != consumers.end())
- throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+ throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag ));
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index 0a6a88ae90..932fab8881 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -29,7 +29,7 @@
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/shared_ptr.h"
#include <iostream>
#include <sstream>
#include <functional>
@@ -44,23 +44,26 @@ namespace client {
Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) :
channelIdCounter(0), version(_version),
max_frame_size(_max_frame_size),
- impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))),
- isOpen(false) {}
+ isOpen(false),
+ impl(new ConnectionImpl(
+ shared_ptr<Connector>(new Connector(_version, _debug))))
+{}
-Connection::Connection(boost::shared_ptr<Connector> c) :
+Connection::Connection(shared_ptr<Connector> c) :
channelIdCounter(0), version(framing::highestProtocolVersion),
max_frame_size(65536),
- impl(new ConnectionImpl(c)),
- isOpen(false) {}
+ isOpen(false),
+ impl(new ConnectionImpl(c))
+{}
-Connection::~Connection(){}
+Connection::~Connection(){ }
void Connection::open(
const std::string& host, int port,
const std::string& uid, const std::string& pwd, const std::string& vhost)
{
if (isOpen)
- THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+ throw Exception(QPID_MSG("Channel object is already open"));
impl->open(host, port, uid, pwd, vhost);
isOpen = true;
@@ -79,10 +82,9 @@ Session Connection::newSession(uint32_t detachedLifetime) {
}
void Connection::resume(Session& session) {
- shared_ptr<SessionCore> core=session.impl;
- core->setChannel(++channelIdCounter);
- impl->addSession(core);
- core->resume(impl);
+ session.impl->setChannel(++channelIdCounter);
+ impl->addSession(session.impl);
+ session.impl->resume(impl);
}
void Connection::close() {
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 2e5059f135..d2612ca754 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -23,7 +23,6 @@
*/
#include <map>
#include <string>
-#include "qpid/QpidError.h"
#include "Channel.h"
#include "ConnectionImpl.h"
#include "qpid/client/Session.h"
@@ -57,10 +56,12 @@ class Connection
framing::ChannelId channelIdCounter;
framing::ProtocolVersion version;
const uint32_t max_frame_size;
- shared_ptr<ConnectionImpl> impl;
bool isOpen;
bool debug;
-
+
+ protected:
+ boost::shared_ptr<ConnectionImpl> impl;
+
public:
/**
* Creates a connection object, but does not open the
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 4058bfb33f..a8f10c32a9 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -68,7 +68,7 @@ void ConnectionHandler::incoming(AMQFrame& frame)
try {
in(frame);
}catch(ConnectionException& e){
- error(e.code, e.toString(), body);
+ error(e.code, e.what(), body);
}catch(std::exception& e){
error(541/*internal error*/, e.what(), body);
}
@@ -124,6 +124,8 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_
void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
{
+ if (onError)
+ onError(code, message);
AMQMethodBody* method = body->getMethod();
if (method)
error(code, message, method->amqpClassId(), method->amqpMethodId());
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index fae93e8294..f9273bc165 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
#include "ConnectionImpl.h"
@@ -35,8 +36,9 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
{
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
handler.out = boost::bind(&Connector::send, connector, _1);
- handler.onClose = boost::bind(&ConnectionImpl::closed, this);
- handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2);
+ handler.onClose = boost::bind(&ConnectionImpl::closed, this,
+ REPLY_SUCCESS, std::string());
+ handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
connector->setInputHandler(&handler);
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
@@ -64,7 +66,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s = sessions[frame.getChannel()].lock();
}
if (!s)
- throw ChannelErrorException();
+ throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel()));
s->in(frame);
}
@@ -84,19 +86,8 @@ void ConnectionImpl::open(const std::string& host, int port,
void ConnectionImpl::close()
{
- assertNotClosed();
- handler.close();
-}
-
-void ConnectionImpl::closed()
-{
- closedByPeer(200, "OK");
-}
-
-void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
-{
- signalClose(code, text);
- connector->close();
+ if (!isClosed)
+ handler.close();
}
void ConnectionImpl::idleIn()
@@ -110,26 +101,39 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
+template <class F>
+void ConnectionImpl::forChannels(F functor) {
+ for (SessionMap::iterator i = sessions.begin();
+ i != sessions.end(); ++i) {
+ try {
+ boost::shared_ptr<SessionCore> s = i->second.lock();
+ if (s) functor(*s);
+ } catch (...) { assert(0); }
+ }
+}
+
void ConnectionImpl::shutdown()
{
- //this indicates that the socket to the server has closed
- signalClose(0, "Unexpected socket closure.");
+ Mutex::ScopedLock l(lock);
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionBroke, _1,
+ INTERNAL_ERROR, "Unexpected socket closure."));
+ sessions.clear();
+ isClosed = true;
}
-void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- boost::shared_ptr<SessionCore> s = i->second.lock();
- if (s)
- s->closed(code, text);
- }
+ if (isClosed) return;
+ forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
sessions.clear();
isClosed = true;
+ connector->close();
}
-void ConnectionImpl::assertNotClosed()
-{
+void ConnectionImpl::erase(uint16_t ch) {
Mutex::ScopedLock l(lock);
- if (isClosed) throw Exception("Connection has been closed");
+ sessions.erase(ch);
}
+
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index f20534f1aa..46bd5b685d 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -51,14 +51,14 @@ class ConnectionImpl : public framing::FrameHandler,
bool isClosed;
void incoming(framing::AMQFrame& frame);
- void closed();
- void closedByPeer(uint16_t, const std::string&);
+ void closed(uint16_t, const std::string&);
void idleOut();
void idleIn();
void shutdown();
- void signalClose(uint16_t, const std::string&);
- void assertNotClosed();
-public:
+
+ template <class F> void forChannels(F functor);
+
+ public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
ConnectionImpl(boost::shared_ptr<Connector> c);
@@ -69,7 +69,9 @@ public:
const std::string& pwd = "guest",
const std::string& virtualhost = "/");
void close();
- void handle(framing::AMQFrame& frame);
+ void handle(framing::AMQFrame& frame);
+ void erase(uint16_t channel);
+ boost::shared_ptr<Connector> getConnector() { return connector; }
};
}}
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index b1ec580605..ba11ea5569 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -20,7 +20,6 @@
*/
#include <iostream>
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
@@ -36,7 +35,6 @@ namespace client {
using namespace qpid::sys;
using namespace qpid::framing;
-using qpid::QpidError;
Connector::Connector(
ProtocolVersion ver, bool _debug, uint32_t buffer_size
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index 8aaaea247a..af6badd6e0 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -98,6 +98,7 @@ class Connector : public framing::OutputHandler,
virtual void setInputHandler(framing::InputHandler* handler);
virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
virtual void setShutdownHandler(sys::ShutdownHandler* handler);
+ virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
virtual framing::OutputHandler* getOutputHandler();
virtual void send(framing::AMQFrame& frame);
virtual void setReadTimeout(uint16_t timeout);
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index e4edece414..c70b0fc455 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -73,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
{
if (range.size() % 2) { //must be even number
- throw ConnectionException(530, "Received odd number of elements in ranged mark");
+ throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark"));
} else {
SequenceNumber mark(cumulative);
{
diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h
index 667a19e942..d07f9f149c 100644
--- a/cpp/src/qpid/client/Future.h
+++ b/cpp/src/qpid/client/Future.h
@@ -63,7 +63,7 @@ public:
boost::bind(&FutureCompletion::completed, &callback)
);
callback.waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
complete = true;
}
}
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index 73b7c3a7a6..5d36a1d873 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -31,7 +31,7 @@ using namespace qpid::sys;
AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
{
waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
return response.get();
}
diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp
index a523129206..681202edea 100644
--- a/cpp/src/qpid/client/FutureResult.cpp
+++ b/cpp/src/qpid/client/FutureResult.cpp
@@ -30,7 +30,7 @@ using namespace qpid::sys;
const std::string& FutureResult::getResult(SessionCore& session) const
{
waitForCompletion();
- session.checkClosed();
+ session.assertOpen();
return result;
}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 966d07eaef..27440465fe 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -24,105 +24,301 @@
#include "FutureResponse.h"
#include "FutureResult.h"
#include "ConnectionImpl.h"
-
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
-using namespace qpid::client;
+namespace qpid {
+namespace client {
+
using namespace qpid::framing;
-SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t maxFrameSize)
- : connection(conn), channel(ch), l2(*this), l3(maxFrameSize),
- uuid(false), sync(false)
+namespace { const std::string OK="ok"; }
+
+typedef sys::Monitor::ScopedLock Lock;
+typedef sys::Monitor::ScopedUnlock UnLock;
+
+inline void SessionCore::invariant() const {
+ switch (state.get()) {
+ case OPENING:
+ assert(!session);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case RESUMING:
+ assert(session);
+ assert(session->getState() == SessionState::RESUMING);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case OPEN:
+ case CLOSING:
+ case SUSPENDING:
+ assert(session);
+ assert(code==REPLY_SUCCESS);
+ assert(connection);
+ assert(channel.get());
+ assert(channel.next == connection.get());
+ break;
+ case SUSPENDED:
+ assert(code==REPLY_SUCCESS);
+ assert(session);
+ assert(!connection);
+ break;
+ case CLOSED:
+ assert(!session);
+ assert(!connection);
+ break;
+ }
+}
+
+inline void SessionCore::setState(State s) {
+ state = s;
+ invariant();
+}
+
+inline void SessionCore::waitFor(State s) {
+ invariant();
+ // We can be CLOSED or SUSPENDED by error at any time.
+ state.waitFor(States(s, CLOSED, SUSPENDED));
+ check();
+ assert(state==s);
+ invariant();
+}
+
+SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn,
+ uint16_t ch, uint64_t maxFrameSize)
+ : l3(maxFrameSize),
+ sync(false),
+ channel(ch),
+ proxy(channel),
+ state(OPENING)
{
- l2.next = &l3;
l3.out = &out;
- out.next = connection.get();
+ attaching(conn);
}
-SessionCore::~SessionCore() {}
+void SessionCore::attaching(shared_ptr<ConnectionImpl> c) {
+ assert(c);
+ assert(channel.get());
+ connection = c;
+ channel.next = connection.get();
+ code = REPLY_SUCCESS;
+ text = OK;
+ state = session ? RESUMING : OPENING;
+ invariant();
+}
-ExecutionHandler& SessionCore::getExecution()
-{
- checkClosed();
- return l3;
+SessionCore::~SessionCore() {
+ Lock l(state);
+ invariant();
+ detach(COMMAND_INVALID, "Session deleted");
+ state.waitAll();
}
-FrameSet::shared_ptr SessionCore::get()
-{
- checkClosed();
- return l3.getDemux().getDefault().pop();
+void SessionCore::detach(int c, const std::string& t) {
+ connection.reset();
+ channel.next = 0;
+ code=c;
+ text=t;
}
-void SessionCore::setSync(bool s)
-{
+void SessionCore::doClose(int code, const std::string& text) {
+ if (state != CLOSED) {
+ session.reset();
+ l3.getDemux().close();
+ l3.getCompletionTracker().close();
+ detach(code, text);
+ setState(CLOSED);
+ }
+ invariant();
+}
+
+void SessionCore::doSuspend(int code, const std::string& text) {
+ if (state != CLOSED) {
+ invariant();
+ detach(code, text);
+ session->suspend();
+ setState(SUSPENDED);
+ }
+}
+
+ExecutionHandler& SessionCore::getExecution() { // user thread
+ return l3;
+}
+
+void SessionCore::setSync(bool s) { // user thread
sync = s;
}
-bool SessionCore::isSync()
-{
+bool SessionCore::isSync() { // user thread
return sync;
}
-namespace {
-struct ClosedOnExit {
- SessionCore& core;
- int code;
- std::string text;
- ClosedOnExit(SessionCore& s, int c, const std::string& t)
- : core(s), code(c), text(t) {}
- ~ClosedOnExit() { core.closed(code, text); }
-};
+FrameSet::shared_ptr SessionCore::get() { // user thread
+ // No lock here: pop does a blocking wait.
+ return l3.getDemux().getDefault().pop();
+}
+
+void SessionCore::open(uint32_t detachedLifetime) { // user thread
+ Lock l(state);
+ check(state==OPENING && !session,
+ COMMAND_INVALID, QPID_MSG("Cannot re-open a session."));
+ proxy.open(detachedLifetime);
+ waitFor(OPEN);
+}
+
+void SessionCore::close() { // user thread
+ Lock l(state);
+ check();
+ if (state==OPEN) {
+ setState(CLOSING);
+ proxy.close();
+ waitFor(CLOSED);
+ }
+ else
+ doClose(REPLY_SUCCESS, OK);
+}
+
+void SessionCore::suspend() { // user thread
+ Lock l(state);
+ checkOpen();
+ setState(SUSPENDING);
+ proxy.suspend();
+ waitFor(SUSPENDED);
}
-void SessionCore::close()
+void SessionCore::setChannel(uint16_t ch) { channel=ch; }
+
+void SessionCore::resume(shared_ptr<ConnectionImpl> c) {
+ // user thread
+ {
+ Lock l(state);
+ if (state==OPEN)
+ doSuspend(REPLY_SUCCESS, OK);
+ check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed."));
+ SequenceNumber sendAck=session->resuming();
+ attaching(c);
+ proxy.resume(getId());
+ waitFor(OPEN);
+ proxy.ack(sendAck, SequenceNumberSet());
+ // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem
+ // for large replay sets.
+ SessionState::Replay replay=session->replay();
+ for (SessionState::Replay::iterator i = replay.begin();
+ i != replay.end(); ++i)
+ {
+ invariant();
+ channel.handle(*i); // Direct to channel.
+ check();
+ }
+ }
+}
+
+void SessionCore::assertOpen() const {
+ Lock l(state);
+ checkOpen();
+}
+
+// network thread
+void SessionCore::attached(const Uuid& sessionId,
+ uint32_t /*detachedLifetime*/)
{
- checkClosed();
- ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user.");
- l2.close();
+ Lock l(state);
+ invariant();
+ check(state == OPENING || state == RESUMING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.attached"));
+ if (state==OPENING) { // New session
+ // FIXME aconway 2007-10-17: arbitrary ack value of 100 for
+ // client, allow configuration.
+ session=in_place<SessionState>(100, sessionId);
+ setState(OPEN);
+ }
+ else { // RESUMING
+ check(sessionId == session->getId(),
+ INVALID_ARGUMENT, QPID_MSG("session.resumed has invalid ID."));
+ // Don't setState yet, wait for first incoming ack.
+ }
}
-void SessionCore::suspend() {
- checkClosed();
- ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended");
- l2.suspend();
+void SessionCore::detached() { // network thread
+ Lock l(state);
+ check(state == SUSPENDING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.detached."));
+ connection->erase(channel);
+ doSuspend(REPLY_SUCCESS, OK);
+}
+
+void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) {
+ Lock l(state);
+ invariant();
+ check(state==OPEN || state==RESUMING,
+ COMMAND_INVALID, QPID_MSG("Received unexpected session.ack"));
+ session->receivedAck(ack);
+ if (state==RESUMING) {
+ setState(OPEN);
+ }
+ invariant();
}
void SessionCore::closed(uint16_t code, const std::string& text)
-{
- out.next = 0;
- reason.code = code;
- reason.text = text;
- l2.closed();
- l3.getDemux().close();
- l3.getCompletionTracker().close();
+{ // network thread
+ Lock l(state);
+ invariant();
+ doClose(code, text);
}
-void SessionCore::checkClosed() const
-{
- // TODO: could have been a connection exception
- if(out.next == 0)
- throw ChannelException(reason.code, reason.text);
+// closed by connection
+void SessionCore::connectionClosed(uint16_t code, const std::string& text) {
+ Lock l(state);
+ try {
+ doClose(code, text);
+ } catch(...) { assert (0); }
}
-void SessionCore::open(uint32_t detachedLifetime) {
- assert(out.next);
- l2.open(detachedLifetime);
+void SessionCore::connectionBroke(uint16_t code, const std::string& text) {
+ Lock l(state);
+ try {
+ doSuspend(code, text);
+ } catch (...) { assert(0); }
}
-void SessionCore::resume(shared_ptr<ConnectionImpl> conn) {
- connection = conn;
- out.next = connection.get();
- l2.resume();
+void SessionCore::check() const { // Called with lock held.
+ invariant();
+ if (code != REPLY_SUCCESS)
+ throwReplyException(code, text);
+}
+
+void SessionCore::check(bool cond, int newCode, const std::string& msg) const {
+ check();
+ if (!cond) {
+ const_cast<SessionCore*>(this)->doClose(newCode, msg);
+ throwReplyException(code, text);
+ }
}
-Future SessionCore::send(const AMQBody& command)
-{
- checkClosed();
+void SessionCore::checkOpen() const {
+ if (state==SUSPENDED) {
+ std::string cause;
+ if (code != REPLY_SUCCESS)
+ cause=" by :"+text;
+ throw CommandInvalidException(QPID_MSG("Session is suspended" << cause));
+ }
+ check(state==OPEN, COMMAND_INVALID, QPID_MSG("Session is not open"));
+}
+Future SessionCore::send(const AMQBody& command)
+{
+ Lock l(state);
+ checkOpen();
command.getMethod()->setSync(sync);
-
Future f;
//any result/response listeners must be set before the command is sent
if (command.getMethod()->resultExpected()) {
@@ -145,21 +341,61 @@ Future SessionCore::send(const AMQBody& command)
Future SessionCore::send(const AMQBody& command, const MethodContent& content)
{
- checkClosed();
+ Lock l(state);
+ checkOpen();
//content bearing methods don't currently have responses or
//results, if that changes should follow procedure for the other
//send method impl:
return Future(l3.send(command, content));
}
+// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
- l2.handle(frame);
+ try {
+ // Cast to expose private SessionHandler functions.
+ if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ session->received(frame);
+ l3.handle(frame);
+ }
+ } catch (const ChannelException& e) {
+ QPID_LOG(error, "Channel exception:" << e.what());
+ doClose(e.code, e.what());
+ }
}
void SessionCore::handleOut(AMQFrame& frame)
{
- checkClosed();
- frame.setChannel(channel);
- out.next->handle(frame);
+ Lock l(state);
+ if (state==OPEN) {
+ if (session->sent(frame))
+ proxy.solicitAck();
+ channel.handle(frame);
+ }
+}
+
+void SessionCore::solicitAck( ) {
+ Lock l(state);
+ checkOpen();
+ proxy.ack(session->sendingAck(), SequenceNumberSet());
+}
+
+void SessionCore::flow(bool) {
+ assert(0); throw NotImplementedException("session.flow");
+}
+
+void SessionCore::flowOk(bool /*active*/) {
+ assert(0); throw NotImplementedException("session.flow");
+}
+
+void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) {
+ // FIXME aconway 2007-10-02: may be removed from spec.
+ assert(0); throw NotImplementedException("session.highWaterMark");
+}
+
+const Uuid SessionCore::getId() const {
+ if (session)
+ return session->getId();
+ throw Exception(QPID_MSG("Closed session, no ID."));
}
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index ac109e1f5c..38c72359a3 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -22,17 +22,25 @@
#ifndef _SessionCore_
#define _SessionCore_
-#include <boost/function.hpp>
-#include <boost/shared_ptr.hpp>
-#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/shared_ptr.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/MethodContent.h"
-#include "qpid/framing/Uuid.h"
-#include "SessionHandler.h"
+#include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/SessionState.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/sys/StateMonitor.h"
#include "ExecutionHandler.h"
+#include <boost/optional.hpp>
+
namespace qpid {
+namespace framing {
+class FrameSet;
+class MethodContent;
+class SequenceNumberSet;
+}
+
namespace client {
class Future;
@@ -43,60 +51,90 @@ class ConnectionImpl;
* Attaches to a SessionHandler when active, detaches
* when closed.
*/
-class SessionCore : public framing::FrameHandler::InOutHandler
+class SessionCore : public framing::FrameHandler::InOutHandler,
+ private framing::AMQP_ClientOperations::SessionHandler
{
- struct Reason
- {
- uint16_t code;
- std::string text;
- };
-
- shared_ptr<ConnectionImpl> connection;
- uint16_t channel;
- SessionHandler l2;
- ExecutionHandler l3;
- framing::Uuid uuid;
- volatile bool sync;
- Reason reason;
-
- protected:
- void handleIn(framing::AMQFrame& frame);
- void handleOut(framing::AMQFrame& frame);
-
public:
SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize);
~SessionCore();
framing::FrameSet::shared_ptr get();
+ const framing::Uuid getId() const;
+ uint16_t getChannel() const { return channel; }
+ void assertOpen() const;
- framing::Uuid getId() const { return uuid; }
- void setId(const framing::Uuid& id) { uuid= id; }
-
- uint16_t getChannel() const { assert(channel); return channel; }
- void setChannel(uint16_t ch) { assert(ch); channel=ch; }
-
+ // NOTE: Public functions called in user thread.
void open(uint32_t detachedLifetime);
-
- /** Closed by client code */
void close();
-
- /** Closed by peer */
- void closed(uint16_t code, const std::string& text);
-
void resume(shared_ptr<ConnectionImpl>);
void suspend();
+ void setChannel(uint16_t channel);
- void setSync(bool);
+ void setSync(bool s);
bool isSync();
ExecutionHandler& getExecution();
- void checkClosed() const;
Future send(const framing::AMQBody& command);
+
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
-};
-}
-}
+ void connectionClosed(uint16_t code, const std::string& text);
+ void connectionBroke(uint16_t code, const std::string& text);
+
+ private:
+ enum State {
+ OPENING,
+ RESUMING,
+ OPEN,
+ CLOSING,
+ SUSPENDING,
+ SUSPENDED,
+ CLOSED
+ };
+ typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler;
+ typedef sys::StateMonitor<State, CLOSED> StateMonitor;
+ typedef StateMonitor::Set States;
+
+ inline void invariant() const;
+ inline void setState(State s);
+ inline void waitFor(State);
+ void doClose(int code, const std::string& text);
+ void doSuspend(int code, const std::string& text);
+
+ /** If there is an error, throw the exception */
+ void check(bool condition, int code, const std::string& text) const;
+ /** Throw if *error */
+ void check() const;
+
+ void handleIn(framing::AMQFrame& frame);
+ void handleOut(framing::AMQFrame& frame);
+
+ // Private functions are called by broker in network thread.
+ void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+ void flow(bool active);
+ void flowOk(bool active);
+ void detached();
+ void ack(uint32_t cumulativeSeenMark,
+ const framing::SequenceNumberSet& seenFrameSet);
+ void highWaterMark(uint32_t lastSentMark);
+ void solicitAck();
+ void closed(uint16_t code, const std::string& text);
+
+ void attaching(shared_ptr<ConnectionImpl>);
+ void detach(int code, const std::string& text);
+ void checkOpen() const;
+
+ int code; // Error code
+ std::string text; // Error text
+ boost::optional<framing::SessionState> session;
+ shared_ptr<ConnectionImpl> connection;
+ ExecutionHandler l3;
+ volatile bool sync;
+ framing::ChannelHandler channel;
+ framing::AMQP_ServerProxy::Session proxy;
+ mutable StateMonitor state;
+};
+}} // namespace qpid::client
#endif
diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp
deleted file mode 100644
index d3b04e5356..0000000000
--- a/cpp/src/qpid/client/SessionHandler.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "SessionHandler.h"
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/all_method_bodies.h"
-#include "qpid/client/SessionCore.h"
-#include "qpid/framing/reply_exceptions.h"
-#include "qpid/log/Statement.h"
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace boost;
-
-namespace {
-// TODO aconway 2007-09-28: hack till we have multi-version support.
-ProtocolVersion version;
-}
-
-SessionHandler::SessionHandler(SessionCore& parent)
- : StateManager(CLOSED), core(parent) {}
-
-SessionHandler::~SessionHandler() {}
-
-void SessionHandler::handle(AMQFrame& frame)
-{
- AMQBody* body = frame.getBody();
- if (getState() == OPEN) {
- core.checkClosed();
- SessionClosedBody* closedBody=
- dynamic_cast<SessionClosedBody*>(body->getMethod());
- if (closedBody) {
- closed();
- core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
- } else {
- try {
- next->handle(frame);
- }
- catch(const ChannelException& e){
- QPID_LOG(error, "Channel exception:" << e.what());
- closed();
- AMQFrame f(0, SessionClosedBody(version, e.code, e.toString()));
- core.out(f);
- core.closed(closedBody->getReplyCode(), closedBody->getReplyText());
- }
- }
- } else {
- if (body->getMethod())
- handleMethod(body->getMethod());
- else
- throw ConnectionException(504, "Channel not open for content.");
- }
-}
-
-void SessionHandler::attach(const AMQMethodBody& command)
-{
- setState(OPENING);
- AMQFrame f(0, command);
- core.out(f);
- std::set<int> states;
- states.insert(OPEN);
- states.insert(CLOSED);
- waitFor(states);
- if (getState() != OPEN)
- throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel()));
-}
-
-void SessionHandler::open(uint32_t detachedLifetime) {
- attach(SessionOpenBody(version, detachedLifetime));
-}
-
-void SessionHandler::resume() {
- attach(SessionResumeBody(version, core.getId()));
-}
-
-void SessionHandler::detach(const AMQMethodBody& command)
-{
- setState(CLOSING);
- AMQFrame f(0, command);
- core.out(f);
- waitFor(CLOSED);
-}
-
-void SessionHandler::close() { detach(SessionCloseBody(version)); }
-void SessionHandler::suspend() { detach(SessionSuspendBody(version)); }
-void SessionHandler::closed() { setState(CLOSED); }
-
-void SessionHandler::handleMethod(AMQMethodBody* method)
-{
- switch (getState()) {
- case OPENING: {
- SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method);
- if (attached) {
- core.setId(attached->getSessionId());
- setState(OPEN);
- } else
- throw ChannelErrorException();
- break;
- }
- case CLOSING:
- if (method->isA<SessionClosedBody>() ||
- method->isA<SessionDetachedBody>())
- closed();
- break;
-
- case CLOSED:
- throw ChannelErrorException();
-
- default:
- assert(0);
- throw InternalErrorException(QPID_MSG("Internal Error."));
- }
-}
-
diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h
deleted file mode 100644
index 994b8402de..0000000000
--- a/cpp/src/qpid/client/SessionHandler.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _SessionHandler_
-#define _SessionHandler_
-
-#include "StateManager.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/shared_ptr.h"
-
-namespace qpid {
-namespace client {
-class SessionCore;
-
-/**
- * Handles incoming session (L2) commands.
- */
-class SessionHandler : public framing::FrameHandler,
- private StateManager
-{
- enum STATES {OPENING, OPEN, CLOSING, CLOSED};
- SessionCore& core;
-
- void handleMethod(framing::AMQMethodBody* method);
- void attach(const framing::AMQMethodBody&);
- void detach(const framing::AMQMethodBody&);
-
- public:
- SessionHandler(SessionCore& parent);
- ~SessionHandler();
-
- /** Incoming from broker */
- void handle(framing::AMQFrame&);
-
- void open(uint32_t detachedLifetime);
- void resume();
- void close();
- void closed();
- void suspend();
-};
-
-}}
-
-#endif
diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp
index b72967c098..0cb3c6b9d4 100644
--- a/cpp/src/qpid/client/StateManager.cpp
+++ b/cpp/src/qpid/client/StateManager.cpp
@@ -60,7 +60,7 @@ void StateManager::setState(int s)
stateLock.notifyAll();
}
-int StateManager::getState()
+int StateManager::getState() const
{
Monitor::ScopedLock l(stateLock);
return state;
diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h
index fd0c1b7f86..2f8ecb772c 100644
--- a/cpp/src/qpid/client/StateManager.h
+++ b/cpp/src/qpid/client/StateManager.h
@@ -30,12 +30,12 @@ namespace client {
class StateManager
{
int state;
- sys::Monitor stateLock;
+ mutable sys::Monitor stateLock;
public:
StateManager(int initial);
void setState(int state);
- int getState();
+ int getState() const ;
void waitForStateChange(int current);
void waitFor(std::set<int> states);
void waitFor(int state);
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index abd33c4158..423af06173 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -20,9 +20,9 @@
*/
#include "AMQFrame.h"
-#include "qpid/QpidError.h"
#include "qpid/framing/variant.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
@@ -103,7 +103,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t flags = buffer.getOctet();
uint8_t framing_version = (flags & 0xc0) >> 6;
if (framing_version != 0)
- THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported");
+ throw SyntaxErrorException(QPID_MSG("Framing version unsupported"));
bof = flags & 0x08;
eof = flags & 0x04;
bos = flags & 0x02;
@@ -111,7 +111,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t type = buffer.getOctet();
uint16_t frame_size = buffer.getShort();
if (frame_size < frameOverhead()-1)
- THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small");
+ throw SyntaxErrorException(QPID_MSG("Frame size too small"));
uint8_t reserved1 = buffer.getOctet();
uint8_t field1 = buffer.getOctet();
subchannel = field1 & 0x0f;
@@ -121,7 +121,7 @@ bool AMQFrame::decode(Buffer& buffer)
// Verify that the protocol header meets current spec
// TODO: should we check reserved2 against zero as well? - the spec isn't clear
if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0)
- THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero");
+ throw SyntaxErrorException(QPID_MSG("Reserved bits not zero"));
// TODO: should no longer care about body size and only pass up B,E,b,e flags
uint16_t body_size = frame_size + 1 - frameOverhead();
@@ -133,7 +133,7 @@ bool AMQFrame::decode(Buffer& buffer)
uint8_t end = buffer.getOctet();
if (end != 0xCE)
- THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
+ throw SyntaxErrorException(QPID_MSG("Frame end not found"));
return true;
}
@@ -147,9 +147,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type)
case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break;
default:
- THROW_QPID_ERROR(
- FRAMING_ERROR,
- boost::format("Unknown frame type %d") % type);
+ throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type));
}
boost::apply_visitor(DecodeVisitor(buffer,size), body);
}
diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp
index 53a01141c1..fb84be7cd6 100644
--- a/cpp/src/qpid/framing/BodyHandler.cpp
+++ b/cpp/src/qpid/framing/BodyHandler.cpp
@@ -18,14 +18,13 @@
* under the License.
*
*/
-#include "qpid/QpidError.h"
#include "BodyHandler.h"
#include "AMQMethodBody.h"
#include "AMQHeaderBody.h"
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
-
#include <boost/cast.hpp>
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::framing;
using namespace boost;
@@ -49,7 +48,8 @@ void BodyHandler::handleBody(AMQBody* body) {
handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body));
break;
default:
- QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type());
+ throw SyntaxErrorException(
+ QPID_MSG("Invalid frame type " << body->type()));
}
}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 6a466fdfab..8c1a4e1e9e 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -15,8 +15,6 @@
* limitations under the License.
*
*/
-#include <boost/format.hpp>
-
#include "ChannelAdapter.h"
#include "OutputHandler.h"
#include "AMQFrame.h"
@@ -26,8 +24,6 @@
#include "AMQMethodBody.h"
#include "qpid/framing/ConnectionOpenBody.h"
-using boost::format;
-
namespace qpid {
namespace framing {
@@ -53,20 +49,20 @@ void ChannelAdapter::send(const AMQBody& body)
void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const {
if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID)
- throw ConnectionException(
- 504, format("Connection method on non-0 channel %d.")%getId());
+ throw ChannelErrorException(
+ QPID_MSG("Connection method on non-0 channel " << getId()));
}
void ChannelAdapter::assertChannelOpen() const {
if (getId() != 0 && !isOpen())
- throw ConnectionException(
- 504, format("Channel %d is not open.")%getId());
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << getId() << " is not open."));
}
void ChannelAdapter::assertChannelNotOpen() const {
if (getId() != 0 && isOpen())
- throw ConnectionException(
- 504, format("Channel %d is already open.") % getId());
+ throw ChannelErrorException(
+ QPID_MSG("Channel " << getId() << " is already open."));
}
void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); }
diff --git a/cpp/src/qpid/framing/ProtocolVersionException.h b/cpp/src/qpid/framing/ChannelHandler.h
index bd16804470..69aaeac492 100644
--- a/cpp/src/qpid/framing/ProtocolVersionException.h
+++ b/cpp/src/qpid/framing/ChannelHandler.h
@@ -1,3 +1,6 @@
+#ifndef QPID_FRAMING_CHANNELHANDLER_H
+#define QPID_FRAMING_CHANNELHANDLER_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,39 +21,33 @@
* under the License.
*
*/
-
-#ifndef _ProtocolVersionException_
-#define _ProtocolVersionException_
-
-#include "qpid/Exception.h"
-#include "ProtocolVersion.h"
-#include <string>
-#include <vector>
+#include "FrameHandler.h"
+#include "AMQFrame.h"
namespace qpid {
namespace framing {
-class ProtocolVersionException : public qpid::Exception
+/**
+ * Sets the channel number on outgoing frames.
+ */
+class ChannelHandler : public FrameHandler
{
-protected:
- ProtocolVersion versionFound;
-
-public:
- ~ProtocolVersionException() throw() {}
-
- template <class T>
- ProtocolVersionException(
- ProtocolVersion ver, const T& msg) throw () : versionFound(ver)
- { init(boost::lexical_cast<std::string>(msg)); }
-
- template <class T>
- ProtocolVersionException(const T& msg) throw ()
- { init(boost::lexical_cast<std::string>(msg)); }
+ public:
+ ChannelHandler(uint16_t channelId=0, FrameHandler* next=0)
+ : FrameHandler(next), channel(channelId) {}
+ void handle(AMQFrame& frame) {
+ frame.setChannel(channel);
+ next->handle(frame);
+ }
+ uint16_t get() const { return channel; }
+ ChannelHandler& set(uint16_t ch) { channel=ch; return *this; }
+ operator uint16_t() const { return get(); }
+ ChannelHandler& operator=(uint16_t ch) { return set(ch); }
private:
- void init(const std::string& msg);
+ uint16_t channel;
};
}} // namespace qpid::framing
-#endif //ifndef _ProtocolVersionException_
+#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index 3c0284f2c8..089bc5d4a5 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -19,9 +19,10 @@
*
*/
#include "FieldTable.h"
-#include "qpid/QpidError.h"
#include "Buffer.h"
#include "FieldValue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
#include <assert.h>
namespace qpid {
@@ -132,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){
uint32_t len = buffer.getLong();
uint32_t available = buffer.available();
if (available < len)
- THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table.");
+ throw SyntaxErrorException(QPID_MSG("Not enough data for field table."));
uint32_t leftover = available - len;
while(buffer.available() > leftover){
std::string name;
diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp
index a7535ae4b9..5526c9cb72 100644
--- a/cpp/src/qpid/framing/FieldValue.cpp
+++ b/cpp/src/qpid/framing/FieldValue.cpp
@@ -20,8 +20,7 @@
*/
#include "FieldValue.h"
#include "Buffer.h"
-#include "qpid/QpidError.h"
-
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -75,9 +74,7 @@ void FieldValue::decode(Buffer& buffer)
data.reset(new FixedWidthValue<0>());
break;
default:
- std::stringstream out;
- out << "Unknown field table value type: " << typeOctet;
- THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet));
}
data->decode(buffer);
}
diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp
index 813e6fb49b..cd134b0e89 100644
--- a/cpp/src/qpid/framing/FramingContent.cpp
+++ b/cpp/src/qpid/framing/FramingContent.cpp
@@ -18,12 +18,10 @@
* under the License.
*
*/
-#include <assert.h>
-
#include "Buffer.h"
#include "FramingContent.h"
-#include "qpid/QpidError.h"
-#include <sstream>
+#include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -37,12 +35,12 @@ Content::Content(uint8_t _discriminator, const string& _value): discriminator(_d
void Content::validate() {
if (discriminator == REFERENCE) {
if(value.empty()) {
- THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty");
+ throw InvalidArgumentException(
+ QPID_MSG("Reference cannot be empty"));
}
}else if (discriminator != INLINE) {
- std::stringstream out;
- out << "Invalid discriminator: " << (int) discriminator;
- THROW_QPID_ERROR(FRAMING_ERROR, out.str());
+ throw SyntaxErrorException(
+ QPID_MSG("Invalid discriminator: " << discriminator));
}
}
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index 3e55dff1bd..fbf3c0b7ca 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -33,6 +33,7 @@ template <class T>
struct Handler {
typedef T HandledType;
typedef void handleFptr(T);
+ typedef void result_type; // Compatible with std/boost functors.
Handler(Handler<T>* next_=0) : next(next_) {}
virtual ~Handler() {}
@@ -51,7 +52,7 @@ struct Handler {
struct Chain : public Handler<T> {
Chain(Handler<T>* first=0) : Handler(first) {}
void operator=(Handler<T>* h) { next = h; }
- void handle(T t) { (*next)(t); }
+ void handle(T t) { next->handle(t); }
// TODO aconway 2007-08-29: chain modifier ops here.
};
diff --git a/cpp/src/qpid/framing/ProtocolVersionException.cpp b/cpp/src/qpid/framing/ProtocolVersionException.cpp
deleted file mode 100644
index b68b3af1f9..0000000000
--- a/cpp/src/qpid/framing/ProtocolVersionException.cpp
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/format.hpp>
-#include "ProtocolVersionException.h"
-
-
-using namespace qpid::framing;
-
-void ProtocolVersionException::init(const std::string& msg)
-{
- whatStr = boost::str(
- boost::format("ProtocolVersionException: %s found: %s")
- % versionFound.toString() % msg);
-}
-
diff --git a/cpp/src/qpid/framing/ResumeHandler.cpp b/cpp/src/qpid/framing/ResumeHandler.cpp
deleted file mode 100644
index 9d2c971459..0000000000
--- a/cpp/src/qpid/framing/ResumeHandler.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ResumeHandler.h"
-#include "qpid/framing/reply_exceptions.h"
-
-#include <boost/bind.hpp>
-
-#include <algorithm>
-
-namespace qpid {
-namespace framing {
-
-void ResumeHandler::ackReceived(SequenceNumber acked) {
- if (lastSent < acked)
- throw InvalidArgumentException("Invalid sequence number in ack");
- size_t keep = lastSent - acked;
- if (keep < unacked.size())
- unacked.erase(unacked.begin(), unacked.end()-keep);
-}
-
-void ResumeHandler::resend() {
- std::for_each(unacked.begin(), unacked.end(),
- boost::bind(&FrameHandler::handle,out->next, _1));
-}
-
-void ResumeHandler::handleIn(AMQFrame& f) {
- ++lastReceived;
- in.next->handle(f);
-}
-
-void ResumeHandler::handleOut(AMQFrame& f) {
- ++lastSent;
- unacked.push_back(f);
- out.next->handle(f);
-}
-
-
-}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/ResumeHandler.h b/cpp/src/qpid/framing/ResumeHandler.h
deleted file mode 100644
index c86a60b9cb..0000000000
--- a/cpp/src/qpid/framing/ResumeHandler.h
+++ /dev/null
@@ -1,69 +0,0 @@
-#ifndef QPID_FRAMING_RESUMEHANDLER_H
-#define QPID_FRAMING_RESUMEHANDLER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/SequenceNumber.h"
-
-#include <deque>
-
-namespace qpid {
-namespace framing {
-
-/**
- * In/out handler pair for managing exactly-once session delivery.
- * The same handler is used by client and broker.
- * This handler only deals with TCP style SequenceNumber acks,
- * not with fragmented SequenceNumberSet.
- *
- * THREAD UNSAFE. Expected to be used in a serialized context.
- */
-class ResumeHandler : public FrameHandler::InOutHandler
-{
- public:
- /** Received acknowledgement for sent frames up to and including sentOk */
- void ackReceived(SequenceNumber sentOk);
-
- /** What was the last sequence number we received. */
- SequenceNumber getLastReceived() { return lastReceived; }
-
- /** Resend the unacked frames to the output handler */
- void resend();
-
- protected:
- void handleIn(AMQFrame&);
- void handleOut(AMQFrame&);
-
- private:
- typedef std::deque<AMQFrame> Frames;
- Frames unacked;
- SequenceNumber lastReceived;
- SequenceNumber lastSent;
-};
-
-
-}} // namespace qpid::common
-
-
-#endif /*!QPID_FRAMING_RESUMEHANDLER_H*/
diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h
index 9b8f0659b2..3aee04a4ce 100644
--- a/cpp/src/qpid/framing/SequenceNumber.h
+++ b/cpp/src/qpid/framing/SequenceNumber.h
@@ -47,6 +47,7 @@ class SequenceNumber
bool operator<=(const SequenceNumber& other) const;
bool operator>=(const SequenceNumber& other) const;
uint32_t getValue() const { return (uint32_t) value; }
+ operator uint32_t() const { return (uint32_t) value; }
friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
};
diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp
new file mode 100644
index 0000000000..045a0ae115
--- /dev/null
+++ b/cpp/src/qpid/framing/SessionState.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SessionState.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/constants.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+#include <boost/bind.hpp>
+#include <boost/none.hpp>
+
+namespace qpid {
+namespace framing {
+
+SessionState::SessionState(uint32_t ack, const Uuid& uuid) :
+ state(ATTACHED),
+ id(uuid),
+ lastReceived(-1),
+ lastSent(-1),
+ ackInterval(ack),
+ sendAckAt(lastReceived+ackInterval),
+ solicitAckAt(lastSent+ackInterval),
+ ackSolicited(false)
+{
+ assert(ackInterval > 0);
+}
+
+namespace {
+bool isSessionCommand(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID;
+}
+}
+
+boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) {
+ if (isSessionCommand(f))
+ return boost::none;
+ if (state==RESUMING)
+ throw CommandInvalidException(
+ QPID_MSG("Invalid frame: Resuming session, expected session-ack"));
+ assert(state = ATTACHED);
+ assert(lastReceived<sendAckAt);
+ ++lastReceived;
+ QPID_LOG(trace, "Recv # "<< lastReceived << " " << id);
+ if (lastReceived == sendAckAt)
+ return sendingAck();
+ else
+ return boost::none;
+}
+
+bool SessionState::sent(const AMQFrame& f) {
+ if (isSessionCommand(f))
+ return false;
+ unackedOut.push_back(f);
+ ++lastSent;
+ QPID_LOG(trace, "Sent # "<< lastSent << " " << id);
+ return (state!=RESUMING) &&
+ (lastSent == solicitAckAt) &&
+ sendingSolicit();
+}
+
+SessionState::Replay SessionState::replay() {
+ Replay r(unackedOut.size());
+ std::copy(unackedOut.begin(), unackedOut.end(), r.begin());
+ return r;
+}
+
+void SessionState::receivedAck(SequenceNumber acked) {
+ if (state==RESUMING) state=ATTACHED;
+ assert(state==ATTACHED);
+ if (lastSent < acked)
+ throw InvalidArgumentException("Invalid sequence number in ack");
+ size_t keep = lastSent - acked;
+ if (keep < unackedOut.size())
+ unackedOut.erase(unackedOut.begin(), unackedOut.end()-keep);
+ solicitAckAt = std::max(solicitAckAt, SequenceNumber(acked+ackInterval));
+}
+
+SequenceNumber SessionState::sendingAck() {
+ sendAckAt = lastReceived+ackInterval;
+ return lastReceived;
+}
+
+bool SessionState::sendingSolicit() {
+ assert(state == ATTACHED);
+ if (ackSolicited)
+ return false;
+ solicitAckAt = lastSent + ackInterval;
+ return true;
+}
+
+SequenceNumber SessionState::resuming() {
+ state = RESUMING;
+ return sendingAck();
+}
+
+void SessionState::suspend() {
+ state = SUSPENDED;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h
new file mode 100644
index 0000000000..66fc083d3f
--- /dev/null
+++ b/cpp/src/qpid/framing/SessionState.h
@@ -0,0 +1,127 @@
+#ifndef QPID_FRAMING_SESSIONSTATE_H
+#define QPID_FRAMING_SESSIONSTATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include <boost/optional.hpp>
+
+#include <deque>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Session state common to client and broker.
+ * Implements session ack/resume protcools.
+ *
+ * A SessionState is always associated with an _open_ session (attached or
+ * suspended) it is destroyed when the session is closed.
+ *
+ * A template to make it protocol independent and easy to test.
+ */
+class SessionState
+{
+ public:
+ typedef std::vector<AMQFrame> Replay;
+
+ /** States of a session. */
+ enum State {
+ SUSPENDED, ///< Suspended, detached from any channel.
+ RESUMING, ///< Resuming: waiting for initial ack from peer.
+ ATTACHED ///< Attached to channel and operating normally.
+ };
+
+ /**
+ *Create a newly opened active session.
+ *@param ackInterval send/solicit an ack whenever N unacked frames
+ * have been received/sent.
+ *@pre ackInterval > 0
+ */
+ SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true));
+
+ const framing::Uuid& getId() const { return id; }
+ State getState() const { return state; }
+
+ /** Received incoming L3 frame.
+ * @return SequenceNumber if an ack should be sent, empty otherwise.
+ * SessionState assumes that acks are sent whenever it returns
+ * a seq. number.
+ */
+ boost::optional<SequenceNumber> received(const AMQFrame&);
+
+ /** Sent outgoing L3 frame.
+ *@return true if solicit-ack should be sent. Note the SessionState
+ *assumes that a solicit-ack is sent every time it returns true.
+ */
+ bool sent(const AMQFrame&);
+
+ /** Received normal incoming ack. */
+ void receivedAck(SequenceNumber);
+
+ /** Frames to replay
+ *@pre getState()==ATTACHED
+ */
+ Replay replay();
+
+ /** Suspend the session. */
+ void suspend();
+
+ /** Start resume protocol for the session.
+ *@returns sequence number to ack immediately. */
+ SequenceNumber resuming();
+
+ /** About to send an unscheduled ack, e.g. to respond to a solicit-ack.
+ *
+ * Note: when received() returns a sequence number this function
+ * should not be called. SessionState assumes that the ack is sent
+ * every time received() returns a sequence number.
+ */
+ SequenceNumber sendingAck();
+
+ SequenceNumber getLastSent() const { return lastSent; }
+ SequenceNumber getLastReceived() const { return lastReceived; }
+ private:
+ typedef std::deque<AMQFrame> Unacked;
+
+ bool sendingSolicit();
+
+ State state;
+ framing::Uuid id;
+ Unacked unackedOut;
+ SequenceNumber lastReceived;
+ SequenceNumber lastSent;
+ uint32_t ackInterval;
+ SequenceNumber sendAckAt;
+ SequenceNumber solicitAckAt;
+ bool ackSolicited;
+ bool suspending;
+};
+
+
+}} // namespace qpid::common
+
+
+#endif /*!QPID_FRAMING_SESSIONSTATE_H*/
diff --git a/cpp/src/qpid/framing/TemplateVisitor.h b/cpp/src/qpid/framing/TemplateVisitor.h
new file mode 100644
index 0000000000..8c719e5110
--- /dev/null
+++ b/cpp/src/qpid/framing/TemplateVisitor.h
@@ -0,0 +1,89 @@
+#ifndef QPID_FRAMING_TEMPLATEVISITOR_H
+#define QPID_FRAMING_TEMPLATEVISITOR_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/mpl/fold.hpp>
+#include <boost/utility/value_init.hpp>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Metafunction to generate a visitor class derived from Base with a
+ * visit for each type in TypeList calling functor F. TypeList may be
+ * any boost::mpl type collection e.g. mpl::list.
+ *
+ * Generated class is: TemplateVisitor<Base, F, TypeList>::type
+ *
+ * @see make_visitor
+ */
+template <class VisitTemplate, class TypeList, class F>
+class TemplateVisitor
+{
+ struct Base : public VisitorBase {
+ F action;
+ Base(F f) : action(f) {}
+ using VisitorBase::visit;
+ };
+
+ template <class B, class T> struct Visit : public B {
+ Visit(F action) : B(action) {}
+ using B::visit;
+ void visit(const T& body) { action(body); }
+ };
+
+ typedef typename boost::mpl::fold<
+ TypeList, Base, Visit<boost::mpl::placeholders::_1,
+ boost::mpl::placeholders::_2>
+ >::type type;
+};
+
+/**
+ * Construct a TemplateVisitor to perform the given action,
+ * for example:
+ * @code
+ */
+template <class VisitorBase, class TypeList, class F>
+TemplateVisitor<VisitorBase,TypeList,F>::type make_visitor(F action) {
+ return TemplateVisitor<VisitorBase,TypeList,F>::type(action);
+};
+
+/**
+ * For method body classes in TypeList, invoke the corresponding function
+ * on Target and return true. For other body types return false.
+ */
+template <class TypeList, class Target>
+bool invoke(const AMQBody& body, Target& target) {
+ typename InvokeVisitor<TypeList, Target>::type v(target);
+ body.accept(v);
+ return v.target;
+}
+
+}} // namespace qpid::framing
+
+
+#endif /*!QPID_FRAMING_INVOKEVISITOR_H*/
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!QPID_FRAMING_TEMPLATEVISITOR_H*/
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
index e0372b2f68..1bb69fbca9 100644
--- a/cpp/src/qpid/framing/TransferContent.cpp
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -24,9 +24,13 @@
namespace qpid {
namespace framing {
-TransferContent::TransferContent(const std::string& _data)
+TransferContent::TransferContent(const std::string& data,
+ const std::string& routingKey,
+ const std::string& exchange)
{
- setData(_data);
+ setData(data);
+ getDeliveryProperties().setRoutingKey(routingKey);
+ getDeliveryProperties().setExchange(exchange);
}
AMQHeaderBody TransferContent::getHeader() const
@@ -73,14 +77,14 @@ void TransferContent::populate(const FrameSet& frameset)
const MessageProperties& TransferContent::getMessageProperties() const
{
const MessageProperties* props = header.get<MessageProperties>();
- if (!props) throw NoSuchPropertiesException();
+ if (!props) throw Exception("No message properties.");
return *props;
}
const DeliveryProperties& TransferContent::getDeliveryProperties() const
{
const DeliveryProperties* props = header.get<DeliveryProperties>();
- if (!props) throw NoSuchPropertiesException();
+ if (!props) throw Exception("No message properties.");
return *props;
}
diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h
index 6fd96f3587..88f45b7e0a 100644
--- a/cpp/src/qpid/framing/TransferContent.h
+++ b/cpp/src/qpid/framing/TransferContent.h
@@ -30,14 +30,15 @@
namespace qpid {
namespace framing {
-struct NoSuchPropertiesException : public Exception {};
-
class TransferContent : public MethodContent
{
AMQHeaderBody header;
std::string data;
public:
- TransferContent(const std::string& data = "");
+ TransferContent(const std::string& data = std::string(),
+ const std::string& routingKey = std::string(),
+ const std::string& exchange = std::string());
+
AMQHeaderBody getHeader() const;
void setData(const std::string&);
void appendData(const std::string&);
diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp
index 3a83430d56..2918c48ce3 100644
--- a/cpp/src/qpid/framing/Uuid.cpp
+++ b/cpp/src/qpid/framing/Uuid.cpp
@@ -17,9 +17,9 @@
*/
#include "Uuid.h"
-
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
namespace qpid {
namespace framing {
@@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const {
void Uuid::decode(Buffer& buf) {
if (buf.available() < size())
- THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for UUID.");
+ throw SyntaxErrorException(QPID_MSG("Not enough data for UUID."));
buf.getRawData(c_array(), size());
}
@@ -52,4 +52,10 @@ istream& operator>>(istream& in, Uuid& uuid) {
return in;
}
+std::string Uuid::str() const {
+ std::ostringstream os;
+ os << *this;
+ return os.str();
+}
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h
index 19ae79db6a..9bde67ad8e 100644
--- a/cpp/src/qpid/framing/Uuid.h
+++ b/cpp/src/qpid/framing/Uuid.h
@@ -62,6 +62,9 @@ struct Uuid : public boost::array<uint8_t, 16> {
void encode(framing::Buffer& buf) const;
void decode(framing::Buffer& buf);
+
+ /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */
+ std::string str() const;
};
/** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */
diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h
index eec28333bc..69b5942ba0 100644
--- a/cpp/src/qpid/framing/amqp_framing.h
+++ b/cpp/src/qpid/framing/amqp_framing.h
@@ -32,4 +32,3 @@
#include "ProtocolInitiation.h"
#include "BasicHeaderProperties.h"
#include "ProtocolVersion.h"
-#include "ProtocolVersionException.h"
diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h
index a788fe36e4..94442aa357 100644
--- a/cpp/src/qpid/framing/amqp_types.h
+++ b/cpp/src/qpid/framing/amqp_types.h
@@ -61,5 +61,11 @@ class Uuid;
const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1;
const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX);
+// Forward declare class types
+class FramingContent;
+class FieldTable;
+class SequenceNumberSet;
+class Uuid;
+
}} // namespace qpid::framing
#endif
diff --git a/cpp/src/qpid/framing/variant.h b/cpp/src/qpid/framing/variant.h
index 3cb8aece5d..1fe81f8f67 100644
--- a/cpp/src/qpid/framing/variant.h
+++ b/cpp/src/qpid/framing/variant.h
@@ -23,7 +23,6 @@
/**@file Tools for using boost::variant */
-#include "qpid/QpidError.h"
#include <boost/variant.hpp>
@@ -39,7 +38,7 @@ template <class R=void>
struct NoBlankVisitor : public boost::static_visitor<R> {
R foundBlank() const {
assert(0);
- THROW_QPID_ERROR(INTERNAL_ERROR, "Invalid variant value.");
+ throw Exception(QPID_MSG("Invalid variant value."));
}
R operator()(const boost::blank&) const { return foundBlank(); }
R operator()(boost::blank&) const { return foundBlank(); }
diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp
index 6e8f3a59cc..c483ed2379 100644
--- a/cpp/src/qpid/log/Logger.cpp
+++ b/cpp/src/qpid/log/Logger.cpp
@@ -190,12 +190,6 @@ void Logger::add(Statement& s) {
statements.insert(&s);
}
-void Logger::remove(Statement& s) {
- ScopedLock l(lock);
- s.enabled = false;
- statements.erase(&s);
-}
-
void Logger::configure(const Options& opts, const std::string& prog)
{
clear();
diff --git a/cpp/src/qpid/log/Logger.h b/cpp/src/qpid/log/Logger.h
index a2103f5ec6..7851c65406 100644
--- a/cpp/src/qpid/log/Logger.h
+++ b/cpp/src/qpid/log/Logger.h
@@ -67,9 +67,6 @@ class Logger : private boost::noncopyable {
/** Add a statement. */
void add(Statement& s);
- /** Remove a statement */
- void remove(Statement& s);
-
/** Log a message. */
void log(const Statement&, const std::string&);
@@ -93,7 +90,7 @@ class Logger : private boost::noncopyable {
/** Add an output destination for messages */
void output(std::auto_ptr<Output> out);
- /** Reset the logger to it's original state */
+ /** Reset the logger to it's original state. */
void clear();
private:
diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp
index 9ab314b81c..de130bc455 100644
--- a/cpp/src/qpid/log/Statement.cpp
+++ b/cpp/src/qpid/log/Statement.cpp
@@ -32,10 +32,6 @@ Statement::Initializer::Initializer(Statement& s) : statement(s) {
Logger::instance().add(s);
}
-Statement::Initializer::~Initializer() {
- Logger::instance().remove(statement);
-}
-
namespace {
const char* names[LevelTraits::COUNT] = {
"trace", "debug", "info", "notice", "warning", "error", "critical"
diff --git a/cpp/src/qpid/log/Statement.h b/cpp/src/qpid/log/Statement.h
index 4eb4d1e7d8..18162971b0 100644
--- a/cpp/src/qpid/log/Statement.h
+++ b/cpp/src/qpid/log/Statement.h
@@ -19,8 +19,9 @@
*
*/
+#include "qpid/Msg.h"
+
#include <boost/current_function.hpp>
-#include <sstream>
namespace qpid {
namespace log {
@@ -69,23 +70,14 @@ struct Statement {
struct Initializer {
Initializer(Statement& s);
- ~Initializer();
Statement& statement;
};
};
-///@internal trickery to make QPID_LOG_STRINGSTREAM work.
-inline std::ostream& noop(std::ostream& s) { return s; }
-
///@internal static initializer for a Statement.
#define QPID_LOG_STATEMENT_INIT(level) \
{ 0, __FILE__, __LINE__, BOOST_CURRENT_FUNCTION, (::qpid::log::level) }
-///@internal Stream streamable message and return a string.
-#define QPID_LOG_STRINGSTREAM(message) \
- static_cast<std::ostringstream&>( \
- std::ostringstream() << qpid::log::noop << message).str()
-
/**
* Macro for log statements. Example of use:
* @code
@@ -110,19 +102,9 @@ inline std::ostream& noop(std::ostream& s) { return s; }
static ::qpid::log::Statement stmt_= QPID_LOG_STATEMENT_INIT(level); \
static ::qpid::log::Statement::Initializer init_(stmt_); \
if (stmt_.enabled) \
- stmt_.log(QPID_LOG_STRINGSTREAM(message)); \
+ stmt_.log(::qpid::Msg() << message); \
} while(0)
-/**
- * Macro for complicated logging logic that can't fit in a simple QPID_LOG
- * statement. For example:
- * @code
- * QPID_IF_LOG(debug) {
- * message = do_complicated_stuff;
- * QPID_LOG(debug, message);
- * }
- */
-#define QPID_IF_LOG(level)
}} // namespace qpid::log
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 733a892cff..eccfb1465e 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -29,6 +29,7 @@
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
@@ -301,7 +302,7 @@ void AsynchIOHandler::idle(AsynchIO&){
}
// If frame was egregiously large complain
if (frameSize > buff->byteCount)
- THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
buff->dataCount = buffUsed;
aio->queueWrite(buff);
diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h
index 917afc5704..cf8199954e 100644
--- a/cpp/src/qpid/sys/ConcurrentQueue.h
+++ b/cpp/src/qpid/sys/ConcurrentQueue.h
@@ -22,7 +22,7 @@
*
*/
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Waitable.h"
#include "qpid/sys/ScopedIncrement.h"
#include <boost/bind.hpp>
@@ -39,73 +39,73 @@ namespace sys {
*
* Also allows consuming threads to wait until an item is available.
*/
-template <class T> class ConcurrentQueue {
+template <class T> class ConcurrentQueue : public Waitable {
public:
- ConcurrentQueue() : waiters(0), shutdown(false) {}
+ struct ShutdownException {};
+
+ ConcurrentQueue() : shutdownFlag(false) {}
- /** Threads in wait() are woken with ShutdownException before
- * destroying the queue.
- */
- ~ConcurrentQueue() {
- Mutex::ScopedLock l(lock);
- shutdown = true;
- lock.notifyAll();
- while (waiters > 0)
- lock.wait();
+ /** Waiting threads are notified by ~Waitable */
+ ~ConcurrentQueue() { shutdown(); }
+
+ bool shutdown(bool wait=true) {
+ ScopedLock l(lock);
+ if (!shutdownFlag) {
+ shutdownFlag=true;
+ lock.notifyAll();
+ if (wait) lock.waitAll();
+ shutdownFlag=true;
+ return true;
+ }
+ return false;
}
-
+
/** Push a data item onto the back of the queue */
void push(const T& data) {
Mutex::ScopedLock l(lock);
queue.push_back(data);
+ lock.notify();
}
/** If the queue is non-empty, pop the front item into data and
* return true. If the queue is empty, return false
*/
- bool pop(T& data) {
+ bool tryPop(T& data) {
Mutex::ScopedLock l(lock);
- return popInternal(data);
+ if (shutdownFlag || queue.empty())
+ return false;
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
- /** Wait up to deadline for a data item to be available.
- *@return true if data was available, false if timed out.
+ /** Wait up to a timeout for a data item to be available.
+ *@return true if data was available, false if timed out or shut down.
*@throws ShutdownException if the queue is destroyed.
*/
- bool waitPop(T& data, Duration timeout) {
- Mutex::ScopedLock l(lock);
- ScopedIncrement<size_t> w(
- waiters, boost::bind(&ConcurrentQueue::noWaiters, this));
+ bool waitPop(T& data, Duration timeout=TIME_INFINITE) {
+ ScopedLock l(lock);
AbsTime deadline(now(), timeout);
- while (queue.empty() && lock.wait(deadline))
- ;
- return popInternal(data);
- }
-
- private:
-
- bool popInternal(T& data) {
- if (shutdown)
- throw ShutdownException();
+ {
+ ScopedWait(*this);
+ while (!shutdownFlag && queue.empty())
+ if (!lock.wait(deadline))
+ return false;
+ }
if (queue.empty())
return false;
- else {
- data = queue.front();
- queue.pop_front();
- return true;
- }
+ data = queue.front();
+ queue.pop_front();
+ return true;
}
+
+ bool isShutdown() { ScopedLock l(lock); return shutdownFlag; }
- void noWaiters() {
- assert(waiters == 0);
- if (shutdown)
- lock.notify(); // Notify dtor thread.
- }
-
- Monitor lock;
+ protected:
+ Waitable lock;
+ private:
std::deque<T> queue;
- size_t waiters;
- bool shutdown;
+ bool shutdownFlag;
};
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h
index ba9e89ba5f..8645ab2484 100644
--- a/cpp/src/qpid/sys/ScopedIncrement.h
+++ b/cpp/src/qpid/sys/ScopedIncrement.h
@@ -30,17 +30,17 @@ namespace sys {
* Optionally call a function if the decremented counter value is 0.
* Note the function must not throw, it is called in the destructor.
*/
-template <class T>
+template <class T, class F=boost::function<void()> >
class ScopedIncrement : boost::noncopyable
{
public:
- ScopedIncrement(T& c, boost::function0<void> f=0)
+ ScopedIncrement(T& c, F f=0)
: count(c), callback(f) { ++count; }
~ScopedIncrement() { if (--count == 0 && callback) callback(); }
private:
T& count;
- boost::function0<void> callback;
+ F callback;
};
diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h
index 085d51d7e2..7bb3b07ae0 100644
--- a/cpp/src/qpid/sys/Serializer.h
+++ b/cpp/src/qpid/sys/Serializer.h
@@ -23,7 +23,7 @@
*
*/
-
+#include "qpid/Exception.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Thread.h"
@@ -41,6 +41,8 @@ class SerializerBase : private boost::noncopyable, private Runnable
{
public:
typedef boost::function<void()> VoidFn0;
+ struct ShutdownException : public Exception {};
+
/** @see Serializer::Serializer */
SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0());
diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h
new file mode 100644
index 0000000000..5a92756f3a
--- /dev/null
+++ b/cpp/src/qpid/sys/StateMonitor.h
@@ -0,0 +1,78 @@
+#ifndef QPID_SYS_STATEMONITOR_H
+#define QPID_SYS_STATEMONITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Waitable.h"
+
+#include <bitset>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor with an enum state value.
+ *
+ *@param Enum: enum type to use for states.
+ *@param EnumMax: Highest enum value.
+ */
+template <class Enum, size_t MaxEnum>
+class StateMonitor : public Waitable
+{
+ public:
+ struct Set : public std::bitset<MaxEnum + 1> {
+ Set() {}
+ Set(Enum s) { set(s); }
+ Set(Enum s, Enum t) { set(s).set(t); }
+ Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); }
+ Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); }
+ };
+
+
+ StateMonitor(Enum initial) { state=initial; }
+
+ /** @pre Caller holds a ScopedLock. */
+ void set(Enum s) { state=s; notifyAll(); }
+ /** @pre Caller holds a ScopedLock. */
+ StateMonitor& operator=(Enum s) { set(s); return *this; }
+
+ /** @pre Caller holds a ScopedLock. */
+ Enum get() const { return state; }
+ /** @pre Caller holds a ScopedLock. */
+ operator Enum() const { return state; }
+
+ /** @pre Caller holds a ScopedLock */
+ void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); }
+ /** @pre Caller holds a ScopedLock */
+ void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); }
+
+ private:
+ Enum state;
+};
+
+}}
+
+
+#endif /*!QPID_SYS_STATEMONITOR_H*/
diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h
new file mode 100644
index 0000000000..eb71a1d742
--- /dev/null
+++ b/cpp/src/qpid/sys/Waitable.h
@@ -0,0 +1,73 @@
+#ifndef QPID_SYS_WAITABLE_H
+#define QPID_SYS_WAITABLE_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Monitor.h"
+
+#include <assert.h>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A monitor that keeps track of waiting threads.
+ * Threads that use a WaitLock are counted as waiters, threads that
+ * use a normal ScopedLock are not considered waiters.
+ */
+class Waitable : public Monitor {
+ public:
+ Waitable() : waiters(0) {}
+
+ /** Use this inside a scoped lock around the
+ * call to Monitor::wait to be counted as a waiter
+ */
+ struct ScopedWait {
+ Waitable& w;
+ ScopedWait(Waitable& w_) : w(w_) { ++w.waiters; }
+ ~ScopedWait() { --w.waiters; w.notifyAll(); }
+ };
+
+ /** Block till all waiters have finished waiting.
+ * The calling thread does not count as a waiter.
+ *@pre Must be called inside a ScopedLock but NOT a ScopedWait.
+ */
+ bool waitAll(Duration timeout=TIME_INFINITE) {
+ AbsTime deadline(now(), timeout);
+ while (waiters > 0) {
+ if (!wait(deadline)) {
+ assert(timeout != TIME_INFINITE);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private:
+ friend struct ScopedWait;
+ size_t waiters;
+};
+
+}} // namespace qpid::sys
+
+
+
+#endif /*!QPID_SYS_WAITABLE_H*/
diff --git a/cpp/src/qpid/sys/apr/APRBase.cpp b/cpp/src/qpid/sys/apr/APRBase.cpp
index f527e0d0b2..724c489303 100644
--- a/cpp/src/qpid/sys/apr/APRBase.cpp
+++ b/cpp/src/qpid/sys/apr/APRBase.cpp
@@ -20,7 +20,6 @@
*/
#include <iostream>
#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
#include "APRBase.h"
using namespace qpid::sys;
diff --git a/cpp/src/qpid/sys/apr/APRBase.h b/cpp/src/qpid/sys/apr/APRBase.h
index c6b1854fb1..7b5644a129 100644
--- a/cpp/src/qpid/sys/apr/APRBase.h
+++ b/cpp/src/qpid/sys/apr/APRBase.h
@@ -24,7 +24,6 @@
#include <string>
#include <apr_thread_mutex.h>
#include <apr_errno.h>
-#include "qpid/QpidError.h"
namespace qpid {
namespace sys {
@@ -64,11 +63,8 @@ namespace sys {
// Inlined as it is called *a lot*
void inline qpid::sys::check(apr_status_t status, const char* file, const int line){
if (status != APR_SUCCESS){
- const int size = 50;
- char tmp[size];
- std::string msg(apr_strerror(status, tmp, size));
- throw qpid::QpidError(APR_ERROR + ((int) status), msg,
- qpid::SrcLine(file, line));
+ char tmp[256];
+ throw Exception(QPID_MSG(apr_strerror(status, tmp, size)))
}
}
diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp
index 2630337408..1552aa06b5 100644
--- a/cpp/src/qpid/sys/posix/Shlib.cpp
+++ b/cpp/src/qpid/sys/posix/Shlib.cpp
@@ -19,8 +19,7 @@
*/
#include "qpid/sys/Shlib.h"
-
-#include <qpid/QpidError.h>
+#include "qpid/Exception.h"
#include <dlfcn.h>
@@ -32,7 +31,7 @@ void Shlib::load(const char* name) {
handle = ::dlopen(name, RTLD_NOW);
const char* error = ::dlerror();
if (error) {
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
}
}
@@ -42,7 +41,7 @@ void Shlib::unload() {
::dlclose(handle);
const char* error = ::dlerror();
if (error) {
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
}
handle = 0;
}
@@ -53,7 +52,7 @@ void* Shlib::getSymbol(const char* name) {
void* sym = ::dlsym(handle, name);
const char* error = ::dlerror();
if (error)
- THROW_QPID_ERROR(INTERNAL_ERROR, error);
+ throw Exception(QPID_MSG(error));
return sym;
}
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index c7a83df581..f0cc8cd5a5 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -21,7 +21,6 @@
#include "qpid/sys/Socket.h"
-#include "qpid/QpidError.h"
#include "check.h"
#include "PrivatePosix.h"
@@ -60,8 +59,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const
result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
}
- if (result < 0)
- throw QPID_POSIX_ERROR(errno);
+ QPID_POSIX_CHECK(result);
char servName[NI_MAXSERV];
char dispName[NI_MAXHOST];
diff --git a/cpp/src/qpid/sys/posix/check.cpp b/cpp/src/qpid/sys/posix/check.cpp
deleted file mode 100644
index 408679caa8..0000000000
--- a/cpp/src/qpid/sys/posix/check.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <cerrno>
-#include "check.h"
-
-namespace qpid {
-namespace sys {
-
-std::string
-PosixError::getMessage(int errNo)
-{
- char buf[512];
- return std::string(strerror_r(errNo, buf, sizeof(buf)));
-}
-
-PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw()
- : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc)
-{ }
-
-}}
diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h
index 7fa7b69d3b..f864bf8762 100644
--- a/cpp/src/qpid/sys/posix/check.h
+++ b/cpp/src/qpid/sys/posix/check.h
@@ -22,41 +22,13 @@
*
*/
-#include <cerrno>
-#include <string>
-#include "qpid/QpidError.h"
-
-namespace qpid {
-namespace sys {
-
-/**
- * Exception with message from errno.
- */
-class PosixError : public qpid::QpidError
-{
- public:
- static std::string getMessage(int errNo);
-
- PosixError(int errNo, const qpid::SrcLine& location) throw();
-
- ~PosixError() throw() {}
-
- int getErrNo() { return errNo; }
+#include "qpid/Exception.h"
- Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); }
-
- void throwSelf() const { throw *this; }
-
- private:
- int errNo;
-};
-
-}}
+#include <cerrno>
-/** Create a PosixError for the current file/line and errno. */
-#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE)
+#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO)) << " " << ERRNO)
-/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */
+/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */
#define QPID_POSIX_CHECK(RESULT) \
if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno))