diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-29 08:30:48 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-29 08:30:48 +0000 |
| commit | 7511ecd0b5dd9ecbc350daed964c19f7b7663744 (patch) | |
| tree | 738da8d9faea3b77d83975f72d6911245ec1a3eb /src | |
| parent | 5fd28c283b04b119f2fac23a28616dd80301ae4e (diff) | |
| download | rabbitmq-server-git-7511ecd0b5dd9ecbc350daed964c19f7b7663744.tar.gz | |
confirm sequences of messages in channel
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 135 |
1 files changed, 69 insertions, 66 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b2b0c4a4f8..a62fcc4f1d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -94,7 +94,7 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). +-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -133,8 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNo) -> - gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). +confirm(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). list() -> pg_local:get_members(rabbit_channels). @@ -283,8 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, confirm(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNos, From}, State) -> + {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM}) -> @@ -292,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> confirm(Msg, QPid, State0); + 0 -> confirm([Msg], QPid, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -460,42 +460,47 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -confirm(undefined, _QPid, State) -> +confirm([], _QPid, State) -> State; -confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) -> State; -confirm(MsgSeqNo, QPid, State) -> - do_if_unconfirmed(MsgSeqNo, QPid, - fun(State0) -> - internal_flush_confirms(State0, gb_sets:singleton(MsgSeqNo)) - end, State). - -do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + MsgSeqNos1 = lists:filter(fun(MSN) -> gb_sets:is_element(MSN, UC) end, + MsgSeqNos), + do_if_unconfirmed(MsgSeqNos1, QPid, + fun internal_flush_confirms/2, State). + +%% clears references to MsgSeqNo and does ConfirmFun +do_if_unconfirmed(MsgSeqNos, undefined, ConfirmFun, State = #ch{unconfirmed = UC, queues_for_msg = QFM}) -> - %% clears references to MsgSeqNo and does ConfirmFun - case gb_sets:is_element(MsgSeqNo, UC) of - true -> - Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), - case QPid of - undefined -> - ConfirmFun(State#ch{unconfirmed = Unconfirmed1}); - _ -> - {ok, Qs} = dict:find(MsgSeqNo, QFM), - Qs1 = sets:del_element(QPid, Qs), - case sets:size(Qs1) of - 0 -> ConfirmFun( - State#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM), - unconfirmed = Unconfirmed1}); - _ -> State#ch{queues_for_msg = - dict:store(MsgSeqNo, Qs1, QFM)} - end - end; - false -> - State - end. + MS = gb_sets:from_list(MsgSeqNos), + Unconfirmed1 = gb_sets:difference(UC, MS), + QFM1 = dict:filter(fun(M, _Q) -> + not(gb_sets:is_element(M, MS)) + end, QFM), + ConfirmFun(State#ch{unconfirmed = Unconfirmed1, + queues_for_msg = QFM1}, MsgSeqNos); +do_if_unconfirmed(MsgSeqNos, QPid, ConfirmFun, State) -> + {DoneMessages, State1} = + lists:foldl(fun(MsgSeqNo, + {DMs, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> + {ok, Qs} = dict:find(MsgSeqNo, QFM0), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | DMs], + State0#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM0), + unconfirmed = + gb_sets:delete(MsgSeqNo, UC0)}}; + _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), + {DMs, State0#ch{queues_for_msg = QFM1}} + end + end, {[], State}, MsgSeqNos), + ConfirmFun(State1, DoneMessages). + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1209,12 +1214,12 @@ is_message_persistent(Content) -> process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, [], MsgSeqNo, _, State) -> - confirm(MsgSeqNo, undefined, State); + confirm([MsgSeqNo], undefined, State); process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, @@ -1228,33 +1233,31 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +internal_flush_confirms(State, []) -> + State; internal_flush_confirms(State = #ch{writer_pid = WriterPid, unconfirmed = UC}, Cs) -> - case gb_sets:is_empty(Cs) of - true -> State; - false -> [First | Rest] = gb_sets:to_list(Cs), - LUC = case gb_sets:size(UC) of - 0 -> gb_sets:largest(Cs) + 1; - _ -> gb_sets:smallest(UC) - end, - Is = case First < LUC of - true -> {Mult, Inds} = - find_consecutive_sequence(LUC, First, - Rest), - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Mult, - multiple = true}), - Inds; - _ -> [First | Rest] - end, - ok = lists:foldl( - fun(T, ok) -> rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = T}) - end, ok, Is), - State - end. + [First | Rest] = lists:usort(Cs), + LUC = case gb_sets:size(UC) of + 0 -> lists:last(Cs) + 1; + _ -> gb_sets:smallest(UC) + end, + Is = case First < LUC of + true -> {Mult, Inds} = + find_consecutive_sequence(LUC, First, Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, + multiple = true}), + Inds; + _ -> [First | Rest] + end, + ok = lists:foldl( + fun(T, ok) -> rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = T}) + end, ok, Is), + State. %% Find longest sequence of consecutive numbers at the beginning with %% no elements exceeding limit. |
