diff options
| author | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-10-26 19:48:31 +0000 |
| commit | f61e1ef7589da893b9b54448224dc0961515eb40 (patch) | |
| tree | 258ac1fd99ac122b105ad90ad4394d8d544c5cbf /cpp/src/qpid | |
| parent | c5294d471ade7a18c52ca7d4028a494011c82293 (diff) | |
| download | qpid-python-f61e1ef7589da893b9b54448224dc0961515eb40.tar.gz | |
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network
failure are automatically re-transmitted for transparent re-connection.
client::Session improvements:
- Locking to avoid races between network & user threads.
- Replaced client::StateManager with sys::StateMonitor - avoid heap allocation.
qpid::Exception clean up:
- use QPID_MSG consistently to format exception messages.
- throw typed exceptions (in reply_exceptions.h) for AMQP exceptions.
- re-throw correct typed exception on client for exceptions from broker.
- Removed QpidError.h
rubygen/templates/constants.rb:
- constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants.
- reply_constants.h: Added throwReplyException(code, text)
log::Logger:
- Fixed shutdown race in Statement::~Initializer()
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@588761 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
86 files changed, 1343 insertions, 1191 deletions
diff --git a/cpp/src/qpid/Exception.cpp b/cpp/src/qpid/Exception.cpp index 11051d1a2e..e0747df49e 100644 --- a/cpp/src/qpid/Exception.cpp +++ b/cpp/src/qpid/Exception.cpp @@ -23,6 +23,7 @@ #include "Exception.h" #include <typeinfo> #include <errno.h> +#include <assert.h> namespace qpid { @@ -31,45 +32,22 @@ std::string strError(int err) { return std::string(strerror_r(err, buf, sizeof(buf))); } -static void ctorLog(const std::exception* e) { - QPID_LOG(trace, "Exception: " << e->what()); +Exception::Exception(const std::string& s) throw() : msg(s) { + QPID_LOG(warning, "Exception: " << msg); } - -Exception::Exception() throw() { ctorLog(this); } - -Exception::Exception(const std::string& str) throw() - : whatStr(str) { ctorLog(this); } - -Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); } - -Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {} Exception::~Exception() throw() {} -const char* Exception::what() const throw() { return whatStr.c_str(); } - -std::string Exception::toString() const throw() { return whatStr; } - -Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); } - -void Exception::throwSelf() const { throw *this; } - -ShutdownException::ShutdownException() : Exception("Shut down.") {} - -EmptyException::EmptyException() : Exception("Empty.") {} - -const char* Exception::defaultMessage = "Unexpected exception"; - -void Exception::log(const char* what, const char* message) { - QPID_LOG(error, message << ": " << what); +std::string Exception::str() const throw() { + if (msg.empty()) + const_cast<std::string&>(msg).assign(typeid(*this).name()); + return msg; } -void Exception::log(const std::exception& e, const char* message) { - log(e.what(), message); -} +const char* Exception::what() const throw() { return str().c_str(); } -void Exception::logUnknown(const char* message) { - log("unknown exception.", message); +std::auto_ptr<Exception> Exception::clone() const throw() { + return std::auto_ptr<Exception>(new Exception(*this)); } } // namespace qpid diff --git a/cpp/src/qpid/Exception.h b/cpp/src/qpid/Exception.h index a7ab1fa8aa..bf6c1fb872 100644 --- a/cpp/src/qpid/Exception.h +++ b/cpp/src/qpid/Exception.h @@ -24,135 +24,52 @@ #include "qpid/framing/amqp_types.h" #include "qpid/Msg.h" -#include <exception> -#include <string> + #include <memory> -#include <boost/shared_ptr.hpp> -#include <boost/lexical_cast.hpp> -#include <boost/function.hpp> +#include <string> namespace qpid { -/** Get the error message for error number err. */ +/** Get the error message for a system number err, e.g. errno. */ std::string strError(int err); /** - * Exception base class for all Qpid exceptions. + * Base class for Qpid runtime exceptions. */ class Exception : public std::exception { - protected: - std::string whatStr; - public: - typedef boost::shared_ptr<Exception> shared_ptr; - typedef boost::shared_ptr<const Exception> shared_ptr_const; - typedef std::auto_ptr<Exception> auto_ptr; - - Exception() throw(); - Exception(const std::string& str) throw(); - Exception(const char* str) throw(); - Exception(const std::exception&) throw(); - - /** Allow any type that has ostream operator<< to act as message */ - template <class T> - Exception(const T& message) - : whatStr(boost::lexical_cast<std::string>(message)) {} - + explicit Exception(const std::string& str=std::string()) throw(); virtual ~Exception() throw(); - - virtual const char* what() const throw(); - virtual std::string toString() const throw(); - - virtual auto_ptr clone() const throw(); - virtual void throwSelf() const; - - /** Default message: "Unknown exception" or something like it. */ - static const char* defaultMessage; - - /** - * Log a message of the form "message: what" - *@param what Exception's what() message. - *@param message Prefix message. - */ - static void log(const char* what, const char* message = defaultMessage); - - /** - * Log an exception. - *@param e Exception to log. - - */ - static void log( - const std::exception& e, const char* message = defaultMessage); - - /** - * Log an unknown exception - use in catch(...) - *@param message Prefix message. - */ - static void logUnknown(const char* message = defaultMessage); - - /** - * Wrapper template function to call another function inside - * try/catch and log any exception. Use boost::bind to wrap - * member function calls or functions with arguments. - * - *@param f Function to call in try block. - *@param retrhow If true the exception is rethrown. - *@param message Prefix message. - */ - template <class T> - static T tryCatchLog(boost::function0<T> f, bool rethrow=true, - const char* message=defaultMessage) - { - try { - return f(); - } - catch (const std::exception& e) { - log(e, message); - if (rethrow) - throw; - } - catch (...) { - logUnknown(message); - if (rethrow) - throw; - } - } + virtual const char *what() const throw(); + virtual std::auto_ptr<Exception> clone() const throw(); + virtual std::string str() const throw(); + private: + std::string msg; }; struct ChannelException : public Exception { - framing::ReplyCode code; - template <class T> - ChannelException(framing::ReplyCode code_, const T& message) + const framing::ReplyCode code; + ChannelException(framing::ReplyCode code_, const std::string& message) : Exception(message), code(code_) {} - void throwSelf() const { throw *this; } }; struct ConnectionException : public Exception { - framing::ReplyCode code; - template <class T> - ConnectionException(framing::ReplyCode code_, const T& message) + const framing::ReplyCode code; + ConnectionException(framing::ReplyCode code_, const std::string& message) : Exception(message), code(code_) {} - void throwSelf() const { throw *this; } }; -/** - * Exception used to indicate that a thread should shut down. - * Does not indicate an error that should be signalled to the user. +/** Clone an exception. + * For qpid::Exception this calls the clone member function. + * For standard exceptions, uses the copy constructor. + * For unknown exception types creates a std::exception + * with the same what() string. */ -struct ShutdownException : public Exception { - ShutdownException(); - void throwSelf() const { throw *this; } -}; - -/** Exception to indicate empty queue or other empty state */ -struct EmptyException : public Exception { - EmptyException(); - void throwSelf() const { throw *this; } -}; +std::auto_ptr<std::exception> clone(const std::exception&); -} +} // namespace qpid #endif /*!_Exception_*/ diff --git a/cpp/src/qpid/Msg.h b/cpp/src/qpid/Msg.h index c1a6b54d05..7214db611f 100644 --- a/cpp/src/qpid/Msg.h +++ b/cpp/src/qpid/Msg.h @@ -54,7 +54,7 @@ inline std::ostream& operator<<(std::ostream& o, const Msg& m) { } /** Construct a message using operator << and append (file:line) */ -#define QPID_MSG(message) Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")" +#define QPID_MSG(message) ::qpid::Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")" } // namespace qpid diff --git a/cpp/src/qpid/QpidError.cpp b/cpp/src/qpid/QpidError.cpp deleted file mode 100644 index 740ec24e54..0000000000 --- a/cpp/src/qpid/QpidError.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/format.hpp> - -#include "QpidError.h" -#include <sstream> - -using namespace qpid; - -QpidError::QpidError() : code(0) {} - -QpidError::~QpidError() throw() {} - -Exception::auto_ptr QpidError::clone() const throw() { return Exception::auto_ptr(new QpidError(*this)); } - -void QpidError::throwSelf() const { throw *this; } - -std::string QpidError::message(int code, const std::string& msg, const char* file, int line) { - return (boost::format("Error [%d] %s (%s:%d)") % code % msg % file % line).str(); -} - - diff --git a/cpp/src/qpid/QpidError.h b/cpp/src/qpid/QpidError.h deleted file mode 100644 index 2ff6571365..0000000000 --- a/cpp/src/qpid/QpidError.h +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef __QpidError__ -#define __QpidError__ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <string> -#include <memory> -#include <ostream> - -#include "Exception.h" - -namespace qpid { - -struct SrcLine { - public: - SrcLine(const std::string& file_="", int line_=0) : - file(file_), line(line_) {} - - std::string file; - int line; -}; - -class QpidError : public Exception -{ - public: - const int code; - SrcLine loc; - std::string msg; - - QpidError(); - - template <class T> - QpidError(int code_, const T& msg_, const SrcLine& loc_) throw() - : Exception(message(code_, boost::lexical_cast<std::string>(msg_), loc_.file.c_str(), loc_.line)), - code(code_), loc(loc_), msg(boost::lexical_cast<std::string>(msg_)) {} - - ~QpidError() throw(); - Exception::auto_ptr clone() const throw(); - void throwSelf() const; - - /** Format message for exception. */ - static std::string message(int code, const std::string& msg, const char* file, int line); -}; - - -} // namespace qpid - -#define SRCLINE ::qpid::SrcLine(__FILE__, __LINE__) - -#define QPID_ERROR(CODE, MESSAGE) ::qpid::QpidError((CODE), (MESSAGE), SRCLINE) - -#define THROW_QPID_ERROR(CODE, MESSAGE) throw QPID_ERROR(CODE,MESSAGE) - -#define THROW_QPID_ERRNO_IF(cond) if (cond) QPID_ERROR(INTERNAL, strError(errno)); - -const int PROTOCOL_ERROR = 10000; -const int APR_ERROR = 20000; -const int FRAMING_ERROR = 30000; -const int CLIENT_ERROR = 40000; -const int INTERNAL_ERROR = 50000; - -#endif diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e53774740a..b88f1c6c6a 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -64,7 +64,8 @@ Broker::Options::Options(const std::string& name) : storeDir("/var"), storeAsync(false), enableMgmt(0), - mgmtPubInterval(10) + mgmtPubInterval(10), + ack(100) { addOptions() ("port,p", optValue(port,"PORT"), @@ -102,7 +103,8 @@ Broker::Broker(const Broker::Options& conf) : queues(store.get()), stagingThreshold(0), factory(*this), - dtxManager(store.get()) + dtxManager(store.get()), + sessionManager(conf.ack) { if(conf.enableMgmt){ managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval)); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 2018371624..817197a351 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -69,6 +69,7 @@ class Broker : public sys::Runnable, public Plugin::Target bool storeAsync; bool enableMgmt; uint16_t mgmtPubInterval; + uint32_t ack; }; virtual ~Broker(); diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 99b585406e..dad40868d6 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -21,6 +21,7 @@ #include "MessageDelivery.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace broker { @@ -75,8 +76,7 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri checkAlternate(response.first, alternate); } }catch(UnknownExchangeTypeException& e){ - throw ConnectionException( - 503, "Exchange type not implemented: " + type); + throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type)); } } } @@ -84,24 +84,23 @@ void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const stri void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) { if (!type.empty() && exchange->getType() != type) { - throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type); + throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type)); } } void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) { - if (alternate && alternate != exchange->getAlternate()) { - throw ConnectionException(530, "Exchange declared with alternate-exchange " - + exchange->getAlternate()->getName() + ", requested " - + alternate->getName()); - } - + if (alternate && alternate != exchange->getAlternate()) + throw NotAllowedException( + QPID_MSG("Exchange declared with alternate-exchange " + << exchange->getAlternate()->getName() << ", requested " + << alternate->getName())); } void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); + if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); if (exchange->isDurable()) getBroker().getStore().destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); getBroker().getExchanges().destroy(name); @@ -292,7 +291,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, Queue::shared_ptr queue = state.getQueue(queueName); if(!consumerTag.empty() && state.exists(consumerTag)){ - throw ConnectionException(530, "Consumer tags must be unique"); + throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); } string newTag = consumerTag; //need to generate name here, so we have it for the adapter (it is diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index 5537dc67f5..706b42c080 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -82,7 +82,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations throw framing::NotImplementedException("Tunnel class not implemented"); } // Handlers no longer implemented in BrokerAdapter: -#define BADHANDLER() assert(0); throw framing::InternalErrorException() +#define BADHANDLER() assert(0); throw framing::NotImplementedException("") ExecutionHandler* getExecutionHandler() { BADHANDLER(); } ConnectionHandler* getConnectionHandler() { BADHANDLER(); } SessionHandler* getSessionHandler() { BADHANDLER(); } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index ca0ca20849..f981d47ef7 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -28,9 +28,10 @@ #include "BrokerAdapter.h" #include "SemanticHandler.h" -#include <boost/utility/in_place_factory.hpp> #include <boost/bind.hpp> +#include <algorithm> + using namespace boost; using namespace qpid::sys; using namespace qpid::framing; @@ -61,6 +62,7 @@ void Connection::close( ReplyCode code, const string& text, ClassId classId, MethodId methodId) { adapter.close(code, text, classId, methodId); + channels.clear(); getOutput().close(); } @@ -73,8 +75,11 @@ void Connection::idleOut(){} void Connection::idleIn(){} -void Connection::closed(){ +void Connection::closed(){ // Physically closed, suspend open sessions. try { + std::for_each( + channels.begin(), channels.end(), + boost::bind(&SessionHandler::localSuspend, _1)); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index f697986194..dd645b595e 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -46,9 +46,9 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) AMQMethodBody* method=frame.getBody()->getMethod(); try{ if (!invoke(*handler.get(), *method)) - throw ConnectionException(503, "Class can't be accessed over channel 0"); + throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0")); }catch(ConnectionException& e){ - handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); + handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId()); }catch(std::exception& e){ handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } diff --git a/cpp/src/qpid/broker/Daemon.cpp b/cpp/src/qpid/broker/Daemon.cpp index 0bb3449289..3fcc487324 100644 --- a/cpp/src/qpid/broker/Daemon.cpp +++ b/cpp/src/qpid/broker/Daemon.cpp @@ -17,7 +17,7 @@ */ #include "Daemon.h" #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include <boost/iostreams/stream.hpp> #include <boost/iostreams/device/file_descriptor.hpp> diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 5887d13f85..ec042ff56a 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -44,7 +44,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, if (fail) { state.endDtx(xid, true); if (suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); + throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { return DtxDemarcationEndResult(XA_RBROLLBACK); } @@ -67,7 +67,7 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, bool resume) { if (join && resume) { - throw ConnectionException(503, "Join and resume cannot both be set."); + throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set.")); } try { if (resume) { @@ -161,7 +161,7 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, const string& xid) { //Currently no heuristic completion is supported, so this should never be used. - throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); + throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 0d211017de..0597b41f98 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -20,16 +20,20 @@ */ #include "DtxManager.h" #include "DtxTimeout.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include <boost/format.hpp> #include <iostream> using qpid::sys::Mutex; using namespace qpid::broker; +using namespace qpid::framing; DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {} -DtxManager::~DtxManager() {} +DtxManager::~DtxManager() { + // timer.stop(); // FIXME aconway 2007-10-23: leaking threads. +} void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops) { @@ -84,7 +88,7 @@ DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); } return i; } @@ -94,7 +98,7 @@ void DtxManager::remove(const std::string& xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i == work.end()) { - throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid); + throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid)); } else { work.erase(i); } @@ -105,7 +109,7 @@ DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid) Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); if (i != work.end()) { - throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid); + throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)")); } else { return work.insert(xid, new DtxWorkRecord(xid, store)).first; } diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h index 33da62e7f4..7d0b8622d0 100644 --- a/cpp/src/qpid/broker/DtxTimeout.h +++ b/cpp/src/qpid/broker/DtxTimeout.h @@ -29,12 +29,7 @@ namespace broker { class DtxManager; - -struct DtxTimeoutException : public Exception -{ - DtxTimeoutException() {} -}; - +struct DtxTimeoutException : public Exception {}; struct DtxTimeout : public TimerTask { diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index f2f118c5e4..fe9e42ca32 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -19,12 +19,14 @@ * */ #include "DtxWorkRecord.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> #include <boost/mem_fn.hpp> using boost::mem_fn; using qpid::sys::Mutex; using namespace qpid::broker; +using namespace qpid::framing; DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} @@ -71,8 +73,7 @@ bool DtxWorkRecord::commit(bool onePhase) if (prepared) { //already prepared i.e. 2pc if (onePhase) { - throw ConnectionException(503, - boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!")); } store->commit(*txn); @@ -83,8 +84,7 @@ bool DtxWorkRecord::commit(bool onePhase) } else { //1pc commit optimisation, don't need a 2pc transaction context: if (!onePhase) { - throw ConnectionException(503, - boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!")); } std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { @@ -119,7 +119,7 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) throw DtxTimeoutException(); } if (completed) { - throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!")); } work.push_back(ops); } @@ -133,7 +133,7 @@ bool DtxWorkRecord::check() //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { if (!(*i)->isEnded()) { - throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid); + throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!")); } else if ((*i)->isRollbackOnly()) { rolledback = true; } diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index ae1afe5abb..98e3cc7347 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -24,6 +24,7 @@ #include "HeadersExchange.h" #include "TopicExchange.h" #include "ManagementExchange.h" +#include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; using namespace qpid::sys; @@ -75,9 +76,8 @@ void ExchangeRegistry::destroy(const string& name){ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) { - throw ChannelException(404, "Exchange not found: " + name); - } + if (i == exchanges.end()) + throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name)); return i->second; } diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 215a002517..dd688cdfcf 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -20,7 +20,7 @@ */ #include "HeadersExchange.h" #include "qpid/framing/FieldValue.h" -#include "qpid/QpidError.h" +#include "qpid/framing/reply_exceptions.h" #include <algorithm> @@ -46,9 +46,8 @@ HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, const bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){ RWlock::ScopedWlock locker(lock); FieldTable::ValuePtr what = args->get(x_match); - if (!what || (*what != all && *what != any)) { - THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); - } + if (!what || (*what != all && *what != any)) + throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange.")); Binding binding(*args, queue); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), binding); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index b12910893a..834ce0a203 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -16,7 +16,7 @@ * */ -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include "qpid/log/Statement.h" #include "MessageHandlerImpl.h" #include "qpid/framing/FramingContent.h" @@ -56,39 +56,39 @@ MessageHandlerImpl::cancel(const string& destination ) void MessageHandlerImpl::open(const string& /*reference*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::close(const string& /*reference*/) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::resume(const string& /*reference*/, const string& /*identifier*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void MessageHandlerImpl::offset(uint64_t /*value*/ ) { - throw ConnectionException(540, "References no longer supported"); + throw NotImplementedException("References no longer supported"); } void @@ -97,19 +97,19 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, const string& /*destination*/, bool /*noAck*/ ) { - throw ConnectionException(540, "get no longer supported"); + throw NotImplementedException("get no longer supported"); } void MessageHandlerImpl::empty() { - throw ConnectionException(540, "empty no longer supported"); + throw NotImplementedException("empty no longer supported"); } void MessageHandlerImpl::ok() { - throw ConnectionException(540, "Message.Ok no longer supported"); + throw NotImplementedException("Message.Ok no longer supported"); } void @@ -134,7 +134,7 @@ MessageHandlerImpl::subscribe(uint16_t /*ticket*/, { Queue::shared_ptr queue = state.getQueue(queueName); if(!destination.empty() && state.exists(destination)) - throw ConnectionException(530, "Consumer tags must be unique"); + throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); string tag = destination; state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), @@ -165,7 +165,7 @@ void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_i state.addByteCredit(destination, value); } else { //unknown - throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); + throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit)); } } @@ -179,7 +179,7 @@ void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) //window state.setWindowMode(destination); } else{ - throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); + throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode)); } } diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 116e8d9431..18c1ab1056 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -19,9 +19,8 @@ * */ -#include <boost/format.hpp> - #include "qpid/log/Statement.h" +#include "qpid/framing/reply_exceptions.h" #include "Broker.h" #include "Queue.h" #include "Exchange.h" @@ -37,7 +36,6 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; -using boost::format; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, @@ -269,17 +267,15 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) { void Queue::consume(Consumer::ptr c, bool requestExclusive){ RWlock::ScopedWlock locker(consumerLock); if(exclusive) { - throw ChannelException( - 403, format("Queue '%s' has an exclusive consumer." - " No more consumers allowed.") % getName()); + throw AccessRefusedException( + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); } if(requestExclusive) { if(acquirers.empty() && browsers.empty()) { exclusive = c; } else { - throw ChannelException( - 403, format("Queue '%s' already has consumers." - "Exclusive access denied.") % getName()); + throw AccessRefusedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); } } if (c->preAcquires()) { diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 8535dc6a60..e1a8ae470d 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -125,7 +125,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) incoming.complete(id); if (!invoker.wasHandled()) { - throw ConnectionException(540, "Not implemented"); + throw NotImplementedException("Not implemented"); } else if (invoker.hasResult()) { session.getProxy().getExecution().result(id.getValue(), invoker.getResult()); } @@ -139,7 +139,7 @@ void SemanticHandler::handleCommand(framing::AMQMethodBody* method) void SemanticHandler::handleL3(framing::AMQMethodBody* method) { if (!invoke(*this, *method)) - throw ConnectionException(540, "Not implemented"); + throw NotImplementedException("Not implemented"); } void SemanticHandler::handleContent(AMQFrame& frame) diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 1f7436da94..e0e4315d03 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -31,7 +31,6 @@ #include "SessionHandler.h" #include "TxAck.h" #include "TxPublish.h" -#include "qpid/QpidError.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" @@ -116,7 +115,8 @@ void SemanticState::startTx() void SemanticState::commit(MessageStore* const store) { - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + if (!txBuffer) throw + CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); txBuffer->enlist(txAck); @@ -127,7 +127,8 @@ void SemanticState::commit(MessageStore* const store) void SemanticState::rollback() { - if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions"); + if (!txBuffer) + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); txBuffer->rollback(); accumulatedAck.clear(); @@ -141,7 +142,7 @@ void SemanticState::selectDtx() void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) { if (!dtxSelected) { - throw ConnectionException(503, "Session has not been selected for use with dtx"); + throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); } dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); @@ -155,11 +156,12 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) void SemanticState::endDtx(const std::string& xid, bool fail) { if (!dtxBuffer) { - throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid); + throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session")); } if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end")); + } txBuffer.reset();//ops on this session no longer transactional @@ -176,8 +178,8 @@ void SemanticState::endDtx(const std::string& xid, bool fail) void SemanticState::suspendDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend")); } txBuffer.reset();//ops on this session no longer transactional @@ -188,11 +190,12 @@ void SemanticState::suspendDtx(const std::string& xid) void SemanticState::resumeDtx(const std::string& xid) { if (dtxBuffer->getXid() != xid) { - throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") - % dtxBuffer->getXid() % xid); + throw CommandInvalidException( + QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume")); + } if (!dtxBuffer->isSuspended()) { - throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); + throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended")); } checkDtxTimeout(); diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index ed092d6a05..9b065be8af 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -26,6 +26,8 @@ #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" +#include <boost/bind.hpp> + namespace qpid { namespace broker { using namespace framing; @@ -33,7 +35,9 @@ using namespace std; SessionHandler::SessionHandler(Connection& c, ChannelId ch) : InOutHandler(0, &c.getOutput()), - connection(c), channel(ch), proxy(out), + connection(c), channel(ch, &c.getOutput()), + proxy(out), // Via my own handleOut() for L2 data. + peerSession(channel), // Direct to channel for L2 commands. ignoring(false) {} SessionHandler::~SessionHandler() {} @@ -54,15 +58,19 @@ void SessionHandler::handleIn(AMQFrame& f) { try { if (m && invoke(*this, *m)) return; - else if (session.get()) - session->in(f); + else if (session.get()) { + boost::optional<SequenceNumber> ack=session->received(f); + session->in.handle(f); + if (ack) + peerSession.ack(*ack, SequenceNumberSet()); + } else if (!ignoring) throw ChannelErrorException( - QPID_MSG("Channel " << channel << " is not open")); + QPID_MSG("Channel " << channel.get() << " is not open")); } catch(const ChannelException& e) { ignoring=true; // Ignore trailing frames sent by client. session.reset(); - getProxy().getSession().closed(e.code, e.toString()); + peerSession.closed(e.code, e.what()); }catch(const ConnectionException& e){ connection.close(e.code, e.what(), classId(m), methodId(m)); }catch(const std::exception& e){ @@ -72,21 +80,22 @@ void SessionHandler::handleIn(AMQFrame& f) { } void SessionHandler::handleOut(AMQFrame& f) { - f.setChannel(getChannel()); - out.next->handle(f); + channel.handle(f); // Send it. + if (session->sent(f)) + peerSession.solicitAck(); } -void SessionHandler::assertOpen(const char* method) { - if (!session.get()) +void SessionHandler::assertAttached(const char* method) const { + if (!session.get()) throw ChannelErrorException( QPID_MSG(method << " failed: No session for channel " << getChannel())); } -void SessionHandler::assertClosed(const char* method) { +void SessionHandler::assertClosed(const char* method) const { if (session.get()) throw ChannelBusyException( - QPID_MSG(method << " failed: channel " << channel + QPID_MSG(method << " failed: channel " << channel.get() << " is already open.")); } @@ -95,32 +104,38 @@ void SessionHandler::open(uint32_t detachedLifetime) { std::auto_ptr<SessionState> state( connection.broker.getSessionManager().open(*this, detachedLifetime)); session.reset(state.release()); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + peerSession.attached(session->getId(), session->getTimeout()); } void SessionHandler::resume(const Uuid& id) { assertClosed("resume"); - session = connection.broker.getSessionManager().resume(*this, id); - getProxy().getSession().attached(session->getId(), session->getTimeout()); + session = connection.broker.getSessionManager().resume(id); + session->attach(*this); + SequenceNumber seq = session->resuming(); + peerSession.attached(session->getId(), session->getTimeout()); + proxy.getSession().ack(seq, SequenceNumberSet()); } void SessionHandler::flow(bool /*active*/) { + assertAttached("flow"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flow"); } void SessionHandler::flowOk(bool /*active*/) { + assertAttached("flowOk"); // FIXME aconway 2007-09-19: Removed in 0-10, remove - assert(0); throw NotImplementedException(); + assert(0); throw NotImplementedException("session.flowOk"); } void SessionHandler::close() { + assertAttached("close"); QPID_LOG(info, "Received session.close"); ignoring=false; session.reset(); - getProxy().getSession().closed(REPLY_SUCCESS, "ok"); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + peerSession.closed(REPLY_SUCCESS, "ok"); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } void SessionHandler::closed(uint16_t replyCode, const string& replyText) { @@ -129,26 +144,43 @@ void SessionHandler::closed(uint16_t replyCode, const string& replyText) { session.reset(); } +void SessionHandler::localSuspend() { + if (session.get() && session->getState() == SessionState::ATTACHED) { + session->detach(); + connection.broker.getSessionManager().suspend(session); + } +} + void SessionHandler::suspend() { - assertOpen("suspend"); - connection.broker.getSessionManager().suspend(session); - assert(!session.get()); - getProxy().getSession().detached(); - assert(&connection.getChannel(channel) == this); - connection.closeChannel(channel); + assertAttached("suspend"); + localSuspend(); + peerSession.detached(); + assert(&connection.getChannel(channel.get()) == this); + connection.closeChannel(channel.get()); } -void SessionHandler::ack(uint32_t /*cumulativeSeenMark*/, - const SequenceNumberSet& /*seenFrameSet*/) { - assert(0); throw NotImplementedException(); +void SessionHandler::ack(uint32_t cumulativeSeenMark, + const SequenceNumberSet& /*seenFrameSet*/) +{ + assertAttached("ack"); + if (session->getState() == SessionState::RESUMING) { + session->receivedAck(cumulativeSeenMark); + framing::SessionState::Replay replay=session->replay(); + std::for_each(replay.begin(), replay.end(), + boost::bind(&SessionHandler::handleOut, this, _1)); + } + else + session->receivedAck(cumulativeSeenMark); } void SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) { - assert(0); throw NotImplementedException(); + // FIXME aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.high-water-mark"); } void SessionHandler::solicitAck() { - assert(0); throw NotImplementedException(); + assertAttached("solicit-ack"); + peerSession.ack(session->sendingAck(), SequenceNumberSet()); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index 51a65e3092..9a68ddb46f 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -26,6 +26,7 @@ #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/amqp_types.h" +#include "qpid/framing/ChannelHandler.h" #include <boost/noncopyable.hpp> @@ -52,7 +53,7 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - framing::ChannelId getChannel() const { return channel; } + framing::ChannelId getChannel() const { return channel.get(); } Connection& getConnection() { return connection; } const Connection& getConnection() const { return connection; } @@ -60,6 +61,9 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } + // Called by closing connection. + void localSuspend(); + protected: void handleIn(framing::AMQFrame&); void handleOut(framing::AMQFrame&); @@ -79,12 +83,14 @@ class SessionHandler : public framing::FrameHandler::InOutHandler, void solicitAck(); - void assertOpen(const char* method); - void assertClosed(const char* method); + void assertAttached(const char* method) const; + void assertActive(const char* method) const; + void assertClosed(const char* method) const; Connection& connection; - const framing::ChannelId channel; + framing::ChannelHandler channel; framing::AMQP_ClientProxy proxy; + framing::AMQP_ClientProxy::Session peerSession; bool ignoring; std::auto_ptr<SessionState> session; }; diff --git a/cpp/src/qpid/broker/SessionManager.cpp b/cpp/src/qpid/broker/SessionManager.cpp index 303687c788..f12ebc6db1 100644 --- a/cpp/src/qpid/broker/SessionManager.cpp +++ b/cpp/src/qpid/broker/SessionManager.cpp @@ -39,7 +39,7 @@ namespace broker { using namespace sys; using namespace framing; -SessionManager::SessionManager() {} +SessionManager::SessionManager(uint32_t a) : ack(a) {} SessionManager::~SessionManager() {} @@ -47,7 +47,8 @@ std::auto_ptr<SessionState> SessionManager::open( SessionHandler& h, uint32_t timeout_) { Mutex::ScopedLock l(lock); - std::auto_ptr<SessionState> session(new SessionState(*this, h, timeout_)); + std::auto_ptr<SessionState> session( + new SessionState(*this, h, timeout_, ack)); active.insert(session->getId()); return session; } @@ -55,14 +56,13 @@ std::auto_ptr<SessionState> SessionManager::open( void SessionManager::suspend(std::auto_ptr<SessionState> session) { Mutex::ScopedLock l(lock); active.erase(session->getId()); + session->suspend(); session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); - session->handler = 0; suspended.push_back(session.release()); // In expiry order eraseExpired(); } -std::auto_ptr<SessionState> SessionManager::resume( - SessionHandler& sh, const Uuid& id) +std::auto_ptr<SessionState> SessionManager::resume(const Uuid& id) { Mutex::ScopedLock l(lock); eraseExpired(); @@ -78,7 +78,6 @@ std::auto_ptr<SessionState> SessionManager::resume( QPID_MSG("No suspended session with id=" << id)); active.insert(id); std::auto_ptr<SessionState> state(suspended.release(i).release()); - state->handler = &sh; return state; } @@ -94,8 +93,10 @@ void SessionManager::eraseExpired() { Suspended::iterator keep = std::lower_bound( suspended.begin(), suspended.end(), now(), boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2)); - QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); - suspended.erase(suspended.begin(), keep); + if (suspended.begin() != keep) { + QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep)); + suspended.erase(suspended.begin(), keep); + } } } diff --git a/cpp/src/qpid/broker/SessionManager.h b/cpp/src/qpid/broker/SessionManager.h index 58a7b3f01f..fa7262252d 100644 --- a/cpp/src/qpid/broker/SessionManager.h +++ b/cpp/src/qpid/broker/SessionManager.h @@ -44,7 +44,7 @@ class SessionHandler; */ class SessionManager : private boost::noncopyable { public: - SessionManager(); + SessionManager(uint32_t ack); ~SessionManager(); /** Open a new active session, caller takes ownership */ std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_); @@ -57,18 +57,20 @@ class SessionManager : private boost::noncopyable { /** Resume a suspended session. *@throw Exception if timed out or non-existant. */ - std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&); + std::auto_ptr<SessionState> resume(const framing::Uuid&); private: typedef boost::ptr_vector<SessionState> Suspended; typedef std::set<framing::Uuid> Active; + void erase(const framing::Uuid&); + void eraseExpired(); + sys::Mutex lock; Suspended suspended; Active active; - - void erase(const framing::Uuid&); - void eraseExpired(); + uint32_t ack; + friend class SessionState; // removes deleted sessions from active set. }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 17537e11be..45d78c9307 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -31,22 +31,25 @@ namespace broker { using namespace framing; -SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_) - : factory(f), handler(&h), id(true), timeout(timeout_), - broker(h.getConnection().broker), - version(h.getConnection().getVersion()) -{ - // FIXME aconway 2007-09-21: Break dependnecy - broker updates session. - chain.push_back(new SemanticHandler(*this)); - in = &chain[0]; // Incoming frame to handler chain. - out = &handler->out; // Outgoing frames to SessionHandler +void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); } - // FIXME aconway 2007-09-20: use broker to add plugin - // handlers to the chain. - // FIXME aconway 2007-08-31: Shouldn't be passing channel ID. - broker.update(handler->getChannel(), *this); +void SessionState::handleOut(AMQFrame& f) { + assert(handler); + handler->out.handle(f); } +SessionState::SessionState( + SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) + : framing::SessionState(ack), + factory(f), handler(&h), id(true), timeout(timeout_), + broker(h.getConnection().broker), + version(h.getConnection().getVersion()), + semanticHandler(new SemanticHandler(*this)) +{ + // FIXME aconway 2007-09-20: SessionManager may add plugin + // handlers to the chain. + } + SessionState::~SessionState() { // Remove ID from active session list. factory.erase(getId()); @@ -65,4 +68,12 @@ Connection& SessionState::getConnection() { return getHandler().getConnection(); } +void SessionState::detach() { + handler = 0; +} + +void SessionState::attach(SessionHandler& h) { + handler = &h; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index d152937692..eed088af31 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -24,11 +24,12 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/SessionState.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/sys/Time.h" -#include <boost/ptr_container/ptr_vector.hpp> #include <boost/noncopyable.hpp> +#include <boost/scoped_ptr.hpp> #include <set> #include <vector> @@ -42,31 +43,26 @@ class AMQP_ClientProxy; namespace broker { +class SemanticHandler; class SessionHandler; class SessionManager; class Broker; class Connection; /** - * State of a session. - * - * An attached session has a SessionHandler which is attached to a - * connection. A suspended session has no handler. - * - * A SessionState is always associated with an open session (attached or - * suspended) it is destroyed when the session is closed. - * - * The SessionState includes the sessions handler chains, which may - * themselves have state. The handlers will be preserved as long as - * the session is alive. + * Broker-side session state includes sessions handler chains, which may + * themselves have state. */ -class SessionState : public framing::FrameHandler::Chains, - private boost::noncopyable +class SessionState : public framing::SessionState, + public framing::FrameHandler::InOutHandler { public: ~SessionState(); bool isAttached() { return handler; } + void detach(); + void attach(SessionHandler& handler); + /** @pre isAttached() */ SessionHandler& getHandler(); @@ -76,23 +72,30 @@ class SessionState : public framing::FrameHandler::Chains, /** @pre isAttached() */ Connection& getConnection(); - const framing::Uuid& getId() const { return id; } uint32_t getTimeout() const { return timeout; } Broker& getBroker() { return broker; } framing::ProtocolVersion getVersion() const { return version; } + + protected: + void handleIn(framing::AMQFrame&); + void handleOut(framing::AMQFrame&); private: - /** Only SessionManager can open sessions */ - SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_); - + // SessionManager creates sessions. + SessionState(SessionManager&, + SessionHandler& out, + uint32_t timeout, + uint32_t ackInterval); + SessionManager& factory; SessionHandler* handler; framing::Uuid id; uint32_t timeout; sys::AbsTime expiry; // Used by SessionManager. Broker& broker; - boost::ptr_vector<framing::FrameHandler> chain; framing::ProtocolVersion version; + + boost::scoped_ptr<SemanticHandler> semanticHandler; friend class SessionManager; }; diff --git a/cpp/src/qpid/broker/Timer.cpp b/cpp/src/qpid/broker/Timer.cpp index be75346578..14727b3b35 100644 --- a/cpp/src/qpid/broker/Timer.cpp +++ b/cpp/src/qpid/broker/Timer.cpp @@ -73,17 +73,14 @@ void Timer::start() Monitor::ScopedLock l(monitor); if (!active) { active = true; - runner = std::auto_ptr<Thread>(new Thread(this)); + runner = Thread(this); } } void Timer::stop() { signalStop(); - if (runner.get()) { - runner->join(); - runner.reset(); - } + runner.join(); } void Timer::signalStop() { diff --git a/cpp/src/qpid/broker/Timer.h b/cpp/src/qpid/broker/Timer.h index c70ffeaedc..e89ae499b7 100644 --- a/cpp/src/qpid/broker/Timer.h +++ b/cpp/src/qpid/broker/Timer.h @@ -53,7 +53,7 @@ class Timer : private qpid::sys::Runnable { qpid::sys::Monitor monitor; std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks; - std::auto_ptr<qpid::sys::Thread> runner; + qpid::sys::Thread runner; bool active; void run(); diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index cef34630db..16e0428a56 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -24,7 +24,6 @@ #include "Channel.h" #include "qpid/sys/Monitor.h" #include "Message.h" -#include "qpid/QpidError.h" #include "Connection.h" #include "Demux.h" #include "FutureResponse.h" @@ -71,7 +70,7 @@ void Channel::open(const Session& s) { Mutex::ScopedLock l(stopLock); if (isOpen()) - THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel"); + throw ChannelBusyException(); active = true; session = s; if(isTransactional()) { @@ -142,7 +141,7 @@ void Channel::consume( Mutex::ScopedLock l(lock); ConsumerMap::iterator i = consumers.find(tag); if (i != consumers.end()) - throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag); + throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag )); Consumer& c = consumers[tag]; c.listener = listener; c.ackMode = ackMode; diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index 0a6a88ae90..932fab8881 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -29,7 +29,7 @@ #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" +#include "qpid/shared_ptr.h" #include <iostream> #include <sstream> #include <functional> @@ -44,23 +44,26 @@ namespace client { Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : channelIdCounter(0), version(_version), max_frame_size(_max_frame_size), - impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))), - isOpen(false) {} + isOpen(false), + impl(new ConnectionImpl( + shared_ptr<Connector>(new Connector(_version, _debug)))) +{} -Connection::Connection(boost::shared_ptr<Connector> c) : +Connection::Connection(shared_ptr<Connector> c) : channelIdCounter(0), version(framing::highestProtocolVersion), max_frame_size(65536), - impl(new ConnectionImpl(c)), - isOpen(false) {} + isOpen(false), + impl(new ConnectionImpl(c)) +{} -Connection::~Connection(){} +Connection::~Connection(){ } void Connection::open( const std::string& host, int port, const std::string& uid, const std::string& pwd, const std::string& vhost) { if (isOpen) - THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open"); + throw Exception(QPID_MSG("Channel object is already open")); impl->open(host, port, uid, pwd, vhost); isOpen = true; @@ -79,10 +82,9 @@ Session Connection::newSession(uint32_t detachedLifetime) { } void Connection::resume(Session& session) { - shared_ptr<SessionCore> core=session.impl; - core->setChannel(++channelIdCounter); - impl->addSession(core); - core->resume(impl); + session.impl->setChannel(++channelIdCounter); + impl->addSession(session.impl); + session.impl->resume(impl); } void Connection::close() { diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 2e5059f135..d2612ca754 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -23,7 +23,6 @@ */ #include <map> #include <string> -#include "qpid/QpidError.h" #include "Channel.h" #include "ConnectionImpl.h" #include "qpid/client/Session.h" @@ -57,10 +56,12 @@ class Connection framing::ChannelId channelIdCounter; framing::ProtocolVersion version; const uint32_t max_frame_size; - shared_ptr<ConnectionImpl> impl; bool isOpen; bool debug; - + + protected: + boost::shared_ptr<ConnectionImpl> impl; + public: /** * Creates a connection object, but does not open the diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 4058bfb33f..a8f10c32a9 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -68,7 +68,7 @@ void ConnectionHandler::incoming(AMQFrame& frame) try { in(frame); }catch(ConnectionException& e){ - error(e.code, e.toString(), body); + error(e.code, e.what(), body); }catch(std::exception& e){ error(541/*internal error*/, e.what(), body); } @@ -124,6 +124,8 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_ void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) { + if (onError) + onError(code, message); AMQMethodBody* method = body->getMethod(); if (method) error(code, message, method->amqpClassId(), method->amqpMethodId()); diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index fae93e8294..f9273bc165 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" #include "ConnectionImpl.h" @@ -35,8 +36,9 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) { handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); handler.out = boost::bind(&Connector::send, connector, _1); - handler.onClose = boost::bind(&ConnectionImpl::closed, this); - handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2); + handler.onClose = boost::bind(&ConnectionImpl::closed, this, + REPLY_SUCCESS, std::string()); + handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); connector->setInputHandler(&handler); connector->setTimeoutHandler(this); connector->setShutdownHandler(this); @@ -64,7 +66,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s = sessions[frame.getChannel()].lock(); } if (!s) - throw ChannelErrorException(); + throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel())); s->in(frame); } @@ -84,19 +86,8 @@ void ConnectionImpl::open(const std::string& host, int port, void ConnectionImpl::close() { - assertNotClosed(); - handler.close(); -} - -void ConnectionImpl::closed() -{ - closedByPeer(200, "OK"); -} - -void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text) -{ - signalClose(code, text); - connector->close(); + if (!isClosed) + handler.close(); } void ConnectionImpl::idleIn() @@ -110,26 +101,39 @@ void ConnectionImpl::idleOut() connector->send(frame); } +template <class F> +void ConnectionImpl::forChannels(F functor) { + for (SessionMap::iterator i = sessions.begin(); + i != sessions.end(); ++i) { + try { + boost::shared_ptr<SessionCore> s = i->second.lock(); + if (s) functor(*s); + } catch (...) { assert(0); } + } +} + void ConnectionImpl::shutdown() { - //this indicates that the socket to the server has closed - signalClose(0, "Unexpected socket closure."); + Mutex::ScopedLock l(lock); + if (isClosed) return; + forChannels(boost::bind(&SessionCore::connectionBroke, _1, + INTERNAL_ERROR, "Unexpected socket closure.")); + sessions.clear(); + isClosed = true; } -void ConnectionImpl::signalClose(uint16_t code, const std::string& text) +void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - boost::shared_ptr<SessionCore> s = i->second.lock(); - if (s) - s->closed(code, text); - } + if (isClosed) return; + forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text)); sessions.clear(); isClosed = true; + connector->close(); } -void ConnectionImpl::assertNotClosed() -{ +void ConnectionImpl::erase(uint16_t ch) { Mutex::ScopedLock l(lock); - if (isClosed) throw Exception("Connection has been closed"); + sessions.erase(ch); } + diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index f20534f1aa..46bd5b685d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -51,14 +51,14 @@ class ConnectionImpl : public framing::FrameHandler, bool isClosed; void incoming(framing::AMQFrame& frame); - void closed(); - void closedByPeer(uint16_t, const std::string&); + void closed(uint16_t, const std::string&); void idleOut(); void idleIn(); void shutdown(); - void signalClose(uint16_t, const std::string&); - void assertNotClosed(); -public: + + template <class F> void forChannels(F functor); + + public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; ConnectionImpl(boost::shared_ptr<Connector> c); @@ -69,7 +69,9 @@ public: const std::string& pwd = "guest", const std::string& virtualhost = "/"); void close(); - void handle(framing::AMQFrame& frame); + void handle(framing::AMQFrame& frame); + void erase(uint16_t channel); + boost::shared_ptr<Connector> getConnector() { return connector; } }; }} diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index b1ec580605..ba11ea5569 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -20,7 +20,6 @@ */ #include <iostream> #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" #include "Connector.h" @@ -36,7 +35,6 @@ namespace client { using namespace qpid::sys; using namespace qpid::framing; -using qpid::QpidError; Connector::Connector( ProtocolVersion ver, bool _debug, uint32_t buffer_size diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 8aaaea247a..af6badd6e0 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -98,6 +98,7 @@ class Connector : public framing::OutputHandler, virtual void setInputHandler(framing::InputHandler* handler); virtual void setTimeoutHandler(sys::TimeoutHandler* handler); virtual void setShutdownHandler(sys::ShutdownHandler* handler); + virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; } virtual framing::OutputHandler* getOutputHandler(); virtual void send(framing::AMQFrame& frame); virtual void setReadTimeout(uint16_t timeout); diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index e4edece414..c70b0fc455 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -73,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame) void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range) { if (range.size() % 2) { //must be even number - throw ConnectionException(530, "Received odd number of elements in ranged mark"); + throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark")); } else { SequenceNumber mark(cumulative); { diff --git a/cpp/src/qpid/client/Future.h b/cpp/src/qpid/client/Future.h index 667a19e942..d07f9f149c 100644 --- a/cpp/src/qpid/client/Future.h +++ b/cpp/src/qpid/client/Future.h @@ -63,7 +63,7 @@ public: boost::bind(&FutureCompletion::completed, &callback) ); callback.waitForCompletion(); - session.checkClosed(); + session.assertOpen(); complete = true; } } diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index 73b7c3a7a6..5d36a1d873 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -31,7 +31,7 @@ using namespace qpid::sys; AMQMethodBody* FutureResponse::getResponse(SessionCore& session) { waitForCompletion(); - session.checkClosed(); + session.assertOpen(); return response.get(); } diff --git a/cpp/src/qpid/client/FutureResult.cpp b/cpp/src/qpid/client/FutureResult.cpp index a523129206..681202edea 100644 --- a/cpp/src/qpid/client/FutureResult.cpp +++ b/cpp/src/qpid/client/FutureResult.cpp @@ -30,7 +30,7 @@ using namespace qpid::sys; const std::string& FutureResult::getResult(SessionCore& session) const { waitForCompletion(); - session.checkClosed(); + session.assertOpen(); return result; } diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 966d07eaef..27440465fe 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -24,105 +24,301 @@ #include "FutureResponse.h" #include "FutureResult.h" #include "ConnectionImpl.h" - +#include "qpid/framing/FrameSet.h" #include "qpid/framing/constants.h" +#include "qpid/framing/ClientInvoker.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> -using namespace qpid::client; +namespace qpid { +namespace client { + using namespace qpid::framing; -SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, uint16_t ch, uint64_t maxFrameSize) - : connection(conn), channel(ch), l2(*this), l3(maxFrameSize), - uuid(false), sync(false) +namespace { const std::string OK="ok"; } + +typedef sys::Monitor::ScopedLock Lock; +typedef sys::Monitor::ScopedUnlock UnLock; + +inline void SessionCore::invariant() const { + switch (state.get()) { + case OPENING: + assert(!session); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case RESUMING: + assert(session); + assert(session->getState() == SessionState::RESUMING); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case OPEN: + case CLOSING: + case SUSPENDING: + assert(session); + assert(code==REPLY_SUCCESS); + assert(connection); + assert(channel.get()); + assert(channel.next == connection.get()); + break; + case SUSPENDED: + assert(code==REPLY_SUCCESS); + assert(session); + assert(!connection); + break; + case CLOSED: + assert(!session); + assert(!connection); + break; + } +} + +inline void SessionCore::setState(State s) { + state = s; + invariant(); +} + +inline void SessionCore::waitFor(State s) { + invariant(); + // We can be CLOSED or SUSPENDED by error at any time. + state.waitFor(States(s, CLOSED, SUSPENDED)); + check(); + assert(state==s); + invariant(); +} + +SessionCore::SessionCore(shared_ptr<ConnectionImpl> conn, + uint16_t ch, uint64_t maxFrameSize) + : l3(maxFrameSize), + sync(false), + channel(ch), + proxy(channel), + state(OPENING) { - l2.next = &l3; l3.out = &out; - out.next = connection.get(); + attaching(conn); } -SessionCore::~SessionCore() {} +void SessionCore::attaching(shared_ptr<ConnectionImpl> c) { + assert(c); + assert(channel.get()); + connection = c; + channel.next = connection.get(); + code = REPLY_SUCCESS; + text = OK; + state = session ? RESUMING : OPENING; + invariant(); +} -ExecutionHandler& SessionCore::getExecution() -{ - checkClosed(); - return l3; +SessionCore::~SessionCore() { + Lock l(state); + invariant(); + detach(COMMAND_INVALID, "Session deleted"); + state.waitAll(); } -FrameSet::shared_ptr SessionCore::get() -{ - checkClosed(); - return l3.getDemux().getDefault().pop(); +void SessionCore::detach(int c, const std::string& t) { + connection.reset(); + channel.next = 0; + code=c; + text=t; } -void SessionCore::setSync(bool s) -{ +void SessionCore::doClose(int code, const std::string& text) { + if (state != CLOSED) { + session.reset(); + l3.getDemux().close(); + l3.getCompletionTracker().close(); + detach(code, text); + setState(CLOSED); + } + invariant(); +} + +void SessionCore::doSuspend(int code, const std::string& text) { + if (state != CLOSED) { + invariant(); + detach(code, text); + session->suspend(); + setState(SUSPENDED); + } +} + +ExecutionHandler& SessionCore::getExecution() { // user thread + return l3; +} + +void SessionCore::setSync(bool s) { // user thread sync = s; } -bool SessionCore::isSync() -{ +bool SessionCore::isSync() { // user thread return sync; } -namespace { -struct ClosedOnExit { - SessionCore& core; - int code; - std::string text; - ClosedOnExit(SessionCore& s, int c, const std::string& t) - : core(s), code(c), text(t) {} - ~ClosedOnExit() { core.closed(code, text); } -}; +FrameSet::shared_ptr SessionCore::get() { // user thread + // No lock here: pop does a blocking wait. + return l3.getDemux().getDefault().pop(); +} + +void SessionCore::open(uint32_t detachedLifetime) { // user thread + Lock l(state); + check(state==OPENING && !session, + COMMAND_INVALID, QPID_MSG("Cannot re-open a session.")); + proxy.open(detachedLifetime); + waitFor(OPEN); +} + +void SessionCore::close() { // user thread + Lock l(state); + check(); + if (state==OPEN) { + setState(CLOSING); + proxy.close(); + waitFor(CLOSED); + } + else + doClose(REPLY_SUCCESS, OK); +} + +void SessionCore::suspend() { // user thread + Lock l(state); + checkOpen(); + setState(SUSPENDING); + proxy.suspend(); + waitFor(SUSPENDED); } -void SessionCore::close() +void SessionCore::setChannel(uint16_t ch) { channel=ch; } + +void SessionCore::resume(shared_ptr<ConnectionImpl> c) { + // user thread + { + Lock l(state); + if (state==OPEN) + doSuspend(REPLY_SUCCESS, OK); + check(state==SUSPENDED, COMMAND_INVALID, QPID_MSG("Session cannot be resumed.")); + SequenceNumber sendAck=session->resuming(); + attaching(c); + proxy.resume(getId()); + waitFor(OPEN); + proxy.ack(sendAck, SequenceNumberSet()); + // FIXME aconway 2007-10-23: Replay inside the lock might be a prolem + // for large replay sets. + SessionState::Replay replay=session->replay(); + for (SessionState::Replay::iterator i = replay.begin(); + i != replay.end(); ++i) + { + invariant(); + channel.handle(*i); // Direct to channel. + check(); + } + } +} + +void SessionCore::assertOpen() const { + Lock l(state); + checkOpen(); +} + +// network thread +void SessionCore::attached(const Uuid& sessionId, + uint32_t /*detachedLifetime*/) { - checkClosed(); - ClosedOnExit closer(*this, CHANNEL_ERROR, "Session closed by user."); - l2.close(); + Lock l(state); + invariant(); + check(state == OPENING || state == RESUMING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.attached")); + if (state==OPENING) { // New session + // FIXME aconway 2007-10-17: arbitrary ack value of 100 for + // client, allow configuration. + session=in_place<SessionState>(100, sessionId); + setState(OPEN); + } + else { // RESUMING + check(sessionId == session->getId(), + INVALID_ARGUMENT, QPID_MSG("session.resumed has invalid ID.")); + // Don't setState yet, wait for first incoming ack. + } } -void SessionCore::suspend() { - checkClosed(); - ClosedOnExit closer(*this, CHANNEL_ERROR, "Client session is suspended"); - l2.suspend(); +void SessionCore::detached() { // network thread + Lock l(state); + check(state == SUSPENDING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.detached.")); + connection->erase(channel); + doSuspend(REPLY_SUCCESS, OK); +} + +void SessionCore::ack(uint32_t ack, const SequenceNumberSet&) { + Lock l(state); + invariant(); + check(state==OPEN || state==RESUMING, + COMMAND_INVALID, QPID_MSG("Received unexpected session.ack")); + session->receivedAck(ack); + if (state==RESUMING) { + setState(OPEN); + } + invariant(); } void SessionCore::closed(uint16_t code, const std::string& text) -{ - out.next = 0; - reason.code = code; - reason.text = text; - l2.closed(); - l3.getDemux().close(); - l3.getCompletionTracker().close(); +{ // network thread + Lock l(state); + invariant(); + doClose(code, text); } -void SessionCore::checkClosed() const -{ - // TODO: could have been a connection exception - if(out.next == 0) - throw ChannelException(reason.code, reason.text); +// closed by connection +void SessionCore::connectionClosed(uint16_t code, const std::string& text) { + Lock l(state); + try { + doClose(code, text); + } catch(...) { assert (0); } } -void SessionCore::open(uint32_t detachedLifetime) { - assert(out.next); - l2.open(detachedLifetime); +void SessionCore::connectionBroke(uint16_t code, const std::string& text) { + Lock l(state); + try { + doSuspend(code, text); + } catch (...) { assert(0); } } -void SessionCore::resume(shared_ptr<ConnectionImpl> conn) { - connection = conn; - out.next = connection.get(); - l2.resume(); +void SessionCore::check() const { // Called with lock held. + invariant(); + if (code != REPLY_SUCCESS) + throwReplyException(code, text); +} + +void SessionCore::check(bool cond, int newCode, const std::string& msg) const { + check(); + if (!cond) { + const_cast<SessionCore*>(this)->doClose(newCode, msg); + throwReplyException(code, text); + } } -Future SessionCore::send(const AMQBody& command) -{ - checkClosed(); +void SessionCore::checkOpen() const { + if (state==SUSPENDED) { + std::string cause; + if (code != REPLY_SUCCESS) + cause=" by :"+text; + throw CommandInvalidException(QPID_MSG("Session is suspended" << cause)); + } + check(state==OPEN, COMMAND_INVALID, QPID_MSG("Session is not open")); +} +Future SessionCore::send(const AMQBody& command) +{ + Lock l(state); + checkOpen(); command.getMethod()->setSync(sync); - Future f; //any result/response listeners must be set before the command is sent if (command.getMethod()->resultExpected()) { @@ -145,21 +341,61 @@ Future SessionCore::send(const AMQBody& command) Future SessionCore::send(const AMQBody& command, const MethodContent& content) { - checkClosed(); + Lock l(state); + checkOpen(); //content bearing methods don't currently have responses or //results, if that changes should follow procedure for the other //send method impl: return Future(l3.send(command, content)); } +// Network thread. void SessionCore::handleIn(AMQFrame& frame) { - l2.handle(frame); + try { + // Cast to expose private SessionHandler functions. + if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) { + session->received(frame); + l3.handle(frame); + } + } catch (const ChannelException& e) { + QPID_LOG(error, "Channel exception:" << e.what()); + doClose(e.code, e.what()); + } } void SessionCore::handleOut(AMQFrame& frame) { - checkClosed(); - frame.setChannel(channel); - out.next->handle(frame); + Lock l(state); + if (state==OPEN) { + if (session->sent(frame)) + proxy.solicitAck(); + channel.handle(frame); + } +} + +void SessionCore::solicitAck( ) { + Lock l(state); + checkOpen(); + proxy.ack(session->sendingAck(), SequenceNumberSet()); +} + +void SessionCore::flow(bool) { + assert(0); throw NotImplementedException("session.flow"); +} + +void SessionCore::flowOk(bool /*active*/) { + assert(0); throw NotImplementedException("session.flow"); +} + +void SessionCore::highWaterMark(uint32_t /*lastSentMark*/) { + // FIXME aconway 2007-10-02: may be removed from spec. + assert(0); throw NotImplementedException("session.highWaterMark"); +} + +const Uuid SessionCore::getId() const { + if (session) + return session->getId(); + throw Exception(QPID_MSG("Closed session, no ID.")); } +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index ac109e1f5c..38c72359a3 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -22,17 +22,25 @@ #ifndef _SessionCore_ #define _SessionCore_ -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> -#include "qpid/framing/AMQMethodBody.h" +#include "qpid/shared_ptr.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/framing/MethodContent.h" -#include "qpid/framing/Uuid.h" -#include "SessionHandler.h" +#include "qpid/framing/ChannelHandler.h" +#include "qpid/framing/SessionState.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/AMQP_ClientOperations.h" +#include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/sys/StateMonitor.h" #include "ExecutionHandler.h" +#include <boost/optional.hpp> + namespace qpid { +namespace framing { +class FrameSet; +class MethodContent; +class SequenceNumberSet; +} + namespace client { class Future; @@ -43,60 +51,90 @@ class ConnectionImpl; * Attaches to a SessionHandler when active, detaches * when closed. */ -class SessionCore : public framing::FrameHandler::InOutHandler +class SessionCore : public framing::FrameHandler::InOutHandler, + private framing::AMQP_ClientOperations::SessionHandler { - struct Reason - { - uint16_t code; - std::string text; - }; - - shared_ptr<ConnectionImpl> connection; - uint16_t channel; - SessionHandler l2; - ExecutionHandler l3; - framing::Uuid uuid; - volatile bool sync; - Reason reason; - - protected: - void handleIn(framing::AMQFrame& frame); - void handleOut(framing::AMQFrame& frame); - public: SessionCore(shared_ptr<ConnectionImpl>, uint16_t channel, uint64_t maxFrameSize); ~SessionCore(); framing::FrameSet::shared_ptr get(); + const framing::Uuid getId() const; + uint16_t getChannel() const { return channel; } + void assertOpen() const; - framing::Uuid getId() const { return uuid; } - void setId(const framing::Uuid& id) { uuid= id; } - - uint16_t getChannel() const { assert(channel); return channel; } - void setChannel(uint16_t ch) { assert(ch); channel=ch; } - + // NOTE: Public functions called in user thread. void open(uint32_t detachedLifetime); - - /** Closed by client code */ void close(); - - /** Closed by peer */ - void closed(uint16_t code, const std::string& text); - void resume(shared_ptr<ConnectionImpl>); void suspend(); + void setChannel(uint16_t channel); - void setSync(bool); + void setSync(bool s); bool isSync(); ExecutionHandler& getExecution(); - void checkClosed() const; Future send(const framing::AMQBody& command); + Future send(const framing::AMQBody& command, const framing::MethodContent& content); -}; -} -} + void connectionClosed(uint16_t code, const std::string& text); + void connectionBroke(uint16_t code, const std::string& text); + + private: + enum State { + OPENING, + RESUMING, + OPEN, + CLOSING, + SUSPENDING, + SUSPENDED, + CLOSED + }; + typedef framing::AMQP_ClientOperations::SessionHandler SessionHandler; + typedef sys::StateMonitor<State, CLOSED> StateMonitor; + typedef StateMonitor::Set States; + + inline void invariant() const; + inline void setState(State s); + inline void waitFor(State); + void doClose(int code, const std::string& text); + void doSuspend(int code, const std::string& text); + + /** If there is an error, throw the exception */ + void check(bool condition, int code, const std::string& text) const; + /** Throw if *error */ + void check() const; + + void handleIn(framing::AMQFrame& frame); + void handleOut(framing::AMQFrame& frame); + + // Private functions are called by broker in network thread. + void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime); + void flow(bool active); + void flowOk(bool active); + void detached(); + void ack(uint32_t cumulativeSeenMark, + const framing::SequenceNumberSet& seenFrameSet); + void highWaterMark(uint32_t lastSentMark); + void solicitAck(); + void closed(uint16_t code, const std::string& text); + + void attaching(shared_ptr<ConnectionImpl>); + void detach(int code, const std::string& text); + void checkOpen() const; + + int code; // Error code + std::string text; // Error text + boost::optional<framing::SessionState> session; + shared_ptr<ConnectionImpl> connection; + ExecutionHandler l3; + volatile bool sync; + framing::ChannelHandler channel; + framing::AMQP_ServerProxy::Session proxy; + mutable StateMonitor state; +}; +}} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/SessionHandler.cpp b/cpp/src/qpid/client/SessionHandler.cpp deleted file mode 100644 index d3b04e5356..0000000000 --- a/cpp/src/qpid/client/SessionHandler.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "SessionHandler.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/all_method_bodies.h" -#include "qpid/client/SessionCore.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" - -using namespace qpid::client; -using namespace qpid::framing; -using namespace boost; - -namespace { -// TODO aconway 2007-09-28: hack till we have multi-version support. -ProtocolVersion version; -} - -SessionHandler::SessionHandler(SessionCore& parent) - : StateManager(CLOSED), core(parent) {} - -SessionHandler::~SessionHandler() {} - -void SessionHandler::handle(AMQFrame& frame) -{ - AMQBody* body = frame.getBody(); - if (getState() == OPEN) { - core.checkClosed(); - SessionClosedBody* closedBody= - dynamic_cast<SessionClosedBody*>(body->getMethod()); - if (closedBody) { - closed(); - core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); - } else { - try { - next->handle(frame); - } - catch(const ChannelException& e){ - QPID_LOG(error, "Channel exception:" << e.what()); - closed(); - AMQFrame f(0, SessionClosedBody(version, e.code, e.toString())); - core.out(f); - core.closed(closedBody->getReplyCode(), closedBody->getReplyText()); - } - } - } else { - if (body->getMethod()) - handleMethod(body->getMethod()); - else - throw ConnectionException(504, "Channel not open for content."); - } -} - -void SessionHandler::attach(const AMQMethodBody& command) -{ - setState(OPENING); - AMQFrame f(0, command); - core.out(f); - std::set<int> states; - states.insert(OPEN); - states.insert(CLOSED); - waitFor(states); - if (getState() != OPEN) - throw Exception(QPID_MSG("Failed to attach session to channel "<<core.getChannel())); -} - -void SessionHandler::open(uint32_t detachedLifetime) { - attach(SessionOpenBody(version, detachedLifetime)); -} - -void SessionHandler::resume() { - attach(SessionResumeBody(version, core.getId())); -} - -void SessionHandler::detach(const AMQMethodBody& command) -{ - setState(CLOSING); - AMQFrame f(0, command); - core.out(f); - waitFor(CLOSED); -} - -void SessionHandler::close() { detach(SessionCloseBody(version)); } -void SessionHandler::suspend() { detach(SessionSuspendBody(version)); } -void SessionHandler::closed() { setState(CLOSED); } - -void SessionHandler::handleMethod(AMQMethodBody* method) -{ - switch (getState()) { - case OPENING: { - SessionAttachedBody* attached = dynamic_cast<SessionAttachedBody*>(method); - if (attached) { - core.setId(attached->getSessionId()); - setState(OPEN); - } else - throw ChannelErrorException(); - break; - } - case CLOSING: - if (method->isA<SessionClosedBody>() || - method->isA<SessionDetachedBody>()) - closed(); - break; - - case CLOSED: - throw ChannelErrorException(); - - default: - assert(0); - throw InternalErrorException(QPID_MSG("Internal Error.")); - } -} - diff --git a/cpp/src/qpid/client/SessionHandler.h b/cpp/src/qpid/client/SessionHandler.h deleted file mode 100644 index 994b8402de..0000000000 --- a/cpp/src/qpid/client/SessionHandler.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _SessionHandler_ -#define _SessionHandler_ - -#include "StateManager.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/Uuid.h" -#include "qpid/shared_ptr.h" - -namespace qpid { -namespace client { -class SessionCore; - -/** - * Handles incoming session (L2) commands. - */ -class SessionHandler : public framing::FrameHandler, - private StateManager -{ - enum STATES {OPENING, OPEN, CLOSING, CLOSED}; - SessionCore& core; - - void handleMethod(framing::AMQMethodBody* method); - void attach(const framing::AMQMethodBody&); - void detach(const framing::AMQMethodBody&); - - public: - SessionHandler(SessionCore& parent); - ~SessionHandler(); - - /** Incoming from broker */ - void handle(framing::AMQFrame&); - - void open(uint32_t detachedLifetime); - void resume(); - void close(); - void closed(); - void suspend(); -}; - -}} - -#endif diff --git a/cpp/src/qpid/client/StateManager.cpp b/cpp/src/qpid/client/StateManager.cpp index b72967c098..0cb3c6b9d4 100644 --- a/cpp/src/qpid/client/StateManager.cpp +++ b/cpp/src/qpid/client/StateManager.cpp @@ -60,7 +60,7 @@ void StateManager::setState(int s) stateLock.notifyAll(); } -int StateManager::getState() +int StateManager::getState() const { Monitor::ScopedLock l(stateLock); return state; diff --git a/cpp/src/qpid/client/StateManager.h b/cpp/src/qpid/client/StateManager.h index fd0c1b7f86..2f8ecb772c 100644 --- a/cpp/src/qpid/client/StateManager.h +++ b/cpp/src/qpid/client/StateManager.h @@ -30,12 +30,12 @@ namespace client { class StateManager { int state; - sys::Monitor stateLock; + mutable sys::Monitor stateLock; public: StateManager(int initial); void setState(int state); - int getState(); + int getState() const ; void waitForStateChange(int current); void waitFor(std::set<int> states); void waitFor(int state); diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index abd33c4158..423af06173 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -20,9 +20,9 @@ */ #include "AMQFrame.h" -#include "qpid/QpidError.h" #include "qpid/framing/variant.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> @@ -103,7 +103,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t flags = buffer.getOctet(); uint8_t framing_version = (flags & 0xc0) >> 6; if (framing_version != 0) - THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported"); + throw SyntaxErrorException(QPID_MSG("Framing version unsupported")); bof = flags & 0x08; eof = flags & 0x04; bos = flags & 0x02; @@ -111,7 +111,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t type = buffer.getOctet(); uint16_t frame_size = buffer.getShort(); if (frame_size < frameOverhead()-1) - THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small"); + throw SyntaxErrorException(QPID_MSG("Frame size too small")); uint8_t reserved1 = buffer.getOctet(); uint8_t field1 = buffer.getOctet(); subchannel = field1 & 0x0f; @@ -121,7 +121,7 @@ bool AMQFrame::decode(Buffer& buffer) // Verify that the protocol header meets current spec // TODO: should we check reserved2 against zero as well? - the spec isn't clear if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) - THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero"); + throw SyntaxErrorException(QPID_MSG("Reserved bits not zero")); // TODO: should no longer care about body size and only pass up B,E,b,e flags uint16_t body_size = frame_size + 1 - frameOverhead(); @@ -133,7 +133,7 @@ bool AMQFrame::decode(Buffer& buffer) uint8_t end = buffer.getOctet(); if (end != 0xCE) - THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); + throw SyntaxErrorException(QPID_MSG("Frame end not found")); return true; } @@ -147,9 +147,7 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break; default: - THROW_QPID_ERROR( - FRAMING_ERROR, - boost::format("Unknown frame type %d") % type); + throw SyntaxErrorException(QPID_MSG("Invalid frame type " << type)); } boost::apply_visitor(DecodeVisitor(buffer,size), body); } diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp index 53a01141c1..fb84be7cd6 100644 --- a/cpp/src/qpid/framing/BodyHandler.cpp +++ b/cpp/src/qpid/framing/BodyHandler.cpp @@ -18,14 +18,13 @@ * under the License. * */ -#include "qpid/QpidError.h" #include "BodyHandler.h" #include "AMQMethodBody.h" #include "AMQHeaderBody.h" #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" - #include <boost/cast.hpp> +#include "qpid/framing/reply_exceptions.h" using namespace qpid::framing; using namespace boost; @@ -49,7 +48,8 @@ void BodyHandler::handleBody(AMQBody* body) { handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); break; default: - QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type()); + throw SyntaxErrorException( + QPID_MSG("Invalid frame type " << body->type())); } } diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index 6a466fdfab..8c1a4e1e9e 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -15,8 +15,6 @@ * limitations under the License. * */ -#include <boost/format.hpp> - #include "ChannelAdapter.h" #include "OutputHandler.h" #include "AMQFrame.h" @@ -26,8 +24,6 @@ #include "AMQMethodBody.h" #include "qpid/framing/ConnectionOpenBody.h" -using boost::format; - namespace qpid { namespace framing { @@ -53,20 +49,20 @@ void ChannelAdapter::send(const AMQBody& body) void ChannelAdapter::assertMethodOk(AMQMethodBody& method) const { if (getId() != 0 && method.amqpClassId() == ConnectionOpenBody::CLASS_ID) - throw ConnectionException( - 504, format("Connection method on non-0 channel %d.")%getId()); + throw ChannelErrorException( + QPID_MSG("Connection method on non-0 channel " << getId())); } void ChannelAdapter::assertChannelOpen() const { if (getId() != 0 && !isOpen()) - throw ConnectionException( - 504, format("Channel %d is not open.")%getId()); + throw ChannelErrorException( + QPID_MSG("Channel " << getId() << " is not open.")); } void ChannelAdapter::assertChannelNotOpen() const { if (getId() != 0 && isOpen()) - throw ConnectionException( - 504, format("Channel %d is already open.") % getId()); + throw ChannelErrorException( + QPID_MSG("Channel " << getId() << " is already open.")); } void ChannelAdapter::handle(AMQFrame& f) { handleBody(f.getBody()); } diff --git a/cpp/src/qpid/framing/ProtocolVersionException.h b/cpp/src/qpid/framing/ChannelHandler.h index bd16804470..69aaeac492 100644 --- a/cpp/src/qpid/framing/ProtocolVersionException.h +++ b/cpp/src/qpid/framing/ChannelHandler.h @@ -1,3 +1,6 @@ +#ifndef QPID_FRAMING_CHANNELHANDLER_H +#define QPID_FRAMING_CHANNELHANDLER_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,39 +21,33 @@ * under the License. * */ - -#ifndef _ProtocolVersionException_ -#define _ProtocolVersionException_ - -#include "qpid/Exception.h" -#include "ProtocolVersion.h" -#include <string> -#include <vector> +#include "FrameHandler.h" +#include "AMQFrame.h" namespace qpid { namespace framing { -class ProtocolVersionException : public qpid::Exception +/** + * Sets the channel number on outgoing frames. + */ +class ChannelHandler : public FrameHandler { -protected: - ProtocolVersion versionFound; - -public: - ~ProtocolVersionException() throw() {} - - template <class T> - ProtocolVersionException( - ProtocolVersion ver, const T& msg) throw () : versionFound(ver) - { init(boost::lexical_cast<std::string>(msg)); } - - template <class T> - ProtocolVersionException(const T& msg) throw () - { init(boost::lexical_cast<std::string>(msg)); } + public: + ChannelHandler(uint16_t channelId=0, FrameHandler* next=0) + : FrameHandler(next), channel(channelId) {} + void handle(AMQFrame& frame) { + frame.setChannel(channel); + next->handle(frame); + } + uint16_t get() const { return channel; } + ChannelHandler& set(uint16_t ch) { channel=ch; return *this; } + operator uint16_t() const { return get(); } + ChannelHandler& operator=(uint16_t ch) { return set(ch); } private: - void init(const std::string& msg); + uint16_t channel; }; }} // namespace qpid::framing -#endif //ifndef _ProtocolVersionException_ +#endif /*!QPID_FRAMING_CHANNELHANDLER_H*/ diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp index 3c0284f2c8..089bc5d4a5 100644 --- a/cpp/src/qpid/framing/FieldTable.cpp +++ b/cpp/src/qpid/framing/FieldTable.cpp @@ -19,9 +19,10 @@ * */ #include "FieldTable.h" -#include "qpid/QpidError.h" #include "Buffer.h" #include "FieldValue.h" +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" #include <assert.h> namespace qpid { @@ -132,7 +133,7 @@ void FieldTable::decode(Buffer& buffer){ uint32_t len = buffer.getLong(); uint32_t available = buffer.available(); if (available < len) - THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for field table."); + throw SyntaxErrorException(QPID_MSG("Not enough data for field table.")); uint32_t leftover = available - len; while(buffer.available() > leftover){ std::string name; diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp index a7535ae4b9..5526c9cb72 100644 --- a/cpp/src/qpid/framing/FieldValue.cpp +++ b/cpp/src/qpid/framing/FieldValue.cpp @@ -20,8 +20,7 @@ */ #include "FieldValue.h" #include "Buffer.h" -#include "qpid/QpidError.h" - +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -75,9 +74,7 @@ void FieldValue::decode(Buffer& buffer) data.reset(new FixedWidthValue<0>()); break; default: - std::stringstream out; - out << "Unknown field table value type: " << typeOctet; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); + throw SyntaxErrorException(QPID_MSG("Unknown field table value type: " << (int)typeOctet)); } data->decode(buffer); } diff --git a/cpp/src/qpid/framing/FramingContent.cpp b/cpp/src/qpid/framing/FramingContent.cpp index 813e6fb49b..cd134b0e89 100644 --- a/cpp/src/qpid/framing/FramingContent.cpp +++ b/cpp/src/qpid/framing/FramingContent.cpp @@ -18,12 +18,10 @@ * under the License. * */ -#include <assert.h> - #include "Buffer.h" #include "FramingContent.h" -#include "qpid/QpidError.h" -#include <sstream> +#include "qpid/Exception.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -37,12 +35,12 @@ Content::Content(uint8_t _discriminator, const string& _value): discriminator(_d void Content::validate() { if (discriminator == REFERENCE) { if(value.empty()) { - THROW_QPID_ERROR(FRAMING_ERROR, "Reference cannot be empty"); + throw InvalidArgumentException( + QPID_MSG("Reference cannot be empty")); } }else if (discriminator != INLINE) { - std::stringstream out; - out << "Invalid discriminator: " << (int) discriminator; - THROW_QPID_ERROR(FRAMING_ERROR, out.str()); + throw SyntaxErrorException( + QPID_MSG("Invalid discriminator: " << discriminator)); } } diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index 3e55dff1bd..fbf3c0b7ca 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -33,6 +33,7 @@ template <class T> struct Handler { typedef T HandledType; typedef void handleFptr(T); + typedef void result_type; // Compatible with std/boost functors. Handler(Handler<T>* next_=0) : next(next_) {} virtual ~Handler() {} @@ -51,7 +52,7 @@ struct Handler { struct Chain : public Handler<T> { Chain(Handler<T>* first=0) : Handler(first) {} void operator=(Handler<T>* h) { next = h; } - void handle(T t) { (*next)(t); } + void handle(T t) { next->handle(t); } // TODO aconway 2007-08-29: chain modifier ops here. }; diff --git a/cpp/src/qpid/framing/ProtocolVersionException.cpp b/cpp/src/qpid/framing/ProtocolVersionException.cpp deleted file mode 100644 index b68b3af1f9..0000000000 --- a/cpp/src/qpid/framing/ProtocolVersionException.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> -#include "ProtocolVersionException.h" - - -using namespace qpid::framing; - -void ProtocolVersionException::init(const std::string& msg) -{ - whatStr = boost::str( - boost::format("ProtocolVersionException: %s found: %s") - % versionFound.toString() % msg); -} - diff --git a/cpp/src/qpid/framing/ResumeHandler.cpp b/cpp/src/qpid/framing/ResumeHandler.cpp deleted file mode 100644 index 9d2c971459..0000000000 --- a/cpp/src/qpid/framing/ResumeHandler.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ResumeHandler.h" -#include "qpid/framing/reply_exceptions.h" - -#include <boost/bind.hpp> - -#include <algorithm> - -namespace qpid { -namespace framing { - -void ResumeHandler::ackReceived(SequenceNumber acked) { - if (lastSent < acked) - throw InvalidArgumentException("Invalid sequence number in ack"); - size_t keep = lastSent - acked; - if (keep < unacked.size()) - unacked.erase(unacked.begin(), unacked.end()-keep); -} - -void ResumeHandler::resend() { - std::for_each(unacked.begin(), unacked.end(), - boost::bind(&FrameHandler::handle,out->next, _1)); -} - -void ResumeHandler::handleIn(AMQFrame& f) { - ++lastReceived; - in.next->handle(f); -} - -void ResumeHandler::handleOut(AMQFrame& f) { - ++lastSent; - unacked.push_back(f); - out.next->handle(f); -} - - -}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/ResumeHandler.h b/cpp/src/qpid/framing/ResumeHandler.h deleted file mode 100644 index c86a60b9cb..0000000000 --- a/cpp/src/qpid/framing/ResumeHandler.h +++ /dev/null @@ -1,69 +0,0 @@ -#ifndef QPID_FRAMING_RESUMEHANDLER_H -#define QPID_FRAMING_RESUMEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/SequenceNumber.h" - -#include <deque> - -namespace qpid { -namespace framing { - -/** - * In/out handler pair for managing exactly-once session delivery. - * The same handler is used by client and broker. - * This handler only deals with TCP style SequenceNumber acks, - * not with fragmented SequenceNumberSet. - * - * THREAD UNSAFE. Expected to be used in a serialized context. - */ -class ResumeHandler : public FrameHandler::InOutHandler -{ - public: - /** Received acknowledgement for sent frames up to and including sentOk */ - void ackReceived(SequenceNumber sentOk); - - /** What was the last sequence number we received. */ - SequenceNumber getLastReceived() { return lastReceived; } - - /** Resend the unacked frames to the output handler */ - void resend(); - - protected: - void handleIn(AMQFrame&); - void handleOut(AMQFrame&); - - private: - typedef std::deque<AMQFrame> Frames; - Frames unacked; - SequenceNumber lastReceived; - SequenceNumber lastSent; -}; - - -}} // namespace qpid::common - - -#endif /*!QPID_FRAMING_RESUMEHANDLER_H*/ diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index 9b8f0659b2..3aee04a4ce 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -47,6 +47,7 @@ class SequenceNumber bool operator<=(const SequenceNumber& other) const; bool operator>=(const SequenceNumber& other) const; uint32_t getValue() const { return (uint32_t) value; } + operator uint32_t() const { return (uint32_t) value; } friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); }; diff --git a/cpp/src/qpid/framing/SessionState.cpp b/cpp/src/qpid/framing/SessionState.cpp new file mode 100644 index 0000000000..045a0ae115 --- /dev/null +++ b/cpp/src/qpid/framing/SessionState.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIE4bS OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SessionState.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/constants.h" +#include "qpid/framing/AMQMethodBody.h" +#include "qpid/log/Statement.h" + +#include <algorithm> + +#include <boost/bind.hpp> +#include <boost/none.hpp> + +namespace qpid { +namespace framing { + +SessionState::SessionState(uint32_t ack, const Uuid& uuid) : + state(ATTACHED), + id(uuid), + lastReceived(-1), + lastSent(-1), + ackInterval(ack), + sendAckAt(lastReceived+ackInterval), + solicitAckAt(lastSent+ackInterval), + ackSolicited(false) +{ + assert(ackInterval > 0); +} + +namespace { +bool isSessionCommand(const AMQFrame& f) { + return f.getMethod() && f.getMethod()->amqpClassId() == SESSION_CLASS_ID; +} +} + +boost::optional<SequenceNumber> SessionState::received(const AMQFrame& f) { + if (isSessionCommand(f)) + return boost::none; + if (state==RESUMING) + throw CommandInvalidException( + QPID_MSG("Invalid frame: Resuming session, expected session-ack")); + assert(state = ATTACHED); + assert(lastReceived<sendAckAt); + ++lastReceived; + QPID_LOG(trace, "Recv # "<< lastReceived << " " << id); + if (lastReceived == sendAckAt) + return sendingAck(); + else + return boost::none; +} + +bool SessionState::sent(const AMQFrame& f) { + if (isSessionCommand(f)) + return false; + unackedOut.push_back(f); + ++lastSent; + QPID_LOG(trace, "Sent # "<< lastSent << " " << id); + return (state!=RESUMING) && + (lastSent == solicitAckAt) && + sendingSolicit(); +} + +SessionState::Replay SessionState::replay() { + Replay r(unackedOut.size()); + std::copy(unackedOut.begin(), unackedOut.end(), r.begin()); + return r; +} + +void SessionState::receivedAck(SequenceNumber acked) { + if (state==RESUMING) state=ATTACHED; + assert(state==ATTACHED); + if (lastSent < acked) + throw InvalidArgumentException("Invalid sequence number in ack"); + size_t keep = lastSent - acked; + if (keep < unackedOut.size()) + unackedOut.erase(unackedOut.begin(), unackedOut.end()-keep); + solicitAckAt = std::max(solicitAckAt, SequenceNumber(acked+ackInterval)); +} + +SequenceNumber SessionState::sendingAck() { + sendAckAt = lastReceived+ackInterval; + return lastReceived; +} + +bool SessionState::sendingSolicit() { + assert(state == ATTACHED); + if (ackSolicited) + return false; + solicitAckAt = lastSent + ackInterval; + return true; +} + +SequenceNumber SessionState::resuming() { + state = RESUMING; + return sendingAck(); +} + +void SessionState::suspend() { + state = SUSPENDED; +} + +}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/SessionState.h b/cpp/src/qpid/framing/SessionState.h new file mode 100644 index 0000000000..66fc083d3f --- /dev/null +++ b/cpp/src/qpid/framing/SessionState.h @@ -0,0 +1,127 @@ +#ifndef QPID_FRAMING_SESSIONSTATE_H +#define QPID_FRAMING_SESSIONSTATE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/AMQFrame.h" + +#include <boost/optional.hpp> + +#include <deque> + +namespace qpid { +namespace framing { + +/** + * Session state common to client and broker. + * Implements session ack/resume protcools. + * + * A SessionState is always associated with an _open_ session (attached or + * suspended) it is destroyed when the session is closed. + * + * A template to make it protocol independent and easy to test. + */ +class SessionState +{ + public: + typedef std::vector<AMQFrame> Replay; + + /** States of a session. */ + enum State { + SUSPENDED, ///< Suspended, detached from any channel. + RESUMING, ///< Resuming: waiting for initial ack from peer. + ATTACHED ///< Attached to channel and operating normally. + }; + + /** + *Create a newly opened active session. + *@param ackInterval send/solicit an ack whenever N unacked frames + * have been received/sent. + *@pre ackInterval > 0 + */ + SessionState(uint32_t ackInterval=1, const framing::Uuid& id=framing::Uuid(true)); + + const framing::Uuid& getId() const { return id; } + State getState() const { return state; } + + /** Received incoming L3 frame. + * @return SequenceNumber if an ack should be sent, empty otherwise. + * SessionState assumes that acks are sent whenever it returns + * a seq. number. + */ + boost::optional<SequenceNumber> received(const AMQFrame&); + + /** Sent outgoing L3 frame. + *@return true if solicit-ack should be sent. Note the SessionState + *assumes that a solicit-ack is sent every time it returns true. + */ + bool sent(const AMQFrame&); + + /** Received normal incoming ack. */ + void receivedAck(SequenceNumber); + + /** Frames to replay + *@pre getState()==ATTACHED + */ + Replay replay(); + + /** Suspend the session. */ + void suspend(); + + /** Start resume protocol for the session. + *@returns sequence number to ack immediately. */ + SequenceNumber resuming(); + + /** About to send an unscheduled ack, e.g. to respond to a solicit-ack. + * + * Note: when received() returns a sequence number this function + * should not be called. SessionState assumes that the ack is sent + * every time received() returns a sequence number. + */ + SequenceNumber sendingAck(); + + SequenceNumber getLastSent() const { return lastSent; } + SequenceNumber getLastReceived() const { return lastReceived; } + private: + typedef std::deque<AMQFrame> Unacked; + + bool sendingSolicit(); + + State state; + framing::Uuid id; + Unacked unackedOut; + SequenceNumber lastReceived; + SequenceNumber lastSent; + uint32_t ackInterval; + SequenceNumber sendAckAt; + SequenceNumber solicitAckAt; + bool ackSolicited; + bool suspending; +}; + + +}} // namespace qpid::common + + +#endif /*!QPID_FRAMING_SESSIONSTATE_H*/ diff --git a/cpp/src/qpid/framing/TemplateVisitor.h b/cpp/src/qpid/framing/TemplateVisitor.h new file mode 100644 index 0000000000..8c719e5110 --- /dev/null +++ b/cpp/src/qpid/framing/TemplateVisitor.h @@ -0,0 +1,89 @@ +#ifndef QPID_FRAMING_TEMPLATEVISITOR_H +#define QPID_FRAMING_TEMPLATEVISITOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/mpl/fold.hpp> +#include <boost/utility/value_init.hpp> + +namespace qpid { +namespace framing { + +/** + * Metafunction to generate a visitor class derived from Base with a + * visit for each type in TypeList calling functor F. TypeList may be + * any boost::mpl type collection e.g. mpl::list. + * + * Generated class is: TemplateVisitor<Base, F, TypeList>::type + * + * @see make_visitor + */ +template <class VisitTemplate, class TypeList, class F> +class TemplateVisitor +{ + struct Base : public VisitorBase { + F action; + Base(F f) : action(f) {} + using VisitorBase::visit; + }; + + template <class B, class T> struct Visit : public B { + Visit(F action) : B(action) {} + using B::visit; + void visit(const T& body) { action(body); } + }; + + typedef typename boost::mpl::fold< + TypeList, Base, Visit<boost::mpl::placeholders::_1, + boost::mpl::placeholders::_2> + >::type type; +}; + +/** + * Construct a TemplateVisitor to perform the given action, + * for example: + * @code + */ +template <class VisitorBase, class TypeList, class F> +TemplateVisitor<VisitorBase,TypeList,F>::type make_visitor(F action) { + return TemplateVisitor<VisitorBase,TypeList,F>::type(action); +}; + +/** + * For method body classes in TypeList, invoke the corresponding function + * on Target and return true. For other body types return false. + */ +template <class TypeList, class Target> +bool invoke(const AMQBody& body, Target& target) { + typename InvokeVisitor<TypeList, Target>::type v(target); + body.accept(v); + return v.target; +} + +}} // namespace qpid::framing + + +#endif /*!QPID_FRAMING_INVOKEVISITOR_H*/ + +}} // namespace qpid::framing + + + +#endif /*!QPID_FRAMING_TEMPLATEVISITOR_H*/ diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index e0372b2f68..1bb69fbca9 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -24,9 +24,13 @@ namespace qpid { namespace framing { -TransferContent::TransferContent(const std::string& _data) +TransferContent::TransferContent(const std::string& data, + const std::string& routingKey, + const std::string& exchange) { - setData(_data); + setData(data); + getDeliveryProperties().setRoutingKey(routingKey); + getDeliveryProperties().setExchange(exchange); } AMQHeaderBody TransferContent::getHeader() const @@ -73,14 +77,14 @@ void TransferContent::populate(const FrameSet& frameset) const MessageProperties& TransferContent::getMessageProperties() const { const MessageProperties* props = header.get<MessageProperties>(); - if (!props) throw NoSuchPropertiesException(); + if (!props) throw Exception("No message properties."); return *props; } const DeliveryProperties& TransferContent::getDeliveryProperties() const { const DeliveryProperties* props = header.get<DeliveryProperties>(); - if (!props) throw NoSuchPropertiesException(); + if (!props) throw Exception("No message properties."); return *props; } diff --git a/cpp/src/qpid/framing/TransferContent.h b/cpp/src/qpid/framing/TransferContent.h index 6fd96f3587..88f45b7e0a 100644 --- a/cpp/src/qpid/framing/TransferContent.h +++ b/cpp/src/qpid/framing/TransferContent.h @@ -30,14 +30,15 @@ namespace qpid { namespace framing { -struct NoSuchPropertiesException : public Exception {}; - class TransferContent : public MethodContent { AMQHeaderBody header; std::string data; public: - TransferContent(const std::string& data = ""); + TransferContent(const std::string& data = std::string(), + const std::string& routingKey = std::string(), + const std::string& exchange = std::string()); + AMQHeaderBody getHeader() const; void setData(const std::string&); void appendData(const std::string&); diff --git a/cpp/src/qpid/framing/Uuid.cpp b/cpp/src/qpid/framing/Uuid.cpp index 3a83430d56..2918c48ce3 100644 --- a/cpp/src/qpid/framing/Uuid.cpp +++ b/cpp/src/qpid/framing/Uuid.cpp @@ -17,9 +17,9 @@ */ #include "Uuid.h" - -#include "qpid/QpidError.h" +#include "qpid/Exception.h" #include "qpid/framing/Buffer.h" +#include "qpid/framing/reply_exceptions.h" namespace qpid { namespace framing { @@ -34,7 +34,7 @@ void Uuid::encode(Buffer& buf) const { void Uuid::decode(Buffer& buf) { if (buf.available() < size()) - THROW_QPID_ERROR(FRAMING_ERROR, "Not enough data for UUID."); + throw SyntaxErrorException(QPID_MSG("Not enough data for UUID.")); buf.getRawData(c_array(), size()); } @@ -52,4 +52,10 @@ istream& operator>>(istream& in, Uuid& uuid) { return in; } +std::string Uuid::str() const { + std::ostringstream os; + os << *this; + return os.str(); +} + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Uuid.h b/cpp/src/qpid/framing/Uuid.h index 19ae79db6a..9bde67ad8e 100644 --- a/cpp/src/qpid/framing/Uuid.h +++ b/cpp/src/qpid/framing/Uuid.h @@ -62,6 +62,9 @@ struct Uuid : public boost::array<uint8_t, 16> { void encode(framing::Buffer& buf) const; void decode(framing::Buffer& buf); + + /** String value in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */ + std::string str() const; }; /** Print in format 1b4e28ba-2fa1-11d2-883f-b9a761bde3fb */ diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index eec28333bc..69b5942ba0 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -32,4 +32,3 @@ #include "ProtocolInitiation.h" #include "BasicHeaderProperties.h" #include "ProtocolVersion.h" -#include "ProtocolVersionException.h" diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h index a788fe36e4..94442aa357 100644 --- a/cpp/src/qpid/framing/amqp_types.h +++ b/cpp/src/qpid/framing/amqp_types.h @@ -61,5 +61,11 @@ class Uuid; const ChannelId CHANNEL_MAX=(ChannelId(~1))>>1; const ChannelId CHANNEL_HIGH_BIT= ChannelId(~CHANNEL_MAX); +// Forward declare class types +class FramingContent; +class FieldTable; +class SequenceNumberSet; +class Uuid; + }} // namespace qpid::framing #endif diff --git a/cpp/src/qpid/framing/variant.h b/cpp/src/qpid/framing/variant.h index 3cb8aece5d..1fe81f8f67 100644 --- a/cpp/src/qpid/framing/variant.h +++ b/cpp/src/qpid/framing/variant.h @@ -23,7 +23,6 @@ /**@file Tools for using boost::variant */ -#include "qpid/QpidError.h" #include <boost/variant.hpp> @@ -39,7 +38,7 @@ template <class R=void> struct NoBlankVisitor : public boost::static_visitor<R> { R foundBlank() const { assert(0); - THROW_QPID_ERROR(INTERNAL_ERROR, "Invalid variant value."); + throw Exception(QPID_MSG("Invalid variant value.")); } R operator()(const boost::blank&) const { return foundBlank(); } R operator()(boost::blank&) const { return foundBlank(); } diff --git a/cpp/src/qpid/log/Logger.cpp b/cpp/src/qpid/log/Logger.cpp index 6e8f3a59cc..c483ed2379 100644 --- a/cpp/src/qpid/log/Logger.cpp +++ b/cpp/src/qpid/log/Logger.cpp @@ -190,12 +190,6 @@ void Logger::add(Statement& s) { statements.insert(&s); } -void Logger::remove(Statement& s) { - ScopedLock l(lock); - s.enabled = false; - statements.erase(&s); -} - void Logger::configure(const Options& opts, const std::string& prog) { clear(); diff --git a/cpp/src/qpid/log/Logger.h b/cpp/src/qpid/log/Logger.h index a2103f5ec6..7851c65406 100644 --- a/cpp/src/qpid/log/Logger.h +++ b/cpp/src/qpid/log/Logger.h @@ -67,9 +67,6 @@ class Logger : private boost::noncopyable { /** Add a statement. */ void add(Statement& s); - /** Remove a statement */ - void remove(Statement& s); - /** Log a message. */ void log(const Statement&, const std::string&); @@ -93,7 +90,7 @@ class Logger : private boost::noncopyable { /** Add an output destination for messages */ void output(std::auto_ptr<Output> out); - /** Reset the logger to it's original state */ + /** Reset the logger to it's original state. */ void clear(); private: diff --git a/cpp/src/qpid/log/Statement.cpp b/cpp/src/qpid/log/Statement.cpp index 9ab314b81c..de130bc455 100644 --- a/cpp/src/qpid/log/Statement.cpp +++ b/cpp/src/qpid/log/Statement.cpp @@ -32,10 +32,6 @@ Statement::Initializer::Initializer(Statement& s) : statement(s) { Logger::instance().add(s); } -Statement::Initializer::~Initializer() { - Logger::instance().remove(statement); -} - namespace { const char* names[LevelTraits::COUNT] = { "trace", "debug", "info", "notice", "warning", "error", "critical" diff --git a/cpp/src/qpid/log/Statement.h b/cpp/src/qpid/log/Statement.h index 4eb4d1e7d8..18162971b0 100644 --- a/cpp/src/qpid/log/Statement.h +++ b/cpp/src/qpid/log/Statement.h @@ -19,8 +19,9 @@ * */ +#include "qpid/Msg.h" + #include <boost/current_function.hpp> -#include <sstream> namespace qpid { namespace log { @@ -69,23 +70,14 @@ struct Statement { struct Initializer { Initializer(Statement& s); - ~Initializer(); Statement& statement; }; }; -///@internal trickery to make QPID_LOG_STRINGSTREAM work. -inline std::ostream& noop(std::ostream& s) { return s; } - ///@internal static initializer for a Statement. #define QPID_LOG_STATEMENT_INIT(level) \ { 0, __FILE__, __LINE__, BOOST_CURRENT_FUNCTION, (::qpid::log::level) } -///@internal Stream streamable message and return a string. -#define QPID_LOG_STRINGSTREAM(message) \ - static_cast<std::ostringstream&>( \ - std::ostringstream() << qpid::log::noop << message).str() - /** * Macro for log statements. Example of use: * @code @@ -110,19 +102,9 @@ inline std::ostream& noop(std::ostream& s) { return s; } static ::qpid::log::Statement stmt_= QPID_LOG_STATEMENT_INIT(level); \ static ::qpid::log::Statement::Initializer init_(stmt_); \ if (stmt_.enabled) \ - stmt_.log(QPID_LOG_STRINGSTREAM(message)); \ + stmt_.log(::qpid::Msg() << message); \ } while(0) -/** - * Macro for complicated logging logic that can't fit in a simple QPID_LOG - * statement. For example: - * @code - * QPID_IF_LOG(debug) { - * message = do_complicated_stuff; - * QPID_LOG(debug, message); - * } - */ -#define QPID_IF_LOG(level) }} // namespace qpid::log diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 733a892cff..eccfb1465e 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -29,6 +29,7 @@ #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" @@ -301,7 +302,7 @@ void AsynchIOHandler::idle(AsynchIO&){ } // If frame was egregiously large complain if (frameSize > buff->byteCount) - THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); buff->dataCount = buffUsed; aio->queueWrite(buff); diff --git a/cpp/src/qpid/sys/ConcurrentQueue.h b/cpp/src/qpid/sys/ConcurrentQueue.h index 917afc5704..cf8199954e 100644 --- a/cpp/src/qpid/sys/ConcurrentQueue.h +++ b/cpp/src/qpid/sys/ConcurrentQueue.h @@ -22,7 +22,7 @@ * */ -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Waitable.h" #include "qpid/sys/ScopedIncrement.h" #include <boost/bind.hpp> @@ -39,73 +39,73 @@ namespace sys { * * Also allows consuming threads to wait until an item is available. */ -template <class T> class ConcurrentQueue { +template <class T> class ConcurrentQueue : public Waitable { public: - ConcurrentQueue() : waiters(0), shutdown(false) {} + struct ShutdownException {}; + + ConcurrentQueue() : shutdownFlag(false) {} - /** Threads in wait() are woken with ShutdownException before - * destroying the queue. - */ - ~ConcurrentQueue() { - Mutex::ScopedLock l(lock); - shutdown = true; - lock.notifyAll(); - while (waiters > 0) - lock.wait(); + /** Waiting threads are notified by ~Waitable */ + ~ConcurrentQueue() { shutdown(); } + + bool shutdown(bool wait=true) { + ScopedLock l(lock); + if (!shutdownFlag) { + shutdownFlag=true; + lock.notifyAll(); + if (wait) lock.waitAll(); + shutdownFlag=true; + return true; + } + return false; } - + /** Push a data item onto the back of the queue */ void push(const T& data) { Mutex::ScopedLock l(lock); queue.push_back(data); + lock.notify(); } /** If the queue is non-empty, pop the front item into data and * return true. If the queue is empty, return false */ - bool pop(T& data) { + bool tryPop(T& data) { Mutex::ScopedLock l(lock); - return popInternal(data); + if (shutdownFlag || queue.empty()) + return false; + data = queue.front(); + queue.pop_front(); + return true; } - /** Wait up to deadline for a data item to be available. - *@return true if data was available, false if timed out. + /** Wait up to a timeout for a data item to be available. + *@return true if data was available, false if timed out or shut down. *@throws ShutdownException if the queue is destroyed. */ - bool waitPop(T& data, Duration timeout) { - Mutex::ScopedLock l(lock); - ScopedIncrement<size_t> w( - waiters, boost::bind(&ConcurrentQueue::noWaiters, this)); + bool waitPop(T& data, Duration timeout=TIME_INFINITE) { + ScopedLock l(lock); AbsTime deadline(now(), timeout); - while (queue.empty() && lock.wait(deadline)) - ; - return popInternal(data); - } - - private: - - bool popInternal(T& data) { - if (shutdown) - throw ShutdownException(); + { + ScopedWait(*this); + while (!shutdownFlag && queue.empty()) + if (!lock.wait(deadline)) + return false; + } if (queue.empty()) return false; - else { - data = queue.front(); - queue.pop_front(); - return true; - } + data = queue.front(); + queue.pop_front(); + return true; } + + bool isShutdown() { ScopedLock l(lock); return shutdownFlag; } - void noWaiters() { - assert(waiters == 0); - if (shutdown) - lock.notify(); // Notify dtor thread. - } - - Monitor lock; + protected: + Waitable lock; + private: std::deque<T> queue; - size_t waiters; - bool shutdown; + bool shutdownFlag; }; }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/ScopedIncrement.h b/cpp/src/qpid/sys/ScopedIncrement.h index ba9e89ba5f..8645ab2484 100644 --- a/cpp/src/qpid/sys/ScopedIncrement.h +++ b/cpp/src/qpid/sys/ScopedIncrement.h @@ -30,17 +30,17 @@ namespace sys { * Optionally call a function if the decremented counter value is 0. * Note the function must not throw, it is called in the destructor. */ -template <class T> +template <class T, class F=boost::function<void()> > class ScopedIncrement : boost::noncopyable { public: - ScopedIncrement(T& c, boost::function0<void> f=0) + ScopedIncrement(T& c, F f=0) : count(c), callback(f) { ++count; } ~ScopedIncrement() { if (--count == 0 && callback) callback(); } private: T& count; - boost::function0<void> callback; + F callback; }; diff --git a/cpp/src/qpid/sys/Serializer.h b/cpp/src/qpid/sys/Serializer.h index 085d51d7e2..7bb3b07ae0 100644 --- a/cpp/src/qpid/sys/Serializer.h +++ b/cpp/src/qpid/sys/Serializer.h @@ -23,7 +23,7 @@ * */ - +#include "qpid/Exception.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Thread.h" @@ -41,6 +41,8 @@ class SerializerBase : private boost::noncopyable, private Runnable { public: typedef boost::function<void()> VoidFn0; + struct ShutdownException : public Exception {}; + /** @see Serializer::Serializer */ SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0()); diff --git a/cpp/src/qpid/sys/StateMonitor.h b/cpp/src/qpid/sys/StateMonitor.h new file mode 100644 index 0000000000..5a92756f3a --- /dev/null +++ b/cpp/src/qpid/sys/StateMonitor.h @@ -0,0 +1,78 @@ +#ifndef QPID_SYS_STATEMONITOR_H +#define QPID_SYS_STATEMONITOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Waitable.h" + +#include <bitset> + +namespace qpid { +namespace sys { + +/** + * A monitor with an enum state value. + * + *@param Enum: enum type to use for states. + *@param EnumMax: Highest enum value. + */ +template <class Enum, size_t MaxEnum> +class StateMonitor : public Waitable +{ + public: + struct Set : public std::bitset<MaxEnum + 1> { + Set() {} + Set(Enum s) { set(s); } + Set(Enum s, Enum t) { set(s).set(t); } + Set(Enum s, Enum t, Enum u) { set(s).set(t).set(u); } + Set(Enum s, Enum t, Enum u, Enum v) { set(s).set(t).set(u).set(v); } + }; + + + StateMonitor(Enum initial) { state=initial; } + + /** @pre Caller holds a ScopedLock. */ + void set(Enum s) { state=s; notifyAll(); } + /** @pre Caller holds a ScopedLock. */ + StateMonitor& operator=(Enum s) { set(s); return *this; } + + /** @pre Caller holds a ScopedLock. */ + Enum get() const { return state; } + /** @pre Caller holds a ScopedLock. */ + operator Enum() const { return state; } + + /** @pre Caller holds a ScopedLock */ + void waitFor(Enum s) { ScopedWait(*this); while (s != state) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitFor(Set s) { ScopedWait(*this); while (!s.test(state)) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitNot(Enum s) { ScopedWait(*this); while (s == state) wait(); } + /** @pre Caller holds a ScopedLock */ + void waitNot(Set s) { ScopedWait(*this); while (s.test(state)) wait(); } + + private: + Enum state; +}; + +}} + + +#endif /*!QPID_SYS_STATEMONITOR_H*/ diff --git a/cpp/src/qpid/sys/Waitable.h b/cpp/src/qpid/sys/Waitable.h new file mode 100644 index 0000000000..eb71a1d742 --- /dev/null +++ b/cpp/src/qpid/sys/Waitable.h @@ -0,0 +1,73 @@ +#ifndef QPID_SYS_WAITABLE_H +#define QPID_SYS_WAITABLE_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Monitor.h" + +#include <assert.h> + +namespace qpid { +namespace sys { + +/** + * A monitor that keeps track of waiting threads. + * Threads that use a WaitLock are counted as waiters, threads that + * use a normal ScopedLock are not considered waiters. + */ +class Waitable : public Monitor { + public: + Waitable() : waiters(0) {} + + /** Use this inside a scoped lock around the + * call to Monitor::wait to be counted as a waiter + */ + struct ScopedWait { + Waitable& w; + ScopedWait(Waitable& w_) : w(w_) { ++w.waiters; } + ~ScopedWait() { --w.waiters; w.notifyAll(); } + }; + + /** Block till all waiters have finished waiting. + * The calling thread does not count as a waiter. + *@pre Must be called inside a ScopedLock but NOT a ScopedWait. + */ + bool waitAll(Duration timeout=TIME_INFINITE) { + AbsTime deadline(now(), timeout); + while (waiters > 0) { + if (!wait(deadline)) { + assert(timeout != TIME_INFINITE); + return false; + } + } + return true; + } + + private: + friend struct ScopedWait; + size_t waiters; +}; + +}} // namespace qpid::sys + + + +#endif /*!QPID_SYS_WAITABLE_H*/ diff --git a/cpp/src/qpid/sys/apr/APRBase.cpp b/cpp/src/qpid/sys/apr/APRBase.cpp index f527e0d0b2..724c489303 100644 --- a/cpp/src/qpid/sys/apr/APRBase.cpp +++ b/cpp/src/qpid/sys/apr/APRBase.cpp @@ -20,7 +20,6 @@ */ #include <iostream> #include "qpid/log/Statement.h" -#include "qpid/QpidError.h" #include "APRBase.h" using namespace qpid::sys; diff --git a/cpp/src/qpid/sys/apr/APRBase.h b/cpp/src/qpid/sys/apr/APRBase.h index c6b1854fb1..7b5644a129 100644 --- a/cpp/src/qpid/sys/apr/APRBase.h +++ b/cpp/src/qpid/sys/apr/APRBase.h @@ -24,7 +24,6 @@ #include <string> #include <apr_thread_mutex.h> #include <apr_errno.h> -#include "qpid/QpidError.h" namespace qpid { namespace sys { @@ -64,11 +63,8 @@ namespace sys { // Inlined as it is called *a lot* void inline qpid::sys::check(apr_status_t status, const char* file, const int line){ if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw qpid::QpidError(APR_ERROR + ((int) status), msg, - qpid::SrcLine(file, line)); + char tmp[256]; + throw Exception(QPID_MSG(apr_strerror(status, tmp, size))) } } diff --git a/cpp/src/qpid/sys/posix/Shlib.cpp b/cpp/src/qpid/sys/posix/Shlib.cpp index 2630337408..1552aa06b5 100644 --- a/cpp/src/qpid/sys/posix/Shlib.cpp +++ b/cpp/src/qpid/sys/posix/Shlib.cpp @@ -19,8 +19,7 @@ */ #include "qpid/sys/Shlib.h" - -#include <qpid/QpidError.h> +#include "qpid/Exception.h" #include <dlfcn.h> @@ -32,7 +31,7 @@ void Shlib::load(const char* name) { handle = ::dlopen(name, RTLD_NOW); const char* error = ::dlerror(); if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); } } @@ -42,7 +41,7 @@ void Shlib::unload() { ::dlclose(handle); const char* error = ::dlerror(); if (error) { - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); } handle = 0; } @@ -53,7 +52,7 @@ void* Shlib::getSymbol(const char* name) { void* sym = ::dlsym(handle, name); const char* error = ::dlerror(); if (error) - THROW_QPID_ERROR(INTERNAL_ERROR, error); + throw Exception(QPID_MSG(error)); return sym; } diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index c7a83df581..f0cc8cd5a5 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -21,7 +21,6 @@ #include "qpid/sys/Socket.h" -#include "qpid/QpidError.h" #include "check.h" #include "PrivatePosix.h" @@ -60,8 +59,7 @@ std::string SocketPrivate::getName(bool local, bool includeService) const result = ::getpeername(fd, (::sockaddr*)&name, &namelen); } - if (result < 0) - throw QPID_POSIX_ERROR(errno); + QPID_POSIX_CHECK(result); char servName[NI_MAXSERV]; char dispName[NI_MAXHOST]; diff --git a/cpp/src/qpid/sys/posix/check.cpp b/cpp/src/qpid/sys/posix/check.cpp deleted file mode 100644 index 408679caa8..0000000000 --- a/cpp/src/qpid/sys/posix/check.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <cerrno> -#include "check.h" - -namespace qpid { -namespace sys { - -std::string -PosixError::getMessage(int errNo) -{ - char buf[512]; - return std::string(strerror_r(errNo, buf, sizeof(buf))); -} - -PosixError::PosixError(int errNo, const qpid::SrcLine& loc) throw() - : qpid::QpidError(INTERNAL_ERROR + errNo, getMessage(errNo), loc) -{ } - -}} diff --git a/cpp/src/qpid/sys/posix/check.h b/cpp/src/qpid/sys/posix/check.h index 7fa7b69d3b..f864bf8762 100644 --- a/cpp/src/qpid/sys/posix/check.h +++ b/cpp/src/qpid/sys/posix/check.h @@ -22,41 +22,13 @@ * */ -#include <cerrno> -#include <string> -#include "qpid/QpidError.h" - -namespace qpid { -namespace sys { - -/** - * Exception with message from errno. - */ -class PosixError : public qpid::QpidError -{ - public: - static std::string getMessage(int errNo); - - PosixError(int errNo, const qpid::SrcLine& location) throw(); - - ~PosixError() throw() {} - - int getErrNo() { return errNo; } +#include "qpid/Exception.h" - Exception::auto_ptr clone() const throw() { return Exception::auto_ptr(new PosixError(*this)); } - - void throwSelf() const { throw *this; } - - private: - int errNo; -}; - -}} +#include <cerrno> -/** Create a PosixError for the current file/line and errno. */ -#define QPID_POSIX_ERROR(errNo) ::qpid::sys::PosixError(errNo, SRCLINE) +#define QPID_POSIX_ERROR(ERRNO) qpid::Exception(QPID_MSG(qpid::strError(ERRNO)) << " " << ERRNO) -/** Throw QPID_POSIX_ERROR(errno) if RESULT is less than zero */ +/** THROW QPID_POSIX_ERROR(errno) if RESULT is less than zero */ #define QPID_POSIX_CHECK(RESULT) \ if ((RESULT) < 0) throw QPID_POSIX_ERROR((errno)) |
