summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker/BrokerMessage.cpp')
-rw-r--r--cpp/lib/broker/BrokerMessage.cpp52
1 files changed, 26 insertions, 26 deletions
diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp
index 07b14a4eff..a5192beede 100644
--- a/cpp/lib/broker/BrokerMessage.cpp
+++ b/cpp/lib/broker/BrokerMessage.cpp
@@ -33,7 +33,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-Message::Message(const ConnectionToken* const _publisher,
+BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
bool _mandatory, bool _immediate) : publisher(_publisher),
exchange(_exchange),
@@ -44,23 +44,23 @@ Message::Message(const ConnectionToken* const _publisher,
size(0),
persistenceId(0) {}
-Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
+BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) :
publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
decode(buffer, headersOnly, contentChunkSize);
}
-Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
+BasicMessage::BasicMessage() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
-Message::~Message(){
+BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
}
-void Message::setHeader(AMQHeaderBody::shared_ptr _header){
+void BasicMessage::setHeader(AMQHeaderBody::shared_ptr _header){
this->header = _header;
}
-void Message::addContent(AMQContentBody::shared_ptr data){
+void BasicMessage::addContent(AMQContentBody::shared_ptr data){
if (!content.get()) {
content = std::auto_ptr<Content>(new InMemoryContent());
}
@@ -68,15 +68,15 @@ void Message::addContent(AMQContentBody::shared_ptr data){
size += data->size();
}
-bool Message::isComplete(){
+bool BasicMessage::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
-void Message::redeliver(){
+void BasicMessage::redeliver(){
redelivered = true;
}
-void Message::deliver(OutputHandler* out, int channel,
+void BasicMessage::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize,
ProtocolVersion* version){
@@ -85,7 +85,7 @@ void Message::deliver(OutputHandler* out, int channel,
sendContent(out, channel, framesize, version);
}
-void Message::sendGetOk(OutputHandler* out,
+void BasicMessage::sendGetOk(OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
@@ -96,7 +96,7 @@ void Message::sendGetOk(OutputHandler* out,
sendContent(out, channel, framesize, version);
}
-void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
+void BasicMessage::sendContent(OutputHandler* out, int channel, u_int32_t framesize, ProtocolVersion* version){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(*version, channel, headerBody));
@@ -104,28 +104,28 @@ void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize,
if (content.get()) content->send(*version, out, channel, framesize);
}
-BasicHeaderProperties* Message::getHeaderProperties(){
+BasicHeaderProperties* BasicMessage::getHeaderProperties(){
return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
}
-const ConnectionToken* const Message::getPublisher(){
+const ConnectionToken* const BasicMessage::getPublisher(){
return publisher;
}
-bool Message::isPersistent()
+bool BasicMessage::isPersistent()
{
if(!header) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
+void BasicMessage::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
{
decodeHeader(buffer);
if (!headersOnly) decodeContent(buffer, contentChunkSize);
}
-void Message::decodeHeader(Buffer& buffer)
+void BasicMessage::decodeHeader(Buffer& buffer)
{
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
@@ -136,7 +136,7 @@ void Message::decodeHeader(Buffer& buffer)
setHeader(headerBody);
}
-void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
+void BasicMessage::decodeContent(Buffer& buffer, u_int32_t chunkSize)
{
u_int64_t expected = expectedContentSize();
if (expected != buffer.available()) {
@@ -158,13 +158,13 @@ void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
}
}
-void Message::encode(Buffer& buffer)
+void BasicMessage::encode(Buffer& buffer)
{
encodeHeader(buffer);
encodeContent(buffer);
}
-void Message::encodeHeader(Buffer& buffer)
+void BasicMessage::encodeHeader(Buffer& buffer)
{
buffer.putShortString(exchange);
buffer.putShortString(routingKey);
@@ -172,36 +172,36 @@ void Message::encodeHeader(Buffer& buffer)
header->encode(buffer);
}
-void Message::encodeContent(Buffer& buffer)
+void BasicMessage::encodeContent(Buffer& buffer)
{
Mutex::ScopedLock locker(contentLock);
if (content.get()) content->encode(buffer);
}
-u_int32_t Message::encodedSize()
+u_int32_t BasicMessage::encodedSize()
{
return encodedHeaderSize() + encodedContentSize();
}
-u_int32_t Message::encodedContentSize()
+u_int32_t BasicMessage::encodedContentSize()
{
Mutex::ScopedLock locker(contentLock);
return content.get() ? content->size() : 0;
}
-u_int32_t Message::encodedHeaderSize()
+u_int32_t BasicMessage::encodedHeaderSize()
{
return exchange.size() + 1
+ routingKey.size() + 1
+ header->size() + 4;//4 extra bytes for size
}
-u_int64_t Message::expectedContentSize()
+u_int64_t BasicMessage::expectedContentSize()
{
return header.get() ? header->getContentSize() : 0;
}
-void Message::releaseContent(MessageStore* store)
+void BasicMessage::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
if (!isPersistent() && persistenceId == 0) {
@@ -217,7 +217,7 @@ void Message::releaseContent(MessageStore* store)
}
}
-void Message::setContent(std::auto_ptr<Content>& _content)
+void BasicMessage::setContent(std::auto_ptr<Content>& _content)
{
Mutex::ScopedLock locker(contentLock);
content = _content;