diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-19 15:56:32 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-08-19 15:56:32 +0000 |
commit | e0c2638285f77dc6f024ff91cfaef583c4d69be7 (patch) | |
tree | d37d826258a76aaccff0dd40313df0a7bc0407ac | |
parent | 6833c389115a220fa0a05453bd19d54e5bd8e891 (diff) | |
download | qpid-python-e0c2638285f77dc6f024ff91cfaef583c4d69be7.tar.gz |
QPID-3346: checkpoint - free group heuristic and unit test
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1159671 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 279 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 277 |
2 files changed, 422 insertions, 134 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 853bf09a9c..883d149910 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -101,19 +101,48 @@ class MessageAllocator MessageAllocator( Queue *q ) : queue(q) {} virtual ~MessageAllocator() {}; - // assumes caller holds messageLock - virtual bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock&); - /** acquire a message previously browsed via nextMessage(). assume messageLock held + // Note: all methods taking a mutex assume the caller is holding the + // Queue::messageLock during the method call. + + /** Determine the next message available for consumption by the consumer + * @param next set to the next message that the consumer may acquire. + * @return true if message is available + */ + virtual bool nextConsumableMessage( Consumer::shared_ptr, QueuedMessage& next, + const Mutex::ScopedLock&) + { + Messages& messages(queue->getMessages()); + if (!messages.empty()) { + next = messages.front(); // by default, consume oldest msg + return true; + } + return false; + } + /** Determine the next message available for browsing by the consumer + * @param next set to the next message that the consumer may browse. + * @return true if a message is available + */ + virtual bool nextBrowsableMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock&) + { + Messages& messages(queue->getMessages()); + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; + } + /** acquire a message previously returned via next*Message(). * @param consumer name of consumer that is attempting to acquire the message * @param qm the message to be acquired * @param messageLock - ensures caller is holding it! * @returns true if acquire is successful, false if acquire failed. */ - virtual bool canAcquire( const std::string& consumer, const QueuedMessage& qm, - const Mutex::ScopedLock&); + virtual bool acquireMessage( const std::string&, const QueuedMessage&, + const Mutex::ScopedLock&) + { + return true; + } - /** hook to add any interesting management state to the status map (lock held) */ + /** hook to add any interesting management state to the status map */ virtual void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const {}; }; @@ -125,24 +154,55 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator const unsigned int timestamp; // mark messages with timestamp if set struct GroupState { - //const std::string group; // group identifier - //Consumer::shared_ptr owner; // consumer with outstanding acquired messages - std::string owner; // consumer with outstanding acquired messages + typedef std::list<framing::SequenceNumber> PositionFifo; + + std::string group; // group identifier + std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - uint32_t total; // count of enqueued messages in this group - GroupState() : acquired(0), total(0) {} + //uint32_t total; // count of enqueued messages in this group + PositionFifo members; // msgs belonging to this group + + GroupState() : acquired(0) {} + bool owned() const {return !owner.empty();} }; typedef std::map<std::string, struct GroupState> GroupMap; - typedef std::set<std::string> Consumers; + typedef std::map<std::string, uint32_t> Consumers; // count of owned groups + typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; - GroupMap messageGroups; - Consumers consumers; + GroupMap messageGroups; // index: group name + GroupFifo freeGroups; // ordered by oldest free msg + Consumers consumers; // index: consumer name static const std::string qpidMessageGroupKey; static const std::string qpidMessageGroupTimestamp; static const std::string qpidMessageGroupDefault; const std::string getGroupId( const QueuedMessage& qm ) const; + void unFree( const GroupState& state ) + { + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); + } + void own( GroupState& state, const std::string& owner ) + { + state.owner = owner; + consumers[state.owner]++; + unFree( state ); + } + void disown( GroupState& state ) + { + assert(consumers[state.owner]); + consumers[state.owner]--; + state.owner.clear(); + assert(state.members.size()); +#ifdef NDEBUG + freeGroups[state.members.front()] = &state; +#else + bool unique = freeGroups.insert(GroupFifo::value_type(state.members.front(), &state)).second; + (void) unique; assert(unique); +#endif + } public: @@ -156,10 +216,11 @@ class MessageGroupManager : public QueueObserver, public MessageAllocator void dequeued( const QueuedMessage& qm ); void consumerAdded( const Consumer& ); void consumerRemoved( const Consumer& ); - bool nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock&); - bool canAcquire(const std::string& consumer, const QueuedMessage& msg, - const Mutex::ScopedLock&); + bool nextConsumableMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock&); + // uses default nextBrowsableMessage() + bool acquireMessage(const std::string& consumer, const QueuedMessage& msg, + const Mutex::ScopedLock&); void query(qpid::types::Variant::Map&, const Mutex::ScopedLock&) const; bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; }; @@ -339,7 +400,7 @@ bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) assertClusterSafe(); QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); - if (!allocator->canAcquire( consumer, msg, locker )) { + if (!allocator->acquireMessage( consumer, msg, locker )) { QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); return false; } @@ -391,7 +452,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - if (!allocator->nextMessage(c, msg, locker)) { // no next available + if (!allocator->nextConsumableMessage(c, msg, locker)) { // no next available QPID_LOG(debug, "No messages available to dispatch to consumer " << c->getName() << " on queue '" << name << "'"); listeners.addListener(c); @@ -410,7 +471,9 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { - bool ok = acquire( msg.position, msg ); + bool ok = allocator->acquireMessage( c->getName(), msg, locker ); // inform allocator + (void) ok; assert(ok); + ok = acquire( msg.position, msg ); (void) ok; assert(ok); m = msg; c->position = m.position; @@ -435,7 +498,7 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Mutex::ScopedLock locker(messageLock); QueuedMessage msg; - if (!allocator->nextMessage(c, msg, locker)) { // no next available + if (!allocator->nextBrowsableMessage(c, msg, locker)) { // no next available QPID_LOG(debug, "No browsable messages available for consumer " << c->getName() << " on queue '" << name << "'"); listeners.addListener(c); @@ -1540,15 +1603,31 @@ const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) con void MessageGroupManager::enqueued( const QueuedMessage& qm ) { + // @todo KAG optimization - store reference to group state in QueuedMessage + // issue: const-ness?? std::string group( getGroupId(qm) ); - uint32_t total = ++messageGroups[group].total; + GroupState &state(messageGroups[group]); + state.members.push_back(qm.position); + uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << queue->getName() << ": added message to group id=" << group << " total=" << total ); + if (total == 1) { + // newly created group, no owner + state.group = group; +#ifdef NDEBUG + freeGroups[qm.position] = &state; +#else + bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second; + (void) unique; assert(unique); +#endif + } } void MessageGroupManager::acquired( const QueuedMessage& qm ) { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); @@ -1561,16 +1640,20 @@ void MessageGroupManager::acquired( const QueuedMessage& qm ) void MessageGroupManager::requeued( const QueuedMessage& qm ) { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + // @todo KAG BUG - how to ensure requeue happens in the correct order? + // @todo KAG BUG - if requeue is not in correct order - what do we do? throw? std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.acquired != 0 ); state.acquired -= 1; - if (state.acquired == 0 && !state.owner.empty()) { + if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << queue->getName() << ": consumer name=" << state.owner << " released group id=" << gs->first); - state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + disown(state); } QPID_LOG( trace, "group queue " << queue->getName() << ": requeued message to group id=" << group << " acquired=" << state.acquired ); @@ -1579,22 +1662,41 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) void MessageGroupManager::dequeued( const QueuedMessage& qm ) { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); - assert( state.total != 0 ); - uint32_t total = state.total -= 1; + assert( state.members.size() != 0 ); + + // likely to be at or near begin() if dequeued in order + { + GroupState::PositionFifo::iterator pos = state.members.begin(); + GroupState::PositionFifo::iterator end = state.members.end(); + while (pos != end) { + if (*pos == qm.position) { + state.members.erase(pos); + break; + } + ++pos; + } + } + assert( state.acquired != 0 ); state.acquired -= 1; - if (state.total == 0) { + uint32_t total = state.members.size(); + if (total == 0) { + if (!state.owned()) { // unlikely, but need to remove from the free list before erase + unFree( state ); + } QPID_LOG( trace, "group queue " << queue->getName() << ": deleting group id=" << gs->first); messageGroups.erase( gs ); } else { - if (state.acquired == 0 && !state.owner.empty()) { + if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << queue->getName() << ": consumer name=" << state.owner << " released group id=" << gs->first); - state.owner.clear(); // KAG TODO: need to invalidate consumer's positions? + disown(state); } } QPID_LOG( trace, "group queue " << queue->getName() << @@ -1603,81 +1705,84 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) void MessageGroupManager::consumerAdded( const Consumer& c ) { - const std::string& name(c.getName()); - bool unique = consumers.insert( name ).second; - (void) unique; assert( unique ); - QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer name=" << name ); + assert(consumers.find(c.getName()) == consumers.end()); + consumers[c.getName()] = 0; // no groups owned yet + QPID_LOG( trace, "group queue " << queue->getName() << ": added consumer, name=" << c.getName() ); } void MessageGroupManager::consumerRemoved( const Consumer& c ) { const std::string& name(c.getName()); - size_t count = consumers.erase( name ); - (void) count; assert( count == 1 ); + Consumers::iterator consumer = consumers.find(name); + assert(consumer != consumers.end()); + size_t count = consumer->second; - bool needReset = false; for (GroupMap::iterator gs = messageGroups.begin(); - gs != messageGroups.end(); ++gs) { + count && gs != messageGroups.end(); ++gs) { GroupState& state( gs->second ); if (state.owner == name) { - state.owner.clear(); - needReset = true; + --count; + disown(state); QPID_LOG( trace, "group queue " << queue->getName() << ": consumer name=" << name << " released group id=" << gs->first); } } - - if (needReset) { - // KAG TODO: How do I invalidate all consumers that need invalidating???? - } + consumers.erase( consumer ); QPID_LOG( trace, "group queue " << queue->getName() << ": removed consumer name=" << name ); } -bool MessageGroupManager::nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock& ) +bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr c, QueuedMessage& next, + const Mutex::ScopedLock& ) { Messages& messages(queue->getMessages()); if (messages.empty()) return false; - if (c->preAcquires()) { // not browsing - next = messages.front(); - do { - /** @todo KAG: horrifingly suboptimal - optimize */ - std::string group( getGroupId( next ) ); - GroupMap::iterator gs = messageGroups.find( group ); /** @todo need to cache this somehow */ - assert( gs != messageGroups.end() ); - GroupState& state( gs->second ); - if (state.owner.empty()) { - state.owner = c->getName(); - QPID_LOG( trace, "group queue " << queue->getName() << - ": consumer name=" << c->getName() << " has acquired group id=" << group); - return true; - } - if (state.owner == c->getName()) { - return true; - } - } while (messages.next( next.position, next )); /** @todo: .next() is a linear search from front - optimize */ - return false; - } else if (messages.next(c->position, next)) - return true; + if (!freeGroups.empty()) { + framing::SequenceNumber nextFree = freeGroups.begin()->first; + if (nextFree < c->position) { // next free group's msg is older than current position + bool ok = messages.find(nextFree, next); + (void) ok; assert( ok ); + } else { + if (!messages.next( c->position, next )) + return false; // shouldn't happen - should find nextFree + } + } else { // no free groups available + if (consumers[c->getName()] == 0) { // and none currently owned + return false; // so nothing available to consume + } + if (!messages.next( c->position, next )) + return false; + } + + do { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId( next ) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + if (!state.owned() || state.owner == c->getName()) { + return true; + } + } while (messages.next( next.position, next )); return false; } -bool MessageGroupManager::canAcquire(const std::string& consumer, const QueuedMessage& qm, - const Mutex::ScopedLock&) +bool MessageGroupManager::acquireMessage(const std::string& consumer, const QueuedMessage& qm, + const Mutex::ScopedLock&) { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); - if (state.owner.empty()) { - state.owner = consumer; + if (!state.owned()) { + own( state, consumer ); QPID_LOG( trace, "group queue " << queue->getName() << ": consumer name=" << consumer << " has acquired group id=" << gs->first); return true; @@ -1722,7 +1827,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status, g != messageGroups.end(); ++g) { qpid::types::Variant::Map info; info[GroupIdKey] = g->first; - info[GroupMsgCount] = g->second.total; + info[GroupMsgCount] = g->second.members.size(); info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ info[GroupConsumer] = g->second.owner; groups.push_back(info); @@ -1761,33 +1866,5 @@ boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( Queue *q, -// default allocator - requires messageLock to be held by caller! -bool MessageAllocator::nextMessage( Consumer::shared_ptr c, QueuedMessage& next, - const Mutex::ScopedLock& /*just to enforce locking*/) -{ - Messages& messages(queue->getMessages()); - - if (messages.empty()) - return false; - - if (c->preAcquires()) { // not browsing - next = messages.front(); - return true; - } else if (messages.next(c->position, next)) - return true; - return false; -} - - -// default allocator - requires messageLock to be held by caller! -bool MessageAllocator::canAcquire(const std::string&, const QueuedMessage&, - const Mutex::ScopedLock& /*just to enforce locking*/) -{ - return true; // always give permission to acquire -} - - - - diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index e3dfa1452d..98fbc2cba4 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -705,23 +705,48 @@ QPID_AUTO_TEST_CASE(testQueueCleaner) { BOOST_CHECK_EQUAL(queue->getMessageCount(), 0u); } + +namespace { + // helper for group tests + void verifyAcquire( Queue::shared_ptr queue, + TestConsumer::shared_ptr c, + std::deque<QueuedMessage>& results, + const std::string& expectedGroup, + const int expectedId ) + { + queue->dispatch(c); + results.push_back(c->last); + std::string group = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); + int id = c->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( group, expectedGroup ); + BOOST_CHECK_EQUAL( id, expectedId ); + } +} + QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { + // + // Verify that consumers of grouped messages own the groups once a message is acquired, + // and release the groups once all acquired messages have been dequeued or requeued + // FieldTable args; Queue::shared_ptr queue(new Queue("my_queue", true)); args.setString("qpid.group_header_key", "GROUP-ID"); queue->configure(args); - for (int i = 0; i < 3; ++i) { - intrusive_ptr<Message> msg1 = create_message("e", "A"); - msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","a"); - queue->deliver(msg1); - - intrusive_ptr<Message> msg2 = create_message("e", "A"); - msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID","b"); - queue->deliver(msg2); + std::string groups[] = { std::string("a"), std::string("a"), std::string("a"), + std::string("b"), std::string("b"), std::string("b"), + std::string("c"), std::string("c"), std::string("c") }; + for (int i = 0; i < 9; ++i) { + intrusive_ptr<Message> msg = create_message("e", "A"); + msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", groups[i]); + msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i); + queue->deliver(msg); } - BOOST_CHECK_EQUAL(uint32_t(6), queue->getMessageCount()); + // Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---, + + BOOST_CHECK_EQUAL(uint32_t(9), queue->getMessageCount()); TestConsumer::shared_ptr c1(new TestConsumer("C1")); TestConsumer::shared_ptr c2(new TestConsumer("C2")); @@ -729,38 +754,224 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { queue->consume(c1); queue->consume(c2); - std::deque<QueuedMessage> dequeMe; + std::deque<QueuedMessage> dequeMeC1; + std::deque<QueuedMessage> dequeMeC2; + + + verifyAcquire(queue, c1, dequeMeC1, "a", 0 ); // c1 now owns group "a" (acquire a-0) + verifyAcquire(queue, c2, dequeMeC2, "b", 3 ); // c2 should now own group "b" (acquire b-3) + + // now let c1 complete the 'a-0' message - this should free the 'a' group + queue->dequeue( 0, dequeMeC1.front() ); + dequeMeC1.pop_front(); + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ---, ---, ^C2, ^C2, ^C2, ---, ---, --- + + // now c2 should pick up the next 'a-1', since it is oldest free + verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); // c2 should now own groups "a" and "b" + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ---, ---, --- + + // c1 should only be able to snarf up the first "c" message now... + verifyAcquire(queue, c1, dequeMeC1, "c", 6 ); // should skip to the first "c" + + // Queue = a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C2, ^C2, ^C2, ^C1, ^C1, ^C1 + + // hmmm... what if c2 now dequeues "b-3"? (now only has a-1 acquired) + queue->dequeue( 0, dequeMeC2.front() ); + dequeMeC2.pop_front(); + + // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ---, ---, ^C1, ^C1, ^C1 + + // b group is free, c is owned by c1 - c1's next get should grab 'b-4' + verifyAcquire(queue, c1, dequeMeC1, "b", 4 ); + + // Queue = a-1, a-2, b-4, b-5, c-6, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ^C1, ^C1, ^C1 - queue->dispatch(c1); // now owns group "a" - dequeMe.push_back(c1->last); - queue->dispatch(c2); // now owns group "b" - dequeMe.push_back(c2->last); + // c2 can now only grab a-2, and that's all + verifyAcquire(queue, c2, dequeMeC2, "a", 2 ); - queue->dispatch(c2); // should skip next "a", get last "b" - std::string group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); - dequeMe.push_back(c2->last); - BOOST_CHECK_EQUAL( group, std::string("b") ); + // now C2 can't get any more, since C1 owns "b" and "c" group... + bool gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); - queue->dispatch(c1); // should get last "a" - group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); - dequeMe.push_back(c1->last); - BOOST_CHECK_EQUAL( group, std::string("a") ); + // hmmm... what if c1 now dequeues "c-6"? (now only own's b-4) + queue->dequeue( 0, dequeMeC1.front() ); + dequeMeC1.pop_front(); - // now "free up" the groups - while (!dequeMe.empty()) { - queue->dequeue( 0, dequeMe.front() ); - dequeMe.pop_front(); + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ---, --- + + // c2 can now grab c-7 + verifyAcquire(queue, c2, dequeMeC2, "c", 7 ); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C2, ^C2, ^C1, ^C1, ^C2, ^C2 + + // what happens if C-2 "requeues" a-1 and a-2? + queue->requeue( dequeMeC2.front() ); + dequeMeC2.pop_front(); + queue->requeue( dequeMeC2.front() ); + dequeMeC2.pop_front(); // now just has c-7 acquired + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ---, ---, ^C1, ^C1, ^C2, ^C2 + + // now c1 will grab a-1 and a-2... + verifyAcquire(queue, c1, dequeMeC1, "a", 1 ); + verifyAcquire(queue, c1, dequeMeC1, "a", 2 ); + + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ^C1, ^C1, ^C1, ^C1, ^C2, ^C2 + + // c2 can now acquire c-8 only + verifyAcquire(queue, c2, dequeMeC2, "c", 8 ); + + // and c1 can get b-5 + verifyAcquire(queue, c1, dequeMeC1, "b", 5 ); + + // should be no more acquire-able for anyone now: + gotOne = queue->dispatch(c1); + BOOST_CHECK( !gotOne ); + gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + // requeue all of C1's acquired messages, then cancel C1 + while (!dequeMeC1.empty()) { + queue->requeue(dequeMeC1.front()); + dequeMeC1.pop_front(); } + queue->cancel(c1); - // now c2 should be able to acquire group "a", and c1 group "b" + // Queue = a-1, a-2, b-4, b-5, c-7, c-8... + // Owners= ---, ---, ---, ---, ^C2, ^C2 - queue->dispatch(c2); - group = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); - BOOST_CHECK_EQUAL( group, std::string("a") ); + // b-4, a-1, a-2, b-5 all should be available, right? + verifyAcquire(queue, c2, dequeMeC2, "a", 1 ); - queue->dispatch(c1); - group = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsString("GROUP-ID"); - BOOST_CHECK_EQUAL( group, std::string("b") ); + while (!dequeMeC2.empty()) { + queue->dequeue(0, dequeMeC2.front()); + dequeMeC2.pop_front(); + } + + // Queue = a-2, b-4, b-5 + // Owners= ---, ---, --- + + TestConsumer::shared_ptr c3(new TestConsumer("C3")); + std::deque<QueuedMessage> dequeMeC3; + + verifyAcquire(queue, c3, dequeMeC3, "a", 2 ); + verifyAcquire(queue, c2, dequeMeC2, "b", 4 ); + + // Queue = a-2, b-4, b-5 + // Owners= ^C3, ^C2, ^C2 + + gotOne = queue->dispatch(c3); + BOOST_CHECK( !gotOne ); + + verifyAcquire(queue, c2, dequeMeC2, "b", 5 ); + + while (!dequeMeC2.empty()) { + queue->dequeue(0, dequeMeC2.front()); + dequeMeC2.pop_front(); + } + + // Queue = a-2, + // Owners= ^C3, + + intrusive_ptr<Message> msg = create_message("e", "A"); + msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "a"); + msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 9); + queue->deliver(msg); + + // Queue = a-2, a-9 + // Owners= ^C3, ^C3 + + gotOne = queue->dispatch(c2); + BOOST_CHECK( !gotOne ); + + msg = create_message("e", "A"); + msg->getProperties<MessageProperties>()->getApplicationHeaders().setString("GROUP-ID", "b"); + msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", 10); + queue->deliver(msg); + + // Queue = a-2, a-9, b-10 + // Owners= ^C3, ^C3, ---- + + verifyAcquire(queue, c2, dequeMeC2, "b", 10 ); + verifyAcquire(queue, c3, dequeMeC3, "a", 9 ); + + gotOne = queue->dispatch(c3); + BOOST_CHECK( !gotOne ); + + queue->cancel(c2); + queue->cancel(c3); +} + + +QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { + // + // Verify that the same default group name is automatically applied to messages that + // do not specify a group name. + // + FieldTable args; + Queue::shared_ptr queue(new Queue("my_queue", true)); + args.setString("qpid.group_header_key", "GROUP-ID"); + queue->configure(args); + + for (int i = 0; i < 3; ++i) { + intrusive_ptr<Message> msg = create_message("e", "A"); + // no "GROUP-ID" header + msg->getProperties<MessageProperties>()->getApplicationHeaders().setInt("MY-ID", i); + queue->deliver(msg); + } + + // Queue = 0, 1, 2 + + BOOST_CHECK_EQUAL(uint32_t(3), queue->getMessageCount()); + + TestConsumer::shared_ptr c1(new TestConsumer("C1")); + TestConsumer::shared_ptr c2(new TestConsumer("C2")); + + queue->consume(c1); + queue->consume(c2); + + std::deque<QueuedMessage> dequeMeC1; + std::deque<QueuedMessage> dequeMeC2; + + queue->dispatch(c1); // c1 now owns default group (acquired 0) + dequeMeC1.push_back(c1->last); + int id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 0 ); + + bool gotOne = queue->dispatch(c2); // c2 should get nothing + BOOST_CHECK( !gotOne ); + + queue->dispatch(c1); // c1 now acquires 1 + dequeMeC1.push_back(c1->last); + id = c1->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 1 ); + + gotOne = queue->dispatch(c2); // c2 should still get nothing + BOOST_CHECK( !gotOne ); + + while (!dequeMeC1.empty()) { + queue->dequeue(0, dequeMeC1.front()); + dequeMeC1.pop_front(); + } + + // now default group should be available... + queue->dispatch(c2); // c2 now owns default group (acquired 2) + id = c2->last.payload->getProperties<MessageProperties>()->getApplicationHeaders().getAsInt("MY-ID"); + BOOST_CHECK_EQUAL( id, 2 ); + + gotOne = queue->dispatch(c1); // c1 should get nothing + BOOST_CHECK( !gotOne ); queue->cancel(c1); queue->cancel(c2); |