summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_channel.erl61
-rw-r--r--src/rabbit_msg_store.erl1
-rw-r--r--src/rabbit_router.erl24
4 files changed, 69 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8bed40bef4..8979062b9d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -873,7 +873,6 @@ handle_cast({msgs_written_to_disk, Guids},
handle_cast({msg_indices_written_to_disk, Guids},
State = #q{msgs_on_disk = MOD,
msg_indices_on_disk = MIOD}) ->
- rabbit_log:info("Message indices written to disk: ~p~n", [Guids]),
GuidSet = gb_sets:from_list(Guids),
ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
gb_sets:fold(fun (Guid, State0) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 17d84533dd..76b962fb57 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm_enabled, published_count, confirm_multiple, confirm_tref,
- held_confirms, need_confirming}).
+ held_confirms, need_confirming, qpid_to_msgs}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -188,7 +188,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
published_count = 0,
confirm_multiple = false,
held_confirms = gb_sets:new(),
- need_confirming = gb_sets:new() },
+ need_confirming = gb_sets:new(),
+ qpid_to_msgs = dict:new() },
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -269,8 +270,14 @@ handle_cast(multiple_ack_flush,
end,
{noreply, State #ch { held_confirms = gb_sets:new(),
confirm_tref = undefined }};
+
handle_cast({confirm, MsgSeqNo}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, State)}.
+ {noreply, send_or_enqueue_ack(MsgSeqNo, State)};
+
+handle_cast({msg_sent_to_queues, MsgSeqNo, QPids}, State) ->
+ {noreply, lists:foldl(fun (QPid, State0) ->
+ msg_sent_to_queues(MsgSeqNo, QPid, State0)
+ end, State, QPids)}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
@@ -279,9 +286,18 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
- erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State)}.
+handle_info({'DOWN', _MRef, process, QPid, Reason},
+ State = #ch{qpid_to_msgs = QTM}) ->
+ case dict:find(QPid, QTM) of
+ {ok, Msgs} ->
+ S = gb_sets:fold(fun (MsgSeqNo, State0) ->
+ send_or_enqueue_ack(MsgSeqNo, State0)
+ end, State, Msgs),
+ {noreply, S #ch {qpid_to_msgs = dict:erase(QPid, QTM)}};
+ error ->
+ erase_queue_stats(QPid),
+ {noreply, queue_blocked(QPid, State)}
+ end.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -433,20 +449,42 @@ send_or_enqueue_ack(MsgSeqNo,
State = #ch{confirm_multiple = false}) ->
rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]),
do_if_not_dup(MsgSeqNo, State,
- fun(MSN, S = #ch{writer_pid = WriterPid}) ->
+ fun(MSN, S = #ch{writer_pid = WriterPid,
+ qpid_to_msgs = QTM}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.ack'{delivery_tag = MSN}),
- S
+ S #ch { qpid_to_msgs =
+ dict:map(fun (_, Msgs) ->
+ gb_sets:delete_any(MsgSeqNo, Msgs)
+ end, QTM) }
end);
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]),
do_if_not_dup(MsgSeqNo, State,
- fun(MSN, S) ->
+ fun(MSN, S = #ch{qpid_to_msgs = QTM}) ->
State1 = start_ack_timer(S),
State1 #ch { held_confirms =
- gb_sets:add(MSN, State1#ch.held_confirms) }
+ gb_sets:add(MSN, State1#ch.held_confirms),
+ qpid_to_msgs =
+ dict:map(fun (_, Msgs) ->
+ gb_sets:delete_any(MsgSeqNo,
+ Msgs)
+ end, QTM) }
end).
+msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
+ case dict:find(QPid, QTM) of
+ {ok, Msgs} ->
+ State #ch {qpid_to_msgs = dict:store(QPid,
+ gb_sets:add(MsgSeqNo, Msgs),
+ QTM) };
+ error ->
+ erlang:monitor(process, QPid),
+ State #ch { qpid_to_msgs = dict:store(QPid,
+ gb_sets:add(MsgSeqNo, gb_sets:new()),
+ QTM) }
+ end.
+
do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) ->
case gb_sets:is_element(MsgSeqNo, NA) of
true ->
@@ -489,7 +527,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- %% PubAck transient messages immediately
{MsgSeqNo, State1}
= case State#ch.confirm_enabled of
false ->
@@ -1256,9 +1293,11 @@ maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
end.
incr_stats({QPid, _} = QX, Inc, Measure) ->
+ io:format("incr_stats for ~p~n", [QPid]),
maybe_monitor(QPid),
update_measures(queue_exchange_stats, QX, Inc, Measure);
incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ io:format("incr_stats for ~p~n", [QPid]),
maybe_monitor(QPid),
update_measures(queue_stats, QPid, Inc, Measure);
incr_stats(X, Inc, Measure) ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9e917fe599..8fe029392d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -746,7 +746,6 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #msstate { pid_to_fun = PTF,
pid_to_guids = PTG }) ->
% A queue with a callback has died, so remove it from dicts.
- rabbit_log:info("Queue ~p has gone down~n", [Pid]),
{noreply, State #msstate { pid_to_fun = dict:erase(Pid, PTF),
pid_to_guids = dict:erase(Pid, PTG) }};
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index a927ec647e..179c45bb41 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -55,7 +55,6 @@
deliver(QPids, Delivery = #delivery{mandatory = false,
immediate = false,
- message = Msg,
msg_seq_no = MsgSeqNo}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver will deliver the message to the queue
@@ -67,12 +66,16 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
case {QPids, MsgSeqNo} of
- {[], _} -> rabbit_channel:confirm(self(), MsgSeqNo);
+ {[], _} ->
+ %% No queues will get the message. This is fine, so we
+ %% just confirm it.
+ rabbit_channel:confirm(self(), MsgSeqNo);
_ -> ok
end,
+ maybe_inform_channel(MsgSeqNo, QPids),
{routed, QPids};
-deliver(QPids, Delivery) ->
+deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) ->
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
@@ -80,8 +83,14 @@ deliver(QPids, Delivery) ->
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, Handled}).
+ case check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
+ {Routed, Handled}) of
+ {routed, Qs} ->
+ maybe_inform_channel(MsgSeqNo, Qs),
+ {routed, Qs};
+ O ->
+ O
+ end.
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
@@ -119,3 +128,8 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
check_delivery(true, _ , {false, []}) -> {unroutable, []};
check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
+
+maybe_inform_channel(undefine, _) ->
+ ok;
+maybe_inform_channel(MsgSeqNo, QPids) ->
+ gen_server2:cast(self(), {msg_sent_to_queues, MsgSeqNo, QPids}).