/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/framing/FieldTable.h" #include "qpid/types/Variant.h" #include "qpid/log/Statement.h" #include "qpid/broker/Queue.h" #include "qpid/broker/MessageGroupManager.h" using namespace qpid::broker; namespace { const std::string GROUP_QUERY_KEY("qpid.message_group_queue"); const std::string GROUP_HEADER_KEY("group_header_key"); const std::string GROUP_STATE_KEY("group_state"); const std::string GROUP_ID_KEY("group_id"); const std::string GROUP_MSG_COUNT("msg_count"); const std::string GROUP_TIMESTAMP("timestamp"); const std::string GROUP_CONSUMER("consumer"); } const std::string MessageGroupManager::qpidMessageGroupKey("qpid.group_header_key"); const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); void MessageGroupManager::unFree( const GroupState& state ) { GroupFifo::iterator pos = freeGroups.find( state.members.front() ); assert( pos != freeGroups.end() && pos->second == &state ); freeGroups.erase( pos ); } void MessageGroupManager::own( GroupState& state, const std::string& owner ) { state.owner = owner; unFree( state ); } void MessageGroupManager::disown( GroupState& state ) { state.owner.clear(); assert(state.members.size()); assert(freeGroups.find(state.members.front()) == freeGroups.end()); freeGroups[state.members.front()] = &state; } const std::string MessageGroupManager::getGroupId( const QueuedMessage& qm ) const { const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders(); if (!headers) return defaultGroupId; qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader ); if (!id || !id->convertsTo()) return defaultGroupId; return id->get(); } void MessageGroupManager::enqueued( const QueuedMessage& qm ) { // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? std::string group( getGroupId(qm) ); GroupState &state(messageGroups[group]); state.members.push_back(qm.position); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << ": added message to group id=" << group << " total=" << total ); if (total == 1) { // newly created group, no owner state.group = group; assert(freeGroups.find(qm.position) == freeGroups.end()); freeGroups[qm.position] = &state; } } void MessageGroupManager::acquired( const QueuedMessage& qm ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); state.acquired += 1; QPID_LOG( trace, "group queue " << qName << ": acquired message in group id=" << group << " acquired=" << state.acquired ); } void MessageGroupManager::requeued( const QueuedMessage& qm ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.acquired != 0 ); state.acquired -= 1; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << gs->first); disown(state); } QPID_LOG( trace, "group queue " << qName << ": requeued message to group id=" << group << " acquired=" << state.acquired ); } void MessageGroupManager::dequeued( const QueuedMessage& qm ) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); assert( state.members.size() != 0 ); assert( state.acquired != 0 ); state.acquired -= 1; // likely to be at or near begin() if dequeued in order bool reFreeNeeded = false; if (state.members.front() == qm.position) { if (!state.owned()) { // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! // if on freelist, it is indexed by first member, which is about to be removed! unFree(state); reFreeNeeded = true; } state.members.pop_front(); } else { GroupState::PositionFifo::iterator pos = state.members.begin() + 1; GroupState::PositionFifo::iterator end = state.members.end(); while (pos != end) { if (*pos == qm.position) { state.members.erase(pos); break; } ++pos; } } uint32_t total = state.members.size(); if (total == 0) { QPID_LOG( trace, "group queue " << qName << ": deleting group id=" << gs->first); messageGroups.erase( gs ); } else if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << gs->first); disown(state); } else if (reFreeNeeded) { disown(state); } QPID_LOG( trace, "group queue " << qName << ": dequeued message from group id=" << group << " total=" << total ); } bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { if (messages.empty()) return false; if (!freeGroups.empty()) { framing::SequenceNumber nextFree = freeGroups.begin()->first; if (nextFree < c->position) { // next free group's msg is older than current position bool ok = messages.find(nextFree, next); (void) ok; assert( ok ); } else { if (!messages.next( c->position, next )) return false; // shouldn't happen - should find nextFree } } else { // no free groups available if (!messages.next( c->position, next )) return false; } do { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage std::string group( getGroupId( next ) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); if (!state.owned() || state.owner == c->getName()) { return true; } } while (messages.next( next.position, next )); return false; } bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm) { // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage std::string group( getGroupId(qm) ); GroupMap::iterator gs = messageGroups.find( group ); assert( gs != messageGroups.end() ); GroupState& state( gs->second ); if (!state.owned()) { own( state, consumer ); QPID_LOG( trace, "group queue " << qName << ": consumer name=" << consumer << " has acquired group id=" << gs->first); return true; } return state.owner == consumer; } bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next ) { // browse: allow access to any available msg, regardless of group ownership (?ok?) if (!messages.empty() && messages.next(c->position, next)) return true; return false; } void MessageGroupManager::query(qpid::types::Variant::Map& status) const { /** Add a description of the current state of the message groups for this queue. FORMAT: { "qpid.message_group_queue": { "group_header_key" : "", "group_state" : [ { "group_id" : "", "msg_count" : , "timestamp" : , "consumer" : }, {...} // one for each known group ] } } **/ assert(status.find(GROUP_QUERY_KEY) == status.end()); qpid::types::Variant::Map state; qpid::types::Variant::List groups; state[GROUP_HEADER_KEY] = groupIdHeader; for (GroupMap::const_iterator g = messageGroups.begin(); g != messageGroups.end(); ++g) { qpid::types::Variant::Map info; info[GROUP_ID_KEY] = g->first; info[GROUP_MSG_COUNT] = g->second.members.size(); info[GROUP_TIMESTAMP] = 0; /** @todo KAG - NEED HEAD MSG TIMESTAMP */ info[GROUP_CONSUMER] = g->second.owner; groups.push_back(info); } state[GROUP_STATE_KEY] = groups; status[GROUP_QUERY_KEY] = state; } boost::shared_ptr MessageGroupManager::create( const std::string& qName, Messages& messages, const qpid::framing::FieldTable& settings ) { boost::shared_ptr empty; if (settings.isSet(qpidMessageGroupKey)) { // @todo: remove once "sticky" consumers are supported - see QPID-3347 if (!settings.isSet(qpidSharedGroup)) { QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." ); return empty; } std::string headerKey = settings.getAsString(qpidMessageGroupKey); if (headerKey.empty()) { QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName); return empty; } unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp); boost::shared_ptr manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) ); QPID_LOG( debug, "Configured Queue '" << qName << "' for message grouping using header key '" << headerKey << "'" << " (timestamp=" << timestamp << ")"); return manager; } return empty; } std::string MessageGroupManager::defaultGroupId; void MessageGroupManager::setDefaults(const std::string& groupId) // static { defaultGroupId = groupId; } /** Cluster replication: state map format: { "group-state": [ {"name": , "owner": -or-empty, "acquired-ct": , "positions": [Seqnumbers, ... ]}, {...} ] } */ namespace { const std::string GROUP_NAME("name"); const std::string GROUP_OWNER("owner"); const std::string GROUP_ACQUIRED_CT("acquired-ct"); const std::string GROUP_POSITIONS("positions"); const std::string GROUP_STATE("group-state"); } /** Runs on UPDATER to snapshot current state */ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const { using namespace qpid::framing; state.clear(); framing::Array groupState(TYPE_CODE_MAP); for (GroupMap::const_iterator g = messageGroups.begin(); g != messageGroups.end(); ++g) { framing::FieldTable group; group.setString(GROUP_NAME, g->first); group.setString(GROUP_OWNER, g->second.owner); group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); framing::Array positions(TYPE_CODE_UINT32); for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); p != g->second.members.end(); ++p) positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); group.setArray(GROUP_POSITIONS, positions); groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); } state.setArray(GROUP_STATE, groupState); QPID_LOG(debug, "Queue \"" << qName << "\": replicating message group state, key=" << groupIdHeader); } /** called on UPDATEE to set state from snapshot */ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) { using namespace qpid::framing; messageGroups.clear(); //consumers.clear(); freeGroups.clear(); framing::Array groupState(TYPE_CODE_MAP); bool ok = state.getArray(GROUP_STATE, groupState); if (!ok) { QPID_LOG(error, "Unable to find message group state information for queue \"" << qName << "\": cluster inconsistency error!"); return; } for (framing::Array::const_iterator g = groupState.begin(); g != groupState.end(); ++g) { framing::FieldTable group; ok = framing::getEncodedValue(*g, group); if (!ok) { QPID_LOG(error, "Invalid message group state information for queue \"" << qName << "\": table encoding error!"); return; } MessageGroupManager::GroupState state; if (!group.isSet(GROUP_NAME) || !group.isSet(GROUP_OWNER) || !group.isSet(GROUP_ACQUIRED_CT)) { QPID_LOG(error, "Invalid message group state information for queue \"" << qName << "\": fields missing error!"); return; } state.group = group.getAsString(GROUP_NAME); state.owner = group.getAsString(GROUP_OWNER); state.acquired = group.getAsInt(GROUP_ACQUIRED_CT); framing::Array positions(TYPE_CODE_UINT32); ok = group.getArray(GROUP_POSITIONS, positions); if (!ok) { QPID_LOG(error, "Invalid message group state information for queue \"" << qName << "\": position encoding error!"); return; } for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) state.members.push_back((*p)->getIntegerValue()); messageGroups[state.group] = state; if (!state.owned()) { assert(state.members.size()); freeGroups[state.members.front()] = &messageGroups[state.group]; } } QPID_LOG(debug, "Queue \"" << qName << "\": message group state replicated, key =" << groupIdHeader) }