summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp452
1 files changed, 0 insertions, 452 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
deleted file mode 100644
index 763dc55e40..0000000000
--- a/cpp/src/qpid/broker/Message.cpp
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/ExpiryPolicy.h"
-#include "qpid/StringUtils.h"
-#include "qpid/framing/frame_functors.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/SendContent.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/TypeFilter.h"
-#include "qpid/log/Statement.h"
-
-#include <time.h>
-
-using boost::intrusive_ptr;
-using qpid::sys::AbsTime;
-using qpid::sys::Duration;
-using qpid::sys::TIME_MSEC;
-using qpid::sys::FAR_FUTURE;
-using std::string;
-using namespace qpid::framing;
-
-namespace qpid {
-namespace broker {
-
-TransferAdapter Message::TRANSFER;
-
-Message::Message(const framing::SequenceNumber& id) :
- frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), dequeueCallback(0),
- inCallback(false), requiredCredit(0), isManagementMessage(false)
-{}
-
-Message::Message(const Message& original) :
- PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(original.expiration), dequeueCallback(0),
- inCallback(false), requiredCredit(0)
-{
- setExpiryPolicy(original.expiryPolicy);
-}
-
-Message::~Message()
-{
- if (expiryPolicy)
- expiryPolicy->forget(*this);
-}
-
-void Message::forcePersistent()
-{
- // only set forced bit if we actually need to force.
- if (! getAdapter().isPersistent(frames) ){
- forcePersistentPolicy = true;
- }
-}
-
-bool Message::isForcedPersistent()
-{
- return forcePersistentPolicy;
-}
-
-std::string Message::getRoutingKey() const
-{
- return getAdapter().getRoutingKey(frames);
-}
-
-std::string Message::getExchangeName() const
-{
- return getAdapter().getExchange(frames);
-}
-
-const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
-{
- if (!exchange) {
- exchange = registry.get(getExchangeName());
- }
- return exchange;
-}
-
-bool Message::isImmediate() const
-{
- return getAdapter().isImmediate(frames);
-}
-
-const FieldTable* Message::getApplicationHeaders() const
-{
- return getAdapter().getApplicationHeaders(frames);
-}
-
-std::string Message::getAppId() const
-{
- return getAdapter().getAppId(frames);
-}
-
-bool Message::isPersistent() const
-{
- return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
-}
-
-bool Message::requiresAccept()
-{
- return getAdapter().requiresAccept(frames);
-}
-
-uint32_t Message::getRequiredCredit()
-{
- sys::Mutex::ScopedLock l(lock);
- if (!requiredCredit) {
- //add up payload for all header and content frames in the frameset
- SumBodySize sum;
- frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
- requiredCredit = sum.getSize();
- }
- return requiredCredit;
-}
-
-void Message::encode(framing::Buffer& buffer) const
-{
- //encode method and header frames
- EncodeFrame f1(buffer);
- frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
-
- //then encode the payload of each content frame
- framing::EncodeBody f2(buffer);
- frames.map_if(f2, TypeFilter<CONTENT_BODY>());
-}
-
-void Message::encodeContent(framing::Buffer& buffer) const
-{
- //encode the payload of each content frame
- EncodeBody f2(buffer);
- frames.map_if(f2, TypeFilter<CONTENT_BODY>());
-}
-
-uint32_t Message::encodedSize() const
-{
- return encodedHeaderSize() + encodedContentSize();
-}
-
-uint32_t Message::encodedContentSize() const
-{
- return frames.getContentSize();
-}
-
-uint32_t Message::encodedHeaderSize() const
-{
- //add up the size for all method and header frames in the frameset
- SumFrameSize sum;
- frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
- return sum.getSize();
-}
-
-void Message::decodeHeader(framing::Buffer& buffer)
-{
- AMQFrame method;
- method.decode(buffer);
- frames.append(method);
-
- AMQFrame header;
- header.decode(buffer);
- frames.append(header);
-}
-
-void Message::decodeContent(framing::Buffer& buffer)
-{
- if (buffer.available()) {
- //get the data as a string and set that as the content
- //body on a frame then add that frame to the frameset
- AMQFrame frame((AMQContentBody()));
- frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
- frame.setFirstSegment(false);
- frames.append(frame);
- } else {
- //adjust header flags
- MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
- }
- //mark content loaded
- loaded = true;
-}
-
-// Used for testing only
-void Message::tryReleaseContent()
-{
- if (checkContentReleasable()) {
- releaseContent();
- }
-}
-
-void Message::releaseContent(MessageStore* s)
-{
- //deprecated, use setStore(store); releaseContent(); instead
- if (!store) setStore(s);
- releaseContent();
-}
-
-void Message::releaseContent()
-{
- sys::Mutex::ScopedLock l(lock);
- if (store) {
- if (!getPersistenceId()) {
- intrusive_ptr<PersistableMessage> pmsg(this);
- store->stage(pmsg);
- staged = true;
- }
- //ensure required credit is cached before content frames are released
- getRequiredCredit();
- //remove any content frames from the frameset
- frames.remove(TypeFilter<CONTENT_BODY>());
- setContentReleased();
- }
-}
-
-void Message::destroy()
-{
- if (staged) {
- if (store) {
- store->destroy(*this);
- } else {
- QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
- }
- }
-}
-
-bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
-{
- intrusive_ptr<const PersistableMessage> pmsg(this);
-
- bool done = false;
- string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset, maxContentSize);
- done = data.size() < maxContentSize;
- frame.setBof(false);
- frame.setEof(true);
- QPID_LOG(debug, "loaded frame" << frame);
- if (offset > 0) {
- frame.setBos(false);
- }
- if (!done) {
- frame.setEos(false);
- } else return false;
- return true;
-}
-
-void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
-{
- sys::Mutex::ScopedLock l(lock);
- if (isContentReleased() && !frames.isComplete()) {
- sys::Mutex::ScopedUnlock u(lock);
- uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- bool morecontent = true;
- for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
- AMQFrame frame((AMQContentBody()));
- morecontent = getContentFrame(queue, frame, maxContentSize, offset);
- out.handle(frame);
- }
- } else {
- Count c;
- frames.map_if(c, TypeFilter<CONTENT_BODY>());
-
- SendContent f(out, maxFrameSize, c.getCount());
- frames.map_if(f, TypeFilter<CONTENT_BODY>());
- }
-}
-
-void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const
-{
- sys::Mutex::ScopedLock l(lock);
- Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
-}
-
-// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
-// 0-8/0-9 message differences.
-MessageAdapter& Message::getAdapter() const
-{
- if (!adapter) {
- if(frames.isA<MessageTransferBody>()) {
- adapter = &TRANSFER;
- } else {
- const AMQMethodBody* method = frames.getMethod();
- if (!method) throw Exception("Can't adapt message with no method");
- else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
- }
- }
- return *adapter;
-}
-
-uint64_t Message::contentSize() const
-{
- return frames.getContentSize();
-}
-
-bool Message::isContentLoaded() const
-{
- return loaded;
-}
-
-
-namespace
-{
-const std::string X_QPID_TRACE("x-qpid.trace");
-}
-
-bool Message::isExcluded(const std::vector<std::string>& excludes) const
-{
- const FieldTable* headers = getApplicationHeaders();
- if (headers) {
- std::string traceStr = headers->getAsString(X_QPID_TRACE);
- if (traceStr.size()) {
- std::vector<std::string> trace = split(traceStr, ", ");
-
- for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
- for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
- if (*i == *j) {
- return true;
- }
- }
- }
- }
- }
- return false;
-}
-
-void Message::addTraceId(const std::string& id)
-{
- sys::Mutex::ScopedLock l(lock);
- if (isA<MessageTransferBody>()) {
- FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
- std::string trace = headers.getAsString(X_QPID_TRACE);
- if (trace.empty()) {
- headers.setString(X_QPID_TRACE, id);
- } else if (trace.find(id) == std::string::npos) {
- trace += ",";
- trace += id;
- headers.setString(X_QPID_TRACE, trace);
- }
- }
-}
-
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
-{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
- if (props->getTtl()) {
- // AMQP requires setting the expiration property to be posix
- // time_t in seconds. TTL is in milliseconds
- if (!props->getExpiration()) {
- //only set expiration in delivery properties if not already set
- time_t now = ::time(0);
- props->setExpiration(now + (props->getTtl()/1000));
- }
- // Use higher resolution time for the internal expiry calculation.
- Duration ttl(std::min(props->getTtl() * TIME_MSEC, (uint64_t) std::numeric_limits<int64_t>::max()));//Prevent overflow
- expiration = AbsTime(AbsTime::now(), ttl);
- setExpiryPolicy(e);
- }
-}
-
-void Message::adjustTtl()
-{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
- if (props->getTtl()) {
- sys::Mutex::ScopedLock l(lock);
- if (expiration < FAR_FUTURE) {
- sys::Duration d(sys::AbsTime::now(), getExpiration());
- props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired
- }
- }
-}
-
-void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
- expiryPolicy = e;
- if (expiryPolicy)
- expiryPolicy->willExpire(*this);
-}
-
-bool Message::hasExpired()
-{
- return expiryPolicy && expiryPolicy->hasExpired(*this);
-}
-
-namespace {
-struct ScopedSet {
- sys::Monitor& lock;
- bool& flag;
- ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
- sys::Monitor::ScopedLock sl(lock);
- flag = true;
- }
- ~ScopedSet(){
- sys::Monitor::ScopedLock sl(lock);
- flag = false;
- lock.notifyAll();
- }
-};
-}
-
-void Message::allDequeuesComplete() {
- ScopedSet ss(callbackLock, inCallback);
- MessageCallback* cb = dequeueCallback;
- if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
-void Message::setDequeueCompleteCallback(MessageCallback& cb) {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- dequeueCallback = &cb;
-}
-
-void Message::resetDequeueCompleteCallback() {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- dequeueCallback = 0;
-}
-
-uint8_t Message::getPriority() const {
- return getAdapter().getPriority(frames);
-}
-
-framing::FieldTable& Message::getOrInsertHeaders()
-{
- return getProperties<MessageProperties>()->getApplicationHeaders();
-}
-
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
-
-}} // namespace qpid::broker