diff options
Diffstat (limited to 'cpp/lib/broker/BrokerMessage.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 49 |
1 files changed, 26 insertions, 23 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index a5192beede..b738040470 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -35,22 +35,23 @@ using namespace qpid::sys; BasicMessage::BasicMessage(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, - bool _mandatory, bool _immediate) : publisher(_publisher), - exchange(_exchange), - routingKey(_routingKey), - mandatory(_mandatory), - immediate(_immediate), - redelivered(false), - size(0), - persistenceId(0) {} + bool _mandatory, bool _immediate) : + Message(_exchange, _routingKey, _mandatory, _immediate), + publisher(_publisher), + size(0) +{ +} BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : - publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ + publisher(0), size(0) +{ decode(buffer, headersOnly, contentChunkSize); } -BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} +BasicMessage::BasicMessage() : publisher(0), size(0) +{ +} BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); @@ -72,16 +73,13 @@ bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void BasicMessage::redeliver(){ - redelivered = true; -} - void BasicMessage::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize, ProtocolVersion* version){ // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version, consumerTag, deliveryTag, redelivered, exchange, routingKey))); + out->send(new AMQFrame(*version, channel, + new BasicDeliverBody(*version, consumerTag, deliveryTag, getRedelivered(), getExchange(), getRoutingKey()))); sendContent(out, channel, framesize, version); } @@ -90,9 +88,10 @@ void BasicMessage::sendGetOk(OutputHandler* out, u_int32_t messageCount, u_int64_t deliveryTag, u_int32_t framesize, - ProtocolVersion* version){ - // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction - out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version, deliveryTag, redelivered, exchange, routingKey, messageCount))); + ProtocolVersion* version){ + // CCT -- TODO - Update code generator to take pointer/ not instance to avoid extra contruction + out->send(new AMQFrame(*version, channel, + new BasicGetOkBody(*version, deliveryTag, getRedelivered(), getExchange(), getRoutingKey(), messageCount))); sendContent(out, channel, framesize, version); } @@ -127,8 +126,12 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChu void BasicMessage::decodeHeader(Buffer& buffer) { + string exchange; + string routingKey; + buffer.getShortString(exchange); buffer.getShortString(routingKey); + setRouting(exchange, routingKey); u_int32_t headerSize = buffer.getLong(); AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody()); @@ -166,8 +169,8 @@ void BasicMessage::encode(Buffer& buffer) void BasicMessage::encodeHeader(Buffer& buffer) { - buffer.putShortString(exchange); - buffer.putShortString(routingKey); + buffer.putShortString(getExchange()); + buffer.putShortString(getRoutingKey()); buffer.putLong(header->size()); header->encode(buffer); } @@ -191,8 +194,8 @@ u_int32_t BasicMessage::encodedContentSize() u_int32_t BasicMessage::encodedHeaderSize() { - return exchange.size() + 1 - + routingKey.size() + 1 + return getExchange().size() + 1 + + getRoutingKey().size() + 1 + header->size() + 4;//4 extra bytes for size } @@ -204,7 +207,7 @@ u_int64_t BasicMessage::expectedContentSize() void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); - if (!isPersistent() && persistenceId == 0) { + if (!isPersistent() && getPersistenceId() == 0) { store->stage(this); } if (!content.get() || content->size() > 0) { |
