summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp138
1 files changed, 63 insertions, 75 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 08ee133981..c3b14688d6 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -56,7 +56,7 @@ using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
@@ -76,16 +76,16 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(_owner),
consumerCount(0),
exclusive(0),
noLocal(false),
@@ -182,9 +182,9 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){
void Queue::recover(boost::intrusive_ptr<Message>& msg){
push(msg, true);
- if (store){
+ if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
+ msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
mgntEnqStats(msg);
@@ -192,7 +192,7 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
- msg->releaseContent(store);
+ msg->releaseContent(true);
}
}
@@ -209,13 +209,13 @@ void Queue::requeue(const QueuedMessage& msg){
if (!isEnqueued(msg)) return;
QueueListeners::NotificationSet copy;
- {
+ {
Mutex::ScopedLock locker(messageLock);
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
@@ -234,7 +234,7 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -258,7 +258,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
QPID_LOG(debug, "attempting to acquire " << msg.position);
for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set
- || (lastValueQueue && (i->position == msg.position) &&
+ || (lastValueQueue && (i->position == msg.position) &&
msg.payload.get() == checkLvqReplace(*i).payload.get()) ) {
clearLVQIndex(msg);
@@ -296,7 +296,7 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
case NO_MESSAGES:
default:
return false;
- }
+ }
} else {
return browseNextMessage(m, c);
}
@@ -317,7 +317,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
//enqueued and so is not available for consumption yet,
//register consumer for notification when this changes
listeners.addListener(c);
- return false;
+ return false;
} else {
//check that consumer has sufficient credit for the
//message (if it does not, no need to register it for
@@ -332,7 +332,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
+ if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -345,7 +345,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
}
if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
popMsg(msg);
return CONSUMED;
@@ -358,7 +358,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
return CANT_CONSUME;
- }
+ }
}
}
}
@@ -423,7 +423,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
if (c->position < getFront().position) {
msg = getFront();
return true;
- } else {
+ } else {
//TODO: can improve performance of this search, for now just searching linearly from end
Messages::reverse_iterator pos;
for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) {
@@ -524,7 +524,7 @@ void Queue::purgeExpired()
*/
uint32_t Queue::purge(const uint32_t purge_request){
Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
+ uint32_t purge_count = purge_request; // only comes into play if >0
uint32_t count = 0;
// Either purge them all or just the some (purge_count) while the queue isn't empty.
@@ -537,7 +537,7 @@ uint32_t Queue::purge(const uint32_t purge_request){
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
while((!qty || move_count--) && !messages.empty()) {
@@ -566,15 +566,16 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
Messages dequeues;
QueueListeners::NotificationSet copy;
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
+ msg->setStore(store);
if (policy.get()) {
policy->tryEnqueue(qm);
//depending on policy, may have some dequeues
if (!isRecovery) pendingDequeues.swap(dequeues);
}
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-
+
LVQ::iterator i;
const framing::FieldTable* ft = msg->getApplicationHeaders();
if (lastValueQueue && ft){
@@ -584,7 +585,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (i == lvq.end() || msg->isUpdateMessage()){
messages.push_back(qm);
listeners.populate(copy);
- lvq[key] = msg;
+ lvq[key] = msg;
}else {
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
@@ -594,10 +595,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
//recovery is complete
pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
} else {
- Mutex::ScopedUnlock u(messageLock);
+ Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
- }
+ }
}else {
messages.push_back(qm);
listeners.populate(copy);
@@ -632,8 +633,8 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
if (ft) {
string key = ft->getAsString(qpidVQMatchProperty);
if (lvq.find(key) != lvq.end()){
- lvq[key] = replacement;
- }
+ lvq[key] = replacement;
+ }
}
msg.payload = replacement;
}
@@ -644,7 +645,7 @@ QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg)
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
-
+
uint32_t count = 0;
for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
//NOTE: don't need to use checkLvqReplace() here as it
@@ -652,7 +653,7 @@ uint32_t Queue::getMessageCount() const
//so the enqueueComplete check has no effect
if ( i->payload->isEnqueueComplete() ) count ++;
}
-
+
return count;
}
@@ -696,13 +697,13 @@ void Queue::setLastNodeFailure()
}
}
-// return true if store exists,
+// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
msg->addTraceId(traceId);
}
@@ -716,13 +717,13 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
return false;
}
-// return true if store exists,
+// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) {
+ if (!ctxt) {
dequeued(msg);
}
}
@@ -738,7 +739,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ dequeued(msg);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -794,7 +795,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
QPID_LOG(debug, "Configured queue as Last Value Queue No Browse");
lastValueQueue = lastValueQueueNoBrowse;
}
-
+
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
@@ -803,7 +804,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
if (excludeList.size()) {
split(traceExclude, excludeList, ", ");
}
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
eventMode = _settings.getAsInt(qpidQueueEventGeneration);
@@ -859,9 +860,9 @@ const QueuePolicy* Queue::getPolicy()
return policy.get();
}
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -880,18 +881,18 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
persistenceId = _persistenceId;
}
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(settings);
- if (policy.get()) {
+ if (policy.get()) {
buffer.put(*policy);
}
}
uint32_t Queue::encodedSize() const
{
- return name.size() + 1/*short string size octet*/ + settings.encodedSize()
+ return name.size() + 1/*short string size octet*/ + settings.encodedSize()
+ (policy.get() ? (*policy).encodedSize() : 0);
}
@@ -922,50 +923,50 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
{
- if (broker.getQueues().destroyIf(queue->getName(),
+ if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
queue->unbind(broker.getExchanges(), queue);
queue->destroy();
}
}
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
Mutex::ScopedLock locker(ownershipLock);
- return o == owner;
+ return o == owner;
}
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership()
+{
Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
+ owner = 0;
}
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
return false;
} else {
- owner = o;
+ owner = o;
return true;
}
}
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const
+{
Mutex::ScopedLock locker(ownershipLock);
- return owner != 0;
+ return owner != 0;
}
-bool Queue::hasExclusiveConsumer() const
-{
- return exclusive;
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
- if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+ if (externalQueueStore!=inst && externalQueueStore)
+ delete externalQueueStore;
externalQueueStore = inst;
if (inst) {
@@ -975,19 +976,6 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
-bool Queue::releaseMessageContent(const QueuedMessage& m)
-{
- if (store && !NullMessageStore::isNullStore(store)) {
- QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory");
- m.payload->releaseContent(store);
- return true;
- } else {
- QPID_LOG(warning, "Message " << m.position << " on " << name
- << " cannot be released from memory as the queue is not durable");
- return false;
- }
-}
-
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
@@ -1062,7 +1050,7 @@ bool Queue::isEnqueued(const QueuedMessage& msg)
void Queue::addPendingDequeue(const QueuedMessage& msg)
{
//assumes lock is held - true at present but rather nasty as this is a public method
- pendingDequeues.push_back(msg);
+ pendingDequeues.push_back(msg);
}
QueueListeners& Queue::getListeners() { return listeners; }