diff options
| author | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
| commit | 2e29faa768283390452b7d432db28d43cd4a27aa (patch) | |
| tree | 521e9711b340a330408245ba699b35d12b36ce9c /cpp/src/qpid/management/ManagementObject.cpp | |
| parent | 5e50981ac8a35db09723ad19f5994703d00e10d9 (diff) | |
| download | qpid-python-2e29faa768283390452b7d432db28d43cd4a27aa.tar.gz | |
Merged the changes from the qmf-devel0.7a branch back to the trunk.
This is a checkpoint along the QMFv2 development path.
This update introduces portions of QMFv2 into the code:
- The C++ agent (qpid/agent) uses QMFv2 for data and method transfer
o The APIs no longer use qpid::framing::*
o Consequently, boost is no longer referenced from the API headers.
o Agents and Objects are now referenced by strings, not numbers.
o Schema transfer still uses the QMFv1 format.
- The broker-resident agent can use QMFv1 or QMFv2 based on the command line options.
It defaults to QMFv1 for compatibility.
- The pure-python QMF console (qmf.console) can concurrently interact with both
QMFv1 and QMFv2 agents.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@929716 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management/ManagementObject.cpp')
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 224 |
1 files changed, 184 insertions, 40 deletions
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 4b87800174..46fc67d07f 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -22,7 +22,10 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/Buffer.h" #include "qpid/sys/Thread.h" +#include "qpid/log/Statement.h" +#include <boost/lexical_cast.hpp> #include <stdlib.h> @@ -36,26 +39,37 @@ void AgentAttachment::setBanks(uint32_t broker, uint32_t bank) ((uint64_t) (bank & 0x0fffffff)); } -ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint32_t bank, uint64_t object) - : agent(0) +// Deprecated +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker, uint64_t object) + : agent(0), agentEpoch(seq) { first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48 | - ((uint64_t) (broker & 0x000fffff)) << 28 | - ((uint64_t) (bank & 0x0fffffff)); + ((uint64_t) (broker & 0x000fffff)) << 28; second = object; } -ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq, uint64_t object) - : agent(_agent) + +ObjectId::ObjectId(uint8_t flags, uint16_t seq, uint32_t broker) + : agent(0), second(0), agentEpoch(seq) { first = + ((uint64_t) (flags & 0x0f)) << 60 | + ((uint64_t) (seq & 0x0fff)) << 48 | + ((uint64_t) (broker & 0x000fffff)) << 28; +} + +ObjectId::ObjectId(AgentAttachment* _agent, uint8_t flags, uint16_t seq) + : agent(_agent), second(0), agentEpoch(seq) +{ + + first = ((uint64_t) (flags & 0x0f)) << 60 | ((uint64_t) (seq & 0x0fff)) << 48; - second = object; } + ObjectId::ObjectId(std::istream& in) : agent(0) { std::string text; @@ -75,6 +89,10 @@ void ObjectId::fromString(const std::string& text) # define atoll(X) _atoi64(X) #endif + // format: + // V1: <flags>-<sequence>-<broker-bank>-<agent-bank>-<uint64-app-id> + // V2: Not used + std::string copy(text.c_str()); char* cText; char* field[FIELDS]; @@ -99,10 +117,13 @@ void ObjectId::fromString(const std::string& text) if (idx != FIELDS) throw Exception("Invalid ObjectId format"); + agentEpoch = atoll(field[1]); + first = (atoll(field[0]) << 60) + (atoll(field[1]) << 48) + - (atoll(field[2]) << 28) + - atoll(field[3]); + (atoll(field[2]) << 28); + + agentName = std::string(field[3]); second = atoll(field[4]); } @@ -123,21 +144,40 @@ bool ObjectId::equalV1(const ObjectId &other) const return first == otherFirst && second == other.second; } -void ObjectId::encode(framing::Buffer& buffer) const +// encode as V1-format binary +void ObjectId::encode(std::string& buffer) const { + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + if (agent == 0) - buffer.putLongLong(first); + body.putLongLong(first); else - buffer.putLongLong(first | agent->first); - buffer.putLongLong(second); + body.putLongLong(first | agent->first); + body.putLongLong(second); + + body.reset(); + body.getRawData(buffer, len); } -void ObjectId::decode(framing::Buffer& buffer) +// decode as V1-format binary +void ObjectId::decode(const std::string& buffer) { - first = buffer.getLongLong(); - second = buffer.getLongLong(); + const uint32_t len = 16; + char _data[len]; + qpid::framing::Buffer body(_data, len); + + body.checkAvailable(buffer.length()); + body.putRawData(buffer); + body.reset(); + first = body.getLongLong(); + second = body.getLongLong(); + v2Key = boost::lexical_cast<std::string>(second); } +// generate the V2 key from the index fields defined +// in the schema. void ObjectId::setV2Key(const ManagementObject& object) { std::stringstream oname; @@ -145,6 +185,42 @@ void ObjectId::setV2Key(const ManagementObject& object) v2Key = oname.str(); } +// encode as V2-format map +void ObjectId::mapEncode(types::Variant::Map& map) const +{ + map["_object_name"] = v2Key; + if (!agentName.empty()) + map["_agent_name"] = agentName; + if (agentEpoch) + map["_agent_epoch"] = agentEpoch; +} + +// decode as v2-format map +void ObjectId::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_object_name")) != map.end()) + v2Key = i->second.asString(); + else + throw Exception("Required _object_name field missing."); + + if ((i = map.find("_agent_name")) != map.end()) + agentName = i->second.asString(); + + if ((i = map.find("_agent_epoch")) != map.end()) + agentEpoch = i->second.asInt64(); +} + + +ObjectId::operator types::Variant::Map() const +{ + types::Variant::Map m; + mapEncode(m); + return m; +} + + namespace qpid { namespace management { @@ -158,7 +234,7 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) out << ((virtFirst & 0xF000000000000000LL) >> 60) << "-" << ((virtFirst & 0x0FFF000000000000LL) >> 48) << "-" << ((virtFirst & 0x0000FFFFF0000000LL) >> 28) << - "-" << (virtFirst & 0x000000000FFFFFFFLL) << + "-" << i.agentName << "-" << i.second; return out; } @@ -168,43 +244,88 @@ std::ostream& operator<<(std::ostream& out, const ObjectId& i) int ManagementObject::maxThreads = 1; int ManagementObject::nextThreadIndex = 0; -void ManagementObject::writeTimestamps (framing::Buffer& buf) const +void ManagementObject::writeTimestamps (std::string& buf) const { - buf.putShortString (getPackageName ()); - buf.putShortString (getClassName ()); - buf.putBin128 (getMd5Sum ()); - buf.putLongLong (updateTime); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); - objectId.encode(buf); + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); + + body.putShortString (getPackageName ()); + body.putShortString (getClassName ()); + body.putBin128 (getMd5Sum ()); + body.putLongLong (updateTime); + body.putLongLong (createTime); + body.putLongLong (destroyTime); + + uint32_t len = body.getPosition(); + body.reset(); + body.getRawData(buf, len); + + std::string oid; + objectId.encode(oid); + buf += oid; } -void ManagementObject::readTimestamps (framing::Buffer& buf) +void ManagementObject::readTimestamps (const std::string& buf) { + char _data[4000]; + qpid::framing::Buffer body(_data, 4000); std::string unused; uint8_t unusedUuid[16]; - ObjectId unusedObjectId; - buf.getShortString(unused); - buf.getShortString(unused); - buf.getBin128(unusedUuid); - updateTime = buf.getLongLong(); - createTime = buf.getLongLong(); - destroyTime = buf.getLongLong(); - unusedObjectId.decode(buf); + body.checkAvailable(buf.length()); + body.putRawData(buf); + body.reset(); + + body.getShortString(unused); + body.getShortString(unused); + body.getBin128(unusedUuid); + updateTime = body.getLongLong(); + createTime = body.getLongLong(); + destroyTime = body.getLongLong(); } uint32_t ManagementObject::writeTimestampsSize() const { return 1 + getPackageName().length() + // str8 - 1 + getClassName().length() + // str8 - 16 + // bin128 - 8 + // uint64 - 8 + // uint64 - 8 + // uint64 - objectId.encodedSize(); // objectId + 1 + getClassName().length() + // str8 + 16 + // bin128 + 8 + // uint64 + 8 + // uint64 + 8 + // uint64 + objectId.encodedSize(); // objectId +} + + +void ManagementObject::writeTimestamps (types::Variant::Map& map) const +{ + types::Variant::Map oid, sid; + + sid["_package_name"] = getPackageName(); + sid["_class_name"] = getClassName(); + sid["_hash"] = qpid::types::Uuid(getMd5Sum()); + map["_schema_id"] = sid; + + objectId.mapEncode(oid); + map["_object_id"] = oid; + + map["_update_ts"] = updateTime; + map["_create_ts"] = createTime; + map["_delete_ts"] = destroyTime; +} + +void ManagementObject::readTimestamps (const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + if ((i = map.find("_update_ts")) != map.end()) + updateTime = i->second.asUint64(); + if ((i = map.find("_create_ts")) != map.end()) + createTime = i->second.asUint64(); + if ((i = map.find("_delete_ts")) != map.end()) + destroyTime = i->second.asUint64(); } + void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { @@ -217,3 +338,26 @@ int ManagementObject::getThreadIndex() { } return thisIndex; } + + +void ManagementObject::mapEncode(types::Variant::Map& map, + bool includeProperties, + bool includeStatistics) +{ + types::Variant::Map values; + + writeTimestamps(map); + + mapEncodeValues(values, includeProperties, includeStatistics); + map["_values"] = values; +} + +void ManagementObject::mapDecode(const types::Variant::Map& map) +{ + types::Variant::Map::const_iterator i; + + readTimestamps(map); + + if ((i = map.find("_values")) != map.end()) + mapDecodeValues(i->second.asMap()); +} |
