diff options
| author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
| commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
| tree | 13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker | |
| parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
| download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz | |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerAdapter.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessage.cpp | 61 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessage.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessageBase.h | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessageMessage.cpp | 29 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessageMessage.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConsumeAdapter.cpp | 37 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryAdapter.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryToken.h (renamed from cpp/src/qpid/broker/ConsumeAdapter.h) | 24 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/GetAdapter.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/GetAdapter.h | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 65 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.cpp | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticHandler.h | 15 |
16 files changed, 246 insertions, 281 deletions
diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 376108193a..8edf448bc4 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -20,8 +20,7 @@ #include "BrokerAdapter.h" #include "BrokerChannel.h" #include "Connection.h" -#include "ConsumeAdapter.h" -#include "GetAdapter.h" +#include "DeliveryToken.h" #include "qpid/framing/AMQMethodBody.h" #include "qpid/Exception.h" @@ -325,8 +324,8 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //need to generate name here, so we have it for the adapter (it is //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); - channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, newTag, connection.getFrameMax())), - newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); + DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag)); + channel.consume(token, newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); if(!nowait) client.consumeOk(newTag); @@ -357,8 +356,8 @@ void BrokerAdapter::BasicHandlerImpl::publish(uint16_t /*ticket*/, void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = getQueue(queueName); - GetAdapter out(adapter, queue, "", connection.getFrameMax()); - if(!channel.get(out, queue, !noAck)){ + DeliveryToken::shared_ptr token(BasicMessage::createGetToken(queue)); + if(!channel.get(token, queue, !noAck)){ string clusterId;//not used, part of an imatix hack client.getEmpty(clusterId); diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index a598717c5d..c50fbd5559 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -49,9 +49,10 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) : +Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) : id(_id), connection(con), + out(_out), currentDeliveryTag(1), prefetchSize(0), prefetchCount(0), @@ -76,7 +77,7 @@ bool Channel::exists(const string& consumerTag){ // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? -void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, +void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) @@ -84,7 +85,7 @@ void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks)); + new ConsumerImpl(this, token, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -97,7 +98,8 @@ void Channel::cancel(const string& tag){ consumers.erase(i); } -void Channel::close(){ +void Channel::close() +{ opened = false; consumers.clear(); if (dtxBuffer.get()) { @@ -106,11 +108,15 @@ void Channel::close(){ recover(true); } -void Channel::startTx(){ +void Channel::startTx() +{ txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void Channel::commit(){ +void Channel::commit() +{ + if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); + TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { @@ -118,16 +124,21 @@ void Channel::commit(){ } } -void Channel::rollback(){ +void Channel::rollback() +{ + if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); + txBuffer->rollback(); accumulatedAck.clear(); } -void Channel::selectDtx(){ +void Channel::selectDtx() +{ dtxSelected = true; } -void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ +void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join) +{ if (!dtxSelected) { throw ConnectionException(503, "Channel has not been selected for use with dtx"); } @@ -140,7 +151,8 @@ void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ } } -void Channel::endDtx(const std::string& xid, bool fail){ +void Channel::endDtx(const std::string& xid, bool fail) +{ if (!dtxBuffer) { throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid); } @@ -160,7 +172,8 @@ void Channel::endDtx(const std::string& xid, bool fail){ dtxBuffer.reset(); } -void Channel::suspendDtx(const std::string& xid){ +void Channel::suspendDtx(const std::string& xid) +{ if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") % dtxBuffer->getXid() % xid); @@ -171,7 +184,8 @@ void Channel::suspendDtx(const std::string& xid){ dtxBuffer->setSuspended(true); } -void Channel::resumeDtx(const std::string& xid){ +void Channel::resumeDtx(const std::string& xid) +{ if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") % dtxBuffer->getXid() % xid); @@ -199,20 +213,22 @@ void Channel::record(const DeliveryRecord& delivery) delivery.addTo(&outstanding); } -bool Channel::checkPrefetch(Message::shared_ptr& msg){ +bool Channel::checkPrefetch(Message::shared_ptr& msg) +{ Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token, const string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack - ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection), + ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection), ackExpected(ack), blocked(false) {} -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg) +{ if(!connection || connection != msg->getPublisher()){//check for no_local if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ blocked = true; @@ -220,11 +236,10 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ blocked = false; Mutex::ScopedLock locker(parent->deliveryLock); - uint64_t deliveryTag = adapter->getNextDeliveryTag(); + uint64_t deliveryTag = parent->out.deliver(msg, token); if(ackExpected){ parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); } - adapter->deliver(msg, deliveryTag); return true; } @@ -234,14 +249,15 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { Mutex::ScopedLock locker(parent->deliveryLock); - adapter->deliver(msg, deliveryTag); + parent->out.redeliver(msg, token, deliveryTag); } Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } -void Channel::ConsumerImpl::cancel(){ +void Channel::ConsumerImpl::cancel() +{ if(queue) { queue->cancel(this); if (queue->canAutoDelete()) { @@ -251,27 +267,32 @@ void Channel::ConsumerImpl::cancel(){ } } -void Channel::ConsumerImpl::requestDispatch(){ +void Channel::ConsumerImpl::requestDispatch() +{ if(blocked) queue->requestDispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr msg){ +void Channel::handleInlineTransfer(Message::shared_ptr msg) +{ complete(msg); } -void Channel::handlePublish(Message* _message){ +void Channel::handlePublish(Message* _message) +{ Message::shared_ptr message(_message); messageBuilder.initialise(message); } -void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ +void Channel::handleHeader(AMQHeaderBody::shared_ptr header) +{ messageBuilder.setHeader(header); //at this point, decide based on the size of the message whether we want //to stage it by saving content directly to disk as it arrives } -void Channel::handleContent(AMQContentBody::shared_ptr content){ +void Channel::handleContent(AMQContentBody::shared_ptr content) +{ messageBuilder.addContent(content); } @@ -306,14 +327,16 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { } // Used by Basic -void Channel::ack(uint64_t deliveryTag, bool multiple){ +void Channel::ack(uint64_t deliveryTag, bool multiple) +{ if (multiple) ack(0, deliveryTag); else ack(deliveryTag, deliveryTag); } -void Channel::ack(uint64_t firstTag, uint64_t lastTag){ +void Channel::ack(uint64_t firstTag, uint64_t lastTag) +{ if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -355,7 +378,8 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ } } -void Channel::recover(bool requeue){ +void Channel::recover(bool requeue) +{ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ @@ -368,12 +392,12 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){ +bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) +{ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = adapter.getNextDeliveryTag(); - adapter.deliver(msg, myDeliveryTag); + uint64_t myDeliveryTag = out.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index a70dce0ce8..e9672c96d7 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -33,6 +33,7 @@ #include "Consumer.h" #include "DeliveryAdapter.h" #include "DeliveryRecord.h" +#include "DeliveryToken.h" #include "Deliverable.h" #include "DtxBuffer.h" #include "DtxManager.h" @@ -64,7 +65,7 @@ class Channel : public CompletionHandler class ConsumerImpl : public Consumer { Channel* parent; - std::auto_ptr<DeliveryAdapter> adapter; + DeliveryToken::shared_ptr token; const string tag; Queue::shared_ptr queue; ConnectionToken* const connection; @@ -72,7 +73,7 @@ class Channel : public CompletionHandler bool blocked; public: - ConsumerImpl(Channel* parent, std::auto_ptr<DeliveryAdapter> adapter, + ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); ~ConsumerImpl(); @@ -86,6 +87,7 @@ class Channel : public CompletionHandler framing::ChannelId id; Connection& connection; + DeliveryAdapter& out; uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; @@ -110,7 +112,7 @@ class Channel : public CompletionHandler void checkDtxTimeout(); public: - Channel(Connection& parent, framing::ChannelId id, MessageStore* const store = 0); + Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0); ~Channel(); bool isOpen() const { return opened; } @@ -127,11 +129,11 @@ class Channel : public CompletionHandler /** *@param tagInOut - if empty it is updated with the generated token. */ - void consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, Queue::shared_ptr queue, bool acks, + void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); - bool get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected); + bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); void commit(); diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index d192b09a63..bf0e37e8e3 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -26,6 +26,7 @@ #include "InMemoryContent.h" #include "LazyLoadedContent.h" #include "MessageStore.h" +#include "BrokerQueue.h" #include "qpid/log/Statement.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/BasicGetOkBody.h" @@ -37,6 +38,30 @@ #include "qpid/framing/ChannelAdapter.h" #include "RecoveryManagerImpl.h" +namespace qpid{ +namespace broker{ + +struct BasicGetToken : DeliveryToken +{ + typedef boost::shared_ptr<BasicGetToken> shared_ptr; + + Queue::shared_ptr queue; + + BasicGetToken(Queue::shared_ptr q) : queue(q) {} +}; + +struct BasicConsumeToken : DeliveryToken +{ + typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; + + const string consumer; + + BasicConsumeToken(const string c) : consumer(c) {} +}; + +} +} + using namespace boost; using namespace qpid::broker; using namespace qpid::framing; @@ -74,6 +99,16 @@ bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } +DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue) +{ + return DeliveryToken::shared_ptr(new BasicGetToken(queue)); +} + +DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer) +{ + return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); +} + void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize) @@ -86,23 +121,39 @@ void BasicMessage::deliver(ChannelAdapter& channel, } void BasicMessage::sendGetOk(ChannelAdapter& channel, - const std::string& /*destination*/, uint32_t messageCount, - uint64_t responseTo, + uint64_t /*responseTo*/, uint64_t deliveryTag, uint32_t framesize) { channel.send(make_shared_ptr( new BasicGetOkBody( channel.getVersion(), - responseTo, + //responseTo, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount))); sendContent(channel, framesize); } -void BasicMessage::sendContent( - ChannelAdapter& channel, uint32_t framesize) +void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize) +{ + BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token); + if (consume) { + deliver(channel, consume->consumer, deliveryTag, framesize); + } else { + BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token); + if (get) { + uint64_t request(1/*actual value doesn't affect anything at present*/); + sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize); + } else { + //TODO: + //either need to be able to convert to a message transfer or + //throw error of some kind to allow this to be handled higher up + } + } +} + +void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize) { channel.send(header); Mutex::ScopedLock locker(contentLock); diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h index 2e031d0bb2..e6483b4733 100644 --- a/cpp/src/qpid/broker/BrokerMessage.h +++ b/cpp/src/qpid/broker/BrokerMessage.h @@ -43,6 +43,7 @@ class AMQHeaderBody; namespace broker { class MessageStore; +class Queue; using framing::string; /** @@ -70,13 +71,16 @@ class BasicMessage : public Message { void addContent(framing::AMQContentBody::shared_ptr data); bool isComplete(); + static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue); + static DeliveryToken::shared_ptr createConsumeToken(const string& consumer); + void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); + void deliver(framing::ChannelAdapter&, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize); void sendGetOk(framing::ChannelAdapter& channel, - const std::string& destination, uint32_t messageCount, uint64_t responseTo, uint64_t deliveryTag, diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index 73af3935a8..d9269fa94f 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -25,6 +25,7 @@ #include <string> #include <boost/shared_ptr.hpp> #include "Content.h" +#include "DeliveryToken.h" #include "PersistableMessage.h" #include "qpid/framing/amqp_types.h" @@ -91,23 +92,9 @@ class Message : public PersistableMessage{ void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } void redeliver() { redelivered = true; } - /** - * Used to deliver the message from the queue - */ - virtual void deliver(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t deliveryTag, - uint32_t framesize) = 0; - /** - * Used to return a message in response to a get from a queue - */ - virtual void sendGetOk(framing::ChannelAdapter& channel, - const std::string& destination, - uint32_t messageCount, - uint64_t responseTo, - uint64_t deliveryTag, - uint32_t framesize) = 0; - + virtual void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag/*only needed for basic class*/, + DeliveryToken::shared_ptr token, uint32_t framesize) = 0; + virtual bool isComplete() = 0; virtual uint64_t contentSize() const = 0; diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index efa295e44f..8e8eaf23f0 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -34,10 +34,18 @@ #include <algorithm> using namespace std; +using namespace boost; using namespace qpid::framing; namespace qpid { namespace broker { + +struct MessageDeliveryToken : public DeliveryToken +{ + const std::string destination; + + MessageDeliveryToken(const std::string& d) : destination(d) {} +}; MessageMessage::MessageMessage( ConnectionToken* publisher, RequestId requestId_, TransferPtr transfer_ @@ -179,22 +187,13 @@ void MessageMessage::transferMessage( channel.send(make_shared_ptr(new MessageCloseBody(channel.getVersion(), ref->getId()))); } -void MessageMessage::deliver( - framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t /*deliveryTag*/, - uint32_t framesize) + +void MessageMessage::deliver(ChannelAdapter& channel, uint64_t, DeliveryToken::shared_ptr token, uint32_t framesize) { - transferMessage(channel, consumerTag, framesize); + transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize); } -void MessageMessage::sendGetOk( - framing::ChannelAdapter& channel, - const std::string& destination, - uint32_t /*messageCount*/, - uint64_t /*responseTo*/, - uint64_t /*deliveryTag*/, - uint32_t framesize) +void MessageMessage::deliver(ChannelAdapter& channel, const std::string& destination, uint32_t framesize) { transferMessage(channel, destination, framesize); } @@ -321,6 +320,10 @@ MessageMessage::ReferencePtr MessageMessage::getReference() const { return reference; } +DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination) +{ + return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination)); +} }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h index c2d4b7f20b..612f457ae4 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.h +++ b/cpp/src/qpid/broker/BrokerMessageMessage.h @@ -53,17 +53,8 @@ class MessageMessage: public Message{ TransferPtr getTransfer() const { return transfer; } ReferencePtr getReference() const ; - void deliver(framing::ChannelAdapter& channel, - const std::string& consumerTag, - uint64_t deliveryTag, - uint32_t framesize); - - void sendGetOk(framing::ChannelAdapter& channel, - const std::string& destination, - uint32_t messageCount, - uint64_t responseTo, - uint64_t deliveryTag, - uint32_t framesize); + void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); + void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize); bool isComplete(); @@ -81,6 +72,8 @@ class MessageMessage: public Message{ void decodeHeader(framing::Buffer& buffer); void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); + static DeliveryToken::shared_ptr getToken(const std::string& destination); + private: void transferMessage( framing::ChannelAdapter& channel, diff --git a/cpp/src/qpid/broker/ConsumeAdapter.cpp b/cpp/src/qpid/broker/ConsumeAdapter.cpp deleted file mode 100644 index 59b6795a77..0000000000 --- a/cpp/src/qpid/broker/ConsumeAdapter.cpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ConsumeAdapter.h" - -using namespace qpid::broker; -using qpid::framing::ChannelAdapter; -using qpid::framing::RequestId; - -ConsumeAdapter::ConsumeAdapter(ChannelAdapter& a, const std::string t, uint32_t f) : adapter(a), tag(t), framesize(f) {} - -RequestId ConsumeAdapter::getNextDeliveryTag() -{ - return adapter.getNextSendRequestId(); -} - -void ConsumeAdapter::deliver(Message::shared_ptr& msg, RequestId deliveryTag) -{ - msg->deliver(adapter, tag, deliveryTag, framesize); -} diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h index 45b103bd68..971f4095cf 100644 --- a/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -22,11 +22,13 @@ #define _DeliveryAdapter_ #include "BrokerMessageBase.h" +#include "DeliveryToken.h" #include "qpid/framing/amqp_types.h" namespace qpid { namespace broker { + typedef framing::RequestId DeliveryId; /** * The intention behind this interface is to separate the generic * handling of some form of message delivery to clients that is @@ -40,8 +42,8 @@ namespace broker { class DeliveryAdapter { public: - virtual framing::RequestId getNextDeliveryTag() = 0; - virtual void deliver(Message::shared_ptr& msg, framing::RequestId tag) = 0; + virtual DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) = 0; + virtual void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) = 0; virtual ~DeliveryAdapter(){} }; diff --git a/cpp/src/qpid/broker/ConsumeAdapter.h b/cpp/src/qpid/broker/DeliveryToken.h index 43cda7753e..8bdf5e6359 100644 --- a/cpp/src/qpid/broker/ConsumeAdapter.h +++ b/cpp/src/qpid/broker/DeliveryToken.h @@ -18,23 +18,25 @@ * under the License. * */ -#ifndef _ConsumeAdapter_ -#define _ConsumeAdapter_ +#ifndef _DeliveryToken_ +#define _DeliveryToken_ -#include "DeliveryAdapter.h" -#include "qpid/framing/ChannelAdapter.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { - class ConsumeAdapter : public DeliveryAdapter + + /** + * A DeliveryToken allows the delivery of a message to be + * associated with whatever mechanism caused it to be + * delivered. (i.e. its a form of Memento). + */ + class DeliveryToken { - framing::ChannelAdapter& adapter; - const std::string tag; - const uint32_t framesize; public: - ConsumeAdapter(framing::ChannelAdapter& adapter, const std::string tag, uint32_t framesize); - framing::RequestId getNextDeliveryTag(); - void deliver(Message::shared_ptr& msg, framing::RequestId tag); + typedef boost::shared_ptr<DeliveryToken> shared_ptr; + + virtual ~DeliveryToken(){} }; }} diff --git a/cpp/src/qpid/broker/GetAdapter.cpp b/cpp/src/qpid/broker/GetAdapter.cpp deleted file mode 100644 index bbffade712..0000000000 --- a/cpp/src/qpid/broker/GetAdapter.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "GetAdapter.h" -#include "qpid/framing/MethodContext.h" - -using namespace qpid::broker; -using qpid::framing::ChannelAdapter; -using qpid::framing::RequestId; -using qpid::framing::MethodContext; - -GetAdapter::GetAdapter(ChannelAdapter& a, Queue::shared_ptr q, const std::string d, uint32_t f) - : adapter(a), queue(q), destination(d), framesize(f) {} - -RequestId GetAdapter::getNextDeliveryTag() -{ - return adapter.getNextSendRequestId(); -} - -void GetAdapter::deliver(Message::shared_ptr& msg, framing::RequestId deliveryTag) -{ - msg->sendGetOk(adapter, destination, queue->getMessageCount(), 1, deliveryTag, framesize); -} diff --git a/cpp/src/qpid/broker/GetAdapter.h b/cpp/src/qpid/broker/GetAdapter.h deleted file mode 100644 index e90619a5f3..0000000000 --- a/cpp/src/qpid/broker/GetAdapter.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _GetAdapter_ -#define _GetAdapter_ - -#include "BrokerQueue.h" -#include "DeliveryAdapter.h" -#include "qpid/framing/ChannelAdapter.h" - -namespace qpid { -namespace broker { - - class GetAdapter : public DeliveryAdapter - { - framing::ChannelAdapter& adapter; - Queue::shared_ptr queue; - const std::string destination; - const uint32_t framesize; - public: - GetAdapter(framing::ChannelAdapter& adapter, Queue::shared_ptr queue, const std::string destination, uint32_t framesize); - ~GetAdapter(){} - framing::RequestId getNextDeliveryTag(); - void deliver(Message::shared_ptr& msg, framing::RequestId tag); - }; - -}} - - -#endif diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index 41dd8cc145..da57439e21 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -21,8 +21,6 @@ #include "BrokerChannel.h" #include "qpid/framing/FramingContent.h" #include "Connection.h" -#include "ConsumeAdapter.h" -#include "GetAdapter.h" #include "Broker.h" #include "BrokerMessageMessage.h" #include "qpid/framing/MessageAppendBody.h" @@ -45,66 +43,44 @@ void MessageHandlerImpl::cancel(const string& destination ) { channel.cancel(destination); - //client.ok(); } void -MessageHandlerImpl::open(const string& reference) +MessageHandlerImpl::open(const string& /*reference*/) { - references.open(reference); - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::append(const framing::MethodContext& context) +MessageHandlerImpl::append(const framing::MethodContext& /*context*/) { - MessageAppendBody::shared_ptr body(boost::shared_polymorphic_downcast<MessageAppendBody>(context.methodBody)); - references.get(body->getReference())->append(body); - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::close(const string& reference) +MessageHandlerImpl::close(const string& /*reference*/) { - Reference::shared_ptr ref = references.get(reference); - //client.ok(); - - // Send any transfer messages to their correct exchanges and okay them - const Reference::Messages& msgs = ref->getMessages(); - for (Reference::Messages::const_iterator m = msgs.begin(); m != msgs.end(); ++m) { - channel.handleInlineTransfer(*m); - client.setResponseTo((*m)->getRequestId()); - client.ok(); - } - ref->close(); + throw ConnectionException(540, "References no longer supported"); } void MessageHandlerImpl::checkpoint(const string& /*reference*/, const string& /*identifier*/ ) { - // Initial implementation (which is conforming) is to do nothing here - // and return offset zero for the resume - //client.ok(); + throw ConnectionException(540, "References no longer supported"); } void -MessageHandlerImpl::resume(const string& reference, +MessageHandlerImpl::resume(const string& /*reference*/, const string& /*identifier*/ ) { - // Initial (null) implementation - // open reference and return 0 offset - references.open(reference); - client.offset(0); + throw ConnectionException(540, "References no longer supported"); } void MessageHandlerImpl::offset(uint64_t /*value*/ ) { - // Shouldn't ever receive this as it is reponse to resume - // which is never sent - // TODO astitcher 2007-02-16 What is the correct exception to throw here? - THROW_QPID_ERROR(INTERNAL_ERROR, "impossible"); + throw ConnectionException(540, "References no longer supported"); } void @@ -120,14 +96,12 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(std::auto_ptr<DeliveryAdapter>(new ConsumeAdapter(adapter, destination, connection.getFrameMax())), - tag, queue, !noAck, exclusive, - noLocal ? &connection : 0, &filter); - //client.ok(); + channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } + void MessageHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, @@ -136,11 +110,11 @@ MessageHandlerImpl::get(uint16_t /*ticket*/, { Queue::shared_ptr queue = getQueue(queueName); - GetAdapter out(adapter, queue, destination, connection.getFrameMax()); - if(channel.get(out, queue, !noAck)) { - client.ok(); + if (channel.get(MessageMessage::getToken(destination), queue, !noAck)){ + //don't send any response... rely on execution completion } else { - client.empty(); + //temporarily disabled: + //client.empty(); } } @@ -167,14 +141,12 @@ MessageHandlerImpl::qos(uint32_t prefetchSize, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - //client.ok(); } void MessageHandlerImpl::recover(bool requeue) { channel.recover(requeue); - //client.ok(); } void @@ -193,11 +165,8 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) if (transfer->getBody().isInline()) { MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer)); channel.handleInlineTransfer(message); - client.ok(); } else { - Reference::shared_ptr ref(references.get(transfer->getBody().getValue())); - MessageMessage::shared_ptr message(new MessageMessage(&connection, requestId, transfer, ref)); - ref->addMessage(message); + throw ConnectionException(540, "References no longer supported"); } } diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 2b1de1bbc0..e9ec698400 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -25,10 +25,11 @@ using namespace qpid::broker; using namespace qpid::framing; +using namespace qpid::sys; SemanticHandler::SemanticHandler(ChannelId id, Connection& c) : connection(c), - channel(c, id, &c.broker.getStore()) + channel(c, *this, id, &c.broker.getStore()) { init(id, connection.getOutput(), connection.getVersion()); adapter = std::auto_ptr<BrokerAdapter>(new BrokerAdapter(channel, connection, connection.broker, *this)); @@ -75,10 +76,24 @@ void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQ } } -void SemanticHandler::complete(uint32_t mark, uint16_t /*range- not decoded correctly yet*/) +void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) { - //just record it for now (will eventually need to use it to ack messages): - outgoing.lwm = SequenceNumber(mark); + //record: + SequenceNumber mark(cumulative); + if (outgoing.lwm < mark) { + outgoing.lwm = mark; + //ack messages: + channel.ack(mark.getValue(), true); + //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; + } + if (range.size() % 2) { //must be even number + throw ConnectionException(530, "Received odd number of elements in ranged mark"); + } else { + //TODO: need to keep a record of the full range previously acked + for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) { + channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + } + } } void SemanticHandler::flush() @@ -86,8 +101,8 @@ void SemanticHandler::flush() //flush doubles as a sync to begin with - send an execution.complete incoming.lwm = incoming.hwm; if (isOpen()) { - /*use dummy value for range which is not yet encoded correctly*/ - send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), 0))); + Mutex::ScopedLock l(outLock); + ChannelAdapter::send(make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } } @@ -140,3 +155,28 @@ void SemanticHandler::handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartb channel.handleHeartbeat(body); } +DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token) +{ + Mutex::ScopedLock l(outLock); + SequenceNumber copy(outgoing.hwm); + ++copy; + msg->deliver(*this, copy.getValue(), token, connection.getFrameMax()); + //std::cout << "[" << this << "] delivered: " << outgoing.hwm.getValue() << std::endl; + return outgoing.hwm.getValue(); +} + +void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag) +{ + msg->deliver(*this, tag, token, connection.getFrameMax()); +} + +RequestId SemanticHandler::send(shared_ptr<AMQBody> body, Correlator::Action action) +{ + Mutex::ScopedLock l(outLock); + uint8_t type(body->type()); + if (type == REQUEST_BODY || type == RESPONSE_BODY || type == METHOD_BODY) { + ++outgoing.hwm; + //std::cout << "[" << this << "] allocated: " << outgoing.hwm.getValue() << " to " << *body << std::endl; + } + return ChannelAdapter::send(body, action); +} diff --git a/cpp/src/qpid/broker/SemanticHandler.h b/cpp/src/qpid/broker/SemanticHandler.h index a57559d043..b863b3486e 100644 --- a/cpp/src/qpid/broker/SemanticHandler.h +++ b/cpp/src/qpid/broker/SemanticHandler.h @@ -24,6 +24,7 @@ #include <memory> #include "BrokerChannel.h" #include "Connection.h" +#include "DeliveryAdapter.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/AMQP_ServerOperations.h" #include "qpid/framing/FrameHandler.h" @@ -36,6 +37,7 @@ class BrokerAdapter; class framing::ChannelAdapter; class SemanticHandler : private framing::ChannelAdapter, + private DeliveryAdapter, public framing::FrameHandler, public framing::AMQP_ServerOperations::ExecutionHandler { @@ -44,6 +46,7 @@ class SemanticHandler : private framing::ChannelAdapter, std::auto_ptr<BrokerAdapter> adapter; framing::Window incoming; framing::Window outgoing; + sys::Mutex outLock; void handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> method, const qpid::framing::MethodContext& context); @@ -55,12 +58,22 @@ class SemanticHandler : private framing::ChannelAdapter, void handleHeader(boost::shared_ptr<qpid::framing::AMQHeaderBody>); void handleContent(boost::shared_ptr<qpid::framing::AMQContentBody>); void handleHeartbeat(boost::shared_ptr<qpid::framing::AMQHeartbeatBody>); + + framing::RequestId send(shared_ptr<framing::AMQBody> body, framing::Correlator::Action action=framing::Correlator::Action()); + + + //delivery adapter methods: + DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token); + void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag); + public: SemanticHandler(framing::ChannelId id, Connection& c); + + //frame handler: void handle(framing::AMQFrame& frame); //execution class method handlers: - void complete(uint32_t cumulativeExecutionMark, uint16_t); + void complete(uint32_t cumulativeExecutionMark, framing::SequenceNumberSet range); void flush(); }; |
