summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-31 10:35:46 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-31 10:35:46 +0100
commit4c04aeb19191abeed94e5de11605432311ba5a43 (patch)
tree74e55af418d1e153c2fad68a22d7011ed95ad471 /src
parent14c76021b6b58dbf973cb229cfa00e7ba23130d9 (diff)
downloadrabbitmq-server-git-4c04aeb19191abeed94e5de11605432311ba5a43.tar.gz
fixed message leak caused by confirms
I've added variable_queue:init/4. It calls variable_queue:init/3 and the fourth parameter is a tuple of 3 functions: the function to call when the indices of some guids hit the disk, the function to call when some guids hit the disk and the function to call when some guids have been ack'd by the consumer. Rabbit_test supplies nops rather than the usual casts, so the superfluous messages aren't generated.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl38
2 files changed, 35 insertions, 17 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d783c43e46..fcc3d92c7b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1801,7 +1801,8 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ {fun nop/1, fun nop/1, fun nop/1}),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1913,7 +1914,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ {fun nop/1, fun nop/1, fun nop/1}),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -1929,7 +1931,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ {fun nop/1, fun nop/1, fun nop/1}),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -1959,10 +1962,13 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true),
+ VQ1 = rabbit_variable_queue:init(QName, true, true,
+ {fun nop/1, fun nop/1, fun nop/1}),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
+
+nop(_) -> ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7163834579..0b0cec46ed 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
+-export([init/3, init/4, terminate/1, delete_and_terminate/1,
purge/1, publish/2, publish_delivered/3, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
@@ -236,7 +236,8 @@
ram_index_count,
out_counter,
in_counter,
- rates
+ rates,
+ confirm_functions
}).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -322,7 +323,8 @@
ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
- rates :: rates() }).
+ rates :: rates(),
+ confirm_functions :: {any(), any(), any()} }).
-include("rabbit_backing_queue_spec.hrl").
@@ -370,6 +372,19 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
+ init(QueueName, IsDurable, Recover,
+ { fun(Guids) -> %% index-on-disk fun
+ gen_server2:cast(Self,
+ {msg_indices_written_to_disk, Guids})
+ end,
+ fun (Guids) -> %% msg-on-disk fun
+ gen_server2:cast(Self, {msgs_written_to_disk, Guids})
+ end,
+ fun (Guids) -> %% ack-received fun
+ gen_server2:cast(Self, {confirm_messages, Guids})
+ end }).
+
+init(QueueName, IsDurable, Recover, {IndicesOnDisk, MsgsOnDisk, _} = CF) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
QueueName, Recover,
@@ -377,9 +392,7 @@ init(QueueName, IsDurable, Recover) ->
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end,
- fun (Guids) ->
- gen_server2:cast(Self, {msg_indices_written_to_disk, Guids})
- end),
+ IndicesOnDisk),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{PRef, TRef, Terms1} =
@@ -403,12 +416,9 @@ init(QueueName, IsDurable, Recover) ->
false -> undefined
end,
- Self = self(),
rabbit_msg_store:register_sync_callback(
?PERSISTENT_MSG_STORE,
- fun (Guids) ->
- gen_server2:cast(Self, {msgs_written_to_disk, Guids})
- end),
+ MsgsOnDisk),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
@@ -440,7 +450,8 @@ init(QueueName, IsDurable, Recover) ->
ingress = {Now, DeltaCount1},
avg_egress = 0.0,
avg_ingress = 0.0,
- timestamp = Now } },
+ timestamp = Now },
+ confirm_functions = CF},
a(maybe_deltas_to_betas(State)).
terminate(State) ->
@@ -1115,7 +1126,8 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
State;
-ack(MsgStoreFun, Fun, AckTags, State) ->
+ack(MsgStoreFun, Fun, AckTags,
+ State = #vqstate { confirm_functions = {_, _, AcksReceived} }) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
@@ -1127,7 +1139,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = orddict:fold(fun (MsgStore, Guids, ok) ->
- gen_server2:cast(self(), {confirm_messages, Guids}),
+ AcksReceived(Guids),
MsgStoreFun(MsgStore, Guids)
end, ok, GuidsByStore),
PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of