summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl24
-rw-r--r--src/rabbit_variable_queue.erl4
5 files changed, 33 insertions, 26 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index b85e4ad6ec..f5e441dc72 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -71,5 +71,5 @@
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
--spec(validate_message/2 :: (rabbit_types:basic_message(), state()) ->
- {'invalid' | 'valid', state()}).
+-spec(is_duplicate/2 :: (rabbit_types:basic_message(), state()) ->
+ {boolean(), state()}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9e54312f10..575d69f463 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -523,12 +523,17 @@ attempt_delivery(Delivery = #delivery{txn = none,
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
- case BQ:validate_message(Message, BQS) of
- {invalid, BQS1} ->
- %% if the message is invalid, we pretend it was delivered
- %% fine
+ case BQ:is_duplicate(Message, BQS) of
+ {true, BQS1} ->
+ %% if the message has previously been seen by the BQ then
+ %% it must have been seen under the same circumstances as
+ %% now: i.e. if it is now a deliver_immediately then it
+ %% must have been before. Consequently, if the BQ has seen
+ %% it before then it's safe to assume it's been delivered
+ %% (i.e. the only thing that cares about that is
+ %% deliver_immediately).
{true, Confirm, State#q{backing_queue_state = BQS1}};
- {valid, BQS1} ->
+ {false, BQS1} ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false,
@@ -555,10 +560,10 @@ attempt_delivery(Delivery = #delivery{txn = Txn,
message = Message},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
- case BQ:validate_message(Message, BQS) of
- {invalid, BQS1} ->
+ case BQ:is_duplicate(Message, BQS) of
+ {true, BQS1} ->
{true, Confirm, State#q{backing_queue_state = BQS1}};
- {valid, BQS1} ->
+ {false, BQS1} ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
BQS1),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 7087be9137..dfa5500e97 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -171,8 +171,10 @@ behaviour_info(callbacks) ->
%% components need to pass functions into the backing queue.
{invoke, 3},
- %% TODO: document me
- {validate_message, 2}
+ %% Called prior to a publish or publish_delivered call. Allows
+ %% the BQ to signal that it's already seen this message and thus
+ %% the message should be dropped.
+ {is_duplicate, 2}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0ca73f0349..42af4e51ec 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, validate_message/2]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -274,11 +274,11 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-validate_message(Message = #basic_message { id = MsgId },
- State = #state { seen_status = SS,
- backing_queue = BQ,
- backing_queue_state = BQS,
- confirmed = Confirmed }) ->
+is_duplicate(Message = #basic_message { id = MsgId },
+ State = #state { seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ confirmed = Confirmed }) ->
%% Here, we need to deal with the possibility that we're about to
%% receive a message that we've already seen when we were a slave
%% (we received it via gm). Thus if we do receive such message now
@@ -297,10 +297,10 @@ validate_message(Message = #basic_message { id = MsgId },
%% confirmation is waiting. amqqueue_process will have, in
%% its msg_id_to_channel mapping, the entry for dealing
%% with the confirm when that comes back in (it's added
- %% immediately prior to calling validate_message). The msg
- %% is invalid. We will not see this again, nor will we be
+ %% immediately after calling is_duplicate). The msg is
+ %% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {invalid, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {true, State #state { seen_status = dict:erase(MsgId, SS) }};
{ok, confirmed} ->
%% It got published when we were a slave via gm, and
%% confirmed some time after that (maybe even after
@@ -309,7 +309,7 @@ validate_message(Message = #basic_message { id = MsgId },
%% msg_seq_no was (and thus confirm as a slave). So we
%% need to confirm now. As above, amqqueue_process will
%% have the entry for the msg_id_to_channel mapping added
- %% immediately prior to calling validate_message/2.
- {invalid, State #state { seen_status = dict:erase(MsgId, SS),
- confirmed = [MsgId | Confirmed] }}
+ %% immediately after calling is_duplicate/2.
+ {true, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 486d30fd25..a8f9974adc 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, validate_message/2, multiple_routing_keys/0]).
+ status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -886,7 +886,7 @@ status(#vqstate {
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State).
-validate_message(_Msg, State) -> {valid, State}.
+is_duplicate(_Msg, State) -> {false, State}.
%%----------------------------------------------------------------------------
%% Minor helpers