diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-21 20:23:18 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-21 20:23:18 +0000 |
| commit | 3b698d4fa132036bbc4ac1a4e045d994458b5683 (patch) | |
| tree | 97be4320f590810ee3c3acbde169aae58b8c4786 /src | |
| parent | 5b283b20ef93224c2562440ecf53e985f180d1a3 (diff) | |
| download | rabbitmq-server-git-3b698d4fa132036bbc4ac1a4e045d994458b5683.tar.gz | |
move another confirm into VQ
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
3 files changed, 13 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 960bfd9763..89f357a42d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -378,8 +378,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, case AckRequired of true -> {State1, sets:add_element(AckTag, ChAckTags)}; - false -> {confirm_message(Message, State1), - ChAckTags} + false -> {State1, ChAckTags} + %%{confirm_message(Message, State1), + %% ChAckTags} end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -432,7 +433,6 @@ confirm_messages(Guids, State) -> confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> case dict:find(Guid, GTC) of - {ok, {_ , undefined}} -> ok; {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); _ -> ok end, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f7fc291ff1..7de12fd110 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -876,8 +876,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, true -> file_handle_cache:sync(CurHdl) end, lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - %% [client_confirm(CRef, Guids, true, State1) - %% || {CRef, Guids} <- CGs], + [client_confirm(CRef, Guids, true, State1) + || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 465b401c7d..c30be37cfd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -521,7 +521,9 @@ publish(Msg, MsgProps, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, #basic_message { guid = Guid }, + _MsgProps, State = #vqstate { len = 0 }) -> + blind_confirm(self(), gb_sets:singleton(Guid)), {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, @@ -1394,14 +1396,15 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msgs_confirmed(GuidSet, State) -> {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. -msgs_written_to_disk(QPid, GuidSet, false) -> +blind_confirm(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State) -> msgs_confirmed(GuidSet, State) - end); + end). + +msgs_written_to_disk(QPid, GuidSet, false) -> + blind_confirm(QPid, GuidSet); msgs_written_to_disk(QPid, GuidSet, true) -> - %%io:format("variable queue notified of msgs written to disk: ~p~n", - %% [gb_sets:size(GuidSet)]), rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, @@ -1414,8 +1417,6 @@ msgs_written_to_disk(QPid, GuidSet, true) -> end). msg_indices_written_to_disk(QPid, GuidSet) -> - %%io:format("variable queue notified of msg idx written to disk: ~p~n", - %% [gb_sets:size(GuidSet)]), rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, |
