diff options
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 54 |
8 files changed, 130 insertions, 123 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 602f598ea7..1e870bb74b 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -48,14 +48,14 @@ rabbit_types:message_properties(), pid(), state()) -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). --spec(dropwhile/4 :: +-spec(dropwhile/3 :: (fun ((rabbit_types:message_properties()) -> boolean()), - msg_fun(), non_neg_integer(), state()) - -> {non_neg_integer(), state()}). + msg_fun(), state()) + -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). --spec(ack/4 :: ([ack()], msg_fun(), non_neg_integer(), state()) -> - {[rabbit_guid:guid()], non_neg_integer(), state()}). +-spec(ack/3 :: ([ack()], msg_fun(), state()) -> + {[rabbit_guid:guid()], state()}). -spec(requeue/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 24bebc368d..f7756232cf 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -51,6 +51,7 @@ ttl, ttl_timer_ref, publish_seqno, + unconfirmed, dlx }). @@ -133,6 +134,7 @@ init(Q) -> ttl = undefined, dlx = undefined, publish_seqno = 1, + unconfirmed = gb_trees:empty(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -156,6 +158,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, expiry_timer_ref = undefined, ttl = undefined, publish_seqno = 1, + unconfirmed = gb_trees:empty(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -474,12 +477,9 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), - rabbit_misc:gb_trees_foreach(fun confirm_to_sender/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. -confirm_to_sender(Pid, MsgSeqNos) -> - gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). - should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> never; should_confirm_message(#delivery{sender = ChPid, @@ -516,7 +516,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of - immediately -> confirm_to_sender(ChPid, [MsgSeqNo]); + immediately -> rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]); _ -> ok end, case BQ:is_duplicate(Message, BQS) of @@ -692,17 +692,13 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ, - publish_seqno = MsgSeqNo}) -> + backing_queue = BQ}) -> Now = now_micros(), - {MsgSeqNo1, BQS1} = - BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - mk_dead_letter_fun(expired, State), - MsgSeqNo, - BQS), - ensure_ttl_timer(State#q{backing_queue_state = BQS1, - publish_seqno = MsgSeqNo1}). + BQS1 = BQ:dropwhile( + fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + mk_dead_letter_fun(expired, State), + BQS), + ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, backing_queue_state = BQS, @@ -718,38 +714,40 @@ ensure_ttl_timer(State) -> State. mk_dead_letter_fun(_Reason, #q{dlx = undefined}) -> - fun(_MsgLookupFun, MsgSeqNo, BQS) -> {MsgSeqNo, BQS} end; -mk_dead_letter_fun(Reason, State) -> - fun(MsgLookupFun, MsgSeqNo, BQS) -> + fun(_MsgLookupFun, _Extra, BQS) -> BQS end; +mk_dead_letter_fun(Reason, _State) -> + fun(MsgLookupFun, Extra, BQS) -> {Msg, BQS1} = MsgLookupFun(BQS), - MsgSeqNo1 = dead_letter_msg(Msg, Reason, MsgSeqNo, State), - {MsgSeqNo1, BQS1} + gen_server2:cast(self(), {dead_letter, {Msg, Extra}, Reason}), + BQS1 end. maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) -> State; maybe_dead_letter_queue(Reason, State = #q{ backing_queue_state = BQS, - backing_queue = BQ, - publish_seqno = MsgSeqNo}) -> + backing_queue = BQ}) -> case BQ:fetch(false, BQS) of {empty, BQS1} -> State#q{backing_queue_state = BQS1}; {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} -> - MsgSeqNo1 = dead_letter_msg(Msg, Reason, MsgSeqNo, State), - maybe_dead_letter_queue(Reason, - State#q{backing_queue_state = BQS1, - publish_seqno = MsgSeqNo1}) + State1 = dead_letter_msg(Msg, undefined, Reason, + State#q{backing_queue_state = BQS1}), + maybe_dead_letter_queue(Reason, State1) end. -dead_letter_msg(Msg, Reason, MsgSeqNo, State = #q{dlx = DLX}) -> +dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = Unconfirmed, + dlx = DLX}) -> rabbit_exchange:lookup_or_die(DLX), rabbit_basic:publish( rabbit_basic:delivery( false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo)), - MsgSeqNo+1. + State#q{publish_seqno = MsgSeqNo + 1, + unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, + Unconfirmed)}. make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content}, State) -> @@ -1109,28 +1107,23 @@ handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS, - publish_seqno = MsgSeqNo}) -> - Fun = fun(_, MsgSeqNo1, BQS0) -> {MsgSeqNo1, BQS0} end, - {_Guids, MsgSeqNo1, BQS1} = - BQ:ack(AckTags, Fun, MsgSeqNo, BQS), - State1#q{backing_queue_state = BQS1, - publish_seqno = MsgSeqNo1} + backing_queue_state = BQS}) -> + {_Guids, BQS1} = + BQ:ack(AckTags, undefined, BQS), + State1#q{backing_queue_state = BQS1} end)); handle_cast({reject, AckTags, Requeue, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS, - publish_seqno = MsgSeqNo}) -> + backing_queue_state = BQS}) -> case Requeue of true -> requeue_and_run(AckTags, State1); false -> Fun = mk_dead_letter_fun(rejected, State), - {_Guids, MsgSeqNo1, BQS1} = - BQ:ack(AckTags, Fun, MsgSeqNo, BQS), - State1#q{backing_queue_state = BQS1, - publish_seqno = MsgSeqNo1} + {_Guids, BQS1} = + BQ:ack(AckTags, Fun, BQS), + State1#q{backing_queue_state = BQS1} end end)); @@ -1188,9 +1181,29 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> end, noreply(State); -handle_cast({confirm, MsgSeqNos, From}, State) -> - rabbit_log:info("Got a confirm for ~p~n", [MsgSeqNos]), - noreply(State). +handle_cast({confirm, MsgSeqNos, _From}, State) -> + noreply(lists:foldl( + fun (MsgSeqNo, + State1 = #q{unconfirmed = Unconfirmed, + backing_queue = BQ, + backing_queue_state = BQS}) -> + Reason = gb_trees:get(MsgSeqNo, Unconfirmed), + case Reason of + {expired, _} -> + ok; + {rejected, AckTag} -> + BQ:ack([AckTag], undefined, BQS); + {queue_deleted, _} -> + ok; + {queue_purged, _} -> + ok + end, + State1#q{unconfirmed = gb_trees:delete(MsgSeqNo, + Unconfirmed)} + end, State, MsgSeqNos)); + +handle_cast({dead_letter, {Msg, Extra}, Reason}, State) -> + noreply(dead_letter_msg(Msg, Extra, Reason, State)). handle_info(maybe_expire, State) -> case is_unused(State) of diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index a80d656d32..72c00e3d01 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -97,9 +97,8 @@ behaviour_info(callbacks) -> %% Drop messages from the head of the queue while the supplied %% predicate returns true. A callback function is supplied %% allowing callers access to messages that are about to be - %% dropped; the callback may publish messages and requires the - %% next message sequence number, which must also be supplied. - {dropwhile, 4}, + %% dropped. + {dropwhile, 3}, %% Produce the next message. {fetch, 2}, @@ -107,10 +106,8 @@ behaviour_info(callbacks) -> %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as %% Acks. A callback function is supplied allowing callers to - %% access messages that are being acked; the callback may publish - %% messages and requires the next message sequence number, which - %% must also be supplied. - {ack, 4}, + %% access messages that are being acked. + {ack, 3}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c048d4a8ca..4b54d82171 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,8 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/4, - requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/4, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/3, + requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3]). @@ -172,18 +172,18 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, MsgFun, MsgSeqNo, +dropwhile(Pred, MsgFun, State = #state{gm = GM, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {MsgSeqNo1, BQS1} = BQ:dropwhile(Pred, MsgFun, MsgSeqNo, BQS), + BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), Dropped = Len - BQ:len(BQS1), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), - {MsgSeqNo1, State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 }}. + State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -236,18 +236,18 @@ fetch(AckRequired, State = #state { gm = GM, ack_msg_id = AM1 }} end. -ack(AckTags, MsgFun, MsgSeqNo, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> - {MsgIds, MsgSeqNo1, BQS1} = BQ:ack(AckTags, MsgFun, MsgSeqNo, BQS), +ack(AckTags, MsgFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS), AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds}) end, - {MsgIds, MsgSeqNo1, State #state { backing_queue_state = BQS1, - ack_msg_id = AM1 }}. + {MsgIds, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 }}. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 7238b16968..a8c2006d14 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -417,7 +417,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> Acc end end, {gb_trees:empty(), MS}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State #state { msg_id_status = MS1 }. handle_process_result({ok, State}) -> noreply(State); @@ -649,7 +649,7 @@ maybe_enqueue_message( {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { sender_queues = SQ1, msg_id_status = dict:erase(MsgId, MS) }; @@ -666,7 +666,7 @@ maybe_enqueue_message( msg_id_status = dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } @@ -728,7 +728,7 @@ process_instruction( {MQ2, sets:add_element(MsgId, PendingCh), dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), {MQ2, PendingCh, MS} end; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index dcfbcaffb8..53f5ebb59d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -28,6 +28,7 @@ -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). +-export([confirm_to_sender/2]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -370,6 +371,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +confirm_to_sender(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). + throw_on_error(E, Thunk) -> case Thunk() of {error, Reason} -> throw({E, Reason}); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d9a3666474..2f03b2fb0b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2331,13 +2331,12 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - {_, VQ2} = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, - dummy_msg_fun(), - dummy_msgseqno(), - VQ1), + VQ2 = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, + dummy_msg_fun(), + VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2351,19 +2350,17 @@ test_dropwhile(VQ0) -> VQ4. -dummy_msg_fun() -> fun(_Fun, MsgSeqNo, State) -> {MsgSeqNo, State} end. -dummy_msgseqno() -> 1. +dummy_msg_fun() -> fun(_Fun, _Extra, State) -> State end. test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - {_, VQ3} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, dummy_msg_fun(), dummy_msgseqno(), VQ2), + VQ3 = rabbit_variable_queue:dropwhile( + fun(_) -> false end, dummy_msg_fun(), VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - {_, VQ6} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, dummy_msg_fun(), dummy_msgseqno(), VQ5), - VQ6. + rabbit_variable_queue:dropwhile( + fun(_) -> false end, dummy_msg_fun(), VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -2388,9 +2385,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_Guids, _, VQ9} = - rabbit_variable_queue:ack(AckTags, dummy_msg_fun(), - dummy_msgseqno(), VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, undefined, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2400,9 +2395,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_Guids, _, VQ3} = - rabbit_variable_queue:ack([AckTag], dummy_msg_fun(), - dummy_msgseqno(), VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], undefined, VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2436,10 +2429,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_Guids, _, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, - dummy_msg_fun(), - dummy_msgseqno(), - VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, + undefined, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 34a28afec3..811017d969 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/4, fetch/2, ack/4, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, @@ -581,19 +581,18 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, MsgFun, MsgSeqNo, State) -> +dropwhile(Pred, MsgFun, State) -> case queue_out(State) of {empty, State1} -> - {MsgSeqNo, a(State1)}; + a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of true -> - {MsgSeqNo1, State2} = - MsgFun(read_msg_callback(MsgStatus), MsgSeqNo, State1), + State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1), {_, State3} = internal_fetch(false, MsgStatus, State2), - dropwhile(Pred, MsgFun, MsgSeqNo1, State3); + dropwhile(Pred, MsgFun, State3); false -> - {MsgSeqNo, a(in_r(MsgStatus, State1))} + a(in_r(MsgStatus, State1)) end end. @@ -626,39 +625,42 @@ read_msg_callback1(MsgId, IsPersistent, msg_store_read(MSCState, IsPersistent, MsgId), {Msg, State #vqstate { msg_store_clients = MSCState1 }}. -ack([], _Fun, MsgSeqNo, State) -> - {[], MsgSeqNo, State}; +ack([], _Fun, State) -> + {[], State}; -ack(AckTags, MsgFun, MsgSeqNo, State) -> +ack(AckTags, undefined, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, - {MsgSeqNo2, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - persistent_count = PCount, - ack_out_counter = AckOutCount }}} = + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = lists:foldl( - fun (SeqId, {Acc, {MsgSeqNo1, State2 = #vqstate{pending_ack = PA}}}) -> - AckEntry = gb_trees:get(SeqId, PA), + fun (SeqId, {Acc, State2}) -> {MsgStatus, State3} = remove_pending_ack(SeqId, State2), - {accumulate_ack(MsgStatus, Acc), - MsgFun(read_msg_callback(AckEntry), MsgSeqNo1, State3)} - end, {accumulate_ack_init(), {MsgSeqNo, State}}, AckTags), + {accumulate_ack(MsgStatus, Acc), State3} + end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( orddict:new(), MsgIdsByStore)), {lists:reverse(AllMsgIds), - MsgSeqNo2, a(State1 #vqstate { index_state = IndexState1, persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) })}. + ack_out_counter = AckOutCount + length(AckTags) })}; + +ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> + [begin + AckEntry = gb_trees:get(SeqId, PA), + MsgFun(read_msg_callback(AckEntry), SeqId, State) + end || SeqId <- AckTags], + {[], State}. requeue(AckTags, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), fun publish_alpha/2, State), |
