diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 86 |
1 files changed, 43 insertions, 43 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index fb8bd1288f..b153d14720 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -54,15 +54,15 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { - const std::string qpidMaxSize("qpid.max_size"); - const std::string qpidMaxCount("qpid.max_count"); - const std::string qpidNoLocal("no-local"); - const std::string qpidTraceIdentity("qpid.trace.id"); - const std::string qpidTraceExclude("qpid.trace.exclude"); - const std::string qpidLastValueQueue("qpid.last_value_queue"); - const std::string qpidOptimisticConsume("qpid.optimistic_consume"); - const std::string qpidPersistLastNode("qpid.persist_last_node"); - const std::string qpidVQMatchProperty("qpid.LVQ_key"); +const std::string qpidMaxSize("qpid.max_size"); +const std::string qpidMaxCount("qpid.max_count"); +const std::string qpidNoLocal("no-local"); +const std::string qpidTraceIdentity("qpid.trace.id"); +const std::string qpidTraceExclude("qpid.trace.exclude"); +const std::string qpidLastValueQueue("qpid.last_value_queue"); +const std::string qpidOptimisticConsume("qpid.optimistic_consume"); +const std::string qpidPersistLastNode("qpid.persist_last_node"); +const std::string qpidVQMatchProperty("qpid.LVQ_key"); } @@ -81,7 +81,7 @@ Queue::Queue(const string& _name, bool _autodelete, lastValueQueue(false), optimisticConsume(false), persistLastNode(false), - inLastNodeFailure(false), + inLastNodeFailure(false), persistenceId(0), policyExceeded(false), mgmtObject(0) @@ -156,7 +156,7 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ push(msg); msg->enqueueComplete(); }else { - push(msg); + push(msg); } mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -179,10 +179,10 @@ void Queue::recover(boost::intrusive_ptr<Message>& msg){ void Queue::process(boost::intrusive_ptr<Message>& msg){ push(msg); mgntEnqStats(msg); - if (mgmtObject != 0){ + if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); - } + } } void Queue::requeue(const QueuedMessage& msg){ @@ -432,9 +432,9 @@ void Queue::popMsg(QueuedMessage& qmsg) { if (lastValueQueue){ const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); - string key = ft->getString(qpidVQMatchProperty); - lvq.erase(key); - } + string key = ft->getString(qpidVQMatchProperty); + lvq.erase(key); + } messages.pop_front(); } @@ -446,24 +446,24 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ if (policy.get()) policy->tryEnqueue(qm); //if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of existing message - LVQ::iterator i; - if (lastValueQueue){ - const framing::FieldTable* ft = msg->getApplicationHeaders(); - string key = ft->getString(qpidVQMatchProperty); + LVQ::iterator i; + if (lastValueQueue){ + const framing::FieldTable* ft = msg->getApplicationHeaders(); + string key = ft->getString(qpidVQMatchProperty); - i = lvq.find(key); - if (i == lvq.end()){ + i = lvq.find(key); + if (i == lvq.end()){ messages.push_back(qm); listeners.swap(copy); - lvq[key] = &messages.back(); - }else { - i->second->payload = msg; - } - }else { + lvq[key] = &messages.back(); + }else { + i->second->payload = msg; + } + }else { messages.push_back(qm); listeners.swap(copy); - } + } } for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } @@ -501,31 +501,31 @@ void Queue::clearLastNodeFailure() void Queue::setLastNodeFailure() { if (persistLastNode){ - Mutex::ScopedLock locker(messageLock); + Mutex::ScopedLock locker(messageLock); for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - i->payload->forcePersistent(); - if (i->payload->getPersistenceId() == 0){ + i->payload->forcePersistent(); + if (i->payload->getPersistenceId() == 0){ enqueue(0, i->payload); - } + } } - inLastNodeFailure = true; - } + inLastNodeFailure = true; + } } // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { if (inLastNodeFailure && persistLastNode){ - msg->forcePersistent(); - } + msg->forcePersistent(); + } - if (traceId.size()) { + if (traceId.size()) { msg->addTraceId(traceId); } if (msg->isPersistent() && store) { - msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); + msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); store->enqueue(ctxt, pmsg, *this); return true; } @@ -542,7 +542,7 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) } if (msg.payload->isPersistent() && store) { msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue - boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); + boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); store->dequeue(ctxt, pmsg, *this); return true; } @@ -557,7 +557,7 @@ void Queue::popAndDequeue() { QueuedMessage msg = messages.front(); popMsg(msg); - dequeue(0, msg); + dequeue(0, msg); } /** @@ -593,7 +593,7 @@ void Queue::configure(const FieldTable& _settings) optimisticConsume= _settings.get(qpidOptimisticConsume); if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume"); - persistLastNode= _settings.get(qpidPersistLastNode); + persistLastNode= _settings.get(qpidPersistLastNode); if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); traceId = _settings.getString(qpidTraceIdentity); @@ -783,7 +783,7 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str switch (methodId) { - case _qmf::Queue::METHOD_PURGE : + case _qmf::Queue::METHOD_PURGE : _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args; purge (iargs.i_request); status = Manageable::STATUS_OK; |
