summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Wragg <david@rabbitmq.com>2010-11-23 10:58:49 +0000
committerDavid Wragg <david@rabbitmq.com>2010-11-23 10:58:49 +0000
commitd5276790137419632cfcdc1c43250166ab125732 (patch)
treefecba36b8505a6a35c06cedf227fd748c51eeb38 /src
parent89fcddb7e956f077169947c2abe8a13ea7337fc0 (diff)
parente64d8aa70ef29db76b40ecffe9d24902e0444e12 (diff)
downloadrabbitmq-server-git-d5276790137419632cfcdc1c43250166ab125732.tar.gz
Merge bug23186 into default
plugin activator should use command line args instead of rabbit env vars
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl27
-rw-r--r--src/rabbit.erl122
-rw-r--r--src/rabbit_access_control.erl7
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl73
-rw-r--r--src/rabbit_binary_generator.erl7
-rw-r--r--src/rabbit_binding.erl20
-rw-r--r--src/rabbit_channel.erl30
-rw-r--r--src/rabbit_connection_sup.erl32
-rw-r--r--src/rabbit_control.erl8
-rw-r--r--src/rabbit_dialyzer.erl92
-rw-r--r--src/rabbit_exchange.erl29
-rw-r--r--src/rabbit_framing.erl64
-rw-r--r--src/rabbit_heartbeat.erl54
-rw-r--r--src/rabbit_invariable_queue.erl2
-rw-r--r--src/rabbit_memory_monitor.erl5
-rw-r--r--src/rabbit_misc.erl114
-rw-r--r--src/rabbit_mnesia.erl112
-rw-r--r--src/rabbit_msg_store.erl6
-rw-r--r--src/rabbit_multi.erl11
-rw-r--r--src/rabbit_net.erl6
-rw-r--r--src/rabbit_networking.erl14
-rw-r--r--src/rabbit_queue_index.erl123
-rw-r--r--src/rabbit_reader.erl81
-rw-r--r--src/rabbit_ssl.erl13
-rw-r--r--src/rabbit_types.erl8
-rw-r--r--src/rabbit_upgrade.erl158
-rw-r--r--src/rabbit_upgrade_functions.erl78
-rw-r--r--src/rabbit_variable_queue.erl2
-rw-r--r--src/rabbit_writer.erl5
-rw-r--r--src/supervisor2.erl3
31 files changed, 871 insertions, 453 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 230d1f2aa1..6e02b23ecb 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -177,7 +177,7 @@
format_status/2]).
%% Internal exports
--export([init_it/6, print_event/3]).
+-export([init_it/6]).
-import(error_logger, [format/2]).
@@ -192,10 +192,13 @@
-ifdef(use_specs).
--spec(handle_common_termination/3 ::
- (any(), atom(), #gs2_state{}) -> no_return()).
+-type(gs2_state() :: #gs2_state{}).
--spec(hibernate/1 :: (#gs2_state{}) -> no_return()).
+-spec(handle_common_termination/3 ::
+ (any(), atom(), gs2_state()) -> no_return()).
+-spec(hibernate/1 :: (gs2_state()) -> no_return()).
+-spec(pre_hibernate/1 :: (gs2_state()) -> no_return()).
+-spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()).
-endif.
@@ -612,7 +615,7 @@ process_msg(Msg,
_Msg when Debug =:= [] ->
handle_msg(Msg, GS2State);
_Msg ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Debug1 = sys:handle_debug(Debug, fun print_event/3,
Name, {in, Msg}),
handle_msg(Msg, GS2State #gs2_state { debug = Debug1 })
end.
@@ -838,13 +841,13 @@ handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
time = Time1,
debug = Debug1});
{noreply, NState} ->
- Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
{noreply, NState}),
loop(GS2State #gs2_state {state = NState,
time = infinity,
debug = Debug1});
{noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
{noreply, NState}),
loop(GS2State #gs2_state {state = NState,
time = Time1,
@@ -866,13 +869,13 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
debug = Debug}) ->
case Reply of
{noreply, NState} ->
- Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
{noreply, NState}),
loop(GS2State #gs2_state { state = NState,
time = infinity,
debug = Debug1 });
{noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
{noreply, NState}),
loop(GS2State #gs2_state { state = NState,
time = Time1,
@@ -894,7 +897,7 @@ handle_common_termination(Reply, Msg, GS2State) ->
reply(Name, {To, Tag}, Reply, State, Debug) ->
reply({To, Tag}, Reply),
sys:handle_debug(
- Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}).
+ Debug, fun print_event/3, Name, {out, Reply, To, State}).
%%-----------------------------------------------------------------
@@ -903,10 +906,6 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
system_continue(Parent, Debug, GS2State) ->
loop(GS2State #gs2_state { parent = Parent, debug = Debug }).
--ifdef(use_specs).
--spec system_terminate(_, _, _, [_]) -> no_return().
--endif.
-
system_terminate(Reason, _Parent, Debug, GS2State) ->
terminate(Reason, [], GS2State #gs2_state { debug = Debug }).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 8c36a9f0a4..61a3a53dbc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -291,96 +291,66 @@ run_boot_step({StepName, Attributes}) ->
io:format("-- ~s~n", [Description]);
MFAs ->
io:format("starting ~-60s ...", [Description]),
- [case catch apply(M,F,A) of
- {'EXIT', Reason} ->
- boot_error("FAILED~nReason: ~p~n", [Reason]);
- ok ->
- ok
+ [try
+ apply(M,F,A)
+ catch
+ _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason])
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
end.
-module_attributes(Module) ->
- case catch Module:module_info(attributes) of
- {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
- io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
- [Module]),
- [];
- {'EXIT', Reason} ->
- exit(Reason);
- V ->
- V
- end.
-
boot_steps() ->
- AllApps = [App || {App, _, _} <- application:loaded_applications()],
- Modules = lists:usort(
- lists:append([Modules
- || {ok, Modules} <-
- [application:get_key(App, modules)
- || App <- AllApps]])),
- UnsortedSteps =
- lists:flatmap(fun (Module) ->
- [{StepName, Attributes}
- || {rabbit_boot_step, [{StepName, Attributes}]}
- <- module_attributes(Module)]
- end, Modules),
- sort_boot_steps(UnsortedSteps).
+ sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
-sort_boot_steps(UnsortedSteps) ->
- G = digraph:new([acyclic]),
-
- %% Add vertices, with duplicate checking.
- [case digraph:vertex(G, StepName) of
- false -> digraph:add_vertex(G, StepName, Step);
- _ -> boot_error("Duplicate boot step name: ~w~n", [StepName])
- end || Step = {StepName, _Attrs} <- UnsortedSteps],
-
- %% Add edges, detecting cycles and missing vertices.
- lists:foreach(fun ({StepName, Attributes}) ->
- [add_boot_step_dep(G, StepName, PrecedingStepName)
- || {requires, PrecedingStepName} <- Attributes],
- [add_boot_step_dep(G, SucceedingStepName, StepName)
- || {enables, SucceedingStepName} <- Attributes]
- end, UnsortedSteps),
-
- %% Use topological sort to find a consistent ordering (if there is
- %% one, otherwise fail).
- SortedStepsRev = [begin
- {StepName, Step} = digraph:vertex(G, StepName),
- Step
- end || StepName <- digraph_utils:topsort(G)],
- SortedSteps = lists:reverse(SortedStepsRev),
-
- digraph:delete(G),
-
- %% Check that all mentioned {M,F,A} triples are exported.
- case [{StepName, {M,F,A}}
- || {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
- not erlang:function_exported(M, F, length(A))] of
- [] -> SortedSteps;
- MissingFunctions -> boot_error("Boot step functions not exported: ~p~n",
- [MissingFunctions])
- end.
+vertices(_Module, Steps) ->
+ [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
-add_boot_step_dep(G, RunsSecond, RunsFirst) ->
- case digraph:add_edge(G, RunsSecond, RunsFirst) of
- {error, Reason} ->
- boot_error("Could not add boot step dependency of ~w on ~w:~n~s",
- [RunsSecond, RunsFirst,
+edges(_Module, Steps) ->
+ [case Key of
+ requires -> {StepName, OtherStep};
+ enables -> {OtherStep, StepName}
+ end || {StepName, Atts} <- Steps,
+ {Key, OtherStep} <- Atts,
+ Key =:= requires orelse Key =:= enables].
+
+sort_boot_steps(UnsortedSteps) ->
+ case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ UnsortedSteps) of
+ {ok, G} ->
+ %% Use topological sort to find a consistent ordering (if
+ %% there is one, otherwise fail).
+ SortedSteps = lists:reverse(
+ [begin
+ {StepName, Step} = digraph:vertex(G, StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)]),
+ digraph:delete(G),
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}} ||
+ {StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> boot_error(
+ "Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ {error, {edge, Reason, From, To}} ->
+ boot_error(
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
case Reason of
{bad_vertex, V} ->
io_lib:format("Boot step not registered: ~w~n", [V]);
{bad_edge, [First | Rest]} ->
[io_lib:format("Cyclic dependency: ~w", [First]),
- [io_lib:format(" depends on ~w", [Next])
- || Next <- Rest],
+ [io_lib:format(" depends on ~w", [Next]) ||
+ Next <- Rest],
io_lib:format(" depends on ~w~n", [First])]
- end]);
- _ ->
- ok
+ end])
end.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 15897dfa39..bc5880130d 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -37,7 +37,7 @@
check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
clear_admin/1, list_users/0, lookup_user/1]).
--export([change_password_hash/2]).
+-export([change_password_hash/2, hash_password/1]).
-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
-export([set_permissions/5, clear_permissions/2,
list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
@@ -47,7 +47,7 @@
-ifdef(use_specs).
--export_type([username/0, password/0]).
+-export_type([username/0, password/0, password_hash/0]).
-type(permission_atom() :: 'configure' | 'read' | 'write').
-type(username() :: binary()).
@@ -73,9 +73,10 @@
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
-spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok').
+-spec(hash_password/1 :: (password()) -> password_hash()).
-spec(set_admin/1 :: (username()) -> 'ok').
-spec(clear_admin/1 :: (username()) -> 'ok').
--spec(list_users/0 :: () -> [username()]).
+-spec(list_users/0 :: () -> [{username(), boolean()}]).
-spec(lookup_user/1 ::
(username()) -> rabbit_types:ok(rabbit_types:user())
| rabbit_types:error('not_found')).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9d78bafa95..5cdd0e3cb2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -97,14 +97,14 @@
-spec(with_exclusive_access_or_die/3 ::
(name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
--spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
--spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()).
-spec(info/2 ::
- (rabbit_types:amqqueue(), [rabbit_types:info_key()])
- -> [rabbit_types:info()]).
--spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
--spec(info_all/2 :: (rabbit_types:vhost(), [rabbit_types:info_key()])
- -> [[rabbit_types:info()]]).
+ (rabbit_types:amqqueue(), rabbit_types:info_keys())
+ -> rabbit_types:infos()).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
+-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
+ -> [rabbit_types:infos()]).
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -162,7 +162,7 @@
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(maybe_expire/1 :: (pid()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()).
+-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-endif.
@@ -361,7 +361,7 @@ consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers, infinity).
consumers_all(VHostPath) ->
- lists:concat(
+ lists:append(
map(VHostPath,
fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} ||
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2c975b4e..a999fe582c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -315,6 +315,25 @@ ch_record(ChPid) ->
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C).
+maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ txn = Txn,
+ unsent_message_count = UnsentMessageCount}) ->
+ case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of
+ {0, 0, 0, none} -> ok = erase_ch_record(C),
+ false;
+ _ -> store_ch_record(C),
+ true
+ end.
+
+erase_ch_record(#cr{ch_pid = ChPid,
+ limiter_pid = LimiterPid,
+ monitor_ref = MonitorRef}) ->
+ ok = rabbit_limiter:unregister(LimiterPid, self()),
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}),
+ ok.
+
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
@@ -361,7 +380,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
- store_ch_record(NewC),
+ true = maybe_store_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -380,7 +399,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
- store_ch_record(C#cr{is_limit_active = true}),
+ true = maybe_store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
@@ -479,7 +498,7 @@ possibly_unblock(State, ChPid, Update) ->
State;
C ->
NewC = Update(C),
- store_ch_record(NewC),
+ maybe_store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -500,10 +519,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
- acktags = ChAckTags} ->
- erlang:demonitor(MonitorRef),
- erase({ch, ChPid}),
+ C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} ->
+ ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, _} -> none;
@@ -562,7 +579,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
@@ -652,7 +669,10 @@ i(Item, _) ->
throw({bad_argument, Item}).
emit_stats(State) ->
- rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
+ emit_stats(State, []).
+
+emit_stats(State, Extra) ->
+ rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
%---------------------------------------------------------------------------
@@ -772,8 +792,9 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
- C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
+ true = maybe_store_ch_record(
+ C#cr{acktags = sets:add_element(AckTag,
+ ChAckTags)});
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
@@ -791,8 +812,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
+ true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
ok = case ConsumerCount of
0 -> rabbit_limiter:register(LimiterPid, self());
_ -> ok
@@ -826,12 +847,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
- store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(LimiterPid, self());
- _ -> ok
- end,
+ C = #cr{consumer_count = ConsumerCount,
+ limiter_pid = LimiterPid} ->
+ C1 = C#cr{consumer_count = ConsumerCount -1},
+ maybe_store_ch_record(
+ case ConsumerCount of
+ 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()),
+ C1#cr{limiter_pid = undefined};
+ _ -> C1
+ end),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -880,7 +904,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(requeue_and_run(AckTags, State))
end;
@@ -904,7 +928,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
{C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
- store_ch_record(C1),
+ maybe_store_ch_record(C1),
noreply(State#q{backing_queue_state = BQS1})
end;
@@ -915,7 +939,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
false -> BQS1 = BQ:ack(AckTags, BQS),
@@ -1032,7 +1056,10 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end),
+ rabbit_event:if_enabled(StatsTimer,
+ fun () ->
+ emit_stats(State, [{idle_since, now()}])
+ end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
backing_queue_state = BQS2},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 722573c769..b2997ae259 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -75,9 +75,12 @@
rabbit_types:encoded_content()).
-spec(clear_encoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:unencoded_content()).
--spec(map_exception/3 :: (non_neg_integer(), rabbit_types:amqp_error(),
+-spec(map_exception/3 :: (rabbit_channel:channel_number(),
+ rabbit_types:amqp_error() | any(),
rabbit_types:protocol()) ->
- {boolean(), non_neg_integer(), rabbit_framing:amqp_method()}).
+ {boolean(),
+ rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record()}).
-endif.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 1af213c474..9d1399f7cc 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -60,7 +60,7 @@
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
--opaque(deletions() :: dict:dictionary()).
+-opaque(deletions() :: dict()).
-spec(recover/0 :: () -> [rabbit_types:binding()]).
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
@@ -78,13 +78,13 @@
-spec(list_for_source_and_destination/2 ::
(rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
bindings()).
--spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
--spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
--spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
- [rabbit_types:info()]).
--spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
--spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
- -> [[rabbit_types:info()]]).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(info/1 :: (rabbit_types:binding()) -> rabbit_types:infos()).
+-spec(info/2 :: (rabbit_types:binding(), rabbit_types:info_keys()) ->
+ rabbit_types:infos()).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
+-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
+ -> [rabbit_types:infos()]).
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
-spec(remove_for_destination/1 ::
@@ -94,9 +94,9 @@
-spec(process_deletions/1 :: (deletions()) -> 'ok').
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
- {'undefined' | rabbit_types:binding_source(),
+ {'undefined' | rabbit_types:exchange(),
'deleted' | 'not_deleted',
- deletions()}, deletions()) -> deletions()).
+ bindings()}, deletions()) -> deletions()).
-spec(new_deletions/0 :: () -> deletions()).
-endif.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 873268cdd9..800cc23793 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -87,17 +87,17 @@
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
--spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
--spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
--spec(info/1 :: (pid()) -> [rabbit_types:info()]).
--spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
--spec(info_all/0 :: () -> [[rabbit_types:info()]]).
--spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
+-spec(info_all/0 :: () -> [rabbit_types:infos()]).
+-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
-endif.
@@ -267,9 +267,11 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- rabbit_event:if_enabled(StatsTimer, fun () ->
- internal_emit_stats(State)
- end),
+ rabbit_event:if_enabled(StatsTimer,
+ fun () ->
+ internal_emit_stats(
+ State, [{idle_since, now()}])
+ end),
{hibernate,
State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
@@ -1201,11 +1203,14 @@ update_measures(Type, QX, Inc, Measure) ->
put({Type, QX},
orddict:store(Measure, Cur + Inc, Measures)).
-internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
+internal_emit_stats(State) ->
+ internal_emit_stats(State, []).
+
+internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
- rabbit_event:notify(channel_stats, CoarseStats);
+ rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
fine ->
FineStats =
[{channel_queue_stats,
@@ -1215,7 +1220,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
+ rabbit_event:notify(channel_stats,
+ Extra ++ CoarseStats ++ FineStats)
end.
erase_queue_stats(QPid) ->
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index b3821d3b8b..ff3995b54a 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -52,21 +52,22 @@
start_link() ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
- {ok, ChannelSupSupPid} =
- supervisor2:start_child(
- SupPid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, Collector} =
supervisor2:start_child(
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]},
+ [ChannelSupSupPid, Collector,
+ rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
@@ -78,22 +79,3 @@ reader(Pid) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
-start_heartbeat_fun(SupPid) ->
- fun (_Sock, 0) ->
- none;
- (Sock, TimeoutSec) ->
- Parent = self(),
- {ok, Sender} =
- supervisor2:start_child(
- SupPid, {heartbeat_sender,
- {rabbit_heartbeat, start_heartbeat_sender,
- [Parent, Sock, TimeoutSec]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {ok, Receiver} =
- supervisor2:start_child(
- SupPid, {heartbeat_receiver,
- {rabbit_heartbeat, start_heartbeat_receiver,
- [Parent, Sock, TimeoutSec]},
- transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {Sender, Receiver}
- end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6b21274529..72b77b1f07 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -94,9 +94,7 @@ start() ->
halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(
- " ", [atom_to_list(Command) | Args]))]),
+ [string:join([atom_to_list(Command) | Args], " ")]),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
@@ -321,7 +319,7 @@ display_info_list(Other, _) ->
Other.
display_row(Row) ->
- io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
+ io:fwrite(string:join(Row, "\t")),
io:nl().
-define(IS_U8(X), (X >= 0 andalso X =< 255)).
@@ -346,8 +344,6 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
-format_info_item([C|_] = Value) when is_number(C), C >= 32, C =< 255 ->
- Value;
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
deleted file mode 100644
index a9806305ef..0000000000
--- a/src/rabbit_dialyzer.erl
+++ /dev/null
@@ -1,92 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_dialyzer).
-
--export([create_basic_plt/1, add_to_plt/2, dialyze_files/2,
- halt_with_code/1]).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(create_basic_plt/1 :: (file:filename()) -> 'ok').
--spec(add_to_plt/2 :: (file:filename(), string()) -> 'ok').
--spec(dialyze_files/2 :: (file:filename(), string()) -> 'ok').
--spec(halt_with_code/1 :: (atom()) -> no_return()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-create_basic_plt(BasicPltPath) ->
- OptsRecord = dialyzer_options:build(
- [{analysis_type, plt_build},
- {output_plt, BasicPltPath},
- {files_rec, otp_apps_dependencies_paths()}]),
- dialyzer_cl:start(OptsRecord),
- ok.
-
-add_to_plt(PltPath, FilesString) ->
- Files = string:tokens(FilesString, " "),
- DialyzerWarnings = dialyzer:run([{analysis_type, plt_add},
- {init_plt, PltPath},
- {output_plt, PltPath},
- {files, Files}]),
- print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1),
- ok.
-
-dialyze_files(PltPath, ModifiedFiles) ->
- Files = string:tokens(ModifiedFiles, " "),
- DialyzerWarnings = dialyzer:run([{init_plt, PltPath},
- {files, Files},
- {warnings, [behaviours,
- race_conditions]}]),
- case DialyzerWarnings of
- [] -> io:format("~nOk~n");
- _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n",
- [length(DialyzerWarnings)]),
- print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1)
- end,
- ok.
-
-print_warnings(Warnings, FormatFun) ->
- [io:format("~s~n", [FormatFun(W)]) || W <- Warnings],
- io:format("~n").
-
-otp_apps_dependencies_paths() ->
- [code:lib_dir(App, ebin) ||
- App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]].
-
-halt_with_code(ok) ->
- halt();
-halt_with_code(fail) ->
- halt(1).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 465642332c..4c0f341ffc 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -68,14 +68,14 @@
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
--spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
--spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
- (rabbit_types:exchange(), [rabbit_types:info_key()])
- -> [rabbit_types:info()]).
--spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
--spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
- -> [[rabbit_types:info()]]).
+ (rabbit_types:exchange(), rabbit_types:info_keys())
+ -> rabbit_types:infos()).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
+-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
+ -> [rabbit_types:infos()]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
-spec(delete/2 ::
@@ -228,7 +228,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X = #exchange{name = XName}, Delivery) ->
rabbit_router:deliver(
- route(Delivery, {queue:from_list([X]), sets:from_list([XName]), []}),
+ route(Delivery, {queue:from_list([X]), XName, []}),
Delivery).
route(Delivery, {WorkList, SeenXs, QNames}) ->
@@ -252,13 +252,22 @@ process_alternate(_X, Results) ->
Results.
process_route(#resource{kind = exchange} = XName,
+ {_WorkList, XName, _QNames} = Acc) ->
+ Acc;
+process_route(#resource{kind = exchange} = XName,
+ {WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
+ {case lookup(XName) of
+ {ok, X} -> queue:in(X, WorkList);
+ {error, not_found} -> WorkList
+ end, gb_sets:from_list([SeenX, XName]), QNames};
+process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
- case sets:is_element(XName, SeenXs) of
+ case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
false -> {case lookup(XName) of
{ok, X} -> queue:in(X, WorkList);
{error, not_found} -> WorkList
- end, sets:add_element(XName, SeenXs), QNames}
+ end, gb_sets:add_element(XName, SeenXs), QNames}
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
new file mode 100644
index 0000000000..a0c8f4d566
--- /dev/null
+++ b/src/rabbit_framing.erl
@@ -0,0 +1,64 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% TODO auto-generate
+
+-module(rabbit_framing).
+
+-ifdef(use_specs).
+
+-export_type([protocol/0,
+ amqp_field_type/0, amqp_property_type/0,
+ amqp_table/0, amqp_array/0, amqp_value/0,
+ amqp_method_name/0, amqp_method/0, amqp_method_record/0,
+ amqp_method_field_name/0, amqp_property_record/0,
+ amqp_exception/0, amqp_exception_code/0, amqp_class_id/0]).
+
+-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1').
+
+-define(protocol_type(T), type(T :: rabbit_framing_amqp_0_8:T |
+ rabbit_framing_amqp_0_9_1:T)).
+
+-?protocol_type(amqp_field_type()).
+-?protocol_type(amqp_property_type()).
+-?protocol_type(amqp_table()).
+-?protocol_type(amqp_array()).
+-?protocol_type(amqp_value()).
+-?protocol_type(amqp_method_name()).
+-?protocol_type(amqp_method()).
+-?protocol_type(amqp_method_record()).
+-?protocol_type(amqp_method_field_name()).
+-?protocol_type(amqp_property_record()).
+-?protocol_type(amqp_exception()).
+-?protocol_type(amqp_exception_code()).
+-?protocol_type(amqp_class_id()).
+
+-endif.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index a9945af1d4..589bf7cc9d 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -32,7 +32,7 @@
-module(rabbit_heartbeat).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- pause_monitor/1, resume_monitor/1]).
+ start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
-include("rabbit.hrl").
@@ -41,16 +41,28 @@
-ifdef(use_specs).
-export_type([heartbeaters/0]).
+-export_type([start_heartbeat_fun/0]).
--type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
+-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
+
+-type(heartbeat_callback() :: fun (() -> any())).
+
+-type(start_heartbeat_fun() ::
+ fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) ->
+ no_return())).
-spec(start_heartbeat_sender/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
-spec(start_heartbeat_receiver/3 ::
- (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
+-spec(start_heartbeat_fun/1 ::
+ (pid()) -> start_heartbeat_fun()).
+
+
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -58,40 +70,60 @@
%%----------------------------------------------------------------------------
-start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater(
{Sock, TimeoutSec * 1000 div 2, send_oct, 0,
fun () ->
- catch rabbit_net:send(
- Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ SendFun(),
continue
end}).
-start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- Parent ! timeout,
+ ReceiveFun(),
stop
end}).
-pause_monitor(none) ->
+start_heartbeat_fun(SupPid) ->
+ fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ {ok, Sender} =
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
+ {ok, Receiver} =
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
+ {Sender, Receiver}
+ end.
+
+pause_monitor({_Sender, none}) ->
ok;
pause_monitor({_Sender, Receiver}) ->
Receiver ! pause,
ok.
-resume_monitor(none) ->
+resume_monitor({_Sender, none}) ->
ok;
resume_monitor({_Sender, Receiver}) ->
Receiver ! resume,
ok.
%%----------------------------------------------------------------------------
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+ {ok, none};
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+ supervisor2:start_child(
+ SupPid, {Name,
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
{ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 671634c464..5a0532eac1 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -52,7 +52,7 @@
-type(state() :: #iv_state { queue :: queue(),
qname :: rabbit_amqqueue:name(),
len :: non_neg_integer(),
- pending_ack :: dict:dictionary()
+ pending_ack :: dict()
}).
-include("rabbit_backing_queue_spec.hrl").
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index f87b62713a..bb4d77a4f7 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -63,8 +63,9 @@
%% have some space for when the queues don't quite respond as fast as
%% we would like, or when there is buffering going on in other parts
%% of the system. In short, we aim to stay some distance away from
-%% when the memory alarms will go off, which cause channel.flow.
-%% Note that all other Thresholds are relative to this scaling.
+%% when the memory alarms will go off, which cause backpressure (of
+%% some sort) on producers. Note that all other Thresholds are
+%% relative to this scaling.
-define(MEMORY_LIMIT_SCALING, 0.4).
-define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 086d260e24..230f4db524 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -50,7 +50,7 @@
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
--export([intersperse/2, upmap/2, map_in_order/2]).
+-export([upmap/2, map_in_order/2]).
-export([table_fold/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
@@ -64,6 +64,8 @@
-export([recursive_delete/1, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
-export([get_options/2]).
+-export([all_module_attributes/1, build_acyclic_graph/3]).
+-export([now_ms/0]).
-import(mnesia).
-import(lists).
@@ -82,6 +84,11 @@
-type(optdef() :: {flag, string()} | {option, string(), any()}).
-type(channel_or_connection_exit()
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
+-type(digraph_label() :: term()).
+-type(graph_vertex_fun() ::
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
+-type(graph_edge_fun() ::
+ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -128,8 +135,8 @@
-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (file:filename()) -> ok_or_error()).
--spec(report_cover/1 :: (file:filename()) -> 'ok').
+-spec(enable_cover/1 :: ([file:filename() | atom()]) -> ok_or_error()).
+-spec(report_cover/1 :: ([file:filename() | atom()]) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
@@ -147,7 +154,6 @@
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
--spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
@@ -177,13 +183,20 @@
-spec(recursive_delete/1 ::
([file:filename()])
-> rabbit_types:ok_or_error({file:filename(), any()})).
--spec(dict_cons/3 :: (any(), any(), dict:dictionary()) ->
- dict:dictionary()).
--spec(orddict_cons/3 :: (any(), any(), orddict:dictionary()) ->
- orddict:dictionary()).
+-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
+-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
+-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(build_acyclic_graph/3 ::
+ (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
+ -> rabbit_types:ok_or_error2(digraph(),
+ {'vertex', 'duplicate', digraph:vertex()} |
+ {'edge', ({bad_vertex, digraph:vertex()} |
+ {bad_edge, [digraph:vertex()]}),
+ digraph:vertex(), digraph:vertex()})).
+-spec(now_ms/0 :: () -> non_neg_integer()).
-endif.
@@ -268,29 +281,30 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
[Kind, Name, VHostPath])).
-enable_cover() ->
- enable_cover(".").
+enable_cover() -> enable_cover(["."]).
-enable_cover([Root]) when is_atom(Root) ->
- enable_cover(atom_to_list(Root));
-enable_cover(Root) ->
- case cover:compile_beam_directory(filename:join(Root, "ebin")) of
- {error,Reason} -> {error,Reason};
- _ -> ok
- end.
+enable_cover(Dirs) ->
+ lists:foldl(fun (Dir, ok) ->
+ case cover:compile_beam_directory(
+ filename:join(lists:concat([Dir]),"ebin")) of
+ {error, _} = Err -> Err;
+ _ -> ok
+ end;
+ (_Dir, Err) ->
+ Err
+ end, ok, Dirs).
start_cover(NodesS) ->
{ok, _} = cover:start([makenode(N) || N <- NodesS]),
ok.
-report_cover() ->
- report_cover(".").
+report_cover() -> report_cover(["."]).
+
+report_cover(Dirs) -> [report_cover1(lists:concat([Dir])) || Dir <- Dirs], ok.
-report_cover([Root]) when is_atom(Root) ->
- report_cover(atom_to_list(Root));
-report_cover(Root) ->
+report_cover1(Root) ->
Dir = filename:join(Root, "cover"),
- ok = filelib:ensure_dir(filename:join(Dir,"junk")),
+ ok = filelib:ensure_dir(filename:join(Dir, "junk")),
lists:foreach(fun (F) -> file:delete(F) end,
filelib:wildcard(filename:join(Dir, "*.html"))),
{ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
@@ -402,10 +416,6 @@ tcp_name(Prefix, IPAddress, Port)
io_lib:format("~w_~s:~w",
[Prefix, inet_parse:ntoa(IPAddress), Port]))).
-intersperse(_, []) -> [];
-intersperse(_, [E]) -> [E];
-intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)].
-
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
@@ -721,3 +731,53 @@ get_flag(K, [Nk | As]) ->
{[Nk | As1], V};
get_flag(_, []) ->
{[], false}.
+
+now_ms() ->
+ timer:now_diff(now(), {0,0,0}) div 1000.
+
+module_attributes(Module) ->
+ case catch Module:module_info(attributes) of
+ {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
+ io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
+ [Module]),
+ [];
+ {'EXIT', Reason} ->
+ exit(Reason);
+ V ->
+ V
+ end.
+
+all_module_attributes(Name) ->
+ Modules =
+ lists:usort(
+ lists:append(
+ [Modules || {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
+ lists:foldl(
+ fun (Module, Acc) ->
+ case lists:append([Atts || {N, Atts} <- module_attributes(Module),
+ N =:= Name]) of
+ [] -> Acc;
+ Atts -> [{Module, Atts} | Acc]
+ end
+ end, [], Modules).
+
+
+build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
+ G = digraph:new([acyclic]),
+ try
+ [case digraph:vertex(G, Vertex) of
+ false -> digraph:add_vertex(G, Vertex, Label);
+ _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
+ end || {Module, Atts} <- Graph,
+ {Vertex, Label} <- VertexFun(Module, Atts)],
+ [case digraph:add_edge(G, From, To) of
+ {error, E} -> throw({graph_error, {edge, E, From, To}});
+ _ -> ok
+ end || {Module, Atts} <- Graph,
+ {From, To} <- EdgeFun(Module, Atts)],
+ {ok, G}
+ catch {graph_error, Reason} ->
+ true = digraph:delete(G),
+ {error, Reason}
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 8de2f0d664..cb3251c704 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -44,9 +44,6 @@
-include("rabbit.hrl").
--define(SCHEMA_VERSION_SET, []).
--define(SCHEMA_VERSION_FILENAME, "schema_version").
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -94,9 +91,6 @@ init() ->
ok = ensure_mnesia_running(),
ok = ensure_mnesia_dir(),
ok = init_db(read_cluster_nodes_config(), true),
- ok = rabbit_misc:write_term_file(filename:join(
- dir(), ?SCHEMA_VERSION_FILENAME),
- [?SCHEMA_VERSION_SET]),
ok.
is_db_empty() ->
@@ -256,12 +250,12 @@ ensure_mnesia_dir() ->
ensure_mnesia_running() ->
case mnesia:system_info(is_running) of
yes -> ok;
- no -> throw({error, mnesia_not_running})
+ no -> throw({error, mnesia_not_running})
end.
ensure_mnesia_not_running() ->
case mnesia:system_info(is_running) of
- no -> ok;
+ no -> ok;
yes -> throw({error, mnesia_unexpectedly_running})
end.
@@ -367,39 +361,41 @@ init_db(ClusterNodes, Force) ->
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
{ok, Nodes} ->
case Force of
- false ->
- FailedClusterNodes = ProperClusterNodes -- Nodes,
- case FailedClusterNodes of
- [] -> ok;
- _ ->
- throw({error, {failed_to_cluster_with,
- FailedClusterNodes,
- "Mnesia could not connect to some nodes."}})
- end;
- _ -> ok
+ false -> FailedClusterNodes = ProperClusterNodes -- Nodes,
+ case FailedClusterNodes of
+ [] -> ok;
+ _ -> throw({error, {failed_to_cluster_with,
+ FailedClusterNodes,
+ "Mnesia could not connect "
+ "to some nodes."}})
+ end;
+ true -> ok
end,
- case Nodes of
- [] ->
- case mnesia:system_info(use_dir) of
- true ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- %% NB: we cannot use rabbit_log here since
- %% it may not have been started yet
- error_logger:warning_msg(
- "schema integrity check failed: ~p~n"
- "moving database to backup location "
- "and recreating schema from scratch~n",
- [Reason]),
- ok = move_db(),
- ok = create_schema()
- end;
- false ->
- ok = create_schema()
+ case {Nodes, mnesia:system_info(use_dir),
+ mnesia:system_info(db_nodes)} of
+ {[], true, [_]} ->
+ %% True single disc node, attempt upgrade
+ wait_for_tables(),
+ case rabbit_upgrade:maybe_upgrade() of
+ ok ->
+ ensure_schema_ok();
+ version_not_available ->
+ schema_ok_or_move()
end;
- [_|_] ->
+ {[], true, _} ->
+ %% "Master" (i.e. without config) disc node in cluster,
+ %% verify schema
+ wait_for_tables(),
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_schema_ok();
+ {[], false, _} ->
+ %% First RAM node in cluster, start from scratch
+ ok = create_schema();
+ {[AnotherNode|_], _, _} ->
+ %% Subsequent node in cluster, catch up
+ ensure_version_ok(rabbit_upgrade:read_version()),
+ ensure_version_ok(
+ rpc:call(AnotherNode, rabbit_upgrade, read_version, [])),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
@@ -408,14 +404,43 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ok = ensure_schema_integrity()
+ ensure_schema_ok()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
%% are members of a different cluster
- throw({error, {unable_to_join_cluster,
- ClusterNodes, Reason}})
+ throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
+ end.
+
+schema_ok_or_move() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ %% NB: we cannot use rabbit_log here since it may not have been
+ %% started yet
+ error_logger:warning_msg("schema integrity check failed: ~p~n"
+ "moving database to backup location "
+ "and recreating schema from scratch~n",
+ [Reason]),
+ ok = move_db(),
+ ok = create_schema()
+ end.
+
+ensure_version_ok({ok, DiscVersion}) ->
+ case rabbit_upgrade:desired_version() of
+ DiscVersion -> ok;
+ DesiredVersion -> throw({error, {schema_mismatch,
+ DesiredVersion, DiscVersion}})
+ end;
+ensure_version_ok({error, _}) ->
+ ok = rabbit_upgrade:write_version().
+
+ensure_schema_ok() ->
+ case check_schema_integrity() of
+ ok -> ok;
+ {error, Reason} -> throw({error, {schema_invalid, Reason}})
end.
create_schema() ->
@@ -426,7 +451,8 @@ create_schema() ->
cannot_start_mnesia),
ok = create_tables(),
ok = ensure_schema_integrity(),
- ok = wait_for_tables().
+ ok = wait_for_tables(),
+ ok = rabbit_upgrade:write_version().
move_db() ->
mnesia:stop(),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 682a7faa62..fd84109bf9 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -130,7 +130,7 @@
-type(client_msstate() :: #client_msstate {
server :: server(),
client_ref :: client_ref(),
- file_handle_cache :: dict:dictionary(),
+ file_handle_cache :: dict(),
index_state :: any(),
index_module :: atom(),
dir :: file:filename(),
@@ -164,8 +164,8 @@
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
- non_neg_integer()).
--spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> non_neg_integer()).
+ 'ok').
+-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok').
-endif.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index b48d0aa394..0030216e42 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -65,8 +65,7 @@ start() ->
halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
- [lists:flatten(
- rabbit_misc:intersperse(" ", FullCommand))]),
+ [string:join(FullCommand, " ")]),
usage();
timeout ->
print_error("timeout starting some nodes.", []),
@@ -227,11 +226,11 @@ run_rabbitmq_server_unix() ->
run_rabbitmq_server_win32() ->
Cmd = filename:nativename(os:find_executable("cmd")),
- CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME")
- ++ "\\rabbitmq-server.bat\" -noinput",
+ CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") ++
+ "\\rabbitmq-server.bat\" -noinput -detached",
erlang:open_port({spawn_executable, Cmd},
[{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]},
- nouse_stdio, hide]).
+ nouse_stdio]).
is_rabbit_running(Node, RpcTimeout) ->
case rpc:call(Node, rabbit, status, [], RpcTimeout) of
@@ -315,7 +314,7 @@ is_dead(Pid) ->
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
- PidS ++ "\""),
+ PidS ++ "\" 2>&1"),
case re:run(Res, "erl\\.exe", [{capture, none}]) of
match -> false;
_ -> true
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 53d0d5cbf3..0940dce2f4 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -34,7 +34,7 @@
-export([async_recv/3, close/1, controlling_process/2,
getstat/2, peername/1, peercert/1, port_command/2,
- send/2, sockname/1]).
+ send/2, sockname/1, is_ssl/1]).
%%---------------------------------------------------------------------------
@@ -65,6 +65,7 @@
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
+-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(getstat/2 ::
(socket(), [stat_option()])
-> ok_val_or_error([{stat_option(), integer()}])).
@@ -133,3 +134,6 @@ sockname(Sock) when ?IS_SSL(Sock) ->
ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) ->
inet:sockname(Sock).
+
+is_ssl(Sock) ->
+ ?IS_SSL(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 03a9b38679..1c542ffe7b 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -67,21 +67,21 @@
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
--spec(start_ssl_listener/3 :: (hostname(), ip_port(), [rabbit_types:info()])
+-spec(start_ssl_listener/3 :: (hostname(), ip_port(), rabbit_types:infos())
-> 'ok').
-spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
--spec(connection_info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(connection_info/1 ::
- (rabbit_types:connection()) -> [rabbit_types:info()]).
+ (rabbit_types:connection()) -> rabbit_types:infos()).
-spec(connection_info/2 ::
- (rabbit_types:connection(), [rabbit_types:info_key()])
- -> [rabbit_types:info()]).
--spec(connection_info_all/0 :: () -> [[rabbit_types:info()]]).
+ (rabbit_types:connection(), rabbit_types:info_keys())
+ -> rabbit_types:infos()).
+-spec(connection_info_all/0 :: () -> [rabbit_types:infos()]).
-spec(connection_info_all/1 ::
- ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
+ (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/3 ::
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 28d0b47dd6..248c1fbc98 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -36,6 +36,8 @@
publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
+-export([add_queue_ttl/0]).
+
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
@@ -180,6 +182,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({add_queue_ttl, []}).
+
-ifdef(use_specs).
-type(hdl() :: ('undefined' | any())).
@@ -190,7 +194,7 @@
unacked :: non_neg_integer()
})).
-type(seq_id() :: integer()).
--type(seg_dict() :: {dict:dictionary(), [segment()]}).
+-type(seg_dict() :: {dict(), [segment()]}).
-type(qistate() :: #qistate { dir :: file:filename(),
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
@@ -226,6 +230,8 @@
-spec(recover/1 ::
([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}).
+-spec(add_queue_ttl/0 :: () -> 'ok').
+
-endif.
@@ -345,35 +351,36 @@ recover(DurableQueues) ->
DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
- Directories = case file:list_dir(QueuesDir) of
- {ok, Entries} -> [ Entry || Entry <- Entries,
- filelib:is_dir(
- filename:join(
- QueuesDir, Entry)) ];
- {error, enoent} -> []
- end,
+ QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
{DurableQueueNames, DurableTerms} =
lists:foldl(
- fun (QueueDir, {DurableAcc, TermsAcc}) ->
- case sets:is_element(QueueDir, DurableDirectories) of
+ fun (QueueDirName, {DurableAcc, TermsAcc}) ->
+ QueueDirPath = filename:join(QueuesDir, QueueDirName),
+ case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case read_shutdown_terms(
- filename:join(QueuesDir, QueueDir)) of
+ case read_shutdown_terms(QueueDirPath) of
{error, _} -> TermsAcc;
{ok, Terms} -> [Terms | TermsAcc]
end,
- {[dict:fetch(QueueDir, DurableDict) | DurableAcc],
+ {[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
TermsAcc1};
false ->
- Dir = filename:join(queues_dir(), QueueDir),
- ok = rabbit_misc:recursive_delete([Dir]),
+ ok = rabbit_misc:recursive_delete([QueueDirPath]),
{DurableAcc, TermsAcc}
end
- end, {[], []}, Directories),
+ end, {[], []}, QueueDirNames),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+all_queue_directory_names(Dir) ->
+ case file:list_dir(Dir) of
+ {ok, Entries} -> [ Entry || Entry <- Entries,
+ filelib:is_dir(
+ filename:join(Dir, Entry)) ];
+ {error, enoent} -> []
+ end.
+
%%----------------------------------------------------------------------------
%% startup and shutdown
%%----------------------------------------------------------------------------
@@ -972,3 +979,87 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
{{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
{undefined, -1}.
+
+%%----------------------------------------------------------------------------
+%% upgrade
+%%----------------------------------------------------------------------------
+
+add_queue_ttl() ->
+ foreach_queue_index({fun add_queue_ttl_journal/1,
+ fun add_queue_ttl_segment/1}).
+
+add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Guid:?GUID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid,
+ expiry_to_binary(undefined)], Rest};
+add_queue_ttl_journal(_) ->
+ stop.
+
+add_queue_ttl_segment(<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, Guid:?GUID_BYTES/binary,
+ Rest/binary>>) ->
+ {[<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS>>, Guid, expiry_to_binary(undefined)], Rest};
+add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+add_queue_ttl_segment(_) ->
+ stop.
+
+%%----------------------------------------------------------------------------
+
+foreach_queue_index(Funs) ->
+ QueuesDir = queues_dir(),
+ QueueDirNames = all_queue_directory_names(QueuesDir),
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ transform_queue(filename:join(QueuesDir, QueueDirName),
+ Gatherer, Funs)
+ end)
+ end || QueueDirName <- QueueDirNames],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
+ ok = rabbit_misc:unlink_and_capture_exit(Gatherer).
+
+transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
+ ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
+ [ok = transform_file(filename:join(Dir, Seg), SegmentFun)
+ || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ ok = gatherer:finish(Gatherer).
+
+transform_file(Path, Fun) ->
+ PathTmp = Path ++ ".upgrade",
+ Size = filelib:file_size(Path),
+
+ {ok, PathTmpHdl} =
+ file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE],
+ [{write_buffer, infinity}]),
+
+ {ok, PathHdl} =
+ file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []),
+ {ok, Content} = file_handle_cache:read(PathHdl, Size),
+ ok = file_handle_cache:close(PathHdl),
+
+ ok = drive_transform_fun(Fun, PathTmpHdl, Content),
+
+ ok = file_handle_cache:close(PathTmpHdl),
+ ok = file:rename(PathTmp, Path).
+
+drive_transform_fun(Fun, Hdl, Contents) ->
+ case Fun(Contents) of
+ stop ->
+ ok;
+ {Output, Contents1} ->
+ ok = file_handle_cache:append(Hdl, Output),
+ drive_transform_fun(Fun, Hdl, Contents1)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 127467bb1f..71115a73f8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -45,10 +45,6 @@
-export([emit_stats/1]).
--import(gen_tcp).
--import(inet).
--import(prim_inet).
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
@@ -65,7 +61,7 @@
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
--define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
peer_cert_validity,
protocol, user, vhost, timeout, frame_max,
@@ -162,28 +158,25 @@
-ifdef(use_specs).
--type(start_heartbeat_fun() ::
- fun ((rabbit_networking:socket(), non_neg_integer()) ->
- rabbit_heartbeat:heartbeaters())).
-
--spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
rabbit_types:ok(pid())).
--spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
--spec(info/1 :: (pid()) -> [rabbit_types:info()]).
--spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
+ -> no_return()).
-spec(start_connection/7 ::
- (pid(), pid(), pid(), start_heartbeat_fun(), any(),
- rabbit_networking:socket(),
- fun ((rabbit_networking:socket()) ->
+ (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
+ rabbit_net:socket(),
+ fun ((rabbit_net:socket()) ->
rabbit_types:ok_or_error2(
- rabbit_networking:socket(), any()))) -> no_return()).
+ rabbit_net:socket(), any()))) -> no_return()).
-endif.
@@ -325,13 +318,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
done.
mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
- %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
- {State1, Callback1, Length1} =
- handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}),
- mainloop(Deb, switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{recv_ref = none}));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
State;
@@ -571,7 +561,6 @@ handle_frame(Type, Channel, Payload,
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
- %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{ch_fr_pid, ChFrPid} ->
ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
@@ -635,18 +624,18 @@ analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1};
+ ensure_stats_timer(
+ switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
+handle_input({frame_payload, Type, Channel, PayloadSize},
+ PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- %%?LOGDEBUG("Frame completed: ~p/~p/~p~n", [Type, Channel, Payload]),
- NewState = handle_frame(Type, Channel, Payload, State),
- {NewState, frame_header, 7};
+ handle_frame(Type, Channel, Payload,
+ switch_callback(State, frame_header, 7));
_ ->
- throw({bad_payload, PayloadAndMarker})
+ throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;
%% The two rules pertaining to version negotiation:
@@ -697,11 +686,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
mechanisms = <<"PLAIN AMQPLAIN">>,
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT,
- protocol = Protocol},
- connection_state = starting},
- frame_header, 7}.
+ switch_callback(State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7).
refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
@@ -771,7 +760,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = SHF(Sock, ClientHeartbeat),
+ SendFun =
+ fun() ->
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ catch rabbit_net:send(Sock, Frame)
+ end,
+
+ Parent = self(),
+ ReceiveFun =
+ fun() ->
+ Parent ! timeout
+ end,
+ Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
+ ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -839,6 +840,8 @@ i(peer_address, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
i(peer_port, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
+i(ssl, #v1{sock = Sock}) ->
+ rabbit_net:is_ssl(Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -889,7 +892,7 @@ cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
{error, no_peercert} -> '';
- {ok, Cert} -> F(Cert)
+ {ok, Cert} -> list_to_binary(F(Cert))
end.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 4335dd2e40..1d8ce23b81 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -85,8 +85,8 @@ peer_cert_validity(Cert) ->
cert_info(F, Cert) ->
F(case public_key:pkix_decode_cert(Cert, otp) of
- {ok, DecCert} -> DecCert;
- DecCert -> DecCert
+ {ok, DecCert} -> DecCert; %%pre R14B
+ DecCert -> DecCert %%R14B onwards
end).
%%--------------------------------------------------------------------------
@@ -95,14 +95,11 @@ cert_info(F, Cert) ->
%% Format and rdnSequence as a RFC4514 subject string.
format_rdn_sequence({rdnSequence, Seq}) ->
- lists:flatten(
- rabbit_misc:intersperse(
- ",", lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]))).
+ string:join(lists:reverse([format_complex_rdn(RDN) || RDN <- Seq]), ",").
%% Format an RDN set.
format_complex_rdn(RDNs) ->
- lists:flatten(
- rabbit_misc:intersperse("+", [format_rdn(RDN) || RDN <- RDNs])).
+ string:join([format_rdn(RDN) || RDN <- RDNs], "+").
%% Format an RDN. If the type name is unknown, use the dotted decimal
%% representation. See RFC4514, section 2.3.
@@ -129,7 +126,7 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) ->
io_lib:format(Fmt ++ "=~s", [FV]);
none when is_tuple(T) ->
TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)],
- io_lib:format("~s:~s", [rabbit_misc:intersperse(".", TypeL), FV]);
+ io_lib:format("~s:~s", [string:join(TypeL, "."), FV]);
none ->
io_lib:format("~p:~s", [T, FV])
end.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index edc73f7000..b9993823d1 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -35,7 +35,8 @@
-ifdef(use_specs).
--export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0,
+-export_type([txn/0, maybe/1, info/0, infos/0, info_key/0, info_keys/0,
+ message/0, basic_message/0,
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, message_properties/0,
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
@@ -95,7 +96,10 @@
-type(txn() :: rabbit_guid:guid()).
-type(info_key() :: atom()).
+-type(info_keys() :: [info_key()]).
+
-type(info() :: {info_key(), any()}).
+-type(infos() :: [info()]).
-type(amqp_error() ::
#amqp_error{name :: rabbit_framing:amqp_exception(),
@@ -143,7 +147,7 @@
-type(connection() :: pid()).
--type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1').
+-type(protocol() :: rabbit_framing:protocol()).
-type(user() ::
#user{username :: rabbit_access_control:username(),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
new file mode 100644
index 0000000000..27a94f6fc5
--- /dev/null
+++ b/src/rabbit_upgrade.erl
@@ -0,0 +1,158 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are Rabbit Technologies Ltd.
+%%
+%% Copyright (C) 2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_upgrade).
+
+-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]).
+
+-include("rabbit.hrl").
+
+-define(VERSION_FILENAME, "schema_version").
+-define(LOCK_FILENAME, "schema_upgrade_lock").
+
+%% -------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(step() :: atom()).
+-type(version() :: [step()]).
+
+-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available').
+-spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())).
+-spec(write_version/0 :: () -> 'ok').
+-spec(desired_version/0 :: () -> version()).
+
+-endif.
+
+%% -------------------------------------------------------------------
+
+%% Try to upgrade the schema. If no information on the existing schema
+%% could be found, do nothing. rabbit_mnesia:check_schema_integrity()
+%% will catch the problem.
+maybe_upgrade() ->
+ case read_version() of
+ {ok, CurrentHeads} ->
+ with_upgrade_graph(
+ fun (G) ->
+ case unknown_heads(CurrentHeads, G) of
+ [] -> case upgrades_to_apply(CurrentHeads, G) of
+ [] -> ok;
+ Upgrades -> apply_upgrades(Upgrades)
+ end;
+ Unknown -> throw({error,
+ {future_upgrades_found, Unknown}})
+ end
+ end);
+ {error, enoent} ->
+ version_not_available
+ end.
+
+read_version() ->
+ case rabbit_misc:read_term_file(schema_filename()) of
+ {ok, [Heads]} -> {ok, Heads};
+ {error, _} = Err -> Err
+ end.
+
+write_version() ->
+ ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]),
+ ok.
+
+desired_version() ->
+ with_upgrade_graph(fun (G) -> heads(G) end).
+
+%% -------------------------------------------------------------------
+
+with_upgrade_graph(Fun) ->
+ case rabbit_misc:build_acyclic_graph(
+ fun vertices/2, fun edges/2,
+ rabbit_misc:all_module_attributes(rabbit_upgrade)) of
+ {ok, G} -> try
+ Fun(G)
+ after
+ true = digraph:delete(G)
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ throw({error, {duplicate_upgrade_step, StepName}});
+ {error, {edge, {bad_vertex, StepName}, _From, _To}} ->
+ throw({error, {dependency_on_unknown_upgrade_step, StepName}});
+ {error, {edge, {bad_edge, StepNames}, _From, _To}} ->
+ throw({error, {cycle_in_upgrade_steps, StepNames}})
+ end.
+
+vertices(Module, Steps) ->
+ [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps].
+
+edges(_Module, Steps) ->
+ [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
+
+
+unknown_heads(Heads, G) ->
+ [H || H <- Heads, digraph:vertex(G, H) =:= false].
+
+upgrades_to_apply(Heads, G) ->
+ %% Take all the vertices which can reach the known heads. That's
+ %% everything we've already applied. Subtract that from all
+ %% vertices: that's what we have to apply.
+ Unsorted = sets:to_list(
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
+ %% Form a subgraph from that list and find a topological ordering
+ %% so we can invoke them in order.
+ [element(2, digraph:vertex(G, StepName)) ||
+ StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))].
+
+heads(G) ->
+ lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]).
+
+%% -------------------------------------------------------------------
+
+apply_upgrades(Upgrades) ->
+ LockFile = lock_filename(),
+ case file:open(LockFile, [write, exclusive]) of
+ {ok, Lock} ->
+ ok = file:close(Lock),
+ info("Upgrades: ~w to apply~n", [length(Upgrades)]),
+ [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
+ info("Upgrades: All applied~n", []),
+ ok = write_version(),
+ ok = file:delete(LockFile);
+ {error, eexist} ->
+ throw({error, previous_upgrade_failed});
+ {error, _} = Error ->
+ throw(Error)
+ end.
+
+apply_upgrade({M, F}) ->
+ info("Upgrades: Applying ~w:~w~n", [M, F]),
+ ok = apply(M, F, []).
+
+%% -------------------------------------------------------------------
+
+dir() -> rabbit_mnesia:dir().
+
+schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
+
+lock_filename() -> filename:join(dir(), ?LOCK_FILENAME).
+
+%% NB: we cannot use rabbit_log here since it may not have been
+%% started yet
+info(Msg, Args) -> error_logger:info_msg(Msg, Args).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
new file mode 100644
index 0000000000..1c56d51dd8
--- /dev/null
+++ b/src/rabbit_upgrade_functions.erl
@@ -0,0 +1,78 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are Rabbit Technologies Ltd.
+%%
+%% Copyright (C) 2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+-module(rabbit_upgrade_functions).
+
+-include("rabbit.hrl").
+
+-compile([export_all]).
+
+-rabbit_upgrade({remove_user_scope, []}).
+-rabbit_upgrade({hash_passwords, []}).
+-rabbit_upgrade({add_ip_to_listener, []}).
+
+%% -------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(remove_user_scope/0 :: () -> 'ok').
+-spec(hash_passwords/0 :: () -> 'ok').
+-spec(add_ip_to_listener/0 :: () -> 'ok').
+
+-endif.
+
+%%--------------------------------------------------------------------
+
+%% It's a bad idea to use records or record_info here, even for the
+%% destination form. Because in the future, the destination form of
+%% your current transform may not match the record any more, and it
+%% would be messy to have to go back and fix old transforms at that
+%% point.
+
+remove_user_scope() ->
+ mnesia(
+ rabbit_user_permission,
+ fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) ->
+ {user_permission, UV, {permission, Conf, Write, Read}}
+ end,
+ [user_vhost, permission]).
+
+hash_passwords() ->
+ mnesia(
+ rabbit_user,
+ fun ({user, Username, Password, IsAdmin}) ->
+ Hash = rabbit_access_control:hash_password(Password),
+ {user, Username, Hash, IsAdmin}
+ end,
+ [username, password_hash, is_admin]).
+
+add_ip_to_listener() ->
+ mnesia(
+ rabbit_listener,
+ fun ({listener, Node, Protocol, Host, Port}) ->
+ {listener, Node, Protocol, Host, {0,0,0,0}, Port}
+ end,
+ [node, protocol, host, ip_address, port]).
+
+%%--------------------------------------------------------------------
+
+mnesia(TableName, Fun, FieldList) ->
+ {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
+ ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f88e49c2f9..69d62fdeeb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -305,7 +305,7 @@
q3 :: bpqueue:bpqueue(),
q4 :: queue(),
next_seq_id :: seq_id(),
- pending_ack :: dict:dictionary(),
+ pending_ack :: dict(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index aa986e542b..50bca3909d 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -62,9 +62,10 @@
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
-spec(send_command_sync/2 ::
- (pid(), rabbit_framing:amqp_method()) -> 'ok').
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command_sync/3 ::
- (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok').
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 93adfcb1b5..46bab31ddc 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -357,8 +357,7 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
when ?is_simple(State) ->
{ok, NState} = do_restart(RestartType, Reason, Child, State),
{noreply, NState};
-handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
- when not (?is_simple(State)) ->
+handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
case get_child(Child#child.name, State) of
{value, Child} ->
{ok, NState} = do_restart(RestartType, Reason, Child, State),