diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qmf/ConsoleSession.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/ConsoleSession.cpp')
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 618 |
1 files changed, 0 insertions, 618 deletions
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp deleted file mode 100644 index 7b839930e1..0000000000 --- a/cpp/src/qmf/ConsoleSession.cpp +++ /dev/null @@ -1,618 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qmf/PrivateImplRef.h" -#include "qmf/ConsoleSessionImpl.h" -#include "qmf/AgentImpl.h" -#include "qmf/SchemaId.h" -#include "qmf/SchemaImpl.h" -#include "qmf/ConsoleEventImpl.h" -#include "qmf/constants.h" -#include "qpid/log/Statement.h" -#include "qpid/messaging/AddressParser.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Receiver.h" - -using namespace std; -using namespace qmf; -using qpid::messaging::Address; -using qpid::messaging::Connection; -using qpid::messaging::Receiver; -using qpid::messaging::Sender; -using qpid::messaging::Duration; -using qpid::messaging::Message; -using qpid::types::Variant; - -typedef qmf::PrivateImplRef<ConsoleSession> PI; - -ConsoleSession::ConsoleSession(ConsoleSessionImpl* impl) { PI::ctor(*this, impl); } -ConsoleSession::ConsoleSession(const ConsoleSession& s) : qmf::Handle<ConsoleSessionImpl>() { PI::copy(*this, s); } -ConsoleSession::~ConsoleSession() { PI::dtor(*this); } -ConsoleSession& ConsoleSession::operator=(const ConsoleSession& s) { return PI::assign(*this, s); } - -ConsoleSession::ConsoleSession(Connection& c, const string& o) { PI::ctor(*this, new ConsoleSessionImpl(c, o)); } -void ConsoleSession::setDomain(const string& d) { impl->setDomain(d); } -void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); } -void ConsoleSession::open() { impl->open(); } -void ConsoleSession::close() { impl->close(); } -bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); } -int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); } -uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } -Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } -Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } -Subscription ConsoleSession::subscribe(const Query& q, const string& f, const string& o) { return impl->subscribe(q, f, o); } -Subscription ConsoleSession::subscribe(const string& q, const string& f, const string& o) { return impl->subscribe(q, f, o); } - -//======================================================================================== -// Impl Method Bodies -//======================================================================================== - -ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), - opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) -{ - if (!options.empty()) { - qpid::messaging::AddressParser parser(options); - Variant::Map optMap; - Variant::Map::const_iterator iter; - - parser.parseMap(optMap); - - iter = optMap.find("domain"); - if (iter != optMap.end()) - domain = iter->second.asString(); - - iter = optMap.find("max-agent-age"); - if (iter != optMap.end()) - maxAgentAgeMinutes = iter->second.asUint32(); - - iter = optMap.find("listen-on-direct"); - if (iter != optMap.end()) - listenOnDirect = iter->second.asBool(); - - iter = optMap.find("strict-security"); - if (iter != optMap.end()) - strictSecurity = iter->second.asBool(); - } -} - - -ConsoleSessionImpl::~ConsoleSessionImpl() -{ - if (opened) - close(); -} - - -void ConsoleSessionImpl::setAgentFilter(const string& predicate) -{ - agentQuery = Query(QUERY_OBJECT, predicate); - - // - // Purge the agent list of any agents that don't match the filter. - // - { - qpid::sys::Mutex::ScopedLock l(lock); - map<string, Agent> toDelete; - for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) - if (!agentQuery.matchesPredicate(iter->second.getAttributes())) { - toDelete[iter->first] = iter->second; - if (iter->second.getName() == connectedBrokerAgent.getName()) - connectedBrokerInAgentList = false; - } - - for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { - agents.erase(iter->first); - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_FILTER)); - eventImpl->setAgent(iter->second); - enqueueEventLH(eventImpl.release()); - } - - if (!connectedBrokerInAgentList && connectedBrokerAgent.isValid() && - agentQuery.matchesPredicate(connectedBrokerAgent.getAttributes())) { - agents[connectedBrokerAgent.getName()] = connectedBrokerAgent; - connectedBrokerInAgentList = true; - - // - // Enqueue a notification of the new agent. - // - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD)); - eventImpl->setAgent(connectedBrokerAgent); - enqueueEventLH(ConsoleEvent(eventImpl.release())); - } - } - - // - // Broadcast an agent locate request with our new criteria. - // - if (opened) - sendAgentLocate(); -} - - -void ConsoleSessionImpl::open() -{ - if (opened) - throw QmfException("The session is already open"); - - // Establish messaging addresses - directBase = "qmf." + domain + ".direct"; - topicBase = "qmf." + domain + ".topic"; - - string myKey("direct-console." + qpid::types::Uuid(true).str()); - - replyAddress = Address(topicBase + "/" + myKey + ";{node:{type:topic}}"); - - // Create AMQP session, receivers, and senders - session = connection.createSession(); - Receiver directRx = session.createReceiver(replyAddress); - Receiver topicRx = session.createReceiver(topicBase + "/agent.#"); // TODO: be more discriminating - if (!strictSecurity) { - Receiver legacyRx = session.createReceiver("amq.direct/" + myKey + ";{node:{type:topic}}"); - legacyRx.setCapacity(64); - directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); - directSender.setCapacity(128); - } - - directRx.setCapacity(64); - topicRx.setCapacity(128); - - topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); - - topicSender.setCapacity(128); - - // Start the receiver thread - threadCanceled = false; - thread = new qpid::sys::Thread(*this); - - // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. - sendBrokerLocate(); - if (agentQuery) - sendAgentLocate(); - - opened = true; -} - - -void ConsoleSessionImpl::close() -{ - if (!opened) - throw QmfException("The session is already closed"); - - // Stop and join the receiver thread - threadCanceled = true; - thread->join(); - delete thread; - - // Close the AMQP session - session.close(); - opened = false; -} - - -bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) -{ - uint64_t milliseconds = timeout.getMilliseconds(); - qpid::sys::Mutex::ScopedLock l(lock); - - if (eventQueue.empty() && milliseconds > 0) - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), - qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); - - if (!eventQueue.empty()) { - event = eventQueue.front(); - eventQueue.pop(); - return true; - } - - return false; -} - - -int ConsoleSessionImpl::pendingEvents() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventQueue.size(); -} - - -uint32_t ConsoleSessionImpl::getAgentCount() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return agents.size(); -} - - -Agent ConsoleSessionImpl::getAgent(uint32_t i) const -{ - qpid::sys::Mutex::ScopedLock l(lock); - uint32_t count = 0; - for (map<string, Agent>::const_iterator iter = agents.begin(); iter != agents.end(); iter++) - if (count++ == i) - return iter->second; - throw IndexOutOfRange(); -} - - -Subscription ConsoleSessionImpl::subscribe(const Query&, const string&, const string&) -{ - return Subscription(); -} - - -Subscription ConsoleSessionImpl::subscribe(const string&, const string&, const string&) -{ - return Subscription(); -} - - -void ConsoleSessionImpl::enqueueEvent(const ConsoleEvent& event) -{ - qpid::sys::Mutex::ScopedLock l(lock); - enqueueEventLH(event); -} - - -void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event) -{ - bool notify = eventQueue.empty(); - eventQueue.push(event); - if (notify) - cond.notify(); -} - - -void ConsoleSessionImpl::dispatch(Message msg) -{ - const Variant::Map& properties(msg.getProperties()); - Variant::Map::const_iterator iter; - Variant::Map::const_iterator oiter; - - oiter = properties.find(protocol::HEADER_KEY_OPCODE); - iter = properties.find(protocol::HEADER_KEY_APP_ID); - if (iter == properties.end()) - iter = properties.find("app_id"); - if (iter != properties.end() && iter->second.asString() == protocol::HEADER_APP_ID_QMF && oiter != properties.end()) { - // - // Dispatch a QMFv2 formatted message - // - const string& opcode = oiter->second.asString(); - - iter = properties.find(protocol::HEADER_KEY_AGENT); - if (iter == properties.end()) { - QPID_LOG(trace, "Message received with no 'qmf.agent' header"); - return; - } - const string& agentName = iter->second.asString(); - - Agent agent; - { - qpid::sys::Mutex::ScopedLock l(lock); - map<string, Agent>::iterator aIter = agents.find(agentName); - if (aIter != agents.end()) { - agent = aIter->second; - AgentImplAccess::get(agent).touch(); - } - } - - if (msg.getContentType() == "amqp/map" && - (opcode == protocol::HEADER_OPCODE_AGENT_HEARTBEAT_INDICATION || opcode == protocol::HEADER_OPCODE_AGENT_LOCATE_RESPONSE)) { - // - // This is the one case where it's ok (necessary actually) to receive a QMFv2 - // message from an unknown agent (how else are they going to get known?) - // - Variant::Map content; - decode(msg, content); - handleAgentUpdate(agentName, content, msg); - return; - } - - if (!agent.isValid()) - return; - - AgentImpl& agentImpl(AgentImplAccess::get(agent)); - - if (msg.getContentType() == "amqp/map") { - Variant::Map content; - decode(msg, content); - - if (opcode == protocol::HEADER_OPCODE_EXCEPTION) agentImpl.handleException(content, msg); - else if (opcode == protocol::HEADER_OPCODE_METHOD_RESPONSE) agentImpl.handleMethodResponse(content, msg); - else - QPID_LOG(error, "Received a map-formatted QMFv2 message with opcode=" << opcode); - - return; - } - - if (msg.getContentType() == "amqp/list") { - Variant::List content; - decode(msg, content); - - if (opcode == protocol::HEADER_OPCODE_QUERY_RESPONSE) agentImpl.handleQueryResponse(content, msg); - else if (opcode == protocol::HEADER_OPCODE_DATA_INDICATION) agentImpl.handleDataIndication(content, msg); - else - QPID_LOG(error, "Received a list-formatted QMFv2 message with opcode=" << opcode); - - return; - } - } else { - // - // Dispatch a QMFv1 formatted message - // - const string& body(msg.getContent()); - if (body.size() < 8) - return; - qpid::management::Buffer buffer(const_cast<char*>(body.c_str()), body.size()); - - if (buffer.getOctet() != 'A') return; - if (buffer.getOctet() != 'M') return; - if (buffer.getOctet() != '2') return; - char v1Opcode(buffer.getOctet()); - uint32_t seq(buffer.getLong()); - - if (v1Opcode == 's') handleV1SchemaResponse(buffer, seq, msg); - else { - QPID_LOG(trace, "Unknown or Unsupported QMFv1 opcode: " << v1Opcode); - } - } -} - - -void ConsoleSessionImpl::sendBrokerLocate() -{ - Message msg; - Variant::Map& headers(msg.getProperties()); - - headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; - headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; - headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; - - msg.setReplyTo(replyAddress); - msg.setCorrelationId("broker-locate"); - msg.setSubject("broker"); - - Sender sender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); - sender.send(msg); - sender.close(); - - QPID_LOG(trace, "SENT AgentLocate to broker"); -} - - -void ConsoleSessionImpl::sendAgentLocate() -{ - Message msg; - Variant::Map& headers(msg.getProperties()); - static const string subject("console.request.agent_locate"); - - headers[protocol::HEADER_KEY_METHOD] = protocol::HEADER_METHOD_REQUEST; - headers[protocol::HEADER_KEY_OPCODE] = protocol::HEADER_OPCODE_AGENT_LOCATE_REQUEST; - headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; - - msg.setReplyTo(replyAddress); - msg.setCorrelationId("agent-locate"); - msg.setSubject(subject); - encode(agentQuery.getPredicate(), msg); - - topicSender.send(msg); - - QPID_LOG(trace, "SENT AgentLocate to=" << topicSender.getName() << "/" << subject); -} - - -void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Variant::Map& content, const Message& msg) -{ - Variant::Map::const_iterator iter; - Agent agent; - uint32_t epoch(0); - string cid(msg.getCorrelationId()); - - iter = content.find("_values"); - if (iter == content.end()) - return; - const Variant::Map& in_attrs(iter->second.asMap()); - Variant::Map attrs; - - // - // Copy the map from the message to "attrs". Translate any old-style - // keys to their new key values in the process. - // - for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) { - if (iter->first == "epoch") - attrs[protocol::AGENT_ATTR_EPOCH] = iter->second; - else if (iter->first == "timestamp") - attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second; - else if (iter->first == "heartbeat_interval") - attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second; - else - attrs[iter->first] = iter->second; - } - - iter = attrs.find(protocol::AGENT_ATTR_EPOCH); - if (iter != attrs.end()) - epoch = iter->second.asUint32(); - - if (cid == "broker-locate") { - qpid::sys::Mutex::ScopedLock l(lock); - auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); - for (iter = attrs.begin(); iter != attrs.end(); iter++) - if (iter->first != protocol::AGENT_ATTR_EPOCH) - impl->setAttribute(iter->first, iter->second); - agent = Agent(impl.release()); - connectedBrokerAgent = agent; - if (!agentQuery || agentQuery.matchesPredicate(attrs)) { - connectedBrokerInAgentList = true; - agents[agentName] = agent; - - // - // Enqueue a notification of the new agent. - // - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD)); - eventImpl->setAgent(agent); - enqueueEventLH(ConsoleEvent(eventImpl.release())); - } - return; - } - - // - // Check this agent against the agent filter. Exit if it doesn't match. - // (only if this isn't the connected broker agent) - // - if (agentQuery && (!agentQuery.matchesPredicate(attrs))) - return; - - QPID_LOG(trace, "RCVD AgentHeartbeat from an agent matching our filter: " << agentName); - - { - qpid::sys::Mutex::ScopedLock l(lock); - map<string, Agent>::iterator aIter = agents.find(agentName); - if (aIter == agents.end()) { - // - // This is a new agent. We have no current record of its existence. - // - auto_ptr<AgentImpl> impl(new AgentImpl(agentName, epoch, *this)); - for (iter = attrs.begin(); iter != attrs.end(); iter++) - if (iter->first != protocol::AGENT_ATTR_EPOCH) - impl->setAttribute(iter->first, iter->second); - agent = Agent(impl.release()); - agents[agentName] = agent; - - // - // Enqueue a notification of the new agent. - // - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_ADD)); - eventImpl->setAgent(agent); - enqueueEventLH(ConsoleEvent(eventImpl.release())); - } else { - // - // This is a refresh of an agent we are already tracking. - // - bool detectedRestart(false); - agent = aIter->second; - AgentImpl& impl(AgentImplAccess::get(agent)); - impl.touch(); - if (impl.getEpoch() != epoch) { - // - // The agent has restarted since the last time we heard from it. - // Enqueue a notification. - // - impl.setEpoch(epoch); - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_RESTART)); - eventImpl->setAgent(agent); - enqueueEventLH(ConsoleEvent(eventImpl.release())); - detectedRestart = true; - } - - iter = attrs.find(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP); - if (iter != attrs.end()) { - uint64_t ts(iter->second.asUint64()); - if (ts > impl.getAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP).asUint64()) { - // - // The agent has added new schema entries since we last heard from it. - // Update the attribute and, if this doesn't accompany a restart, enqueue a notification. - // - if (!detectedRestart) { - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_SCHEMA_UPDATE)); - eventImpl->setAgent(agent); - enqueueEventLH(ConsoleEvent(eventImpl.release())); - } - impl.setAttribute(protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP, iter->second); - } - } - } - } -} - - -void ConsoleSessionImpl::handleV1SchemaResponse(qpid::management::Buffer& buffer, uint32_t, const Message&) -{ - QPID_LOG(trace, "RCVD V1SchemaResponse"); - Schema schema(new SchemaImpl(buffer)); - schemaCache->declareSchema(schema); -} - - -void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) -{ - // - // The granularity of this timer is seconds. Don't waste time looking for work if - // it's been less than a second since we last visited. - // - if (seconds == lastVisit) - return; - lastVisit = seconds; - - // - // Handle the aging of agent records - // - if (lastAgePass == 0) - lastAgePass = seconds; - if (seconds - lastAgePass >= 60) { - lastAgePass = seconds; - map<string, Agent> toDelete; - qpid::sys::Mutex::ScopedLock l(lock); - - for (map<string, Agent>::iterator iter = agents.begin(); iter != agents.end(); iter++) - if ((iter->second.getName() != connectedBrokerAgent.getName()) && - (AgentImplAccess::get(iter->second).age() > maxAgentAgeMinutes)) - toDelete[iter->first] = iter->second; - - for (map<string, Agent>::iterator iter = toDelete.begin(); iter != toDelete.end(); iter++) { - agents.erase(iter->first); - auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_AGENT_DEL, AGENT_DEL_AGED)); - eventImpl->setAgent(iter->second); - enqueueEventLH(eventImpl.release()); - } - } -} - - -void ConsoleSessionImpl::run() -{ - QPID_LOG(debug, "ConsoleSession thread started"); - - try { - while (!threadCanceled) { - periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / - qpid::sys::TIME_SEC); - - Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND); - if (threadCanceled) - break; - if (valid) { - try { - dispatch(rx.fetch()); - } catch (qpid::types::Exception& e) { - QPID_LOG(error, "Exception caught in message dispatch: " << e.what()); - } - session.acknowledge(); - } - } - } catch (qpid::types::Exception& e) { - QPID_LOG(error, "Exception caught in message thread - exiting: " << e.what()); - enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED))); - } - - QPID_LOG(debug, "ConsoleSession thread exiting"); -} - |