diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-27 15:48:01 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-27 15:48:01 +0100 |
| commit | 961b38f9564e8c20e1182d398191d456f2bc8352 (patch) | |
| tree | 08df148704664231be4ef899f8ecccc3a1d1477b | |
| parent | 021b0a160979c069703a00d9da54f36510a94628 (diff) | |
| download | rabbitmq-server-git-961b38f9564e8c20e1182d398191d456f2bc8352.tar.gz | |
server tests pass
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 18 |
3 files changed, 28 insertions, 13 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9190456254..695b44250f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -681,7 +681,7 @@ handle_cast({write, CRef, Guid}, cref_to_guids = case dict:find(CRef, CODC) of {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); - error -> CRef + error -> CTG end})); #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a72656b73b..b814390048 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -75,7 +75,6 @@ all_tests() -> passed = maybe_run_cluster_dependent_tests(), passed. - maybe_run_cluster_dependent_tests() -> SecondaryNode = rabbit_misc:makenode("hare"), @@ -1608,6 +1607,9 @@ init_test_queue() -> test_queue(), true, false, fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) + end, + fun (_) -> + ok %% Sync! end). restart_test_queue(Qi) -> @@ -1790,7 +1792,8 @@ variable_queue_publish(IsPersistent, Count, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + false, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1810,7 +1813,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1922,7 +1926,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -1938,7 +1943,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -1968,10 +1974,13 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) end), passed. + +nop(_) -> ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e3a0898992..9256b8ac89 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, +-export([init/5, init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, @@ -375,6 +375,15 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> + Self = self(), + init(QueueName, IsDurable, Recover, + fun (Guids) -> + msgs_written_to_disk(Self, Guids) + end, + fun msg_indices_written_to_disk/1). + +init(QueueName, IsDurable, Recover, + MsgOnDiskFun, MsgIdxOnDiskFun) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -382,7 +391,7 @@ init(QueueName, IsDurable, Recover) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, - fun msg_indices_written_to_disk/1), + MsgIdxOnDiskFun), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -406,13 +415,10 @@ init(QueueName, IsDurable, Recover) -> false -> undefined end, - Self = self(), rabbit_msg_store:register_sync_callback( ?PERSISTENT_MSG_STORE, PRef, - fun (Guids) -> - msgs_written_to_disk(Self, Guids) - end), + MsgOnDiskFun), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { |
