summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl50
1 files changed, 26 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d6bcfad6ea..508427b4dd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -264,14 +264,14 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case (IsMsgReady andalso
rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
- {{Msg, IsDelivered, AckTag}, FunAcc1, State1} =
+ {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, IsDelivered, Msg}),
+ {QName, self(), NextId, IsDelivered, Message}),
NewUAM =
case AckRequired of
- true -> dict:store(NextId, {Msg, AckTag}, UAM);
+ true -> dict:store(NextId, {Message, AckTag}, UAM);
false -> UAM
end,
NewC = C#cr{unsent_message_count = Count + 1,
@@ -317,13 +317,13 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
not IsEmpty.
deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
State = #q { variable_queue_state = VQS }) ->
- {{Msg, IsDelivered, AckTag, Remaining}, VQS1} =
+ {{Message, IsDelivered, AckTag, Remaining}, VQS1} =
rabbit_variable_queue:fetch(VQS),
AutoAcks1 = case AckRequired of
true -> AutoAcks;
false -> [AckTag | AutoAcks]
end,
- {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
+ {{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
State #q { variable_queue_state = VQS1 }}.
run_message_queue(State = #q { variable_queue_state = VQS }) ->
@@ -335,7 +335,7 @@ run_message_queue(State = #q { variable_queue_state = VQS }) ->
VQS1 = rabbit_variable_queue:ack(AutoAcks, State1 #q.variable_queue_state),
State1 #q { variable_queue_state = VQS1 }.
-attempt_immediate_delivery(none, _ChPid, Msg, State) ->
+attempt_immediate_delivery(none, _ChPid, Message, State) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1) ->
@@ -344,27 +344,28 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
true ->
{AckTag1, VQS} =
rabbit_variable_queue:publish_delivered(
- Msg, State1 #q.variable_queue_state),
+ Message, State1 #q.variable_queue_state),
{AckTag1, State1 #q { variable_queue_state = VQS }};
false ->
{noack, State1}
end,
- {{Msg, false, AckTag}, true, State2}
+ {{Message, false, AckTag}, true, State2}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
- VQS = rabbit_variable_queue:tx_publish(Msg, State #q.variable_queue_state),
- record_pending_message(Txn, ChPid, Msg),
+attempt_immediate_delivery(Txn, ChPid, Message, State) ->
+ VQS = rabbit_variable_queue:tx_publish(
+ Message, State #q.variable_queue_state),
+ record_pending_message(Txn, ChPid, Message),
{true, State #q { variable_queue_state = VQS }}.
-deliver_or_enqueue(Txn, ChPid, Msg, State) ->
- case attempt_immediate_delivery(Txn, ChPid, Msg, State) of
+deliver_or_enqueue(Txn, ChPid, Message, State) ->
+ case attempt_immediate_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
{_SeqId, VQS} = rabbit_variable_queue:publish(
- Msg, State #q.variable_queue_state),
+ Message, State #q.variable_queue_state),
{false, NewState #q { variable_queue_state = VQS }}
end.
@@ -388,11 +389,12 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
0 < Len.
deliver_or_requeue_msgs_deliver(
- false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
- {{Msg, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks}, State};
+ false, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) ->
+ {{Message, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks},
+ State};
deliver_or_requeue_msgs_deliver(
- true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
- {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
+ true, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) ->
+ {{Message, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -537,7 +539,7 @@ commit_transaction(Txn, From, State) ->
{MsgsWithAcks, Remaining} =
collect_messages(PendingAcksOrdered, UAM),
store_ch_record(C#cr{unacked_messages = Remaining}),
- [AckTag || {_Msg, AckTag} <- MsgsWithAcks]
+ [AckTag || {_Message, AckTag} <- MsgsWithAcks]
end,
{RunQueue, VQS} =
rabbit_variable_queue:tx_commit(
@@ -673,20 +675,20 @@ handle_call({basic_get, ChPid, NoAck}, _From,
}) ->
case rabbit_variable_queue:fetch(VQS) of
{empty, VQS1} -> reply(empty, State #q { variable_queue_state = VQS1 });
- {{Msg, IsDelivered, AckTag, Remaining}, VQS1} ->
+ {{Message, IsDelivered, AckTag, Remaining}, VQS1} ->
AckRequired = not(NoAck),
VQS2 =
case AckRequired of
true ->
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, {Msg, AckTag}, UAM),
+ NewUAM = dict:store(NextId, {Message, AckTag}, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
VQS1;
false ->
rabbit_variable_queue:ack([AckTag], VQS1)
end,
- Message = {QName, self(), NextId, IsDelivered, Msg},
- reply({ok, Remaining, Message},
+ Msg = {QName, self(), NextId, IsDelivered, Message},
+ reply({ok, Remaining, Msg},
State #q { next_msg_id = NextId + 1, variable_queue_state = VQS2 })
end;
@@ -827,7 +829,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
none ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
VQS = rabbit_variable_queue:ack(
- [AckTag || {_Msg, AckTag} <- MsgWithAcks],
+ [AckTag || {_Message, AckTag} <- MsgWithAcks],
State #q.variable_queue_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
noreply(State #q { variable_queue_state = VQS });