diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 27 |
1 files changed, 27 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 1f508a1cc7..52404c826c 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -365,6 +365,25 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { return false; } +namespace { +struct PositionEquals { + SequenceNumber pos; + PositionEquals(SequenceNumber p) : pos(p) {} + bool operator()(const QueuedMessage& msg) const { return msg.position == pos; } +}; +}// namespace + +bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const { + Mutex::ScopedLock locker(messageLock); + Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos)); + if (i == messages.end()) + return false; + else { + msg = *i; + return true; + } +} + void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ Mutex::ScopedLock locker(consumerLock); if(exclusive) { @@ -827,3 +846,11 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str return status; } + +void Queue::setPosition(SequenceNumber n) { + if (n <= sequence) + throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence + << " for queue " << name)); + sequence = n; + --sequence; // Decrement so ++sequence will return n. +} |
