summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl92
-rw-r--r--src/rabbit_control_main.erl167
-rw-r--r--src/rabbit_file.erl10
-rw-r--r--src/rabbit_hipe.erl89
-rw-r--r--src/rabbit_mirror_queue_master.erl16
-rw-r--r--src/rabbit_mirror_queue_misc.erl17
-rw-r--r--src/rabbit_policy.erl4
-rw-r--r--src/rabbit_priority_queue.erl68
-rw-r--r--src/rabbit_upgrade_functions.erl19
-rw-r--r--src/truncate.erl75
10 files changed, 325 insertions, 232 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3ee14e4f7d..2db86391a5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,6 +54,7 @@
max_length,
max_bytes,
args_policy_version,
+ mirroring_policy_version = 0,
status
}).
@@ -702,7 +703,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
- true ->
+ true ->
log_auto_delete(
io_lib:format(
"because all of its consumers (~p) were on a channel that was closed",
@@ -1071,11 +1072,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true ->
+ true ->
log_auto_delete(
io_lib:format(
"because its last consumer with tag '~s' was cancelled",
- [ConsumerTag]),
+ [ConsumerTag]),
State),
stop(ok, State1)
end
@@ -1207,22 +1208,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+handle_cast(update_mirroring, State = #q{q = Q,
+ mirroring_policy_version = Version}) ->
+ case needs_update_mirroring(Q, Version) of
+ false ->
+ noreply(State);
+ {Policy, NewVersion} ->
+ State1 = State#q{mirroring_policy_version = NewVersion},
+ noreply(update_mirroring(Policy, State1))
+ end;
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{consumers = Consumers,
@@ -1371,19 +1365,67 @@ log_delete_exclusive({ConPid, _ConRef}, State) ->
log_delete_exclusive(ConPid, State);
log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) ->
#resource{ name = QName, virtual_host = VHost } = Resource,
- rabbit_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++
- " because its declaring connection ~p was closed",
- [QName, VHost, ConPid]).
+ rabbit_log_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++
+ "because its declaring connection ~p was closed",
+ [QName, VHost, ConPid]).
log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) ->
#resource{ name = QName, virtual_host = VHost } = Resource,
- rabbit_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++
- Reason,
- [QName, VHost]).
+ rabbit_log_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++
+ Reason,
+ [QName, VHost]).
+
+needs_update_mirroring(Q, Version) ->
+ {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name),
+ DBVersion = UpQ#amqqueue.policy_version,
+ case DBVersion > Version of
+ true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion};
+ false -> false
+ end.
+update_mirroring(Policy, State = #q{backing_queue = BQ}) ->
+ case update_to(Policy, BQ) of
+ start_mirroring ->
+ start_mirroring(State);
+ stop_mirroring ->
+ stop_mirroring(State);
+ ignore ->
+ State;
+ update_ha_mode ->
+ update_ha_mode(State)
+ end.
+update_to(undefined, rabbit_mirror_queue_master) ->
+ stop_mirroring;
+update_to(_, rabbit_mirror_queue_master) ->
+ update_ha_mode;
+update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ ignore;
+update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ start_mirroring.
+
+start_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+stop_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+update_ha_mode(State) ->
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ ok = rabbit_mirror_queue_misc:update_mirrors(Q),
+ State.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 7f653c3780..bf50fd6c49 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -23,7 +23,7 @@
sync_queue/1, cancel_sync_queue/1, become/1,
purge_queue/1]).
--import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
+-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -37,6 +37,7 @@
reset,
force_reset,
rotate_logs,
+ hipe_compile,
{join_cluster, [?RAM_DEF]},
change_cluster_node_type,
@@ -113,7 +114,7 @@
[stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
join_cluster, change_cluster_node_type, update_cluster_nodes,
forget_cluster_node, rename_cluster_node, cluster_status, status,
- environment, eval, force_boot, help, node_health_check]).
+ environment, eval, force_boot, help, node_health_check, hipe_compile]).
-define(COMMANDS_WITH_TIMEOUT,
[list_user_permissions, list_policies, list_queues, list_exchanges,
@@ -380,6 +381,16 @@ action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Rotating logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, []});
+action(hipe_compile, _Node, [TargetDir], _Opts, _Inform) ->
+ ok = application:load(rabbit),
+ case rabbit_hipe:can_hipe_compile() of
+ true ->
+ {ok, _, _} = rabbit_hipe:compile_to_directory(TargetDir),
+ ok;
+ false ->
+ {error, "HiPE compilation is not supported"}
+ end;
+
action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
Inform("Closing connection \"~s\"", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
@@ -579,56 +590,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
action(list_users, Node, [], _Opts, Inform, Timeout) ->
Inform("Listing users", []),
- call(Node, {rabbit_auth_backend_internal, list_users, []},
- rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
+ rabbit_auth_backend_internal:user_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_permissions, Node, [], Opts, Inform, Timeout) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost \"~s\"", [VHost]),
- call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
- rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
+ rabbit_auth_backend_internal:vhost_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_parameters, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
- call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
- rabbit_runtime_parameters:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
+ rabbit_runtime_parameters:info_keys(),
+ [{timeout, Timeout}]);
action(list_policies, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing policies", []),
- call(Node, {rabbit_policy, list_formatted, [VHostArg]},
- rabbit_policy:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
+ rabbit_policy:info_keys(),
+ [{timeout, Timeout}]);
action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing vhosts", []),
ArgAtoms = default_if_empty(Args, [name]),
- call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
+ call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
{error_string,
"list_user_permissions expects a username argument, but none provided."};
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
Inform("Listing permissions for user ~p", Args),
- call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
- rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
+ rabbit_auth_backend_internal:user_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
Inform("Listing queues", []),
+ %% User options
+ [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),
- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
- ArgAtoms, Timeout);
+
+ %% Data for emission
+ Nodes = nodes_in_cluster(Node, Timeout),
+ OnlineChunks = if Online -> length(Nodes); true -> 0 end,
+ OfflineChunks = if Offline -> 1; true -> 0 end,
+ ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
+ TimeoutOpt = {timeout, Timeout},
+ EmissionRef = make_ref(),
+ EmissionRefOpt = {ref, EmissionRef},
+
+ _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, type]),
- call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing bindings", []),
@@ -636,27 +665,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
ArgAtoms = default_if_empty(Args, [source_name, source_kind,
destination_name, destination_kind,
routing_key, arguments]),
- call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
- call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
messages_unacknowledged]),
- call(Node, {rabbit_channel, info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
+ [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
Inform("Listing consumers", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
- rabbit_amqqueue:consumer_info_keys(), Timeout).
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
+ rabbit_amqqueue:consumer_info_keys(),
+ [{timeout, Timeout}, {chunks, length(Nodes)}]).
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
@@ -766,17 +799,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
{X, Value} -> Value
end, IsEscaped) || X <- InfoItemKeys]).
-display_info_message(IsEscaped) ->
+display_info_message(IsEscaped, InfoItemKeys) ->
fun ([], _) ->
ok;
- ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
+ ([FirstResult|_] = List, _) when is_list(FirstResult) ->
lists:foreach(fun(Result) ->
display_info_message_row(IsEscaped, Result, InfoItemKeys)
end,
List),
ok;
- (Result, InfoItemKeys) ->
- display_info_message_row(IsEscaped, Result, InfoItemKeys)
+ (Result, _) ->
+ display_info_message_row(IsEscaped, Result, InfoItemKeys),
+ ok
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -833,7 +867,10 @@ display_call_result(Node, MFA) ->
end.
unsafe_rpc(Node, Mod, Fun, Args) ->
- case rpc_call(Node, Mod, Fun, Args) of
+ unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
+
+unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
+ case rpc_call(Node, Mod, Fun, Args, Timeout) of
{badrpc, _} = Res -> throw(Res);
Normal -> Normal
end.
@@ -852,33 +889,42 @@ ensure_app_running(Node) ->
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
-call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
+call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
+ Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
+ display_emission_result(Ref, InfoKeys, Opts).
+
+start_emission(Node, {Mod, Fun, Args}, Opts) ->
+ ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ Ref = proplists:get_value(ref, Opts, make_ref()),
+ rabbit_control_misc:spawn_emitter_caller(
+ Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
+ Ref, self(), Timeout),
+ Ref.
+
+display_emission_result(Ref, InfoKeys, Opts) ->
+ IsEscaped = proplists:get_value(is_escaped, Opts, false),
+ Chunks = proplists:get_value(chunks, Opts, 1),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ EmissionStatus = rabbit_control_misc:wait_for_info_messages(
+ self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
+ emission_to_action_result(EmissionStatus).
+
+%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
+%% into form expected by rabbit_cli:main/3.
+emission_to_action_result({ok, ok}) ->
+ ok;
+emission_to_action_result({error, Error}) ->
+ Error.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
+prepare_call_args(Args, ToBinUtf8) ->
+ case ToBinUtf8 of
+ true -> valid_utf8_args(Args);
+ false -> Args
+ end.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
- Args0 = case ToBinUtf8 of
- true -> lists:map(fun list_to_binary_utf8/1, Args);
- false -> Args
- end,
- Ref = make_ref(),
- Pid = self(),
- spawn_link(
- fun () ->
- case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
- Ref, Pid, Timeout) of
- {error, _} = Error ->
- Pid ! {error, Error};
- {bad_argument, _} = Error ->
- Pid ! {error, Error};
- _ ->
- ok
- end
- end),
- rabbit_control_misc:wait_for_info_messages(
- Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
+valid_utf8_args(Args) ->
+ lists:map(fun list_to_binary_utf8/1, Args).
list_to_binary_utf8(L) ->
B = list_to_binary(L),
@@ -928,7 +974,10 @@ split_list([_]) -> exit(even_list_needed);
split_list([A, B | T]) -> [{A, B} | split_list(T)].
nodes_in_cluster(Node) ->
- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
+
+nodes_in_cluster(Node, Timeout) ->
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
alarms_by_node(Name) ->
Status = unsafe_rpc(Name, rabbit, status, []),
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 3dd0421485..d2d37f0ec0 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -24,6 +24,7 @@
-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
-export([lock_file/1]).
-export([read_file_info/1]).
+-export([filename_as_a_directory/1]).
-import(file_handle_cache, [with_handle/1, with_handle/2]).
@@ -59,6 +60,7 @@
(file:filename(), file:filename())
-> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
+-spec(filename_as_a_directory/1 :: (file:filename()) -> file:filename()).
-endif.
@@ -306,3 +308,11 @@ lock_file(Path) ->
ok = prim_file:close(Lock)
end)
end.
+
+filename_as_a_directory(FileName) ->
+ case lists:last(FileName) of
+ "/" ->
+ FileName;
+ _ ->
+ FileName ++ "/"
+ end.
diff --git a/src/rabbit_hipe.erl b/src/rabbit_hipe.erl
index cbd9181e6a..6957d85cb4 100644
--- a/src/rabbit_hipe.erl
+++ b/src/rabbit_hipe.erl
@@ -5,15 +5,15 @@
%% practice 2 processes seems just as fast as any other number > 1,
%% and keeps the progress bar realistic-ish.
-define(HIPE_PROCESSES, 2).
--export([maybe_hipe_compile/0, log_hipe_result/1]).
-%% HiPE compilation happens before we have log handlers - so we have
-%% to io:format/2, it's all we can do.
+-export([maybe_hipe_compile/0, log_hipe_result/1]).
+-export([compile_to_directory/1]).
+-export([can_hipe_compile/0]).
+%% Compile and load during server startup sequence
maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
- Can = code:which(hipe) =/= non_existing,
- case {Want, Can} of
+ case {Want, can_hipe_compile()} of
{true, true} -> hipe_compile();
{true, false} -> false;
{false, _} -> {ok, disabled}
@@ -33,38 +33,49 @@ log_hipe_result(false) ->
rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
+hipe_compile() ->
+ hipe_compile(fun compile_and_load/1, false).
+
+compile_to_directory(Dir0) ->
+ Dir = rabbit_file:filename_as_a_directory(Dir0),
+ ok = prepare_ebin_directory(Dir),
+ hipe_compile(fun (Mod) -> compile_and_save(Mod, Dir) end, true).
+
+needs_compilation(Mod, Force) ->
+ Exists = code:which(Mod) =/= non_existing,
+ %% We skip modules already natively compiled. This
+ %% happens when RabbitMQ is stopped (just the
+ %% application, not the entire node) and started
+ %% again.
+ NotYetCompiled = not already_hipe_compiled(Mod),
+ NotVersioned = not compiled_with_version_support(Mod),
+ Exists andalso (Force orelse (NotYetCompiled andalso NotVersioned)).
+
%% HiPE compilation happens before we have log handlers and can take a
%% long time, so make an exception to our no-stdout policy and display
%% progress via stdout.
-hipe_compile() ->
+hipe_compile(CompileFun, Force) ->
{ok, HipeModulesAll} = application:get_env(rabbit, hipe_modules),
- HipeModules = [HM || HM <- HipeModulesAll,
- code:which(HM) =/= non_existing andalso
- %% We skip modules already natively compiled. This
- %% happens when RabbitMQ is stopped (just the
- %% application, not the entire node) and started
- %% again.
- already_hipe_compiled(HM)
- andalso (not compiled_with_version_support(HM))],
+ HipeModules = lists:filter(fun(Mod) -> needs_compilation(Mod, Force) end, HipeModulesAll),
case HipeModules of
[] -> {ok, already_compiled};
- _ -> do_hipe_compile(HipeModules)
+ _ -> do_hipe_compile(HipeModules, CompileFun)
end.
already_hipe_compiled(Mod) ->
try
%% OTP 18.x or later
- Mod:module_info(native) =:= false
+ Mod:module_info(native) =:= true
%% OTP prior to 18.x
catch error:badarg ->
- code:is_module_native(Mod) =:= false
+ code:is_module_native(Mod) =:= true
end.
compiled_with_version_support(Mod) ->
proplists:get_value(erlang_version_support, Mod:module_info(attributes))
=/= undefined.
-do_hipe_compile(HipeModules) ->
+do_hipe_compile(HipeModules, CompileFun) ->
Count = length(HipeModules),
io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
@@ -79,11 +90,7 @@ do_hipe_compile(HipeModules) ->
%% advanced API does not load automatically the code, except if the
%% 'load' option is set.
PidMRefs = [spawn_monitor(fun () -> [begin
- {M, Beam, _} =
- code:get_object_code(M),
- {ok, _} =
- hipe:compile(M, [], Beam,
- [o3, load]),
+ CompileFun(M),
io:format("#")
end || M <- Ms]
end) ||
@@ -101,3 +108,39 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
split0([], Ls) -> Ls;
split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]).
+
+prepare_ebin_directory(Dir) ->
+ ok = rabbit_file:ensure_dir(Dir),
+ ok = delete_beam_files(Dir),
+ ok.
+
+delete_beam_files(Dir) ->
+ {ok, Files} = file:list_dir(Dir),
+ lists:foreach(fun(File) ->
+ case filename:extension(File) of
+ ".beam" ->
+ ok = file:delete(filename:join([Dir, File]));
+ _ ->
+ ok
+ end
+ end,
+ Files).
+
+compile_and_load(Mod) ->
+ {Mod, Beam, _} = code:get_object_code(Mod),
+ {ok, _} = hipe:compile(Mod, [], Beam, [o3, load]).
+
+compile_and_save(Module, Dir) ->
+ {Module, BeamCode, _} = code:get_object_code(Module),
+ BeamName = filename:join([Dir, atom_to_list(Module) ++ ".beam"]),
+ {ok, {Architecture, NativeCode}} = hipe:compile(Module, [], BeamCode, [o3]),
+ {ok, _, Chunks0} = beam_lib:all_chunks(BeamCode),
+ ChunkName = hipe_unified_loader:chunk_name(Architecture),
+ Chunks1 = lists:keydelete(ChunkName, 1, Chunks0),
+ Chunks = Chunks1 ++ [{ChunkName,NativeCode}],
+ {ok, BeamPlusNative} = beam_lib:build_module(Chunks),
+ ok = file:write_file(BeamName, BeamPlusNative),
+ BeamName.
+
+can_hipe_compile() ->
+ code:which(hipe) =/= non_existing.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e447e9de82..9674a4ef2c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -363,7 +363,7 @@ fetch(AckRequired, State = #state { backing_queue = BQ,
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
empty -> State1;
- {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ {_MsgId, _IsDelivered, _AckTag} -> drop_one(AckRequired, State1)
end}.
drop(AckRequired, State = #state { backing_queue = BQ,
@@ -372,7 +372,7 @@ drop(AckRequired, State = #state { backing_queue = BQ,
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
empty -> State1;
- {_MsgId, AckTag} -> drop_one(AckTag, State1)
+ {_MsgId, _AckTag} -> drop_one(AckRequired, State1)
end}.
ack(AckTags, State = #state { gm = GM,
@@ -518,6 +518,7 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
Depth = BQ:depth(BQS1),
true = Len == Depth, %% ASSERTION: everything must have been requeued
ok = gm:broadcast(GM, {depth, Depth}),
+ WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
#state { name = QName,
gm = GM,
coordinator = CPid,
@@ -525,7 +526,8 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
backing_queue_state = BQS1,
seen_status = Seen,
confirmed = [],
- known_senders = sets:from_list(KS) }.
+ known_senders = sets:from_list(KS),
+ wait_timeout = WaitTimeout }.
sender_death_fun() ->
Self = self(),
@@ -556,10 +558,10 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop_one(AckTag, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+drop_one(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckRequired}),
State.
drop(PrevLen, AckRequired, State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 849efa3611..b188298a9b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -20,7 +20,7 @@
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1,
+ is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
@@ -64,6 +64,8 @@
-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
-spec(update_mirrors/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
+-spec(update_mirrors/1 ::
+ (rabbit_types:amqqueue()) -> 'ok').
-spec(maybe_drop_master_after_sync/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok').
@@ -384,15 +386,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
- {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
- {true, true} -> update_mirrors0(OldQ, NewQ)
+ _ -> rabbit_amqqueue:update_mirroring(QPid)
end.
-update_mirrors0(OldQ = #amqqueue{name = QName},
- NewQ = #amqqueue{name = QName}) ->
- {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
- {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+update_mirrors(Q = #amqqueue{name = QName}) ->
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
%% When a mirror dies, remove_from_queue/2 might have to add new
@@ -406,7 +405,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
drop_mirrors(QName, OldNodes -- NewNodes),
%% This is for the case where no extra nodes were added but we changed to
%% a policy requiring auto-sync.
- maybe_auto_sync(NewQ),
+ maybe_auto_sync(Q),
ok.
%% The arrival of a newly synced slave may cause the master to die if
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index eb8cf63327..a9caadf972 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -276,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
NewPolicy -> case rabbit_amqqueue:update(
QName, fun(Q1) ->
rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy})
+ Q1#amqqueue{policy = NewPolicy,
+ policy_version =
+ Q1#amqqueue.policy_version + 1 })
end) of
#amqqueue{} = Q1 -> {Q, Q1};
not_found -> {Q, Q }
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 6141796f7b..a3bfb5cdfa 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -205,8 +205,8 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
-batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = partition_publish_batch(Publishes),
+batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
+ PubDict = partition_publish_batch(Publishes, MaxP),
lists:foldl(
fun ({Priority, Pubs}, St) ->
pick1(fun (_P, BQSN) ->
@@ -227,8 +227,8 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
-batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = partition_publish_delivered_batch(Publishes),
+batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
+ PubDict = partition_publish_delivered_batch(Publishes, MaxP),
{PrioritiesAndAcks, State1} =
lists:foldl(
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
@@ -404,7 +404,6 @@ msg_rates(#state{bq = BQ, bqss = BQSs}) ->
end, {0.0, 0.0}, BQSs);
msg_rates(#passthrough{bq = BQ, bqs = BQS}) ->
BQ:msg_rates(BQS).
-
info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) ->
fold0(fun (P, BQSN, Acc) ->
combine_status(P, BQ:info(backing_queue_status, BQSN), Acc)
@@ -433,8 +432,8 @@ set_queue_mode(Mode, State = #state{bq = BQ}) ->
set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(set_queue_mode(Mode, BQS)).
-zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) ->
- MsgsByPriority = partition_publish_delivered_batch(Msgs),
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) ->
+ MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP),
lists:foldl(fun (Acks, MAs) ->
{P, _AckTag} = hd(Acks),
Pubs = orddict:fetch(P, MsgsByPriority),
@@ -484,13 +483,14 @@ foreach1(_Fun, [], BQSAcc) ->
%% For a given thing, just go to its BQ
pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
- {P, BQSN} = priority(Prioritisable, BQSs),
+ {P, BQSN} = priority_bq(Prioritisable, BQSs),
a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}).
%% Fold over results
fold2(Fun, Acc, State = #state{bqss = BQSs}) ->
{Res, BQSs1} = fold2(Fun, Acc, BQSs, []),
{Res, a(State#state{bqss = BQSs1})}.
+
fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) ->
{Acc1, BQSN1} = Fun(P, BQSN, Acc),
fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]);
@@ -532,7 +532,7 @@ fold_by_acktags2(Fun, AckTags, State) ->
%% For a given thing, just go to its BQ
pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
- {P, BQSN} = priority(Prioritisable, BQSs),
+ {P, BQSN} = priority_bq(Prioritisable, BQSs),
{Res, BQSN1} = Fun(P, BQSN),
{Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}.
@@ -564,8 +564,7 @@ findfold3(_Fun, Acc, NotFound, [], BQSAcc) ->
{NotFound, Acc, lists:reverse(BQSAcc)}.
bq_fetch(P, []) -> exit({not_found, P});
-bq_fetch(P, [{P, BQSN} | _]) -> BQSN;
-bq_fetch(P, [{P1, BQSN} | _]) when P > P1 -> BQSN;
+bq_fetch(P, [{P, BQSN} | _]) -> {P, BQSN};
bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T).
bq_store(P, BQS, BQSs) ->
@@ -583,41 +582,36 @@ a(State = #state{bqss = BQSs}) ->
end.
%%----------------------------------------------------------------------------
-partition_publish_batch(Publishes) ->
+partition_publish_batch(Publishes, MaxP) ->
partition_publishes(
- Publishes, fun ({Msg, _, _}) -> Msg end).
+ Publishes, fun ({Msg, _, _}) -> Msg end, MaxP).
-partition_publish_delivered_batch(Publishes) ->
+partition_publish_delivered_batch(Publishes, MaxP) ->
partition_publishes(
- Publishes, fun ({Msg, _}) -> Msg end).
+ Publishes, fun ({Msg, _}) -> Msg end, MaxP).
-partition_publishes(Publishes, ExtractMsg) ->
+partition_publishes(Publishes, ExtractMsg, MaxP) ->
lists:foldl(fun (Pub, Dict) ->
Msg = ExtractMsg(Pub),
- rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict)
+ rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict)
end, orddict:new(), Publishes).
-priority(P, BQSs) when is_integer(P) ->
- {P, bq_fetch(P, BQSs)};
-priority(#basic_message{content = Content}, BQSs) ->
- priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs).
-
-priority1(_Content, [{P, BQSN}]) ->
- {P, BQSN};
-priority1(Content, [{P, BQSN} | Rest]) ->
- case priority2(Content) >= P of
- true -> {P, BQSN};
- false -> priority1(Content, Rest)
- end.
-
-priority2(#basic_message{content = Content}) ->
- priority2(rabbit_binary_parser:ensure_content_decoded(Content));
-priority2(#content{properties = Props}) ->
+priority_bq(Priority, [{MaxP, _} | _] = BQSs) ->
+ bq_fetch(priority(Priority, MaxP), BQSs).
+
+%% Messages with a priority which is higher than the queue's maximum are treated
+%% as if they were published with the maximum priority.
+priority(undefined, _MaxP) ->
+ 0;
+priority(Priority, MaxP) when is_integer(Priority), Priority =< MaxP ->
+ Priority;
+priority(Priority, MaxP) when is_integer(Priority), Priority > MaxP ->
+ MaxP;
+priority(#basic_message{content = Content}, MaxP) ->
+ priority(rabbit_binary_parser:ensure_content_decoded(Content), MaxP);
+priority(#content{properties = Props}, MaxP) ->
#'P_basic'{priority = Priority0} = Props,
- case Priority0 of
- undefined -> 0;
- _ when is_integer(Priority0) -> Priority0
- end.
+ priority(Priority0, MaxP).
add_maybe_infinity(infinity, _) -> infinity;
add_maybe_infinity(_, infinity) -> infinity;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b99a1d12ee..0f55b9e4a9 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -52,6 +52,7 @@
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
+-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -447,6 +448,24 @@ recoverable_slaves(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators,
state]).
+policy_version() ->
+ ok = policy_version(rabbit_queue),
+ ok = policy_version(rabbit_durable_queue).
+
+policy_version(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, 0}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+ policy_version]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/src/truncate.erl b/src/truncate.erl
index 1c9b08ed27..a1586b0cb0 100644
--- a/src/truncate.erl
+++ b/src/truncate.erl
@@ -21,8 +21,10 @@
-record(params, {content, struct, content_dec, struct_dec}).
-export([log_event/2, term/2]).
-%% exported for testing
--export([test/0]).
+
+-ifdef(TEST).
+-export([term_size/3]).
+-endif.
log_event({Type, GL, {Pid, Format, Args}}, Params)
when Type =:= error orelse
@@ -123,72 +125,3 @@ tuple_term_size(_T, M, I, S, _W) when I > S ->
M;
tuple_term_size(T, M, I, S, W) ->
tuple_term_size(T, lim(term_size(element(I, T), M, W), 2 * W), I + 1, S, W).
-
-%%----------------------------------------------------------------------------
-
-test() ->
- test_short_examples_exactly(),
- test_term_limit(),
- test_large_examples_for_size(),
- ok.
-
-test_short_examples_exactly() ->
- F = fun (Term, Exp) ->
- Exp = term(Term, {1, {10, 10, 5, 5}}),
- Term = term(Term, {100000, {10, 10, 5, 5}})
- end,
- FSmall = fun (Term, Exp) ->
- Exp = term(Term, {1, {2, 2, 2, 2}}),
- Term = term(Term, {100000, {2, 2, 2, 2}})
- end,
- F([], []),
- F("h", "h"),
- F("hello world", "hello w..."),
- F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]),
- F([a|b], [a|b]),
- F(<<"hello">>, <<"hello">>),
- F([<<"hello world">>], [<<"he...">>]),
- F(<<1:1>>, <<1:1>>),
- F(<<1:81>>, <<0:56, "...">>),
- F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}),
- FSmall({a,30,40,40,40,40}, {a,30,'...'}),
- FSmall([a,30,40,40,40,40], [a,30,'...']),
- P = spawn(fun() -> receive die -> ok end end),
- F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]),
- P ! die,
- R = make_ref(),
- F([R], [R]),
- ok.
-
-test_term_limit() ->
- W = erlang:system_info(wordsize),
- S = <<"abc">>,
- 1 = term_size(S, 4, W),
- limit_exceeded = term_size(S, 3, W),
- case 100 - term_size([S, S], 100, W) of
- 22 -> ok; %% 32 bit
- 38 -> ok %% 64 bit
- end,
- case 100 - term_size([S, [S]], 100, W) of
- 30 -> ok; %% ditto
- 54 -> ok
- end,
- limit_exceeded = term_size([S, S], 6, W),
- ok.
-
-test_large_examples_for_size() ->
- %% Real world values
- Shrink = fun(Term) -> term(Term, {1, {1000, 100, 50, 5}}) end,
- TestSize = fun(Term) ->
- true = 5000000 < size(term_to_binary(Term)),
- true = 500000 > size(term_to_binary(Shrink(Term)))
- end,
- TestSize(lists:seq(1, 5000000)),
- TestSize(recursive_list(1000, 10)),
- TestSize(recursive_list(5000, 20)),
- TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])),
- TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])),
- ok.
-
-recursive_list(S, 0) -> lists:seq(1, S);
-recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)].