summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmq.config.example2
-rw-r--r--src/mirrored_supervisor.erl6
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_amqqueue.erl43
-rw-r--r--src/rabbit_binding.erl16
-rw-r--r--src/rabbit_direct.erl14
-rw-r--r--src/rabbit_file.erl9
-rw-r--r--src/rabbit_queue_index.erl63
8 files changed, 98 insertions, 65 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index c0d6cc7067..7d6b80a70d 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -341,7 +341,7 @@
%% {reconnect_delay, 2.5}
%% ]} %% End of my_first_shovel
- ]},
+ ]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index f44b1f1d7e..ae35526fed 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -245,7 +245,9 @@ count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity).
-cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg).
+cast(Sup, Msg) -> with_exit_handler(
+ fun() -> ok end,
+ fun() -> ?GEN_SERVER:cast(mirroring(Sup), Msg) end).
find_call(Sup, Id, Msg) ->
Group = call(Sup, group),
@@ -369,7 +371,7 @@ handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', _Ref, process, Pid, Reason},
- State = #state{overall = Pid, group = Group}) ->
+ State = #state{delegate = Pid, group = Group}) ->
%% Since the delegate is temporary, its death won't cause us to
%% die. Since the overall supervisor kills processes in reverse
%% order when shutting down "from above" and we started after the
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 71cfed3226..081e2e229a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -357,7 +357,8 @@ status() ->
{running_applications, rabbit_misc:which_applications()},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
- {memory, rabbit_vm:memory()}],
+ {memory, rabbit_vm:memory()},
+ {alarms, alarms()}],
S2 = rabbit_misc:filter_exit_map(
fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
[{vm_memory_high_watermark, {vm_memory_monitor,
@@ -380,6 +381,13 @@ status() ->
end}],
S1 ++ S2 ++ S3 ++ S4.
+alarms() ->
+ Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]),
+ fun rabbit_alarm:get_alarms/0),
+ N = node(),
+ %% [{{resource_limit,memory,rabbit@mercurio},[]}]
+ [Limit || {{resource_limit, Limit, Node}, _} <- Alarms, Node =:= N].
+
is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c047857922..019cebe661 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -221,36 +221,37 @@ start(Qs) ->
find_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
+ mnesia:async_dirty(
fun () ->
qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
pid = Pid}
<- mnesia:table(rabbit_durable_queue),
- mnesia:read(rabbit_queue, Name, read) =:= [],
- node(Pid) == Node]))
+ node(Pid) == Node,
+ mnesia:read(rabbit_queue, Name, read) =:= []]))
end).
recover_durable_queues(QueuesAndRecoveryTerms) ->
- Qs = [{start_queue_process(node(), Q), Terms} ||
- {Q, Terms} <- QueuesAndRecoveryTerms],
- [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs,
- gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}].
+ {Results, Failures} =
+ gen_server2:mcall([{start_queue_process(node(), Q),
+ {init, {self(), Terms}}} ||
+ {Q, Terms} <- QueuesAndRecoveryTerms]),
+ [rabbit_log:error("Queue ~p failed to initialise: ~p~n",
+ [Pid, Error]) || {Pid, Error} <- Failures],
+ [Q || {_, {new, Q}} <- Results].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q0 = rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []}),
- {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
- Q1 = start_queue_process(Node, Q0),
- gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity).
+ Q = rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = []}),
+ {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
internal_declare(Q, true) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -313,7 +314,7 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
- Q#amqqueue{pid = Pid}.
+ Pid.
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb9c61a8f6..1b4a07e313 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -200,13 +200,15 @@ remove(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- case mnesia:read(rabbit_route, B, write) =:= [] andalso
- mnesia:read(rabbit_durable_route, B, write) =/= [] of
- true -> rabbit_misc:const({error, binding_not_found});
- false -> case InnerFun(Src, Dst) of
- ok -> remove(Src, Dst, B);
- {error, _} = Err -> rabbit_misc:const(Err)
- end
+ case mnesia:read(rabbit_route, B, write) of
+ [] -> case mnesia:read(rabbit_durable_route, B, write) of
+ [] -> rabbit_misc:const(ok);
+ _ -> rabbit_misc:const({error, binding_not_found})
+ end;
+ _ -> case InnerFun(Src, Dst) of
+ ok -> remove(Src, Dst, B);
+ {error, _} = Err -> rabbit_misc:const(Err)
+ end
end
end, fun absent_errs_only/1).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index c372d5f15a..1a5f400b72 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -31,7 +31,7 @@
-spec(force_event_refresh/1 :: (reference()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: (('nouser' |
+-spec(connect/5 :: (({'none', 'none'} | {rabbit_types:username(), 'none'} |
{rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
@@ -67,13 +67,17 @@ list() ->
%%----------------------------------------------------------------------------
+connect({none, _}, VHost, Protocol, Pid, Infos) ->
+ connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end,
+ VHost, Protocol, Pid, Infos);
+
+connect({Username, none}, VHost, Protocol, Pid, Infos) ->
+ connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end,
+ VHost, Protocol, Pid, Infos);
+
connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
connect0(fun () -> rabbit_access_control:check_user_pass_login(
Username, Password) end,
- VHost, Protocol, Pid, Infos);
-
-connect(nouser, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end,
VHost, Protocol, Pid, Infos).
connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 1a766b05b0..4658ecfd4e 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -94,9 +94,12 @@ ensure_dir_internal(File) ->
end.
wildcard(Pattern, Dir) ->
- {ok, Files} = list_dir(Dir),
- {ok, RE} = re:compile(Pattern, [anchored]),
- [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])].
+ case list_dir(Dir) of
+ {ok, Files} -> {ok, RE} = re:compile(Pattern, [anchored]),
+ [File || File <- Files,
+ match =:= re:run(File, RE, [{capture, none}])];
+ {error, _} -> []
+ end.
list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 919b7376a7..e00508b434 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -424,22 +424,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
recover_journal(State),
- {Segments1, Count} =
+ {Segments1, Count, DirtyCount} =
%% Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
%% lose them. Also mark delivered if not clean shutdown. Also
- %% find the number of unacked messages.
+ %% find the number of unacked messages. Also accumulate the
+ %% dirty count here, so we can call maybe_flush_journal below
+ %% and avoid unnecessary file system operations.
lists:foldl(
- fun (Seg, {Segments2, CountAcc}) ->
- Segment = #segment { unacked = UnackedCount } =
+ fun (Seg, {Segments2, CountAcc, DirtyCount}) ->
+ {Segment = #segment { unacked = UnackedCount }, Dirty} =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
- {segment_store(Segment, Segments2), CountAcc + UnackedCount}
- end, {Segments, 0}, all_segment_nums(State1)),
- %% Unconditionally flush since the dirty_count doesn't get updated
- %% by the above foldl.
- State2 = flush_journal(State1 #qistate { segments = Segments1 }),
+ {segment_store(Segment, Segments2),
+ CountAcc + UnackedCount, DirtyCount + Dirty}
+ end, {Segments, 0, 0}, all_segment_nums(State1)),
+ State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
+ dirty_count = DirtyCount }),
{Count, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
@@ -463,23 +465,25 @@ recover_segment(ContainsCheckFun, CleanShutdown,
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
- Segment1) ->
+ SegmentAndDirtyCount) ->
recover_message(ContainsCheckFun(MsgId), CleanShutdown,
- Del, RelSeq, Segment1)
+ Del, RelSeq, SegmentAndDirtyCount)
end,
- Segment #segment { unacked = UnackedCount + UnackedCountDelta },
+ {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0},
SegEntries1).
-recover_message( true, true, _Del, _RelSeq, Segment) ->
- Segment;
-recover_message( true, false, del, _RelSeq, Segment) ->
- Segment;
-recover_message( true, false, no_del, RelSeq, Segment) ->
- add_to_journal(RelSeq, del, Segment);
-recover_message(false, _, del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, Segment);
-recover_message(false, _, no_del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
+recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
+ SegmentAndDirtyCount;
+recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) ->
+ SegmentAndDirtyCount;
+recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, del, Segment), DirtyCount + 1};
+recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
+recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, ack,
+ add_to_journal(RelSeq, del, Segment)),
+ DirtyCount + 2}.
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
@@ -651,9 +655,18 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- {ok, 0} = file_handle_cache:position(JournalHdl, 0),
- load_journal_entries(State1).
+ case is_journal_present(State) of
+ true -> {JournalHdl, State1} = get_journal_handle(State),
+ {ok, 0} = file_handle_cache:position(JournalHdl, 0),
+ load_journal_entries(State1);
+ false -> State
+ end.
+
+is_journal_present(#qistate { journal_handle = undefined,
+ dir = Dir }) ->
+ rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME));
+is_journal_present(_) ->
+ true.
%% ditto
recover_journal(State) ->