diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-01-21 15:11:33 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-02-01 11:23:16 +0100 |
| commit | d142bbc45a4d0f8482b6a98d1f16a725cdf8d8a8 (patch) | |
| tree | 2a6b71edd4f0a638e773d594b0be004c5ca5199d /src | |
| parent | 7341d336304c7d5903052ca4d2bd8ad70708abf7 (diff) | |
| download | rabbitmq-server-git-d142bbc45a4d0f8482b6a98d1f16a725cdf8d8a8.tar.gz | |
Restore `mandatory_received` message handling
The message is also sent from `rabbit_amqqueue_process` and
`rabbit_mirror_queue_slave` to avoid a leak of mandatory references in
3.7.x channels. For 3.8.x channels, this will be a no-op.
This is just to make the node compatible with a 3.7.x cluster. This
message is unused in 3.8.x and marked as deprecated.
[#163222515,#159298729]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 19 |
3 files changed, 40 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3644f2cf90..dd05b98e81 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -627,6 +627,18 @@ send_or_record_confirm(#delivery{confirm = true, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. +%% This feature was used by `rabbit_amqqueue_process` and +%% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x. It is +%% unused in 3.8.x and thus deprecated. We keep it to support in-place +%% upgrades to 3.8.x (i.e. mixed-version clusters), but it is a no-op +%% starting with that version. +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). + discard(#delivery{confirm = Confirm, sender = SenderPid, flow = Flow, @@ -690,6 +702,7 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, State = #q{overflow = Overflow, backing_queue = BQ, backing_queue_state = BQS}) -> + send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2491316304..3f199a4a00 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -643,6 +643,16 @@ handle_cast({force_event_refresh, Ref}, State) -> Ref), noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer)); +handle_cast({mandatory_received, _MsgSeqNo}, State) -> + %% This feature was used by `rabbit_amqqueue_process` and + %% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x. + %% It is unused in 3.8.x and thus deprecated. We keep it to support + %% in-place upgrades to 3.8.x (i.e. mixed-version clusters), but it + %% is a no-op starting with that version. + %% + %% NB: don't call noreply/1 since we don't want to send confirms. + noreply_coalesce(State); + handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) -> %% It does not matter which queue rejected the message, %% if any queue rejected it - it should not be confirmed. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 637d49035e..3697051960 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -562,6 +562,18 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. +%% This feature was used by `rabbit_amqqueue_process` and +%% `rabbit_mirror_queue_slave` up-to and including RabbitMQ 3.7.x. It is +%% unused in 3.8.x and thus deprecated. We keep it to support in-place +%% upgrades to 3.8.x (i.e. mixed-version clusters), but it is a no-op +%% starting with that version. +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). + send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) -> MS; send_or_record_confirm(published, #delivery { sender = ChPid, @@ -721,11 +733,13 @@ promote_me(From, #state { q = Q0, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). -%% We need to send an ack for these messages since the channel is waiting +%% We reset mandatory to false here because we will have sent the +%% mandatory_received already as soon as we got the message. We also +%% need to send an ack for these messages since the channel is waiting %% for one for the via-GM case and we will not now receive one. promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> maybe_flow_ack(Sender, Flow), - Delivery. + Delivery#delivery{mandatory = false}. noreply(State) -> {NewState, Timeout} = next_state(State), @@ -844,6 +858,7 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> + send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case maps:find(MsgId, MS) of |
