diff options
Diffstat (limited to 'cpp/src/qpid/broker/MessageGroupManager.cpp')
-rw-r--r-- | cpp/src/qpid/broker/MessageGroupManager.cpp | 411 |
1 files changed, 411 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/MessageGroupManager.cpp b/cpp/src/qpid/broker/MessageGroupManager.cpp new file mode 100644 index 0000000000..07b05f3b92 --- /dev/null +++ b/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -0,0 +1,411 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FieldTable.h" +#include "qpid/types/Variant.h" +#include "qpid/log/Statement.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/MessageGroupManager.h" + +using namespace qpid::broker; + +namespace { + const std::string GROUP_QUERY_KEY("qpid.message_group_queue"); + const std::string GROUP_HEADER_KEY("group_header_key"); + const std::string GROUP_STATE_KEY("group_state"); + const std::string GROUP_ID_KEY("group_id"); + const std::string GROUP_MSG_COUNT("msg_count"); + const std::string GROUP_TIMESTAMP("timestamp"); + const std::string GROUP_CONSUMER("consumer"); +} + + +const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); +const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); +const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); + + +void MessageGroupManager::unFree( const GroupState& state ) +{ + GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + assert( pos != freeGroups.end() && pos->second == &state ); + freeGroups.erase( pos ); +} + +void MessageGroupManager::own( GroupState& state, const std::string& owner ) +{ + state.owner = owner; + unFree( state ); +} + +void MessageGroupManager::disown( GroupState& state ) +{ + state.owner.clear(); + assert(state.members.size()); + assert(freeGroups.find(state.members.front()) == freeGroups.end()); + freeGroups[state.members.front()] = &state; +} + +const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const +{ + const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); + if (!headers) return defaultGroupId; + qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); + if (!id || !id->convertsTo<std::string>()) return defaultGroupId; + return id->get<std::string>(); +} + + +void MessageGroupManager::enqueued( const QueuedMessage& qm ) +{ + // @todo KAG optimization - store reference to group state in QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupState &state(messageGroups[group]); + state.members.push_back(qm.position); + uint32_t total = state.members.size(); + QPID_LOG( trace, "group queue " << qName << + ": added message to group id=" << group << " total=" << total ); + if (total == 1) { + // newly created group, no owner + state.group = group; + assert(freeGroups.find(qm.position) == freeGroups.end()); + freeGroups[qm.position] = &state; + } +} + + +void MessageGroupManager::acquired( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + state.acquired += 1; + QPID_LOG( trace, "group queue " << qName << + ": acquired message in group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::requeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.acquired != 0 ); + state.acquired -= 1; + if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } + QPID_LOG( trace, "group queue " << qName << + ": requeued message to group id=" << group << " acquired=" << state.acquired ); +} + + +void MessageGroupManager::dequeued( const QueuedMessage& qm ) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + // issue: const-ness?? + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + assert( state.members.size() != 0 ); + assert( state.acquired != 0 ); + state.acquired -= 1; + + // likely to be at or near begin() if dequeued in order + bool reFreeNeeded = false; + if (state.members.front() == qm.position) { + if (!state.owned()) { + // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! + // if on freelist, it is indexed by first member, which is about to be removed! + unFree(state); + reFreeNeeded = true; + } + state.members.pop_front(); + } else { + GroupState::PositionFifo::iterator pos = state.members.begin() + 1; + GroupState::PositionFifo::iterator end = state.members.end(); + while (pos != end) { + if (*pos == qm.position) { + state.members.erase(pos); + break; + } + ++pos; + } + } + + uint32_t total = state.members.size(); + if (total == 0) { + QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); + messageGroups.erase( gs ); + } else if (state.acquired == 0 && state.owned()) { + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << state.owner << " released group id=" << gs->first); + disown(state); + } else if (reFreeNeeded) { + disown(state); + } + QPID_LOG( trace, "group queue " << qName << + ": dequeued message from group id=" << group << " total=" << total ); +} + +bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + if (messages.empty()) + return false; + + if (!freeGroups.empty()) { + framing::SequenceNumber nextFree = freeGroups.begin()->first; + if (nextFree < c->position) { // next free group's msg is older than current position + bool ok = messages.find(nextFree, next); + (void) ok; assert( ok ); + } else { + if (!messages.next( c->position, next )) + return false; // shouldn't happen - should find nextFree + } + } else { // no free groups available + if (!messages.next( c->position, next )) + return false; + } + + do { + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId( next ) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + if (!state.owned() || state.owner == c->getName()) { + return true; + } + } while (messages.next( next.position, next )); + return false; +} + + +bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) +{ + // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage + std::string group( getGroupId(qm) ); + GroupMap::iterator gs = messageGroups.find( group ); + assert( gs != messageGroups.end() ); + GroupState& state( gs->second ); + + if (!state.owned()) { + own( state, consumer ); + QPID_LOG( trace, "group queue " << qName << + ": consumer name=" << consumer << " has acquired group id=" << gs->first); + return true; + } + return state.owner == consumer; +} + +bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) +{ + // browse: allow access to any available msg, regardless of group ownership (?ok?) + if (!messages.empty() && messages.next(c->position, next)) + return true; + return false; +} + +void MessageGroupManager::query(qpid::types::Variant::Map& status) const +{ + /** Add a description of the current state of the message groups for this queue. + FORMAT: + { "qpid.message_group_queue": + { "group_header_key" : "<KEY>", + "group_state" : + [ { "group_id" : "<name>", + "msg_count" : <int>, + "timestamp" : <absTime>, + "consumer" : <consumer name> }, + {...} // one for each known group + ] + } + } + **/ + + assert(status.find(GROUP_QUERY_KEY) == status.end()); + qpid::types::Variant::Map state; + qpid::types::Variant::List groups; + + state[GROUP_HEADER_KEY] = groupIdHeader; + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + qpid::types::Variant::Map info; + info[GROUP_ID_KEY] = g->first; + info[GROUP_MSG_COUNT] = g->second.members.size(); + info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ + info[GROUP_CONSUMER] = g->second.owner; + groups.push_back(info); + } + state[GROUP_STATE_KEY] = groups; + status[GROUP_QUERY_KEY] = state; +} + + +boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName, + Messages& messages, + const qpid::framing::FieldTable& settings ) +{ + boost::shared_ptr<MessageGroupManager> empty; + + if (settings.isSet(qpidMessageGroupKey)) { + + // @todo: remove once "sticky" consumers are supported - see QPID-3347 + if (!settings.isSet(qpidSharedGroup)) { + QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." ); + return empty; + } + + std::string headerKey = settings.getAsString(qpidMessageGroupKey); + if (headerKey.empty()) { + QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); + return empty; + } + unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); + + boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) ); + + QPID_LOG( debug, "Configured Queue '" << qName << + "' for message grouping using header key '" << headerKey << "'" << + " (timestamp=" << timestamp << ")"); + return manager; + } + return empty; +} + +std::string MessageGroupManager::defaultGroupId; +void MessageGroupManager::setDefaults(const std::string& groupId) // static +{ + defaultGroupId = groupId; +} + +/** Cluster replication: + + state map format: + + { "group-state": [ {"name": <group-name>, + "owner": <consumer-name>-or-empty, + "acquired-ct": <acquired count>, + "positions": [Seqnumbers, ... ]}, + {...} + ] + } +*/ + +namespace { + const std::string GROUP_NAME("name"); + const std::string GROUP_OWNER("owner"); + const std::string GROUP_ACQUIRED_CT("acquired-ct"); + const std::string GROUP_POSITIONS("positions"); + const std::string GROUP_STATE("group-state"); +} + + +/** Runs on UPDATER to snapshot current state */ +void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const +{ + using namespace qpid::framing; + state.clear(); + framing::Array groupState(TYPE_CODE_MAP); + for (GroupMap::const_iterator g = messageGroups.begin(); + g != messageGroups.end(); ++g) { + + framing::FieldTable group; + group.setString(GROUP_NAME, g->first); + group.setString(GROUP_OWNER, g->second.owner); + group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); + framing::Array positions(TYPE_CODE_UINT32); + for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); + p != g->second.members.end(); ++p) + positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + group.setArray(GROUP_POSITIONS, positions); + groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); + } + state.setArray(GROUP_STATE, groupState); + + QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); +} + + +/** called on UPDATEE to set state from snapshot */ +void MessageGroupManager::setState(const qpid::framing::FieldTable& state) +{ + using namespace qpid::framing; + messageGroups.clear(); + //consumers.clear(); + freeGroups.clear(); + + framing::Array groupState(TYPE_CODE_MAP); + + bool ok = state.getArray(GROUP_STATE, groupState); + if (!ok) { + QPID_LOG(error, "Unable to find message group state information for queue \"" << + qName << "\": cluster inconsistency error!"); + return; + } + + for (framing::Array::const_iterator g = groupState.begin(); + g != groupState.end(); ++g) { + framing::FieldTable group; + ok = framing::getEncodedValue<FieldTable>(*g, group); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": table encoding error!"); + return; + } + MessageGroupManager::GroupState state; + if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": fields missing error!"); + return; + } + state.group = group.getAsString(GROUP_NAME); + state.owner = group.getAsString(GROUP_OWNER); + state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); + framing::Array positions(TYPE_CODE_UINT32); + ok = group.getArray(GROUP_POSITIONS, positions); + if (!ok) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": position encoding error!"); + return; + } + + for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) + state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); + messageGroups[state.group] = state; + if (!state.owned()) { + assert(state.members.size()); + freeGroups[state.members.front()] = &messageGroups[state.group]; + } + } + + QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) +} |