diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 |
2 files changed, 37 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c2dab3da6f..52925ce165 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, State#q{consumers = Consumers})} end. -maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) -> +maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, + Delivered, + State = #q{overflow = Overflow, + backing_queue = BQ, + backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); _ -> - %% Enqueue and maybe drop head later - deliver_or_enqueue(Delivery, Delivered, State) + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case IsDuplicate of + true -> State1; + {true, drop} -> State1; + %% Drop publish and nack to publisher + {true, reject} -> + send_reject_publish(Delivery, Delivered, State1); + %% Enqueue and maybe drop head later + false -> + deliver_or_enqueue(Delivery, Delivered, State1) + end end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid, flow = Flow}, - Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + Delivered, + State = #q{backing_queue = BQ}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State1), - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State2 = State1#q{backing_queue_state = BQS1}, - case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, - State2) of - true -> + case attempt_delivery(Delivery, Props, Delivered, State1) of + {delivered, State2} -> State2; - {delivered, State3} -> - State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined, - backing_queue_state = BQS2, + {undelivered, State2 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS, msg_id_to_channel = MTC}} -> - {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), - State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; - {undelivered, State3 = #q{backing_queue_state = BQS2}} -> - - BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), - {Dropped, State4 = #q{backing_queue_state = BQS4}} = - maybe_drop_head(State3#q{backing_queue_state = BQS3}), - QLen = BQ:len(BQS4), + {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), + State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}; + {undelivered, State2 = #q{backing_queue_state = BQS}} -> + + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS), + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State4; - {true, true, undefined} -> State4; - {_, _, _} -> drop_expired_msgs(State4) + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) end end. @@ -1610,7 +1619,3 @@ update_ha_mode(State) -> {ok, Q} = rabbit_amqqueue:lookup(qname(State)), ok = rabbit_mirror_queue_misc:update_mirrors(Q), State. - - - - diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a3050c570f..04353423cc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% immediately after calling is_duplicate). The msg is %% invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {true, State #state { seen_status = maps:remove(MsgId, SS) }}; + {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }}; {ok, Disposition} when Disposition =:= confirmed %% It got published when we were a slave via gm, and @@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Message was discarded while we were a slave. Confirm now. %% As above, amqqueue_process will have the entry for the %% msg_id_to_channel mapping. - {true, State #state { seen_status = maps:remove(MsgId, SS), - confirmed = [MsgId | Confirmed] }} + {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. set_queue_mode(Mode, State = #state { gm = GM, |
