summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl28
1 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 30822aa6ae..5a93670f31 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -478,8 +478,9 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
case gb_trees:lookup(MsgId, MTC0) of
- {value, {ChPid, MsgSeqNo}} ->
- {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ {value, {SenderPid, MsgSeqNo}} ->
+ {rabbit_misc:gb_trees_cons(SenderPid,
+ MsgSeqNo, CMs),
gb_trees:delete(MsgId, MTC0)};
none ->
{CMs, MTC0}
@@ -490,22 +491,23 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
never;
-should_confirm_message(#delivery{sender = ChPid,
+should_confirm_message(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
#q{q = #amqqueue{durable = true}}) ->
- {eventually, ChPid, MsgSeqNo, MsgId};
+ {eventually, SenderPid, MsgSeqNo, MsgId};
should_confirm_message(_Delivery, _State) ->
immediately.
needs_confirming({eventually, _, _, _}) -> true;
needs_confirming(_) -> false.
-maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
+maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)};
+ State#q{msg_id_to_channel =
+ gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -517,13 +519,13 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(Delivery = #delivery{sender = ChPid,
+attempt_delivery(Delivery = #delivery{sender = SenderPid,
message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case Confirm of
- immediately -> rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]);
+ immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
_ -> ok
end,
case BQ:is_duplicate(Message, BQS) of
@@ -538,7 +540,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
needs_confirming = needs_confirming(Confirm)},
- ChPid, BQS2),
+ SenderPid, BQS2),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS3}}
end,
@@ -559,7 +561,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = ChPid}, State) ->
+ sender = SenderPid}, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
@@ -567,7 +569,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
true -> State2;
false -> Props = (message_properties(State)) #message_properties{
needs_confirming = needs_confirming(Confirm)},
- BQS1 = BQ:publish(Message, Props, ChPid, BQS),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
@@ -688,11 +690,11 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-discard_delivery(#delivery{sender = ChPid,
+discard_delivery(#delivery{sender = SenderPid,
message = Message},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
+ State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.