summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl34
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_variable_queue.erl42
3 files changed, 50 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3df59de6b1..6eeb397c68 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -402,6 +402,11 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
+confirm_messages_internal(Guids, State) ->
+ lists:foldl(fun(Guid, State0) ->
+ confirm_message_internal(Guid, State0)
+ end, State, Guids).
+
confirm_message_internal(Guid, State = #q { guid_to_channel = GTC }) ->
case dict:find(Guid, GTC) of
{ok, {_ , undefined}} -> ok;
@@ -547,10 +552,8 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
case Fun(BQS) of
{BQS1, {confirm, Guids}} ->
- State1 = lists:foldl(fun (Guid, State0) ->
- confirm_message_internal(Guid, State0) end,
- State, Guids),
- State1 #q { backing_queue_state = BQS1};
+ confirm_messages_internal(Guids,
+ State #q { backing_queue_state = BQS1 });
BQS1 ->
run_message_queue(State#q{backing_queue_state = BQS1})
end.
@@ -841,14 +844,22 @@ handle_cast({ack, Txn, AckTags, ChPid},
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- {C1, BQS1} =
+ {C1, State1} =
case Txn of
- none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
- _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
+ none ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ NewC = C#cr{acktags = ChAckTags1},
+ {NewBQS, AckdGuids} = BQ:ack(AckTags, BQS),
+ NewState =
+ confirm_messages_internal(AckdGuids,
+ State #q { backing_queue_state = NewBQS }),
+ {NewC, NewState};
+ _ ->
+ {C#cr{txn = Txn},
+ State #q { backing_queue_state = BQ:tx_ack(Txn, AckTags, BQS) }}
end,
store_ch_record(C1),
- noreply(State#q{backing_queue_state = BQS1})
+ noreply(State1)
end;
handle_cast({reject, AckTags, Requeue, ChPid},
@@ -861,8 +872,9 @@ handle_cast({reject, AckTags, Requeue, ChPid},
store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
- State #q { backing_queue_state = BQS1 }
+ false -> {BQS1, AckdGuids} = BQ:ack(AckTags, BQS),
+ confirm_messages_internal(AckdGuids,
+ State #q { backing_queue_state = BQS1 })
end)
end;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fcc3d92c7b..c8af7c1000 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1801,8 +1801,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false,
- {fun nop/1, fun nop/1, fun nop/1}),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1914,8 +1913,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- {fun nop/1, fun nop/1, fun nop/1}),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -1931,8 +1929,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- {fun nop/1, fun nop/1, fun nop/1}),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -1962,8 +1959,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true,
- {fun nop/1, fun nop/1, fun nop/1}),
+ VQ1 = rabbit_variable_queue:init(QName, true, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 448e8c93af..c33859ee59 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -758,6 +758,10 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
%% Minor helpers
%%----------------------------------------------------------------------------
+a({State, _} = I) ->
+ a(State),
+ I;
+
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
persistent_count = PersistentCount,
@@ -966,7 +970,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
IsPersistent1 = IsDurable andalso IsPersistent,
{SeqId, State3} = publish(Msg, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ end, {PAcks, element(1, ack(Acks, State))}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1117,7 +1121,7 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {State, []};
ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
@@ -1130,15 +1134,19 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- confirm_messages(Guids),
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
+ AckdGuids = lists:append(orddict:fold(fun (_, Guids, Acc) ->
+ [Guids || Acc]
+ end, [], GuidsByStore)),
+ State2 = msgs_confirmed(AckdGuids, State1),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
end,
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ {State2 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 },
+ AckdGuids}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1153,20 +1161,13 @@ accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
-confirm_messages(Guids) ->
- Self = self(),
- spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self,
- fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD }) ->
- { State #vqstate {
- msgs_on_disk =
- gb_sets:difference(MOD, gb_sets:from_list(Guids)),
- msg_indices_on_disk =
- gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) },
- {confirm, Guids} }
- end)
- end).
+msgs_confirmed(Guids, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD }) ->
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:difference(MOD, gb_sets:from_list(Guids)),
+ msg_indices_on_disk =
+ gb_sets:delete_any(MIOD, gb_sets:from_list(Guids)) }.
msgs_written_to_disk(QPid, Guids) ->
spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
@@ -1242,6 +1243,9 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
end
end.
+reduce_memory_use({State, Other}) ->
+ {reduce_memory_use(State), Other};
+
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,