summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerMessage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerMessage.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerMessage.cpp299
1 files changed, 0 insertions, 299 deletions
diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp
deleted file mode 100644
index bddd5802cf..0000000000
--- a/cpp/src/qpid/broker/BrokerMessage.cpp
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <boost/cast.hpp>
-
-#include "BrokerMessage.h"
-#include <iostream>
-
-#include "InMemoryContent.h"
-#include "LazyLoadedContent.h"
-#include "MessageStore.h"
-#include "BrokerQueue.h"
-#include "qpid/log/Statement.h"
-#include "qpid/framing/BasicDeliverBody.h"
-#include "qpid/framing/BasicGetOkBody.h"
-#include "qpid/framing/BasicPublishBody.h"
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/AMQMethodBody.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ChannelAdapter.h"
-#include "RecoveryManagerImpl.h"
-
-namespace qpid{
-namespace broker{
-
-struct BasicGetToken : DeliveryToken
-{
- typedef boost::shared_ptr<BasicGetToken> shared_ptr;
-
- Queue::shared_ptr queue;
-
- BasicGetToken(Queue::shared_ptr q) : queue(q) {}
-};
-
-struct BasicConsumeToken : DeliveryToken
-{
- typedef boost::shared_ptr<BasicConsumeToken> shared_ptr;
-
- const string consumer;
-
- BasicConsumeToken(const string c) : consumer(c) {}
-};
-
-}
-}
-
-using namespace boost;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-BasicMessage::BasicMessage(
- const ConnectionToken* const _publisher,
- const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate
-) :
- Message(_publisher, _exchange, _routingKey, _mandatory, _immediate),
- size(0)
-{}
-
-// For tests only.
-BasicMessage::BasicMessage() : isHeaderSet(false), size(0) {}
-
-BasicMessage::~BasicMessage(){}
-
-void BasicMessage::setHeader(AMQHeaderBody* _header){
- if (_header) {
- this->header = *_header;
- isHeaderSet = true;
- }
- else
- isHeaderSet = false;
-}
-
-void BasicMessage::addContent(AMQContentBody* data){
- if (!content.get()) {
- content = std::auto_ptr<Content>(new InMemoryContent());
- }
- content->add(data);
- size += data->size();
-}
-
-bool BasicMessage::isComplete(){
- return isHeaderSet && (header.getContentSize() == contentSize());
-}
-
-DeliveryToken::shared_ptr BasicMessage::createGetToken(Queue::shared_ptr queue)
-{
- return DeliveryToken::shared_ptr(new BasicGetToken(queue));
-}
-
-DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consumer)
-{
- return DeliveryToken::shared_ptr(new BasicConsumeToken(consumer));
-}
-
-void BasicMessage::deliver(ChannelAdapter& channel,
- const string& consumerTag, DeliveryId id,
- uint32_t framesize)
-{
- channel.send(BasicDeliverBody(
- channel.getVersion(), consumerTag, id.getValue(),
- getRedelivered(), getExchange(), getRoutingKey()));
- sendContent(channel, framesize);
-}
-
-void BasicMessage::sendGetOk(ChannelAdapter& channel,
- uint32_t messageCount,
- DeliveryId id,
- uint32_t framesize)
-{
- channel.send(
- BasicGetOkBody(
- channel.getVersion(),
- id.getValue(), getRedelivered(), getExchange(),
- getRoutingKey(), messageCount));
- sendContent(channel, framesize);
-}
-
-void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize)
-{
- BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token);
- if (consume) {
- deliver(channel, consume->consumer, id, framesize);
- } else {
- BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token);
- if (get) {
- sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize);
- } else {
- //TODO:
- //either need to be able to convert to a message transfer or
- //throw error of some kind to allow this to be handled higher up
- throw Exception("Conversion to BasicMessage not defined!");
- }
- }
-}
-
-void BasicMessage::sendContent(ChannelAdapter& channel, uint32_t framesize)
-{
- channel.send(header);
- Mutex::ScopedLock locker(contentLock);
- if (content.get())
- content->send(channel, framesize);
-}
-
-BasicHeaderProperties* BasicMessage::getHeaderProperties(){
- return isHeaderSet ? dynamic_cast<BasicHeaderProperties*>(header.getProperties()) : 0;
-}
-
-const FieldTable& BasicMessage::getApplicationHeaders(){
- return getHeaderProperties()->getHeaders();
-}
-
-bool BasicMessage::isPersistent()
-{
- if(!isHeaderSet) return false;
- BasicHeaderProperties* props = getHeaderProperties();
- return props && props->getDeliveryMode() == PERSISTENT;
-}
-
-void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChunkSize)
-{
- decodeHeader(buffer);
- if (!headersOnly) decodeContent(buffer, contentChunkSize);
-}
-
-void BasicMessage::decodeHeader(Buffer& buffer)
-{
- //don't care about the type here, but want encode/decode to be symmetric
- RecoveryManagerImpl::decodeMessageType(buffer);
-
- string exchange;
- string routingKey;
-
- buffer.getShortString(exchange);
- buffer.getShortString(routingKey);
- setRouting(exchange, routingKey);
-
- uint32_t headerSize = buffer.getLong();
- AMQHeaderBody headerBody;
- headerBody.decode(buffer, headerSize);
- setHeader(&headerBody);
-}
-
-void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize)
-{
- uint64_t expected = expectedContentSize();
- if (expected != buffer.available()) {
- QPID_LOG(error, "Expected " << expectedContentSize() << " bytes, got " << buffer.available());
- throw Exception("Cannot decode content, buffer not large enough.");
- }
-
- if (!chunkSize || chunkSize > expected) {
- chunkSize = expected;
- }
-
- uint64_t total = 0;
- while (total < expectedContentSize()) {
- uint64_t remaining = expected - total;
- AMQContentBody contentBody;
- contentBody.decode(buffer, remaining < chunkSize ? remaining : chunkSize);
- addContent(&contentBody);
- total += chunkSize;
- }
-}
-
-void BasicMessage::encode(Buffer& buffer) const
-{
- encodeHeader(buffer);
- encodeContent(buffer);
-}
-
-void BasicMessage::encodeHeader(Buffer& buffer) const
-{
- RecoveryManagerImpl::encodeMessageType(*this, buffer);
- buffer.putShortString(getExchange());
- buffer.putShortString(getRoutingKey());
- buffer.putLong(header.size());
- header.encode(buffer);
-}
-
-void BasicMessage::encodeContent(Buffer& buffer) const
-{
- Mutex::ScopedLock locker(contentLock);
- if (content.get()) content->encode(buffer);
-}
-
-uint32_t BasicMessage::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t BasicMessage::encodedContentSize() const
-{
- Mutex::ScopedLock locker(contentLock);
- return content.get() ? content->size() : 0;
-}
-
-uint32_t BasicMessage::encodedHeaderSize() const
-{
- return RecoveryManagerImpl::encodedMessageTypeSize()
- +getExchange().size() + 1
- + getRoutingKey().size() + 1
- + header.size() + 4;//4 extra bytes for size
-}
-
-uint64_t BasicMessage::expectedContentSize()
-{
- return isHeaderSet ? header.getContentSize() : 0;
-}
-
-void BasicMessage::releaseContent(MessageStore* store)
-{
- Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && getPersistenceId() == 0) {
- store->stage(*this);
- }
- if (!content.get() || content->size() > 0) {
- //set content to lazy loading mode (but only if there is
- //stored content):
-
- //Note: the LazyLoadedContent instance contains a raw pointer
- //to the message, however it is then set as a member of that
- //message so its lifetime is guaranteed to be no longer than
- //that of the message itself
- content = std::auto_ptr<Content>(
- new LazyLoadedContent(store, this, expectedContentSize()));
- }
-}
-
-void BasicMessage::setContent(std::auto_ptr<Content>& _content)
-{
- Mutex::ScopedLock locker(contentLock);
- content = _content;
-}
-
-
-uint32_t BasicMessage::getRequiredCredit() const
-{
- return header.size() + contentSize();
-}