summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-11-06 22:08:14 +0000
committerGordon Sim <gsim@apache.org>2008-11-06 22:08:14 +0000
commit786fa5f11c99e17db30f59a8303eb8a3470cbba6 (patch)
tree3ec54714c7564cf4e8d65162d8d9eb512d30047e
parent460891bba8d8dd8bd47fedd44311778af6ba7d04 (diff)
downloadqpid-python-786fa5f11c99e17db30f59a8303eb8a3470cbba6.tar.gz
Restrict connection close codes to the set defined in the spec
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@711989 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/rubygen/framing.0-10/constants.rb8
-rw-r--r--qpid/cpp/src/qpid/Exception.h13
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/Exception.h98
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp13
-rw-r--r--qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h4
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp20
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.h5
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp2
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp56
18 files changed, 104 insertions, 166 deletions
diff --git a/qpid/cpp/rubygen/framing.0-10/constants.rb b/qpid/cpp/rubygen/framing.0-10/constants.rb
index d8f8354dff..d07c84e63a 100755
--- a/qpid/cpp/rubygen/framing.0-10/constants.rb
+++ b/qpid/cpp/rubygen/framing.0-10/constants.rb
@@ -127,19 +127,20 @@ EOS
}
end
- def declare_exception(c, base, package)
+ def declare_exception(c, base, package, enum)
name=c.name.caps+"Exception"
+ value="#{package}::#{enum.parent.name.shout}_#{c.name.shout}"
genl
doxygen_comment { genl c.doc }
struct(c.name.caps+"Exception", base) {
genl "std::string getPrefix() const { return \"#{c.name}\"; }"
- genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"\"+msg) {}"
+ genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{value}, \"\"+msg) {}"
}
end
def declare_exceptions(class_name, domain_name, base)
enum = @amqp.class_(class_name).domain(domain_name).enum
- enum.choices.each { |c| declare_exception(c, base, class_name) unless c.name == "normal" }
+ enum.choices.each { |c| declare_exception(c, base, class_name, enum) unless c.name == "normal" }
genl
genl "sys::ExceptionHolder create#{base}(int code, const std::string& text);"
end
@@ -163,6 +164,7 @@ EOS
h_file("#{@dir}/reply_exceptions") {
include "qpid/Exception"
include "qpid/sys/ExceptionHolder"
+ include "enum"
namespace(@namespace) {
declare_exceptions("execution", "error-code", "SessionException")
declare_exceptions("connection", "close-code", "ConnectionException")
diff --git a/qpid/cpp/src/qpid/Exception.h b/qpid/cpp/src/qpid/Exception.h
index 57e7c682eb..86bf8fbc4a 100644
--- a/qpid/cpp/src/qpid/Exception.h
+++ b/qpid/cpp/src/qpid/Exception.h
@@ -24,6 +24,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/constants.h"
+#include "qpid/framing/enum.h"
#include "qpid/sys/StrError.h"
#include "qpid/Msg.h"
@@ -58,20 +59,20 @@ struct ErrnoException : public Exception {
};
struct SessionException : public Exception {
- const framing::ReplyCode code;
- SessionException(framing::ReplyCode code_, const std::string& message)
+ const framing::execution::ErrorCode code;
+ SessionException(framing::execution::ErrorCode code_, const std::string& message)
: Exception(message), code(code_) {}
};
struct ChannelException : public Exception {
- const framing::ReplyCode code;
- ChannelException(framing::ReplyCode _code, const std::string& message)
+ const framing::session::DetachCode code;
+ ChannelException(framing::session::DetachCode _code, const std::string& message)
: Exception(message), code(_code) {}
};
struct ConnectionException : public Exception {
- const framing::ReplyCode code;
- ConnectionException(framing::ReplyCode _code, const std::string& message)
+ const framing::connection::CloseCode code;
+ ConnectionException(framing::connection::CloseCode _code, const std::string& message)
: Exception(message), code(_code) {}
};
diff --git a/qpid/cpp/src/qpid/amqp_0_10/Exception.h b/qpid/cpp/src/qpid/amqp_0_10/Exception.h
index 4841d91215..6d526c1706 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/Exception.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/Exception.h
@@ -32,12 +32,12 @@ namespace amqp_0_10 {
* Raised when the connection is unexpectedly closed. Sessions with
* non-0 timeout may be available for re-attachment on another connection.
*/
-struct ConnectionException : public qpid::ConnectionException {
+struct ConnectionException : public qpid::Exception {
// FIXME aconway 2008-04-04: Merge qpid::ConnectionException
// into this when the old code is removed.
typedef connection::CloseCode Code;
ConnectionException(Code c, const std::string m)
- : qpid::ConnectionException(c,m), code(c) {}
+ : qpid::Exception(m), code(c) {}
Code code;
};
@@ -45,10 +45,10 @@ struct ConnectionException : public qpid::ConnectionException {
* Raised when a session is unexpectedly detached for any reason, or
* if an attempt is made to use a session that is not attached.
*/
-struct SessionException : public qpid::SessionException {
+struct SessionException : public qpid::Exception {
// FIXME aconway 2008-04-04: should not have a code at this level.
// Leave in place till old preview code is gone.
- SessionException(int code, const std::string& msg) : qpid::SessionException(code, msg) {}
+ SessionException(int /*code*/, const std::string& msg) : qpid::Exception(msg) {}
};
/** Raised when the state of a session has been destroyed */
@@ -94,93 +94,3 @@ struct SessionDetachedException : public SessionException {
}} // namespace qpid::amqp_0_10
#endif /*!QPID_AMQP_0_10_EXCEPTION_H*/
-#ifndef QPID_AMQP_0_10_EXCEPTION_H
-#define QPID_AMQP_0_10_EXCEPTION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/Exception.h"
-#include "qpid/amqp_0_10/specification_fwd.h"
-
-namespace qpid {
-namespace amqp_0_10 {
-
-/**
- * Raised when the connection is unexpectedly closed. Sessions with
- * non-0 timeout may be available for re-attachment on another connection.
- */
-struct ConnectionException : public Exception {
- typedef connection::CloseCode Code;
- ConnectionException(Code c, const std::string m)
- : Exception(m), code(c) {}
- Code code;
-};
-
-/**
- * Raised when a session is unexpectedly detached for any reason, or
- * if an attempt is made to use a session that is not attached.
- */
-struct SessionException : public Exception {
- SessionException(const std::string& msg) : Exception(msg) {}
-};
-
-/** Raised when the state of a session has been destroyed */
-struct SessionDestroyedException : public SessionException {
- SessionDestroyedException(const std::string& msg) : SessionException(msg){}
-};
-
-/** Raised when a session is destroyed due to an execution.exception */
-struct SessionAbortedException : public SessionDestroyedException {
- typedef execution::ErrorCode Code;
- SessionAbortedException(Code c, const std::string m)
- : SessionDestroyedException(m), code(c) {}
- Code code;
-};
-
-/**
- * Raised when a session with 0 timeout is unexpectedly detached
- * and therefore expires and is destroyed.
- */
-struct SessionExpiredException : public SessionDestroyedException {
- typedef session::DetachCode Code;
- SessionExpiredException(Code c, const std::string m)
- : SessionDestroyedException(m), code(c) {}
- Code code;
-};
-
-/**
- * Raised when a session with non-0 timeout is unexpectedly detached
- * or if an attempt is made to use a session that is not attached.
- *
- * The session is not necessarily destroyed, it may be possible to
- * re-attach.
- */
-struct SessionDetachedException : public SessionException {
- typedef session::DetachCode Code;
- SessionDetachedException(Code c, const std::string m)
- : SessionException(m), code(c) {}
- Code code;
-};
-
-}} // namespace qpid::amqp_0_10
-
-#endif /*!QPID_AMQP_0_10_EXCEPTION_H*/
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
index 8f48e72c2d..0e57e4b3f1 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
@@ -48,6 +48,17 @@ bool isSessionDetachedControl(AMQMethodBody* m) {
return isSessionControl(m) &&
m->amqpMethodId() == SESSION_DETACHED_METHOD_ID;
}
+
+session::DetachCode convert(uint8_t code) {
+ switch(code) {
+ case 0: return session::DETACH_CODE_NORMAL;
+ case 1: return session::DETACH_CODE_SESSION_BUSY;
+ case 2: return session::DETACH_CODE_TRANSPORT_BUSY;
+ case 3: return session::DETACH_CODE_NOT_ATTACHED;
+ case 4: default: return session::DETACH_CODE_UNKNOWN_IDS;
+ }
+}
+
} // namespace
void SessionHandler::checkAttached() {
@@ -167,7 +178,7 @@ void SessionHandler::detached(const std::string& name, uint8_t code) {
checkName(name);
ignoring = false;
if (code != session::DETACH_CODE_NORMAL)
- channelException(code, "session.detached from peer.");
+ channelException(convert(code), "session.detached from peer.");
else {
handleDetach();
}
diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
index 7628cc88ae..016de454cc 100644
--- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
+++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
@@ -85,8 +85,8 @@ class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
virtual void invoke(const framing::AMQMethodBody& m);
virtual void setState(const std::string& sessionName, bool force) = 0;
- virtual void channelException(uint16_t code, const std::string& msg) = 0;
- virtual void connectionException(uint16_t code, const std::string& msg) = 0;
+ virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0;
+ virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0;
virtual void detaching() = 0;
// Notification of events
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 28ab6cbf1b..4dbfb153f2 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -164,11 +164,10 @@ void Connection::setFederationLink(bool b)
mgmtObject->set_federationLink(b);
}
-void Connection::close(
- ReplyCode code, const string& text, ClassId classId, MethodId methodId)
+void Connection::close(connection::CloseCode code, const string& text)
{
- QPID_LOG_IF(error, code != 200, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
- adapter.close(code, text, classId, methodId);
+ QPID_LOG_IF(error, code != connection::CLOSE_CODE_NORMAL, "Connection " << mgmtId << " closed by error: " << text << "(" << code << ")");
+ adapter.close(code, text);
//make sure we delete dangling pointers from outputTasks before deleting sessions
outputTasks.removeAll();
channels.clear();
@@ -177,7 +176,7 @@ void Connection::close(
// Send a close to the client but keep the channels. Used by cluster.
void Connection::sendClose() {
- adapter.close(200, "OK", 0, 0);
+ adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
getOutput().close();
}
@@ -212,14 +211,14 @@ bool Connection::doOutput() {
ioCallback = 0;
if (mgmtClosing)
- close(execution::ERROR_CODE_UNAUTHORIZED_ACCESS, "Closed by Management Request", 0, 0);
+ close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
else
//then do other output as needed:
return outputTasks.doOutput();
}catch(ConnectionException& e){
- close(e.code, e.getMessage(), 0, 0);
+ close(e.code, e.getMessage());
}catch(std::exception& e){
- close(execution::ERROR_CODE_INTERNAL_ERROR, e.what(), 0, 0);
+ close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what());
}
return false;
}
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 87cda02971..8ab58c1067 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -69,10 +69,7 @@ class Connection : public sys::ConnectionInputHandler,
SessionHandler& getChannel(framing::ChannelId channel);
/** Close the connection */
- void close(framing::ReplyCode code = 403,
- const string& text = string(),
- framing::ClassId classId = 0,
- framing::MethodId methodId = 0);
+ void close(framing::connection::CloseCode code, const string& text);
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index c55db2c339..8639b7949e 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -43,7 +43,7 @@ const std::string QPID_FED_LINK = "qpid.fed_link";
const std::string QPID_FED_TAG = "qpid.federation_tag";
}
-void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, MethodId)
+void ConnectionHandler::close(connection::CloseCode code, const string& text)
{
handler->client.close(code, text);
}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index 9d8a091f21..d3d5965dfc 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -29,6 +29,7 @@
#include "qpid/framing/AMQP_ClientProxy.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -84,7 +85,7 @@ class ConnectionHandler : public framing::FrameHandler
std::auto_ptr<Handler> handler;
public:
ConnectionHandler(Connection& connection, bool isClient);
- void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
+ void close(framing::connection::CloseCode code, const std::string& text);
void handle(framing::AMQFrame& frame);
};
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 4f9b8bc104..d2886050dd 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -26,6 +26,7 @@
#include "qpid/agent/ManagementAgent.h"
#include "boost/bind.hpp"
#include "qpid/log/Statement.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "AclModule.h"
@@ -33,6 +34,7 @@ using namespace qpid::broker;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::framing::NotAllowedException;
+using qpid::framing::connection::CLOSE_CODE_CONNECTION_FORCED;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -78,7 +80,7 @@ Link::Link(LinkRegistry* _links,
Link::~Link ()
{
if (state == STATE_OPERATIONAL && connection != 0)
- connection->close();
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -169,7 +171,7 @@ void Link::destroy ()
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
if (connection)
- connection->close(403, "closed by management");
+ connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
setStateLH(STATE_CLOSED);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index d66157d8d4..15a47de45d 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -22,7 +22,6 @@
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/enum.h"
#include "qpid/log/Statement.h"
-#include "qpid/amqp_0_10/exceptions.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/agent/ManagementAgent.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -405,7 +404,6 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
throw NotAllowedException("ACL denied queue delete request");
}
- ChannelException error(0, "");
Queue::shared_ptr q = getQueue(queue);
if(ifEmpty && q->getMessageCount() > 0){
throw PreconditionFailedException("Queue not empty.");
@@ -731,11 +729,11 @@ void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
Queue::shared_ptr SessionAdapter::HandlerHelper::getQueue(const string& name) const {
Queue::shared_ptr queue;
if (name.empty()) {
- throw amqp_0_10::IllegalArgumentException(QPID_MSG("No queue name specified."));
+ throw framing::IllegalArgumentException(QPID_MSG("No queue name specified."));
} else {
queue = session.getBroker().getQueues().find(name);
if (!queue)
- throw amqp_0_10::NotFoundException(QPID_MSG("Queue not found: "<<name));
+ throw framing::NotFoundException(QPID_MSG("Queue not found: "<<name));
}
return queue;
}
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 0f67c1e0c5..163102d008 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -44,12 +44,12 @@ ClassId classId(AMQMethodBody* m) { return m ? m->amqpMethodId() : 0; }
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::channelException(uint16_t, const std::string&) {
- handleDetach();
+void SessionHandler::channelException(framing::session::DetachCode, const std::string&) {
+ handleDetach();
}
-void SessionHandler::connectionException(uint16_t code, const std::string& msg) {
- connection.close(code, msg, 0, 0);
+void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ connection.close(code, msg);
}
ConnectionState& SessionHandler::getConnection() { return connection; }
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index a8f741bc1b..7449db1560 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -63,8 +63,8 @@ class SessionHandler : public amqp_0_10::SessionHandler {
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
virtual framing::FrameHandler* getInHandler();
- virtual void channelException(uint16_t code, const std::string& msg);
- virtual void connectionException(uint16_t code, const std::string& msg);
+ virtual void channelException(framing::session::DetachCode code, const std::string& msg);
+ virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
virtual void detaching();
virtual void readyToSend();
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index efff4027aa..fc53499fc5 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -30,6 +30,7 @@
using namespace qpid::client;
using namespace qpid::framing;
+using namespace qpid::framing::connection;
namespace {
const std::string OK("OK");
@@ -40,10 +41,23 @@ const std::string INVALID_STATE_START("start received in invalid state");
const std::string INVALID_STATE_TUNE("tune received in invalid state");
const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
+
+}
+
+CloseCode ConnectionHandler::convert(uint16_t replyCode)
+{
+ switch (replyCode) {
+ case 200: return CLOSE_CODE_NORMAL;
+ case 320: return CLOSE_CODE_CONNECTION_FORCED;
+ case 402: return CLOSE_CODE_INVALID_PATH;
+ case 501: default:
+ return CLOSE_CODE_FRAMING_ERROR;
+ }
}
ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, ProtocolVersion& v)
- : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(200), version(v)
+ : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler),
+ errorCode(CLOSE_CODE_NORMAL), version(v)
{
insist = true;
@@ -125,7 +139,7 @@ void ConnectionHandler::checkState(STATES s, const std::string& msg)
void ConnectionHandler::fail(const std::string& message)
{
- errorCode = 502;
+ errorCode = CLOSE_CODE_FRAMING_ERROR;
errorText = message;
QPID_LOG(warning, message);
setState(FAILED);
@@ -177,7 +191,7 @@ void ConnectionHandler::redirect(const std::string& /*host*/, const Array& /*kno
void ConnectionHandler::close(uint16_t replyCode, const std::string& replyText)
{
proxy.closeOk();
- errorCode = replyCode;
+ errorCode = convert(replyCode);
errorText = replyText;
setState(CLOSED);
QPID_LOG(warning, "Broker closed connection: " << replyCode << ", " << replyText);
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h
index 28ca875ace..12323684a5 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h
@@ -29,6 +29,7 @@
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/Array.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/InputHandler.h"
@@ -57,7 +58,7 @@ class ConnectionHandler : private StateManager,
Adapter outHandler;
framing::AMQP_ServerProxy::Connection proxy;
- uint16_t errorCode;
+ framing::connection::CloseCode errorCode;
std::string errorText;
bool insist;
framing::ProtocolVersion version;
@@ -106,6 +107,8 @@ public:
ErrorListener onError;
std::vector<Url> knownBrokersUrls;
+
+ static framing::connection::CloseCode convert(uint16_t replyCode);
};
}}
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index ed296cbd4d..b284fb6312 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -150,7 +150,7 @@ template <class F> void ConnectionImpl::closeInternal(const F& f) {
void ConnectionImpl::closed(uint16_t code, const std::string& text) {
Mutex::ScopedLock l(lock);
- setException(new ConnectionException(code, text));
+ setException(new ConnectionException(ConnectionHandler::convert(code), text));
closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index bcdc4ffe27..ddfba03850 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -125,7 +125,7 @@ bool Connection::checkUnsupported(const AMQBody& body) {
if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
}
if (!message.empty())
- connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0);
+ connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
return !message.empty();
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index ab9f146fd7..e854510eba 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -65,7 +65,7 @@ public:
Message& getMessage() { return msg; }
};
-intrusive_ptr<Message> message(std::string exchange, std::string routingKey) {
+intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
AMQFrame header(in_place<AMQHeaderBody>());
@@ -86,7 +86,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
//Test basic delivery:
- intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
msg1->enqueueAsync();//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
@@ -101,7 +101,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
QPID_AUTO_TEST_CASE(testAsyncMessageCount){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
msg1->enqueueAsync();//this is done on enqueue which is not called from process
queue->process(msg1);
@@ -125,9 +125,9 @@ QPID_AUTO_TEST_CASE(testConsumers){
BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
//Test basic delivery:
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
@@ -171,9 +171,9 @@ QPID_AUTO_TEST_CASE(testRegistry){
QPID_AUTO_TEST_CASE(testDequeue){
Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
intrusive_ptr<Message> received;
queue->deliver(msg1);
@@ -247,9 +247,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -300,7 +300,7 @@ QPID_AUTO_TEST_CASE(testOptimisticConsume){
Queue::shared_ptr queue(new Queue("my-queue", true, &store));
queue->setLastNodeFailure();
- intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
msg1->forcePersistent();
//change mode
@@ -330,10 +330,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
- intrusive_ptr<Message> msg4 = message("e", "D");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg4 = create_message("e", "D");
intrusive_ptr<Message> received;
//set deliever match for LVQ a,b,c,a
@@ -365,9 +365,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg3.get(), received.get());
- intrusive_ptr<Message> msg5 = message("e", "A");
- intrusive_ptr<Message> msg6 = message("e", "B");
- intrusive_ptr<Message> msg7 = message("e", "C");
+ intrusive_ptr<Message> msg5 = create_message("e", "A");
+ intrusive_ptr<Message> msg6 = create_message("e", "B");
+ intrusive_ptr<Message> msg7 = create_message("e", "C");
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
@@ -397,11 +397,11 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "B");
- intrusive_ptr<Message> msg3 = message("e", "C");
- intrusive_ptr<Message> msg4 = message("e", "D");
- intrusive_ptr<Message> msg5 = message("e", "F");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg4 = create_message("e", "D");
+ intrusive_ptr<Message> msg5 = create_message("e", "F");
//set deliever match for LVQ a,b,c,a
@@ -450,8 +450,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
queue1->configure(args);
queue2->configure(args);
- intrusive_ptr<Message> msg1 = message("e", "A");
- intrusive_ptr<Message> msg2 = message("e", "A");
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "A");
string key;
args.getLVQKey(key);
@@ -475,7 +475,7 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = message("exchange", "key");
+ intrusive_ptr<Message> m = create_message("exchange", "key");
if (i % 2) {
if (oddTtl) m->getProperties<DeliveryProperties>()->setTtl(oddTtl);
} else {