summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp24
1 files changed, 14 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
index bf0e37e8e3..244bee4a92 100644
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ b/cpp/src/qpid/broker/BrokerMessage.cpp
@@ -110,45 +110,43 @@ DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consume
}
void BasicMessage::deliver(ChannelAdapter& channel,
- const string& consumerTag, uint64_t deliveryTag,
+ const string& consumerTag, DeliveryId id,
uint32_t framesize)
{
channel.send(make_shared_ptr(
new BasicDeliverBody(
- channel.getVersion(), consumerTag, deliveryTag,
+ channel.getVersion(), consumerTag, id.getValue(),
getRedelivered(), getExchange(), getRoutingKey())));
sendContent(channel, framesize);
}
void BasicMessage::sendGetOk(ChannelAdapter& channel,
uint32_t messageCount,
- uint64_t /*responseTo*/,
- uint64_t deliveryTag,
+ DeliveryId id,
uint32_t framesize)
{
channel.send(make_shared_ptr(
new BasicGetOkBody(
channel.getVersion(),
- //responseTo,
- deliveryTag, getRedelivered(), getExchange(),
+ id.getValue(), getRedelivered(), getExchange(),
getRoutingKey(), messageCount)));
sendContent(channel, framesize);
}
-void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize)
+void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize)
{
BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
if (consume) {
- deliver(channel, consume->consumer, deliveryTag, framesize);
+ deliver(channel, consume->consumer, id, framesize);
} else {
BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
if (get) {
- uint64_t request(1/*actual value doesn't affect anything at present*/);
- sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize);
+ sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize);
} else {
//TODO:
//either need to be able to convert to a message transfer or
//throw error of some kind to allow this to be handled higher up
+ throw Exception("Conversion to BasicMessage not defined!");
}
}
}
@@ -292,3 +290,9 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content)
Mutex::ScopedLock locker(contentLock);
content = _content;
}
+
+
+uint32_t BasicMessage::getRequiredCredit() const
+{
+ return header->size() + contentSize();
+}