diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
3 files changed, 26 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index eb3b13cc36..81e260bdda 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -494,7 +494,9 @@ attempt_delivery(#delivery{txn = none, end, case BQ:validate_message(Message, BQS) of {invalid, BQS1} -> - {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}}; + %% if the message is invalid, we pretend it was delivered + %% fine + {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}; {valid, BQS1} -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -516,7 +518,7 @@ attempt_delivery(#delivery{txn = none, {Delivered, State2} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State#q{backing_queue_state = BQS1}), - {{valid, Delivered}, NeedsConfirming, State2} + {Delivered, NeedsConfirming, State2} end; attempt_delivery(#delivery{txn = Txn, sender = ChPid, @@ -525,22 +527,19 @@ attempt_delivery(#delivery{txn = Txn, backing_queue_state = BQS}}) -> case BQ:validate_message(Message, BQS) of {invalid, BQS1} -> - {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}}; + {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}; {valid, BQS1} -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, BQS1), - {{valid, true}, NeedsConfirming, - State#q{backing_queue_state = BQS2}} + {true, NeedsConfirming, State#q{backing_queue_state = BQS2}} end. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {invalid, _, State1} -> + {true, _, State1} -> State1; - {{valid, true}, _, State1} -> - State1; - {{valid, false}, NeedsConfirming, + {false, NeedsConfirming, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, @@ -878,12 +877,9 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Valid, _NeedsConfirming, State1} = + {Delivered, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(case Valid of - valid -> true; - invalid -> false - end, State1); + reply(Delivered, State1); handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index dd2357bb48..704e62c10f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3]). + status/1, invoke/3, validate_message/2]). -export([start/1, stop/0]). @@ -113,7 +113,6 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, - backing_queue = BQ, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -252,7 +251,7 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ, validate_message(Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, backing_queue = BQ, - backing_queue_state = BSQ }) -> + backing_queue_state = BQS }) -> %% Here, we need to deal with the possibility that we're about to %% receive a message that we've already seen when we were a slave %% (we received it via gm). Thus if we do receive such message now @@ -266,14 +265,15 @@ validate_message(Message = #basic_message { id = MsgId }, %% only if we ourselves are not filtering out the msg. {Result, BQS1} = BQ:validate_message(Message, BQS), {Result, State #state { backing_queue_state = BQS1 }}; - {ok, {published, ChPid}} -> + {ok, {published, _ChPid}} -> %% It already got published when we were a slave and no %% confirmation is waiting. amqqueue_process will have, in %% its msg_id_to_channel mapping, the entry for dealing %% with the confirm when that comes back in. The msg is - %% invalid. We will not see this again, so erase. + %% invalid. We will not see this again, nor will we be + %% further involved in confirming this message, so erase. {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}; - {ok, {confirmed, ChPid}} -> + {ok, {confirmed, _ChPid}} -> %% It got confirmed before we became master, but we've %% only just received the publish from the channel, so %% couldn't previously know what the msg_seq_no was. Thus diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 87ce31d8df..68dd50e21c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -128,13 +128,17 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> %% get promoted then at that point we have no consumers, thus %% 'false' is precisely the correct answer. However, we must be %% careful to _not_ enqueue the message in this case. + + %% Note this is distinct from the case where we receive the msg + %% via gm first, then we're promoted to master, and only then do + %% we receive the msg from the channel. gen_server2:reply(From, false), %% master may deliver it, not us - noreply(maybe_enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, false, State)); handle_call({deliver, Delivery = #delivery {}}, From, State) -> %% Synchronous, "mandatory" delivery mode gen_server2:reply(From, true), %% amqqueue throws away the result anyway - noreply(maybe_enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, true, State)); handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, @@ -170,7 +174,7 @@ handle_cast({gm, Instruction}, State) -> handle_cast({deliver, Delivery = #delivery {}}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - noreply(maybe_enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -438,6 +442,7 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, sender = ChPid }, + EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> %% We will never see {published, ChPid, MsgSeqNo} here. @@ -447,7 +452,8 @@ maybe_enqueue_message( {ok, MQ1} -> MQ1; error -> queue:new() end, - SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ), + SQ1 = dict:store(ChPid, + queue:in({Delivery, EnqueueOnPromotion}, MQ), SQ), State #state { sender_queues = SQ1 }; {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the |
