summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl24
-rw-r--r--src/rabbit_mirror_queue_master.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
3 files changed, 26 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index eb3b13cc36..81e260bdda 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -494,7 +494,9 @@ attempt_delivery(#delivery{txn = none,
end,
case BQ:validate_message(Message, BQS) of
{invalid, BQS1} ->
- {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}};
+ %% if the message is invalid, we pretend it was delivered
+ %% fine
+ {true, NeedsConfirming, State#q{backing_queue_state = BQS1}};
{valid, BQS1} ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -516,7 +518,7 @@ attempt_delivery(#delivery{txn = none,
{Delivered, State2} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
State#q{backing_queue_state = BQS1}),
- {{valid, Delivered}, NeedsConfirming, State2}
+ {Delivered, NeedsConfirming, State2}
end;
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
@@ -525,22 +527,19 @@ attempt_delivery(#delivery{txn = Txn,
backing_queue_state = BQS}}) ->
case BQ:validate_message(Message, BQS) of
{invalid, BQS1} ->
- {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}};
+ {true, NeedsConfirming, State#q{backing_queue_state = BQS1}};
{valid, BQS1} ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
BQS1),
- {{valid, true}, NeedsConfirming,
- State#q{backing_queue_state = BQS2}}
+ {true, NeedsConfirming, State#q{backing_queue_state = BQS2}}
end.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {invalid, _, State1} ->
+ {true, _, State1} ->
State1;
- {{valid, true}, _, State1} ->
- State1;
- {{valid, false}, NeedsConfirming,
+ {false, NeedsConfirming,
State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
#delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
@@ -878,12 +877,9 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Valid, _NeedsConfirming, State1} =
+ {Delivered, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(case Valid of
- valid -> true;
- invalid -> false
- end, State1);
+ reply(Delivered, State1);
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index dd2357bb48..704e62c10f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3]).
+ status/1, invoke/3, validate_message/2]).
-export([start/1, stop/0]).
@@ -113,7 +113,6 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
- backing_queue = BQ,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -252,7 +251,7 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ,
validate_message(Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BSQ }) ->
+ backing_queue_state = BQS }) ->
%% Here, we need to deal with the possibility that we're about to
%% receive a message that we've already seen when we were a slave
%% (we received it via gm). Thus if we do receive such message now
@@ -266,14 +265,15 @@ validate_message(Message = #basic_message { id = MsgId },
%% only if we ourselves are not filtering out the msg.
{Result, BQS1} = BQ:validate_message(Message, BQS),
{Result, State #state { backing_queue_state = BQS1 }};
- {ok, {published, ChPid}} ->
+ {ok, {published, _ChPid}} ->
%% It already got published when we were a slave and no
%% confirmation is waiting. amqqueue_process will have, in
%% its msg_id_to_channel mapping, the entry for dealing
%% with the confirm when that comes back in. The msg is
- %% invalid. We will not see this again, so erase.
+ %% invalid. We will not see this again, nor will we be
+ %% further involved in confirming this message, so erase.
{invalid, State #state { seen_status = dict:erase(MsgId, SS) }};
- {ok, {confirmed, ChPid}} ->
+ {ok, {confirmed, _ChPid}} ->
%% It got confirmed before we became master, but we've
%% only just received the publish from the channel, so
%% couldn't previously know what the msg_seq_no was. Thus
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 87ce31d8df..68dd50e21c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -128,13 +128,17 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
%% get promoted then at that point we have no consumers, thus
%% 'false' is precisely the correct answer. However, we must be
%% careful to _not_ enqueue the message in this case.
+
+ %% Note this is distinct from the case where we receive the msg
+ %% via gm first, then we're promoted to master, and only then do
+ %% we receive the msg from the channel.
gen_server2:reply(From, false), %% master may deliver it, not us
- noreply(maybe_enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, false, State));
handle_call({deliver, Delivery = #delivery {}}, From, State) ->
%% Synchronous, "mandatory" delivery mode
gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- noreply(maybe_enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, true, State));
handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
@@ -170,7 +174,7 @@ handle_cast({gm, Instruction}, State) ->
handle_cast({deliver, Delivery = #delivery {}}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- noreply(maybe_enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -438,6 +442,7 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
sender = ChPid },
+ EnqueueOnPromotion,
State = #state { sender_queues = SQ,
msg_id_status = MS }) ->
%% We will never see {published, ChPid, MsgSeqNo} here.
@@ -447,7 +452,8 @@ maybe_enqueue_message(
{ok, MQ1} -> MQ1;
error -> queue:new()
end,
- SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
+ SQ1 = dict:store(ChPid,
+ queue:in({Delivery, EnqueueOnPromotion}, MQ), SQ),
State #state { sender_queues = SQ1 };
{ok, {confirmed, ChPid}} ->
%% BQ has confirmed it but we didn't know what the