diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-03 12:10:55 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-03 12:10:55 +0100 |
| commit | 1aef908aaf60adcbd3ef49f67baa8186a8ac4931 (patch) | |
| tree | d146dd7e9bf9969ed38ec693b3132b1edae93875 /src | |
| parent | 06d8fc293c2061b1c3ee995239242823f2d2b795 (diff) | |
| parent | c9c16fd96427d6e9a860b55fdd28294ec8fe6159 (diff) | |
| download | rabbitmq-server-git-1aef908aaf60adcbd3ef49f67baa8186a8ac4931.tar.gz | |
Merge bug25199
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 |
1 files changed, 14 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e68d691e3f..f92ca16a03 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -575,14 +575,13 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, State2#q{backing_queue_state = BQS1}) end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> - run_backing_queue(BQ, fun (M, BQS) -> - {_MsgIds, BQS1} = M:requeue(AckTags, BQS), - BQS1 - end, State). - -fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + run_message_queue(State#q{backing_queue_state = BQS1}). + +fetch(AckRequired, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. @@ -676,12 +675,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -backing_queue_timeout(State = #q{backing_queue = BQ}) -> - run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). - -run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). +backing_queue_timeout(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + State#q{backing_queue_state = BQ:timeout(BQS)}. subtract_acks(ChPid, AckTags, State, Fun) -> case lookup_ch(ChPid) of @@ -1193,8 +1189,10 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); -handle_cast({run_backing_queue, Mod, Fun}, State) -> - noreply(run_backing_queue(Mod, Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + noreply(run_message_queue( + State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State = #q{senders = Senders}) -> |
