summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/AgentSession.cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
commitf83677056891e436bf5ba99e79240df2a44528cd (patch)
tree625bfd644b948e89105630759cf6decb0435354d /cpp/src/qmf/AgentSession.cpp
parentebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff)
downloadqpid-python-QPID-2519.tar.gz
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/AgentSession.cpp')
-rw-r--r--cpp/src/qmf/AgentSession.cpp229
1 files changed, 89 insertions, 140 deletions
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 4c5a72a467..251c25fd44 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -19,132 +19,7 @@
*
*/
-#include "qpid/RefCounted.h"
-#include "qmf/PrivateImplRef.h"
-#include "qmf/exceptions.h"
-#include "qmf/AgentSession.h"
-#include "qmf/AgentEventImpl.h"
-#include "qmf/SchemaIdImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/DataAddrImpl.h"
-#include "qmf/DataImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/agentCapability.h"
-#include "qmf/constants.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/messaging/AddressParser.h"
-#include "qpid/management/Buffer.h"
-#include <queue>
-#include <map>
-#include <set>
-#include <iostream>
-#include <memory>
-
-using namespace std;
-using namespace qpid::messaging;
-using namespace qmf;
-using qpid::types::Variant;
-
-namespace qmf {
- class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
- public:
- ~AgentSessionImpl();
-
- //
- // Methods from API handle
- //
- AgentSessionImpl(Connection& c, const string& o);
- void setDomain(const string& d) { checkOpen(); domain = d; }
- void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
- void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
- void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
- void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
- const string& getName() const { return agentName; }
- void open();
- void close();
- bool nextEvent(AgentEvent& e, Duration t);
-
- void registerSchema(Schema& s);
- DataAddr addData(Data& d, const string& n, bool persist);
- void delData(const DataAddr&);
-
- void authAccept(AgentEvent& e);
- void authReject(AgentEvent& e, const string& m);
- void raiseException(AgentEvent& e, const string& s);
- void raiseException(AgentEvent& e, const Data& d);
- void response(AgentEvent& e, const Data& d);
- void complete(AgentEvent& e);
- void methodSuccess(AgentEvent& e);
- void raiseEvent(const Data& d);
- void raiseEvent(const Data& d, int s);
-
- private:
- typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
- typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
-
- mutable qpid::sys::Mutex lock;
- qpid::sys::Condition cond;
- Connection connection;
- Session session;
- Sender directSender;
- Sender topicSender;
- string domain;
- Variant::Map attributes;
- Variant::Map options;
- string agentName;
- bool opened;
- queue<AgentEvent> eventQueue;
- qpid::sys::Thread* thread;
- bool threadCanceled;
- uint32_t bootSequence;
- uint32_t interval;
- uint64_t lastHeartbeat;
- uint64_t lastVisit;
- bool forceHeartbeat;
- bool externalStorage;
- bool autoAllowQueries;
- bool autoAllowMethods;
- uint32_t maxSubscriptions;
- uint32_t minSubInterval;
- uint32_t subLifetime;
- bool publicEvents;
- bool listenOnDirect;
- bool strictSecurity;
- uint64_t schemaUpdateTime;
- string directBase;
- string topicBase;
-
- SchemaMap schemata;
- DataIndex globalIndex;
- map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
-
- void checkOpen();
- void setAgentName();
- void enqueueEvent(const AgentEvent&);
- void handleLocateRequest(const Variant::List& content, const Message& msg);
- void handleMethodRequest(const Variant::Map& content, const Message& msg);
- void handleQueryRequest(const Variant::Map& content, const Message& msg);
- void handleSchemaRequest(AgentEvent&);
- void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
- void dispatch(Message);
- void sendHeartbeat();
- void send(Message, const Address&);
- void flushResponses(AgentEvent&, bool);
- void periodicProcessing(uint64_t);
- void run();
- };
-}
-
-typedef qmf::PrivateImplRef<AgentSession> PI;
+#include "qmf/AgentSessionImpl.h"
AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
@@ -161,6 +36,7 @@ const string& AgentSession::getName() const { return impl->getName(); }
void AgentSession::open() { impl->open(); }
void AgentSession::close() { impl->close(); }
bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int AgentSession::pendingEvents() const { return impl->pendingEvents(); }
void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); }
DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); }
void AgentSession::delData(const DataAddr& a) { impl->delData(a); }
@@ -179,11 +55,11 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
//========================================================================================
AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
+ connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
- listenOnDirect(true), strictSecurity(false),
+ listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -244,7 +120,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
}
+
+ if (maxThreadWaitTime > interval)
+ maxThreadWaitTime = interval;
}
@@ -252,6 +135,11 @@ AgentSessionImpl::~AgentSessionImpl()
{
if (opened)
close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
}
@@ -260,6 +148,12 @@ void AgentSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
const string addrArgs(";{create:never,node:{type:topic}}");
const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
attributes["_direct_subject"] = routableAddr;
@@ -297,34 +191,47 @@ void AgentSessionImpl::open()
}
-void AgentSessionImpl::close()
+void AgentSessionImpl::closeAsync()
{
if (!opened)
return;
- // Stop and join the receiver thread
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
- thread->join();
- delete thread;
-
- // Close the AMQP session
- session.close();
opened = false;
}
+void AgentSessionImpl::close()
+{
+ closeAsync();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ thread = 0;
+ }
+}
+
+
bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
{
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty())
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
- qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
return true;
}
@@ -332,6 +239,26 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
}
+int AgentSessionImpl::pendingEvents() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventQueue.size();
+}
+
+
+void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
void AgentSessionImpl::registerSchema(Schema& schema)
{
if (!schema.isFinalized())
@@ -587,8 +514,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
qpid::sys::Mutex::ScopedLock l(lock);
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify)
+ if (notify) {
cond.notify();
+ alertEventNotifierLH(true);
+ }
}
@@ -1032,6 +961,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
}
+void AgentSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
void AgentSessionImpl::run()
{
QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
@@ -1041,7 +977,7 @@ void AgentSessionImpl::run()
periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND);
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
@@ -1058,6 +994,19 @@ void AgentSessionImpl::run()
enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
}
+ session.close();
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
+
+AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
+{
+ return *session.impl;
+}
+
+
+const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
+{
+ return *session.impl;
+}
+