diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 28 |
6 files changed, 55 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ed6ae9b96b..dcd6ae08ba 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -764,7 +764,7 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State1 = lists:foldl(fun monitor_queue/2, State, QPids), State2 = State1#q{publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS), + [] -> {_, BQS1} = BQ:ack([AckTag], BQS), cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); _ -> State3 = lists:foldl( @@ -835,7 +835,7 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, MsgSeqNo, {QPids1, AckTag}, UMQ1)} end end, {[], UMQ}, MsgSeqNos), - {_Guids, BQS1} = BQ:ack(AckTags1, undefined, BQS), + {_Guids, BQS1} = BQ:ack(AckTags1, BQS), MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), gb_sets:from_list(MsgSeqNos)), State1 = case gb_sets:is_empty(MsgSeqNos1) of @@ -1298,7 +1298,7 @@ handle_cast({ack, AckTags, ChPid}, State) -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_Guids, BQS1} = - BQ:ack(AckTags, undefined, BQS), + BQ:ack(AckTags, BQS), State1#q{backing_queue_state = BQS1} end)); @@ -1310,8 +1310,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> case Requeue of true -> requeue_and_run(AckTags, State1); false -> Fun = dead_letter_fun(rejected, State), - {_Guids, BQS1} = - BQ:ack(AckTags, Fun, BQS), + BQS1 = + BQ:process_messages(AckTags, Fun, BQS), State1#q{backing_queue_state = BQS1} end end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 50e4746230..11dcfa35f2 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -105,9 +105,13 @@ behaviour_info(callbacks) -> %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as - %% Acks. A callback function is supplied allowing callers to - %% access messages that are being acked. - {ack, 3}, + %% Acks. + {ack, 2}, + + %% Acktags supplied are for messages which should be + %% processed. The provided callback function is called with each + %% message. + {process_messages, 3}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 8d7b9ded8f..2b69e9fc19 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,11 +17,11 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/3, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3]). + status/1, invoke/3, is_duplicate/2, discard/3, process_messages/3]). -export([start/1, stop/0]). @@ -236,19 +236,27 @@ fetch(AckRequired, State = #state { gm = GM, ack_msg_id = AM1 }} end. -ack(AckTags, MsgFun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> - {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS), +ack(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, BQS), AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds}) + _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. +process_messages(AckTags, MsgFun, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:process_messages(AckTags, BQS), + ok = gm:broadcast(GM, {process_messages, MsgFun, AckTags}), + State #state { backing_queue_state = BQS1 }. + requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 29a2e8bd71..2f72e2ba40 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -834,15 +834,20 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, %% we must be shorter than the master State end}; -process_instruction({ack, MsgFun, MsgIds}, +process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {MsgIds1, BQS1} = BQ:ack(AckTags, MsgFun, BQS), + {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; +process_instruction({process_messages, MsgFun, AckTags}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:process_messages(AckTags, MsgFun, BQS), + {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 434366485a..165bdbe246 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2353,9 +2353,7 @@ test_dropwhile(VQ0) -> VQ2 = rabbit_variable_queue:dropwhile( fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 - end, - dummy_msg_fun(), - VQ1), + end, undefined, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2369,17 +2367,14 @@ test_dropwhile(VQ0) -> VQ4. -dummy_msg_fun() -> fun(_Msg, _SeqId) -> ok end. - test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), VQ3 = rabbit_variable_queue:dropwhile( - fun(_) -> false end, dummy_msg_fun(), VQ2), + fun(_) -> false end, undefined, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile( - fun(_) -> false end, dummy_msg_fun(), VQ5). + rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -2404,7 +2399,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, undefined, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2414,7 +2409,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], undefined, VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2448,8 +2443,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, - undefined, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a5f1c2122e..225fc2ddaa 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,11 +18,11 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0]). + multiple_routing_keys/0, process_messages/3]). -export([start/1, stop/0]). @@ -613,10 +613,10 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. -ack([], _Fun, State) -> +ack([], State) -> {[], State}; -ack(AckTags, undefined, State) -> +ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, @@ -635,16 +635,16 @@ ack(AckTags, undefined, State) -> {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) })}; - -ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> - {[], lists:foldl( - fun(SeqId, State1) -> - {MsgStatus, State2} = - read_msg(gb_trees:get(SeqId, PA), State1), - MsgFun(MsgStatus#msg_status.msg, SeqId), - State2 - end, State, AckTags)}. + ack_out_counter = AckOutCount + length(AckTags) })}. + +process_messages(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> + lists:foldl( + fun(SeqId, State1) -> + {MsgStatus, State2} = + read_msg(gb_trees:get(SeqId, PA), State1), + MsgFun(MsgStatus#msg_status.msg, SeqId), + State2 + end, State, AckTags). requeue(AckTags, #vqstate { delta = Delta, q3 = Q3, |
