summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-01-21 15:11:33 +0100
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-02-01 11:23:16 +0100
commitd142bbc45a4d0f8482b6a98d1f16a725cdf8d8a8 (patch)
tree2a6b71edd4f0a638e773d594b0be004c5ca5199d /src
parent7341d336304c7d5903052ca4d2bd8ad70708abf7 (diff)
downloadrabbitmq-server-git-d142bbc45a4d0f8482b6a98d1f16a725cdf8d8a8.tar.gz
Restore `mandatory_received` message handling
The message is also sent from `rabbit_amqqueue_process` and `rabbit_mirror_queue_slave` to avoid a leak of mandatory references in 3.7.x channels. For 3.8.x channels, this will be a no-op. This is just to make the node compatible with a 3.7.x cluster. This message is unused in 3.8.x and marked as deprecated. [#163222515,#159298729]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_mirror_queue_slave.erl19
3 files changed, 40 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3644f2cf90..dd05b98e81 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -627,6 +627,18 @@ send_or_record_confirm(#delivery{confirm = true,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
+%% This feature was used by `rabbit_amqqueue_process` and
+%% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x. It is
+%% unused in 3.8.x and thus deprecated. We keep it to support in-place
+%% upgrades to 3.8.x (i.e. mixed-version clusters), but it is a no-op
+%% starting with that version.
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
+
discard(#delivery{confirm = Confirm,
sender = SenderPid,
flow = Flow,
@@ -690,6 +702,7 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
State = #q{overflow = Overflow,
backing_queue = BQ,
backing_queue_state = BQS}) ->
+ send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2491316304..3f199a4a00 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -643,6 +643,16 @@ handle_cast({force_event_refresh, Ref}, State) ->
Ref),
noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
+handle_cast({mandatory_received, _MsgSeqNo}, State) ->
+ %% This feature was used by `rabbit_amqqueue_process` and
+ %% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x.
+ %% It is unused in 3.8.x and thus deprecated. We keep it to support
+ %% in-place upgrades to 3.8.x (i.e. mixed-version clusters), but it
+ %% is a no-op starting with that version.
+ %%
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ noreply_coalesce(State);
+
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
%% It does not matter which queue rejected the message,
%% if any queue rejected it - it should not be confirmed.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 637d49035e..3697051960 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -562,6 +562,18 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
+%% This feature was used by `rabbit_amqqueue_process` and
+%% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x. It is
+%% unused in 3.8.x and thus deprecated. We keep it to support in-place
+%% upgrades to 3.8.x (i.e. mixed-version clusters), but it is a no-op
+%% starting with that version.
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
+
send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
MS;
send_or_record_confirm(published, #delivery { sender = ChPid,
@@ -721,11 +733,13 @@ promote_me(From, #state { q = Q0,
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
-%% We need to send an ack for these messages since the channel is waiting
+%% We reset mandatory to false here because we will have sent the
+%% mandatory_received already as soon as we got the message. We also
+%% need to send an ack for these messages since the channel is waiting
%% for one for the via-GM case and we will not now receive one.
promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
maybe_flow_ack(Sender, Flow),
- Delivery.
+ Delivery#delivery{mandatory = false}.
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -844,6 +858,7 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case maps:find(MsgId, MS) of