summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl18
-rw-r--r--src/delegate_sup.erl26
-rw-r--r--src/gen_server2.erl25
-rw-r--r--src/rabbit.erl12
-rw-r--r--src/rabbit_amqqueue.erl28
-rw-r--r--src/rabbit_amqqueue_process.erl39
-rw-r--r--src/rabbit_control.erl26
-rw-r--r--src/rabbit_direct.erl9
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_networking.erl16
-rw-r--r--src/rabbit_reader.erl39
-rw-r--r--src/rabbit_tests.erl49
13 files changed, 183 insertions, 117 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 46bd8245b3..17046201ad 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/1]).
+-export([start_link/1, invoke_no_result/2, invoke/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,8 +36,6 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/1 :: ([node()]) -> non_neg_integer()).
-
-endif.
%%----------------------------------------------------------------------------
@@ -111,22 +109,14 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count([RemoteNode | _]) ->
- {ok, Count} = case application:get_env(rabbit, delegate_count) of
- undefined -> rpc:call(RemoteNode, application, get_env,
- [rabbit, delegate_count]);
- Result -> Result
- end,
- Count.
-
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
delegate(RemoteNodes) ->
case get(delegate) of
- undefined -> Name =
- delegate_name(erlang:phash2(
- self(), delegate_count(RemoteNodes))),
+ undefined -> Name = delegate_name(
+ erlang:phash2(self(),
+ delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index e0ffa7c8c1..fc693c7d3d 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/1, count/1]).
-export([init/1]).
@@ -28,20 +28,32 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(count/1 :: ([node()]) -> integer()).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link(Count) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
+
+count([]) ->
+ 1;
+count([Node | Nodes]) ->
+ try
+ length(supervisor:which_children({?SERVER, Node}))
+ catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
+ count(Nodes);
+ exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
+ R =:= nodedown ->
+ count(Nodes)
+ end.
%%----------------------------------------------------------------------------
-init(_Args) ->
- DCount = delegate:delegate_count([node()]),
+init([Count]) ->
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, DCount - 1)]}}.
+ Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index a637ddddc8..94296f9751 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -58,6 +58,15 @@
%% hibernate the process immediately, as it would if backoff wasn't
%% being used. Instead it'll wait for the current timeout as described
%% above.
+%%
+%% 7) The callback module can return from any of the handle_*
+%% functions, a {become, Module, State} triple, or a {become, Module,
+%% State, Timeout} quadruple. This allows the gen_server to
+%% dynamically change the callback module. The State is the new state
+%% which will be passed into any of the callback functions in the new
+%% module. Note there is no form also encompassing a reply, thus if
+%% you wish to reply in handle_call/3 and change the callback module,
+%% you need to use gen_server2:reply/2 to issue the reply manually.
%% All modifications are (C) 2009-2011 VMware, Inc.
@@ -880,6 +889,22 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
loop(GS2State #gs2_state { state = NState,
time = Time1,
debug = Debug1 });
+ {become, Mod, NState} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = infinity,
+ debug = Debug1 }));
+ {become, Mod, NState, Time1} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = Time1,
+ debug = Debug1 }));
_ ->
handle_common_termination(Reply, Msg, GS2State)
end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 5ff96f904c..faf484af9c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -101,8 +101,7 @@
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
- {mfa, {rabbit_sup, start_child,
- [delegate_sup]}},
+ {mfa, {rabbit, boot_delegate, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).
@@ -184,6 +183,9 @@
{running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
+-spec(maybe_insert_default_data/0 :: () -> 'ok').
+-spec(boot_delegate/0 :: () -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -454,6 +456,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end
end.
+boot_delegate() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ rabbit_sup:start_child(delegate_sup, [Count]).
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2545b07c14..1c89539fc0 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
- case is_process_alive(QPid) of
+ case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
fun (Tx) -> TailFun(Tx), ExistingQ end
@@ -300,29 +300,19 @@ check_declare_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_expires_argument/1},
- {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
+ [{<<"x-expires">>, fun check_integer_argument/1},
+ {<<"x-message-ttl">>, fun check_integer_argument/1}]],
ok.
-check_expires_argument(Val) ->
- check_integer_argument(Val,
- expires_not_of_acceptable_type,
- expires_zero_or_less).
-
-check_message_ttl_argument(Val) ->
- check_integer_argument(Val,
- ttl_not_of_acceptable_type,
- ttl_zero_or_less).
-
-check_integer_argument(undefined, _, _) ->
+check_integer_argument(undefined) ->
ok;
-check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 ->
+check_integer_argument({Type, Val}) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
- false -> {error, {InvalidTypeError, Type, Val}}
+ false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, _Val}, _, ZeroOrLessError) ->
- {error, ZeroOrLessError}.
+check_integer_argument({_Type, Val}) ->
+ {error, {value_zero_or_less, Val}}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -422,7 +412,7 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c7e28fe9e..e794b4aa1e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
@@ -122,6 +122,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+ rabbit_event:notify(
+ queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
@@ -186,7 +188,6 @@ terminate_shutdown(Fun, State) ->
end, BQS, all_ch_record()),
[emit_consumer_deleted(Ch, CTag)
|| {Ch, CTag, _} <- consumers(State1)],
- rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -657,13 +658,13 @@ message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
+calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- Now = now_millis(),
+ Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) ->
Now > Expiry
@@ -684,7 +685,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-now_millis() -> timer:now_diff(now(), {0,0,0}).
+now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -790,20 +791,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index f0b4ced1a2..746bb66eb5 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -45,22 +45,18 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
- FullCommand = init:get_plain_arguments(),
- case FullCommand of
- [] -> usage();
- _ -> ok
- end,
{[Command0 | Args], Opts} =
- rabbit_misc:get_options(
- [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
- {option, ?VHOST_OPT, "/"}],
- FullCommand),
- Opts1 = lists:map(fun({K, V}) ->
- case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
- _ -> {K, V}
- end
- end, Opts),
+ case rabbit_misc:get_options([{flag, ?QUIET_OPT},
+ {option, ?NODE_OPT, NodeStr},
+ {option, ?VHOST_OPT, "/"}],
+ init:get_plain_arguments()) of
+ {[], _Opts} -> usage();
+ CmdArgsAndOpts -> CmdArgsAndOpts
+ end,
+ Opts1 = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts],
Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 3b8c9fba39..bd41a8b997 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/3, start_channel/5]).
+-export([boot/0, connect/4, start_channel/5]).
-include("rabbit.hrl").
@@ -25,7 +25,7 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/3 :: (binary(), binary(), binary()) ->
+-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
@@ -49,13 +49,14 @@ boot() ->
%%----------------------------------------------------------------------------
-connect(Username, Password, VHost) ->
+connect(Username, Password, VHost, Protocol) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
try rabbit_access_control:user_pass_login(Username, Password) of
#user{} = User ->
try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> {ok, {User, rabbit_reader:server_properties()}}
+ ok -> {ok, {User,
+ rabbit_reader:server_properties(Protocol)}}
catch
exit:#amqp_error{name = access_refused} ->
{error, access_refused}
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e36b1dd12c..abc27c5f7d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([lock_file/1]).
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([is_process_alive/1]).
%%----------------------------------------------------------------------------
@@ -194,6 +195,7 @@
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-endif.
@@ -861,3 +863,12 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
+
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
+
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e9c356e1f6..7f3cf35faa 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,7 +33,7 @@
-include("rabbit_msg_store.hrl").
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 283d25c716..36f61628b8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -32,16 +32,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(RABBIT_TCP_OPTS, [
- binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- {backlog, 128}, % use the maximum listen(2) backlog value
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}
- ]).
-
-define(SSL_TIMEOUT, 5). %% seconds
-define(FIRST_TEST_BIND_PORT, 10000).
@@ -200,7 +190,7 @@ start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | ?RABBIT_TCP_OPTS],
+ [IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
@@ -315,6 +305,10 @@ hostname() ->
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+tcp_opts() ->
+ {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
+ Opts.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1781469a13..e82253165d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -24,7 +24,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/0]).
+-export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
@@ -160,7 +160,8 @@
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
--spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+-spec(server_properties/1 :: (rabbit_types:protocol()) ->
+ rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
@@ -219,7 +220,7 @@ conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
-server_properties() ->
+server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -230,22 +231,30 @@ server_properties() ->
%% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
%% from the config and merge them with the generated built-in properties
NormalizedConfigServerProps =
- [case X of
- {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
- longstr,
- list_to_binary(Value)};
- {BinKey, Type, Value} -> {BinKey, Type, Value}
- end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]],
+ [{<<"capabilities">>, table, server_capabilities(Protocol)} |
+ [case X of
+ {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
+ longstr,
+ list_to_binary(Value)};
+ {BinKey, Type, Value} -> {BinKey, Type, Value}
+ end || X <- RawConfigServerProps ++
+ [{product, Product},
+ {version, Version},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favor of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
+server_capabilities(rabbit_framing_amqp_0_9_1) ->
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true}];
+server_capabilities(_) ->
+ [].
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
@@ -655,7 +664,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
- server_properties = server_properties(),
+ server_properties = server_properties(Protocol),
mechanisms = auth_mechanisms_binary(),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ddb53b1519..bc9a84c882 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -82,6 +82,7 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
passed = test_queue_cleanup(SecondaryNode),
+ passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate
@@ -90,13 +91,14 @@ run_cluster_dependent_tests(SecondaryNode) ->
Remote = spawn(SecondaryNode,
fun () -> Rs = [ test_delegates_async(Node),
test_delegates_sync(Node),
- test_queue_cleanup(Node) ],
+ test_queue_cleanup(Node),
+ test_declare_on_dead_queue(Node) ],
Self ! {self(), Rs}
end),
receive
{Remote, Result} ->
- Result = [passed, passed, passed]
- after 2000 ->
+ Result = lists:duplicate(length(Result), passed)
+ after 30000 ->
throw(timeout)
end,
@@ -1310,6 +1312,32 @@ test_queue_cleanup(_SecondaryNode) ->
end,
passed.
+test_declare_on_dead_queue(SecondaryNode) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
+ Self = self(),
+ Pid = spawn(SecondaryNode,
+ fun () ->
+ {new, #amqqueue{name = QueueName, pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ exit(QPid, kill),
+ Self ! {self(), killed, QPid}
+ end),
+ receive
+ {Pid, killed, QPid} ->
+ {existing, #amqqueue{name = QueueName,
+ pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ false = rabbit_misc:is_process_alive(QPid),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ passed
+ after 2000 ->
+ throw(failed_to_create_and_kill_queue)
+ end.
+
%---------------------------------------------------------------------
control_action(Command, Args) ->
@@ -2173,9 +2201,11 @@ test_configurable_server_properties() ->
BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
<<"copyright">>, <<"information">>],
+ Protocol = rabbit_framing_amqp_0_9_1,
+
%% Verify that the built-in properties are initially present
- ActualPropNames = [Key ||
- {Key, longstr, _} <- rabbit_reader:server_properties()],
+ ActualPropNames = [Key || {Key, longstr, _} <-
+ rabbit_reader:server_properties(Protocol)],
true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
BuiltInPropNames),
@@ -2186,9 +2216,10 @@ test_configurable_server_properties() ->
ConsProp = fun (X) -> application:set_env(rabbit,
server_properties,
[X | ServerProperties]) end,
- IsPropPresent = fun (X) -> lists:member(X,
- rabbit_reader:server_properties())
- end,
+ IsPropPresent =
+ fun (X) ->
+ lists:member(X, rabbit_reader:server_properties(Protocol))
+ end,
%% Add a wholly new property of the simplified {KeyAtom, StringValue} form
NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
@@ -2211,7 +2242,7 @@ test_configurable_server_properties() ->
{BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
list_to_binary(NewVerVal)},
ConsProp(NewVersion),
- ClobberedServerProps = rabbit_reader:server_properties(),
+ ClobberedServerProps = rabbit_reader:server_properties(Protocol),
%% Is the clobbering insert present?
true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
%% Is the clobbering insert the only thing with the clobbering key?