summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp13
1 files changed, 10 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 8c9e5b8c48..7f7b2bc312 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -216,7 +216,8 @@ void Queue::requeue(const QueuedMessage& msg){
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
- enqueue(0, msg.payload);
+ boost::intrusive_ptr<Message> payload = msg.payload;
+ enqueue(0, payload);
}
}
}
@@ -720,7 +721,7 @@ void Queue::setLastNodeFailure()
// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg, bool suppressPolicyCheck)
+bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
@@ -741,6 +742,11 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg,
}
if (traceId.size()) {
+ //copy on write: take deep copy of message before modifying it
+ //as the frames may already be available for delivery on other
+ //threads
+ boost::intrusive_ptr<Message> copy(new Message(*msg));
+ msg = copy;
msg->addTraceId(traceId);
}
@@ -1158,7 +1164,8 @@ void Queue::enqueued(const QueuedMessage& m)
policy->enqueued(m);
}
mgntEnqStats(m.payload);
- enqueue ( 0, m.payload, true );
+ boost::intrusive_ptr<Message> payload = m.payload;
+ enqueue ( 0, payload, true );
} else {
QPID_LOG(warning, "Queue informed of enqueued message that has no payload");
}