From c47ffec9b9e4608e511c03ed597a6c988460703f Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Thu, 16 Aug 2007 20:57:43 +0000 Subject: - Fix for asyncIO for store - Fix for dtx async IO recover - Temp patch for Tx commit ( existing bug uncovered ) - All store tests should be working again Know issues: - If a msg is sent to more than one queue, then the io complete is signaled on the first record written, not the last - Open issues for tx begin then commit with no prepare using duarble msgs and async IO. async complete bit not set on recovery. will be fixed with next commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566846 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/BrokerQueue.cpp | 8 ++++++-- cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/broker/BrokerQueue.cpp b/cpp/src/qpid/broker/BrokerQueue.cpp index e2b12ef316..3ae2ce8de3 100644 --- a/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/cpp/src/qpid/broker/BrokerQueue.cpp @@ -87,6 +87,7 @@ void Queue::deliver(Message::shared_ptr& msg){ void Queue::recover(Message::shared_ptr& msg){ push(msg); + msg->enqueueComplete(); // mark the message as enqueued if (store && msg->expectedContentSize() != msg->encodedContentSize()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -189,10 +190,13 @@ Message::shared_ptr Queue::dequeue(){ Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); - if (msg->isEnqueueComplete()) + if (msg->isEnqueueComplete()){ pop(); + return msg; + } } - return msg; + Message::shared_ptr msg_empty; + return msg_empty; } uint32_t Queue::purge(){ diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 2daf3b2d0a..954c50faee 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -194,6 +194,7 @@ void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) { + msg->enqueueComplete(); // recoved nmessage to enqueued in store already buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); } -- cgit v1.2.1