diff options
| -rw-r--r-- | docs/rabbitmq.config.example | 2 | ||||
| -rw-r--r-- | src/mirrored_supervisor.erl | 6 | ||||
| -rw-r--r-- | src/rabbit.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_file.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 63 |
8 files changed, 98 insertions, 65 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index c0d6cc7067..7d6b80a70d 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -341,7 +341,7 @@ %% {reconnect_delay, 2.5} %% ]} %% End of my_first_shovel - ]}, + ]} %% Rather than specifying some values per-shovel, you can specify %% them for all shovels here. %% diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index f44b1f1d7e..ae35526fed 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -245,7 +245,9 @@ count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2). check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity). -cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg). +cast(Sup, Msg) -> with_exit_handler( + fun() -> ok end, + fun() -> ?GEN_SERVER:cast(mirroring(Sup), Msg) end). find_call(Sup, Id, Msg) -> Group = call(Sup, group), @@ -369,7 +371,7 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info({'DOWN', _Ref, process, Pid, Reason}, - State = #state{overall = Pid, group = Group}) -> + State = #state{delegate = Pid, group = Group}) -> %% Since the delegate is temporary, its death won't cause us to %% die. Since the overall supervisor kills processes in reverse %% order when shutting down "from above" and we started after the diff --git a/src/rabbit.erl b/src/rabbit.erl index 71cfed3226..081e2e229a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -357,7 +357,8 @@ status() -> {running_applications, rabbit_misc:which_applications()}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, - {memory, rabbit_vm:memory()}], + {memory, rabbit_vm:memory()}, + {alarms, alarms()}], S2 = rabbit_misc:filter_exit_map( fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, [{vm_memory_high_watermark, {vm_memory_monitor, @@ -380,6 +381,13 @@ status() -> end}], S1 ++ S2 ++ S3 ++ S4. +alarms() -> + Alarms = rabbit_misc:with_exit_handler(rabbit_misc:const([]), + fun rabbit_alarm:get_alarms/0), + N = node(), + %% [{{resource_limit,memory,rabbit@mercurio},[]}] + [Limit || {{resource_limit, Limit, Node}, _} <- Alarms, Node =:= N]. + is_running() -> is_running(node()). is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c047857922..019cebe661 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -221,36 +221,37 @@ start(Qs) -> find_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( + mnesia:async_dirty( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, pid = Pid} <- mnesia:table(rabbit_durable_queue), - mnesia:read(rabbit_queue, Name, read) =:= [], - node(Pid) == Node])) + node(Pid) == Node, + mnesia:read(rabbit_queue, Name, read) =:= []])) end). recover_durable_queues(QueuesAndRecoveryTerms) -> - Qs = [{start_queue_process(node(), Q), Terms} || - {Q, Terms} <- QueuesAndRecoveryTerms], - [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs, - gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}]. + {Results, Failures} = + gen_server2:mcall([{start_queue_process(node(), Q), + {init, {self(), Terms}}} || + {Q, Terms} <- QueuesAndRecoveryTerms]), + [rabbit_log:error("Queue ~p failed to initialise: ~p~n", + [Pid, Error]) || {Pid, Error} <- Failures], + [Q || {_, {new, Q}} <- Results]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q0 = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), - {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), - Q1 = start_queue_process(Node, Q0), - gen_server2:call(Q1#amqqueue.pid, {init, new}, infinity). + Q = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + gm_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -313,7 +314,7 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), - Q#amqqueue{pid = Pid}. + Pid. add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb9c61a8f6..1b4a07e313 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -200,13 +200,15 @@ remove(Binding, InnerFun) -> binding_action( Binding, fun (Src, Dst, B) -> - case mnesia:read(rabbit_route, B, write) =:= [] andalso - mnesia:read(rabbit_durable_route, B, write) =/= [] of - true -> rabbit_misc:const({error, binding_not_found}); - false -> case InnerFun(Src, Dst) of - ok -> remove(Src, Dst, B); - {error, _} = Err -> rabbit_misc:const(Err) - end + case mnesia:read(rabbit_route, B, write) of + [] -> case mnesia:read(rabbit_durable_route, B, write) of + [] -> rabbit_misc:const(ok); + _ -> rabbit_misc:const({error, binding_not_found}) + end; + _ -> case InnerFun(Src, Dst) of + ok -> remove(Src, Dst, B); + {error, _} = Err -> rabbit_misc:const(Err) + end end end, fun absent_errs_only/1). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index c372d5f15a..1a5f400b72 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -31,7 +31,7 @@ -spec(force_event_refresh/1 :: (reference()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). --spec(connect/5 :: (('nouser' | +-spec(connect/5 :: (({'none', 'none'} | {rabbit_types:username(), 'none'} | {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> @@ -67,13 +67,17 @@ list() -> %%---------------------------------------------------------------------------- +connect({none, _}, VHost, Protocol, Pid, Infos) -> + connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end, + VHost, Protocol, Pid, Infos); + +connect({Username, none}, VHost, Protocol, Pid, Infos) -> + connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end, + VHost, Protocol, Pid, Infos); + connect({Username, Password}, VHost, Protocol, Pid, Infos) -> connect0(fun () -> rabbit_access_control:check_user_pass_login( Username, Password) end, - VHost, Protocol, Pid, Infos); - -connect(nouser, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end, VHost, Protocol, Pid, Infos). connect0(AuthFun, VHost, Protocol, Pid, Infos) -> diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 1a766b05b0..4658ecfd4e 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -94,9 +94,12 @@ ensure_dir_internal(File) -> end. wildcard(Pattern, Dir) -> - {ok, Files} = list_dir(Dir), - {ok, RE} = re:compile(Pattern, [anchored]), - [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])]. + case list_dir(Dir) of + {ok, Files} -> {ok, RE} = re:compile(Pattern, [anchored]), + [File || File <- Files, + match =:= re:run(File, RE, [{capture, none}])]; + {error, _} -> [] + end. list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 919b7376a7..e00508b434 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -424,22 +424,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count} = + {Segments1, Count, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we %% lose them. Also mark delivered if not clean shutdown. Also - %% find the number of unacked messages. + %% find the number of unacked messages. Also accumulate the + %% dirty count here, so we can call maybe_flush_journal below + %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc}) -> - Segment = #segment { unacked = UnackedCount } = + fun (Seg, {Segments2, CountAcc, DirtyCount}) -> + {Segment = #segment { unacked = UnackedCount }, Dirty} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), - {segment_store(Segment, Segments2), CountAcc + UnackedCount} - end, {Segments, 0}, all_segment_nums(State1)), - %% Unconditionally flush since the dirty_count doesn't get updated - %% by the above foldl. - State2 = flush_journal(State1 #qistate { segments = Segments1 }), + {segment_store(Segment, Segments2), + CountAcc + UnackedCount, DirtyCount + Dirty} + end, {Segments, 0, 0}, all_segment_nums(State1)), + State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, + dirty_count = DirtyCount }), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -463,23 +465,25 @@ recover_segment(ContainsCheckFun, CleanShutdown, segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, - Segment1) -> + SegmentAndDirtyCount) -> recover_message(ContainsCheckFun(MsgId), CleanShutdown, - Del, RelSeq, Segment1) + Del, RelSeq, SegmentAndDirtyCount) end, - Segment #segment { unacked = UnackedCount + UnackedCountDelta }, + {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, SegEntries1). -recover_message( true, true, _Del, _RelSeq, Segment) -> - Segment; -recover_message( true, false, del, _RelSeq, Segment) -> - Segment; -recover_message( true, false, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, del, Segment); -recover_message(false, _, del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, Segment); -recover_message(false, _, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). +recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> + SegmentAndDirtyCount; +recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount) -> + SegmentAndDirtyCount; +recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, del, Segment), DirtyCount + 1}; +recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1}; +recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, + add_to_journal(RelSeq, del, Segment)), + DirtyCount + 2}. queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), @@ -651,9 +655,18 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. load_journal(State) -> - {JournalHdl, State1} = get_journal_handle(State), - {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1). + case is_journal_present(State) of + true -> {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + load_journal_entries(State1); + false -> State + end. + +is_journal_present(#qistate { journal_handle = undefined, + dir = Dir }) -> + rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)); +is_journal_present(_) -> + true. %% ditto recover_journal(State) -> |
