diff options
| author | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-04-20 12:10:37 +0000 |
| commit | 0637677cf6653256b67c82dcb74f35133601220c (patch) | |
| tree | 8507bb8373e8b6dfd8c9b96fcb4b262fd4d61501 /cpp/src/qpid/broker | |
| parent | 48dab065ef526f68a5a7d4c4ba22c5b8b2e2e026 (diff) | |
| download | qpid-python-0637677cf6653256b67c82dcb74f35133601220c.tar.gz | |
QPID-920: converted c++ client to use final 0-10 protocol
* connection handler converted to using invoker & proxy and updated to final method defs
* SessionCore & ExecutionHandler replace by SessionImpl
* simplified handling of completion & results, removed handling of responses
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@649915 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageAdapter.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageDelivery.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 77 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 32 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 32 |
17 files changed, 160 insertions, 143 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index ea964ef3a3..b83a275959 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -106,17 +106,17 @@ void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const stri getBroker().getExchanges().destroy(name); } -ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) +ExchangeXQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) { try { Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + return ExchangeXQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - return ExchangeQueryResult("", false, true, FieldTable()); + return ExchangeXQueryResult("", false, true, FieldTable()); } } -BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, +BindingXQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, const std::string& exchangeName, const std::string& queueName, const std::string& key, @@ -133,27 +133,27 @@ BindingQueryResult BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/ } if (!exchange) { - return BindingQueryResult(true, false, false, false, false); + return BindingXQueryResult(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - return BindingQueryResult(false, true, false, false, false); + return BindingXQueryResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - return BindingQueryResult(false, false, false, false, false); + return BindingXQueryResult(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - return BindingQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched); + return BindingXQueryResult(false, false, !queueMatched, !keyMatched, !argsMatched); } } -QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) +QueueXQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name) { Queue::shared_ptr queue = state.getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - return QueueQueryResult(queue->getName(), + return QueueXQueryResult(queue->getName(), alternateExchange ? alternateExchange->getName() : "", queue->isDurable(), queue->hasExclusiveOwner(), diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index b28c4ebdcc..26dfe802e1 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -111,7 +111,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const qpid::framing::FieldTable& arguments); void delete_(uint16_t ticket, const std::string& exchange, bool ifUnused); - framing::ExchangeQueryResult query(u_int16_t ticket, + framing::ExchangeXQueryResult query(u_int16_t ticket, const std::string& name); private: void checkType(shared_ptr<Exchange> exchange, const std::string& type); @@ -127,7 +127,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations public: BindingHandlerImpl(SemanticState& session) : HandlerImpl(session) {} - framing::BindingQueryResult query(u_int16_t ticket, + framing::BindingXQueryResult query(u_int16_t ticket, const std::string& exchange, const std::string& queue, const std::string& routingKey, @@ -154,7 +154,7 @@ class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments ); - framing::QueueQueryResult query(const std::string& queue); + framing::QueueXQueryResult query(const std::string& queue); void purge(uint16_t ticket, const std::string& queue); void delete_(uint16_t ticket, const std::string& queue, bool ifUnused, bool ifEmpty); diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 0e91c081c0..79f9064b9d 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -22,8 +22,6 @@ #include "ConnectionHandler.h" #include "Connection.h" -#include "qpid/framing/ConnectionStartBody.h" -#include "qpid/framing/Connection010StartBody.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 533872e849..61ab856fa9 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -36,7 +36,7 @@ void DtxHandlerImpl::select() state.selectDtx(); } -DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, +DtxDemarcationXEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, const string& xid, bool fail, bool suspend) @@ -47,7 +47,7 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { - return DtxDemarcationEndResult(XA_RBROLLBACK); + return DtxDemarcationXEndResult(XA_RBROLLBACK); } } else { if (suspend) { @@ -55,14 +55,14 @@ DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/, } else { state.endDtx(xid, false); } - return DtxDemarcationEndResult(XA_OK); + return DtxDemarcationXEndResult(XA_OK); } } catch (const DtxTimeoutException& e) { - return DtxDemarcationEndResult(XA_RBTIMEOUT); + return DtxDemarcationXEndResult(XA_RBTIMEOUT); } } -DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, +DtxDemarcationXStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, const string& xid, bool join, bool resume) @@ -76,50 +76,50 @@ DtxDemarcationStartResult DtxHandlerImpl::start(u_int16_t /*ticket*/, } else { state.startDtx(xid, getBroker().getDtxManager(), join); } - return DtxDemarcationStartResult(XA_OK); + return DtxDemarcationXStartResult(XA_OK); } catch (const DtxTimeoutException& e) { - return DtxDemarcationStartResult(XA_RBTIMEOUT); + return DtxDemarcationXStartResult(XA_RBTIMEOUT); } } // DtxCoordinationHandler: -DtxCoordinationPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/, +DtxCoordinationXPrepareResult DtxHandlerImpl::prepare(u_int16_t /*ticket*/, const string& xid) { try { bool ok = getBroker().getDtxManager().prepare(xid); - return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxCoordinationXPrepareResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return DtxCoordinationPrepareResult(XA_RBTIMEOUT); + return DtxCoordinationXPrepareResult(XA_RBTIMEOUT); } } -DtxCoordinationCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/, +DtxCoordinationXCommitResult DtxHandlerImpl::commit(u_int16_t /*ticket*/, const string& xid, bool onePhase) { try { bool ok = getBroker().getDtxManager().commit(xid, onePhase); - return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxCoordinationXCommitResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return DtxCoordinationCommitResult(XA_RBTIMEOUT); + return DtxCoordinationXCommitResult(XA_RBTIMEOUT); } } -DtxCoordinationRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/, +DtxCoordinationXRollbackResult DtxHandlerImpl::rollback(u_int16_t /*ticket*/, const string& xid ) { try { getBroker().getDtxManager().rollback(xid); - return DtxCoordinationRollbackResult(XA_OK); + return DtxCoordinationXRollbackResult(XA_OK); } catch (const DtxTimeoutException& e) { - return DtxCoordinationRollbackResult(XA_RBTIMEOUT); + return DtxCoordinationXRollbackResult(XA_RBTIMEOUT); } } -DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, +DtxCoordinationXRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, bool /*startscan*/, bool /*endscan*/ ) { @@ -144,7 +144,7 @@ DtxCoordinationRecoverResult DtxHandlerImpl::recover(u_int16_t /*ticket*/, data.push_back(*i); } Array indoubt(data); - return DtxCoordinationRecoverResult(indoubt); + return DtxCoordinationXRecoverResult(indoubt); } void DtxHandlerImpl::forget(u_int16_t /*ticket*/, @@ -154,10 +154,10 @@ void DtxHandlerImpl::forget(u_int16_t /*ticket*/, throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } -DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) +DtxCoordinationXGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid) { uint32_t timeout = getBroker().getDtxManager().getTimeout(xid); - return DtxCoordinationGetTimeoutResult(timeout); + return DtxCoordinationXGetTimeoutResult(timeout); } diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index 5bc9d5142a..efb56dba95 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -36,27 +36,27 @@ public: // DtxCoordinationHandler: - framing::DtxCoordinationCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase); + framing::DtxCoordinationXCommitResult commit(u_int16_t ticket, const std::string& xid, bool onePhase); void forget(u_int16_t ticket, const std::string& xid); - framing::DtxCoordinationGetTimeoutResult getTimeout(const std::string& xid); + framing::DtxCoordinationXGetTimeoutResult getTimeout(const std::string& xid); - framing::DtxCoordinationPrepareResult prepare(u_int16_t ticket, const std::string& xid); + framing::DtxCoordinationXPrepareResult prepare(u_int16_t ticket, const std::string& xid); - framing::DtxCoordinationRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan); + framing::DtxCoordinationXRecoverResult recover(u_int16_t ticket, bool startscan, bool endscan); - framing::DtxCoordinationRollbackResult rollback(u_int16_t ticket, const std::string& xid); + framing::DtxCoordinationXRollbackResult rollback(u_int16_t ticket, const std::string& xid); void setTimeout(u_int16_t ticket, const std::string& xid, u_int32_t timeout); // DtxDemarcationHandler: - framing::DtxDemarcationEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); + framing::DtxDemarcationXEndResult end(u_int16_t ticket, const std::string& xid, bool fail, bool suspend); void select(); - framing::DtxDemarcationStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); + framing::DtxDemarcationXStartResult start(u_int16_t ticket, const std::string& xid, bool join, bool resume); }; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index b60a95228d..297e610418 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -22,9 +22,9 @@ #include "Message.h" #include "ExchangeRegistry.h" #include "qpid/framing/frame_functors.h" -#include "qpid/framing/BasicPublishBody.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" @@ -225,9 +225,9 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) MessageAdapter& Message::getAdapter() const { if (!adapter) { - if(frames.isA<MessageTransferBody>()) { + if(frames.isA<MessageXTransferBody>()) { adapter = &TRANSFER_99_0; - } else if(frames.isA<Message010TransferBody>()) { + } else if(frames.isA<MessageTransferBody>()) { adapter = &TRANSFER; } else { const AMQMethodBody* method = frames.getMethod(); diff --git a/cpp/src/qpid/broker/MessageAdapter.cpp b/cpp/src/qpid/broker/MessageAdapter.cpp index 0e99d923d4..013e2c91ac 100644 --- a/cpp/src/qpid/broker/MessageAdapter.cpp +++ b/cpp/src/qpid/broker/MessageAdapter.cpp @@ -21,6 +21,11 @@ #include "MessageAdapter.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" + namespace { const std::string empty; } @@ -30,13 +35,13 @@ namespace broker{ std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f) { - const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>(); + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); return p ? p->getRoutingKey() : empty; } std::string TransferAdapter::getExchange(const framing::FrameSet& f) { - return f.as<framing::Message010TransferBody>()->getDestination(); + return f.as<framing::MessageTransferBody>()->getDestination(); } bool TransferAdapter::isImmediate(const framing::FrameSet&) @@ -47,42 +52,42 @@ namespace broker{ const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f) { - const framing::MessageProperties010* p = f.getHeaders()->get<framing::MessageProperties010>(); + const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>(); return p ? &(p->getApplicationHeaders()) : 0; } bool TransferAdapter::isPersistent(const framing::FrameSet& f) { - const framing::DeliveryProperties010* p = f.getHeaders()->get<framing::DeliveryProperties010>(); + const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); return p && p->getDeliveryMode() == 2; } bool TransferAdapter::requiresAccept(const framing::FrameSet& f) { - const framing::Message010TransferBody* b = f.as<framing::Message010TransferBody>(); + const framing::MessageTransferBody* b = f.as<framing::MessageTransferBody>(); return b && b->getAcceptMode() == 0/*EXPLICIT == 0*/; } std::string PreviewAdapter::getExchange(const framing::FrameSet& f) { - return f.as<framing::MessageTransferBody>()->getDestination(); + return f.as<framing::MessageXTransferBody>()->getDestination(); } std::string PreviewAdapter::getRoutingKey(const framing::FrameSet& f) { - const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>(); return p ? p->getRoutingKey() : empty; } const framing::FieldTable* PreviewAdapter::getApplicationHeaders(const framing::FrameSet& f) { - const framing::MessageProperties* p = f.getHeaders()->get<framing::MessageProperties>(); + const framing::PreviewMessageProperties* p = f.getHeaders()->get<framing::PreviewMessageProperties>(); return p ? &(p->getApplicationHeaders()) : 0; } bool PreviewAdapter::isPersistent(const framing::FrameSet& f) { - const framing::DeliveryProperties* p = f.getHeaders()->get<framing::DeliveryProperties>(); + const framing::PreviewDeliveryProperties* p = f.getHeaders()->get<framing::PreviewDeliveryProperties>(); return p && p->getDeliveryMode() == 2; } diff --git a/cpp/src/qpid/broker/MessageAdapter.h b/cpp/src/qpid/broker/MessageAdapter.h index 9759f320ac..4c13e756e9 100644 --- a/cpp/src/qpid/broker/MessageAdapter.h +++ b/cpp/src/qpid/broker/MessageAdapter.h @@ -23,13 +23,8 @@ */ #include <string> -#include "qpid/framing/BasicPublishBody.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FrameSet.h" -#include "qpid/framing/DeliveryProperties.h" -#include "qpid/framing/MessageProperties.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/Message010TransferBody.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp index 9ef7090cd9..36862edf37 100644 --- a/cpp/src/qpid/broker/MessageDelivery.cpp +++ b/cpp/src/qpid/broker/MessageDelivery.cpp @@ -24,9 +24,10 @@ #include "Message.h" #include "Queue.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/BasicDeliverBody.h" -#include "qpid/framing/BasicGetOkBody.h" +#include "qpid/framing/BasicXDeliverBody.h" +#include "qpid/framing/BasicXGetOkBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" using namespace boost; @@ -52,7 +53,7 @@ struct BasicGetToken : BaseToken AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) { - return AMQFrame(in_place<BasicGetOkBody>( + return AMQFrame(in_place<BasicXGetOkBody>( ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey(), queue->getMessageCount())); @@ -69,7 +70,7 @@ struct BasicConsumeToken : BaseToken AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) { - return AMQFrame(in_place<BasicDeliverBody>( + return AMQFrame(in_place<BasicXDeliverBody>( ProtocolVersion(), consumer, id.getValue(), msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey())); @@ -92,16 +93,16 @@ struct MessageDeliveryToken : BaseToken //may need to set the redelivered flag: if (isPreview) { if (msg->getRedelivered()){ - msg->getProperties<DeliveryProperties>()->setRedelivered(true); + msg->getProperties<PreviewDeliveryProperties>()->setRedelivered(true); } - return AMQFrame(in_place<MessageTransferBody>( + return AMQFrame(in_place<MessageXTransferBody>( ProtocolVersion(), 0, destination, confirmMode, acquireMode)); } else { if (msg->getRedelivered()){ - msg->getProperties<DeliveryProperties010>()->setRedelivered(true); + msg->getProperties<DeliveryProperties>()->setRedelivered(true); } - return AMQFrame(in_place<Message010TransferBody>( + return AMQFrame(in_place<MessageTransferBody>( ProtocolVersion(), destination, confirmMode, acquireMode)); } } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 64c0282963..5e0e759dfb 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -23,8 +23,6 @@ #include "Connection.h" #include "Broker.h" #include "MessageDelivery.h" -#include "qpid/framing/MessageAppendBody.h" -#include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" #include "BrokerAdapter.h" diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index eb45ff1492..411e0ce3c0 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -24,8 +24,7 @@ #include "SessionContext.h" #include "BrokerAdapter.h" #include "MessageDelivery.h" -#include "qpid/framing/ExecutionCompleteBody.h" -#include "qpid/framing/ExecutionResultBody.h" +#include "qpid/framing/ExecutionXCompleteBody.h" #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" @@ -182,7 +181,7 @@ SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame) classId = frame.castBody<AMQMethodBody>()->amqpClassId(); switch (classId) { - case ExecutionCompleteBody::CLASS_ID: + case ExecutionXCompleteBody::CLASS_ID: return EXECUTION_CONTROL_TRACK; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 2251901340..c2f6e3c307 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -34,7 +34,7 @@ #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/Message010TransferBody.h" +#include "qpid/framing/MessageXTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" @@ -351,10 +351,11 @@ void SemanticState::handle(intrusive_ptr<Message> msg) { void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); - if (msg->isA<MessageTransferBody>()) { + //TODO: the following should be hidden behind message (using MessageAdapter or similar) + if (msg->isA<MessageXTransferBody>()) { + msg->getProperties<PreviewDeliveryProperties>()->setExchange(exchangeName); + } else if (msg->isA<MessageTransferBody>()) { msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); - } else if (msg->isA<Message010TransferBody>()) { - msg->getProperties<DeliveryProperties010>()->setExchange(exchangeName); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index b7985e9ed8..3ad29e6271 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -107,13 +107,13 @@ void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifU getBroker().getExchanges().destroy(name); } -Exchange010QueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) +ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) { try { Exchange::shared_ptr exchange(getBroker().getExchanges().get(name)); - return Exchange010QueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); + return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs()); } catch (const ChannelException& e) { - return Exchange010QueryResult("", false, true, FieldTable()); + return ExchangeQueryResult("", false, true, FieldTable()); } } void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, @@ -154,7 +154,7 @@ SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, } -Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, +ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, const std::string& queueName, const std::string& key, const framing::FieldTable& args) @@ -170,18 +170,18 @@ Exchange010BoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::str } if (!exchange) { - return Exchange010BoundResult(true, false, false, false, false); + return ExchangeBoundResult(true, false, false, false, false); } else if (!queueName.empty() && !queue) { - return Exchange010BoundResult(false, true, false, false, false); + return ExchangeBoundResult(false, true, false, false, false); } else if (exchange->isBound(queue, key.empty() ? 0 : &key, args.count() > 0 ? &args : &args)) { - return Exchange010BoundResult(false, false, false, false, false); + return ExchangeBoundResult(false, false, false, false, false); } else { //need to test each specified option individually bool queueMatched = queueName.empty() || exchange->isBound(queue, 0, 0); bool keyMatched = key.empty() || exchange->isBound(Queue::shared_ptr(), &key, 0); bool argsMatched = args.count() == 0 || exchange->isBound(Queue::shared_ptr(), 0, &args); - return Exchange010BoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); + return ExchangeBoundResult(false, false, !queueMatched, !keyMatched, !argsMatched); } } @@ -191,6 +191,11 @@ SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : Han SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() { + destroyExclusiveQueues(); +} + +void SessionAdapter::QueueHandlerImpl::destroyExclusiveQueues() +{ while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -200,6 +205,7 @@ SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl() exclusiveQueues.erase(exclusiveQueues.begin()); } } + bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const { @@ -207,12 +213,12 @@ bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const } -Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) +QueueQueryResult SessionAdapter::QueueHandlerImpl::query(const string& name) { Queue::shared_ptr queue = getQueue(name); Exchange::shared_ptr alternateExchange = queue->getAlternateExchange(); - return Queue010QueryResult(queue->getName(), + return QueueQueryResult(queue->getName(), alternateExchange ? alternateExchange->getName() : "", queue->isDurable(), queue->hasExclusiveOwner(), @@ -313,6 +319,7 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/, uint8_t /*acquireMode*/) { //not yet used (content containing assemblies treated differently at present + std::cout << "SessionAdapter::MessageHandlerImpl::transfer() called" << std::endl; } void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered) @@ -396,7 +403,7 @@ void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& comm commands.for_each(acceptOp); } -framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) +framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers) { //TODO: change this when SequenceNumberSet is deleted along with preview code SequenceNumberSet results; @@ -408,7 +415,7 @@ framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(con RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2); results.processRanges(g); - return Message010AcquireResult(acquisitions); + return MessageAcquireResult(acquisitions); } @@ -450,7 +457,7 @@ void SessionAdapter::TxHandlerImpl::rollback() state.rollback(); } -std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid010& xid) +std::string SessionAdapter::DtxHandlerImpl::convert(const framing::Xid& xid) { std::string encoded; encode(xid, encoded); @@ -462,7 +469,7 @@ void SessionAdapter::DtxHandlerImpl::select() state.selectDtx(); } -Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, +DtxEndResult SessionAdapter::DtxHandlerImpl::end(const Xid& xid, bool fail, bool suspend) { @@ -472,7 +479,7 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, if (suspend) { throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set.")); } else { - return Dtx010EndResult(XA_RBROLLBACK); + return DtxEndResult(XA_RBROLLBACK); } } else { if (suspend) { @@ -480,14 +487,14 @@ Dtx010EndResult SessionAdapter::DtxHandlerImpl::end(const Xid010& xid, } else { state.endDtx(convert(xid), false); } - return Dtx010EndResult(XA_OK); + return DtxEndResult(XA_OK); } } catch (const DtxTimeoutException& e) { - return Dtx010EndResult(XA_RBTIMEOUT); + return DtxEndResult(XA_RBTIMEOUT); } } -Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, +DtxStartResult SessionAdapter::DtxHandlerImpl::start(const Xid& xid, bool join, bool resume) { @@ -500,45 +507,45 @@ Dtx010StartResult SessionAdapter::DtxHandlerImpl::start(const Xid010& xid, } else { state.startDtx(convert(xid), getBroker().getDtxManager(), join); } - return Dtx010StartResult(XA_OK); + return DtxStartResult(XA_OK); } catch (const DtxTimeoutException& e) { - return Dtx010StartResult(XA_RBTIMEOUT); + return DtxStartResult(XA_RBTIMEOUT); } } -Dtx010PrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid010& xid) +DtxPrepareResult SessionAdapter::DtxHandlerImpl::prepare(const Xid& xid) { try { bool ok = getBroker().getDtxManager().prepare(convert(xid)); - return Dtx010PrepareResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxPrepareResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return Dtx010PrepareResult(XA_RBTIMEOUT); + return DtxPrepareResult(XA_RBTIMEOUT); } } -Dtx010CommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid010& xid, +DtxCommitResult SessionAdapter::DtxHandlerImpl::commit(const Xid& xid, bool onePhase) { try { bool ok = getBroker().getDtxManager().commit(convert(xid), onePhase); - return Dtx010CommitResult(ok ? XA_OK : XA_RBROLLBACK); + return DtxCommitResult(ok ? XA_OK : XA_RBROLLBACK); } catch (const DtxTimeoutException& e) { - return Dtx010CommitResult(XA_RBTIMEOUT); + return DtxCommitResult(XA_RBTIMEOUT); } } -Dtx010RollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid010& xid) +DtxRollbackResult SessionAdapter::DtxHandlerImpl::rollback(const Xid& xid) { try { getBroker().getDtxManager().rollback(convert(xid)); - return Dtx010RollbackResult(XA_OK); + return DtxRollbackResult(XA_OK); } catch (const DtxTimeoutException& e) { - return Dtx010RollbackResult(XA_RBTIMEOUT); + return DtxRollbackResult(XA_RBTIMEOUT); } } -Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() +DtxRecoverResult SessionAdapter::DtxHandlerImpl::recover() { std::set<std::string> xids; getBroker().getStore().collectPreparedXids(xids); @@ -550,23 +557,23 @@ Dtx010RecoverResult SessionAdapter::DtxHandlerImpl::recover() boost::shared_ptr<FieldValue> xid(new Struct32Value(*i)); indoubt.add(xid); } - return Dtx010RecoverResult(indoubt); + return DtxRecoverResult(indoubt); } -void SessionAdapter::DtxHandlerImpl::forget(const Xid010& xid) +void SessionAdapter::DtxHandlerImpl::forget(const Xid& xid) { //Currently no heuristic completion is supported, so this should never be used. throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid " << xid << " not heuristically completed!")); } -Dtx010GetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid010& xid) +DtxGetTimeoutResult SessionAdapter::DtxHandlerImpl::getTimeout(const Xid& xid) { uint32_t timeout = getBroker().getDtxManager().getTimeout(convert(xid)); - return Dtx010GetTimeoutResult(timeout); + return DtxGetTimeoutResult(timeout); } -void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid010& xid, +void SessionAdapter::DtxHandlerImpl::setTimeout(const Xid& xid, u_int32_t timeout) { getBroker().getDtxManager().setTimeout(convert(xid), timeout); diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index 0cbbd13777..a80e2b0776 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -83,6 +83,8 @@ class Queue; Connection010Handler* getConnection010Handler() { throw framing::NotImplementedException("Class not implemented"); } Session010Handler* getSession010Handler() { throw framing::NotImplementedException("Class not implemented"); } + void destroyExclusiveQueues() { queueImpl.destroyExclusiveQueues(); } + private: //common base for utility methods etc that are specific to this adapter struct HandlerHelper : public HandlerImpl @@ -105,14 +107,14 @@ class Queue; bool passive, bool durable, bool autoDelete, const qpid::framing::FieldTable& arguments); void delete_(const std::string& exchange, bool ifUnused); - framing::Exchange010QueryResult query(const std::string& name); + framing::ExchangeQueryResult query(const std::string& name); void bind(const std::string& queue, const std::string& exchange, const std::string& routingKey, const qpid::framing::FieldTable& arguments); void unbind(const std::string& queue, const std::string& exchange, const std::string& routingKey); - framing::Exchange010BoundResult bound(const std::string& exchange, + framing::ExchangeBoundResult bound(const std::string& exchange, const std::string& queue, const std::string& routingKey, const framing::FieldTable& arguments); @@ -141,8 +143,10 @@ class Queue; void delete_(const std::string& queue, bool ifUnused, bool ifEmpty); void purge(const std::string& queue); - framing::Queue010QueryResult query(const std::string& queue); + framing::QueueQueryResult query(const std::string& queue); bool isLocal(const ConnectionToken* t) const; + + void destroyExclusiveQueues(); }; class MessageHandlerImpl : @@ -170,7 +174,7 @@ class Queue; void release(const framing::SequenceSet& commands, bool setRedelivered); - framing::Message010AcquireResult acquire(const framing::SequenceSet&); + framing::MessageAcquireResult acquire(const framing::SequenceSet&); void subscribe(const string& queue, const string& destination, @@ -225,35 +229,35 @@ class Queue; class DtxHandlerImpl : public Dtx010Handler, public HandlerHelper, private framing::StructHelper { - std::string convert(const framing::Xid010& xid); + std::string convert(const framing::Xid& xid); public: DtxHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void select(); - framing::Dtx010StartResult start(const framing::Xid010& xid, + framing::DtxStartResult start(const framing::Xid& xid, bool join, bool resume); - framing::Dtx010EndResult end(const framing::Xid010& xid, + framing::DtxEndResult end(const framing::Xid& xid, bool fail, bool suspend); - framing::Dtx010CommitResult commit(const framing::Xid010& xid, + framing::DtxCommitResult commit(const framing::Xid& xid, bool onePhase); - void forget(const framing::Xid010& xid); + void forget(const framing::Xid& xid); - framing::Dtx010GetTimeoutResult getTimeout(const framing::Xid010& xid); + framing::DtxGetTimeoutResult getTimeout(const framing::Xid& xid); - framing::Dtx010PrepareResult prepare(const framing::Xid010& xid); + framing::DtxPrepareResult prepare(const framing::Xid& xid); - framing::Dtx010RecoverResult recover(); + framing::DtxRecoverResult recover(); - framing::Dtx010RollbackResult rollback(const framing::Xid010& xid); + framing::DtxRollbackResult rollback(const framing::Xid& xid); - void setTimeout(const framing::Xid010& xid, uint32_t timeout); + void setTimeout(const framing::Xid& xid, uint32_t timeout); }; ExchangeHandlerImpl exchangeImpl; diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 3baa3a89a7..0b1e744e25 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -77,12 +77,6 @@ void SessionHandler::handleIn(AMQFrame& f) { } } -void SessionHandler::destroy() { - ignoring=true; // Ignore trailing frames sent by client. - session->detach(); - session.reset(); -} - void SessionHandler::handleOut(AMQFrame& f) { channel.handle(f); // Send it. if (session->sent(f)) @@ -160,12 +154,12 @@ void SessionHandler::detached(const std::string& name, uint8_t code) void SessionHandler::requestTimeout(uint32_t t) { session->setTimeout(t); - //proxy.timeout(t); + peerSession.timeout(t); } -void SessionHandler::timeout(uint32_t) +void SessionHandler::timeout(uint32_t t) { - //not sure what we need to do on the server for this... + session->setTimeout(t); } void SessionHandler::commandPoint(const framing::SequenceNumber& id, uint64_t offset) diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h index fa013a1c15..4b031f2951 100644 --- a/cpp/src/qpid/broker/SessionHandler.h +++ b/cpp/src/qpid/broker/SessionHandler.h @@ -70,7 +70,6 @@ class SessionHandler : public framing::AMQP_ServerOperations::Session010Handler, void localSuspend(); void detach() { localSuspend(); } void sendCompletion(); - void destroy(); protected: void handleIn(framing::AMQFrame&); diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 64d62934b9..3c6bed4344 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -25,6 +25,9 @@ #include "SemanticHandler.h" #include "SessionManager.h" #include "SessionHandler.h" +#include "qpid/framing/AMQContentBody.h" +#include "qpid/framing/AMQHeaderBody.h" +#include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/ServerInvoker.h" @@ -182,7 +185,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, SequenceNumber& completed.add(id); if (!invocation.wasHandled()) { - throw NotImplementedException("Not implemented"); + throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { nextOut++;//execution result is now a command, so the counter must be incremented getProxy().getExecution010().result(id, invocation.getResult()); @@ -206,6 +209,14 @@ void SessionState::handleContent(AMQFrame& frame, SequenceNumber& id) } msgBuilder.handle(frame); if (frame.getEof() && frame.getEos()) {//end of frameset + if (frame.getBof()) { + //i.e this is a just a command frame, add a dummy header + AMQFrame header; + header.setBody(AMQHeaderBody()); + header.setBof(false); + header.setEof(false); + msg->getFrames().append(header); + } msg->setPublisher(&getConnection()); semanticState.handle(msg); msgBuilder.end(); @@ -242,13 +253,14 @@ void SessionState::handle(AMQFrame& frame) SequenceNumber commandId; try { //TODO: make command handling more uniform, regardless of whether - //commands carry content. (For now, assume all single frame - //assemblies are non-content bearing and all content-bearing - //assemblies will have more than one frame): - if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod(), commandId); - } else { + //commands carry content. + AMQMethodBody* m = frame.getMethod(); + if (m == 0 || m->isContentBearing()) { handleContent(frame, commandId); + } else if (frame.getBof() && frame.getEof()) { + handleCommand(frame.getMethod(), commandId); + } else { + throw InternalErrorException("Cannot handle multi-frame command segments yet"); } } catch(const SessionException& e) { //TODO: better implementation of new exception handling mechanism @@ -263,7 +275,11 @@ void SessionState::handle(AMQFrame& frame) } else { getProxy().getExecution010().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable()); } - handler->destroy(); + timeout = 0; + //The python client doesn't currently detach on receiving an exception + //so the session state isn't destroyed. This is a temporary workaround + //until that is addressed + adapter.destroyExclusiveQueues(); } } |
