summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp16
-rw-r--r--cpp/src/qpid/broker/Queue.h8
-rw-r--r--cpp/src/qpid/broker/SessionState.h4
3 files changed, 10 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 968720050d..388b2d77dd 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -380,15 +380,12 @@ struct PositionEquals {
};
}// namespace
-bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const {
+QueuedMessage Queue::find(SequenceNumber pos) const {
Mutex::ScopedLock locker(messageLock);
Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
- if (i == messages.end())
- return false;
- else {
- msg = *i;
- return true;
- }
+ if (i != messages.end())
+ return *i;
+ return QueuedMessage();
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
@@ -876,9 +873,6 @@ Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, str
}
void Queue::setPosition(SequenceNumber n) {
- if (n <= sequence)
- throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence
- << " for queue " << name));
+ Mutex::ScopedLock locker(messageLock);
sequence = n;
- --sequence; // Decrement so ++sequence will return n.
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index bca01f7ef5..21cb5ad42d 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -230,12 +230,8 @@ RateTracker dequeueTracker;
*/
QueuedMessage get();
- /** Get the message at position pos
- *@param msg out parameter, assigned to the message found.
- *@param pos position to search for.
- *@return True if there is a message at pos, false otherwise.
- */
- bool find(QueuedMessage& msg, framing::SequenceNumber pos) const;
+ /** Get the message at position pos */
+ QueuedMessage find(framing::SequenceNumber pos) const;
const QueuePolicy* getPolicy();
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index d08cc5fa2c..250d520145 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -100,10 +100,12 @@ class SessionState : public qpid::SessionState,
void readyToSend();
+ // Used by cluster to create replica sessions.
template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
+ template <class F> void eachUnacked(F f) { semanticState.eachUnacked(f); }
SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); }
-
boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+ void record(const DeliveryRecord& delivery) { semanticState.record(delivery); }
private: