summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/Buffer.cpp106
-rw-r--r--cpp/src/qpid/management/ConnectionSettings.cpp40
-rw-r--r--cpp/src/qpid/management/Manageable.cpp53
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp3121
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h432
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.cpp67
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.h59
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp385
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.cpp75
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.h63
-rw-r--r--cpp/src/qpid/management/Mutex.cpp29
11 files changed, 0 insertions, 4430 deletions
diff --git a/cpp/src/qpid/management/Buffer.cpp b/cpp/src/qpid/management/Buffer.cpp
deleted file mode 100644
index 7556b2a243..0000000000
--- a/cpp/src/qpid/management/Buffer.cpp
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/management/Buffer.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/amqp_0_10/Codecs.h"
-
-using namespace std;
-
-namespace qpid {
-namespace management {
-
-Buffer::Buffer(char* data, uint32_t size) : impl(new framing::Buffer(data, size)) {}
-Buffer::~Buffer() { delete impl; }
-void Buffer::record() { impl->record(); }
-void Buffer::restore(bool reRecord) { impl->restore(reRecord); }
-void Buffer::reset() { impl->reset(); }
-uint32_t Buffer::available() { return impl->available(); }
-uint32_t Buffer::getSize() { return impl->getSize(); }
-uint32_t Buffer::getPosition() { return impl->getPosition(); }
-char* Buffer::getPointer() { return impl->getPointer(); }
-void Buffer::putOctet(uint8_t i) { impl->putOctet(i); }
-void Buffer::putShort(uint16_t i) { impl->putShort(i); }
-void Buffer::putLong(uint32_t i) { impl->putLong(i); }
-void Buffer::putLongLong(uint64_t i) { impl->putLongLong(i); }
-void Buffer::putInt8(int8_t i) { impl->putInt8(i); }
-void Buffer::putInt16(int16_t i) { impl->putInt16(i); }
-void Buffer::putInt32(int32_t i) { impl->putInt32(i); }
-void Buffer::putInt64(int64_t i) { impl->putInt64(i); }
-void Buffer::putFloat(float i) { impl->putFloat(i); }
-void Buffer::putDouble(double i) { impl->putDouble(i); }
-void Buffer::putBin128(const uint8_t* i) { impl->putBin128(i); }
-uint8_t Buffer::getOctet() { return impl->getOctet(); }
-uint16_t Buffer::getShort() { return impl->getShort(); }
-uint32_t Buffer::getLong() { return impl->getLong(); }
-uint64_t Buffer::getLongLong() { return impl->getLongLong(); }
-int8_t Buffer:: getInt8() { return impl-> getInt8(); }
-int16_t Buffer::getInt16() { return impl->getInt16(); }
-int32_t Buffer::getInt32() { return impl->getInt32(); }
-int64_t Buffer::getInt64() { return impl->getInt64(); }
-float Buffer::getFloat() { return impl->getFloat(); }
-double Buffer::getDouble() { return impl->getDouble(); }
-void Buffer::putShortString(const string& i) { impl->putShortString(i); }
-void Buffer::putMediumString(const string& i) { impl->putMediumString(i); }
-void Buffer::putLongString(const string& i) { impl->putLongString(i); }
-void Buffer::getShortString(string& i) { impl->getShortString(i); }
-void Buffer::getMediumString(string& i) { impl->getMediumString(i); }
-void Buffer::getLongString(string& i) { impl->getLongString(i); }
-void Buffer::getBin128(uint8_t* i) { impl->getBin128(i); }
-void Buffer::putRawData(const string& i) { impl->putRawData(i); }
-void Buffer::getRawData(string& s, uint32_t size) { impl->getRawData(s, size); }
-void Buffer::putRawData(const uint8_t* data, size_t size) { impl->putRawData(data, size); }
-void Buffer::getRawData(uint8_t* data, size_t size) { impl->getRawData(data, size); }
-
-void Buffer::putMap(const types::Variant::Map& i)
-{
- string encoded;
- amqp_0_10::MapCodec::encode(i, encoded);
- impl->putRawData(encoded);
-}
-
-void Buffer::putList(const types::Variant::List& i)
-{
- string encoded;
- amqp_0_10::ListCodec::encode(i, encoded);
- impl->putRawData(encoded);
-}
-
-void Buffer::getMap(types::Variant::Map& map)
-{
- string encoded;
- uint32_t saved = impl->getPosition();
- uint32_t length = impl->getLong();
- impl->setPosition(saved);
- impl->getRawData(encoded, length + sizeof(uint32_t));
- amqp_0_10::MapCodec::decode(encoded, map);
-}
-
-void Buffer::getList(types::Variant::List& list)
-{
- string encoded;
- uint32_t saved = impl->getPosition();
- uint32_t length = impl->getLong();
- impl->setPosition(saved);
- impl->getRawData(encoded, length + sizeof(uint32_t));
- amqp_0_10::ListCodec::decode(encoded, list);
-}
-
-}}
diff --git a/cpp/src/qpid/management/ConnectionSettings.cpp b/cpp/src/qpid/management/ConnectionSettings.cpp
deleted file mode 100644
index 1421a26867..0000000000
--- a/cpp/src/qpid/management/ConnectionSettings.cpp
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/management/ConnectionSettings.h"
-#include "qpid/Version.h"
-
-qpid::management::ConnectionSettings::ConnectionSettings() :
- protocol("tcp"),
- host("localhost"),
- port(5672),
- locale("en_US"),
- heartbeat(0),
- maxChannels(32767),
- maxFrameSize(65535),
- bounds(2),
- tcpNoDelay(false),
- service(qpid::saslName),
- minSsf(0),
- maxSsf(256)
-{}
-
-qpid::management::ConnectionSettings::~ConnectionSettings() {}
-
diff --git a/cpp/src/qpid/management/Manageable.cpp b/cpp/src/qpid/management/Manageable.cpp
deleted file mode 100644
index 651215ffb5..0000000000
--- a/cpp/src/qpid/management/Manageable.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-
-#include "qpid/management/Manageable.h"
-
-using namespace qpid::management;
-using std::string;
-
-string Manageable::StatusText (status_t status, string text)
-{
- if ((status & STATUS_USER) == STATUS_USER)
- return text;
-
- switch (status)
- {
- case STATUS_OK : return "OK";
- case STATUS_UNKNOWN_OBJECT : return "UnknownObject";
- case STATUS_UNKNOWN_METHOD : return "UnknownMethod";
- case STATUS_NOT_IMPLEMENTED : return "NotImplemented";
- case STATUS_PARAMETER_INVALID : return "InvalidParameter";
- case STATUS_FEATURE_NOT_IMPLEMENTED : return "FeatureNotImplemented";
- case STATUS_FORBIDDEN : return "Forbidden";
- }
-
- return "??";
-}
-
-Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&)
-{
- return STATUS_UNKNOWN_METHOD;
-}
-
-bool Manageable::AuthorizeMethod(uint32_t, Args&, const std::string&)
-{
- return true;
-}
-
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
deleted file mode 100644
index 8a12a57fa6..0000000000
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ /dev/null
@@ -1,3121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-// NOTE on use of log levels: The criteria for using trace vs. debug
-// is to use trace for log messages that are generated for each
-// unbatched stats/props notification and debug for everything else.
-
-#include "qpid/management/ManagementAgent.h"
-#include "qpid/management/ManagementObject.h"
-#include "qpid/broker/DeliverableMessage.h"
-#include "qpid/log/Statement.h"
-#include <qpid/broker/Message.h>
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/broker/ConnectionState.h"
-#include "qpid/broker/AclModule.h"
-#include "qpid/types/Variant.h"
-#include "qpid/types/Uuid.h"
-#include "qpid/framing/List.h"
-#include "qpid/amqp_0_10/Codecs.h"
-#include <list>
-#include <iostream>
-#include <fstream>
-#include <sstream>
-#include <typeinfo>
-
-using boost::intrusive_ptr;
-using qpid::framing::Uuid;
-using qpid::types::Variant;
-using qpid::amqp_0_10::MapCodec;
-using qpid::amqp_0_10::ListCodec;
-using qpid::sys::Mutex;
-using namespace qpid::framing;
-using namespace qpid::management;
-using namespace qpid::broker;
-using namespace qpid;
-using namespace std;
-namespace _qmf = qmf::org::apache::qpid::broker;
-
-
-namespace {
- const string defaultVendorName("vendor");
- const string defaultProductName("product");
-
- // Create a valid binding key substring by
- // replacing all '.' chars with '_'
- const string keyifyNameStr(const string& name)
- {
- string n2 = name;
-
- size_t pos = n2.find('.');
- while (pos != n2.npos) {
- n2.replace(pos, 1, "_");
- pos = n2.find('.', pos);
- }
- return n2;
- }
-
-struct ScopedManagementContext
-{
- ScopedManagementContext(const qpid::broker::ConnectionState* context)
- {
- setManagementExecutionContext(context);
- }
- ~ScopedManagementContext()
- {
- setManagementExecutionContext(0);
- }
-};
-}
-
-
-static Variant::Map mapEncodeSchemaId(const string& pname,
- const string& cname,
- const string& type,
- const uint8_t *md5Sum)
-{
- Variant::Map map_;
-
- map_["_package_name"] = pname;
- map_["_class_name"] = cname;
- map_["_type"] = type;
- map_["_hash"] = qpid::types::Uuid(md5Sum);
- return map_;
-}
-
-
-ManagementAgent::RemoteAgent::~RemoteAgent ()
-{
- QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
- if (mgmtObject != 0) {
- mgmtObject->resourceDestroy();
- agent.deleteObjectNowLH(mgmtObject->getObjectId());
- }
-}
-
-ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
- threadPoolSize(1), interval(10), broker(0), timer(0),
- startTime(sys::now()),
- suppressed(false), disallowAllV1Methods(false),
- vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
- qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
- msgBuffer(MA_BUFFER_SIZE)
-{
- nextObjectId = 1;
- brokerBank = 1;
- bootSequence = 1;
- nextRemoteBank = 10;
- nextRequestSequence = 1;
- clientWasAdded = false;
- attrMap["_vendor"] = defaultVendorName;
- attrMap["_product"] = defaultProductName;
-}
-
-ManagementAgent::~ManagementAgent ()
-{
- {
- sys::Mutex::ScopedLock lock (userLock);
-
- // Reset the shared pointers to exchanges. If this is not done now, the exchanges
- // will stick around until dExchange and mExchange are implicitly destroyed (long
- // after this destructor completes). Those exchanges hold references to management
- // objects that will be invalid.
- dExchange.reset();
- mExchange.reset();
- v2Topic.reset();
- v2Direct.reset();
-
- moveNewObjectsLH();
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++) {
- ManagementObject* object = iter->second;
- delete object;
- }
- managementObjects.clear();
- }
-}
-
-void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
- qpid::broker::Broker* _broker, int _threads)
-{
- dataDir = _dataDir;
- interval = _interval;
- broker = _broker;
- threadPoolSize = _threads;
- ManagementObject::maxThreads = threadPoolSize;
-
- // Get from file or generate and save to file.
- if (dataDir.empty())
- {
- uuid.generate();
- QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
- << uuid);
- }
- else
- {
- string filename(dataDir + "/.mbrokerdata");
- ifstream inFile(filename.c_str ());
-
- if (inFile.good())
- {
- inFile >> uuid;
- inFile >> bootSequence;
- inFile >> nextRemoteBank;
- inFile.close();
- if (uuid.isNull()) {
- uuid.generate();
- QPID_LOG (info, "No stored broker ID found - ManagementAgent generated broker ID: " << uuid);
- } else
- QPID_LOG (info, "ManagementAgent restored broker ID: " << uuid);
-
- // if sequence goes beyond a 12-bit field, skip zero and wrap to 1.
- bootSequence++;
- if (bootSequence & 0xF000)
- bootSequence = 1;
- writeData();
- }
- else
- {
- uuid.generate();
- QPID_LOG (info, "ManagementAgent generated broker ID: " << uuid);
- writeData();
- }
-
- QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence);
- }
-}
-
-void ManagementAgent::pluginsInitialized() {
- // Do this here so cluster plugin has the chance to set up the timer.
- timer = &broker->getClusterTimer();
- timer->add(new Periodic(*this, interval));
-}
-
-
-void ManagementAgent::setName(const string& vendor, const string& product, const string& instance)
-{
- if (vendor.find(':') != vendor.npos) {
- throw Exception("vendor string cannot contain a ':' character.");
- }
- if (product.find(':') != product.npos) {
- throw Exception("product string cannot contain a ':' character.");
- }
- attrMap["_vendor"] = vendor;
- attrMap["_product"] = product;
- string inst;
- if (instance.empty()) {
- if (uuid.isNull())
- {
- throw Exception("ManagementAgent::configure() must be called if default name is used.");
- }
- inst = uuid.str();
- } else
- inst = instance;
-
- name_address = vendor + ":" + product + ":" + inst;
- attrMap["_instance"] = inst;
- attrMap["_name"] = name_address;
-
- vendorNameKey = keyifyNameStr(vendor);
- productNameKey = keyifyNameStr(product);
- instanceNameKey = keyifyNameStr(inst);
-}
-
-
-void ManagementAgent::getName(string& vendor, string& product, string& instance)
-{
- vendor = std::string(attrMap["_vendor"]);
- product = std::string(attrMap["_product"]);
- instance = std::string(attrMap["_instance"]);
-}
-
-
-const std::string& ManagementAgent::getAddress()
-{
- return name_address;
-}
-
-
-void ManagementAgent::writeData ()
-{
- string filename (dataDir + "/.mbrokerdata");
- ofstream outFile (filename.c_str ());
-
- if (outFile.good())
- {
- outFile << uuid << " " << bootSequence << " " << nextRemoteBank << endl;
- outFile.close();
- }
-}
-
-void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange,
- qpid::broker::Exchange::shared_ptr _dexchange)
-{
- mExchange = _mexchange;
- dExchange = _dexchange;
-}
-
-void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange,
- qpid::broker::Exchange::shared_ptr _dexchange)
-{
- v2Topic = _texchange;
- v2Direct = _dexchange;
-}
-
-void ManagementAgent::registerClass (const string& packageName,
- const string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- sys::Mutex::ScopedLock lock(userLock);
- PackageMap::iterator pIter = findOrAddPackageLH(packageName);
- addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
-}
-
-void ManagementAgent::registerEvent (const string& packageName,
- const string& eventName,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- sys::Mutex::ScopedLock lock(userLock);
- PackageMap::iterator pIter = findOrAddPackageLH(packageName);
- addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
-}
-
-// Deprecated: V1 objects
-ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent)
-{
- uint16_t sequence;
- uint64_t objectNum;
-
- sequence = persistent ? 0 : bootSequence;
- objectNum = persistId ? persistId : nextObjectId++;
-
- ObjectId objId(0 /*flags*/, sequence, brokerBank, objectNum);
- objId.setV2Key(*object); // let object generate the v2 key
-
- object->setObjectId(objId);
-
- {
- sys::Mutex::ScopedLock lock(addLock);
- newManagementObjects.push_back(object);
- }
- QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
- return objId;
-}
-
-
-
-ObjectId ManagementAgent::addObject(ManagementObject* object,
- const string& key,
- bool persistent)
-{
- uint16_t sequence;
-
- sequence = persistent ? 0 : bootSequence;
-
- ObjectId objId(0 /*flags*/, sequence, brokerBank);
- if (key.empty()) {
- objId.setV2Key(*object); // let object generate the key
- } else {
- objId.setV2Key(key);
- }
-
- object->setObjectId(objId);
- {
- sys::Mutex::ScopedLock lock(addLock);
- newManagementObjects.push_back(object);
- }
- QPID_LOG(debug, "Management object added: " << objId.getV2Key());
- return objId;
-}
-
-void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
-{
- static const std::string severityStr[] = {
- "emerg", "alert", "crit", "error", "warn",
- "note", "info", "debug"
- };
- sys::Mutex::ScopedLock lock (userLock);
- uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
-
- if (qmf1Support) {
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'e');
- outBuffer.putShortString(event.getPackageName());
- outBuffer.putShortString(event.getEventName());
- outBuffer.putBin128(event.getMd5Sum());
- outBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now())));
- outBuffer.putOctet(sev);
- string sBuf;
- event.encode(sBuf);
- outBuffer.putRawData(sBuf);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, mExchange,
- "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
- QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
- }
-
- if (qmf2Support) {
- Variant::Map map_;
- Variant::Map schemaId;
- Variant::Map values;
- Variant::Map headers;
-
- map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
- event.getEventName(),
- "_event",
- event.getMd5Sum());
- event.mapEncode(values);
- map_["_values"] = values;
- map_["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
- map_["_severity"] = sev;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_event";
- headers["qmf.agent"] = name_address;
-
- stringstream key;
- key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
- << "." << keyifyNameStr(event.getEventName())
- << "." << severityStr[sev]
- << "." << vendorNameKey
- << "." << productNameKey;
- if (!instanceNameKey.empty())
- key << "." << instanceNameKey;
-
-
- string content;
- Variant::List list_;
- list_.push_back(map_);
- ListCodec::encode(list_, content);
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
- QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
- }
-}
-
-ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
- "ManagementAgent::periodicProcessing"),
- agent(_agent) {}
-
-ManagementAgent::Periodic::~Periodic () {}
-
-void ManagementAgent::Periodic::fire ()
-{
- agent.timer->add (new Periodic (agent, agent.interval));
- agent.periodicProcessing ();
-}
-
-void ManagementAgent::clientAdded (const string& routingKey)
-{
- sys::Mutex::ScopedLock lock(userLock);
-
- //
- // If this routing key is not relevant to object updates, exit.
- //
- if ((routingKey.compare(0, 1, "#") != 0) &&
- (routingKey.compare(0, 9, "console.#") != 0) &&
- (routingKey.compare(0, 12, "console.obj.") != 0))
- return;
-
- //
- // Mark local objects for full-update.
- //
- clientWasAdded = true;
-
- //
- // If the routing key is relevant for local objects only, don't involve
- // any of the remote agents.
- //
- if (routingKey.compare(0, 39, "console.obj.*.*.org.apache.qpid.broker.") == 0)
- return;
-
- std::list<std::string> rkeys;
-
- for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
- aIter != remoteAgents.end();
- aIter++) {
- rkeys.push_back(aIter->second->routingKey);
- }
-
- while (rkeys.size()) {
- char localBuffer[16];
- Buffer outBuffer(localBuffer, 16);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'x');
- outLen = outBuffer.getPosition();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, rkeys.front());
- QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());
- rkeys.pop_front();
- }
-}
-
-void ManagementAgent::clusterUpdate() {
- // Called on all cluster memebers when a new member joins a cluster.
- // Set clientWasAdded so that on the next periodicProcessing we will do
- // a full update on all cluster members.
- sys::Mutex::ScopedLock l(userLock);
- moveNewObjectsLH(); // keep lists consistent with updater/updatee.
- moveDeletedObjectsLH();
- clientWasAdded = true;
- debugSnapshot("Cluster member joined");
-}
-
-void ManagementAgent::encodeHeader (Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet ('A');
- buf.putOctet ('M');
- buf.putOctet ('2');
- buf.putOctet (opcode);
- buf.putLong (seq);
-}
-
-bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '2';
-}
-
-// NOTE WELL: assumes userLock is held by caller (LH)
-// NOTE EVEN WELLER: drops this lock when delivering the message!!!
-void ManagementAgent::sendBufferLH(Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- const string& routingKey)
-{
- if (suppressed) {
- QPID_LOG(debug, "Suppressing management message to " << routingKey);
- return;
- }
- if (exchange.get() == 0) return;
-
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody()));
-
- content.castBody<AMQContentBody>()->decode(buf, length);
-
- method.setEof(false);
- header.setBof(false);
- header.setEof(false);
- content.setBof(false);
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
-
- MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(length);
-
- DeliveryProperties* dp =
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
- dp->setRoutingKey(routingKey);
-
- msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
-
- {
- sys::Mutex::ScopedUnlock u(userLock);
-
- DeliverableMessage deliverable (msg);
- try {
- exchange->route(deliverable, routingKey, 0);
- } catch(exception&) {}
- }
- buf.reset();
-}
-
-
-void ManagementAgent::sendBufferLH(Buffer& buf,
- uint32_t length,
- const string& exchange,
- const string& routingKey)
-{
- qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
- if (ex.get() != 0)
- sendBufferLH(buf, length, ex, routingKey);
-}
-
-
-// NOTE WELL: assumes userLock is held by caller (LH)
-// NOTE EVEN WELLER: drops this lock when delivering the message!!!
-void ManagementAgent::sendBufferLH(const string& data,
- const string& cid,
- const Variant::Map& headers,
- const string& content_type,
- qpid::broker::Exchange::shared_ptr exchange,
- const string& routingKey,
- uint64_t ttl_msec)
-{
- Variant::Map::const_iterator i;
-
- if (suppressed) {
- QPID_LOG(debug, "Suppressing management message to " << routingKey);
- return;
- }
- if (exchange.get() == 0) return;
-
- intrusive_ptr<Message> msg(new Message());
- AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
- AMQFrame header((AMQHeaderBody()));
- AMQFrame content((AMQContentBody(data)));
-
- method.setEof(false);
- header.setBof(false);
- header.setEof(false);
- content.setBof(false);
-
- msg->getFrames().append(method);
- msg->getFrames().append(header);
-
- MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setContentLength(data.length());
- if (!cid.empty()) {
- props->setCorrelationId(cid);
- }
- props->setContentType(content_type);
- props->setAppId("qmf2");
-
- for (i = headers.begin(); i != headers.end(); ++i) {
- msg->getOrInsertHeaders().setString(i->first, i->second.asString());
- }
-
- DeliveryProperties* dp =
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
- dp->setRoutingKey(routingKey);
- if (ttl_msec) {
- dp->setTtl(ttl_msec);
- msg->setTimestamp(broker->getExpiryPolicy());
- }
- msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
-
- {
- sys::Mutex::ScopedUnlock u(userLock);
-
- DeliverableMessage deliverable (msg);
- try {
- exchange->route(deliverable, routingKey, 0);
- } catch(exception&) {}
- }
-}
-
-
-void ManagementAgent::sendBufferLH(const string& data,
- const string& cid,
- const Variant::Map& headers,
- const string& content_type,
- const string& exchange,
- const string& routingKey,
- uint64_t ttl_msec)
-{
- qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
- if (ex.get() != 0)
- sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec);
-}
-
-
-/** Objects that have been added since the last periodic poll are temporarily
- * saved in the newManagementObjects list. This allows objects to be
- * added without needing to block on the userLock (addLock is used instead).
- * These new objects need to be integrated into the object database
- * (managementObjects) *before* they can be properly managed. This routine
- * performs the integration.
- *
- * Note well: objects on the newManagementObjects list may have been
- * marked as "deleted", and, possibly re-added. This would result in
- * duplicate object ids. To avoid clashes, don't put deleted objects
- * into the active object database.
- */
-void ManagementAgent::moveNewObjectsLH()
-{
- sys::Mutex::ScopedLock lock (addLock);
- while (!newManagementObjects.empty()) {
- ManagementObject *object = newManagementObjects.back();
- newManagementObjects.pop_back();
-
- if (object->isDeleted()) {
- DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support));
- pendingDeletedObjs[dptr->getKey()].push_back(dptr);
- delete object;
- } else { // add to active object list, check for duplicates.
- ObjectId oid = object->getObjectId();
- ManagementObjectMap::iterator destIter = managementObjects.find(oid);
- if (destIter != managementObjects.end()) {
- // duplicate found. It is OK if the old object has been marked
- // deleted...
- ManagementObject *oldObj = destIter->second;
- if (oldObj->isDeleted()) {
- DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support));
- pendingDeletedObjs[dptr->getKey()].push_back(dptr);
- delete oldObj;
- } else {
- // Duplicate non-deleted objects? This is a user error - oids must be unique.
- // for now, leak the old object (safer than deleting - may still be referenced)
- // and complain loudly...
- QPID_LOG(error, "Detected two management objects with the same identifier: " << oid);
- }
- }
- managementObjects[oid] = object;
- }
- }
-}
-
-void ManagementAgent::periodicProcessing (void)
-{
-#define BUFSIZE 65536
-#define HEADROOM 4096
- debugSnapshot("Management agent periodic processing");
- sys::Mutex::ScopedLock lock (userLock);
- uint32_t contentSize;
- string routingKey;
- string sBuf;
-
- uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
-
- moveNewObjectsLH();
-
- //
- // Clear the been-here flag on all objects in the map.
- //
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- object->setFlags(0);
- if (clientWasAdded) {
- object->setForcePublish(true);
- }
- }
-
- clientWasAdded = false;
-
- // first send the pending deletes before sending updates. This prevents a
- // "false delete" scenario: if an object was deleted then re-added during
- // the last poll cycle, it will have a delete entry and an active entry.
- // if we sent the active update first, _then_ the delete update, clients
- // would incorrectly think the object was deleted. See QPID-2997
- //
- bool objectsDeleted = moveDeletedObjectsLH();
- if (!pendingDeletedObjs.empty()) {
- // use a temporary copy of the pending deletes so dropping the lock when
- // the buffer is sent is safe.
- PendingDeletedObjsMap tmp(pendingDeletedObjs);
- pendingDeletedObjs.clear();
-
- for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
- std::string packageName;
- std::string className;
- msgBuffer.reset();
- uint32_t v1Objs = 0;
- uint32_t v2Objs = 0;
- Variant::List list_;
-
- size_t pos = mIter->first.find(":");
- packageName = mIter->first.substr(0, pos);
- className = mIter->first.substr(pos+1);
-
- for (DeletedObjectList::iterator lIter = mIter->second.begin();
- lIter != mIter->second.end(); lIter++) {
- msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
- std::string oid = (*lIter)->objectId;
- if (!(*lIter)->encodedV1Config.empty()) {
- encodeHeader(msgBuffer, 'c');
- msgBuffer.putRawData((*lIter)->encodedV1Config);
- QPID_LOG(trace, "Deleting V1 properties " << oid
- << " len=" << (*lIter)->encodedV1Config.size());
- v1Objs++;
- }
- if (!(*lIter)->encodedV1Inst.empty()) {
- encodeHeader(msgBuffer, 'i');
- msgBuffer.putRawData((*lIter)->encodedV1Inst);
- QPID_LOG(trace, "Deleting V1 statistics " << oid
- << " len=" << (*lIter)->encodedV1Inst.size());
- v1Objs++;
- }
- if (v1Objs >= maxReplyObjs) {
- v1Objs = 0;
- contentSize = msgBuffer.getSize();
- stringstream key;
- key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
- << key.str() << " len=" << contentSize);
- }
-
- if (!(*lIter)->encodedV2.empty()) {
- QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
- list_.push_back((*lIter)->encodedV2);
- if (++v2Objs >= maxReplyObjs) {
- v2Objs = 0;
-
- string content;
- ListCodec::encode(list_, content);
- list_.clear();
- if (content.length()) {
- stringstream key;
- Variant::Map headers;
- key << "agent.ind.data." << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey;
- if (!instanceNameKey.empty())
- key << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
- }
- }
- }
- } // end current list
-
- // send any remaining objects...
-
- if (v1Objs) {
- contentSize = BUFSIZE - msgBuffer.available();
- stringstream key;
- key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
- }
-
- if (!list_.empty()) {
- string content;
- ListCodec::encode(list_, content);
- list_.clear();
- if (content.length()) {
- stringstream key;
- Variant::Map headers;
- key << "agent.ind.data." << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey;
- if (!instanceNameKey.empty())
- key << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
- }
- }
- } // end map
- }
-
- //
- // Process the entire object map. Remember: we drop the userLock each time we call
- // sendBuffer(). This allows the managementObjects map to be altered during the
- // sendBuffer() call, so always restart the search after a sendBuffer() call
- //
- while (1) {
- msgBuffer.reset();
- Variant::List list_;
- uint32_t pcount;
- uint32_t scount;
- uint32_t v1Objs, v2Objs;
- ManagementObjectMap::iterator baseIter;
- std::string packageName;
- std::string className;
-
- for (baseIter = managementObjects.begin();
- baseIter != managementObjects.end();
- baseIter++) {
- ManagementObject* baseObject = baseIter->second;
- //
- // Skip until we find a base object requiring processing...
- //
- if (baseObject->getFlags() == 0) {
- packageName = baseObject->getPackageName();
- className = baseObject->getClassName();
- break;
- }
- }
-
- if (baseIter == managementObjects.end())
- break; // done - all objects processed
-
- pcount = scount = 0;
- v1Objs = 0;
- v2Objs = 0;
- list_.clear();
- msgBuffer.reset();
-
- for (ManagementObjectMap::iterator iter = baseIter;
- iter != managementObjects.end();
- iter++) {
- msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
- ManagementObject* baseObject = baseIter->second;
- ManagementObject* object = iter->second;
- bool send_stats, send_props;
- if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
- object->setFlags(1);
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- // skip any objects marked deleted since our first pass. Deal with them
- // on the next periodic cycle...
- if (object->isDeleted()) {
- continue;
- }
-
- send_props = (object->getConfigChanged() || object->getForcePublish());
- send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
-
- if (send_props && qmf1Support) {
- size_t pos = msgBuffer.getPosition();
- encodeHeader(msgBuffer, 'c');
- sBuf.clear();
- object->writeProperties(sBuf);
- msgBuffer.putRawData(sBuf);
- QPID_LOG(trace, "Changed V1 properties "
- << object->getObjectId().getV2Key()
- << " len=" << msgBuffer.getPosition()-pos);
- ++v1Objs;
- }
-
- if (send_stats && qmf1Support) {
- size_t pos = msgBuffer.getPosition();
- encodeHeader(msgBuffer, 'i');
- sBuf.clear();
- object->writeStatistics(sBuf);
- msgBuffer.putRawData(sBuf);
- QPID_LOG(trace, "Changed V1 statistics "
- << object->getObjectId().getV2Key()
- << " len=" << msgBuffer.getPosition()-pos);
- ++v1Objs;
- }
-
- if ((send_stats || send_props) && qmf2Support) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oid;
-
- object->getObjectId().mapEncode(oid);
- map_["_object_id"] = oid;
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- "_data",
- object->getMd5Sum());
- object->writeTimestamps(map_);
- object->mapEncodeValues(values, send_props, send_stats);
- map_["_values"] = values;
- list_.push_back(map_);
- v2Objs++;
- QPID_LOG(trace, "Changed V2"
- << (send_stats? " statistics":"")
- << (send_props? " properties":"")
- << " map=" << map_);
- }
-
- if (send_props) pcount++;
- if (send_stats) scount++;
-
- object->setForcePublish(false);
-
- if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
- (qmf2Support && (v2Objs >= maxReplyObjs)))
- break; // have enough objects, send an indication...
- }
- }
-
- if (pcount || scount) {
- if (qmf1Support) {
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
- stringstream key;
- key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
- << " props=" << pcount
- << " stats=" << scount
- << " len=" << contentSize);
- }
- }
-
- if (qmf2Support) {
- string content;
- ListCodec::encode(list_, content);
- if (content.length()) {
- stringstream key;
- Variant::Map headers;
- key << "agent.ind.data." << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey;
- if (!instanceNameKey.empty())
- key << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
- QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
- << " props=" << pcount
- << " stats=" << scount
- << " len=" << content.length());
- }
- }
- }
- } // end processing updates for all objects
-
- if (objectsDeleted) deleteOrphanedAgentsLH();
-
- // heartbeat generation
-
- if (qmf1Support) {
-#define BUFSIZE 65536
- uint32_t contentSize;
- char msgChars[BUFSIZE];
- Buffer msgBuffer(msgChars, BUFSIZE);
- encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now())));
-
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
- routingKey = "console.heartbeat.1.0";
- sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
- QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
- }
-
- if (qmf2Support) {
- std::stringstream addr_key;
-
- addr_key << "agent.ind.heartbeat." << vendorNameKey << "." << productNameKey;
- if (!instanceNameKey.empty())
- addr_key << "." << instanceNameKey;
-
- Variant::Map map;
- Variant::Map headers;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_heartbeat_indication";
- headers["qmf.agent"] = name_address;
-
- map["_values"] = attrMap;
- map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
- map["_values"].asMap()["_heartbeat_interval"] = interval;
- map["_values"].asMap()["_epoch"] = bootSequence;
-
- string content;
- MapCodec::encode(map, content);
-
- // Set TTL (in msecs) on outgoing heartbeat indications based on the interval
- // time to prevent stale heartbeats from getting to the consoles.
- sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
-
- QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
- }
-}
-
-void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
-{
- ManagementObjectMap::iterator iter = managementObjects.find(oid);
- if (iter == managementObjects.end())
- return;
- ManagementObject* object = iter->second;
- if (!object->isDeleted())
- return;
-
- // since sendBufferLH drops the userLock, don't call it until we
- // are done manipulating the object.
-#define DNOW_BUFSIZE 2048
- char msgChars[DNOW_BUFSIZE];
- Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
- Variant::List list_;
- stringstream v1key, v2key;
-
- if (qmf1Support) {
- string sBuf;
-
- v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
- encodeHeader(msgBuffer, 'c');
- object->writeProperties(sBuf);
- msgBuffer.putRawData(sBuf);
- }
-
- if (qmf2Support) {
- Variant::Map map_;
- Variant::Map values;
-
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- "_data",
- object->getMd5Sum());
- object->writeTimestamps(map_);
- object->mapEncodeValues(values, true, false);
- map_["_values"] = values;
- list_.push_back(map_);
- v2key << "agent.ind.data." << keyifyNameStr(object->getPackageName())
- << "." << keyifyNameStr(object->getClassName())
- << "." << vendorNameKey
- << "." << productNameKey;
- if (!instanceNameKey.empty())
- v2key << "." << instanceNameKey;
- }
-
- object = 0;
- managementObjects.erase(oid);
-
- // object deleted, ok to drop lock now.
-
- if (qmf1Support) {
- uint32_t contentSize = msgBuffer.getPosition();
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
- QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
- }
-
- if (qmf2Support) {
- Variant::Map headers;
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- string content;
- ListCodec::encode(list_, content);
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str());
- QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());
- }
-}
-
-void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence,
- uint32_t code, const string& text)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader (outBuffer, 'z', sequence);
- outBuffer.putLong (code);
- outBuffer.putShortString (text);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
- replyToKey << " seq=" << sequence);
-}
-
-void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid,
- const string& text, uint32_t code, bool viaLocal)
-{
- static const string addr_exchange("qmf.default.direct");
-
- Variant::Map map;
- Variant::Map headers;
- Variant::Map values;
- string content;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_exception";
- headers["qmf.agent"] = viaLocal ? "broker" : name_address;
-
- values["error_code"] = code;
- values["error_text"] = text;
- map["_values"] = values;
-
- MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
-
- QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
-}
-
-bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
- const string& routingKey,
- const FieldTable* /*args*/,
- const bool topic,
- int qmfVersion)
-{
- sys::Mutex::ScopedLock lock (userLock);
- Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
-
- if (topic && qmfVersion == 1) {
-
- // qmf1 is bound only to the topic management exchange.
- // Parse the routing key. This management broker should act as though it
- // is bound to the exchange to match the following keys:
- //
- // agent.1.0.#
- // broker
- // schema.#
-
- if (routingKey == "broker") {
- dispatchAgentCommandLH(msg);
- return false;
- }
-
- if (routingKey.length() > 6) {
-
- if (routingKey.compare(0, 9, "agent.1.0") == 0) {
- dispatchAgentCommandLH(msg);
- return false;
- }
-
- if (routingKey.compare(0, 8, "agent.1.") == 0) {
- return authorizeAgentMessageLH(msg);
- }
-
- if (routingKey.compare(0, 7, "schema.") == 0) {
- dispatchAgentCommandLH(msg);
- return true;
- }
- }
- }
-
- if (qmfVersion == 2) {
-
- if (topic) {
- // Intercept messages bound to:
- // "console.ind.locate.# - process these messages, and also allow them to be forwarded.
- if (routingKey == "console.request.agent_locate") {
- dispatchAgentCommandLH(msg);
- return true;
- }
-
- } else { // direct exchange
-
- // Intercept messages bound to:
- // "broker" - generic alias for the local broker
- // "<name_address>" - the broker agent's proper name
- // and do not forward them futher
- if (routingKey == "broker" || routingKey == name_address) {
- dispatchAgentCommandLH(msg, routingKey == "broker");
- return false;
- }
- }
- }
-
- return true;
-}
-
-void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
-{
- moveNewObjectsLH();
-
- string methodName;
- string packageName;
- string className;
- uint8_t hash[16];
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- AclModule* acl = broker->getAcl();
- string inArgs;
-
- string sBuf;
- inBuffer.getRawData(sBuf, 16);
- ObjectId objId;
- objId.decode(sBuf);
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
- inBuffer.getRawData(inArgs, inBuffer.available());
-
- QPID_LOG(debug, "RECV MethodRequest (v1) class=" << packageName << ":" << className << "(" << Uuid(hash) << ") method=" <<
- methodName << " replyTo=" << replyToKey);
-
- encodeHeader(outBuffer, 'm', sequence);
-
- if (disallowAllV1Methods) {
- outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
- outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2");
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
- return;
- }
-
- DisallowedMethods::const_iterator i = disallowed.find(make_pair(className, methodName));
- if (i != disallowed.end()) {
- outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
- outBuffer.putMediumString(i->second);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
- return;
- }
-
- string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
- if (acl != 0) {
- map<acl::Property, string> params;
- params[acl::PROP_SCHEMAPACKAGE] = packageName;
- params[acl::PROP_SCHEMACLASS] = className;
-
- if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
- outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
- return;
- }
- }
-
- ManagementObjectMap::iterator iter = numericFind(objId);
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
- outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
- outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
- } else {
- if ((iter->second->getPackageName() != packageName) ||
- (iter->second->getClassName() != className)) {
- outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
- outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
- }
- else
- try {
- outBuffer.record();
- sys::Mutex::ScopedUnlock u(userLock);
- string outBuf;
- iter->second->doMethod(methodName, inArgs, outBuf, userId);
- outBuffer.putRawData(outBuf);
- } catch(exception& e) {
- outBuffer.restore();
- outBuffer.putLong(Manageable::STATUS_EXCEPTION);
- outBuffer.putMediumString(e.what());
- }
- }
-
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
-}
-
-
-void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk,
- const string& cid, const ConnectionToken* connToken, bool viaLocal)
-{
- moveNewObjectsLH();
-
- string methodName;
- Variant::Map inMap;
- MapCodec::decode(body, inMap);
- Variant::Map::const_iterator oid, mid;
- string content;
- string error;
- uint32_t errorCode(0);
-
- Variant::Map outMap;
- Variant::Map headers;
-
- headers["method"] = "response";
- headers["qmf.opcode"] = "_method_response";
- headers["qmf.agent"] = viaLocal ? "broker" : name_address;
-
- if ((oid = inMap.find("_object_id")) == inMap.end() ||
- (mid = inMap.find("_method_name")) == inMap.end()) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
- Manageable::STATUS_PARAMETER_INVALID, viaLocal);
- return;
- }
-
- ObjectId objId;
- Variant::Map inArgs;
- Variant::Map callMap;
-
- try {
- // coversions will throw if input is invalid.
- objId = ObjectId(oid->second.asMap());
- methodName = mid->second.getString();
-
- mid = inMap.find("_arguments");
- if (mid != inMap.end()) {
- inArgs = (mid->second).asMap();
- }
- } catch(exception& e) {
- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
- return;
- }
-
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
-
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
- stringstream estr;
- estr << "No object found with ID=" << objId;
- sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal);
- return;
- }
-
- // validate
- AclModule* acl = broker->getAcl();
- DisallowedMethods::const_iterator i;
-
- i = disallowed.find(make_pair(iter->second->getClassName(), methodName));
- if (i != disallowed.end()) {
- sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
- return;
- }
-
- string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
- if (acl != 0) {
- map<acl::Property, string> params;
- params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
- params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
-
- if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
- Manageable::STATUS_FORBIDDEN, viaLocal);
- return;
- }
- }
-
- // invoke the method
-
- QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
- << ":" << iter->second->getClassName() << " method=" <<
- methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
-
- try {
- sys::Mutex::ScopedUnlock u(userLock);
- iter->second->doMethod(methodName, inArgs, callMap, userId);
- errorCode = callMap["_status_code"].asUint32();
- if (errorCode == 0) {
- outMap["_arguments"] = Variant::Map();
- for (Variant::Map::const_iterator iter = callMap.begin();
- iter != callMap.end(); iter++)
- if (iter->first != "_status_code" && iter->first != "_status_text")
- outMap["_arguments"].asMap()[iter->first] = iter->second;
- } else
- error = callMap["_status_text"].asString();
- } catch(exception& e) {
- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
- return;
- }
-
- if (errorCode != 0) {
- sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal);
- return;
- }
-
- MapCodec::encode(outMap, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
- QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
-}
-
-
-void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, uint32_t sequence)
-{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
-
- encodeHeader (outBuffer, 'b', sequence);
- uuid.encode (outBuffer);
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
-}
-
-void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence)
-{
- QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- for (PackageMap::iterator pIter = packages.begin ();
- pIter != packages.end ();
- pIter++)
- {
- encodeHeader (outBuffer, 'p', sequence);
- encodePackageIndication (outBuffer, pIter);
- }
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- if (outLen) {
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
- }
-
- sendCommandCompleteLH(replyToKey, sequence);
-}
-
-void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
-{
- string packageName;
-
- inBuffer.getShortString(packageName);
-
- QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
-
- findOrAddPackageLH(packageName);
-}
-
-void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
-{
- string packageName;
-
- inBuffer.getShortString(packageName);
-
- QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
-
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end())
- {
- typedef std::pair<SchemaClassKey, uint8_t> _ckeyType;
- std::list<_ckeyType> classes;
- ClassMap &cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin();
- cIter != cMap.end();
- cIter++) {
- if (cIter->second.hasSchema()) {
- classes.push_back(make_pair(cIter->first, cIter->second.kind));
- }
- }
-
- while (classes.size()) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'q', sequence);
- encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second);
-
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
- "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
- classes.pop_front();
- }
-
- }
- sendCommandCompleteLH(replyToKey, sequence);
-}
-
-void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t)
-{
- string packageName;
- SchemaClassKey key;
-
- uint8_t kind = inBuffer.getOctet();
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(key.name);
- inBuffer.getBin128(key.hash);
-
- QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
- "), replyTo=" << replyToKey);
-
- PackageMap::iterator pIter = findOrAddPackageLH(packageName);
- ClassMap::iterator cIter = pIter->second.find(key);
- if (cIter == pIter->second.end() || !cIter->second.hasSchema()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- uint32_t sequence = nextRequestSequence++;
-
- // Schema Request
- encodeHeader (outBuffer, 'S', sequence);
- outBuffer.putShortString(packageName);
- key.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
- "), to=" << replyToKey << " seq=" << sequence);
-
- if (cIter != pIter->second.end())
- pIter->second.erase(key);
-
- pIter->second.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, sequence)));
- }
-}
-
-void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
-{
- // If the management package is attached locally (embedded in the broker or
- // linked in via plug-in), call the schema handler directly. If the package
- // is from a remote management agent, send the stored schema information.
-
- if (writeSchemaCall != 0) {
- string schema;
- writeSchemaCall(schema);
- buf.putRawData(schema);
- } else
- buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
-}
-
-void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
-{
- string packageName;
- SchemaClassKey key;
-
- inBuffer.getShortString (packageName);
- key.decode(inBuffer);
-
- QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
- "), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
-
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end()) {
- ClassMap& cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end()) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- SchemaClass& classInfo = cIter->second;
-
- if (classInfo.hasSchema()) {
- encodeHeader(outBuffer, 's', sequence);
- classInfo.appendSchema(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, rte, rtk);
- QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
- }
- else
- sendCommandCompleteLH(rtk, sequence, 1, "Schema not available");
- }
- else
- sendCommandCompleteLH(rtk, sequence, 1, "Class key not found");
- }
- else
- sendCommandCompleteLH(rtk, sequence, 1, "Package not found");
-}
-
-void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
-{
- string packageName;
- SchemaClassKey key;
-
- inBuffer.record();
- inBuffer.getOctet();
- inBuffer.getShortString(packageName);
- key.decode(inBuffer);
- inBuffer.restore();
-
- QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
-
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end()) {
- ClassMap& cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end() && cIter->second.pendingSequence == sequence) {
- size_t length = validateSchema(inBuffer, cIter->second.kind);
- if (length == 0) {
- QPID_LOG(warning, "Management Agent received invalid schema response: " << packageName << "." << key.name);
- cMap.erase(key);
- } else {
- cIter->second.data.resize(length);
- inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length);
-
- // Publish a class-indication message
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'q');
- encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
- QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
- " to=schema.class");
- }
- }
- }
-}
-
-bool ManagementAgent::bankInUse (uint32_t bank)
-{
- for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
- aIter != remoteAgents.end();
- aIter++)
- if (aIter->second->agentBank == bank)
- return true;
- return false;
-}
-
-uint32_t ManagementAgent::allocateNewBank ()
-{
- while (bankInUse (nextRemoteBank))
- nextRemoteBank++;
-
- uint32_t allocated = nextRemoteBank++;
- writeData ();
- return allocated;
-}
-
-uint32_t ManagementAgent::assignBankLH (uint32_t requestedBank)
-{
- if (requestedBank == 0 || bankInUse (requestedBank))
- return allocateNewBank ();
- return requestedBank;
-}
-
-void ManagementAgent::deleteOrphanedAgentsLH()
-{
- list<ObjectId> deleteList;
-
- for (RemoteAgentMap::const_iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) {
- bool found = false;
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- if (iter->first == aIter->first && !iter->second->isDeleted()) {
- found = true;
- break;
- }
- }
-
- if (!found)
- deleteList.push_back(aIter->first);
- }
-
- for (list<ObjectId>::const_iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++)
- remoteAgents.erase(*dIter);
-}
-
-void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
-{
- string label;
- uint32_t requestedBrokerBank, requestedAgentBank;
- uint32_t assignedBank;
- ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
- Uuid systemId;
-
- moveNewObjectsLH();
- deleteOrphanedAgentsLH();
- RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
- if (aIter != remoteAgents.end()) {
- // There already exists an agent on this session. Reject the request.
- sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent");
- return;
- }
-
- inBuffer.getShortString(label);
- systemId.decode(inBuffer);
- requestedBrokerBank = inBuffer.getLong();
- requestedAgentBank = inBuffer.getLong();
-
- QPID_LOG(debug, "RECV (Agent)AttachRequest label=" << label << " reqBrokerBank=" << requestedBrokerBank <<
- " reqAgentBank=" << requestedAgentBank << " replyTo=" << replyToKey << " seq=" << sequence);
-
- assignedBank = assignBankLH(requestedAgentBank);
-
- boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this));
- agent->brokerBank = brokerBank;
- agent->agentBank = assignedBank;
- agent->routingKey = replyToKey;
- agent->connectionRef = connectionRef;
- agent->mgmtObject = new _qmf::Agent (this, agent.get());
- agent->mgmtObject->set_connectionRef(agent->connectionRef);
- agent->mgmtObject->set_label (label);
- agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
- agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
- agent->mgmtObject->set_brokerBank (brokerBank);
- agent->mgmtObject->set_agentBank (assignedBank);
- addObject (agent->mgmtObject, 0);
- remoteAgents[connectionRef] = agent;
-
- QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
-
- // Send an Attach Response
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader (outBuffer, 'a', sequence);
- outBuffer.putLong (brokerBank);
- outBuffer.putLong (assignedBank);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
- " to=" << replyToKey << " seq=" << sequence);
-}
-
-void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
-{
- FieldTable ft;
- FieldTable::ValuePtr value;
-
- moveNewObjectsLH();
-
- ft.decode(inBuffer);
-
- QPID_LOG(debug, "RECV GetQuery (v1) query=" << ft << " seq=" << sequence);
-
- value = ft.get("_class");
- if (value.get() == 0 || !value->convertsTo<string>()) {
- value = ft.get("_objectid");
- if (value.get() == 0 || !value->convertsTo<string>())
- return;
-
- ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = numericFind(selector);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- if (!object->isDeleted()) {
- string sBuf;
- encodeHeader(outBuffer, 'g', sequence);
- object->writeProperties(sBuf);
- outBuffer.putRawData(sBuf);
- sBuf.clear();
- object->writeStatistics(sBuf, true);
- outBuffer.putRawData(sBuf);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
- }
- }
- sendCommandCompleteLH(replyToKey, sequence);
- return;
- }
-
- string className (value->get<string>());
- std::list<ObjectId>matches;
-
- // build up a set of all objects to be dumped
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName () == className) {
- matches.push_back(object->getObjectId());
- }
- }
-
- // send them (as sendBufferLH drops the userLock)
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
- while (matches.size()) {
- ObjectId objId = matches.front();
- ManagementObjectMap::iterator oIter = managementObjects.find( objId );
- if (oIter != managementObjects.end()) {
- ManagementObject* object = oIter->second;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- if (!object->isDeleted()) {
- string sProps, sStats;
- object->writeProperties(sProps);
- object->writeStatistics(sStats, true);
-
- size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes.
- if (len > MA_BUFFER_SIZE) {
- QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!");
- } else {
- if (outBuffer.available() < len) { // not enough room in current buffer, send it.
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock
- QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
- continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
- }
- encodeHeader(outBuffer, 'g', sequence);
- outBuffer.putRawData(sProps);
- outBuffer.putRawData(sStats);
- }
- }
- }
- matches.pop_front();
- }
-
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- if (outLen) {
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
- }
-
- sendCommandCompleteLH(replyToKey, sequence);
-}
-
-
-void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
-{
- moveNewObjectsLH();
-
- Variant::Map inMap;
- Variant::Map::const_iterator i;
- Variant::Map headers;
-
- MapCodec::decode(body, inMap);
- QPID_LOG(debug, "RECV GetQuery (v2): map=" << inMap << " seq=" << cid);
-
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = viaLocal ? "broker" : name_address;
-
- /*
- * Unpack the _what element of the query. Currently we only support OBJECT queries.
- */
- i = inMap.find("_what");
- if (i == inMap.end()) {
- sendExceptionLH(rte, rtk, cid, "_what element missing in Query");
- return;
- }
-
- if (i->second.getType() != qpid::types::VAR_STRING) {
- sendExceptionLH(rte, rtk, cid, "_what element is not a string");
- return;
- }
-
- if (i->second.asString() != "OBJECT") {
- sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
- return;
- }
-
- string className;
- string packageName;
-
- /*
- * Handle the _schema_id element, if supplied.
- */
- i = inMap.find("_schema_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- const Variant::Map& schemaIdMap(i->second.asMap());
-
- Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- className = s_iter->second.asString();
-
- s_iter = schemaIdMap.find("_package_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- packageName = s_iter->second.asString();
- }
-
-
- /*
- * Unpack the _object_id element of the query if it is present. If it is present, find that one
- * object and return it. If it is not present, send a class-based result.
- */
- i = inMap.find("_object_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- Variant::List list_;
- ObjectId objId(i->second.asMap());
-
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- if (!object->isDeleted()) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oidMap;
-
- object->mapEncodeValues(values, true, true); // write both stats and properties
- objId.mapEncode(oidMap);
- map_["_values"] = values;
- map_["_object_id"] = oidMap;
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- "_data",
- object->getMd5Sum());
- list_.push_back(map_);
- }
-
- string content;
-
- ListCodec::encode(list_, content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
- QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
- return;
- }
- } else {
- // send class-based result.
- Variant::List _list;
- Variant::List _subList;
- unsigned int objCount = 0;
-
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName() == className &&
- (packageName.empty() || object->getPackageName() == packageName)) {
-
-
- if (!object->isDeleted()) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oidMap;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- object->writeTimestamps(map_);
- object->mapEncodeValues(values, true, true); // write both stats and properties
- iter->first.mapEncode(oidMap);
-
- map_["_values"] = values;
- map_["_object_id"] = oidMap;
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- "_data",
- object->getMd5Sum());
- _subList.push_back(map_);
- if (++objCount >= maxReplyObjs) {
- objCount = 0;
- _list.push_back(_subList);
- _subList.clear();
- }
- }
- }
- }
-
- if (_subList.size())
- _list.push_back(_subList);
-
- headers["partial"] = Variant();
- string content;
- while (_list.size() > 1) {
- ListCodec::encode(_list.front().asList(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
- _list.pop_front();
- QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
- }
- headers.erase("partial");
- ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
- QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
- return;
- }
-
- // Unrecognized query - Send empty message to indicate CommandComplete
- string content;
- ListCodec::encode(Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
- QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
-}
-
-
-void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid)
-{
- QPID_LOG(debug, "RCVD AgentLocateRequest");
-
- Variant::Map map;
- Variant::Map headers;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_locate_response";
- headers["qmf.agent"] = name_address;
-
- map["_values"] = attrMap;
- map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now()));
- map["_values"].asMap()["_heartbeat_interval"] = interval;
- map["_values"].asMap()["_epoch"] = bootSequence;
-
- string content;
- MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
- clientWasAdded = true;
-
- QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
-}
-
-
-bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
-{
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence = 0;
- bool methodReq = false;
- bool mapMsg = false;
- string packageName;
- string className;
- string methodName;
- string cid;
-
- //
- // If the message is larger than our working buffer size, we can't determine if it's
- // authorized or not. In this case, return true (authorized) if there is no ACL in place,
- // otherwise return false;
- //
- if (msg.encodedSize() > MA_BUFFER_SIZE)
- return broker->getAcl() == 0;
-
- msg.encodeContent(inBuffer);
- uint32_t bufferLen = inBuffer.getPosition();
- inBuffer.reset();
-
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
-
- const framing::FieldTable *headers = msg.getApplicationHeaders();
-
- if (headers && msg.getAppId() == "qmf2")
- {
- mapMsg = true;
-
- if (p && p->hasCorrelationId()) {
- cid = p->getCorrelationId();
- }
-
- if (headers->getAsString("qmf.opcode") == "_method_request")
- {
- methodReq = true;
-
- // extract object id and method name
-
- string body;
- inBuffer.getRawData(body, bufferLen);
- Variant::Map inMap;
- MapCodec::decode(body, inMap);
- Variant::Map::const_iterator oid, mid;
-
- ObjectId objId;
-
- if ((oid = inMap.find("_object_id")) == inMap.end() ||
- (mid = inMap.find("_method_name")) == inMap.end()) {
- QPID_LOG(warning,
- "Missing fields in QMF authorize req received.");
- return false;
- }
-
- try {
- // coversions will throw if input is invalid.
- objId = ObjectId(oid->second.asMap());
- methodName = mid->second.getString();
- } catch(exception& /*e*/) {
- QPID_LOG(warning,
- "Badly formatted QMF authorize req received.");
- return false;
- }
-
- // look up schema for object to get package and class name
-
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
-
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
- QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " <<
- objId);
- return false;
- }
-
- packageName = iter->second->getPackageName();
- className = iter->second->getClassName();
- }
- } else { // old style binary message format
-
- uint8_t opcode;
-
- if (!checkHeader(inBuffer, &opcode, &sequence))
- return false;
-
- if (opcode == 'M') {
- methodReq = true;
-
- // extract method name & schema package and class name
-
- uint8_t hash[16];
- inBuffer.getLongLong(); // skip over object id
- inBuffer.getLongLong();
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(className);
- inBuffer.getBin128(hash);
- inBuffer.getShortString(methodName);
-
- }
- }
-
- if (methodReq) {
- // TODO: check method call against ACL list.
- map<acl::Property, string> params;
- AclModule* acl = broker->getAcl();
- if (acl == 0)
- return true;
-
- string userId = ((const qpid::broker::ConnectionState*) msg.getPublisher())->getUserId();
- params[acl::PROP_SCHEMAPACKAGE] = packageName;
- params[acl::PROP_SCHEMACLASS] = className;
-
- if (acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params))
- return true;
-
- // authorization failed, send reply if replyTo present
-
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo()) {
- const framing::ReplyTo& rt = p->getReplyTo();
- string rte = rt.getExchange();
- string rtk = rt.getRoutingKey();
- string cid;
- if (p && p->hasCorrelationId())
- cid = p->getCorrelationId();
-
- if (mapMsg) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
- Manageable::STATUS_FORBIDDEN, false);
- } else {
-
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'm', sequence);
- outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
- outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, rte, rtk);
- }
-
- QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
- }
-
- return false;
- }
-
- return true;
-}
-
-void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
-{
- string rte;
- string rtk;
- const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
- if (p && p->hasReplyTo()) {
- const framing::ReplyTo& rt = p->getReplyTo();
- rte = rt.getExchange();
- rtk = rt.getRoutingKey();
- }
- else
- return;
-
- Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
- uint8_t opcode;
-
- if (msg.encodedSize() > MA_BUFFER_SIZE) {
- QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
- msg.encodedSize());
- return;
- }
-
- msg.encodeContent(inBuffer);
- uint32_t bufferLen = inBuffer.getPosition();
- inBuffer.reset();
-
- ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
- const framing::FieldTable *headers = msg.getApplicationHeaders();
- if (headers && msg.getAppId() == "qmf2")
- {
- string opcode = headers->getAsString("qmf.opcode");
- string contentType = headers->getAsString("qmf.content");
- string body;
- string cid;
- inBuffer.getRawData(body, bufferLen);
-
- if (p && p->hasCorrelationId()) {
- cid = p->getCorrelationId();
- }
-
- if (opcode == "_method_request")
- return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
- else if (opcode == "_query_request")
- return handleGetQueryLH(body, rte, rtk, cid, viaLocal);
- else if (opcode == "_agent_locate_request")
- return handleLocateRequestLH(body, rte, rtk, cid);
-
- QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
- return;
- }
-
- // old preV2 binary messages
-
- while (inBuffer.getPosition() < bufferLen) {
- uint32_t sequence;
- if (!checkHeader(inBuffer, &opcode, &sequence))
- return;
-
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence);
- else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
- else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
- }
-}
-
-ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string name)
-{
- PackageMap::iterator pIter = packages.find (name);
- if (pIter != packages.end ())
- return pIter;
-
- // No such package found, create a new map entry.
- pair<PackageMap::iterator, bool> result =
- packages.insert(pair<string, ClassMap>(name, ClassMap()));
- QPID_LOG (debug, "ManagementAgent added package " << name);
-
- // Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader (outBuffer, 'p');
- encodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
- QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");
-
- return result.first;
-}
-
-void ManagementAgent::addClassLH(uint8_t kind,
- PackageMap::iterator pIter,
- const string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- SchemaClassKey key;
- ClassMap& cMap = pIter->second;
-
- key.name = className;
- memcpy(&key.hash, md5Sum, 16);
-
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end())
- return;
-
- // No such class found, create a new class with local information.
- QPID_LOG (debug, "ManagementAgent added class " << pIter->first << ":" <<
- key.name);
-
- cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(kind, schemaCall)));
- cIter = cMap.find(key);
-}
-
-void ManagementAgent::encodePackageIndication(Buffer& buf,
- PackageMap::iterator pIter)
-{
- buf.putShortString((*pIter).first);
-}
-
-void ManagementAgent::encodeClassIndication(Buffer& buf,
- const std::string packageName,
- const SchemaClassKey key,
- uint8_t kind)
-{
- buf.putOctet(kind);
- buf.putShortString(packageName);
- key.encode(buf);
-}
-
-size_t ManagementAgent::validateSchema(Buffer& inBuffer, uint8_t kind)
-{
- if (kind == ManagementItem::CLASS_KIND_TABLE)
- return validateTableSchema(inBuffer);
- else if (kind == ManagementItem::CLASS_KIND_EVENT)
- return validateEventSchema(inBuffer);
- return 0;
-}
-
-size_t ManagementAgent::validateTableSchema(Buffer& inBuffer)
-{
- uint32_t start = inBuffer.getPosition();
- uint32_t end;
- string text;
- uint8_t hash[16];
-
- try {
- inBuffer.record();
- uint8_t kind = inBuffer.getOctet();
- if (kind != ManagementItem::CLASS_KIND_TABLE)
- return 0;
-
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
-
- uint8_t superType = 0; //inBuffer.getOctet();
-
- uint16_t propCount = inBuffer.getShort();
- uint16_t statCount = inBuffer.getShort();
- uint16_t methCount = inBuffer.getShort();
-
- if (superType == 1) {
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
- }
-
- for (uint16_t idx = 0; idx < propCount + statCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- }
-
- for (uint16_t idx = 0; idx < methCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- if (!ft.isSet("argCount"))
- return 0;
- int argCount = ft.getAsInt("argCount");
- for (int mIdx = 0; mIdx < argCount; mIdx++) {
- FieldTable aft;
- aft.decode(inBuffer);
- }
- }
- } catch (exception& /*e*/) {
- return 0;
- }
-
- end = inBuffer.getPosition();
- inBuffer.restore(); // restore original position
- return end - start;
-}
-
-size_t ManagementAgent::validateEventSchema(Buffer& inBuffer)
-{
- uint32_t start = inBuffer.getPosition();
- uint32_t end;
- string text;
- uint8_t hash[16];
-
- try {
- inBuffer.record();
- uint8_t kind = inBuffer.getOctet();
- if (kind != ManagementItem::CLASS_KIND_EVENT)
- return 0;
-
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
-
- uint8_t superType = 0; //inBuffer.getOctet();
-
- uint16_t argCount = inBuffer.getShort();
-
- if (superType == 1) {
- inBuffer.getShortString(text);
- inBuffer.getShortString(text);
- inBuffer.getBin128(hash);
- }
- for (uint16_t idx = 0; idx < argCount; idx++) {
- FieldTable ft;
- ft.decode(inBuffer);
- }
- } catch (exception& /*e*/) {
- return 0;
- }
-
- end = inBuffer.getPosition();
- inBuffer.restore(); // restore original position
- return end - start;
-}
-
-ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid)
-{
- ManagementObjectMap::iterator iter = managementObjects.begin();
- for (; iter != managementObjects.end(); iter++) {
- if (oid.equalV1(iter->first))
- break;
- }
-
- return iter;
-}
-
-void ManagementAgent::disallow(const string& className, const string& methodName, const string& message) {
- disallowed[make_pair(className, methodName)] = message;
-}
-
-void ManagementAgent::SchemaClassKey::mapEncode(Variant::Map& _map) const {
- _map["_cname"] = name;
- _map["_hash"] = qpid::types::Uuid(hash);
-}
-
-void ManagementAgent::SchemaClassKey::mapDecode(const Variant::Map& _map) {
- Variant::Map::const_iterator i;
-
- if ((i = _map.find("_cname")) != _map.end()) {
- name = i->second.asString();
- }
-
- if ((i = _map.find("_hash")) != _map.end()) {
- const qpid::types::Uuid& uuid = i->second.asUuid();
- memcpy(hash, uuid.data(), uuid.size());
- }
-}
-
-void ManagementAgent::SchemaClassKey::encode(qpid::framing::Buffer& buffer) const {
- buffer.checkAvailable(encodedBufSize());
- buffer.putShortString(name);
- buffer.putBin128(hash);
-}
-
-void ManagementAgent::SchemaClassKey::decode(qpid::framing::Buffer& buffer) {
- buffer.checkAvailable(encodedBufSize());
- buffer.getShortString(name);
- buffer.getBin128(hash);
-}
-
-uint32_t ManagementAgent::SchemaClassKey::encodedBufSize() const {
- return 1 + name.size() + 16 /* bin128 */;
-}
-
-void ManagementAgent::SchemaClass::mapEncode(Variant::Map& _map) const {
- _map["_type"] = kind;
- _map["_pending_sequence"] = pendingSequence;
- _map["_data"] = data;
-}
-
-void ManagementAgent::SchemaClass::mapDecode(const Variant::Map& _map) {
- Variant::Map::const_iterator i;
-
- if ((i = _map.find("_type")) != _map.end()) {
- kind = i->second;
- }
- if ((i = _map.find("_pending_sequence")) != _map.end()) {
- pendingSequence = i->second;
- }
- if ((i = _map.find("_data")) != _map.end()) {
- data = i->second.asString();
- }
-}
-
-void ManagementAgent::exportSchemas(string& out) {
- Variant::List list_;
- Variant::Map map_, kmap, cmap;
-
- for (PackageMap::const_iterator i = packages.begin(); i != packages.end(); ++i) {
- string name = i->first;
- const ClassMap& classes = i ->second;
- for (ClassMap::const_iterator j = classes.begin(); j != classes.end(); ++j) {
- const SchemaClassKey& key = j->first;
- const SchemaClass& klass = j->second;
- if (klass.writeSchemaCall == 0) { // Ignore built-in schemas.
- // Encode name, schema-key, schema-class
-
- map_.clear();
- kmap.clear();
- cmap.clear();
-
- key.mapEncode(kmap);
- klass.mapEncode(cmap);
-
- map_["_pname"] = name;
- map_["_key"] = kmap;
- map_["_class"] = cmap;
- list_.push_back(map_);
- }
- }
- }
-
- ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importSchemas(qpid::framing::Buffer& inBuf) {
-
- string buf(inBuf.getPointer(), inBuf.available());
- Variant::List content;
- ListCodec::decode(buf, content);
- Variant::List::const_iterator l;
-
-
- for (l = content.begin(); l != content.end(); l++) {
- string package;
- SchemaClassKey key;
- SchemaClass klass;
- Variant::Map map_, kmap, cmap;
- Variant::Map::const_iterator i;
-
- map_ = l->asMap();
-
- if ((i = map_.find("_pname")) != map_.end()) {
- package = i->second.asString();
-
- if ((i = map_.find("_key")) != map_.end()) {
- key.mapDecode(i->second.asMap());
-
- if ((i = map_.find("_class")) != map_.end()) {
- klass.mapDecode(i->second.asMap());
-
- packages[package][key] = klass;
- }
- }
- }
- }
-}
-
-void ManagementAgent::RemoteAgent::mapEncode(Variant::Map& map_) const {
- Variant::Map _objId, _values;
-
- map_["_brokerBank"] = brokerBank;
- map_["_agentBank"] = agentBank;
- map_["_routingKey"] = routingKey;
-
- connectionRef.mapEncode(_objId);
- map_["_object_id"] = _objId;
-
- mgmtObject->mapEncodeValues(_values, true, false);
- map_["_values"] = _values;
-}
-
-void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
- Variant::Map::const_iterator i;
-
- if ((i = map_.find("_brokerBank")) != map_.end()) {
- brokerBank = i->second;
- }
-
- if ((i = map_.find("_agentBank")) != map_.end()) {
- agentBank = i->second;
- }
-
- if ((i = map_.find("_routingKey")) != map_.end()) {
- routingKey = i->second.getString();
- }
-
- if ((i = map_.find("_object_id")) != map_.end()) {
- connectionRef.mapDecode(i->second.asMap());
- }
-
- mgmtObject = new _qmf::Agent(&agent, this);
-
- if ((i = map_.find("_values")) != map_.end()) {
- mgmtObject->mapDecodeValues(i->second.asMap());
- }
-
- // TODO aconway 2010-03-04: see comment in encode(), readProperties doesn't set v2key.
- mgmtObject->set_connectionRef(connectionRef);
-}
-
-void ManagementAgent::exportAgents(string& out) {
- Variant::List list_;
- Variant::Map map_, omap, amap;
-
- for (RemoteAgentMap::const_iterator i = remoteAgents.begin();
- i != remoteAgents.end();
- ++i)
- {
- // TODO aconway 2010-03-04: see comment in ManagementAgent::RemoteAgent::encode
- boost::shared_ptr<RemoteAgent> agent(i->second);
-
- map_.clear();
- amap.clear();
-
- agent->mapEncode(amap);
- map_["_remote_agent"] = amap;
- list_.push_back(map_);
- }
-
- ListCodec::encode(list_, out);
-}
-
-void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) {
- string buf(inBuf.getPointer(), inBuf.available());
- Variant::List content;
- ListCodec::decode(buf, content);
- Variant::List::const_iterator l;
- sys::Mutex::ScopedLock lock(userLock);
-
- for (l = content.begin(); l != content.end(); l++) {
- boost::shared_ptr<RemoteAgent> agent(new RemoteAgent(*this));
- Variant::Map map_;
- Variant::Map::const_iterator i;
-
- map_ = l->asMap();
-
- if ((i = map_.find("_remote_agent")) != map_.end()) {
-
- agent->mapDecode(i->second.asMap());
-
- addObject (agent->mgmtObject, 0, false);
- remoteAgents[agent->connectionRef] = agent;
- }
- }
-}
-
-namespace {
-bool isDeletedMap(const ManagementObjectMap::value_type& value) {
- return value.second->isDeleted();
-}
-
-bool isDeletedVector(const ManagementObjectVector::value_type& value) {
- return value->isDeleted();
-}
-
-string summarizeMap(const char* name, const ManagementObjectMap& map) {
- ostringstream o;
- size_t deleted = std::count_if(map.begin(), map.end(), isDeletedMap);
- o << map.size() << " " << name << " (" << deleted << " deleted), ";
- return o.str();
-}
-
-string summarizeVector(const char* name, const ManagementObjectVector& map) {
- ostringstream o;
- size_t deleted = std::count_if(map.begin(), map.end(), isDeletedVector);
- o << map.size() << " " << name << " (" << deleted << " deleted), ";
- return o.str();
-}
-
-string dumpMap(const ManagementObjectMap& map) {
- ostringstream o;
- for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) {
- o << endl << " " << i->second->getObjectId().getV2Key()
- << (i->second->isDeleted() ? " (deleted)" : "");
- }
- return o.str();
-}
-
-string dumpVector(const ManagementObjectVector& map) {
- ostringstream o;
- for (ManagementObjectVector::const_iterator i = map.begin(); i != map.end(); ++i) {
- o << endl << " " << (*i)->getObjectId().getV2Key()
- << ((*i)->isDeleted() ? " (deleted)" : "");
- }
- return o.str();
-}
-
-} // namespace
-
-string ManagementAgent::summarizeAgents() {
- ostringstream msg;
- if (!remoteAgents.empty()) {
- msg << remoteAgents.size() << " agents(";
- for (RemoteAgentMap::const_iterator i=remoteAgents.begin();
- i != remoteAgents.end(); ++i)
- msg << " " << i->second->routingKey;
- msg << "), ";
- }
- return msg.str();
-}
-
-
-void ManagementAgent::debugSnapshot(const char* title) {
- QPID_LOG(debug, title << ": management snapshot: "
- << packages.size() << " packages, "
- << summarizeMap("objects", managementObjects)
- << summarizeVector("new objects ", newManagementObjects)
- << pendingDeletedObjs.size() << " pending deletes"
- << summarizeAgents());
-
- QPID_LOG_IF(trace, managementObjects.size(),
- title << ": objects" << dumpMap(managementObjects));
- QPID_LOG_IF(trace, newManagementObjects.size(),
- title << ": new objects" << dumpVector(newManagementObjects));
-}
-
-Variant::Map ManagementAgent::toMap(const FieldTable& from)
-{
- Variant::Map map;
-
- for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) {
- const string& key(iter->first);
- const FieldTable::ValuePtr& val(iter->second);
-
- map[key] = toVariant(val);
- }
-
- return map;
-}
-
-Variant::List ManagementAgent::toList(const List& from)
-{
- Variant::List _list;
-
- for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) {
- const List::ValuePtr& val(*iter);
-
- _list.push_back(toVariant(val));
- }
-
- return _list;
-}
-
-qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from)
-{
- qpid::framing::FieldTable ft;
-
- for (Variant::Map::const_iterator iter = from.begin();
- iter != from.end();
- iter++) {
- const string& key(iter->first);
- const Variant& val(iter->second);
-
- ft.set(key, toFieldValue(val));
- }
-
- return ft;
-}
-
-
-List ManagementAgent::fromList(const Variant::List& from)
-{
- List fa;
-
- for (Variant::List::const_iterator iter = from.begin();
- iter != from.end();
- iter++) {
- const Variant& val(*iter);
-
- fa.push_back(toFieldValue(val));
- }
-
- return fa;
-}
-
-
-boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in)
-{
-
- switch(in.getType()) {
-
- case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue());
- case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool()));
- case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8()));
- case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16()));
- case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32()));
- case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64()));
- case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8()));
- case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16()));
- case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32()));
- case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64()));
- case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat()));
- case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble()));
- case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString()));
- case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data()));
- case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap())));
- case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList())));
- }
-
- QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]");
- return boost::shared_ptr<FieldValue>(new VoidValue());
-}
-
-// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup.
-Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in)
-{
- const string iso885915("iso-8859-15");
- const string utf8("utf8");
- const string utf16("utf16");
- //const string binary("binary");
- const string amqp0_10_binary("amqp0-10:binary");
- //const string amqp0_10_bit("amqp0-10:bit");
- const string amqp0_10_datetime("amqp0-10:datetime");
- const string amqp0_10_struct("amqp0-10:struct");
- Variant out;
-
- //based on AMQP 0-10 typecode, pick most appropriate variant type
- switch (in->getType()) {
- //Fixed Width types:
- case 0x00: //bin8
- case 0x01: out.setEncoding(amqp0_10_binary); // int8
- case 0x02: out = in->getIntegerValue<int8_t>(); break; //uint8
- case 0x03: out = in->getIntegerValue<uint8_t>(); break; //
- // case 0x04: break; //TODO: iso-8859-15 char // char
- case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t>()); break; // bool int8
-
- case 0x10: out.setEncoding(amqp0_10_binary); // bin16
- case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16
- case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16
-
- case 0x20: out.setEncoding(amqp0_10_binary); // bin32
- case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32
- case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32
-
- case 0x23: out = in->get<float>(); break; // float(32)
-
- // case 0x27: break; //TODO: utf-32 char
-
- case 0x30: out.setEncoding(amqp0_10_binary); // bin64
- case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64
-
- case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding
- case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64
- case 0x33: out = in->get<double>(); break; // double
-
- case 0x48: // uuid
- {
- unsigned char data[16];
- in->getFixedWidthValue<16>(data);
- out = qpid::types::Uuid(data);
- } break;
-
- //TODO: figure out whether and how to map values with codes 0x40-0xd8
-
- case 0xf0: break;//void, which is the default value for Variant
- // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant
-
- //Variable Width types:
- //strings:
- case 0x80: // str8
- case 0x90: // str16
- case 0xa0: // str32
- out = in->get<string>();
- out.setEncoding(amqp0_10_binary);
- break;
-
- case 0x84: // str8
- case 0x94: // str16
- out = in->get<string>();
- out.setEncoding(iso885915);
- break;
-
- case 0x85: // str8
- case 0x95: // str16
- out = in->get<string>();
- out.setEncoding(utf8);
- break;
-
- case 0x86: // str8
- case 0x96: // str16
- out = in->get<string>();
- out.setEncoding(utf16);
- break;
-
- case 0xab: // str32
- out = in->get<string>();
- out.setEncoding(amqp0_10_struct);
- break;
-
- case 0xa8: // map
- out = ManagementAgent::toMap(in->get<FieldTable>());
- break;
-
- case 0xa9: // list of variant types
- out = ManagementAgent::toList(in->get<List>());
- break;
- //case 0xaa: //convert amqp0-10 array (uniform type) into variant list
- // out = Variant::List();
- // translate<Array>(in, out.asList(), &toVariant);
- // break;
-
- default:
- //error?
- QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]");
- break;
- }
-
- return out;
-}
-
-
-// Build up a list of the current set of deleted objects that are pending their
-// next (last) publish-ment.
-void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
-{
- outList.clear();
-
- sys::Mutex::ScopedLock lock (userLock);
-
- moveNewObjectsLH();
- moveDeletedObjectsLH();
-
- // now copy the pending deletes into the outList
- for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
- mIter != pendingDeletedObjs.end(); mIter++) {
- for (DeletedObjectList::iterator lIter = mIter->second.begin();
- lIter != mIter->second.end(); lIter++) {
- outList.push_back(*lIter);
- }
- }
-}
-
-// Called by cluster to reset the management agent's list of deleted
-// objects to match the rest of the cluster.
-void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
-{
- sys::Mutex::ScopedLock lock (userLock);
- // Clear out any existing deleted objects
- moveNewObjectsLH();
- pendingDeletedObjs.clear();
- ManagementObjectMap::iterator i = managementObjects.begin();
- // Silently drop any deleted objects left over from receiving the update.
- while (i != managementObjects.end()) {
- ManagementObject* object = i->second;
- if (object->isDeleted()) {
- delete object;
- managementObjects.erase(i++);
- }
- else ++i;
- }
- for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != inList.end(); lIter++) {
-
- std::string classkey((*lIter)->packageName + std::string(":") + (*lIter)->className);
- pendingDeletedObjs[classkey].push_back(*lIter);
- }
-}
-
-
-// construct a DeletedObject from a management object.
-ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2)
- : packageName(src->getPackageName()),
- className(src->getClassName())
-{
- bool send_stats = (src->hasInst() && (src->getInstChanged() || src->getForcePublish()));
-
- stringstream oid;
- oid << src->getObjectId();
- objectId = oid.str();
-
- if (v1) {
- src->writeProperties(encodedV1Config);
- if (send_stats) {
- src->writeStatistics(encodedV1Inst);
- }
- }
-
- if (v2) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oid;
-
- src->getObjectId().mapEncode(oid);
- map_["_object_id"] = oid;
- map_["_schema_id"] = mapEncodeSchemaId(src->getPackageName(),
- src->getClassName(),
- "_data",
- src->getMd5Sum());
- src->writeTimestamps(map_);
- src->mapEncodeValues(values, true, send_stats);
- map_["_values"] = values;
-
- encodedV2 = map_;
- }
-}
-
-
-
-// construct a DeletedObject from an encoded representation. Used by
-// clustering to move deleted objects between clustered brokers. See
-// DeletedObject::encode() for the reverse.
-ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded)
-{
- qpid::types::Variant::Map map_;
- MapCodec::decode(encoded, map_);
-
- packageName = map_["_package_name"].getString();
- className = map_["_class_name"].getString();
- objectId = map_["_object_id"].getString();
-
- encodedV1Config = map_["_v1_config"].getString();
- encodedV1Inst = map_["_v1_inst"].getString();
- encodedV2 = map_["_v2_data"].asMap();
-}
-
-
-// encode a DeletedObject to a string buffer. Used by
-// clustering to move deleted objects between clustered brokers. See
-// DeletedObject(const std::string&) for the reverse.
-void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
-{
- qpid::types::Variant::Map map_;
-
-
- map_["_package_name"] = packageName;
- map_["_class_name"] = className;
- map_["_object_id"] = objectId;
-
- map_["_v1_config"] = encodedV1Config;
- map_["_v1_inst"] = encodedV1Inst;
- map_["_v2_data"] = encodedV2;
-
- MapCodec::encode(map_, toBuffer);
-}
-
-// Remove Deleted objects, and save for later publishing...
-bool ManagementAgent::moveDeletedObjectsLH() {
- typedef vector<pair<ObjectId, ManagementObject*> > DeleteList;
- DeleteList deleteList;
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- ++iter)
- {
- ManagementObject* object = iter->second;
- if (object->isDeleted()) deleteList.push_back(*iter);
- }
-
- // Iterate in reverse over deleted object list
- for (DeleteList::reverse_iterator iter = deleteList.rbegin();
- iter != deleteList.rend();
- iter++)
- {
- ManagementObject* delObj = iter->second;
- assert(delObj->isDeleted());
- DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support));
-
- pendingDeletedObjs[dptr->getKey()].push_back(dptr);
- managementObjects.erase(iter->first);
- delete iter->second;
- }
- return !deleteList.empty();
-}
-
-namespace qpid {
-namespace management {
-
-namespace {
-QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
-}
-
-void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt)
-{
- executionContext = ctxt;
-}
-const qpid::broker::ConnectionState* getManagementExecutionContext()
-{
- return executionContext;
-}
-
-}}
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
deleted file mode 100644
index fb15dc6ed1..0000000000
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ /dev/null
@@ -1,432 +0,0 @@
-#ifndef _ManagementAgent_
-#define _ManagementAgent_
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/Options.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/broker/ConnectionToken.h"
-#include "qpid/management/ManagementObject.h"
-#include "qpid/management/ManagementEvent.h"
-#include "qpid/management/Manageable.h"
-#include "qmf/org/apache/qpid/broker/Agent.h"
-#include "qpid/types/Variant.h"
-#include <qpid/framing/AMQFrame.h>
-#include <qpid/framing/FieldValue.h>
-#include <qpid/framing/ResizableBuffer.h>
-#include <memory>
-#include <string>
-#include <map>
-
-namespace qpid {
-namespace broker {
-class ConnectionState;
-}
-namespace management {
-
-class ManagementAgent
-{
-private:
-
- int threadPoolSize;
-
-public:
- typedef enum {
- SEV_EMERG = 0,
- SEV_ALERT = 1,
- SEV_CRIT = 2,
- SEV_ERROR = 3,
- SEV_WARN = 4,
- SEV_NOTE = 5,
- SEV_INFO = 6,
- SEV_DEBUG = 7,
- SEV_DEFAULT = 8
- } severity_t;
-
-
- ManagementAgent (const bool qmfV1, const bool qmfV2);
- virtual ~ManagementAgent ();
-
- /** Called before plugins are initialized */
- void configure (const std::string& dataDir, uint16_t interval,
- qpid::broker::Broker* broker, int threadPoolSize);
- /** Called after plugins are initialized. */
- void pluginsInitialized();
-
- /** Called by cluster to suppress management output during update. */
- void suppress(bool s) { suppressed = s; }
-
- void setName(const std::string& vendor,
- const std::string& product,
- const std::string& instance="");
- void getName(std::string& vendor, std::string& product, std::string& instance);
- const std::string& getAddress();
-
- void setInterval(uint16_t _interval) { interval = _interval; }
- void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
- qpid::broker::Exchange::shared_ptr directExchange);
- void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
- qpid::broker::Exchange::shared_ptr directExchange);
-
- int getMaxThreads () { return threadPoolSize; }
- QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
- const std::string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- QPID_BROKER_EXTERN void registerEvent (const std::string& packageName,
- const std::string& eventName,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0,
- bool persistent = false);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- const std::string& key,
- bool persistent = false);
- QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
- severity_t severity = SEV_DEFAULT);
- QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
-
- QPID_BROKER_EXTERN void clusterUpdate();
-
- bool dispatchCommand (qpid::broker::Deliverable& msg,
- const std::string& routingKey,
- const framing::FieldTable* args,
- const bool topic,
- int qmfVersion);
-
- /** Disallow a method. Attempts to call it will receive an exception with message. */
- void disallow(const std::string& className, const std::string& methodName, const std::string& message);
-
- /** Disallow all QMFv1 methods (used in clustered brokers). */
- void disallowV1Methods() { disallowAllV1Methods = true; }
-
- /** Serialize my schemas as a binary blob into schemaOut */
- void exportSchemas(std::string& schemaOut);
-
- /** Serialize my remote-agent map as a binary blob into agentsOut */
- void exportAgents(std::string& agentsOut);
-
- /** Decode a serialized schemas and add to my schema cache */
- void importSchemas(framing::Buffer& inBuf);
-
- /** Decode a serialized agent map */
- void importAgents(framing::Buffer& inBuf);
-
- // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
- uint64_t getNextObjectId(void) { return nextObjectId; }
- void setNextObjectId(uint64_t o) { nextObjectId = o; }
-
- uint16_t getBootSequence(void) { return bootSequence; }
- void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
-
- const framing::Uuid& getUuid() const { return uuid; }
- void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
-
- // TODO: remove these when Variant API moved into common library.
- static types::Variant::Map toMap(const framing::FieldTable& from);
- static framing::FieldTable fromMap(const types::Variant::Map& from);
- static types::Variant::List toList(const framing::List& from);
- static framing::List fromList(const types::Variant::List& from);
- static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in);
- static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val);
-
- // For Clustering: management objects that have been marked as
- // "deleted", but are waiting for their last published object
- // update are not visible to the cluster replication code. These
- // interfaces allow clustering to gather up all the management
- // objects that are deleted in order to allow all clustered
- // brokers to publish the same set of deleted objects.
-
- class DeletedObject {
- public:
- typedef boost::shared_ptr<DeletedObject> shared_ptr;
- DeletedObject(ManagementObject *, bool v1, bool v2);
- DeletedObject( const std::string &encoded );
- ~DeletedObject() {};
- void encode( std::string& toBuffer );
- const std::string getKey() const {
- // used to batch up objects of the same class type
- return std::string(packageName + std::string(":") + className);
- }
-
- private:
- friend class ManagementAgent;
-
- std::string packageName;
- std::string className;
- std::string objectId;
-
- std::string encodedV1Config; // qmfv1 properties
- std::string encodedV1Inst; // qmfv1 statistics
- qpid::types::Variant::Map encodedV2;
- };
-
- typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
-
- /** returns a snapshot of all currently deleted management objects. */
- void exportDeletedObjects( DeletedObjectList& outList );
-
- /** Import a list of deleted objects to send on next publish interval. */
- void importDeletedObjects( const DeletedObjectList& inList );
-
-private:
- struct Periodic : public qpid::sys::TimerTask
- {
- ManagementAgent& agent;
-
- Periodic (ManagementAgent& agent, uint32_t seconds);
- virtual ~Periodic ();
- void fire ();
- };
-
- // Storage for tracking remote management agents, attached via the client
- // management agent API.
- //
- struct RemoteAgent : public Manageable
- {
- ManagementAgent& agent;
- uint32_t brokerBank;
- uint32_t agentBank;
- std::string routingKey;
- ObjectId connectionRef;
- qmf::org::apache::qpid::broker::Agent* mgmtObject;
- RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
- ManagementObject* GetManagementObject (void) const { return mgmtObject; }
-
- virtual ~RemoteAgent ();
- void mapEncode(qpid::types::Variant::Map& _map) const;
- void mapDecode(const qpid::types::Variant::Map& _map);
- };
-
- typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap;
-
- // Storage for known schema classes:
- //
- // SchemaClassKey -- Key elements for map lookups
- // SchemaClassKeyComp -- Comparison class for SchemaClassKey
- // SchemaClass -- Non-key elements for classes
- //
- struct SchemaClassKey
- {
- std::string name;
- uint8_t hash[16];
-
- void mapEncode(qpid::types::Variant::Map& _map) const;
- void mapDecode(const qpid::types::Variant::Map& _map);
- void encode(framing::Buffer& buffer) const;
- void decode(framing::Buffer& buffer);
- uint32_t encodedBufSize() const;
- };
-
- struct SchemaClassKeyComp
- {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
- {
- if (lhs.name != rhs.name)
- return lhs.name < rhs.name;
- else
- for (int i = 0; i < 16; i++)
- if (lhs.hash[i] != rhs.hash[i])
- return lhs.hash[i] < rhs.hash[i];
- return false;
- }
- };
-
-
- struct SchemaClass
- {
- uint8_t kind;
- ManagementObject::writeSchemaCall_t writeSchemaCall;
- std::string data;
- uint32_t pendingSequence;
-
- SchemaClass(uint8_t _kind=0, uint32_t seq=0) :
- kind(_kind), writeSchemaCall(0), pendingSequence(seq) {}
- SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
- kind(_kind), writeSchemaCall(call), pendingSequence(0) {}
- bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
- void appendSchema (framing::Buffer& buf);
-
- void mapEncode(qpid::types::Variant::Map& _map) const;
- void mapDecode(const qpid::types::Variant::Map& _map);
- };
-
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
- typedef std::map<std::string, ClassMap> PackageMap;
-
- RemoteAgentMap remoteAgents;
- PackageMap packages;
-
- //
- // Protected by userLock
- //
- ManagementObjectMap managementObjects;
-
- //
- // Protected by addLock
- //
- ManagementObjectVector newManagementObjects;
-
- framing::Uuid uuid;
-
- //
- // Lock hierarchy: If a thread needs to take both addLock and userLock,
- // it MUST take userLock first, then addLock.
- //
- sys::Mutex userLock;
- sys::Mutex addLock;
-
- qpid::broker::Exchange::shared_ptr mExchange;
- qpid::broker::Exchange::shared_ptr dExchange;
- qpid::broker::Exchange::shared_ptr v2Topic;
- qpid::broker::Exchange::shared_ptr v2Direct;
- std::string dataDir;
- uint16_t interval;
- qpid::broker::Broker* broker;
- qpid::sys::Timer* timer;
- uint16_t bootSequence;
- uint32_t nextObjectId;
- uint32_t brokerBank;
- uint32_t nextRemoteBank;
- uint32_t nextRequestSequence;
- bool clientWasAdded;
- const qpid::sys::AbsTime startTime;
- bool suppressed;
-
- typedef std::pair<std::string,std::string> MethodName;
- typedef std::map<MethodName, std::string> DisallowedMethods;
- DisallowedMethods disallowed;
- bool disallowAllV1Methods;
-
- // Agent name and address
- qpid::types::Variant::Map attrMap;
- std::string name_address;
- std::string vendorNameKey; // "." --> "_"
- std::string productNameKey; // "." --> "_"
- std::string instanceNameKey; // "." --> "_"
-
- // supported management protocol
- bool qmf1Support;
- bool qmf2Support;
-
- // Maximum # of objects allowed in a single V2 response
- // message.
- uint32_t maxReplyObjs;
-
- // list of objects that have been deleted, but have yet to be published
- // one final time.
- // Indexed by a string composed of the object's package and class name.
- // Protected by userLock.
- typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
- PendingDeletedObjsMap pendingDeletedObjs;
-
-# define MA_BUFFER_SIZE 65536
- char inputBuffer[MA_BUFFER_SIZE];
- char outputBuffer[MA_BUFFER_SIZE];
- char eventBuffer[MA_BUFFER_SIZE];
- framing::ResizableBuffer msgBuffer;
-
- void writeData ();
- void periodicProcessing (void);
- void deleteObjectNowLH(const ObjectId& oid);
- void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- const std::string& exchange,
- const std::string& routingKey);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- const std::string& exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void moveNewObjectsLH();
- bool moveDeletedObjectsLH();
-
- bool authorizeAgentMessageLH(qpid::broker::Message& msg);
- void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false);
-
- PackageMap::iterator findOrAddPackageLH(std::string name);
- void addClassLH(uint8_t kind,
- PackageMap::iterator pIter,
- const std::string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall);
- void encodePackageIndication (framing::Buffer& buf,
- PackageMap::iterator pIter);
- void encodeClassIndication (framing::Buffer& buf,
- const std::string packageName,
- const struct SchemaClassKey key,
- uint8_t kind);
- bool bankInUse (uint32_t bank);
- uint32_t allocateNewBank ();
- uint32_t assignBankLH (uint32_t requestedPrefix);
- void deleteOrphanedAgentsLH();
- void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
- uint32_t code = 0, const std::string& text = "OK");
- void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
- void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
- void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
- void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
-
-
- size_t validateSchema(framing::Buffer&, uint8_t kind);
- size_t validateTableSchema(framing::Buffer&);
- size_t validateEventSchema(framing::Buffer&);
- ManagementObjectMap::iterator numericFind(const ObjectId& oid);
-
- std::string summarizeAgents();
- void debugSnapshot(const char* title);
-};
-
-void setManagementExecutionContext(const qpid::broker::ConnectionState*);
-const qpid::broker::ConnectionState* getManagementExecutionContext();
-}}
-
-#endif /*!_ManagementAgent_*/
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp
deleted file mode 100644
index 1d5f8bbd6b..0000000000
--- a/cpp/src/qpid/management/ManagementDirectExchange.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/management/ManagementDirectExchange.h"
-#include "qpid/log/Statement.h"
-#include <assert.h>
-
-using namespace qpid::management;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
- Exchange (_name, _parent, b),
- DirectExchange(_name, _parent, b),
- managementAgent(0) {}
-ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
- bool _durable,
- const FieldTable& _args,
- Manageable* _parent, Broker* b) :
- Exchange (_name, _durable, _args, _parent, b),
- DirectExchange(_name, _durable, _args, _parent, b),
- managementAgent(0) {}
-
-void ManagementDirectExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
-{
- bool routeIt = true;
-
- if (managementAgent)
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
-
- if (routeIt)
- DirectExchange::route(msg, routingKey, args);
-}
-
-void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
-{
- managementAgent = agent;
- qmfVersion = qv;
- assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange
-}
-
-
-ManagementDirectExchange::~ManagementDirectExchange() {}
-
-const std::string ManagementDirectExchange::typeName("management-direct");
-
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h
deleted file mode 100644
index 7507179c06..0000000000
--- a/cpp/src/qpid/management/ManagementDirectExchange.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ManagementDirectExchange_
-#define _ManagementDirectExchange_
-
-#include "qpid/broker/DirectExchange.h"
-#include "qpid/management/ManagementAgent.h"
-
-namespace qpid {
-namespace broker {
-
-class ManagementDirectExchange : public virtual DirectExchange
-{
- private:
- management::ManagementAgent* managementAgent;
- int qmfVersion;
-
- public:
- static const std::string typeName;
-
- ManagementDirectExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0);
- ManagementDirectExchange(const std::string& _name, bool _durable,
- const qpid::framing::FieldTable& _args,
- Manageable* _parent = 0, Broker* broker = 0);
-
- virtual std::string getType() const { return typeName; }
-
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
-
- void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
-
- virtual ~ManagementDirectExchange();
-};
-
-
-}
-}
-
-#endif
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
deleted file mode 100644
index b4d469afbe..0000000000
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementObject.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/sys/Time.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/log/Statement.h"
-#include <boost/lexical_cast.hpp>
-
-#include <stdlib.h>
-
-using namespace std;
-using namespace qpid;
-using namespace qpid::management;
-
-void AgentAttachment::setBanks(uint32_t broker, uint32_t bank)
-{
- first =
- ((uint64_t) (broker & 0x000fffff)) << 28 |
- ((uint64_t) (bank & 0x0fffffff));
-}
-
-// Deprecated
-ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object)
- : agent(0), agentEpoch(seq)
-{
- first =
- ((uint64_t) (flags & 0x0f)) << 60 |
- ((uint64_t) (seq & 0x0fff)) << 48 |
- ((uint64_t) (broker & 0x000fffff)) << 28;
- second = object;
-}
-
-
-ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker)
- : agent(0), second(0), agentEpoch(seq)
-{
- first =
- ((uint64_t) (flags & 0x0f)) << 60 |
- ((uint64_t) (seq & 0x0fff)) << 48 |
- ((uint64_t) (broker & 0x000fffff)) << 28;
-}
-
-ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq)
- : agent(_agent), second(0), agentEpoch(seq)
-{
-
- first =
- ((uint64_t) (flags & 0x0f)) << 60 |
- ((uint64_t) (seq & 0x0fff)) << 48;
-}
-
-
-ObjectId::ObjectId(istream& in) : agent(0)
-{
- string text;
- in >> text;
- fromString(text);
-}
-
-ObjectId::ObjectId(const string& text) : agent(0)
-{
- fromString(text);
-}
-
-void ObjectId::fromString(const string& text)
-{
-#define FIELDS 5
-#if defined (_WIN32) && !defined (atoll)
-# define atoll(X) _atoi64(X)
-#endif
-
- // format:
- // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id>
- // V2: Not used
-
- string copy(text.c_str());
- char* cText;
- char* field[FIELDS];
- bool atFieldStart = true;
- int idx = 0;
-
- cText = const_cast<char*>(copy.c_str());
- for (char* cursor = cText; *cursor; cursor++) {
- if (atFieldStart) {
- if (idx >= FIELDS)
- throw Exception("Invalid ObjectId format");
- field[idx++] = cursor;
- atFieldStart = false;
- } else {
- if (*cursor == '-') {
- *cursor = '\0';
- atFieldStart = true;
- }
- }
- }
-
- if (idx != FIELDS)
- throw Exception("Invalid ObjectId format");
-
- agentEpoch = atoll(field[1]);
-
- first = (atoll(field[0]) << 60) +
- (atoll(field[1]) << 48) +
- (atoll(field[2]) << 28);
-
- agentName = string(field[3]);
- second = atoll(field[4]);
-}
-
-
-bool ObjectId::operator==(const ObjectId &other) const
-{
- return v2Key == other.v2Key;
-}
-
-bool ObjectId::operator<(const ObjectId &other) const
-{
- return v2Key < other.v2Key;
-}
-
-bool ObjectId::equalV1(const ObjectId &other) const
-{
- uint64_t otherFirst = agent == 0 ? other.first : other.first & 0xffff000000000000LL;
- return first == otherFirst && second == other.second;
-}
-
-// encode as V1-format binary
-void ObjectId::encode(string& buffer) const
-{
- const uint32_t len = 16;
- char _data[len];
- qpid::framing::Buffer body(_data, len);
-
- if (agent == 0)
- body.putLongLong(first);
- else
- body.putLongLong(first | agent->first);
- body.putLongLong(second);
-
- body.reset();
- body.getRawData(buffer, len);
-}
-
-// decode as V1-format binary
-void ObjectId::decode(const string& buffer)
-{
- const uint32_t len = 16;
- char _data[len];
- qpid::framing::Buffer body(_data, len);
-
- body.checkAvailable(buffer.length());
- body.putRawData(buffer);
- body.reset();
- first = body.getLongLong();
- second = body.getLongLong();
- v2Key = boost::lexical_cast<string>(second);
-}
-
-// generate the V2 key from the index fields defined
-// in the schema.
-void ObjectId::setV2Key(const ManagementObject& object)
-{
- stringstream oname;
- oname << object.getPackageName() << ":" << object.getClassName() << ":" << object.getKey();
- v2Key = oname.str();
-}
-
-
-// encode as V2-format map
-void ObjectId::mapEncode(types::Variant::Map& map) const
-{
- map["_object_name"] = v2Key;
- if (!agentName.empty())
- map["_agent_name"] = agentName;
- if (agentEpoch)
- map["_agent_epoch"] = agentEpoch;
-}
-
-// decode as v2-format map
-void ObjectId::mapDecode(const types::Variant::Map& map)
-{
- types::Variant::Map::const_iterator i;
-
- if ((i = map.find("_object_name")) != map.end())
- v2Key = i->second.asString();
- else
- throw Exception("Required _object_name field missing.");
-
- if ((i = map.find("_agent_name")) != map.end())
- agentName = i->second.asString();
-
- if ((i = map.find("_agent_epoch")) != map.end())
- agentEpoch = i->second.asInt64();
-}
-
-
-ObjectId::operator types::Variant::Map() const
-{
- types::Variant::Map m;
- mapEncode(m);
- return m;
-}
-
-
-
-namespace qpid {
-namespace management {
-
-ostream& operator<<(ostream& out, const ObjectId& i)
-{
- uint64_t virtFirst = i.first;
- if (i.agent)
- virtFirst |= i.agent->getFirst();
-
- out << ((virtFirst & 0xF000000000000000LL) >> 60) <<
- "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) <<
- "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) <<
- "-" << i.agentName <<
- "-" << i.second <<
- "(" << i.v2Key << ")";
- return out;
-}
-
-}}
-
-ManagementObject::ManagementObject(Manageable* _core) :
-createTime(qpid::sys::Duration(sys::EPOCH, sys::now())),
- destroyTime(0), updateTime(createTime), configChanged(true),
- instChanged(true), deleted(false),
- coreObject(_core), flags(0), forcePublish(false) {}
-
-void ManagementObject::setUpdateTime()
-{
- updateTime = sys::Duration(sys::EPOCH, sys::now());
-}
-
-void ManagementObject::resourceDestroy()
-{
- QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key());
- destroyTime = sys::Duration(sys::EPOCH, sys::now());
- deleted = true;
-}
-
-int ManagementObject::maxThreads = 1;
-int ManagementObject::nextThreadIndex = 0;
-
-void ManagementObject::writeTimestamps (string& buf) const
-{
- char _data[4000];
- qpid::framing::Buffer body(_data, 4000);
-
- body.putShortString (getPackageName ());
- body.putShortString (getClassName ());
- body.putBin128 (getMd5Sum ());
- body.putLongLong (updateTime);
- body.putLongLong (createTime);
- body.putLongLong (destroyTime);
-
- uint32_t len = body.getPosition();
- body.reset();
- body.getRawData(buf, len);
-
- string oid;
- objectId.encode(oid);
- buf += oid;
-}
-
-void ManagementObject::readTimestamps (const string& buf)
-{
- char _data[4000];
- qpid::framing::Buffer body(_data, 4000);
- string unused;
- uint8_t unusedUuid[16];
-
- body.checkAvailable(buf.length());
- body.putRawData(buf);
- body.reset();
-
- body.getShortString(unused);
- body.getShortString(unused);
- body.getBin128(unusedUuid);
- updateTime = body.getLongLong();
- createTime = body.getLongLong();
- destroyTime = body.getLongLong();
-}
-
-uint32_t ManagementObject::writeTimestampsSize() const
-{
- return 1 + getPackageName().length() + // str8
- 1 + getClassName().length() + // str8
- 16 + // bin128
- 8 + // uint64
- 8 + // uint64
- 8 + // uint64
- objectId.encodedSize(); // objectId
-}
-
-
-void ManagementObject::writeTimestamps (types::Variant::Map& map) const
-{
- // types::Variant::Map oid, sid;
-
- // sid["_package_name"] = getPackageName();
- // sid["_class_name"] = getClassName();
- // sid["_hash"] = qpid::types::Uuid(getMd5Sum());
- // map["_schema_id"] = sid;
-
- // objectId.mapEncode(oid);
- // map["_object_id"] = oid;
-
- map["_update_ts"] = updateTime;
- map["_create_ts"] = createTime;
- map["_delete_ts"] = destroyTime;
-}
-
-void ManagementObject::readTimestamps (const types::Variant::Map& map)
-{
- types::Variant::Map::const_iterator i;
-
- if ((i = map.find("_update_ts")) != map.end())
- updateTime = i->second.asUint64();
- if ((i = map.find("_create_ts")) != map.end())
- createTime = i->second.asUint64();
- if ((i = map.find("_delete_ts")) != map.end())
- destroyTime = i->second.asUint64();
-}
-
-
-void ManagementObject::setReference(ObjectId) {}
-
-int ManagementObject::getThreadIndex() {
- static QPID_TSS int thisIndex = -1;
- if (thisIndex == -1) {
- Mutex::ScopedLock mutex(accessLock);
- thisIndex = nextThreadIndex;
- if (nextThreadIndex < maxThreads - 1)
- nextThreadIndex++;
- }
- return thisIndex;
-}
-
-
-// void ManagementObject::mapEncode(types::Variant::Map& map,
-// bool includeProperties,
-// bool includeStatistics)
-// {
-// types::Variant::Map values;
-
-// writeTimestamps(map);
-
-// mapEncodeValues(values, includeProperties, includeStatistics);
-// map["_values"] = values;
-// }
-
-// void ManagementObject::mapDecode(const types::Variant::Map& map)
-// {
-// types::Variant::Map::const_iterator i;
-
-// readTimestamps(map);
-
-// if ((i = map.find("_values")) != map.end())
-// mapDecodeValues(i->second.asMap());
-// }
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp
deleted file mode 100644
index ee8657646f..0000000000
--- a/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/management/ManagementTopicExchange.h"
-#include "qpid/log/Statement.h"
-
-using namespace qpid::management;
-using namespace qpid::broker;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
- Exchange (_name, _parent, b),
- TopicExchange(_name, _parent, b),
- managementAgent(0) {}
-ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
- bool _durable,
- const FieldTable& _args,
- Manageable* _parent, Broker* b) :
- Exchange (_name, _durable, _args, _parent, b),
- TopicExchange(_name, _durable, _args, _parent, b),
- managementAgent(0) {}
-
-void ManagementTopicExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
-{
- bool routeIt = true;
-
- // Intercept management agent commands
- if (managementAgent)
- routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
-
- if (routeIt)
- TopicExchange::route(msg, routingKey, args);
-}
-
-bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
- const string& routingKey,
- const qpid::framing::FieldTable* args)
-{
- if (qmfVersion == 1)
- managementAgent->clientAdded(routingKey);
- return TopicExchange::bind(queue, routingKey, args);
-}
-
-void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv)
-{
- managementAgent = agent;
- qmfVersion = qv;
-}
-
-
-ManagementTopicExchange::~ManagementTopicExchange() {}
-
-const std::string ManagementTopicExchange::typeName("management-topic");
-
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h
deleted file mode 100644
index 232300265e..0000000000
--- a/cpp/src/qpid/management/ManagementTopicExchange.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _ManagementTopicExchange_
-#define _ManagementTopicExchange_
-
-#include "qpid/broker/TopicExchange.h"
-#include "qpid/management/ManagementAgent.h"
-
-namespace qpid {
-namespace broker {
-
-class ManagementTopicExchange : public virtual TopicExchange
-{
- private:
- management::ManagementAgent* managementAgent;
- int qmfVersion;
-
- public:
- static const std::string typeName;
-
- ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0);
- ManagementTopicExchange(const std::string& _name, bool _durable,
- const qpid::framing::FieldTable& _args,
- Manageable* _parent = 0, Broker* broker = 0);
-
- virtual std::string getType() const { return typeName; }
-
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
-
- virtual bool bind(Queue::shared_ptr queue,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
-
- void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
-
- virtual ~ManagementTopicExchange();
-};
-
-
-}
-}
-
-#endif
diff --git a/cpp/src/qpid/management/Mutex.cpp b/cpp/src/qpid/management/Mutex.cpp
deleted file mode 100644
index f05abb01dc..0000000000
--- a/cpp/src/qpid/management/Mutex.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Copyright (c) 2008 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/management/Mutex.h"
-#include "qpid/sys/Mutex.h"
-
-using namespace std;
-using namespace qpid::management;
-
-Mutex::Mutex() : impl(new sys::Mutex()) {}
-Mutex::~Mutex() { delete impl; }
-void Mutex::lock() { impl->lock(); }
-void Mutex::unlock() { impl->unlock(); }
-