summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-21 20:23:18 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-21 20:23:18 +0000
commit3b698d4fa132036bbc4ac1a4e045d994458b5683 (patch)
tree97be4320f590810ee3c3acbde169aae58b8c4786 /src
parent5b283b20ef93224c2562440ecf53e985f180d1a3 (diff)
downloadrabbitmq-server-git-3b698d4fa132036bbc4ac1a4e045d994458b5683.tar.gz
move another confirm into VQ
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_msg_store.erl4
-rw-r--r--src/rabbit_variable_queue.erl15
3 files changed, 13 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 960bfd9763..89f357a42d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -378,8 +378,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case AckRequired of
true -> {State1,
sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(Message, State1),
- ChAckTags}
+ false -> {State1, ChAckTags}
+ %%{confirm_message(Message, State1),
+ %% ChAckTags}
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -432,7 +433,6 @@ confirm_messages(Guids, State) ->
confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
case dict:find(Guid, GTC) of
- {ok, {_ , undefined}} -> ok;
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
_ -> ok
end,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f7fc291ff1..7de12fd110 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -876,8 +876,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
true -> file_handle_cache:sync(CurHdl)
end,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- %% [client_confirm(CRef, Guids, true, State1)
- %% || {CRef, Guids} <- CGs],
+ [client_confirm(CRef, Guids, true, State1)
+ || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 465b401c7d..c30be37cfd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -521,7 +521,9 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid },
+ _MsgProps, State = #vqstate { len = 0 }) ->
+ blind_confirm(self(), gb_sets:singleton(Guid)),
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -1394,14 +1396,15 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msgs_confirmed(GuidSet, State) ->
{gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-msgs_written_to_disk(QPid, GuidSet, false) ->
+blind_confirm(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State) ->
msgs_confirmed(GuidSet, State)
- end);
+ end).
+
+msgs_written_to_disk(QPid, GuidSet, false) ->
+ blind_confirm(QPid, GuidSet);
msgs_written_to_disk(QPid, GuidSet, true) ->
- %%io:format("variable queue notified of msgs written to disk: ~p~n",
- %% [gb_sets:size(GuidSet)]),
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
@@ -1414,8 +1417,6 @@ msgs_written_to_disk(QPid, GuidSet, true) ->
end).
msg_indices_written_to_disk(QPid, GuidSet) ->
- %%io:format("variable queue notified of msg idx written to disk: ~p~n",
- %% [gb_sets:size(GuidSet)]),
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,