diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-09-22 17:49:25 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-09-22 17:49:25 +0000 |
commit | 5d80e3df1afe0d47e34b9e2c39b3dfffb462d63d (patch) | |
tree | a49d552b99c13a3a251d5ac264a9fad8db63f5ba | |
parent | 862422c2962c812455adc6e4814602d9c04674e8 (diff) | |
download | qpid-python-5d80e3df1afe0d47e34b9e2c39b3dfffb462d63d.tar.gz |
QPID-3346: incorporate feedback from aconway's review
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3346@1174282 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageAllocator.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/MessageGroupManager.cpp | 36 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 80 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueObserver.h | 22 |
5 files changed, 86 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/broker/MessageAllocator.cpp b/qpid/cpp/src/qpid/broker/MessageAllocator.cpp index 2ab91adff0..c8cf792bc7 100644 --- a/qpid/cpp/src/qpid/broker/MessageAllocator.cpp +++ b/qpid/cpp/src/qpid/broker/MessageAllocator.cpp @@ -52,6 +52,7 @@ bool MessageAllocator::acquirable( const std::string&, const QueuedMessage&, const qpid::sys::Mutex::ScopedLock&) { + // by default, all messages present on the queue are acquireable return true; } diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 8047cd639d..78a3f10056 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -28,13 +28,13 @@ using namespace qpid::broker; namespace { - const std::string GroupQueryKey("qpid.message_group_queue"); - const std::string GroupHeaderKey("group_header_key"); - const std::string GroupStateKey("group_state"); - const std::string GroupIdKey("group_id"); - const std::string GroupMsgCount("msg_count"); - const std::string GroupTimestamp("timestamp"); - const std::string GroupConsumer("consumer"); + const std::string GROUP_QUERY_KEY("qpid.message_group_queue"); + const std::string GROUP_HEADER_KEY("group_header_key"); + const std::string GROUP_STATE_KEY("group_state"); + const std::string GROUP_ID_KEY("group_id"); + const std::string GROUP_MSG_COUNT("msg_count"); + const std::string GROUP_TIMESTAMP("timestamp"); + const std::string GROUP_CONSUMER("consumer"); } @@ -66,12 +66,8 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm ) if (total == 1) { // newly created group, no owner state.group = group; -#ifdef NDEBUG + assert(freeGroups.find(qm.position) == freeGroups.end()); freeGroups[qm.position] = &state; -#else - bool unique = freeGroups.insert(GroupFifo::value_type(qm.position, &state)).second; - (void) unique; assert(unique); -#endif } } @@ -261,22 +257,22 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status, } **/ - assert(status.find(GroupQueryKey) == status.end()); + assert(status.find(GROUP_QUERY_KEY) == status.end()); qpid::types::Variant::Map state; qpid::types::Variant::List groups; - state[GroupHeaderKey] = groupIdHeader; + state[GROUP_HEADER_KEY] = groupIdHeader; for (GroupMap::const_iterator g = messageGroups.begin(); g != messageGroups.end(); ++g) { qpid::types::Variant::Map info; - info[GroupIdKey] = g->first; - info[GroupMsgCount] = g->second.members.size(); - info[GroupTimestamp] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ - info[GroupConsumer] = g->second.owner; + info[GROUP_ID_KEY] = g->first; + info[GROUP_MSG_COUNT] = g->second.members.size(); + info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ + info[GROUP_CONSUMER] = g->second.owner; groups.push_back(info); } - state[GroupStateKey] = groups; - status[GroupQueryKey] = state; + state[GROUP_STATE_KEY] = groups; + status[GROUP_QUERY_KEY] = state; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d4140f3505..06b7780599 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -224,14 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } - - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->requeued(msg); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); - } - } + observeRequeue(msg, locker); } copy.notify(); } @@ -241,7 +234,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (acquire(position, message )) { + if (acquire(position, message, locker)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -262,7 +255,7 @@ bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) } QueuedMessage copy(msg); - if (acquire( msg.position, copy )) { + if (acquire( msg.position, copy, locker)) { QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); return true; } @@ -317,7 +310,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (msg.payload->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); c->position = msg.position; - acquire( msg.position, msg ); + acquire( msg.position, msg, locker); dequeue( 0, msg ); continue; } @@ -328,7 +321,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ if (c->accept(msg.payload)) { bool ok = allocator->acquirable( c->getName(), msg, locker ); // inform allocator (void) ok; assert(ok); - ok = acquire( msg.position, msg ); + ok = acquire( msg.position, msg, locker); (void) ok; assert(ok); m = msg; c->position = m.position; @@ -435,9 +428,9 @@ void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ autoDeleteTask->cancel(); } } + Mutex::ScopedLock locker(messageLock); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ - Mutex::ScopedLock locker(messageLock); (*i)->consumerAdded(*c); } catch (const std::exception& e) { QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); @@ -454,9 +447,9 @@ void Queue::cancel(Consumer::shared_ptr c){ if (mgmtObject != 0) mgmtObject->dec_consumerCount (); } + Mutex::ScopedLock locker(messageLock); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ - Mutex::ScopedLock locker(messageLock); (*i)->consumerRemoved(*c); } catch (const std::exception& e) { QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); @@ -468,7 +461,7 @@ QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); if (messages->pop(msg)) - acquired( msg ); + observeAcquire(msg, locker); return msg; } @@ -504,7 +497,7 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) i != expired.end(); ++i) { { Mutex::ScopedLock locker(messageLock); - acquired( *i ); // expects messageLock held + observeAcquire(*i, locker); } dequeue( 0, *i ); } @@ -637,7 +630,7 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); qmsg != c.matches.end(); ++qmsg) { // Update observers and message state: - acquired(*qmsg); + observeAcquire(*qmsg, locker); dequeue(0, *qmsg); // now reroute if necessary if (dest.get()) { @@ -661,7 +654,7 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); qmsg != c.matches.end(); ++qmsg) { // Update observers and message state: - acquired(*qmsg); + observeAcquire(*qmsg, locker); dequeue(0, *qmsg); // and move to destination Queue. assert(qmsg->payload); @@ -673,21 +666,22 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, /** Acquire the front (oldest) message from the in-memory queue. * assumes messageLock held by caller */ -void Queue::pop() +void Queue::pop(const Mutex::ScopedLock& locker) { assertClusterSafe(); QueuedMessage msg; if (messages->pop(msg)) { - acquired( msg ); // mark it removed + observeAcquire(msg, locker); ++dequeueSincePurge; } } /** Acquire the message at the given position, return true and msg if acquire succeeds */ -bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg ) +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const Mutex::ScopedLock& locker) { if (messages->remove(position, msg)) { - acquired( msg ); + observeAcquire(msg, locker); ++dequeueSincePurge; return true; } @@ -705,12 +699,13 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); + if (dequeueRequired) + observeAcquire(removed, locker); listeners.populate(copy); - enqueued(qm); + observeEnqueue(qm, locker); } copy.notify(); if (dequeueRequired) { - acquired( removed ); // tell observers if (isRecovery) { //can't issue new requests for the store until //recovery is complete @@ -841,7 +836,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; if (!ctxt) { - dequeued(msg); + observeDequeue(msg, locker); } } // This check prevents messages which have been forced persistent on one queue from dequeuing @@ -861,7 +856,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + observeDequeue(msg, locker); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -872,11 +867,11 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) * Removes the first (oldest) message from the in-memory delivery queue as well dequeing * it from the logical (and persistent if applicable) queue */ -void Queue::popAndDequeue() +void Queue::popAndDequeue(const Mutex::ScopedLock& held) { if (!messages->empty()) { QueuedMessage msg = messages->front(); - pop(); + pop(held); dequeue(0, msg); } } @@ -885,7 +880,7 @@ void Queue::popAndDequeue() * Updates policy and management when a message has been dequeued, * expects messageLock to be held */ -void Queue::dequeued(const QueuedMessage& msg) +void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); @@ -901,7 +896,7 @@ void Queue::dequeued(const QueuedMessage& msg) /** updates queue observers when a message has become unavailable for transfer, * expects messageLock to be held */ -void Queue::acquired(const QueuedMessage& msg) +void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) { for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -912,6 +907,20 @@ void Queue::acquired(const QueuedMessage& msg) } } +/** updates queue observers when a message has become re-available for transfer, + * expects messageLock to be held + */ +void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->requeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); + } + } +} + void Queue::create(const FieldTable& _settings) { settings = _settings; @@ -1034,7 +1043,7 @@ void Queue::destroyed() while(!messages->empty()){ DeliverableMessage msg(messages->front().payload); alternateExchange->routeWithAlternate(msg); - popAndDequeue(); + popAndDequeue(locker); } alternateExchange->decAlternateUsers(); } @@ -1328,7 +1337,10 @@ void Queue::insertSequenceNumbers(const std::string& key) QPID_LOG(debug, "Inserting sequence numbers as " << key); } -void Queue::enqueued(const QueuedMessage& m) +/** updates queue observers and state when a message has become available for transfer, + * expects messageLock to be held + */ +void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1351,7 +1363,8 @@ void Queue::updateEnqueued(const QueuedMessage& m) if (policy.get()) { policy->recoverEnqueued(payload); } - enqueued(m); + Mutex::ScopedLock locker(messageLock); + observeEnqueue(m, locker); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1375,6 +1388,7 @@ void Queue::checkNotDeleted() void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) { + Mutex::ScopedLock locker(messageLock); observers.insert(observer); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 25d0f26a75..18faba9732 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -142,16 +142,19 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isExcluded(boost::intrusive_ptr<Message>& msg); - /** update queue observers with new message state */ - void enqueued(const QueuedMessage& msg); - void acquired(const QueuedMessage& msg); - void dequeued(const QueuedMessage& msg); + /** update queue observers, stats, policy, etc when the messages' state changes. Lock + * must be held by caller */ + void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); /** modify the Queue's message container - assumes messageLock held */ - void pop(); // acquire front msg - void popAndDequeue(); // acquire and dequeue front msg + void pop(const sys::Mutex::ScopedLock& held); // acquire front msg + void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg // acquire message @ position, return true and set msg if acquire succeeds - bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg ); + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const sys::Mutex::ScopedLock& held); void forcePersistent(QueuedMessage& msg); int getEventMode(); diff --git a/qpid/cpp/src/qpid/broker/QueueObserver.h b/qpid/cpp/src/qpid/broker/QueueObserver.h index a2e8aa5aca..b58becd2ae 100644 --- a/qpid/cpp/src/qpid/broker/QueueObserver.h +++ b/qpid/cpp/src/qpid/broker/QueueObserver.h @@ -35,9 +35,11 @@ class Consumer; * the queue it has been delivered to. A message can be considered in one of three states * with respect to the queue: * - * 1) "Available" - available for transfer to consumers, - * 2) "Locked" - to a particular consumer, no longer available for transfer, but not - * considered fully dequeued. + * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire), + * + * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers + * (by either browse or acquire), but still considered on the queue. + * * 3) "Dequeued" - removed from the queue and no longer available to any consumer. * * The queue events that are observable are: @@ -45,15 +47,15 @@ class Consumer; * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer * (e.g. browse or acquire) * - * "Acquired" - the message is "Locked" - a consumer has claimed exclusive access to it. - * It is no longer available for other consumers to browse or acquire, but it is not yet - * considered dequeued as it may be requeued by the consumer. + * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available + * for other consumers to browse or acquire, but it is not yet considered dequeued as it + * may be requeued by the consumer. * - * "Requeued" - a previously-consumed message is 'unlocked': it is put back on the queue - * at its original position and returns to the "Available" state. + * "Requeued" - a previously-acquired message is released by its owner: it is put back on + * the queue at its original position and returns to the "Available" state. * - * "Dequeued" - a Locked message is no longer queued. At this point, the queue no longer - * tracks the message, and the broker considers the consumer's transaction complete. + * "Dequeued" - a message is no longer queued. At this point, the queue no longer tracks + * the message, and the broker considers the consumer's transaction complete. */ class QueueObserver { |