summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-27 15:48:01 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-27 15:48:01 +0100
commit961b38f9564e8c20e1182d398191d456f2bc8352 (patch)
tree08df148704664231be4ef899f8ecccc3a1d1477b
parent021b0a160979c069703a00d9da54f36510a94628 (diff)
downloadrabbitmq-server-git-961b38f9564e8c20e1182d398191d456f2bc8352.tar.gz
server tests pass
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_tests.erl21
-rw-r--r--src/rabbit_variable_queue.erl18
3 files changed, 28 insertions, 13 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9190456254..695b44250f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -681,7 +681,7 @@ handle_cast({write, CRef, Guid},
cref_to_guids =
case dict:find(CRef, CODC) of
{ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG);
- error -> CRef
+ error -> CTG
end}));
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a72656b73b..b814390048 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -75,7 +75,6 @@ all_tests() ->
passed = maybe_run_cluster_dependent_tests(),
passed.
-
maybe_run_cluster_dependent_tests() ->
SecondaryNode = rabbit_misc:makenode("hare"),
@@ -1608,6 +1607,9 @@ init_test_queue() ->
test_queue(), true, false,
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
+ end,
+ fun (_) ->
+ ok %% Sync!
end).
restart_test_queue(Qi) ->
@@ -1790,7 +1792,8 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>), VQN)
+ end}, <<>>),
+ false, VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1810,7 +1813,8 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ fun nop/1, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1922,7 +1926,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -1938,7 +1943,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -1968,10 +1974,13 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true),
+ VQ1 = rabbit_variable_queue:init(QName, true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
+
+nop(_) -> ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e3a0898992..9256b8ac89 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
+-export([init/5, init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3,
requeue/2, len/1, is_empty/1,
@@ -375,6 +375,15 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(QueueName, IsDurable, Recover) ->
+ Self = self(),
+ init(QueueName, IsDurable, Recover,
+ fun (Guids) ->
+ msgs_written_to_disk(Self, Guids)
+ end,
+ fun msg_indices_written_to_disk/1).
+
+init(QueueName, IsDurable, Recover,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
QueueName, Recover,
@@ -382,7 +391,7 @@ init(QueueName, IsDurable, Recover) ->
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end,
- fun msg_indices_written_to_disk/1),
+ MsgIdxOnDiskFun),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{PRef, TRef, Terms1} =
@@ -406,13 +415,10 @@ init(QueueName, IsDurable, Recover) ->
false -> undefined
end,
- Self = self(),
rabbit_msg_store:register_sync_callback(
?PERSISTENT_MSG_STORE,
PRef,
- fun (Guids) ->
- msgs_written_to_disk(Self, Guids)
- end),
+ MsgOnDiskFun),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {