summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl38
2 files changed, 35 insertions, 17 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d783c43e46..fcc3d92c7b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1801,7 +1801,8 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ {fun nop/1, fun nop/1, fun nop/1}),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1913,7 +1914,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ {fun nop/1, fun nop/1, fun nop/1}),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -1929,7 +1931,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ {fun nop/1, fun nop/1, fun nop/1}),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -1959,10 +1962,13 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true),
+ VQ1 = rabbit_variable_queue:init(QName, true, true,
+ {fun nop/1, fun nop/1, fun nop/1}),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
+
+nop(_) -> ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7163834579..0b0cec46ed 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
+-export([init/3, init/4, terminate/1, delete_and_terminate/1,
purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
@@ -236,7 +236,8 @@
ram_index_count,
out_counter,
in_counter,
- rates
+ rates,
+ confirm_functions
}).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -322,7 +323,8 @@
ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
- rates :: rates() }).
+ rates :: rates(),
+ confirm_functions :: {any(), any(), any()} }).
-include("rabbit_backing_queue_spec.hrl").
@@ -370,6 +372,19 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
+ init(QueueName, IsDurable, Recover,
+ { fun(Guids) -> %% index-on-disk fun
+ gen_server2:cast(Self,
+ {msg_indices_written_to_disk, Guids})
+ end,
+ fun (Guids) -> %% msg-on-disk fun
+ gen_server2:cast(Self, {msgs_written_to_disk, Guids})
+ end,
+ fun (Guids) -> %% ack-received fun
+ gen_server2:cast(Self, {confirm_messages, Guids})
+ end }).
+
+init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
QueueName, Recover,
@@ -377,9 +392,7 @@ init(QueueName, IsDurable, Recover) ->
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end,
- fun (Guids) ->
- gen_server2:cast(Self, {msg_indices_written_to_disk, Guids})
- end),
+ IndicesOnDisk),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{PRef, TRef, Terms1} =
@@ -403,12 +416,9 @@ init(QueueName, IsDurable, Recover) ->
false -> undefined
end,
- Self = self(),
rabbit_msg_store:register_sync_callback(
?PERSISTENT_MSG_STORE,
- fun (Guids) ->
- gen_server2:cast(Self, {msgs_written_to_disk, Guids})
- end),
+ MsgsOnDisk),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
@@ -440,7 +450,8 @@ init(QueueName, IsDurable, Recover) ->
ingress = {Now, DeltaCount1},
avg_egress = 0.0,
avg_ingress = 0.0,
- timestamp = Now } },
+ timestamp = Now },
+ confirm_functions = CF},
a(maybe_deltas_to_betas(State)).
terminate(State) ->
@@ -1115,7 +1126,8 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
State;
-ack(MsgStoreFun, Fun, AckTags, State) ->
+ack(MsgStoreFun, Fun, AckTags,
+ State = #vqstate { confirm_functions = {_, _, AcksReceived} }) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
@@ -1127,7 +1139,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- gen_server2:cast(self(), {confirm_messages, Guids}),
+ AcksReceived(Guids),
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of