diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-31 10:35:46 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-31 10:35:46 +0100 |
| commit | 4c04aeb19191abeed94e5de11605432311ba5a43 (patch) | |
| tree | 74e55af418d1e153c2fad68a22d7011ed95ad471 /src | |
| parent | 14c76021b6b58dbf973cb229cfa00e7ba23130d9 (diff) | |
| download | rabbitmq-server-git-4c04aeb19191abeed94e5de11605432311ba5a43.tar.gz | |
fixed message leak caused by confirms
I've added variable_queue:init/4. It calls variable_queue:init/3 and
the fourth parameter is a tuple of 3 functions: the function to call
when the indices of some guids hit the disk, the function to call when
some guids hit the disk and the function to call when some guids have
been ack'd by the consumer. Rabbit_test supplies nops rather than the
usual casts, so the superfluous messages aren't generated.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_tests.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 |
2 files changed, 35 insertions, 17 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d783c43e46..fcc3d92c7b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1801,7 +1801,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + {fun nop/1, fun nop/1, fun nop/1}), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1913,7 +1914,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + {fun nop/1, fun nop/1, fun nop/1}), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -1929,7 +1931,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + {fun nop/1, fun nop/1, fun nop/1}), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -1959,10 +1962,13 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + {fun nop/1, fun nop/1, fun nop/1}), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. + +nop(_) -> ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7163834579..0b0cec46ed 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, +-export([init/3, init/4, terminate/1, delete_and_terminate/1, purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, @@ -236,7 +236,8 @@ ram_index_count, out_counter, in_counter, - rates + rates, + confirm_functions }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -322,7 +323,8 @@ ram_index_count :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), - rates :: rates() }). + rates :: rates(), + confirm_functions :: {any(), any(), any()} }). -include("rabbit_backing_queue_spec.hrl"). @@ -370,6 +372,19 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), + init(QueueName, IsDurable, Recover, + { fun(Guids) -> %% index-on-disk fun + gen_server2:cast(Self, + {msg_indices_written_to_disk, Guids}) + end, + fun (Guids) -> %% msg-on-disk fun + gen_server2:cast(Self, {msgs_written_to_disk, Guids}) + end, + fun (Guids) -> %% ack-received fun + gen_server2:cast(Self, {confirm_messages, Guids}) + end }). + +init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -377,9 +392,7 @@ init(QueueName, IsDurable, Recover) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, - fun (Guids) -> - gen_server2:cast(Self, {msg_indices_written_to_disk, Guids}) - end), + IndicesOnDisk), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -403,12 +416,9 @@ init(QueueName, IsDurable, Recover) -> false -> undefined end, - Self = self(), rabbit_msg_store:register_sync_callback( ?PERSISTENT_MSG_STORE, - fun (Guids) -> - gen_server2:cast(Self, {msgs_written_to_disk, Guids}) - end), + MsgsOnDisk), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { @@ -440,7 +450,8 @@ init(QueueName, IsDurable, Recover) -> ingress = {Now, DeltaCount1}, avg_egress = 0.0, avg_ingress = 0.0, - timestamp = Now } }, + timestamp = Now }, + confirm_functions = CF}, a(maybe_deltas_to_betas(State)). terminate(State) -> @@ -1115,7 +1126,8 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> State; -ack(MsgStoreFun, Fun, AckTags, State) -> +ack(MsgStoreFun, Fun, AckTags, + State = #vqstate { confirm_functions = {_, _, AcksReceived} }) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( @@ -1127,7 +1139,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> - gen_server2:cast(self(), {confirm_messages, Guids}), + AcksReceived(Guids), MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of |
