summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Consumer.h58
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp68
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp183
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.h101
-rw-r--r--cpp/src/qpid/broker/QueuedMessage.h46
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
8 files changed, 347 insertions, 121 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index 18fc3ec763..5de00668b3 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -21,47 +21,33 @@
#ifndef _Consumer_
#define _Consumer_
-namespace qpid {
- namespace broker {
- class Queue;
-}}
-
#include "Message.h"
+#include "QueuedMessage.h"
#include "OwnershipToken.h"
namespace qpid {
- namespace broker {
-
- struct QueuedMessage
- {
- boost::intrusive_ptr<Message> payload;
- framing::SequenceNumber position;
- Queue* queue;
-
- QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
- payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
- };
-
+namespace broker {
+
+class Queue;
+
+class Consumer {
+ const bool acquires;
+ public:
+ typedef boost::shared_ptr<Consumer> shared_ptr;
+
+ framing::SequenceNumber position;
+
+ Consumer(bool preAcquires = true) : acquires(preAcquires) {}
+ bool preAcquires() const { return acquires; }
+ virtual bool deliver(QueuedMessage& msg) = 0;
+ virtual void notify() = 0;
+ virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
+ virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ virtual OwnershipToken* getSession() = 0;
+ virtual ~Consumer(){}
+};
- class Consumer {
- const bool acquires;
- public:
- typedef boost::shared_ptr<Consumer> shared_ptr;
-
- framing::SequenceNumber position;
-
- Consumer(bool preAcquires = true) : acquires(preAcquires) {}
- bool preAcquires() const { return acquires; }
- virtual bool deliver(QueuedMessage& msg) = 0;
- virtual void notify() = 0;
- virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
- virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
- virtual OwnershipToken* getSession() = 0;
- virtual ~Consumer(){}
- };
- }
-}
+}}
#endif
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 530dca99a4..fb950b8a83 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -130,7 +130,7 @@ void DeliveryRecord::complete()
void DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
+ queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
@@ -138,7 +138,7 @@ void DeliveryRecord::accept(TransactionContext* ctxt) {
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
+ queue->dequeue(ctxt, msg);
}
}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index befc5c4eff..8bbccda844 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -186,6 +186,8 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
}
void Queue::requeue(const QueuedMessage& msg){
+ if (policy.get() && !policy->isEnqueued(msg)) return;
+
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -415,29 +417,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
- messages.push_back(QueuedMessage(this, msg, ++sequence));
- if (policy.get()) {
- policy->enqueued(msg->contentSize());
- if (policy->limitExceeded()) {
- if (!policyExceeded) {
- policyExceeded = true;
- QPID_LOG(info, "Queue size exceeded policy for " << name);
- }
- if (store) {
- QPID_LOG(debug, "Message " << msg << " on " << name << " released from memory");
- msg->releaseContent(store);
- } else {
- QPID_LOG(error, "Message " << msg << " on " << name
- << " exceeds the policy for the queue but can't be released from memory as the queue is not durable");
- throw ResourceLimitExceededException(QPID_MSG("Policy exceeded for " << name << " " << *policy));
- }
- } else {
- if (policyExceeded) {
- policyExceeded = false;
- QPID_LOG(info, "Queue size within policy for " << name);
- }
- }
- }
+ QueuedMessage qm(this, msg, ++sequence);
+ if (policy.get()) policy->tryEnqueue(qm);
+
+ messages.push_back(qm);
listeners.swap(copy);
}
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
@@ -486,15 +469,16 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
}
// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
+bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
+ if (policy.get() && !policy->isEnqueued(msg)) return false;
{
Mutex::ScopedLock locker(messageLock);
dequeued(msg);
}
- if (msg->isPersistent() && store) {
- msg->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ if (msg.payload->isPersistent() && store) {
+ msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
return true;
}
@@ -508,7 +492,7 @@ bool Queue::dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
*/
void Queue::popAndDequeue()
{
- boost::intrusive_ptr<Message> msg = messages.front().payload;
+ QueuedMessage msg = messages.front();
messages.pop_front();
dequeue(0, msg);
}
@@ -517,15 +501,15 @@ void Queue::popAndDequeue()
* Updates policy and management when a message has been dequeued,
* expects messageLock to be held
*/
-void Queue::dequeued(boost::intrusive_ptr<Message>& msg)
+void Queue::dequeued(const QueuedMessage& msg)
{
- if (policy.get()) policy->dequeued(msg->contentSize());
+ if (policy.get()) policy->dequeued(msg);
if (mgmtObject != 0){
mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg->contentSize());
- if (msg->isPersistent ()){
+ mgmtObject->inc_byteTotalDequeues (msg.payload->contentSize());
+ if (msg.payload->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ mgmtObject->inc_bytePersistDequeues (msg.payload->contentSize());
}
}
}
@@ -551,10 +535,7 @@ void Queue::create(const FieldTable& _settings)
void Queue::configure(const FieldTable& _settings)
{
- std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
- if (_policy->getMaxCount() || _policy->getMaxSize()) {
- setPolicy(_policy);
- }
+ setPolicy(QueuePolicy::createQueuePolicy(_settings));
//set this regardless of owner to allow use of no-local with exclusive consumers also
noLocal = _settings.get(qpidNoLocal);
QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
@@ -720,6 +701,19 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+bool Queue::releaseMessageContent(const QueuedMessage& m)
+{
+ if (store) {
+ QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
+ m.payload->releaseContent(store);
+ return true;
+ } else {
+ QPID_LOG(warning, "Message " << m.position << " on " << name
+ << " cannot be released from memory as the queue is not durable");
+ return false;
+ }
+}
+
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index f9f249cda8..8f6ae0b967 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -101,7 +101,7 @@ namespace qpid {
bool isExcluded(boost::intrusive_ptr<Message>& msg);
- void dequeued(boost::intrusive_ptr<Message>& msg);
+ void dequeued(const QueuedMessage& msg);
void popAndDequeue();
public:
@@ -180,7 +180,7 @@ namespace qpid {
/**
* dequeue from store (only done once messages is acknowledged)
*/
- bool dequeue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg);
+ bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
/**
* Gets the next available message
@@ -219,6 +219,8 @@ namespace qpid {
template <class F> void eachBinding(const F& f) {
bindings.eachBinding(f);
}
+
+ bool releaseMessageContent(const QueuedMessage&);
};
}
}
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index 08838aac79..8aeaaabd55 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -19,39 +19,78 @@
*
*/
#include "QueuePolicy.h"
+#include "Queue.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize) :
- maxCount(_maxCount), maxSize(_maxSize), count(0), size(0) {}
-
-QueuePolicy::QueuePolicy(const FieldTable& settings) :
- maxCount(getInt(settings, maxCountKey, 0)),
- maxSize(getInt(settings, maxSizeKey, defaultMaxSize)), count(0), size(0) {}
+QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
void QueuePolicy::enqueued(uint64_t _size)
{
- if (maxCount) count++;
+ if (maxCount) ++count;
if (maxSize) size += _size;
}
void QueuePolicy::dequeued(uint64_t _size)
{
- if (maxCount) count--;
+ if (maxCount) --count;
if (maxSize) size -= _size;
}
-bool QueuePolicy::limitExceeded()
+bool QueuePolicy::checkLimit(const QueuedMessage& m)
+{
+ bool exceeded = (maxSize && (size.get() + m.payload->contentSize()) > maxSize) || (maxCount && (count.get() + 1) > maxCount);
+ if (exceeded) {
+ if (!policyExceeded) {
+ policyExceeded = true;
+ QPID_LOG(info, "Queue size exceeded policy for " << m.queue->getName());
+ }
+ } else {
+ if (policyExceeded) {
+ policyExceeded = false;
+ QPID_LOG(info, "Queue size within policy for " << m.queue->getName());
+ }
+ }
+ return !exceeded;
+}
+
+void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+{
+ if (checkLimit(m)) {
+ enqueued(m);
+ } else {
+ std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
+ throw ResourceLimitExceededException(
+ QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
+ << " of size " << m.payload->contentSize() << " , policy: " << *this));
+ }
+}
+
+void QueuePolicy::enqueued(const QueuedMessage& m)
+{
+ enqueued(m.payload->contentSize());
+}
+
+void QueuePolicy::dequeued(const QueuedMessage& m)
+{
+ dequeued(m.payload->contentSize());
+}
+
+bool QueuePolicy::isEnqueued(const QueuedMessage&)
{
- return (maxSize && size > maxSize) || (maxCount && count > maxCount);
+ return true;
}
void QueuePolicy::update(FieldTable& settings)
{
if (maxCount) settings.setInt(maxCountKey, maxCount);
- if (maxSize) settings.setInt(maxSizeKey, maxSize);
+ if (maxSize) settings.setInt(maxSizeKey, maxSize);
+ settings.setString(typeKey, type);
}
@@ -62,6 +101,17 @@ int QueuePolicy::getInt(const FieldTable& settings, const std::string& key, int
else return defaultValue;
}
+std::string QueuePolicy::getType(const FieldTable& settings)
+{
+ FieldTable::ValuePtr v = settings.get(typeKey);
+ if (v && v->convertsTo<std::string>()) {
+ std::string t = v->get<std::string>();
+ transform(t.begin(), t.end(), t.begin(), tolower);
+ if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
+ }
+ return REJECT;
+}
+
void QueuePolicy::setDefaultMaxSize(uint64_t s)
{
defaultMaxSize = s;
@@ -69,20 +119,123 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
const std::string QueuePolicy::maxCountKey("qpid.max_count");
const std::string QueuePolicy::maxSizeKey("qpid.max_size");
+const std::string QueuePolicy::typeKey("qpid.policy_type");
+const std::string QueuePolicy::REJECT("reject");
+const std::string QueuePolicy::FLOW_TO_DISK("flow_to_disk");
+const std::string QueuePolicy::RING("ring");
+const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
+FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+
+bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+{
+ return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+}
+
+RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+
+void RingQueuePolicy::enqueued(const QueuedMessage& m)
+{
+ QueuePolicy::enqueued(m);
+ qpid::sys::Mutex::ScopedLock l(lock);
+ queue.push_back(m);
+}
+
+void RingQueuePolicy::dequeued(const QueuedMessage& m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ QueuePolicy::dequeued(m);
+ //find and remove m from queue
+ for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+ if (i->position == m.position) {
+ queue.erase(i);
+ break;
+ }
+ }
+}
+
+bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ //for non-strict ring policy, a message can be dequeued before acked; need to detect this
+ for (Messages::iterator i = queue.begin(); i != queue.end() && m.position <= i->position; i++) {
+ if (i->position == m.position) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+{
+ if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
+
+ QueuedMessage oldest;
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ oldest = queue.front();
+ }
+ if (oldest.queue->acquire(oldest) || !strict) {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (oldest.position == queue.front().position) {
+ queue.pop_front();
+ QPID_LOG(debug, "Ring policy triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": removed message " << oldest.position << " to make way for " << m.position);
+ }
+ return true;
+ } else {
+ QPID_LOG(debug, "Ring policy could not be triggered in queue "
+ << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
+ //in strict mode, if oldest message has been delivered (hence
+ //cannot be acquired) but not yet acked, it should not be
+ //removed and the attempted enqueue should fail
+ return false;
+ }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
+{
+ uint32_t maxCount = getInt(settings, maxCountKey, 0);
+ uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
+ if (maxCount || maxSize) {
+ return createQueuePolicy(maxCount, maxSize, getType(settings));
+ } else {
+ return std::auto_ptr<QueuePolicy>();
+ }
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+ if (type == RING || type == RING_STRICT) {
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+ } else if (type == FLOW_TO_DISK) {
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+ } else {
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+ }
+
+}
+
+
namespace qpid {
namespace broker {
std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
{
- if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
- else out << "size unlimited, current=" << p.size;
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+ else out << "size: unlimited";
out << "; ";
- if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
- else out << "count unlimited, current=" << p.count;
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+ else out << "count: unlimited";
+ out << "; type=" << p.type;
return out;
}
}
}
+
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h
index 4511a63b64..d39ce7dc11 100644
--- a/cpp/src/qpid/broker/QueuePolicy.h
+++ b/cpp/src/qpid/broker/QueuePolicy.h
@@ -21,40 +21,85 @@
#ifndef _QueuePolicy_
#define _QueuePolicy_
+#include <deque>
#include <iostream>
+#include <memory>
+#include "QueuedMessage.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
+#include "qpid/sys/Mutex.h"
namespace qpid {
- namespace broker {
- class QueuePolicy
- {
- static const std::string maxCountKey;
- static const std::string maxSizeKey;
-
- static uint64_t defaultMaxSize;
+namespace broker {
+
+class QueuePolicy
+{
+ static uint64_t defaultMaxSize;
- const uint32_t maxCount;
- const uint64_t maxSize;
- uint32_t count;
- uint64_t size;
+ const uint32_t maxCount;
+ const uint64_t maxSize;
+ const std::string type;
+ qpid::sys::AtomicValue<uint32_t> count;
+ qpid::sys::AtomicValue<uint64_t> size;
+ bool policyExceeded;
- static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
-
- public:
- QueuePolicy(uint32_t maxCount, uint64_t maxSize);
- QueuePolicy(const qpid::framing::FieldTable& settings);
- void enqueued(uint64_t size);
- void dequeued(uint64_t size);
- void update(qpid::framing::FieldTable& settings);
- bool limitExceeded();
- uint32_t getMaxCount() const { return maxCount; }
- uint64_t getMaxSize() const { return maxSize; }
-
- static void setDefaultMaxSize(uint64_t);
- friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
- };
- }
-}
+ static int getInt(const qpid::framing::FieldTable& settings, const std::string& key, int defaultValue);
+ static std::string getType(const qpid::framing::FieldTable& settings);
+
+ public:
+ static const std::string maxCountKey;
+ static const std::string maxSizeKey;
+ static const std::string typeKey;
+ static const std::string REJECT;
+ static const std::string FLOW_TO_DISK;
+ static const std::string RING;
+ static const std::string RING_STRICT;
+
+ virtual ~QueuePolicy() {}
+ void tryEnqueue(const QueuedMessage&);
+ virtual void dequeued(const QueuedMessage&);
+ virtual bool isEnqueued(const QueuedMessage&);
+ virtual bool checkLimit(const QueuedMessage&);
+ void update(qpid::framing::FieldTable& settings);
+ uint32_t getMaxCount() const { return maxCount; }
+ uint64_t getMaxSize() const { return maxSize; }
+
+ static std::auto_ptr<QueuePolicy> createQueuePolicy(const qpid::framing::FieldTable& settings);
+ static std::auto_ptr<QueuePolicy> createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+ static void setDefaultMaxSize(uint64_t);
+ friend std::ostream& operator<<(std::ostream&, const QueuePolicy&);
+ protected:
+ QueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
+
+ virtual void enqueued(const QueuedMessage&);
+ void enqueued(uint64_t size);
+ void dequeued(uint64_t size);
+};
+
+
+class FlowToDiskPolicy : public QueuePolicy
+{
+ public:
+ FlowToDiskPolicy(uint32_t maxCount, uint64_t maxSize);
+ bool checkLimit(const QueuedMessage&);
+};
+
+class RingQueuePolicy : public QueuePolicy
+{
+ public:
+ RingQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type = RING);
+ void enqueued(const QueuedMessage&);
+ void dequeued(const QueuedMessage&);
+ bool isEnqueued(const QueuedMessage&);
+ bool checkLimit(const QueuedMessage&);
+ private:
+ typedef std::deque<QueuedMessage> Messages;
+ qpid::sys::Mutex lock;
+ Messages queue;
+ const bool strict;
+};
+
+}}
#endif
diff --git a/cpp/src/qpid/broker/QueuedMessage.h b/cpp/src/qpid/broker/QueuedMessage.h
new file mode 100644
index 0000000000..82f5073d87
--- /dev/null
+++ b/cpp/src/qpid/broker/QueuedMessage.h
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _QueuedMessage_
+#define _QueuedMessage_
+
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+class Queue;
+
+struct QueuedMessage
+{
+ boost::intrusive_ptr<Message> payload;
+ framing::SequenceNumber position;
+ Queue* queue;
+
+ QueuedMessage() : queue(0) {}
+ QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
+ payload(msg), position(sn), queue(q) {}
+ QueuedMessage(Queue* q) : queue(q) {}
+};
+
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 675a6a304c..64bb155c01 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -277,7 +277,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
}
if (acquire && !ackExpected) {
- queue->dequeue(0, msg.payload);
+ queue->dequeue(0, msg);
}
return true;
}