diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-21 19:37:04 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-21 19:37:04 +0000 |
| commit | 5b283b20ef93224c2562440ecf53e985f180d1a3 (patch) | |
| tree | dc97fd4d8abf75e0c0dcd0e3666f7d744442a564 /src | |
| parent | 8cee1d4be1fac4c51a2047334b86afab4f7fd67a (diff) | |
| download | rabbitmq-server-git-5b283b20ef93224c2562440ecf53e985f180d1a3.tar.gz | |
removes form the msg_store cause confirms to be sent out immediately
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 11 |
3 files changed, 20 insertions, 10 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f0934d487d..f7fc291ff1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -738,7 +738,8 @@ handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( fun (Guid, State2) -> remove_message(Guid, State2) end, State, Guids), - State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), + State2 = client_confirm(CRef, gb_sets:from_list(Guids), + false, State1), noreply(maybe_compact(State2)); handle_cast({release, Guids}, State = @@ -875,7 +876,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, true -> file_handle_cache:sync(CurHdl) end, lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - %%[client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + %% [client_confirm(CRef, Guids, true, State1) + %% || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. @@ -1055,11 +1057,11 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -client_confirm(CRef, Guids, +client_confirm(CRef, Guids, WaitForIndex, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids), + {ok, Fun} -> Fun(Guids, WaitForIndex), CTG1 = case dict:find(CRef, CTG) of {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 057d651099..809b062359 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1854,7 +1854,7 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), VQ = rabbit_variable_queue:init(test_queue(), true, false, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2030,7 +2030,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2047,7 +2047,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2078,7 +2078,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true, - fun nop/1, fun nop/1), + fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2140,3 +2140,4 @@ test_configurable_server_properties() -> passed. nop(_) -> ok. +nop(_, _) -> ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4879cf32b7..465b401c7d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -412,7 +412,9 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), init(QueueName, IsDurable, Recover, - fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids, WaitForIndex) -> + msgs_written_to_disk(Self, Guids, WaitForIndex) + end, fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> @@ -1392,7 +1394,12 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msgs_confirmed(GuidSet, State) -> {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. -msgs_written_to_disk(QPid, GuidSet) -> +msgs_written_to_disk(QPid, GuidSet, false) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun (State) -> + msgs_confirmed(GuidSet, State) + end); +msgs_written_to_disk(QPid, GuidSet, true) -> %%io:format("variable queue notified of msgs written to disk: ~p~n", %% [gb_sets:size(GuidSet)]), rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( |
