summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp172
1 files changed, 111 insertions, 61 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index c589669e5a..d13109dad1 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,6 +30,7 @@
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <time.h>
@@ -49,27 +50,16 @@ TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0),
- inCallback(false), requiredCredit(0) {}
+ staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
+ expiration(FAR_FUTURE), dequeueCallback(0),
+ inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
+{}
-Message::Message(const Message& original) :
- PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
- staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
- expiration(original.expiration), enqueueCallback(0), dequeueCallback(0),
- inCallback(false), requiredCredit(0)
-{
- setExpiryPolicy(original.expiryPolicy);
-}
-
-Message::~Message()
-{
- if (expiryPolicy)
- expiryPolicy->forget(*this);
-}
+Message::~Message() {}
void Message::forcePersistent()
{
+ sys::Mutex::ScopedLock l(lock);
// only set forced bit if we actually need to force.
if (! getAdapter().isPersistent(frames) ){
forcePersistentPolicy = true;
@@ -86,7 +76,7 @@ std::string Message::getRoutingKey() const
return getAdapter().getRoutingKey(frames);
}
-std::string Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -95,7 +85,7 @@ const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registr
{
if (!exchange) {
exchange = registry.get(getExchangeName());
- }
+ }
return exchange;
}
@@ -106,16 +96,19 @@ bool Message::isImmediate() const
const FieldTable* Message::getApplicationHeaders() const
{
+ sys::Mutex::ScopedLock l(lock);
return getAdapter().getApplicationHeaders(frames);
}
std::string Message::getAppId() const
{
+ sys::Mutex::ScopedLock l(lock);
return getAdapter().getAppId(frames);
}
bool Message::isPersistent() const
{
+ sys::Mutex::ScopedLock l(lock);
return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
@@ -195,7 +188,7 @@ void Message::decodeContent(framing::Buffer& buffer)
} else {
//adjust header flags
MarkLastSegment f;
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
}
//mark content loaded
loaded = true;
@@ -247,7 +240,7 @@ void Message::destroy()
bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
intrusive_ptr<const PersistableMessage> pmsg(this);
-
+
bool done = false;
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(queue, pmsg, data, offset, maxContentSize);
@@ -272,7 +265,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
- {
+ {
AMQFrame frame((AMQContentBody()));
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
@@ -290,7 +283,10 @@ void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
{
sys::Mutex::ScopedLock l(lock);
Relay f(out);
- frames.map_if(f, TypeFilter<HEADER_BODY>());
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+ //as frame (and pointer to body) has now been passed to handler,
+ //subsequent modifications should use a copy
+ copyHeaderOnWrite = true;
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -320,13 +316,14 @@ bool Message::isContentLoaded() const
}
-namespace
+namespace
{
const std::string X_QPID_TRACE("x-qpid.trace");
}
bool Message::isExcluded(const std::vector<std::string>& excludes) const
{
+ sys::Mutex::ScopedLock l(lock);
const FieldTable* headers = getApplicationHeaders();
if (headers) {
std::string traceStr = headers->getAsString(X_QPID_TRACE);
@@ -345,11 +342,30 @@ bool Message::isExcluded(const std::vector<std::string>& excludes) const
return false;
}
+class CloneHeaderBody
+{
+public:
+ void operator()(AMQFrame& f)
+ {
+ f.cloneBody();
+ }
+};
+
+AMQHeaderBody* Message::getHeaderBody()
+{
+ if (copyHeaderOnWrite) {
+ CloneHeaderBody f;
+ frames.map_if(f, TypeFilter<HEADER_BODY>());
+ copyHeaderOnWrite = false;
+ }
+ return frames.getHeaders();
+}
+
void Message::addTraceId(const std::string& id)
{
sys::Mutex::ScopedLock l(lock);
if (isA<MessageTransferBody>()) {
- FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
+ FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
std::string trace = headers.getAsString(X_QPID_TRACE);
if (trace.empty()) {
headers.setString(X_QPID_TRACE, id);
@@ -357,13 +373,22 @@ void Message::addTraceId(const std::string& id)
trace += ",";
trace += id;
headers.setString(X_QPID_TRACE, trace);
- }
+ }
}
}
-void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
+void Message::setTimestamp()
+{
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
+ time_t now = ::time(0);
+ props->setTimestamp(now); // AMQP-0.10: posix time_t - secs since Epoch
+}
+
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
@@ -372,26 +397,70 @@ void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
}
- // Use higher resolution time for the internal expiry calculation.
- expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
- setExpiryPolicy(e);
+ if (e) {
+ // Use higher resolution time for the internal expiry calculation.
+ // Prevent overflow as a signed int64_t
+ Duration ttl(std::min(props->getTtl() * TIME_MSEC,
+ (uint64_t) std::numeric_limits<int64_t>::max()));
+ expiration = AbsTime(e->getCurrentTime(), ttl);
+ setExpiryPolicy(e);
+ }
}
}
void Message::adjustTtl()
{
- DeliveryProperties* props = getProperties<DeliveryProperties>();
+ sys::Mutex::ScopedLock l(lock);
+ DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
if (props->getTtl()) {
- sys::Mutex::ScopedLock l(lock);
- sys::Duration d(sys::AbsTime::now(), getExpiration());
- props->setTtl(int64_t(d) > 0 ? int64_t(d)/1000000 : 1); // convert from ns to ms; set to 1 if expired
+ if (expiration < FAR_FUTURE) {
+ sys::AbsTime current(
+ expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+ sys::Duration ttl(current, getExpiration());
+ // convert from ns to ms; set to 1 if expired
+ props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
+ }
}
}
+void Message::setRedelivered()
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+}
+
+void Message::insertCustomProperty(const std::string& key, int64_t value)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+}
+
+void Message::insertCustomProperty(const std::string& key, const std::string& value)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+}
+
+void Message::removeCustomProperty(const std::string& key)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+}
+
+void Message::setExchange(const std::string& exchange)
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+}
+
+void Message::clearApplicationHeadersFlag()
+{
+ sys::Mutex::ScopedLock l(lock);
+ getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
+}
+
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
- if (expiryPolicy)
- expiryPolicy->willExpire(*this);
}
bool Message::hasExpired()
@@ -415,30 +484,12 @@ struct ScopedSet {
};
}
-void Message::allEnqueuesComplete() {
- ScopedSet ss(callbackLock, inCallback);
- MessageCallback* cb = enqueueCallback;
- if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
void Message::allDequeuesComplete() {
ScopedSet ss(callbackLock, inCallback);
MessageCallback* cb = dequeueCallback;
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
-void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- enqueueCallback = &cb;
-}
-
-void Message::resetEnqueueCompleteCallback() {
- sys::Mutex::ScopedLock l(callbackLock);
- while (inCallback) callbackLock.wait();
- enqueueCallback = 0;
-}
-
void Message::setDequeueCompleteCallback(MessageCallback& cb) {
sys::Mutex::ScopedLock l(callbackLock);
while (inCallback) callbackLock.wait();
@@ -452,12 +503,11 @@ void Message::resetDequeueCompleteCallback() {
}
uint8_t Message::getPriority() const {
+ sys::Mutex::ScopedLock l(lock);
return getAdapter().getPriority(frames);
}
-framing::FieldTable& Message::getOrInsertHeaders()
-{
- return getProperties<MessageProperties>()->getApplicationHeaders();
-}
+bool Message::getIsManagementMessage() const { return isManagementMessage; }
+void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
}} // namespace qpid::broker