summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-01-10 17:09:13 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-01-10 17:09:13 +0000
commit844fb5a360d43066fd511c34008e10df3e990a90 (patch)
tree8f73d4af14b9cdcb89b5e56f13bd07d819f33bb1
parent27e4b5bdded5f3ec43f9599c9798afac9b3e01ff (diff)
downloadrabbitmq-server-git-844fb5a360d43066fd511c34008e10df3e990a90.tar.gz
cosmetic
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 18 insertions, 29 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4b166e28ff..5702625915 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -468,37 +468,29 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
do_if_unconfirmed([MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)],
QPid, State).
-%% clears references to MsgSeqNo and does internal_flush_confirms
-do_if_unconfirmed(MsgSeqNos, undefined,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
+do_if_unconfirmed(MsgSeqNos, undefined, State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM}) ->
MS = gb_sets:from_list(MsgSeqNos),
- Unconfirmed1 = gb_sets:difference(UC, MS),
- QFM1 = dict:filter(fun(M, _Q) ->
- not(gb_sets:is_element(M, MS))
- end, QFM),
- internal_flush_confirms(State#ch{unconfirmed = Unconfirmed1,
- queues_for_msg = QFM1}, MsgSeqNos);
+ QFM1 = dict:filter(fun(M, _Q) -> not(gb_sets:is_element(M, MS)) end, QFM),
+ flush_confirms(State#ch{unconfirmed = gb_sets:difference(UC, MS),
+ queues_for_msg = QFM1}, MsgSeqNos);
do_if_unconfirmed(MsgSeqNos, QPid, State) ->
{DoneMessages, State1} =
lists:foldl(
- fun(MsgSeqNo,
- {DMs, State0 = #ch{unconfirmed = UC0,
- queues_for_msg = QFM0}}) ->
+ fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
{ok, Qs} = dict:find(MsgSeqNo, QFM0),
Qs1 = sets:del_element(QPid, Qs),
case sets:size(Qs1) of
0 -> {[MsgSeqNo | DMs],
State0#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM0),
- unconfirmed =
- gb_sets:delete(MsgSeqNo, UC0)}};
+ queues_for_msg = dict:erase(MsgSeqNo, QFM0),
+ unconfirmed = gb_sets:delete(MsgSeqNo, UC0)}};
_ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
{DMs, State0#ch{queues_for_msg = QFM1}}
end
end, {[], State}, MsgSeqNos),
- internal_flush_confirms(State1, DoneMessages).
+ flush_confirms(State1, DoneMessages).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1231,10 +1223,9 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_flush_confirms(State, []) ->
+flush_confirms(State, []) ->
State;
-internal_flush_confirms(State = #ch{writer_pid = WriterPid,
- unconfirmed = UC}, Cs) ->
+flush_confirms(State = #ch{writer_pid = WriterPid, unconfirmed = UC}, Cs) ->
SCs = lists:usort(Cs),
CutOff = case gb_sets:is_empty(UC) of
true -> lists:last(SCs) + 1;
@@ -1244,15 +1235,13 @@ internal_flush_confirms(State = #ch{writer_pid = WriterPid,
case Ms of
[] -> ok;
_ -> ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = lists:last(Ms),
- multiple = true})
+ WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
+ multiple = true})
end,
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Ss),
+ ok = lists:foldl(fun(T, ok) ->
+ rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = T})
+ end, ok, Ss),
State.
terminate(_State) ->