summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-10-20 17:01:50 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-10-20 17:01:50 +0100
commit61f48f0867ca77de86a224db3ba1855497dc722b (patch)
treeb3ca7bb160b878065df7ae8b5a6c81698dfc7d23 /src
parente8dfee98ae3033453a4bb83baa464f1e822fdded (diff)
parent0128bf7cce3568e00753f42a516646eb04399c2d (diff)
downloadrabbitmq-server-git-61f48f0867ca77de86a224db3ba1855497dc722b.tar.gz
Crude merge-from-default to help examine this thing. Currently some tests fail due to q_p blowing up with failed assertions, I don't know if it was like that before.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl43
-rw-r--r--src/rabbit_amqqueue_process.erl89
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_mirror_queue_master.erl26
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_tests.erl22
-rw-r--r--src/rabbit_variable_queue.erl41
7 files changed, 181 insertions, 56 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b3e92b6918..108e57082b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -316,34 +316,49 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
Args, RequiredArgs, QueueName,
[<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
-check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of
+check_declare_arguments(QueueName = #resource{virtual_host = VHostPath},
+ Args) ->
+ [case Fun(rabbit_misc:table_lookup(Args, Key), Args, VHostPath) of
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
"invalid arg '~s' for ~s: ~255p",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/2},
- {<<"x-message-ttl">>, fun check_integer_argument/2},
- {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]],
+ end ||
+ {Key, Fun} <-
+ [{<<"x-expires">>, fun check_integer_argument/3},
+ {<<"x-message-ttl">>, fun check_integer_argument/3},
+ {<<"x-ha-policy">>, fun check_ha_policy_argument/3},
+ {<<"x-dead-letter-exchange">>, fun check_exchange_argument/3}]],
ok.
-check_integer_argument(undefined, _Args) ->
+check_integer_argument(undefined, _Args, _VHostPath) ->
ok;
-check_integer_argument({Type, Val}, _Args) when Val > 0 ->
+check_integer_argument({Type, Val}, _Args, _VHostPath) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, Val}, _Args) ->
+check_integer_argument({_Type, Val}, _Args, _VHostPath) ->
{error, {value_zero_or_less, Val}}.
-check_ha_policy_argument(undefined, _Args) ->
+check_exchange_argument(undefined, _Args, _VHostPath) ->
ok;
-check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
+check_exchange_argument({longstr, Val}, _Args, VHostPath) ->
+ case rabbit_exchange:lookup(rabbit_misc:r(VHostPath, exchange, Val)) of
+ {ok, _Exchange} -> ok;
+ {error, not_found} -> {error, {non_existent_exchange, Val}}
+ end;
+check_exchange_argument({Type, _Val}, _Args, _VHostPath) ->
+ {error, {unacceptable_type, Type}}.
+
+
+
+check_ha_policy_argument(undefined, _Args, _VHostPath) ->
+ ok;
+check_ha_policy_argument({longstr, <<"all">>}, _Args, _VHostPath) ->
ok;
-check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
+check_ha_policy_argument({longstr, <<"nodes">>}, Args, _VHostPath) ->
case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
undefined ->
{error, {require, 'x-ha-policy-params'}};
@@ -359,9 +374,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
{Type, _} ->
{error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
end;
-check_ha_policy_argument({longstr, Policy}, _Args) ->
+check_ha_policy_argument({longstr, Policy}, _Args, _VHostPath) ->
{error, {invalid_ha_policy, Policy}};
-check_ha_policy_argument({Type, _}, _Args) ->
+check_ha_policy_argument({Type, _}, _Args, _VHostPath) ->
{error, {unacceptable_type, Type}}.
list() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 46f6674b04..8b5e984a12 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,7 +49,8 @@
stats_timer,
msg_id_to_channel,
ttl,
- ttl_timer_ref
+ ttl_timer_ref,
+ dlx
}).
-record(consumer, {tag, ack_required}).
@@ -129,6 +130,7 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ dlx = undefined,
msg_id_to_channel = dict:new()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -165,17 +167,19 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(Reason, State = #q{q = #amqqueue{name = QName},
backing_queue = BQ}) ->
+ State1 = maybe_dead_letter_queue(queue_deleted, State),
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+
rabbit_event:notify(
queue_deleted, [{pid, self()},
{name, QName}]),
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(qname(State)),
+ rabbit_amqqueue:internal_delete(qname(State1)),
BQS1
- end, State).
+ end, State1).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -218,12 +222,18 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
undefined -> State1
end
end, State, [{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-dead-letter-exchange">>,
+ fun init_dlx/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{
+ virtual_host = VHostPath}}}) ->
+ State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -688,6 +698,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ dead_letter_callback_fun(expired, State),
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -704,6 +715,62 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+dead_letter_callback_fun(_Reason, #q{dlx = undefined}) ->
+ fun(_MsgFun, BQS) -> BQS end;
+dead_letter_callback_fun(Reason, State) ->
+ fun(MsgFun, BQS) ->
+ {Msg, BQS1} = MsgFun(BQS),
+ dead_letter_msg(Msg, Reason, State),
+ BQS1
+ end.
+
+maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) ->
+ State;
+maybe_dead_letter_queue(Reason, State = #q{
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:fetch(false, BQS) of
+ {empty, BQS1} ->
+ State#q{backing_queue_state = BQS1};
+ {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} ->
+ dead_letter_msg(Msg, Reason, State),
+ maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1})
+ end.
+
+dead_letter_msg(Msg, Reason, State = #q{dlx = DLX}) ->
+ Exchange = rabbit_exchange:lookup_or_die(DLX),
+
+ rabbit_basic:publish(
+ rabbit_basic:delivery(
+ false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
+ undefined)),
+ ok.
+
+make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content},
+ State) ->
+
+ Content1 = #content{
+ properties = Props = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+
+ #resource{name = QName} = qname(State),
+
+ DeathHeaders = [{<<"x-death-reason">>, longstr,
+ list_to_binary(atom_to_list(Reason))},
+ {<<"x-death-queue">>, longstr, QName}],
+
+ Headers1 = case Headers of
+ undefined -> DeathHeaders;
+ _ -> Headers ++ DeathHeaders
+ end,
+ Content2 =
+ rabbit_binary_generator:clear_encoded_content(
+ Content1#content{properties = Props#'P_basic'{headers = Headers1}}),
+
+ Msg#basic_message{exchange_name = DLX, id = rabbit_guid:guid(),
+ content = Content2}.
+
+
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) ->
@@ -1014,10 +1081,11 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
{stop, normal, {ok, BQ:len(BQS)}, State}
end;
-handle_call(purge, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+handle_call(purge, _From, State = #q{backing_queue = BQ}) ->
+ State1 = #q{backing_queue_state = BQS} =
+ maybe_dead_letter_queue(queue_purged, State),
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ reply({ok, Count}, State1#q{backing_queue_state = BQS1});
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1037,7 +1105,8 @@ handle_cast({ack, AckTags, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ Fun = fun(_, BQS0) -> BQS0 end,
+ {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS),
State1#q{backing_queue_state = BQS1}
end));
@@ -1048,7 +1117,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
backing_queue_state = BQS}) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ false -> Fun = dead_letter_callback_fun(rejected,
+ State),
+ {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS),
State1#q{backing_queue_state = BQS1}
end
end));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 77278416e7..0952e73424 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -95,15 +95,19 @@ behaviour_info(callbacks) ->
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
- %% predicate returns true.
- {dropwhile, 2},
+ %% predicate returns true. A callback function is supplied
+ %% allowing callers access to messages that are about to be
+ %% dropped.
+ {dropwhile, 3},
%% Produce the next message.
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as Acks.
- {ack, 2},
+ %% about. Must return 1 msg_id per Ack, in the same order as
+ %% Acks. A callback function is supplied allowing callers to
+ %% access messages that are being acked.
+ {ack, 3},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 328fe639f7..c4173ec600 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,8 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/3,
+ requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@@ -172,12 +172,12 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Fun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+dropwhile(Pred, DropFun, State = #state{gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Fun, BQS),
+ BQS1 = BQ:dropwhile(Pred, DropFun, BQS),
Dropped = Len - BQ:len(BQS1),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
@@ -235,15 +235,15 @@ fetch(AckRequired, State = #state { gm = GM,
ack_msg_id = AM1 }}
end.
-ack(AckTags, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- {MsgIds, BQS1} = BQ:ack(AckTags, BQS),
+ack(AckTags, Fun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ {MsgIds, BQS1} = BQ:ack(AckTags, Fun, BQS),
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, MsgIds})
+ _ -> ok = gm:broadcast(GM, {ack, Fun, MsgIds})
end,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f423760a3d..52511c9637 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -818,12 +818,12 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
%% we must be shorter than the master
State
end};
-process_instruction({ack, MsgIds},
+process_instruction({ack, Fun, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
+ {MsgIds1, BQS1} = BQ:ack(AckTags, Fun, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3a4f6f84e2..b96934c6c0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2294,7 +2294,9 @@ test_dropwhile(VQ0) ->
VQ2 = rabbit_variable_queue:dropwhile(
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
- end, VQ1),
+ end,
+ dummy_msg_lookup_fun(),
+ VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2308,13 +2310,18 @@ test_dropwhile(VQ0) ->
VQ4.
+dummy_msg_lookup_fun() ->
+ fun(_Fun, State) -> State end.
+
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2),
+ VQ3 = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_lookup_fun(), VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5).
+ rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_lookup_fun(), VQ5).
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2339,7 +2346,8 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags,
+ dummy_msg_lookup_fun(), VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2349,7 +2357,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag],
+ dummy_msg_lookup_fun(), VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2383,7 +2392,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
+ dummy_msg_lookup_fun(), VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 895fc388a7..c51640bab1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/3, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -581,14 +581,14 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, State) ->
+dropwhile(Pred, DropFun, State) ->
case queue_out(State) of
{empty, State1} ->
a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, State2);
+ true -> State2 = DropFun(read_msg_callback(MsgStatus), State1),
+ dropwhile(Pred, DropFun, State2);
false -> a(in_r(MsgStatus, State1))
end
end.
@@ -603,20 +603,45 @@ fetch(AckRequired, State) ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
{Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
{Res, a(State3)}
+
+ end.
+
+read_msg_callback(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent }) ->
+ fun(State) ->
+ read_msg_callback1(MsgId, IsPersistent, State)
+ end;
+read_msg_callback(#msg_status{ msg = Msg}) ->
+ fun(State) ->
+ {Msg, State}
+ end;
+read_msg_callback({IsPersistent, MsgId, _MsgProps}) ->
+ fun(State) ->
+ read_msg_callback1(MsgId, IsPersistent, State)
end.
-ack([], State) ->
+read_msg_callback1(MsgId, IsPersistent,
+ State = #vqstate{ msg_store_clients = MSCState }) ->
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, State #vqstate { msg_store_clients = MSCState1 }}.
+
+ack([], _Fun, State) ->
{[], State};
-ack(AckTags, State) ->
+
+ack(AckTags, Fun, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
ack_out_counter = AckOutCount }} =
lists:foldl(
- fun (SeqId, {Acc, State2}) ->
+ fun (SeqId, {Acc, State2 = #vqstate{pending_ack = PA}}) ->
+ AckEntry = gb_trees:get(SeqId, PA),
{MsgStatus, State3} = remove_pending_ack(SeqId, State2),
- {accumulate_ack(MsgStatus, Acc), State3}
+ {accumulate_ack(MsgStatus, Acc),
+ Fun(read_msg_callback(AckEntry), State3)}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)