diff options
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 172 |
1 files changed, 111 insertions, 61 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index c589669e5a..d13109dad1 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,6 +30,7 @@ #include "qpid/framing/SendContent.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/TypeFilter.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" #include <time.h> @@ -49,27 +50,16 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) {} + staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), + expiration(FAR_FUTURE), dequeueCallback(0), + inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false) +{} -Message::Message(const Message& original) : - PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), - staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) -{ - setExpiryPolicy(original.expiryPolicy); -} - -Message::~Message() -{ - if (expiryPolicy) - expiryPolicy->forget(*this); -} +Message::~Message() {} void Message::forcePersistent() { + sys::Mutex::ScopedLock l(lock); // only set forced bit if we actually need to force. if (! getAdapter().isPersistent(frames) ){ forcePersistentPolicy = true; @@ -86,7 +76,7 @@ std::string Message::getRoutingKey() const return getAdapter().getRoutingKey(frames); } -std::string Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -95,7 +85,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr { if (!exchange) { exchange = registry.get(getExchangeName()); - } + } return exchange; } @@ -106,16 +96,19 @@ bool Message::isImmediate() const const FieldTable* Message::getApplicationHeaders() const { + sys::Mutex::ScopedLock l(lock); return getAdapter().getApplicationHeaders(frames); } std::string Message::getAppId() const { + sys::Mutex::ScopedLock l(lock); return getAdapter().getAppId(frames); } bool Message::isPersistent() const { + sys::Mutex::ScopedLock l(lock); return (getAdapter().isPersistent(frames) || forcePersistentPolicy); } @@ -195,7 +188,7 @@ void Message::decodeContent(framing::Buffer& buffer) } else { //adjust header flags MarkLastSegment f; - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); } //mark content loaded loaded = true; @@ -247,7 +240,7 @@ void Message::destroy() bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const { intrusive_ptr<const PersistableMessage> pmsg(this); - + bool done = false; string& data = frame.castBody<AMQContentBody>()->getData(); store->loadContent(queue, pmsg, data, offset, maxContentSize); @@ -272,7 +265,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { + { AMQFrame frame((AMQContentBody())); morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); @@ -290,7 +283,10 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) { sys::Mutex::ScopedLock l(lock); Relay f(out); - frames.map_if(f, TypeFilter<HEADER_BODY>()); + frames.map_if(f, TypeFilter<HEADER_BODY>()); + //as frame (and pointer to body) has now been passed to handler, + //subsequent modifications should use a copy + copyHeaderOnWrite = true; } // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over @@ -320,13 +316,14 @@ bool Message::isContentLoaded() const } -namespace +namespace { const std::string X_QPID_TRACE("x-qpid.trace"); } bool Message::isExcluded(const std::vector<std::string>& excludes) const { + sys::Mutex::ScopedLock l(lock); const FieldTable* headers = getApplicationHeaders(); if (headers) { std::string traceStr = headers->getAsString(X_QPID_TRACE); @@ -345,11 +342,30 @@ bool Message::isExcluded(const std::vector<std::string>& excludes) const return false; } +class CloneHeaderBody +{ +public: + void operator()(AMQFrame& f) + { + f.cloneBody(); + } +}; + +AMQHeaderBody* Message::getHeaderBody() +{ + if (copyHeaderOnWrite) { + CloneHeaderBody f; + frames.map_if(f, TypeFilter<HEADER_BODY>()); + copyHeaderOnWrite = false; + } + return frames.getHeaders(); +} + void Message::addTraceId(const std::string& id) { sys::Mutex::ScopedLock l(lock); if (isA<MessageTransferBody>()) { - FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders(); + FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders(); std::string trace = headers.getAsString(X_QPID_TRACE); if (trace.empty()) { headers.setString(X_QPID_TRACE, id); @@ -357,13 +373,22 @@ void Message::addTraceId(const std::string& id) trace += ","; trace += id; headers.setString(X_QPID_TRACE, trace); - } + } } } -void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) +void Message::setTimestamp() +{ + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); + time_t now = ::time(0); + props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch +} + +void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e) { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { // AMQP requires setting the expiration property to be posix // time_t in seconds. TTL is in milliseconds @@ -372,26 +397,70 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); } - // Use higher resolution time for the internal expiry calculation. - expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); - setExpiryPolicy(e); + if (e) { + // Use higher resolution time for the internal expiry calculation. + // Prevent overflow as a signed int64_t + Duration ttl(std::min(props->getTtl() * TIME_MSEC, + (uint64_t) std::numeric_limits<int64_t>::max())); + expiration = AbsTime(e->getCurrentTime(), ttl); + setExpiryPolicy(e); + } } } void Message::adjustTtl() { - DeliveryProperties* props = getProperties<DeliveryProperties>(); + sys::Mutex::ScopedLock l(lock); + DeliveryProperties* props = getModifiableProperties<DeliveryProperties>(); if (props->getTtl()) { - sys::Mutex::ScopedLock l(lock); - sys::Duration d(sys::AbsTime::now(), getExpiration()); - props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired + if (expiration < FAR_FUTURE) { + sys::AbsTime current( + expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now()); + sys::Duration ttl(current, getExpiration()); + // convert from ns to ms; set to 1 if expired + props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1); + } } } +void Message::setRedelivered() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true); +} + +void Message::insertCustomProperty(const std::string& key, int64_t value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value); +} + +void Message::insertCustomProperty(const std::string& key, const std::string& value) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value); +} + +void Message::removeCustomProperty(const std::string& key) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key); +} + +void Message::setExchange(const std::string& exchange) +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<DeliveryProperties>()->setExchange(exchange); +} + +void Message::clearApplicationHeadersFlag() +{ + sys::Mutex::ScopedLock l(lock); + getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag(); +} + void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; - if (expiryPolicy) - expiryPolicy->willExpire(*this); } bool Message::hasExpired() @@ -415,30 +484,12 @@ struct ScopedSet { }; } -void Message::allEnqueuesComplete() { - ScopedSet ss(callbackLock, inCallback); - MessageCallback* cb = enqueueCallback; - if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); -} - void Message::allDequeuesComplete() { ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } -void Message::setEnqueueCompleteCallback(MessageCallback& cb) { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = &cb; -} - -void Message::resetEnqueueCompleteCallback() { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = 0; -} - void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); @@ -452,12 +503,11 @@ void Message::resetDequeueCompleteCallback() { } uint8_t Message::getPriority() const { + sys::Mutex::ScopedLock l(lock); return getAdapter().getPriority(frames); } -framing::FieldTable& Message::getOrInsertHeaders() -{ - return getProperties<MessageProperties>()->getApplicationHeaders(); -} +bool Message::getIsManagementMessage() const { return isManagementMessage; } +void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } }} // namespace qpid::broker |