summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/ConsoleSession.cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
commitf83677056891e436bf5ba99e79240df2a44528cd (patch)
tree625bfd644b948e89105630759cf6decb0435354d /cpp/src/qmf/ConsoleSession.cpp
parentebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff)
downloadqpid-python-QPID-2519.tar.gz
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp125
1 files changed, 107 insertions, 18 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index e12c1152f6..2dfc894c58 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -54,6 +54,7 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f);
void ConsoleSession::open() { impl->open(); }
void ConsoleSession::close() { impl->close(); }
bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
@@ -65,9 +66,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5),
- opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
- connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+ connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+ opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -91,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
}
+
+ if (maxThreadWaitTime > 60)
+ maxThreadWaitTime = 60;
}
@@ -99,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl()
{
if (opened)
close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
}
@@ -153,6 +166,12 @@ void ConsoleSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
// Establish messaging addresses
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
@@ -181,45 +200,57 @@ void ConsoleSessionImpl::open()
// Start the receiver thread
threadCanceled = false;
+ opened = true;
thread = new qpid::sys::Thread(*this);
// Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
sendBrokerLocate();
if (agentQuery)
sendAgentLocate();
-
- opened = true;
}
-void ConsoleSessionImpl::close()
+void ConsoleSessionImpl::closeAsync()
{
if (!opened)
throw QmfException("The session is already closed");
- // Stop and join the receiver thread
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
- thread->join();
- delete thread;
-
- // Close the AMQP session
- session.close();
opened = false;
}
+void ConsoleSessionImpl::close()
+{
+ closeAsync();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ thread = 0;
+ }
+}
+
+
bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
{
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty())
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
- qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
return true;
}
@@ -227,6 +258,27 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
}
+int ConsoleSessionImpl::pendingEvents() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventQueue.size();
+}
+
+
+void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+
+EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
uint32_t ConsoleSessionImpl::getAgentCount() const
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -268,8 +320,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
{
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify)
+ if (notify) {
cond.notify();
+ alertEventNotifierLH(true);
+ }
}
@@ -421,7 +475,23 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
iter = content.find("_values");
if (iter == content.end())
return;
- Variant::Map attrs(iter->second.asMap());
+ const Variant::Map& in_attrs(iter->second.asMap());
+ Variant::Map attrs;
+
+ //
+ // Copy the map from the message to "attrs". Translate any old-style
+ // keys to their new key values in the process.
+ //
+ for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
+ if (iter->first == "epoch")
+ attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
+ else if (iter->first == "timestamp")
+ attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
+ else if (iter->first == "heartbeat_interval")
+ attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
+ else
+ attrs[iter->first] = iter->second;
+ }
iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
if (iter != attrs.end())
@@ -562,6 +632,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
}
+void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
void ConsoleSessionImpl::run()
{
QPID_LOG(debug, "ConsoleSession thread started");
@@ -572,7 +649,7 @@ void ConsoleSessionImpl::run()
qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND);
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
@@ -589,6 +666,18 @@ void ConsoleSessionImpl::run()
enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
}
+ session.close();
QPID_LOG(debug, "ConsoleSession thread exiting");
}
+
+ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
+{
+ return *session.impl;
+}
+
+
+const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
+{
+ return *session.impl;
+}