diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 30822aa6ae..5a93670f31 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -478,8 +478,9 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> lists:foldl( fun(MsgId, {CMs, MTC0}) -> case gb_trees:lookup(MsgId, MTC0) of - {value, {ChPid, MsgSeqNo}} -> - {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs), + {value, {SenderPid, MsgSeqNo}} -> + {rabbit_misc:gb_trees_cons(SenderPid, + MsgSeqNo, CMs), gb_trees:delete(MsgId, MTC0)}; none -> {CMs, MTC0} @@ -490,22 +491,23 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> never; -should_confirm_message(#delivery{sender = ChPid, +should_confirm_message(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> - {eventually, ChPid, MsgSeqNo, MsgId}; + {eventually, SenderPid, MsgSeqNo, MsgId}; should_confirm_message(_Delivery, _State) -> immediately. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. -maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, +maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)}; + State#q{msg_id_to_channel = + gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; maybe_record_confirm_message(_Confirm, State) -> State. @@ -517,13 +519,13 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = ChPid, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of - immediately -> rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]); + immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); _ -> ok end, case BQ:is_duplicate(Message, BQS) of @@ -538,7 +540,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ needs_confirming = needs_confirming(Confirm)}, - ChPid, BQS2), + SenderPid, BQS2), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, @@ -559,7 +561,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, end. deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = ChPid}, State) -> + sender = SenderPid}, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), @@ -567,7 +569,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, true -> State2; false -> Props = (message_properties(State)) #message_properties{ needs_confirming = needs_confirming(Confirm)}, - BQS1 = BQ:publish(Message, Props, ChPid, BQS), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -688,11 +690,11 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -discard_delivery(#delivery{sender = ChPid, +discard_delivery(#delivery{sender = SenderPid, message = Message}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. + State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. |
