diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 24 |
4 files changed, 69 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8bed40bef4..8979062b9d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -873,7 +873,6 @@ handle_cast({msgs_written_to_disk, Guids}, handle_cast({msg_indices_written_to_disk, Guids}, State = #q{msgs_on_disk = MOD, msg_indices_on_disk = MIOD}) -> - rabbit_log:info("Message indices written to disk: ~p~n", [Guids]), GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), gb_sets:fold(fun (Guid, State0) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 17d84533dd..76b962fb57 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm_enabled, published_count, confirm_multiple, confirm_tref, - held_confirms, need_confirming}). + held_confirms, need_confirming, qpid_to_msgs}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -188,7 +188,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> published_count = 0, confirm_multiple = false, held_confirms = gb_sets:new(), - need_confirming = gb_sets:new() }, + need_confirming = gb_sets:new(), + qpid_to_msgs = dict:new() }, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -269,8 +270,14 @@ handle_cast(multiple_ack_flush, end, {noreply, State #ch { held_confirms = gb_sets:new(), confirm_tref = undefined }}; + handle_cast({confirm, MsgSeqNo}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. + {noreply, send_or_enqueue_ack(MsgSeqNo, State)}; + +handle_cast({msg_sent_to_queues, MsgSeqNo, QPids}, State) -> + {noreply, lists:foldl(fun (QPid, State0) -> + msg_sent_to_queues(MsgSeqNo, QPid, State0) + end, State, QPids)}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, @@ -279,9 +286,18 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> - erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State)}. +handle_info({'DOWN', _MRef, process, QPid, Reason}, + State = #ch{qpid_to_msgs = QTM}) -> + case dict:find(QPid, QTM) of + {ok, Msgs} -> + S = gb_sets:fold(fun (MsgSeqNo, State0) -> + send_or_enqueue_ack(MsgSeqNo, State0) + end, State, Msgs), + {noreply, S #ch {qpid_to_msgs = dict:erase(QPid, QTM)}}; + error -> + erase_queue_stats(QPid), + {noreply, queue_blocked(QPid, State)} + end. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -433,20 +449,42 @@ send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]), do_if_not_dup(MsgSeqNo, State, - fun(MSN, S = #ch{writer_pid = WriterPid}) -> + fun(MSN, S = #ch{writer_pid = WriterPid, + qpid_to_msgs = QTM}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.ack'{delivery_tag = MSN}), - S + S #ch { qpid_to_msgs = + dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, Msgs) + end, QTM) } end); send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]), do_if_not_dup(MsgSeqNo, State, - fun(MSN, S) -> + fun(MSN, S = #ch{qpid_to_msgs = QTM}) -> State1 = start_ack_timer(S), State1 #ch { held_confirms = - gb_sets:add(MSN, State1#ch.held_confirms) } + gb_sets:add(MSN, State1#ch.held_confirms), + qpid_to_msgs = + dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, + Msgs) + end, QTM) } end). +msg_sent_to_queues(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> + case dict:find(QPid, QTM) of + {ok, Msgs} -> + State #ch {qpid_to_msgs = dict:store(QPid, + gb_sets:add(MsgSeqNo, Msgs), + QTM) }; + error -> + erlang:monitor(process, QPid), + State #ch { qpid_to_msgs = dict:store(QPid, + gb_sets:add(MsgSeqNo, gb_sets:new()), + QTM) } + end. + do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) -> case gb_sets:is_element(MsgSeqNo, NA) of true -> @@ -489,7 +527,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - %% PubAck transient messages immediately {MsgSeqNo, State1} = case State#ch.confirm_enabled of false -> @@ -1256,9 +1293,11 @@ maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> end. incr_stats({QPid, _} = QX, Inc, Measure) -> + io:format("incr_stats for ~p~n", [QPid]), maybe_monitor(QPid), update_measures(queue_exchange_stats, QX, Inc, Measure); incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + io:format("incr_stats for ~p~n", [QPid]), maybe_monitor(QPid), update_measures(queue_stats, QPid, Inc, Measure); incr_stats(X, Inc, Measure) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9e917fe599..8fe029392d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -746,7 +746,6 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #msstate { pid_to_fun = PTF, pid_to_guids = PTG }) -> % A queue with a callback has died, so remove it from dicts. - rabbit_log:info("Queue ~p has gone down~n", [Pid]), {noreply, State #msstate { pid_to_fun = dict:erase(Pid, PTF), pid_to_guids = dict:erase(Pid, PTG) }}; diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index a927ec647e..179c45bb41 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -55,7 +55,6 @@ deliver(QPids, Delivery = #delivery{mandatory = false, immediate = false, - message = Msg, msg_seq_no = MsgSeqNo}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue @@ -67,12 +66,16 @@ deliver(QPids, Delivery = #delivery{mandatory = false, delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), case {QPids, MsgSeqNo} of - {[], _} -> rabbit_channel:confirm(self(), MsgSeqNo); + {[], _} -> + %% No queues will get the message. This is fine, so we + %% just confirm it. + rabbit_channel:confirm(self(), MsgSeqNo); _ -> ok end, + maybe_inform_channel(MsgSeqNo, QPids), {routed, QPids}; -deliver(QPids, Delivery) -> +deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) -> {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -80,8 +83,14 @@ deliver(QPids, Delivery) -> end), {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, Handled}). + case check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + {Routed, Handled}) of + {routed, Qs} -> + maybe_inform_channel(MsgSeqNo, Qs), + {routed, Qs}; + O -> + O + end. %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange @@ -119,3 +128,8 @@ fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. check_delivery(true, _ , {false, []}) -> {unroutable, []}; check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. + +maybe_inform_channel(undefine, _) -> + ok; +maybe_inform_channel(MsgSeqNo, QPids) -> + gen_server2:cast(self(), {msg_sent_to_queues, MsgSeqNo, QPids}). |
