diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-02-11 10:12:07 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-02-11 10:12:07 +0000 |
| commit | 304cbc2740e5067985b8cf92aa72ba4012700451 (patch) | |
| tree | 25a098693ccda7cb699f1840b251e66c59c90396 /src | |
| parent | 8e1511d1bd65868eebe7a97c688ae44a3e666bfa (diff) | |
| download | rabbitmq-server-git-304cbc2740e5067985b8cf92aa72ba4012700451.tar.gz | |
refactoring: rename Msg to Message
that's what we have on default, and it matches the type (message())
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d6bcfad6ea..508427b4dd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -264,14 +264,14 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, case (IsMsgReady andalso rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> - {{Msg, IsDelivered, AckTag}, FunAcc1, State1} = + {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, IsDelivered, Msg}), + {QName, self(), NextId, IsDelivered, Message}), NewUAM = case AckRequired of - true -> dict:store(NextId, {Msg, AckTag}, UAM); + true -> dict:store(NextId, {Message, AckTag}, UAM); false -> UAM end, NewC = C#cr{unsent_message_count = Count + 1, @@ -317,13 +317,13 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, State = #q { variable_queue_state = VQS }) -> - {{Msg, IsDelivered, AckTag, Remaining}, VQS1} = + {{Message, IsDelivered, AckTag, Remaining}, VQS1} = rabbit_variable_queue:fetch(VQS), AutoAcks1 = case AckRequired of true -> AutoAcks; false -> [AckTag | AutoAcks] end, - {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, + {{Message, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, State #q { variable_queue_state = VQS1 }}. run_message_queue(State = #q { variable_queue_state = VQS }) -> @@ -335,7 +335,7 @@ run_message_queue(State = #q { variable_queue_state = VQS }) -> VQS1 = rabbit_variable_queue:ack(AutoAcks, State1 #q.variable_queue_state), State1 #q { variable_queue_state = VQS1 }. -attempt_immediate_delivery(none, _ChPid, Msg, State) -> +attempt_immediate_delivery(none, _ChPid, Message, State) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1) -> @@ -344,27 +344,28 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> true -> {AckTag1, VQS} = rabbit_variable_queue:publish_delivered( - Msg, State1 #q.variable_queue_state), + Message, State1 #q.variable_queue_state), {AckTag1, State1 #q { variable_queue_state = VQS }}; false -> {noack, State1} end, - {{Msg, false, AckTag}, true, State2} + {{Message, false, AckTag}, true, State2} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_immediate_delivery(Txn, ChPid, Msg, State) -> - VQS = rabbit_variable_queue:tx_publish(Msg, State #q.variable_queue_state), - record_pending_message(Txn, ChPid, Msg), +attempt_immediate_delivery(Txn, ChPid, Message, State) -> + VQS = rabbit_variable_queue:tx_publish( + Message, State #q.variable_queue_state), + record_pending_message(Txn, ChPid, Message), {true, State #q { variable_queue_state = VQS }}. -deliver_or_enqueue(Txn, ChPid, Msg, State) -> - case attempt_immediate_delivery(Txn, ChPid, Msg, State) of +deliver_or_enqueue(Txn, ChPid, Message, State) -> + case attempt_immediate_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers {_SeqId, VQS} = rabbit_variable_queue:publish( - Msg, State #q.variable_queue_state), + Message, State #q.variable_queue_state), {false, NewState #q { variable_queue_state = VQS }} end. @@ -388,11 +389,12 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> 0 < Len. deliver_or_requeue_msgs_deliver( - false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> - {{Msg, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks}, State}; + false, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) -> + {{Message, true, noack}, {Len - 1, [AckTag | AcksAcc], MsgsWithAcks}, + State}; deliver_or_requeue_msgs_deliver( - true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> - {{Msg, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. + true, {Len, AcksAcc, [{Message, AckTag} | MsgsWithAcks]}, State) -> + {{Message, true, AckTag}, {Len - 1, AcksAcc, MsgsWithAcks}, State}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -537,7 +539,7 @@ commit_transaction(Txn, From, State) -> {MsgsWithAcks, Remaining} = collect_messages(PendingAcksOrdered, UAM), store_ch_record(C#cr{unacked_messages = Remaining}), - [AckTag || {_Msg, AckTag} <- MsgsWithAcks] + [AckTag || {_Message, AckTag} <- MsgsWithAcks] end, {RunQueue, VQS} = rabbit_variable_queue:tx_commit( @@ -673,20 +675,20 @@ handle_call({basic_get, ChPid, NoAck}, _From, }) -> case rabbit_variable_queue:fetch(VQS) of {empty, VQS1} -> reply(empty, State #q { variable_queue_state = VQS1 }); - {{Msg, IsDelivered, AckTag, Remaining}, VQS1} -> + {{Message, IsDelivered, AckTag, Remaining}, VQS1} -> AckRequired = not(NoAck), VQS2 = case AckRequired of true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, {Msg, AckTag}, UAM), + NewUAM = dict:store(NextId, {Message, AckTag}, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), VQS1; false -> rabbit_variable_queue:ack([AckTag], VQS1) end, - Message = {QName, self(), NextId, IsDelivered, Msg}, - reply({ok, Remaining, Message}, + Msg = {QName, self(), NextId, IsDelivered, Message}, + reply({ok, Remaining, Msg}, State #q { next_msg_id = NextId + 1, variable_queue_state = VQS2 }) end; @@ -827,7 +829,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> none -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), VQS = rabbit_variable_queue:ack( - [AckTag || {_Msg, AckTag} <- MsgWithAcks], + [AckTag || {_Message, AckTag} <- MsgWithAcks], State #q.variable_queue_state), store_ch_record(C#cr{unacked_messages = Remaining}), noreply(State #q { variable_queue_state = VQS }); |
