summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author: Alexandru Scvortov <alexandru@rabbitmq.com> 2011-11-07 12:06:57 +0000
committer: Alexandru Scvortov <alexandru@rabbitmq.com> 2011-11-07 12:06:57 +0000
commit: ffbe61e27e210ae6e0002418c27f014af9ba8689 (patch)
tree: e409b82b27e68499a9f5310dbb788b10eee06a30 /src
parent: 61f48f0867ca77de86a224db3ba1855497dc722b (diff)
parent: c60fdf21357804931761fa154c803259dcdf20c6 (diff)
download: rabbitmq-server-git-ffbe61e27e210ae6e0002418c27f014af9ba8689.tar.gz
merge default into bug20337
Diffstat (limited to 'src')
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/rabbit.erl67
-rw-r--r--src/rabbit_alarm.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_backing_queue_qc.erl46
-rw-r--r--src/rabbit_control.erl48
-rw-r--r--src/rabbit_exchange.erl10
-rw-r--r--src/rabbit_memory_monitor.erl20
-rw-r--r--src/rabbit_mirror_queue_master.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_msg_store.erl263
-rw-r--r--src/rabbit_prelaunch.erl19
-rw-r--r--src/rabbit_tests.erl54
-rw-r--r--src/rabbit_variable_queue.erl99
15 files changed, 461 insertions, 220 deletions
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 5e782a08d3..0900f56fd8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -202,7 +202,7 @@ with_sups(Fun, Sups) ->
Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups],
Fun(Pids),
[kill(Pid) || Pid <- Pids, is_process_alive(Pid)],
- timer:sleep(100),
+ timer:sleep(500),
passed.
start_sup(Spec) ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e98ca9be33..0a2681a219 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,8 +18,8 @@
-behaviour(application).
--export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0,
- is_running/0 , is_running/1, environment/0,
+-export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0,
+ status/0, is_running/0, is_running/1, environment/0,
rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -177,6 +177,27 @@
-define(APPS, [os_mon, mnesia, rabbit]).
+%% see bug 24513 for how this list was created
+-define(HIPE_WORTHY,
+ [rabbit_reader, rabbit_channel, gen_server2,
+ rabbit_exchange, rabbit_command_assembler, rabbit_framing_amqp_0_9_1,
+ rabbit_basic, rabbit_event, lists, queue, priority_queue,
+ rabbit_router, rabbit_trace, rabbit_misc, rabbit_binary_parser,
+ rabbit_exchange_type_direct, rabbit_guid, rabbit_net,
+ rabbit_amqqueue_process, rabbit_variable_queue,
+ rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue,
+ sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees,
+ rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
+ rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
+ rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
+
+%% HiPE compilation uses multiple cores anyway, but some bits are
+%% IO-bound so we can go faster if we parallelise a bit more. In
+%% practice 2 processes seems just as fast as any other number > 1,
+%% and keeps the progress bar realistic-ish.
+-define(HIPE_PROCESSES, 2).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -185,6 +206,7 @@
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
+-spec(maybe_hipe_compile/0 :: () -> 'ok').
-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
@@ -218,12 +240,53 @@
%%----------------------------------------------------------------------------
+maybe_hipe_compile() ->
+ {ok, Want} = application:get_env(rabbit, hipe_compile),
+ Can = code:which(hipe) =/= non_existing,
+ case {Want, Can} of
+ {true, true} -> hipe_compile();
+ {true, false} -> io:format("Not HiPE compiling: HiPE not found in "
+ "this Erlang installation.~n");
+ {false, _} -> ok
+ end.
+
+hipe_compile() ->
+ Count = length(?HIPE_WORTHY),
+ io:format("HiPE compiling: |~s|~n |",
+ [string:copies("-", Count)]),
+ T1 = erlang:now(),
+ PidMRefs = [spawn_monitor(fun () -> [begin
+ {ok, M} = hipe:c(M, [o3]),
+ io:format("#")
+ end || M <- Ms]
+ end) ||
+ Ms <- split(?HIPE_WORTHY, ?HIPE_PROCESSES)],
+ [receive
+ {'DOWN', MRef, process, _, normal} -> ok;
+ {'DOWN', MRef, process, _, Reason} -> exit(Reason)
+ end || {_Pid, MRef} <- PidMRefs],
+ T2 = erlang:now(),
+ io:format("|~n~nCompiled ~B modules in ~Bs~n",
+ [Count, timer:now_diff(T2, T1) div 1000000]).
+
+split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
+
+split0([], Ls) -> Ls;
+split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]).
+
prepare() ->
ok = ensure_working_log_handlers(),
ok = rabbit_upgrade:maybe_upgrade_mnesia().
start() ->
try
+ %% prepare/1 ends up looking at the rabbit app's env, so it
+ %% needs to be loaded, but during the tests, it may end up
+ %% getting loaded twice, so guard against that
+ case application:load(rabbit) of
+ ok -> ok;
+ {error, {already_loaded, rabbit}} -> ok
+ end,
ok = prepare(),
ok = rabbit_misc:start_applications(application_load_order())
after
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index d38ecb91fe..fd03ca85b3 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -45,11 +45,7 @@
start() ->
ok = alarm_handler:add_alarm_handler(?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true -> ok;
- false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
- [MemoryWatermark])
- end,
+ rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
ok.
stop() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8b5e984a12..014c36bc53 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -131,7 +131,7 @@ init(Q) ->
expiry_timer_ref = undefined,
ttl = undefined,
dlx = undefined,
- msg_id_to_channel = dict:new()},
+ msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -464,11 +464,11 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
- case dict:find(MsgId, MTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
+ case gb_trees:lookup(MsgId, MTC0) of
+ {value, {ChPid, MsgSeqNo}} ->
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(MsgId, MTC0)};
- _ ->
+ gb_trees:delete(MsgId, MTC0)};
+ none ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
@@ -492,7 +492,7 @@ needs_confirming(_) -> false.
maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)};
+ State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)};
maybe_record_confirm_message(_Confirm, State) ->
State.
@@ -561,13 +561,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
- run_backing_queue(
- BQ, fun (M, BQS) ->
- {_MsgIds, BQS1} =
- M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
- BQS1
- end, State).
+requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ run_backing_queue(BQ, fun (M, BQS) ->
+ {_MsgIds, BQS1} = M:requeue(AckTags, BQS),
+ BQS1
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -680,11 +678,6 @@ discard_delivery(#delivery{sender = ChPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
-reset_msg_expiry_fun(TTL) ->
- fun(MsgProps) ->
- MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
- end.
-
message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 0952e73424..72c00e3d01 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -111,7 +111,7 @@ behaviour_info(callbacks) ->
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
- {requeue, 3},
+ {requeue, 2},
%% How long is my queue?
{len, 1},
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 095202dd11..c61184a601 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -34,14 +34,15 @@
-export([initial_state/0, command/1, precondition/2, postcondition/3,
next_state/3]).
--export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
+-export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]).
-record(state, {bqstate,
len, %% int
next_seq_id, %% int
messages, %% gb_trees of seqid => {msg_props, basic_msg}
acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}]
- confirms}). %% set of msgid
+ confirms, %% set of msgid
+ publishing}).%% int
%% Initialise model
@@ -51,7 +52,8 @@ initial_state() ->
next_seq_id = 0,
messages = gb_trees:empty(),
acks = [],
- confirms = gb_sets:new()}.
+ confirms = gb_sets:new(),
+ publishing = 0}.
%% Property
@@ -112,10 +114,8 @@ qc_publish(#state{bqstate = BQ}) ->
expiry = oneof([undefined | lists:seq(1, 10)])},
self(), BQ]}.
-qc_publish_multiple(#state{bqstate = BQ}) ->
- {call, ?MODULE, publish_multiple,
- [qc_message(), #message_properties{}, BQ,
- resize(?QUEUE_MAXLEN, pos_integer())]}.
+qc_publish_multiple(#state{}) ->
+ {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
@@ -128,8 +128,7 @@ qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
- {call, ?BQMOD, requeue,
- [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+ {call, ?BQMOD, requeue, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_set_ram_duration_target(#state{bqstate = BQ}) ->
{call, ?BQMOD, set_ram_duration_target,
@@ -155,6 +154,10 @@ qc_purge(#state{bqstate = BQ}) ->
%% Preconditions
+%% Create long queues by only allowing publishing
+precondition(#state{publishing = Count}, {call, _Mod, Fun, _Arg})
+ when Count > 0, Fun /= publish ->
+ false;
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
when Fun =:= ack; Fun =:= requeue ->
length(Acks) > 0;
@@ -174,6 +177,7 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
+ publishing = PublishCount,
next_seq_id = NextSeq} = S,
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
NeedsConfirm =
@@ -183,21 +187,15 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
len = Len + 1,
next_seq_id = NextSeq + 1,
messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages),
+ publishing = {call, erlang, max, [0, {call, erlang, '-',
+ [PublishCount, 1]}]},
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end};
-next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
- #state{len = Len, messages = Messages} = S,
- {S1, Msgs1} = repeat({S, Messages},
- fun ({#state{next_seq_id = NextSeq} = State, Msgs}) ->
- {State #state { next_seq_id = NextSeq + 1},
- gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)}
- end, Count),
- S1#state{bqstate = BQ,
- len = Len + Count,
- messages = Msgs1};
+next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) ->
+ S#state{publishing = PublishCount};
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
@@ -245,7 +243,7 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
S#state{bqstate = BQ1,
acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
-next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
+next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _V]}) ->
#state{messages = Messages, acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
Messages1 = lists:foldl(fun (AckTag, Msgs) ->
@@ -322,12 +320,8 @@ postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
%% Helpers
-repeat(Result, _Fun, 0) -> Result;
-repeat(Result, Fun, Times) -> repeat(Fun(Result), Fun, Times - 1).
-
-publish_multiple(Msg, MsgProps, BQ, Count) ->
- repeat(BQ, fun(BQ1) -> ?BQMOD:publish(Msg, MsgProps, self(), BQ1) end,
- Count).
+publish_multiple(_C) ->
+ ok.
timeout(BQ, 0) ->
BQ;
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 905e4fd062..fa8dd262e1 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -20,6 +20,7 @@
-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
+-define(EXTERNAL_CHECK_INTERVAL, 1000).
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
@@ -161,9 +162,16 @@ usage() ->
%%----------------------------------------------------------------------------
-action(stop, Node, [], _Opts, Inform) ->
+action(stop, Node, Args, _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
- call(Node, {rabbit, stop_and_halt, []});
+ Res = call(Node, {rabbit, stop_and_halt, []}),
+ case {Res, Args} of
+ {ok, [PidFile]} -> wait_for_process_death(
+ read_pid_file(PidFile, false));
+ {ok, [_, _| _]} -> exit({badarg, Args});
+ _ -> ok
+ end,
+ Res;
action(stop_app, Node, [], _Opts, Inform) ->
Inform("Stopping node ~p", [Node]),
@@ -325,7 +333,10 @@ action(trace_off, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
- Frac = list_to_float(Arg),
+ Frac = list_to_float(case string:chr(Arg, $.) of
+ 0 -> Arg ++ ".0";
+ _ -> Arg
+ end),
Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]),
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
@@ -362,7 +373,7 @@ action(report, Node, _Args, _Opts, Inform) ->
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Inform) ->
- Pid = wait_and_read_pid_file(PidFile),
+ Pid = read_pid_file(PidFile, true),
Inform("pid is ~s", [Pid]),
wait_for_application(Node, Pid).
@@ -370,18 +381,33 @@ wait_for_application(Node, Pid) ->
case process_up(Pid) of
true -> case rabbit:is_running(Node) of
true -> ok;
- false -> timer:sleep(1000),
+ false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
wait_for_application(Node, Pid)
end;
false -> {error, process_not_running}
end.
-wait_and_read_pid_file(PidFile) ->
- case file:read_file(PidFile) of
- {ok, Bin} -> string:strip(binary_to_list(Bin), right, $\n);
- {error, enoent} -> timer:sleep(500),
- wait_and_read_pid_file(PidFile);
- {error, _} = E -> exit({error, {could_not_read_pid, E}})
+wait_for_process_death(Pid) ->
+ case process_up(Pid) of
+ true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
+ wait_for_process_death(Pid);
+ false -> ok
+ end.
+
+read_pid_file(PidFile, Wait) ->
+ case {file:read_file(PidFile), Wait} of
+ {{ok, Bin}, _} ->
+ S = string:strip(binary_to_list(Bin), right, $\n),
+ try list_to_integer(S)
+ catch error:badarg ->
+ exit({error, {garbage_in_pid_file, PidFile}})
+ end,
+ S;
+ {{error, enoent}, true} ->
+ timer:sleep(?EXTERNAL_CHECK_INTERVAL),
+ read_pid_file(PidFile, Wait);
+ {{error, _} = E, _} ->
+ exit({error, {could_not_read_pid, E}})
end.
% Test using some OS clunkiness since we shouldn't trust
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index afa483557d..a15b9be490 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -257,6 +257,8 @@ route1(Delivery, {WorkList, SeenXs, QNames}) ->
DstNames))
end.
+process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
+ Results;
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
@@ -355,5 +357,9 @@ peek_serial(XName) ->
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- Module.
+ case get({xtype_to_module, T}) of
+ undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T),
+ put({xtype_to_module, T}, Module),
+ Module;
+ Module -> Module
+ end.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 3deb9580e9..02f3158f3b 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -211,17 +211,19 @@ internal_update(State = #state { queue_durations = Durations,
queue_duration_sum = Sum,
queue_duration_count = Count }) ->
MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(),
- MemoryRatio = erlang:memory(total) / MemoryLimit,
+ MemoryRatio = case MemoryLimit > 0.0 of
+ true -> erlang:memory(total) / MemoryLimit;
+ false -> infinity
+ end,
DesiredDurationAvg1 =
- case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
- true ->
+ if MemoryRatio =:= infinity ->
+ 0.0;
+ MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 ->
infinity;
- false ->
- Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
- true -> Sum + ?SUM_INC_AMOUNT;
- false -> Sum
- end,
- (Sum1 / Count) / MemoryRatio
+ MemoryRatio < ?SUM_INC_THRESHOLD ->
+ ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
+ true ->
+ (Sum / Count) / MemoryRatio
end,
State1 = State #state { desired_duration = DesiredDurationAvg1 },
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c4173ec600..9f12b954bd 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/3,
- requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
+ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@@ -248,11 +248,11 @@ ack(AckTags, Fun, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
- ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}),
+requeue(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 52511c9637..33d7da58fb 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -827,14 +827,14 @@ process_instruction({ack, Fun, MsgIds},
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
-process_instruction({requeue, MsgPropsFun, MsgIds},
+process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{ok, case length(AckTags) =:= length(MsgIds) of
true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 };
false ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d67c30a38d..e6a32b9023 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -68,6 +68,7 @@
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
cur_file_cache_ets, %% tid of current file cache table
+ flying_ets, %% tid of writes/removes in flight
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
%% to callbacks
@@ -86,7 +87,8 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- cur_file_cache_ets
+ cur_file_cache_ets,
+ flying_ets
}).
-record(file_summary,
@@ -128,12 +130,13 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid()}).
+ cur_file_cache_ets :: ets:tid(),
+ flying_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
-type(maybe_msg_id_fun() ::
- 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())).
+ 'undefined' | fun ((gb_set(), 'written' | 'ignored') -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
-type(deletion_thunk() :: fun (() -> boolean())).
@@ -375,6 +378,45 @@
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%%
+%% When the msg_store has a backlog (i.e. it has unprocessed messages
+%% in its mailbox / gen_server priority queue), a further optimisation
+%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
+%% from the same client for the same message. A typical occurrence of
+%% these is when an empty durable queue delivers persistent messages
+%% to ack'ing consumers. The queue will asynchronously ask the
+%% msg_store to 'write' such messages, and when they are acknowledged
+%% it will issue a 'remove'. That 'remove' may be issued before the
+%% msg_store has processed the 'write'. There is then no point going
+%% ahead with the processing of that 'write'.
+%%
+%% To detect this situation a 'flying_ets' table is shared between the
+%% clients and the server. The table is keyed on the combination of
+%% client (reference) and msg id, and the value represents an
+%% integration of all the writes and removes currently "in flight" for
+%% that message between the client and server - '+1' means all the
+%% writes/removes add up to a single 'write', '-1' to a 'remove', and
+%% '0' to nothing. (NB: the integration can never add up to more than
+%% one 'write' or 'read' since clients must not write/remove a message
+%% more than once without first removing/writing it).
+%%
+%% Maintaining this table poses two challenges: 1) both the clients
+%% and the server access and update the table, which causes
+%% concurrency issues, 2) we must ensure that entries do not stay in
+%% the table forever, since that would constitute a memory leak. We
+%% address the former by carefully modelling all operations as
+%% sequences of atomic actions that produce valid results in all
+%% possible interleavings. We address the latter by deleting table
+%% entries whenever the server finds a 0-valued entry during the
+%% processing of a write/remove. 0 is essentially equivalent to "no
+%% entry". If, OTOH, the value is non-zero we know there is at least
+%% one other 'write' or 'remove' in flight, so we get an opportunity
+%% later to delete the table entry when processing these.
+%%
+%% There are two further complications. We need to ensure that 1)
+%% eliminated writes still get confirmed, and 2) the write-back cache
+%% doesn't grow unbounded. These are quite straightforward to
+%% address. See the comments in the code.
+%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -392,7 +434,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -404,7 +446,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }.
+ cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -420,6 +463,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId}).
@@ -440,6 +484,7 @@ read(MsgId,
contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+ [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
set_maximum_since_use(Server, Age) ->
@@ -566,6 +611,21 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
end
end.
+client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
+ client_ref = CRef }) ->
+ Key = {MsgId, CRef},
+ case ets:insert_new(FlyingEts, {Key, Diff}) of
+ true -> ok;
+ false -> try ets:update_counter(FlyingEts, Key, {2, Diff})
+ catch error:badarg ->
+ %% this is guaranteed to succeed since the
+ %% server only removes and updates flying_ets
+ %% entries; it never inserts them
+ true = ets:insert_new(FlyingEts, {Key, Diff})
+ end,
+ ok
+ end.
+
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
@@ -619,6 +679,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
+ FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
@@ -645,6 +706,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
dying_clients = sets:new(),
clients = Clients,
successfully_recovered = CleanShutdown,
@@ -700,11 +762,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
- CurFileCacheEts}, State #msstate { clients = Clients1 });
+ CurFileCacheEts, FlyingEts},
+ State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -723,40 +787,54 @@ handle_cast({client_dying, CRef},
noreply(write_message(CRef, <<>>,
State #msstate { dying_clients = DyingClients1 }));
-handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
+handle_cast({client_delete, CRef},
+ State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, MsgId},
State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
- [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
- noreply(
- case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
- {write, State1} ->
- write_message(CRef, MsgId, Msg, State1);
- {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
- State1;
- {ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- State1;
- {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, MsgId, State1);
- {confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- update_pending_confirms(
- fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(gb_sets:singleton(MsgId), written),
- CTM
- end, CRef, State1)
- end);
+ case update_flying(-1, MsgId, CRef, State) of
+ process ->
+ [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
+ noreply(write_message(MsgId, Msg, CRef, State));
+ ignore ->
+ %% A 'remove' has already been issued and eliminated the
+ %% 'write'.
+ State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
+ ignored, State),
+ %% If all writes get eliminated, cur_file_cache_ets could
+ %% grow unbounded. To prevent that we delete the cache
+ %% entry here, but only if the message isn't in the
+ %% current file. That way reads of the message can
+ %% continue to be done client side, from either the cache
+ %% or the non-current files. If the message *is* in the
+ %% current file then the cache entry will be removed by
+ %% the normal logic for that in write_message/4 and
+ %% maybe_roll_to_new_file/2.
+ case index_lookup(MsgId, State1) of
+ [#msg_location { file = File }]
+ when File == State1 #msstate.current_file ->
+ ok;
+ _ ->
+ true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
+ end,
+ noreply(State1)
+ end;
handle_cast({remove, CRef, MsgIds}, State) ->
- State1 = lists:foldl(
- fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
- State, MsgIds),
- noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
- removed, State1)));
+ {RemovedMsgIds, State1} =
+ lists:foldl(
+ fun (MsgId, {Removed, State2}) ->
+ case update_flying(+1, MsgId, CRef, State2) of
+ process -> {[MsgId | Removed],
+ remove_message(MsgId, CRef, State2)};
+ ignore -> {Removed, State2}
+ end
+ end, {[], State}, MsgIds),
+ noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
+ ignored, State1)));
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
@@ -797,6 +875,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
clients = Clients,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
@@ -810,8 +889,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
end,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
- [true = ets:delete(T) ||
- T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
+ [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
+ CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -874,6 +953,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
client_confirm(CRef, MsgIds, written, StateN)
end, State1, CGs).
+update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
+ Key = {MsgId, CRef},
+ NDiff = -Diff,
+ case ets:lookup(FlyingEts, Key) of
+ [] -> ignore;
+ [{_, Diff}] -> ignore;
+ [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
+ true = ets:delete_object(FlyingEts, {Key, 0}),
+ process;
+ [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
+ ignore
+ end.
+
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
write_action({true, #msg_location { file = File }}, _MsgId, State) ->
@@ -905,8 +997,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, MsgId, Msg, State) ->
- write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
+write_message(MsgId, Msg, CRef,
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
+ {write, State1} ->
+ write_message(MsgId, Msg,
+ record_pending_confirm(CRef, MsgId, State1));
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, MsgId, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
+ end, CRef, State1)
+ end.
+
+remove_message(MsgId, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg whilst
+ %% it's being GC'd.
+ %%
+ %% ASSERTION: [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from cur_file_cache_ets here because
+ %% there may be further writes in the mailbox for the
+ %% same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
+ end.
write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -1004,43 +1153,6 @@ contains_message(MsgId, From,
end
end.
-remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this msg
- %% whilst it's being GC'd. ASSERTION:
- %% [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }} when RefCount > 0 ->
- %% only update field, otherwise bad interaction with
- %% concurrent GC
- Dec = fun () ->
- index_update_ref_count(MsgId, RefCount - 1, State)
- end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here
- %% because there may be further writes in the mailbox
- %% for the same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(File, -TotalSize,
- State))
- end;
- _ -> ok = Dec(),
- State
- end
- end.
-
add_to_pending_gc_completion(
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
State #msstate { pending_gc_completion =
@@ -1120,6 +1232,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
end
end, CRef, State).
+blind_confirm(CRef, MsgIds, ActionTaken, State) ->
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end,
+ CRef, State).
+
%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d34ed44a8b..50444dc49d 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -22,6 +22,7 @@
-define(BaseApps, [rabbit]).
-define(ERROR_CODE, 1).
+-define(EPMD_TIMEOUT, 30000).
%%----------------------------------------------------------------------------
%% Specs
@@ -233,7 +234,8 @@ post_process_script(ScriptFile) ->
end.
process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
- [{apply,{rabbit,prepare,[]}}, Entry];
+ [{apply,{rabbit,maybe_hipe_compile,[]}},
+ {apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
@@ -244,7 +246,7 @@ duplicate_node_check([]) ->
duplicate_node_check(NodeStr) ->
Node = rabbit_misc:makenode(NodeStr),
{NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case net_adm:names(NodeHost) of
+ case names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
true -> io:format("node with name ~p "
@@ -260,6 +262,7 @@ duplicate_node_check(NodeStr) ->
[NodeHost, EpmdReason,
case EpmdReason of
address -> "unable to establish tcp connection";
+ timeout -> "timed out establishing tcp connection";
_ -> inet:format_error(EpmdReason)
end])
end.
@@ -276,3 +279,15 @@ terminate(Status) ->
after infinity -> ok
end
end.
+
+names(Hostname) ->
+ Self = self(),
+ process_flag(trap_exit, true),
+ Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
+ timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
+ Res = receive
+ {names, Names} -> Names;
+ {'EXIT', Pid, Reason} -> {error, Reason}
+ end,
+ process_flag(trap_exit, false),
+ Res.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b96934c6c0..2d55d133a4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1778,7 +1778,7 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
- {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
+ {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
Ref = rabbit_guid:guid(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
@@ -1789,6 +1789,8 @@ test_msg_store() ->
false = msg_store_contains(false, MsgIds, MSCState),
%% test confirm logic
passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
+ %% check we don't contain any of the msgs we're about to publish
+ false = msg_store_contains(false, MsgIds, MSCState),
%% publish the first half
ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
@@ -1896,12 +1898,12 @@ test_msg_store() ->
false = msg_store_contains(false, MsgIdsBig, MSCStateM),
MSCStateM
end),
+ %%
+ passed = test_msg_store_client_delete_and_terminate(),
%% restart empty
restart_msg_store_empty(),
passed.
-%% We want to test that writes that get eliminated due to removes still
-%% get confirmed. Removes themselves do not.
test_msg_store_confirms(MsgIds, Cap, MSCState) ->
%% write -> confirmed
ok = msg_store_write(MsgIds, MSCState),
@@ -1927,6 +1929,45 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
ok = msg_store_write(MsgIds, MSCState),
ok = msg_store_remove(MsgIds, MSCState),
ok = on_disk_await(Cap, MsgIds),
+ %% confirmation on timer-based sync
+ passed = test_msg_store_confirm_timer(),
+ passed.
+
+test_msg_store_confirm_timer() ->
+ Ref = rabbit_guid:guid(),
+ MsgId = msg_id_bin(1),
+ Self = self(),
+ MSCState = rabbit_msg_store:client_init(
+ ?PERSISTENT_MSG_STORE, Ref,
+ fun (MsgIds, _ActionTaken) ->
+ case gb_sets:is_member(MsgId, MsgIds) of
+ true -> Self ! on_disk;
+ false -> ok
+ end
+ end, undefined),
+ ok = msg_store_write([MsgId], MSCState),
+ ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState),
+ ok = msg_store_remove([MsgId], MSCState),
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
+ passed.
+
+msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
+ receive
+ on_disk -> ok
+ after 0 ->
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ msg_store_keep_busy_until_confirm(MsgIds, MSCState)
+ end.
+
+test_msg_store_client_delete_and_terminate() ->
+ restart_msg_store_empty(),
+ MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
+ Ref = rabbit_guid:guid(),
+ MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+ ok = msg_store_write(MsgIds, MSCState),
+ %% test the 'dying client' fast path for writes
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
passed.
queue_name(Name) ->
@@ -2238,11 +2279,10 @@ test_variable_queue_requeue(VQ0) ->
(_, Acc) ->
Acc
end, [], lists:zip(Acks, Seq)),
- {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset,
- fun(X) -> X end, VQ3),
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
VQ5 = lists:foldl(fun (AckTag, VQN) ->
{_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], fun(X) -> X end, VQN),
+ [AckTag], VQN),
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
@@ -2436,7 +2476,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
{_Guids, VQ4} =
- rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ rabbit_variable_queue:requeue(AckTags, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c51640bab1..bf9450f1b1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/3, requeue/3, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -653,21 +653,19 @@ ack(AckTags, Fun, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
+requeue(AckTags, #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
beta_limit(Q3),
- fun publish_alpha/2,
- MsgPropsFun, State),
+ fun publish_alpha/2, State),
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
delta_limit(Delta),
- fun publish_beta/2,
- MsgPropsFun, State1),
+ fun publish_beta/2, State1),
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
- MsgPropsFun, State2),
+ State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
State3 #vqstate { delta = Delta1,
@@ -1317,7 +1315,7 @@ blind_confirm(Callback, MsgIdSet) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-msgs_written_to_disk(Callback, MsgIdSet, removed) ->
+msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
blind_confirm(Callback, MsgIdSet);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
Callback(?MODULE,
@@ -1360,50 +1358,49 @@ publish_beta(MsgStatus, State) ->
ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
+queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
- Limit, PubFun, MsgPropsFun, State).
+ Limit, PubFun, State).
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
- Limit, PubFun, MsgPropsFun, State)
+ Limit, PubFun, State)
when Limit == undefined orelse SeqId < Limit ->
case ?QUEUE:out(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
- Limit, PubFun, MsgPropsFun, State);
+ Limit, PubFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
- {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
- State),
+ {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, MsgPropsFun, State2)
+ Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
- _Limit, _PubFun, _MsgPropsFun, State) ->
+ _Limit, _PubFun, State) ->
{SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
-delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
+delta_merge([], Delta, MsgIds, State) ->
{Delta, MsgIds, State};
-delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
+delta_merge(SeqIds, Delta, MsgIds, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
{#msg_status { msg_id = MsgId } = MsgStatus, State1} =
- msg_from_pending_ack(SeqId, MsgPropsFun, State0),
+ msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
-msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
+msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
remove_pending_ack(SeqId, State),
{MsgStatus #msg_status {
- msg_props = (MsgPropsFun(MsgProps)) #message_properties {
- needs_confirming = false } }, State1}.
+ msg_props = MsgProps #message_properties { needs_confirming = false } },
+ State1}.
beta_limit(Q) ->
case ?QUEUE:peek(Q) of
@@ -1601,39 +1598,31 @@ maybe_deltas_to_betas(State = #vqstate {
end.
push_alphas_to_betas(Quota, State) ->
- {Quota1, State1} = maybe_push_q1_to_betas(Quota, State),
- {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1),
+ {Quota1, State1} =
+ push_alphas_to_betas(
+ fun ?QUEUE:out/1,
+ fun (MsgStatus, Q1a,
+ State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
+ State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
+ (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
+ State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
+ end, Quota, State #vqstate.q1, State),
+ {Quota2, State2} =
+ push_alphas_to_betas(
+ fun ?QUEUE:out_r/1,
+ fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
+ State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
+ end, Quota1, State1 #vqstate.q4, State1),
{Quota2, State2}.
-maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
- maybe_push_alphas_to_betas(
- fun ?QUEUE:out/1,
- fun (MsgStatus, Q1a,
- State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
- State1 #vqstate { q1 = Q1a,
- q3 = ?QUEUE:in(MsgStatus, Q3) };
- (MsgStatus, Q1a, State1 = #vqstate { q2 = Q2 }) ->
- State1 #vqstate { q1 = Q1a,
- q2 = ?QUEUE:in(MsgStatus, Q2) }
- end, Quota, Q1, State).
-
-maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
- maybe_push_alphas_to_betas(
- fun ?QUEUE:out_r/1,
- fun (MsgStatus, Q4a, State1 = #vqstate { q3 = Q3 }) ->
- State1 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3),
- q4 = Q4a }
- end, Quota, Q4, State).
-
-maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
- State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount })
+push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
TargetRamCount >= RamMsgCount ->
{Quota, State};
-maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of
{empty, _Q} ->
{Quota, State};
@@ -1643,8 +1632,8 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
- maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
- Consumer(MsgStatus2, Qa, State2))
+ push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
+ Consumer(MsgStatus2, Qa, State2))
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,