diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_tests.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 |
2 files changed, 35 insertions, 17 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d783c43e46..fcc3d92c7b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1801,7 +1801,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + {fun nop/1, fun nop/1, fun nop/1}), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1913,7 +1914,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + {fun nop/1, fun nop/1, fun nop/1}), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -1929,7 +1931,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + {fun nop/1, fun nop/1, fun nop/1}), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -1959,10 +1962,13 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + {fun nop/1, fun nop/1, fun nop/1}), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. + +nop(_) -> ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7163834579..0b0cec46ed 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, +-export([init/3, init/4, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, @@ -236,7 +236,8 @@ ram_index_count, out_counter, in_counter, - rates + rates, + confirm_functions }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -322,7 +323,8 @@ ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), - rates :: rates() }). + rates :: rates(), + confirm_functions :: {any(), any(), any()} }). -include("rabbit_backing_queue_spec.hrl"). @@ -370,6 +372,19 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), + init(QueueName, IsDurable, Recover, + { fun(Guids) -> %% index-on-disk fun + gen_server2:cast(Self, + {msg_indices_written_to_disk, Guids}) + end, + fun (Guids) -> %% msg-on-disk fun + gen_server2:cast(Self, {msgs_written_to_disk, Guids}) + end, + fun (Guids) -> %% ack-received fun + gen_server2:cast(Self, {confirm_messages, Guids}) + end }). + +init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -377,9 +392,7 @@ init(QueueName, IsDurable, Recover) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, - fun (Guids) -> - gen_server2:cast(Self, {msg_indices_written_to_disk, Guids}) - end), + IndicesOnDisk), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -403,12 +416,9 @@ init(QueueName, IsDurable, Recover) -> false -> undefined end, - Self = self(), rabbit_msg_store:register_sync_callback( ?PERSISTENT_MSG_STORE, - fun (Guids) -> - gen_server2:cast(Self, {msgs_written_to_disk, Guids}) - end), + MsgsOnDisk), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { @@ -440,7 +450,8 @@ init(QueueName, IsDurable, Recover) -> ingress = {Now, DeltaCount1}, avg_egress = 0.0, avg_ingress = 0.0, - timestamp = Now } }, + timestamp = Now }, + confirm_functions = CF}, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -1115,7 +1126,8 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> State; -ack(MsgStoreFun, Fun, AckTags, State) -> +ack(MsgStoreFun, Fun, AckTags, + State = #vqstate { confirm_functions = {_, _, AcksReceived} }) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( @@ -1127,7 +1139,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> - gen_server2:cast(self(), {confirm_messages, Guids}), + AcksReceived(Guids), MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of |
