diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/qmf/AgentSession.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/AgentSession.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/AgentSession.cpp | 1072 |
1 files changed, 1072 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp new file mode 100644 index 0000000000..71d369325f --- /dev/null +++ b/qpid/cpp/src/qmf/AgentSession.cpp @@ -0,0 +1,1072 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/AgentSession.h" +#include "qmf/AgentEventImpl.h" +#include "qmf/SchemaIdImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/DataAddrImpl.h" +#include "qmf/DataImpl.h" +#include "qmf/QueryImpl.h" +#include "qmf/agentCapability.h" +#include "qmf/constants.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" +#include <queue> +#include <map> +#include <set> +#include <iostream> +#include <memory> + +using namespace std; +using namespace qpid::messaging; +using namespace qmf; +using qpid::types::Variant; + +namespace qmf { + class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { + public: + ~AgentSessionImpl(); + + // + // Methods from API handle + // + AgentSessionImpl(Connection& c, const string& o); + void setDomain(const string& d) { checkOpen(); domain = d; } + void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } + void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } + void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } + void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } + const string& getName() const { return agentName; } + void open(); + void close(); + bool nextEvent(AgentEvent& e, Duration t); + int pendingEvents() const; + + void registerSchema(Schema& s); + DataAddr addData(Data& d, const string& n, bool persist); + void delData(const DataAddr&); + + void authAccept(AgentEvent& e); + void authReject(AgentEvent& e, const string& m); + void raiseException(AgentEvent& e, const string& s); + void raiseException(AgentEvent& e, const Data& d); + void response(AgentEvent& e, const Data& d); + void complete(AgentEvent& e); + void methodSuccess(AgentEvent& e); + void raiseEvent(const Data& d); + void raiseEvent(const Data& d, int s); + + private: + typedef map<DataAddr, Data, DataAddrCompare> DataIndex; + typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; + + mutable qpid::sys::Mutex lock; + qpid::sys::Condition cond; + Connection connection; + Session session; + Sender directSender; + Sender topicSender; + string domain; + Variant::Map attributes; + Variant::Map options; + string agentName; + bool opened; + queue<AgentEvent> eventQueue; + qpid::sys::Thread* thread; + bool threadCanceled; + uint32_t bootSequence; + uint32_t interval; + uint64_t lastHeartbeat; + uint64_t lastVisit; + bool forceHeartbeat; + bool externalStorage; + bool autoAllowQueries; + bool autoAllowMethods; + uint32_t maxSubscriptions; + uint32_t minSubInterval; + uint32_t subLifetime; + bool publicEvents; + bool listenOnDirect; + bool strictSecurity; + uint64_t schemaUpdateTime; + string directBase; + string topicBase; + + SchemaMap schemata; + DataIndex globalIndex; + map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; + + void checkOpen(); + void setAgentName(); + void enqueueEvent(const AgentEvent&); + void handleLocateRequest(const Variant::List& content, const Message& msg); + void handleMethodRequest(const Variant::Map& content, const Message& msg); + void handleQueryRequest(const Variant::Map& content, const Message& msg); + void handleSchemaRequest(AgentEvent&); + void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); + void dispatch(Message); + void sendHeartbeat(); + void send(Message, const Address&); + void flushResponses(AgentEvent&, bool); + void periodicProcessing(uint64_t); + void run(); + }; +} + +typedef qmf::PrivateImplRef<AgentSession> PI; + +AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); } +AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); } +AgentSession::~AgentSession() { PI::dtor(*this); } +AgentSession& AgentSession::operator=(const AgentSession& s) { return PI::assign(*this, s); } + +AgentSession::AgentSession(Connection& c, const string& o) { PI::ctor(*this, new AgentSessionImpl(c, o)); } +void AgentSession::setDomain(const string& d) { impl->setDomain(d); } +void AgentSession::setVendor(const string& v) { impl->setVendor(v); } +void AgentSession::setProduct(const string& p) { impl->setProduct(p); } +void AgentSession::setInstance(const string& i) { impl->setInstance(i); } +void AgentSession::setAttribute(const string& k, const qpid::types::Variant& v) { impl->setAttribute(k, v); } +const string& AgentSession::getName() const { return impl->getName(); } +void AgentSession::open() { impl->open(); } +void AgentSession::close() { impl->close(); } +bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); } +int AgentSession::pendingEvents() const { return impl->pendingEvents(); } +void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); } +DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); } +void AgentSession::delData(const DataAddr& a) { impl->delData(a); } +void AgentSession::authAccept(AgentEvent& e) { impl->authAccept(e); } +void AgentSession::authReject(AgentEvent& e, const string& m) { impl->authReject(e, m); } +void AgentSession::raiseException(AgentEvent& e, const string& s) { impl->raiseException(e, s); } +void AgentSession::raiseException(AgentEvent& e, const Data& d) { impl->raiseException(e, d); } +void AgentSession::response(AgentEvent& e, const Data& d) { impl->response(e, d); } +void AgentSession::complete(AgentEvent& e) { impl->complete(e); } +void AgentSession::methodSuccess(AgentEvent& e) { impl->methodSuccess(e); } +void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); } +void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); } + +//======================================================================================== +// Impl Method Bodies +//======================================================================================== + +AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : + connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), + bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), + externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), + maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), + listenOnDirect(true), strictSecurity(false), + schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) +{ + // + // Set Agent Capability Level + // + attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8; + + if (!options.empty()) { + qpid::messaging::AddressParser parser(options); + Variant::Map optMap; + Variant::Map::const_iterator iter; + + parser.parseMap(optMap); + + iter = optMap.find("domain"); + if (iter != optMap.end()) + domain = iter->second.asString(); + + iter = optMap.find("interval"); + if (iter != optMap.end()) { + interval = iter->second.asUint32(); + if (interval < 1) + interval = 1; + } + + iter = optMap.find("external"); + if (iter != optMap.end()) + externalStorage = iter->second.asBool(); + + iter = optMap.find("allow-queries"); + if (iter != optMap.end()) + autoAllowQueries = iter->second.asBool(); + + iter = optMap.find("allow-methods"); + if (iter != optMap.end()) + autoAllowMethods = iter->second.asBool(); + + iter = optMap.find("max-subscriptions"); + if (iter != optMap.end()) + maxSubscriptions = iter->second.asUint32(); + + iter = optMap.find("min-sub-interval"); + if (iter != optMap.end()) + minSubInterval = iter->second.asUint32(); + + iter = optMap.find("sub-lifetime"); + if (iter != optMap.end()) + subLifetime = iter->second.asUint32(); + + iter = optMap.find("public-events"); + if (iter != optMap.end()) + publicEvents = iter->second.asBool(); + + iter = optMap.find("listen-on-direct"); + if (iter != optMap.end()) + listenOnDirect = iter->second.asBool(); + + iter = optMap.find("strict-security"); + if (iter != optMap.end()) + strictSecurity = iter->second.asBool(); + } +} + + +AgentSessionImpl::~AgentSessionImpl() +{ + if (opened) + close(); +} + + +void AgentSessionImpl::open() +{ + if (opened) + throw QmfException("The session is already open"); + + const string addrArgs(";{create:never,node:{type:topic}}"); + const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str()); + attributes["_direct_subject"] = routableAddr; + + // Establish messaging addresses + setAgentName(); + directBase = "qmf." + domain + ".direct"; + topicBase = "qmf." + domain + ".topic"; + + // Create AMQP session, receivers, and senders + session = connection.createSession(); + Receiver directRx; + Receiver routableDirectRx = session.createReceiver(topicBase + "/" + routableAddr + addrArgs); + Receiver topicRx = session.createReceiver(topicBase + "/console.#" + addrArgs); + + if (listenOnDirect && !strictSecurity) { + directRx = session.createReceiver(directBase + "/" + agentName + addrArgs); + directRx.setCapacity(64); + } + + routableDirectRx.setCapacity(64); + topicRx.setCapacity(64); + + if (!strictSecurity) + directSender = session.createSender(directBase + addrArgs); + topicSender = session.createSender(topicBase + addrArgs); + + // Start the receiver thread + threadCanceled = false; + opened = true; + thread = new qpid::sys::Thread(*this); + + // Send an initial agent heartbeat message + sendHeartbeat(); +} + + +void AgentSessionImpl::close() +{ + if (!opened) + return; + + // Stop and join the receiver thread + threadCanceled = true; + thread->join(); + delete thread; + + // Close the AMQP session + session.close(); + opened = false; +} + + +bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) +{ + uint64_t milliseconds = timeout.getMilliseconds(); + qpid::sys::Mutex::ScopedLock l(lock); + + if (eventQueue.empty() && milliseconds > 0) + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + + if (!eventQueue.empty()) { + event = eventQueue.front(); + eventQueue.pop(); + return true; + } + + return false; +} + + +int AgentSessionImpl::pendingEvents() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventQueue.size(); +} + + +void AgentSessionImpl::registerSchema(Schema& schema) +{ + if (!schema.isFinalized()) + schema.finalize(); + const SchemaId& schemaId(schema.getSchemaId()); + + qpid::sys::Mutex::ScopedLock l(lock); + schemata[schemaId] = schema; + schemaIndex[schemaId] = DataIndex(); + + // + // Get the news out at the next periodic interval that there is new schema information. + // + schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + forceHeartbeat = true; +} + + +DataAddr AgentSessionImpl::addData(Data& data, const string& name, bool persistent) +{ + if (externalStorage) + throw QmfException("addData() must not be called when the 'external' option is enabled."); + + string dataName; + if (name.empty()) + dataName = qpid::types::Uuid(true).str(); + else + dataName = name; + + DataAddr addr(dataName, agentName, persistent ? 0 : bootSequence); + data.setAddr(addr); + + { + qpid::sys::Mutex::ScopedLock l(lock); + DataIndex::const_iterator iter = globalIndex.find(addr); + if (iter != globalIndex.end()) + throw QmfException("Duplicate Data Address"); + + globalIndex[addr] = data; + if (data.hasSchema()) + schemaIndex[data.getSchemaId()][addr] = data; + } + + // + // TODO: Survey active subscriptions to see if they need to hear about this new data. + // + + return addr; +} + + +void AgentSessionImpl::delData(const DataAddr& addr) +{ + { + qpid::sys::Mutex::ScopedLock l(lock); + DataIndex::iterator iter = globalIndex.find(addr); + if (iter == globalIndex.end()) + return; + if (iter->second.hasSchema()) { + const SchemaId& schemaId(iter->second.getSchemaId()); + schemaIndex[schemaId].erase(addr); + } + globalIndex.erase(iter); + } + + // + // TODO: Survey active subscriptions to see if they need to hear about this deleted data. + // +} + + +void AgentSessionImpl::authAccept(AgentEvent& authEvent) +{ + auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_QUERY)); + eventImpl->setQuery(authEvent.getQuery()); + eventImpl->setUserId(authEvent.getUserId()); + eventImpl->setReplyTo(AgentEventImplAccess::get(authEvent).getReplyTo()); + eventImpl->setCorrelationId(AgentEventImplAccess::get(authEvent).getCorrelationId()); + AgentEvent event(eventImpl.release()); + + if (externalStorage) { + enqueueEvent(event); + return; + } + + const Query& query(authEvent.getQuery()); + if (query.getDataAddr().isValid()) { + { + qpid::sys::Mutex::ScopedLock l(lock); + DataIndex::const_iterator iter = globalIndex.find(query.getDataAddr()); + if (iter != globalIndex.end()) + response(event, iter->second); + } + complete(event); + return; + } + + if (query.getSchemaId().isValid()) { + { + qpid::sys::Mutex::ScopedLock l(lock); + map<SchemaId, DataIndex>::const_iterator iter = schemaIndex.find(query.getSchemaId()); + if (iter != schemaIndex.end()) + for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++) + if (query.matchesPredicate(dIter->second.getProperties())) + response(event, dIter->second); + } + complete(event); + return; + } + + raiseException(event, "Query is Invalid"); +} + + +void AgentSessionImpl::authReject(AgentEvent& event, const string& error) +{ + raiseException(event, "Action Forbidden - " + error); +} + + +void AgentSessionImpl::raiseException(AgentEvent& event, const string& error) +{ + Data exception(new DataImpl()); + exception.setProperty("error_text", error); + raiseException(event, exception); +} + + +void AgentSessionImpl::raiseException(AgentEvent& event, const Data& data) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_EXCEPTION; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + const DataImpl& dataImpl(DataImplAccess::get(data)); + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(dataImpl.asMap(), msg); + send(msg, eventImpl.getReplyTo()); + + QPID_LOG(trace, "SENT Exception to=" << eventImpl.getReplyTo()); +} + + +void AgentSessionImpl::response(AgentEvent& event, const Data& data) +{ + AgentEventImpl& impl(AgentEventImplAccess::get(event)); + uint32_t count = impl.enqueueData(data); + if (count >= 8) + flushResponses(event, false); +} + + +void AgentSessionImpl::complete(AgentEvent& event) +{ + flushResponses(event, true); +} + + +void AgentSessionImpl::methodSuccess(AgentEvent& event) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + + const Variant::Map& outArgs(eventImpl.getReturnArguments()); + const Variant::Map& outSubtypes(eventImpl.getReturnArgumentSubtypes()); + + map["_arguments"] = outArgs; + if (!outSubtypes.empty()) + map["_subtypes"] = outSubtypes; + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(map, msg); + send(msg, eventImpl.getReplyTo()); + + QPID_LOG(trace, "SENT MethodResponse to=" << eventImpl.getReplyTo()); +} + + +void AgentSessionImpl::raiseEvent(const Data& data) +{ + int severity(SEV_NOTICE); + if (data.hasSchema()) { + const Schema& schema(DataImplAccess::get(data).getSchema()); + if (schema.isValid()) + severity = schema.getDefaultSeverity(); + } + + raiseEvent(data, severity); +} + + +void AgentSessionImpl::raiseEvent(const Data& data, int severity) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + string subject("agent.ind.event"); + + if (data.hasSchema()) { + const SchemaId& schemaId(data.getSchemaId()); + if (schemaId.getType() != SCHEMA_TYPE_EVENT) + throw QmfException("Cannot call raiseEvent on data that is not an Event"); + subject = subject + "." + schemaId.getPackageName() + "." + schemaId.getName(); + } + + if (severity < SEV_EMERG || severity > SEV_DEBUG) + throw QmfException("Invalid severity value"); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_DATA_INDICATION; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_EVENT; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + msg.setSubject(subject); + + Variant::List list; + Variant::Map dataAsMap(DataImplAccess::get(data).asMap()); + dataAsMap["_severity"] = severity; + dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + list.push_back(dataAsMap); + encode(list, msg); + topicSender.send(msg); + + QPID_LOG(trace, "SENT EventIndication to=" << topicSender.getName() << "/" << subject); +} + + +void AgentSessionImpl::checkOpen() +{ + if (opened) + throw QmfException("Operation must be performed before calling open()"); +} + + +void AgentSessionImpl::enqueueEvent(const AgentEvent& event) +{ + qpid::sys::Mutex::ScopedLock l(lock); + bool notify = eventQueue.empty(); + eventQueue.push(event); + if (notify) + cond.notify(); +} + + +void AgentSessionImpl::setAgentName() +{ + Variant::Map::iterator iter; + string vendor; + string product; + string instance; + + iter = attributes.find("_vendor"); + if (iter == attributes.end()) + attributes["_vendor"] = vendor; + else + vendor = iter->second.asString(); + + iter = attributes.find("_product"); + if (iter == attributes.end()) + attributes["_product"] = product; + else + product = iter->second.asString(); + + iter = attributes.find("_instance"); + if (iter == attributes.end()) { + instance = qpid::types::Uuid(true).str(); + attributes["_instance"] = instance; + } else + instance = iter->second.asString(); + + agentName = vendor + ":" + product + ":" + instance; + attributes["_name"] = agentName; +} + + +void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg) +{ + QPID_LOG(trace, "RCVD AgentLocateRequest from=" << msg.getReplyTo()); + + if (!predicate.empty()) { + Query agentQuery(QUERY_OBJECT); + agentQuery.setPredicate(predicate); + if (!agentQuery.matchesPredicate(attributes)) { + QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring"); + return; + } + } + + Message reply; + Variant::Map map; + Variant::Map& headers(reply.getProperties()); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + + map["_values"] = attributes; + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; + map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; + map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; + + encode(map, reply); + send(reply, msg.getReplyTo()); + QPID_LOG(trace, "SENT AgentLocateResponse to=" << msg.getReplyTo()); +} + + +void AgentSessionImpl::handleMethodRequest(const Variant::Map& content, const Message& msg) +{ + QPID_LOG(trace, "RCVD MethodRequest map=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId()); + + // + // Construct an AgentEvent to be sent to the application. + // + auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_METHOD)); + eventImpl->setUserId(msg.getUserId()); + eventImpl->setReplyTo(msg.getReplyTo()); + eventImpl->setCorrelationId(msg.getCorrelationId()); + + Variant::Map::const_iterator iter; + + iter = content.find("_method_name"); + if (iter == content.end()) { + AgentEvent event(eventImpl.release()); + raiseException(event, "Malformed MethodRequest: missing _method_name field"); + return; + } + eventImpl->setMethodName(iter->second.asString()); + + iter = content.find("_arguments"); + if (iter != content.end()) + eventImpl->setArguments(iter->second.asMap()); + + iter = content.find("_subtypes"); + if (iter != content.end()) + eventImpl->setArgumentSubtypes(iter->second.asMap()); + + iter = content.find("_object_id"); + if (iter != content.end()) { + DataAddr addr(new DataAddrImpl(iter->second.asMap())); + eventImpl->setDataAddr(addr); + DataIndex::const_iterator iter(globalIndex.find(addr)); + if (iter == globalIndex.end()) { + AgentEvent event(eventImpl.release()); + raiseException(event, "No data object found with the specified address"); + return; + } + + const Schema& schema(DataImplAccess::get(iter->second).getSchema()); + if (schema.isValid()) { + eventImpl->setSchema(schema); + for (Variant::Map::const_iterator aIter = eventImpl->getArguments().begin(); + aIter != eventImpl->getArguments().end(); aIter++) { + const Schema& schema(DataImplAccess::get(iter->second).getSchema()); + if (!SchemaImplAccess::get(schema).isValidMethodInArg(eventImpl->getMethodName(), aIter->first, aIter->second)) { + AgentEvent event(eventImpl.release()); + raiseException(event, "Invalid argument: " + aIter->first); + return; + } + } + } + } + + enqueueEvent(AgentEvent(eventImpl.release())); +} + + +void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Message& msg) +{ + QPID_LOG(trace, "RCVD QueryRequest query=" << content << " from=" << msg.getReplyTo() << " cid=" << msg.getCorrelationId()); + + // + // Construct an AgentEvent to be sent to the application or directly handled by the agent. + // + auto_ptr<QueryImpl> queryImpl(new QueryImpl(content)); + auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY)); + eventImpl->setUserId(msg.getUserId()); + eventImpl->setReplyTo(msg.getReplyTo()); + eventImpl->setCorrelationId(msg.getCorrelationId()); + eventImpl->setQuery(queryImpl.release()); + AgentEvent ae(eventImpl.release()); + + if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) { + handleSchemaRequest(ae); + return; + } + + if (autoAllowQueries) + authAccept(ae); + else + enqueueEvent(ae); +} + + +void AgentSessionImpl::handleSchemaRequest(AgentEvent& event) +{ + SchemaMap::const_iterator iter; + string error; + const Query& query(event.getQuery()); + + Message msg; + Variant::List content; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + + { + qpid::sys::Mutex::ScopedLock l(lock); + if (query.getTarget() == QUERY_SCHEMA_ID) { + headers[protocol::HEADER_KEY_CONTENT] = "_schema_id"; + for (iter = schemata.begin(); iter != schemata.end(); iter++) + content.push_back(SchemaIdImplAccess::get(iter->first).asMap()); + } else if (query.getSchemaId().isValid()) { + headers[protocol::HEADER_KEY_CONTENT] = "_schema"; + iter = schemata.find(query.getSchemaId()); + if (iter != schemata.end()) + content.push_back(SchemaImplAccess::get(iter->second).asMap()); + } else { + error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID."; + } + } + + if (!error.empty()) { + raiseException(event, error); + return; + } + + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(content, msg); + send(msg, eventImpl.getReplyTo()); + + QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo()); +} + + +void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, uint32_t seq, const Message& msg) +{ + string packageName; + string className; + uint8_t hashBits[16]; + + buffer.getShortString(packageName); + buffer.getShortString(className); + buffer.getBin128(hashBits); + + QPID_LOG(trace, "RCVD QMFv1 SchemaRequest for " << packageName << ":" << className); + + qpid::types::Uuid hash(hashBits); + map<SchemaId, Schema>::const_iterator iter; + string replyContent; + + SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className); + dataId.setHash(hash); + + { + qpid::sys::Mutex::ScopedLock l(lock); + iter = schemata.find(dataId); + if (iter != schemata.end()) + replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); + else { + SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className); + eventId.setHash(hash); + iter = schemata.find(dataId); + if (iter != schemata.end()) + replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); + else + return; + } + } + + Message reply; + Variant::Map& headers(reply.getProperties()); + + headers[protocol::HEADER_KEY_AGENT] = agentName; + reply.setContent(replyContent); + + send(reply, msg.getReplyTo()); + QPID_LOG(trace, "SENT QMFv1 SchemaResponse to=" << msg.getReplyTo()); +} + + +void AgentSessionImpl::dispatch(Message msg) +{ + const Variant::Map& properties(msg.getProperties()); + Variant::Map::const_iterator iter; + + // + // If strict-security is enabled, make sure that reply-to address complies with the + // strict-security addressing pattern (i.e. start with 'qmf.<domain>.topic/direct-console.'). + // + if (strictSecurity && msg.getReplyTo()) { + if (msg.getReplyTo().getName() != topicBase || msg.getReplyTo().getSubject().find("direct-console.") != 0) { + QPID_LOG(warning, "Reply-to violates strict-security policy: " << msg.getReplyTo().str()); + return; + } + } + + iter = properties.find(protocol::HEADER_KEY_APP_ID); + if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF) { + // + // Dispatch a QMFv2 formatted message + // + iter = properties.find(protocol::HEADER_KEY_OPCODE); + if (iter == properties.end()) { + QPID_LOG(trace, "Message received with no 'qmf.opcode' header"); + return; + } + + const string& opcode = iter->second.asString(); + + if (msg.getContentType() == "amqp/list") { + Variant::List content; + decode(msg, content); + + if (opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST) handleLocateRequest(content, msg); + else { + QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode); + } + + } else if (msg.getContentType() == "amqp/map") { + Variant::Map content; + decode(msg, content); + + if (opcode == protocol::HEADER_OPCODE_METHOD_REQUEST) handleMethodRequest(content, msg); + else if (opcode == protocol::HEADER_OPCODE_QUERY_REQUEST) handleQueryRequest(content, msg); + else { + QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode); + } + } else { + QPID_LOG(trace, "Unexpected QMFv2 content type. Expected amqp/list or amqp/map"); + } + + } else { + // + // Dispatch a QMFv1 formatted message + // + const string& body(msg.getContent()); + if (body.size() < 8) + return; + qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size()); + + if (buffer.getOctet() != 'A') return; + if (buffer.getOctet() != 'M') return; + if (buffer.getOctet() != '2') return; + char v1Opcode(buffer.getOctet()); + uint32_t seq(buffer.getLong()); + + if (v1Opcode == 'S') handleV1SchemaRequest(buffer, seq, msg); + else { + QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode); + } + } +} + + +void AgentSessionImpl::sendHeartbeat() +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + std::stringstream address; + + address << "agent.ind.heartbeat"; + + // append .<vendor>.<product> to address key if present. + Variant::Map::const_iterator v; + if ((v = attributes.find("_vendor")) != attributes.end() && !v->second.getString().empty()) { + address << "." << v->second.getString(); + if ((v = attributes.find("_product")) != attributes.end() && !v->second.getString().empty()) { + address << "." << v->second.getString(); + } + } + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_INDICATION; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + msg.setSubject(address.str()); + + map["_values"] = attributes; + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; + map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; + map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; + + encode(map, msg); + topicSender.send(msg); + QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName); +} + + +void AgentSessionImpl::send(Message msg, const Address& to) +{ + Sender sender; + + if (strictSecurity && to.getName() != topicBase) { + QPID_LOG(warning, "Address violates strict-security policy: " << to); + return; + } + + if (to.getName() == directBase) { + msg.setSubject(to.getSubject()); + sender = directSender; + } else if (to.getName() == topicBase) { + msg.setSubject(to.getSubject()); + sender = topicSender; + } else + sender = session.createSender(to); + + sender.send(msg); +} + + +void AgentSessionImpl::flushResponses(AgentEvent& event, bool final) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_RESPONSE; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_RESPONSE; + headers[protocol::HEADER_KEY_CONTENT] = protocol::HEADER_CONTENT_DATA; + headers[protocol::HEADER_KEY_AGENT] = agentName; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + if (!final) + headers[protocol::HEADER_KEY_PARTIAL] = Variant(); + + Variant::List body; + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + Data data(eventImpl.dequeueData()); + while (data.isValid()) { + DataImpl& dataImpl(DataImplAccess::get(data)); + body.push_back(dataImpl.asMap()); + data = eventImpl.dequeueData(); + } + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(body, msg); + send(msg, eventImpl.getReplyTo()); + + QPID_LOG(trace, "SENT QueryResponse to=" << eventImpl.getReplyTo()); +} + + +void AgentSessionImpl::periodicProcessing(uint64_t seconds) +{ + // + // The granularity of this timer is seconds. Don't waste time looking for work if + // it's been less than a second since we last visited. + // + if (seconds == lastVisit) + return; + //uint64_t thisInterval(seconds - lastVisit); + lastVisit = seconds; + + // + // First time through, set lastHeartbeat to the current time. + // + if (lastHeartbeat == 0) + lastHeartbeat = seconds; + + // + // If the hearbeat interval has elapsed, send a heartbeat. + // + if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) { + lastHeartbeat = seconds; + forceHeartbeat = false; + sendHeartbeat(); + } + + // + // TODO: process any active subscriptions on their intervals. + // +} + + +void AgentSessionImpl::run() +{ + QPID_LOG(debug, "AgentSession thread started for agent " << agentName); + + try { + while (!threadCanceled) { + periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC); + + Receiver rx; + bool valid = session.nextReceiver(rx, Duration::SECOND); + if (threadCanceled) + break; + if (valid) { + try { + dispatch(rx.fetch()); + } catch (qpid::types::Exception& e) { + QPID_LOG(error, "Exception caught in message dispatch: " << e.what()); + } + session.acknowledge(); + } + } + } catch (qpid::types::Exception& e) { + QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what()); + enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED))); + } + + QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); +} + |