summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl52
1 files changed, 25 insertions, 27 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a62fcc4f1d..579f109a49 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -465,13 +465,11 @@ confirm([], _QPid, State) ->
confirm(_MsgSeqNos, _QPid, State = #ch{confirm_enabled = false}) ->
State;
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- MsgSeqNos1 = lists:filter(fun(MSN) -> gb_sets:is_element(MSN, UC) end,
- MsgSeqNos),
- do_if_unconfirmed(MsgSeqNos1, QPid,
- fun internal_flush_confirms/2, State).
+ do_if_unconfirmed([MSN || MSN <- MsgSeqNos, gb_sets:is_element(MSN, UC)],
+ QPid, State).
-%% clears references to MsgSeqNo and does ConfirmFun
-do_if_unconfirmed(MsgSeqNos, undefined, ConfirmFun,
+%% clears references to MsgSeqNo and does internal_flush_confirms
+do_if_unconfirmed(MsgSeqNos, undefined,
State = #ch{unconfirmed = UC,
queues_for_msg = QFM}) ->
MS = gb_sets:from_list(MsgSeqNos),
@@ -479,28 +477,28 @@ do_if_unconfirmed(MsgSeqNos, undefined, ConfirmFun,
QFM1 = dict:filter(fun(M, _Q) ->
not(gb_sets:is_element(M, MS))
end, QFM),
- ConfirmFun(State#ch{unconfirmed = Unconfirmed1,
- queues_for_msg = QFM1}, MsgSeqNos);
-do_if_unconfirmed(MsgSeqNos, QPid, ConfirmFun, State) ->
+ internal_flush_confirms(State#ch{unconfirmed = Unconfirmed1,
+ queues_for_msg = QFM1}, MsgSeqNos);
+do_if_unconfirmed(MsgSeqNos, QPid, State) ->
{DoneMessages, State1} =
- lists:foldl(fun(MsgSeqNo,
- {DMs, State0 = #ch{unconfirmed = UC0,
- queues_for_msg = QFM0}}) ->
- {ok, Qs} = dict:find(MsgSeqNo, QFM0),
- Qs1 = sets:del_element(QPid, Qs),
- case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | DMs],
- State0#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM0),
- unconfirmed =
- gb_sets:delete(MsgSeqNo, UC0)}};
- _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
- {DMs, State0#ch{queues_for_msg = QFM1}}
- end
- end, {[], State}, MsgSeqNos),
- ConfirmFun(State1, DoneMessages).
-
+ lists:foldl(
+ fun(MsgSeqNo,
+ {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
+ {ok, Qs} = dict:find(MsgSeqNo, QFM0),
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | DMs],
+ State0#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM0),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC0)}};
+ _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
+ {DMs, State0#ch{queues_for_msg = QFM1}}
+ end
+ end, {[], State}, MsgSeqNos),
+ internal_flush_confirms(State1, DoneMessages).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};