diff options
Diffstat (limited to 'cpp/lib/broker/BrokerMessage.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 52 |
1 files changed, 26 insertions, 26 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index 07b14a4eff..a5192beede 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -33,7 +33,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::sys; -Message::Message(const ConnectionToken* const _publisher, +BasicMessage::BasicMessage(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, bool _mandatory, bool _immediate) : publisher(_publisher), exchange(_exchange), @@ -44,23 +44,23 @@ Message::Message(const ConnectionToken* const _publisher, size(0), persistenceId(0) {} -Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : +BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){ decode(buffer, headersOnly, contentChunkSize); } -Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} +BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){} -Message::~Message(){ +BasicMessage::~BasicMessage(){ if (content.get()) content->destroy(); } -void Message::setHeader(AMQHeaderBody::shared_ptr _header){ +void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){ this->header = _header; } -void Message::addContent(AMQContentBody::shared_ptr data){ +void BasicMessage::addContent(AMQContentBody::shared_ptr data){ if (!content.get()) { content = std::auto_ptr<Content>(new InMemoryContent()); } @@ -68,15 +68,15 @@ void Message::addContent(AMQContentBody::shared_ptr data){ size += data->size(); } -bool Message::isComplete(){ +bool BasicMessage::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } -void Message::redeliver(){ +void BasicMessage::redeliver(){ redelivered = true; } -void Message::deliver(OutputHandler* out, int channel, +void BasicMessage::deliver(OutputHandler* out, int channel, const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize, ProtocolVersion* version){ @@ -85,7 +85,7 @@ void Message::deliver(OutputHandler* out, int channel, sendContent(out, channel, framesize, version); } -void Message::sendGetOk(OutputHandler* out, +void BasicMessage::sendGetOk(OutputHandler* out, int channel, u_int32_t messageCount, u_int64_t deliveryTag, @@ -96,7 +96,7 @@ void Message::sendGetOk(OutputHandler* out, sendContent(out, channel, framesize, version); } -void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ +void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(*version, channel, headerBody)); @@ -104,28 +104,28 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, if (content.get()) content->send(*version, out, channel, framesize); } -BasicHeaderProperties* Message::getHeaderProperties(){ +BasicHeaderProperties* BasicMessage::getHeaderProperties(){ return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); } -const ConnectionToken* const Message::getPublisher(){ +const ConnectionToken* const BasicMessage::getPublisher(){ return publisher; } -bool Message::isPersistent() +bool BasicMessage::isPersistent() { if(!header) return false; BasicHeaderProperties* props = getHeaderProperties(); return props && props->getDeliveryMode() == PERSISTENT; } -void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) +void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) { decodeHeader(buffer); if (!headersOnly) decodeContent(buffer, contentChunkSize); } -void Message::decodeHeader(Buffer& buffer) +void BasicMessage::decodeHeader(Buffer& buffer) { buffer.getShortString(exchange); buffer.getShortString(routingKey); @@ -136,7 +136,7 @@ void Message::decodeHeader(Buffer& buffer) setHeader(headerBody); } -void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) +void BasicMessage::decodeContent(Buffer& buffer, u_int32_t chunkSize) { u_int64_t expected = expectedContentSize(); if (expected != buffer.available()) { @@ -158,13 +158,13 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize) } } -void Message::encode(Buffer& buffer) +void BasicMessage::encode(Buffer& buffer) { encodeHeader(buffer); encodeContent(buffer); } -void Message::encodeHeader(Buffer& buffer) +void BasicMessage::encodeHeader(Buffer& buffer) { buffer.putShortString(exchange); buffer.putShortString(routingKey); @@ -172,36 +172,36 @@ void Message::encodeHeader(Buffer& buffer) header->encode(buffer); } -void Message::encodeContent(Buffer& buffer) +void BasicMessage::encodeContent(Buffer& buffer) { Mutex::ScopedLock locker(contentLock); if (content.get()) content->encode(buffer); } -u_int32_t Message::encodedSize() +u_int32_t BasicMessage::encodedSize() { return encodedHeaderSize() + encodedContentSize(); } -u_int32_t Message::encodedContentSize() +u_int32_t BasicMessage::encodedContentSize() { Mutex::ScopedLock locker(contentLock); return content.get() ? content->size() : 0; } -u_int32_t Message::encodedHeaderSize() +u_int32_t BasicMessage::encodedHeaderSize() { return exchange.size() + 1 + routingKey.size() + 1 + header->size() + 4;//4 extra bytes for size } -u_int64_t Message::expectedContentSize() +u_int64_t BasicMessage::expectedContentSize() { return header.get() ? header->getContentSize() : 0; } -void Message::releaseContent(MessageStore* store) +void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); if (!isPersistent() && persistenceId == 0) { @@ -217,7 +217,7 @@ void Message::releaseContent(MessageStore* store) } } -void Message::setContent(std::auto_ptr<Content>& _content) +void BasicMessage::setContent(std::auto_ptr<Content>& _content) { Mutex::ScopedLock locker(contentLock); content = _content; |
