diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-07 12:06:57 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-07 12:06:57 +0000 |
| commit | ffbe61e27e210ae6e0002418c27f014af9ba8689 (patch) | |
| tree | e409b82b27e68499a9f5310dbb788b10eee06a30 /src | |
| parent | 61f48f0867ca77de86a224db3ba1855497dc722b (diff) | |
| parent | c60fdf21357804931761fa154c803259dcdf20c6 (diff) | |
| download | rabbitmq-server-git-ffbe61e27e210ae6e0002418c27f014af9ba8689.tar.gz | |
merge default into bug20337
Diffstat (limited to 'src')
| -rw-r--r-- | src/mirrored_supervisor_tests.erl | 2 | ||||
| -rw-r--r-- | src/rabbit.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 263 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 99 |
15 files changed, 461 insertions, 220 deletions
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 5e782a08d3..0900f56fd8 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -202,7 +202,7 @@ with_sups(Fun, Sups) -> Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups], Fun(Pids), [kill(Pid) || Pid <- Pids, is_process_alive(Pid)], - timer:sleep(100), + timer:sleep(500), passed. start_sup(Spec) -> diff --git a/src/rabbit.erl b/src/rabbit.erl index e98ca9be33..0a2681a219 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -18,8 +18,8 @@ -behaviour(application). --export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, - is_running/0 , is_running/1, environment/0, +-export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0, + status/0, is_running/0, is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -177,6 +177,27 @@ -define(APPS, [os_mon, mnesia, rabbit]). +%% see bug 24513 for how this list was created +-define(HIPE_WORTHY, + [rabbit_reader, rabbit_channel, gen_server2, + rabbit_exchange, rabbit_command_assembler, rabbit_framing_amqp_0_9_1, + rabbit_basic, rabbit_event, lists, queue, priority_queue, + rabbit_router, rabbit_trace, rabbit_misc, rabbit_binary_parser, + rabbit_exchange_type_direct, rabbit_guid, rabbit_net, + rabbit_amqqueue_process, rabbit_variable_queue, + rabbit_binary_generator, rabbit_writer, delegate, gb_sets, lqueue, + sets, orddict, rabbit_amqqueue, rabbit_limiter, gb_trees, + rabbit_queue_index, gen, dict, ordsets, file_handle_cache, + rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, + rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]). + +%% HiPE compilation uses multiple cores anyway, but some bits are +%% IO-bound so we can go faster if we parallelise a bit more. In +%% practice 2 processes seems just as fast as any other number > 1, +%% and keeps the progress bar realistic-ish. +-define(HIPE_PROCESSES, 2). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -185,6 +206,7 @@ %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). +-spec(maybe_hipe_compile/0 :: () -> 'ok'). -spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -218,12 +240,53 @@ %%---------------------------------------------------------------------------- +maybe_hipe_compile() -> + {ok, Want} = application:get_env(rabbit, hipe_compile), + Can = code:which(hipe) =/= non_existing, + case {Want, Can} of + {true, true} -> hipe_compile(); + {true, false} -> io:format("Not HiPE compiling: HiPE not found in " + "this Erlang installation.~n"); + {false, _} -> ok + end. + +hipe_compile() -> + Count = length(?HIPE_WORTHY), + io:format("HiPE compiling: |~s|~n |", + [string:copies("-", Count)]), + T1 = erlang:now(), + PidMRefs = [spawn_monitor(fun () -> [begin + {ok, M} = hipe:c(M, [o3]), + io:format("#") + end || M <- Ms] + end) || + Ms <- split(?HIPE_WORTHY, ?HIPE_PROCESSES)], + [receive + {'DOWN', MRef, process, _, normal} -> ok; + {'DOWN', MRef, process, _, Reason} -> exit(Reason) + end || {_Pid, MRef} <- PidMRefs], + T2 = erlang:now(), + io:format("|~n~nCompiled ~B modules in ~Bs~n", + [Count, timer:now_diff(T2, T1) div 1000000]). + +split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]). + +split0([], Ls) -> Ls; +split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]). + prepare() -> ok = ensure_working_log_handlers(), ok = rabbit_upgrade:maybe_upgrade_mnesia(). start() -> try + %% prepare/1 ends up looking at the rabbit app's env, so it + %% needs to be loaded, but during the tests, it may end up + %% getting loaded twice, so guard against that + case application:load(rabbit) of + ok -> ok; + {error, {already_loaded, rabbit}} -> ok + end, ok = prepare(), ok = rabbit_misc:start_applications(application_load_order()) after diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d38ecb91fe..fd03ca85b3 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -45,11 +45,7 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), - ok = case MemoryWatermark == 0 of - true -> ok; - false -> rabbit_sup:start_restartable_child(vm_memory_monitor, - [MemoryWatermark]) - end, + rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]), ok. stop() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8b5e984a12..014c36bc53 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -131,7 +131,7 @@ init(Q) -> expiry_timer_ref = undefined, ttl = undefined, dlx = undefined, - msg_id_to_channel = dict:new()}, + msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -464,11 +464,11 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> - case dict:find(MsgId, MTC0) of - {ok, {ChPid, MsgSeqNo}} -> + case gb_trees:lookup(MsgId, MTC0) of + {value, {ChPid, MsgSeqNo}} -> {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(MsgId, MTC0)}; - _ -> + gb_trees:delete(MsgId, MTC0)}; + none -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), @@ -492,7 +492,7 @@ needs_confirming(_) -> false. maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}; + State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)}; maybe_record_confirm_message(_Confirm, State) -> State. @@ -561,13 +561,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> - run_backing_queue( - BQ, fun (M, BQS) -> - {_MsgIds, BQS1} = - M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), - BQS1 - end, State). +requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + run_backing_queue(BQ, fun (M, BQS) -> + {_MsgIds, BQS1} = M:requeue(AckTags, BQS), + BQS1 + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -680,11 +678,6 @@ discard_delivery(#delivery{sender = ChPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. -reset_msg_expiry_fun(TTL) -> - fun(MsgProps) -> - MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} - end. - message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 0952e73424..72c00e3d01 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -111,7 +111,7 @@ behaviour_info(callbacks) -> %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 3}, + {requeue, 2}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 095202dd11..c61184a601 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -34,14 +34,15 @@ -export([initial_state/0, command/1, precondition/2, postcondition/3, next_state/3]). --export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]). +-export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]). -record(state, {bqstate, len, %% int next_seq_id, %% int messages, %% gb_trees of seqid => {msg_props, basic_msg} acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}] - confirms}). %% set of msgid + confirms, %% set of msgid + publishing}).%% int %% Initialise model @@ -51,7 +52,8 @@ initial_state() -> next_seq_id = 0, messages = gb_trees:empty(), acks = [], - confirms = gb_sets:new()}. + confirms = gb_sets:new(), + publishing = 0}. %% Property @@ -112,10 +114,8 @@ qc_publish(#state{bqstate = BQ}) -> expiry = oneof([undefined | lists:seq(1, 10)])}, self(), BQ]}. -qc_publish_multiple(#state{bqstate = BQ}) -> - {call, ?MODULE, publish_multiple, - [qc_message(), #message_properties{}, BQ, - resize(?QUEUE_MAXLEN, pos_integer())]}. +qc_publish_multiple(#state{}) -> + {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}. qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, @@ -128,8 +128,7 @@ qc_ack(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_requeue(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, requeue, - [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + {call, ?BQMOD, requeue, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_set_ram_duration_target(#state{bqstate = BQ}) -> {call, ?BQMOD, set_ram_duration_target, @@ -155,6 +154,10 @@ qc_purge(#state{bqstate = BQ}) -> %% Preconditions +%% Create long queues by only allowing publishing +precondition(#state{publishing = Count}, {call, _Mod, Fun, _Arg}) + when Count > 0, Fun /= publish -> + false; precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) when Fun =:= ack; Fun =:= requeue -> length(Acks) > 0; @@ -174,6 +177,7 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> #state{len = Len, messages = Messages, confirms = Confirms, + publishing = PublishCount, next_seq_id = NextSeq} = S, MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, NeedsConfirm = @@ -183,21 +187,15 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> len = Len + 1, next_seq_id = NextSeq + 1, messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages), + publishing = {call, erlang, max, [0, {call, erlang, '-', + [PublishCount, 1]}]}, confirms = case eval(NeedsConfirm) of true -> gb_sets:add(MsgId, Confirms); _ -> Confirms end}; -next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) -> - #state{len = Len, messages = Messages} = S, - {S1, Msgs1} = repeat({S, Messages}, - fun ({#state{next_seq_id = NextSeq} = State, Msgs}) -> - {State #state { next_seq_id = NextSeq + 1}, - gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)} - end, Count), - S1#state{bqstate = BQ, - len = Len + Count, - messages = Msgs1}; +next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) -> + S#state{publishing = PublishCount}; next_state(S, Res, {call, ?BQMOD, publish_delivered, @@ -245,7 +243,7 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> S#state{bqstate = BQ1, acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; -next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> +next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _V]}) -> #state{messages = Messages, acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, Messages1 = lists:foldl(fun (AckTag, Msgs) -> @@ -322,12 +320,8 @@ postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> %% Helpers -repeat(Result, _Fun, 0) -> Result; -repeat(Result, Fun, Times) -> repeat(Fun(Result), Fun, Times - 1). - -publish_multiple(Msg, MsgProps, BQ, Count) -> - repeat(BQ, fun(BQ1) -> ?BQMOD:publish(Msg, MsgProps, self(), BQ1) end, - Count). +publish_multiple(_C) -> + ok. timeout(BQ, 0) -> BQ; diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 905e4fd062..fa8dd262e1 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,6 +20,7 @@ -export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). +-define(EXTERNAL_CHECK_INTERVAL, 1000). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -161,9 +162,16 @@ usage() -> %%---------------------------------------------------------------------------- -action(stop, Node, [], _Opts, Inform) -> +action(stop, Node, Args, _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), - call(Node, {rabbit, stop_and_halt, []}); + Res = call(Node, {rabbit, stop_and_halt, []}), + case {Res, Args} of + {ok, [PidFile]} -> wait_for_process_death( + read_pid_file(PidFile, false)); + {ok, [_, _| _]} -> exit({badarg, Args}); + _ -> ok + end, + Res; action(stop_app, Node, [], _Opts, Inform) -> Inform("Stopping node ~p", [Node]), @@ -325,7 +333,10 @@ action(trace_off, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> - Frac = list_to_float(Arg), + Frac = list_to_float(case string:chr(Arg, $.) of + 0 -> Arg ++ ".0"; + _ -> Arg + end), Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]), rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]); @@ -362,7 +373,7 @@ action(report, Node, _Args, _Opts, Inform) -> %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Inform) -> - Pid = wait_and_read_pid_file(PidFile), + Pid = read_pid_file(PidFile, true), Inform("pid is ~s", [Pid]), wait_for_application(Node, Pid). @@ -370,18 +381,33 @@ wait_for_application(Node, Pid) -> case process_up(Pid) of true -> case rabbit:is_running(Node) of true -> ok; - false -> timer:sleep(1000), + false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), wait_for_application(Node, Pid) end; false -> {error, process_not_running} end. -wait_and_read_pid_file(PidFile) -> - case file:read_file(PidFile) of - {ok, Bin} -> string:strip(binary_to_list(Bin), right, $\n); - {error, enoent} -> timer:sleep(500), - wait_and_read_pid_file(PidFile); - {error, _} = E -> exit({error, {could_not_read_pid, E}}) +wait_for_process_death(Pid) -> + case process_up(Pid) of + true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), + wait_for_process_death(Pid); + false -> ok + end. + +read_pid_file(PidFile, Wait) -> + case {file:read_file(PidFile), Wait} of + {{ok, Bin}, _} -> + S = string:strip(binary_to_list(Bin), right, $\n), + try list_to_integer(S) + catch error:badarg -> + exit({error, {garbage_in_pid_file, PidFile}}) + end, + S; + {{error, enoent}, true} -> + timer:sleep(?EXTERNAL_CHECK_INTERVAL), + read_pid_file(PidFile, Wait); + {{error, _} = E, _} -> + exit({error, {could_not_read_pid, E}}) end. % Test using some OS clunkiness since we shouldn't trust diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index afa483557d..a15b9be490 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -257,6 +257,8 @@ route1(Delivery, {WorkList, SeenXs, QNames}) -> DstNames)) end. +process_alternate(#exchange{arguments = []}, Results) -> %% optimisation + Results; process_alternate(#exchange{name = XName, arguments = Args}, []) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of undefined -> []; @@ -355,5 +357,9 @@ peek_serial(XName) -> %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_registry:lookup_module(exchange, T), - Module. + case get({xtype_to_module, T}) of + undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), + put({xtype_to_module, T}, Module), + Module; + Module -> Module + end. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 3deb9580e9..02f3158f3b 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -211,17 +211,19 @@ internal_update(State = #state { queue_durations = Durations, queue_duration_sum = Sum, queue_duration_count = Count }) -> MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(), - MemoryRatio = erlang:memory(total) / MemoryLimit, + MemoryRatio = case MemoryLimit > 0.0 of + true -> erlang:memory(total) / MemoryLimit; + false -> infinity + end, DesiredDurationAvg1 = - case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of - true -> + if MemoryRatio =:= infinity -> + 0.0; + MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 -> infinity; - false -> - Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of - true -> Sum + ?SUM_INC_AMOUNT; - false -> Sum - end, - (Sum1 / Count) / MemoryRatio + MemoryRatio < ?SUM_INC_THRESHOLD -> + ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; + true -> + (Sum / Count) / MemoryRatio end, State1 = State #state { desired_duration = DesiredDurationAvg1 }, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c4173ec600..9f12b954bd 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/3, - requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, + requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3]). @@ -248,11 +248,11 @@ ack(AckTags, Fun, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -requeue(AckTags, MsgPropsFun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), - ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}), +requeue(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 52511c9637..33d7da58fb 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -827,14 +827,14 @@ process_instruction({ack, Fun, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; -process_instruction({requeue, MsgPropsFun, MsgIds}, +process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {ok, case length(AckTags) =:= length(MsgIds) of true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }; false -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d67c30a38d..e6a32b9023 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -68,6 +68,7 @@ file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table cur_file_cache_ets, %% tid of current file cache table + flying_ets, %% tid of writes/removes in flight dying_clients, %% set of dying clients clients, %% map of references of all registered clients %% to callbacks @@ -86,7 +87,8 @@ gc_pid, file_handles_ets, file_summary_ets, - cur_file_cache_ets + cur_file_cache_ets, + flying_ets }). -record(file_summary, @@ -128,12 +130,13 @@ gc_pid :: pid(), file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid()}). + cur_file_cache_ets :: ets:tid(), + flying_ets :: ets:tid()}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). -type(maybe_msg_id_fun() :: - 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())). + 'undefined' | fun ((gb_set(), 'written' | 'ignored') -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). -type(deletion_thunk() :: fun (() -> boolean())). @@ -375,6 +378,45 @@ %% performance with many healthy clients and few, if any, dying %% clients, which is the typical case. %% +%% When the msg_store has a backlog (i.e. it has unprocessed messages +%% in its mailbox / gen_server priority queue), a further optimisation +%% opportunity arises: we can eliminate pairs of 'write' and 'remove' +%% from the same client for the same message. A typical occurrence of +%% these is when an empty durable queue delivers persistent messages +%% to ack'ing consumers. The queue will asynchronously ask the +%% msg_store to 'write' such messages, and when they are acknowledged +%% it will issue a 'remove'. That 'remove' may be issued before the +%% msg_store has processed the 'write'. There is then no point going +%% ahead with the processing of that 'write'. +%% +%% To detect this situation a 'flying_ets' table is shared between the +%% clients and the server. The table is keyed on the combination of +%% client (reference) and msg id, and the value represents an +%% integration of all the writes and removes currently "in flight" for +%% that message between the client and server - '+1' means all the +%% writes/removes add up to a single 'write', '-1' to a 'remove', and +%% '0' to nothing. (NB: the integration can never add up to more than +%% one 'write' or 'read' since clients must not write/remove a message +%% more than once without first removing/writing it). +%% +%% Maintaining this table poses two challenges: 1) both the clients +%% and the server access and update the table, which causes +%% concurrency issues, 2) we must ensure that entries do not stay in +%% the table forever, since that would constitute a memory leak. We +%% address the former by carefully modelling all operations as +%% sequences of atomic actions that produce valid results in all +%% possible interleavings. We address the latter by deleting table +%% entries whenever the server finds a 0-valued entry during the +%% processing of a write/remove. 0 is essentially equivalent to "no +%% entry". If, OTOH, the value is non-zero we know there is at least +%% one other 'write' or 'remove' in flight, so we get an opportunity +%% later to delete the table entry when processing these. +%% +%% There are two further complications. We need to ensure that 1) +%% eliminated writes still get confirmed, and 2) the write-back cache +%% doesn't grow unbounded. These are quite straightforward to +%% address. See the comments in the code. +%% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -392,7 +434,7 @@ successfully_recovered_state(Server) -> client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, CurFileCacheEts} = + FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, @@ -404,7 +446,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -420,6 +463,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), ok = server_cast(CState, {write, CRef, MsgId}). @@ -440,6 +484,7 @@ read(MsgId, contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> + [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). set_maximum_since_use(Server, Age) -> @@ -566,6 +611,21 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, end end. +client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, + client_ref = CRef }) -> + Key = {MsgId, CRef}, + case ets:insert_new(FlyingEts, {Key, Diff}) of + true -> ok; + false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) + catch error:badarg -> + %% this is guaranteed to succeed since the + %% server only removes and updates flying_ets + %% entries; it never inserts them + true = ets:insert_new(FlyingEts, {Key, Diff}) + end, + ok + end. + clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, dying_clients = DyingClients }) -> State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), @@ -619,6 +679,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -645,6 +706,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, dying_clients = sets:new(), clients = Clients, successfully_recovered = CleanShutdown, @@ -700,11 +762,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, - CurFileCacheEts}, State #msstate { clients = Clients1 }); + CurFileCacheEts, FlyingEts}, + State #msstate { clients = Clients1 }); handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); @@ -723,40 +787,54 @@ handle_cast({client_dying, CRef}, noreply(write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 })); -handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> +handle_cast({client_delete, CRef}, + State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, MsgId}, State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), - [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId), - noreply( - case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of - {write, State1} -> - write_message(CRef, MsgId, Msg, State1); - {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> - State1; - {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - State1; - {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, MsgId, State1); - {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - update_pending_confirms( - fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(gb_sets:singleton(MsgId), written), - CTM - end, CRef, State1) - end); + case update_flying(-1, MsgId, CRef, State) of + process -> + [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId), + noreply(write_message(MsgId, Msg, CRef, State)); + ignore -> + %% A 'remove' has already been issued and eliminated the + %% 'write'. + State1 = blind_confirm(CRef, gb_sets:singleton(MsgId), + ignored, State), + %% If all writes get eliminated, cur_file_cache_ets could + %% grow unbounded. To prevent that we delete the cache + %% entry here, but only if the message isn't in the + %% current file. That way reads of the message can + %% continue to be done client side, from either the cache + %% or the non-current files. If the message *is* in the + %% current file then the cache entry will be removed by + %% the normal logic for that in write_message/4 and + %% maybe_roll_to_new_file/2. + case index_lookup(MsgId, State1) of + [#msg_location { file = File }] + when File == State1 #msstate.current_file -> + ok; + _ -> + true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0}) + end, + noreply(State1) + end; handle_cast({remove, CRef, MsgIds}, State) -> - State1 = lists:foldl( - fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end, - State, MsgIds), - noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), - removed, State1))); + {RemovedMsgIds, State1} = + lists:foldl( + fun (MsgId, {Removed, State2}) -> + case update_flying(+1, MsgId, CRef, State2) of + process -> {[MsgId | Removed], + remove_message(MsgId, CRef, State2)}; + ignore -> {Removed, State2} + end + end, {[], State}, MsgIds), + noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds), + ignored, State1))); handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, @@ -797,6 +875,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_ets = FlyingEts, clients = Clients, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull @@ -810,8 +889,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), ok = store_file_summary(FileSummaryEts, Dir), - [true = ets:delete(T) || - T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], + [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, + CurFileCacheEts, FlyingEts]], IndexModule:terminate(IndexState), ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), @@ -874,6 +953,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, client_confirm(CRef, MsgIds, written, StateN) end, State1, CGs). +update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) -> + Key = {MsgId, CRef}, + NDiff = -Diff, + case ets:lookup(FlyingEts, Key) of + [] -> ignore; + [{_, Diff}] -> ignore; + [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}), + true = ets:delete_object(FlyingEts, {Key, 0}), + process; + [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}), + ignore + end. + write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; write_action({true, #msg_location { file = File }}, _MsgId, State) -> @@ -905,8 +997,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, %% field otherwise bad interaction with concurrent GC {confirm, File, State}. -write_message(CRef, MsgId, Msg, State) -> - write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)). +write_message(MsgId, Msg, CRef, + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of + {write, State1} -> + write_message(MsgId, Msg, + record_pending_confirm(CRef, MsgId, State1)); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, MsgId, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM + end, CRef, State1) + end. + +remove_message(MsgId, CRef, + State = #msstate { file_summary_ets = FileSummaryEts }) -> + case should_mask_action(CRef, MsgId, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg whilst + %% it's being GC'd. + %% + %% ASSERTION: [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} + when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = fun () -> index_update_ref_count( + MsgId, RefCount - 1, State) end, + case RefCount of + %% don't remove from cur_file_cache_ets here because + %% there may be further writes in the mailbox for the + %% same msg. + 1 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, MsgId, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size( + File, -TotalSize, State)) + end; + _ -> ok = Dec(), + State + end + end. write_message(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, @@ -1004,43 +1153,6 @@ contains_message(MsgId, From, end end. -remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts }) -> - case should_mask_action(CRef, MsgId, State) of - {true, _Location} -> - State; - {false_if_increment, #msg_location { ref_count = 0 }} -> - %% CRef has tried to both write and remove this msg - %% whilst it's being GC'd. ASSERTION: - %% [#file_summary { locked = true }] = - %% ets:lookup(FileSummaryEts, File), - State; - {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize }} when RefCount > 0 -> - %% only update field, otherwise bad interaction with - %% concurrent GC - Dec = fun () -> - index_update_ref_count(MsgId, RefCount - 1, State) - end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here - %% because there may be further writes in the mailbox - %% for the same msg. - 1 -> case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> - add_to_pending_gc_completion( - {remove, MsgId, CRef}, File, State); - [#file_summary {}] -> - ok = Dec(), - delete_file_if_empty( - File, adjust_valid_total_size(File, -TotalSize, - State)) - end; - _ -> ok = Dec(), - State - end - end. - add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = @@ -1120,6 +1232,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> end end, CRef, State). +blind_confirm(CRef, MsgIds, ActionTaken, State) -> + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end, + CRef, State). + %% Detect whether the MsgId is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index d34ed44a8b..50444dc49d 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -22,6 +22,7 @@ -define(BaseApps, [rabbit]). -define(ERROR_CODE, 1). +-define(EPMD_TIMEOUT, 30000). %%---------------------------------------------------------------------------- %% Specs @@ -233,7 +234,8 @@ post_process_script(ScriptFile) -> end. process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> - [{apply,{rabbit,prepare,[]}}, Entry]; + [{apply,{rabbit,maybe_hipe_compile,[]}}, + {apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. @@ -244,7 +246,7 @@ duplicate_node_check([]) -> duplicate_node_check(NodeStr) -> Node = rabbit_misc:makenode(NodeStr), {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - case net_adm:names(NodeHost) of + case names(NodeHost) of {ok, NamePorts} -> case proplists:is_defined(NodeName, NamePorts) of true -> io:format("node with name ~p " @@ -260,6 +262,7 @@ duplicate_node_check(NodeStr) -> [NodeHost, EpmdReason, case EpmdReason of address -> "unable to establish tcp connection"; + timeout -> "timed out establishing tcp connection"; _ -> inet:format_error(EpmdReason) end]) end. @@ -276,3 +279,15 @@ terminate(Status) -> after infinity -> ok end end. + +names(Hostname) -> + Self = self(), + process_flag(trap_exit, true), + Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), + timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), + Res = receive + {names, Names} -> Names; + {'EXIT', Pid, Reason} -> {error, Reason} + end, + process_flag(trap_exit, false), + Res. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b96934c6c0..2d55d133a4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1778,7 +1778,7 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> test_msg_store() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], - {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), + {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds), Ref = rabbit_guid:guid(), {Cap, MSCState} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref), @@ -1789,6 +1789,8 @@ test_msg_store() -> false = msg_store_contains(false, MsgIds, MSCState), %% test confirm logic passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, MsgIds, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half @@ -1896,12 +1898,12 @@ test_msg_store() -> false = msg_store_contains(false, MsgIdsBig, MSCStateM), MSCStateM end), + %% + passed = test_msg_store_client_delete_and_terminate(), %% restart empty restart_msg_store_empty(), passed. -%% We want to test that writes that get eliminated due to removes still -%% get confirmed. Removes themselves do not. test_msg_store_confirms(MsgIds, Cap, MSCState) -> %% write -> confirmed ok = msg_store_write(MsgIds, MSCState), @@ -1927,6 +1929,45 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) -> ok = msg_store_write(MsgIds, MSCState), ok = msg_store_remove(MsgIds, MSCState), ok = on_disk_await(Cap, MsgIds), + %% confirmation on timer-based sync + passed = test_msg_store_confirm_timer(), + passed. + +test_msg_store_confirm_timer() -> + Ref = rabbit_guid:guid(), + MsgId = msg_id_bin(1), + Self = self(), + MSCState = rabbit_msg_store:client_init( + ?PERSISTENT_MSG_STORE, Ref, + fun (MsgIds, _ActionTaken) -> + case gb_sets:is_member(MsgId, MsgIds) of + true -> Self ! on_disk; + false -> ok + end + end, undefined), + ok = msg_store_write([MsgId], MSCState), + ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState), + ok = msg_store_remove([MsgId], MSCState), + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), + passed. + +msg_store_keep_busy_until_confirm(MsgIds, MSCState) -> + receive + on_disk -> ok + after 0 -> + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + msg_store_keep_busy_until_confirm(MsgIds, MSCState) + end. + +test_msg_store_client_delete_and_terminate() -> + restart_msg_store_empty(), + MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], + Ref = rabbit_guid:guid(), + MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + ok = msg_store_write(MsgIds, MSCState), + %% test the 'dying client' fast path for writes + ok = rabbit_msg_store:client_delete_and_terminate(MSCState), passed. queue_name(Name) -> @@ -2238,11 +2279,10 @@ test_variable_queue_requeue(VQ0) -> (_, Acc) -> Acc end, [], lists:zip(Acks, Seq)), - {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, - fun(X) -> X end, VQ3), + {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3), VQ5 = lists:foldl(fun (AckTag, VQN) -> {_MsgId, VQM} = rabbit_variable_queue:requeue( - [AckTag], fun(X) -> X end, VQN), + [AckTag], VQN), VQM end, VQ4, Subset), VQ6 = lists:foldl(fun (AckTag, VQa) -> @@ -2436,7 +2476,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), {_Guids, VQ4} = - rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + rabbit_variable_queue:requeue(AckTags, VQ3), VQ5 = rabbit_variable_queue:timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c51640bab1..bf9450f1b1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/3, fetch/2, ack/3, requeue/3, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, @@ -653,21 +653,19 @@ ack(AckTags, Fun, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> +requeue(AckTags, #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), - fun publish_alpha/2, - MsgPropsFun, State), + fun publish_alpha/2, State), {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, delta_limit(Delta), - fun publish_beta/2, - MsgPropsFun, State1), + fun publish_beta/2, State1), {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, - MsgPropsFun, State2), + State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( State3 #vqstate { delta = Delta1, @@ -1317,7 +1315,7 @@ blind_confirm(Callback, MsgIdSet) -> Callback(?MODULE, fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). -msgs_written_to_disk(Callback, MsgIdSet, removed) -> +msgs_written_to_disk(Callback, MsgIdSet, ignored) -> blind_confirm(Callback, MsgIdSet); msgs_written_to_disk(Callback, MsgIdSet, written) -> Callback(?MODULE, @@ -1360,50 +1358,49 @@ publish_beta(MsgStatus, State) -> ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. %% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) -> +queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, - Limit, PubFun, MsgPropsFun, State). + Limit, PubFun, State). queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, - Limit, PubFun, MsgPropsFun, State) + Limit, PubFun, State) when Limit == undefined orelse SeqId < Limit -> case ?QUEUE:out(Q) of {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} when SeqIdQ < SeqId -> %% enqueue from the remaining queue queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, - Limit, PubFun, MsgPropsFun, State); + Limit, PubFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun, - State), + {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, MsgPropsFun, State2) + Limit, PubFun, State2) end; queue_merge(SeqIds, Q, Front, MsgIds, - _Limit, _PubFun, _MsgPropsFun, State) -> + _Limit, _PubFun, State) -> {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. -delta_merge([], Delta, MsgIds, _MsgPropsFun, State) -> +delta_merge([], Delta, MsgIds, State) -> {Delta, MsgIds, State}; -delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) -> +delta_merge(SeqIds, Delta, MsgIds, State) -> lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, MsgPropsFun, State0), + msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 -msg_from_pending_ack(SeqId, MsgPropsFun, State) -> +msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = remove_pending_ack(SeqId, State), {MsgStatus #msg_status { - msg_props = (MsgPropsFun(MsgProps)) #message_properties { - needs_confirming = false } }, State1}. + msg_props = MsgProps #message_properties { needs_confirming = false } }, + State1}. beta_limit(Q) -> case ?QUEUE:peek(Q) of @@ -1601,39 +1598,31 @@ maybe_deltas_to_betas(State = #vqstate { end. push_alphas_to_betas(Quota, State) -> - {Quota1, State1} = maybe_push_q1_to_betas(Quota, State), - {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + {Quota1, State1} = + push_alphas_to_betas( + fun ?QUEUE:out/1, + fun (MsgStatus, Q1a, + State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> + State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; + (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> + State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } + end, Quota, State #vqstate.q1, State), + {Quota2, State2} = + push_alphas_to_betas( + fun ?QUEUE:out_r/1, + fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> + State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } + end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}. -maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> - maybe_push_alphas_to_betas( - fun ?QUEUE:out/1, - fun (MsgStatus, Q1a, - State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> - State1 #vqstate { q1 = Q1a, - q3 = ?QUEUE:in(MsgStatus, Q3) }; - (MsgStatus, Q1a, State1 = #vqstate { q2 = Q2 }) -> - State1 #vqstate { q1 = Q1a, - q2 = ?QUEUE:in(MsgStatus, Q2) } - end, Quota, Q1, State). - -maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> - maybe_push_alphas_to_betas( - fun ?QUEUE:out_r/1, - fun (MsgStatus, Q4a, State1 = #vqstate { q3 = Q3 }) -> - State1 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), - q4 = Q4a } - end, Quota, Q4, State). - -maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, - State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) +push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, + State = #vqstate { ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) when Quota =:= 0 orelse TargetRamCount =:= infinity orelse TargetRamCount >= RamMsgCount -> {Quota, State}; -maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> +push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of {empty, _Q} -> {Quota, State}; @@ -1643,8 +1632,8 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 }, - maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, - Consumer(MsgStatus2, Qa, State2)) + push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, + Consumer(MsgStatus2, Qa, State2)) end. push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, |
