summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2018-12-05 00:04:05 +0300
committerGitHub <noreply@github.com>2018-12-05 00:04:05 +0300
commitdb888df2a9156fefda626f57cbb9d9591e27d41f (patch)
tree3a800213985a72475f8aeee459456a0505410a93
parent3abfe381ae7f481da149d42276d145f396efed5d (diff)
parent11002abb8cbf71cea43519fde9efed7cbb87c4ef (diff)
downloadrabbitmq-server-git-db888df2a9156fefda626f57cbb9d9591e27d41f.tar.gz
Merge pull request #1774 from noxdafox/master
RFC rabbit_amqqueue_process: improve message duplicates handling
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_mirror_queue_master.erl6
2 files changed, 37 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c2dab3da6f..52925ce165 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.
-maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
+maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
+ Delivered,
+ State = #q{overflow = Overflow,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
_ ->
- %% Enqueue and maybe drop head later
- deliver_or_enqueue(Delivery, Delivered, State)
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case IsDuplicate of
+ true -> State1;
+ {true, drop} -> State1;
+ %% Drop publish and nack to publisher
+ {true, reject} ->
+ send_reject_publish(Delivery, Delivered, State1);
+ %% Enqueue and maybe drop head later
+ false ->
+ deliver_or_enqueue(Delivery, Delivered, State1)
+ end
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
- Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ Delivered,
+ State = #q{backing_queue = BQ}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State2 = State1#q{backing_queue_state = BQS1},
- case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
- State2) of
- true ->
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
+ {delivered, State2} ->
State2;
- {delivered, State3} ->
- State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined,
- backing_queue_state = BQS2,
+ {undelivered, State2 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS,
msg_id_to_channel = MTC}} ->
- {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
- State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
- {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
-
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
- {Dropped, State4 = #q{backing_queue_state = BQS4}} =
- maybe_drop_head(State3#q{backing_queue_state = BQS3}),
- QLen = BQ:len(BQS4),
+ {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
+ State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
+ {undelivered, State2 = #q{backing_queue_state = BQS}} ->
+
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS),
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State4;
- {true, true, undefined} -> State4;
- {_, _, _} -> drop_expired_msgs(State4)
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
@@ -1610,7 +1619,3 @@ update_ha_mode(State) ->
{ok, Q} = rabbit_amqqueue:lookup(qname(State)),
ok = rabbit_mirror_queue_misc:update_mirrors(Q),
State.
-
-
-
-
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a3050c570f..04353423cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = maps:remove(MsgId, SS) }};
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = maps:remove(MsgId, SS),
- confirmed = [MsgId | Confirmed] }}
+ {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
set_queue_mode(Mode, State = #state { gm = GM,