diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 17 |
2 files changed, 22 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fa445c3ae5..b92de66743 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -905,10 +905,12 @@ handle_cast({tx_commit_msg_store_callback, IsTransientPubs, Pubs, AckTags, From} end); handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) -> - noreply( - run_message_queue( - State#q{variable_queue_state = - rabbit_variable_queue:tx_commit_from_vq(VQS)})); + {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS), + State1 = State#q{variable_queue_state = VQS1}, + noreply(case RunQueue of + true -> run_message_queue(State1); + false -> State1 + end); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -970,10 +972,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> end; handle_info(timeout, State = #q{variable_queue_state = VQS}) -> - noreply( - run_message_queue( - State#q{variable_queue_state = - rabbit_variable_queue:tx_commit_from_vq(VQS)})); + {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS), + State1 = State#q{variable_queue_state = VQS1}, + noreply(case RunQueue of + true -> run_message_queue(State1); + false -> State1 + end); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8f813fb49c..9d33cc7ce5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -245,7 +245,7 @@ -spec(tx_commit_from_msg_store/5 :: (boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) -> {boolean(), vqstate()}). --spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()). +-spec(tx_commit_from_vq/1 :: (vqstate()) -> {boolean(), vqstate()}). -spec(needs_sync/1 :: (vqstate()) -> boolean()). -spec(flush_journal/1 :: (vqstate()) -> vqstate()). -spec(status/1 :: (vqstate()) -> [{atom(), any()}]). @@ -616,16 +616,17 @@ tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State = lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags), case PersistentStore == ?TRANSIENT_MSG_STORE orelse (IsTransientPubs andalso [] == DiskAcks) of - true -> State1 = tx_commit_from_vq(State #vqstate { - on_sync = {[], [Pubs], [From]} }), - {true, State1 #vqstate { on_sync = OnSync }}; + true -> {Res, State1} = + tx_commit_from_vq(State #vqstate { + on_sync = {[], [Pubs], [From]} }), + {Res, State1 #vqstate { on_sync = OnSync }}; false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks], [Pubs | SPubs], [From | SFroms] }}} end. tx_commit_from_vq(State = #vqstate { on_sync = {_, _, []} }) -> - State; + {false, State}; tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, persistent_store = PersistentStore }) -> Acks = lists:flatten(SAcks), @@ -637,6 +638,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, SeqIdsAcc end, [], Acks), IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, + Pubs = lists:flatten(lists:reverse(SPubs)), {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, @@ -646,11 +648,12 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, StateN1} - end, {AckSeqIds, State1}, lists:flatten(lists:reverse(SPubs))), + end, {AckSeqIds, State1}, Pubs), IndexState1 = rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], - State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. + {Pubs /= [], + State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}. needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; |
