diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.h | 37 | ||||
| -rw-r--r-- | cpp/src/qpid/client/QueueOptions.cpp | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/client/QueueOptions.h | 119 |
4 files changed, 147 insertions, 147 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index fb8bd1288f..b153d14720 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -54,15 +54,15 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { - const std::string qpidMaxSize("qpid.max_size"); - const std::string qpidMaxCount("qpid.max_count"); - const std::string qpidNoLocal("no-local"); - const std::string qpidTraceIdentity("qpid.trace.id"); - const std::string qpidTraceExclude("qpid.trace.exclude"); - const std::string qpidLastValueQueue("qpid.last_value_queue"); - const std::string qpidOptimisticConsume("qpid.optimistic_consume"); - const std::string qpidPersistLastNode("qpid.persist_last_node"); - const std::string qpidVQMatchProperty("qpid.LVQ_key"); +const std::string qpidMaxSize("qpid.max_size"); +const std::string qpidMaxCount("qpid.max_count"); +const std::string qpidNoLocal("no-local"); +const std::string qpidTraceIdentity("qpid.trace.id"); +const std::string qpidTraceExclude("qpid.trace.exclude"); +const std::string qpidLastValueQueue("qpid.last_value_queue"); +const std::string qpidOptimisticConsume("qpid.optimistic_consume"); +const std::string qpidPersistLastNode("qpid.persist_last_node"); +const std::string qpidVQMatchProperty("qpid.LVQ_key"); } @@ -81,7 +81,7 @@ Queue::Queue(const string& _name, bool _autodelete, lastValueQueue(false), optimisticConsume(false), persistLastNode(false), - inLastNodeFailure(false), + inLastNodeFailure(false), persistenceId(0), policyExceeded(false), mgmtObject(0) @@ -156,7 +156,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); }else { - push(msg); + push(msg); } mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -179,10 +179,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); mgntEnqStats(msg); - if (mgmtObject != 0){ + if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - } + } } void Queue::requeue(const QueuedMessage& msg){ @@ -432,9 +432,9 @@ void Queue::popMsg(QueuedMessage& qmsg) { if (lastValueQueue){ const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); - string key = ft->getString(qpidVQMatchProperty); - lvq.erase(key); - } + string key = ft->getString(qpidVQMatchProperty); + lvq.erase(key); + } messages.pop_front(); } @@ -446,24 +446,24 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ if (policy.get()) policy->tryEnqueue(qm); //if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of existing message - LVQ::iterator i; - if (lastValueQueue){ - const framing::FieldTable* ft = msg->getApplicationHeaders(); - string key = ft->getString(qpidVQMatchProperty); + LVQ::iterator i; + if (lastValueQueue){ + const framing::FieldTable* ft = msg->getApplicationHeaders(); + string key = ft->getString(qpidVQMatchProperty); - i = lvq.find(key); - if (i == lvq.end()){ + i = lvq.find(key); + if (i == lvq.end()){ messages.push_back(qm); listeners.swap(copy); - lvq[key] = &messages.back(); - }else { - i->second->payload = msg; - } - }else { + lvq[key] = &messages.back(); + }else { + i->second->payload = msg; + } + }else { messages.push_back(qm); listeners.swap(copy); - } + } } for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } @@ -501,31 +501,31 @@ void Queue::clearLastNodeFailure() void Queue::setLastNodeFailure() { if (persistLastNode){ - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - i->payload->forcePersistent(); - if (i->payload->getPersistenceId() == 0){ + i->payload->forcePersistent(); + if (i->payload->getPersistenceId() == 0){ enqueue(0, i->payload); - } + } } - inLastNodeFailure = true; - } + inLastNodeFailure = true; + } } // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (inLastNodeFailure && persistLastNode){ - msg->forcePersistent(); - } + msg->forcePersistent(); + } - if (traceId.size()) { + if (traceId.size()) { msg->addTraceId(traceId); } if (msg->isPersistent() && store) { - msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } @@ -542,7 +542,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) } if (msg.payload->isPersistent() && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); return true; } @@ -557,7 +557,7 @@ void Queue::popAndDequeue() { QueuedMessage msg = messages.front(); popMsg(msg); - dequeue(0, msg); + dequeue(0, msg); } /** @@ -593,7 +593,7 @@ void Queue::configure(const FieldTable& _settings) optimisticConsume= _settings.get(qpidOptimisticConsume); if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume"); - persistLastNode= _settings.get(qpidPersistLastNode); + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); traceId = _settings.getString(qpidTraceIdentity); @@ -783,7 +783,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str switch (methodId) { - case _qmf::Queue::METHOD_PURGE : + case _qmf::Queue::METHOD_PURGE : _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args; purge (iargs.i_request); status = Manageable::STATUS_OK; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 213a36d59d..9a7c90181c 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -77,12 +77,12 @@ namespace qpid { bool lastValueQueue; bool optimisticConsume; bool persistLastNode; - bool inLastNodeFailure; + bool inLastNodeFailure; std::string traceId; std::vector<std::string> traceExclude; Listeners listeners; Messages messages; - LVQ lvq; + LVQ lvq; mutable qpid::sys::Mutex consumerLock; mutable qpid::sys::Mutex messageLock; mutable qpid::sys::Mutex ownershipLock; @@ -109,26 +109,29 @@ namespace qpid { void dequeued(const QueuedMessage& msg); void popAndDequeue(); - inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg){ + + inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) + { if (mgmtObject != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); if (msg->isPersistent ()) { - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + } + } + } + inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg) + { + if (mgmtObject != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + if (msg->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + } } - }; - inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg){ - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg->contentSize()); - if (msg->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg->contentSize()); - } - } - }; + } public: diff --git a/cpp/src/qpid/client/QueueOptions.cpp b/cpp/src/qpid/client/QueueOptions.cpp index 5d1cb74efd..9719183ec4 100644 --- a/cpp/src/qpid/client/QueueOptions.cpp +++ b/cpp/src/qpid/client/QueueOptions.cpp @@ -49,45 +49,45 @@ void QueueOptions::setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t if (maxCount) setInt(strMaxCountKey, maxCount); if (maxSize) setInt(strMaxSizeKey, maxSize); if (maxSize || maxCount){ - switch (sp) - { - case REJECT: - setString(strTypeKey, strREJECT); - break; - case FLOW_TO_DISK: - setString(strTypeKey, strFLOW_TO_DISK); - break; - case RING: - setString(strTypeKey, strRING); - break; - case RING_STRICT: - setString(strTypeKey, strRING_STRICT); - break; - case NONE: - clearSizePolicy(); - break; - } - } + switch (sp) + { + case REJECT: + setString(strTypeKey, strREJECT); + break; + case FLOW_TO_DISK: + setString(strTypeKey, strFLOW_TO_DISK); + break; + case RING: + setString(strTypeKey, strRING); + break; + case RING_STRICT: + setString(strTypeKey, strRING_STRICT); + break; + case NONE: + clearSizePolicy(); + break; + } + } } void QueueOptions::setOptimisticConsume() { - setInt(strOptimisticConsume, 1); + setInt(strOptimisticConsume, 1); } void QueueOptions::setPersistLastNode() { - setInt(strPersistLastNode, 1); + setInt(strPersistLastNode, 1); } void QueueOptions::setOrdering(QueueOrderingPolicy op) { - if (op == LVQ){ - setInt(strLastValueQueue, 1); - }else{ - clearOrdering(); - } + if (op == LVQ){ + setInt(strLastValueQueue, 1); + }else{ + clearOrdering(); + } } void QueueOptions::getLVQKey(std::string& key) diff --git a/cpp/src/qpid/client/QueueOptions.h b/cpp/src/qpid/client/QueueOptions.h index 37cb8616e3..149bb6c34a 100644 --- a/cpp/src/qpid/client/QueueOptions.h +++ b/cpp/src/qpid/client/QueueOptions.h @@ -39,76 +39,73 @@ class QueueOptions: public framing::FieldTable QueueOptions(); virtual ~QueueOptions(); - /** - * Sets the queue sizing plocy - * - * @param sp SizePolicy - * REJECT - reject if queue greater than size/count - * FLOW_TO_DISK - page messages to disk from this point is greater than size/count - * RING - limit the queue to size/count and over-write old messages round a ring - * RING_STRICT - limit the queue to size/count and reject is head == tail - * NONE - Use default broker sizing policy - * @param maxSize Set the max number of bytes for the sizing policies - * @param setMaxCount Set the max number of messages for the sizing policies - */ - void setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount ); + /** + * Sets the queue sizing plocy + * + * @param sp SizePolicy + * REJECT - reject if queue greater than size/count + * FLOW_TO_DISK - page messages to disk from this point is greater than size/count + * RING - limit the queue to size/count and over-write old messages round a ring + * RING_STRICT - limit the queue to size/count and reject is head == tail + * NONE - Use default broker sizing policy + * @param maxSize Set the max number of bytes for the sizing policies + * @param setMaxCount Set the max number of messages for the sizing policies + */ + void setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount ); - /** - * Enables optimistic consume allowing the consumer to dequeue the message before the - * broker has safe stored it. - */ - void setOptimisticConsume(); + /** + * Enables optimistic consume allowing the consumer to dequeue the message before the + * broker has safe stored it. + */ + void setOptimisticConsume(); - /** - * Enables the persisting of a queue to the store module when a cluster fails down to it's last - * node. Does so optimistically. Will start persisting when cluster count >1 again. - */ - void setPersistLastNode(); + /** + * Enables the persisting of a queue to the store module when a cluster fails down to it's last + * node. Does so optimistically. Will start persisting when cluster count >1 again. + */ + void setPersistLastNode(); - /** - * Sets the odering policy on the Queue, default ordering is FIFO. - */ - void setOrdering(QueueOrderingPolicy op); + /** + * Sets the odering policy on the Queue, default ordering is FIFO. + */ + void setOrdering(QueueOrderingPolicy op); - /** - * Use broker defualt sizing ploicy - */ - void clearSizePolicy(); + /** + * Use broker defualt sizing ploicy + */ + void clearSizePolicy(); - /** - * Clear Optimistic Consume Policy - */ - void clearOptimisticConsume(); + /** + * Clear Optimistic Consume Policy + */ + void clearOptimisticConsume(); - /** - * Clear Persist Last Node Policy - */ - void clearPersistLastNode(); + /** + * Clear Persist Last Node Policy + */ + void clearPersistLastNode(); - /** - * get the key used match LVQ in args for message transfer - */ - void getLVQKey(std::string& key); + /** + * get the key used match LVQ in args for message transfer + */ + void getLVQKey(std::string& key); - /** - * Use default odering policy - */ - void clearOrdering(); + /** + * Use default odering policy + */ + void clearOrdering(); - static const std::string strMaxCountKey; - static const std::string strMaxSizeKey; - static const std::string strTypeKey; - static const std::string strREJECT; - static const std::string strFLOW_TO_DISK; - static const std::string strRING; - static const std::string strRING_STRICT; - static const std::string strLastValueQueue; - static const std::string strOptimisticConsume; - static const std::string strPersistLastNode; - static const std::string strLVQMatchProperty; - - - + static const std::string strMaxCountKey; + static const std::string strMaxSizeKey; + static const std::string strTypeKey; + static const std::string strREJECT; + static const std::string strFLOW_TO_DISK; + static const std::string strRING; + static const std::string strRING_STRICT; + static const std::string strLastValueQueue; + static const std::string strOptimisticConsume; + static const std::string strPersistLastNode; + static const std::string strLVQMatchProperty; }; } |
