summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl14
-rw-r--r--src/file_handle_cache_stats.erl6
-rw-r--r--src/gm.erl10
-rw-r--r--src/pg2_fixed.erl4
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl138
-rw-r--r--src/rabbit_binding.erl4
-rw-r--r--src/rabbit_control_main.erl167
-rw-r--r--src/rabbit_dead_letter.erl16
-rw-r--r--src/rabbit_direct.erl4
-rw-r--r--src/rabbit_disk_monitor.erl6
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl12
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_invalid.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl4
-rw-r--r--src/rabbit_file.erl10
-rw-r--r--src/rabbit_hipe.erl95
-rw-r--r--src/rabbit_mirror_queue_master.erl16
-rw-r--r--src/rabbit_mirror_queue_misc.erl17
-rw-r--r--src/rabbit_mirror_queue_mode_exactly.erl4
-rw-r--r--src/rabbit_mirror_queue_sync.erl8
-rw-r--r--src/rabbit_mnesia.erl111
-rw-r--r--src/rabbit_node_monitor.erl7
-rw-r--r--src/rabbit_password.erl4
-rw-r--r--src/rabbit_plugins.erl10
-rw-r--r--src/rabbit_policy.erl14
-rw-r--r--src/rabbit_priority_queue.erl69
-rw-r--r--src/rabbit_queue_consumers.erl10
-rw-r--r--src/rabbit_queue_location_random.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl19
-rw-r--r--src/rabbit_variable_queue.erl18
-rw-r--r--src/truncate.erl75
35 files changed, 577 insertions, 321 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d5f0cbee6f..e9754821e8 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -537,12 +537,12 @@ clear(Ref) ->
end).
set_maximum_since_use(MaximumAge) ->
- Now = time_compat:monotonic_time(),
+ Now = erlang:monotonic_time(),
case lists:foldl(
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
case Hdl =/= closed andalso
- time_compat:convert_time_unit(Now - Then,
+ erlang:convert_time_unit(Now - Then,
native,
micro_seconds)
>= MaximumAge of
@@ -715,7 +715,7 @@ get_or_reopen(RefNewOrReopens) ->
{ok, [Handle || {_Ref, Handle} <- OpenHdls]};
{OpenHdls, ClosedHdls} ->
Oldest = oldest(get_age_tree(),
- fun () -> time_compat:monotonic_time() end),
+ fun () -> erlang:monotonic_time() end),
case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
Oldest}, infinity) of
ok ->
@@ -751,7 +751,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
end,
case prim_file:open(Path, Mode) of
{ok, Hdl} ->
- Now = time_compat:monotonic_time(),
+ Now = erlang:monotonic_time(),
{{ok, _Offset}, Handle1} =
maybe_seek(Offset, reset_read_buffer(
Handle#handle{hdl = Hdl,
@@ -787,7 +787,7 @@ sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
- Now = time_compat:monotonic_time(),
+ Now = erlang:monotonic_time(),
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
@@ -1429,14 +1429,14 @@ reduce(State = #fhc_state { open_pending = OpenPending,
elders = Elders,
clients = Clients,
timer_ref = TRef }) ->
- Now = time_compat:monotonic_time(),
+ Now = erlang:monotonic_time(),
{CStates, Sum, ClientCount} =
ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
[#cstate { pending_closes = PendingCloses,
opened = Opened,
blocked = Blocked } = CState] =
ets:lookup(Clients, Pid),
- TimeDiff = time_compat:convert_time_unit(
+ TimeDiff = erlang:convert_time_unit(
Now - Eldest, native, micro_seconds),
case Blocked orelse PendingCloses =:= Opened of
true -> Accs;
diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl
index ccf1e49662..8f368a8405 100644
--- a/src/file_handle_cache_stats.erl
+++ b/src/file_handle_cache_stats.erl
@@ -59,8 +59,8 @@ get() ->
lists:sort(ets:tab2list(?TABLE)).
timer_tc(Thunk) ->
- T1 = time_compat:monotonic_time(),
+ T1 = erlang:monotonic_time(),
Res = Thunk(),
- T2 = time_compat:monotonic_time(),
- Diff = time_compat:convert_time_unit(T2 - T1, native, micro_seconds),
+ T2 = erlang:monotonic_time(),
+ Diff = erlang:convert_time_unit(T2 - T1, native, micro_seconds),
{Diff, Res}.
diff --git a/src/gm.erl b/src/gm.erl
index 199cf7c4de..a83d8d1932 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -552,8 +552,8 @@ forget_group(GroupName) ->
init([GroupName, Module, Args, TxnFun]) ->
put(process_name, {?MODULE, GroupName}),
_ = random:seed(erlang:phash2([node()]),
- time_compat:monotonic_time(),
- time_compat:unique_integer()),
+ erlang:monotonic_time(),
+ erlang:unique_integer()),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
{ok, #state { self = Self,
@@ -1338,7 +1338,11 @@ find_common(A, B, Common) ->
{{{value, Val}, A1}, {{value, Val}, B1}} ->
find_common(A1, B1, queue:in(Val, Common));
{{empty, _A}, _} ->
- {Common, B}
+ {Common, B};
+ {_, {_, B1}} ->
+ find_common(A, B1, Common);
+ {{_, A1}, _} ->
+ find_common(A1, B, Common)
end.
diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl
index 222a0bc849..73c05819d4 100644
--- a/src/pg2_fixed.erl
+++ b/src/pg2_fixed.erl
@@ -149,11 +149,11 @@ get_closest_pid(Name) ->
case get_members(Name) of
[] -> {error, {no_process, Name}};
Members ->
- X = time_compat:erlang_system_time(micro_seconds),
+ X = erlang:system_time(micro_seconds),
lists:nth((X rem length(Members))+1, Members)
end;
Members when is_list(Members) ->
- X = time_compat:erlang_system_time(micro_seconds),
+ X = erlang:system_time(micro_seconds),
lists:nth((X rem length(Members))+1, Members);
Else ->
Else
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7816cd53aa..d62a4b6935 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -507,6 +507,7 @@ await_startup(HaveSeenRabbitBoot) ->
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
+ %% The timeout value used is twice that of gen_server:call/2.
{running_applications, rabbit_misc:which_applications()},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
@@ -563,8 +564,9 @@ is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
environment() ->
+ %% The timeout value is twice that of gen_server:call/2.
[{A, environment(A)} ||
- {A, _, _} <- lists:keysort(1, application:which_applications())].
+ {A, _, _} <- lists:keysort(1, application:which_applications(10000))].
environment(App) ->
Ignore = [default_pass, included_applications],
@@ -731,7 +733,7 @@ log_broker_started(Plugins) ->
erts_version_check() ->
ERTSVer = erlang:system_info(version),
- OTPRel = erlang:system_info(otp_release),
+ OTPRel = rabbit_misc:otp_release(),
case rabbit_misc:version_compare(?ERTS_MINIMUM, ERTSVer, lte) of
true when ?ERTS_MINIMUM =/= ERTSVer ->
ok;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6d3bf892b0..d9724e0012 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,27 +33,64 @@
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Queue's state
--record(q, {q,
+-record(q, {
+ %% an #amqqueue record
+ q,
+ %% none | {exclusive consumer channel PID, consumer tag}
exclusive_consumer,
+ %% Set to true if a queue has ever had a consumer.
+ %% This is used to determine when to delete auto-delete queues.
has_had_consumers,
+ %% backing queue module.
+ %% for mirrored queues, this will be rabbit_mirror_queue_master.
+ %% for non-priority and non-mirrored queues, rabbit_variable_queue.
+ %% see rabbit_backing_queue.
backing_queue,
+ %% backing queue state.
+ %% see rabbit_backing_queue, rabbit_variable_queue.
backing_queue_state,
+ %% consumers state, see rabbit_queue_consumers
consumers,
+ %% queue expiration value
expires,
+ %% timer used to periodically sync (flush) queue index
sync_timer_ref,
+ %% timer used to update ingress/egress rates and queue RAM duration target
rate_timer_ref,
+ %% timer used to clean up this queue due to TTL (on when unused)
expiry_timer_ref,
+ %% stats emission timer
stats_timer,
+ %% maps message IDs to {channel pid, MsgSeqNo}
+ %% pairs
msg_id_to_channel,
+ %% message TTL value
ttl,
+ %% timer used to delete expired messages
ttl_timer_ref,
ttl_timer_expiry,
+ %% Keeps track of channels that publish to this queue.
+ %% When channel process goes down, queues have to perform
+ %% certain cleanup.
senders,
+ %% dead letter exchange as a #resource record, if any
dlx,
dlx_routing_key,
+ %% max length in messages, if configured
max_length,
+ %% max length in bytes, if configured
max_bytes,
+ %% when policies change, this version helps queue
+ %% determine what previously scheduled/set up state to ignore,
+ %% e.g. message expiration messages from previously set up timers
+ %% that may or may not be still valid
args_policy_version,
+ %% used to discard outdated/superseded policy updates,
+ %% e.g. when policies are applied concurrently. See
+ %% https://github.com/rabbitmq/rabbitmq-server/issues/803 for one
+ %% example.
+ mirroring_policy_version = 0,
+ %% running | flow | idle
status
}).
@@ -431,7 +468,7 @@ ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
args_policy_version = Version}) ->
- After = (case Expiry - time_compat:os_system_time(micro_seconds) of
+ After = (case Expiry - os:system_time(micro_seconds) of
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
@@ -702,7 +739,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
- true ->
+ true ->
log_auto_delete(
io_lib:format(
"because all of its consumers (~p) were on a channel that was closed",
@@ -757,7 +794,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
case lists:min([TTL, MsgTTL]) of
undefined -> undefined;
- T -> time_compat:os_system_time(micro_seconds) + T * 1000
+ T -> os:system_time(micro_seconds) + T * 1000
end.
%% Logically this function should invoke maybe_send_drained/2.
@@ -768,7 +805,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
drop_expired_msgs(State) ->
case is_empty(State) of
true -> State;
- false -> drop_expired_msgs(time_compat:os_system_time(micro_seconds),
+ false -> drop_expired_msgs(os:system_time(micro_seconds),
State)
end.
@@ -1071,11 +1108,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true ->
+ true ->
log_auto_delete(
io_lib:format(
"because its last consumer with tag '~s' was cancelled",
- [ConsumerTag]),
+ [ConsumerTag]),
State),
stop(ok, State1)
end
@@ -1207,22 +1244,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+handle_cast(update_mirroring, State = #q{q = Q,
+ mirroring_policy_version = Version}) ->
+ case needs_update_mirroring(Q, Version) of
+ false ->
+ noreply(State);
+ {Policy, NewVersion} ->
+ State1 = State#q{mirroring_policy_version = NewVersion},
+ noreply(update_mirroring(Policy, State1))
+ end;
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{consumers = Consumers,
@@ -1358,7 +1388,7 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
State, #q.stats_timer,
fun () -> emit_stats(State,
[{idle_since,
- time_compat:os_system_time(milli_seconds)},
+ os:system_time(milli_seconds)},
{consumer_utilisation, ''}])
end),
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
@@ -1371,19 +1401,67 @@ log_delete_exclusive({ConPid, _ConRef}, State) ->
log_delete_exclusive(ConPid, State);
log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) ->
#resource{ name = QName, virtual_host = VHost } = Resource,
- rabbit_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++
- " because its declaring connection ~p was closed",
- [QName, VHost, ConPid]).
+ rabbit_log_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++
+ "because its declaring connection ~p was closed",
+ [QName, VHost, ConPid]).
log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) ->
#resource{ name = QName, virtual_host = VHost } = Resource,
- rabbit_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++
- Reason,
- [QName, VHost]).
+ rabbit_log_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++
+ Reason,
+ [QName, VHost]).
+
+needs_update_mirroring(Q, Version) ->
+ {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name),
+ DBVersion = UpQ#amqqueue.policy_version,
+ case DBVersion > Version of
+ true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion};
+ false -> false
+ end.
+update_mirroring(Policy, State = #q{backing_queue = BQ}) ->
+ case update_to(Policy, BQ) of
+ start_mirroring ->
+ start_mirroring(State);
+ stop_mirroring ->
+ stop_mirroring(State);
+ ignore ->
+ State;
+ update_ha_mode ->
+ update_ha_mode(State)
+ end.
+update_to(undefined, rabbit_mirror_queue_master) ->
+ stop_mirroring;
+update_to(_, rabbit_mirror_queue_master) ->
+ update_ha_mode;
+update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ ignore;
+update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ start_mirroring.
+
+start_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+stop_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+update_ha_mode(State) ->
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ ok = rabbit_mirror_queue_misc:update_mirrors(Q),
+ State.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 299e254c50..8904c1dd74 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -100,7 +100,8 @@
-define(INFO_KEYS, [source_name, source_kind,
destination_name, destination_kind,
- routing_key, arguments]).
+ routing_key, arguments,
+ vhost]).
recover(XNames, QNames) ->
rabbit_misc:table_filter(
@@ -272,6 +273,7 @@ infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
i(source_name, #binding{source = SrcName}) -> SrcName#resource.name;
i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind;
+i(vhost, #binding{source = SrcName}) -> SrcName#resource.virtual_host;
i(destination_name, #binding{destination = DstName}) -> DstName#resource.name;
i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind;
i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 7f653c3780..bf50fd6c49 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -23,7 +23,7 @@
sync_queue/1, cancel_sync_queue/1, become/1,
purge_queue/1]).
--import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
+-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -37,6 +37,7 @@
reset,
force_reset,
rotate_logs,
+ hipe_compile,
{join_cluster, [?RAM_DEF]},
change_cluster_node_type,
@@ -113,7 +114,7 @@
[stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
join_cluster, change_cluster_node_type, update_cluster_nodes,
forget_cluster_node, rename_cluster_node, cluster_status, status,
- environment, eval, force_boot, help, node_health_check]).
+ environment, eval, force_boot, help, node_health_check, hipe_compile]).
-define(COMMANDS_WITH_TIMEOUT,
[list_user_permissions, list_policies, list_queues, list_exchanges,
@@ -380,6 +381,16 @@ action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Rotating logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, []});
+action(hipe_compile, _Node, [TargetDir], _Opts, _Inform) ->
+ ok = application:load(rabbit),
+ case rabbit_hipe:can_hipe_compile() of
+ true ->
+ {ok, _, _} = rabbit_hipe:compile_to_directory(TargetDir),
+ ok;
+ false ->
+ {error, "HiPE compilation is not supported"}
+ end;
+
action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
Inform("Closing connection \"~s\"", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
@@ -579,56 +590,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
action(list_users, Node, [], _Opts, Inform, Timeout) ->
Inform("Listing users", []),
- call(Node, {rabbit_auth_backend_internal, list_users, []},
- rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
+ rabbit_auth_backend_internal:user_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_permissions, Node, [], Opts, Inform, Timeout) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost \"~s\"", [VHost]),
- call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
- rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
+ rabbit_auth_backend_internal:vhost_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_parameters, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
- call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
- rabbit_runtime_parameters:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
+ rabbit_runtime_parameters:info_keys(),
+ [{timeout, Timeout}]);
action(list_policies, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing policies", []),
- call(Node, {rabbit_policy, list_formatted, [VHostArg]},
- rabbit_policy:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
+ rabbit_policy:info_keys(),
+ [{timeout, Timeout}]);
action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing vhosts", []),
ArgAtoms = default_if_empty(Args, [name]),
- call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
+ call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
{error_string,
"list_user_permissions expects a username argument, but none provided."};
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
Inform("Listing permissions for user ~p", Args),
- call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
- rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
+ rabbit_auth_backend_internal:user_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
Inform("Listing queues", []),
+ %% User options
+ [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),
- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
- ArgAtoms, Timeout);
+
+ %% Data for emission
+ Nodes = nodes_in_cluster(Node, Timeout),
+ OnlineChunks = if Online -> length(Nodes); true -> 0 end,
+ OfflineChunks = if Offline -> 1; true -> 0 end,
+ ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
+ TimeoutOpt = {timeout, Timeout},
+ EmissionRef = make_ref(),
+ EmissionRefOpt = {ref, EmissionRef},
+
+ _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, type]),
- call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing bindings", []),
@@ -636,27 +665,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
ArgAtoms = default_if_empty(Args, [source_name, source_kind,
destination_name, destination_kind,
routing_key, arguments]),
- call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
- call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
messages_unacknowledged]),
- call(Node, {rabbit_channel, info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
+ [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
Inform("Listing consumers", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
- rabbit_amqqueue:consumer_info_keys(), Timeout).
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
+ rabbit_amqqueue:consumer_info_keys(),
+ [{timeout, Timeout}, {chunks, length(Nodes)}]).
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
@@ -766,17 +799,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
{X, Value} -> Value
end, IsEscaped) || X <- InfoItemKeys]).
-display_info_message(IsEscaped) ->
+display_info_message(IsEscaped, InfoItemKeys) ->
fun ([], _) ->
ok;
- ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
+ ([FirstResult|_] = List, _) when is_list(FirstResult) ->
lists:foreach(fun(Result) ->
display_info_message_row(IsEscaped, Result, InfoItemKeys)
end,
List),
ok;
- (Result, InfoItemKeys) ->
- display_info_message_row(IsEscaped, Result, InfoItemKeys)
+ (Result, _) ->
+ display_info_message_row(IsEscaped, Result, InfoItemKeys),
+ ok
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -833,7 +867,10 @@ display_call_result(Node, MFA) ->
end.
unsafe_rpc(Node, Mod, Fun, Args) ->
- case rpc_call(Node, Mod, Fun, Args) of
+ unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
+
+unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
+ case rpc_call(Node, Mod, Fun, Args, Timeout) of
{badrpc, _} = Res -> throw(Res);
Normal -> Normal
end.
@@ -852,33 +889,42 @@ ensure_app_running(Node) ->
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
-call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
+call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
+ Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
+ display_emission_result(Ref, InfoKeys, Opts).
+
+start_emission(Node, {Mod, Fun, Args}, Opts) ->
+ ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ Ref = proplists:get_value(ref, Opts, make_ref()),
+ rabbit_control_misc:spawn_emitter_caller(
+ Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
+ Ref, self(), Timeout),
+ Ref.
+
+display_emission_result(Ref, InfoKeys, Opts) ->
+ IsEscaped = proplists:get_value(is_escaped, Opts, false),
+ Chunks = proplists:get_value(chunks, Opts, 1),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ EmissionStatus = rabbit_control_misc:wait_for_info_messages(
+ self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
+ emission_to_action_result(EmissionStatus).
+
+%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
+%% into form expected by rabbit_cli:main/3.
+emission_to_action_result({ok, ok}) ->
+ ok;
+emission_to_action_result({error, Error}) ->
+ Error.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
+prepare_call_args(Args, ToBinUtf8) ->
+ case ToBinUtf8 of
+ true -> valid_utf8_args(Args);
+ false -> Args
+ end.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
- Args0 = case ToBinUtf8 of
- true -> lists:map(fun list_to_binary_utf8/1, Args);
- false -> Args
- end,
- Ref = make_ref(),
- Pid = self(),
- spawn_link(
- fun () ->
- case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
- Ref, Pid, Timeout) of
- {error, _} = Error ->
- Pid ! {error, Error};
- {bad_argument, _} = Error ->
- Pid ! {error, Error};
- _ ->
- ok
- end
- end),
- rabbit_control_misc:wait_for_info_messages(
- Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
+valid_utf8_args(Args) ->
+ lists:map(fun list_to_binary_utf8/1, Args).
list_to_binary_utf8(L) ->
B = list_to_binary(L),
@@ -928,7 +974,10 @@ split_list([_]) -> exit(even_list_needed);
split_list([A, B | T]) -> [{A, B} | split_list(T)].
nodes_in_cluster(Node) ->
- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
+
+nodes_in_cluster(Node, Timeout) ->
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
alarms_by_node(Name) ->
Status = unsafe_rpc(Name, rabbit, status, []),
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 252405d62b..b5182ee2e0 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -53,7 +53,7 @@ make_msg(Msg = #basic_message{content = Content,
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
- TimeSec = time_compat:os_system_time(seconds),
+ TimeSec = os:system_time(seconds),
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
HeadersFun2 =
fun (Headers) ->
@@ -139,7 +139,19 @@ update_x_death_header(Info, Headers) ->
end,
rabbit_misc:set_table_value(
Headers, <<"x-death">>, array,
- [{table, rabbit_misc:sort_field_table(Info1)} | Others])
+ [{table, rabbit_misc:sort_field_table(Info1)} | Others]);
+ {<<"x-death">>, InvalidType, Header} ->
+ rabbit_log:warning("Message has invalid x-death header (type: ~p)."
+ " Resetting header ~p~n",
+ [InvalidType, Header]),
+ %% if x-death is something other than an array (list)
+ %% then we reset it: this happens when some clients consume
+ %% a message and re-publish is, converting header values
+ %% to strings, intentionally or not.
+ %% See rabbitmq/rabbitmq-server#767 for details.
+ rabbit_misc:set_table_value(
+ Headers, <<"x-death">>, array,
+ [{table, [{<<"count">>, long, 1} | Info]}])
end.
ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 35d7eb7940..b5970274d4 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -76,8 +76,8 @@ connect({Username, none}, VHost, Protocol, Pid, Infos) ->
VHost, Protocol, Pid, Infos);
connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_pass_login(
- Username, Password) end,
+ connect0(fun () -> rabbit_access_control:check_user_login(
+ Username, [{password, Password}, {vhost, VHost}]) end,
VHost, Protocol, Pid, Infos).
connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 88a8096fd4..a56b92b501 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -213,9 +213,11 @@ get_disk_free(Dir) ->
get_disk_free(Dir, {unix, Sun})
when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris ->
- parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir));
+ Df = os:find_executable("df"),
+ parse_free_unix(rabbit_misc:os_cmd(Df ++ " -k " ++ Dir));
get_disk_free(Dir, {unix, _}) ->
- parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
+ Df = os:find_executable("df"),
+ parse_free_unix(rabbit_misc:os_cmd(Df ++ " -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ "\"")).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index efe8495299..0103d4e503 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -100,7 +100,7 @@ publish(_Other, _Format, _Data, _State) ->
publish1(RoutingKey, Format, Data, LogExch) ->
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
%% second resolution, not millisecond.
- Timestamp = time_compat:os_system_time(seconds),
+ Timestamp = os:system_time(seconds),
Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data],
Headers = [{<<"node">>, longstr, list_to_binary(atom_to_list(node()))}],
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2e9afbfd2e..5d646e0c90 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -346,11 +346,17 @@ i(policy, X) -> case rabbit_policy:name(X) of
none -> '';
Policy -> Policy
end;
-i(Item, _) -> throw({bad_argument, Item}).
+i(Item, #exchange{type = Type} = X) ->
+ case (type_to_module(Type)):info(X, [Item]) of
+ [{Item, I}] -> I;
+ [] -> throw({bad_argument, Item})
+ end.
-info(X = #exchange{}) -> infos(?INFO_KEYS, X).
+info(X = #exchange{type = Type}) ->
+ infos(?INFO_KEYS, X) ++ (type_to_module(Type)):info(X).
-info(X = #exchange{}, Items) -> infos(Items, X).
+info(X = #exchange{type = _Type}, Items) ->
+ infos(Items, X).
info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 8a6886e376..ed675b572a 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -23,6 +23,7 @@
-export([validate/1, validate_binding/2,
create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
+-export([info/1, info/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
@@ -31,6 +32,9 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
+info(_X) -> [].
+info(_X, _) -> [].
+
description() ->
[{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index d81e407f8f..3aebc07b41 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -23,6 +23,7 @@
-export([validate/1, validate_binding/2,
create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
+-export([info/1, info/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
@@ -31,6 +32,9 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
+info(_X) -> [].
+info(_X, _) -> [].
+
description() ->
[{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 444d507c7e..cd41b903c2 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -24,6 +24,7 @@
-export([validate/1, validate_binding/2,
create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
+-export([info/1, info/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
@@ -37,6 +38,9 @@
rabbit_framing:amqp_table()) -> boolean()).
-endif.
+info(_X) -> [].
+info(_X, _) -> [].
+
description() ->
[{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index c8ca7ecae4..b2e2798e3a 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -23,6 +23,10 @@
-export([validate/1, validate_binding/2,
create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
+-export([info/1, info/2]).
+
+info(_X) -> [].
+info(_X, _) -> [].
description() ->
[{description,
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 0eccb66cfd..60be070426 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -24,6 +24,7 @@
-export([validate/1, validate_binding/2,
create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
+-export([info/1, info/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
@@ -34,6 +35,9 @@
%%----------------------------------------------------------------------------
+info(_X) -> [].
+info(_X, _) -> [].
+
description() ->
[{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 3dd0421485..d2d37f0ec0 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -24,6 +24,7 @@
-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
-export([lock_file/1]).
-export([read_file_info/1]).
+-export([filename_as_a_directory/1]).
-import(file_handle_cache, [with_handle/1, with_handle/2]).
@@ -59,6 +60,7 @@
(file:filename(), file:filename())
-> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
+-spec(filename_as_a_directory/1 :: (file:filename()) -> file:filename()).
-endif.
@@ -306,3 +308,11 @@ lock_file(Path) ->
ok = prim_file:close(Lock)
end)
end.
+
+filename_as_a_directory(FileName) ->
+ case lists:last(FileName) of
+ "/" ->
+ FileName;
+ _ ->
+ FileName ++ "/"
+ end.
diff --git a/src/rabbit_hipe.erl b/src/rabbit_hipe.erl
index 05b5f3719d..6957d85cb4 100644
--- a/src/rabbit_hipe.erl
+++ b/src/rabbit_hipe.erl
@@ -5,15 +5,15 @@
%% practice 2 processes seems just as fast as any other number > 1,
%% and keeps the progress bar realistic-ish.
-define(HIPE_PROCESSES, 2).
--export([maybe_hipe_compile/0, log_hipe_result/1]).
-%% HiPE compilation happens before we have log handlers - so we have
-%% to io:format/2, it's all we can do.
+-export([maybe_hipe_compile/0, log_hipe_result/1]).
+-export([compile_to_directory/1]).
+-export([can_hipe_compile/0]).
+%% Compile and load during server startup sequence
maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
- Can = code:which(hipe) =/= non_existing,
- case {Want, Can} of
+ case {Want, can_hipe_compile()} of
{true, true} -> hipe_compile();
{true, false} -> false;
{false, _} -> {ok, disabled}
@@ -33,42 +33,53 @@ log_hipe_result(false) ->
rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
+hipe_compile() ->
+ hipe_compile(fun compile_and_load/1, false).
+
+compile_to_directory(Dir0) ->
+ Dir = rabbit_file:filename_as_a_directory(Dir0),
+ ok = prepare_ebin_directory(Dir),
+ hipe_compile(fun (Mod) -> compile_and_save(Mod, Dir) end, true).
+
+needs_compilation(Mod, Force) ->
+ Exists = code:which(Mod) =/= non_existing,
+ %% We skip modules already natively compiled. This
+ %% happens when RabbitMQ is stopped (just the
+ %% application, not the entire node) and started
+ %% again.
+ NotYetCompiled = not already_hipe_compiled(Mod),
+ NotVersioned = not compiled_with_version_support(Mod),
+ Exists andalso (Force orelse (NotYetCompiled andalso NotVersioned)).
+
%% HiPE compilation happens before we have log handlers and can take a
%% long time, so make an exception to our no-stdout policy and display
%% progress via stdout.
-hipe_compile() ->
+hipe_compile(CompileFun, Force) ->
{ok, HipeModulesAll} = application:get_env(rabbit, hipe_modules),
- HipeModules = [HM || HM <- HipeModulesAll,
- code:which(HM) =/= non_existing andalso
- %% We skip modules already natively compiled. This
- %% happens when RabbitMQ is stopped (just the
- %% application, not the entire node) and started
- %% again.
- already_hipe_compiled(HM)
- andalso (not compiled_with_version_support(HM))],
+ HipeModules = lists:filter(fun(Mod) -> needs_compilation(Mod, Force) end, HipeModulesAll),
case HipeModules of
[] -> {ok, already_compiled};
- _ -> do_hipe_compile(HipeModules)
+ _ -> do_hipe_compile(HipeModules, CompileFun)
end.
already_hipe_compiled(Mod) ->
try
%% OTP 18.x or later
- Mod:module_info(native) =:= false
+ Mod:module_info(native) =:= true
%% OTP prior to 18.x
catch error:badarg ->
- code:is_module_native(Mod) =:= false
+ code:is_module_native(Mod) =:= true
end.
compiled_with_version_support(Mod) ->
proplists:get_value(erlang_version_support, Mod:module_info(attributes))
=/= undefined.
-do_hipe_compile(HipeModules) ->
+do_hipe_compile(HipeModules, CompileFun) ->
Count = length(HipeModules),
io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
- T1 = time_compat:monotonic_time(),
+ T1 = erlang:monotonic_time(),
%% We use code:get_object_code/1 below to get the beam binary,
%% instead of letting hipe get it itself, because hipe:c/{1,2}
%% expects the given filename to actually exist on disk: it does not
@@ -79,11 +90,7 @@ do_hipe_compile(HipeModules) ->
%% advanced API does not load automatically the code, except if the
%% 'load' option is set.
PidMRefs = [spawn_monitor(fun () -> [begin
- {M, Beam, _} =
- code:get_object_code(M),
- {ok, _} =
- hipe:compile(M, [], Beam,
- [o3, load]),
+ CompileFun(M),
io:format("#")
end || M <- Ms]
end) ||
@@ -92,8 +99,8 @@ do_hipe_compile(HipeModules) ->
{'DOWN', MRef, process, _, normal} -> ok;
{'DOWN', MRef, process, _, Reason} -> exit(Reason)
end || {_Pid, MRef} <- PidMRefs],
- T2 = time_compat:monotonic_time(),
- Duration = time_compat:convert_time_unit(T2 - T1, native, seconds),
+ T2 = erlang:monotonic_time(),
+ Duration = erlang:convert_time_unit(T2 - T1, native, seconds),
io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]),
{ok, Count, Duration}.
@@ -101,3 +108,39 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
split0([], Ls) -> Ls;
split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]).
+
+prepare_ebin_directory(Dir) ->
+ ok = rabbit_file:ensure_dir(Dir),
+ ok = delete_beam_files(Dir),
+ ok.
+
+delete_beam_files(Dir) ->
+ {ok, Files} = file:list_dir(Dir),
+ lists:foreach(fun(File) ->
+ case filename:extension(File) of
+ ".beam" ->
+ ok = file:delete(filename:join([Dir, File]));
+ _ ->
+ ok
+ end
+ end,
+ Files).
+
+compile_and_load(Mod) ->
+ {Mod, Beam, _} = code:get_object_code(Mod),
+ {ok, _} = hipe:compile(Mod, [], Beam, [o3, load]).
+
+compile_and_save(Module, Dir) ->
+ {Module, BeamCode, _} = code:get_object_code(Module),
+ BeamName = filename:join([Dir, atom_to_list(Module) ++ ".beam"]),
+ {ok, {Architecture, NativeCode}} = hipe:compile(Module, [], BeamCode, [o3]),
+ {ok, _, Chunks0} = beam_lib:all_chunks(BeamCode),
+ ChunkName = hipe_unified_loader:chunk_name(Architecture),
+ Chunks1 = lists:keydelete(ChunkName, 1, Chunks0),
+ Chunks = Chunks1 ++ [{ChunkName,NativeCode}],
+ {ok, BeamPlusNative} = beam_lib:build_module(Chunks),
+ ok = file:write_file(BeamName, BeamPlusNative),
+ BeamName.
+
+can_hipe_compile() ->
+ code:which(hipe) =/= non_existing.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e447e9de82..9674a4ef2c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -363,7 +363,7 @@ fetch(AckRequired, State = #state { backing_queue = BQ,
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
empty -> State1;
- {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ {_MsgId, _IsDelivered, _AckTag} -> drop_one(AckRequired, State1)
end}.
drop(AckRequired, State = #state { backing_queue = BQ,
@@ -372,7 +372,7 @@ drop(AckRequired, State = #state { backing_queue = BQ,
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
empty -> State1;
- {_MsgId, AckTag} -> drop_one(AckTag, State1)
+ {_MsgId, _AckTag} -> drop_one(AckRequired, State1)
end}.
ack(AckTags, State = #state { gm = GM,
@@ -518,6 +518,7 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
Depth = BQ:depth(BQS1),
true = Len == Depth, %% ASSERTION: everything must have been requeued
ok = gm:broadcast(GM, {depth, Depth}),
+ WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
#state { name = QName,
gm = GM,
coordinator = CPid,
@@ -525,7 +526,8 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
backing_queue_state = BQS1,
seen_status = Seen,
confirmed = [],
- known_senders = sets:from_list(KS) }.
+ known_senders = sets:from_list(KS),
+ wait_timeout = WaitTimeout }.
sender_death_fun() ->
Self = self(),
@@ -556,10 +558,10 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop_one(AckTag, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+drop_one(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckRequired}),
State.
drop(PrevLen, AckRequired, State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 849efa3611..b188298a9b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -20,7 +20,7 @@
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1,
+ is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
@@ -64,6 +64,8 @@
-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
-spec(update_mirrors/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
+-spec(update_mirrors/1 ::
+ (rabbit_types:amqqueue()) -> 'ok').
-spec(maybe_drop_master_after_sync/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok').
@@ -384,15 +386,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
- {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
- {true, true} -> update_mirrors0(OldQ, NewQ)
+ _ -> rabbit_amqqueue:update_mirroring(QPid)
end.
-update_mirrors0(OldQ = #amqqueue{name = QName},
- NewQ = #amqqueue{name = QName}) ->
- {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
- {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+update_mirrors(Q = #amqqueue{name = QName}) ->
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
%% When a mirror dies, remove_from_queue/2 might have to add new
@@ -406,7 +405,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
drop_mirrors(QName, OldNodes -- NewNodes),
%% This is for the case where no extra nodes were added but we changed to
%% a policy requiring auto-sync.
- maybe_auto_sync(NewQ),
+ maybe_auto_sync(Q),
ok.
%% The arrival of a newly synced slave may cause the master to die if
diff --git a/src/rabbit_mirror_queue_mode_exactly.erl b/src/rabbit_mirror_queue_mode_exactly.erl
index 4721ad6136..28ed8ca463 100644
--- a/src/rabbit_mirror_queue_mode_exactly.erl
+++ b/src/rabbit_mirror_queue_mode_exactly.erl
@@ -46,8 +46,8 @@ suggested_queue_nodes(Count, MNode, SNodes, _SSNodes, Poss) ->
shuffle(L) ->
random:seed(erlang:phash2([node()]),
- time_compat:monotonic_time(),
- time_compat:unique_integer()),
+ erlang:monotonic_time(),
+ erlang:unique_integer()),
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index a97a9b50c8..898aa5abcf 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -108,7 +108,7 @@ master_batch_go0(Args, BatchSize, BQ, BQS) ->
false -> {cont, Acc1}
end
end,
- FoldAcc = {[], 0, {0, BQ:depth(BQS)}, time_compat:monotonic_time()},
+ FoldAcc = {[], 0, {0, BQ:depth(BQS)}, erlang:monotonic_time()},
bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
@@ -168,12 +168,12 @@ stop_syncer(Syncer, Msg) ->
end.
maybe_emit_stats(Last, I, EmitStats, Log) ->
- Interval = time_compat:convert_time_unit(
- time_compat:monotonic_time() - Last, native, micro_seconds),
+ Interval = erlang:convert_time_unit(
+ erlang:monotonic_time() - Last, native, micro_seconds),
case Interval > ?SYNC_PROGRESS_INTERVAL of
true -> EmitStats({syncing, I}),
Log("~p messages", [I]),
- time_compat:monotonic_time();
+ erlang:monotonic_time();
false -> Last
end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index afd0508aac..6a57f6bb2c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -423,6 +423,7 @@ cluster_status(WhichNodes) ->
node_info() ->
{rabbit_misc:otp_release(), rabbit_misc:version(),
+ mnesia:system_info(protocol_version),
cluster_status_from_mnesia()}.
node_type() ->
@@ -593,26 +594,37 @@ check_cluster_consistency() ->
end.
check_cluster_consistency(Node, CheckNodesConsistency) ->
- case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ case remote_node_info(Node) of
{badrpc, _Reason} ->
{error, not_found};
- {_OTP, _Rabbit, {error, _}} ->
+ {_OTP, Rabbit, DelegateModuleHash, _Status} when is_binary(DelegateModuleHash) ->
+ %% when a delegate module .beam file hash is present
+ %% in the tuple, we are dealing with an old version
+ rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit);
+ {_OTP, _Rabbit, _Protocol, {error, _}} ->
{error, not_found};
- {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency ->
- case check_consistency(OTP, Rabbit, Node, Status) of
+ {OTP, Rabbit, Protocol, {ok, Status}} when CheckNodesConsistency ->
+ case check_consistency(Node, OTP, Rabbit, Protocol, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
end;
- {OTP, Rabbit, {ok, Status}} ->
- case check_consistency(OTP, Rabbit) of
+ {OTP, Rabbit, Protocol, {ok, Status}} ->
+ case check_consistency(Node, OTP, Rabbit, Protocol) of
{error, _} = E -> E;
ok -> {ok, Status}
- end;
- {_OTP, Rabbit, _Hash, _Status} ->
- %% delegate hash checking implies version mismatch
- rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit)
+ end
+ end.
+
+remote_node_info(Node) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _} = Error -> Error;
+ %% RabbitMQ prior to 3.6.2
+ {OTP, Rabbit, Status} -> {OTP, Rabbit, unsupported, Status};
+ %% RabbitMQ 3.6.2 or later
+ {OTP, Rabbit, Protocol, Status} -> {OTP, Rabbit, Protocol, Status}
end.
+
%%--------------------------------------------------------------------
%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
@@ -763,14 +775,14 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-check_consistency(OTP, Rabbit) ->
+check_consistency(Node, OTP, Rabbit, ProtocolVersion) ->
rabbit_misc:sequence_error(
- [rabbit_version:check_otp_consistency(OTP),
+ [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP),
check_rabbit_consistency(Rabbit)]).
-check_consistency(OTP, Rabbit, Node, Status) ->
+check_consistency(Node, OTP, Rabbit, ProtocolVersion, Status) ->
rabbit_misc:sequence_error(
- [rabbit_version:check_otp_consistency(OTP),
+ [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP),
check_rabbit_consistency(Rabbit),
check_nodes_consistency(Node, Status)]).
@@ -785,6 +797,55 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
[node(), Node, Node])}}
end.
+check_mnesia_or_otp_consistency(_Node, unsupported, OTP) ->
+ rabbit_version:check_otp_consistency(OTP);
+check_mnesia_or_otp_consistency(Node, ProtocolVersion, _) ->
+ check_mnesia_consistency(Node, ProtocolVersion).
+
+check_mnesia_consistency(Node, ProtocolVersion) ->
+ % If mnesia is running we will just check protocol version
+ % If it's not running, we don't want it to join cluster until all checks pass
+ % so we start it without `dir` env variable to prevent
+ % joining cluster and/or corrupting data
+ with_running_or_clean_mnesia(fun() ->
+ case negotiate_protocol([Node]) of
+ [Node] -> ok;
+ [] ->
+ LocalVersion = mnesia:system_info(protocol_version),
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("Mnesia protocol negotiation failed."
+ " Local version: ~p."
+ " Remote version ~p",
+ [LocalVersion, ProtocolVersion])}}
+ end
+ end).
+
+negotiate_protocol([Node]) ->
+ mnesia_monitor:negotiate_protocol([Node]).
+
+with_running_or_clean_mnesia(Fun) ->
+ IsMnesiaRunning = case mnesia:system_info(is_running) of
+ yes -> true;
+ no -> false;
+ stopping ->
+ ensure_mnesia_not_running(),
+ false;
+ starting ->
+ ensure_mnesia_running(),
+ true
+ end,
+ case IsMnesiaRunning of
+ true -> Fun();
+ false ->
+ {ok, MnesiaDir} = application:get_env(mnesia, dir),
+ application:unset_env(mnesia, dir),
+ mnesia:start(),
+ Result = Fun(),
+ application:stop(mnesia),
+ application:set_env(mnesia, dir, MnesiaDir),
+ Result
+ end.
+
check_rabbit_consistency(Remote) ->
rabbit_version:check_version_consistency(
rabbit_misc:version(), Remote, "Rabbit",
@@ -819,16 +880,20 @@ find_auto_cluster_node([Node | Nodes]) ->
"Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]),
find_auto_cluster_node(Nodes)
end,
- case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _} = Reason -> Fail("~p~n", [Reason]);
+ case remote_node_info(Node) of
+ {badrpc, _} = Reason ->
+ Fail("~p~n", [Reason]);
%% old delegate hash check
- {_OTP, RMQ, _Hash, _} -> Fail("version ~s~n", [RMQ]);
- {_OTP, _RMQ, {error, _} = E} -> Fail("~p~n", [E]);
- {OTP, RMQ, _} -> case check_consistency(OTP, RMQ) of
- {error, _} -> Fail("versions ~p~n",
- [{OTP, RMQ}]);
- ok -> {ok, Node}
- end
+ {_OTP, RMQ, Hash, _} when is_binary(Hash) ->
+ Fail("version ~s~n", [RMQ]);
+ {_OTP, _RMQ, _Protocol, {error, _} = E} ->
+ Fail("~p~n", [E]);
+ {OTP, RMQ, Protocol, _} ->
+ case check_consistency(Node, OTP, RMQ, Protocol) of
+ {error, _} -> Fail("versions ~p~n",
+ [{OTP, RMQ}]);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 6f41836b98..976f4a4b2f 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -414,7 +414,12 @@ handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
fun () ->
case rpc:call(Node, rabbit, is_running, []) of
{badrpc, _} -> ok;
- _ -> cast(Rep, {partial_partition,
+ _ ->
+ rabbit_log:warning("Received a 'DOWN' message"
+ " from ~p but still can"
+ " communicate with it ~n",
+ [Node]),
+ cast(Rep, {partial_partition,
Node, node(), RepGUID})
end
end);
diff --git a/src/rabbit_password.erl b/src/rabbit_password.erl
index d5b0945de9..8d5cf8d69e 100644
--- a/src/rabbit_password.erl
+++ b/src/rabbit_password.erl
@@ -36,8 +36,8 @@ hash(HashingMod, Cleartext) ->
generate_salt() ->
random:seed(erlang:phash2([node()]),
- time_compat:monotonic_time(),
- time_compat:unique_integer()),
+ erlang:monotonic_time(),
+ erlang:unique_integer()),
Salt = random:uniform(16#ffffffff),
<<Salt:32>>.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 8f7319182e..47574a9d55 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -253,8 +253,7 @@ prepare_plugins(Enabled) ->
Wanted = dependencies(false, Enabled, AllPlugins),
WantedPlugins = lookup_plugins(Wanted, AllPlugins),
{ValidPlugins, Problems} = validate_plugins(WantedPlugins),
- %% TODO: error message formatting
- rabbit_log:warning(format_invalid_plugins(Problems)),
+ maybe_warn_about_invalid_plugins(Problems),
case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
@@ -266,6 +265,13 @@ prepare_plugins(Enabled) ->
PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")],
Wanted.
+maybe_warn_about_invalid_plugins([]) ->
+ ok;
+maybe_warn_about_invalid_plugins(InvalidPlugins) ->
+ %% TODO: error message formatting
+ rabbit_log:warning(format_invalid_plugins(InvalidPlugins)).
+
+
format_invalid_plugins(InvalidPlugins) ->
lists:flatten(["Failed to enable some plugins: \r\n"
| [format_invalid_plugin(Plugin)
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 1f7e521dfd..a9caadf972 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -221,11 +221,11 @@ validate(_VHost, <<"policy">>, Name, Term, _User) ->
Name, policy_validation(), Term).
notify(VHost, <<"policy">>, Name, Term) ->
- rabbit_event:notify(policy_set, [{name, Name} | Term]),
+ rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]),
update_policies(VHost).
notify_clear(VHost, <<"policy">>, Name) ->
- rabbit_event:notify(policy_cleared, [{name, Name}]),
+ rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]),
update_policies(VHost).
%%----------------------------------------------------------------------------
@@ -242,8 +242,10 @@ update_policies(VHost) ->
fun() ->
[mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
case catch list(VHost) of
- {error, {no_such_vhost, _}} ->
- ok; %% [2]
+ {'EXIT', {throw, {error, {no_such_vhost, _}}}} ->
+ {[], []}; %% [2]
+ {'EXIT', Exit} ->
+ exit(Exit);
Policies ->
{[update_exchange(X, Policies) ||
X <- rabbit_exchange:list(VHost)],
@@ -274,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
NewPolicy -> case rabbit_amqqueue:update(
QName, fun(Q1) ->
rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy})
+ Q1#amqqueue{policy = NewPolicy,
+ policy_version =
+ Q1#amqqueue.policy_version + 1 })
end) of
#amqqueue{} = Q1 -> {Q, Q1};
not_found -> {Q, Q }
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index b58a8c535e..a3bfb5cdfa 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -205,8 +205,8 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
-batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = partition_publish_batch(Publishes),
+batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
+ PubDict = partition_publish_batch(Publishes, MaxP),
lists:foldl(
fun ({Priority, Pubs}, St) ->
pick1(fun (_P, BQSN) ->
@@ -227,8 +227,8 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
-batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = partition_publish_delivered_batch(Publishes),
+batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
+ PubDict = partition_publish_delivered_batch(Publishes, MaxP),
{PrioritiesAndAcks, State1} =
lists:foldl(
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
@@ -404,7 +404,6 @@ msg_rates(#state{bq = BQ, bqss = BQSs}) ->
end, {0.0, 0.0}, BQSs);
msg_rates(#passthrough{bq = BQ, bqs = BQS}) ->
BQ:msg_rates(BQS).
-
info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) ->
fold0(fun (P, BQSN, Acc) ->
combine_status(P, BQ:info(backing_queue_status, BQSN), Acc)
@@ -433,8 +432,8 @@ set_queue_mode(Mode, State = #state{bq = BQ}) ->
set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(set_queue_mode(Mode, BQS)).
-zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) ->
- MsgsByPriority = partition_publish_delivered_batch(Msgs),
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) ->
+ MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP),
lists:foldl(fun (Acks, MAs) ->
{P, _AckTag} = hd(Acks),
Pubs = orddict:fetch(P, MsgsByPriority),
@@ -484,13 +483,14 @@ foreach1(_Fun, [], BQSAcc) ->
%% For a given thing, just go to its BQ
pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
- {P, BQSN} = priority(Prioritisable, BQSs),
+ {P, BQSN} = priority_bq(Prioritisable, BQSs),
a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}).
%% Fold over results
fold2(Fun, Acc, State = #state{bqss = BQSs}) ->
{Res, BQSs1} = fold2(Fun, Acc, BQSs, []),
{Res, a(State#state{bqss = BQSs1})}.
+
fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) ->
{Acc1, BQSN1} = Fun(P, BQSN, Acc),
fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]);
@@ -532,7 +532,7 @@ fold_by_acktags2(Fun, AckTags, State) ->
%% For a given thing, just go to its BQ
pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
- {P, BQSN} = priority(Prioritisable, BQSs),
+ {P, BQSN} = priority_bq(Prioritisable, BQSs),
{Res, BQSN1} = Fun(P, BQSN),
{Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}.
@@ -563,8 +563,8 @@ findfold3(Fun, Acc, NotFound, [{P, BQSN} | Rest], BQSAcc) ->
findfold3(_Fun, Acc, NotFound, [], BQSAcc) ->
{NotFound, Acc, lists:reverse(BQSAcc)}.
-bq_fetch(P, []) -> exit({not_found, P});
-bq_fetch(P, [{P, BQSN} | _]) -> BQSN;
+bq_fetch(P, []) -> exit({not_found, P});
+bq_fetch(P, [{P, BQSN} | _]) -> {P, BQSN};
bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T).
bq_store(P, BQS, BQSs) ->
@@ -582,41 +582,36 @@ a(State = #state{bqss = BQSs}) ->
end.
%%----------------------------------------------------------------------------
-partition_publish_batch(Publishes) ->
+partition_publish_batch(Publishes, MaxP) ->
partition_publishes(
- Publishes, fun ({Msg, _, _}) -> Msg end).
+ Publishes, fun ({Msg, _, _}) -> Msg end, MaxP).
-partition_publish_delivered_batch(Publishes) ->
+partition_publish_delivered_batch(Publishes, MaxP) ->
partition_publishes(
- Publishes, fun ({Msg, _}) -> Msg end).
+ Publishes, fun ({Msg, _}) -> Msg end, MaxP).
-partition_publishes(Publishes, ExtractMsg) ->
+partition_publishes(Publishes, ExtractMsg, MaxP) ->
lists:foldl(fun (Pub, Dict) ->
Msg = ExtractMsg(Pub),
- rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict)
+ rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict)
end, orddict:new(), Publishes).
-priority(P, BQSs) when is_integer(P) ->
- {P, bq_fetch(P, BQSs)};
-priority(#basic_message{content = Content}, BQSs) ->
- priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs).
-
-priority1(_Content, [{P, BQSN}]) ->
- {P, BQSN};
-priority1(Content, [{P, BQSN} | Rest]) ->
- case priority2(Content) >= P of
- true -> {P, BQSN};
- false -> priority1(Content, Rest)
- end.
-
-priority2(#basic_message{content = Content}) ->
- priority2(rabbit_binary_parser:ensure_content_decoded(Content));
-priority2(#content{properties = Props}) ->
+priority_bq(Priority, [{MaxP, _} | _] = BQSs) ->
+ bq_fetch(priority(Priority, MaxP), BQSs).
+
+%% Messages with a priority which is higher than the queue's maximum are treated
+%% as if they were published with the maximum priority.
+priority(undefined, _MaxP) ->
+ 0;
+priority(Priority, MaxP) when is_integer(Priority), Priority =< MaxP ->
+ Priority;
+priority(Priority, MaxP) when is_integer(Priority), Priority > MaxP ->
+ MaxP;
+priority(#basic_message{content = Content}, MaxP) ->
+ priority(rabbit_binary_parser:ensure_content_decoded(Content), MaxP);
+priority(#content{properties = Props}, MaxP) ->
#'P_basic'{priority = Priority0} = Props,
- case Priority0 of
- undefined -> 0;
- _ when is_integer(Priority0) -> Priority0
- end.
+ priority(Priority0, MaxP).
add_maybe_infinity(infinity, _) -> infinity;
add_maybe_infinity(_, infinity) -> infinity;
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 5b5c9b3074..6200f9d2c1 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -100,7 +100,7 @@
new() -> #state{consumers = priority_queue:new(),
use = {active,
- time_compat:monotonic_time(micro_seconds),
+ erlang:monotonic_time(micro_seconds),
1.0}}.
max_active_priority(#state{consumers = Consumers}) ->
@@ -350,9 +350,9 @@ drain_mode(true) -> drain;
drain_mode(false) -> manual.
utilisation(#state{use = {active, Since, Avg}}) ->
- use_avg(time_compat:monotonic_time(micro_seconds) - Since, 0, Avg);
+ use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
- use_avg(Active, time_compat:monotonic_time(micro_seconds) - Since, Avg).
+ use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
%%----------------------------------------------------------------------------
@@ -459,10 +459,10 @@ update_use({inactive, _, _, _} = CUInfo, inactive) ->
update_use({active, _, _} = CUInfo, active) ->
CUInfo;
update_use({active, Since, Avg}, inactive) ->
- Now = time_compat:monotonic_time(micro_seconds),
+ Now = erlang:monotonic_time(micro_seconds),
{inactive, Now, Now - Since, Avg};
update_use({inactive, Since, Active, Avg}, active) ->
- Now = time_compat:monotonic_time(micro_seconds),
+ Now = erlang:monotonic_time(micro_seconds),
{active, Now, use_avg(Active, Now - Since, Avg)}.
use_avg(0, 0, Avg) ->
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index 2579cbb2b1..73d509bf33 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -39,6 +39,6 @@ description() ->
queue_master_location(#amqqueue{}) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(),
- RandomPos = erlang:phash2(time_compat:monotonic_time(), length(Cluster)),
+ RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)),
MasterNode = lists:nth(RandomPos + 1, Cluster),
{ok, MasterNode}.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b99a1d12ee..0f55b9e4a9 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -52,6 +52,7 @@
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
+-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -447,6 +448,24 @@ recoverable_slaves(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators,
state]).
+policy_version() ->
+ ok = policy_version(rabbit_queue),
+ ok = policy_version(rabbit_durable_queue).
+
+policy_version(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, 0}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+ policy_version]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d5b090bed4..5b86cbd3d1 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -688,12 +688,12 @@ requeue(AckTags, #vqstate { mode = default,
State2),
MsgCount = length(MsgIds2),
{MsgIds2, a(reduce_memory_use(
- maybe_update_rates(
+ maybe_update_rates(ui(
State3 #vqstate { delta = Delta1,
q3 = Q3a,
q4 = Q4a,
in_counter = InCounter + MsgCount,
- len = Len + MsgCount })))};
+ len = Len + MsgCount }))))};
requeue(AckTags, #vqstate { mode = lazy,
delta = Delta,
q3 = Q3,
@@ -706,11 +706,11 @@ requeue(AckTags, #vqstate { mode = lazy,
State1),
MsgCount = length(MsgIds1),
{MsgIds1, a(reduce_memory_use(
- maybe_update_rates(
+ maybe_update_rates(ui(
State2 #vqstate { delta = Delta1,
q3 = Q3a,
in_counter = InCounter + MsgCount,
- len = Len + MsgCount })))}.
+ len = Len + MsgCount }))))}.
ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
@@ -774,7 +774,7 @@ update_rates(State = #vqstate{ in_counter = InCount,
ack_in = AckInRate,
ack_out = AckOutRate,
timestamp = TS }}) ->
- Now = time_compat:monotonic_time(),
+ Now = erlang:monotonic_time(),
Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
out = update_rate(Now, TS, OutCount, OutRate),
@@ -789,7 +789,7 @@ update_rates(State = #vqstate{ in_counter = InCount,
rates = Rates }.
update_rate(Now, TS, Count, Rate) ->
- Time = time_compat:convert_time_unit(Now - TS, native, micro_seconds) /
+ Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) /
?MICROS_PER_SECOND,
if
Time == 0 -> Rate;
@@ -1287,7 +1287,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
count = DeltaCount1,
end_seq_id = NextSeqId })
end,
- Now = time_compat:monotonic_time(),
+ Now = erlang:monotonic_time(),
IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
?IO_BATCH_SIZE),
@@ -2124,7 +2124,7 @@ publish_alpha(MsgStatus, State) ->
{MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
publish_beta(MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+ {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
{MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
@@ -2161,7 +2161,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
{#msg_status { msg_id = MsgId } = MsgStatus, State1} =
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
- maybe_write_to_disk(true, true, MsgStatus, State1),
+ maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
stats({1, -1}, {MsgStatus, none}, State2)}
end, {Delta, MsgIds, State}, SeqIds).
diff --git a/src/truncate.erl b/src/truncate.erl
index 1c9b08ed27..a1586b0cb0 100644
--- a/src/truncate.erl
+++ b/src/truncate.erl
@@ -21,8 +21,10 @@
-record(params, {content, struct, content_dec, struct_dec}).
-export([log_event/2, term/2]).
-%% exported for testing
--export([test/0]).
+
+-ifdef(TEST).
+-export([term_size/3]).
+-endif.
log_event({Type, GL, {Pid, Format, Args}}, Params)
when Type =:= error orelse
@@ -123,72 +125,3 @@ tuple_term_size(_T, M, I, S, _W) when I > S ->
M;
tuple_term_size(T, M, I, S, W) ->
tuple_term_size(T, lim(term_size(element(I, T), M, W), 2 * W), I + 1, S, W).
-
-%%----------------------------------------------------------------------------
-
-test() ->
- test_short_examples_exactly(),
- test_term_limit(),
- test_large_examples_for_size(),
- ok.
-
-test_short_examples_exactly() ->
- F = fun (Term, Exp) ->
- Exp = term(Term, {1, {10, 10, 5, 5}}),
- Term = term(Term, {100000, {10, 10, 5, 5}})
- end,
- FSmall = fun (Term, Exp) ->
- Exp = term(Term, {1, {2, 2, 2, 2}}),
- Term = term(Term, {100000, {2, 2, 2, 2}})
- end,
- F([], []),
- F("h", "h"),
- F("hello world", "hello w..."),
- F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]),
- F([a|b], [a|b]),
- F(<<"hello">>, <<"hello">>),
- F([<<"hello world">>], [<<"he...">>]),
- F(<<1:1>>, <<1:1>>),
- F(<<1:81>>, <<0:56, "...">>),
- F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}),
- FSmall({a,30,40,40,40,40}, {a,30,'...'}),
- FSmall([a,30,40,40,40,40], [a,30,'...']),
- P = spawn(fun() -> receive die -> ok end end),
- F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]),
- P ! die,
- R = make_ref(),
- F([R], [R]),
- ok.
-
-test_term_limit() ->
- W = erlang:system_info(wordsize),
- S = <<"abc">>,
- 1 = term_size(S, 4, W),
- limit_exceeded = term_size(S, 3, W),
- case 100 - term_size([S, S], 100, W) of
- 22 -> ok; %% 32 bit
- 38 -> ok %% 64 bit
- end,
- case 100 - term_size([S, [S]], 100, W) of
- 30 -> ok; %% ditto
- 54 -> ok
- end,
- limit_exceeded = term_size([S, S], 6, W),
- ok.
-
-test_large_examples_for_size() ->
- %% Real world values
- Shrink = fun(Term) -> term(Term, {1, {1000, 100, 50, 5}}) end,
- TestSize = fun(Term) ->
- true = 5000000 < size(term_to_binary(Term)),
- true = 500000 > size(term_to_binary(Shrink(Term)))
- end,
- TestSize(lists:seq(1, 5000000)),
- TestSize(recursive_list(1000, 10)),
- TestSize(recursive_list(5000, 20)),
- TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])),
- TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])),
- ok.
-
-recursive_list(S, 0) -> lists:seq(1, S);
-recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)].