summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 16:16:44 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 16:16:44 +0100
commitb4e72e423bb26ad9b139b6fe86571ae150a0b72c (patch)
tree8958dcdf416ef33697433c846e73660d65fe8216
parentea9361d575c3f1ea292c1880e49fabab747e2e79 (diff)
downloadrabbitmq-server-git-b4e72e423bb26ad9b139b6fe86571ae150a0b72c.tar.gz
simplified code a bit
-rw-r--r--src/rabbit_amqqueue_process.erl16
1 files changed, 7 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2f03846204..4fd503803f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -324,7 +324,7 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun, ConfirmFun}, FunAcc,
+deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers}) ->
@@ -344,7 +344,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun, ConfirmFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- ConfirmFun(Message),
+ confirm_message(Message),
ChAckTags1 = case AckRequired of
true -> sets:add_element(AckTag, ChAckTags);
false -> ChAckTags
@@ -398,7 +398,7 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
-confirm_function(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) ->
+confirm_message(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) ->
case MsgSeqNo of
undefined -> ok;
_ -> rabbit_channel:confirm(ChPid, MsgSeqNo)
@@ -406,13 +406,12 @@ confirm_function(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) ->
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3,
- fun confirm_function/1},
+ fun deliver_from_queue_deliver/3},
IsEmpty = BQ:is_empty(BQS),
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo},
+attempt_delivery(none, _ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ}) ->
rabbit_log:info("Attempting delivery of message #~p~n", [MsgSeqNo]),
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
@@ -423,8 +422,7 @@ attempt_delivery(none, ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo},
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- ConfirmFun = fun confirm_function/1,
- deliver_msgs_to_consumers({ PredFun, DeliverFun, ConfirmFun }, false, State);
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
attempt_delivery(Txn, ChPid, Message,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
@@ -688,7 +686,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case BQ:fetch(AckRequired, BQS) of
{empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
- confirm_function(Message),
+ confirm_message(Message),
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
store_ch_record(