diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2012-04-10 12:09:36 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2012-04-10 12:09:36 +0100 |
| commit | b4f0c71ffcce6e3b400942c55c42da738b5fc0bf (patch) | |
| tree | 17728cb46d59ae408f32dd164e9b0238fe89d277 /src | |
| parent | 0aad6109003d0c45df94704d019a0704b0f92e66 (diff) | |
| parent | 53afb468e8ac87998360228a2ee4726b25c6b1c6 (diff) | |
| download | rabbitmq-server-git-b4f0c71ffcce6e3b400942c55c42da738b5fc0bf.tar.gz | |
Merged bug24833 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 56 |
1 files changed, 26 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b7161a05c1..75b92f1fa6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -487,8 +487,10 @@ should_confirm_message(#delivery{sender = SenderPid, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(_Delivery, _State) -> - immediately. +should_confirm_message(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, + _State) -> + {immediately, SenderPid, MsgSeqNo}. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. @@ -497,6 +499,9 @@ maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> State#q{msg_id_to_channel = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; +maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + State; maybe_record_confirm_message(_Confirm, State) -> State. @@ -508,45 +513,35 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = SenderPid, - message = Message, - msg_seq_no = MsgSeqNo}, +attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case Confirm of - immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); - _ -> ok - end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - DeliverFun = - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), - {AckTag, BQS3} = BQ:publish_delivered( - AckRequired, Message, Props, - SenderPid, BQS2), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS3}} - end, - {Delivered, State2} = - deliver_msgs_to_consumers(DeliverFun, false, - State#q{backing_queue_state = BQS1}), - {Delivered, Confirm, State2}; + deliver_msgs_to_consumers( + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, false, State#q{backing_queue_state = BQS1}); {Duplicate, BQS1} -> %% if the message has previously been seen by the BQ then %% it must have been seen under the same circumstances as %% now: i.e. if it is now a deliver_immediately then it %% must have been before. - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + {case Duplicate of + published -> true; + discarded -> false + end, + State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, State) -> - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), case Delivered of @@ -1053,7 +1048,8 @@ handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), + Confirm = should_confirm_message(Delivery, State), + {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); false -> discard_delivery(Delivery, State1) |
