summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-08-19 15:56:32 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-08-19 15:56:32 +0000
commite0c2638285f77dc6f024ff91cfaef583c4d69be7 (patch)
treed37d826258a76aaccff0dd40313df0a7bc0407ac
parent6833c389115a220fa0a05453bd19d54e5bd8e891 (diff)
downloadqpid-python-e0c2638285f77dc6f024ff91cfaef583c4d69be7.tar.gz
QPID-3346: checkpoint - free group heuristic and unit test
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1159671 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp279
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp277
2 files changed, 422 insertions, 134 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 853bf09a9c..883d149910 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -101,19 +101,48 @@ class MessageAllocator
MessageAllocator( Queue *q ) : queue(q) {}
virtual ~MessageAllocator() {};
- // assumes caller holds messageLock
- virtual bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock&);
- /** acquire a message previously browsed via nextMessage(). assume messageLock held
+ // Note: all methods taking a mutex assume the caller is holding the
+ // Queue::messageLock during the method call.
+
+ /** Determine the next message available for consumption by the consumer
+ * @param next set to the next message that the consumer may acquire.
+ * @return true if message is available
+ */
+ virtual bool nextConsumableMessage( Consumer::shared_ptr, QueuedMessage& next,
+ const Mutex::ScopedLock&)
+ {
+ Messages& messages(queue->getMessages());
+ if (!messages.empty()) {
+ next = messages.front(); // by default, consume oldest msg
+ return true;
+ }
+ return false;
+ }
+ /** Determine the next message available for browsing by the consumer
+ * @param next set to the next message that the consumer may browse.
+ * @return true if a message is available
+ */
+ virtual bool nextBrowsableMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock&)
+ {
+ Messages& messages(queue->getMessages());
+ if (!messages.empty() && messages.next(c->position, next))
+ return true;
+ return false;
+ }
+ /** acquire a message previously returned via next*Message().
* @param consumer name of consumer that is attempting to acquire the message
* @param qm the message to be acquired
* @param messageLock - ensures caller is holding it!
* @returns true if acquire is successful, false if acquire failed.
*/
- virtual bool canAcquire( const std::string& consumer, const QueuedMessage& qm,
- const Mutex::ScopedLock&);
+ virtual bool acquireMessage( const std::string&, const QueuedMessage&,
+ const Mutex::ScopedLock&)
+ {
+ return true;
+ }
- /** hook to add any interesting management state to the status map (lock held) */
+ /** hook to add any interesting management state to the status map */
virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {};
};
@@ -125,24 +154,55 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
const unsigned int timestamp; // mark messages with timestamp if set
struct GroupState {
- //const std::string group; // group identifier
- //Consumer::shared_ptr owner; // consumer with outstanding acquired messages
- std::string owner; // consumer with outstanding acquired messages
+ typedef std::list<framing::SequenceNumber> PositionFifo;
+
+ std::string group; // group identifier
+ std::string owner; // consumer with outstanding acquired messages
uint32_t acquired; // count of outstanding acquired messages
- uint32_t total; // count of enqueued messages in this group
- GroupState() : acquired(0), total(0) {}
+ //uint32_t total; // count of enqueued messages in this group
+ PositionFifo members; // msgs belonging to this group
+
+ GroupState() : acquired(0) {}
+ bool owned() const {return !owner.empty();}
};
typedef std::map<std::string, struct GroupState> GroupMap;
- typedef std::set<std::string> Consumers;
+ typedef std::map<std::string, uint32_t> Consumers; // count of owned groups
+ typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo;
- GroupMap messageGroups;
- Consumers consumers;
+ GroupMap messageGroups; // index: group name
+ GroupFifo freeGroups; // ordered by oldest free msg
+ Consumers consumers; // index: consumer name
static const std::string qpidMessageGroupKey;
static const std::string qpidMessageGroupTimestamp;
static const std::string qpidMessageGroupDefault;
const std::string getGroupId( const QueuedMessage& qm ) const;
+ void unFree( const GroupState& state )
+ {
+ GroupFifo::iterator pos = freeGroups.find( state.members.front() );
+ assert( pos != freeGroups.end() && pos->second == &state );
+ freeGroups.erase( pos );
+ }
+ void own( GroupState& state, const std::string& owner )
+ {
+ state.owner = owner;
+ consumers[state.owner]++;
+ unFree( state );
+ }
+ void disown( GroupState& state )
+ {
+ assert(consumers[state.owner]);
+ consumers[state.owner]--;
+ state.owner.clear();
+ assert(state.members.size());
+#ifdef NDEBUG
+ freeGroups[state.members.front()] = &state;
+#else
+ bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second;
+ (void) unique; assert(unique);
+#endif
+ }
public:
@@ -156,10 +216,11 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator
void dequeued( const QueuedMessage& qm );
void consumerAdded( const Consumer& );
void consumerRemoved( const Consumer& );
- bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock&);
- bool canAcquire(const std::string& consumer, const QueuedMessage& msg,
- const Mutex::ScopedLock&);
+ bool nextConsumableMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock&);
+ // uses default nextBrowsableMessage()
+ bool acquireMessage(const std::string& consumer, const QueuedMessage& msg,
+ const Mutex::ScopedLock&);
void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const;
bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
};
@@ -339,7 +400,7 @@ bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer)
assertClusterSafe();
QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position);
- if (!allocator->canAcquire( consumer, msg, locker )) {
+ if (!allocator->acquireMessage( consumer, msg, locker )) {
QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name);
return false;
}
@@ -391,7 +452,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
- if (!allocator->nextMessage(c, msg, locker)) { // no next available
+ if (!allocator->nextConsumableMessage(c, msg, locker)) { // no next available
QPID_LOG(debug, "No messages available to dispatch to consumer " <<
c->getName() << " on queue '" << name << "'");
listeners.addListener(c);
@@ -410,7 +471,9 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
- bool ok = acquire( msg.position, msg );
+ bool ok = allocator->acquireMessage( c->getName(), msg, locker ); // inform allocator
+ (void) ok; assert(ok);
+ ok = acquire( msg.position, msg );
(void) ok; assert(ok);
m = msg;
c->position = m.position;
@@ -435,7 +498,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg;
- if (!allocator->nextMessage(c, msg, locker)) { // no next available
+ if (!allocator->nextBrowsableMessage(c, msg, locker)) { // no next available
QPID_LOG(debug, "No browsable messages available for consumer " <<
c->getName() << " on queue '" << name << "'");
listeners.addListener(c);
@@ -1540,15 +1603,31 @@ const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) con
void MessageGroupManager::enqueued( const QueuedMessage& qm )
{
+ // @todo KAG optimization - store reference to group state in QueuedMessage
+ // issue: const-ness??
std::string group( getGroupId(qm) );
- uint32_t total = ++messageGroups[group].total;
+ GroupState &state(messageGroups[group]);
+ state.members.push_back(qm.position);
+ uint32_t total = state.members.size();
QPID_LOG( trace, "group queue " << queue->getName() <<
": added message to group id=" << group << " total=" << total );
+ if (total == 1) {
+ // newly created group, no owner
+ state.group = group;
+#ifdef NDEBUG
+ freeGroups[qm.position] = &state;
+#else
+ bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second;
+ (void) unique; assert(unique);
+#endif
+ }
}
void MessageGroupManager::acquired( const QueuedMessage& qm )
{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
std::string group( getGroupId(qm) );
GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
@@ -1561,16 +1640,20 @@ void MessageGroupManager::acquired( const QueuedMessage& qm )
void MessageGroupManager::requeued( const QueuedMessage& qm )
{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
+ // @todo KAG BUG - how to ensure requeue happens in the correct order?
+ // @todo KAG BUG - if requeue is not in correct order - what do we do? throw?
std::string group( getGroupId(qm) );
GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
assert( state.acquired != 0 );
state.acquired -= 1;
- if (state.acquired == 0 && !state.owner.empty()) {
+ if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << queue->getName() <<
": consumer name=" << state.owner << " released group id=" << gs->first);
- state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ disown(state);
}
QPID_LOG( trace, "group queue " << queue->getName() <<
": requeued message to group id=" << group << " acquired=" << state.acquired );
@@ -1579,22 +1662,41 @@ void MessageGroupManager::requeued( const QueuedMessage& qm )
void MessageGroupManager::dequeued( const QueuedMessage& qm )
{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ // issue: const-ness??
std::string group( getGroupId(qm) );
GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
- assert( state.total != 0 );
- uint32_t total = state.total -= 1;
+ assert( state.members.size() != 0 );
+
+ // likely to be at or near begin() if dequeued in order
+ {
+ GroupState::PositionFifo::iterator pos = state.members.begin();
+ GroupState::PositionFifo::iterator end = state.members.end();
+ while (pos != end) {
+ if (*pos == qm.position) {
+ state.members.erase(pos);
+ break;
+ }
+ ++pos;
+ }
+ }
+
assert( state.acquired != 0 );
state.acquired -= 1;
- if (state.total == 0) {
+ uint32_t total = state.members.size();
+ if (total == 0) {
+ if (!state.owned()) { // unlikely, but need to remove from the free list before erase
+ unFree( state );
+ }
QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first);
messageGroups.erase( gs );
} else {
- if (state.acquired == 0 && !state.owner.empty()) {
+ if (state.acquired == 0 && state.owned()) {
QPID_LOG( trace, "group queue " << queue->getName() <<
": consumer name=" << state.owner << " released group id=" << gs->first);
- state.owner.clear(); // KAG TODO: need to invalidate consumer's positions?
+ disown(state);
}
}
QPID_LOG( trace, "group queue " << queue->getName() <<
@@ -1603,81 +1705,84 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm )
void MessageGroupManager::consumerAdded( const Consumer& c )
{
- const std::string& name(c.getName());
- bool unique = consumers.insert( name ).second;
- (void) unique; assert( unique );
- QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer name=" << name );
+ assert(consumers.find(c.getName()) == consumers.end());
+ consumers[c.getName()] = 0; // no groups owned yet
+ QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() );
}
void MessageGroupManager::consumerRemoved( const Consumer& c )
{
const std::string& name(c.getName());
- size_t count = consumers.erase( name );
- (void) count; assert( count == 1 );
+ Consumers::iterator consumer = consumers.find(name);
+ assert(consumer != consumers.end());
+ size_t count = consumer->second;
- bool needReset = false;
for (GroupMap::iterator gs = messageGroups.begin();
- gs != messageGroups.end(); ++gs) {
+ count && gs != messageGroups.end(); ++gs) {
GroupState& state( gs->second );
if (state.owner == name) {
- state.owner.clear();
- needReset = true;
+ --count;
+ disown(state);
QPID_LOG( trace, "group queue " << queue->getName() <<
": consumer name=" << name << " released group id=" << gs->first);
}
}
-
- if (needReset) {
- // KAG TODO: How do I invalidate all consumers that need invalidating????
- }
+ consumers.erase( consumer );
QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name );
}
-bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock& )
+bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr c, QueuedMessage& next,
+ const Mutex::ScopedLock& )
{
Messages& messages(queue->getMessages());
if (messages.empty())
return false;
- if (c->preAcquires()) { // not browsing
- next = messages.front();
- do {
- /** @todo KAG: horrifingly suboptimal - optimize */
- std::string group( getGroupId( next ) );
- GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */
- assert( gs != messageGroups.end() );
- GroupState& state( gs->second );
- if (state.owner.empty()) {
- state.owner = c->getName();
- QPID_LOG( trace, "group queue " << queue->getName() <<
- ": consumer name=" << c->getName() << " has acquired group id=" << group);
- return true;
- }
- if (state.owner == c->getName()) {
- return true;
- }
- } while (messages.next( next.position, next )); /** @todo: .next() is a linear search from front - optimize */
- return false;
- } else if (messages.next(c->position, next))
- return true;
+ if (!freeGroups.empty()) {
+ framing::SequenceNumber nextFree = freeGroups.begin()->first;
+ if (nextFree < c->position) { // next free group's msg is older than current position
+ bool ok = messages.find(nextFree, next);
+ (void) ok; assert( ok );
+ } else {
+ if (!messages.next( c->position, next ))
+ return false; // shouldn't happen - should find nextFree
+ }
+ } else { // no free groups available
+ if (consumers[c->getName()] == 0) { // and none currently owned
+ return false; // so nothing available to consume
+ }
+ if (!messages.next( c->position, next ))
+ return false;
+ }
+
+ do {
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+ std::string group( getGroupId( next ) );
+ GroupMap::iterator gs = messageGroups.find( group );
+ assert( gs != messageGroups.end() );
+ GroupState& state( gs->second );
+ if (!state.owned() || state.owner == c->getName()) {
+ return true;
+ }
+ } while (messages.next( next.position, next ));
return false;
}
-bool MessageGroupManager::canAcquire(const std::string& consumer, const QueuedMessage& qm,
- const Mutex::ScopedLock&)
+bool MessageGroupManager::acquireMessage(const std::string& consumer, const QueuedMessage& qm,
+ const Mutex::ScopedLock&)
{
+ // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
std::string group( getGroupId(qm) );
GroupMap::iterator gs = messageGroups.find( group );
assert( gs != messageGroups.end() );
GroupState& state( gs->second );
- if (state.owner.empty()) {
- state.owner = consumer;
+ if (!state.owned()) {
+ own( state, consumer );
QPID_LOG( trace, "group queue " << queue->getName() <<
": consumer name=" << consumer << " has acquired group id=" << gs->first);
return true;
@@ -1722,7 +1827,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status,
g != messageGroups.end(); ++g) {
qpid::types::Variant::Map info;
info[GroupIdKey] = g->first;
- info[GroupMsgCount] = g->second.total;
+ info[GroupMsgCount] = g->second.members.size();
info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */
info[GroupConsumer] = g->second.owner;
groups.push_back(info);
@@ -1761,33 +1866,5 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q,
-// default allocator - requires messageLock to be held by caller!
-bool MessageAllocator::nextMessage( Consumer::shared_ptr c, QueuedMessage& next,
- const Mutex::ScopedLock& /*just to enforce locking*/)
-{
- Messages& messages(queue->getMessages());
-
- if (messages.empty())
- return false;
-
- if (c->preAcquires()) { // not browsing
- next = messages.front();
- return true;
- } else if (messages.next(c->position, next))
- return true;
- return false;
-}
-
-
-// default allocator - requires messageLock to be held by caller!
-bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&,
- const Mutex::ScopedLock& /*just to enforce locking*/)
-{
- return true; // always give permission to acquire
-}
-
-
-
-
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index e3dfa1452d..98fbc2cba4 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -705,23 +705,48 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) {
BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u);
}
+
+namespace {
+ // helper for group tests
+ void verifyAcquire( Queue::shared_ptr queue,
+ TestConsumer::shared_ptr c,
+ std::deque<QueuedMessage>& results,
+ const std::string& expectedGroup,
+ const int expectedId )
+ {
+ queue->dispatch(c);
+ results.push_back(c->last);
+ std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
+ int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( group, expectedGroup );
+ BOOST_CHECK_EQUAL( id, expectedId );
+ }
+}
+
QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
+ //
+ // Verify that consumers of grouped messages own the groups once a message is acquired,
+ // and release the groups once all acquired messages have been dequeued or requeued
+ //
FieldTable args;
Queue::shared_ptr queue(new Queue("my_queue", true));
args.setString("qpid.group_header_key", "GROUP-ID");
queue->configure(args);
- for (int i = 0; i < 3; ++i) {
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","a");
- queue->deliver(msg1);
-
- intrusive_ptr<Message> msg2 = create_message("e", "A");
- msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","b");
- queue->deliver(msg2);
+ std::string groups[] = { std::string("a"), std::string("a"), std::string("a"),
+ std::string("b"), std::string("b"), std::string("b"),
+ std::string("c"), std::string("c"), std::string("c") };
+ for (int i = 0; i < 9; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", groups[i]);
+ msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i);
+ queue->deliver(msg);
}
- BOOST_CHECK_EQUAL(uint32_t(6), queue->getMessageCount());
+ // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,
+
+ BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount());
TestConsumer::shared_ptr c1(new TestConsumer("C1"));
TestConsumer::shared_ptr c2(new TestConsumer("C2"));
@@ -729,38 +754,224 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
queue->consume(c1);
queue->consume(c2);
- std::deque<QueuedMessage> dequeMe;
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+
+ verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0)
+ verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3)
+
+ // now let c1 complete the 'a-0' message - this should free the 'a' group
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // now c2 should pick up the next 'a-1', since it is oldest free
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, ---
+
+ // c1 should only be able to snarf up the first "c" message now...
+ verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c"
+
+ // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1
+
+ // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired)
+ queue->dequeue( 0, dequeMeC2.front() );
+ dequeMeC2.pop_front();
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1
+
+ // b group is free, c is owned by c1 - c1's next get should grab 'b-4'
+ verifyAcquire(queue, c1, dequeMeC1, "b", 4 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1
- queue->dispatch(c1); // now owns group "a"
- dequeMe.push_back(c1->last);
- queue->dispatch(c2); // now owns group "b"
- dequeMe.push_back(c2->last);
+ // c2 can now only grab a-2, and that's all
+ verifyAcquire(queue, c2, dequeMeC2, "a", 2 );
- queue->dispatch(c2); // should skip next "a", get last "b"
- std::string group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
- dequeMe.push_back(c2->last);
- BOOST_CHECK_EQUAL( group, std::string("b") );
+ // now C2 can't get any more, since C1 owns "b" and "c" group...
+ bool gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
- queue->dispatch(c1); // should get last "a"
- group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
- dequeMe.push_back(c1->last);
- BOOST_CHECK_EQUAL( group, std::string("a") );
+ // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4)
+ queue->dequeue( 0, dequeMeC1.front() );
+ dequeMeC1.pop_front();
- // now "free up" the groups
- while (!dequeMe.empty()) {
- queue->dequeue( 0, dequeMe.front() );
- dequeMe.pop_front();
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ---, ---
+
+ // c2 can now grab c-7
+ verifyAcquire(queue, c2, dequeMeC2, "c", 7 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2
+
+ // what happens if C-2 "requeues" a-1 and a-2?
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front();
+ queue->requeue( dequeMeC2.front() );
+ dequeMeC2.pop_front(); // now just has c-7 acquired
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2
+
+ // now c1 will grab a-1 and a-2...
+ verifyAcquire(queue, c1, dequeMeC1, "a", 1 );
+ verifyAcquire(queue, c1, dequeMeC1, "a", 2 );
+
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2
+
+ // c2 can now acquire c-8 only
+ verifyAcquire(queue, c2, dequeMeC2, "c", 8 );
+
+ // and c1 can get b-5
+ verifyAcquire(queue, c1, dequeMeC1, "b", 5 );
+
+ // should be no more acquire-able for anyone now:
+ gotOne = queue->dispatch(c1);
+ BOOST_CHECK( !gotOne );
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ // requeue all of C1's acquired messages, then cancel C1
+ while (!dequeMeC1.empty()) {
+ queue->requeue(dequeMeC1.front());
+ dequeMeC1.pop_front();
}
+ queue->cancel(c1);
- // now c2 should be able to acquire group "a", and c1 group "b"
+ // Queue = a-1, a-2, b-4, b-5, c-7, c-8...
+ // Owners= ---, ---, ---, ---, ^C2, ^C2
- queue->dispatch(c2);
- group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
- BOOST_CHECK_EQUAL( group, std::string("a") );
+ // b-4, a-1, a-2, b-5 all should be available, right?
+ verifyAcquire(queue, c2, dequeMeC2, "a", 1 );
- queue->dispatch(c1);
- group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID");
- BOOST_CHECK_EQUAL( group, std::string("b") );
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ---, ---, ---
+
+ TestConsumer::shared_ptr c3(new TestConsumer("C3"));
+ std::deque<QueuedMessage> dequeMeC3;
+
+ verifyAcquire(queue, c3, dequeMeC3, "a", 2 );
+ verifyAcquire(queue, c2, dequeMeC2, "b", 4 );
+
+ // Queue = a-2, b-4, b-5
+ // Owners= ^C3, ^C2, ^C2
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 5 );
+
+ while (!dequeMeC2.empty()) {
+ queue->dequeue(0, dequeMeC2.front());
+ dequeMeC2.pop_front();
+ }
+
+ // Queue = a-2,
+ // Owners= ^C3,
+
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "a");
+ msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 9);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9
+ // Owners= ^C3, ^C3
+
+ gotOne = queue->dispatch(c2);
+ BOOST_CHECK( !gotOne );
+
+ msg = create_message("e", "A");
+ msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "b");
+ msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 10);
+ queue->deliver(msg);
+
+ // Queue = a-2, a-9, b-10
+ // Owners= ^C3, ^C3, ----
+
+ verifyAcquire(queue, c2, dequeMeC2, "b", 10 );
+ verifyAcquire(queue, c3, dequeMeC3, "a", 9 );
+
+ gotOne = queue->dispatch(c3);
+ BOOST_CHECK( !gotOne );
+
+ queue->cancel(c2);
+ queue->cancel(c3);
+}
+
+
+QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
+ //
+ // Verify that the same default group name is automatically applied to messages that
+ // do not specify a group name.
+ //
+ FieldTable args;
+ Queue::shared_ptr queue(new Queue("my_queue", true));
+ args.setString("qpid.group_header_key", "GROUP-ID");
+ queue->configure(args);
+
+ for (int i = 0; i < 3; ++i) {
+ intrusive_ptr<Message> msg = create_message("e", "A");
+ // no "GROUP-ID" header
+ msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i);
+ queue->deliver(msg);
+ }
+
+ // Queue = 0, 1, 2
+
+ BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount());
+
+ TestConsumer::shared_ptr c1(new TestConsumer("C1"));
+ TestConsumer::shared_ptr c2(new TestConsumer("C2"));
+
+ queue->consume(c1);
+ queue->consume(c2);
+
+ std::deque<QueuedMessage> dequeMeC1;
+ std::deque<QueuedMessage> dequeMeC2;
+
+ queue->dispatch(c1); // c1 now owns default group (acquired 0)
+ dequeMeC1.push_back(c1->last);
+ int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 0 );
+
+ bool gotOne = queue->dispatch(c2); // c2 should get nothing
+ BOOST_CHECK( !gotOne );
+
+ queue->dispatch(c1); // c1 now acquires 1
+ dequeMeC1.push_back(c1->last);
+ id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 1 );
+
+ gotOne = queue->dispatch(c2); // c2 should still get nothing
+ BOOST_CHECK( !gotOne );
+
+ while (!dequeMeC1.empty()) {
+ queue->dequeue(0, dequeMeC1.front());
+ dequeMeC1.pop_front();
+ }
+
+ // now default group should be available...
+ queue->dispatch(c2); // c2 now owns default group (acquired 2)
+ id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID");
+ BOOST_CHECK_EQUAL( id, 2 );
+
+ gotOne = queue->dispatch(c1); // c1 should get nothing
+ BOOST_CHECK( !gotOne );
queue->cancel(c1);
queue->cancel(c2);