diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 16:16:44 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 16:16:44 +0100 |
| commit | b4e72e423bb26ad9b139b6fe86571ae150a0b72c (patch) | |
| tree | 8958dcdf416ef33697433c846e73660d65fe8216 | |
| parent | ea9361d575c3f1ea292c1880e49fabab747e2e79 (diff) | |
| download | rabbitmq-server-git-b4e72e423bb26ad9b139b6fe86571ae150a0b72c.tar.gz | |
simplified code a bit
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 |
1 files changed, 7 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2f03846204..4fd503803f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -324,7 +324,7 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun, ConfirmFun}, FunAcc, +deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers}) -> @@ -344,7 +344,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun, ConfirmFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - ConfirmFun(Message), + confirm_message(Message), ChAckTags1 = case AckRequired of true -> sets:add_element(AckTag, ChAckTags); false -> ChAckTags @@ -398,7 +398,7 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -confirm_function(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) -> +confirm_message(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) -> case MsgSeqNo of undefined -> ok; _ -> rabbit_channel:confirm(ChPid, MsgSeqNo) @@ -406,13 +406,12 @@ confirm_function(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) -> run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, - fun deliver_from_queue_deliver/3, - fun confirm_function/1}, + fun deliver_from_queue_deliver/3}, IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo}, +attempt_delivery(none, _ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ}) -> rabbit_log:info("Attempting delivery of message #~p~n", [MsgSeqNo]), PredFun = fun (IsEmpty, _State) -> not IsEmpty end, @@ -423,8 +422,7 @@ attempt_delivery(none, ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo}, {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, - ConfirmFun = fun confirm_function/1, - deliver_msgs_to_consumers({ PredFun, DeliverFun, ConfirmFun }, false, State); + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), @@ -688,7 +686,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - confirm_function(Message), + confirm_message(Message), case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), store_ch_record( |
