diff options
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 228 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 13 |
2 files changed, 44 insertions, 197 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 23c999a98a..5799a1adca 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -31,6 +31,7 @@ #include <qpid/broker/Message.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Thread.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" #include "qpid/types/Variant.h" @@ -74,6 +75,18 @@ namespace { } return n2; } + +struct ScopedManagementContext +{ + ScopedManagementContext(const qpid::broker::ConnectionState* context) + { + setManagementExecutionContext(context); + } + ~ScopedManagementContext() + { + setManagementExecutionContext(0); + } +}; } @@ -535,6 +548,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf, dp->setRoutingKey(routingKey); msg->getFrames().append(content); + msg->setIsManagementMessage(true); { sys::Mutex::ScopedUnlock u(userLock); @@ -600,7 +614,7 @@ void ManagementAgent::sendBufferLH(const string& data, props->setAppId("qmf2"); for (i = headers.begin(); i != headers.end(); ++i) { - msg->getOrInsertHeaders().setString(i->first, i->second.asString()); + msg->insertCustomProperty(i->first, i->second.asString()); } DeliveryProperties* dp = @@ -608,9 +622,10 @@ void ManagementAgent::sendBufferLH(const string& data, dp->setRoutingKey(routingKey); if (ttl_msec) { dp->setTtl(ttl_msec); - msg->setTimestamp(broker->getExpiryPolicy()); + msg->computeExpiration(broker->getExpiryPolicy()); } msg->getFrames().append(content); + msg->setIsManagementMessage(true); { sys::Mutex::ScopedUnlock u(userLock); @@ -2237,6 +2252,7 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); + ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher()); const framing::FieldTable *headers = msg.getApplicationHeaders(); if (headers && msg.getAppId() == "qmf2") { @@ -2740,200 +2756,14 @@ void ManagementAgent::debugSnapshot(const char* title) { title << ": new objects" << dumpVector(newManagementObjects)); } + Variant::Map ManagementAgent::toMap(const FieldTable& from) { Variant::Map map; - - for (FieldTable::const_iterator iter = from.begin(); iter != from.end(); iter++) { - const string& key(iter->first); - const FieldTable::ValuePtr& val(iter->second); - - map[key] = toVariant(val); - } - + qpid::amqp_0_10::translate(from, map); return map; } -Variant::List ManagementAgent::toList(const List& from) -{ - Variant::List _list; - - for (List::const_iterator iter = from.begin(); iter != from.end(); iter++) { - const List::ValuePtr& val(*iter); - - _list.push_back(toVariant(val)); - } - - return _list; -} - -qpid::framing::FieldTable ManagementAgent::fromMap(const Variant::Map& from) -{ - qpid::framing::FieldTable ft; - - for (Variant::Map::const_iterator iter = from.begin(); - iter != from.end(); - iter++) { - const string& key(iter->first); - const Variant& val(iter->second); - - ft.set(key, toFieldValue(val)); - } - - return ft; -} - - -List ManagementAgent::fromList(const Variant::List& from) -{ - List fa; - - for (Variant::List::const_iterator iter = from.begin(); - iter != from.end(); - iter++) { - const Variant& val(*iter); - - fa.push_back(toFieldValue(val)); - } - - return fa; -} - - -boost::shared_ptr<FieldValue> ManagementAgent::toFieldValue(const Variant& in) -{ - - switch(in.getType()) { - - case types::VAR_VOID: return boost::shared_ptr<FieldValue>(new VoidValue()); - case types::VAR_BOOL: return boost::shared_ptr<FieldValue>(new BoolValue(in.asBool())); - case types::VAR_UINT8: return boost::shared_ptr<FieldValue>(new Unsigned8Value(in.asUint8())); - case types::VAR_UINT16: return boost::shared_ptr<FieldValue>(new Unsigned16Value(in.asUint16())); - case types::VAR_UINT32: return boost::shared_ptr<FieldValue>(new Unsigned32Value(in.asUint32())); - case types::VAR_UINT64: return boost::shared_ptr<FieldValue>(new Unsigned64Value(in.asUint64())); - case types::VAR_INT8: return boost::shared_ptr<FieldValue>(new Integer8Value(in.asInt8())); - case types::VAR_INT16: return boost::shared_ptr<FieldValue>(new Integer16Value(in.asInt16())); - case types::VAR_INT32: return boost::shared_ptr<FieldValue>(new Integer32Value(in.asInt32())); - case types::VAR_INT64: return boost::shared_ptr<FieldValue>(new Integer64Value(in.asInt64())); - case types::VAR_FLOAT: return boost::shared_ptr<FieldValue>(new FloatValue(in.asFloat())); - case types::VAR_DOUBLE: return boost::shared_ptr<FieldValue>(new DoubleValue(in.asDouble())); - case types::VAR_STRING: return boost::shared_ptr<FieldValue>(new Str16Value(in.asString())); - case types::VAR_UUID: return boost::shared_ptr<FieldValue>(new UuidValue(in.asUuid().data())); - case types::VAR_MAP: return boost::shared_ptr<FieldValue>(new FieldTableValue(ManagementAgent::fromMap(in.asMap()))); - case types::VAR_LIST: return boost::shared_ptr<FieldValue>(new ListValue(ManagementAgent::fromList(in.asList()))); - } - - QPID_LOG(error, "Unknown Variant type - not converted: [" << in.getType() << "]"); - return boost::shared_ptr<FieldValue>(new VoidValue()); -} - -// stolen from qpid/client/amqp0_10/Codecs.cpp - TODO: make Codecs public, and remove this dup. -Variant ManagementAgent::toVariant(const boost::shared_ptr<FieldValue>& in) -{ - const string iso885915("iso-8859-15"); - const string utf8("utf8"); - const string utf16("utf16"); - //const string binary("binary"); - const string amqp0_10_binary("amqp0-10:binary"); - //const string amqp0_10_bit("amqp0-10:bit"); - const string amqp0_10_datetime("amqp0-10:datetime"); - const string amqp0_10_struct("amqp0-10:struct"); - Variant out; - - //based on AMQP 0-10 typecode, pick most appropriate variant type - switch (in->getType()) { - //Fixed Width types: - case 0x00: //bin8 - case 0x01: out.setEncoding(amqp0_10_binary); // int8 - case 0x02: out = in->getIntegerValue<int8_t, 1>(); break; //uint8 - case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break; // - // case 0x04: break; //TODO: iso-8859-15 char // char - case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break; // bool int8 - - case 0x10: out.setEncoding(amqp0_10_binary); // bin16 - case 0x11: out = in->getIntegerValue<int16_t, 2>(); break; // int16 - case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break; //uint16 - - case 0x20: out.setEncoding(amqp0_10_binary); // bin32 - case 0x21: out = in->getIntegerValue<int32_t, 4>(); break; // int32 - case 0x22: out = in->getIntegerValue<uint32_t, 4>(); break; // uint32 - - case 0x23: out = in->get<float>(); break; // float(32) - - // case 0x27: break; //TODO: utf-32 char - - case 0x30: out.setEncoding(amqp0_10_binary); // bin64 - case 0x31: out = in->getIntegerValue<int64_t, 8>(); break; //int64 - - case 0x38: out.setEncoding(amqp0_10_datetime); //treat datetime as uint64_t, but set encoding - case 0x32: out = in->getIntegerValue<uint64_t, 8>(); break; //uint64 - case 0x33: out = in->get<double>(); break; // double - - case 0x48: // uuid - { - unsigned char data[16]; - in->getFixedWidthValue<16>(data); - out = qpid::types::Uuid(data); - } break; - - //TODO: figure out whether and how to map values with codes 0x40-0xd8 - - case 0xf0: break;//void, which is the default value for Variant - // case 0xf1: out.setEncoding(amqp0_10_bit); break;//treat 'bit' as void, which is the default value for Variant - - //Variable Width types: - //strings: - case 0x80: // str8 - case 0x90: // str16 - case 0xa0: // str32 - out = in->get<string>(); - out.setEncoding(amqp0_10_binary); - break; - - case 0x84: // str8 - case 0x94: // str16 - out = in->get<string>(); - out.setEncoding(iso885915); - break; - - case 0x85: // str8 - case 0x95: // str16 - out = in->get<string>(); - out.setEncoding(utf8); - break; - - case 0x86: // str8 - case 0x96: // str16 - out = in->get<string>(); - out.setEncoding(utf16); - break; - - case 0xab: // str32 - out = in->get<string>(); - out.setEncoding(amqp0_10_struct); - break; - - case 0xa8: // map - out = ManagementAgent::toMap(in->get<FieldTable>()); - break; - - case 0xa9: // list of variant types - out = ManagementAgent::toList(in->get<List>()); - break; - //case 0xaa: //convert amqp0-10 array (uniform type) into variant list - // out = Variant::List(); - // translate<Array>(in, out.asList(), &toVariant); - // break; - - default: - //error? - QPID_LOG(error, "Unknown FieldValue type - not converted: [" << (unsigned int)(in->getType()) << "]"); - break; - } - - return out; -} - // Build up a list of the current set of deleted objects that are pending their // next (last) publish-ment. @@ -3085,3 +2915,21 @@ bool ManagementAgent::moveDeletedObjectsLH() { } return !deleteList.empty(); } + +namespace qpid { +namespace management { + +namespace { +QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; +} + +void setManagementExecutionContext(const qpid::broker::ConnectionState* ctxt) +{ + executionContext = ctxt; +} +const qpid::broker::ConnectionState* getManagementExecutionContext() +{ + return executionContext; +} + +}} diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 0db19594a7..c21f384433 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -41,6 +41,9 @@ #include <map> namespace qpid { +namespace broker { +class ConnectionState; +} namespace management { class ManagementAgent @@ -142,13 +145,7 @@ public: const framing::Uuid& getUuid() const { return uuid; } void setUuid(const framing::Uuid& id) { uuid = id; writeData(); } - // TODO: remove these when Variant API moved into common library. static types::Variant::Map toMap(const framing::FieldTable& from); - static framing::FieldTable fromMap(const types::Variant::Map& from); - static types::Variant::List toList(const framing::List& from); - static framing::List fromList(const types::Variant::List& from); - static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); - static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); // For Clustering: management objects that have been marked as // "deleted", but are waiting for their last published object @@ -422,6 +419,8 @@ private: void debugSnapshot(const char* title); }; +void setManagementExecutionContext(const qpid::broker::ConnectionState*); +const qpid::broker::ConnectionState* getManagementExecutionContext(); }} - + #endif /*!_ManagementAgent_*/ |
