author     Simon MacMullen <simon@rabbitmq.com>  2012-09-18 15:51:09 +0100
committer  Simon MacMullen <simon@rabbitmq.com>  2012-09-18 15:51:09 +0100
commit     2d024373c6791988ba707d2b80c91786819be00b (patch)
tree       faae41dd8ca19bb5cb371823d24d3c0cd713c3b8 /src
parent     20a051f19cb338823f488c02a7d07c51d4f75066 (diff)
parent     13c794e13636bc9cf4c91d6c44bd58ed3c9d0cac (diff)
download   rabbitmq-server-git-2d024373c6791988ba707d2b80c91786819be00b.tar.gz
Merge bug24997
Diffstat (limited to 'src')
-rw-r--r--  src/gm.erl                                33
-rw-r--r--  src/mirrored_supervisor.erl                2
-rw-r--r--  src/mochijson2.erl                       893
-rw-r--r--  src/mochinum.erl                         358
-rw-r--r--  src/rabbit.erl                            16
-rw-r--r--  src/rabbit_amqqueue.erl                   45
-rw-r--r--  src/rabbit_amqqueue_process.erl          137
-rw-r--r--  src/rabbit_backing_queue.erl              16
-rw-r--r--  src/rabbit_backing_queue_qc.erl            2
-rw-r--r--  src/rabbit_binding.erl                    58
-rw-r--r--  src/rabbit_channel.erl                    12
-rw-r--r--  src/rabbit_control_main.erl               90
-rw-r--r--  src/rabbit_direct.erl                     28
-rw-r--r--  src/rabbit_disk_monitor.erl               12
-rw-r--r--  src/rabbit_exchange.erl                    7
-rw-r--r--  src/rabbit_heartbeat.erl                  45
-rw-r--r--  src/rabbit_mirror_queue_coordinator.erl   44
-rw-r--r--  src/rabbit_mirror_queue_master.erl        28
-rw-r--r--  src/rabbit_mirror_queue_misc.erl           4
-rw-r--r--  src/rabbit_mirror_queue_slave.erl        302
-rw-r--r--  src/rabbit_misc.erl                       53
-rw-r--r--  src/rabbit_mnesia.erl                   1289
-rw-r--r--  src/rabbit_msg_store.erl                  10
-rw-r--r--  src/rabbit_net.erl                        33
-rw-r--r--  src/rabbit_networking.erl                 12
-rw-r--r--  src/rabbit_node_monitor.erl              253
-rw-r--r--  src/rabbit_parameter_validation.erl       16
-rw-r--r--  src/rabbit_policy.erl                     60
-rw-r--r--  src/rabbit_queue_index.erl                 8
-rw-r--r--  src/rabbit_reader.erl                     19
-rw-r--r--  src/rabbit_runtime_parameter.erl          18
-rw-r--r--  src/rabbit_runtime_parameters.erl        193
-rw-r--r--  src/rabbit_runtime_parameters_test.erl    18
-rw-r--r--  src/rabbit_tests.erl                     292
-rw-r--r--  src/rabbit_types.erl                       2
-rw-r--r--  src/rabbit_upgrade.erl                    18
-rw-r--r--  src/rabbit_variable_queue.erl             15
-rw-r--r--  src/rabbit_vhost.erl                      13
-rw-r--r--  src/supervisor2.erl                      135
-rw-r--r--  src/supervisor2_tests.erl                 70
40 files changed, 3254 insertions, 1405 deletions
diff --git a/src/gm.erl b/src/gm.erl
index f88ed18fbf..90433e84c5 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -77,9 +77,13 @@
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
-%% group_members/1
-%% Provide the Pid. Returns a list of the current group members.
+%% info/1
+%% Provide the Pid. Returns a proplist with various facts, including
+%% the group name and the current group members.
%%
+%% forget_group/1
+%% Provide the group name. Removes its mnesia record. Makes no attempt
+%% to ensure the group is empty.
%%
%% Implementation Overview
%% -----------------------
@@ -373,7 +377,7 @@
-behaviour(gen_server2).
-export([create_tables/0, start_link/3, leave/1, broadcast/2,
- confirmed_broadcast/2, group_members/1]).
+ confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
@@ -431,7 +435,8 @@
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
--spec(group_members/1 :: (pid()) -> [pid()]).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
@@ -514,9 +519,15 @@ broadcast(Server, Msg) ->
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
-group_members(Server) ->
- gen_server2:call(Server, group_members, infinity).
+info(Server) ->
+ gen_server2:call(Server, info, infinity).
+forget_group(GroupName) ->
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
+ ok.
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
@@ -553,12 +564,16 @@ handle_call({confirmed_broadcast, Msg}, _From,
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
-handle_call(group_members, _From,
+handle_call(info, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
-handle_call(group_members, _From, State = #state { view = View }) ->
- reply(get_pids(alive_view_members(View)), State);
+handle_call(info, _From, State = #state { group_name = GroupName,
+ module = Module,
+ view = View }) ->
+ reply([{group_name, GroupName},
+ {module, Module},
+ {group_members, get_pids(alive_view_members(View))}], State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
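A minimal usage sketch of the two new calls, assuming a member pid `GmPid'
obtained from gm:start_link/3 (names hypothetical, not part of the patch):

    %% info/1 replaces group_members/1 and returns a proplist:
    Info = gm:info(GmPid),
    {group_name, Group}      = lists:keyfind(group_name, 1, Info),
    {group_members, Members} = lists:keyfind(group_members, 1, Info),
    %% forget_group/1 takes the group *name* and simply deletes the
    %% mnesia record, without checking that the group is empty:
    ok = gm:forget_group(Group).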
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 4fc488b88a..24c3ebd008 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -174,7 +174,7 @@
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
ChildSpecs :: [supervisor2:child_spec()],
- Result :: supervisor2:startlink_ret().
+ Result :: {'ok', pid()} | {'error', term()}.
-spec create_tables() -> Result when
Result :: 'ok'.
diff --git a/src/mochijson2.erl b/src/mochijson2.erl
new file mode 100644
index 0000000000..bddb52cc6f
--- /dev/null
+++ b/src/mochijson2.erl
@@ -0,0 +1,893 @@
+%% This file is a copy of `mochijson2.erl' from mochiweb, revision
+%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see
+%% `LICENSE-MIT-Mochi'.
+
+%% @author Bob Ippolito <bob@mochimedia.com>
+%% @copyright 2007 Mochi Media, Inc.
+
+%% @doc Yet another JSON (RFC 4627) library for Erlang. mochijson2 works
+%% with binaries as strings, arrays as lists (without an {array, _})
+%% wrapper and it only knows how to decode UTF-8 (and ASCII).
+%%
+%% JSON terms are decoded as follows (javascript -> erlang):
+%% <ul>
+%% <li>{"key": "value"} ->
+%% {struct, [{&lt;&lt;"key">>, &lt;&lt;"value">>}]}</li>
+%% <li>["array", 123, 12.34, true, false, null] ->
+%% [&lt;&lt;"array">>, 123, 12.34, true, false, null]
+%% </li>
+%% </ul>
+%% <ul>
+%% <li>Strings in JSON decode to UTF-8 binaries in Erlang</li>
+%% <li>Objects decode to {struct, PropList}</li>
+%% <li>Numbers decode to integer or float</li>
+%% <li>true, false, null decode to their respective terms.</li>
+%% </ul>
+%% The encoder will accept the same format that the decoder will produce,
+%% but will also allow additional cases for leniency:
+%% <ul>
+%% <li>atoms other than true, false, null will be considered UTF-8
+%% strings (even as a proplist key)
+%% </li>
+%% <li>{json, IoList} will insert IoList directly into the output
+%% with no validation
+%% </li>
+%% <li>{array, Array} will be encoded as Array
+%% (legacy mochijson style)
+%% </li>
+%% <li>A non-empty raw proplist will be encoded as an object as long
+%% as the first pair does not have an atom key of json, struct,
+%% or array
+%% </li>
+%% </ul>
+
+-module(mochijson2).
+-author('bob@mochimedia.com').
+-export([encoder/1, encode/1]).
+-export([decoder/1, decode/1, decode/2]).
+
+%% This is a macro to placate syntax highlighters..
+-define(Q, $\").
+-define(ADV_COL(S, N), S#decoder{offset=N+S#decoder.offset,
+ column=N+S#decoder.column}).
+-define(INC_COL(S), S#decoder{offset=1+S#decoder.offset,
+ column=1+S#decoder.column}).
+-define(INC_LINE(S), S#decoder{offset=1+S#decoder.offset,
+ column=1,
+ line=1+S#decoder.line}).
+-define(INC_CHAR(S, C),
+ case C of
+ $\n ->
+ S#decoder{column=1,
+ line=1+S#decoder.line,
+ offset=1+S#decoder.offset};
+ _ ->
+ S#decoder{column=1+S#decoder.column,
+ offset=1+S#decoder.offset}
+ end).
+-define(IS_WHITESPACE(C),
+ (C =:= $\s orelse C =:= $\t orelse C =:= $\r orelse C =:= $\n)).
+
+%% @type json_string() = atom | binary()
+%% @type json_number() = integer() | float()
+%% @type json_array() = [json_term()]
+%% @type json_object() = {struct, [{json_string(), json_term()}]}
+%% @type json_eep18_object() = {[{json_string(), json_term()}]}
+%% @type json_iolist() = {json, iolist()}
+%% @type json_term() = json_string() | json_number() | json_array() |
+%% json_object() | json_eep18_object() | json_iolist()
+
+-record(encoder, {handler=null,
+ utf8=false}).
+
+-record(decoder, {object_hook=null,
+ offset=0,
+ line=1,
+ column=1,
+ state=null}).
+
+%% @spec encoder([encoder_option()]) -> function()
+%% @doc Create an encoder/1 with the given options.
+%% @type encoder_option() = handler_option() | utf8_option()
+%% @type utf8_option() = boolean(). Emit unicode as utf8 (default - false)
+encoder(Options) ->
+ State = parse_encoder_options(Options, #encoder{}),
+ fun (O) -> json_encode(O, State) end.
+
+%% @spec encode(json_term()) -> iolist()
+%% @doc Encode the given term as JSON to an iolist.
+encode(Any) ->
+ json_encode(Any, #encoder{}).
+
+%% @spec decoder([decoder_option()]) -> function()
+%% @doc Create a decoder/1 with the given options.
+decoder(Options) ->
+ State = parse_decoder_options(Options, #decoder{}),
+ fun (O) -> json_decode(O, State) end.
+
+%% @spec decode(iolist(), [{format, proplist | eep18 | struct}]) -> json_term()
+%% @doc Decode the given iolist to Erlang terms using the given object format
+%% for decoding, where proplist returns JSON objects as [{binary(), json_term()}]
+%% proplists, eep18 returns JSON objects as {[{binary(), json_term()}]}, and
+%% struct returns them as-is.
+decode(S, Options) ->
+ json_decode(S, parse_decoder_options(Options, #decoder{})).
+
+%% @spec decode(iolist()) -> json_term()
+%% @doc Decode the given iolist to Erlang terms.
+decode(S) ->
+ json_decode(S, #decoder{}).
+
+%% Internal API
+
+parse_encoder_options([], State) ->
+ State;
+parse_encoder_options([{handler, Handler} | Rest], State) ->
+ parse_encoder_options(Rest, State#encoder{handler=Handler});
+parse_encoder_options([{utf8, Switch} | Rest], State) ->
+ parse_encoder_options(Rest, State#encoder{utf8=Switch}).
+
+parse_decoder_options([], State) ->
+ State;
+parse_decoder_options([{object_hook, Hook} | Rest], State) ->
+ parse_decoder_options(Rest, State#decoder{object_hook=Hook});
+parse_decoder_options([{format, Format} | Rest], State)
+ when Format =:= struct orelse Format =:= eep18 orelse Format =:= proplist ->
+ parse_decoder_options(Rest, State#decoder{object_hook=Format}).
+
+json_encode(true, _State) ->
+ <<"true">>;
+json_encode(false, _State) ->
+ <<"false">>;
+json_encode(null, _State) ->
+ <<"null">>;
+json_encode(I, _State) when is_integer(I) ->
+ integer_to_list(I);
+json_encode(F, _State) when is_float(F) ->
+ mochinum:digits(F);
+json_encode(S, State) when is_binary(S); is_atom(S) ->
+ json_encode_string(S, State);
+json_encode([{K, _}|_] = Props, State) when (K =/= struct andalso
+ K =/= array andalso
+ K =/= json) ->
+ json_encode_proplist(Props, State);
+json_encode({struct, Props}, State) when is_list(Props) ->
+ json_encode_proplist(Props, State);
+json_encode({Props}, State) when is_list(Props) ->
+ json_encode_proplist(Props, State);
+json_encode({}, State) ->
+ json_encode_proplist([], State);
+json_encode(Array, State) when is_list(Array) ->
+ json_encode_array(Array, State);
+json_encode({array, Array}, State) when is_list(Array) ->
+ json_encode_array(Array, State);
+json_encode({json, IoList}, _State) ->
+ IoList;
+json_encode(Bad, #encoder{handler=null}) ->
+ exit({json_encode, {bad_term, Bad}});
+json_encode(Bad, State=#encoder{handler=Handler}) ->
+ json_encode(Handler(Bad), State).
+
+json_encode_array([], _State) ->
+ <<"[]">>;
+json_encode_array(L, State) ->
+ F = fun (O, Acc) ->
+ [$,, json_encode(O, State) | Acc]
+ end,
+ [$, | Acc1] = lists:foldl(F, "[", L),
+ lists:reverse([$\] | Acc1]).
+
+json_encode_proplist([], _State) ->
+ <<"{}">>;
+json_encode_proplist(Props, State) ->
+ F = fun ({K, V}, Acc) ->
+ KS = json_encode_string(K, State),
+ VS = json_encode(V, State),
+ [$,, VS, $:, KS | Acc]
+ end,
+ [$, | Acc1] = lists:foldl(F, "{", Props),
+ lists:reverse([$\} | Acc1]).
+
+json_encode_string(A, State) when is_atom(A) ->
+ L = atom_to_list(A),
+ case json_string_is_safe(L) of
+ true ->
+ [?Q, L, ?Q];
+ false ->
+ json_encode_string_unicode(xmerl_ucs:from_utf8(L), State, [?Q])
+ end;
+json_encode_string(B, State) when is_binary(B) ->
+ case json_bin_is_safe(B) of
+ true ->
+ [?Q, B, ?Q];
+ false ->
+ json_encode_string_unicode(xmerl_ucs:from_utf8(B), State, [?Q])
+ end;
+json_encode_string(I, _State) when is_integer(I) ->
+ [?Q, integer_to_list(I), ?Q];
+json_encode_string(L, State) when is_list(L) ->
+ case json_string_is_safe(L) of
+ true ->
+ [?Q, L, ?Q];
+ false ->
+ json_encode_string_unicode(L, State, [?Q])
+ end.
+
+json_string_is_safe([]) ->
+ true;
+json_string_is_safe([C | Rest]) ->
+ case C of
+ ?Q ->
+ false;
+ $\\ ->
+ false;
+ $\b ->
+ false;
+ $\f ->
+ false;
+ $\n ->
+ false;
+ $\r ->
+ false;
+ $\t ->
+ false;
+ C when C >= 0, C < $\s; C >= 16#7f, C =< 16#10FFFF ->
+ false;
+ C when C < 16#7f ->
+ json_string_is_safe(Rest);
+ _ ->
+ false
+ end.
+
+json_bin_is_safe(<<>>) ->
+ true;
+json_bin_is_safe(<<C, Rest/binary>>) ->
+ case C of
+ ?Q ->
+ false;
+ $\\ ->
+ false;
+ $\b ->
+ false;
+ $\f ->
+ false;
+ $\n ->
+ false;
+ $\r ->
+ false;
+ $\t ->
+ false;
+ C when C >= 0, C < $\s; C >= 16#7f ->
+ false;
+ C when C < 16#7f ->
+ json_bin_is_safe(Rest)
+ end.
+
+json_encode_string_unicode([], _State, Acc) ->
+ lists:reverse([$\" | Acc]);
+json_encode_string_unicode([C | Cs], State, Acc) ->
+ Acc1 = case C of
+ ?Q ->
+ [?Q, $\\ | Acc];
+ %% Escaping solidus is only useful when trying to protect
+ %% against "</script>" injection attacks which are only
+ %% possible when JSON is inserted into a HTML document
+ %% in-line. mochijson2 does not protect you from this, so
+ %% if you do insert directly into HTML then you need to
+ %% uncomment the following case or escape the output of encode.
+ %%
+ %% $/ ->
+ %% [$/, $\\ | Acc];
+ %%
+ $\\ ->
+ [$\\, $\\ | Acc];
+ $\b ->
+ [$b, $\\ | Acc];
+ $\f ->
+ [$f, $\\ | Acc];
+ $\n ->
+ [$n, $\\ | Acc];
+ $\r ->
+ [$r, $\\ | Acc];
+ $\t ->
+ [$t, $\\ | Acc];
+ C when C >= 0, C < $\s ->
+ [unihex(C) | Acc];
+ C when C >= 16#7f, C =< 16#10FFFF, State#encoder.utf8 ->
+ [xmerl_ucs:to_utf8(C) | Acc];
+ C when C >= 16#7f, C =< 16#10FFFF, not State#encoder.utf8 ->
+ [unihex(C) | Acc];
+ C when C < 16#7f ->
+ [C | Acc];
+ _ ->
+ exit({json_encode, {bad_char, C}})
+ end,
+ json_encode_string_unicode(Cs, State, Acc1).
+
+hexdigit(C) when C >= 0, C =< 9 ->
+ C + $0;
+hexdigit(C) when C =< 15 ->
+ C + $a - 10.
+
+unihex(C) when C < 16#10000 ->
+ <<D3:4, D2:4, D1:4, D0:4>> = <<C:16>>,
+ Digits = [hexdigit(D) || D <- [D3, D2, D1, D0]],
+ [$\\, $u | Digits];
+unihex(C) when C =< 16#10FFFF ->
+ N = C - 16#10000,
+ S1 = 16#d800 bor ((N bsr 10) band 16#3ff),
+ S2 = 16#dc00 bor (N band 16#3ff),
+ [unihex(S1), unihex(S2)].
+
+json_decode(L, S) when is_list(L) ->
+ json_decode(iolist_to_binary(L), S);
+json_decode(B, S) ->
+ {Res, S1} = decode1(B, S),
+ {eof, _} = tokenize(B, S1#decoder{state=trim}),
+ Res.
+
+decode1(B, S=#decoder{state=null}) ->
+ case tokenize(B, S#decoder{state=any}) of
+ {{const, C}, S1} ->
+ {C, S1};
+ {start_array, S1} ->
+ decode_array(B, S1);
+ {start_object, S1} ->
+ decode_object(B, S1)
+ end.
+
+make_object(V, #decoder{object_hook=N}) when N =:= null orelse N =:= struct ->
+ V;
+make_object({struct, P}, #decoder{object_hook=eep18}) ->
+ {P};
+make_object({struct, P}, #decoder{object_hook=proplist}) ->
+ P;
+make_object(V, #decoder{object_hook=Hook}) ->
+ Hook(V).
+
+decode_object(B, S) ->
+ decode_object(B, S#decoder{state=key}, []).
+
+decode_object(B, S=#decoder{state=key}, Acc) ->
+ case tokenize(B, S) of
+ {end_object, S1} ->
+ V = make_object({struct, lists:reverse(Acc)}, S1),
+ {V, S1#decoder{state=null}};
+ {{const, K}, S1} ->
+ {colon, S2} = tokenize(B, S1),
+ {V, S3} = decode1(B, S2#decoder{state=null}),
+ decode_object(B, S3#decoder{state=comma}, [{K, V} | Acc])
+ end;
+decode_object(B, S=#decoder{state=comma}, Acc) ->
+ case tokenize(B, S) of
+ {end_object, S1} ->
+ V = make_object({struct, lists:reverse(Acc)}, S1),
+ {V, S1#decoder{state=null}};
+ {comma, S1} ->
+ decode_object(B, S1#decoder{state=key}, Acc)
+ end.
+
+decode_array(B, S) ->
+ decode_array(B, S#decoder{state=any}, []).
+
+decode_array(B, S=#decoder{state=any}, Acc) ->
+ case tokenize(B, S) of
+ {end_array, S1} ->
+ {lists:reverse(Acc), S1#decoder{state=null}};
+ {start_array, S1} ->
+ {Array, S2} = decode_array(B, S1),
+ decode_array(B, S2#decoder{state=comma}, [Array | Acc]);
+ {start_object, S1} ->
+ {Array, S2} = decode_object(B, S1),
+ decode_array(B, S2#decoder{state=comma}, [Array | Acc]);
+ {{const, Const}, S1} ->
+ decode_array(B, S1#decoder{state=comma}, [Const | Acc])
+ end;
+decode_array(B, S=#decoder{state=comma}, Acc) ->
+ case tokenize(B, S) of
+ {end_array, S1} ->
+ {lists:reverse(Acc), S1#decoder{state=null}};
+ {comma, S1} ->
+ decode_array(B, S1#decoder{state=any}, Acc)
+ end.
+
+tokenize_string(B, S=#decoder{offset=O}) ->
+ case tokenize_string_fast(B, O) of
+ {escape, O1} ->
+ Length = O1 - O,
+ S1 = ?ADV_COL(S, Length),
+ <<_:O/binary, Head:Length/binary, _/binary>> = B,
+ tokenize_string(B, S1, lists:reverse(binary_to_list(Head)));
+ O1 ->
+ Length = O1 - O,
+ <<_:O/binary, String:Length/binary, ?Q, _/binary>> = B,
+ {{const, String}, ?ADV_COL(S, Length + 1)}
+ end.
+
+tokenize_string_fast(B, O) ->
+ case B of
+ <<_:O/binary, ?Q, _/binary>> ->
+ O;
+ <<_:O/binary, $\\, _/binary>> ->
+ {escape, O};
+ <<_:O/binary, C1, _/binary>> when C1 < 128 ->
+ tokenize_string_fast(B, 1 + O);
+ <<_:O/binary, C1, C2, _/binary>> when C1 >= 194, C1 =< 223,
+ C2 >= 128, C2 =< 191 ->
+ tokenize_string_fast(B, 2 + O);
+ <<_:O/binary, C1, C2, C3, _/binary>> when C1 >= 224, C1 =< 239,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191 ->
+ tokenize_string_fast(B, 3 + O);
+ <<_:O/binary, C1, C2, C3, C4, _/binary>> when C1 >= 240, C1 =< 244,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191,
+ C4 >= 128, C4 =< 191 ->
+ tokenize_string_fast(B, 4 + O);
+ _ ->
+ throw(invalid_utf8)
+ end.
+
+tokenize_string(B, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, ?Q, _/binary>> ->
+ {{const, iolist_to_binary(lists:reverse(Acc))}, ?INC_COL(S)};
+ <<_:O/binary, "\\\"", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\" | Acc]);
+ <<_:O/binary, "\\\\", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\\ | Acc]);
+ <<_:O/binary, "\\/", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$/ | Acc]);
+ <<_:O/binary, "\\b", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\b | Acc]);
+ <<_:O/binary, "\\f", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\f | Acc]);
+ <<_:O/binary, "\\n", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\n | Acc]);
+ <<_:O/binary, "\\r", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\r | Acc]);
+ <<_:O/binary, "\\t", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\t | Acc]);
+ <<_:O/binary, "\\u", C3, C2, C1, C0, Rest/binary>> ->
+ C = erlang:list_to_integer([C3, C2, C1, C0], 16),
+ if C > 16#D7FF, C < 16#DC00 ->
+ %% coalesce UTF-16 surrogate pair
+ <<"\\u", D3, D2, D1, D0, _/binary>> = Rest,
+ D = erlang:list_to_integer([D3,D2,D1,D0], 16),
+ [CodePoint] = xmerl_ucs:from_utf16be(<<C:16/big-unsigned-integer,
+ D:16/big-unsigned-integer>>),
+ Acc1 = lists:reverse(xmerl_ucs:to_utf8(CodePoint), Acc),
+ tokenize_string(B, ?ADV_COL(S, 12), Acc1);
+ true ->
+ Acc1 = lists:reverse(xmerl_ucs:to_utf8(C), Acc),
+ tokenize_string(B, ?ADV_COL(S, 6), Acc1)
+ end;
+ <<_:O/binary, C1, _/binary>> when C1 < 128 ->
+ tokenize_string(B, ?INC_CHAR(S, C1), [C1 | Acc]);
+ <<_:O/binary, C1, C2, _/binary>> when C1 >= 194, C1 =< 223,
+ C2 >= 128, C2 =< 191 ->
+ tokenize_string(B, ?ADV_COL(S, 2), [C2, C1 | Acc]);
+ <<_:O/binary, C1, C2, C3, _/binary>> when C1 >= 224, C1 =< 239,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191 ->
+ tokenize_string(B, ?ADV_COL(S, 3), [C3, C2, C1 | Acc]);
+ <<_:O/binary, C1, C2, C3, C4, _/binary>> when C1 >= 240, C1 =< 244,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191,
+ C4 >= 128, C4 =< 191 ->
+ tokenize_string(B, ?ADV_COL(S, 4), [C4, C3, C2, C1 | Acc]);
+ _ ->
+ throw(invalid_utf8)
+ end.
+
+tokenize_number(B, S) ->
+ case tokenize_number(B, sign, S, []) of
+ {{int, Int}, S1} ->
+ {{const, list_to_integer(Int)}, S1};
+ {{float, Float}, S1} ->
+ {{const, list_to_float(Float)}, S1}
+ end.
+
+tokenize_number(B, sign, S=#decoder{offset=O}, []) ->
+ case B of
+ <<_:O/binary, $-, _/binary>> ->
+ tokenize_number(B, int, ?INC_COL(S), [$-]);
+ _ ->
+ tokenize_number(B, int, S, [])
+ end;
+tokenize_number(B, int, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, $0, _/binary>> ->
+ tokenize_number(B, frac, ?INC_COL(S), [$0 | Acc]);
+ <<_:O/binary, C, _/binary>> when C >= $1 andalso C =< $9 ->
+ tokenize_number(B, int1, ?INC_COL(S), [C | Acc])
+ end;
+tokenize_number(B, int1, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, int1, ?INC_COL(S), [C | Acc]);
+ _ ->
+ tokenize_number(B, frac, S, Acc)
+ end;
+tokenize_number(B, frac, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, $., C, _/binary>> when C >= $0, C =< $9 ->
+ tokenize_number(B, frac1, ?ADV_COL(S, 2), [C, $. | Acc]);
+ <<_:O/binary, E, _/binary>> when E =:= $e orelse E =:= $E ->
+ tokenize_number(B, esign, ?INC_COL(S), [$e, $0, $. | Acc]);
+ _ ->
+ {{int, lists:reverse(Acc)}, S}
+ end;
+tokenize_number(B, frac1, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, frac1, ?INC_COL(S), [C | Acc]);
+ <<_:O/binary, E, _/binary>> when E =:= $e orelse E =:= $E ->
+ tokenize_number(B, esign, ?INC_COL(S), [$e | Acc]);
+ _ ->
+ {{float, lists:reverse(Acc)}, S}
+ end;
+tokenize_number(B, esign, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C =:= $- orelse C=:= $+ ->
+ tokenize_number(B, eint, ?INC_COL(S), [C | Acc]);
+ _ ->
+ tokenize_number(B, eint, S, Acc)
+ end;
+tokenize_number(B, eint, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, eint1, ?INC_COL(S), [C | Acc])
+ end;
+tokenize_number(B, eint1, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, eint1, ?INC_COL(S), [C | Acc]);
+ _ ->
+ {{float, lists:reverse(Acc)}, S}
+ end.
+
+tokenize(B, S=#decoder{offset=O}) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when ?IS_WHITESPACE(C) ->
+ tokenize(B, ?INC_CHAR(S, C));
+ <<_:O/binary, "{", _/binary>> ->
+ {start_object, ?INC_COL(S)};
+ <<_:O/binary, "}", _/binary>> ->
+ {end_object, ?INC_COL(S)};
+ <<_:O/binary, "[", _/binary>> ->
+ {start_array, ?INC_COL(S)};
+ <<_:O/binary, "]", _/binary>> ->
+ {end_array, ?INC_COL(S)};
+ <<_:O/binary, ",", _/binary>> ->
+ {comma, ?INC_COL(S)};
+ <<_:O/binary, ":", _/binary>> ->
+ {colon, ?INC_COL(S)};
+ <<_:O/binary, "null", _/binary>> ->
+ {{const, null}, ?ADV_COL(S, 4)};
+ <<_:O/binary, "true", _/binary>> ->
+ {{const, true}, ?ADV_COL(S, 4)};
+ <<_:O/binary, "false", _/binary>> ->
+ {{const, false}, ?ADV_COL(S, 5)};
+ <<_:O/binary, "\"", _/binary>> ->
+ tokenize_string(B, ?INC_COL(S));
+ <<_:O/binary, C, _/binary>> when (C >= $0 andalso C =< $9)
+ orelse C =:= $- ->
+ tokenize_number(B, S);
+ <<_:O/binary>> ->
+ trim = S#decoder.state,
+ {eof, S}
+ end.
+%%
+%% Tests
+%%
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+%% testing constructs borrowed from the Yaws JSON implementation.
+
+%% Create an object from a list of Key/Value pairs.
+
+obj_new() ->
+ {struct, []}.
+
+is_obj({struct, Props}) ->
+ F = fun ({K, _}) when is_binary(K) -> true end,
+ lists:all(F, Props).
+
+obj_from_list(Props) ->
+ Obj = {struct, Props},
+ ?assert(is_obj(Obj)),
+ Obj.
+
+%% Test for equivalence of Erlang terms.
+%% Due to arbitrary order of construction, equivalent objects might
+%% compare unequal as erlang terms, so we need to carefully recurse
+%% through aggregates (tuples and objects).
+
+equiv({struct, Props1}, {struct, Props2}) ->
+ equiv_object(Props1, Props2);
+equiv(L1, L2) when is_list(L1), is_list(L2) ->
+ equiv_list(L1, L2);
+equiv(N1, N2) when is_number(N1), is_number(N2) -> N1 == N2;
+equiv(B1, B2) when is_binary(B1), is_binary(B2) -> B1 == B2;
+equiv(A, A) when A =:= true orelse A =:= false orelse A =:= null -> true.
+
+%% Object representation and traversal order is unknown.
+%% Use the sledgehammer and sort property lists.
+
+equiv_object(Props1, Props2) ->
+ L1 = lists:keysort(1, Props1),
+ L2 = lists:keysort(1, Props2),
+ Pairs = lists:zip(L1, L2),
+ true = lists:all(fun({{K1, V1}, {K2, V2}}) ->
+ equiv(K1, K2) and equiv(V1, V2)
+ end, Pairs).
+
+%% Recursively compare tuple elements for equivalence.
+
+equiv_list([], []) ->
+ true;
+equiv_list([V1 | L1], [V2 | L2]) ->
+ equiv(V1, V2) andalso equiv_list(L1, L2).
+
+decode_test() ->
+ [1199344435545.0, 1] = decode(<<"[1199344435545.0,1]">>),
+ <<16#F0,16#9D,16#9C,16#95>> = decode([34,"\\ud835","\\udf15",34]).
+
+e2j_vec_test() ->
+ test_one(e2j_test_vec(utf8), 1).
+
+test_one([], _N) ->
+ %% io:format("~p tests passed~n", [N-1]),
+ ok;
+test_one([{E, J} | Rest], N) ->
+ %% io:format("[~p] ~p ~p~n", [N, E, J]),
+ true = equiv(E, decode(J)),
+ true = equiv(E, decode(encode(E))),
+ test_one(Rest, 1+N).
+
+e2j_test_vec(utf8) ->
+ [
+ {1, "1"},
+ {3.1416, "3.14160"}, %% text representation may truncate, trail zeroes
+ {-1, "-1"},
+ {-3.1416, "-3.14160"},
+ {12.0e10, "1.20000e+11"},
+ {1.234E+10, "1.23400e+10"},
+ {-1.234E-10, "-1.23400e-10"},
+ {10.0, "1.0e+01"},
+ {123.456, "1.23456E+2"},
+ {10.0, "1e1"},
+ {<<"foo">>, "\"foo\""},
+ {<<"foo", 5, "bar">>, "\"foo\\u0005bar\""},
+ {<<"">>, "\"\""},
+ {<<"\n\n\n">>, "\"\\n\\n\\n\""},
+ {<<"\" \b\f\r\n\t\"">>, "\"\\\" \\b\\f\\r\\n\\t\\\"\""},
+ {obj_new(), "{}"},
+ {obj_from_list([{<<"foo">>, <<"bar">>}]), "{\"foo\":\"bar\"}"},
+ {obj_from_list([{<<"foo">>, <<"bar">>}, {<<"baz">>, 123}]),
+ "{\"foo\":\"bar\",\"baz\":123}"},
+ {[], "[]"},
+ {[[]], "[[]]"},
+ {[1, <<"foo">>], "[1,\"foo\"]"},
+
+ %% json array in a json object
+ {obj_from_list([{<<"foo">>, [123]}]),
+ "{\"foo\":[123]}"},
+
+ %% json object in a json object
+ {obj_from_list([{<<"foo">>, obj_from_list([{<<"bar">>, true}])}]),
+ "{\"foo\":{\"bar\":true}}"},
+
+ %% fold evaluation order
+ {obj_from_list([{<<"foo">>, []},
+ {<<"bar">>, obj_from_list([{<<"baz">>, true}])},
+ {<<"alice">>, <<"bob">>}]),
+ "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}"},
+
+ %% json object in a json array
+ {[-123, <<"foo">>, obj_from_list([{<<"bar">>, []}]), null],
+ "[-123,\"foo\",{\"bar\":[]},null]"}
+ ].
+
+%% test utf8 encoding
+encoder_utf8_test() ->
+ %% safe conversion case (default)
+ [34,"\\u0001","\\u0442","\\u0435","\\u0441","\\u0442",34] =
+ encode(<<1,"\321\202\320\265\321\201\321\202">>),
+
+ %% raw utf8 output (optional)
+ Enc = mochijson2:encoder([{utf8, true}]),
+ [34,"\\u0001",[209,130],[208,181],[209,129],[209,130],34] =
+ Enc(<<1,"\321\202\320\265\321\201\321\202">>).
+
+input_validation_test() ->
+ Good = [
+ {16#00A3, <<?Q, 16#C2, 16#A3, ?Q>>}, %% pound
+ {16#20AC, <<?Q, 16#E2, 16#82, 16#AC, ?Q>>}, %% euro
+ {16#10196, <<?Q, 16#F0, 16#90, 16#86, 16#96, ?Q>>} %% denarius
+ ],
+ lists:foreach(fun({CodePoint, UTF8}) ->
+ Expect = list_to_binary(xmerl_ucs:to_utf8(CodePoint)),
+ Expect = decode(UTF8)
+ end, Good),
+
+ Bad = [
+ %% 2nd, 3rd, or 4th byte of a multi-byte sequence w/o leading byte
+ <<?Q, 16#80, ?Q>>,
+ %% missing continuations, last byte in each should be 80-BF
+ <<?Q, 16#C2, 16#7F, ?Q>>,
+ <<?Q, 16#E0, 16#80,16#7F, ?Q>>,
+ <<?Q, 16#F0, 16#80, 16#80, 16#7F, ?Q>>,
+ %% we don't support code points > 10FFFF per RFC 3629
+ <<?Q, 16#F5, 16#80, 16#80, 16#80, ?Q>>,
+ %% escape characters trigger a different code path
+ <<?Q, $\\, $\n, 16#80, ?Q>>
+ ],
+ lists:foreach(
+ fun(X) ->
+ ok = try decode(X) catch invalid_utf8 -> ok end,
+ %% could be {ucs,{bad_utf8_character_code}} or
+ %% {json_encode,{bad_char,_}}
+ {'EXIT', _} = (catch encode(X))
+ end, Bad).
+
+inline_json_test() ->
+ ?assertEqual(<<"\"iodata iodata\"">>,
+ iolist_to_binary(
+ encode({json, [<<"\"iodata">>, " iodata\""]}))),
+ ?assertEqual({struct, [{<<"key">>, <<"iodata iodata">>}]},
+ decode(
+ encode({struct,
+ [{key, {json, [<<"\"iodata">>, " iodata\""]}}]}))),
+ ok.
+
+big_unicode_test() ->
+ UTF8Seq = list_to_binary(xmerl_ucs:to_utf8(16#0001d120)),
+ ?assertEqual(
+ <<"\"\\ud834\\udd20\"">>,
+ iolist_to_binary(encode(UTF8Seq))),
+ ?assertEqual(
+ UTF8Seq,
+ decode(iolist_to_binary(encode(UTF8Seq)))),
+ ok.
+
+custom_decoder_test() ->
+ ?assertEqual(
+ {struct, [{<<"key">>, <<"value">>}]},
+ (decoder([]))("{\"key\": \"value\"}")),
+ F = fun ({struct, [{<<"key">>, <<"value">>}]}) -> win end,
+ ?assertEqual(
+ win,
+ (decoder([{object_hook, F}]))("{\"key\": \"value\"}")),
+ ok.
+
+atom_test() ->
+ %% JSON native atoms
+ [begin
+ ?assertEqual(A, decode(atom_to_list(A))),
+ ?assertEqual(iolist_to_binary(atom_to_list(A)),
+ iolist_to_binary(encode(A)))
+ end || A <- [true, false, null]],
+ %% Atom to string
+ ?assertEqual(
+ <<"\"foo\"">>,
+ iolist_to_binary(encode(foo))),
+ ?assertEqual(
+ <<"\"\\ud834\\udd20\"">>,
+ iolist_to_binary(encode(list_to_atom(xmerl_ucs:to_utf8(16#0001d120))))),
+ ok.
+
+key_encode_test() ->
+ %% Some forms are accepted as keys that would not be strings in other
+ %% cases
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode({struct, [{foo, 1}]}))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode({struct, [{<<"foo">>, 1}]}))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode({struct, [{"foo", 1}]}))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode([{foo, 1}]))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode([{<<"foo">>, 1}]))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode([{"foo", 1}]))),
+ ?assertEqual(
+ <<"{\"\\ud834\\udd20\":1}">>,
+ iolist_to_binary(
+ encode({struct, [{[16#0001d120], 1}]}))),
+ ?assertEqual(
+ <<"{\"1\":1}">>,
+ iolist_to_binary(encode({struct, [{1, 1}]}))),
+ ok.
+
+unsafe_chars_test() ->
+ Chars = "\"\\\b\f\n\r\t",
+ [begin
+ ?assertEqual(false, json_string_is_safe([C])),
+ ?assertEqual(false, json_bin_is_safe(<<C>>)),
+ ?assertEqual(<<C>>, decode(encode(<<C>>)))
+ end || C <- Chars],
+ ?assertEqual(
+ false,
+ json_string_is_safe([16#0001d120])),
+ ?assertEqual(
+ false,
+ json_bin_is_safe(list_to_binary(xmerl_ucs:to_utf8(16#0001d120)))),
+ ?assertEqual(
+ [16#0001d120],
+ xmerl_ucs:from_utf8(
+ binary_to_list(
+ decode(encode(list_to_atom(xmerl_ucs:to_utf8(16#0001d120))))))),
+ ?assertEqual(
+ false,
+ json_string_is_safe([16#110000])),
+ ?assertEqual(
+ false,
+ json_bin_is_safe(list_to_binary(xmerl_ucs:to_utf8([16#110000])))),
+ %% solidus can be escaped but isn't unsafe by default
+ ?assertEqual(
+ <<"/">>,
+ decode(<<"\"\\/\"">>)),
+ ok.
+
+int_test() ->
+ ?assertEqual(0, decode("0")),
+ ?assertEqual(1, decode("1")),
+ ?assertEqual(11, decode("11")),
+ ok.
+
+large_int_test() ->
+ ?assertEqual(<<"-2147483649214748364921474836492147483649">>,
+ iolist_to_binary(encode(-2147483649214748364921474836492147483649))),
+ ?assertEqual(<<"2147483649214748364921474836492147483649">>,
+ iolist_to_binary(encode(2147483649214748364921474836492147483649))),
+ ok.
+
+float_test() ->
+ ?assertEqual(<<"-2147483649.0">>, iolist_to_binary(encode(-2147483649.0))),
+ ?assertEqual(<<"2147483648.0">>, iolist_to_binary(encode(2147483648.0))),
+ ok.
+
+handler_test() ->
+ ?assertEqual(
+ {'EXIT',{json_encode,{bad_term,{x,y}}}},
+ catch encode({x,y})),
+ F = fun ({x,y}) -> [] end,
+ ?assertEqual(
+ <<"[]">>,
+ iolist_to_binary((encoder([{handler, F}]))({x, y}))),
+ ok.
+
+encode_empty_test_() ->
+ [{A, ?_assertEqual(<<"{}">>, iolist_to_binary(encode(B)))}
+ || {A, B} <- [{"eep18 {}", {}},
+ {"eep18 {[]}", {[]}},
+ {"{struct, []}", {struct, []}}]].
+
+encode_test_() ->
+ P = [{<<"k">>, <<"v">>}],
+ JSON = iolist_to_binary(encode({struct, P})),
+ [{atom_to_list(F),
+ ?_assertEqual(JSON, iolist_to_binary(encode(decode(JSON, [{format, F}]))))}
+ || F <- [struct, eep18, proplist]].
+
+format_test_() ->
+ P = [{<<"k">>, <<"v">>}],
+ JSON = iolist_to_binary(encode({struct, P})),
+ [{atom_to_list(F),
+ ?_assertEqual(A, decode(JSON, [{format, F}]))}
+ || {F, A} <- [{struct, {struct, P}},
+ {eep18, {P}},
+ {proplist, P}]].
+
+-endif.
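A quick round-trip sketch of the three object formats documented above
(assumes the module is compiled and on the code path):

    JSON = <<"{\"k\":\"v\"}">>,
    {struct, [{<<"k">>, <<"v">>}]} = mochijson2:decode(JSON),
    {[{<<"k">>, <<"v">>}]}         = mochijson2:decode(JSON, [{format, eep18}]),
    [{<<"k">>, <<"v">>}]           = mochijson2:decode(JSON, [{format, proplist}]),
    %% the encoder accepts all three shapes and produces the same JSON:
    JSON = iolist_to_binary(mochijson2:encode({struct, [{<<"k">>, <<"v">>}]})).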
diff --git a/src/mochinum.erl b/src/mochinum.erl
new file mode 100644
index 0000000000..4ea7a22acf
--- /dev/null
+++ b/src/mochinum.erl
@@ -0,0 +1,358 @@
+%% This file is a copy of `mochinum.erl' from mochiweb, revision
+%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see
+%% `LICENSE-MIT-Mochi'.
+
+%% @copyright 2007 Mochi Media, Inc.
+%% @author Bob Ippolito <bob@mochimedia.com>
+
+%% @doc Useful numeric algorithms for floats that cover some deficiencies
+%% in the math module. More interesting is digits/1, which implements
+%% the algorithm from:
+%% http://www.cs.indiana.edu/~burger/fp/index.html
+%% See also "Printing Floating-Point Numbers Quickly and Accurately"
+%% in Proceedings of the SIGPLAN '96 Conference on Programming Language
+%% Design and Implementation.
+
+-module(mochinum).
+-author("Bob Ippolito <bob@mochimedia.com>").
+-export([digits/1, frexp/1, int_pow/2, int_ceil/1]).
+
+%% IEEE 754 Float exponent bias
+-define(FLOAT_BIAS, 1022).
+-define(MIN_EXP, -1074).
+-define(BIG_POW, 4503599627370496).
+
+%% External API
+
+%% @spec digits(number()) -> string()
+%% @doc Returns a string that accurately represents the given integer or float
+%% using a conservative amount of digits. Great for generating
+%% human-readable output, or compact ASCII serializations for floats.
+digits(N) when is_integer(N) ->
+ integer_to_list(N);
+digits(0.0) ->
+ "0.0";
+digits(Float) ->
+ {Frac1, Exp1} = frexp_int(Float),
+ [Place0 | Digits0] = digits1(Float, Exp1, Frac1),
+ {Place, Digits} = transform_digits(Place0, Digits0),
+ R = insert_decimal(Place, Digits),
+ case Float < 0 of
+ true ->
+ [$- | R];
+ _ ->
+ R
+ end.
+
+%% @spec frexp(F::float()) -> {Frac::float(), Exp::float()}
+%% @doc Return the fractional and exponent part of an IEEE 754 double,
+%% equivalent to the libc function of the same name.
+%% F = Frac * pow(2, Exp).
+frexp(F) ->
+ frexp1(unpack(F)).
+
+%% @spec int_pow(X::integer(), N::integer()) -> Y::integer()
+%% @doc Moderately efficient way to exponentiate integers.
+%% int_pow(10, 2) = 100.
+int_pow(_X, 0) ->
+ 1;
+int_pow(X, N) when N > 0 ->
+ int_pow(X, N, 1).
+
+%% @spec int_ceil(F::float()) -> integer()
+%% @doc Return the ceiling of F as an integer. The ceiling is defined as
+%% F when F == trunc(F);
+%% trunc(F) when F &lt; 0;
+%% trunc(F) + 1 when F &gt; 0.
+int_ceil(X) ->
+ T = trunc(X),
+ case (X - T) of
+ Pos when Pos > 0 -> T + 1;
+ _ -> T
+ end.
+
+
+%% Internal API
+
+int_pow(X, N, R) when N < 2 ->
+ R * X;
+int_pow(X, N, R) ->
+ int_pow(X * X, N bsr 1, case N band 1 of 1 -> R * X; 0 -> R end).
+
+insert_decimal(0, S) ->
+ "0." ++ S;
+insert_decimal(Place, S) when Place > 0 ->
+ L = length(S),
+ case Place - L of
+ 0 ->
+ S ++ ".0";
+ N when N < 0 ->
+ {S0, S1} = lists:split(L + N, S),
+ S0 ++ "." ++ S1;
+ N when N < 6 ->
+ %% More places than digits
+ S ++ lists:duplicate(N, $0) ++ ".0";
+ _ ->
+ insert_decimal_exp(Place, S)
+ end;
+insert_decimal(Place, S) when Place > -6 ->
+ "0." ++ lists:duplicate(abs(Place), $0) ++ S;
+insert_decimal(Place, S) ->
+ insert_decimal_exp(Place, S).
+
+insert_decimal_exp(Place, S) ->
+ [C | S0] = S,
+ S1 = case S0 of
+ [] ->
+ "0";
+ _ ->
+ S0
+ end,
+ Exp = case Place < 0 of
+ true ->
+ "e-";
+ false ->
+ "e+"
+ end,
+ [C] ++ "." ++ S1 ++ Exp ++ integer_to_list(abs(Place - 1)).
+
+
+digits1(Float, Exp, Frac) ->
+ Round = ((Frac band 1) =:= 0),
+ case Exp >= 0 of
+ true ->
+ BExp = 1 bsl Exp,
+ case (Frac =/= ?BIG_POW) of
+ true ->
+ scale((Frac * BExp * 2), 2, BExp, BExp,
+ Round, Round, Float);
+ false ->
+ scale((Frac * BExp * 4), 4, (BExp * 2), BExp,
+ Round, Round, Float)
+ end;
+ false ->
+ case (Exp =:= ?MIN_EXP) orelse (Frac =/= ?BIG_POW) of
+ true ->
+ scale((Frac * 2), 1 bsl (1 - Exp), 1, 1,
+ Round, Round, Float);
+ false ->
+ scale((Frac * 4), 1 bsl (2 - Exp), 2, 1,
+ Round, Round, Float)
+ end
+ end.
+
+scale(R, S, MPlus, MMinus, LowOk, HighOk, Float) ->
+ Est = int_ceil(math:log10(abs(Float)) - 1.0e-10),
+ %% Note that the scheme implementation uses a 326 element look-up table
+ %% for int_pow(10, N) where we do not.
+ case Est >= 0 of
+ true ->
+ fixup(R, S * int_pow(10, Est), MPlus, MMinus, Est,
+ LowOk, HighOk);
+ false ->
+ Scale = int_pow(10, -Est),
+ fixup(R * Scale, S, MPlus * Scale, MMinus * Scale, Est,
+ LowOk, HighOk)
+ end.
+
+fixup(R, S, MPlus, MMinus, K, LowOk, HighOk) ->
+ TooLow = case HighOk of
+ true ->
+ (R + MPlus) >= S;
+ false ->
+ (R + MPlus) > S
+ end,
+ case TooLow of
+ true ->
+ [(K + 1) | generate(R, S, MPlus, MMinus, LowOk, HighOk)];
+ false ->
+ [K | generate(R * 10, S, MPlus * 10, MMinus * 10, LowOk, HighOk)]
+ end.
+
+generate(R0, S, MPlus, MMinus, LowOk, HighOk) ->
+ D = R0 div S,
+ R = R0 rem S,
+ TC1 = case LowOk of
+ true ->
+ R =< MMinus;
+ false ->
+ R < MMinus
+ end,
+ TC2 = case HighOk of
+ true ->
+ (R + MPlus) >= S;
+ false ->
+ (R + MPlus) > S
+ end,
+ case TC1 of
+ false ->
+ case TC2 of
+ false ->
+ [D | generate(R * 10, S, MPlus * 10, MMinus * 10,
+ LowOk, HighOk)];
+ true ->
+ [D + 1]
+ end;
+ true ->
+ case TC2 of
+ false ->
+ [D];
+ true ->
+ case R * 2 < S of
+ true ->
+ [D];
+ false ->
+ [D + 1]
+ end
+ end
+ end.
+
+unpack(Float) ->
+ <<Sign:1, Exp:11, Frac:52>> = <<Float:64/float>>,
+ {Sign, Exp, Frac}.
+
+frexp1({_Sign, 0, 0}) ->
+ {0.0, 0};
+frexp1({Sign, 0, Frac}) ->
+ Exp = log2floor(Frac),
+ <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, (Frac-1):52>>,
+ {Frac1, -(?FLOAT_BIAS) - 52 + Exp};
+frexp1({Sign, Exp, Frac}) ->
+ <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, Frac:52>>,
+ {Frac1, Exp - ?FLOAT_BIAS}.
+
+log2floor(Int) ->
+ log2floor(Int, 0).
+
+log2floor(0, N) ->
+ N;
+log2floor(Int, N) ->
+ log2floor(Int bsr 1, 1 + N).
+
+
+transform_digits(Place, [0 | Rest]) ->
+ transform_digits(Place, Rest);
+transform_digits(Place, Digits) ->
+ {Place, [$0 + D || D <- Digits]}.
+
+
+frexp_int(F) ->
+ case unpack(F) of
+ {_Sign, 0, Frac} ->
+ {Frac, ?MIN_EXP};
+ {_Sign, Exp, Frac} ->
+ {Frac + (1 bsl 52), Exp - 53 - ?FLOAT_BIAS}
+ end.
+
+%%
+%% Tests
+%%
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+int_ceil_test() ->
+ ?assertEqual(1, int_ceil(0.0001)),
+ ?assertEqual(0, int_ceil(0.0)),
+ ?assertEqual(1, int_ceil(0.99)),
+ ?assertEqual(1, int_ceil(1.0)),
+ ?assertEqual(-1, int_ceil(-1.5)),
+ ?assertEqual(-2, int_ceil(-2.0)),
+ ok.
+
+int_pow_test() ->
+ ?assertEqual(1, int_pow(1, 1)),
+ ?assertEqual(1, int_pow(1, 0)),
+ ?assertEqual(1, int_pow(10, 0)),
+ ?assertEqual(10, int_pow(10, 1)),
+ ?assertEqual(100, int_pow(10, 2)),
+ ?assertEqual(1000, int_pow(10, 3)),
+ ok.
+
+digits_test() ->
+ ?assertEqual("0",
+ digits(0)),
+ ?assertEqual("0.0",
+ digits(0.0)),
+ ?assertEqual("1.0",
+ digits(1.0)),
+ ?assertEqual("-1.0",
+ digits(-1.0)),
+ ?assertEqual("0.1",
+ digits(0.1)),
+ ?assertEqual("0.01",
+ digits(0.01)),
+ ?assertEqual("0.001",
+ digits(0.001)),
+ ?assertEqual("1.0e+6",
+ digits(1000000.0)),
+ ?assertEqual("0.5",
+ digits(0.5)),
+ ?assertEqual("4503599627370496.0",
+ digits(4503599627370496.0)),
+ %% small denormalized number
+ %% 4.94065645841246544177e-324 =:= 5.0e-324
+ <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
+ ?assertEqual("5.0e-324",
+ digits(SmallDenorm)),
+ ?assertEqual(SmallDenorm,
+ list_to_float(digits(SmallDenorm))),
+ %% large denormalized number
+ %% 2.22507385850720088902e-308
+ <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
+ ?assertEqual("2.225073858507201e-308",
+ digits(BigDenorm)),
+ ?assertEqual(BigDenorm,
+ list_to_float(digits(BigDenorm))),
+ %% small normalized number
+ %% 2.22507385850720138309e-308
+ <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
+ ?assertEqual("2.2250738585072014e-308",
+ digits(SmallNorm)),
+ ?assertEqual(SmallNorm,
+ list_to_float(digits(SmallNorm))),
+ %% large normalized number
+ %% 1.79769313486231570815e+308
+ <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
+ ?assertEqual("1.7976931348623157e+308",
+ digits(LargeNorm)),
+ ?assertEqual(LargeNorm,
+ list_to_float(digits(LargeNorm))),
+ %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
+ ?assertEqual("5.0e-324",
+ digits(math:pow(2, -1074))),
+ ok.
+
+frexp_test() ->
+ %% zero
+ ?assertEqual({0.0, 0}, frexp(0.0)),
+ %% one
+ ?assertEqual({0.5, 1}, frexp(1.0)),
+ %% negative one
+ ?assertEqual({-0.5, 1}, frexp(-1.0)),
+ %% small denormalized number
+ %% 4.94065645841246544177e-324
+ <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
+ ?assertEqual({0.5, -1073}, frexp(SmallDenorm)),
+ %% large denormalized number
+ %% 2.22507385850720088902e-308
+ <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
+ ?assertEqual(
+ {0.99999999999999978, -1022},
+ frexp(BigDenorm)),
+ %% small normalized number
+ %% 2.22507385850720138309e-308
+ <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
+ ?assertEqual({0.5, -1021}, frexp(SmallNorm)),
+ %% large normalized number
+ %% 1.79769313486231570815e+308
+ <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
+ ?assertEqual(
+ {0.99999999999999989, 1024},
+ frexp(LargeNorm)),
+ %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
+ ?assertEqual(
+ {0.5, -1073},
+ frexp(math:pow(2, -1074))),
+ ok.
+
+-endif.
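Representative calls for the module above, mirroring what its own tests
assert (a sketch; assumes the module is compiled):

    "0.1"    = mochinum:digits(0.1),       %% shortest accurate decimal
    "1.0e+6" = mochinum:digits(1000000.0),
    {0.5, 1} = mochinum:frexp(1.0),        %% 1.0 = 0.5 * 2^1
    100      = mochinum:int_pow(10, 2),
    1        = mochinum:int_ceil(0.99).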
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ed258c71c9..7a021e378d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -176,7 +176,7 @@
-rabbit_boot_step({notify_cluster,
[{description, "notify cluster nodes"},
- {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {mfa, {rabbit_node_monitor, notify_node_up, []}},
{requires, networking}]}).
%%---------------------------------------------------------------------------
@@ -300,6 +300,8 @@ start() ->
%% We do not want to HiPE compile or upgrade
%% mnesia after just restarting the app
ok = ensure_application_loaded(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
+ ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
ok = app_utils:start_applications(app_startup_order()),
ok = print_plugin_info(rabbit_plugins:active())
@@ -309,8 +311,13 @@ boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
maybe_hipe_compile(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = ensure_working_log_handlers(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ %% It's important that the consistency check happens after
+ %% the upgrade, since if we are a secondary node the
+ %% primary node will have forgotten us
+ ok = rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
ok = app_utils:load_applications(ToBeLoaded),
@@ -408,7 +415,6 @@ start(normal, []) ->
end.
stop(_State) ->
- ok = rabbit_mnesia:record_running_nodes(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
@@ -505,12 +511,12 @@ sort_boot_steps(UnsortedSteps) ->
end.
boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
{Err, Nodes} =
- case rabbit_mnesia:read_previously_running_nodes() of
+ case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
" shut down forcefully~nit cannot determine which nodes"
- " are timing out. Details on all nodes will~nfollow.~n",
- rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ " are timing out.~n", []};
Ns -> {rabbit_misc:format(
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a5f227bc19..b2473f9132 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,6 +40,8 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MAX_EXPIRY_TIMER, 4294967295).
+
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
-define(FAILOVER_WAIT_MILLIS, 100).
@@ -311,8 +313,17 @@ with(Name, F, E) ->
case lookup(Name) of
{ok, Q = #amqqueue{slave_pids = []}} ->
rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
- {ok, Q} ->
- E1 = fun () -> timer:sleep(25), with(Name, F, E) end,
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ %% We check is_process_alive(QPid) in case we receive a
+ %% nodedown (for example) in F() that has nothing to do
+ %% with the QPid.
+ E1 = fun () ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> E();
+ false -> timer:sleep(25),
+ with(Name, F, E)
+ end
+ end,
rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
{error, not_found} ->
E()
@@ -388,16 +399,18 @@ check_int_arg({Type, _}, _) ->
check_positive_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > 0 -> ok;
- ok -> {error, {value_zero_or_less, Val}};
- Error -> Error
+ ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
end.
check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val >= 0 -> ok;
- ok -> {error, {value_less_than_zero, Val}};
- Error -> Error
+ ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
@@ -565,7 +578,12 @@ flush_all(QPids, ChPid) ->
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_queue, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
+ end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
@@ -681,11 +699,10 @@ safe_delegate_call_ok(F, Pids) ->
fun () -> ok end,
fun () -> F(Pid) end)
end),
- case lists:filter(fun ({_Pid, {exit, {R, _}, _}}) ->
- rabbit_misc:is_abnormal_exit(R);
- ({_Pid, _}) ->
- false
- end, Bads) of
+ case lists:filter(
+ fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) -> false
+ end, Bads) of
[] -> ok;
Bads1 -> {error, Bads1}
end.
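The new ?MAX_EXPIRY_TIMER bound is 2^32 - 1, presumably the ceiling for
Erlang's millisecond timers; a sketch of how the guarded checks above
behave (module-internal functions, argument tuples hypothetical):

    ok = check_positive_int_arg({long, 1000}, []),
    {error, {value_too_big, 4294967296}} =
        check_positive_int_arg({long, 4294967296}, []),
    {error, {value_zero_or_less, 0}} = check_positive_int_arg({long, 0}, []).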
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b4071627ce..e647627cd9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ ttl_timer_expiry,
senders,
publish_seqno,
unconfirmed,
@@ -107,7 +108,7 @@
]).
-define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
%%----------------------------------------------------------------------------
@@ -559,24 +560,22 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- msg_seq_no = MsgSeqNo,
sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
case attempt_delivery(Delivery, Confirm, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
- %% the next two are optimisations
+ %% the next one is an optimisation
+ %% TODO: optimise the Confirm =/= never case too
{false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
discard_delivery(Delivery, State1);
- {false, State1 = #q{ttl = 0, dlx = undefined}} ->
- rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- discard_delivery(Delivery, State1);
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -716,28 +715,41 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
- ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- case DLXFun of
- undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
- BQS1;
- _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(
- fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
- BQS1
- end,
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
- when TTL =/= undefined ->
- case BQ:is_empty(BQS) of
- true -> State;
- false -> TRef = erlang:send_after(TTL, self(), drop_expired),
- State#q{ttl_timer_ref = TRef}
+ ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
+ {Props, BQS1} =
+ case DLXFun of
+ undefined ->
+ {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ ->
+ {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun(Msgs),
+ {Next, BQS2}
+ end,
+ ensure_ttl_timer(case Props of
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
+ end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry + 1000 < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
State.
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
@@ -751,37 +763,17 @@ ack_if_no_dlx(_AckTags, State) ->
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
- fun(Msg, AckTag) ->
- gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
- end.
-
-dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = #basic_message{exchange_name = XName} =
- make_dead_letter_msg(Reason, Msg, State),
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
- {Queues, Cycles} = detect_dead_letter_cycles(
- DLMsg, rabbit_exchange:route(X, Delivery)),
- lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
- DeliveredQPids;
- {error, not_found} ->
- []
- end.
-
-dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC}) ->
- QPids = dead_letter_publish(Msg, Reason, State),
- State1 = State#q{queue_monitors = pmon:monitor_all(
- QPids, State#q.queue_monitors),
- publish_seqno = MsgSeqNo + 1},
- case QPids of
- [] -> cleanup_after_confirm([AckTag], State1);
- _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
- noreply(State1#q{unconfirmed = UC1})
- end.
+ fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
+
+dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = make_dead_letter_msg(Reason, Msg, State),
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
@@ -1228,7 +1220,12 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- Fun = dead_letter_fun(rejected, State1),
+ Fun =
+ case dead_letter_fun(rejected, State1) of
+ undefined -> undefined;
+ F -> fun(M, A) -> F([{M, A}])
+ end
+ end,
BQS1 = BQ:fold(Fun, BQS, AckTags),
ack_if_no_dlx(
AckTags,
@@ -1280,8 +1277,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
- dead_letter_msg(Msg, AckTag, Reason, State);
+handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ noreply(lists:foldl(
+ fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMon}) ->
+ QPids = dead_letter_publish(Msg, Reason, X,
+ State1),
+ UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
+ QMons = pmon:monitor_all(QPids, QMon),
+ State1#q{queue_monitors = QMons,
+ publish_seqno = SeqNo + 1,
+ unconfirmed = UC1}
+ end, State, Msgs));
+ {error, not_found} ->
+ cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
+ end;
handle_cast(wake_up, State) ->
noreply(State).
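A worked check of the microsecond-to-millisecond rounding in
ensure_ttl_timer/2 above (numbers hypothetical):

    %% expiry 1500 microseconds from now; erlang:send_after/3 wants ms:
    After = (1500 + 999) div 1000,   %% = 2, so the timer fires at or
                                     %% after the expiry, never before it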
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 95523bedb8..d69a6c3b98 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -124,9 +124,11 @@
%% necessitate an ack or not. If they do, the function returns a list of
%% messages with the respective acktags.
-callback dropwhile(msg_pred(), true, state())
- -> {[{rabbit_types:basic_message(), ack()}], state()};
+ -> {rabbit_types:message_properties() | undefined,
+ [{rabbit_types:basic_message(), ack()}], state()};
(msg_pred(), false, state())
- -> {undefined, state()}.
+ -> {rabbit_types:message_properties() | undefined,
+ undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
@@ -150,6 +152,9 @@
%% Is my queue empty?
-callback is_empty(state()) -> boolean().
+%% What's the queue depth, where depth = length + number of pending acks
+-callback depth(state()) -> non_neg_integer().
+
%% For the next three functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is thus the length of time represented by
@@ -210,9 +215,10 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
- {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
- {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
- {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
+ {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
+ {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
+ {discard, 3}];
behaviour_info(_Other) ->
undefined.
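The widened dropwhile/3 contract above, read as a sketch against any
backing-queue module BQ (bindings hypothetical):

    %% without acks: only the properties of the first message left in
    %% the queue come back (or 'undefined'), letting the caller re-arm
    %% its TTL timer as drop_expired_messages/1 now does:
    {NextProps, undefined, BQS1} = BQ:dropwhile(Pred, false, BQS0),
    %% with acks: the dropped messages and their acktags are returned too:
    {NextProps1, Dropped, BQS2}  = BQ:dropwhile(Pred, true, BQS1).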
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a84800c0ec..e40d9b29e6 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [2, Res]},
+ BQ = {call, erlang, element, [3, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index f0ea514dcf..2e46235404 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -277,21 +277,15 @@ has_for_source(SrcName) ->
remove_for_source(SrcName) ->
lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- Routes = lists:usort(
- mnesia:match_object(rabbit_route, Match, write) ++
- mnesia:match_object(rabbit_durable_route, Match, write)),
- [begin
- sync_route(Route, fun mnesia:delete_object/3),
- Route#route.binding
- end || Route <- Routes].
+ remove_routes(
+ lists:usort(mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_durable_route, Match, write))).
-remove_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_routes/1).
-remove_transient_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -308,6 +302,14 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
+delete_object(Tab, Record, LockKind) ->
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:match_object(Tab, Record, LockKind) of
+ [] -> ok;
+ [_] -> mnesia:delete_object(Tab, Record, LockKind)
+ end.
+
sync_route(R, Fun) -> sync_route(R, true, true, Fun).
sync_route(Route, true, true, Fun) ->
@@ -370,16 +372,32 @@ lock_route_tables() ->
rabbit_semi_durable_route,
rabbit_durable_route]].
-remove_for_destination(DstName, DeleteFun) ->
+remove_routes(Routes) ->
+ %% This partitioning allows us to suppress unnecessary delete
+ %% operations on disk tables, which require an fsync.
+ {TransientRoutes, DurableRoutes} =
+ lists:partition(fun (R) -> mnesia:match_object(
+ rabbit_durable_route, R, write) == [] end,
+ Routes),
+ [ok = sync_transient_route(R, fun mnesia:delete_object/3) ||
+ R <- TransientRoutes],
+ [ok = sync_route(R, fun mnesia:delete_object/3) ||
+ R <- DurableRoutes],
+ [R#route.binding || R <- Routes].
+
+remove_transient_routes(Routes) ->
+ [begin
+ ok = sync_transient_route(R, fun delete_object/3),
+ R#route.binding
+ end || R <- Routes].
+
+remove_for_destination(DstName, Fun) ->
lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
- ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
- Bindings = [begin
- Route = reverse_route(ReverseRoute),
- ok = DeleteFun(Route),
- Route#route.binding
- end || ReverseRoute <- ReverseRoutes],
+ Routes = [reverse_route(R) || R <- mnesia:match_object(
+ rabbit_reverse_route, Match, write)],
+ Bindings = Fun(Routes),
group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
lists:keysort(#binding.source, Bindings)).
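The 'guarded' delete introduced above trades an extra mnesia read for skipping the write to the disk log (and the fsync it implies) when the record is already absent. The same idiom in isolation, with a hypothetical table name:

    %% Only issue the delete - and hence the disk-log write - when the
    %% record actually exists. 'my_durable_table' is hypothetical.
    guarded_delete(Record) ->
        case mnesia:match_object(my_durable_table, Record, write) of
            []  -> ok;
            [_] -> mnesia:delete_object(my_durable_table, Record, write)
        end.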
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69fe0edca3..e50e823c7e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -465,10 +465,14 @@ check_user_id_header(#'P_basic'{user_id = Username},
#ch{user = #user{username = Username}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual}}) ->
- precondition_failed(
- "user_id property set to '~s' but authenticated user was '~s'",
- [Claimed, Actual]).
+ #ch{user = #user{username = Actual,
+ tags = Tags}}) ->
+ case lists:member(impersonator, Tags) of
+ true -> ok;
+ false -> precondition_failed(
+ "user_id property set to '~s' but authenticated user was "
+ "'~s'", [Claimed, Actual])
+ end.
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
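With this change a user carrying the 'impersonator' tag may publish with a user_id property other than its authenticated name. The tag can be granted with rabbitmqctl, for instance (user name hypothetical; note that set_user_tags replaces the user's whole tag set):

    rabbitmqctl set_user_tags alice impersonator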
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 0dda32f1f1..bd01a1b1d7 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -25,10 +25,14 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(RAM_OPT, "--ram").
+-define(OFFLINE_OPT, "--offline").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(RAM_DEF, {?RAM_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
@@ -41,8 +45,10 @@
force_reset,
rotate_logs,
- cluster,
- force_cluster,
+ {join_cluster, [?RAM_DEF]},
+ change_cluster_node_type,
+ update_cluster_nodes,
+ {forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
add_user,
@@ -60,9 +66,9 @@
{list_permissions, [?VHOST_DEF]},
list_user_permissions,
- set_parameter,
- clear_parameter,
- list_parameters,
+ {set_parameter, [?VHOST_DEF]},
+ {clear_parameter, [?VHOST_DEF]},
+ {list_parameters, [?VHOST_DEF]},
{list_queues, [?VHOST_DEF]},
{list_exchanges, [?VHOST_DEF]},
@@ -164,8 +170,8 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
- {error_string, Reason} ->
- print_error("~s", [Reason]),
+ {parse_error, {_Line, Mod, Err}} ->
+ print_error("~s", [lists:flatten(Mod:format_error(Err))]),
rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
@@ -239,17 +245,31 @@ action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-
-action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+ Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+
+action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
+ Inform("Turning ~p into a ram node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]);
+action(change_cluster_node_type, Node, [Type], _Opts, Inform)
+ when Type =:= "disc" orelse Type =:= "disk" ->
+ Inform("Turning ~p into a disc node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]);
+
+action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]);
+
+action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
+ Inform("Removing node ~p from cluster", [ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, forget_cluster_node,
+ [ClusterNode, RemoveWhenOffline]);
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
@@ -414,21 +434,25 @@ action(list_permissions, Node, [], Opts, Inform) ->
list_vhost_permissions, [VHost]}),
rabbit_auth_backend_internal:vhost_perms_info_keys());
-action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) ->
+action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Setting runtime parameter ~p for component ~p to ~p",
[Key, Component, Value]),
rpc_call(Node, rabbit_runtime_parameters, parse_set,
- [list_to_binary(Component), list_to_binary(Key), Value]);
+ [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]);
-action(clear_parameter, Node, [Component, Key], _Opts, Inform) ->
+action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
- rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component),
+ rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg,
+ list_to_binary(Component),
list_to_binary(Key)]);
-action(list_parameters, Node, Args = [], _Opts, Inform) ->
+action(list_parameters, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
display_info_list(
- rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args),
+ rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
action(report, Node, _Args, _Opts, Inform) ->
@@ -445,16 +469,15 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
case erl_scan:string(Expr) of
{ok, Scanned, _} ->
case erl_parse:parse_exprs(Scanned) of
- {ok, Parsed} ->
- {value, Value, _} = unsafe_rpc(
- Node, erl_eval, exprs, [Parsed, []]),
- io:format("~p~n", [Value]),
- ok;
- {error, E} ->
- {error_string, format_parse_error(E)}
+ {ok, Parsed} -> {value, Value, _} =
+ unsafe_rpc(
+ Node, erl_eval, exprs, [Parsed, []]),
+ io:format("~p~n", [Value]),
+ ok;
+ {error, E} -> {parse_error, E}
end;
{error, E, _} ->
- {error_string, format_parse_error(E)}
+ {parse_error, E}
end.
%%----------------------------------------------------------------------------
@@ -543,9 +566,6 @@ exit_loop(Port) ->
{Port, _} -> exit_loop(Port)
end.
-format_parse_error({_Line, Mod, Err}) ->
- lists:flatten(Mod:format_error(Err)).
-
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
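Example usage of the now vhost-scoped parameter commands (component, key and value are hypothetical; the vhost defaults to "/" when -p is omitted):

    rabbitmqctl set_parameter -p /test federation local-nodename '"rabbit@bunny"'
    rabbitmqctl clear_parameter -p /test federation local-nodename
    rabbitmqctl list_parameters -p /test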
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index c87b1dc1ec..a669a2b37e 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -31,8 +31,8 @@
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(),
- rabbit_types:protocol(), pid(),
+-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user()),
+ rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
@@ -64,22 +64,22 @@ list() ->
%%----------------------------------------------------------------------------
+connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_event:notify(connection_created, Infos),
+ {ok, {User, rabbit_reader:server_properties(Protocol)}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end;
+
connect(Username, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true ->
case rabbit_access_control:check_user_login(Username, []) of
- {ok, User} ->
- try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> ok = pg_local:join(rabbit_direct, Pid),
- rabbit_event:notify(connection_created, Infos),
- {ok, {User,
- rabbit_reader:server_properties(Protocol)}}
- catch
- exit:#amqp_error{name = access_refused} ->
- {error, access_refused}
- end;
- {refused, _Msg, _Args} ->
- {error, auth_failure}
+ {ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
+ {refused, _M, _A} -> {error, auth_failure}
end;
false ->
{error, broker_not_found_on_node}
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index e72181c061..6330d555fe 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -137,7 +137,7 @@ dir() -> rabbit_mnesia:dir().
set_disk_limits(State, Limit) ->
State1 = State#state { limit = Limit },
rabbit_log:info("Disk free limit set to ~pMB~n",
- [trunc(interpret_limit(Limit) / 1048576)]),
+ [trunc(interpret_limit(Limit) / 1000000)]),
internal_update(State1).
internal_update(State = #state { limit = Limit,
@@ -148,10 +148,10 @@ internal_update(State = #state { limit = Limit,
NewAlarmed = CurrentFreeBytes < LimitBytes,
case {Alarmed, NewAlarmed} of
{false, true} ->
- emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
+ emit_update_info("insufficient", CurrentFreeBytes, LimitBytes),
rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
{true, false} ->
- emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
+ emit_update_info("sufficient", CurrentFreeBytes, LimitBytes),
rabbit_alarm:clear_alarm({resource_limit, disk, node()});
_ ->
ok
@@ -187,10 +187,10 @@ interpret_limit({mem_relative, R}) ->
interpret_limit(L) ->
L.
-emit_update_info(State, CurrentFree, Limit) ->
+emit_update_info(StateStr, CurrentFree, Limit) ->
rabbit_log:info(
- "Disk free space limit now ~s. Free bytes:~p Limit:~p~n",
- [State, CurrentFree, Limit]).
+ "Disk free space ~s. Free bytes:~p Limit:~p~n",
+ [StateStr, CurrentFree, Limit]).
start_timer(Timeout) ->
{ok, TRef} = timer:send_interval(Timeout, update),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 57c571f1ff..4cc96ef552 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -402,7 +402,12 @@ conditional_delete(X = #exchange{name = XName}) ->
end.
unconditional_delete(X = #exchange{name = XName}) ->
- ok = mnesia:delete({rabbit_durable_exchange, XName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_exchange, XName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName})
+ end,
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 80b4e768a3..05aad8c903 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -59,21 +59,15 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- heartbeater(
- {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () ->
- SendFun(),
- continue
- end}).
+ heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () -> SendFun(), continue end}).
start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- ReceiveFun(),
- stop
- end}).
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
+ fun () -> ReceiveFun(), stop end}).
start_heartbeat_fun(SupPid) ->
fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
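As a worked example, assume a negotiated heartbeat of 60 seconds: the sender wakes every 30000ms (60 * 1000 div 2) and emits a heartbeat frame whenever send_oct has not advanced in the interval, so an idle connection sends a frame at least every 30 seconds; the receiver samples recv_oct every 60000ms and, timing out after two unchanged samples, detects a dead peer between 120 and 180 seconds after the last data arrived.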
@@ -88,17 +82,11 @@ start_heartbeat_fun(SupPid) ->
{Sender, Receiver}
end.
-pause_monitor({_Sender, none}) ->
- ok;
-pause_monitor({_Sender, Receiver}) ->
- Receiver ! pause,
- ok.
+pause_monitor({_Sender, none}) -> ok;
+pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
-resume_monitor({_Sender, none}) ->
- ok;
-resume_monitor({_Sender, Receiver}) ->
- Receiver ! resume,
- ok.
+resume_monitor({_Sender, none}) -> ok;
+resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok.
%%----------------------------------------------------------------------------
start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
@@ -106,8 +94,7 @@ start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback,
- [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
@@ -117,15 +104,11 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
{StatVal, SameCount}) ->
Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- pause ->
- receive
- resume ->
- Recurse({0, 0});
- Other ->
- exit({unexpected_message, Other})
- end;
- Other ->
- exit({unexpected_message, Other})
+ pause -> receive
+ resume -> Recurse({0, 0});
+ Other -> exit({unexpected_message, Other})
+ end;
+ Other -> exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 10debb0b08..4455b4419f 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -132,25 +132,31 @@
%% gm should be processed as normal, but fetches which are for
%% messages the slave has never seen should be ignored. Similarly,
%% acks for messages the slave never fetched should be
-%% ignored. Eventually, as the master is consumed from, the messages
-%% at the head of the queue which were there before the slave joined
-%% will disappear, and the slave will become fully synced with the
-%% state of the master. The detection of the sync-status of a slave is
-%% done entirely based on length: if the slave and the master both
-%% agree on the length of the queue after the fetch of the head of the
-%% queue (or a 'set_length' results in a slave having to drop some
-%% messages from the head of its queue), then the queues must be in
-%% sync. The only other possibility is that the slave's queue is
-%% shorter, and thus the fetch should be ignored. In case slaves are
-%% joined to an empty queue which only goes on to receive publishes,
-%% they start by asking the master to broadcast its length. This is
-%% enough for slaves to always be able to work out when their head
-%% does not differ from the master (and is much simpler and cheaper
-%% than getting the master to hang on to the guid of the msg at the
-%% head of its queue). When a slave is promoted to a master, it
-%% unilaterally broadcasts its length, in order to solve the problem
-%% of length requests from new slaves being unanswered by a dead
-%% master.
+%% ignored. Similarly, we don't republish rejected messages that we
+%% haven't seen. Eventually, as the master is consumed from, the
+%% messages at the head of the queue which were there before the slave
+%% joined will disappear, and the slave will become fully synced with
+%% the state of the master.
+%%
+%% The detection of the sync-status is based on the depth of the BQs,
+%% where the depth is defined as the sum of the length of the BQ (as
+%% per BQ:len) and the messages pending an acknowledgement. When the
+%% depth of the slave is equal to the master's, then the slave is
+%% synchronised. We only store the difference between the two for
+%% simplicity. Comparing the length is not enough since we need to
+%% take into account rejected messages which will make it back into
+%% the master queue but can't go back in the slave, since we don't
+%% want "holes" in the slave queue. Note that the depth, and the
+%% length likewise, must always be shorter on the slave - we assert
+%% that in various places. In case slaves are joined to an empty queue
+%% which only goes on to receive publishes, they start by asking the
+%% master to broadcast its depth. This is enough for slaves to always
+%% be able to work out when their head does not differ from the master
+%% (and is much simpler and cheaper than getting the master to hang on
+%% to the guid of the msg at the head of its queue). When a slave is
+%% promoted to a master, it unilaterally broadcasts its depth, in
+%% order to solve the problem of depth requests from new slaves being
+%% unanswered by a dead master.
%%
%% Obviously, due to the async nature of communication across gm, the
%% slaves can fall behind. This does not matter from a sync pov: if
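A condensed sketch of the bookkeeping described above (function and variable names are illustrative, not the slave's actual API):

    %% The slave stores only the difference between the master's depth
    %% and its own; a delta of zero means it is fully synchronised.
    update_delta(MasterDepth, LocalDepth) ->
        Delta = MasterDepth - LocalDepth,
        true = Delta >= 0, %% ASSERTION: the slave is never ahead
        case Delta of
            0 -> synchronised;
            N -> {behind_by, N}
        end.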
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 750bcd56e2..c11a8ff7c0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
- set_ram_duration_target/2, ram_duration/1,
+ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -127,10 +127,13 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ Info = gm:info(GM),
+ Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
+ node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
monitor_wait(MRefs),
+ ok = gm:forget_group(proplists:get_value(group_name, Info)),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
@@ -145,7 +148,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -185,13 +188,13 @@ dropwhile(Pred, AckRequired,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
+ {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -274,6 +277,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:is_empty(BQS).
+depth(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:depth(BQS).
+
set_ram_duration_target(Target, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state =
@@ -372,7 +378,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {length, Len}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -403,7 +409,7 @@ length_fun() ->
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State
end)
end.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 29e2d29f5c..89e334ddd2 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -64,9 +64,7 @@ remove_from_queue(QueueName, DeadPids) ->
slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid),
- DeadNodes) orelse
- rabbit_misc:is_process_alive(Pid)],
+ not lists:member(node(Pid), DeadNodes)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e4d78c45da..3e45f02638 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -19,17 +19,8 @@
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
%%
-%% We join the GM group before we add ourselves to the amqqueue
-%% record. As a result:
-%% 1. We can receive msgs from GM that correspond to messages we will
-%% never receive from publishers.
-%% 2. When we receive a message from publishers, we must receive a
-%% message from the GM group for it.
-%% 3. However, that instruction from the GM group can arrive either
-%% before or after the actual message. We need to be able to
-%% distinguish between GM instructions arriving early, and case (1)
-%% above.
-%%
+%% We receive messages from GM and from publishers, and the gm
+%% messages can arrive either before or after the 'actual' message.
%% All instructions from the GM group must be processed in the order
%% in which they're received.
@@ -86,31 +77,37 @@
msg_id_status,
known_senders,
- synchronised
+ %% Master depth - local depth
+ depth_delta
}).
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-info(QPid) ->
- gen_server2:call(QPid, info, infinity).
+info(QPid) -> gen_server2:call(QPid, info, infinity).
init(#amqqueue { name = QueueName } = Q) ->
+ %% We join the GM group before we add ourselves to the amqqueue
+ %% record. As a result:
+ %% 1. We can receive msgs from GM that correspond to messages we will
+ %% never receive from publishers.
+ %% 2. When we receive a message from publishers, we must receive a
+ %% message from the GM group for it.
+ %% 3. However, that instruction from the GM group can arrive either
+ %% before or after the actual message. We need to be able to
+ %% distinguish between GM instructions arriving early, and case (1)
+ %% above.
+ %%
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(fun() ->
- init_it(Self, Node,
- QueueName)
- end) of
+ case rabbit_misc:execute_mnesia_transaction(
+ fun() -> init_it(Self, Node, QueueName) end) of
{new, MPid} ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -133,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -153,24 +150,21 @@ init_it(Self, Node, QueueName) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] ->
- MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid};
- [QPid] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> duplicate_live_master;
- false -> {stale, QPid}
- end;
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid}
- end
+ [] -> MPids1 = MPids ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid};
+ [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid}
+ end
end.
handle_call({deliver, Delivery = #delivery { immediate = true }},
@@ -356,14 +350,10 @@ prioritise_info(Msg, _State) ->
%% GM
%% ---------------------------------------------------------------------------
-joined([SPid], _Members) ->
- SPid ! {joined, self()},
- ok.
+joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) ->
- ok;
-members_changed([SPid], _Births, Deaths) ->
- inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, []) -> ok;
+members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
handle_msg([_SPid], _From, master_changed) ->
ok;
@@ -396,7 +386,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -584,9 +574,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State #state {
backing_queue_state = BQS1 })),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
@@ -680,26 +670,24 @@ maybe_enqueue_message(
%% msg_seq_no was at the time. We do now!
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { sender_queues = SQ1,
- msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State1) of
- never ->
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- eventually ->
- State1 #state {
- msg_id_status =
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 }
- end;
+ {MS1, SQ1} =
+ case needs_confirming(Delivery, State1) of
+ never -> {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)};
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ {dict:store(MsgId, MMS, MS), SQ};
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)}
+ end,
+ State1 #state { msg_id_status = MS1,
+ sender_queues = SQ1 };
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
@@ -748,18 +736,17 @@ process_instruction(
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } },
_EnqueueOnPromotion}}, MQ2} ->
- %% We received the msg from the channel first. Thus we
- %% need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never ->
- {MQ2, PendingCh, MS};
- eventually ->
- {MQ2, PendingCh,
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- {MQ2, PendingCh, MS}
- end;
+ {MQ2, PendingCh,
+ %% We received the msg from the channel first. Thus
+ %% we need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never -> MS;
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+                                  dict:store(MsgId, MMS, MS);
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ MS
+ end};
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
@@ -814,43 +801,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- ToDrop = QLen - Length,
- {ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ ToDrop = case QLen - Length of
+ N when N > 0 -> N;
+ _ -> 0
+ end,
+ State1 = lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
+ BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ {ok, case AckRequired of
+ true -> State1;
+ false -> set_synchronised(ToDrop - Dropped, State1)
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
- end};
+ {State1, Delta} =
+ case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ {maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 }),
+ 0};
+ _ when QLen =< Remaining ->
+ {State, case AckRequired of
+ true -> 0;
+ false -> -1
+ end}
+ end,
+ {ok, set_synchronised(Delta, State1)};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -858,27 +847,17 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
+ {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -896,10 +875,11 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
+process_instruction({depth, Depth},
+ State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+ {ok, set_synchronised(
+ 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -918,9 +898,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -928,23 +905,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-%% We intentionally leave out the head where a slave becomes
-%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
- synchronised = false }) ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
- rabbit_mirror_queue_misc:store_updated_slaves(Q2)
- end
- end),
- State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised(Delta, State) ->
+ set_synchronised(Delta, false, State).
+
+set_synchronised(_Delta, _AddAnyway,
+ State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
- State.
+set_synchronised(Delta, AddAnyway,
+ State = #state { depth_delta = DepthDelta,
+ q = #amqqueue { name = QName }}) ->
+ DepthDelta1 = DepthDelta + Delta,
+ %% We intentionally leave out the head where a slave becomes
+ %% unsynchronised: we assert that can never happen.
+ %% The `AddAnyway' param is there since in the `depth' instruction we
+ %% receive the master depth for the first time, and we want to set the sync
+ %% state anyway if we are synced.
+ case DepthDelta1 =:= 0 of
+ true when not (DepthDelta =:= 0) orelse AddAnyway ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ %% We might be there already, in the `AddAnyway'
+ %% case
+ SSPids1 = SSPids -- [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
+ end
+ end);
+ _ when DepthDelta1 >= 0 ->
+ ok
+ end,
+ State #state { depth_delta = DepthDelta1 }.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 8f6a9bcf4e..a0536a50a9 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,9 @@
-export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
+-export([version/0]).
+-export([sequence_error/1]).
+-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -217,6 +220,13 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
+-spec(version/0 :: () -> string()).
+-spec(sequence_error/1 :: ([({'error', any()} | any())])
+ -> {'error', any()} | any()).
+-spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}).
+-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
+-spec(json_to_term/1 :: (any()) -> any()).
+-spec(term_to_json/1 :: (any()) -> any()).
-endif.
@@ -934,3 +944,46 @@ os_cmd(Command) ->
gb_sets_difference(S1, S2) ->
gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
+
+version() ->
+ {ok, VSN} = application:get_key(rabbit, vsn),
+ VSN.
+
+sequence_error([T]) -> T;
+sequence_error([{error, _} = Error | _]) -> Error;
+sequence_error([_ | Rest]) -> sequence_error(Rest).
+
+json_encode(Term) ->
+ try
+ {ok, mochijson2:encode(Term)}
+ catch
+ exit:{json_encode, E} ->
+ {error, E}
+ end.
+
+json_decode(Term) ->
+ try
+ {ok, mochijson2:decode(Term)}
+ catch
+ %% Sadly `mochijson2:decode/1' does not offer a nice way to catch
+ %% decoding errors...
+ error:_ -> error
+ end.
+
+json_to_term({struct, L}) ->
+ [{K, json_to_term(V)} || {K, V} <- L];
+json_to_term(L) when is_list(L) ->
+ [json_to_term(I) || I <- L];
+json_to_term(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
+ V =:= true orelse V =:= false ->
+ V.
+
+%% This has the flaw that empty lists will never be JSON objects, so use with
+%% care.
+term_to_json([{_, _}|_] = L) ->
+ {struct, [{K, term_to_json(V)} || {K, V} <- L]};
+term_to_json(L) when is_list(L) ->
+ [term_to_json(I) || I <- L];
+term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
+ V =:= true orelse V =:= false ->
+ V.
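A usage sketch of the new JSON helpers, round-tripping a proplist (values illustrative; note the empty-list caveat above):

    {ok, JSON} = rabbit_misc:json_encode(
                   rabbit_misc:term_to_json([{<<"name">>,    <<"q1">>},
                                             {<<"durable">>, true}])),
    {ok, Decoded} = rabbit_misc:json_decode(JSON),
    [{<<"name">>, <<"q1">>}, {<<"durable">>, true}] =
        rabbit_misc:json_to_term(Decoded).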
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 5a97124638..f19046a07a 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -17,16 +17,42 @@
-module(rabbit_mnesia).
--export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
- is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
- create_cluster_nodes_config/1, read_cluster_nodes_config/0,
- record_running_nodes/0, read_previously_running_nodes/0,
- running_nodes_filename/0, is_disc_node/0, on_node_down/1,
- on_node_up/1]).
-
--export([table_names/0]).
+-export([init/0,
+ join_cluster/2,
+ reset/0,
+ force_reset/0,
+ update_cluster_nodes/1,
+ change_cluster_node_type/1,
+ forget_cluster_node/2,
+
+ status/0,
+ is_db_empty/0,
+ is_clustered/0,
+ all_clustered_nodes/0,
+ clustered_disc_nodes/0,
+ running_clustered_nodes/0,
+ is_disc_node/0,
+ dir/0,
+ table_names/0,
+ wait_for_tables/1,
+ cluster_status_from_mnesia/0,
+
+ init_db/3,
+ empty_ram_only_tables/0,
+ copy_db/1,
+ wait_for_tables/0,
+ check_cluster_consistency/0,
+ ensure_mnesia_dir/0,
+
+ on_node_up/1,
+ on_node_down/1
+ ]).
+
+%% Used internally in rpc calls
+-export([node_info/0,
+ remove_node_if_mnesia_running/1,
+ is_running_remote/0
+ ]).
%% create_tables/0 exported for helping embed RabbitMQ in or alongside
%% other mnesia-using Erlang applications, such as ejabberd
@@ -38,147 +64,138 @@
-ifdef(use_specs).
--export_type([node_type/0]).
+-export_type([node_type/0, cluster_status/0]).
--type(node_type() :: disc_only | disc | ram | unknown).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
--spec(dir/0 :: () -> file:filename()).
--spec(ensure_mnesia_dir/0 :: () -> 'ok').
+-type(node_type() :: disc | ram).
+-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
+ ordsets:ordset(node())}).
+
+%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
--spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([node()]) -> 'ok').
--spec(force_cluster/1 :: ([node()]) -> 'ok').
--spec(cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
+-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
+-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok').
+-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok').
+
+%% Various queries to get the status of the db
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
--spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(clustered_disc_nodes/0 :: () -> [node()]).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(is_disc_node/0 :: () -> boolean()).
+-spec(dir/0 :: () -> file:filename()).
+-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} |
+ {'error', any()}).
+
+%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
+-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
--spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
--spec(read_cluster_nodes_config/0 :: () -> [node()]).
--spec(record_running_nodes/0 :: () -> 'ok').
--spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(running_nodes_filename/0 :: () -> file:filename()).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(check_cluster_consistency/0 :: () -> 'ok').
+-spec(ensure_mnesia_dir/0 :: () -> 'ok').
+
+%% Hooks used in `rabbit_node_monitor'
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(table_names/0 :: () -> [atom()]).
+%% Functions used in internal rpc calls
+-spec(node_info/0 :: () -> {string(), string(),
+ ({'ok', cluster_status()} | 'error')}).
+-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' |
+ {'error', term()}).
-endif.
%%----------------------------------------------------------------------------
-
-status() ->
- [{nodes, case mnesia:system_info(is_running) of
- yes -> [{Key, Nodes} ||
- {Key, CopyType} <- [{disc_only, disc_only_copies},
- {disc, disc_copies},
- {ram, ram_copies}],
- begin
- Nodes = nodes_of_type(CopyType),
- Nodes =/= []
- end];
- no -> case all_clustered_nodes() of
- [] -> [];
- Nodes -> [{unknown, Nodes}]
- end;
- Reason when Reason =:= starting; Reason =:= stopping ->
- exit({rabbit_busy, try_again_later})
- end},
- {running_nodes, running_clustered_nodes()}].
+%% Main interface
+%%----------------------------------------------------------------------------
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- Nodes = read_cluster_nodes_config(),
- ok = init_db(Nodes, should_be_disc_node(Nodes)),
+ case is_virgin_node() of
+ true -> init_from_config();
+ false -> init(is_disc_node(), all_clustered_nodes())
+ end,
%% We intuitively expect the global name server to be synced when
- %% Mnesia is up. In fact that's not guaranteed to be the case - let's
- %% make it so.
+ %% Mnesia is up. In fact that's not guaranteed to be the case -
+ %% let's make it so.
ok = global:sync(),
- ok = delete_previously_running_nodes(),
ok.
-is_db_empty() ->
- lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
- table_names()).
+init(WantDiscNode, AllNodes) ->
+ init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode).
+
+init_from_config() ->
+ {ok, {TryNodes, WantDiscNode}} =
+ application:get_env(rabbit, cluster_nodes),
+ case find_good_node(TryNodes -- [node()]) of
+ {ok, Node} ->
+ rabbit_log:info("Node '~p' selected for clustering from "
+ "configuration~n", [Node]),
+ {ok, {_, DiscNodes, _}} = discover_cluster(Node),
+ init_db_and_upgrade(DiscNodes, WantDiscNode, false),
+ rabbit_node_monitor:notify_joined_cluster();
+ none ->
+ rabbit_log:warning("Could not find any suitable node amongst the "
+ "ones provided in the configuration: ~p~n",
+ [TryNodes]),
+ init(true, [node()])
+ end.
+
+%% Make the node join a cluster. The node will be reset automatically
+%% before we actually cluster it. The nodes provided will be used to
+%% find out about the nodes in the cluster.
+%%
+%% This function will fail if:
+%%
+%% * The node is currently the only disc node of its cluster
+%% * We can't connect to any of the nodes provided
+%% * The node is currently already clustered with the cluster of the nodes
+%% provided
+%%
+%% Note that we make no attempt to verify that the nodes provided are
+%% all in the same cluster; we simply pick the first online node and
+%% join its cluster.
+join_cluster(DiscoveryNode, WantDiscNode) ->
+ case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of
+ true -> e(clustering_only_disc_node);
+ _ -> ok
+ end,
-cluster(ClusterNodes) ->
- cluster(ClusterNodes, false).
-force_cluster(ClusterNodes) ->
- cluster(ClusterNodes, true).
-
-%% Alter which disk nodes this node is clustered with. This can be a
-%% subset of all the disk nodes in the cluster but can (and should)
-%% include the node itself if it is to be a disk rather than a ram
-%% node. If Force is false, only connections to online nodes are
-%% allowed.
-cluster(ClusterNodes, Force) ->
- rabbit_misc:local_info_msg("Clustering with ~p~s~n",
- [ClusterNodes, if Force -> " forcefully";
- true -> ""
- end]),
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false) andalso
- not should_be_disc_node(ClusterNodes)
- of
- true -> log_both("last running disc node leaving cluster");
- _ -> ok
- end,
+ {ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of
+ {ok, Res} -> Res;
+ E = {error, _} -> throw(E)
+ end,
- %% Wipe mnesia if we're changing type from disc to ram
- case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
- {true, false} -> rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg(
- "changing node type; wiping "
- "mnesia...~n~n")
- end),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema);
- _ -> ok
+ case lists:member(node(), ClusterNodes) of
+ true -> e(already_clustered);
+ false -> ok
end,
- %% Pre-emptively leave the cluster
- %%
- %% We're trying to handle the following two cases:
- %% 1. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is re-clustered as a ram node. When it tries to
- %% re-join the cluster, but before it has time to update its
- %% tables definitions, the other node will order it to re-create
- %% its disc tables. So, we need to leave the cluster before we
- %% can join it again.
- %% 2. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is forcefully reset (so, the other node thinks its
- %% still a part of the cluster). The reset node is re-clustered
- %% as a ram node. Same as above, we need to leave the cluster
- %% before we can join it. But, since we don't know if we're in a
- %% cluster or not, we just pre-emptively leave it before joining.
- ProperClusterNodes = ClusterNodes -- [node()],
- try
- ok = leave_cluster(ProperClusterNodes, ProperClusterNodes)
- catch
- {error, {no_running_cluster_nodes, _, _}} when Force ->
- ok
- end,
+    %% Reset the node. This simplifies things and it will be needed
+    %% in this case - we're joining a new cluster with new nodes
+    %% which are not in sync with the current node. It also lifts the
+    %% burden of resetting the node from the user.
+ reset(false),
+
+ rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),
%% Join the cluster
- start_mnesia(),
- try
- ok = init_db(ClusterNodes, Force),
- ok = create_cluster_nodes_config(ClusterNodes)
- after
- stop_mnesia()
- end,
+ ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false),
+
+ rabbit_node_monitor:notify_joined_cluster(),
ok.
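For example, joining an existing cluster as a ram node from the Erlang shell, with the rabbit application stopped on the local node (node name hypothetical):

    ok = rabbit_mnesia:join_cluster('rabbit@bunny', false).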
@@ -188,15 +205,399 @@ cluster(ClusterNodes, Force) ->
reset() -> reset(false).
force_reset() -> reset(true).
+reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+ [if Force -> " forcefully";
+ true -> ""
+ end]),
+ ensure_mnesia_not_running(),
+ Node = node(),
+ case Force of
+ true ->
+ disconnect_nodes(nodes());
+ false ->
+ AllNodes = all_clustered_nodes(),
+            %% Reconnecting so that we get an up-to-date list of nodes.
+ %% We don't need to check for consistency because we are
+ %% resetting. Force=true here so that reset still works
+ %% when clustered with a node which is down.
+ init_db_with_mnesia(AllNodes, is_disc_node(), false, true),
+ case is_disc_and_clustered() andalso
+ [node()] =:= clustered_disc_nodes()
+ of
+ true -> e(resetting_only_disc_node);
+ false -> ok
+ end,
+ leave_cluster(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
+ cannot_delete_schema),
+ disconnect_nodes(all_clustered_nodes()),
+ ok
+ end,
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok = rabbit_node_monitor:reset_cluster_status(),
+ ok.
+
+%% We need to make sure that we don't remain in a distributed Erlang
+%% system with nodes that we are not in an Mnesia cluster with. We
+%% don't handle that well.
+disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
+
+change_cluster_node_type(Type) ->
+ ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ case is_clustered() of
+ false -> e(not_clustered);
+ true -> ok
+ end,
+ {_, _, RunningNodes} =
+ case discover_cluster(all_clustered_nodes()) of
+ {ok, Status} -> Status;
+ {error, _Reason} -> e(cannot_connect_to_cluster)
+ end,
+ Node = case RunningNodes of
+ [] -> e(no_online_cluster_nodes);
+ [Node0|_] -> Node0
+ end,
+ ok = reset(false),
+ ok = join_cluster(Node, case Type of
+ ram -> false;
+ disc -> true
+ end).
+
+update_cluster_nodes(DiscoveryNode) ->
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
+
+ Status = {AllNodes, _, _} =
+ case discover_cluster(DiscoveryNode) of
+ {ok, Status0} -> Status0;
+ {error, _Reason} -> e(cannot_connect_to_node)
+ end,
+ case ordsets:is_element(node(), AllNodes) of
+ true ->
+ %% As in `check_consistency/0', we can safely delete the
+ %% schema here, since it'll be replicated from the other
+ %% nodes
+ mnesia:delete_schema([node()]),
+ rabbit_node_monitor:write_cluster_status(Status),
+ init_db_with_mnesia(AllNodes, is_disc_node(), false);
+ false ->
+ e(inconsistent_cluster)
+ end,
+ ok.
+
+%% We proceed like this: try to remove the node locally. If the node
+%% is offline, we remove the node if:
+%% * This node is a disc node
+%% * All other nodes are offline
+%%   * This node was, to the best of our knowledge (see comment
+%%     below), the last node, apart from the one we're removing, to
+%%     go down
+forget_cluster_node(Node, RemoveWhenOffline) ->
+ case ordsets:is_element(Node, all_clustered_nodes()) of
+ true -> ok;
+ false -> e(not_a_cluster_node)
+ end,
+ case {mnesia:system_info(is_running), RemoveWhenOffline} of
+ {yes, true} -> e(online_node_offline_flag);
+ _ -> ok
+ end,
+ case remove_node_if_mnesia_running(Node) of
+ ok ->
+ ok;
+ {error, mnesia_not_running} when RemoveWhenOffline ->
+ remove_node_offline_node(Node);
+ {error, mnesia_not_running} ->
+ e(offline_node_no_offline_flag);
+ Err = {error, _} ->
+ throw(Err)
+ end.
+
+remove_node_offline_node(Node) ->
+ case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())),
+ is_disc_node()} of
+ {[], true} ->
+            %% Note that while we check if the node was the last to
+            %% go down, apart from the node we're removing, this
+ %% is still unsafe. Consider the situation in which A and
+ %% B are clustered. A goes down, and records B as the
+ %% running node. Then B gets clustered with C, C goes down
+ %% and B goes down. In this case, C is the second-to-last,
+ %% but we don't know that and we'll remove B from A
+ %% anyway, even if that will lead to bad things.
+ case ordsets:subtract(running_clustered_nodes(),
+ ordsets:from_list([node(), Node])) of
+ [] -> start_mnesia(),
+ try
+ [mnesia:force_load_table(T) ||
+ T <- rabbit_mnesia:table_names()],
+ forget_cluster_node(Node, false),
+ ensure_mnesia_running()
+ after
+ stop_mnesia()
+ end;
+ _ -> e(not_last_node_to_go_down)
+ end;
+ {_, _} ->
+ e(removing_node_from_offline_node)
+ end.
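For instance, removing an unreachable node via a node whose own rabbit application has been stopped (e.g. with rabbitmqctl stop_app; node names hypothetical):

    rabbitmqctl -n rabbit@hare forget_cluster_node --offline rabbit@bunny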
+
+
+%%----------------------------------------------------------------------------
+%% Queries
+%%----------------------------------------------------------------------------
+
+status() ->
+ IfNonEmpty = fun (_, []) -> [];
+ (Type, Nodes) -> [{Type, Nodes}]
+ end,
+ [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++
+ IfNonEmpty(ram, clustered_ram_nodes()))}] ++
+ case mnesia:system_info(is_running) of
+ yes -> [{running_nodes, running_clustered_nodes()}];
+ no -> []
+ end.
+
+is_db_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ table_names()).
+
is_clustered() ->
- RunningNodes = running_clustered_nodes(),
- [node()] /= RunningNodes andalso [] /= RunningNodes.
+ Nodes = all_clustered_nodes(),
+ [node()] =/= Nodes andalso [] =/= Nodes.
+
+is_disc_and_clustered() -> is_disc_node() andalso is_clustered().
+
+%% Functions that retrieve the nodes in the cluster will rely on the
+%% status file if offline.
+
+all_clustered_nodes() -> cluster_status(all).
-all_clustered_nodes() ->
- mnesia:system_info(db_nodes).
+clustered_disc_nodes() -> cluster_status(disc).
-running_clustered_nodes() ->
- mnesia:system_info(running_db_nodes).
+clustered_ram_nodes() -> ordsets:subtract(cluster_status(all),
+ cluster_status(disc)).
+
+running_clustered_nodes() -> cluster_status(running).
+
+running_clustered_disc_nodes() ->
+ {_, DiscNodes, RunningNodes} = cluster_status(),
+ ordsets:intersection(DiscNodes, RunningNodes).
+
+%% This function is the actual source of information, since it gets
+%% the data from mnesia. Obviously it'll work only when mnesia is
+%% running.
+mnesia_nodes() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ {error, mnesia_not_running};
+ yes ->
+ %% If the tables are not present, it means that
+ %% `init_db/3' hasn't been run yet. In other words, either
+ %% we are a virgin node or a restarted RAM node. In both
+ %% cases we're not interested in what mnesia has to say.
+ IsDiscNode = mnesia:system_info(use_dir),
+ Tables = mnesia:system_info(tables),
+ {Table, _} = case table_definitions(case IsDiscNode of
+ true -> disc;
+ false -> ram
+ end) of [T|_] -> T end,
+ case lists:member(Table, Tables) of
+ true ->
+ AllNodes =
+ ordsets:from_list(mnesia:system_info(db_nodes)),
+ DiscCopies = ordsets:from_list(
+ mnesia:table_info(schema, disc_copies)),
+ DiscNodes =
+ case IsDiscNode of
+ true -> ordsets:add_element(node(), DiscCopies);
+ false -> DiscCopies
+ end,
+ {ok, {AllNodes, DiscNodes}};
+ false ->
+ {error, tables_not_present}
+ end
+ end.
+
+cluster_status(WhichNodes, ForceMnesia) ->
+    %% We don't want to call `running_nodes/1' unless necessary, since
+    %% it can deadlock when stopping applications.
+ Nodes = case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} ->
+ {ok, {AllNodes, DiscNodes,
+ fun() -> running_nodes(AllNodes) end}};
+ {error, _Reason} when not ForceMnesia ->
+ {AllNodes, DiscNodes, RunningNodes} =
+ rabbit_node_monitor:read_cluster_status(),
+ %% The cluster status file records the status when
+ %% the node is online, but we know for sure that
+ %% the node is offline now, so we can remove it
+ %% from the list of running nodes.
+ {ok,
+ {AllNodes, DiscNodes,
+ fun() -> ordsets:del_element(node(), RunningNodes) end}};
+ Err = {error, _} ->
+ Err
+ end,
+ case Nodes of
+ {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} ->
+ {ok, case WhichNodes of
+ status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
+ all -> AllNodes1;
+ disc -> DiscNodes1;
+ running -> RunningNodesThunk()
+ end};
+ Err1 = {error, _} ->
+ Err1
+ end.
+
+cluster_status(WhichNodes) ->
+ {ok, Status} = cluster_status(WhichNodes, false),
+ Status.
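+
+%% For illustration (hypothetical nodes):
+%%
+%%   cluster_status(all)    %% => ['rabbit@a', 'rabbit@b']
+%%   cluster_status(status) %% => {['rabbit@a', 'rabbit@b'],
+%%                                 ['rabbit@a'],
+%%                                 ['rabbit@a']}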
+
+cluster_status() -> cluster_status(status).
+
+cluster_status_from_mnesia() -> cluster_status(status, true).
+
+node_info() ->
+ {erlang:system_info(otp_release), rabbit_misc:version(),
+ cluster_status_from_mnesia()}.
+
+is_disc_node() ->
+ DiscNodes = clustered_disc_nodes(),
+ DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes).
+
+dir() -> mnesia:system_info(directory).
+
+table_names() -> [Tab || {Tab, _} <- table_definitions()].
+
+%%----------------------------------------------------------------------------
+%% Operations on the db
+%%----------------------------------------------------------------------------
+
+%% Adds the provided nodes to the mnesia cluster, creating a new
+%% schema if there is the need to and catching up if there are other
+%% nodes in the cluster already. It also updates the cluster status
+%% file.
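+%%
+%% A sketch of the cases handled below (hypothetical node name):
+%%
+%%   init_db([], true, false)         %% standalone disc node
+%%   init_db(['rabbit@a'], true, _)   %% joining 'rabbit@a' as a disc node
+%%   init_db(['rabbit@a'], false, _)  %% joining 'rabbit@a' as a ram node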
+init_db(ClusterNodes, WantDiscNode, Force) ->
+ Nodes = change_extra_db_nodes(ClusterNodes, Force),
+ %% Note that we use `system_info' here and not the cluster status
+ %% since when we start rabbit for the first time the cluster
+ %% status will say we are a disc node but the tables won't be
+ %% present yet.
+ WasDiscNode = mnesia:system_info(use_dir),
+ case {Nodes, WasDiscNode, WantDiscNode} of
+ {[], _, false} ->
+ %% Standalone ram node, we don't want that
+ throw({error, cannot_create_standalone_ram_node});
+ {[], false, true} ->
+ %% RAM -> disc, starting from scratch
+ ok = create_schema();
+ {[], true, true} ->
+ %% First disc node up
+ ok;
+ {[AnotherNode | _], _, _} ->
+ %% Subsequent node in cluster, catch up
+ ensure_version_ok(
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
+ ok = wait_for_replicated_tables(),
+            %% The order in which we convert the schema and the other
+            %% tables is important: if we convert the schema first
+            %% when moving to RAM, mnesia will loudly complain since
+            %% it doesn't make much sense to do that. But when moving
+            %% to disc, we need to convert the schema first.
+ case WantDiscNode of
+ true -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ false -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
+ end
+ end,
+ ensure_schema_integrity(),
+ rabbit_node_monitor:update_cluster_status(),
+ ok.
+
+init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
+ ok = init_db(ClusterNodes, WantDiscNode, Force),
+ ok = case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ starting_from_scratch -> rabbit_version:record_desired();
+ version_not_available -> schema_ok_or_move()
+ end,
+ %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget
+ %% about the cluster
+ case WantDiscNode of
+ false -> start_mnesia(),
+ change_extra_db_nodes(ClusterNodes, true),
+ wait_for_replicated_tables();
+ true -> ok
+ end,
+ ok.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) ->
+ start_mnesia(CheckConsistency),
+ try
+ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force)
+ after
+ stop_mnesia()
+ end.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) ->
+ init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force).
+
+ensure_mnesia_dir() ->
+ MnesiaDir = dir() ++ "/",
+ case filelib:ensure_dir(MnesiaDir) of
+ {error, Reason} ->
+ throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
+ ok ->
+ ok
+ end.
+
+ensure_mnesia_running() ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ ok;
+ starting ->
+ wait_for(mnesia_running),
+ ensure_mnesia_running();
+ Reason when Reason =:= no; Reason =:= stopping ->
+ throw({error, mnesia_not_running})
+ end.
+
+ensure_mnesia_not_running() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ ok;
+ stopping ->
+ wait_for(mnesia_not_running),
+ ensure_mnesia_not_running();
+ Reason when Reason =:= yes; Reason =:= starting ->
+ throw({error, mnesia_unexpectedly_running})
+ end.
+
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check_tables(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_table_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait_for_tables(),
+ check_tables(fun check_table_content/2);
+ Other -> Other
+ end.
empty_ram_only_tables() ->
Node = node(),
@@ -209,13 +610,127 @@ empty_ram_only_tables() ->
end, table_names()),
ok.
+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end,
+ table_definitions(Type)),
+ ok.
+
+copy_db(Destination) ->
+ ok = ensure_mnesia_not_running(),
+ rabbit_file:recursive_copy(dir(), Destination).
+
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end.
+
+%% This does not guarantee us much, but it avoids some situations that
+%% will definitely end up badly, e.g. a node that was removed from the
+%% cluster while it was down coming back up and believing it is still
+%% clustered.
+check_cluster_consistency() ->
+    %% We want to find at most one consistent node.
+ case lists:foldl(
+ fun (Node, {error, _}) -> check_cluster_consistency(Node);
+ (_Node, {ok, Status}) -> {ok, Status}
+ end, {error, not_found},
+ ordsets:del_element(node(), all_clustered_nodes()))
+ of
+ {ok, Status = {RemoteAllNodes, _, _}} ->
+ case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of
+ true ->
+ ok;
+ false ->
+ %% We delete the schema here since we think we are
+ %% clustered with nodes that are no longer in the
+ %% cluster and there is no other way to remove
+ %% them from our schema. On the other hand, we are
+ %% sure that there is another online node that we
+ %% can use to sync the tables with. There is a
+ %% race here: if between this check and the
+ %% `init_db' invocation the cluster gets
+ %% disbanded, we're left with a node with no
+ %% mnesia data that will try to connect to offline
+ %% nodes.
+ mnesia:delete_schema([node()])
+ end,
+ rabbit_node_monitor:write_cluster_status(Status);
+ {error, not_found} ->
+ ok;
+ E = {error, _} ->
+ throw(E)
+ end.
+
+check_cluster_consistency(Node) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} ->
+ {error, not_found};
+ {_OTP, _Rabbit, {error, _}} ->
+ {error, not_found};
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Node, Status) of
+ E = {error, _} -> E;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
+
+%%--------------------------------------------------------------------
+%% Hooks for `rabbit_node_monitor'
+%%--------------------------------------------------------------------
+
+on_node_up(Node) ->
+ case running_clustered_disc_nodes() =:= [Node] of
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
+ false -> ok
+ end.
+
+on_node_down(_Node) ->
+ case running_clustered_disc_nodes() =:= [] of
+ true -> rabbit_log:info("only running disc node went down~n");
+ false -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
%%--------------------------------------------------------------------
-nodes_of_type(Type) ->
- %% This function should return the nodes of a certain type (ram,
- %% disc or disc_only) in the current cluster. The type of nodes
- %% is determined when the cluster is initially configured.
- mnesia:table_info(schema, Type).
+discover_cluster(Nodes) when is_list(Nodes) ->
+ lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
+ (Node, {error, _}) -> discover_cluster(Node)
+ end, {error, no_nodes_provided}, Nodes);
+discover_cluster(Node) ->
+ OfflineError =
+ {error, {cannot_discover_cluster,
+ "The nodes provided is either offline or not running"}},
+ case Node =:= node() of
+ true ->
+ {error, {cannot_discover_cluster,
+ "You provided the current node as node to cluster with"}};
+ false ->
+ case rpc:call(Node,
+ rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
%% The tables aren't supposed to be on disk on a ram node
table_definitions(disc) ->
@@ -336,68 +851,11 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-table_names() ->
- [Tab || {Tab, _} <- table_definitions()].
-
replicated_table_names() ->
[Tab || {Tab, TabDef} <- table_definitions(),
not lists:member({local_content, true}, TabDef)
].
-dir() -> mnesia:system_info(directory).
-
-ensure_mnesia_dir() ->
- MnesiaDir = dir() ++ "/",
- case filelib:ensure_dir(MnesiaDir) of
- {error, Reason} ->
- throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
- ok ->
- ok
- end.
-
-ensure_mnesia_running() ->
- case mnesia:system_info(is_running) of
- yes ->
- ok;
- starting ->
- wait_for(mnesia_running),
- ensure_mnesia_running();
- Reason when Reason =:= no; Reason =:= stopping ->
- throw({error, mnesia_not_running})
- end.
-
-ensure_mnesia_not_running() ->
- case mnesia:system_info(is_running) of
- no ->
- ok;
- stopping ->
- wait_for(mnesia_not_running),
- ensure_mnesia_not_running();
- Reason when Reason =:= yes; Reason =:= starting ->
- throw({error, mnesia_unexpectedly_running})
- end.
-
-ensure_schema_integrity() ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
- end.
-
-check_schema_integrity() ->
- Tables = mnesia:system_info(tables),
- case check_tables(fun (Tab, TabDef) ->
- case lists:member(Tab, Tables) of
- false -> {error, {table_missing, Tab}};
- true -> check_table_attributes(Tab, TabDef)
- end
- end) of
- ok -> ok = wait_for_tables(),
- check_tables(fun check_table_content/2);
- Other -> Other
- end.
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -433,153 +891,6 @@ check_tables(Fun) ->
Errors -> {error, Errors}
end.
-%% The cluster node config file contains some or all of the disk nodes
-%% that are members of the cluster this node is / should be a part of.
-%%
-%% If the file is absent, the list is empty, or only contains the
-%% current node, then the current node is a standalone (disk)
-%% node. Otherwise it is a node that is part of a cluster as either a
-%% disk node, if it appears in the cluster node config, or ram node if
-%% it doesn't.
-
-cluster_nodes_config_filename() ->
- dir() ++ "/cluster_nodes.config".
-
-create_cluster_nodes_config(ClusterNodes) ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-read_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [ClusterNodes]} -> ClusterNodes;
- {error, enoent} ->
- {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
- ClusterNodes;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-delete_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} ->
- throw({error, {cannot_delete_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-running_nodes_filename() ->
- filename:join(dir(), "nodes_running_at_shutdown").
-
-record_running_nodes() ->
- FileName = running_nodes_filename(),
- Nodes = running_clustered_nodes() -- [node()],
- %% Don't check the result: we're shutting down anyway and this is
- %% a best-effort-basis.
- rabbit_file:write_term_file(FileName, [Nodes]),
- ok.
-
-read_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [Nodes]} -> Nodes;
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
- FileName, Reason}})
- end.
-
-delete_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
- FileName, Reason}})
- end.
-
-init_db(ClusterNodes, Force) ->
- init_db(
- ClusterNodes, Force,
- fun () ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- starting_from_scratch -> ok = rabbit_version:record_desired()
- end
- end).
-
-%% Take a cluster node config and create the right kind of node - a
-%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes. If Force is false, don't allow
-%% connections to offline nodes.
-init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
- UClusterNodes = lists:usort(ClusterNodes),
- ProperClusterNodes = UClusterNodes -- [node()],
- case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
- {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
- throw({error, {failed_to_cluster_with, ProperClusterNodes,
- "Mnesia could not connect to any disc nodes."}});
- {ok, Nodes} ->
- WasDiscNode = is_disc_node(),
- WantDiscNode = should_be_disc_node(ClusterNodes),
- %% We create a new db (on disk, or in ram) in the first
- %% two cases and attempt to upgrade the in the other two
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
- %% New ram node; start from scratch
- ok = create_schema(ram);
- {[], false, true} ->
- %% Nothing there at all, start from scratch
- ok = create_schema(disc);
- {[], true, true} ->
- %% We're the first node up
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ensure_schema_integrity();
- version_not_available -> ok = schema_ok_or_move()
- end;
- {[AnotherNode|_], _, _} ->
- %% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
- {CopyType, CopyTypeAlt} =
- case WantDiscNode of
- true -> {disc, disc_copies};
- false -> {ram, ram_copies}
- end,
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, CopyTypeAlt),
- ok = create_local_table_copies(CopyType),
-
- ok = SecondaryPostMnesiaFun(),
- %% We've taken down mnesia, so ram nodes will need
- %% to re-sync
- case is_disc_node() of
- false -> start_mnesia(),
- mnesia:change_config(extra_db_nodes,
- ProperClusterNodes),
- wait_for_replicated_tables();
- true -> ok
- end,
-
- ensure_schema_integrity(),
- ok
- end;
- {error, Reason} ->
- %% one reason we may end up here is if we try to join
- %% nodes together that are currently running standalone or
- %% are members of a different cluster
- throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -592,7 +903,7 @@ schema_ok_or_move() ->
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
- ok = create_schema(disc)
+ ok = create_schema()
end.
ensure_version_ok({ok, DiscVersion}) ->
@@ -604,25 +915,16 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_version:record_desired().
-create_schema(Type) ->
+%% We only care about disc nodes since ram nodes are supposed to catch
+%% up only
+create_schema() ->
stop_mnesia(),
- case Type of
- disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
- cannot_create_schema);
- ram -> %% remove the disc schema since this is a ram node
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema)
- end,
+ rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
start_mnesia(),
- ok = create_tables(Type),
+ ok = create_tables(disc),
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
-is_disc_node() -> mnesia:system_info(use_dir).
-
-should_be_disc_node(ClusterNodes) ->
- ClusterNodes == [] orelse lists:member(node(), ClusterNodes).
-
move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
@@ -644,25 +946,6 @@ move_db() ->
start_mnesia(),
ok.
-copy_db(Destination) ->
- ok = ensure_mnesia_not_running(),
- rabbit_file:recursive_copy(dir(), Destination).
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end,
- table_definitions(Type)),
- ok.
-
copy_type_to_ram(TabDef) ->
[{disc_copies, []}, {ram_copies, [node()]}
| proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
@@ -684,13 +967,6 @@ create_local_table_copies(Type) ->
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
-%%% unused code - commented out to keep dialyzer happy
-%%% Type =:= disc_only ->
-%%% if
-%%% HasDiscCopies or HasDiscOnlyCopies ->
-%%% disc_only_copies;
-%%% true -> ram_copies
-%%% end;
Type =:= ram ->
ram_copies
end,
@@ -711,120 +987,187 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
-
-wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok ->
- ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
+remove_node_if_mnesia_running(Node) ->
+ case mnesia:system_info(is_running) of
+ yes ->
+            %% Deleting the schema copy of the node will result in
+ %% the node being removed from the cluster, with that
+ %% change being propagated to all nodes
+ case mnesia:del_table_copy(schema, Node) of
+ {atomic, ok} ->
+ rabbit_node_monitor:notify_left_cluster(Node),
+ ok;
+ {aborted, Reason} ->
+ {error, {failed_to_remove_node, Node, Reason}}
+ end;
+ no ->
+ {error, mnesia_not_running}
end.
-reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
- [if Force -> " forcefully";
- true -> ""
- end]),
- ensure_mnesia_not_running(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false)
+leave_cluster() ->
+ case {is_clustered(),
+ running_nodes(ordsets:del_element(node(), all_clustered_nodes()))}
of
- true -> log_both("no other disc nodes running");
- false -> ok
- end,
- Node = node(),
- Nodes = all_clustered_nodes() -- [Node],
- case Force of
- true -> ok;
- false ->
- ensure_mnesia_dir(),
- start_mnesia(),
- RunningNodes =
- try
- %% Force=true here so that reset still works when clustered
- %% with a node which is down
- ok = init_db(read_cluster_nodes_config(), true),
- running_clustered_nodes() -- [Node]
- after
- stop_mnesia()
- end,
- leave_cluster(Nodes, RunningNodes),
- rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema)
- end,
- %% We need to make sure that we don't end up in a distributed
- %% Erlang system with nodes while not being in an Mnesia cluster
- %% with them. We don't handle that well.
- [erlang:disconnect_node(N) || N <- Nodes],
- ok = delete_cluster_nodes_config(),
- %% remove persisted messages and any other garbage we find
- ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok.
+ {false, []} -> ok;
+ {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
+ end.
-leave_cluster([], _) -> ok;
-leave_cluster(Nodes, RunningNodes) ->
- %% find at least one running cluster node and instruct it to
- %% remove our schema copy which will in turn result in our node
- %% being removed as a cluster node from the schema, with that
- %% change being propagated to all nodes
- case lists:any(
- fun (Node) ->
- case rpc:call(Node, mnesia, del_table_copy,
- [schema, node()]) of
- {atomic, ok} -> true;
- {badrpc, nodedown} -> false;
- {aborted, {node_not_running, _}} -> false;
- {aborted, Reason} ->
- throw({error, {failed_to_leave_cluster,
- Nodes, RunningNodes, Reason}})
- end
- end,
- RunningNodes) of
- true -> ok;
- false -> throw({error, {no_running_cluster_nodes,
- Nodes, RunningNodes}})
+leave_cluster(Node) ->
+ case rpc:call(Node,
+ rabbit_mnesia, remove_node_if_mnesia_running, [node()]) of
+ ok -> true;
+ {error, mnesia_not_running} -> false;
+ {error, Reason} -> throw({error, Reason});
+ {badrpc, nodedown} -> false
end.
wait_for(Condition) ->
error_logger:info_msg("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
-on_node_up(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
+start_mnesia(CheckConsistency) ->
+ case CheckConsistency of
+ true -> check_cluster_consistency();
false -> ok
- end.
-
-on_node_down(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("only running disc node went down~n");
- false -> ok
- end.
-
-is_only_disc_node(Node, _MnesiaRunning = true) ->
- RunningSet = sets:from_list(running_clustered_nodes()),
- DiscSet = sets:from_list(nodes_of_type(disc_copies)),
- [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet));
-is_only_disc_node(Node, false) ->
- start_mnesia(),
- Res = is_only_disc_node(Node, true),
- stop_mnesia(),
- Res.
-
-log_both(Warning) ->
- io:format("Warning: ~s~n", [Warning]),
- rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg("~s~n", [Warning]) end).
-
-start_mnesia() ->
+ end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ensure_mnesia_running().
+start_mnesia() ->
+ start_mnesia(true).
+
stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
+
+change_extra_db_nodes(ClusterNodes0, Force) ->
+ ClusterNodes = lists:usort(ClusterNodes0) -- [node()],
+ case mnesia:change_config(extra_db_nodes, ClusterNodes) of
+ {ok, []} when not Force andalso ClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ClusterNodes,
+ "Mnesia could not connect to any nodes."}});
+ {ok, Nodes} ->
+ Nodes
+ end.
+
+%% What we really want is nodes running rabbit, not running
+%% mnesia. Using `mnesia:system_info(running_db_nodes)' will
+%% return false positives when we are actually just doing cluster
+%% operations (e.g. joining the cluster).
+running_nodes(Nodes) ->
+ {Replies, _BadNodes} =
+ rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
+ [Node || {Running, Node} <- Replies, Running].
+
+is_running_remote() ->
+ {proplists:is_defined(rabbit, application:which_applications(infinity)),
+ node()}.
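+
+%% For illustration, with hypothetical nodes of which only 'rabbit@a'
+%% has the rabbit application running:
+%%
+%%   running_nodes(['rabbit@a', 'rabbit@b'])  %% => ['rabbit@a']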
+
+check_consistency(OTP, Rabbit) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]).
+
+check_consistency(OTP, Rabbit, Node, Status) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP),
+ check_rabbit_consistency(Rabbit),
+ check_nodes_consistency(Node, Status)]).
+
+check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
+ ThisNode = node(),
+ case ordsets:is_element(ThisNode, RemoteAllNodes) of
+ true ->
+ {ok, RemoteStatus};
+ false ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("Node ~p thinks it's clustered "
+ "with node ~p, but ~p disagrees",
+ [ThisNode, Node, Node])}}
+ end.
+
+check_version_consistency(This, Remote, _) when This =:= Remote ->
+ ok;
+check_version_consistency(This, Remote, Name) ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("~s version mismatch: local node is ~s, "
+ "remote node ~s", [Name, This, Remote])}}.
+
+check_otp_consistency(Remote) ->
+ check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
+
+check_rabbit_consistency(Remote) ->
+ check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
+
+%% This is fairly tricky.  We want to know if the node is in the state
+%% that a `reset' would leave it in.  We cannot simply check if the
+%% mnesia tables aren't there because restarted RAM nodes won't have
+%% tables while still being non-virgin.  What we do instead is to
+%% check if the mnesia directory is non-existent or empty, with the
+%% exception of the cluster status files, which will be there thanks
+%% to `rabbit_node_monitor:prepare_cluster_status_files/0'.
+is_virgin_node() ->
+    case rabbit_file:list_dir(dir()) of
+        {error, enoent}      -> true;
+        {ok, []}             -> true;
+        {ok, [File1, File2]} ->
+            lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:=
+                lists:usort([rabbit_node_monitor:cluster_status_filename(),
+                             rabbit_node_monitor:running_nodes_filename()]);
+        {ok, _}              -> false
+    end.
+
+find_good_node([]) ->
+ none;
+find_good_node([Node | Nodes]) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
+ end.
+
+e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
+
+error_description(clustering_only_disc_node) ->
+ "You cannot cluster a node if it is the only disc node in its existing "
+ " cluster. If new nodes joined while this node was offline, use "
+ "\"update_cluster_nodes\" to add them manually.";
+error_description(resetting_only_disc_node) ->
+ "You cannot reset a node when it is the only disc node in a cluster. "
+ "Please convert another node of the cluster to a disc node first.";
+error_description(already_clustered) ->
+ "You are already clustered with the nodes you have selected.";
+error_description(not_clustered) ->
+ "Non-clustered nodes can only be disc nodes.";
+error_description(cannot_connect_to_cluster) ->
+ "Could not connect to the cluster nodes present in this node's "
+ "status file. If the cluster has changed, you can use the "
+ "\"update_cluster_nodes\" command to point to the new cluster nodes.";
+error_description(no_online_cluster_nodes) ->
+ "Could not find any online cluster nodes. If the cluster has changed, "
+ "you can use the 'recluster' command.";
+error_description(cannot_connect_to_node) ->
+ "Could not connect to the cluster node provided.";
+error_description(inconsistent_cluster) ->
+ "The nodes provided do not have this node as part of the cluster.";
+error_description(not_a_cluster_node) ->
+ "The node selected is not in the cluster.";
+error_description(online_node_offline_flag) ->
+ "You set the --offline flag, which is used to remove nodes remotely from "
+ "offline nodes, but this node is online.";
+error_description(offline_node_no_offline_flag) ->
+ "You are trying to remove a node from an offline node. That is dangerous, "
+ "but can be done with the --offline flag. Please consult the manual "
+ "for rabbitmqctl for more information.";
+error_description(not_last_node_to_go_down) ->
+ "The node you're trying to remove from was not the last to go down "
+ "(excluding the node you are removing). Please use the the last node "
+ "to go down to remove nodes when the cluster is offline.";
+error_description(removing_node_from_offline_node) ->
+ "To remove a node remotely from an offline node, the node you're removing "
+ "from must be a disc node and all the other nodes must be offline.";
+error_description(no_running_cluster_nodes) ->
+ "You cannot leave a cluster if no online nodes are present.".
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d69dad1f8b..c2e55022d9 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1394,7 +1394,7 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-list_sorted_file_names(Dir, Ext) ->
+list_sorted_filenames(Dir, Ext) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
filelib:wildcard("*" ++ Ext, Dir)).
@@ -1531,8 +1531,8 @@ count_msg_refs(Gen, Seed, State) ->
end.
recover_crashed_compactions(Dir) ->
- FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
- TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
+ FileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION),
+ TmpFileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION_TMP),
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
@@ -1609,7 +1609,7 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
{ok, Pid} = gatherer:start_link(),
case [filename_to_num(FileName) ||
- FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
+ FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of
[] -> build_index(Pid, undefined, [State #msstate.current_file],
State);
Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
@@ -2023,7 +2023,7 @@ transform_dir(BaseDir, Store, TransformFun) ->
CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
- false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ false -> FileList = list_sorted_filenames(Dir, ?FILE_EXTENSION),
foreach_file(Dir, TmpDir, TransformFile, FileList),
foreach_file(Dir, fun file:delete/1, FileList),
foreach_file(TmpDir, Dir, CopyFile, FileList),
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index bedf5142da..038154c36e 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
- close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ close/1, fast_close/1, sockname/1, peername/1, peercert/1,
tune_buffer_size/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -59,7 +59,7 @@
-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
--spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
+-spec(fast_close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
@@ -77,6 +77,8 @@
%%---------------------------------------------------------------------------
+-define(SSL_CLOSE_TIMEOUT, 5000).
+
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
is_ssl(Sock) -> ?IS_SSL(Sock).
@@ -148,8 +150,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
-maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok;
-maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+fast_close(Sock) when ?IS_SSL(Sock) ->
+ %% We cannot simply port_close the underlying tcp socket since the
+ %% TLS protocol is quite insistent that a proper closing handshake
+    %% should take place (see RFC 5246 s7.2.1). So we call ssl:close
+    %% instead, but that can block for a very long time, e.g. when
+    %% there is lots of pending output and there is tcp backpressure,
+    %% or the ssl_connection process has entered the
+    %% workaround_transport_delivery_problems function during
+    %% termination, which, inexplicably, does a gen_tcp:recv(Socket,
+    %% 0), which may never return if the client doesn't send a FIN or
+    %% the FIN gets swallowed by the network. Since there is no
+    %% timeout variant of ssl:close, we construct our own.
+ {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end),
+ erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
+ receive
+ {Pid, ssl_close_timeout} ->
+ erlang:demonitor(MRef, [flush]),
+ exit(Pid, kill);
+ {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end,
+ catch port_close(Sock#ssl_socket.tcp),
+ ok;
+fast_close(Sock) when is_port(Sock) ->
+ catch port_close(Sock), ok.
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 94a5a2b79d..2d0ded1239 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -160,7 +160,19 @@ ssl_transform_fun(SslOpts) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
+ {error, timeout} ->
+ {error, {ssl_upgrade_error, timeout}};
{error, Reason} ->
+ %% We have no idea what state the ssl_connection
+ %% process is in - it could still be happily
+ %% going, it might be stuck, or it could be just
+ %% about to fail. There is little that our caller
+ %% can do but close the TCP socket, but this could
+ %% cause ssl alerts to get dropped (which is bad
+ %% form, according to the TLS spec). So we give
+ %% the ssl_connection a little bit of time to send
+ %% such alerts.
+ timer:sleep(?SSL_TIMEOUT * 1000),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 323cf0ce9e..64c801f282 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,11 +18,29 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([running_nodes_filename/0,
+ cluster_status_filename/0,
+ prepare_cluster_status_files/0,
+ write_cluster_status/1,
+ read_cluster_status/0,
+ update_cluster_status/0,
+ reset_cluster_status/0,
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
--export([notify_cluster/0, rabbit_running_on/1]).
+ joined_cluster/2,
+ notify_joined_cluster/0,
+ left_cluster/1,
+ notify_left_cluster/1,
+ node_up/2,
+ notify_node_up/0,
+
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+ ]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -31,56 +49,198 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/1 :: (node()) -> 'ok').
--spec(notify_cluster/0 :: () -> 'ok').
+-spec(running_nodes_filename/0 :: () -> string()).
+-spec(cluster_status_filename/0 :: () -> string()).
+-spec(prepare_cluster_status_files/0 :: () -> 'ok').
+-spec(write_cluster_status/1 :: (rabbit_mnesia:cluster_status()) -> 'ok').
+-spec(read_cluster_status/0 :: () -> rabbit_mnesia:cluster_status()).
+-spec(update_cluster_status/0 :: () -> 'ok').
+-spec(reset_cluster_status/0 :: () -> 'ok').
+
+-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_joined_cluster/0 :: () -> 'ok').
+-spec(left_cluster/1 :: (node()) -> 'ok').
+-spec(notify_left_cluster/1 :: (node()) -> 'ok').
+-spec(node_up/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
-endif.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Cluster file operations
+%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+%% The cluster status information is kept in two files. The "cluster
+%% status file" contains all the clustered nodes and the disc nodes.
+%% The "running nodes file" contains the currently running nodes or,
+%% when the node is down, the nodes that were running at shutdown.
+%%
+%% We strive to keep the files up to date, and we rely on their
+%% accuracy in various situations. Obviously when mnesia is offline
+%% the information we have will be outdated, but it can't be otherwise.
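+%%
+%% For illustration (hypothetical nodes), the two files written by
+%% write_cluster_status/1 would contain terms such as:
+%%
+%%   cluster_nodes.config:        {['rabbit@a','rabbit@b'],['rabbit@a']}.
+%%   nodes_running_at_shutdown:   ['rabbit@a','rabbit@b'].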
-rabbit_running_on(Node) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+running_nodes_filename() ->
+ filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
-notify_cluster() ->
- Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- %% notify other rabbits of this rabbit
- case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
- [Node], ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
+cluster_status_filename() ->
+ rabbit_mnesia:dir() ++ "/cluster_nodes.config".
+
+prepare_cluster_status_files() ->
+ rabbit_mnesia:ensure_mnesia_dir(),
+ CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end,
+ RunningNodes1 = case try_read_file(running_nodes_filename()) of
+ {ok, [Nodes]} when is_list(Nodes) -> Nodes;
+ {ok, _ } -> CorruptFiles();
+ {error, enoent} -> []
+ end,
+ {AllNodes1, WantDiscNode} =
+ case try_read_file(cluster_status_filename()) of
+ {ok, [{AllNodes, DiscNodes0}]} ->
+ {AllNodes, lists:member(node(), DiscNodes0)};
+ {ok, [AllNodes0]} when is_list(AllNodes0) ->
+ {legacy_cluster_nodes(AllNodes0),
+ legacy_should_be_disc_node(AllNodes0)};
+ {ok, _} ->
+ CorruptFiles();
+ {error, enoent} ->
+ {legacy_cluster_nodes([]), true}
+ end,
+
+ ThisNode = [node()],
+
+ RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode),
+ AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
+ DiscNodes = case WantDiscNode of
+ true -> ThisNode;
+ false -> []
+ end,
+
+ ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
+
+write_cluster_status({All, Disc, Running}) ->
+ ClusterStatusFN = cluster_status_filename(),
+ Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
+ ok ->
+ RunningNodesFN = running_nodes_filename(),
+ {RunningNodesFN,
+ rabbit_file:write_term_file(RunningNodesFN, [Running])};
+ E1 = {error, _} ->
+ {ClusterStatusFN, E1}
+ end,
+ case Res of
+ {_, ok} -> ok;
+ {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
+ end.
+
+try_read_file(FileName) ->
+ case rabbit_file:read_term_file(FileName) of
+ {ok, Term} -> {ok, Term};
+ {error, enoent} -> {error, enoent};
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
+ end.
+
+read_cluster_status() ->
+ case {try_read_file(cluster_status_filename()),
+ try_read_file(running_nodes_filename())} of
+ {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
+ {All, Disc, Running};
+ {_, _} ->
+ throw({error, corrupt_or_missing_cluster_files})
+ end.
+
+update_cluster_status() ->
+ {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status(Status).
+
+reset_cluster_status() ->
+ write_cluster_status({[node()], [node()], [node()]}).
+
+%%----------------------------------------------------------------------------
+%% Cluster notifications
+%%----------------------------------------------------------------------------
+
+joined_cluster(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}).
+
+notify_joined_cluster() ->
+ cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]),
+ ok.
+
+left_cluster(Node) ->
+ gen_server:cast(?SERVER, {left_cluster, Node}).
+
+notify_left_cluster(Node) ->
+ left_cluster(Node),
+ cluster_multicall(left_cluster, [Node]),
+ ok.
+
+node_up(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}).
+
+notify_node_up() ->
+ Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]),
%% register other active rabbits with this rabbit
- [ rabbit_running_on(N) || N <- Nodes ],
+ [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) ||
+ N <- Nodes ],
ok.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
- {ok, ordsets:new()}.
+ {ok, no_state}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, Nodes) ->
- case ordsets:is_element(Node, Nodes) of
- true -> {noreply, Nodes};
+%% Note: when updating the status file, we can't simply write the mnesia
+%% information since the message can (and will) overtake the mnesia propagation.
+handle_cast({node_up, Node, IsDiscNode}, State) ->
+ case is_already_monitored({rabbit, Node}) of
+ true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(
+ Node, DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}),
erlang:monitor(process, {rabbit, Node}),
ok = handle_live_rabbit(Node),
- {noreply, ordsets:add_element(Node, Nodes)}
+ {noreply, State}
end;
+handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node,
+ DiscNodes);
+ false -> DiscNodes
+ end,
+ RunningNodes}),
+ {noreply, State};
+handle_cast({left_cluster, Node}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({AllNodes, DiscNodes,
+ ordsets:del_element(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, ordsets:del_element(Node, Nodes)};
+ {noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -90,7 +250,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Functions that call the module specific hooks when nodes go up/down
+%%----------------------------------------------------------------------------
%% TODO: This may turn out to be a performance hog when there are lots
%% of nodes. We really only need to execute some of these statements
@@ -104,3 +266,32 @@ handle_dead_rabbit(Node) ->
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
+
+%%--------------------------------------------------------------------
+%% Internal utils
+%%--------------------------------------------------------------------
+
+cluster_multicall(Fun, Args) ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+    %% notify the other rabbits of this cluster event
+ case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
+ ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ Nodes.
+
+is_already_monitored(Item) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
+ (_) -> false
+ end, Monitors).
+
+legacy_cluster_nodes(Nodes) ->
+ %% We get all the info that we can, including the nodes from mnesia, which
+ %% will be there if the node is a disc node (empty list otherwise)
+ lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
+
+legacy_should_be_disc_node(DiscNodes) ->
+ DiscNodes == [] orelse lists:member(node(), DiscNodes).
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index af940dde97..24762a733f 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -16,7 +16,7 @@
-module(rabbit_parameter_validation).
--export([number/2, binary/2, list/2, proplist/3]).
+-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]).
number(_Name, Term) when is_number(Term) ->
ok;
@@ -30,12 +30,26 @@ binary(_Name, Term) when is_binary(Term) ->
binary(Name, Term) ->
{error, "~s should be binary, actually was ~p", [Name, Term]}.
+boolean(_Name, Term) when is_boolean(Term) ->
+ ok;
+boolean(Name, Term) ->
+ {error, "~s should be boolean, actually was ~p", [Name, Term]}.
+
list(_Name, Term) when is_list(Term) ->
ok;
list(Name, Term) ->
{error, "~s should be list, actually was ~p", [Name, Term]}.
+regex(Name, Term) when is_binary(Term) ->
+ case re:compile(Term) of
+ {ok, _} -> ok;
+ {error, Reason} -> {error, "~s should be regular expression "
+ "but is invalid: ~p", [Name, Reason]}
+ end;
+regex(Name, Term) ->
+ {error, "~s should be a binary but was ~p", [Name, Term]}.
+
proplist(Name, Constraints, Term) when is_list(Term) ->
{Results, Remainder}
= lists:foldl(
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 1551795f7c..69480c9c11 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -22,11 +22,11 @@
-include("rabbit.hrl").
--import(rabbit_misc, [pget/2]).
+-import(rabbit_misc, [pget/2, pget/3]).
-export([register/0]).
-export([name/1, get/2, set/1]).
--export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -46,12 +46,13 @@ name0(Policy) -> pget(<<"name">>, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
-set0(Name) -> match(Name, list()).
+set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
-get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())).
+get(Name, EntityName = #resource{virtual_host = VHost}) ->
+ get0(Name, match(EntityName, list(VHost))).
get0(_Name, undefined) -> {error, not_found};
get0(Name, List) -> case pget(<<"policy">>, List) of
@@ -64,35 +65,33 @@ get0(Name, List) -> case pget(<<"policy">>, List) of
%%----------------------------------------------------------------------------
-validate(<<"policy">>, Name, Term) ->
+validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-validate_clear(<<"policy">>, _Name) ->
+validate_clear(_VHost, <<"policy">>, _Name) ->
ok.
-notify(<<"policy">>, _Name, _Term) ->
- update_policies().
+notify(VHost, <<"policy">>, _Name, _Term) ->
+ update_policies(VHost).
-notify_clear(<<"policy">>, _Name) ->
- update_policies().
+notify_clear(VHost, <<"policy">>, _Name) ->
+ update_policies(VHost).
%%----------------------------------------------------------------------------
-list() ->
+list(VHost) ->
[[{<<"name">>, pget(key, P)} | pget(value, P)]
- || P <- rabbit_runtime_parameters:list(<<"policy">>)].
+ || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
-update_policies() ->
- Policies = list(),
+update_policies(VHost) ->
+ Policies = list(VHost),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() ->
{[update_exchange(X, Policies) ||
- VHost <- rabbit_vhost:list(),
- X <- rabbit_exchange:list(VHost)],
+ X <- rabbit_exchange:list(VHost)],
[update_queue(Q, Policies) ||
- VHost <- rabbit_vhost:list(),
- Q <- rabbit_amqqueue:list(VHost)]}
+ Q <- rabbit_amqqueue:list(VHost)]}
end),
[notify(X) || X <- Xs],
[notify(Q) || Q <- Qs],
@@ -129,28 +128,15 @@ match(Name, Policies) ->
[Policy | _Rest] -> Policy
end.
-matches(#resource{name = Name, virtual_host = VHost}, Policy) ->
- Prefix = pget(<<"prefix">>, Policy),
- case pget(<<"vhost">>, Policy) of
- undefined -> prefix(Prefix, Name);
- VHost -> prefix(Prefix, Name);
- _ -> false
- end.
-
-prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)).
+matches(#resource{name = Name}, Policy) ->
+ match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]).
sort_pred(A, B) ->
- R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)),
- case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of
- {undefined, undefined} -> R;
- {undefined, _} -> true;
- {_, undefined} -> false;
- _ -> R
- end.
+ pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0).
%%----------------------------------------------------------------------------
policy_validation() ->
- [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional},
- {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory},
- {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
+ [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional},
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3ef769c7e6..6d6c648acb 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -400,19 +400,19 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
unsynced_msg_ids = gb_sets:new() }.
-clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
+clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
detect_clean_shutdown(Dir) ->
- case rabbit_file:delete(clean_file_name(Dir)) of
+ case rabbit_file:delete(clean_filename(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_file:read_term_file(clean_file_name(Dir)).
+ rabbit_file:read_term_file(clean_filename(Dir)).
store_clean_shutdown(Terms, Dir) ->
- CleanFileName = clean_file_name(Dir),
+ CleanFileName = clean_filename(Dir),
ok = rabbit_file:ensure_dir(CleanFileName),
rabbit_file:write_term_file(CleanFileName, Terms).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 19dac70c79..aef48b2030 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -184,6 +184,8 @@ socket_op(Sock, Fun) ->
{ok, Res} -> Res;
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
[self(), Reason]),
+ %% NB: this is tcp socket, even in case of ssl
+ rabbit_net:fast_close(Sock),
exit(normal)
end.
@@ -236,15 +238,14 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), ConnStr, Ex])
after
- %% The reader is the controlling process and hence its
- %% termination will close the socket. Furthermore,
- %% gen_tcp:close/1 waits for pending output to be sent, which
- %% results in unnecessary delays. However, to keep the
- %% file_handle_cache accounting as accurate as possible it
- %% would be good to close the socket immediately if we
- %% can. But we can only do this for non-ssl sockets.
- %%
- rabbit_net:maybe_fast_close(ClientSock),
+ %% We don't call gen_tcp:close/1 here since it waits for
+ %% pending output to be sent, which results in unnecessary
+ %% delays. We could just terminate - the reader is the
+ %% controlling process and hence its termination will close
+ %% the socket. However, to keep the file_handle_cache
+ %% accounting as accurate as possible we ought to close the
+ %% socket w/o delay before termination.
+ rabbit_net:fast_close(ClientSock),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index c7d3011674..186680496e 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -21,10 +21,12 @@
-type(validate_results() ::
'ok' | {error, string(), [term()]} | [validate_results()]).
--callback validate(binary(), binary(), term()) -> validate_results().
--callback validate_clear(binary(), binary()) -> validate_results().
--callback notify(binary(), binary(), term()) -> 'ok'.
--callback notify_clear(binary(), binary()) -> 'ok'.
+-callback validate(rabbit_types:vhost(), binary(), binary(),
+ term()) -> validate_results().
+-callback validate_clear(rabbit_types:vhost(), binary(),
+ binary()) -> validate_results().
+-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
+-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
-else.
@@ -32,10 +34,10 @@
behaviour_info(callbacks) ->
[
- {validate, 3},
- {validate_clear, 2},
- {notify, 3},
- {notify_clear, 2}
+ {validate, 4},
+ {validate_clear, 3},
+ {notify, 4},
+ {notify_clear, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 3a54e8f621..b58b459a7f 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,8 +18,9 @@
-include("rabbit.hrl").
--export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1,
- list_formatted/0, lookup/2, value/2, value/3, info_keys/0]).
+-export([parse_set/4, set/4, clear/3,
+ list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1,
+ lookup/3, value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -27,16 +28,23 @@
-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
--spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()).
--spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()).
--spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()).
+-spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string())
+ -> ok_or_error_string()).
+-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
+ -> ok_or_error_string()).
+-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
--spec(list/1 :: (binary()) -> [rabbit_types:infos()]).
+-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
--spec(list_formatted/0 :: () -> [rabbit_types:infos()]).
--spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()).
--spec(value/2 :: (binary(), binary()) -> term()).
--spec(value/3 :: (binary(), binary(), term()) -> term()).
+-spec(list/2 :: (rabbit_types:vhost(), binary()) -> [rabbit_types:infos()]).
+-spec(list_strict/2 :: (rabbit_types:vhost(), binary())
+ -> [rabbit_types:infos()] | 'not_found').
+-spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
+-spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> rabbit_types:infos()).
+-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()).
+-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-endif.
@@ -49,14 +57,14 @@
%%---------------------------------------------------------------------------
-parse_set(Component, Key, String) ->
- case parse(String) of
- {ok, Term} -> set(Component, Key, Term);
- {errors, L} -> format_error(L)
+parse_set(VHost, Component, Key, String) ->
+ case rabbit_misc:json_decode(String) of
+ {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON));
+ error -> {error_string, "JSON decoding error"}
end.
-set(Component, Key, Term) ->
- case set0(Component, Key, Term) of
+set(VHost, Component, Key, Term) ->
+ case set0(VHost, Component, Key, Term) of
ok -> ok;
{errors, L} -> format_error(L)
end.
@@ -64,21 +72,16 @@ set(Component, Key, Term) ->
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
-set0(Component, Key, Term) ->
+set0(VHost, Component, Key, Term) ->
case lookup_component(Component) of
{ok, Mod} ->
- case flatten_errors(validate(Term)) of
+ case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of
ok ->
- case flatten_errors(Mod:validate(Component, Key, Term)) of
- ok ->
- case mnesia_update(Component, Key, Term) of
- {old, Term} -> ok;
- _ -> Mod:notify(Component, Key, Term)
- end,
- ok;
- E ->
- E
- end;
+ case mnesia_update(VHost, Component, Key, Term) of
+ {old, Term} -> ok;
+ _ -> Mod:notify(VHost, Component, Key, Term)
+ end,
+ ok;
E ->
E
end;
@@ -86,95 +89,103 @@ set0(Component, Key, Term) ->
E
end.
-mnesia_update(Component, Key, Term) ->
+mnesia_update(VHost, Component, Key, Term) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Res = case mnesia:read(?TABLE, {Component, Key}, read) of
+ Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
[] -> new;
[Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(Component, Key, Term), write),
+ ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write),
Res
end).
-clear(Component, Key) ->
- case clear0(Component, Key) of
+clear(VHost, Component, Key) ->
+ case clear0(VHost, Component, Key) of
ok -> ok;
{errors, L} -> format_error(L)
end.
-clear0(Component, Key) ->
+clear0(VHost, Component, Key) ->
case lookup_component(Component) of
- {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of
- ok -> mnesia_clear(Component, Key),
- Mod:notify_clear(Component, Key),
+ {ok, Mod} -> case flatten_errors(
+ Mod:validate_clear(VHost, Component, Key)) of
+ ok -> mnesia_clear(VHost, Component, Key),
+ Mod:notify_clear(VHost, Component, Key),
ok;
E -> E
end;
E -> E
end.
-mnesia_clear(Component, Key) ->
+mnesia_clear(VHost, Component, Key) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
- ok = mnesia:delete(?TABLE, {Component, Key}, write)
+ ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write)
end).
list() ->
[p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
-list(Component) -> list(Component, []).
-list_strict(Component) -> list(Component, not_found).
-
-list(Component, Default) ->
- case lookup_component(Component) of
- {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'},
- [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
- _ -> Default
+list(VHost) -> list(VHost, '_', []).
+list_strict(Component) -> list('_', Component, not_found).
+list(VHost, Component) -> list(VHost, Component, []).
+list_strict(VHost, Component) -> list(VHost, Component, not_found).
+
+list(VHost, Component, Default) ->
+ case component_good(Component) of
+ true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
+ _ = '_'},
+ [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ _ -> Default
end.
-list_formatted() ->
- [pset(value, format(pget(value, P)), P) || P <- list()].
+list_formatted(VHost) ->
+ [pset(value, format(pget(value, P)), P) || P <- list(VHost)].
-lookup(Component, Key) ->
- case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+lookup(VHost, Component, Key) ->
+ case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(Component, Key) ->
- case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+value(VHost, Component, Key) ->
+ case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(Component, Key, Default) ->
- Params = lookup0(Component, Key,
- fun () -> lookup_missing(Component, Key, Default) end),
+value(VHost, Component, Key, Default) ->
+ Params = lookup0(VHost, Component, Key,
+ fun () ->
+ lookup_missing(VHost, Component, Key, Default)
+ end),
Params#runtime_parameters.value.
-lookup0(Component, Key, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {Component, Key}) of
+lookup0(VHost, Component, Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(Component, Key, Default) ->
+lookup_missing(VHost, Component, Key, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {Component, Key}, read) of
- [] -> Record = c(Component, Key, Default),
+ case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
+ [] -> Record = c(VHost, Component, Key, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key},
- value = Default}.
+c(VHost, Component, Key, Default) ->
+ #runtime_parameters{key = {VHost, Component, Key},
+ value = Default}.
-p(#runtime_parameters{key = {Component, Key}, value = Value}) ->
- [{component, Component},
+p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) ->
+ [{vhost, VHost},
+ {component, Component},
{key, Key},
{value, Value}].
@@ -182,6 +193,12 @@ info_keys() -> [component, key, value].
%%---------------------------------------------------------------------------
+component_good('_') -> true;
+component_good(Component) -> case lookup_component(Component) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
lookup_component(Component) ->
case rabbit_registry:lookup_module(
runtime_parameter, list_to_atom(binary_to_list(Component))) of
@@ -190,51 +207,9 @@ lookup_component(Component) ->
{ok, Module} -> {ok, Module}
end.
-parse(Src0) ->
- Src1 = string:strip(Src0),
- Src = case lists:reverse(Src1) of
- [$. |_] -> Src1;
- _ -> Src1 ++ "."
- end,
- case erl_scan:string(Src) of
- {ok, Scanned, _} ->
- case erl_parse:parse_term(Scanned) of
- {ok, Parsed} ->
- {ok, Parsed};
- {error, E} ->
- {errors,
- [{"Could not parse value: ~s", [format_parse_error(E)]}]}
- end;
- {error, E, _} ->
- {errors, [{"Could not scan value: ~s", [format_parse_error(E)]}]}
- end.
-
-format_parse_error({_Line, Mod, Err}) ->
- lists:flatten(Mod:format_error(Err)).
-
format(Term) ->
- list_to_binary(rabbit_misc:format("~p", [Term])).
-
-%%---------------------------------------------------------------------------
-
-%% We will want to be able to biject these to JSON. So we have some
-%% generic restrictions on what we consider acceptable.
-validate(Proplist = [T | _]) when is_tuple(T) -> validate_proplist(Proplist);
-validate(L) when is_list(L) -> validate_list(L);
-validate(T) when is_tuple(T) -> {error, "tuple: ~p", [T]};
-validate(B) when is_boolean(B) -> ok;
-validate(null) -> ok;
-validate(A) when is_atom(A) -> {error, "atom: ~p", [A]};
-validate(N) when is_number(N) -> ok;
-validate(B) when is_binary(B) -> ok;
-validate(B) when is_bitstring(B) -> {error, "bitstring: ~p", [B]}.
-
-validate_list(L) -> [validate(I) || I <- L].
-validate_proplist(L) -> [vp(I) || I <- L].
-
-vp({K, V}) when is_binary(K) -> validate(V);
-vp({K, _V}) -> {error, "bad key: ~p", [K]};
-vp(H) -> {error, "not two tuple: ~p", [H]}.
+ {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
+ list_to_binary(JSON).
flatten_errors(L) ->
case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
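Note: parameters are now keyed on the triple {VHost, Component, Key},
so set/clear/value/lookup all gain a leading vhost argument. A sketch
of the new API, assuming the default vhost and the <<"test">> component
registered by rabbit_runtime_parameters_test (below):

    ok        = rabbit_runtime_parameters:set(<<"/">>, <<"test">>,
                                              <<"good">>, 123),
    123       = rabbit_runtime_parameters:value(<<"/">>, <<"test">>,
                                                <<"good">>),
    not_found = rabbit_runtime_parameters:lookup(<<"/">>, <<"test">>,
                                                 <<"nope">>),
    ok        = rabbit_runtime_parameters:clear(<<"/">>, <<"test">>,
                                                <<"good">>).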
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index f23b322722..5224ccaa36 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -17,7 +17,7 @@
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
--export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
register() ->
@@ -26,13 +26,13 @@ register() ->
unregister() ->
rabbit_registry:unregister(runtime_parameter, <<"test">>).
-validate(<<"test">>, <<"good">>, _Term) -> ok;
-validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok;
-validate(<<"test">>, _, _) -> {error, "meh", []}.
+validate(_, <<"test">>, <<"good">>, _Term) -> ok;
+validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
+validate(_, <<"test">>, _, _) -> {error, "meh", []}.
-validate_clear(<<"test">>, <<"good">>) -> ok;
-validate_clear(<<"test">>, <<"maybe">>) -> ok;
-validate_clear(<<"test">>, _) -> {error, "meh", []}.
+validate_clear(_, <<"test">>, <<"good">>) -> ok;
+validate_clear(_, <<"test">>, <<"maybe">>) -> ok;
+validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
-notify(_, _, _) -> ok.
-notify_clear(_, _) -> ok.
+notify(_, _, _, _) -> ok.
+notify_clear(_, _, _) -> ok.
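Note: parse_set/4 now takes a JSON string rather than an Erlang term. A
sketch using this test component (return values of register/0 and
unregister/0 deliberately ignored):

    rabbit_runtime_parameters_test:register(),
    ok = rabbit_runtime_parameters:parse_set(
           <<"/">>, <<"test">>, <<"maybe">>, "\"good\""),
    {error_string, _} = rabbit_runtime_parameters:parse_set(
                          <<"/">>, <<"test">>, <<"maybe">>, "not json"),
    rabbit_runtime_parameters_test:unregister().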
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bb60bd125e..3cc0e5db28 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -32,6 +32,8 @@
-define(TIMEOUT, 5000).
all_tests() ->
+ ok = setup_cluster(),
+ ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
@@ -52,36 +54,61 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
- passed = test_cluster_management(),
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_server_status(),
passed = test_confirms(),
- passed = maybe_run_cluster_dependent_tests(),
+ passed =
+ do_if_secondary_node(
+ fun run_cluster_dependent_tests/1,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode]),
+ passed
+ end),
passed = test_configurable_server_properties(),
passed.
-maybe_run_cluster_dependent_tests() ->
+do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
- pong -> passed = run_cluster_dependent_tests(SecondaryNode);
- pang -> io:format("Skipping cluster dependent tests with node ~p~n",
- [SecondaryNode])
- end,
- passed.
+ pong -> Up(SecondaryNode);
+ pang -> Down(SecondaryNode)
+ end.
-run_cluster_dependent_tests(SecondaryNode) ->
- SecondaryNodeS = atom_to_list(SecondaryNode),
+setup_cluster() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ cover:stop(SecondaryNode),
+ ok = control_action(stop_app, []),
+ %% 'cover' does not cope at all well with nodes disconnecting,
+ %% which happens as part of reset. So we turn it off
+ %% temporarily. That is ok even if we're not in general using
+ %% cover, it just turns the engine on / off and doesn't log
+ %% anything. Note that this way cover won't be on when joining
+ %% the cluster, but this is OK since we're testing the clustering
+ %% interface elsewhere anyway.
+ cover:stop(nodes()),
+ ok = control_action(join_cluster,
+ [atom_to_list(SecondaryNode)]),
+ cover:start(nodes()),
+ ok = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], [])
+ end,
+ fun (_) -> ok end).
- cover:stop(SecondaryNode),
- ok = control_action(stop_app, []),
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- cover:start(SecondaryNode),
- ok = control_action(start_app, SecondaryNode, [], []),
+maybe_run_cluster_dependent_tests() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ passed = run_cluster_dependent_tests(SecondaryNode)
+ end,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end).
+run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
@@ -856,200 +883,6 @@ test_arguments_parser() ->
passed.
-test_cluster_management() ->
- %% 'cluster' and 'reset' should only work if the app is stopped
- {error, _} = control_action(cluster, []),
- {error, _} = control_action(reset, []),
- {error, _} = control_action(force_reset, []),
-
- ok = control_action(stop_app, []),
-
- %% various ways of creating a standalone node
- NodeS = atom_to_list(node()),
- ClusteringSequence = [[],
- [NodeS],
- ["invalid@invalid", NodeS],
- [NodeS, "invalid@invalid"]],
-
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
-
- %% convert a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% join a non-existing cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- ok = control_action(reset, []),
-
- SecondaryNode = rabbit_nodes:make("hare"),
- case net_adm:ping(SecondaryNode) of
- pong -> passed = test_cluster_management2(SecondaryNode);
- pang -> io:format("Skipping clustering tests with node ~p~n",
- [SecondaryNode])
- end,
-
- ok = control_action(start_app, []),
- passed.
-
-test_cluster_management2(SecondaryNode) ->
- NodeS = atom_to_list(node()),
- SecondaryNodeS = atom_to_list(SecondaryNode),
-
- %% make a disk node
- ok = control_action(cluster, [NodeS]),
- ok = assert_disc_node(),
- %% make a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = assert_ram_node(),
-
- %% join cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% ram node will not start by itself
- ok = control_action(stop_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- {error, _} = control_action(start_app, []),
- ok = control_action(start_app, SecondaryNode, [], []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% change cluster config while remaining in same cluster
- ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% join non-existing cluster as a ram node
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- {error, _} = control_action(start_app, []),
- ok = assert_ram_node(),
-
- %% join empty cluster as a ram node (converts to disc)
- ok = control_action(cluster, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% make a new ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% turn ram node into disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% convert a disk node into a ram node
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% make a new disk node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% turn a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% NB: this will log an inconsistent_database error, which is harmless
- %% Turning cover on / off is OK even if we're not in general using cover,
- %% it just turns the engine on / off, doesn't actually log anything.
- cover:stop([SecondaryNode]),
- true = disconnect_node(SecondaryNode),
- pong = net_adm:ping(SecondaryNode),
- cover:start([SecondaryNode]),
-
- %% leaving a cluster as a ram node
- ok = control_action(reset, []),
- %% ...and as a disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- cover:stop(SecondaryNode),
- ok = control_action(reset, []),
- cover:start(SecondaryNode),
-
- %% attempt to leave cluster when no other node is alive
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- ok = control_action(stop_app, []),
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(reset, []),
-
- %% attempt to change type when no other node is alive
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
-
- %% leave system clustered, with the secondary node as a ram node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- %% Yes, this is rather ugly. But since we're a clustered Mnesia
- %% node and we're telling another clustered node to reset itself,
- %% we will get disconnected half way through causing a
- %% badrpc. This never happens in real life since rabbitmqctl is
- %% not a clustered Mnesia node.
- cover:stop(SecondaryNode),
- {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
- pong = net_adm:ping(SecondaryNode),
- cover:start(SecondaryNode),
- ok = control_action(cluster, SecondaryNode, [NodeS], []),
- ok = control_action(start_app, SecondaryNode, [], []),
-
- passed.
-
test_user_management() ->
%% lots of stuff that should fail
@@ -1135,22 +968,21 @@ test_runtime_parameters() ->
Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end,
%% Acceptable for bijection
- Good(["test", "good", "<<\"ignore\">>"]),
+ Good(["test", "good", "\"ignore\""]),
Good(["test", "good", "123"]),
Good(["test", "good", "true"]),
Good(["test", "good", "false"]),
Good(["test", "good", "null"]),
- Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]),
+ Good(["test", "good", "{\"key\": \"value\"}"]),
- %% Various forms of fail due to non-bijectability
+ %% Invalid json
Bad(["test", "good", "atom"]),
- Bad(["test", "good", "{tuple, foo}"]),
- Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]),
- Bad(["test", "good", "[{key, <<\"value\">>}]"]),
+ Bad(["test", "good", "{\"foo\": \"bar\""]),
+ Bad(["test", "good", "{foo: \"bar\"}"]),
%% Test actual validation hook
- Good(["test", "maybe", "<<\"good\">>"]),
- Bad(["test", "maybe", "<<\"bad\">>"]),
+ Good(["test", "maybe", "\"good\""]),
+ Bad(["test", "maybe", "\"bad\""]),
ok = control_action(list_parameters, []),
@@ -1216,7 +1048,15 @@ test_server_status() ->
ok = control_action(list_consumers, []),
%% set vm memory high watermark
+ HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
+ ok = control_action(set_vm_memory_high_watermark, ["1"]),
ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
+ ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
+
+ %% eval
+ {parse_error, _} = control_action(eval, ["\""]),
+ {parse_error, _} = control_action(eval, ["a("]),
+ ok = control_action(eval, ["a."]),
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
@@ -2447,10 +2287,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- {undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
+ {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2467,11 +2307,11 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {undefined, VQ6} =
+ {_, undefined, VQ6} =
rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
VQ6.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 732c29b6b6..8966bcabcf 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -64,7 +64,7 @@
#basic_message{exchange_name :: rabbit_exchange:name(),
routing_keys :: [rabbit_router:routing_key()],
content :: content(),
- id :: msg_id(),
+ id :: msg_id(),
is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e1a7bcaea8..3fbfeed0a8 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -121,10 +121,7 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point
- %% if we are a RAM node since Mnesia has not started yet.
- AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++
- rabbit_mnesia:read_cluster_nodes_config()),
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
@@ -150,12 +147,12 @@ maybe_upgrade_mnesia() ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+ AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
case {is_disc_node_legacy(), AfterUs} of
{true, []} ->
primary;
{true, _} ->
- Filename = rabbit_mnesia:running_nodes_filename(),
+ Filename = rabbit_node_monitor:running_nodes_filename(),
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
"disc node to shut down.~n~nNote: if several disc "
@@ -222,15 +219,8 @@ secondary_upgrade(AllNodes) ->
IsDiscNode = is_disc_node_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
- %% Note that we cluster with all nodes, rather than all disc nodes
- %% (as we can't know all disc nodes at this point). This is safe as
- %% we're not writing the cluster config, just setting up Mnesia.
- ClusterNodes = case IsDiscNode of
- true -> AllNodes;
- false -> AllNodes -- [node()]
- end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end),
+ ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true),
ok = rabbit_version:record_desired_for_scope(mnesia),
ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 49213c9552..98c4571736 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,8 +19,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
- timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ depth/1, set_ram_duration_target/2, ram_duration/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
- (S) -> {undefined, S}
+ End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
+ (Next, S) -> {Next, undefined, S}
end,
case queue_out(State) of
{empty, State1} ->
- End(a(State1));
+ End(undefined, a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
@@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- End(a(in_r(MsgStatus, State1)))
+ End(MsgProps, a(in_r(MsgStatus, State1)))
end
end.
@@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
+depth(State = #vqstate { pending_ack = Ack }) ->
+ len(State) + gb_trees:size(Ack).
+
set_ram_duration_target(
DurationTarget, State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,
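Note: dropwhile/3 now returns a triple whose first element is the
#message_properties{} of the first message that failed the predicate,
or 'undefined' if the whole queue was dropped; depth/1 is the queue
length plus the number of messages pending acknowledgement. A sketch
(Pred and VQ0 assumed in scope):

    {Next, undefined, VQ1} =
        rabbit_variable_queue:dropwhile(Pred, false, VQ0),
    %% with AckRequired = true the middle element carries the dropped
    %% messages instead of 'undefined'
    Depth = rabbit_variable_queue:depth(VQ1).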
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 5548ef6d05..03dfbe245d 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -90,12 +90,13 @@ delete(VHostPath) ->
R.
internal_delete(VHostPath) ->
- lists:foreach(
- fun (Info) ->
- ok = rabbit_auth_backend_internal:clear_permissions(
- proplists:get_value(user, Info), VHostPath)
- end,
- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)),
+ [ok = rabbit_auth_backend_internal:clear_permissions(
+ proplists:get_value(user, Info), VHostPath)
+ || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
+ [ok = rabbit_runtime_parameters:clear(VHostPath,
+ proplists:get_value(component, Info),
+ proplists:get_value(key, Info))
+ || Info <- rabbit_runtime_parameters:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
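Note: deleting a vhost now also clears any runtime parameters scoped to
it, so afterwards (vhost name invented for illustration):

    rabbit_vhost:delete(<<"some-vhost">>),
    [] = rabbit_runtime_parameters:list(<<"some-vhost">>).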
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 3d3623d752..5af38573fc 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -255,10 +255,10 @@ behaviour_info(_Other) ->
%%% ---------------------------------------------------
start_link(Mod, Args) ->
gen_server:start_link(?MODULE, {self, Mod, Args}, []).
-
+
start_link(SupName, Mod, Args) ->
gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
-
+
%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------
@@ -298,9 +298,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
check_childspecs(X) -> {error, {badarg, X}}.
%%% ---------------------------------------------------
-%%%
+%%%
%%% Initialize the supervisor.
-%%%
+%%%
%%% ---------------------------------------------------
init({SupName, Mod, Args}) ->
process_flag(trap_exit, true),
@@ -319,7 +319,7 @@ init({SupName, Mod, Args}) ->
Error ->
{stop, {bad_return, {Mod, init, Error}}}
end.
-
+
init_children(State, StartSpec) ->
SupName = State#state.name,
case check_startspec(StartSpec) of
@@ -349,7 +349,7 @@ init_dynamic(_State, StartSpec) ->
%% Func: start_children/2
%% Args: Children = [#child] in start order
%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
-%% Purpose: Start all children. The new list contains #child's
+%% Purpose: Start all children. The new list contains #child's
%% with pids.
%% Returns: {ok, NChildren} | {error, NChildren}
%% NChildren = [#child] in termination order (reversed
@@ -381,7 +381,7 @@ do_start_child(SupName, Child) ->
NChild = Child#child{pid = Pid},
report_progress(NChild, SupName),
{ok, Pid, Extra};
- ignore ->
+ ignore ->
{ok, undefined};
{error, What} -> {error, What};
What -> {error, What}
@@ -400,12 +400,12 @@ do_start_child_i(M, F, A) ->
What ->
{error, What}
end.
-
+
%%% ---------------------------------------------------
-%%%
+%%%
%%% Callback functions.
-%%%
+%%%
%%% ---------------------------------------------------
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
#child{mfa = {M, F, A}} = hd(State#state.children),
@@ -414,11 +414,11 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
{ok, undefined} ->
{reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid, Extra}, NState};
What ->
@@ -497,7 +497,7 @@ handle_call(which_children, _From, State) ->
%%% Hopefully cause a function-clause as there is no API function
%%% that utilizes cast.
handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
[]),
{noreply, State}.
@@ -527,7 +527,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
end;
handle_info(Msg, State) ->
- error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
[Msg]),
{noreply, State}.
%%
@@ -577,13 +577,13 @@ check_flags({Strategy, MaxIntensity, Period}) ->
check_flags(What) ->
{bad_flags, What}.
-update_childspec(State, StartSpec) when ?is_simple(State) ->
- case check_startspec(StartSpec) of
- {ok, [Child]} ->
- {ok, State#state{children = [Child]}};
- Error ->
- {error, Error}
- end;
+update_childspec(State, StartSpec) when ?is_simple(State) ->
+ case check_startspec(StartSpec) of
+ {ok, [Child]} ->
+ {ok, State#state{children = [Child]}};
+ Error ->
+ {error, Error}
+ end;
update_childspec(State, StartSpec) ->
case check_startspec(StartSpec) of
@@ -604,7 +604,7 @@ update_childspec1([Child|OldC], Children, KeepOld) ->
end;
update_childspec1([], Children, KeepOld) ->
% Return them in (kept) reverse start order.
- lists:reverse(Children ++ KeepOld).
+ lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
@@ -618,7 +618,7 @@ update_chsp(OldCh, Children) ->
NewC ->
{ok, NewC}
end.
-
+
%%% ---------------------------------------------------
%%% Start a new child.
%%% ---------------------------------------------------
@@ -630,12 +630,12 @@ handle_start_child(Child, State) ->
{ok, Pid} ->
Children = State#state.children,
{{ok, Pid},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{ok, Pid, Extra} ->
Children = State#state.children,
{{ok, Pid, Extra},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{error, What} ->
{{error, {What, Child}}, State}
@@ -816,29 +816,32 @@ terminate_simple_children(Child, Dynamics, SupName) ->
{Replies, Timedout} =
lists:foldl(
fun (_Pid, {Replies, Timedout}) ->
- {Reply, Timedout1} =
+ {Pid1, Reason1, Timedout1} =
receive
TimeoutMsg ->
Remaining = Pids -- [P || {P, _} <- Replies],
[exit(P, kill) || P <- Remaining],
- receive {'DOWN', _MRef, process, Pid, Reason} ->
- {{error, Reason}, true}
+ receive
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ {Pid, Reason, true}
end;
{'DOWN', _MRef, process, Pid, Reason} ->
- {child_res(Child, Reason, Timedout), Timedout};
- {'EXIT', Pid, Reason} ->
- receive {'DOWN', _MRef, process, Pid, _} ->
- {{error, Reason}, Timedout}
- end
+ {Pid, Reason, Timedout}
end,
- {[{Pid, Reply} | Replies], Timedout1}
+ {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies],
+ Timedout1}
end, {[], false}, Pids),
timeout_stop(Child, TRef, TimeoutMsg, Timedout),
ReportError = shutdown_error_reporter(SupName),
- [case Reply of
- {_Pid, ok} -> ok;
- {Pid, {error, R}} -> ReportError(R, Child#child{pid = Pid})
- end || Reply <- Replies],
+ Report = fun(_, ok) -> ok;
+ (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid})
+ end,
+ [receive
+ {'EXIT', Pid, Reason} ->
+ Report(Pid, child_res(Child, Reason, Timedout))
+ after
+ 0 -> Report(Pid, Reply)
+ end || {Pid, Reply} <- Replies],
ok.
child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
@@ -863,7 +866,7 @@ timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) ->
after
0 -> ok
end;
-timeout_stop(#child{}, ok, _Msg, _Timedout) ->
+timeout_stop(#child{}, _TRef, _Msg, _Timedout) ->
ok.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
@@ -885,17 +888,17 @@ do_terminate(Child, _SupName) ->
Child.
%%-----------------------------------------------------------------
-%% Shutdowns a child. We must check the EXIT value
+%% Shuts down a child. We must check the EXIT value
%% of the child, because it might have died with another reason than
-%% the wanted. In that case we want to report the error. We put a
-%% monitor on the child an check for the 'DOWN' message instead of
-%% checking for the 'EXIT' message, because if we check the 'EXIT'
-%% message a "naughty" child, who does unlink(Sup), could hang the
-%% supervisor.
+%% the wanted one. In that case we want to report the error. We put a
+%% monitor on the child and check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
%% Returns: ok | {error, OtherReason} (this should be reported)
%%-----------------------------------------------------------------
shutdown(Pid, brutal_kill) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, kill),
@@ -905,16 +908,16 @@ shutdown(Pid, brutal_kill) ->
{'DOWN', _MRef, process, Pid, OtherReason} ->
{error, OtherReason}
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end;
shutdown(Pid, Time) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, shutdown), %% Try to shutdown gracefully
- receive
+ receive
{'DOWN', _MRef, process, Pid, shutdown} ->
ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
@@ -926,14 +929,14 @@ shutdown(Pid, Time) ->
{error, OtherReason}
end
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end.
%% Help function to shutdown/2 switches from link to monitor approach
monitor_child(Pid) ->
-
- %% Do the monitor operation first so that if the child dies
+
+ %% Do the monitor operation first so that if the child dies
%% before the monitoring is done causing a 'DOWN'-message with
%% reason noproc, we will get the real reason in the 'EXIT'-message
%% unless a naughty child has already done unlink...
@@ -943,22 +946,22 @@ monitor_child(Pid) ->
receive
%% If the child dies before the unlink we must empty
%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
- {'EXIT', Pid, Reason} ->
- receive
+ {'EXIT', Pid, Reason} ->
+ receive
{'DOWN', _, process, Pid, _} ->
{error, Reason}
end
- after 0 ->
+ after 0 ->
%% If a naughty child did unlink and the child dies before
- %% monitor the result will be that shutdown/2 receives a
+ %% monitor the result will be that shutdown/2 receives a
%% 'DOWN'-message with reason noproc.
%% If the child should die after the unlink there
%% will be a 'DOWN'-message with a correct reason
- %% that will be handled in shutdown/2.
- ok
+ %% that will be handled in shutdown/2.
+ ok
end.
-
-
+
+
%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
@@ -1012,7 +1015,7 @@ remove_child(Child, State) ->
%% Args: SupName = {local, atom()} | {global, atom()} | self
%% Type = {Strategy, MaxIntensity, Period}
%% Strategy = one_for_one | one_for_all | simple_one_for_one |
-%% rest_for_one
+%% rest_for_one
%% MaxIntensity = integer()
%% Period = integer()
%% Mod :== atom()
@@ -1107,10 +1110,10 @@ validChildType(supervisor) -> true;
validChildType(worker) -> true;
validChildType(What) -> throw({invalid_child_type, What}).
-validName(_Name) -> true.
+validName(_Name) -> true.
-validFunc({M, F, A}) when is_atom(M),
- is_atom(F),
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
@@ -1128,7 +1131,7 @@ validDelay(Delay) when is_number(Delay),
Delay >= 0 -> true;
validDelay(What) -> throw({invalid_delay, What}).
-validShutdown(Shutdown, _)
+validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
validShutdown(infinity, supervisor) -> true;
validShutdown(brutal_kill, _) -> true;
@@ -1154,7 +1157,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}).
%%% Returns: {ok, State'} | {terminate, State'}
%%% ------------------------------------------------------
-add_restart(State) ->
+add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
new file mode 100644
index 0000000000..e42ded7b53
--- /dev/null
+++ b/src/supervisor2_tests.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(supervisor2_tests).
+-behaviour(supervisor2).
+
+-export([test_all/0, start_link/0]).
+-export([init/1]).
+
+test_all() ->
+ ok = check_shutdown(stop, 200, 200, 2000),
+ ok = check_shutdown(ignored, 1, 2, 2000).
+
+check_shutdown(SigStop, Iterations, ChildCount, SupTimeout) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, [SupTimeout]),
+ Res = lists:foldl(
+ fun (I, ok) ->
+ TestSupPid = erlang:whereis(?MODULE),
+ ChildPids =
+ [begin
+ {ok, ChildPid} =
+ supervisor2:start_child(TestSupPid, []),
+ ChildPid
+ end || _ <- lists:seq(1, ChildCount)],
+ MRef = erlang:monitor(process, TestSupPid),
+ [P ! SigStop || P <- ChildPids],
+ ok = supervisor2:terminate_child(Sup, test_sup),
+ {ok, _} = supervisor2:restart_child(Sup, test_sup),
+ receive
+ {'DOWN', MRef, process, TestSupPid, shutdown} ->
+ ok;
+ {'DOWN', MRef, process, TestSupPid, Reason} ->
+ {error, {I, Reason}}
+ end;
+ (_, R) ->
+ R
+ end, ok, lists:seq(1, Iterations)),
+ unlink(Sup),
+ exit(Sup, shutdown),
+ Res.
+
+start_link() ->
+ Pid = spawn_link(fun () ->
+ process_flag(trap_exit, true),
+ receive stop -> ok end
+ end),
+ {ok, Pid}.
+
+init([Timeout]) ->
+ {ok, {{one_for_one, 0, 1},
+ [{test_sup, {supervisor2, start_link,
+ [{local, ?MODULE}, ?MODULE, []]},
+ transient, Timeout, supervisor, [?MODULE]}]}};
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{test_worker, {?MODULE, start_link, []},
+ temporary, 1000, worker, [?MODULE]}]}}.
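Note: this suite is wired into rabbit_tests:all_tests/0 above; it can
also be run on its own from an attached Erlang shell:

    ok = supervisor2_tests:test_all().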