diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.h')
-rw-r--r-- | cpp/src/qpid/broker/Queue.h | 131 |
1 files changed, 91 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 12a3d273be..59ae41e768 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -32,9 +32,9 @@ #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" #include "qpid/broker/QueueObserver.h" -#include "qpid/broker/RateTracker.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" @@ -59,7 +59,7 @@ class MessageStore; class QueueEvents; class QueueRegistry; class TransactionContext; -class Exchange; +class MessageDistributor; /** * The brokers representation of an amqp queue. Messages are @@ -74,13 +74,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, { Queue& parent; uint count; - + UsageBarrier(Queue&); bool acquire(); void release(); void destroy(); }; - + struct ScopedUse { UsageBarrier& barrier; @@ -88,7 +88,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} ~ScopedUse() { if (acquired) barrier.release(); } }; - + typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; @@ -119,7 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; - RateTracker dequeueTracker; + sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; bool insertSeqNo; @@ -129,26 +129,36 @@ class Queue : public boost::enable_shared_from_this<Queue>, UsageBarrier barrier; int autoDeleteTimeout; boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; + boost::shared_ptr<MessageDistributor> allocator; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool seek(QueuedMessage& msg, Consumer::shared_ptr position); - bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); + bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr& c); void notifyListener(); void removeListener(Consumer::shared_ptr); bool isExcluded(boost::intrusive_ptr<Message>& msg); - void enqueued(const QueuedMessage& msg); - void dequeued(const QueuedMessage& msg); - void pop(); - void popAndDequeue(); - QueuedMessage getFront(); + /** update queue observers, stats, policy, etc when the messages' state changes. Lock + * must be held by caller */ + void observeEnqueue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeAcquire(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeRequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + void observeDequeue(const QueuedMessage& msg, const sys::Mutex::ScopedLock& lock); + + /** modify the Queue's message container - assumes messageLock held */ + void pop(const sys::Mutex::ScopedLock& held); // acquire front msg + void popAndDequeue(const sys::Mutex::ScopedLock& held); // acquire and dequeue front msg + // acquire message @ position, return true and set msg if acquire succeeds + bool acquire(const qpid::framing::SequenceNumber& position, QueuedMessage& msg, + const sys::Mutex::ScopedLock& held); + void forcePersistent(QueuedMessage& msg); int getEventMode(); + void configureImpl(const qpid::framing::FieldTable& settings); inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) { @@ -172,8 +182,9 @@ class Queue : public boost::enable_shared_from_this<Queue>, } } } - + void checkNotDeleted(); + void notifyDeleted(); public: @@ -182,29 +193,50 @@ class Queue : public boost::enable_shared_from_this<Queue>, typedef std::vector<shared_ptr> vector; QPID_BROKER_EXTERN Queue(const std::string& name, - bool autodelete = false, - MessageStore* const store = 0, + bool autodelete = false, + MessageStore* const store = 0, const OwnershipToken* const owner = 0, management::Manageable* parent = 0, Broker* broker = 0); QPID_BROKER_EXTERN ~Queue(); + /** allow the Consumer to consume or browse the next available message */ QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); - void create(const qpid::framing::FieldTable& settings); + /** allow the Consumer to acquire a message that it has browsed. + * @param msg - message to be acquired. + * @return false if message is no longer available for acquire. + */ + QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg, const std::string& consumer); - // "recovering" means we are doing a MessageStore recovery. - QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings, - bool recovering = false); - void destroy(); - void notifyDeleted(); + /** + * Used to configure a new queue and create a persistent record + * for it in store if required. + */ + QPID_BROKER_EXTERN void create(const qpid::framing::FieldTable& settings); + + /** + * Used to reconfigure a recovered queue (does not create + * persistent record in store). + */ + QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings); + void destroyed(); QPID_BROKER_EXTERN void bound(const std::string& exchange, const std::string& key, const qpid::framing::FieldTable& args); - QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges, - Queue::shared_ptr shared_ref); + //TODO: get unbind out of the public interface; only there for purposes of one unit test + QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges); + /** + * Bind self to specified exchange, and record that binding for unbinding on delete. + */ + bool bind(boost::shared_ptr<Exchange> exchange, const std::string& key, + const qpid::framing::FieldTable& arguments=qpid::framing::FieldTable()); - QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); + /** Acquire the message at the given position if it is available for acquire. Not to + * be used by clients, but used by the broker for queue management. + * @param message - set to the acquired message if true returned. + * @return true if the message has been acquired. + */ QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); /** @@ -233,11 +265,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool exclusive = false); QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages - QPID_BROKER_EXTERN void purgeExpired(); + uint32_t purge(const uint32_t purge_request=0, //defaults to all messages + boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>(), + const ::qpid::types::Variant::Map *filter=0); + QPID_BROKER_EXTERN void purgeExpired(sys::Duration); //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + uint32_t move(const Queue::shared_ptr destq, uint32_t qty, + const qpid::types::Variant::Map *filter=0); QPID_BROKER_EXTERN uint32_t getMessageCount() const; QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; @@ -276,8 +311,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, * Inform queue of messages that were enqueued, have since * been acquired but not yet accepted or released (and * thus are still logically on the queue) - used in - * clustered broker. - */ + * clustered broker. + */ void updateEnqueued(const QueuedMessage& msg); /** @@ -288,14 +323,14 @@ class Queue : public boost::enable_shared_from_this<Queue>, * accepted it). */ bool isEnqueued(const QueuedMessage& msg); - + /** - * Gets the next available message + * Acquires the next available (oldest) message */ QPID_BROKER_EXTERN QueuedMessage get(); - /** Get the message at position pos */ - QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; + /** Get the message at position pos, returns true if found and sets msg */ + QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, QueuedMessage& msg ) const; const QueuePolicy* getPolicy(); @@ -309,8 +344,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, void encode(framing::Buffer& buffer) const; uint32_t encodedSize() const; - // "recovering" means we are doing a MessageStore recovery. - static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false ); + /** + * Restores a queue from encoded data (used in recovery) + * + * Note: restored queue will be neither auto-deleted or have an + * exclusive owner + */ + static Queue::shared_ptr restore(QueueRegistry& queues, framing::Buffer& buffer); static void tryAutoDelete(Broker& broker, Queue::shared_ptr); virtual void setExternalQueueStore(ExternalQueueStore* inst); @@ -319,6 +359,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + void query(::qpid::types::Variant::Map&) const; /** Apply f to each Message on the queue. */ template <class F> void eachMessage(F f) { @@ -331,6 +372,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, bindings.eachBinding(f); } + /** Apply f to each Observer on the queue */ + template <class F> void eachObserver(F f) { + std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); + } + /** Set the position sequence number for the next message on the queue. * Must be >= the current sequence number. * Used by cluster to replicate queues. @@ -358,6 +404,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, void recoverPrepared(boost::intrusive_ptr<Message>& msg); void flush(); + + const Broker* getBroker(); + + uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); } + void setDequeueSincePurge(uint32_t value); }; } } |