diff options
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r-- | cpp/src/qpid/management/Buffer.cpp | 106 | ||||
-rw-r--r-- | cpp/src/qpid/management/ConnectionSettings.cpp | 40 | ||||
-rw-r--r-- | cpp/src/qpid/management/Manageable.cpp | 53 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 3121 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 432 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.cpp | 67 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.h | 59 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 385 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.cpp | 75 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.h | 63 | ||||
-rw-r--r-- | cpp/src/qpid/management/Mutex.cpp | 29 |
11 files changed, 0 insertions, 4430 deletions
diff --git a/cpp/src/qpid/management/Buffer.cpp b/cpp/src/qpid/management/Buffer.cpp deleted file mode 100644 index 7556b2a243..0000000000 --- a/cpp/src/qpid/management/Buffer.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/management/Buffer.h" -#include "qpid/framing/Buffer.h" -#include "qpid/amqp_0_10/Codecs.h" - -using namespace std; - -namespace qpid { -namespace management { - -Buffer::Buffer(char* data, uint32_t size) : impl(new framing::Buffer(data, size)) {} -Buffer::~Buffer() { delete impl; } -void Buffer::record() { impl->record(); } -void Buffer::restore(bool reRecord) { impl->restore(reRecord); } -void Buffer::reset() { impl->reset(); } -uint32_t Buffer::available() { return impl->available(); } -uint32_t Buffer::getSize() { return impl->getSize(); } -uint32_t Buffer::getPosition() { return impl->getPosition(); } -char* Buffer::getPointer() { return impl->getPointer(); } -void Buffer::putOctet(uint8_t i) { impl->putOctet(i); } -void Buffer::putShort(uint16_t i) { impl->putShort(i); } -void Buffer::putLong(uint32_t i) { impl->putLong(i); } -void Buffer::putLongLong(uint64_t i) { impl->putLongLong(i); } -void Buffer::putInt8(int8_t i) { impl->putInt8(i); } -void Buffer::putInt16(int16_t i) { impl->putInt16(i); } -void Buffer::putInt32(int32_t i) { impl->putInt32(i); } -void Buffer::putInt64(int64_t i) { impl->putInt64(i); } -void Buffer::putFloat(float i) { impl->putFloat(i); } -void Buffer::putDouble(double i) { impl->putDouble(i); } -void Buffer::putBin128(const uint8_t* i) { impl->putBin128(i); } -uint8_t Buffer::getOctet() { return impl->getOctet(); } -uint16_t Buffer::getShort() { return impl->getShort(); } -uint32_t Buffer::getLong() { return impl->getLong(); } -uint64_t Buffer::getLongLong() { return impl->getLongLong(); } -int8_t Buffer:: getInt8() { return impl-> getInt8(); } -int16_t Buffer::getInt16() { return impl->getInt16(); } -int32_t Buffer::getInt32() { return impl->getInt32(); } -int64_t Buffer::getInt64() { return impl->getInt64(); } -float Buffer::getFloat() { return impl->getFloat(); } -double Buffer::getDouble() { return impl->getDouble(); } -void Buffer::putShortString(const string& i) { impl->putShortString(i); } -void Buffer::putMediumString(const string& i) { impl->putMediumString(i); } -void Buffer::putLongString(const string& i) { impl->putLongString(i); } -void Buffer::getShortString(string& i) { impl->getShortString(i); } -void Buffer::getMediumString(string& i) { impl->getMediumString(i); } -void Buffer::getLongString(string& i) { impl->getLongString(i); } -void Buffer::getBin128(uint8_t* i) { impl->getBin128(i); } -void Buffer::putRawData(const string& i) { impl->putRawData(i); } -void Buffer::getRawData(string& s, uint32_t size) { impl->getRawData(s, size); } -void Buffer::putRawData(const uint8_t* data, size_t size) { impl->putRawData(data, size); } -void Buffer::getRawData(uint8_t* data, size_t size) { impl->getRawData(data, size); } - -void Buffer::putMap(const types::Variant::Map& i) -{ - string encoded; - amqp_0_10::MapCodec::encode(i, encoded); - impl->putRawData(encoded); -} - -void Buffer::putList(const types::Variant::List& i) -{ - string encoded; - amqp_0_10::ListCodec::encode(i, encoded); - impl->putRawData(encoded); -} - -void Buffer::getMap(types::Variant::Map& map) -{ - string encoded; - uint32_t saved = impl->getPosition(); - uint32_t length = impl->getLong(); - impl->setPosition(saved); - impl->getRawData(encoded, length + sizeof(uint32_t)); - amqp_0_10::MapCodec::decode(encoded, map); -} - -void Buffer::getList(types::Variant::List& list) -{ - string encoded; - uint32_t saved = impl->getPosition(); - uint32_t length = impl->getLong(); - impl->setPosition(saved); - impl->getRawData(encoded, length + sizeof(uint32_t)); - amqp_0_10::ListCodec::decode(encoded, list); -} - -}} diff --git a/cpp/src/qpid/management/ConnectionSettings.cpp b/cpp/src/qpid/management/ConnectionSettings.cpp deleted file mode 100644 index 1421a26867..0000000000 --- a/cpp/src/qpid/management/ConnectionSettings.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/management/ConnectionSettings.h" -#include "qpid/Version.h" - -qpid::management::ConnectionSettings::ConnectionSettings() : - protocol("tcp"), - host("localhost"), - port(5672), - locale("en_US"), - heartbeat(0), - maxChannels(32767), - maxFrameSize(65535), - bounds(2), - tcpNoDelay(false), - service(qpid::saslName), - minSsf(0), - maxSsf(256) -{} - -qpid::management::ConnectionSettings::~ConnectionSettings() {} - diff --git a/cpp/src/qpid/management/Manageable.cpp b/cpp/src/qpid/management/Manageable.cpp deleted file mode 100644 index 651215ffb5..0000000000 --- a/cpp/src/qpid/management/Manageable.cpp +++ /dev/null @@ -1,53 +0,0 @@ -// -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// - -#include "qpid/management/Manageable.h" - -using namespace qpid::management; -using std::string; - -string Manageable::StatusText (status_t status, string text) -{ - if ((status & STATUS_USER) == STATUS_USER) - return text; - - switch (status) - { - case STATUS_OK : return "OK"; - case STATUS_UNKNOWN_OBJECT : return "UnknownObject"; - case STATUS_UNKNOWN_METHOD : return "UnknownMethod"; - case STATUS_NOT_IMPLEMENTED : return "NotImplemented"; - case STATUS_PARAMETER_INVALID : return "InvalidParameter"; - case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented"; - case STATUS_FORBIDDEN : return "Forbidden"; - } - - return "??"; -} - -Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&) -{ - return STATUS_UNKNOWN_METHOD; -} - -bool Manageable::AuthorizeMethod(uint32_t, Args&, const std::string&) -{ - return true; -} - diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp deleted file mode 100644 index 8a12a57fa6..0000000000 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ /dev/null @@ -1,3121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -// NOTE on use of log levels: The criteria for using trace vs. debug -// is to use trace for log messages that are generated for each -// unbatched stats/props notification and debug for everything else. - -#include "qpid/management/ManagementAgent.h" -#include "qpid/management/ManagementObject.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/log/Statement.h" -#include <qpid/broker/Message.h> -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Thread.h" -#include "qpid/broker/ConnectionState.h" -#include "qpid/broker/AclModule.h" -#include "qpid/types/Variant.h" -#include "qpid/types/Uuid.h" -#include "qpid/framing/List.h" -#include "qpid/amqp_0_10/Codecs.h" -#include <list> -#include <iostream> -#include <fstream> -#include <sstream> -#include <typeinfo> - -using boost::intrusive_ptr; -using qpid::framing::Uuid; -using qpid::types::Variant; -using qpid::amqp_0_10::MapCodec; -using qpid::amqp_0_10::ListCodec; -using qpid::sys::Mutex; -using namespace qpid::framing; -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid; -using namespace std; -namespace _qmf = qmf::org::apache::qpid::broker; - - -namespace { - const string defaultVendorName("vendor"); - const string defaultProductName("product"); - - // Create a valid binding key substring by - // replacing all '.' chars with '_' - const string keyifyNameStr(const string& name) - { - string n2 = name; - - size_t pos = n2.find('.'); - while (pos != n2.npos) { - n2.replace(pos, 1, "_"); - pos = n2.find('.', pos); - } - return n2; - } - -struct ScopedManagementContext -{ - ScopedManagementContext(const qpid::broker::ConnectionState* context) - { - setManagementExecutionContext(context); - } - ~ScopedManagementContext() - { - setManagementExecutionContext(0); - } -}; -} - - -static Variant::Map mapEncodeSchemaId(const string& pname, - const string& cname, - const string& type, - const uint8_t *md5Sum) -{ - Variant::Map map_; - - map_["_package_name"] = pname; - map_["_class_name"] = cname; - map_["_type"] = type; - map_["_hash"] = qpid::types::Uuid(md5Sum); - return map_; -} - - -ManagementAgent::RemoteAgent::~RemoteAgent () -{ - QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); - if (mgmtObject != 0) { - mgmtObject->resourceDestroy(); - agent.deleteObjectNowLH(mgmtObject->getObjectId()); - } -} - -ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : - threadPoolSize(1), interval(10), broker(0), timer(0), - startTime(sys::now()), - suppressed(false), disallowAllV1Methods(false), - vendorNameKey(defaultVendorName), productNameKey(defaultProductName), - qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), - msgBuffer(MA_BUFFER_SIZE) -{ - nextObjectId = 1; - brokerBank = 1; - bootSequence = 1; - nextRemoteBank = 10; - nextRequestSequence = 1; - clientWasAdded = false; - attrMap["_vendor"] = defaultVendorName; - attrMap["_product"] = defaultProductName; -} - -ManagementAgent::~ManagementAgent () -{ - { - sys::Mutex::ScopedLock lock (userLock); - - // Reset the shared pointers to exchanges. If this is not done now, the exchanges - // will stick around until dExchange and mExchange are implicitly destroyed (long - // after this destructor completes). Those exchanges hold references to management - // objects that will be invalid. - dExchange.reset(); - mExchange.reset(); - v2Topic.reset(); - v2Direct.reset(); - - moveNewObjectsLH(); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - delete object; - } - managementObjects.clear(); - } -} - -void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, - qpid::broker::Broker* _broker, int _threads) -{ - dataDir = _dataDir; - interval = _interval; - broker = _broker; - threadPoolSize = _threads; - ManagementObject::maxThreads = threadPoolSize; - - // Get from file or generate and save to file. - if (dataDir.empty()) - { - uuid.generate(); - QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " - << uuid); - } - else - { - string filename(dataDir + "/.mbrokerdata"); - ifstream inFile(filename.c_str ()); - - if (inFile.good()) - { - inFile >> uuid; - inFile >> bootSequence; - inFile >> nextRemoteBank; - inFile.close(); - if (uuid.isNull()) { - uuid.generate(); - QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid); - } else - QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid); - - // if sequence goes beyond a 12-bit field, skip zero and wrap to 1. - bootSequence++; - if (bootSequence & 0xF000) - bootSequence = 1; - writeData(); - } - else - { - uuid.generate(); - QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid); - writeData(); - } - - QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); - } -} - -void ManagementAgent::pluginsInitialized() { - // Do this here so cluster plugin has the chance to set up the timer. - timer = &broker->getClusterTimer(); - timer->add(new Periodic(*this, interval)); -} - - -void ManagementAgent::setName(const string& vendor, const string& product, const string& instance) -{ - if (vendor.find(':') != vendor.npos) { - throw Exception("vendor string cannot contain a ':' character."); - } - if (product.find(':') != product.npos) { - throw Exception("product string cannot contain a ':' character."); - } - attrMap["_vendor"] = vendor; - attrMap["_product"] = product; - string inst; - if (instance.empty()) { - if (uuid.isNull()) - { - throw Exception("ManagementAgent::configure() must be called if default name is used."); - } - inst = uuid.str(); - } else - inst = instance; - - name_address = vendor + ":" + product + ":" + inst; - attrMap["_instance"] = inst; - attrMap["_name"] = name_address; - - vendorNameKey = keyifyNameStr(vendor); - productNameKey = keyifyNameStr(product); - instanceNameKey = keyifyNameStr(inst); -} - - -void ManagementAgent::getName(string& vendor, string& product, string& instance) -{ - vendor = std::string(attrMap["_vendor"]); - product = std::string(attrMap["_product"]); - instance = std::string(attrMap["_instance"]); -} - - -const std::string& ManagementAgent::getAddress() -{ - return name_address; -} - - -void ManagementAgent::writeData () -{ - string filename (dataDir + "/.mbrokerdata"); - ofstream outFile (filename.c_str ()); - - if (outFile.good()) - { - outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl; - outFile.close(); - } -} - -void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange, - qpid::broker::Exchange::shared_ptr _dexchange) -{ - mExchange = _mexchange; - dExchange = _dexchange; -} - -void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange, - qpid::broker::Exchange::shared_ptr _dexchange) -{ - v2Topic = _texchange; - v2Direct = _dexchange; -} - -void ManagementAgent::registerClass (const string& packageName, - const string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - sys::Mutex::ScopedLock lock(userLock); - PackageMap::iterator pIter = findOrAddPackageLH(packageName); - addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); -} - -void ManagementAgent::registerEvent (const string& packageName, - const string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - sys::Mutex::ScopedLock lock(userLock); - PackageMap::iterator pIter = findOrAddPackageLH(packageName); - addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); -} - -// Deprecated: V1 objects -ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent) -{ - uint16_t sequence; - uint64_t objectNum; - - sequence = persistent ? 0 : bootSequence; - objectNum = persistId ? persistId : nextObjectId++; - - ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum); - objId.setV2Key(*object); // let object generate the v2 key - - object->setObjectId(objId); - - { - sys::Mutex::ScopedLock lock(addLock); - newManagementObjects.push_back(object); - } - QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); - return objId; -} - - - -ObjectId ManagementAgent::addObject(ManagementObject* object, - const string& key, - bool persistent) -{ - uint16_t sequence; - - sequence = persistent ? 0 : bootSequence; - - ObjectId objId(0 /*flags*/, sequence, brokerBank); - if (key.empty()) { - objId.setV2Key(*object); // let object generate the key - } else { - objId.setV2Key(key); - } - - object->setObjectId(objId); - { - sys::Mutex::ScopedLock lock(addLock); - newManagementObjects.push_back(object); - } - QPID_LOG(debug, "Management object added: " << objId.getV2Key()); - return objId; -} - -void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) -{ - static const std::string severityStr[] = { - "emerg", "alert", "crit", "error", "warn", - "note", "info", "debug" - }; - sys::Mutex::ScopedLock lock (userLock); - uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; - - if (qmf1Support) { - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'e'); - outBuffer.putShortString(event.getPackageName()); - outBuffer.putShortString(event.getEventName()); - outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now()))); - outBuffer.putOctet(sev); - string sBuf; - event.encode(sBuf); - outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, - "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); - QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); - } - - if (qmf2Support) { - Variant::Map map_; - Variant::Map schemaId; - Variant::Map values; - Variant::Map headers; - - map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), - event.getEventName(), - "_event", - event.getMd5Sum()); - event.mapEncode(values); - map_["_values"] = values; - map_["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); - map_["_severity"] = sev; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_event"; - headers["qmf.agent"] = name_address; - - stringstream key; - key << "agent.ind.event." << keyifyNameStr(event.getPackageName()) - << "." << keyifyNameStr(event.getEventName()) - << "." << severityStr[sev] - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - - string content; - Variant::List list_; - list_.push_back(map_); - ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); - QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); - } -} - -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), - "ManagementAgent::periodicProcessing"), - agent(_agent) {} - -ManagementAgent::Periodic::~Periodic () {} - -void ManagementAgent::Periodic::fire () -{ - agent.timer->add (new Periodic (agent, agent.interval)); - agent.periodicProcessing (); -} - -void ManagementAgent::clientAdded (const string& routingKey) -{ - sys::Mutex::ScopedLock lock(userLock); - - // - // If this routing key is not relevant to object updates, exit. - // - if ((routingKey.compare(0, 1, "#") != 0) && - (routingKey.compare(0, 9, "console.#") != 0) && - (routingKey.compare(0, 12, "console.obj.") != 0)) - return; - - // - // Mark local objects for full-update. - // - clientWasAdded = true; - - // - // If the routing key is relevant for local objects only, don't involve - // any of the remote agents. - // - if (routingKey.compare(0, 39, "console.obj.*.*.org.apache.qpid.broker.") == 0) - return; - - std::list<std::string> rkeys; - - for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); - aIter != remoteAgents.end(); - aIter++) { - rkeys.push_back(aIter->second->routingKey); - } - - while (rkeys.size()) { - char localBuffer[16]; - Buffer outBuffer(localBuffer, 16); - uint32_t outLen; - - encodeHeader(outBuffer, 'x'); - outLen = outBuffer.getPosition(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); - QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); - rkeys.pop_front(); - } -} - -void ManagementAgent::clusterUpdate() { - // Called on all cluster memebers when a new member joins a cluster. - // Set clientWasAdded so that on the next periodicProcessing we will do - // a full update on all cluster members. - sys::Mutex::ScopedLock l(userLock); - moveNewObjectsLH(); // keep lists consistent with updater/updatee. - moveDeletedObjectsLH(); - clientWasAdded = true; - debugSnapshot("Cluster member joined"); -} - -void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet ('A'); - buf.putOctet ('M'); - buf.putOctet ('2'); - buf.putOctet (opcode); - buf.putLong (seq); -} - -bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '2'; -} - -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey) -{ - if (suppressed) { - QPID_LOG(debug, "Suppressing management message to " << routingKey); - return; - } - if (exchange.get() == 0) return; - - intrusive_ptr<Message> msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody())); - - content.castBody<AMQContentBody>()->decode(buf, length); - - method.setEof(false); - header.setBof(false); - header.setEof(false); - content.setBof(false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(length); - - DeliveryProperties* dp = - msg->getFrames().getHeaders()->get<DeliveryProperties>(true); - dp->setRoutingKey(routingKey); - - msg->getFrames().append(content); - msg->setIsManagementMessage(true); - - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg); - try { - exchange->route(deliverable, routingKey, 0); - } catch(exception&) {} - } - buf.reset(); -} - - -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - const string& exchange, - const string& routingKey) -{ - qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); - if (ex.get() != 0) - sendBufferLH(buf, length, ex, routingKey); -} - - -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey, - uint64_t ttl_msec) -{ - Variant::Map::const_iterator i; - - if (suppressed) { - QPID_LOG(debug, "Suppressing management message to " << routingKey); - return; - } - if (exchange.get() == 0) return; - - intrusive_ptr<Message> msg(new Message()); - AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0))); - AMQFrame header((AMQHeaderBody())); - AMQFrame content((AMQContentBody(data))); - - method.setEof(false); - header.setBof(false); - header.setEof(false); - content.setBof(false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); - props->setContentLength(data.length()); - if (!cid.empty()) { - props->setCorrelationId(cid); - } - props->setContentType(content_type); - props->setAppId("qmf2"); - - for (i = headers.begin(); i != headers.end(); ++i) { - msg->getOrInsertHeaders().setString(i->first, i->second.asString()); - } - - DeliveryProperties* dp = - msg->getFrames().getHeaders()->get<DeliveryProperties>(true); - dp->setRoutingKey(routingKey); - if (ttl_msec) { - dp->setTtl(ttl_msec); - msg->setTimestamp(broker->getExpiryPolicy()); - } - msg->getFrames().append(content); - msg->setIsManagementMessage(true); - - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg); - try { - exchange->route(deliverable, routingKey, 0); - } catch(exception&) {} - } -} - - -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - const string& exchange, - const string& routingKey, - uint64_t ttl_msec) -{ - qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); - if (ex.get() != 0) - sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec); -} - - -/** Objects that have been added since the last periodic poll are temporarily - * saved in the newManagementObjects list. This allows objects to be - * added without needing to block on the userLock (addLock is used instead). - * These new objects need to be integrated into the object database - * (managementObjects) *before* they can be properly managed. This routine - * performs the integration. - * - * Note well: objects on the newManagementObjects list may have been - * marked as "deleted", and, possibly re-added. This would result in - * duplicate object ids. To avoid clashes, don't put deleted objects - * into the active object database. - */ -void ManagementAgent::moveNewObjectsLH() -{ - sys::Mutex::ScopedLock lock (addLock); - while (!newManagementObjects.empty()) { - ManagementObject *object = newManagementObjects.back(); - newManagementObjects.pop_back(); - - if (object->isDeleted()) { - DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); - pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete object; - } else { // add to active object list, check for duplicates. - ObjectId oid = object->getObjectId(); - ManagementObjectMap::iterator destIter = managementObjects.find(oid); - if (destIter != managementObjects.end()) { - // duplicate found. It is OK if the old object has been marked - // deleted... - ManagementObject *oldObj = destIter->second; - if (oldObj->isDeleted()) { - DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); - pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete oldObj; - } else { - // Duplicate non-deleted objects? This is a user error - oids must be unique. - // for now, leak the old object (safer than deleting - may still be referenced) - // and complain loudly... - QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); - } - } - managementObjects[oid] = object; - } - } -} - -void ManagementAgent::periodicProcessing (void) -{ -#define BUFSIZE 65536 -#define HEADROOM 4096 - debugSnapshot("Management agent periodic processing"); - sys::Mutex::ScopedLock lock (userLock); - uint32_t contentSize; - string routingKey; - string sBuf; - - uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); - - moveNewObjectsLH(); - - // - // Clear the been-here flag on all objects in the map. - // - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - object->setFlags(0); - if (clientWasAdded) { - object->setForcePublish(true); - } - } - - clientWasAdded = false; - - // first send the pending deletes before sending updates. This prevents a - // "false delete" scenario: if an object was deleted then re-added during - // the last poll cycle, it will have a delete entry and an active entry. - // if we sent the active update first, _then_ the delete update, clients - // would incorrectly think the object was deleted. See QPID-2997 - // - bool objectsDeleted = moveDeletedObjectsLH(); - if (!pendingDeletedObjs.empty()) { - // use a temporary copy of the pending deletes so dropping the lock when - // the buffer is sent is safe. - PendingDeletedObjsMap tmp(pendingDeletedObjs); - pendingDeletedObjs.clear(); - - for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { - std::string packageName; - std::string className; - msgBuffer.reset(); - uint32_t v1Objs = 0; - uint32_t v2Objs = 0; - Variant::List list_; - - size_t pos = mIter->first.find(":"); - packageName = mIter->first.substr(0, pos); - className = mIter->first.substr(pos+1); - - for (DeletedObjectList::iterator lIter = mIter->second.begin(); - lIter != mIter->second.end(); lIter++) { - msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space. - std::string oid = (*lIter)->objectId; - if (!(*lIter)->encodedV1Config.empty()) { - encodeHeader(msgBuffer, 'c'); - msgBuffer.putRawData((*lIter)->encodedV1Config); - QPID_LOG(trace, "Deleting V1 properties " << oid - << " len=" << (*lIter)->encodedV1Config.size()); - v1Objs++; - } - if (!(*lIter)->encodedV1Inst.empty()) { - encodeHeader(msgBuffer, 'i'); - msgBuffer.putRawData((*lIter)->encodedV1Inst); - QPID_LOG(trace, "Deleting V1 statistics " << oid - << " len=" << (*lIter)->encodedV1Inst.size()); - v1Objs++; - } - if (v1Objs >= maxReplyObjs) { - v1Objs = 0; - contentSize = msgBuffer.getSize(); - stringstream key; - key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" - << key.str() << " len=" << contentSize); - } - - if (!(*lIter)->encodedV2.empty()) { - QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); - list_.push_back((*lIter)->encodedV2); - if (++v2Objs >= maxReplyObjs) { - v2Objs = 0; - - string content; - ListCodec::encode(list_, content); - list_.clear(); - if (content.length()) { - stringstream key; - Variant::Map headers; - key << "agent.ind.data." << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); - } - } - } - } // end current list - - // send any remaining objects... - - if (v1Objs) { - contentSize = BUFSIZE - msgBuffer.available(); - stringstream key; - key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); - } - - if (!list_.empty()) { - string content; - ListCodec::encode(list_, content); - list_.clear(); - if (content.length()) { - stringstream key; - Variant::Map headers; - key << "agent.ind.data." << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); - } - } - } // end map - } - - // - // Process the entire object map. Remember: we drop the userLock each time we call - // sendBuffer(). This allows the managementObjects map to be altered during the - // sendBuffer() call, so always restart the search after a sendBuffer() call - // - while (1) { - msgBuffer.reset(); - Variant::List list_; - uint32_t pcount; - uint32_t scount; - uint32_t v1Objs, v2Objs; - ManagementObjectMap::iterator baseIter; - std::string packageName; - std::string className; - - for (baseIter = managementObjects.begin(); - baseIter != managementObjects.end(); - baseIter++) { - ManagementObject* baseObject = baseIter->second; - // - // Skip until we find a base object requiring processing... - // - if (baseObject->getFlags() == 0) { - packageName = baseObject->getPackageName(); - className = baseObject->getClassName(); - break; - } - } - - if (baseIter == managementObjects.end()) - break; // done - all objects processed - - pcount = scount = 0; - v1Objs = 0; - v2Objs = 0; - list_.clear(); - msgBuffer.reset(); - - for (ManagementObjectMap::iterator iter = baseIter; - iter != managementObjects.end(); - iter++) { - msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space - ManagementObject* baseObject = baseIter->second; - ManagementObject* object = iter->second; - bool send_stats, send_props; - if (baseObject->isSameClass(*object) && object->getFlags() == 0) { - object->setFlags(1); - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - // skip any objects marked deleted since our first pass. Deal with them - // on the next periodic cycle... - if (object->isDeleted()) { - continue; - } - - send_props = (object->getConfigChanged() || object->getForcePublish()); - send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish())); - - if (send_props && qmf1Support) { - size_t pos = msgBuffer.getPosition(); - encodeHeader(msgBuffer, 'c'); - sBuf.clear(); - object->writeProperties(sBuf); - msgBuffer.putRawData(sBuf); - QPID_LOG(trace, "Changed V1 properties " - << object->getObjectId().getV2Key() - << " len=" << msgBuffer.getPosition()-pos); - ++v1Objs; - } - - if (send_stats && qmf1Support) { - size_t pos = msgBuffer.getPosition(); - encodeHeader(msgBuffer, 'i'); - sBuf.clear(); - object->writeStatistics(sBuf); - msgBuffer.putRawData(sBuf); - QPID_LOG(trace, "Changed V1 statistics " - << object->getObjectId().getV2Key() - << " len=" << msgBuffer.getPosition()-pos); - ++v1Objs; - } - - if ((send_stats || send_props) && qmf2Support) { - Variant::Map map_; - Variant::Map values; - Variant::Map oid; - - object->getObjectId().mapEncode(oid); - map_["_object_id"] = oid; - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - "_data", - object->getMd5Sum()); - object->writeTimestamps(map_); - object->mapEncodeValues(values, send_props, send_stats); - map_["_values"] = values; - list_.push_back(map_); - v2Objs++; - QPID_LOG(trace, "Changed V2" - << (send_stats? " statistics":"") - << (send_props? " properties":"") - << " map=" << map_); - } - - if (send_props) pcount++; - if (send_stats) scount++; - - object->setForcePublish(false); - - if ((qmf1Support && (v1Objs >= maxReplyObjs)) || - (qmf2Support && (v2Objs >= maxReplyObjs))) - break; // have enough objects, send an indication... - } - } - - if (pcount || scount) { - if (qmf1Support) { - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { - stringstream key; - key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() - << " props=" << pcount - << " stats=" << scount - << " len=" << contentSize); - } - } - - if (qmf2Support) { - string content; - ListCodec::encode(list_, content); - if (content.length()) { - stringstream key; - Variant::Map headers; - key << "agent.ind.data." << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() - << " props=" << pcount - << " stats=" << scount - << " len=" << content.length()); - } - } - } - } // end processing updates for all objects - - if (objectsDeleted) deleteOrphanedAgentsLH(); - - // heartbeat generation - - if (qmf1Support) { -#define BUFSIZE 65536 - uint32_t contentSize; - char msgChars[BUFSIZE]; - Buffer msgBuffer(msgChars, BUFSIZE); - encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now()))); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - routingKey = "console.heartbeat.1.0"; - sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); - QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); - } - - if (qmf2Support) { - std::stringstream addr_key; - - addr_key << "agent.ind.heartbeat." << vendorNameKey << "." << productNameKey; - if (!instanceNameKey.empty()) - addr_key << "." << instanceNameKey; - - Variant::Map map; - Variant::Map headers; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_agent_heartbeat_indication"; - headers["qmf.agent"] = name_address; - - map["_values"] = attrMap; - map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); - map["_values"].asMap()["_heartbeat_interval"] = interval; - map["_values"].asMap()["_epoch"] = bootSequence; - - string content; - MapCodec::encode(map, content); - - // Set TTL (in msecs) on outgoing heartbeat indications based on the interval - // time to prevent stale heartbeats from getting to the consoles. - sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); - - QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); - } -} - -void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) -{ - ManagementObjectMap::iterator iter = managementObjects.find(oid); - if (iter == managementObjects.end()) - return; - ManagementObject* object = iter->second; - if (!object->isDeleted()) - return; - - // since sendBufferLH drops the userLock, don't call it until we - // are done manipulating the object. -#define DNOW_BUFSIZE 2048 - char msgChars[DNOW_BUFSIZE]; - Buffer msgBuffer(msgChars, DNOW_BUFSIZE); - Variant::List list_; - stringstream v1key, v2key; - - if (qmf1Support) { - string sBuf; - - v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - encodeHeader(msgBuffer, 'c'); - object->writeProperties(sBuf); - msgBuffer.putRawData(sBuf); - } - - if (qmf2Support) { - Variant::Map map_; - Variant::Map values; - - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - "_data", - object->getMd5Sum()); - object->writeTimestamps(map_); - object->mapEncodeValues(values, true, false); - map_["_values"] = values; - list_.push_back(map_); - v2key << "agent.ind.data." << keyifyNameStr(object->getPackageName()) - << "." << keyifyNameStr(object->getClassName()) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - v2key << "." << instanceNameKey; - } - - object = 0; - managementObjects.erase(oid); - - // object deleted, ok to drop lock now. - - if (qmf1Support) { - uint32_t contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); - QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); - } - - if (qmf2Support) { - Variant::Map headers; - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - string content; - ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); - QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); - } -} - -void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence, - uint32_t code, const string& text) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'z', sequence); - outBuffer.putLong (code); - outBuffer.putShortString (text); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << - replyToKey << " seq=" << sequence); -} - -void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid, - const string& text, uint32_t code, bool viaLocal) -{ - static const string addr_exchange("qmf.default.direct"); - - Variant::Map map; - Variant::Map headers; - Variant::Map values; - string content; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_exception"; - headers["qmf.agent"] = viaLocal ? "broker" : name_address; - - values["error_code"] = code; - values["error_text"] = text; - map["_values"] = values; - - MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); - - QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); -} - -bool ManagementAgent::dispatchCommand (Deliverable& deliverable, - const string& routingKey, - const FieldTable* /*args*/, - const bool topic, - int qmfVersion) -{ - sys::Mutex::ScopedLock lock (userLock); - Message& msg = ((DeliverableMessage&) deliverable).getMessage (); - - if (topic && qmfVersion == 1) { - - // qmf1 is bound only to the topic management exchange. - // Parse the routing key. This management broker should act as though it - // is bound to the exchange to match the following keys: - // - // agent.1.0.# - // broker - // schema.# - - if (routingKey == "broker") { - dispatchAgentCommandLH(msg); - return false; - } - - if (routingKey.length() > 6) { - - if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); - return false; - } - - if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); - } - - if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); - return true; - } - } - } - - if (qmfVersion == 2) { - - if (topic) { - // Intercept messages bound to: - // "console.ind.locate.# - process these messages, and also allow them to be forwarded. - if (routingKey == "console.request.agent_locate") { - dispatchAgentCommandLH(msg); - return true; - } - - } else { // direct exchange - - // Intercept messages bound to: - // "broker" - generic alias for the local broker - // "<name_address>" - the broker agent's proper name - // and do not forward them futher - if (routingKey == "broker" || routingKey == name_address) { - dispatchAgentCommandLH(msg, routingKey == "broker"); - return false; - } - } - } - - return true; -} - -void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) -{ - moveNewObjectsLH(); - - string methodName; - string packageName; - string className; - uint8_t hash[16]; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - AclModule* acl = broker->getAcl(); - string inArgs; - - string sBuf; - inBuffer.getRawData(sBuf, 16); - ObjectId objId; - objId.decode(sBuf); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); - inBuffer.getRawData(inArgs, inBuffer.available()); - - QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" << - methodName << " replyTo=" << replyToKey); - - encodeHeader(outBuffer, 'm', sequence); - - if (disallowAllV1Methods) { - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2"); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); - return; - } - - DisallowedMethods::const_iterator i = disallowed.find(make_pair(className, methodName)); - if (i != disallowed.end()) { - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(i->second); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); - return; - } - - string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); - if (acl != 0) { - map<acl::Property, string> params; - params[acl::PROP_SCHEMAPACKAGE] = packageName; - params[acl::PROP_SCHEMACLASS] = className; - - if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); - return; - } - } - - ManagementObjectMap::iterator iter = numericFind(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { - outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); - } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { - outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); - outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); - } - else - try { - outBuffer.record(); - sys::Mutex::ScopedUnlock u(userLock); - string outBuf; - iter->second->doMethod(methodName, inArgs, outBuf, userId); - outBuffer.putRawData(outBuf); - } catch(exception& e) { - outBuffer.restore(); - outBuffer.putLong(Manageable::STATUS_EXCEPTION); - outBuffer.putMediumString(e.what()); - } - } - - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); -} - - -void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk, - const string& cid, const ConnectionToken* connToken, bool viaLocal) -{ - moveNewObjectsLH(); - - string methodName; - Variant::Map inMap; - MapCodec::decode(body, inMap); - Variant::Map::const_iterator oid, mid; - string content; - string error; - uint32_t errorCode(0); - - Variant::Map outMap; - Variant::Map headers; - - headers["method"] = "response"; - headers["qmf.opcode"] = "_method_response"; - headers["qmf.agent"] = viaLocal ? "broker" : name_address; - - if ((oid = inMap.find("_object_id")) == inMap.end() || - (mid = inMap.find("_method_name")) == inMap.end()) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), - Manageable::STATUS_PARAMETER_INVALID, viaLocal); - return; - } - - ObjectId objId; - Variant::Map inArgs; - Variant::Map callMap; - - try { - // coversions will throw if input is invalid. - objId = ObjectId(oid->second.asMap()); - methodName = mid->second.getString(); - - mid = inMap.find("_arguments"); - if (mid != inMap.end()) { - inArgs = (mid->second).asMap(); - } - } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); - return; - } - - ManagementObjectMap::iterator iter = managementObjects.find(objId); - - if (iter == managementObjects.end() || iter->second->isDeleted()) { - stringstream estr; - estr << "No object found with ID=" << objId; - sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal); - return; - } - - // validate - AclModule* acl = broker->getAcl(); - DisallowedMethods::const_iterator i; - - i = disallowed.find(make_pair(iter->second->getClassName(), methodName)); - if (i != disallowed.end()) { - sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); - return; - } - - string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); - if (acl != 0) { - map<acl::Property, string> params; - params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); - params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); - - if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), - Manageable::STATUS_FORBIDDEN, viaLocal); - return; - } - } - - // invoke the method - - QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() - << ":" << iter->second->getClassName() << " method=" << - methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs); - - try { - sys::Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inArgs, callMap, userId); - errorCode = callMap["_status_code"].asUint32(); - if (errorCode == 0) { - outMap["_arguments"] = Variant::Map(); - for (Variant::Map::const_iterator iter = callMap.begin(); - iter != callMap.end(); iter++) - if (iter->first != "_status_code" && iter->first != "_status_text") - outMap["_arguments"].asMap()[iter->first] = iter->second; - } else - error = callMap["_status_text"].asString(); - } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); - return; - } - - if (errorCode != 0) { - sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal); - return; - } - - MapCodec::encode(outMap, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); - QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap); -} - - -void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, uint32_t sequence) -{ - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); - - encodeHeader (outBuffer, 'b', sequence); - uuid.encode (outBuffer); - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); -} - -void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence) -{ - QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - for (PackageMap::iterator pIter = packages.begin (); - pIter != packages.end (); - pIter++) - { - encodeHeader (outBuffer, 'p', sequence); - encodePackageIndication (outBuffer, pIter); - } - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); - } - - sendCommandCompleteLH(replyToKey, sequence); -} - -void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) -{ - string packageName; - - inBuffer.getShortString(packageName); - - QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); - - findOrAddPackageLH(packageName); -} - -void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) -{ - string packageName; - - inBuffer.getShortString(packageName); - - QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); - - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) - { - typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; - std::list<_ckeyType> classes; - ClassMap &cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); - cIter != cMap.end(); - cIter++) { - if (cIter->second.hasSchema()) { - classes.push_back(make_pair(cIter->first, cIter->second.kind)); - } - } - - while (classes.size()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'q', sequence); - encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); - - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << - "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); - classes.pop_front(); - } - - } - sendCommandCompleteLH(replyToKey, sequence); -} - -void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t) -{ - string packageName; - SchemaClassKey key; - - uint8_t kind = inBuffer.getOctet(); - inBuffer.getShortString(packageName); - inBuffer.getShortString(key.name); - inBuffer.getBin128(key.hash); - - QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << - "), replyTo=" << replyToKey); - - PackageMap::iterator pIter = findOrAddPackageLH(packageName); - ClassMap::iterator cIter = pIter->second.find(key); - if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - uint32_t sequence = nextRequestSequence++; - - // Schema Request - encodeHeader (outBuffer, 'S', sequence); - outBuffer.putShortString(packageName); - key.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << - "), to=" << replyToKey << " seq=" << sequence); - - if (cIter != pIter->second.end()) - pIter->second.erase(key); - - pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence))); - } -} - -void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) -{ - // If the management package is attached locally (embedded in the broker or - // linked in via plug-in), call the schema handler directly. If the package - // is from a remote management agent, send the stored schema information. - - if (writeSchemaCall != 0) { - string schema; - writeSchemaCall(schema); - buf.putRawData(schema); - } else - buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); -} - -void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) -{ - string packageName; - SchemaClassKey key; - - inBuffer.getShortString (packageName); - key.decode(inBuffer); - - QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << - "), replyTo=" << rte << "/" << rtk << " seq=" << sequence); - - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) { - ClassMap& cMap = pIter->second; - ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - SchemaClass& classInfo = cIter->second; - - if (classInfo.hasSchema()) { - encodeHeader(outBuffer, 's', sequence); - classInfo.appendSchema(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); - QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence); - } - else - sendCommandCompleteLH(rtk, sequence, 1, "Schema not available"); - } - else - sendCommandCompleteLH(rtk, sequence, 1, "Class key not found"); - } - else - sendCommandCompleteLH(rtk, sequence, 1, "Package not found"); -} - -void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) -{ - string packageName; - SchemaClassKey key; - - inBuffer.record(); - inBuffer.getOctet(); - inBuffer.getShortString(packageName); - key.decode(inBuffer); - inBuffer.restore(); - - QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); - - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) { - ClassMap& cMap = pIter->second; - ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) { - size_t length = validateSchema(inBuffer, cIter->second.kind); - if (length == 0) { - QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name); - cMap.erase(key); - } else { - cIter->second.data.resize(length); - inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length); - - // Publish a class-indication message - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); - QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << - " to=schema.class"); - } - } - } -} - -bool ManagementAgent::bankInUse (uint32_t bank) -{ - for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); - aIter != remoteAgents.end(); - aIter++) - if (aIter->second->agentBank == bank) - return true; - return false; -} - -uint32_t ManagementAgent::allocateNewBank () -{ - while (bankInUse (nextRemoteBank)) - nextRemoteBank++; - - uint32_t allocated = nextRemoteBank++; - writeData (); - return allocated; -} - -uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank) -{ - if (requestedBank == 0 || bankInUse (requestedBank)) - return allocateNewBank (); - return requestedBank; -} - -void ManagementAgent::deleteOrphanedAgentsLH() -{ - list<ObjectId> deleteList; - - for (RemoteAgentMap::const_iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { - bool found = false; - - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - if (iter->first == aIter->first && !iter->second->isDeleted()) { - found = true; - break; - } - } - - if (!found) - deleteList.push_back(aIter->first); - } - - for (list<ObjectId>::const_iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) - remoteAgents.erase(*dIter); -} - -void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) -{ - string label; - uint32_t requestedBrokerBank, requestedAgentBank; - uint32_t assignedBank; - ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); - Uuid systemId; - - moveNewObjectsLH(); - deleteOrphanedAgentsLH(); - RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); - if (aIter != remoteAgents.end()) { - // There already exists an agent on this session. Reject the request. - sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent"); - return; - } - - inBuffer.getShortString(label); - systemId.decode(inBuffer); - requestedBrokerBank = inBuffer.getLong(); - requestedAgentBank = inBuffer.getLong(); - - QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank << - " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence); - - assignedBank = assignBankLH(requestedAgentBank); - - boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this)); - agent->brokerBank = brokerBank; - agent->agentBank = assignedBank; - agent->routingKey = replyToKey; - agent->connectionRef = connectionRef; - agent->mgmtObject = new _qmf::Agent (this, agent.get()); - agent->mgmtObject->set_connectionRef(agent->connectionRef); - agent->mgmtObject->set_label (label); - agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); - agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data()); - agent->mgmtObject->set_brokerBank (brokerBank); - agent->mgmtObject->set_agentBank (assignedBank); - addObject (agent->mgmtObject, 0); - remoteAgents[connectionRef] = agent; - - QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); - - // Send an Attach Response - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'a', sequence); - outBuffer.putLong (brokerBank); - outBuffer.putLong (assignedBank); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << - " to=" << replyToKey << " seq=" << sequence); -} - -void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) -{ - FieldTable ft; - FieldTable::ValuePtr value; - - moveNewObjectsLH(); - - ft.decode(inBuffer); - - QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence); - - value = ft.get("_class"); - if (value.get() == 0 || !value->convertsTo<string>()) { - value = ft.get("_objectid"); - if (value.get() == 0 || !value->convertsTo<string>()) - return; - - ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = numericFind(selector); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - if (!object->isDeleted()) { - string sBuf; - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(sBuf); - outBuffer.putRawData(sBuf); - sBuf.clear(); - object->writeStatistics(sBuf, true); - outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); - } - } - sendCommandCompleteLH(replyToKey, sequence); - return; - } - - string className (value->get<string>()); - std::list<ObjectId>matches; - - // build up a set of all objects to be dumped - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName () == className) { - matches.push_back(object->getObjectId()); - } - } - - // send them (as sendBufferLH drops the userLock) - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - while (matches.size()) { - ObjectId objId = matches.front(); - ManagementObjectMap::iterator oIter = managementObjects.find( objId ); - if (oIter != managementObjects.end()) { - ManagementObject* object = oIter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - if (!object->isDeleted()) { - string sProps, sStats; - object->writeProperties(sProps); - object->writeStatistics(sStats, true); - - size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. - if (len > MA_BUFFER_SIZE) { - QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!"); - } else { - if (outBuffer.available() < len) { // not enough room in current buffer, send it. - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock - QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); - continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. - } - encodeHeader(outBuffer, 'g', sequence); - outBuffer.putRawData(sProps); - outBuffer.putRawData(sStats); - } - } - } - matches.pop_front(); - } - - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); - } - - sendCommandCompleteLH(replyToKey, sequence); -} - - -void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) -{ - moveNewObjectsLH(); - - Variant::Map inMap; - Variant::Map::const_iterator i; - Variant::Map headers; - - MapCodec::decode(body, inMap); - QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid); - - headers["method"] = "response"; - headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = viaLocal ? "broker" : name_address; - - /* - * Unpack the _what element of the query. Currently we only support OBJECT queries. - */ - i = inMap.find("_what"); - if (i == inMap.end()) { - sendExceptionLH(rte, rtk, cid, "_what element missing in Query"); - return; - } - - if (i->second.getType() != qpid::types::VAR_STRING) { - sendExceptionLH(rte, rtk, cid, "_what element is not a string"); - return; - } - - if (i->second.asString() != "OBJECT") { - sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); - return; - } - - string className; - string packageName; - - /* - * Handle the _schema_id element, if supplied. - */ - i = inMap.find("_schema_id"); - if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { - const Variant::Map& schemaIdMap(i->second.asMap()); - - Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); - if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) - className = s_iter->second.asString(); - - s_iter = schemaIdMap.find("_package_name"); - if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) - packageName = s_iter->second.asString(); - } - - - /* - * Unpack the _object_id element of the query if it is present. If it is present, find that one - * object and return it. If it is not present, send a class-based result. - */ - i = inMap.find("_object_id"); - if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { - Variant::List list_; - ObjectId objId(i->second.asMap()); - - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - if (!object->isDeleted()) { - Variant::Map map_; - Variant::Map values; - Variant::Map oidMap; - - object->mapEncodeValues(values, true, true); // write both stats and properties - objId.mapEncode(oidMap); - map_["_values"] = values; - map_["_object_id"] = oidMap; - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - "_data", - object->getMd5Sum()); - list_.push_back(map_); - } - - string content; - - ListCodec::encode(list_, content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); - QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); - return; - } - } else { - // send class-based result. - Variant::List _list; - Variant::List _subList; - unsigned int objCount = 0; - - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName() == className && - (packageName.empty() || object->getPackageName() == packageName)) { - - - if (!object->isDeleted()) { - Variant::Map map_; - Variant::Map values; - Variant::Map oidMap; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - object->writeTimestamps(map_); - object->mapEncodeValues(values, true, true); // write both stats and properties - iter->first.mapEncode(oidMap); - - map_["_values"] = values; - map_["_object_id"] = oidMap; - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - "_data", - object->getMd5Sum()); - _subList.push_back(map_); - if (++objCount >= maxReplyObjs) { - objCount = 0; - _list.push_back(_subList); - _subList.clear(); - } - } - } - } - - if (_subList.size()) - _list.push_back(_subList); - - headers["partial"] = Variant(); - string content; - while (_list.size() > 1) { - ListCodec::encode(_list.front().asList(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); - _list.pop_front(); - QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); - } - headers.erase("partial"); - ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); - QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); - return; - } - - // Unrecognized query - Send empty message to indicate CommandComplete - string content; - ListCodec::encode(Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); - QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk); -} - - -void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid) -{ - QPID_LOG(debug, "RCVD AgentLocateRequest"); - - Variant::Map map; - Variant::Map headers; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_agent_locate_response"; - headers["qmf.agent"] = name_address; - - map["_values"] = attrMap; - map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); - map["_values"].asMap()["_heartbeat_interval"] = interval; - map["_values"].asMap()["_epoch"] = bootSequence; - - string content; - MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); - clientWasAdded = true; - - QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); -} - - -bool ManagementAgent::authorizeAgentMessageLH(Message& msg) -{ - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); - uint32_t sequence = 0; - bool methodReq = false; - bool mapMsg = false; - string packageName; - string className; - string methodName; - string cid; - - // - // If the message is larger than our working buffer size, we can't determine if it's - // authorized or not. In this case, return true (authorized) if there is no ACL in place, - // otherwise return false; - // - if (msg.encodedSize() > MA_BUFFER_SIZE) - return broker->getAcl() == 0; - - msg.encodeContent(inBuffer); - uint32_t bufferLen = inBuffer.getPosition(); - inBuffer.reset(); - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - - const framing::FieldTable *headers = msg.getApplicationHeaders(); - - if (headers && msg.getAppId() == "qmf2") - { - mapMsg = true; - - if (p && p->hasCorrelationId()) { - cid = p->getCorrelationId(); - } - - if (headers->getAsString("qmf.opcode") == "_method_request") - { - methodReq = true; - - // extract object id and method name - - string body; - inBuffer.getRawData(body, bufferLen); - Variant::Map inMap; - MapCodec::decode(body, inMap); - Variant::Map::const_iterator oid, mid; - - ObjectId objId; - - if ((oid = inMap.find("_object_id")) == inMap.end() || - (mid = inMap.find("_method_name")) == inMap.end()) { - QPID_LOG(warning, - "Missing fields in QMF authorize req received."); - return false; - } - - try { - // coversions will throw if input is invalid. - objId = ObjectId(oid->second.asMap()); - methodName = mid->second.getString(); - } catch(exception& /*e*/) { - QPID_LOG(warning, - "Badly formatted QMF authorize req received."); - return false; - } - - // look up schema for object to get package and class name - - ManagementObjectMap::iterator iter = managementObjects.find(objId); - - if (iter == managementObjects.end() || iter->second->isDeleted()) { - QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " << - objId); - return false; - } - - packageName = iter->second->getPackageName(); - className = iter->second->getClassName(); - } - } else { // old style binary message format - - uint8_t opcode; - - if (!checkHeader(inBuffer, &opcode, &sequence)) - return false; - - if (opcode == 'M') { - methodReq = true; - - // extract method name & schema package and class name - - uint8_t hash[16]; - inBuffer.getLongLong(); // skip over object id - inBuffer.getLongLong(); - inBuffer.getShortString(packageName); - inBuffer.getShortString(className); - inBuffer.getBin128(hash); - inBuffer.getShortString(methodName); - - } - } - - if (methodReq) { - // TODO: check method call against ACL list. - map<acl::Property, string> params; - AclModule* acl = broker->getAcl(); - if (acl == 0) - return true; - - string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId(); - params[acl::PROP_SCHEMAPACKAGE] = packageName; - params[acl::PROP_SCHEMACLASS] = className; - - if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) - return true; - - // authorization failed, send reply if replyTo present - - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) { - const framing::ReplyTo& rt = p->getReplyTo(); - string rte = rt.getExchange(); - string rtk = rt.getRoutingKey(); - string cid; - if (p && p->hasCorrelationId()) - cid = p->getCorrelationId(); - - if (mapMsg) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), - Manageable::STATUS_FORBIDDEN, false); - } else { - - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'm', sequence); - outBuffer.putLong(Manageable::STATUS_FORBIDDEN); - outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); - } - - QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); - } - - return false; - } - - return true; -} - -void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) -{ - string rte; - string rtk; - const framing::MessageProperties* p = - msg.getFrames().getHeaders()->get<framing::MessageProperties>(); - if (p && p->hasReplyTo()) { - const framing::ReplyTo& rt = p->getReplyTo(); - rte = rt.getExchange(); - rtk = rt.getRoutingKey(); - } - else - return; - - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); - uint8_t opcode; - - if (msg.encodedSize() > MA_BUFFER_SIZE) { - QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << - msg.encodedSize()); - return; - } - - msg.encodeContent(inBuffer); - uint32_t bufferLen = inBuffer.getPosition(); - inBuffer.reset(); - - ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher()); - const framing::FieldTable *headers = msg.getApplicationHeaders(); - if (headers && msg.getAppId() == "qmf2") - { - string opcode = headers->getAsString("qmf.opcode"); - string contentType = headers->getAsString("qmf.content"); - string body; - string cid; - inBuffer.getRawData(body, bufferLen); - - if (p && p->hasCorrelationId()) { - cid = p->getCorrelationId(); - } - - if (opcode == "_method_request") - return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal); - else if (opcode == "_query_request") - return handleGetQueryLH(body, rte, rtk, cid, viaLocal); - else if (opcode == "_agent_locate_request") - return handleLocateRequestLH(body, rte, rtk, cid); - - QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); - return; - } - - // old preV2 binary messages - - while (inBuffer.getPosition() < bufferLen) { - uint32_t sequence; - if (!checkHeader(inBuffer, &opcode, &sequence)) - return; - - if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence); - else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); - else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); - } -} - -ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name) -{ - PackageMap::iterator pIter = packages.find (name); - if (pIter != packages.end ()) - return pIter; - - // No such package found, create a new map entry. - pair<PackageMap::iterator, bool> result = - packages.insert(pair<string, ClassMap>(name, ClassMap())); - QPID_LOG (debug, "ManagementAgent added package " << name); - - // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader (outBuffer, 'p'); - encodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); - QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); - - return result.first; -} - -void ManagementAgent::addClassLH(uint8_t kind, - PackageMap::iterator pIter, - const string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall) -{ - SchemaClassKey key; - ClassMap& cMap = pIter->second; - - key.name = className; - memcpy(&key.hash, md5Sum, 16); - - ClassMap::iterator cIter = cMap.find(key); - if (cIter != cMap.end()) - return; - - // No such class found, create a new class with local information. - QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" << - key.name); - - cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall))); - cIter = cMap.find(key); -} - -void ManagementAgent::encodePackageIndication(Buffer& buf, - PackageMap::iterator pIter) -{ - buf.putShortString((*pIter).first); -} - -void ManagementAgent::encodeClassIndication(Buffer& buf, - const std::string packageName, - const SchemaClassKey key, - uint8_t kind) -{ - buf.putOctet(kind); - buf.putShortString(packageName); - key.encode(buf); -} - -size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind) -{ - if (kind == ManagementItem::CLASS_KIND_TABLE) - return validateTableSchema(inBuffer); - else if (kind == ManagementItem::CLASS_KIND_EVENT) - return validateEventSchema(inBuffer); - return 0; -} - -size_t ManagementAgent::validateTableSchema(Buffer& inBuffer) -{ - uint32_t start = inBuffer.getPosition(); - uint32_t end; - string text; - uint8_t hash[16]; - - try { - inBuffer.record(); - uint8_t kind = inBuffer.getOctet(); - if (kind != ManagementItem::CLASS_KIND_TABLE) - return 0; - - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); - - uint8_t superType = 0; //inBuffer.getOctet(); - - uint16_t propCount = inBuffer.getShort(); - uint16_t statCount = inBuffer.getShort(); - uint16_t methCount = inBuffer.getShort(); - - if (superType == 1) { - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); - } - - for (uint16_t idx = 0; idx < propCount + statCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - } - - for (uint16_t idx = 0; idx < methCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - if (!ft.isSet("argCount")) - return 0; - int argCount = ft.getAsInt("argCount"); - for (int mIdx = 0; mIdx < argCount; mIdx++) { - FieldTable aft; - aft.decode(inBuffer); - } - } - } catch (exception& /*e*/) { - return 0; - } - - end = inBuffer.getPosition(); - inBuffer.restore(); // restore original position - return end - start; -} - -size_t ManagementAgent::validateEventSchema(Buffer& inBuffer) -{ - uint32_t start = inBuffer.getPosition(); - uint32_t end; - string text; - uint8_t hash[16]; - - try { - inBuffer.record(); - uint8_t kind = inBuffer.getOctet(); - if (kind != ManagementItem::CLASS_KIND_EVENT) - return 0; - - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); - - uint8_t superType = 0; //inBuffer.getOctet(); - - uint16_t argCount = inBuffer.getShort(); - - if (superType == 1) { - inBuffer.getShortString(text); - inBuffer.getShortString(text); - inBuffer.getBin128(hash); - } - for (uint16_t idx = 0; idx < argCount; idx++) { - FieldTable ft; - ft.decode(inBuffer); - } - } catch (exception& /*e*/) { - return 0; - } - - end = inBuffer.getPosition(); - inBuffer.restore(); // restore original position - return end - start; -} - -ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) -{ - ManagementObjectMap::iterator iter = managementObjects.begin(); - for (; iter != managementObjects.end(); iter++) { - if (oid.equalV1(iter->first)) - break; - } - - return iter; -} - -void ManagementAgent::disallow(const string& className, const string& methodName, const string& message) { - disallowed[make_pair(className, methodName)] = message; -} - -void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const { - _map["_cname"] = name; - _map["_hash"] = qpid::types::Uuid(hash); -} - -void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) { - Variant::Map::const_iterator i; - - if ((i = _map.find("_cname")) != _map.end()) { - name = i->second.asString(); - } - - if ((i = _map.find("_hash")) != _map.end()) { - const qpid::types::Uuid& uuid = i->second.asUuid(); - memcpy(hash, uuid.data(), uuid.size()); - } -} - -void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const { - buffer.checkAvailable(encodedBufSize()); - buffer.putShortString(name); - buffer.putBin128(hash); -} - -void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) { - buffer.checkAvailable(encodedBufSize()); - buffer.getShortString(name); - buffer.getBin128(hash); -} - -uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const { - return 1 + name.size() + 16 /* bin128 */; -} - -void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const { - _map["_type"] = kind; - _map["_pending_sequence"] = pendingSequence; - _map["_data"] = data; -} - -void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) { - Variant::Map::const_iterator i; - - if ((i = _map.find("_type")) != _map.end()) { - kind = i->second; - } - if ((i = _map.find("_pending_sequence")) != _map.end()) { - pendingSequence = i->second; - } - if ((i = _map.find("_data")) != _map.end()) { - data = i->second.asString(); - } -} - -void ManagementAgent::exportSchemas(string& out) { - Variant::List list_; - Variant::Map map_, kmap, cmap; - - for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) { - string name = i->first; - const ClassMap& classes = i ->second; - for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) { - const SchemaClassKey& key = j->first; - const SchemaClass& klass = j->second; - if (klass.writeSchemaCall == 0) { // Ignore built-in schemas. - // Encode name, schema-key, schema-class - - map_.clear(); - kmap.clear(); - cmap.clear(); - - key.mapEncode(kmap); - klass.mapEncode(cmap); - - map_["_pname"] = name; - map_["_key"] = kmap; - map_["_class"] = cmap; - list_.push_back(map_); - } - } - } - - ListCodec::encode(list_, out); -} - -void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) { - - string buf(inBuf.getPointer(), inBuf.available()); - Variant::List content; - ListCodec::decode(buf, content); - Variant::List::const_iterator l; - - - for (l = content.begin(); l != content.end(); l++) { - string package; - SchemaClassKey key; - SchemaClass klass; - Variant::Map map_, kmap, cmap; - Variant::Map::const_iterator i; - - map_ = l->asMap(); - - if ((i = map_.find("_pname")) != map_.end()) { - package = i->second.asString(); - - if ((i = map_.find("_key")) != map_.end()) { - key.mapDecode(i->second.asMap()); - - if ((i = map_.find("_class")) != map_.end()) { - klass.mapDecode(i->second.asMap()); - - packages[package][key] = klass; - } - } - } - } -} - -void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const { - Variant::Map _objId, _values; - - map_["_brokerBank"] = brokerBank; - map_["_agentBank"] = agentBank; - map_["_routingKey"] = routingKey; - - connectionRef.mapEncode(_objId); - map_["_object_id"] = _objId; - - mgmtObject->mapEncodeValues(_values, true, false); - map_["_values"] = _values; -} - -void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { - Variant::Map::const_iterator i; - - if ((i = map_.find("_brokerBank")) != map_.end()) { - brokerBank = i->second; - } - - if ((i = map_.find("_agentBank")) != map_.end()) { - agentBank = i->second; - } - - if ((i = map_.find("_routingKey")) != map_.end()) { - routingKey = i->second.getString(); - } - - if ((i = map_.find("_object_id")) != map_.end()) { - connectionRef.mapDecode(i->second.asMap()); - } - - mgmtObject = new _qmf::Agent(&agent, this); - - if ((i = map_.find("_values")) != map_.end()) { - mgmtObject->mapDecodeValues(i->second.asMap()); - } - - // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key. - mgmtObject->set_connectionRef(connectionRef); -} - -void ManagementAgent::exportAgents(string& out) { - Variant::List list_; - Variant::Map map_, omap, amap; - - for (RemoteAgentMap::const_iterator i = remoteAgents.begin(); - i != remoteAgents.end(); - ++i) - { - // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode - boost::shared_ptr<RemoteAgent> agent(i->second); - - map_.clear(); - amap.clear(); - - agent->mapEncode(amap); - map_["_remote_agent"] = amap; - list_.push_back(map_); - } - - ListCodec::encode(list_, out); -} - -void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { - string buf(inBuf.getPointer(), inBuf.available()); - Variant::List content; - ListCodec::decode(buf, content); - Variant::List::const_iterator l; - sys::Mutex::ScopedLock lock(userLock); - - for (l = content.begin(); l != content.end(); l++) { - boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this)); - Variant::Map map_; - Variant::Map::const_iterator i; - - map_ = l->asMap(); - - if ((i = map_.find("_remote_agent")) != map_.end()) { - - agent->mapDecode(i->second.asMap()); - - addObject (agent->mgmtObject, 0, false); - remoteAgents[agent->connectionRef] = agent; - } - } -} - -namespace { -bool isDeletedMap(const ManagementObjectMap::value_type& value) { - return value.second->isDeleted(); -} - -bool isDeletedVector(const ManagementObjectVector::value_type& value) { - return value->isDeleted(); -} - -string summarizeMap(const char* name, const ManagementObjectMap& map) { - ostringstream o; - size_t deleted = std::count_if(map.begin(), map.end(), isDeletedMap); - o << map.size() << " " << name << " (" << deleted << " deleted), "; - return o.str(); -} - -string summarizeVector(const char* name, const ManagementObjectVector& map) { - ostringstream o; - size_t deleted = std::count_if(map.begin(), map.end(), isDeletedVector); - o << map.size() << " " << name << " (" << deleted << " deleted), "; - return o.str(); -} - -string dumpMap(const ManagementObjectMap& map) { - ostringstream o; - for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) { - o << endl << " " << i->second->getObjectId().getV2Key() - << (i->second->isDeleted() ? " (deleted)" : ""); - } - return o.str(); -} - -string dumpVector(const ManagementObjectVector& map) { - ostringstream o; - for (ManagementObjectVector::const_iterator i = map.begin(); i != map.end(); ++i) { - o << endl << " " << (*i)->getObjectId().getV2Key() - << ((*i)->isDeleted() ? " (deleted)" : ""); - } - return o.str(); -} - -} // namespace - -string ManagementAgent::summarizeAgents() { - ostringstream msg; - if (!remoteAgents.empty()) { - msg << remoteAgents.size() << " agents("; - for (RemoteAgentMap::const_iterator i=remoteAgents.begin(); - i != remoteAgents.end(); ++i) - msg << " " << i->second->routingKey; - msg << "), "; - } - return msg.str(); -} - - -void ManagementAgent::debugSnapshot(const char* title) { - QPID_LOG(debug, title << ": management snapshot: " - << packages.size() << " packages, " - << summarizeMap("objects", managementObjects) - << summarizeVector("new objects ", newManagementObjects) - << pendingDeletedObjs.size() << " pending deletes" - << summarizeAgents()); - - QPID_LOG_IF(trace, managementObjects.size(), - title << ": objects" << dumpMap(managementObjects)); - QPID_LOG_IF(trace, newManagementObjects.size(), - title << ": new objects" << dumpVector(newManagementObjects)); -} - -Variant::Map ManagementAgent::toMap(const FieldTable& from) -{ - Variant::Map map; - - for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) { - const string& key(iter->first); - const FieldTable::ValuePtr& val(iter->second); - - map[key] = toVariant(val); - } - - return map; -} - -Variant::List ManagementAgent::toList(const List& from) -{ - Variant::List _list; - - for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) { - const List::ValuePtr& val(*iter); - - _list.push_back(toVariant(val)); - } - - return _list; -} - -qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from) -{ - qpid::framing::FieldTable ft; - - for (Variant::Map::const_iterator iter = from.begin(); - iter != from.end(); - iter++) { - const string& key(iter->first); - const Variant& val(iter->second); - - ft.set(key, toFieldValue(val)); - } - - return ft; -} - - -List ManagementAgent::fromList(const Variant::List& from) -{ - List fa; - - for (Variant::List::const_iterator iter = from.begin(); - iter != from.end(); - iter++) { - const Variant& val(*iter); - - fa.push_back(toFieldValue(val)); - } - - return fa; -} - - -boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in) -{ - - switch(in.getType()) { - - case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue()); - case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); - case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); - case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); - case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); - case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); - case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); - case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); - case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); - case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); - case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); - case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); - case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); - case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); - case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap()))); - case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList()))); - } - - QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]"); - return boost::shared_ptr<FieldValue>(new VoidValue()); -} - -// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup. -Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) -{ - const string iso885915("iso-8859-15"); - const string utf8("utf8"); - const string utf16("utf16"); - //const string binary("binary"); - const string amqp0_10_binary("amqp0-10:binary"); - //const string amqp0_10_bit("amqp0-10:bit"); - const string amqp0_10_datetime("amqp0-10:datetime"); - const string amqp0_10_struct("amqp0-10:struct"); - Variant out; - - //based on AMQP 0-10 typecode, pick most appropriate variant type - switch (in->getType()) { - //Fixed Width types: - case 0x00: //bin8 - case 0x01: out.setEncoding(amqp0_10_binary); // int8 - case 0x02: out = in->getIntegerValue<int8_t>(); break; //uint8 - case 0x03: out = in->getIntegerValue<uint8_t>(); break; // - // case 0x04: break; //TODO: iso-8859-15 char // char - case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t>()); break; // bool int8 - - case 0x10: out.setEncoding(amqp0_10_binary); // bin16 - case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16 - case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16 - - case 0x20: out.setEncoding(amqp0_10_binary); // bin32 - case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32 - case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32 - - case 0x23: out = in->get<float>(); break; // float(32) - - // case 0x27: break; //TODO: utf-32 char - - case 0x30: out.setEncoding(amqp0_10_binary); // bin64 - case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64 - - case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding - case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64 - case 0x33: out = in->get<double>(); break; // double - - case 0x48: // uuid - { - unsigned char data[16]; - in->getFixedWidthValue<16>(data); - out = qpid::types::Uuid(data); - } break; - - //TODO: figure out whether and how to map values with codes 0x40-0xd8 - - case 0xf0: break;//void, which is the default value for Variant - // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant - - //Variable Width types: - //strings: - case 0x80: // str8 - case 0x90: // str16 - case 0xa0: // str32 - out = in->get<string>(); - out.setEncoding(amqp0_10_binary); - break; - - case 0x84: // str8 - case 0x94: // str16 - out = in->get<string>(); - out.setEncoding(iso885915); - break; - - case 0x85: // str8 - case 0x95: // str16 - out = in->get<string>(); - out.setEncoding(utf8); - break; - - case 0x86: // str8 - case 0x96: // str16 - out = in->get<string>(); - out.setEncoding(utf16); - break; - - case 0xab: // str32 - out = in->get<string>(); - out.setEncoding(amqp0_10_struct); - break; - - case 0xa8: // map - out = ManagementAgent::toMap(in->get<FieldTable>()); - break; - - case 0xa9: // list of variant types - out = ManagementAgent::toList(in->get<List>()); - break; - //case 0xaa: //convert amqp0-10 array (uniform type) into variant list - // out = Variant::List(); - // translate<Array>(in, out.asList(), &toVariant); - // break; - - default: - //error? - QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]"); - break; - } - - return out; -} - - -// Build up a list of the current set of deleted objects that are pending their -// next (last) publish-ment. -void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) -{ - outList.clear(); - - sys::Mutex::ScopedLock lock (userLock); - - moveNewObjectsLH(); - moveDeletedObjectsLH(); - - // now copy the pending deletes into the outList - for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin(); - mIter != pendingDeletedObjs.end(); mIter++) { - for (DeletedObjectList::iterator lIter = mIter->second.begin(); - lIter != mIter->second.end(); lIter++) { - outList.push_back(*lIter); - } - } -} - -// Called by cluster to reset the management agent's list of deleted -// objects to match the rest of the cluster. -void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) -{ - sys::Mutex::ScopedLock lock (userLock); - // Clear out any existing deleted objects - moveNewObjectsLH(); - pendingDeletedObjs.clear(); - ManagementObjectMap::iterator i = managementObjects.begin(); - // Silently drop any deleted objects left over from receiving the update. - while (i != managementObjects.end()) { - ManagementObject* object = i->second; - if (object->isDeleted()) { - delete object; - managementObjects.erase(i++); - } - else ++i; - } - for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) { - - std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className); - pendingDeletedObjs[classkey].push_back(*lIter); - } -} - - -// construct a DeletedObject from a management object. -ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2) - : packageName(src->getPackageName()), - className(src->getClassName()) -{ - bool send_stats = (src->hasInst() && (src->getInstChanged() || src->getForcePublish())); - - stringstream oid; - oid << src->getObjectId(); - objectId = oid.str(); - - if (v1) { - src->writeProperties(encodedV1Config); - if (send_stats) { - src->writeStatistics(encodedV1Inst); - } - } - - if (v2) { - Variant::Map map_; - Variant::Map values; - Variant::Map oid; - - src->getObjectId().mapEncode(oid); - map_["_object_id"] = oid; - map_["_schema_id"] = mapEncodeSchemaId(src->getPackageName(), - src->getClassName(), - "_data", - src->getMd5Sum()); - src->writeTimestamps(map_); - src->mapEncodeValues(values, true, send_stats); - map_["_values"] = values; - - encodedV2 = map_; - } -} - - - -// construct a DeletedObject from an encoded representation. Used by -// clustering to move deleted objects between clustered brokers. See -// DeletedObject::encode() for the reverse. -ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded) -{ - qpid::types::Variant::Map map_; - MapCodec::decode(encoded, map_); - - packageName = map_["_package_name"].getString(); - className = map_["_class_name"].getString(); - objectId = map_["_object_id"].getString(); - - encodedV1Config = map_["_v1_config"].getString(); - encodedV1Inst = map_["_v1_inst"].getString(); - encodedV2 = map_["_v2_data"].asMap(); -} - - -// encode a DeletedObject to a string buffer. Used by -// clustering to move deleted objects between clustered brokers. See -// DeletedObject(const std::string&) for the reverse. -void ManagementAgent::DeletedObject::encode(std::string& toBuffer) -{ - qpid::types::Variant::Map map_; - - - map_["_package_name"] = packageName; - map_["_class_name"] = className; - map_["_object_id"] = objectId; - - map_["_v1_config"] = encodedV1Config; - map_["_v1_inst"] = encodedV1Inst; - map_["_v2_data"] = encodedV2; - - MapCodec::encode(map_, toBuffer); -} - -// Remove Deleted objects, and save for later publishing... -bool ManagementAgent::moveDeletedObjectsLH() { - typedef vector<pair<ObjectId, ManagementObject*> > DeleteList; - DeleteList deleteList; - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - ++iter) - { - ManagementObject* object = iter->second; - if (object->isDeleted()) deleteList.push_back(*iter); - } - - // Iterate in reverse over deleted object list - for (DeleteList::reverse_iterator iter = deleteList.rbegin(); - iter != deleteList.rend(); - iter++) - { - ManagementObject* delObj = iter->second; - assert(delObj->isDeleted()); - DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); - - pendingDeletedObjs[dptr->getKey()].push_back(dptr); - managementObjects.erase(iter->first); - delete iter->second; - } - return !deleteList.empty(); -} - -namespace qpid { -namespace management { - -namespace { -QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; -} - -void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt) -{ - executionContext = ctxt; -} -const qpid::broker::ConnectionState* getManagementExecutionContext() -{ - return executionContext; -} - -}} diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h deleted file mode 100644 index fb15dc6ed1..0000000000 --- a/cpp/src/qpid/management/ManagementAgent.h +++ /dev/null @@ -1,432 +0,0 @@ -#ifndef _ManagementAgent_ -#define _ManagementAgent_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/BrokerImportExport.h" -#include "qpid/Options.h" -#include "qpid/broker/Exchange.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" -#include "qpid/broker/ConnectionToken.h" -#include "qpid/management/ManagementObject.h" -#include "qpid/management/ManagementEvent.h" -#include "qpid/management/Manageable.h" -#include "qmf/org/apache/qpid/broker/Agent.h" -#include "qpid/types/Variant.h" -#include <qpid/framing/AMQFrame.h> -#include <qpid/framing/FieldValue.h> -#include <qpid/framing/ResizableBuffer.h> -#include <memory> -#include <string> -#include <map> - -namespace qpid { -namespace broker { -class ConnectionState; -} -namespace management { - -class ManagementAgent -{ -private: - - int threadPoolSize; - -public: - typedef enum { - SEV_EMERG = 0, - SEV_ALERT = 1, - SEV_CRIT = 2, - SEV_ERROR = 3, - SEV_WARN = 4, - SEV_NOTE = 5, - SEV_INFO = 6, - SEV_DEBUG = 7, - SEV_DEFAULT = 8 - } severity_t; - - - ManagementAgent (const bool qmfV1, const bool qmfV2); - virtual ~ManagementAgent (); - - /** Called before plugins are initialized */ - void configure (const std::string& dataDir, uint16_t interval, - qpid::broker::Broker* broker, int threadPoolSize); - /** Called after plugins are initialized. */ - void pluginsInitialized(); - - /** Called by cluster to suppress management output during update. */ - void suppress(bool s) { suppressed = s; } - - void setName(const std::string& vendor, - const std::string& product, - const std::string& instance=""); - void getName(std::string& vendor, std::string& product, std::string& instance); - const std::string& getAddress(); - - void setInterval(uint16_t _interval) { interval = _interval; } - void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, - qpid::broker::Exchange::shared_ptr directExchange); - void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange, - qpid::broker::Exchange::shared_ptr directExchange); - - int getMaxThreads () { return threadPoolSize; } - QPID_BROKER_EXTERN void registerClass (const std::string& packageName, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - QPID_BROKER_EXTERN void registerEvent (const std::string& packageName, - const std::string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0, - bool persistent = false); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - const std::string& key, - bool persistent = false); - QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, - severity_t severity = SEV_DEFAULT); - QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); - - QPID_BROKER_EXTERN void clusterUpdate(); - - bool dispatchCommand (qpid::broker::Deliverable& msg, - const std::string& routingKey, - const framing::FieldTable* args, - const bool topic, - int qmfVersion); - - /** Disallow a method. Attempts to call it will receive an exception with message. */ - void disallow(const std::string& className, const std::string& methodName, const std::string& message); - - /** Disallow all QMFv1 methods (used in clustered brokers). */ - void disallowV1Methods() { disallowAllV1Methods = true; } - - /** Serialize my schemas as a binary blob into schemaOut */ - void exportSchemas(std::string& schemaOut); - - /** Serialize my remote-agent map as a binary blob into agentsOut */ - void exportAgents(std::string& agentsOut); - - /** Decode a serialized schemas and add to my schema cache */ - void importSchemas(framing::Buffer& inBuf); - - /** Decode a serialized agent map */ - void importAgents(framing::Buffer& inBuf); - - // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers - uint64_t getNextObjectId(void) { return nextObjectId; } - void setNextObjectId(uint64_t o) { nextObjectId = o; } - - uint16_t getBootSequence(void) { return bootSequence; } - void setBootSequence(uint16_t b) { bootSequence = b; writeData(); } - - const framing::Uuid& getUuid() const { return uuid; } - void setUuid(const framing::Uuid& id) { uuid = id; writeData(); } - - // TODO: remove these when Variant API moved into common library. - static types::Variant::Map toMap(const framing::FieldTable& from); - static framing::FieldTable fromMap(const types::Variant::Map& from); - static types::Variant::List toList(const framing::List& from); - static framing::List fromList(const types::Variant::List& from); - static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); - static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); - - // For Clustering: management objects that have been marked as - // "deleted", but are waiting for their last published object - // update are not visible to the cluster replication code. These - // interfaces allow clustering to gather up all the management - // objects that are deleted in order to allow all clustered - // brokers to publish the same set of deleted objects. - - class DeletedObject { - public: - typedef boost::shared_ptr<DeletedObject> shared_ptr; - DeletedObject(ManagementObject *, bool v1, bool v2); - DeletedObject( const std::string &encoded ); - ~DeletedObject() {}; - void encode( std::string& toBuffer ); - const std::string getKey() const { - // used to batch up objects of the same class type - return std::string(packageName + std::string(":") + className); - } - - private: - friend class ManagementAgent; - - std::string packageName; - std::string className; - std::string objectId; - - std::string encodedV1Config; // qmfv1 properties - std::string encodedV1Inst; // qmfv1 statistics - qpid::types::Variant::Map encodedV2; - }; - - typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList; - - /** returns a snapshot of all currently deleted management objects. */ - void exportDeletedObjects( DeletedObjectList& outList ); - - /** Import a list of deleted objects to send on next publish interval. */ - void importDeletedObjects( const DeletedObjectList& inList ); - -private: - struct Periodic : public qpid::sys::TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - - // Storage for tracking remote management agents, attached via the client - // management agent API. - // - struct RemoteAgent : public Manageable - { - ManagementAgent& agent; - uint32_t brokerBank; - uint32_t agentBank; - std::string routingKey; - ObjectId connectionRef; - qmf::org::apache::qpid::broker::Agent* mgmtObject; - RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {} - ManagementObject* GetManagementObject (void) const { return mgmtObject; } - - virtual ~RemoteAgent (); - void mapEncode(qpid::types::Variant::Map& _map) const; - void mapDecode(const qpid::types::Variant::Map& _map); - }; - - typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap; - - // Storage for known schema classes: - // - // SchemaClassKey -- Key elements for map lookups - // SchemaClassKeyComp -- Comparison class for SchemaClassKey - // SchemaClass -- Non-key elements for classes - // - struct SchemaClassKey - { - std::string name; - uint8_t hash[16]; - - void mapEncode(qpid::types::Variant::Map& _map) const; - void mapDecode(const qpid::types::Variant::Map& _map); - void encode(framing::Buffer& buffer) const; - void decode(framing::Buffer& buffer); - uint32_t encodedBufSize() const; - }; - - struct SchemaClassKeyComp - { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const - { - if (lhs.name != rhs.name) - return lhs.name < rhs.name; - else - for (int i = 0; i < 16; i++) - if (lhs.hash[i] != rhs.hash[i]) - return lhs.hash[i] < rhs.hash[i]; - return false; - } - }; - - - struct SchemaClass - { - uint8_t kind; - ManagementObject::writeSchemaCall_t writeSchemaCall; - std::string data; - uint32_t pendingSequence; - - SchemaClass(uint8_t _kind=0, uint32_t seq=0) : - kind(_kind), writeSchemaCall(0), pendingSequence(seq) {} - SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : - kind(_kind), writeSchemaCall(call), pendingSequence(0) {} - bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } - void appendSchema (framing::Buffer& buf); - - void mapEncode(qpid::types::Variant::Map& _map) const; - void mapDecode(const qpid::types::Variant::Map& _map); - }; - - typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; - typedef std::map<std::string, ClassMap> PackageMap; - - RemoteAgentMap remoteAgents; - PackageMap packages; - - // - // Protected by userLock - // - ManagementObjectMap managementObjects; - - // - // Protected by addLock - // - ManagementObjectVector newManagementObjects; - - framing::Uuid uuid; - - // - // Lock hierarchy: If a thread needs to take both addLock and userLock, - // it MUST take userLock first, then addLock. - // - sys::Mutex userLock; - sys::Mutex addLock; - - qpid::broker::Exchange::shared_ptr mExchange; - qpid::broker::Exchange::shared_ptr dExchange; - qpid::broker::Exchange::shared_ptr v2Topic; - qpid::broker::Exchange::shared_ptr v2Direct; - std::string dataDir; - uint16_t interval; - qpid::broker::Broker* broker; - qpid::sys::Timer* timer; - uint16_t bootSequence; - uint32_t nextObjectId; - uint32_t brokerBank; - uint32_t nextRemoteBank; - uint32_t nextRequestSequence; - bool clientWasAdded; - const qpid::sys::AbsTime startTime; - bool suppressed; - - typedef std::pair<std::string,std::string> MethodName; - typedef std::map<MethodName, std::string> DisallowedMethods; - DisallowedMethods disallowed; - bool disallowAllV1Methods; - - // Agent name and address - qpid::types::Variant::Map attrMap; - std::string name_address; - std::string vendorNameKey; // "." --> "_" - std::string productNameKey; // "." --> "_" - std::string instanceNameKey; // "." --> "_" - - // supported management protocol - bool qmf1Support; - bool qmf2Support; - - // Maximum # of objects allowed in a single V2 response - // message. - uint32_t maxReplyObjs; - - // list of objects that have been deleted, but have yet to be published - // one final time. - // Indexed by a string composed of the object's package and class name. - // Protected by userLock. - typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; - PendingDeletedObjsMap pendingDeletedObjs; - -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - char eventBuffer[MA_BUFFER_SIZE]; - framing::ResizableBuffer msgBuffer; - - void writeData (); - void periodicProcessing (void); - void deleteObjectNowLH(const ObjectId& oid); - void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - const std::string& exchange, - const std::string& routingKey); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - const std::string& exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void moveNewObjectsLH(); - bool moveDeletedObjectsLH(); - - bool authorizeAgentMessageLH(qpid::broker::Message& msg); - void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false); - - PackageMap::iterator findOrAddPackageLH(std::string name); - void addClassLH(uint8_t kind, - PackageMap::iterator pIter, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void encodePackageIndication (framing::Buffer& buf, - PackageMap::iterator pIter); - void encodeClassIndication (framing::Buffer& buf, - const std::string packageName, - const struct SchemaClassKey key, - uint8_t kind); - bool bankInUse (uint32_t bank); - uint32_t allocateNewBank (); - uint32_t assignBankLH (uint32_t requestedPrefix); - void deleteOrphanedAgentsLH(); - void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence, - uint32_t code = 0, const std::string& text = "OK"); - void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); - void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); - void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); - void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); - void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); - - - size_t validateSchema(framing::Buffer&, uint8_t kind); - size_t validateTableSchema(framing::Buffer&); - size_t validateEventSchema(framing::Buffer&); - ManagementObjectMap::iterator numericFind(const ObjectId& oid); - - std::string summarizeAgents(); - void debugSnapshot(const char* title); -}; - -void setManagementExecutionContext(const qpid::broker::ConnectionState*); -const qpid::broker::ConnectionState* getManagementExecutionContext(); -}} - -#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp deleted file mode 100644 index 1d5f8bbd6b..0000000000 --- a/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/management/ManagementDirectExchange.h" -#include "qpid/log/Statement.h" -#include <assert.h> - -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), - DirectExchange(_name, _parent, b), - managementAgent(0) {} -ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, - bool _durable, - const FieldTable& _args, - Manageable* _parent, Broker* b) : - Exchange (_name, _durable, _args, _parent, b), - DirectExchange(_name, _durable, _args, _parent, b), - managementAgent(0) {} - -void ManagementDirectExchange::route(Deliverable& msg, - const string& routingKey, - const FieldTable* args) -{ - bool routeIt = true; - - if (managementAgent) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion); - - if (routeIt) - DirectExchange::route(msg, routingKey, args); -} - -void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv) -{ - managementAgent = agent; - qmfVersion = qv; - assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange -} - - -ManagementDirectExchange::~ManagementDirectExchange() {} - -const std::string ManagementDirectExchange::typeName("management-direct"); - diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h deleted file mode 100644 index 7507179c06..0000000000 --- a/cpp/src/qpid/management/ManagementDirectExchange.h +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ManagementDirectExchange_ -#define _ManagementDirectExchange_ - -#include "qpid/broker/DirectExchange.h" -#include "qpid/management/ManagementAgent.h" - -namespace qpid { -namespace broker { - -class ManagementDirectExchange : public virtual DirectExchange -{ - private: - management::ManagementAgent* managementAgent; - int qmfVersion; - - public: - static const std::string typeName; - - ManagementDirectExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0); - ManagementDirectExchange(const std::string& _name, bool _durable, - const qpid::framing::FieldTable& _args, - Manageable* _parent = 0, Broker* broker = 0); - - virtual std::string getType() const { return typeName; } - - virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); - - void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); - - virtual ~ManagementDirectExchange(); -}; - - -} -} - -#endif diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp deleted file mode 100644 index b4d469afbe..0000000000 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ /dev/null @@ -1,385 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/management/Manageable.h" -#include "qpid/management/ManagementObject.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/Buffer.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Thread.h" -#include "qpid/log/Statement.h" -#include <boost/lexical_cast.hpp> - -#include <stdlib.h> - -using namespace std; -using namespace qpid; -using namespace qpid::management; - -void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) -{ - first = - ((uint64_t) (broker & 0x000fffff)) << 28 | - ((uint64_t) (bank & 0x0fffffff)); -} - -// Deprecated -ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object) - : agent(0), agentEpoch(seq) -{ - first = - ((uint64_t) (flags & 0x0f)) << 60 | - ((uint64_t) (seq & 0x0fff)) << 48 | - ((uint64_t) (broker & 0x000fffff)) << 28; - second = object; -} - - -ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker) - : agent(0), second(0), agentEpoch(seq) -{ - first = - ((uint64_t) (flags & 0x0f)) << 60 | - ((uint64_t) (seq & 0x0fff)) << 48 | - ((uint64_t) (broker & 0x000fffff)) << 28; -} - -ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq) - : agent(_agent), second(0), agentEpoch(seq) -{ - - first = - ((uint64_t) (flags & 0x0f)) << 60 | - ((uint64_t) (seq & 0x0fff)) << 48; -} - - -ObjectId::ObjectId(istream& in) : agent(0) -{ - string text; - in >> text; - fromString(text); -} - -ObjectId::ObjectId(const string& text) : agent(0) -{ - fromString(text); -} - -void ObjectId::fromString(const string& text) -{ -#define FIELDS 5 -#if defined (_WIN32) && !defined (atoll) -# define atoll(X) _atoi64(X) -#endif - - // format: - // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id> - // V2: Not used - - string copy(text.c_str()); - char* cText; - char* field[FIELDS]; - bool atFieldStart = true; - int idx = 0; - - cText = const_cast<char*>(copy.c_str()); - for (char* cursor = cText; *cursor; cursor++) { - if (atFieldStart) { - if (idx >= FIELDS) - throw Exception("Invalid ObjectId format"); - field[idx++] = cursor; - atFieldStart = false; - } else { - if (*cursor == '-') { - *cursor = '\0'; - atFieldStart = true; - } - } - } - - if (idx != FIELDS) - throw Exception("Invalid ObjectId format"); - - agentEpoch = atoll(field[1]); - - first = (atoll(field[0]) << 60) + - (atoll(field[1]) << 48) + - (atoll(field[2]) << 28); - - agentName = string(field[3]); - second = atoll(field[4]); -} - - -bool ObjectId::operator==(const ObjectId &other) const -{ - return v2Key == other.v2Key; -} - -bool ObjectId::operator<(const ObjectId &other) const -{ - return v2Key < other.v2Key; -} - -bool ObjectId::equalV1(const ObjectId &other) const -{ - uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL; - return first == otherFirst && second == other.second; -} - -// encode as V1-format binary -void ObjectId::encode(string& buffer) const -{ - const uint32_t len = 16; - char _data[len]; - qpid::framing::Buffer body(_data, len); - - if (agent == 0) - body.putLongLong(first); - else - body.putLongLong(first | agent->first); - body.putLongLong(second); - - body.reset(); - body.getRawData(buffer, len); -} - -// decode as V1-format binary -void ObjectId::decode(const string& buffer) -{ - const uint32_t len = 16; - char _data[len]; - qpid::framing::Buffer body(_data, len); - - body.checkAvailable(buffer.length()); - body.putRawData(buffer); - body.reset(); - first = body.getLongLong(); - second = body.getLongLong(); - v2Key = boost::lexical_cast<string>(second); -} - -// generate the V2 key from the index fields defined -// in the schema. -void ObjectId::setV2Key(const ManagementObject& object) -{ - stringstream oname; - oname << object.getPackageName() << ":" << object.getClassName() << ":" << object.getKey(); - v2Key = oname.str(); -} - - -// encode as V2-format map -void ObjectId::mapEncode(types::Variant::Map& map) const -{ - map["_object_name"] = v2Key; - if (!agentName.empty()) - map["_agent_name"] = agentName; - if (agentEpoch) - map["_agent_epoch"] = agentEpoch; -} - -// decode as v2-format map -void ObjectId::mapDecode(const types::Variant::Map& map) -{ - types::Variant::Map::const_iterator i; - - if ((i = map.find("_object_name")) != map.end()) - v2Key = i->second.asString(); - else - throw Exception("Required _object_name field missing."); - - if ((i = map.find("_agent_name")) != map.end()) - agentName = i->second.asString(); - - if ((i = map.find("_agent_epoch")) != map.end()) - agentEpoch = i->second.asInt64(); -} - - -ObjectId::operator types::Variant::Map() const -{ - types::Variant::Map m; - mapEncode(m); - return m; -} - - - -namespace qpid { -namespace management { - -ostream& operator<<(ostream& out, const ObjectId& i) -{ - uint64_t virtFirst = i.first; - if (i.agent) - virtFirst |= i.agent->getFirst(); - - out << ((virtFirst & 0xF000000000000000LL) >> 60) << - "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) << - "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) << - "-" << i.agentName << - "-" << i.second << - "(" << i.v2Key << ")"; - return out; -} - -}} - -ManagementObject::ManagementObject(Manageable* _core) : -createTime(qpid::sys::Duration(sys::EPOCH, sys::now())), - destroyTime(0), updateTime(createTime), configChanged(true), - instChanged(true), deleted(false), - coreObject(_core), flags(0), forcePublish(false) {} - -void ManagementObject::setUpdateTime() -{ - updateTime = sys::Duration(sys::EPOCH, sys::now()); -} - -void ManagementObject::resourceDestroy() -{ - QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key()); - destroyTime = sys::Duration(sys::EPOCH, sys::now()); - deleted = true; -} - -int ManagementObject::maxThreads = 1; -int ManagementObject::nextThreadIndex = 0; - -void ManagementObject::writeTimestamps (string& buf) const -{ - char _data[4000]; - qpid::framing::Buffer body(_data, 4000); - - body.putShortString (getPackageName ()); - body.putShortString (getClassName ()); - body.putBin128 (getMd5Sum ()); - body.putLongLong (updateTime); - body.putLongLong (createTime); - body.putLongLong (destroyTime); - - uint32_t len = body.getPosition(); - body.reset(); - body.getRawData(buf, len); - - string oid; - objectId.encode(oid); - buf += oid; -} - -void ManagementObject::readTimestamps (const string& buf) -{ - char _data[4000]; - qpid::framing::Buffer body(_data, 4000); - string unused; - uint8_t unusedUuid[16]; - - body.checkAvailable(buf.length()); - body.putRawData(buf); - body.reset(); - - body.getShortString(unused); - body.getShortString(unused); - body.getBin128(unusedUuid); - updateTime = body.getLongLong(); - createTime = body.getLongLong(); - destroyTime = body.getLongLong(); -} - -uint32_t ManagementObject::writeTimestampsSize() const -{ - return 1 + getPackageName().length() + // str8 - 1 + getClassName().length() + // str8 - 16 + // bin128 - 8 + // uint64 - 8 + // uint64 - 8 + // uint64 - objectId.encodedSize(); // objectId -} - - -void ManagementObject::writeTimestamps (types::Variant::Map& map) const -{ - // types::Variant::Map oid, sid; - - // sid["_package_name"] = getPackageName(); - // sid["_class_name"] = getClassName(); - // sid["_hash"] = qpid::types::Uuid(getMd5Sum()); - // map["_schema_id"] = sid; - - // objectId.mapEncode(oid); - // map["_object_id"] = oid; - - map["_update_ts"] = updateTime; - map["_create_ts"] = createTime; - map["_delete_ts"] = destroyTime; -} - -void ManagementObject::readTimestamps (const types::Variant::Map& map) -{ - types::Variant::Map::const_iterator i; - - if ((i = map.find("_update_ts")) != map.end()) - updateTime = i->second.asUint64(); - if ((i = map.find("_create_ts")) != map.end()) - createTime = i->second.asUint64(); - if ((i = map.find("_delete_ts")) != map.end()) - destroyTime = i->second.asUint64(); -} - - -void ManagementObject::setReference(ObjectId) {} - -int ManagementObject::getThreadIndex() { - static QPID_TSS int thisIndex = -1; - if (thisIndex == -1) { - Mutex::ScopedLock mutex(accessLock); - thisIndex = nextThreadIndex; - if (nextThreadIndex < maxThreads - 1) - nextThreadIndex++; - } - return thisIndex; -} - - -// void ManagementObject::mapEncode(types::Variant::Map& map, -// bool includeProperties, -// bool includeStatistics) -// { -// types::Variant::Map values; - -// writeTimestamps(map); - -// mapEncodeValues(values, includeProperties, includeStatistics); -// map["_values"] = values; -// } - -// void ManagementObject::mapDecode(const types::Variant::Map& map) -// { -// types::Variant::Map::const_iterator i; - -// readTimestamps(map); - -// if ((i = map.find("_values")) != map.end()) -// mapDecodeValues(i->second.asMap()); -// } diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp deleted file mode 100644 index ee8657646f..0000000000 --- a/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/management/ManagementTopicExchange.h" -#include "qpid/log/Statement.h" - -using namespace qpid::management; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) : - Exchange (_name, _parent, b), - TopicExchange(_name, _parent, b), - managementAgent(0) {} -ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, - bool _durable, - const FieldTable& _args, - Manageable* _parent, Broker* b) : - Exchange (_name, _durable, _args, _parent, b), - TopicExchange(_name, _durable, _args, _parent, b), - managementAgent(0) {} - -void ManagementTopicExchange::route(Deliverable& msg, - const string& routingKey, - const FieldTable* args) -{ - bool routeIt = true; - - // Intercept management agent commands - if (managementAgent) - routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion); - - if (routeIt) - TopicExchange::route(msg, routingKey, args); -} - -bool ManagementTopicExchange::bind(Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args) -{ - if (qmfVersion == 1) - managementAgent->clientAdded(routingKey); - return TopicExchange::bind(queue, routingKey, args); -} - -void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv) -{ - managementAgent = agent; - qmfVersion = qv; -} - - -ManagementTopicExchange::~ManagementTopicExchange() {} - -const std::string ManagementTopicExchange::typeName("management-topic"); - diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h deleted file mode 100644 index 232300265e..0000000000 --- a/cpp/src/qpid/management/ManagementTopicExchange.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ManagementTopicExchange_ -#define _ManagementTopicExchange_ - -#include "qpid/broker/TopicExchange.h" -#include "qpid/management/ManagementAgent.h" - -namespace qpid { -namespace broker { - -class ManagementTopicExchange : public virtual TopicExchange -{ - private: - management::ManagementAgent* managementAgent; - int qmfVersion; - - public: - static const std::string typeName; - - ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0); - ManagementTopicExchange(const std::string& _name, bool _durable, - const qpid::framing::FieldTable& _args, - Manageable* _parent = 0, Broker* broker = 0); - - virtual std::string getType() const { return typeName; } - - virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); - - virtual bool bind(Queue::shared_ptr queue, - const std::string& routingKey, - const qpid::framing::FieldTable* args); - - void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); - - virtual ~ManagementTopicExchange(); -}; - - -} -} - -#endif diff --git a/cpp/src/qpid/management/Mutex.cpp b/cpp/src/qpid/management/Mutex.cpp deleted file mode 100644 index f05abb01dc..0000000000 --- a/cpp/src/qpid/management/Mutex.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Copyright (c) 2008 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/management/Mutex.h" -#include "qpid/sys/Mutex.h" - -using namespace std; -using namespace qpid::management; - -Mutex::Mutex() : impl(new sys::Mutex()) {} -Mutex::~Mutex() { delete impl; } -void Mutex::lock() { impl->lock(); } -void Mutex::unlock() { impl->unlock(); } - |