summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-09-27 19:20:49 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-09-27 19:20:49 +0000
commit64fc8f6eee884ce3fea3fd98fe8da8ee92e67b03 (patch)
tree425ec6843e0132a28730e8e0cd3008934f84af16
parent5d80e3df1afe0d47e34b9e2c39b3dfffb462d63d (diff)
downloadqpid-python-64fc8f6eee884ce3fea3fd98fe8da8ee92e67b03.tar.gz
QPID-3346: incorporate more review feedback
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1176538 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/Makefile.am3
-rw-r--r--qpid/cpp/src/qpid/broker/FifoAllocator.cpp (renamed from qpid/cpp/src/qpid/broker/MessageAllocator.cpp)34
-rw-r--r--qpid/cpp/src/qpid/broker/FifoAllocator.h58
-rw-r--r--qpid/cpp/src/qpid/broker/MessageAllocator.h47
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.cpp63
-rw-r--r--qpid/cpp/src/qpid/broker/MessageGroupManager.h25
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp25
8 files changed, 160 insertions, 97 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 1088693949..c06b4a5310 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -975,7 +975,7 @@ set (qpidbroker_SOURCES
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
qpid/broker/QueueListeners.cpp
- qpid/broker/MessageAllocator.cpp
+ qpid/broker/FifoAllocator.cpp
qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
qpid/broker/Bridge.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 2f1bd47dd8..c56c2b1fb2 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -672,7 +672,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Vhost.cpp \
qpid/broker/Vhost.h \
qpid/broker/MessageAllocator.h \
- qpid/broker/MessageAllocator.cpp \
+ qpid/broker/FifoAllocator.h \
+ qpid/broker/FifoAllocator.cpp \
qpid/broker/MessageGroupManager.cpp \
qpid/broker/MessageGroupManager.h \
qpid/management/ManagementAgent.cpp \
diff --git a/qpid/cpp/src/qpid/broker/MessageAllocator.cpp b/qpid/cpp/src/qpid/broker/FifoAllocator.cpp
index c8cf792bc7..edbf953ad7 100644
--- a/qpid/cpp/src/qpid/broker/MessageAllocator.cpp
+++ b/qpid/cpp/src/qpid/broker/FifoAllocator.cpp
@@ -19,18 +19,17 @@
*
*/
-/* Used by queues to allocate the next "most desirable" message to a consuming client */
#include "qpid/broker/Queue.h"
-#include "qpid/broker/MessageAllocator.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/broker/FifoAllocator.h"
using namespace qpid::broker;
-bool MessageAllocator::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next,
- const qpid::sys::Mutex::ScopedLock&)
+FifoAllocator::FifoAllocator(Messages& container)
+ : messages(container) {}
+
+bool FifoAllocator::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next )
{
- Messages& messages(queue->getMessages());
if (!messages.empty()) {
next = messages.front(); // by default, consume oldest msg
return true;
@@ -38,25 +37,22 @@ bool MessageAllocator::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessa
return false;
}
-bool MessageAllocator::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
- const qpid::sys::Mutex::ScopedLock&)
+bool FifoAllocator::allocate(const std::string&, const QueuedMessage& )
{
- Messages& messages(queue->getMessages());
- if (!messages.empty() && messages.next(c->position, next))
- return true;
- return false;
+ // by default, all messages present on the queue may be allocated as they have yet to
+ // be acquired.
+ return true;
}
-
-bool MessageAllocator::acquirable( const std::string&,
- const QueuedMessage&,
- const qpid::sys::Mutex::ScopedLock&)
+bool FifoAllocator::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- // by default, all messages present on the queue are acquireable
- return true;
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
}
-void MessageAllocator::query(qpid::types::Variant::Map&, const qpid::sys::Mutex::ScopedLock&) const
+void FifoAllocator::query(qpid::types::Variant::Map&) const
{
+ // nothing to see here....
}
diff --git a/qpid/cpp/src/qpid/broker/FifoAllocator.h b/qpid/cpp/src/qpid/broker/FifoAllocator.h
new file mode 100644
index 0000000000..6f8e972a1b
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/FifoAllocator.h
@@ -0,0 +1,58 @@
+#ifndef _broker_FifoAllocator_h
+#define _broker_FifoAllocator_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/** Simple MessageAllocator for FIFO Queues - the HEAD message is always the next
+ * available message for consumption.
+ */
+
+#include "qpid/broker/MessageAllocator.h"
+
+namespace qpid {
+namespace broker {
+
+class Messages;
+
+class FifoAllocator : public MessageAllocator
+{
+ public:
+ FifoAllocator(Messages& container);
+
+ /** Locking Note: all methods assume the caller is holding the Queue::messageLock
+ * during the method call.
+ */
+
+ /** MessageAllocator interface */
+
+ bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ bool allocate(const std::string& consumer, const QueuedMessage& target);
+ bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next );
+ void query(qpid::types::Variant::Map&) const;
+
+ private:
+ Messages& messages;
+};
+
+}}
+
+#endif
diff --git a/qpid/cpp/src/qpid/broker/MessageAllocator.h b/qpid/cpp/src/qpid/broker/MessageAllocator.h
index e22aee8bf4..729e406abf 100644
--- a/qpid/cpp/src/qpid/broker/MessageAllocator.h
+++ b/qpid/cpp/src/qpid/broker/MessageAllocator.h
@@ -22,7 +22,9 @@
*
*/
-/* Used by queues to allocate the next "most desirable" message to a consuming client */
+/** Abstraction used by Queue to determine the next "most desirable" message to provide to
+ * a particular consuming client
+ */
#include "qpid/broker/Consumer.h"
@@ -30,48 +32,43 @@
namespace qpid {
namespace broker {
-class Queue;
struct QueuedMessage;
class MessageAllocator
{
- protected:
- Queue *queue;
public:
- MessageAllocator( Queue *q ) : queue(q) {}
virtual ~MessageAllocator() {};
- // Note: all methods taking a mutex assume the caller is holding the
- // Queue::messageLock during the method call.
+ /** Locking Note: all methods assume the caller is holding the Queue::messageLock
+ * during the method call.
+ */
/** Determine the next message available for consumption by the consumer
- * @param next set to the next message that the consumer may acquire.
- * @return true if message is available
+ * @param consumer the consumer that needs a message to consume
+ * @param next set to the next message that the consumer may consume.
+ * @return true if message is available and next is set
*/
virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
- QueuedMessage& next,
- const sys::Mutex::ScopedLock& lock);
+ QueuedMessage& next ) = 0;
+
+ /** Allow the comsumer to take ownership of the given message.
+ * @param consumer the name of the consumer that is attempting to acquire the message
+ * @param qm the message to be acquired, previously returned from nextConsumableMessage()
+ * @return true if ownership is permitted, false if ownership cannot be assigned.
+ */
+ virtual bool allocate( const std::string& consumer,
+ const QueuedMessage& target) = 0;
/** Determine the next message available for browsing by the consumer
+ * @param consumer the consumer that is browsing the queue
* @param next set to the next message that the consumer may browse.
- * @return true if a message is available
+ * @return true if a message is available and next is returned
*/
virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
- QueuedMessage& next,
- const sys::Mutex::ScopedLock& lock);
-
- /** check if a message previously returned via next*Message() may be acquired.
- * @param consumer name of consumer that is attempting to acquire the message
- * @param qm the message to be acquired
- * @param messageLock - ensures caller is holding it!
- * @return true if acquire is permitted, false if acquire is no longer permitted.
- */
- virtual bool acquirable( const std::string&,
- const QueuedMessage&,
- const sys::Mutex::ScopedLock&);
+ QueuedMessage& next ) = 0;
/** hook to add any interesting management state to the status map */
- virtual void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const;
+ virtual void query(qpid::types::Variant::Map&) const = 0;
};
}}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
index 78a3f10056..485dc763cf 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
@@ -61,7 +61,7 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm )
GroupState &state(messageGroups[group]);
state.members.push_back(qm.position);
uint32_t total = state.members.size();
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": added message to group id=" << group << " total=" << total );
if (total == 1) {
// newly created group, no owner
@@ -81,7 +81,7 @@ void MessageGroupManager::acquired( const QueuedMessage& qm )
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
state.acquired += 1;
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": acquired message in group id=" << group << " acquired=" << state.acquired );
}
@@ -99,11 +99,11 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
assert( state.acquired != 0 );
state.acquired -= 1;
if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << gs->first);
disown(state);
}
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": requeued message to group id=" << group << " acquired=" << state.acquired );
}
@@ -138,16 +138,16 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
if (!state.owned()) { // unlikely, but need to remove from the free list before erase
unFree( state );
}
- QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first);
+ QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first);
messageGroups.erase( gs );
} else {
if (state.acquired == 0 && state.owned()) {
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << state.owner << " released group id=" << gs->first);
disown(state);
}
}
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": dequeued message from group id=" << group << " total=" << total );
}
@@ -155,7 +155,7 @@ void MessageGroupManager::consumerAdded( const Consumer& c )
{
assert(consumers.find(c.getName()) == consumers.end());
consumers[c.getName()] = 0; // no groups owned yet
- QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() );
+ QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() );
}
void MessageGroupManager::consumerRemoved( const Consumer& c )
@@ -172,20 +172,17 @@ void MessageGroupManager::consumerRemoved( const Consumer& c )
if (state.owner == name) {
--count;
disown(state);
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << name << " released group id=" << gs->first);
}
}
consumers.erase( consumer );
- QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name );
+ QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name );
}
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
- const qpid::sys::Mutex::ScopedLock& )
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
{
- Messages& messages(queue->getMessages());
-
if (messages.empty())
return false;
@@ -220,8 +217,7 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued
}
-bool MessageGroupManager::acquirable(const std::string& consumer, const QueuedMessage& qm,
- const qpid::sys::Mutex::ScopedLock&)
+bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
{
// @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
std::string group( getGroupId(qm) );
@@ -231,16 +227,22 @@ bool MessageGroupManager::acquirable(const std::string& consumer, const QueuedMe
if (!state.owned()) {
own( state, consumer );
- QPID_LOG( trace, "group queue " << queue->getName() <<
+ QPID_LOG( trace, "group queue " << qName <<
": consumer name=" << consumer << " has acquired group id=" << gs->first);
return true;
}
return state.owner == consumer;
}
+bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+{
+ // browse: allow access to any aquired msg, regardless of group ownership (?ok?)
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
+}
-void MessageGroupManager::query(qpid::types::Variant::Map& status,
- const qpid::sys::Mutex::ScopedLock&) const
+void MessageGroupManager::query(qpid::types::Variant::Map& status) const
{
/** Add a description of the current state of the message groups for this queue.
FORMAT:
@@ -276,7 +278,8 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status,
}
-boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
+boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
+ Messages& messages,
const qpid::framing::FieldTable& settings )
{
boost::shared_ptr<MessageGroupManager> empty;
@@ -285,16 +288,14 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
std::string headerKey = settings.getAsString(qpidMessageGroupKey);
if (headerKey.empty()) {
- QPID_LOG( error, "A Message Group header key must be configured, queue=" << q->getName());
+ QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
return empty;
}
unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
- boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, q, timestamp ) );
-
- q->addObserver( boost::static_pointer_cast<QueueObserver>(manager) );
+ boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) );
- QPID_LOG( debug, "Configured Queue '" << q->getName() <<
+ QPID_LOG( debug, "Configured Queue '" << qName <<
"' for message grouping using header key '" << headerKey << "'" <<
" (timestamp=" << timestamp << ")");
return manager;
@@ -346,7 +347,7 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const
}
state.setArray(GROUP_STATE, groupState);
- QPID_LOG(debug, "Queue \"" << queue->getName() << "\": replicating message group state, key=" << groupIdHeader);
+ QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader);
}
@@ -363,7 +364,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
bool ok = state.getArray(GROUP_STATE, groupState);
if (!ok) {
QPID_LOG(error, "Unable to find message group state information for queue \"" <<
- queue->getName() << "\": cluster inconsistency error!");
+ qName << "\": cluster inconsistency error!");
return;
}
@@ -373,13 +374,13 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
ok = framing::getEncodedValue<FieldTable>(*g, group);
if (!ok) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
- queue->getName() << "\": table encoding error!");
+ qName << "\": table encoding error!");
return;
}
MessageGroupManager::GroupState state;
if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
- queue->getName() << "\": fields missing error!");
+ qName << "\": fields missing error!");
return;
}
state.group = group.getAsString(GROUP_NAME);
@@ -389,7 +390,7 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
ok = group.getArray(GROUP_POSITIONS, positions);
if (!ok) {
QPID_LOG(error, "Invalid message group state information for queue \"" <<
- queue->getName() << "\": position encoding error!");
+ qName << "\": position encoding error!");
return;
}
@@ -404,5 +405,5 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state)
}
}
- QPID_LOG(debug, "Queue \"" << queue->getName() << "\": message group state replicated, key =" << groupIdHeader)
+ QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader)
}
diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
index a9b73ade5d..e55374f104 100644
--- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h
+++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h
@@ -38,6 +38,8 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato
{
const std::string groupIdHeader; // msg header holding group identifier
const unsigned int timestamp; // mark messages with timestamp if set
+ Messages& messages; // parent Queue's in memory message container
+ const std::string qName; // name of parent queue (for logs)
struct GroupState {
typedef std::list<framing::SequenceNumber> PositionFifo;
@@ -93,11 +95,14 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato
public:
- static boost::shared_ptr<MessageGroupManager> create( Queue *q, const qpid::framing::FieldTable& settings );
+ static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
+ Messages& messages,
+ const qpid::framing::FieldTable& settings );
- MessageGroupManager(const std::string& header, Queue *q, unsigned int _timestamp=0 )
- : StatefulQueueObserver(std::string("MessageGroupManager:") + header), MessageAllocator(q),
- groupIdHeader( header ), timestamp(_timestamp) {}
+ MessageGroupManager(const std::string& header, const std::string& _qName,
+ Messages& container, unsigned int _timestamp=0 )
+ : StatefulQueueObserver(std::string("MessageGroupManager:") + header),
+ groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {}
void enqueued( const QueuedMessage& qm );
void acquired( const QueuedMessage& qm );
void requeued( const QueuedMessage& qm );
@@ -107,12 +112,12 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageAllocato
void getState(qpid::framing::FieldTable& state ) const;
void setState(const qpid::framing::FieldTable&);
- bool nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next,
- const sys::Mutex::ScopedLock&);
- // uses default nextBrowsableMessage()
- bool acquirable(const std::string& consumer, const QueuedMessage& msg,
- const sys::Mutex::ScopedLock&);
- void query(qpid::types::Variant::Map&, const sys::Mutex::ScopedLock&) const;
+ // MessageAllocator iface
+ bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ bool allocate(const std::string& c, const QueuedMessage& qm);
+ bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+ void query(qpid::types::Variant::Map&) const;
+
bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
};
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 06b7780599..56df6cb233 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -33,7 +33,7 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/ThresholdAlerts.h"
-#include "qpid/broker/MessageAllocator.h"
+#include "qpid/broker/FifoAllocator.h"
#include "qpid/broker/MessageGroupManager.h"
#include "qpid/StringUtils.h"
@@ -115,7 +115,7 @@ Queue::Queue(const string& _name, bool _autodelete,
deleted(false),
barrier(*this),
autoDeleteTimeout(0),
- allocator(new MessageAllocator( this ))
+ allocator(new FifoAllocator( *messages ))
{
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
@@ -249,7 +249,7 @@ bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
assertClusterSafe();
QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
- if (!allocator->acquirable( consumer, msg, locker )) {
+ if (!allocator->allocate( consumer, msg )) {
QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
return false;
}
@@ -300,7 +300,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
- if (!allocator->nextConsumableMessage(c, msg, locker)) { // no next available
+ if (!allocator->nextConsumableMessage(c, msg)) { // no next available
QPID_LOG(debug, "No messages available to dispatch to consumer " <<
c->getName() << " on queue '" << name << "'");
listeners.addListener(c);
@@ -319,7 +319,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
- bool ok = allocator->acquirable( c->getName(), msg, locker ); // inform allocator
+ bool ok = allocator->allocate( c->getName(), msg ); // inform allocator
(void) ok; assert(ok);
ok = acquire( msg.position, msg, locker);
(void) ok; assert(ok);
@@ -346,7 +346,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c)
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
- if (!allocator->nextBrowsableMessage(c, msg, locker)) { // no next available
+ if (!allocator->nextBrowsableMessage(c, msg)) { // no next available
QPID_LOG(debug, "No browsable messages available for consumer " <<
c->getName() << " on queue '" << name << "'");
listeners.addListener(c);
@@ -990,22 +990,27 @@ void Queue::configureImpl(const FieldTable& _settings)
if (lvqKey.size()) {
QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey);
messages = std::auto_ptr<Messages>(new MessageMap(lvqKey));
+ allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages ));
} else if (_settings.get(qpidLastValueQueueNoBrowse)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker);
+ allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages ));
} else if (_settings.get(qpidLastValueQueue)) {
QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue");
messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker);
+ allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages ));
} else {
std::auto_ptr<Messages> m = Fairshare::create(_settings);
if (m.get()) {
messages = m;
+ allocator = boost::shared_ptr<MessageAllocator>(new FifoAllocator( *messages ));
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
} else { // default (FIFO) queue type
// override default message allocator if message groups configured.
- boost::shared_ptr<MessageAllocator> ma = boost::static_pointer_cast<MessageAllocator>(MessageGroupManager::create( this, _settings ));
- if (ma) {
- allocator = ma;
+ boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings));
+ if (mgm) {
+ allocator = mgm;
+ addObserver(mgm);
}
}
}
@@ -1300,7 +1305,7 @@ void Queue::query(qpid::types::Variant::Map& results) const
{
Mutex::ScopedLock locker(messageLock);
/** @todo add any interesting queue state into results */
- if (allocator) allocator->query( results, messageLock );
+ if (allocator) allocator->query(results);
}
void Queue::setPosition(SequenceNumber n) {