diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-07 14:21:48 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-10-07 14:21:48 +0000 |
| commit | 4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6 (patch) | |
| tree | 4a54f245efa1c2df1601d648c1fdd41fba08b802 /cpp/src/qpid | |
| parent | 92d889931fe1cea19d1e33658d5f30348bd7070e (diff) | |
| download | qpid-python-4fbbc6ecf68bd8f118f4a6165c8f5bfca2c3c8b6.tar.gz | |
QPID-3346: move message group feature into trunk.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1180050 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
23 files changed, 1350 insertions, 208 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 598c43b1d8..bd94582d10 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -33,10 +33,12 @@ #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" @@ -122,7 +124,8 @@ Broker::Options::Options(const std::string& name) : qmf1Support(true), queueFlowStopRatio(80), queueFlowResumeRatio(70), - queueThresholdEventRatio(80) + queueThresholdEventRatio(80), + defaultMsgGroup("qpid.no-group") { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -158,7 +161,8 @@ Broker::Options::Options(const std::string& name) : ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") - ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised"); + ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") + ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier."); } const std::string empty; @@ -249,6 +253,7 @@ Broker::Broker(const Broker::Options& conf) : Plugin::earlyInitAll(*this); QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + MessageGroupManager::setDefaults(conf.defaultMsgGroup); // If no plugin store module registered itself, set up the null store. if (NullMessageStore::isNullStore(store.get())) @@ -453,7 +458,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter)) status = Manageable::STATUS_OK; else return Manageable::STATUS_PARAMETER_INVALID; @@ -483,6 +488,13 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = Manageable::STATUS_OK; break; } + case _qmf::Broker::METHOD_QUERY : + { + _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args); + status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -655,6 +667,50 @@ void Broker::deleteObject(const std::string& type, const std::string& name, } +Manageable::status_t Broker::queryObject(const std::string& type, + const std::string& name, + Variant::Map& results, + const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")"); + + if (type == TYPE_QUEUE) + return queryQueue( name, userId, connectionId, results ); + + if (type == TYPE_EXCHANGE || + type == TYPE_TOPIC || + type == TYPE_BINDING) + return Manageable::STATUS_NOT_IMPLEMENTED; + + throw UnknownObjectType(type); +} + +Manageable::status_t Broker::queryQueue( const std::string& name, + const std::string& userId, + const std::string& /*connectionId*/, + Variant::Map& results ) +{ + (void) results; + if (acl) { + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId)); + } + + boost::shared_ptr<Queue> q(queues.find(name)); + if (!q) { + QPID_LOG(error, "Query failed: queue not found, name=" << name); + return Manageable::STATUS_UNKNOWN_OBJECT; + } + q->query( results ); + return Manageable::STATUS_OK;; +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); @@ -724,7 +780,8 @@ void Broker::connect( uint32_t Broker::queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty) + uint32_t qty, + const Variant::Map& filter) { Queue::shared_ptr src_queue = queues.find(srcQueue); if (!src_queue) @@ -733,7 +790,7 @@ uint32_t Broker::queueMoveMessages( if (!dest_queue) return 0; - return src_queue->move(dest_queue, qty); + return src_queue->move(dest_queue, qty, &filter); } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 40f7b6273f..8b347db3c0 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -121,6 +121,7 @@ public: uint queueFlowStopRatio; // producer flow control: on uint queueFlowResumeRatio; // producer flow control: off uint16_t queueThresholdEventRatio; + std::string defaultMsgGroup; private: std::string getHome(); @@ -157,7 +158,12 @@ public: const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context); void deleteObject(const std::string& type, const std::string& name, const qpid::types::Variant::Map& options, const ConnectionState* context); - + Manageable::status_t queryObject(const std::string& type, const std::string& name, + qpid::types::Variant::Map& results, const ConnectionState* context); + Manageable::status_t queryQueue( const std::string& name, + const std::string& userId, + const std::string& connectionId, + qpid::types::Variant::Map& results); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; std::auto_ptr<sys::Timer> clusterTimer; @@ -258,7 +264,8 @@ public: */ uint32_t queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty); + uint32_t qty, + const qpid::types::Variant::Map& filter); boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const; diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index 317338a8ad..2af9b0c121 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -36,13 +36,19 @@ class Consumer { // inListeners allows QueueListeners to efficiently track if this instance is registered // for notifications without having to search its containers bool inListeners; - public: - typedef boost::shared_ptr<Consumer> shared_ptr; - + // the name is generated by broker and is unique within broker scope. It is not + // provided or known by the remote Consumer. + const std::string name; + public: + typedef boost::shared_ptr<Consumer> shared_ptr; + framing::SequenceNumber position; - - Consumer(bool preAcquires = true) : acquires(preAcquires), inListeners(false) {} + + Consumer(const std::string& _name, bool preAcquires = true) + : acquires(preAcquires), inListeners(false), name(_name), position(0) {} bool preAcquires() const { return acquires; } + const std::string& getName() const { return name; } + virtual bool deliver(QueuedMessage& msg) = 0; virtual void notify() = 0; virtual bool filter(boost::intrusive_ptr<Message>) { return true; } diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 11970db394..0b8fe95d5e 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -153,7 +153,7 @@ uint32_t DeliveryRecord::getCredit() const } void DeliveryRecord::acquire(DeliveryIds& results) { - if (queue->acquire(msg)) { + if (queue->acquire(msg, tag)) { acquired = true; results.push_back(id); if (!acceptExpected) { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 19ab37ac17..5a331357be 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -46,7 +46,7 @@ class DeliveryRecord { QueuedMessage msg; mutable boost::shared_ptr<Queue> queue; - std::string tag; + std::string tag; // name of consumer DeliveryId id; bool acquired : 1; bool acceptExpected : 1; diff --git a/cpp/src/qpid/broker/FifoDistributor.cpp b/cpp/src/qpid/broker/FifoDistributor.cpp new file mode 100644 index 0000000000..cdb32d8c8c --- /dev/null +++ b/cpp/src/qpid/broker/FifoDistributor.cpp @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +#include "qpid/broker/Queue.h" +#include "qpid/broker/FifoDistributor.h" + +using namespace qpid::broker; + +FifoDistributor::FifoDistributor(Messages& container) + : messages(container) {} + +bool FifoDistributor::nextConsumableMessage( Consumer::shared_ptr&, QueuedMessage& next ) +{ + if (!messages.empty()) { + next = messages.front(); // by default, consume oldest msg + return true; + } + return false; +} + +bool FifoDistributor::allocate(const std::string&, const QueuedMessage& ) +{ + // by default, all messages present on the queue may be allocated as they have yet to + // be acquired. + return true; +} + +bool FifoDistributor::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} + +void FifoDistributor::query(qpid::types::Variant::Map&) const +{ + // nothing to see here.... +} + diff --git a/cpp/src/qpid/broker/FifoDistributor.h b/cpp/src/qpid/broker/FifoDistributor.h new file mode 100644 index 0000000000..245537ed12 --- /dev/null +++ b/cpp/src/qpid/broker/FifoDistributor.h @@ -0,0 +1,58 @@ +#ifndef _broker_FifoDistributor_h +#define _broker_FifoDistributor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** Simple MessageDistributor for FIFO Queues - the HEAD message is always the next + * available message for consumption. + */ + +#include "qpid/broker/MessageDistributor.h" + +namespace qpid { +namespace broker { + +class Messages; + +class FifoDistributor : public MessageDistributor +{ + public: + FifoDistributor(Messages& container); + + /** Locking Note: all methods assume the caller is holding the Queue::messageLock + * during the method call. + */ + + /** MessageDistributor interface */ + + bool nextConsumableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); + bool allocate(const std::string& consumer, const QueuedMessage& target); + bool nextBrowsableMessage( Consumer::shared_ptr& consumer, QueuedMessage& next ); + void query(qpid::types::Variant::Map&) const; + + private: + Messages& messages; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/MessageDistributor.h b/cpp/src/qpid/broker/MessageDistributor.h new file mode 100644 index 0000000000..090393c160 --- /dev/null +++ b/cpp/src/qpid/broker/MessageDistributor.h @@ -0,0 +1,76 @@ +#ifndef _broker_MessageDistributor_h +#define _broker_MessageDistributor_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** Abstraction used by Queue to determine the next "most desirable" message to provide to + * a particular consuming client + */ + + +#include "qpid/broker/Consumer.h" + +namespace qpid { +namespace broker { + +struct QueuedMessage; + +class MessageDistributor +{ + public: + virtual ~MessageDistributor() {}; + + /** Locking Note: all methods assume the caller is holding the Queue::messageLock + * during the method call. + */ + + /** Determine the next message available for consumption by the consumer + * @param consumer the consumer that needs a message to consume + * @param next set to the next message that the consumer may consume. + * @return true if message is available and next is set + */ + virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer, + QueuedMessage& next ) = 0; + + /** Allow the comsumer to take ownership of the given message. + * @param consumer the name of the consumer that is attempting to acquire the message + * @param qm the message to be acquired, previously returned from nextConsumableMessage() + * @return true if ownership is permitted, false if ownership cannot be assigned. + */ + virtual bool allocate( const std::string& consumer, + const QueuedMessage& target) = 0; + + /** Determine the next message available for browsing by the consumer + * @param consumer the consumer that is browsing the queue + * @param next set to the next message that the consumer may browse. + * @return true if a message is available and next is returned + */ + virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer, + QueuedMessage& next ) = 0; + + /** hook to add any interesting management state to the status map */ + virtual void query(qpid::types::Variant::Map&) const = 0; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp new file mode 100644 index 0000000000..d4ca6af1d5 --- /dev/null +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -0,0 +1,443 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FieldTable.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/MessageGroupManager.h" + +using namespace qpid::broker; + +namespace { + const std::string GROUP_QUERY_KEY("qpid.message_group_queue"); + const std::string GROUP_HEADER_KEY("group_header_key"); + const std::string GROUP_STATE_KEY("group_state"); + const std::string GROUP_ID_KEY("group_id"); + const std::string GROUP_MSG_COUNT("msg_count"); + const std::string GROUP_TIMESTAMP("timestamp"); + const std::string GROUP_CONSUMER("consumer"); +} + + +const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); +const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); +const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); + + +const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +{ + const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); + if (!headers) return defaultGroupId; + qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (!id || !id->convertsTo<std::string>()) return defaultGroupId; + return id->get<std::string>(); +} + + +void MessageGroupManager::enqueued( const QueuedMessage& qm ) +{ + // @todo KAG optimization - store reference to group state in QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupState &state(messageGroups[group]); + state.members.push_back(qm.position); + uint32_t total = state.members.size(); + QPID_LOG( trace, "group queue " << qName << + ": added message to group id=" << group << " total=" << total ); + if (total == 1) { + // newly created group, no owner + state.group = group; + assert(freeGroups.find(qm.position) == freeGroups.end()); + freeGroups[qm.position] = &state; + } +} + + +void MessageGroupManager::acquired( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + state.acquired += 1; + QPID_LOG( trace, "group queue " << qName << + ": acquired message in group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::requeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.acquired != 0 ); + state.acquired -= 1; + if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } + QPID_LOG( trace, "group queue " << qName << + ": requeued message to group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::dequeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.members.size() != 0 ); + assert( state.acquired != 0 ); + state.acquired -= 1; + + // likely to be at or near begin() if dequeued in order + bool reFreeNeeded = false; + if (state.members.front() == qm.position) { + if (!state.owned()) { + // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! + // if on freelist, it is indexed by first member, which is about to be removed! + unFree(state); + reFreeNeeded = true; + } + state.members.pop_front(); + } else { + GroupState::PositionFifo::iterator pos = state.members.begin() + 1; + GroupState::PositionFifo::iterator end = state.members.end(); + while (pos != end) { + if (*pos == qm.position) { + state.members.erase(pos); + break; + } + ++pos; + } + } + + uint32_t total = state.members.size(); + if (total == 0) { + QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); + messageGroups.erase( gs ); + } else if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } else if (reFreeNeeded) { + disown(state); + } + QPID_LOG( trace, "group queue " << qName << + ": dequeued message from group id=" << group << " total=" << total ); +} + +void MessageGroupManager::consumerAdded( const Consumer& /*c*/ ) +{ +#if 0 + // allow a re-subscribing consumer + if (consumers.find(c.getName()) == consumers.end()) { + consumers[c.getName()] = 0; // no groups owned yet + QPID_LOG( trace, "group queue " << qName << ": added consumer, name=" << c.getName() ); + } else { + QPID_LOG( trace, "group queue " << qName << ": consumer re-subscribed, name=" << c.getName() ); + } +#endif +} + +void MessageGroupManager::consumerRemoved( const Consumer& /*c*/ ) +{ +#if 0 + const std::string& name(c.getName()); + Consumers::iterator consumer = consumers.find(name); + assert(consumer != consumers.end()); + size_t count = consumer->second; + + for (GroupMap::iterator gs = messageGroups.begin(); + count && gs != messageGroups.end(); ++gs) { + + GroupState& state( gs->second ); + if (state.owner == name) { + if (state.acquired == 0) { + --count; + disown(state); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << name << " released group id=" << gs->first); + } + } + } + if (count == 0) { + consumers.erase( consumer ); + QPID_LOG( trace, "group queue " << qName << ": removed consumer name=" << name ); + } else { + // don't release groups with outstanding acquired msgs - consumer may re-subscribe! + QPID_LOG( trace, "group queue " << qName << ": consumer name=" << name << " unsubscribed with outstanding messages."); + } +#endif +} + + +bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + if (messages.empty()) + return false; + + if (!freeGroups.empty()) { + framing::SequenceNumber nextFree = freeGroups.begin()->first; + if (nextFree < c->position) { // next free group's msg is older than current position + bool ok = messages.find(nextFree, next); + (void) ok; assert( ok ); + } else { + if (!messages.next( c->position, next )) + return false; // shouldn't happen - should find nextFree + } + } else { // no free groups available +#if 0 + if (consumers[c->getName()] == 0) { // and none currently owned + return false; // so nothing available to consume + } +#endif + if (!messages.next( c->position, next )) + return false; + } + + do { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId( next ) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + if (!state.owned() || state.owner == c->getName()) { + return true; + } + } while (messages.next( next.position, next )); + return false; +} + + +bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + + if (!state.owned()) { + own( state, consumer ); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << consumer << " has acquired group id=" << gs->first); + return true; + } + return state.owner == consumer; +} + +bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + // browse: allow access to any available msg, regardless of group ownership (?ok?) + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} + +void MessageGroupManager::query(qpid::types::Variant::Map& status) const +{ + /** Add a description of the current state of the message groups for this queue. + FORMAT: + { "qpid.message_group_queue": + { "group_header_key" : "<KEY>", + "group_state" : + [ { "group_id" : "<name>", + "msg_count" : <int>, + "timestamp" : <absTime>, + "consumer" : <consumer name> }, + {...} // one for each known group + ] + } + } + **/ + + assert(status.find(GROUP_QUERY_KEY) == status.end()); + qpid::types::Variant::Map state; + qpid::types::Variant::List groups; + + state[GROUP_HEADER_KEY] = groupIdHeader; + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + qpid::types::Variant::Map info; + info[GROUP_ID_KEY] = g->first; + info[GROUP_MSG_COUNT] = g->second.members.size(); + info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ + info[GROUP_CONSUMER] = g->second.owner; + groups.push_back(info); + } + state[GROUP_STATE_KEY] = groups; + status[GROUP_QUERY_KEY] = state; +} + + +boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName, + Messages& messages, + const qpid::framing::FieldTable& settings ) +{ + boost::shared_ptr<MessageGroupManager> empty; + + if (settings.isSet(qpidMessageGroupKey)) { + + // @todo: remove once "sticky" consumers are supported - see QPID-3347 + if (!settings.isSet(qpidSharedGroup)) { + QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." ); + return empty; + } + + std::string headerKey = settings.getAsString(qpidMessageGroupKey); + if (headerKey.empty()) { + QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); + return empty; + } + unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); + + boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) ); + + QPID_LOG( debug, "Configured Queue '" << qName << + "' for message grouping using header key '" << headerKey << "'" << + " (timestamp=" << timestamp << ")"); + return manager; + } + return empty; +} + +std::string MessageGroupManager::defaultGroupId; +void MessageGroupManager::setDefaults(const std::string& groupId) // static +{ + defaultGroupId = groupId; +} + +/** Cluster replication: + + state map format: + + { "group-state": [ {"name": <group-name>, + "owner": <consumer-name>-or-empty, + "acquired-ct": <acquired count>, + "positions": [Seqnumbers, ... ]}, + {...} + ] + } +*/ + +namespace { + const std::string GROUP_NAME("name"); + const std::string GROUP_OWNER("owner"); + const std::string GROUP_ACQUIRED_CT("acquired-ct"); + const std::string GROUP_POSITIONS("positions"); + const std::string GROUP_STATE("group-state"); +} + + +/** Runs on UPDATER to snapshot current state */ +void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const +{ + using namespace qpid::framing; + state.clear(); + framing::Array groupState(TYPE_CODE_MAP); + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + + framing::FieldTable group; + group.setString(GROUP_NAME, g->first); + group.setString(GROUP_OWNER, g->second.owner); + group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); + framing::Array positions(TYPE_CODE_UINT32); + for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); + p != g->second.members.end(); ++p) + positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + group.setArray(GROUP_POSITIONS, positions); + groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); + } + state.setArray(GROUP_STATE, groupState); + + QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); +} + + +/** called on UPDATEE to set state from snapshot */ +void MessageGroupManager::setState(const qpid::framing::FieldTable& state) +{ + using namespace qpid::framing; + messageGroups.clear(); + //consumers.clear(); + freeGroups.clear(); + + framing::Array groupState(TYPE_CODE_MAP); + + bool ok = state.getArray(GROUP_STATE, groupState); + if (!ok) { + QPID_LOG(error, "Unable to find message group state information for queue \"" << + qName << "\": cluster inconsistency error!"); + return; + } + + for (framing::Array::const_iterator g = groupState.begin(); + g != groupState.end(); ++g) { + framing::FieldTable group; + ok = framing::getEncodedValue<FieldTable>(*g, group); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": table encoding error!"); + return; + } + MessageGroupManager::GroupState state; + if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": fields missing error!"); + return; + } + state.group = group.getAsString(GROUP_NAME); + state.owner = group.getAsString(GROUP_OWNER); + state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); + framing::Array positions(TYPE_CODE_UINT32); + ok = group.getArray(GROUP_POSITIONS, positions); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": position encoding error!"); + return; + } + + for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) + state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); + messageGroups[state.group] = state; + if (state.owned()) + //consumers[state.owner]++; + ; + else { + assert(state.members.size()); + freeGroups[state.members.front()] = &messageGroups[state.group]; + } + } + + QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) +} diff --git a/cpp/src/qpid/broker/MessageGroupManager.h b/cpp/src/qpid/broker/MessageGroupManager.h new file mode 100644 index 0000000000..35bdda94d5 --- /dev/null +++ b/cpp/src/qpid/broker/MessageGroupManager.h @@ -0,0 +1,125 @@ +#ifndef _broker_MessageGroupManager_h +#define _broker_MessageGroupManager_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* for managing message grouping on Queues */ + +#include "qpid/broker/StatefulQueueObserver.h" +#include "qpid/broker/MessageDistributor.h" + + +namespace qpid { +namespace broker { + +class QueueObserver; +class MessageDistributor; + +class MessageGroupManager : public StatefulQueueObserver, public MessageDistributor +{ + static std::string defaultGroupId; // assigned if no group id header present + + const std::string groupIdHeader; // msg header holding group identifier + const unsigned int timestamp; // mark messages with timestamp if set + Messages& messages; // parent Queue's in memory message container + const std::string qName; // name of parent queue (for logs) + + struct GroupState { + typedef std::deque<framing::SequenceNumber> PositionFifo; + + std::string group; // group identifier + std::string owner; // consumer with outstanding acquired messages + uint32_t acquired; // count of outstanding acquired messages + //uint32_t total; // count of enqueued messages in this group + PositionFifo members; // msgs belonging to this group + + GroupState() : acquired(0) {} + bool owned() const {return !owner.empty();} + }; + typedef std::map<std::string, struct GroupState> GroupMap; + //typedef std::map<std::string, uint32_t> Consumers; // count of owned groups + typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; + + // note: update getState()/setState() when changing this object's state implementation + GroupMap messageGroups; // index: group name + GroupFifo freeGroups; // ordered by oldest free msg + //Consumers consumers; // index: consumer name + + static const std::string qpidMessageGroupKey; + static const std::string qpidSharedGroup; // if specified, one group can be consumed by multiple receivers + static const std::string qpidMessageGroupTimestamp; + + const std::string getGroupId( const QueuedMessage& qm ) const; + void unFree( const GroupState& state ) + { + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); + } + void own( GroupState& state, const std::string& owner ) + { + state.owner = owner; + //consumers[state.owner]++; + unFree( state ); + } + void disown( GroupState& state ) + { + //assert(consumers[state.owner]); + //consumers[state.owner]--; + state.owner.clear(); + assert(state.members.size()); + assert(freeGroups.find(state.members.front()) == freeGroups.end()); + freeGroups[state.members.front()] = &state; + } + + public: + + static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId); + static boost::shared_ptr<MessageGroupManager> create( const std::string& qName, + Messages& messages, + const qpid::framing::FieldTable& settings ); + + MessageGroupManager(const std::string& header, const std::string& _qName, + Messages& container, unsigned int _timestamp=0 ) + : StatefulQueueObserver(std::string("MessageGroupManager:") + header), + groupIdHeader( header ), timestamp(_timestamp), messages(container), qName(_qName) {} + void enqueued( const QueuedMessage& qm ); + void acquired( const QueuedMessage& qm ); + void requeued( const QueuedMessage& qm ); + void dequeued( const QueuedMessage& qm ); + void consumerAdded( const Consumer& ); + void consumerRemoved( const Consumer& ); + void getState(qpid::framing::FieldTable& state ) const; + void setState(const qpid::framing::FieldTable&); + + // MessageDistributor iface + bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next); + bool allocate(const std::string& c, const QueuedMessage& qm); + bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next); + void query(qpid::types::Variant::Map&) const; + + bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const; +}; + +}} + +#endif diff --git a/cpp/src/qpid/broker/Messages.h b/cpp/src/qpid/broker/Messages.h index c535fd1936..448f17432a 100644 --- a/cpp/src/qpid/broker/Messages.h +++ b/cpp/src/qpid/broker/Messages.h @@ -76,7 +76,6 @@ class Messages * @return true if there is another message, false otherwise. */ virtual bool next(const framing::SequenceNumber&, QueuedMessage&) = 0; - /** * Note: Caller is responsible for ensuring that there is a front * (e.g. empty() returns false) diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index dd3f982699..3d878d02a8 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -33,6 +33,8 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" +#include "qpid/broker/FifoDistributor.h" +#include "qpid/broker/MessageGroupManager.h" #include "qpid/StringUtils.h" #include "qpid/log/Statement.h" @@ -42,6 +44,7 @@ #include "qpid/sys/ClusterSafe.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" @@ -111,7 +114,8 @@ Queue::Queue(const string& _name, bool _autodelete, broker(b), deleted(false), barrier(*this), - autoDeleteTimeout(0) + autoDeleteTimeout(0), + allocator(new FifoDistributor( *messages )) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); @@ -220,6 +224,7 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } + observeRequeue(msg, locker); } copy.notify(); } @@ -229,7 +234,7 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess Mutex::ScopedLock locker(messageLock); assertClusterSafe(); QPID_LOG(debug, "Attempting to acquire message at " << position); - if (messages->remove(position, message)) { + if (acquire(position, message, locker)) { QPID_LOG(debug, "Acquired message at " << position << " from " << name); return true; } else { @@ -238,9 +243,24 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess } } -bool Queue::acquire(const QueuedMessage& msg) { - QueuedMessage copy = msg; - return acquireMessageAt(msg.position, copy); +bool Queue::acquire(const QueuedMessage& msg, const std::string& consumer) +{ + Mutex::ScopedLock locker(messageLock); + assertClusterSafe(); + QPID_LOG(debug, consumer << " attempting to acquire message at " << msg.position); + + if (!allocator->allocate( consumer, msg )) { + QPID_LOG(debug, "Not permitted to acquire msg at " << msg.position << " from '" << name); + return false; + } + + QueuedMessage copy(msg); + if (acquire( msg.position, copy, locker)) { + QPID_LOG(debug, "Acquired message at " << msg.position << " from " << name); + return true; + } + QPID_LOG(debug, "Could not acquire message at " << msg.position << " from " << name << "; no message at that position"); + return false; } void Queue::notifyListener() @@ -256,7 +276,7 @@ void Queue::notifyListener() set.notify(); } -bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { checkNotDeleted(); if (c->preAcquires()) { @@ -274,46 +294,65 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) } } -Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { while (true) { Mutex::ScopedLock locker(messageLock); - if (messages->empty()) { - QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + QueuedMessage msg; + + if (!allocator->nextConsumableMessage(c, msg)) { // no next available + QPID_LOG(debug, "No messages available to dispatch to consumer " << + c->getName() << " on queue '" << name << "'"); listeners.addListener(c); return NO_MESSAGES; - } else { - QueuedMessage msg = messages->front(); - if (msg.payload->hasExpired()) { - QPID_LOG(debug, "Message expired from queue '" << name << "'"); - popAndDequeue(); - continue; - } + } - if (c->filter(msg.payload)) { - if (c->accept(msg.payload)) { - m = msg; - pop(); - return CONSUMED; - } else { - //message(s) are available but consumer hasn't got enough credit - QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); - return CANT_CONSUME; - } + if (msg.payload->hasExpired()) { + QPID_LOG(debug, "Message expired from queue '" << name << "'"); + c->position = msg.position; + acquire( msg.position, msg, locker); + dequeue( 0, msg ); + continue; + } + + // a message is available for this consumer - can the consumer use it? + + if (c->filter(msg.payload)) { + if (c->accept(msg.payload)) { + bool ok = allocator->allocate( c->getName(), msg ); // inform allocator + (void) ok; assert(ok); + ok = acquire( msg.position, msg, locker); + (void) ok; assert(ok); + m = msg; + c->position = m.position; + return CONSUMED; } else { - //consumer will never want this message - QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + //message(s) are available but consumer hasn't got enough credit + QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); return CANT_CONSUME; } + } else { + //consumer will never want this message + QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); + c->position = msg.position; + return CANT_CONSUME; } } } - -bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) +bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr& c) { - QueuedMessage msg(this); - while (seek(msg, c)) { + while (true) { + Mutex::ScopedLock locker(messageLock); + QueuedMessage msg; + + if (!allocator->nextBrowsableMessage(c, msg)) { // no next available + QPID_LOG(debug, "No browsable messages available for consumer " << + c->getName() << " on queue '" << name << "'"); + listeners.addListener(c); + return false; + } + if (c->filter(msg.payload) && !msg.payload->hasExpired()) { if (c->accept(msg.payload)) { //consumer wants the message @@ -327,8 +366,8 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) } } else { //consumer will never want this message, continue seeking - c->position = msg.position; QPID_LOG(debug, "Browser skipping message from '" << name << "'"); + c->position = msg.position; } } return false; @@ -358,61 +397,71 @@ bool Queue::dispatch(Consumer::shared_ptr c) } } -// Find the next message -bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { - Mutex::ScopedLock locker(messageLock); - if (messages->next(c->position, msg)) { - return true; - } else { - listeners.addListener(c); - return false; - } -} - -QueuedMessage Queue::find(SequenceNumber pos) const { +bool Queue::find(SequenceNumber pos, QueuedMessage& msg) const { Mutex::ScopedLock locker(messageLock); - QueuedMessage msg; - messages->find(pos, msg); - return msg; + if (messages->find(pos, msg)) + return true; + return false; } void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ assertClusterSafe(); - Mutex::ScopedLock locker(consumerLock); - if(exclusive) { - throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); - } else if(requestExclusive) { - if(consumerCount) { + { + Mutex::ScopedLock locker(consumerLock); + if(exclusive) { throw ResourceLockedException( - QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); - } else { - exclusive = c->getSession(); + QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); + } else if(requestExclusive) { + if(consumerCount) { + throw ResourceLockedException( + QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); + } else { + exclusive = c->getSession(); + } + } + consumerCount++; + if (mgmtObject != 0) + mgmtObject->inc_consumerCount (); + //reset auto deletion timer if necessary + if (autoDeleteTimeout && autoDeleteTask) { + autoDeleteTask->cancel(); } } - consumerCount++; - if (mgmtObject != 0) - mgmtObject->inc_consumerCount (); - //reset auto deletion timer if necessary - if (autoDeleteTimeout && autoDeleteTask) { - autoDeleteTask->cancel(); + Mutex::ScopedLock locker(messageLock); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerAdded(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); + } } } void Queue::cancel(Consumer::shared_ptr c){ removeListener(c); - Mutex::ScopedLock locker(consumerLock); - consumerCount--; - if(exclusive) exclusive = 0; - if (mgmtObject != 0) - mgmtObject->dec_consumerCount (); + { + Mutex::ScopedLock locker(consumerLock); + consumerCount--; + if(exclusive) exclusive = 0; + if (mgmtObject != 0) + mgmtObject->dec_consumerCount (); + } + Mutex::ScopedLock locker(messageLock); + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->consumerRemoved(*c); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); + } + } } QueuedMessage Queue::get(){ Mutex::ScopedLock locker(messageLock); QueuedMessage msg(this); - messages->pop(msg); + if (messages->pop(msg)) + observeAcquire(msg, locker); return msg; } @@ -443,11 +492,118 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) Mutex::ScopedLock locker(messageLock); messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } - for_each(expired.begin(), expired.end(), - boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); + i != expired.end(); ++i) { + { + Mutex::ScopedLock locker(messageLock); + observeAcquire(*i, locker); + } + dequeue( 0, *i ); + } } } + +namespace { + // for use with purge/move below - collect messages that match a given filter + // + class MessageFilter + { + public: + static const std::string typeKey; + static const std::string paramsKey; + static MessageFilter *create( const ::qpid::types::Variant::Map *filter ); + virtual bool match( const QueuedMessage& ) const { return true; } + virtual ~MessageFilter() {} + protected: + MessageFilter() {}; + }; + const std::string MessageFilter::typeKey("filter_type"); + const std::string MessageFilter::paramsKey("filter_params"); + + // filter by message header string value exact match + class HeaderMatchFilter : public MessageFilter + { + public: + /* Config: + { 'filter_type' : 'header_match_str', + 'filter_params' : { 'header_key' : "<header name>", + 'header_value' : "<value to match>" + } + } + */ + static const std::string typeKey; + static const std::string headerKey; + static const std::string valueKey; + HeaderMatchFilter( const std::string& _header, const std::string& _value ) + : MessageFilter (), header(_header), value(_value) {} + bool match( const QueuedMessage& msg ) const + { + const qpid::framing::FieldTable* headers = msg.payload->getApplicationHeaders(); + if (!headers) return false; + FieldTable::ValuePtr h = headers->get(header); + if (!h || !h->convertsTo<std::string>()) return false; + return h->get<std::string>() == value; + } + private: + const std::string header; + const std::string value; + }; + const std::string HeaderMatchFilter::typeKey("header_match_str"); + const std::string HeaderMatchFilter::headerKey("header_key"); + const std::string HeaderMatchFilter::valueKey("header_value"); + + // factory to create correct filter based on map + MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter ) + { + using namespace qpid::types; + if (filter && !filter->empty()) { + Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey); + if (i != filter->end()) { + + if (i->second.asString() == HeaderMatchFilter::typeKey) { + Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey); + if (p != filter->end() && p->second.getType() == VAR_MAP) { + Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey); + Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey); + if (k != p->second.asMap().end() && v != p->second.asMap().end()) { + std::string headerKey(k->second.asString()); + std::string value(v->second.asString()); + QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value ); + return new HeaderMatchFilter( headerKey, value ); + } + } + } + } + QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'"); + } + return new MessageFilter(); + } + + // used by removeIf() to collect all messages matching a filter, maximum match count is + // optional. + struct Collector { + const uint32_t maxMatches; + MessageFilter& filter; + std::deque<QueuedMessage> matches; + Collector(MessageFilter& filter, uint32_t max) + : maxMatches(max), filter(filter) {} + bool operator() (QueuedMessage& qm) + { + if (maxMatches == 0 || matches.size() < maxMatches) { + if (filter.match( qm )) { + matches.push_back(qm); + return true; + } + } + return false; + } + }; + +} // end namespace + + /** * purge - for purging all or some messages on a queue * depending on the purge_request @@ -459,62 +615,77 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) * The dest exchange may be supplied to re-route messages through the exchange. * It is safe to re-route messages such that they arrive back on the same queue, * even if the queue is ordered by priority. + * + * An optional filter can be supplied that will be applied against each message. The + * message is purged only if the filter matches. See MessageDistributor for more detail. */ -uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest) +uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest, + const qpid::types::Variant::Map *filter) { - Mutex::ScopedLock locker(messageLock); - uint32_t purge_count = purge_request; // only comes into play if >0 - std::deque<DeliverableMessage> rerouteQueue; + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), purge_request); - uint32_t count = 0; - // Either purge them all or just the some (purge_count) while the queue isn't empty. - while((!purge_request || purge_count--) && !messages->empty()) { + Mutex::ScopedLock locker(messageLock); + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + observeAcquire(*qmsg, locker); + dequeue(0, *qmsg); + // now reroute if necessary if (dest.get()) { - // - // If there is a destination exchange, stage the messages onto a reroute queue - // so they don't wind up getting purged more than once. - // - DeliverableMessage msg(messages->front().payload); - rerouteQueue.push_back(msg); + assert(qmsg->payload); + DeliverableMessage dmsg(qmsg->payload); + dest->routeWithAlternate(dmsg); } - popAndDequeue(); - count++; - } - - // - // Re-route purged messages into the destination exchange. Note that there's no need - // to test dest.get() here because if it is NULL, the rerouteQueue will be empty. - // - while (!rerouteQueue.empty()) { - DeliverableMessage msg(rerouteQueue.front()); - rerouteQueue.pop_front(); - dest->routeWithAlternate(msg); } - - return count; + return c.matches.size(); } -uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { +uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter) +{ + std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter)); + Collector c(*mf.get(), qty); + Mutex::ScopedLock locker(messageLock); - uint32_t move_count = qty; // only comes into play if qty >0 - uint32_t count = 0; // count how many were moved for returning + messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); - while((!qty || move_count--) && !messages->empty()) { - QueuedMessage qmsg = messages->front(); - boost::intrusive_ptr<Message> msg = qmsg.payload; - destq->deliver(msg); // deliver message to the destination queue - pop(); - dequeue(0, qmsg); - count++; + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); + qmsg != c.matches.end(); ++qmsg) { + // Update observers and message state: + observeAcquire(*qmsg, locker); + dequeue(0, *qmsg); + // and move to destination Queue. + assert(qmsg->payload); + destq->deliver(qmsg->payload); } - return count; + return c.matches.size(); } -void Queue::pop() +/** Acquire the front (oldest) message from the in-memory queue. + * assumes messageLock held by caller + */ +void Queue::pop(const Mutex::ScopedLock& locker) { assertClusterSafe(); - messages->pop(); - ++dequeueSincePurge; + QueuedMessage msg; + if (messages->pop(msg)) { + observeAcquire(msg, locker); + ++dequeueSincePurge; + } +} + +/** Acquire the message at the given position, return true and msg if acquire succeeds */ +bool Queue::acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const Mutex::ScopedLock& locker) +{ + if (messages->remove(position, msg)) { + observeAcquire(msg, locker); + ++dequeueSincePurge; + return true; + } + return false; } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ @@ -528,8 +699,10 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); + if (dequeueRequired) + observeAcquire(removed, locker); listeners.populate(copy); - enqueued(qm); + observeEnqueue(qm, locker); } copy.notify(); if (dequeueRequired) { @@ -663,7 +836,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) Mutex::ScopedLock locker(messageLock); if (!isEnqueued(msg)) return false; if (!ctxt) { - dequeued(msg); + observeDequeue(msg, locker); } } // This check prevents messages which have been forced persistent on one queue from dequeuing @@ -683,7 +856,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::dequeueCommitted(const QueuedMessage& msg) { Mutex::ScopedLock locker(messageLock); - dequeued(msg); + observeDequeue(msg, locker); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); @@ -691,21 +864,23 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) } /** - * Removes a message from the in-memory delivery queue as well - * dequeing it from the logical (and persistent if applicable) queue + * Removes the first (oldest) message from the in-memory delivery queue as well dequeing + * it from the logical (and persistent if applicable) queue */ -void Queue::popAndDequeue() +void Queue::popAndDequeue(const Mutex::ScopedLock& held) { - QueuedMessage msg = messages->front(); - pop(); - dequeue(0, msg); + if (!messages->empty()) { + QueuedMessage msg = messages->front(); + pop(held); + dequeue(0, msg); + } } /** * Updates policy and management when a message has been dequeued, * expects messageLock to be held */ -void Queue::dequeued(const QueuedMessage& msg) +void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); @@ -718,6 +893,33 @@ void Queue::dequeued(const QueuedMessage& msg) } } +/** updates queue observers when a message has become unavailable for transfer, + * expects messageLock to be held + */ +void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->acquired(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what()); + } + } +} + +/** updates queue observers when a message has become re-available for transfer, + * expects messageLock to be held + */ +void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) +{ + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { + try{ + (*i)->requeued(msg); + } catch (const std::exception& e) { + QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); + } + } +} void Queue::create(const FieldTable& _settings) { @@ -788,17 +990,28 @@ void Queue::configureImpl(const FieldTable& _settings) if (lvqKey.size()) { QPID_LOG(debug, "Configured queue " << getName() << " as Last Value Queue with key " << lvqKey); messages = std::auto_ptr<Messages>(new MessageMap(lvqKey)); + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); } else if (_settings.get(qpidLastValueQueueNoBrowse)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue with 'no-browse' on"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, true, broker); + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); } else if (_settings.get(qpidLastValueQueue)) { QPID_LOG(debug, "Configured queue " << getName() << " as Legacy Last Value Queue"); messages = LegacyLVQ::updateOrReplace(messages, qpidVQMatchProperty, false, broker); + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); } else { std::auto_ptr<Messages> m = Fairshare::create(_settings); if (m.get()) { messages = m; + allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *messages )); QPID_LOG(debug, "Configured queue " << getName() << " as priority queue."); + } else { // default (FIFO) queue type + // override default message allocator if message groups configured. + boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( getName(), *messages, _settings)); + if (mgm) { + allocator = mgm; + addObserver(mgm); + } } } @@ -835,7 +1048,7 @@ void Queue::destroyed() while(!messages->empty()){ DeliverableMessage msg(messages->front().payload); alternateExchange->routeWithAlternate(msg); - popAndDequeue(); + popAndDequeue(locker); } alternateExchange->decAlternateUsers(); } @@ -1057,7 +1270,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str case _qmf::Queue::METHOD_PURGE : { _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args; - purge(purgeArgs.i_request); + purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1078,7 +1291,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str } } - purge(rerouteArgs.i_request, dest); + purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter); status = Manageable::STATUS_OK; } break; @@ -1087,6 +1300,14 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str return status; } + +void Queue::query(qpid::types::Variant::Map& results) const +{ + Mutex::ScopedLock locker(messageLock); + /** @todo add any interesting queue state into results */ + if (allocator) allocator->query(results); +} + void Queue::setPosition(SequenceNumber n) { Mutex::ScopedLock locker(messageLock); sequence = n; @@ -1121,7 +1342,10 @@ void Queue::insertSequenceNumbers(const std::string& key) QPID_LOG(debug, "Inserting sequence numbers as " << key); } -void Queue::enqueued(const QueuedMessage& m) +/** updates queue observers and state when a message has become available for transfer, + * expects messageLock to be held + */ +void Queue::observeEnqueue(const QueuedMessage& m, const Mutex::ScopedLock&) { for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { try { @@ -1144,7 +1368,8 @@ void Queue::updateEnqueued(const QueuedMessage& m) if (policy.get()) { policy->recoverEnqueued(payload); } - enqueued(m); + Mutex::ScopedLock locker(messageLock); + observeEnqueue(m, locker); } else { QPID_LOG(warning, "Queue informed of enqueued message that has no payload"); } @@ -1168,6 +1393,7 @@ void Queue::checkNotDeleted() void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) { + Mutex::ScopedLock locker(messageLock); observers.insert(observer); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 8435e75cab..59ae41e768 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -59,7 +59,7 @@ class MessageStore; class QueueEvents; class QueueRegistry; class TransactionContext; -class Exchange; +class MessageDistributor; /** * The brokers representation of an amqp queue. Messages are @@ -129,23 +129,32 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; + boost::shared_ptr<MessageDistributor> allocator; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool seek(QueuedMessage& msg, Consumer::shared_ptr position); - bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); void notifyListener(); void removeListener(Consumer::shared_ptr); bool isExcluded(boost::intrusive_ptr<Message>& msg); - void enqueued(const QueuedMessage& msg); - void dequeued(const QueuedMessage& msg); - void pop(); - void popAndDequeue(); + /** update queue observers, stats, policy, etc when the messages' state changes. Lock + * must be held by caller */ + void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + + /** modify the Queue's message container - assumes messageLock held */ + void pop(const sys::Mutex::ScopedLock& held); // acquire front msg + void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg + // acquire message @ position, return true and set msg if acquire succeeds + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const sys::Mutex::ScopedLock& held); void forcePersistent(QueuedMessage& msg); int getEventMode(); @@ -191,8 +200,15 @@ class Queue : public boost::enable_shared_from_this<Queue>, Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); + /** allow the Consumer to consume or browse the next available message */ QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); + /** allow the Consumer to acquire a message that it has browsed. + * @param msg - message to be acquired. + * @return false if message is no longer available for acquire. + */ + QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer); + /** * Used to configure a new queue and create a persistent record * for it in store if required. @@ -216,7 +232,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); - QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); + /** Acquire the message at the given position if it is available for acquire. Not to + * be used by clients, but used by the broker for queue management. + * @param message - set to the acquired message if true returned. + * @return true if the message has been acquired. + */ QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** @@ -245,11 +265,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + uint32_t purge(const uint32_t purge_request=0, //defaults to all messages + boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(), + const ::qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -302,12 +325,12 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool isEnqueued(const QueuedMessage& msg); /** - * Gets the next available message + * Acquires the next available (oldest) message */ QPID_BROKER_EXTERN QueuedMessage get(); - /** Get the message at position pos */ - QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; + /** Get the message at position pos, returns true if found and sets msg */ + QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const; const QueuePolicy* getPolicy(); @@ -336,6 +359,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + void query(::qpid::types::Variant::Map&) const; /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { diff --git a/cpp/src/qpid/broker/QueueEvents.cpp b/cpp/src/qpid/broker/QueueEvents.cpp index 2c540ff1ad..c66bdabf0f 100644 --- a/cpp/src/qpid/broker/QueueEvents.cpp +++ b/cpp/src/qpid/broker/QueueEvents.cpp @@ -129,6 +129,10 @@ class EventGenerator : public QueueObserver { if (!enqueueOnly) manager.dequeued(m); } + + void acquired(const QueuedMessage&) {}; + void requeued(const QueuedMessage&) {}; + private: QueueEvents& manager; const bool enqueueOnly; diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index fcf8d089f9..f15bb45c01 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -377,7 +377,8 @@ void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) ++i; fcmsg.add(first, last); for (SequenceNumber seq = first; seq <= last; ++seq) { - QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked + QueuedMessage msg; + queue->find(seq, msg); // fyi: may not be found if msg is acquired & unacked bool unique; unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; // Like this to avoid tripping up unused variable warning when NDEBUG set diff --git a/cpp/src/qpid/broker/QueueFlowLimit.h b/cpp/src/qpid/broker/QueueFlowLimit.h index c02e479976..ad8a2720ef 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/cpp/src/qpid/broker/QueueFlowLimit.h @@ -84,6 +84,9 @@ class Broker; QPID_BROKER_EXTERN void enqueued(const QueuedMessage&); /** the queue has removed QueuedMessage. Returns true if flow state changes */ QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); + /** ignored */ + QPID_BROKER_EXTERN void acquired(const QueuedMessage&) {}; + QPID_BROKER_EXTERN void requeued(const QueuedMessage&) {}; /** for clustering: */ QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const; diff --git a/cpp/src/qpid/broker/QueueObserver.h b/cpp/src/qpid/broker/QueueObserver.h index 3ca01c051e..b58becd2ae 100644 --- a/cpp/src/qpid/broker/QueueObserver.h +++ b/cpp/src/qpid/broker/QueueObserver.h @@ -25,17 +25,51 @@ namespace qpid { namespace broker { struct QueuedMessage; +class Consumer; + /** - * Interface for notifying classes who want to act as 'observers' of a - * queue of particular events. + * Interface for notifying classes who want to act as 'observers' of a queue of particular + * events. + * + * The events that are monitored reflect the relationship between a particular message and + * the queue it has been delivered to. A message can be considered in one of three states + * with respect to the queue: + * + * 1) "Available" - available for transfer to consumers (i.e. for browse or acquire), + * + * 2) "Acquired" - owned by a particular consumer, no longer available to other consumers + * (by either browse or acquire), but still considered on the queue. + * + * 3) "Dequeued" - removed from the queue and no longer available to any consumer. + * + * The queue events that are observable are: + * + * "Enqueued" - the message is "Available" - on the queue for transfer to any consumer + * (e.g. browse or acquire) + * + * "Acquired" - - a consumer has claimed exclusive access to it. It is no longer available + * for other consumers to browse or acquire, but it is not yet considered dequeued as it + * may be requeued by the consumer. + * + * "Requeued" - a previously-acquired message is released by its owner: it is put back on + * the queue at its original position and returns to the "Available" state. + * + * "Dequeued" - a message is no longer queued. At this point, the queue no longer tracks + * the message, and the broker considers the consumer's transaction complete. */ class QueueObserver { public: virtual ~QueueObserver() {} + + // note: the Queue will hold the messageLock while calling these methods! virtual void enqueued(const QueuedMessage&) = 0; virtual void dequeued(const QueuedMessage&) = 0; - private: + virtual void acquired(const QueuedMessage&) = 0; + virtual void requeued(const QueuedMessage&) = 0; + virtual void consumerAdded( const Consumer& ) {}; + virtual void consumerRemoved( const Consumer& ) {}; + private: }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index 6ae0d53b1a..0c245700af 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -269,8 +269,7 @@ bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) do { QueuedMessage oldest = queue.front(); - - if (oldest.queue->acquire(oldest) || !strict) { + if (oldest.queue->acquireMessageAt(oldest.position, oldest) || !strict) { queue.pop_front(); pendingDequeues.push_back(oldest); QPID_LOG(debug, "Ring policy triggered in " << name diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index b4f146e699..94d0cc87f7 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -107,11 +107,18 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } +namespace { + const std::string SEPARATOR("::"); +} + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); + // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). + // Create a globally unique name so the broker can identify individual consumers + std::string name = session.getSessionId().str() + SEPARATOR + tag; + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -267,15 +274,15 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, bool ack, bool _acquire, bool _exclusive, + const string& _tag, const string& _resumeId, uint64_t _resumeTtl, const framing::FieldTable& _arguments ) : - Consumer(_acquire), + Consumer(_name, _acquire), parent(_parent), - name(_name), queue(_queue), ackExpected(ack), acquire(_acquire), @@ -284,6 +291,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, windowActive(false), exclusive(_exclusive), resumeId(_resumeId), + tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), msgCredit(0), @@ -300,7 +308,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -332,16 +340,15 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); - if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); } - if (acquire && !ackExpected) { - queue->dequeue(0, msg); + if (acquire && !ackExpected) { // auto acquire && auto accept + record.accept( 0 /*no ctxt*/ ); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -371,7 +378,7 @@ struct ConsumerName { }; ostream& operator<<(ostream& o, const ConsumerName& pc) { - return o << pc.consumer.getName() << " on " + return o << pc.consumer.getTag() << " on " << pc.consumer.getParent().getSession().getSessionId(); } } @@ -561,50 +568,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync) return deliveryAdapter.deliver(msg, sync); } -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw NotFoundException(QPID_MSG("Unknown destination " << destination)); + ConsumerImpl::shared_ptr consumer; + if (!find(destination, consumer)) { + throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId())); } else { - return *(i->second); + return consumer; + } +} + +bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const +{ + // @todo KAG gsim: shouldn't the consumers map be locked???? + ConsumerImplMap::const_iterator i = consumers.find(destination); + if (i == consumers.end()) { + return false; } + consumer = i->second; + return true; } void SemanticState::setWindowMode(const std::string& destination) { - find(destination).setWindowMode(); + find(destination)->setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination).setCreditMode(); + find(destination)->setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addByteCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addByteCredit(value); + c->requestDispatch(); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addMessageCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addMessageCredit(value); + c->requestDispatch(); } void SemanticState::flush(const std::string& destination) { - find(destination).flush(); + find(destination)->flush(); } void SemanticState::stop(const std::string& destination) { - find(destination).stop(); + find(destination)->stop(); } void SemanticState::ConsumerImpl::setWindowMode() diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 69d980947b..12ccc75f11 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -75,7 +75,6 @@ class SemanticState : private boost::noncopyable { { mutable qpid::sys::Mutex lock; SemanticState* const parent; - const std::string name; const boost::shared_ptr<Queue> queue; const bool ackExpected; const bool acquire; @@ -84,6 +83,7 @@ class SemanticState : private boost::noncopyable { bool windowActive; bool exclusive; std::string resumeId; + const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command uint64_t resumeTtl; framing::FieldTable arguments; uint32_t msgCredit; @@ -103,7 +103,8 @@ class SemanticState : private boost::noncopyable { ConsumerImpl(SemanticState* parent, const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + const std::string& tag, const std::string& resumeId, + uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); OwnershipToken* getSession(); bool deliver(QueuedMessage& msg); @@ -130,8 +131,6 @@ class SemanticState : private boost::noncopyable { bool doOutput(); - std::string getName() const { return name; } - bool isAckExpected() const { return ackExpected; } bool isAcquire() const { return acquire; } bool isWindowing() const { return windowing; } @@ -139,6 +138,7 @@ class SemanticState : private boost::noncopyable { uint32_t getMsgCredit() const { return msgCredit; } uint32_t getByteCredit() const { return byteCredit; } std::string getResumeId() const { return resumeId; }; + const std::string& getTag() const { return tag; } uint64_t getResumeTtl() const { return resumeTtl; } const framing::FieldTable& getArguments() const { return arguments; } @@ -190,7 +190,8 @@ class SemanticState : private boost::noncopyable { SessionContext& getSession() { return session; } const SessionContext& getSession() const { return session; } - ConsumerImpl& find(const std::string& destination); + const ConsumerImpl::shared_ptr find(const std::string& destination) const; + bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const; /** * Get named queue, never returns 0. diff --git a/cpp/src/qpid/broker/ThresholdAlerts.h b/cpp/src/qpid/broker/ThresholdAlerts.h index c77722e700..2b4a46b736 100644 --- a/cpp/src/qpid/broker/ThresholdAlerts.h +++ b/cpp/src/qpid/broker/ThresholdAlerts.h @@ -50,6 +50,9 @@ class ThresholdAlerts : public QueueObserver const long repeatInterval); void enqueued(const QueuedMessage&); void dequeued(const QueuedMessage&); + void acquired(const QueuedMessage&) {}; + void requeued(const QueuedMessage&) {}; + static void observe(Queue& queue, qpid::management::ManagementAgent& agent, const uint64_t countThreshold, const uint64_t sizeThreshold, diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 015301573e..394749aad2 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std::string& userId) { void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { - broker::SemanticState::ConsumerImpl& c = semanticState().find(name); - c.position = position; - c.setBlocked(blocked); - if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - updateIn.consumerNumbering.add(c.shared_from_this()); + broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); + c->position = position; + c->setBlocked(blocked); + if (notifyEnabled) c->enableNotify(); else c->disableNotify(); + updateIn.consumerNumbering.add(c); } @@ -444,7 +444,7 @@ void Connection::outputTask(uint16_t channel, const std::string& name) { if (!session) throw Exception(QPID_MSG(cluster << " channel not attached " << *this << "[" << channel << "] ")); - OutputTask* task = &session->getSemanticState().find(name); + OutputTask* task = session->getSemanticState().find(name).get(); connection->getOutputTasks().addOutputTask(task); } @@ -547,7 +547,7 @@ void Connection::deliveryRecord(const string& qname, m.position = position; if (enqueued) queue->updateEnqueued(m); //inform queue of the message } else { // Message at original position in original queue - m = queue->find(position); + queue->find(position, m); } // FIXME aconway 2011-08-19: removed: // if (!m.payload) diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 2be1bf1f77..2446c12f2b 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -421,8 +421,8 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) { boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); uint16_t channel = ci->getParent().getSession().getChannel(); - ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, *this << " updating output task " << ci->getName() + ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getTag()); + QPID_LOG(debug, *this << " updating output task " << ci->getTag() << " channel=" << channel); } @@ -521,13 +521,13 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { void UpdateClient::updateConsumer( const broker::SemanticState::ConsumerImpl::shared_ptr& ci) { - QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " + QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getName(), + arg::destination = ci->getTag(), arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, arg::exclusive = ci->isExclusive(), @@ -535,18 +535,18 @@ void UpdateClient::updateConsumer( arg::resumeTtl = ci->getResumeTtl(), arg::arguments = ci->getArguments() ); - shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit()); ClusterConnectionProxy(shadowSession).consumerState( - ci->getName(), + ci->getTag(), ci->isBlocked(), ci->isNotifyEnabled(), ci->position ); consumerNumbering.add(ci.get()); - QPID_LOG(debug, *this << " updated consumer " << ci->getName() + QPID_LOG(debug, *this << " updated consumer " << ci->getTag() << " on " << shadowSession.getId()); } |
