diff options
author | Gordon Sim <gsim@apache.org> | 2008-11-06 22:08:14 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2008-11-06 22:08:14 +0000 |
commit | 786fa5f11c99e17db30f59a8303eb8a3470cbba6 (patch) | |
tree | 3ec54714c7564cf4e8d65162d8d9eb512d30047e | |
parent | 460891bba8d8dd8bd47fedd44311778af6ba7d04 (diff) | |
download | qpid-python-786fa5f11c99e17db30f59a8303eb8a3470cbba6.tar.gz |
Restrict connection close codes to the set defined in the spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@711989 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/rubygen/framing.0-10/constants.rb | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/Exception.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/Exception.h | 98 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 20 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 56 |
18 files changed, 104 insertions, 166 deletions
diff --git a/qpid/cpp/rubygen/framing.0-10/constants.rb b/qpid/cpp/rubygen/framing.0-10/constants.rb index d8f8354dff..d07c84e63a 100755 --- a/qpid/cpp/rubygen/framing.0-10/constants.rb +++ b/qpid/cpp/rubygen/framing.0-10/constants.rb @@ -127,19 +127,20 @@ EOS } end - def declare_exception(c, base, package) + def declare_exception(c, base, package, enum) name=c.name.caps+"Exception" + value="#{package}::#{enum.parent.name.shout}_#{c.name.shout}" genl doxygen_comment { genl c.doc } struct(c.name.caps+"Exception", base) { genl "std::string getPrefix() const { return \"#{c.name}\"; }" - genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"\"+msg) {}" + genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{value}, \"\"+msg) {}" } end def declare_exceptions(class_name, domain_name, base) enum = @amqp.class_(class_name).domain(domain_name).enum - enum.choices.each { |c| declare_exception(c, base, class_name) unless c.name == "normal" } + enum.choices.each { |c| declare_exception(c, base, class_name, enum) unless c.name == "normal" } genl genl "sys::ExceptionHolder create#{base}(int code, const std::string& text);" end @@ -163,6 +164,7 @@ EOS h_file("#{@dir}/reply_exceptions") { include "qpid/Exception" include "qpid/sys/ExceptionHolder" + include "enum" namespace(@namespace) { declare_exceptions("execution", "error-code", "SessionException") declare_exceptions("connection", "close-code", "ConnectionException") diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h index 57e7c682eb..86bf8fbc4a 100644 --- a/qpid/cpp/src/qpid/Exception.h +++ b/qpid/cpp/src/qpid/Exception.h @@ -24,6 +24,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/framing/constants.h" +#include "qpid/framing/enum.h" #include "qpid/sys/StrError.h" #include "qpid/Msg.h" @@ -58,20 +59,20 @@ struct ErrnoException : public Exception { }; struct SessionException : public Exception { - const framing::ReplyCode code; - SessionException(framing::ReplyCode code_, const std::string& message) + const framing::execution::ErrorCode code; + SessionException(framing::execution::ErrorCode code_, const std::string& message) : Exception(message), code(code_) {} }; struct ChannelException : public Exception { - const framing::ReplyCode code; - ChannelException(framing::ReplyCode _code, const std::string& message) + const framing::session::DetachCode code; + ChannelException(framing::session::DetachCode _code, const std::string& message) : Exception(message), code(_code) {} }; struct ConnectionException : public Exception { - const framing::ReplyCode code; - ConnectionException(framing::ReplyCode _code, const std::string& message) + const framing::connection::CloseCode code; + ConnectionException(framing::connection::CloseCode _code, const std::string& message) : Exception(message), code(_code) {} }; diff --git a/qpid/cpp/src/qpid/amqp_0_10/Exception.h b/qpid/cpp/src/qpid/amqp_0_10/Exception.h index 4841d91215..6d526c1706 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Exception.h +++ b/qpid/cpp/src/qpid/amqp_0_10/Exception.h @@ -32,12 +32,12 @@ namespace amqp_0_10 { * Raised when the connection is unexpectedly closed. Sessions with * non-0 timeout may be available for re-attachment on another connection. */ -struct ConnectionException : public qpid::ConnectionException { +struct ConnectionException : public qpid::Exception { // FIXME aconway 2008-04-04: Merge qpid::ConnectionException // into this when the old code is removed. typedef connection::CloseCode Code; ConnectionException(Code c, const std::string m) - : qpid::ConnectionException(c,m), code(c) {} + : qpid::Exception(m), code(c) {} Code code; }; @@ -45,10 +45,10 @@ struct ConnectionException : public qpid::ConnectionException { * Raised when a session is unexpectedly detached for any reason, or * if an attempt is made to use a session that is not attached. */ -struct SessionException : public qpid::SessionException { +struct SessionException : public qpid::Exception { // FIXME aconway 2008-04-04: should not have a code at this level. // Leave in place till old preview code is gone. - SessionException(int code, const std::string& msg) : qpid::SessionException(code, msg) {} + SessionException(int /*code*/, const std::string& msg) : qpid::Exception(msg) {} }; /** Raised when the state of a session has been destroyed */ @@ -94,93 +94,3 @@ struct SessionDetachedException : public SessionException { }} // namespace qpid::amqp_0_10 #endif /*!QPID_AMQP_0_10_EXCEPTION_H*/ -#ifndef QPID_AMQP_0_10_EXCEPTION_H -#define QPID_AMQP_0_10_EXCEPTION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/Exception.h" -#include "qpid/amqp_0_10/specification_fwd.h" - -namespace qpid { -namespace amqp_0_10 { - -/** - * Raised when the connection is unexpectedly closed. Sessions with - * non-0 timeout may be available for re-attachment on another connection. - */ -struct ConnectionException : public Exception { - typedef connection::CloseCode Code; - ConnectionException(Code c, const std::string m) - : Exception(m), code(c) {} - Code code; -}; - -/** - * Raised when a session is unexpectedly detached for any reason, or - * if an attempt is made to use a session that is not attached. - */ -struct SessionException : public Exception { - SessionException(const std::string& msg) : Exception(msg) {} -}; - -/** Raised when the state of a session has been destroyed */ -struct SessionDestroyedException : public SessionException { - SessionDestroyedException(const std::string& msg) : SessionException(msg){} -}; - -/** Raised when a session is destroyed due to an execution.exception */ -struct SessionAbortedException : public SessionDestroyedException { - typedef execution::ErrorCode Code; - SessionAbortedException(Code c, const std::string m) - : SessionDestroyedException(m), code(c) {} - Code code; -}; - -/** - * Raised when a session with 0 timeout is unexpectedly detached - * and therefore expires and is destroyed. - */ -struct SessionExpiredException : public SessionDestroyedException { - typedef session::DetachCode Code; - SessionExpiredException(Code c, const std::string m) - : SessionDestroyedException(m), code(c) {} - Code code; -}; - -/** - * Raised when a session with non-0 timeout is unexpectedly detached - * or if an attempt is made to use a session that is not attached. - * - * The session is not necessarily destroyed, it may be possible to - * re-attach. - */ -struct SessionDetachedException : public SessionException { - typedef session::DetachCode Code; - SessionDetachedException(Code c, const std::string m) - : SessionException(m), code(c) {} - Code code; -}; - -}} // namespace qpid::amqp_0_10 - -#endif /*!QPID_AMQP_0_10_EXCEPTION_H*/ diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 8f48e72c2d..0e57e4b3f1 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -48,6 +48,17 @@ bool isSessionDetachedControl(AMQMethodBody* m) { return isSessionControl(m) && m->amqpMethodId() == SESSION_DETACHED_METHOD_ID; } + +session::DetachCode convert(uint8_t code) { + switch(code) { + case 0: return session::DETACH_CODE_NORMAL; + case 1: return session::DETACH_CODE_SESSION_BUSY; + case 2: return session::DETACH_CODE_TRANSPORT_BUSY; + case 3: return session::DETACH_CODE_NOT_ATTACHED; + case 4: default: return session::DETACH_CODE_UNKNOWN_IDS; + } +} + } // namespace void SessionHandler::checkAttached() { @@ -167,7 +178,7 @@ void SessionHandler::detached(const std::string& name, uint8_t code) { checkName(name); ignoring = false; if (code != session::DETACH_CODE_NORMAL) - channelException(code, "session.detached from peer."); + channelException(convert(code), "session.detached from peer."); else { handleDetach(); } diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h index 7628cc88ae..016de454cc 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h @@ -85,8 +85,8 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler, virtual void invoke(const framing::AMQMethodBody& m); virtual void setState(const std::string& sessionName, bool force) = 0; - virtual void channelException(uint16_t code, const std::string& msg) = 0; - virtual void connectionException(uint16_t code, const std::string& msg) = 0; + virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0; + virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0; virtual void detaching() = 0; // Notification of events diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 28ab6cbf1b..4dbfb153f2 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -164,11 +164,10 @@ void Connection::setFederationLink(bool b) mgmtObject->set_federationLink(b); } -void Connection::close( - ReplyCode code, const string& text, ClassId classId, MethodId methodId) +void Connection::close(connection::CloseCode code, const string& text) { - QPID_LOG_IF(error, code != 200, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); - adapter.close(code, text, classId, methodId); + QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")"); + adapter.close(code, text); //make sure we delete dangling pointers from outputTasks before deleting sessions outputTasks.removeAll(); channels.clear(); @@ -177,7 +176,7 @@ void Connection::close( // Send a close to the client but keep the channels. Used by cluster. void Connection::sendClose() { - adapter.close(200, "OK", 0, 0); + adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); getOutput().close(); } @@ -212,14 +211,14 @@ bool Connection::doOutput() { ioCallback = 0; if (mgmtClosing) - close(execution::ERROR_CODE_UNAUTHORIZED_ACCESS, "Closed by Management Request", 0, 0); + close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request"); else //then do other output as needed: return outputTasks.doOutput(); }catch(ConnectionException& e){ - close(e.code, e.getMessage(), 0, 0); + close(e.code, e.getMessage()); }catch(std::exception& e){ - close(execution::ERROR_CODE_INTERNAL_ERROR, e.what(), 0, 0); + close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what()); } return false; } diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 87cda02971..8ab58c1067 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -69,10 +69,7 @@ class Connection : public sys::ConnectionInputHandler, SessionHandler& getChannel(framing::ChannelId channel); /** Close the connection */ - void close(framing::ReplyCode code = 403, - const string& text = string(), - framing::ClassId classId = 0, - framing::MethodId methodId = 0); + void close(framing::connection::CloseCode code, const string& text); // ConnectionInputHandler methods void received(framing::AMQFrame& frame); diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index c55db2c339..8639b7949e 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -43,7 +43,7 @@ const std::string QPID_FED_LINK = "qpid.fed_link"; const std::string QPID_FED_TAG = "qpid.federation_tag"; } -void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId) +void ConnectionHandler::close(connection::CloseCode code, const string& text) { handler->client.close(code, text); } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 9d8a091f21..d3d5965dfc 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -29,6 +29,7 @@ #include "qpid/framing/AMQP_ClientProxy.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/framing/enum.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" @@ -84,7 +85,7 @@ class ConnectionHandler : public framing::FrameHandler std::auto_ptr<Handler> handler; public: ConnectionHandler(Connection& connection, bool isClient); - void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); + void close(framing::connection::CloseCode code, const std::string& text); void handle(framing::AMQFrame& frame); }; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 4f9b8bc104..d2886050dd 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -26,6 +26,7 @@ #include "qpid/agent/ManagementAgent.h" #include "boost/bind.hpp" #include "qpid/log/Statement.h" +#include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" #include "AclModule.h" @@ -33,6 +34,7 @@ using namespace qpid::broker; using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::framing::NotAllowedException; +using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -78,7 +80,7 @@ Link::Link(LinkRegistry* _links, Link::~Link () { if (state == STATE_OPERATIONAL && connection != 0) - connection->close(); + connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -169,7 +171,7 @@ void Link::destroy () QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); if (connection) - connection->close(403, "closed by management"); + connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); setStateLH(STATE_CLOSED); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index d66157d8d4..15a47de45d 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -22,7 +22,6 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/enum.h" #include "qpid/log/Statement.h" -#include "qpid/amqp_0_10/exceptions.h" #include "qpid/framing/SequenceSet.h" #include "qpid/agent/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" @@ -405,7 +404,6 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse throw NotAllowedException("ACL denied queue delete request"); } - ChannelException error(0, ""); Queue::shared_ptr q = getQueue(queue); if(ifEmpty && q->getMessageCount() > 0){ throw PreconditionFailedException("Queue not empty."); @@ -731,11 +729,11 @@ void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const { Queue::shared_ptr queue; if (name.empty()) { - throw amqp_0_10::IllegalArgumentException(QPID_MSG("No queue name specified.")); + throw framing::IllegalArgumentException(QPID_MSG("No queue name specified.")); } else { queue = session.getBroker().getQueues().find(name); if (!queue) - throw amqp_0_10::NotFoundException(QPID_MSG("Queue not found: "<<name)); + throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name)); } return queue; } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 0f67c1e0c5..163102d008 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -44,12 +44,12 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; } MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; } } // namespace -void SessionHandler::channelException(uint16_t, const std::string&) { - handleDetach(); +void SessionHandler::channelException(framing::session::DetachCode, const std::string&) { + handleDetach(); } -void SessionHandler::connectionException(uint16_t code, const std::string& msg) { - connection.close(code, msg, 0, 0); +void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) { + connection.close(code, msg); } ConnectionState& SessionHandler::getConnection() { return connection; } diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index a8f741bc1b..7449db1560 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -63,8 +63,8 @@ class SessionHandler : public amqp_0_10::SessionHandler { virtual void setState(const std::string& sessionName, bool force); virtual qpid::SessionState* getState(); virtual framing::FrameHandler* getInHandler(); - virtual void channelException(uint16_t code, const std::string& msg); - virtual void connectionException(uint16_t code, const std::string& msg); + virtual void channelException(framing::session::DetachCode code, const std::string& msg); + virtual void connectionException(framing::connection::CloseCode code, const std::string& msg); virtual void detaching(); virtual void readyToSend(); diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index efff4027aa..fc53499fc5 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -30,6 +30,7 @@ using namespace qpid::client; using namespace qpid::framing; +using namespace qpid::framing::connection; namespace { const std::string OK("OK"); @@ -40,10 +41,23 @@ const std::string INVALID_STATE_START("start received in invalid state"); const std::string INVALID_STATE_TUNE("tune received in invalid state"); const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); + +} + +CloseCode ConnectionHandler::convert(uint16_t replyCode) +{ + switch (replyCode) { + case 200: return CLOSE_CODE_NORMAL; + case 320: return CLOSE_CODE_CONNECTION_FORCED; + case 402: return CLOSE_CODE_INVALID_PATH; + case 501: default: + return CLOSE_CODE_FRAMING_ERROR; + } } ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v) - : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(200), version(v) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), + errorCode(CLOSE_CODE_NORMAL), version(v) { insist = true; @@ -125,7 +139,7 @@ void ConnectionHandler::checkState(STATES s, const std::string& msg) void ConnectionHandler::fail(const std::string& message) { - errorCode = 502; + errorCode = CLOSE_CODE_FRAMING_ERROR; errorText = message; QPID_LOG(warning, message); setState(FAILED); @@ -177,7 +191,7 @@ void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*kno void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText) { proxy.closeOk(); - errorCode = replyCode; + errorCode = convert(replyCode); errorText = replyText; setState(CLOSED); QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText); diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h index 28ca875ace..12323684a5 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h @@ -29,6 +29,7 @@ #include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/Array.h" +#include "qpid/framing/enum.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/InputHandler.h" @@ -57,7 +58,7 @@ class ConnectionHandler : private StateManager, Adapter outHandler; framing::AMQP_ServerProxy::Connection proxy; - uint16_t errorCode; + framing::connection::CloseCode errorCode; std::string errorText; bool insist; framing::ProtocolVersion version; @@ -106,6 +107,8 @@ public: ErrorListener onError; std::vector<Url> knownBrokersUrls; + + static framing::connection::CloseCode convert(uint16_t replyCode); }; }} diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index ed296cbd4d..b284fb6312 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -150,7 +150,7 @@ template <class F> void ConnectionImpl::closeInternal(const F& f) { void ConnectionImpl::closed(uint16_t code, const std::string& text) { Mutex::ScopedLock l(lock); - setException(new ConnectionException(code, text)); + setException(new ConnectionException(ConnectionHandler::convert(code), text)); closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text)); } diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index bcdc4ffe27..ddfba03850 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -125,7 +125,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; } if (!message.empty()) - connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0); + connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message); return !message.empty(); } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index ab9f146fd7..e854510eba 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -65,7 +65,7 @@ public: Message& getMessage() { return msg; } }; -intrusive_ptr<Message> message(std::string exchange, std::string routingKey) { +intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) { intrusive_ptr<Message> msg(new Message()); AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); AMQFrame header(in_place<AMQHeaderBody>()); @@ -86,7 +86,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { //Test basic delivery: - intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); msg1->enqueueAsync();//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); @@ -101,7 +101,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); - intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); msg1->enqueueAsync();//this is done on enqueue which is not called from process queue->process(msg1); @@ -125,9 +125,9 @@ QPID_AUTO_TEST_CASE(testConsumers){ BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); //Test basic delivery: - intrusive_ptr<Message> msg1 = message("e", "A"); - intrusive_ptr<Message> msg2 = message("e", "B"); - intrusive_ptr<Message> msg3 = message("e", "C"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); @@ -171,9 +171,9 @@ QPID_AUTO_TEST_CASE(testRegistry){ QPID_AUTO_TEST_CASE(testDequeue){ Queue::shared_ptr queue(new Queue("my_queue", true)); - intrusive_ptr<Message> msg1 = message("e", "A"); - intrusive_ptr<Message> msg2 = message("e", "B"); - intrusive_ptr<Message> msg3 = message("e", "C"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); intrusive_ptr<Message> received; queue->deliver(msg1); @@ -247,9 +247,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); - intrusive_ptr<Message> msg1 = message("e", "A"); - intrusive_ptr<Message> msg2 = message("e", "B"); - intrusive_ptr<Message> msg3 = message("e", "C"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -300,7 +300,7 @@ QPID_AUTO_TEST_CASE(testOptimisticConsume){ Queue::shared_ptr queue(new Queue("my-queue", true, &store)); queue->setLastNodeFailure(); - intrusive_ptr<Message> msg1 = message("e", "A"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); msg1->forcePersistent(); //change mode @@ -330,10 +330,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = message("e", "A"); - intrusive_ptr<Message> msg2 = message("e", "B"); - intrusive_ptr<Message> msg3 = message("e", "C"); - intrusive_ptr<Message> msg4 = message("e", "D"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg4 = create_message("e", "D"); intrusive_ptr<Message> received; //set deliever match for LVQ a,b,c,a @@ -365,9 +365,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ received = queue->get().payload; BOOST_CHECK_EQUAL(msg3.get(), received.get()); - intrusive_ptr<Message> msg5 = message("e", "A"); - intrusive_ptr<Message> msg6 = message("e", "B"); - intrusive_ptr<Message> msg7 = message("e", "C"); + intrusive_ptr<Message> msg5 = create_message("e", "A"); + intrusive_ptr<Message> msg6 = create_message("e", "B"); + intrusive_ptr<Message> msg7 = create_message("e", "C"); msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a"); msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b"); msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c"); @@ -397,11 +397,11 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = message("e", "A"); - intrusive_ptr<Message> msg2 = message("e", "B"); - intrusive_ptr<Message> msg3 = message("e", "C"); - intrusive_ptr<Message> msg4 = message("e", "D"); - intrusive_ptr<Message> msg5 = message("e", "F"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg4 = create_message("e", "D"); + intrusive_ptr<Message> msg5 = create_message("e", "F"); //set deliever match for LVQ a,b,c,a @@ -450,8 +450,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ queue1->configure(args); queue2->configure(args); - intrusive_ptr<Message> msg1 = message("e", "A"); - intrusive_ptr<Message> msg2 = message("e", "A"); + intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg2 = create_message("e", "A"); string key; args.getLVQKey(key); @@ -475,7 +475,7 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr<Message> m = message("exchange", "key"); + intrusive_ptr<Message> m = create_message("exchange", "key"); if (i % 2) { if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl); } else { |