diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-27 11:53:57 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-27 11:53:57 +0100 |
| commit | 935110e2e43ca165c33d63f347db532cfdb36c5b (patch) | |
| tree | 14565410c011ec0a4c4fad5d5a6e10e6d797f51d | |
| parent | 140f0a5028a9366951f1ab8149aeaed0941445fb (diff) | |
| download | rabbitmq-server-git-935110e2e43ca165c33d63f347db532cfdb36c5b.tar.gz | |
better separation between amqqueue_process and bq
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
3 files changed, 20 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index bc2ffd173c..b8b0cf8d1b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -870,11 +870,11 @@ handle_cast({ack, Txn, AckTags, ChPid}, none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - {NewBQS, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS), - NewState = - confirm_messages_internal(AckdGuids, - State #q { backing_queue_state = - NewBQS }), + AckdGuids = BQ:seqids_to_guids(AckTags, BQS), + NewBQS = BQ:ack(AckTags, BQS), + NewState = confirm_messages_internal( + AckdGuids, + State #q { backing_queue_state = NewBQS }), {NewC, NewState}; _ -> {C#cr{txn = Txn}, @@ -894,7 +894,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> {BQS1, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS), + false -> AckdGuids = BQ:seqids_to_guids(AckTags, BQS), + BQS1 = BQ:ack(AckTags, BQS), confirm_messages_internal( AckdGuids, State #q { backing_queue_state = BQS1 }) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 3cf2ec4fcb..9190456254 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -638,9 +638,8 @@ handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> - reply(ok, - State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), - cref_to_guids = dict:erase(CRef, CTG) }). + reply(ok, State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG) }). handle_cast({write, CRef, Guid}, State = #msstate { current_file_handle = CurHdl, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3c0e14a056..efdf34f236 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -37,7 +37,7 @@ requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1]). + status/1, seqids_to_guids/2]). -export([start/1, stop/0]). @@ -766,14 +766,16 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, {avg_egress_rate , AvgEgressRate}, {avg_ingress_rate , AvgIngressRate} ]. +seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) -> + lists:foldl(fun(SeqId, Guids) -> + {ok, #msg_status { msg = Msg }} = dict:find(SeqId, PA), + [Msg#basic_message.guid | Guids] + end, [], SeqIds). + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- -a({State, _} = I) -> - a(State), - I; - a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, persistent_count = PersistentCount, @@ -1140,7 +1142,7 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - {State, {confirm, []}}; + State; ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -1155,16 +1157,13 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ok = orddict:fold(fun (MsgStore, Guids, ok) -> MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), - AckdGuids = lists:append([Guids || - {_Store, Guids} <- orddict:to_list(GuidsByStore)]), - State2 = msgs_confirmed(AckdGuids, State1), + State2 = msgs_confirmed(seqids_to_guids(AckTags, State), State1), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, - {State2 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }, - {confirm, AckdGuids}}. + State2 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, @@ -1272,9 +1271,6 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> end end. -reduce_memory_use({State, Other}) -> - {reduce_memory_use(State), Other}; - reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, |
