diff options
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp | 15 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp | 6 |
4 files changed, 26 insertions, 5 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 86de96468d..b4160edbd6 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -179,6 +179,10 @@ void Queue::deliver(boost::intrusive_ptr<Message>& msg){ } } +void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg) +{ + if (policy.get()) policy->recoverEnqueued(msg); +} void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (policy.get()) policy->recoverEnqueued(msg); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 661e46f619..a2dad96fe0 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -336,6 +336,12 @@ namespace qpid { // For cluster update QueueListeners& getListeners(); + + /** + * Reserve space in policy for an enqueued message that + * has been recovered in the prepared state (dtx only) + */ + void recoverPrepared(boost::intrusive_ptr<Message>& msg); }; } } diff --git a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp index c4dd6d5f87..658fd5a89e 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp @@ -23,17 +23,24 @@ using boost::intrusive_ptr; using namespace qpid::broker; -RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {} +RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) +{ + queue->recoverPrepared(msg); +} -bool RecoveredDequeue::prepare(TransactionContext*) throw(){ +bool RecoveredDequeue::prepare(TransactionContext*) throw() +{ //should never be called; transaction has already prepared if an enqueue is recovered return false; } -void RecoveredDequeue::commit() throw(){ +void RecoveredDequeue::commit() throw() +{ + queue->enqueueAborted(msg); } -void RecoveredDequeue::rollback() throw(){ +void RecoveredDequeue::rollback() throw() +{ msg->enqueueComplete(); queue->process(msg); } diff --git a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp index 419c9771fc..48faa0942c 100644 --- a/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp @@ -23,7 +23,10 @@ using boost::intrusive_ptr; using namespace qpid::broker; -RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {} +RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) +{ + queue->recoverPrepared(msg); +} bool RecoveredEnqueue::prepare(TransactionContext*) throw(){ //should never be called; transaction has already prepared if an enqueue is recovered @@ -36,5 +39,6 @@ void RecoveredEnqueue::commit() throw(){ } void RecoveredEnqueue::rollback() throw(){ + queue->enqueueAborted(msg); } |
