summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-16 16:43:15 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-16 16:43:15 +0100
commit01346bcfb11aa88e5a1ab51361997f5415ce0dd6 (patch)
treef1238f1d57b12307366bcf1f49ef23b19bb686eb
parent641a66abb12a2605c4538b53b336ca11755b0560 (diff)
downloadrabbitmq-server-git-01346bcfb11aa88e5a1ab51361997f5415ce0dd6.tar.gz
Bug fix.
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_variable_queue.erl14
2 files changed, 17 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7190953db0..546d8fbea5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -486,9 +486,10 @@ commit_transaction(Txn, From, State) ->
store_ch_record(C#cr{unacked_messages = Remaining}),
[AckTag || {_Msg, AckTag} <- MsgsWithAcks]
end,
- VQS = rabbit_variable_queue:tx_commit(
- PendingMessagesOrdered, Acks, From, State #q.variable_queue_state),
- State #q { variable_queue_state = VQS }.
+ {RunQueue, VQS} =
+ rabbit_variable_queue:tx_commit(
+ PendingMessagesOrdered, Acks, From, State #q.variable_queue_state),
+ {RunQueue, State #q { variable_queue_state = VQS }}.
rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
@@ -573,9 +574,12 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
- NewState = commit_transaction(Txn, From, State),
+ {RunQueue, NewState} = commit_transaction(Txn, From, State),
erase_tx(Txn),
- noreply(NewState);
+ noreply(case RunQueue of
+ true -> run_message_queue(NewState);
+ false -> NewState
+ end);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index dddfb4a85c..ac2bab0fd7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -287,10 +287,12 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = IndexState1 }.
purge(State = #vqstate { prefetcher = undefined, q4 = Q4,
- index_state = IndexState }) ->
+ index_state = IndexState, len = Len }) ->
{Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState),
- purge1(Q4Count, State #vqstate { index_state = IndexState1,
- q4 = queue:new() });
+ {TotalCount, State1} =
+ purge1(Q4Count, State #vqstate { index_state = IndexState1,
+ q4 = queue:new() }),
+ {TotalCount, State1 #vqstate { len = 0 }};
purge(State) ->
purge(drain_prefetcher(stop, State)).
@@ -305,7 +307,7 @@ delete(State) ->
{GammaSeqId, NextSeqId} ->
{_DeleteCount, IndexState1} =
delete1(NextSeqId, 0, GammaSeqId, IndexState),
- State1 #vqstate { index_state = IndexState1 }
+ State1 #vqstate { index_state = IndexState1, len = 0 }
end.
%% [{Msg, AckTag}]
@@ -352,7 +354,7 @@ tx_rollback(Pubs, State) ->
tx_commit(Pubs, AckTags, From, State) ->
case persistent_msg_ids(Pubs) of
[] ->
- do_tx_commit(Pubs, AckTags, From, State);
+ {true, do_tx_commit(Pubs, AckTags, From, State)};
PersistentMsgIds ->
Self = self(),
ok = rabbit_msg_store:sync(
@@ -360,7 +362,7 @@ tx_commit(Pubs, AckTags, From, State) ->
fun () -> ok = rabbit_amqqueue:tx_commit_callback(
Self, Pubs, AckTags, From)
end),
- State
+ {false, State}
end.
do_tx_commit(Pubs, AckTags, From, State) ->