/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/console/SessionManager.h" #include "qpid/console/Schema.h" #include "qpid/console/Agent.h" #include "qpid/console/ConsoleListener.h" #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/FieldTable.h" using namespace qpid::console; using namespace qpid::sys; using namespace qpid; using namespace std; using qpid::framing::Buffer; using qpid::framing::FieldTable; SessionManager::SessionManager(ConsoleListener* _listener, Settings _settings) : listener(_listener), settings(_settings) { bindingKeys(); } SessionManager::~SessionManager() { for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) delete *iter; for (map::iterator iter = packages.begin(); iter != packages.end(); iter++) { for (Package::ClassMap::iterator citer = iter->second->classes.begin(); citer != iter->second->classes.end(); citer++) delete citer->second; delete iter->second; } } Broker* SessionManager::addBroker(client::ConnectionSettings& settings) { Broker* broker(new Broker(*this, settings)); { Mutex::ScopedLock l(brokerListLock); brokers.push_back(broker); } return broker; } void SessionManager::delBroker(Broker* broker) { Mutex::ScopedLock l(brokerListLock); for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) if (*iter == broker) { brokers.erase(iter); delete broker; return; } } void SessionManager::getPackages(NameVector& packageNames) { allBrokersStable(); packageNames.clear(); { Mutex::ScopedLock l(lock); for (map::iterator iter = packages.begin(); iter != packages.end(); iter++) packageNames.push_back(iter->first); } } void SessionManager::getClasses(KeyVector& classKeys, const std::string& packageName) { allBrokersStable(); classKeys.clear(); map::iterator iter = packages.find(packageName); if (iter == packages.end()) return; Package& package = *(iter->second); for (Package::ClassMap::const_iterator piter = package.classes.begin(); piter != package.classes.end(); piter++) { ClassKey key(piter->second->getClassKey()); classKeys.push_back(key); } } SchemaClass& SessionManager::getSchema(const ClassKey& classKey) { allBrokersStable(); map::iterator iter = packages.find(classKey.getPackageName()); if (iter == packages.end()) throw Exception("Unknown package"); Package& package = *(iter->second); Package::NameHash key(classKey.getClassName(), classKey.getHash()); Package::ClassMap::iterator cIter = package.classes.find(key); if (cIter == package.classes.end()) throw Exception("Unknown class"); return *(cIter->second); } void SessionManager::bindPackage(const std::string& packageName) { stringstream key; key << "console.obj.*.*." << packageName << ".#"; bindingKeyList.push_back(key.str()); for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) (*iter)->addBinding(key.str()); } void SessionManager::bindClass(const ClassKey& classKey) { bindClass(classKey.getPackageName(), classKey.getClassName()); } void SessionManager::bindClass(const std::string& packageName, const std::string& className) { stringstream key; key << "console.obj.*.*." << packageName << "." << className << ".#"; bindingKeyList.push_back(key.str()); for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) (*iter)->addBinding(key.str()); } void SessionManager::bindEvent(const ClassKey& classKey) { bindEvent(classKey.getPackageName(), classKey.getClassName()); } void SessionManager::bindEvent(const std::string& packageName, const std::string& eventName) { if (!settings.userBindings) throw Exception("Session not configured for userBindings."); if (settings.rcvEvents) throw Exception("Session already configured to receive all events."); stringstream key; key << "console.event.*.*." << packageName; if (eventName.length()) { key << "." << eventName << ".#"; } else { key << ".#"; } bindingKeyList.push_back(key.str()); for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) (*iter)->addBinding(key.str()); } void SessionManager::getAgents(Agent::Vector& agents, Broker* broker) { agents.clear(); if (broker != 0) { broker->appendAgents(agents); } else { for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { (*iter)->appendAgents(agents); } } } void SessionManager::getObjects(Object::Vector& objects, const std::string& className, Broker* _broker, Agent* _agent) { Agent::Vector agentList; if (_agent != 0) { agentList.push_back(_agent); _agent->getBroker()->waitForStable(); } else { if (_broker != 0) { _broker->appendAgents(agentList); _broker->waitForStable(); } else { allBrokersStable(); Mutex::ScopedLock _lock(brokerListLock); for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) { (*iter)->appendAgents(agentList); } } } FieldTable ft; uint32_t sequence; ft.setString("_class", className); getResult.clear(); syncSequenceList.clear(); error = string(); if (agentList.empty()) { objects = getResult; return; } for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) { Agent* agent = *iter; char rawbuffer[512]; Buffer buffer(rawbuffer, 512); stringstream routingKey; routingKey << "agent." << agent->getBrokerBank() << "." << agent->getAgentBank(); { Mutex::ScopedLock _lock(lock); sequence = sequenceManager.reserve("multiget"); syncSequenceList.insert(sequence); } agent->getBroker()->encodeHeader(buffer, 'G', sequence); ft.encode(buffer); uint32_t length = buffer.getPosition(); buffer.reset(); agent->getBroker()->connThreadBody.sendBuffer(buffer, length, "qpid.management", routingKey.str()); } { Mutex::ScopedLock _lock(lock); sys::AbsTime startTime = sys::now(); while (!syncSequenceList.empty() && error.empty()) { cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC)); sys::AbsTime currTime = sys::now(); if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC) break; } } objects = getResult; } void SessionManager::bindingKeys() { bindingKeyList.push_back("schema.#"); if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) { bindingKeyList.push_back("console.#"); } else { if (settings.rcvObjects && !settings.userBindings) bindingKeyList.push_back("console.obj.#"); else bindingKeyList.push_back("console.obj.*.*.org.apache.qpid.broker.agent"); if (settings.rcvEvents) bindingKeyList.push_back("console.event.#"); if (settings.rcvHeartbeats) bindingKeyList.push_back("console.heartbeat"); } } void SessionManager::allBrokersStable() { Mutex::ScopedLock l(brokerListLock); for (vector::iterator iter = brokers.begin(); iter != brokers.end(); iter++) if ((*iter)->isConnected()) (*iter)->waitForStable(); } void SessionManager::startProtocol(Broker* broker) { char rawbuffer[512]; Buffer buffer(rawbuffer, 512); broker->encodeHeader(buffer, 'B'); uint32_t length = 512 - buffer.available(); buffer.reset(); broker->connThreadBody.sendBuffer(buffer, length); } void SessionManager::handleBrokerResp(Broker* broker, Buffer& inBuffer, uint32_t) { framing::Uuid brokerId; brokerId.decode(inBuffer); broker->setBrokerId(brokerId); char rawbuffer[512]; Buffer buffer(rawbuffer, 512); uint32_t sequence = sequenceManager.reserve("startup"); broker->encodeHeader(buffer, 'P', sequence); uint32_t length = 512 - buffer.available(); buffer.reset(); broker->connThreadBody.sendBuffer(buffer, length); if (listener != 0) { listener->brokerInfo(*broker); } } void SessionManager::handlePackageInd(Broker* broker, Buffer& inBuffer, uint32_t) { string packageName; inBuffer.getShortString(packageName); { Mutex::ScopedLock l(lock); map::iterator iter = packages.find(packageName); if (iter == packages.end()) { packages[packageName] = new Package(packageName); if (listener != 0) listener->newPackage(packageName); } } broker->incOutstanding(); char rawbuffer[512]; Buffer buffer(rawbuffer, 512); uint32_t sequence = sequenceManager.reserve("startup"); broker->encodeHeader(buffer, 'Q', sequence); buffer.putShortString(packageName); uint32_t length = 512 - buffer.available(); buffer.reset(); broker->connThreadBody.sendBuffer(buffer, length); } void SessionManager::handleCommandComplete(Broker* broker, Buffer& inBuffer, uint32_t sequence) { Mutex::ScopedLock l(lock); uint32_t resultCode = inBuffer.getLong(); string resultText; inBuffer.getShortString(resultText); string context = sequenceManager.release(sequence); if (resultCode != 0) QPID_LOG(debug, "Received error in completion: " << resultCode << " " << resultText); if (context == "startup") { broker->decOutstanding(); } else if (context == "multiget") { if (syncSequenceList.count(sequence) == 1) { syncSequenceList.erase(sequence); if (syncSequenceList.empty()) { cv.notify(); } } } // TODO: Other context cases } void SessionManager::handleClassInd(Broker* broker, Buffer& inBuffer, uint32_t) { string packageName; string className; uint8_t hash[16]; /*kind*/ (void) inBuffer.getOctet(); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); { Mutex::ScopedLock l(lock); map::iterator pIter = packages.find(packageName); if (pIter == packages.end() || pIter->second->getClass(className, hash)) return; } broker->incOutstanding(); char rawbuffer[512]; Buffer buffer(rawbuffer, 512); uint32_t sequence = sequenceManager.reserve("startup"); broker->encodeHeader(buffer, 'S', sequence); buffer.putShortString(packageName); buffer.putShortString(className); buffer.putBin128(hash); uint32_t length = 512 - buffer.available(); buffer.reset(); broker->connThreadBody.sendBuffer(buffer, length); } void SessionManager::handleMethodResp(Broker* broker, Buffer& buffer, uint32_t sequence) { if (broker->methodObject) { broker->methodObject->handleMethodResp(buffer, sequence); } } void SessionManager::handleHeartbeatInd(Broker* /*broker*/, Buffer& /*inBuffer*/, uint32_t /*sequence*/) { } void SessionManager::handleEventInd(Broker* broker, Buffer& buffer, uint32_t /*sequence*/) { string packageName; string className; uint8_t hash[16]; SchemaClass* schemaClass; buffer.getShortString(packageName); buffer.getShortString(className); buffer.getBin128(hash); { Mutex::ScopedLock l(lock); map::iterator pIter = packages.find(packageName); if (pIter == packages.end()) return; schemaClass = pIter->second->getClass(className, hash); if (schemaClass == 0) return; } Event event(broker, schemaClass, buffer); if (listener) listener->event(event); } void SessionManager::handleSchemaResp(Broker* broker, Buffer& inBuffer, uint32_t sequence) { uint8_t kind; string packageName; string className; uint8_t hash[16]; kind = inBuffer.getOctet(); inBuffer.getShortString(packageName); inBuffer.getShortString(className); inBuffer.getBin128(hash); { Mutex::ScopedLock l(lock); map::iterator pIter = packages.find(packageName); if (pIter != packages.end() && !pIter->second->getClass(className, hash)) { ClassKey key(packageName, className, hash); SchemaClass* schemaClass(new SchemaClass(kind, key, inBuffer)); pIter->second->addClass(className, hash, schemaClass); if (listener != 0) { listener->newClass(schemaClass->getClassKey()); } } } sequenceManager.release(sequence); broker->decOutstanding(); } void SessionManager::handleContentInd(Broker* broker, Buffer& buffer, uint32_t sequence, bool prop, bool stat) { string packageName; string className; uint8_t hash[16]; SchemaClass* schemaClass; buffer.getShortString(packageName); buffer.getShortString(className); buffer.getBin128(hash); { Mutex::ScopedLock l(lock); map::iterator pIter = packages.find(packageName); if (pIter == packages.end()) return; schemaClass = pIter->second->getClass(className, hash); if (schemaClass == 0) return; } Object object(broker, schemaClass, buffer, prop, stat); if (prop && className == "agent" && packageName == "org.apache.qpid.broker") broker->updateAgent(object); { Mutex::ScopedLock l(lock); if (syncSequenceList.count(sequence) == 1) { if (!object.isDeleted()) getResult.push_back(object); return; } } if (listener) { if (prop) listener->objectProps(*broker, object); if (stat) listener->objectStats(*broker, object); } } void SessionManager::handleBrokerConnect(Broker* broker) { if (listener != 0) listener->brokerConnected(*broker); } void SessionManager::handleBrokerDisconnect(Broker* broker) { if (listener != 0) listener->brokerDisconnected(*broker); }