summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_backing_queue_spec.hrl10
-rw-r--r--src/rabbit_amqqueue_process.erl101
-rw-r--r--src/rabbit_backing_queue.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl26
-rw-r--r--src/rabbit_mirror_queue_slave.erl8
-rw-r--r--src/rabbit_misc.erl4
-rw-r--r--src/rabbit_tests.erl39
-rw-r--r--src/rabbit_variable_queue.erl54
8 files changed, 130 insertions, 123 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 602f598ea7..1e870bb74b 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -48,14 +48,14 @@
rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/4 ::
+-spec(dropwhile/3 ::
(fun ((rabbit_types:message_properties()) -> boolean()),
- msg_fun(), non_neg_integer(), state())
- -> {non_neg_integer(), state()}).
+ msg_fun(), state())
+ -> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/4 :: ([ack()], msg_fun(), non_neg_integer(), state()) ->
- {[rabbit_guid:guid()], non_neg_integer(), state()}).
+-spec(ack/3 :: ([ack()], msg_fun(), state()) ->
+ {[rabbit_guid:guid()], state()}).
-spec(requeue/2 :: ([ack()], state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 24bebc368d..f7756232cf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -51,6 +51,7 @@
ttl,
ttl_timer_ref,
publish_seqno,
+ unconfirmed,
dlx
}).
@@ -133,6 +134,7 @@ init(Q) ->
ttl = undefined,
dlx = undefined,
publish_seqno = 1,
+ unconfirmed = gb_trees:empty(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -156,6 +158,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
expiry_timer_ref = undefined,
ttl = undefined,
publish_seqno = 1,
+ unconfirmed = gb_trees:empty(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -474,12 +477,9 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun confirm_to_sender/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-confirm_to_sender(Pid, MsgSeqNos) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
never;
should_confirm_message(#delivery{sender = ChPid,
@@ -516,7 +516,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case Confirm of
- immediately -> confirm_to_sender(ChPid, [MsgSeqNo]);
+ immediately -> rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]);
_ -> ok
end,
case BQ:is_duplicate(Message, BQS) of
@@ -692,17 +692,13 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ,
- publish_seqno = MsgSeqNo}) ->
+ backing_queue = BQ}) ->
Now = now_micros(),
- {MsgSeqNo1, BQS1} =
- BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- mk_dead_letter_fun(expired, State),
- MsgSeqNo,
- BQS),
- ensure_ttl_timer(State#q{backing_queue_state = BQS1,
- publish_seqno = MsgSeqNo1}).
+ BQS1 = BQ:dropwhile(
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ mk_dead_letter_fun(expired, State),
+ BQS),
+ ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -718,38 +714,40 @@ ensure_ttl_timer(State) ->
State.
mk_dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- fun(_MsgLookupFun, MsgSeqNo, BQS) -> {MsgSeqNo, BQS} end;
-mk_dead_letter_fun(Reason, State) ->
- fun(MsgLookupFun, MsgSeqNo, BQS) ->
+ fun(_MsgLookupFun, _Extra, BQS) -> BQS end;
+mk_dead_letter_fun(Reason, _State) ->
+ fun(MsgLookupFun, Extra, BQS) ->
{Msg, BQS1} = MsgLookupFun(BQS),
- MsgSeqNo1 = dead_letter_msg(Msg, Reason, MsgSeqNo, State),
- {MsgSeqNo1, BQS1}
+ gen_server2:cast(self(), {dead_letter, {Msg, Extra}, Reason}),
+ BQS1
end.
maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) ->
State;
maybe_dead_letter_queue(Reason, State = #q{
backing_queue_state = BQS,
- backing_queue = BQ,
- publish_seqno = MsgSeqNo}) ->
+ backing_queue = BQ}) ->
case BQ:fetch(false, BQS) of
{empty, BQS1} ->
State#q{backing_queue_state = BQS1};
{{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} ->
- MsgSeqNo1 = dead_letter_msg(Msg, Reason, MsgSeqNo, State),
- maybe_dead_letter_queue(Reason,
- State#q{backing_queue_state = BQS1,
- publish_seqno = MsgSeqNo1})
+ State1 = dead_letter_msg(Msg, undefined, Reason,
+ State#q{backing_queue_state = BQS1}),
+ maybe_dead_letter_queue(Reason, State1)
end.
-dead_letter_msg(Msg, Reason, MsgSeqNo, State = #q{dlx = DLX}) ->
+dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = Unconfirmed,
+ dlx = DLX}) ->
rabbit_exchange:lookup_or_die(DLX),
rabbit_basic:publish(
rabbit_basic:delivery(
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
MsgSeqNo)),
- MsgSeqNo+1.
+ State#q{publish_seqno = MsgSeqNo + 1,
+ unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra},
+ Unconfirmed)}.
make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content},
State) ->
@@ -1109,28 +1107,23 @@ handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- publish_seqno = MsgSeqNo}) ->
- Fun = fun(_, MsgSeqNo1, BQS0) -> {MsgSeqNo1, BQS0} end,
- {_Guids, MsgSeqNo1, BQS1} =
- BQ:ack(AckTags, Fun, MsgSeqNo, BQS),
- State1#q{backing_queue_state = BQS1,
- publish_seqno = MsgSeqNo1}
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} =
+ BQ:ack(AckTags, undefined, BQS),
+ State1#q{backing_queue_state = BQS1}
end));
handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- publish_seqno = MsgSeqNo}) ->
+ backing_queue_state = BQS}) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
false -> Fun = mk_dead_letter_fun(rejected, State),
- {_Guids, MsgSeqNo1, BQS1} =
- BQ:ack(AckTags, Fun, MsgSeqNo, BQS),
- State1#q{backing_queue_state = BQS1,
- publish_seqno = MsgSeqNo1}
+ {_Guids, BQS1} =
+ BQ:ack(AckTags, Fun, BQS),
+ State1#q{backing_queue_state = BQS1}
end
end));
@@ -1188,9 +1181,29 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
end,
noreply(State);
-handle_cast({confirm, MsgSeqNos, From}, State) ->
- rabbit_log:info("Got a confirm for ~p~n", [MsgSeqNos]),
- noreply(State).
+handle_cast({confirm, MsgSeqNos, _From}, State) ->
+ noreply(lists:foldl(
+ fun (MsgSeqNo,
+ State1 = #q{unconfirmed = Unconfirmed,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ Reason = gb_trees:get(MsgSeqNo, Unconfirmed),
+ case Reason of
+ {expired, _} ->
+ ok;
+ {rejected, AckTag} ->
+ BQ:ack([AckTag], undefined, BQS);
+ {queue_deleted, _} ->
+ ok;
+ {queue_purged, _} ->
+ ok
+ end,
+ State1#q{unconfirmed = gb_trees:delete(MsgSeqNo,
+ Unconfirmed)}
+ end, State, MsgSeqNos));
+
+handle_cast({dead_letter, {Msg, Extra}, Reason}, State) ->
+ noreply(dead_letter_msg(Msg, Extra, Reason, State)).
handle_info(maybe_expire, State) ->
case is_unused(State) of
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index a80d656d32..72c00e3d01 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -97,9 +97,8 @@ behaviour_info(callbacks) ->
%% Drop messages from the head of the queue while the supplied
%% predicate returns true. A callback function is supplied
%% allowing callers access to messages that are about to be
- %% dropped; the callback may publish messages and requires the
- %% next message sequence number, which must also be supplied.
- {dropwhile, 4},
+ %% dropped.
+ {dropwhile, 3},
%% Produce the next message.
{fetch, 2},
@@ -107,10 +106,8 @@ behaviour_info(callbacks) ->
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as
%% Acks. A callback function is supplied allowing callers to
- %% access messages that are being acked; the callback may publish
- %% messages and requires the next message sequence number, which
- %% must also be supplied.
- {ack, 4},
+ %% access messages that are being acked.
+ {ack, 3},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c048d4a8ca..4b54d82171 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,8 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/4,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/4,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/3,
+ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@@ -172,18 +172,18 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, MsgFun, MsgSeqNo,
+dropwhile(Pred, MsgFun,
State = #state{gm = GM,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {MsgSeqNo1, BQS1} = BQ:dropwhile(Pred, MsgFun, MsgSeqNo, BQS),
+ BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
Dropped = Len - BQ:len(BQS1),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
- {MsgSeqNo1, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 }}.
+ State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -236,18 +236,18 @@ fetch(AckRequired, State = #state { gm = GM,
ack_msg_id = AM1 }}
end.
-ack(AckTags, MsgFun, MsgSeqNo, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- {MsgIds, MsgSeqNo1, BQS1} = BQ:ack(AckTags, MsgFun, MsgSeqNo, BQS),
+ack(AckTags, MsgFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds})
end,
- {MsgIds, MsgSeqNo1, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
+ {MsgIds, State #state { backing_queue_state = BQS1,
+ ack_msg_id = AM1 }}.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7238b16968..a8c2006d14 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -417,7 +417,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
Acc
end
end, {gb_trees:empty(), MS}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State #state { msg_id_status = MS1 }.
handle_process_result({ok, State}) -> noreply(State);
@@ -649,7 +649,7 @@ maybe_enqueue_message(
{ok, {confirmed, ChPid}} ->
%% BQ has confirmed it but we didn't know what the
%% msg_seq_no was at the time. We do now!
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { sender_queues = SQ1,
msg_id_status = dict:erase(MsgId, MS) };
@@ -666,7 +666,7 @@ maybe_enqueue_message(
msg_id_status =
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
@@ -728,7 +728,7 @@ process_instruction(
{MQ2, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
{MQ2, PendingCh, MS}
end;
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index dcfbcaffb8..53f5ebb59d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -28,6 +28,7 @@
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
+-export([confirm_to_sender/2]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -370,6 +371,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
end,
Mod]).
+confirm_to_sender(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+
throw_on_error(E, Thunk) ->
case Thunk() of
{error, Reason} -> throw({E, Reason});
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d9a3666474..2f03b2fb0b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2331,13 +2331,12 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- {_, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end,
- dummy_msg_fun(),
- dummy_msgseqno(),
- VQ1),
+ VQ2 = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end,
+ dummy_msg_fun(),
+ VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2351,19 +2350,17 @@ test_dropwhile(VQ0) ->
VQ4.
-dummy_msg_fun() -> fun(_Fun, MsgSeqNo, State) -> {MsgSeqNo, State} end.
-dummy_msgseqno() -> 1.
+dummy_msg_fun() -> fun(_Fun, _Extra, State) -> State end.
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {_, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_fun(), dummy_msgseqno(), VQ2),
+ VQ3 = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_fun(), VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {_, VQ6} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_fun(), dummy_msgseqno(), VQ5),
- VQ6.
+ rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_fun(), VQ5).
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2388,9 +2385,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, _, VQ9} =
- rabbit_variable_queue:ack(AckTags, dummy_msg_fun(),
- dummy_msgseqno(), VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, undefined, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2400,9 +2395,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, _, VQ3} =
- rabbit_variable_queue:ack([AckTag], dummy_msg_fun(),
- dummy_msgseqno(), VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], undefined, VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2436,10 +2429,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_Guids, _, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
- dummy_msg_fun(),
- dummy_msgseqno(),
- VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
+ undefined, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 34a28afec3..811017d969 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/4, fetch/2, ack/4, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -581,19 +581,18 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, MsgSeqNo, State) ->
+dropwhile(Pred, MsgFun, State) ->
case queue_out(State) of
{empty, State1} ->
- {MsgSeqNo, a(State1)};
+ a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
true ->
- {MsgSeqNo1, State2} =
- MsgFun(read_msg_callback(MsgStatus), MsgSeqNo, State1),
+ State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1),
{_, State3} = internal_fetch(false, MsgStatus, State2),
- dropwhile(Pred, MsgFun, MsgSeqNo1, State3);
+ dropwhile(Pred, MsgFun, State3);
false ->
- {MsgSeqNo, a(in_r(MsgStatus, State1))}
+ a(in_r(MsgStatus, State1))
end
end.
@@ -626,39 +625,42 @@ read_msg_callback1(MsgId, IsPersistent,
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate { msg_store_clients = MSCState1 }}.
-ack([], _Fun, MsgSeqNo, State) ->
- {[], MsgSeqNo, State};
+ack([], _Fun, State) ->
+ {[], State};
-ack(AckTags, MsgFun, MsgSeqNo, State) ->
+ack(AckTags, undefined, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
- {MsgSeqNo2,
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- persistent_count = PCount,
- ack_out_counter = AckOutCount }}} =
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
lists:foldl(
- fun (SeqId, {Acc, {MsgSeqNo1, State2 = #vqstate{pending_ack = PA}}}) ->
- AckEntry = gb_trees:get(SeqId, PA),
+ fun (SeqId, {Acc, State2}) ->
{MsgStatus, State3} = remove_pending_ack(SeqId, State2),
- {accumulate_ack(MsgStatus, Acc),
- MsgFun(read_msg_callback(AckEntry), MsgSeqNo1, State3)}
- end, {accumulate_ack_init(), {MsgSeqNo, State}}, AckTags),
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
orddict:new(), MsgIdsByStore)),
{lists:reverse(AllMsgIds),
- MsgSeqNo2,
a(State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) })}.
+ ack_out_counter = AckOutCount + length(AckTags) })};
+
+ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) ->
+ [begin
+ AckEntry = gb_trees:get(SeqId, PA),
+ MsgFun(read_msg_callback(AckEntry), SeqId, State)
+ end || SeqId <- AckTags],
+ {[], State}.
requeue(AckTags, #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
fun publish_alpha/2, State),