diff options
| author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
| commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
| tree | 13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/BrokerMessage.cpp | |
| parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
| download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz | |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessage.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerMessage.cpp | 61 |
1 files changed, 56 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index d192b09a63..bf0e37e8e3 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -26,6 +26,7 @@ #include "InMemoryContent.h" #include "LazyLoadedContent.h" #include "MessageStore.h" +#include "BrokerQueue.h" #include "qpid/log/Statement.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/BasicGetOkBody.h" @@ -37,6 +38,30 @@ #include "qpid/framing/ChannelAdapter.h" #include "RecoveryManagerImpl.h" +namespace qpid{ +namespace broker{ + +struct BasicGetToken : DeliveryToken +{ + typedef boost::shared_ptr<BasicGetToken> shared_ptr; + + Queue::shared_ptr queue; + + BasicGetToken(Queue::shared_ptr q) : queue(q) {} +}; + +struct BasicConsumeToken : DeliveryToken +{ + typedef boost::shared_ptr<BasicConsumeToken> shared_ptr; + + const string consumer; + + BasicConsumeToken(const string c) : consumer(c) {} +}; + +} +} + using namespace boost; using namespace qpid::broker; using namespace qpid::framing; @@ -74,6 +99,16 @@ bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } +DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue) +{ + return DeliveryToken::shared_ptr(new BasicGetToken(queue)); +} + +DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer) +{ + return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer)); +} + void BasicMessage::deliver(ChannelAdapter& channel, const string& consumerTag, uint64_t deliveryTag, uint32_t framesize) @@ -86,23 +121,39 @@ void BasicMessage::deliver(ChannelAdapter& channel, } void BasicMessage::sendGetOk(ChannelAdapter& channel, - const std::string& /*destination*/, uint32_t messageCount, - uint64_t responseTo, + uint64_t /*responseTo*/, uint64_t deliveryTag, uint32_t framesize) { channel.send(make_shared_ptr( new BasicGetOkBody( channel.getVersion(), - responseTo, + //responseTo, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount))); sendContent(channel, framesize); } -void BasicMessage::sendContent( - ChannelAdapter& channel, uint32_t framesize) +void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize) +{ + BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token); + if (consume) { + deliver(channel, consume->consumer, deliveryTag, framesize); + } else { + BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token); + if (get) { + uint64_t request(1/*actual value doesn't affect anything at present*/); + sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize); + } else { + //TODO: + //either need to be able to convert to a message transfer or + //throw error of some kind to allow this to be handled higher up + } + } +} + +void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize) { channel.send(header); Mutex::ScopedLock locker(contentLock); |
