summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 12:53:18 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-17 12:53:18 +0100
commit2e57b7e5812fe6932263354a07410cd8956f28e4 (patch)
tree202ea6818415c63ef3957ed028f0404868b6d4a3 /src
parentcaefab720fc4bffae44f057438ea91debda63393 (diff)
downloadrabbitmq-server-git-2e57b7e5812fe6932263354a07410cd8956f28e4.tar.gz
persistent messages are ack'd only after the consumer acks it
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl39
-rw-r--r--src/rabbit_variable_queue.erl1
2 files changed, 25 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0dbd7f1778..f3fce61a03 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -346,12 +346,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- % PubAck after message delivered to consumer (disregard consumer acks)
- State2 = confirm_message(Message#basic_message.guid, State1),
- ChAckTags1 = case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
- false -> ChAckTags
- end,
+ {State2, ChAckTags1} =
+ case AckRequired of
+ true -> {State1, sets:add_element(AckTag, ChAckTags)};
+ false ->
+ {confirm_message_internal(Message#basic_message.guid,
+ State1), ChAckTags}
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
store_ch_record(NewC),
@@ -401,7 +402,7 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
-confirm_message(Guid, State = #q{guid_to_channel = GTC}) ->
+confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) ->
case dict:find(Guid, GTC) of
{ok, {ChPid, MsgSeqNo}} ->
rabbit_channel:confirm(ChPid, MsgSeqNo),
@@ -698,14 +699,16 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case BQ:fetch(AckRequired, BQS) of
{empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
- % PubAck after message got
- State2 = confirm_message(Message#basic_message.guid, State1),
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
- C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
- false -> ok
- end,
+ State2 = case AckRequired of
+ true ->
+ C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ store_ch_record(
+ C#cr{acktags = sets:add_element(AckTag, ChAckTags)}),
+ State1;
+ false ->
+ confirm_message_internal(Message#basic_message.guid,
+ State1)
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1})
end;
@@ -816,6 +819,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+
handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State),
@@ -838,6 +842,11 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State#q{backing_queue_state = BQS1})
end;
+handle_cast({confirm_messages, Guids}, State) ->
+ noreply(lists:foldl(fun (Guid, State0) ->
+ confirm_message_internal(Guid, State0)
+ end, State, Guids));
+
handle_cast({reject, AckTags, Requeue, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ca459665d4..e4020b6069 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1137,6 +1137,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
+ gen_server2:cast(self(), {confirm_messages, Guids}),
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of