summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl20
-rw-r--r--src/rabbit_variable_queue.erl17
2 files changed, 22 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fa445c3ae5..b92de66743 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -905,10 +905,12 @@ handle_cast({tx_commit_msg_store_callback, IsTransientPubs, Pubs, AckTags, From}
end);
handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) ->
- noreply(
- run_message_queue(
- State#q{variable_queue_state =
- rabbit_variable_queue:tx_commit_from_vq(VQS)}));
+ {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS),
+ State1 = State#q{variable_queue_state = VQS1},
+ noreply(case RunQueue of
+ true -> run_message_queue(State1);
+ false -> State1
+ end);
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -970,10 +972,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
end;
handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
- noreply(
- run_message_queue(
- State#q{variable_queue_state =
- rabbit_variable_queue:tx_commit_from_vq(VQS)}));
+ {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS),
+ State1 = State#q{variable_queue_state = VQS1},
+ noreply(case RunQueue of
+ true -> run_message_queue(State1);
+ false -> State1
+ end);
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8f813fb49c..9d33cc7ce5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -245,7 +245,7 @@
-spec(tx_commit_from_msg_store/5 ::
(boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) ->
{boolean(), vqstate()}).
--spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()).
+-spec(tx_commit_from_vq/1 :: (vqstate()) -> {boolean(), vqstate()}).
-spec(needs_sync/1 :: (vqstate()) -> boolean()).
-spec(flush_journal/1 :: (vqstate()) -> vqstate()).
-spec(status/1 :: (vqstate()) -> [{atom(), any()}]).
@@ -616,16 +616,17 @@ tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State =
lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags),
case PersistentStore == ?TRANSIENT_MSG_STORE orelse
(IsTransientPubs andalso [] == DiskAcks) of
- true -> State1 = tx_commit_from_vq(State #vqstate {
- on_sync = {[], [Pubs], [From]} }),
- {true, State1 #vqstate { on_sync = OnSync }};
+ true -> {Res, State1} =
+ tx_commit_from_vq(State #vqstate {
+ on_sync = {[], [Pubs], [From]} }),
+ {Res, State1 #vqstate { on_sync = OnSync }};
false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks],
[Pubs | SPubs],
[From | SFroms] }}}
end.
tx_commit_from_vq(State = #vqstate { on_sync = {_, _, []} }) ->
- State;
+ {false, State};
tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
persistent_store = PersistentStore }) ->
Acks = lists:flatten(SAcks),
@@ -637,6 +638,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
SeqIdsAcc
end, [], Acks),
IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore,
+ Pubs = lists:flatten(lists:reverse(SPubs)),
{SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
@@ -646,11 +648,12 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end, StateN1}
- end, {AckSeqIds, State1}, lists:flatten(lists:reverse(SPubs))),
+ end, {AckSeqIds, State1}, Pubs),
IndexState1 =
rabbit_queue_index:sync_seq_ids(SeqIds, IndexState),
[ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],
- State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
+ {Pubs /= [],
+ State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}.
needs_sync(#vqstate { on_sync = {_, _, []} }) ->
false;