summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl26
-rw-r--r--src/rabbit_router.erl18
2 files changed, 17 insertions, 27 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7a8e1a3157..1f8455cc25 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -287,13 +287,7 @@ handle_cast(flush_multiple_acks,
confirm_tref = undefined}};
handle_cast({confirm, MsgSeqNo}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, State)};
-
-handle_cast({msg_sent_to_queues, MsgSeqNo, QPids}, State) ->
- {noreply, lists:foldl(fun (QPid, State0) ->
- msg_sent_to_queues(MsgSeqNo, QPid, State0)
- end, State, QPids)}.
-
+ {noreply, send_or_enqueue_ack(MsgSeqNo, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{qpid_to_msgs = QTM}) ->
@@ -484,7 +478,9 @@ send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
qpid_to_msgs = QTM1})
end).
-msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
+msg_sent_to_queue(undefined, _QPid, State) ->
+ State;
+msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
case dict:find(QPid, QTM) of
{ok, Msgs} ->
State#ch{
@@ -527,9 +523,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
- Content, State = #ch{virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid,
+ confirm_enabled = ConfirmEnabled}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -538,7 +535,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1}
- = case State#ch.confirm_enabled of
+ = case ConfirmEnabled of
false ->
{undefined, State};
true ->
@@ -566,7 +563,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routed ->
case {IsPersistent, DeliveredQPids} of
{_, []} -> send_or_enqueue_ack(MsgSeqNo, State1);
- {true, _} -> State1;
+ {true, _} ->
+ lists:foldl(fun (QPid, State0) ->
+ msg_sent_to_queue(MsgSeqNo, QPid, State0)
+ end, State1, DeliveredQPids);
{false, _} -> send_or_enqueue_ack(MsgSeqNo, State1)
end;
%% Confirm after basic.returns
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index e5ffe863a9..707698b099 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -58,8 +58,7 @@
%%----------------------------------------------------------------------------
deliver(QPids, Delivery = #delivery{mandatory = false,
- immediate = false,
- msg_seq_no = MsgSeqNo}) ->
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver will deliver the message to the queue
%% process asynchronously, and return true, which means all the
@@ -69,10 +68,9 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% case below.
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
- maybe_inform_channel(MsgSeqNo, QPids),
{routed, QPids};
-deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) ->
+deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
@@ -82,11 +80,8 @@ deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) ->
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
case check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}) of
- {routed, Qs} ->
- maybe_inform_channel(MsgSeqNo, Qs),
- {routed, Qs};
- O ->
- O
+ {routed, Qs} -> {routed, Qs};
+ O -> O
end.
%% TODO: Maybe this should be handled by a cursor instead.
@@ -125,8 +120,3 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
check_delivery(true, _ , {false, []}) -> {unroutable, []};
check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
-
-maybe_inform_channel(undefined, _) ->
- ok;
-maybe_inform_channel(MsgSeqNo, QPids) ->
- gen_server2:cast(self(), {msg_sent_to_queues, MsgSeqNo, QPids}).