summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_variable_queue.erl26
3 files changed, 20 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bc2ffd173c..b8b0cf8d1b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -870,11 +870,11 @@ handle_cast({ack, Txn, AckTags, ChPid},
none ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- {NewBQS, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS),
- NewState =
- confirm_messages_internal(AckdGuids,
- State #q { backing_queue_state =
- NewBQS }),
+ AckdGuids = BQ:seqids_to_guids(AckTags, BQS),
+ NewBQS = BQ:ack(AckTags, BQS),
+ NewState = confirm_messages_internal(
+ AckdGuids,
+ State #q { backing_queue_state = NewBQS }),
{NewC, NewState};
_ ->
{C#cr{txn = Txn},
@@ -894,7 +894,8 @@ handle_cast({reject, AckTags, Requeue, ChPid},
store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> {BQS1, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS),
+ false -> AckdGuids = BQ:seqids_to_guids(AckTags, BQS),
+ BQS1 = BQ:ack(AckTags, BQS),
confirm_messages_internal(
AckdGuids,
State #q { backing_queue_state = BQS1 })
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 3cf2ec4fcb..9190456254 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -638,9 +638,8 @@ handle_call({client_terminate, #client_msstate { client_ref = CRef }},
_From,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
- reply(ok,
- State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
- cref_to_guids = dict:erase(CRef, CTG) }).
+ reply(ok, State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
+ cref_to_guids = dict:erase(CRef, CTG) }).
handle_cast({write, CRef, Guid},
State = #msstate { current_file_handle = CurHdl,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3c0e14a056..efdf34f236 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -37,7 +37,7 @@
requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, seqids_to_guids/2]).
-export([start/1, stop/0]).
@@ -766,14 +766,16 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
{avg_egress_rate , AvgEgressRate},
{avg_ingress_rate , AvgIngressRate} ].
+seqids_to_guids(SeqIds, #vqstate{ pending_ack = PA }) ->
+ lists:foldl(fun(SeqId, Guids) ->
+ {ok, #msg_status { msg = Msg }} = dict:find(SeqId, PA),
+ [Msg#basic_message.guid | Guids]
+ end, [], SeqIds).
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
-a({State, _} = I) ->
- a(State),
- I;
-
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
persistent_count = PersistentCount,
@@ -1140,7 +1142,7 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {State, {confirm, []}};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
@@ -1155,16 +1157,13 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
- AckdGuids = lists:append([Guids ||
- {_Store, Guids} <- orddict:to_list(GuidsByStore)]),
- State2 = msgs_confirmed(AckdGuids, State1),
+ State2 = msgs_confirmed(seqids_to_guids(AckTags, State), State1),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
end,
- {State2 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 },
- {confirm, AckdGuids}}.
+ State2 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1272,9 +1271,6 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
end
end.
-reduce_memory_use({State, Other}) ->
- {reduce_memory_use(State), Other};
-
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,