summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/BrokerMessage.cpp
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp61
1 files changed, 56 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index d192b09a63..bf0e37e8e3 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -26,6 +26,7 @@
#include "InMemoryContent.h"
#include "LazyLoadedContent.h"
#include "MessageStore.h"
+#include "BrokerQueue.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/BasicGetOkBody.h"
@@ -37,6 +38,30 @@
#include "qpid/framing/ChannelAdapter.h"
#include "RecoveryManagerImpl.h"
+namespace qpid{
+namespace broker{
+
+struct BasicGetToken : DeliveryToken
+{
+ typedef boost::shared_ptr<BasicGetToken> shared_ptr;
+
+ Queue::shared_ptr queue;
+
+ BasicGetToken(Queue::shared_ptr q) : queue(q) {}
+};
+
+struct BasicConsumeToken : DeliveryToken
+{
+ typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
+
+ const string consumer;
+
+ BasicConsumeToken(const string c) : consumer(c) {}
+};
+
+}
+}
+
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
@@ -74,6 +99,16 @@ bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
+DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
+{
+ return DeliveryToken::shared_ptr(new BasicGetToken(queue));
+}
+
+DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer)
+{
+ return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
+}
+
void BasicMessage::deliver(ChannelAdapter& channel,
const string& consumerTag, uint64_t deliveryTag,
uint32_t framesize)
@@ -86,23 +121,39 @@ void BasicMessage::deliver(ChannelAdapter& channel,
}
void BasicMessage::sendGetOk(ChannelAdapter& channel,
- const std::string& /*destination*/,
uint32_t messageCount,
- uint64_t responseTo,
+ uint64_t /*responseTo*/,
uint64_t deliveryTag,
uint32_t framesize)
{
channel.send(make_shared_ptr(
new BasicGetOkBody(
channel.getVersion(),
- responseTo,
+ //responseTo,
deliveryTag, getRedelivered(), getExchange(),
getRoutingKey(), messageCount)));
sendContent(channel, framesize);
}
-void BasicMessage::sendContent(
- ChannelAdapter& channel, uint32_t framesize)
+void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize)
+{
+ BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
+ if (consume) {
+ deliver(channel, consume->consumer, deliveryTag, framesize);
+ } else {
+ BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
+ if (get) {
+ uint64_t request(1/*actual value doesn't affect anything at present*/);
+ sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize);
+ } else {
+ //TODO:
+ //either need to be able to convert to a message transfer or
+ //throw error of some kind to allow this to be handled higher up
+ }
+ }
+}
+
+void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
{
channel.send(header);
Mutex::ScopedLock locker(contentLock);