diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
5 files changed, 13 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b92a0d3ccb..3843ed1a36 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1279,8 +1279,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> case Requeue of true -> requeue_and_run(AckTags, State1); false -> Fun = dead_letter_fun(rejected, State), - BQS1 = - BQ:process_messages(AckTags, Fun, BQS), + BQS1 = BQ:fold(AckTags, Fun, BQS), State1#q{backing_queue_state = BQS1} end end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 11dcfa35f2..42627aae7e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -111,7 +111,7 @@ behaviour_info(callbacks) -> %% Acktags supplied are for messages which should be %% processed. The provided callback function is called with each %% message. - {process_messages, 3}, + {fold, 3}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 880a54916f..4b298341de 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -21,7 +21,7 @@ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, process_messages/3]). + status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). -export([start/1, stop/0]). @@ -249,12 +249,11 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -process_messages(AckTags, MsgFun, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS}) -> - BQS1 = BQ:process_messages(AckTags, MsgFun, BQS), - ok = gm:broadcast(GM, {process_messages, MsgFun, AckTags}), +fold(AckTags, MsgFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:fold(AckTags, MsgFun, BQS), + ok = gm:broadcast(GM, {fold, MsgFun, AckTags}), State #state { backing_queue_state = BQS1 }. requeue(AckTags, State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 2f72e2ba40..98a80a2619 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -843,10 +843,10 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; -process_instruction({process_messages, MsgFun, AckTags}, +process_instruction({fold, MsgFun, AckTags}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQS1 = BQ:process_messages(AckTags, MsgFun, BQS), + BQS1 = BQ:fold(AckTags, MsgFun, BQS), {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9f187d927d..0d1f06f8ee 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0, process_messages/3]). + multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -636,9 +636,9 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -process_messages(_AckTags, undefined, State) -> +fold(_AckTags, undefined, State) -> State; -process_messages(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> +fold(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> lists:foldl( fun(SeqId, State1) -> {MsgStatus, State2} = |
