summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl13
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_variable_queue.erl6
5 files changed, 13 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b92a0d3ccb..3843ed1a36 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1279,8 +1279,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
false -> Fun = dead_letter_fun(rejected, State),
- BQS1 =
- BQ:process_messages(AckTags, Fun, BQS),
+ BQS1 = BQ:fold(AckTags, Fun, BQS),
State1#q{backing_queue_state = BQS1}
end
end));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 11dcfa35f2..42627aae7e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -111,7 +111,7 @@ behaviour_info(callbacks) ->
%% Acktags supplied are for messages which should be
%% processed. The provided callback function is called with each
%% message.
- {process_messages, 3},
+ {fold, 3},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 880a54916f..4b298341de 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -21,7 +21,7 @@
requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3, process_messages/3]).
+ status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
-export([start/1, stop/0]).
@@ -249,12 +249,11 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-process_messages(AckTags, MsgFun,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQS1 = BQ:process_messages(AckTags, MsgFun, BQS),
- ok = gm:broadcast(GM, {process_messages, MsgFun, AckTags}),
+fold(AckTags, MsgFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:fold(AckTags, MsgFun, BQS),
+ ok = gm:broadcast(GM, {fold, MsgFun, AckTags}),
State #state { backing_queue_state = BQS1 }.
requeue(AckTags, State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 2f72e2ba40..98a80a2619 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -843,10 +843,10 @@ process_instruction({ack, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({process_messages, MsgFun, AckTags},
+process_instruction({fold, MsgFun, AckTags},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- BQS1 = BQ:process_messages(AckTags, MsgFun, BQS),
+ BQS1 = BQ:fold(AckTags, MsgFun, BQS),
{ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9f187d927d..0d1f06f8ee 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0, process_messages/3]).
+ multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -636,9 +636,9 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-process_messages(_AckTags, undefined, State) ->
+fold(_AckTags, undefined, State) ->
State;
-process_messages(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
+fold(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
lists:foldl(
fun(SeqId, State1) ->
{MsgStatus, State2} =