diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-16 16:43:15 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-16 16:43:15 +0100 |
| commit | 01346bcfb11aa88e5a1ab51361997f5415ce0dd6 (patch) | |
| tree | f1238f1d57b12307366bcf1f49ef23b19bb686eb | |
| parent | 641a66abb12a2605c4538b53b336ca11755b0560 (diff) | |
| download | rabbitmq-server-git-01346bcfb11aa88e5a1ab51361997f5415ce0dd6.tar.gz | |
Bug fix.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 14 |
2 files changed, 17 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7190953db0..546d8fbea5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -486,9 +486,10 @@ commit_transaction(Txn, From, State) -> store_ch_record(C#cr{unacked_messages = Remaining}), [AckTag || {_Msg, AckTag} <- MsgsWithAcks] end, - VQS = rabbit_variable_queue:tx_commit( - PendingMessagesOrdered, Acks, From, State #q.variable_queue_state), - State #q { variable_queue_state = VQS }. + {RunQueue, VQS} = + rabbit_variable_queue:tx_commit( + PendingMessagesOrdered, Acks, From, State #q.variable_queue_state), + {RunQueue, State #q { variable_queue_state = VQS }}. rollback_transaction(Txn, State) -> #tx { pending_messages = PendingMessages @@ -573,9 +574,12 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> - NewState = commit_transaction(Txn, From, State), + {RunQueue, NewState} = commit_transaction(Txn, From, State), erase_tx(Txn), - noreply(NewState); + noreply(case RunQueue of + true -> run_message_queue(NewState); + false -> NewState + end); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dddfb4a85c..ac2bab0fd7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -287,10 +287,12 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = IndexState1 }. purge(State = #vqstate { prefetcher = undefined, q4 = Q4, - index_state = IndexState }) -> + index_state = IndexState, len = Len }) -> {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState), - purge1(Q4Count, State #vqstate { index_state = IndexState1, - q4 = queue:new() }); + {TotalCount, State1} = + purge1(Q4Count, State #vqstate { index_state = IndexState1, + q4 = queue:new() }), + {TotalCount, State1 #vqstate { len = 0 }}; purge(State) -> purge(drain_prefetcher(stop, State)). @@ -305,7 +307,7 @@ delete(State) -> {GammaSeqId, NextSeqId} -> {_DeleteCount, IndexState1} = delete1(NextSeqId, 0, GammaSeqId, IndexState), - State1 #vqstate { index_state = IndexState1 } + State1 #vqstate { index_state = IndexState1, len = 0 } end. %% [{Msg, AckTag}] @@ -352,7 +354,7 @@ tx_rollback(Pubs, State) -> tx_commit(Pubs, AckTags, From, State) -> case persistent_msg_ids(Pubs) of [] -> - do_tx_commit(Pubs, AckTags, From, State); + {true, do_tx_commit(Pubs, AckTags, From, State)}; PersistentMsgIds -> Self = self(), ok = rabbit_msg_store:sync( @@ -360,7 +362,7 @@ tx_commit(Pubs, AckTags, From, State) -> fun () -> ok = rabbit_amqqueue:tx_commit_callback( Self, Pubs, AckTags, From) end), - State + {false, State} end. do_tx_commit(Pubs, AckTags, From, State) -> |
