diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-08 13:02:49 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-08 13:02:49 +0100 |
| commit | b8105271c1fe59997851cc7a7fb60b0e0677700d (patch) | |
| tree | 03e9ff5ca8b23cd8913bb0750b3ca0d8fdc5abd7 /src | |
| parent | a5d175f5b4f5930739917ca418ce26be9d369ddd (diff) | |
| download | rabbitmq-server-git-b8105271c1fe59997851cc7a7fb60b0e0677700d.tar.gz | |
Unify the APIs of the various commit callbacks. Prevents running the queue when we are doing ack-only txns
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 17 |
2 files changed, 22 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fa445c3ae5..b92de66743 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -905,10 +905,12 @@ handle_cast({tx_commit_msg_store_callback, IsTransientPubs, Pubs, AckTags, From} end); handle_cast(tx_commit_vq_callback, State = #q{variable_queue_state = VQS}) -> - noreply( - run_message_queue( - State#q{variable_queue_state = - rabbit_variable_queue:tx_commit_from_vq(VQS)})); + {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS), + State1 = State#q{variable_queue_state = VQS1}, + noreply(case RunQueue of + true -> run_message_queue(State1); + false -> State1 + end); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -970,10 +972,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> end; handle_info(timeout, State = #q{variable_queue_state = VQS}) -> - noreply( - run_message_queue( - State#q{variable_queue_state = - rabbit_variable_queue:tx_commit_from_vq(VQS)})); + {RunQueue, VQS1} = rabbit_variable_queue:tx_commit_from_vq(VQS), + State1 = State#q{variable_queue_state = VQS1}, + noreply(case RunQueue of + true -> run_message_queue(State1); + false -> State1 + end); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8f813fb49c..9d33cc7ce5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -245,7 +245,7 @@ -spec(tx_commit_from_msg_store/5 :: (boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) -> {boolean(), vqstate()}). --spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()). +-spec(tx_commit_from_vq/1 :: (vqstate()) -> {boolean(), vqstate()}). -spec(needs_sync/1 :: (vqstate()) -> boolean()). -spec(flush_journal/1 :: (vqstate()) -> vqstate()). -spec(status/1 :: (vqstate()) -> [{atom(), any()}]). @@ -616,16 +616,17 @@ tx_commit_from_msg_store(IsTransientPubs, Pubs, AckTags, From, State = lists:filter(fun (AckTag) -> AckTag /= ack_not_on_disk end, AckTags), case PersistentStore == ?TRANSIENT_MSG_STORE orelse (IsTransientPubs andalso [] == DiskAcks) of - true -> State1 = tx_commit_from_vq(State #vqstate { - on_sync = {[], [Pubs], [From]} }), - {true, State1 #vqstate { on_sync = OnSync }}; + true -> {Res, State1} = + tx_commit_from_vq(State #vqstate { + on_sync = {[], [Pubs], [From]} }), + {Res, State1 #vqstate { on_sync = OnSync }}; false -> {false, State #vqstate { on_sync = { [DiskAcks | SAcks], [Pubs | SPubs], [From | SFroms] }}} end. tx_commit_from_vq(State = #vqstate { on_sync = {_, _, []} }) -> - State; + {false, State}; tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, persistent_store = PersistentStore }) -> Acks = lists:flatten(SAcks), @@ -637,6 +638,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, SeqIdsAcc end, [], Acks), IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, + Pubs = lists:flatten(lists:reverse(SPubs)), {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, @@ -646,11 +648,12 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, StateN1} - end, {AckSeqIds, State1}, lists:flatten(lists:reverse(SPubs))), + end, {AckSeqIds, State1}, Pubs), IndexState1 = rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], - State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. + {Pubs /= [], + State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }}. needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; |
