summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-21 19:37:04 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-21 19:37:04 +0000
commit5b283b20ef93224c2562440ecf53e985f180d1a3 (patch)
treedc97fd4d8abf75e0c0dcd0e3666f7d744442a564 /src
parent8cee1d4be1fac4c51a2047334b86afab4f7fd67a (diff)
downloadrabbitmq-server-git-5b283b20ef93224c2562440ecf53e985f180d1a3.tar.gz
removes form the msg_store cause confirms to be sent out immediately
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl10
-rw-r--r--src/rabbit_tests.erl9
-rw-r--r--src/rabbit_variable_queue.erl11
3 files changed, 20 insertions, 10 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f0934d487d..f7fc291ff1 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -738,7 +738,8 @@ handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
fun (Guid, State2) -> remove_message(Guid, State2) end,
State, Guids),
- State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
+ State2 = client_confirm(CRef, gb_sets:from_list(Guids),
+ false, State1),
noreply(maybe_compact(State2));
handle_cast({release, Guids}, State =
@@ -875,7 +876,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
true -> file_handle_cache:sync(CurHdl)
end,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- %%[client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ %% [client_confirm(CRef, Guids, true, State1)
+ %% || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
@@ -1055,11 +1057,11 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids,
+client_confirm(CRef, Guids, WaitForIndex,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids),
+ {ok, Fun} -> Fun(Guids, WaitForIndex),
CTG1 = case dict:find(CRef, CTG) of
{ok, Gs} ->
Guids1 = gb_sets:difference(Gs, Guids),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 057d651099..809b062359 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1854,7 +1854,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -2030,7 +2030,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2047,7 +2047,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2078,7 +2078,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2140,3 +2140,4 @@ test_configurable_server_properties() ->
passed.
nop(_) -> ok.
+nop(_, _) -> ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4879cf32b7..465b401c7d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -412,7 +412,9 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids, WaitForIndex) ->
+ msgs_written_to_disk(Self, Guids, WaitForIndex)
+ end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -1392,7 +1394,12 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msgs_confirmed(GuidSet, State) ->
{gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-msgs_written_to_disk(QPid, GuidSet) ->
+msgs_written_to_disk(QPid, GuidSet, false) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun (State) ->
+ msgs_confirmed(GuidSet, State)
+ end);
+msgs_written_to_disk(QPid, GuidSet, true) ->
%%io:format("variable queue notified of msgs written to disk: ~p~n",
%% [gb_sets:size(GuidSet)]),
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(