summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-29 08:30:48 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-29 08:30:48 +0000
commit7511ecd0b5dd9ecbc350daed964c19f7b7663744 (patch)
tree738da8d9faea3b77d83975f72d6911245ec1a3eb /src
parent5fd28c283b04b119f2fac23a28616dd80301ae4e (diff)
downloadrabbitmq-server-git-7511ecd0b5dd9ecbc350daed964c19f7b7663744.tar.gz
confirm sequences of messages in channel
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl135
1 files changed, 69 insertions, 66 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2b0c4a4f8..a62fcc4f1d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -94,7 +94,7 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
+-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -133,8 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
+confirm(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
list() ->
pg_local:get_members(rabbit_channels).
@@ -283,8 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, confirm(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ {noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -292,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> confirm(Msg, QPid, State0);
+ 0 -> confirm([Msg], QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -460,42 +460,47 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-confirm(undefined, _QPid, State) ->
+confirm([], _QPid, State) ->
State;
-confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) ->
State;
-confirm(MsgSeqNo, QPid, State) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(State0) ->
- internal_flush_confirms(State0, gb_sets:singleton(MsgSeqNo))
- end, State).
-
-do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ MsgSeqNos1 = lists:filter(fun(MSN) -> gb_sets:is_element(MSN, UC) end,
+ MsgSeqNos),
+ do_if_unconfirmed(MsgSeqNos1, QPid,
+ fun internal_flush_confirms/2, State).
+
+%% clears references to MsgSeqNo and does ConfirmFun
+do_if_unconfirmed(MsgSeqNos, undefined, ConfirmFun,
State = #ch{unconfirmed = UC,
queues_for_msg = QFM}) ->
- %% clears references to MsgSeqNo and does ConfirmFun
- case gb_sets:is_element(MsgSeqNo, UC) of
- true ->
- Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
- case QPid of
- undefined ->
- ConfirmFun(State#ch{unconfirmed = Unconfirmed1});
- _ ->
- {ok, Qs} = dict:find(MsgSeqNo, QFM),
- Qs1 = sets:del_element(QPid, Qs),
- case sets:size(Qs1) of
- 0 -> ConfirmFun(
- State#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM),
- unconfirmed = Unconfirmed1});
- _ -> State#ch{queues_for_msg =
- dict:store(MsgSeqNo, Qs1, QFM)}
- end
- end;
- false ->
- State
- end.
+ MS = gb_sets:from_list(MsgSeqNos),
+ Unconfirmed1 = gb_sets:difference(UC, MS),
+ QFM1 = dict:filter(fun(M, _Q) ->
+ not(gb_sets:is_element(M, MS))
+ end, QFM),
+ ConfirmFun(State#ch{unconfirmed = Unconfirmed1,
+ queues_for_msg = QFM1}, MsgSeqNos);
+do_if_unconfirmed(MsgSeqNos, QPid, ConfirmFun, State) ->
+ {DoneMessages, State1} =
+ lists:foldl(fun(MsgSeqNo,
+ {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
+ {ok, Qs} = dict:find(MsgSeqNo, QFM0),
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | DMs],
+ State0#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM0),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC0)}};
+ _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
+ {DMs, State0#ch{queues_for_msg = QFM1}}
+ end
+ end, {[], State}, MsgSeqNos),
+ ConfirmFun(State1, DoneMessages).
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1209,12 +1214,12 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- confirm(MsgSeqNo, undefined, State);
+ confirm([MsgSeqNo], undefined, State);
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _,
@@ -1228,33 +1233,31 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
+internal_flush_confirms(State, []) ->
+ State;
internal_flush_confirms(State = #ch{writer_pid = WriterPid,
unconfirmed = UC}, Cs) ->
- case gb_sets:is_empty(Cs) of
- true -> State;
- false -> [First | Rest] = gb_sets:to_list(Cs),
- LUC = case gb_sets:size(UC) of
- 0 -> gb_sets:largest(Cs) + 1;
- _ -> gb_sets:smallest(UC)
- end,
- Is = case First < LUC of
- true -> {Mult, Inds} =
- find_consecutive_sequence(LUC, First,
- Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult,
- multiple = true}),
- Inds;
- _ -> [First | Rest]
- end,
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Is),
- State
- end.
+ [First | Rest] = lists:usort(Cs),
+ LUC = case gb_sets:size(UC) of
+ 0 -> lists:last(Cs) + 1;
+ _ -> gb_sets:smallest(UC)
+ end,
+ Is = case First < LUC of
+ true -> {Mult, Inds} =
+ find_consecutive_sequence(LUC, First, Rest),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = Mult,
+ multiple = true}),
+ Inds;
+ _ -> [First | Rest]
+ end,
+ ok = lists:foldl(
+ fun(T, ok) -> rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = T})
+ end, ok, Is),
+ State.
%% Find longest sequence of consecutive numbers at the beginning with
%% no elements exceeding limit.