diff options
Diffstat (limited to 'cpp/src/qmf/AgentSession.cpp')
-rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 229 |
1 files changed, 89 insertions, 140 deletions
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 4c5a72a467..251c25fd44 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -19,132 +19,7 @@ * */ -#include "qpid/RefCounted.h" -#include "qmf/PrivateImplRef.h" -#include "qmf/exceptions.h" -#include "qmf/AgentSession.h" -#include "qmf/AgentEventImpl.h" -#include "qmf/SchemaIdImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/DataAddrImpl.h" -#include "qmf/DataImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/agentCapability.h" -#include "qmf/constants.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Condition.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" -#include "qpid/messaging/Connection.h" -#include "qpid/messaging/Session.h" -#include "qpid/messaging/Receiver.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Message.h" -#include "qpid/messaging/AddressParser.h" -#include "qpid/management/Buffer.h" -#include <queue> -#include <map> -#include <set> -#include <iostream> -#include <memory> - -using namespace std; -using namespace qpid::messaging; -using namespace qmf; -using qpid::types::Variant; - -namespace qmf { - class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { - public: - ~AgentSessionImpl(); - - // - // Methods from API handle - // - AgentSessionImpl(Connection& c, const string& o); - void setDomain(const string& d) { checkOpen(); domain = d; } - void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } - void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } - void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } - void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } - const string& getName() const { return agentName; } - void open(); - void close(); - bool nextEvent(AgentEvent& e, Duration t); - - void registerSchema(Schema& s); - DataAddr addData(Data& d, const string& n, bool persist); - void delData(const DataAddr&); - - void authAccept(AgentEvent& e); - void authReject(AgentEvent& e, const string& m); - void raiseException(AgentEvent& e, const string& s); - void raiseException(AgentEvent& e, const Data& d); - void response(AgentEvent& e, const Data& d); - void complete(AgentEvent& e); - void methodSuccess(AgentEvent& e); - void raiseEvent(const Data& d); - void raiseEvent(const Data& d, int s); - - private: - typedef map<DataAddr, Data, DataAddrCompare> DataIndex; - typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; - - mutable qpid::sys::Mutex lock; - qpid::sys::Condition cond; - Connection connection; - Session session; - Sender directSender; - Sender topicSender; - string domain; - Variant::Map attributes; - Variant::Map options; - string agentName; - bool opened; - queue<AgentEvent> eventQueue; - qpid::sys::Thread* thread; - bool threadCanceled; - uint32_t bootSequence; - uint32_t interval; - uint64_t lastHeartbeat; - uint64_t lastVisit; - bool forceHeartbeat; - bool externalStorage; - bool autoAllowQueries; - bool autoAllowMethods; - uint32_t maxSubscriptions; - uint32_t minSubInterval; - uint32_t subLifetime; - bool publicEvents; - bool listenOnDirect; - bool strictSecurity; - uint64_t schemaUpdateTime; - string directBase; - string topicBase; - - SchemaMap schemata; - DataIndex globalIndex; - map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; - - void checkOpen(); - void setAgentName(); - void enqueueEvent(const AgentEvent&); - void handleLocateRequest(const Variant::List& content, const Message& msg); - void handleMethodRequest(const Variant::Map& content, const Message& msg); - void handleQueryRequest(const Variant::Map& content, const Message& msg); - void handleSchemaRequest(AgentEvent&); - void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); - void dispatch(Message); - void sendHeartbeat(); - void send(Message, const Address&); - void flushResponses(AgentEvent&, bool); - void periodicProcessing(uint64_t); - void run(); - }; -} - -typedef qmf::PrivateImplRef<AgentSession> PI; +#include "qmf/AgentSessionImpl.h" AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); } AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); } @@ -161,6 +36,7 @@ const string& AgentSession::getName() const { return impl->getName(); } void AgentSession::open() { impl->open(); } void AgentSession::close() { impl->close(); } bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); } +int AgentSession::pendingEvents() const { return impl->pendingEvents(); } void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); } DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); } void AgentSession::delData(const DataAddr& a) { impl->delData(a); } @@ -179,11 +55,11 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); } //======================================================================================== AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), + connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false), bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), - listenOnDirect(true), strictSecurity(false), + listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -244,7 +120,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); + + iter = optMap.find("max-thread-wait-time"); + if (iter != optMap.end()) + maxThreadWaitTime = iter->second.asUint32(); } + + if (maxThreadWaitTime > interval) + maxThreadWaitTime = interval; } @@ -252,6 +135,11 @@ AgentSessionImpl::~AgentSessionImpl() { if (opened) close(); + + if (thread) { + thread->join(); + delete thread; + } } @@ -260,6 +148,12 @@ void AgentSessionImpl::open() if (opened) throw QmfException("The session is already open"); + // If the thread exists, join and delete it before creating a new one. + if (thread) { + thread->join(); + delete thread; + } + const string addrArgs(";{create:never,node:{type:topic}}"); const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str()); attributes["_direct_subject"] = routableAddr; @@ -297,34 +191,47 @@ void AgentSessionImpl::open() } -void AgentSessionImpl::close() +void AgentSessionImpl::closeAsync() { if (!opened) return; - // Stop and join the receiver thread + // Stop the receiver thread. Don't join it until the destructor is called or open() is called. threadCanceled = true; - thread->join(); - delete thread; - - // Close the AMQP session - session.close(); opened = false; } +void AgentSessionImpl::close() +{ + closeAsync(); + + if (thread) { + thread->join(); + delete thread; + thread = 0; + } +} + + bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) { uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty()) - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), - qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (eventQueue.empty() && milliseconds > 0) { + int64_t nsecs(qpid::sys::TIME_INFINITE); + if ((uint64_t)(nsecs / 1000000) > milliseconds) + nsecs = (int64_t) milliseconds * 1000000; + qpid::sys::Duration then(nsecs); + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); + } if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); + if (eventQueue.empty()) + alertEventNotifierLH(false); return true; } @@ -332,6 +239,26 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) } +int AgentSessionImpl::pendingEvents() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventQueue.size(); +} + + +void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier) +{ + qpid::sys::Mutex::ScopedLock l(lock); + eventNotifier = notifier; +} + +EventNotifierImpl* AgentSessionImpl::getEventNotifier() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventNotifier; +} + + void AgentSessionImpl::registerSchema(Schema& schema) { if (!schema.isFinalized()) @@ -587,8 +514,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event) qpid::sys::Mutex::ScopedLock l(lock); bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) + if (notify) { cond.notify(); + alertEventNotifierLH(true); + } } @@ -1032,6 +961,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) } +void AgentSessionImpl::alertEventNotifierLH(bool readable) +{ + if (eventNotifier) + eventNotifier->setReadable(readable); +} + + void AgentSessionImpl::run() { QPID_LOG(debug, "AgentSession thread started for agent " << agentName); @@ -1041,7 +977,7 @@ void AgentSessionImpl::run() periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND); + bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); if (threadCanceled) break; if (valid) { @@ -1058,6 +994,19 @@ void AgentSessionImpl::run() enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED))); } + session.close(); QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); } + +AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session) +{ + return *session.impl; +} + + +const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session) +{ + return *session.impl; +} + |