summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp27
1 files changed, 27 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 1f508a1cc7..52404c826c 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -365,6 +365,25 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
return false;
}
+namespace {
+struct PositionEquals {
+ SequenceNumber pos;
+ PositionEquals(SequenceNumber p) : pos(p) {}
+ bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
+};
+}// namespace
+
+bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const {
+ Mutex::ScopedLock locker(messageLock);
+ Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
+ if (i == messages.end())
+ return false;
+ else {
+ msg = *i;
+ return true;
+ }
+}
+
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
@@ -827,3 +846,11 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
return status;
}
+
+void Queue::setPosition(SequenceNumber n) {
+ if (n <= sequence)
+ throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence
+ << " for queue " << name));
+ sequence = n;
+ --sequence; // Decrement so ++sequence will return n.
+}