diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/cpp/src/qmf/Agent.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/Agent.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/Agent.cpp | 657 |
1 files changed, 657 insertions, 0 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp new file mode 100644 index 0000000000..915f2a1c88 --- /dev/null +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -0,0 +1,657 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qmf/AgentImpl.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/ConsoleEventImpl.h" +#include "qmf/ConsoleSession.h" +#include "qmf/DataImpl.h" +#include "qmf/Query.h" +#include "qmf/SchemaImpl.h" +#include "qmf/agentCapability.h" +#include "qmf/constants.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> + +using qpid::types::Variant; +using qpid::messaging::Duration; +using qpid::messaging::Message; +using qpid::messaging::Sender; +using namespace std; +using namespace qmf; + +typedef PrivateImplRef<Agent> PI; + +Agent::Agent(AgentImpl* impl) { PI::ctor(*this, impl); } +Agent::Agent(const Agent& s) : qmf::Handle<AgentImpl>() { PI::copy(*this, s); } +Agent::~Agent() { PI::dtor(*this); } +Agent& Agent::operator=(const Agent& s) { return PI::assign(*this, s); } +string Agent::getName() const { return isValid() ? impl->getName() : ""; } +uint32_t Agent::getEpoch() const { return isValid() ? impl->getEpoch() : 0; } +string Agent::getVendor() const { return isValid() ? impl->getVendor() : ""; } +string Agent::getProduct() const { return isValid() ? impl->getProduct() : ""; } +string Agent::getInstance() const { return isValid() ? impl->getInstance() : ""; } +const Variant& Agent::getAttribute(const string& k) const { return impl->getAttribute(k); } +const Variant::Map& Agent::getAttributes() const { return impl->getAttributes(); } +ConsoleEvent Agent::querySchema(Duration t) { return impl->querySchema(t); } +uint32_t Agent::querySchemaAsync() { return impl->querySchemaAsync(); } +ConsoleEvent Agent::query(const Query& q, Duration t) { return impl->query(q, t); } +ConsoleEvent Agent::query(const string& q, Duration t) { return impl->query(q, t); } +uint32_t Agent::queryAsync(const Query& q) { return impl->queryAsync(q); } +uint32_t Agent::queryAsync(const string& q) { return impl->queryAsync(q); } +ConsoleEvent Agent::callMethod(const string& m, const Variant::Map& a, const DataAddr& d, Duration t) { return impl->callMethod(m, a, d, t); } +uint32_t Agent::callMethodAsync(const string& m, const Variant::Map& a, const DataAddr& d) { return impl->callMethodAsync(m, a, d); } +uint32_t Agent::getPackageCount() const { return impl->getPackageCount(); } +const string& Agent::getPackage(uint32_t i) const { return impl->getPackage(i); } +uint32_t Agent::getSchemaIdCount(const string& p) const { return impl->getSchemaIdCount(p); } +SchemaId Agent::getSchemaId(const string& p, uint32_t i) const { return impl->getSchemaId(p, i); } +Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(s, t); } + + + +AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : + name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), + sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache) +{ +} + +void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v) +{ + attributes[k] = v; + if (k == "qmf.agent_capability") + try { + capability = v.asUint32(); + } catch (std::exception&) {} + if (k == "_direct_subject") + try { + directSubject = v.asString(); + sender = session.topicSender; + } catch (std::exception&) {} +} + +const Variant& AgentImpl::getAttribute(const string& k) const +{ + Variant::Map::const_iterator iter = attributes.find(k); + if (iter == attributes.end()) + throw KeyNotFound(k); + return iter->second; +} + + +ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) +{ + boost::shared_ptr<SyncContext> context(new SyncContext()); + uint32_t correlator; + ConsoleEvent result; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + contextMap[correlator] = context; + } + try { + sendQuery(query, correlator); + { + uint64_t milliseconds = timeout.getMilliseconds(); + qpid::sys::Mutex::ScopedLock cl(context->lock); + if (!context->response.isValid() || !context->response.isFinal()) + context->cond.wait(context->lock, + qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (context->response.isValid() && + ((context->response.getType() == CONSOLE_QUERY_RESPONSE && context->response.isFinal()) || + (context->response.getType() == CONSOLE_EXCEPTION))) + result = context->response; + else { + auto_ptr<ConsoleEventImpl> impl(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + Data exception(new DataImpl()); + exception.setProperty("error_text", "Timed out waiting for the agent to respond"); + impl->addData(exception); + result = ConsoleEvent(impl.release()); + } + } + } catch (qpid::types::Exception&) { + } + + { + qpid::sys::Mutex::ScopedLock l(lock); + contextMap.erase(correlator); + } + + return result; +} + + +ConsoleEvent AgentImpl::query(const string& text, Duration timeout) +{ + return query(stringToQuery(text), timeout); +} + + +uint32_t AgentImpl::queryAsync(const Query& query) +{ + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } + + sendQuery(query, correlator); + return correlator; +} + + +uint32_t AgentImpl::queryAsync(const string& text) +{ + return queryAsync(stringToQuery(text)); +} + + +ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout) +{ + boost::shared_ptr<SyncContext> context(new SyncContext()); + uint32_t correlator; + ConsoleEvent result; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + contextMap[correlator] = context; + } + try { + sendMethod(method, args, addr, correlator); + { + uint64_t milliseconds = timeout.getMilliseconds(); + qpid::sys::Mutex::ScopedLock cl(context->lock); + if (!context->response.isValid()) + context->cond.wait(context->lock, + qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (context->response.isValid()) + result = context->response; + else { + auto_ptr<ConsoleEventImpl> impl(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + Data exception(new DataImpl()); + exception.setProperty("error_text", "Timed out waiting for the agent to respond"); + impl->addData(exception); + result = ConsoleEvent(impl.release()); + } + } + } catch (qpid::types::Exception&) { + } + + { + qpid::sys::Mutex::ScopedLock l(lock); + contextMap.erase(correlator); + } + + return result; +} + + +uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr) +{ + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } + + sendMethod(method, args, addr, correlator); + return correlator; +} + + +uint32_t AgentImpl::getPackageCount() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + + // + // Populate the package set. + // + for (set<SchemaId>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++) + packageSet.insert(iter->getPackageName()); + + return packageSet.size(); +} + + +const string& AgentImpl::getPackage(uint32_t idx) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + uint32_t count(0); + for (set<string>::const_iterator iter = packageSet.begin(); iter != packageSet.end(); iter++) { + if (idx == count) + return *iter; + count++; + } + throw IndexOutOfRange(); +} + + +uint32_t AgentImpl::getSchemaIdCount(const string& pname) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + uint32_t count(0); + for (set<SchemaId>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++) + if (iter->getPackageName() == pname) + count++; + return count; +} + + +SchemaId AgentImpl::getSchemaId(const string& pname, uint32_t idx) const +{ + qpid::sys::Mutex::ScopedLock l(lock); + uint32_t count(0); + for (set<SchemaId>::const_iterator iter = schemaIdSet.begin(); iter != schemaIdSet.end(); iter++) { + if (iter->getPackageName() == pname) { + if (idx == count) + return *iter; + count++; + } + } + throw IndexOutOfRange(); +} + + +Schema AgentImpl::getSchema(const SchemaId& id, Duration timeout) +{ + if (!schemaCache->haveSchema(id)) + // + // The desired schema is not in the cache. We need to asynchronously query the remote + // agent for the information. The call to schemaCache->getSchema will block waiting for + // the response to be received. + // + sendSchemaRequest(id); + + return schemaCache->getSchema(id, timeout); +} + + +void AgentImpl::handleException(const Variant::Map& content, const Message& msg) +{ + const string& cid(msg.getCorrelationId()); + Variant::Map::const_iterator aIter; + uint32_t correlator; + boost::shared_ptr<SyncContext> context; + + try { correlator = boost::lexical_cast<uint32_t>(cid); } + catch(const boost::bad_lexical_cast&) { correlator = 0; } + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator); + if (iter != contextMap.end()) + context = iter->second; + } + + if (context.get() != 0) { + // + // This exception is associated with a synchronous request. + // + qpid::sys::Mutex::ScopedLock cl(context->lock); + context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + ConsoleEventImplAccess::get(context->response).addData(new DataImpl(content, this)); + ConsoleEventImplAccess::get(context->response).setAgent(this); + context->cond.notify(); + } else { + // + // This exception is associated with an asynchronous request. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_EXCEPTION)); + eventImpl->setCorrelator(correlator); + eventImpl->setAgent(this); + eventImpl->addData(new DataImpl(content, this)); + session.enqueueEvent(eventImpl.release()); + } +} + + +void AgentImpl::handleMethodResponse(const Variant::Map& response, const Message& msg) +{ + const string& cid(msg.getCorrelationId()); + Variant::Map::const_iterator aIter; + Variant::Map argMap; + uint32_t correlator; + boost::shared_ptr<SyncContext> context; + + QPID_LOG(trace, "RCVD MethodResponse cid=" << cid << " map=" << response); + + aIter = response.find("_arguments"); + if (aIter != response.end()) + argMap = aIter->second.asMap(); + + try { correlator = boost::lexical_cast<uint32_t>(cid); } + catch(const boost::bad_lexical_cast&) { correlator = 0; } + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator); + if (iter != contextMap.end()) + context = iter->second; + } + + if (context.get() != 0) { + // + // This response is associated with a synchronous request. + // + qpid::sys::Mutex::ScopedLock cl(context->lock); + context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_METHOD_RESPONSE)); + ConsoleEventImplAccess::get(context->response).setArguments(argMap); + ConsoleEventImplAccess::get(context->response).setAgent(this); + context->cond.notify(); + } else { + // + // This response is associated with an asynchronous request. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_METHOD_RESPONSE)); + eventImpl->setCorrelator(correlator); + eventImpl->setAgent(this); + eventImpl->setArguments(argMap); + session.enqueueEvent(eventImpl.release()); + } +} + + +void AgentImpl::handleDataIndication(const Variant::List& list, const Message& msg) +{ + Variant::Map::const_iterator aIter; + const Variant::Map& props(msg.getProperties()); + boost::shared_ptr<SyncContext> context; + + aIter = props.find("qmf.content"); + if (aIter == props.end()) + return; + + string content_type(aIter->second.asString()); + if (content_type != "_event") + return; + + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + const Variant::Map& eventMap(lIter->asMap()); + Data data(new DataImpl(eventMap, this)); + int severity(SEV_NOTICE); + uint64_t timestamp(0); + + aIter = eventMap.find("_severity"); + if (aIter != eventMap.end()) + severity = int(aIter->second.asInt8()); + + aIter = eventMap.find("_timestamp"); + if (aIter != eventMap.end()) + timestamp = aIter->second.asUint64(); + + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_EVENT)); + eventImpl->setAgent(this); + eventImpl->addData(data); + eventImpl->setSeverity(severity); + eventImpl->setTimestamp(timestamp); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + session.enqueueEvent(eventImpl.release()); + } +} + + +void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& msg) +{ + const string& cid(msg.getCorrelationId()); + Variant::Map::const_iterator aIter; + const Variant::Map& props(msg.getProperties()); + uint32_t correlator; + bool final(false); + boost::shared_ptr<SyncContext> context; + + aIter = props.find("partial"); + if (aIter == props.end()) + final = true; + + aIter = props.find("qmf.content"); + if (aIter == props.end()) + return; + + string content_type(aIter->second.asString()); + if (content_type != "_schema" && content_type != "_schema_id" && content_type != "_data") + return; + + try { correlator = boost::lexical_cast<uint32_t>(cid); } + catch(const boost::bad_lexical_cast&) { correlator = 0; } + + { + qpid::sys::Mutex::ScopedLock l(lock); + map<uint32_t, boost::shared_ptr<SyncContext> >::iterator iter = contextMap.find(correlator); + if (iter != contextMap.end()) + context = iter->second; + } + + if (context.get() != 0) { + // + // This response is associated with a synchronous request. + // + qpid::sys::Mutex::ScopedLock cl(context->lock); + if (!context->response.isValid()) + context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); + + if (content_type == "_data") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + ConsoleEventImplAccess::get(context->response).addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + else if (content_type == "_schema_id") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + SchemaId schemaId(new SchemaIdImpl(lIter->asMap())); + ConsoleEventImplAccess::get(context->response).addSchemaId(schemaId); + learnSchemaId(schemaId); + } + else if (content_type == "_schema") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Schema schema(new SchemaImpl(lIter->asMap())); + schemaCache->declareSchema(schema); + } + + if (final) { + ConsoleEventImplAccess::get(context->response).setFinal(); + ConsoleEventImplAccess::get(context->response).setAgent(this); + context->cond.notify(); + } + } else { + // + // This response is associated with an asynchronous request. + // + auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); + eventImpl->setCorrelator(correlator); + eventImpl->setAgent(this); + + if (content_type == "_data") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + eventImpl->addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + else if (content_type == "_schema_id") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + SchemaId schemaId(new SchemaIdImpl(lIter->asMap())); + eventImpl->addSchemaId(schemaId); + learnSchemaId(schemaId); + } + else if (content_type == "_schema") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Schema schema(new SchemaImpl(lIter->asMap())); + schemaCache->declareSchema(schema); + } + + if (final) + eventImpl->setFinal(); + if (content_type != "_schema") + session.enqueueEvent(eventImpl.release()); + } +} + + +Query AgentImpl::stringToQuery(const std::string& text) +{ + qpid::messaging::AddressParser parser(text); + Variant::Map map; + Variant::Map::const_iterator iter; + string className; + string packageName; + + parser.parseMap(map); + + iter = map.find("class"); + if (iter != map.end()) + className = iter->second.asString(); + + iter = map.find("package"); + if (iter != map.end()) + packageName = iter->second.asString(); + + Query query(QUERY_OBJECT, className, packageName); + + iter = map.find("where"); + if (iter != map.end()) + query.setPredicate(iter->second.asList()); + + return query; +} + + +void AgentImpl::sendQuery(const Query& query, uint32_t correlator) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_QUERY_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + + msg.setReplyTo(session.replyAddress); + msg.setCorrelationId(boost::lexical_cast<string>(correlator)); + msg.setSubject(directSubject); + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); + encode(QueryImplAccess::get(query).asMap(), msg); + if (sender.isValid()) { + sender.send(msg); + QPID_LOG(trace, "SENT QueryRequest to=" << sender.getName() << "/" << directSubject << " cid=" << correlator); + } +} + + +void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const DataAddr& addr, uint32_t correlator) +{ + Message msg; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); + + headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; + headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_METHOD_REQUEST; + headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; + + map["_method_name"] = method; + map["_object_id"] = addr.asMap(); + map["_arguments"] = args; + + msg.setReplyTo(session.replyAddress); + msg.setCorrelationId(boost::lexical_cast<string>(correlator)); + msg.setSubject(directSubject); + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); + encode(map, msg); + if (sender.isValid()) { + sender.send(msg); + QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << sender.getName() << "/" << directSubject << " content=" << map << " cid=" << correlator); + } +} + +void AgentImpl::sendSchemaRequest(const SchemaId& id) +{ + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } + + if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { + Query query(QUERY_SCHEMA, id); + sendQuery(query, correlator); + return; + } + +#define RAW_BUFFER_SIZE 1024 + char rawBuffer[RAW_BUFFER_SIZE]; + qpid::management::Buffer buffer(rawBuffer, RAW_BUFFER_SIZE); + + buffer.putOctet('A'); + buffer.putOctet('M'); + buffer.putOctet('2'); + buffer.putOctet('S'); + buffer.putLong(correlator); + buffer.putShortString(id.getPackageName()); + buffer.putShortString(id.getName()); + buffer.putBin128(id.getHash().data()); + + string content(rawBuffer, buffer.getPosition()); + + Message msg; + msg.setReplyTo(session.replyAddress); + msg.setContent(content); + msg.setSubject(directSubject); + string userId(session.connection.getAuthenticatedUsername()); + if (!userId.empty()) + msg.setUserId(userId); + if (sender.isValid()) { + sender.send(msg); + QPID_LOG(trace, "SENT V1SchemaRequest to=" << sender.getName() << "/" << directSubject); + } +} + + +void AgentImpl::learnSchemaId(const SchemaId& id) +{ + schemaCache->declareSchemaId(id); + schemaIdSet.insert(id); +} + + +AgentImpl& AgentImplAccess::get(Agent& item) +{ + return *item.impl; +} + + +const AgentImpl& AgentImplAccess::get(const Agent& item) +{ + return *item.impl; +} |