summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-01-15 12:01:27 +0000
committerkjnilsson <knilsson@pivotal.io>2019-01-15 12:01:27 +0000
commit2593ee9e9f4f8ff1e0821f04f89df5ea46ce420d (patch)
tree270508fe6f66a1fd956a69e0cb291b8790cc51fb /src
parent45632289309e80a38348ed936961330a4ae0763b (diff)
downloadrabbitmq-server-git-2593ee9e9f4f8ff1e0821f04f89df5ea46ce420d.tar.gz
Make mandatory based on route data only
Instead of waiting for a mandatory_received message from the queue the mandatory result is calculated in the channel based on the routing result only. This may seem like a weakening of the mandatory semantics but considering that the mandatory_received message is returned _before_ the message is enqueued and/or persisted in the queue it doesn't actually open up any further failure scenarios. [#163222515]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_channel.erl46
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
3 files changed, 18 insertions, 54 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a4a044f1e4..caa4dfe093 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -15,8 +15,8 @@
%%
-module(rabbit_amqqueue_process).
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
-behaviour(gen_server2).
@@ -604,13 +604,6 @@ send_or_record_confirm(#delivery{confirm = true,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-send_mandatory(#delivery{mandatory = false}) ->
- ok;
-send_mandatory(#delivery{mandatory = true,
- sender = SenderPid,
- msg_seq_no = MsgSeqNo}) ->
- gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
-
discard(#delivery{confirm = Confirm,
sender = SenderPid,
flow = Flow,
@@ -674,7 +667,6 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
State = #q{overflow = Overflow,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 805d9f538f..634789adab 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,8 +34,6 @@
%% * Keeping track of consumers
%% * Keeping track of unacknowledged deliveries to consumers
%% * Keeping track of publisher confirms
-%% * Keeping track of mandatory message routing confirmations
-%% and returns
%% * Transaction management
%% * Authorisation (enforcing permissions)
%% * Publishing trace events if tracing is enabled
@@ -143,9 +141,6 @@
%% a list of tags for published messages that were
%% rejected but are yet to be sent to the client
rejected,
- %% a dtree used to track oustanding notifications
- %% for messages published as mandatory
- mandatory,
%% same as capabilities in the reader
capabilities,
%% tracing exchange resource if tracing is enabled,
@@ -469,7 +464,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
unconfirmed = dtree:empty(),
rejected = [],
confirmed = [],
- mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = Prefetch,
@@ -502,7 +496,6 @@ prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
{reject_publish, _MsgSeqNos, _QPid} -> 5;
- {mandatory_received, _MsgSeqNo, _QPid} -> 5;
_ -> 0
end.
@@ -637,10 +630,6 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
|| {ConsumerTag, CreditDrained} <- CTagCredit],
noreply(State);
-handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
- %% NB: don't call noreply/1 since we don't want to send confirms.
- noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
-
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
%% It does not matter which queue rejected the message,
%% if any queue rejected it - it should not be confirmed.
@@ -1707,17 +1696,13 @@ track_delivering_queue(NoAck, QPid, QName,
false -> sets:add_element(QRef, DQ)
end}.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
- mandatory = Mand})
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC})
when ?IS_CLASSIC(QPid) ->
- {MMsgs, Mand1} = dtree:take(QPid, Mand),
- [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
- State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- record_rejects(MXs, State1#ch{unconfirmed = UC1});
+ record_rejects(MXs, State#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State1#ch{unconfirmed = UC1})
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
end;
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
@@ -1972,7 +1957,7 @@ foreach_per_queue(F, UAL, Acc) ->
consumer_queue_refs(Consumers) ->
lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
- <- maps:to_list(Consumers)]).
+ <- maps:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -2038,11 +2023,11 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo,
- Message, State1),
- State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
- XName, State2),
- case rabbit_event:stats_level(State3, #ch.stats_timer) of
+ ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs,
+ Message, State1),
+ State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ XName, State1),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
@@ -2051,16 +2036,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
_ ->
ok
end,
- State3#ch{queue_states = QueueStates}.
+ State2#ch{queue_states = QueueStates}.
-process_routing_mandatory(false, _, _, _, State) ->
- State;
-process_routing_mandatory(true, [], _, Msg, State) ->
+process_routing_mandatory(true, [], Msg, State) ->
ok = basic_return(Msg, State, no_route),
- State;
-process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg,
- State#ch.mandatory)}.
+ ok;
+process_routing_mandatory(_, _, _, _) ->
+ ok.
process_routing_confirm(false, _, _, _, State) ->
State;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 61f27a4a83..bf0a5a7ed0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -542,13 +542,6 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-send_mandatory(#delivery{mandatory = false}) ->
- ok;
-send_mandatory(#delivery{mandatory = true,
- sender = SenderPid,
- msg_seq_no = MsgSeqNo}) ->
- gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
-
send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
MS;
send_or_record_confirm(published, #delivery { sender = ChPid,
@@ -707,13 +700,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
-%% We reset mandatory to false here because we will have sent the
-%% mandatory_received already as soon as we got the message. We also
-%% need to send an ack for these messages since the channel is waiting
+%% We need to send an ack for these messages since the channel is waiting
%% for one for the via-GM case and we will not now receive one.
promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
maybe_flow_ack(Sender, Flow),
- Delivery#delivery{mandatory = false}.
+ Delivery.
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -832,7 +823,6 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
- send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case maps:find(MsgId, MS) of