diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/MessageGroupManager.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 110 |
1 files changed, 0 insertions, 110 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp index 47e40a4794..c083e4ee0f 100644 --- a/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -302,19 +302,6 @@ void MessageGroupManager::setDefaults(const std::string& groupId) // static defaultGroupId = groupId; } -/** Cluster replication: - - state map format: - - { "group-state": [ {"name": <group-name>, - "owner": <consumer-name>-or-empty, - "acquired-ct": <acquired count>, - "positions": [Seqnumbers, ... ]}, - {...} - ] - } -*/ - namespace { const std::string GROUP_NAME("name"); const std::string GROUP_OWNER("owner"); @@ -324,100 +311,3 @@ namespace { const std::string GROUP_STATE("group-state"); } - -/** Runs on UPDATER to snapshot current state */ -void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const -{ - using namespace qpid::framing; - state.clear(); - framing::Array groupState(TYPE_CODE_MAP); - for (GroupMap::const_iterator g = messageGroups.begin(); - g != messageGroups.end(); ++g) { - - framing::FieldTable group; - group.setString(GROUP_NAME, g->first); - group.setString(GROUP_OWNER, g->second.owner); - group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); - framing::Array positions(TYPE_CODE_UINT32); - framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); - for (GroupState::MessageFifo::const_iterator p = g->second.members.begin(); - p != g->second.members.end(); ++p) { - positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position ))); - acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired ))); - } - group.setArray(GROUP_POSITIONS, positions); - group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); - groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); - } - state.setArray(GROUP_STATE, groupState); - - QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); -} - - -/** called on UPDATEE to set state from snapshot */ -void MessageGroupManager::setState(const qpid::framing::FieldTable& state) -{ - using namespace qpid::framing; - messageGroups.clear(); - freeGroups.clear(); - cachedGroup = 0; - - framing::Array groupState(TYPE_CODE_MAP); - - bool ok = state.getArray(GROUP_STATE, groupState); - if (!ok) { - QPID_LOG(error, "Unable to find message group state information for queue \"" << - qName << "\": cluster inconsistency error!"); - return; - } - - for (framing::Array::const_iterator g = groupState.begin(); - g != groupState.end(); ++g) { - framing::FieldTable group; - ok = framing::getEncodedValue<FieldTable>(*g, group); - if (!ok) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": table encoding error!"); - return; - } - MessageGroupManager::GroupState state; - if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": fields missing error!"); - return; - } - state.group = group.getAsString(GROUP_NAME); - state.owner = group.getAsString(GROUP_OWNER); - state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); - framing::Array positions(TYPE_CODE_UINT32); - ok = group.getArray(GROUP_POSITIONS, positions); - if (!ok) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": position encoding error!"); - return; - } - framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); - ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); - if (!ok || positions.count() != acquiredMsgs.count()) { - QPID_LOG(error, "Invalid message group state information for queue \"" << - qName << "\": acquired flag encoding error!"); - return; - } - - Array::const_iterator a = acquiredMsgs.begin(); - for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) { - GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>()); - mState.acquired = (*a++)->getIntegerValue<bool>(); - state.members.push_back(mState); - } - - messageGroups[state.group] = state; - if (!state.owned()) { - assert(state.members.size()); - freeGroups[state.members.front().position] = &messageGroups[state.group]; - } - } - - QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) -} |