diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 78 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 247 |
2 files changed, 142 insertions, 183 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 62ac6992a9..7723d7229e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -397,7 +397,7 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), {{Message, IsDelivered, AckTag}, 0 == Remaining, - State #q { backing_queue_state = BQS1 }}. + State#q{backing_queue_state = BQS1}}. confirm_messages(Guids, State) when is_list(Guids) -> lists:foldl(fun(Guid, State0) -> @@ -410,23 +410,21 @@ confirm_message(Guid, State = #q{guid_to_channel = GTC}) -> {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); _ -> ok end, - State #q { guid_to_channel = dict:erase(Guid, GTC) }. + State#q{guid_to_channel = dict:erase(Guid, GTC)}. -record_confirm_message(#delivery{msg_seq_no = undefined }, State) -> +record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> State; -record_confirm_message(#delivery{sender = ChPid, - message = #basic_message{guid = Guid}, - msg_seq_no = MsgSeqNo}, +record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, + sender = ChPid, + message = #basic_message{guid = Guid}}, State = #q{guid_to_channel = GTC}) -> - State #q { guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC) }. + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}. ack_by_acktags(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> AckdGuids = BQ:seqids_to_guids(AckTags, BQS), - BQS1 = BQ:ack(AckTags, BQS), - confirm_messages( - AckdGuids, - State #q { backing_queue_state = BQS1 }). + confirm_messages(AckdGuids, + State#q{backing_queue_state = BQ:ack(AckTags, BQS)}). run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -460,15 +458,11 @@ attempt_delivery(#delivery{txn = Txn, deliver_or_enqueue(Delivery = #delivery{message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ}) -> - State1 = record_confirm_message(Delivery, State), - case attempt_delivery(Delivery, State1) of - {true, NewState} -> - {true, NewState}; - {false, NewState} -> - %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, MsgSeqNo =/= undefined, - State1 #q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} + case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of + {true, NewState} -> {true, NewState}; + {false, NewState} -> BQS = BQ:publish(Message, MsgSeqNo =/= undefined, + NewState#q.backing_queue_state), + {false, NewState#q{backing_queue_state = BQS}} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -566,13 +560,12 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - case Fun(BQS) of - {BQS1, {confirm, Guids}} -> - run_message_queue( - confirm_messages(Guids, State #q { backing_queue_state = BQS1 })); - BQS1 -> - run_message_queue(State#q{backing_queue_state = BQS1}) - end. + {BQS2, State1} = + case Fun(BQS) of + {BQS1, {confirm, Guids}} -> {BQS1, confirm_messages(Guids, State)}; + BQS1 -> {BQS1, State} + end, + run_message_queue(State1#q{backing_queue_state = BQS2}). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -720,15 +713,15 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, State1} = attempt_delivery(Delivery, State), - State2 = confirm_message(Delivery#delivery.message#basic_message.guid, - record_confirm_message(Delivery, State1)), - reply(Delivered, State2); + {Delivered, NewState} = attempt_delivery(Delivery, State), + reply(Delivered, + confirm_message(Delivery#delivery.message#basic_message.guid, + record_confirm_message(Delivery, NewState))); handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, State1} = deliver_or_enqueue(Delivery, State), - reply(Delivered, State1); + {Delivered, NewState} = deliver_or_enqueue(Delivery, State), + reply(Delivered, NewState); handle_call({commit, Txn, ChPid}, From, State) -> NewState = commit_transaction(Txn, From, ChPid, State), @@ -885,8 +878,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, State1} = deliver_or_enqueue(Delivery, State), - noreply(State1); + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), + noreply(NewState); handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -896,14 +889,13 @@ handle_cast({ack, Txn, AckTags, ChPid}, C = #cr{acktags = ChAckTags} -> {C1, State1} = case Txn of - none -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - NewC = C#cr{acktags = ChAckTags1}, - NewState = ack_by_acktags(AckTags, State), - {NewC, NewState}; - _ -> - {C#cr{txn = Txn}, - State #q { backing_queue_state = BQ:tx_ack(Txn, AckTags, BQS) }} + none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), + NewC = C#cr{acktags = ChAckTags1}, + NewState = ack_by_acktags(AckTags, State), + {NewC, NewState}; + _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + {C#cr{txn = Txn}, + State#q{backing_queue_state = BQS1}} end, store_ch_record(C1), noreply(State1) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 20a554101c..722eff370e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -165,7 +165,6 @@ flush_multiple_acks(Pid) -> confirm(Pid, MsgSeqNo) -> gen_server2:cast(Pid, {confirm, MsgSeqNo}). - %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -173,30 +172,30 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), - State = #ch{ state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - start_limiter_fun = StartLimiterFun, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - stats_timer = StatsTimer, - confirm_enabled = false, - published_count = 0, - confirm_multiple = false, - held_confirms = gb_sets:new(), - unconfirmed = gb_sets:new(), - qpid_to_msgs = dict:new() }, + State = #ch{state = starting, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + start_limiter_fun = StartLimiterFun, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + stats_timer = StatsTimer, + confirm_enabled = false, + published_count = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + unconfirmed = gb_sets:new(), + qpid_to_msgs = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -304,18 +303,19 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1)}. -handle_pre_hibernate(State = #ch{writer_pid = WriterPid, - held_confirms = As, - stats_timer = StatsTimer, - unconfirmed = UC}) -> +handle_pre_hibernate(State = #ch{writer_pid = WriterPid, + held_confirms = As, + stats_timer = StatsTimer, + unconfirmed = UC}) -> ok = clear_permission_cache(), flush_multiple(WriterPid, As, UC), rabbit_event:if_enabled(StatsTimer, fun() -> - internal_emit_stats(State) + internal_emit_stats(State) end), + StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{held_confirms = gb_sets:new(), - stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - confirm_tref = undefined}}. + stats_timer = StatsTimer1, + confirm_tref = undefined}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -456,52 +456,39 @@ send_or_enqueue_ack(undefined, State) -> send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> State; send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> - do_if_not_dup( - MsgSeqNo, State, - fun(MSN, State1 = #ch{writer_pid = WriterPid, qpid_to_msgs = QTM}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = MSN}), - QTM1 = dict:map(fun (_, Msgs) -> - gb_sets:delete_any(MsgSeqNo, Msgs) - end, QTM), - State1#ch{qpid_to_msgs = QTM1} - end); + do_if_not_dup(MsgSeqNo, State, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + State1 + end); send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> - do_if_not_dup( - MsgSeqNo, State, - fun(MSN, State1 = #ch{qpid_to_msgs = QTM}) -> - QTM1 = dict:map(fun (_, Msgs) -> - gb_sets:delete_any(MsgSeqNo, Msgs) - end, QTM), - start_ack_timer( - State1#ch{held_confirms = - gb_sets:add(MSN, State1#ch.held_confirms), - qpid_to_msgs = QTM1}) - end). + do_if_not_dup(MsgSeqNo, State, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_ack_timer(State1#ch{held_confirms = + gb_sets:add(MSN, As)}) + end). msg_sent_to_queue(undefined, _QPid, State) -> State; msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> - case dict:find(QPid, QTM) of - {ok, Msgs} -> - State#ch{ - qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs), QTM)}; - error -> - erlang:monitor(process, QPid), - State#ch{ - qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, gb_sets:new()), QTM)} - end. + Msgs1 = case dict:find(QPid, QTM) of + {ok, Msgs} -> Msgs; + error -> erlang:monitor(process, QPid), + gb_sets:new() + end, + State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1))}. do_if_not_dup(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) -> case gb_sets:is_element(MsgSeqNo, UC) of - true -> - State1 = Fun(MsgSeqNo, State), - State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)}; - false -> - State + true -> QTM = dict:map(fun (_, Msgs) -> + gb_sets:delete_any(MsgSeqNo, Msgs) + end, State#ch.qpid_to_msgs), + State1 = Fun(MsgSeqNo, State#ch{qpid_to_msgs = QTM}), + State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)}; + false -> State end. - handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -526,7 +513,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - writer_pid = WriterPid, confirm_enabled = ConfirmEnabled}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), @@ -537,14 +523,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, IsPersistent = is_message_persistent(DecodedContent), {MsgSeqNo, State1} = case ConfirmEnabled of - false -> - {undefined, State}; - true -> - Count = State#ch.published_count, - {Count, - State#ch{published_count = Count + 1, - unconfirmed = - gb_sets:add(Count, State#ch.unconfirmed) }} + false -> {undefined, State}; + true -> Count = State#ch.published_count, + {Count, + State#ch{published_count = Count + 1, + unconfirmed = + gb_sets:add(Count, State#ch.unconfirmed)}} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -559,25 +543,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> MsgSeqNo; false -> undefined end)), - State2 = case RoutingRes of - %% Confirm transient messages now - routed -> - case {IsPersistent, DeliveredQPids} of - {_, []} -> send_or_enqueue_ack(MsgSeqNo, State1); - {true, _} -> - lists:foldl(fun (QPid, State0) -> - msg_sent_to_queue(MsgSeqNo, QPid, State0) - end, State1, DeliveredQPids); - {false, _} -> send_or_enqueue_ack(MsgSeqNo, State1) - end; - %% Confirm after basic.returns - unroutable -> - ok = basic_return(Message, WriterPid, no_route), - send_or_enqueue_ack(MsgSeqNo, State1); - not_delivered -> - ok = basic_return(Message, WriterPid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, State1) - end, + State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent, + MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State2), @@ -1002,30 +969,18 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) precondition_failed, "cannot switch from tx to confirm mode", []); handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, - State = #ch{confirm_enabled = false}) -> - State1 = State#ch{confirm_enabled = true, - confirm_multiple = Multiple}, - case NoWait of - true -> {noreply, State1}; - false -> {reply, #'confirm.select_ok'{}, State1} - end; + _, State = #ch{confirm_enabled = false}) -> + return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, + NoWait, #'confirm.select_ok'{}); handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, - _, - State = #ch{confirm_enabled = true, - confirm_multiple = Multiple}) -> - rabbit_log:info("got a confirm.select with same options~n"), - case NoWait of - true -> {noreply, State}; - false -> {reply, #'confirm.select_ok'{}, State} - end; + _, State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> + return_ok(State, NoWait, #'confirm.select_ok'{}); -handle_method(#'confirm.select'{}, - _, - #ch{confirm_enabled = true}) -> +handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> rabbit_misc:protocol_error( - precondition_failed, "cannot change confirm channel multiple setting", []); + precondition_failed, "cannot change confirm_multiple setting", []); handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> @@ -1249,6 +1204,21 @@ is_message_persistent(Content) -> IsPersistent end. +process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_route), + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_consumers), + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(routed, [], _, MsgSeqNo, _, State) -> + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(routed, _, false, MsgSeqNo, _, State) -> + send_or_enqueue_ack(MsgSeqNo, State); +process_routing_result(routed, QPids, true, MsgSeqNo, _, State) -> + lists:foldl(fun (QPid, State0) -> + msg_sent_to_queue(MsgSeqNo, QPid, State0) + end, State, QPids). + lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> @@ -1370,33 +1340,30 @@ stop_ack_timer(State = #ch{confirm_tref = TRef}) -> flush_multiple(WriterPid, As, NA) -> case gb_sets:is_empty(As) of true -> ok; - false -> SmallestNotAcked = case gb_sets:is_empty(NA) of - false -> gb_sets:smallest(NA); - true -> gb_sets:largest(As)+1 - end, - [First | Rest] = gb_sets:to_list(As), - Remaining = - case Rest of - [] -> [First]; - _ -> flush_multiple(First, Rest, WriterPid, SmallestNotAcked) - end, - [rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = A}) - || A <- Remaining] -end. - + false -> [First | Rest] = gb_sets:to_list(As), + [rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = A}) || + A <- case Rest of + [] -> [First]; + _ -> flush_multiple( + First, Rest, WriterPid, + case gb_sets:is_empty(NA) of + false -> gb_sets:smallest(NA); + true -> gb_sets:largest(As) + 1 + end) + end], + ok + end. flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> - ExpNext = Prev+1, + ExpNext = Prev + 1, case {SNA >= Cur, Cur} of - {true, ExpNext} -> - flush_multiple(Cur, Rest, WriterPid, SNA); - _ -> - flush_multiple(Prev, [], WriterPid, SNA), - [Cur | Rest] + {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA); + _ -> flush_multiple(Prev, [], WriterPid, SNA), + [Cur | Rest] end; flush_multiple(Prev, [], WriterPid, _) -> - ok = rabbit_writer:send_command( - WriterPid, - #'basic.ack'{delivery_tag = Prev, - multiple = true}), + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = Prev, + multiple = true}), []. |
