summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-24 13:10:40 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-24 13:10:40 +0000
commit93ca3857e92e73f0e1fbfe8e83fd6f76f22b3425 (patch)
treef923b129ee77dce6cf9c3a902d2be6b854f0a005 /src
parent948082ce50fbaaa204f1f4337568390a822590a9 (diff)
downloadrabbitmq-server-git-93ca3857e92e73f0e1fbfe8e83fd6f76f22b3425.tar.gz
rename the weird VQ:ack/3 head to VQ:process_messages/3
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_backing_queue.erl10
-rw-r--r--src/rabbit_mirror_queue_master.erl24
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
-rw-r--r--src/rabbit_tests.erl18
-rw-r--r--src/rabbit_variable_queue.erl28
6 files changed, 55 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ed6ae9b96b..dcd6ae08ba 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -764,7 +764,7 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
State2 = State1#q{publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS),
+ [] -> {_, BQS1} = BQ:ack([AckTag], BQS),
cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
_ -> State3 =
lists:foldl(
@@ -835,7 +835,7 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
MsgSeqNo, {QPids1, AckTag}, UMQ1)}
end
end, {[], UMQ}, MsgSeqNos),
- {_Guids, BQS1} = BQ:ack(AckTags1, undefined, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags1, BQS),
MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
gb_sets:from_list(MsgSeqNos)),
State1 = case gb_sets:is_empty(MsgSeqNos1) of
@@ -1298,7 +1298,7 @@ handle_cast({ack, AckTags, ChPid}, State) ->
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_Guids, BQS1} =
- BQ:ack(AckTags, undefined, BQS),
+ BQ:ack(AckTags, BQS),
State1#q{backing_queue_state = BQS1}
end));
@@ -1310,8 +1310,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
false -> Fun = dead_letter_fun(rejected, State),
- {_Guids, BQS1} =
- BQ:ack(AckTags, Fun, BQS),
+ BQS1 =
+ BQ:process_messages(AckTags, Fun, BQS),
State1#q{backing_queue_state = BQS1}
end
end));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 50e4746230..11dcfa35f2 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -105,9 +105,13 @@ behaviour_info(callbacks) ->
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as
- %% Acks. A callback function is supplied allowing callers to
- %% access messages that are being acked.
- {ack, 3},
+ %% Acks.
+ {ack, 2},
+
+ %% Acktags supplied are for messages which should be
+ %% processed. The provided callback function is called with each
+ %% message.
+ {process_messages, 3},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8d7b9ded8f..2b69e9fc19 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,11 +17,11 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/3,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3]).
+ status/1, invoke/3, is_duplicate/2, discard/3, process_messages/3]).
-export([start/1, stop/0]).
@@ -236,19 +236,27 @@ fetch(AckRequired, State = #state { gm = GM,
ack_msg_id = AM1 }}
end.
-ack(AckTags, MsgFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
+ack(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ {MsgIds, BQS1} = BQ:ack(AckTags, BQS),
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds})
+ _ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
+process_messages(AckTags, MsgFun,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:process_messages(AckTags, BQS),
+ ok = gm:broadcast(GM, {process_messages, MsgFun, AckTags}),
+ State #state { backing_queue_state = BQS1 }.
+
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 29a2e8bd71..2f72e2ba40 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -834,15 +834,20 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
%% we must be shorter than the master
State
end};
-process_instruction({ack, MsgFun, MsgIds},
+process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {MsgIds1, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
+ {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
+process_instruction({process_messages, MsgFun, AckTags},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:process_messages(AckTags, MsgFun, BQS),
+ {ok, State #state { backing_queue_state = BQS1 }};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 434366485a..165bdbe246 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2353,9 +2353,7 @@ test_dropwhile(VQ0) ->
VQ2 = rabbit_variable_queue:dropwhile(
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
- end,
- dummy_msg_fun(),
- VQ1),
+ end, undefined, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2369,17 +2367,14 @@ test_dropwhile(VQ0) ->
VQ4.
-dummy_msg_fun() -> fun(_Msg, _SeqId) -> ok end.
-
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
VQ3 = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_fun(), VQ2),
+ fun(_) -> false end, undefined, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_fun(), VQ5).
+ rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5).
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2404,7 +2399,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, undefined, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2414,7 +2409,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], undefined, VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2448,8 +2443,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
- undefined, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a5f1c2122e..225fc2ddaa 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,11 +18,11 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0]).
+ multiple_routing_keys/0, process_messages/3]).
-export([start/1, stop/0]).
@@ -613,10 +613,10 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
-ack([], _Fun, State) ->
+ack([], State) ->
{[], State};
-ack(AckTags, undefined, State) ->
+ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
@@ -635,16 +635,16 @@ ack(AckTags, undefined, State) ->
{lists:reverse(AllMsgIds),
a(State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) })};
-
-ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
- {[], lists:foldl(
- fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)}.
+ ack_out_counter = AckOutCount + length(AckTags) })}.
+
+process_messages(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
+ lists:foldl(
+ fun(SeqId, State1) ->
+ {MsgStatus, State2} =
+ read_msg(gb_trees:get(SeqId, PA), State1),
+ MsgFun(MsgStatus#msg_status.msg, SeqId),
+ State2
+ end, State, AckTags).
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,