diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-01-15 12:01:27 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-01-15 12:01:27 +0000 |
| commit | 2593ee9e9f4f8ff1e0821f04f89df5ea46ce420d (patch) | |
| tree | 270508fe6f66a1fd956a69e0cb291b8790cc51fb /src | |
| parent | 45632289309e80a38348ed936961330a4ae0763b (diff) | |
| download | rabbitmq-server-git-2593ee9e9f4f8ff1e0821f04f89df5ea46ce420d.tar.gz | |
Make mandatory based on route data only
Instead of waiting for a mandatory_received message from the queue the
mandatory result is calculated in the channel based on the routing
result only. This may seem like a weakening of the mandatory semantics but
considering that the mandatory_received message is returned _before_ the
message is enqueued and/or persisted in the queue it doesn't actually
open up any further failure scenarios.
[#163222515]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
3 files changed, 18 insertions, 54 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a4a044f1e4..caa4dfe093 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -15,8 +15,8 @@ %% -module(rabbit_amqqueue_process). --include("rabbit.hrl"). --include("rabbit_framing.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). -behaviour(gen_server2). @@ -604,13 +604,6 @@ send_or_record_confirm(#delivery{confirm = true, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. -send_mandatory(#delivery{mandatory = false}) -> - ok; -send_mandatory(#delivery{mandatory = true, - sender = SenderPid, - msg_seq_no = MsgSeqNo}) -> - gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). - discard(#delivery{confirm = Confirm, sender = SenderPid, flow = Flow, @@ -674,7 +667,6 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, State = #q{overflow = Overflow, backing_queue = BQ, backing_queue_state = BQS}) -> - send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 805d9f538f..634789adab 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,8 +34,6 @@ %% * Keeping track of consumers %% * Keeping track of unacknowledged deliveries to consumers %% * Keeping track of publisher confirms -%% * Keeping track of mandatory message routing confirmations -%% and returns %% * Transaction management %% * Authorisation (enforcing permissions) %% * Publishing trace events if tracing is enabled @@ -143,9 +141,6 @@ %% a list of tags for published messages that were %% rejected but are yet to be sent to the client rejected, - %% a dtree used to track oustanding notifications - %% for messages published as mandatory - mandatory, %% same as capabilities in the reader capabilities, %% tracing exchange resource if tracing is enabled, @@ -469,7 +464,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, unconfirmed = dtree:empty(), rejected = [], confirmed = [], - mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), consumer_prefetch = Prefetch, @@ -502,7 +496,6 @@ prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; {reject_publish, _MsgSeqNos, _QPid} -> 5; - {mandatory_received, _MsgSeqNo, _QPid} -> 5; _ -> 0 end. @@ -637,10 +630,6 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); -handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> - %% NB: don't call noreply/1 since we don't want to send confirms. - noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)}); - handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) -> %% It does not matter which queue rejected the message, %% if any queue rejected it - it should not be confirmed. @@ -1707,17 +1696,13 @@ track_delivering_queue(NoAck, QPid, QName, false -> sets:add_element(QRef, DQ) end}. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, - mandatory = Mand}) +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) when ?IS_CLASSIC(QPid) -> - {MMsgs, Mand1} = dtree:take(QPid, Mand), - [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs], - State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), - record_rejects(MXs, State1#ch{unconfirmed = UC1}); + record_rejects(MXs, State#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State1#ch{unconfirmed = UC1}) + record_confirms(MXs, State#ch{unconfirmed = UC1}) end; handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> @@ -1972,7 +1957,7 @@ foreach_per_queue(F, UAL, Acc) -> consumer_queue_refs(Consumers) -> lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}} - <- maps:to_list(Consumers)]). + <- maps:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for @@ -2038,11 +2023,11 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ queue_monitors = QMons1}, %% NB: the order here is important since basic.returns must be %% sent before confirms. - State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo, - Message, State1), - State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, - XName, State2), - case rabbit_event:stats_level(State3, #ch.stats_timer) of + ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs, + Message, State1), + State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, + XName, State1), + case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || @@ -2051,16 +2036,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ _ -> ok end, - State3#ch{queue_states = QueueStates}. + State2#ch{queue_states = QueueStates}. -process_routing_mandatory(false, _, _, _, State) -> - State; -process_routing_mandatory(true, [], _, Msg, State) -> +process_routing_mandatory(true, [], Msg, State) -> ok = basic_return(Msg, State, no_route), - State; -process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) -> - State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg, - State#ch.mandatory)}. + ok; +process_routing_mandatory(_, _, _, _) -> + ok. process_routing_confirm(false, _, _, _, State) -> State; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 61f27a4a83..bf0a5a7ed0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -542,13 +542,6 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -send_mandatory(#delivery{mandatory = false}) -> - ok; -send_mandatory(#delivery{mandatory = true, - sender = SenderPid, - msg_seq_no = MsgSeqNo}) -> - gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). - send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) -> MS; send_or_record_confirm(published, #delivery { sender = ChPid, @@ -707,13 +700,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). -%% We reset mandatory to false here because we will have sent the -%% mandatory_received already as soon as we got the message. We also -%% need to send an ack for these messages since the channel is waiting +%% We need to send an ack for these messages since the channel is waiting %% for one for the via-GM case and we will not now receive one. promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> maybe_flow_ack(Sender, Flow), - Delivery#delivery{mandatory = false}. + Delivery. noreply(State) -> {NewState, Timeout} = next_state(State), @@ -832,7 +823,6 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> - send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case maps:find(MsgId, MS) of |
