summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-20 12:10:37 +0000
committerGordon Sim <gsim@apache.org>2008-04-20 12:10:37 +0000
commit0637677cf6653256b67c82dcb74f35133601220c (patch)
tree8507bb8373e8b6dfd8c9b96fcb4b262fd4d61501 /cpp/src/qpid/broker
parent48dab065ef526f68a5a7d4c4ba22c5b8b2e2e026 (diff)
downloadqpid-python-0637677cf6653256b67c82dcb74f35133601220c.tar.gz
QPID-920: converted c++ client to use final 0-10 protocol
* connection handler converted to using invoker & proxy and updated to final method defs * SessionCore & ExecutionHandler replace by SessionImpl * simplified handling of completion & results, removed handling of responses git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.cpp20
-rw-r--r--cpp/src/qpid/broker/BrokerAdapter.h6
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp40
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.h14
-rw-r--r--cpp/src/qpid/broker/Message.cpp6
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.cpp23
-rw-r--r--cpp/src/qpid/broker/MessageAdapter.h5
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp17
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp5
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp9
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp77
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h32
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp12
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h1
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp32
17 files changed, 160 insertions, 143 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp
index ea964ef3a3..b83a275959 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.cpp
+++ b/cpp/src/qpid/broker/BrokerAdapter.cpp
@@ -106,17 +106,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri
getBroker().getExchanges().destroy(name);
}
-ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
+ExchangeXQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
{
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeXQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- return ExchangeQueryResult("", false, true, FieldTable());
+ return ExchangeXQueryResult("", false, true, FieldTable());
}
}
-BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
+BindingXQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/,
const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
@@ -133,27 +133,27 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/
}
if (!exchange) {
- return BindingQueryResult(true, false, false, false, false);
+ return BindingXQueryResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- return BindingQueryResult(false, true, false, false, false);
+ return BindingXQueryResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- return BindingQueryResult(false, false, false, false, false);
+ return BindingXQueryResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return BindingXQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
-QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
+QueueXQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
{
Queue::shared_ptr queue = state.getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
- return QueueQueryResult(queue->getName(),
+ return QueueXQueryResult(queue->getName(),
alternateExchange ? alternateExchange->getName() : "",
queue->isDurable(),
queue->hasExclusiveOwner(),
diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h
index b28c4ebdcc..26dfe802e1 100644
--- a/cpp/src/qpid/broker/BrokerAdapter.h
+++ b/cpp/src/qpid/broker/BrokerAdapter.h
@@ -111,7 +111,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
const qpid::framing::FieldTable& arguments);
void delete_(uint16_t ticket,
const std::string& exchange, bool ifUnused);
- framing::ExchangeQueryResult query(u_int16_t ticket,
+ framing::ExchangeXQueryResult query(u_int16_t ticket,
const std::string& name);
private:
void checkType(shared_ptr<Exchange> exchange, const std::string& type);
@@ -127,7 +127,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
public:
BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {}
- framing::BindingQueryResult query(u_int16_t ticket,
+ framing::BindingXQueryResult query(u_int16_t ticket,
const std::string& exchange,
const std::string& queue,
const std::string& routingKey,
@@ -154,7 +154,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
const std::string& exchange,
const std::string& routingKey,
const qpid::framing::FieldTable& arguments );
- framing::QueueQueryResult query(const std::string& queue);
+ framing::QueueXQueryResult query(const std::string& queue);
void purge(uint16_t ticket, const std::string& queue);
void delete_(uint16_t ticket, const std::string& queue,
bool ifUnused, bool ifEmpty);
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 0e91c081c0..79f9064b9d 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -22,8 +22,6 @@
#include "ConnectionHandler.h"
#include "Connection.h"
-#include "qpid/framing/ConnectionStartBody.h"
-#include "qpid/framing/Connection010StartBody.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 533872e849..61ab856fa9 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -36,7 +36,7 @@ void DtxHandlerImpl::select()
state.selectDtx();
}
-DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
+DtxDemarcationXEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
const string& xid,
bool fail,
bool suspend)
@@ -47,7 +47,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return DtxDemarcationEndResult(XA_RBROLLBACK);
+ return DtxDemarcationXEndResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -55,14 +55,14 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
} else {
state.endDtx(xid, false);
}
- return DtxDemarcationEndResult(XA_OK);
+ return DtxDemarcationXEndResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- return DtxDemarcationEndResult(XA_RBTIMEOUT);
+ return DtxDemarcationXEndResult(XA_RBTIMEOUT);
}
}
-DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
+DtxDemarcationXStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
const string& xid,
bool join,
bool resume)
@@ -76,50 +76,50 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/,
} else {
state.startDtx(xid, getBroker().getDtxManager(), join);
}
- return DtxDemarcationStartResult(XA_OK);
+ return DtxDemarcationXStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxDemarcationStartResult(XA_RBTIMEOUT);
+ return DtxDemarcationXStartResult(XA_RBTIMEOUT);
}
}
// DtxCoordinationHandler:
-DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
+DtxCoordinationXPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/,
const string& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(xid);
- return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationXPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxCoordinationPrepareResult(XA_RBTIMEOUT);
+ return DtxCoordinationXPrepareResult(XA_RBTIMEOUT);
}
}
-DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
+DtxCoordinationXCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/,
const string& xid,
bool onePhase)
{
try {
bool ok = getBroker().getDtxManager().commit(xid, onePhase);
- return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCoordinationXCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return DtxCoordinationCommitResult(XA_RBTIMEOUT);
+ return DtxCoordinationXCommitResult(XA_RBTIMEOUT);
}
}
-DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
+DtxCoordinationXRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/,
const string& xid )
{
try {
getBroker().getDtxManager().rollback(xid);
- return DtxCoordinationRollbackResult(XA_OK);
+ return DtxCoordinationXRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return DtxCoordinationRollbackResult(XA_RBTIMEOUT);
+ return DtxCoordinationXRollbackResult(XA_RBTIMEOUT);
}
}
-DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
+DtxCoordinationXRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
bool /*startscan*/,
bool /*endscan*/ )
{
@@ -144,7 +144,7 @@ DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/,
data.push_back(*i);
}
Array indoubt(data);
- return DtxCoordinationRecoverResult(indoubt);
+ return DtxCoordinationXRecoverResult(indoubt);
}
void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
@@ -154,10 +154,10 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/,
throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
-DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
+DtxCoordinationXGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(xid);
- return DtxCoordinationGetTimeoutResult(timeout);
+ return DtxCoordinationXGetTimeoutResult(timeout);
}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h
index 5bc9d5142a..efb56dba95 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.h
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.h
@@ -36,27 +36,27 @@ public:
// DtxCoordinationHandler:
- framing::DtxCoordinationCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
+ framing::DtxCoordinationXCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase);
void forget(u_int16_t ticket, const std::string& xid);
- framing::DtxCoordinationGetTimeoutResult getTimeout(const std::string& xid);
+ framing::DtxCoordinationXGetTimeoutResult getTimeout(const std::string& xid);
- framing::DtxCoordinationPrepareResult prepare(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationXPrepareResult prepare(u_int16_t ticket, const std::string& xid);
- framing::DtxCoordinationRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
+ framing::DtxCoordinationXRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan);
- framing::DtxCoordinationRollbackResult rollback(u_int16_t ticket, const std::string& xid);
+ framing::DtxCoordinationXRollbackResult rollback(u_int16_t ticket, const std::string& xid);
void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout);
// DtxDemarcationHandler:
- framing::DtxDemarcationEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
+ framing::DtxDemarcationXEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend);
void select();
- framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
+ framing::DtxDemarcationXStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume);
};
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index b60a95228d..297e610418 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -22,9 +22,9 @@
#include "Message.h"
#include "ExchangeRegistry.h"
#include "qpid/framing/frame_functors.h"
-#include "qpid/framing/BasicPublishBody.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
@@ -225,9 +225,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
MessageAdapter& Message::getAdapter() const
{
if (!adapter) {
- if(frames.isA<MessageTransferBody>()) {
+ if(frames.isA<MessageXTransferBody>()) {
adapter = &TRANSFER_99_0;
- } else if(frames.isA<Message010TransferBody>()) {
+ } else if(frames.isA<MessageTransferBody>()) {
adapter = &TRANSFER;
} else {
const AMQMethodBody* method = frames.getMethod();
diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp
index 0e99d923d4..013e2c91ac 100644
--- a/cpp/src/qpid/broker/MessageAdapter.cpp
+++ b/cpp/src/qpid/broker/MessageAdapter.cpp
@@ -21,6 +21,11 @@
#include "MessageAdapter.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
+
namespace {
const std::string empty;
}
@@ -30,13 +35,13 @@ namespace broker{
std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f)
{
- const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>();
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
return p ? p->getRoutingKey() : empty;
}
std::string TransferAdapter::getExchange(const framing::FrameSet& f)
{
- return f.as<framing::Message010TransferBody>()->getDestination();
+ return f.as<framing::MessageTransferBody>()->getDestination();
}
bool TransferAdapter::isImmediate(const framing::FrameSet&)
@@ -47,42 +52,42 @@ namespace broker{
const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f)
{
- const framing::MessageProperties010* p = f.getHeaders()->get<framing::MessageProperties010>();
+ const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
return p ? &(p->getApplicationHeaders()) : 0;
}
bool TransferAdapter::isPersistent(const framing::FrameSet& f)
{
- const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>();
+ const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
return p && p->getDeliveryMode() == 2;
}
bool TransferAdapter::requiresAccept(const framing::FrameSet& f)
{
- const framing::Message010TransferBody* b = f.as<framing::Message010TransferBody>();
+ const framing::MessageTransferBody* b = f.as<framing::MessageTransferBody>();
return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/;
}
std::string PreviewAdapter::getExchange(const framing::FrameSet& f)
{
- return f.as<framing::MessageTransferBody>()->getDestination();
+ return f.as<framing::MessageXTransferBody>()->getDestination();
}
std::string PreviewAdapter::getRoutingKey(const framing::FrameSet& f)
{
- const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
return p ? p->getRoutingKey() : empty;
}
const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f)
{
- const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>();
+ const framing::PreviewMessageProperties* p = f.getHeaders()->get<framing::PreviewMessageProperties>();
return p ? &(p->getApplicationHeaders()) : 0;
}
bool PreviewAdapter::isPersistent(const framing::FrameSet& f)
{
- const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>();
+ const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>();
return p && p->getDeliveryMode() == 2;
}
diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h
index 9759f320ac..4c13e756e9 100644
--- a/cpp/src/qpid/broker/MessageAdapter.h
+++ b/cpp/src/qpid/broker/MessageAdapter.h
@@ -23,13 +23,8 @@
*/
#include <string>
-#include "qpid/framing/BasicPublishBody.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FrameSet.h"
-#include "qpid/framing/DeliveryProperties.h"
-#include "qpid/framing/MessageProperties.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/Message010TransferBody.h"
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index 9ef7090cd9..36862edf37 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -24,9 +24,10 @@
#include "Message.h"
#include "Queue.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/BasicXDeliverBody.h"
+#include "qpid/framing/BasicXGetOkBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
using namespace boost;
@@ -52,7 +53,7 @@ struct BasicGetToken : BaseToken
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
{
- return AMQFrame(in_place<BasicGetOkBody>(
+ return AMQFrame(in_place<BasicXGetOkBody>(
ProtocolVersion(), id.getValue(),
msg->getRedelivered(), msg->getExchangeName(),
msg->getRoutingKey(), queue->getMessageCount()));
@@ -69,7 +70,7 @@ struct BasicConsumeToken : BaseToken
AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
{
- return AMQFrame(in_place<BasicDeliverBody>(
+ return AMQFrame(in_place<BasicXDeliverBody>(
ProtocolVersion(), consumer, id.getValue(),
msg->getRedelivered(), msg->getExchangeName(),
msg->getRoutingKey()));
@@ -92,16 +93,16 @@ struct MessageDeliveryToken : BaseToken
//may need to set the redelivered flag:
if (isPreview) {
if (msg->getRedelivered()){
- msg->getProperties<DeliveryProperties>()->setRedelivered(true);
+ msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true);
}
- return AMQFrame(in_place<MessageTransferBody>(
+ return AMQFrame(in_place<MessageXTransferBody>(
ProtocolVersion(), 0, destination,
confirmMode, acquireMode));
} else {
if (msg->getRedelivered()){
- msg->getProperties<DeliveryProperties010>()->setRedelivered(true);
+ msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
- return AMQFrame(in_place<Message010TransferBody>(
+ return AMQFrame(in_place<MessageTransferBody>(
ProtocolVersion(), destination, confirmMode, acquireMode));
}
}
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index 64c0282963..5e0e759dfb 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -23,8 +23,6 @@
#include "Connection.h"
#include "Broker.h"
#include "MessageDelivery.h"
-#include "qpid/framing/MessageAppendBody.h"
-#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "BrokerAdapter.h"
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index eb45ff1492..411e0ce3c0 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -24,8 +24,7 @@
#include "SessionContext.h"
#include "BrokerAdapter.h"
#include "MessageDelivery.h"
-#include "qpid/framing/ExecutionCompleteBody.h"
-#include "qpid/framing/ExecutionResultBody.h"
+#include "qpid/framing/ExecutionXCompleteBody.h"
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
@@ -182,7 +181,7 @@ SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
classId = frame.castBody<AMQMethodBody>()->amqpClassId();
switch (classId) {
- case ExecutionCompleteBody::CLASS_ID:
+ case ExecutionXCompleteBody::CLASS_ID:
return EXECUTION_CONTROL_TRACK;
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 2251901340..c2f6e3c307 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -34,7 +34,7 @@
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/Message010TransferBody.h"
+#include "qpid/framing/MessageXTransferBody.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -351,10 +351,11 @@ void SemanticState::handle(intrusive_ptr<Message> msg) {
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
- if (msg->isA<MessageTransferBody>()) {
+ //TODO: the following should be hidden behind message (using MessageAdapter or similar)
+ if (msg->isA<MessageXTransferBody>()) {
+ msg->getProperties<PreviewDeliveryProperties>()->setExchange(exchangeName);
+ } else if (msg->isA<MessageTransferBody>()) {
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- } else if (msg->isA<Message010TransferBody>()) {
- msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName);
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index b7985e9ed8..3ad29e6271 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -107,13 +107,13 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU
getBroker().getExchanges().destroy(name);
}
-Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
+ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
{
try {
Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
- return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
+ return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
} catch (const ChannelException& e) {
- return Exchange010QueryResult("", false, true, FieldTable());
+ return ExchangeQueryResult("", false, true, FieldTable());
}
}
void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
@@ -154,7 +154,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
}
-Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
+ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
const std::string& queueName,
const std::string& key,
const framing::FieldTable& args)
@@ -170,18 +170,18 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str
}
if (!exchange) {
- return Exchange010BoundResult(true, false, false, false, false);
+ return ExchangeBoundResult(true, false, false, false, false);
} else if (!queueName.empty() && !queue) {
- return Exchange010BoundResult(false, true, false, false, false);
+ return ExchangeBoundResult(false, true, false, false, false);
} else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) {
- return Exchange010BoundResult(false, false, false, false, false);
+ return ExchangeBoundResult(false, false, false, false, false);
} else {
//need to test each specified option individually
bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0);
bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0);
bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args);
- return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
+ return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched);
}
}
@@ -191,6 +191,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han
SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
{
+ destroyExclusiveQueues();
+}
+
+void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues()
+{
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -200,6 +205,7 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
+
bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
{
@@ -207,12 +213,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
}
-Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
+QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
Queue::shared_ptr queue = getQueue(name);
Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
- return Queue010QueryResult(queue->getName(),
+ return QueueQueryResult(queue->getName(),
alternateExchange ? alternateExchange->getName() : "",
queue->isDurable(),
queue->hasExclusiveOwner(),
@@ -313,6 +319,7 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
uint8_t /*acquireMode*/)
{
//not yet used (content containing assemblies treated differently at present
+ std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl;
}
void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered)
@@ -396,7 +403,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm
commands.for_each(acceptOp);
}
-framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
+framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
{
//TODO: change this when SequenceNumberSet is deleted along with preview code
SequenceNumberSet results;
@@ -408,7 +415,7 @@ framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(con
RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
results.processRanges(g);
- return Message010AcquireResult(acquisitions);
+ return MessageAcquireResult(acquisitions);
}
@@ -450,7 +457,7 @@ void SessionAdapter::TxHandlerImpl::rollback()
state.rollback();
}
-std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid)
+std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid)
{
std::string encoded;
encode(xid, encoded);
@@ -462,7 +469,7 @@ void SessionAdapter::DtxHandlerImpl::select()
state.selectDtx();
}
-Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
+DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid,
bool fail,
bool suspend)
{
@@ -472,7 +479,7 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
if (suspend) {
throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
} else {
- return Dtx010EndResult(XA_RBROLLBACK);
+ return DtxEndResult(XA_RBROLLBACK);
}
} else {
if (suspend) {
@@ -480,14 +487,14 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid,
} else {
state.endDtx(convert(xid), false);
}
- return Dtx010EndResult(XA_OK);
+ return DtxEndResult(XA_OK);
}
} catch (const DtxTimeoutException& e) {
- return Dtx010EndResult(XA_RBTIMEOUT);
+ return DtxEndResult(XA_RBTIMEOUT);
}
}
-Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
+DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid,
bool join,
bool resume)
{
@@ -500,45 +507,45 @@ Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid,
} else {
state.startDtx(convert(xid), getBroker().getDtxManager(), join);
}
- return Dtx010StartResult(XA_OK);
+ return DtxStartResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return Dtx010StartResult(XA_RBTIMEOUT);
+ return DtxStartResult(XA_RBTIMEOUT);
}
}
-Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid)
+DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid)
{
try {
bool ok = getBroker().getDtxManager().prepare(convert(xid));
- return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return Dtx010PrepareResult(XA_RBTIMEOUT);
+ return DtxPrepareResult(XA_RBTIMEOUT);
}
}
-Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid,
+DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid,
bool onePhase)
{
try {
bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase);
- return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK);
+ return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK);
} catch (const DtxTimeoutException& e) {
- return Dtx010CommitResult(XA_RBTIMEOUT);
+ return DtxCommitResult(XA_RBTIMEOUT);
}
}
-Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid)
+DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid)
{
try {
getBroker().getDtxManager().rollback(convert(xid));
- return Dtx010RollbackResult(XA_OK);
+ return DtxRollbackResult(XA_OK);
} catch (const DtxTimeoutException& e) {
- return Dtx010RollbackResult(XA_RBTIMEOUT);
+ return DtxRollbackResult(XA_RBTIMEOUT);
}
}
-Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
+DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover()
{
std::set<std::string> xids;
getBroker().getStore().collectPreparedXids(xids);
@@ -550,23 +557,23 @@ Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover()
boost::shared_ptr<FieldValue> xid(new Struct32Value(*i));
indoubt.add(xid);
}
- return Dtx010RecoverResult(indoubt);
+ return DtxRecoverResult(indoubt);
}
-void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid)
+void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid)
{
//Currently no heuristic completion is supported, so this should never be used.
throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!"));
}
-Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid)
+DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid)
{
uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid));
- return Dtx010GetTimeoutResult(timeout);
+ return DtxGetTimeoutResult(timeout);
}
-void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid,
+void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid,
u_int32_t timeout)
{
getBroker().getDtxManager().setTimeout(convert(xid), timeout);
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index 0cbbd13777..a80e2b0776 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -83,6 +83,8 @@ class Queue;
Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); }
Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); }
+ void destroyExclusiveQueues() { queueImpl.destroyExclusiveQueues(); }
+
private:
//common base for utility methods etc that are specific to this adapter
struct HandlerHelper : public HandlerImpl
@@ -105,14 +107,14 @@ class Queue;
bool passive, bool durable, bool autoDelete,
const qpid::framing::FieldTable& arguments);
void delete_(const std::string& exchange, bool ifUnused);
- framing::Exchange010QueryResult query(const std::string& name);
+ framing::ExchangeQueryResult query(const std::string& name);
void bind(const std::string& queue,
const std::string& exchange, const std::string& routingKey,
const qpid::framing::FieldTable& arguments);
void unbind(const std::string& queue,
const std::string& exchange,
const std::string& routingKey);
- framing::Exchange010BoundResult bound(const std::string& exchange,
+ framing::ExchangeBoundResult bound(const std::string& exchange,
const std::string& queue,
const std::string& routingKey,
const framing::FieldTable& arguments);
@@ -141,8 +143,10 @@ class Queue;
void delete_(const std::string& queue,
bool ifUnused, bool ifEmpty);
void purge(const std::string& queue);
- framing::Queue010QueryResult query(const std::string& queue);
+ framing::QueueQueryResult query(const std::string& queue);
bool isLocal(const ConnectionToken* t) const;
+
+ void destroyExclusiveQueues();
};
class MessageHandlerImpl :
@@ -170,7 +174,7 @@ class Queue;
void release(const framing::SequenceSet& commands,
bool setRedelivered);
- framing::Message010AcquireResult acquire(const framing::SequenceSet&);
+ framing::MessageAcquireResult acquire(const framing::SequenceSet&);
void subscribe(const string& queue,
const string& destination,
@@ -225,35 +229,35 @@ class Queue;
class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper, private framing::StructHelper
{
- std::string convert(const framing::Xid010& xid);
+ std::string convert(const framing::Xid& xid);
public:
DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void select();
- framing::Dtx010StartResult start(const framing::Xid010& xid,
+ framing::DtxStartResult start(const framing::Xid& xid,
bool join,
bool resume);
- framing::Dtx010EndResult end(const framing::Xid010& xid,
+ framing::DtxEndResult end(const framing::Xid& xid,
bool fail,
bool suspend);
- framing::Dtx010CommitResult commit(const framing::Xid010& xid,
+ framing::DtxCommitResult commit(const framing::Xid& xid,
bool onePhase);
- void forget(const framing::Xid010& xid);
+ void forget(const framing::Xid& xid);
- framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid);
+ framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid);
- framing::Dtx010PrepareResult prepare(const framing::Xid010& xid);
+ framing::DtxPrepareResult prepare(const framing::Xid& xid);
- framing::Dtx010RecoverResult recover();
+ framing::DtxRecoverResult recover();
- framing::Dtx010RollbackResult rollback(const framing::Xid010& xid);
+ framing::DtxRollbackResult rollback(const framing::Xid& xid);
- void setTimeout(const framing::Xid010& xid, uint32_t timeout);
+ void setTimeout(const framing::Xid& xid, uint32_t timeout);
};
ExchangeHandlerImpl exchangeImpl;
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 3baa3a89a7..0b1e744e25 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -77,12 +77,6 @@ void SessionHandler::handleIn(AMQFrame& f) {
}
}
-void SessionHandler::destroy() {
- ignoring=true; // Ignore trailing frames sent by client.
- session->detach();
- session.reset();
-}
-
void SessionHandler::handleOut(AMQFrame& f) {
channel.handle(f); // Send it.
if (session->sent(f))
@@ -160,12 +154,12 @@ void SessionHandler::detached(const std::string& name, uint8_t code)
void SessionHandler::requestTimeout(uint32_t t)
{
session->setTimeout(t);
- //proxy.timeout(t);
+ peerSession.timeout(t);
}
-void SessionHandler::timeout(uint32_t)
+void SessionHandler::timeout(uint32_t t)
{
- //not sure what we need to do on the server for this...
+ session->setTimeout(t);
}
void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index fa013a1c15..4b031f2951 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -70,7 +70,6 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler,
void localSuspend();
void detach() { localSuspend(); }
void sendCompletion();
- void destroy();
protected:
void handleIn(framing::AMQFrame&);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 64d62934b9..3c6bed4344 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -25,6 +25,9 @@
#include "SemanticHandler.h"
#include "SessionManager.h"
#include "SessionHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/ServerInvoker.h"
@@ -182,7 +185,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber&
completed.add(id);
if (!invocation.wasHandled()) {
- throw NotImplementedException("Not implemented");
+ throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
nextOut++;//execution result is now a command, so the counter must be incremented
getProxy().getExecution010().result(id, invocation.getResult());
@@ -206,6 +209,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id)
}
msgBuilder.handle(frame);
if (frame.getEof() && frame.getEos()) {//end of frameset
+ if (frame.getBof()) {
+ //i.e this is a just a command frame, add a dummy header
+ AMQFrame header;
+ header.setBody(AMQHeaderBody());
+ header.setBof(false);
+ header.setEof(false);
+ msg->getFrames().append(header);
+ }
msg->setPublisher(&getConnection());
semanticState.handle(msg);
msgBuilder.end();
@@ -242,13 +253,14 @@ void SessionState::handle(AMQFrame& frame)
SequenceNumber commandId;
try {
//TODO: make command handling more uniform, regardless of whether
- //commands carry content. (For now, assume all single frame
- //assemblies are non-content bearing and all content-bearing
- //assemblies will have more than one frame):
- if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
- } else {
+ //commands carry content.
+ AMQMethodBody* m = frame.getMethod();
+ if (m == 0 || m->isContentBearing()) {
handleContent(frame, commandId);
+ } else if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
} catch(const SessionException& e) {
//TODO: better implementation of new exception handling mechanism
@@ -263,7 +275,11 @@ void SessionState::handle(AMQFrame& frame)
} else {
getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
}
- handler->destroy();
+ timeout = 0;
+ //The python client doesn't currently detach on receiving an exception
+ //so the session state isn't destroyed. This is a temporary workaround
+ //until that is addressed
+ adapter.destroyExclusiveQueues();
}
}