diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qmf/ConsoleSession.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 125 |
1 files changed, 107 insertions, 18 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index e12c1152f6..2dfc894c58 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -54,6 +54,7 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); void ConsoleSession::open() { impl->open(); } void ConsoleSession::close() { impl->close(); } bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); } +int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); } uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } @@ -65,9 +66,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), - opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) + connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), + opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -91,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); + + iter = optMap.find("max-thread-wait-time"); + if (iter != optMap.end()) + maxThreadWaitTime = iter->second.asUint32(); } + + if (maxThreadWaitTime > 60) + maxThreadWaitTime = 60; } @@ -99,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl() { if (opened) close(); + + if (thread) { + thread->join(); + delete thread; + } } @@ -153,6 +166,12 @@ void ConsoleSessionImpl::open() if (opened) throw QmfException("The session is already open"); + // If the thread exists, join and delete it before creating a new one. + if (thread) { + thread->join(); + delete thread; + } + // Establish messaging addresses directBase = "qmf." + domain + ".direct"; topicBase = "qmf." + domain + ".topic"; @@ -181,45 +200,57 @@ void ConsoleSessionImpl::open() // Start the receiver thread threadCanceled = false; + opened = true; thread = new qpid::sys::Thread(*this); // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. sendBrokerLocate(); if (agentQuery) sendAgentLocate(); - - opened = true; } -void ConsoleSessionImpl::close() +void ConsoleSessionImpl::closeAsync() { if (!opened) throw QmfException("The session is already closed"); - // Stop and join the receiver thread + // Stop the receiver thread. Don't join it until the destructor is called or open() is called. threadCanceled = true; - thread->join(); - delete thread; - - // Close the AMQP session - session.close(); opened = false; } +void ConsoleSessionImpl::close() +{ + closeAsync(); + + if (thread) { + thread->join(); + delete thread; + thread = 0; + } +} + + bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) { uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty()) - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), - qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (eventQueue.empty() && milliseconds > 0) { + int64_t nsecs(qpid::sys::TIME_INFINITE); + if ((uint64_t)(nsecs / 1000000) > milliseconds) + nsecs = (int64_t) milliseconds * 1000000; + qpid::sys::Duration then(nsecs); + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); + } if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); + if (eventQueue.empty()) + alertEventNotifierLH(false); return true; } @@ -227,6 +258,27 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) } +int ConsoleSessionImpl::pendingEvents() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventQueue.size(); +} + + +void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier) +{ + qpid::sys::Mutex::ScopedLock l(lock); + eventNotifier = notifier; +} + + +EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventNotifier; +} + + uint32_t ConsoleSessionImpl::getAgentCount() const { qpid::sys::Mutex::ScopedLock l(lock); @@ -268,8 +320,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event) { bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) + if (notify) { cond.notify(); + alertEventNotifierLH(true); + } } @@ -421,7 +475,23 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian iter = content.find("_values"); if (iter == content.end()) return; - Variant::Map attrs(iter->second.asMap()); + const Variant::Map& in_attrs(iter->second.asMap()); + Variant::Map attrs; + + // + // Copy the map from the message to "attrs". Translate any old-style + // keys to their new key values in the process. + // + for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) { + if (iter->first == "epoch") + attrs[protocol::AGENT_ATTR_EPOCH] = iter->second; + else if (iter->first == "timestamp") + attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second; + else if (iter->first == "heartbeat_interval") + attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second; + else + attrs[iter->first] = iter->second; + } iter = attrs.find(protocol::AGENT_ATTR_EPOCH); if (iter != attrs.end()) @@ -562,6 +632,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) } +void ConsoleSessionImpl::alertEventNotifierLH(bool readable) +{ + if (eventNotifier) + eventNotifier->setReadable(readable); +} + + void ConsoleSessionImpl::run() { QPID_LOG(debug, "ConsoleSession thread started"); @@ -572,7 +649,7 @@ void ConsoleSessionImpl::run() qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND); + bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); if (threadCanceled) break; if (valid) { @@ -589,6 +666,18 @@ void ConsoleSessionImpl::run() enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED))); } + session.close(); QPID_LOG(debug, "ConsoleSession thread exiting"); } + +ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session) +{ + return *session.impl; +} + + +const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session) +{ + return *session.impl; +} |