summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerMessage.cpp')
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp49
1 files changed, 26 insertions, 23 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index a5192beede..b738040470 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -35,22 +35,23 @@ using namespace qpid::sys;
BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) : publisher(_publisher),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- redelivered(false),
- size(0),
- persistenceId(0) {}
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate),
+ publisher(_publisher),
+ size(0)
+{
+}
BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
- publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
+ publisher(0), size(0)
+{
decode(buffer, headersOnly, contentChunkSize);
}
-BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+BasicMessage::BasicMessage() : publisher(0), size(0)
+{
+}
BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
@@ -72,16 +73,13 @@ bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
-void BasicMessage::redeliver(){
- redelivered = true;
-}
-
void BasicMessage::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize,
ProtocolVersion* version){
// CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ out->send(new AMQFrame(*version, channel,
+ new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey())));
sendContent(out, channel, framesize, version);
}
@@ -90,9 +88,10 @@ void BasicMessage::sendGetOk(OutputHandler* out,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction
+ out->send(new AMQFrame(*version, channel,
+ new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount)));
sendContent(out, channel, framesize, version);
}
@@ -127,8 +126,12 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu
void BasicMessage::decodeHeader(Buffer& buffer)
{
+ string exchange;
+ string routingKey;
+
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
+ setRouting(exchange, routingKey);
u_int32_t headerSize = buffer.getLong();
AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
@@ -166,8 +169,8 @@ void BasicMessage::encode(Buffer& buffer)
void BasicMessage::encodeHeader(Buffer& buffer)
{
- buffer.putShortString(exchange);
- buffer.putShortString(routingKey);
+ buffer.putShortString(getExchange());
+ buffer.putShortString(getRoutingKey());
buffer.putLong(header->size());
header->encode(buffer);
}
@@ -191,8 +194,8 @@ u_int32_t BasicMessage::encodedContentSize()
u_int32_t BasicMessage::encodedHeaderSize()
{
- return exchange.size() + 1
- + routingKey.size() + 1
+ return getExchange().size() + 1
+ + getRoutingKey().size() + 1
+ header->size() + 4;//4 extra bytes for size
}
@@ -204,7 +207,7 @@ u_int64_t BasicMessage::expectedContentSize()
void BasicMessage::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && persistenceId == 0) {
+ if (!isPersistent() && getPersistenceId() == 0) {
store->stage(this);
}
if (!content.get() || content->size() > 0) {