diff options
Diffstat (limited to 'cpp/src/qpid/broker')
21 files changed, 150 insertions, 104 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 9bf148bcf0..376108193a 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -55,8 +55,7 @@ ProtocolVersion BrokerAdapter::getVersion() const { void BrokerAdapter::ChannelHandlerImpl::open(const string& /*outOfBand*/){ channel.open(); - // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - client.openOk(std::string()/* ID */); + client.openOk(); } void BrokerAdapter::ChannelHandlerImpl::flow(bool active){ @@ -80,41 +79,63 @@ void BrokerAdapter::ChannelHandlerImpl::closeOk(){} void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool durable, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& args){ + const string& alternateExchange, + bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){ + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = broker.getExchanges().get(alternateExchange); + } if(passive){ - if(!broker.getExchanges().get(exchange)) { - throw ChannelException(404, "Exchange not found: " + exchange); - } + Exchange::shared_ptr actual(broker.getExchanges().get(exchange)); + checkType(actual, type); + checkAlternate(actual, alternate); }else{ try{ std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args); if (response.second) { - if (durable) broker.getStore().create(*response.first); - } else if (response.first->getType() != type) { - throw ConnectionException( - 530, - "Exchange already declared to be of type " - + response.first->getType() + ", requested " + type); + if (durable) { + broker.getStore().create(*response.first); + } + if (alternate) { + response.first->setAlternate(alternate); + alternate->incAlternateUsers(); + } + } else { + checkType(response.first, type); + checkAlternate(response.first, alternate); } }catch(UnknownExchangeTypeException& e){ throw ConnectionException( 503, "Exchange type not implemented: " + type); } } - if(!nowait){ - client.declareOk(); +} + +void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type) +{ + if (!type.empty() && exchange->getType() != type) { + throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type); + } +} + +void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate) +{ + if (alternate && alternate != exchange->getAlternate()) { + throw ConnectionException(530, "Exchange declared with alternate-exchange " + + exchange->getAlternate()->getName() + ", requested " + + alternate->getName()); } + } -void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, - const string& name, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){ //TODO: implement unused Exchange::shared_ptr exchange(broker.getExchanges().get(name)); + if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange."); if (exchange->isDurable()) broker.getStore().destroy(*exchange); + if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); broker.getExchanges().destroy(name); - if(!nowait) client.deleteOk(); } void BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name) @@ -159,12 +180,17 @@ void BrokerAdapter::BindingHandlerImpl::query(u_int16_t /*ticket*/, } } -void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, +void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& name, const string& alternateExchange, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = broker.getExchanges().get(alternateExchange); + } Queue::shared_ptr queue; if (passive && !name.empty()) { queue = getQueue(name); + //TODO: check alternate-exchange is as expected } else { std::pair<Queue::shared_ptr, bool> queue_created = broker.getQueues().declare( @@ -175,6 +201,11 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& assert(queue); if (queue_created.second) { // This is a new queue channel.setDefaultQueue(queue); + if (alternate) { + queue->setAlternateExchange(alternate); + alternate->incAlternateUsers(); + } + //apply settings & create persistent record if required queue_created.first->create(arguments); @@ -201,7 +232,7 @@ void BrokerAdapter::QueueHandlerImpl::declare(uint16_t /*ticket*/, const string& } void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, + const string& exchangeName, const string& routingKey, const FieldTable& arguments){ Queue::shared_ptr queue = getQueue(queueName); @@ -214,7 +245,6 @@ void BrokerAdapter::QueueHandlerImpl::bind(uint16_t /*ticket*/, const string& qu broker.getStore().bind(*exchange, *queue, routingKey, arguments); } } - if(!nowait) client.bindOk(); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -238,7 +268,6 @@ BrokerAdapter::QueueHandlerImpl::unbind(uint16_t /*ticket*/, broker.getStore().unbind(*exchange, *queue, routingKey, arguments); } - client.unbindOk(); } void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queueName, bool nowait){ @@ -280,7 +309,6 @@ void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefet //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.qosOk(); } void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, @@ -314,12 +342,12 @@ void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag, bool now void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, const string& exchangeName, const string& routingKey, - bool mandatory, bool immediate) + bool rejectUnroutable, bool immediate) { Exchange::shared_ptr exchange = exchangeName.empty() ? broker.getExchanges().getDefault() : broker.getExchanges().get(exchangeName); if(exchange){ - BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, mandatory, immediate); + BasicMessage* msg = new BasicMessage(&connection, exchangeName, routingKey, rejectUnroutable, immediate); channel.handlePublish(msg); }else{ throw ChannelException( @@ -351,19 +379,16 @@ void BrokerAdapter::BasicHandlerImpl::recover(bool requeue) void BrokerAdapter::TxHandlerImpl::select() { channel.startTx(); - client.selectOk(); } void BrokerAdapter::TxHandlerImpl::commit() { channel.commit(); - client.commitOk(); } void BrokerAdapter::TxHandlerImpl::rollback() { channel.rollback(); - client.rollbackOk(); channel.recover(false); } @@ -372,28 +397,6 @@ void BrokerAdapter::ChannelHandlerImpl::ok() //no specific action required, generic response handling should be sufficient } - -// -// Message class method handlers -// -void BrokerAdapter::ChannelHandlerImpl::ping() -{ - client.ok(); - client.pong(); -} - - -void -BrokerAdapter::ChannelHandlerImpl::pong() -{ - client.ok(); -} - -void BrokerAdapter::ChannelHandlerImpl::resume(const string& /*channel*/) -{ - assert(0); // FIXME aconway 2007-01-04: 0-9 feature -} - void BrokerAdapter::setResponseTo(RequestId r) { basicHandler.client.setResponseTo(r); diff --git a/cpp/src/qpid/broker/BrokerAdapter.h b/cpp/src/qpid/broker/BrokerAdapter.h index a7e27a0ee6..4ae8346580 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.h +++ b/cpp/src/qpid/broker/BrokerAdapter.h @@ -72,10 +72,9 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations throw ConnectionException(540, "File class not implemented"); } StreamHandler* getStreamHandler() { throw ConnectionException(540, "Stream class not implemented"); } - DtxHandler* getDtxHandler() { - throw ConnectionException(540, "Dtx class not implemented"); } TunnelHandler* getTunnelHandler() { throw ConnectionException(540, "Tunnel class not implemented"); } + SessionHandler* getSessionHandler() { throw ConnectionException(503, "Session class not implemented yet"); } DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; } DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; } @@ -117,13 +116,16 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} void declare(uint16_t ticket, - const std::string& exchange, const std::string& type, - bool passive, bool durable, bool autoDelete, - bool internal, bool nowait, + const std::string& exchange, const std::string& type, + const std::string& alternateExchange, + bool passive, bool durable, bool autoDelete, const qpid::framing::FieldTable& arguments); void delete_(uint16_t ticket, - const std::string& exchange, bool ifUnused, bool nowait); + const std::string& exchange, bool ifUnused); void query(u_int16_t ticket, const string& name); + private: + void checkType(Exchange::shared_ptr exchange, const std::string& type); + void checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate); }; class BindingHandlerImpl : @@ -147,13 +149,14 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations public: QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} - void declare(uint16_t ticket, const std::string& queue, + void declare(uint16_t ticket, const std::string& queue, + const std::string& alternateExchange, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments); void bind(uint16_t ticket, const std::string& queue, const std::string& exchange, const std::string& routingKey, - bool nowait, const qpid::framing::FieldTable& arguments); + const qpid::framing::FieldTable& arguments); void unbind(uint16_t ticket, const std::string& queue, const std::string& exchange, @@ -186,7 +189,7 @@ class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations bool nowait); void publish(uint16_t ticket, const std::string& exchange, const std::string& routingKey, - bool mandatory, bool immediate); + bool rejectUnroutable, bool immediate); void get(uint16_t ticket, const std::string& queue, bool noAck); void ack(uint64_t deliveryTag, bool multiple); diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 9b6bdf5a2b..a598717c5d 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -280,22 +280,31 @@ void Channel::handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) { } void Channel::complete(Message::shared_ptr msg) { - Exchange::shared_ptr exchange = - connection.broker.getExchanges().get(msg->getExchange()); - assert(exchange.get()); if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); - exchange->route(*deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); + route(msg, *deliverable); txBuffer->enlist(op); } else { DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), - &(msg->getApplicationHeaders())); + route(msg, deliverable); } } +void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { + Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); + assert(exchange.get()); + exchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + if (!strategy.delivered) { + //TODO:if reject-unroutable, then reject + //else route to alternate exchange + if (exchange->getAlternate()) { + exchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + } + } + +} + // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index a2b6bd3ef9..a70dce0ce8 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -33,6 +33,7 @@ #include "Consumer.h" #include "DeliveryAdapter.h" #include "DeliveryRecord.h" +#include "Deliverable.h" #include "DtxBuffer.h" #include "DtxManager.h" #include "MessageBuilder.h" @@ -102,7 +103,7 @@ class Channel : public CompletionHandler MessageBuilder messageBuilder;//builder for in-progress message bool opened; bool flowActive; - + void route(Message::shared_ptr msg, Deliverable& strategy); void complete(Message::shared_ptr msg);// completion handler for MessageBuilder void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); diff --git a/cpp/src/qpid/broker/BrokerExchange.h b/cpp/src/qpid/broker/BrokerExchange.h index 968775cfe5..91c295e1b7 100644 --- a/cpp/src/qpid/broker/BrokerExchange.h +++ b/cpp/src/qpid/broker/BrokerExchange.h @@ -48,7 +48,7 @@ namespace qpid { explicit Exchange(const string& _name) : name(_name), durable(false), persistenceId(0){} Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args) - : name(_name), durable(_durable), args(_args), persistenceId(0){} + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0){} virtual ~Exchange(){} string getName() const { return name; } @@ -59,6 +59,7 @@ namespace qpid { void setAlternate(Exchange::shared_ptr _alternate) { alternate = _alternate; } void incAlternateUsers() { alternateUsers++; } void decAlternateUsers() { alternateUsers--; } + bool inUseAsAlternate() { return alternateUsers > 0; } virtual string getType() const = 0; virtual bool bind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args) = 0; diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index a6b039cd4d..73af3935a8 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -165,6 +165,8 @@ class Message : public PersistableMessage{ */ virtual void releaseContent(MessageStore* /*store*/) {}; + bool isImmediate() const { return immediate; } + private: const ConnectionToken* publisher; std::string exchange; diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 01f8250b84..efa295e44f 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -43,7 +43,7 @@ MessageMessage::MessageMessage( ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), - transfer_->getMandatory(), + transfer_->getRejectUnroutable(), transfer_->getImmediate(), transfer_), requestId(requestId_), @@ -57,7 +57,7 @@ MessageMessage::MessageMessage( ReferencePtr reference_ ) : Message(publisher, transfer_->getDestination(), transfer_->getRoutingKey(), - transfer_->getMandatory(), + transfer_->getRejectUnroutable(), transfer_->getImmediate(), transfer_), requestId(requestId_), @@ -113,6 +113,7 @@ void MessageMessage::transferMessage( transfer->getTicket(), consumerTag, getRedelivered(), + transfer->getRejectUnroutable(), transfer->getImmediate(), transfer->getTtl(), transfer->getPriority(), @@ -126,13 +127,14 @@ void MessageMessage::transferMessage( transfer->getReplyTo(), transfer->getContentType(), transfer->getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ transfer->getUserId(), transfer->getAppId(), transfer->getTransactionId(), transfer->getSecurityToken(), transfer->getApplicationHeaders(), - body, - transfer->getMandatory()))); + body))); } else { // Thing to do here is to construct a simple reference message then deliver that instead // fragmentation will be taken care of in the delivery if necessary; @@ -143,6 +145,7 @@ void MessageMessage::transferMessage( transfer->getTicket(), consumerTag, getRedelivered(), + transfer->getRejectUnroutable(), transfer->getImmediate(), transfer->getTtl(), transfer->getPriority(), @@ -156,13 +159,14 @@ void MessageMessage::transferMessage( transfer->getReplyTo(), transfer->getContentType(), transfer->getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ transfer->getUserId(), transfer->getAppId(), transfer->getTransactionId(), transfer->getSecurityToken(), transfer->getApplicationHeaders(), - framing::Content(REFERENCE, refname), - transfer->getMandatory())); + framing::Content(REFERENCE, refname))); ReferencePtr newRef(new Reference(refname)); Reference::AppendPtr newAppend(new MessageAppendBody(channel.getVersion(), refname, content)); newRef->append(newAppend); @@ -288,6 +292,7 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version transfer->getTicket(), destination, getRedelivered(), + transfer->getRejectUnroutable(), transfer->getImmediate(), transfer->getTtl(), transfer->getPriority(), @@ -301,13 +306,14 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version transfer->getReplyTo(), transfer->getContentType(), transfer->getContentEncoding(), + 0, /*content-length*/ + string(), /*type*/ transfer->getUserId(), transfer->getAppId(), transfer->getTransactionId(), transfer->getSecurityToken(), transfer->getApplicationHeaders(), - body, - transfer->getMandatory()); + body); } diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index cf6beff375..f8bffa01a3 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -56,8 +56,15 @@ Queue::Queue(const string& _name, bool _autodelete, Queue::~Queue(){} void Queue::deliver(Message::shared_ptr& msg){ - enqueue(0, msg); - process(msg); + if (msg->isImmediate() && getConsumerCount() == 0) { + if (alternateExchange) { + DeliverableMessage deliverable(msg); + alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + } + } else { + enqueue(0, msg); + process(msg); + } } void Queue::recover(Message::shared_ptr& msg){ @@ -255,6 +262,7 @@ void Queue::destroy() &(msg.getMessage().getApplicationHeaders())); pop(); } + alternateExchange->decAlternateUsers(); } if (store) { @@ -318,3 +326,8 @@ void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) { alternateExchange = exchange; } + +boost::shared_ptr<Exchange> Queue::getAlternateExchange() +{ + return alternateExchange; +} diff --git a/cpp/src/qpid/broker/BrokerQueue.h b/cpp/src/qpid/broker/BrokerQueue.h index 0ed368e404..f82a7dac55 100644 --- a/cpp/src/qpid/broker/BrokerQueue.h +++ b/cpp/src/qpid/broker/BrokerQueue.h @@ -155,7 +155,7 @@ namespace qpid { const QueuePolicy* const getPolicy(); void setAlternateExchange(boost::shared_ptr<Exchange> exchange); - + boost::shared_ptr<Exchange> getAlternateExchange(); //PersistableQueue support: uint64_t getPersistenceId() const; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 7a987f28d2..5b22167323 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -87,6 +87,7 @@ void Connection::closed(){ broker.getQueues().destroy(q->getName()); exclusiveQueues.erase(exclusiveQueues.begin()); q->unbind(broker.getExchanges(), q); + q->destroy(); } } catch(std::exception& e) { QPID_LOG(error, " Unhandled exception while closing session: " << diff --git a/cpp/src/qpid/broker/ConnectionAdapter.cpp b/cpp/src/qpid/broker/ConnectionAdapter.cpp index bb2a66bfdb..65933660f1 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.cpp +++ b/cpp/src/qpid/broker/ConnectionAdapter.cpp @@ -106,15 +106,14 @@ void Handler::open(const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/) { string knownhosts; - client.openOk( - knownhosts);//GRS, context.getRequestId()); + client.openOk(knownhosts); } void Handler::close(uint16_t /*replyCode*/, const string& /*replyText*/, uint16_t /*classId*/, uint16_t /*methodId*/) { - client.closeOk();//GRS context.getRequestId()); + client.closeOk(); connection.getOutput().close(); } diff --git a/cpp/src/qpid/broker/ConnectionAdapter.h b/cpp/src/qpid/broker/ConnectionAdapter.h index b624102cd2..6890b014a4 100644 --- a/cpp/src/qpid/broker/ConnectionAdapter.h +++ b/cpp/src/qpid/broker/ConnectionAdapter.h @@ -67,11 +67,11 @@ public: AccessHandler* getAccessHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } FileHandler* getFileHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } StreamHandler* getStreamHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } - DtxHandler* getDtxHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } TunnelHandler* getTunnelHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxCoordinationHandler* getDtxCoordinationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } DtxDemarcationHandler* getDtxDemarcationHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } ExecutionHandler* getExecutionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } + SessionHandler* getSessionHandler() { throw ConnectionException(503, "Class can't be accessed over channel 0"); } framing::ProtocolVersion getVersion() const; }; diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h index 1570917849..cd1dbaa85d 100644 --- a/cpp/src/qpid/broker/Deliverable.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -27,6 +27,8 @@ namespace qpid { namespace broker { class Deliverable{ public: + bool delivered; + Deliverable() : delivered(false) {} virtual void deliverTo(Queue::shared_ptr& queue) = 0; virtual ~Deliverable(){} }; diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index a713f306a8..9a3752d71c 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -29,9 +29,11 @@ DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg) void DeliverableMessage::deliverTo(Queue::shared_ptr& queue) { queue->deliver(msg); + delivered = true; } Message& DeliverableMessage::getMessage() { return *msg; } + diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index d1f925e40c..72d3888e37 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -52,7 +52,6 @@ const int XA_OK(8); void DtxHandlerImpl::select() { channel.selectDtx(); - dClient.selectOk(); } void DtxHandlerImpl::end(u_int16_t /*ticket*/, @@ -140,7 +139,7 @@ void DtxHandlerImpl::rollback(u_int16_t /*ticket*/, void DtxHandlerImpl::recover(u_int16_t /*ticket*/, bool /*startscan*/, - u_int32_t /*endscan*/ ) + bool /*endscan*/ ) { //TODO: what do startscan and endscan actually mean? @@ -193,7 +192,6 @@ void DtxHandlerImpl::setTimeout(u_int16_t /*ticket*/, u_int32_t timeout) { broker.getDtxManager().setTimeout(xid, timeout); - cClient.setTimeoutOk(); } void DtxHandlerImpl::setResponseTo(framing::RequestId r) diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.h b/cpp/src/qpid/broker/DtxHandlerImpl.h index e18d3c153d..6139b95bd6 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.h +++ b/cpp/src/qpid/broker/DtxHandlerImpl.h @@ -48,7 +48,7 @@ public: void prepare(u_int16_t ticket, const std::string& xid); - void recover(u_int16_t ticket, bool startscan, u_int32_t endscan); + void recover(u_int16_t ticket, bool startscan, bool endscan); void rollback(u_int16_t ticket, const std::string& xid); diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 732d45dc44..edc9a5b63b 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -72,8 +72,9 @@ void ExchangeRegistry::destroy(const string& name){ Exchange::shared_ptr ExchangeRegistry::get(const string& name){ RWlock::ScopedRlock locker(lock); ExchangeMap::iterator i = exchanges.find(name); - if (i == exchanges.end()) - throw ChannelException(404, "Exchange not found:" + name); + if (i == exchanges.end()) { + throw ChannelException(404, "Exchange not found: " + name); + } return i->second; } diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index de32368158..41dd8cc145 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -45,14 +45,14 @@ void MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - client.ok(); + //client.ok(); } void MessageHandlerImpl::open(const string& reference) { references.open(reference); - client.ok(); + //client.ok(); } void @@ -60,14 +60,14 @@ MessageHandlerImpl::append(const framing::MethodContext& context) { MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); references.get(body->getReference())->append(body); - client.ok(); + //client.ok(); } void MessageHandlerImpl::close(const string& reference) { - Reference::shared_ptr ref = references.get(reference); - client.ok(); + Reference::shared_ptr ref = references.get(reference); + //client.ok(); // Send any transfer messages to their correct exchanges and okay them const Reference::Messages& msgs = ref->getMessages(); @@ -85,7 +85,7 @@ MessageHandlerImpl::checkpoint(const string& /*reference*/, { // Initial implementation (which is conforming) is to do nothing here // and return offset zero for the resume - client.ok(); + //client.ok(); } void @@ -123,7 +123,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(); + //client.ok(); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -137,10 +137,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, Queue::shared_ptr queue = getQueue(queueName); GetAdapter out(adapter, queue, destination, connection.getFrameMax()); - if(channel.get(out, queue, !noAck)) + if(channel.get(out, queue, !noAck)) { client.ok(); - else + } else { client.empty(); + } } void @@ -166,14 +167,14 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(); + //client.ok(); } void MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - client.ok(); + //client.ok(); } void @@ -192,7 +193,7 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) if (transfer->getBody().isInline()) { MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - client.ok(); + client.ok(); } else { Reference::shared_ptr ref(references.get(transfer->getBody().getValue())); MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref)); diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index f616ec2db8..2b1de1bbc0 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -75,7 +75,7 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ } } -void SemanticHandler::complete(u_int32_t mark) +void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/) { //just record it for now (will eventually need to use it to ack messages): outgoing.lwm = SequenceNumber(mark); @@ -85,7 +85,10 @@ void SemanticHandler::flush() { //flush doubles as a sync to begin with - send an execution.complete incoming.lwm = incoming.hwm; - send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue()))); + if (isOpen()) { + /*use dummy value for range which is not yet encoded correctly*/ + send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0))); + } } void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index 6003bbec0c..a57559d043 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -60,7 +60,7 @@ public: void handle(framing::AMQFrame& frame); //execution class method handlers: - void complete(u_int32_t cumulativeExecutionMark); + void complete(uint32_t cumulativeExecutionMark, uint16_t); void flush(); }; diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 6e03f37dcd..db02673b1f 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -44,6 +44,7 @@ void TxPublish::rollback() throw(){ void TxPublish::deliverTo(Queue::shared_ptr& queue){ queues.push_back(queue); + delivered = true; } TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg) |
