diff options
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
5 files changed, 33 insertions, 26 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index b85e4ad6ec..f5e441dc72 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -71,5 +71,5 @@ -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). -spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). --spec(validate_message/2 :: (rabbit_types:basic_message(), state()) -> - {'invalid' | 'valid', state()}). +-spec(is_duplicate/2 :: (rabbit_types:basic_message(), state()) -> + {boolean(), state()}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9e54312f10..575d69f463 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -523,12 +523,17 @@ attempt_delivery(Delivery = #delivery{txn = none, immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - case BQ:validate_message(Message, BQS) of - {invalid, BQS1} -> - %% if the message is invalid, we pretend it was delivered - %% fine + case BQ:is_duplicate(Message, BQS) of + {true, BQS1} -> + %% if the message has previously been seen by the BQ then + %% it must have been seen under the same circumstances as + %% now: i.e. if it is now a deliver_immediately then it + %% must have been before. Consequently, if the BQ has seen + %% it before then it's safe to assume it's been delivered + %% (i.e. the only thing that cares about that is + %% deliver_immediately). {true, Confirm, State#q{backing_queue_state = BQS1}}; - {valid, BQS1} -> + {false, BQS1} -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, @@ -555,10 +560,10 @@ attempt_delivery(Delivery = #delivery{txn = Txn, message = Message}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), - case BQ:validate_message(Message, BQS) of - {invalid, BQS1} -> + case BQ:is_duplicate(Message, BQS) of + {true, BQS1} -> {true, Confirm, State#q{backing_queue_state = BQS1}}; - {valid, BQS1} -> + {false, BQS1} -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, BQS1), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 7087be9137..dfa5500e97 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -171,8 +171,10 @@ behaviour_info(callbacks) -> %% components need to pass functions into the backing queue. {invoke, 3}, - %% TODO: document me - {validate_message, 2} + %% Called prior to a publish or publish_delivered call. Allows + %% the BQ to signal that it's already seen this message and thus + %% the message should be dropped. + {is_duplicate, 2} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0ca73f0349..42af4e51ec 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, validate_message/2]). + status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -274,11 +274,11 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -validate_message(Message = #basic_message { id = MsgId }, - State = #state { seen_status = SS, - backing_queue = BQ, - backing_queue_state = BQS, - confirmed = Confirmed }) -> +is_duplicate(Message = #basic_message { id = MsgId }, + State = #state { seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS, + confirmed = Confirmed }) -> %% Here, we need to deal with the possibility that we're about to %% receive a message that we've already seen when we were a slave %% (we received it via gm). Thus if we do receive such message now @@ -297,10 +297,10 @@ validate_message(Message = #basic_message { id = MsgId }, %% confirmation is waiting. amqqueue_process will have, in %% its msg_id_to_channel mapping, the entry for dealing %% with the confirm when that comes back in (it's added - %% immediately prior to calling validate_message). The msg - %% is invalid. We will not see this again, nor will we be + %% immediately after calling is_duplicate). The msg is + %% invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}; + {true, State #state { seen_status = dict:erase(MsgId, SS) }}; {ok, confirmed} -> %% It got published when we were a slave via gm, and %% confirmed some time after that (maybe even after @@ -309,7 +309,7 @@ validate_message(Message = #basic_message { id = MsgId }, %% msg_seq_no was (and thus confirm as a slave). So we %% need to confirm now. As above, amqqueue_process will %% have the entry for the msg_id_to_channel mapping added - %% immediately prior to calling validate_message/2. - {invalid, State #state { seen_status = dict:erase(MsgId, SS), - confirmed = [MsgId | Confirmed] }} + %% immediately after calling is_duplicate/2. + {true, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 486d30fd25..a8f9974adc 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, validate_message/2, multiple_routing_keys/0]). + status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -886,7 +886,7 @@ status(#vqstate { invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). -validate_message(_Msg, State) -> {valid, State}. +is_duplicate(_Msg, State) -> {false, State}. %%---------------------------------------------------------------------------- %% Minor helpers |
