diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 63 |
1 files changed, 34 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 334980bf49..f5b7957dc7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -658,44 +658,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, State#q{consumers = Consumers})} end. -maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) -> +maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, + Delivered, + State = #q{overflow = Overflow, + backing_queue = BQ, + backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); _ -> - %% Enqueue and maybe drop head later - deliver_or_enqueue(Delivery, Delivered, State) + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case IsDuplicate of + true -> State1; + {true, drop} -> State1; + %% Drop publish and nack to publisher + {true, reject} -> + send_reject_publish(Delivery, Delivered, State1); + %% Enqueue and maybe drop head later + false -> + deliver_or_enqueue(Delivery, Delivered, State1) + end end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid, flow = Flow}, - Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + Delivered, + State = #q{backing_queue = BQ}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State1), - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State2 = State1#q{backing_queue_state = BQS1}, - case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, - State2) of - true -> + case attempt_delivery(Delivery, Props, Delivered, State1) of + {delivered, State2} -> State2; - {delivered, State3} -> - State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined, - backing_queue_state = BQS2, + {undelivered, State2 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS, msg_id_to_channel = MTC}} -> - {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), - State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; - {undelivered, State3 = #q{backing_queue_state = BQS2}} -> - - BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), - {Dropped, State4 = #q{backing_queue_state = BQS4}} = - maybe_drop_head(State3#q{backing_queue_state = BQS3}), - QLen = BQ:len(BQS4), + {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), + State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}; + {undelivered, State2 = #q{backing_queue_state = BQS}} -> + + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS), + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -704,9 +713,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State4; - {true, true, undefined} -> State4; - {_, _, _} -> drop_expired_msgs(State4) + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) end end. @@ -1630,7 +1639,3 @@ update_ha_mode(State) -> {ok, Q} = rabbit_amqqueue:lookup(qname(State)), ok = rabbit_mirror_queue_misc:update_mirrors(Q), State. - - - - |
