diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-24 18:27:38 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-24 18:27:38 +0000 |
| commit | 60a09e95e6a3bb0f3383097e108a10d5269f0e2d (patch) | |
| tree | cff4001e2d7dcded69dd810989a035c579ff2b62 | |
| parent | 2142d1b62ef070cb413222920dac858ad53c33df (diff) | |
| parent | c177658c5547d4648f47b6dbb92e5e86dd84cb2b (diff) | |
| download | rabbitmq-server-git-60a09e95e6a3bb0f3383097e108a10d5269f0e2d.tar.gz | |
merge default into bug20337
47 files changed, 596 insertions, 447 deletions
@@ -56,7 +56,7 @@ endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) -VERSION=0.0.0 +VERSION?=0.0.0 PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo ) PLUGINS_DIR=plugins TARBALL_NAME=rabbitmq-server-$(VERSION) @@ -246,7 +246,8 @@ stop-cover: all srcdist: distclean mkdir -p $(TARGET_SRC_DIR)/codegen cp -r ebin src include LICENSE LICENSE-MPL-RabbitMQ INSTALL README $(TARGET_SRC_DIR) - sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save + sed 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in > $(TARGET_SRC_DIR)/ebin/rabbit_app.in.tmp && \ + mv $(TARGET_SRC_DIR)/ebin/rabbit_app.in.tmp $(TARGET_SRC_DIR)/ebin/rabbit_app.in cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR) diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index 3368960b80..a4bd158087 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -66,7 +66,8 @@ Display usage information. <para> Install the service. The service will not be started. Subsequent invocations will update the service parameters if -relevant environment variables were modified. +relevant environment variables were modified or if the active +plugins were changed. </para> </listitem> </varlistentry> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 157550387d..7268f09037 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1273,9 +1273,10 @@ <para> Displays broker status information such as the running applications on the current Erlang node, RabbitMQ and - Erlang versions, OS name and memory statistics. (See - the <command>cluster_status</command> command to find - out which nodes are clustered and running.) + Erlang versions, OS name, memory and file descriptor + statistics. (See the <command>cluster_status</command> + command to find out which nodes are clustered and + running.) </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl status</screen> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 5ead105145..9301af6bdc 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -42,5 +42,6 @@ {reuseaddr, true}, {backlog, 128}, {nodelay, true}, + {linger, {true, 0}}, {exit_on_close, false}]} ]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index a603886c4b..d81b82dbbc 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -56,9 +56,11 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). +-record(topic_trie_node, {trie_node, edge_count, binding_count}). -record(topic_trie_edge, {trie_edge, node_id}). -record(topic_trie_binding, {trie_binding, value = const}). +-record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). -record(trie_binding, {exchange_name, node_id, destination}). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 96d3974fb3..a6899005b3 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -121,6 +121,9 @@ done rm -rf %{buildroot} %changelog +* Fri Dec 16 2011 steve@rabbitmq.com 2.7.1-1 +- New Upstream Release + * Tue Nov 8 2011 steve@rabbitmq.com 2.7.0-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index ceb08ed092..b3743c39e5 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.7.1-1) natty; urgency=low + + * New Upstream Release + + -- Steve Powell <steve@rabbitmq.com> Fri, 16 Dec 2011 12:12:36 +0000 + rabbitmq-server (2.7.0-1) natty; urgency=low * New Upstream Release diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index b6dad35792..360fb394eb 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -60,6 +60,12 @@ use_configure no use_parallel_build yes +build.env-append HOME=${workpath} + +build.env-append VERSION=${version} + +destroot.env-append VERSION=${version} + destroot.target install_bin destroot.destdir \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 1831f87699..39a68c8e0f 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -117,7 +117,7 @@ exec erl \ -sasl sasl_error_logger false \ -rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ -rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ - -os_mon start_cpu_sup true \ + -os_mon start_cpu_sup false \ -os_mon start_disksup false \ -os_mon start_memsup false \ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index c27b418adb..44ce1ce148 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -139,7 +139,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 4d3fce49b0..1582bfb142 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -145,7 +145,7 @@ if not exist "!RABBITMQ_BASE!" ( "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" list !RABBITMQ_SERVICENAME! 2>NUL 1>NUL
if errorlevel 1 (
- "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" add !RABBITMQ_SERVICENAME!
+ "!ERLANG_SERVICE_MANAGER_PATH!\erlsrv" add !RABBITMQ_SERVICENAME! -internalservicename !RABBITMQ_SERVICENAME!
) else (
echo !RABBITMQ_SERVICENAME! service is already present - only updating service parameters
)
@@ -204,7 +204,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
--os_mon start_cpu_sup true ^
+-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6c3f1b5f50..c11fb54bcb 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -125,8 +125,7 @@ %% requesting process is considered to 'own' one more %% descriptor. release/0 is the inverse operation and releases a %% previously obtained descriptor. transfer/1 transfers ownership of a -%% file descriptor between processes. It is non-blocking. Obtain is -%% used to obtain permission to accept file descriptors. Obtain has a +%% file descriptor between processes. It is non-blocking. Obtain has a %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use %% the entire limit, but will be evicted by obtain calls up to the %% point at which no more obtain calls can be satisfied by the obtains @@ -262,7 +261,7 @@ -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [obtain_count, obtain_limit]). +-define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]). %%---------------------------------------------------------------------------- %% Public API @@ -790,8 +789,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; -i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(total_limit, #fhc_state{limit = Limit}) -> Limit; +i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2; +i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(sockets_used, #fhc_state{obtain_count = Count}) -> Count; i(Item, _) -> throw({bad_argument, Item}). %%---------------------------------------------------------------------------- diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ab6c4e64f3..49913d2694 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1079,7 +1079,7 @@ get_proc_name({local, Name}) -> exit(process_not_registered) end; get_proc_name({global, Name}) -> - case global:safe_whereis_name(Name) of + case whereis_name(Name) of undefined -> exit(process_not_registered_globally); Pid when Pid =:= self() -> @@ -1101,7 +1101,7 @@ get_parent() -> name_to_pid(Name) -> case whereis(Name) of undefined -> - case global:safe_whereis_name(Name) of + case whereis_name(Name) of undefined -> exit(could_not_find_registerd_name); Pid -> @@ -1111,6 +1111,20 @@ name_to_pid(Name) -> Pid end. +whereis_name(Name) -> + case ets:lookup(global_names, Name) of + [{_Name, Pid, _Method, _RPid, _Ref}] -> + if node(Pid) == node() -> + case is_process_alive(Pid) of + true -> Pid; + false -> undefined + end; + true -> + Pid + end; + [] -> undefined + end. + find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> PrioriCall = function_exported_or_default( Mod, 'prioritise_call', 3, diff --git a/src/gm.erl b/src/gm.erl index 8c838a7056..6c89912213 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -386,6 +386,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -515,8 +516,8 @@ group_members(Server) -> init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), + Self = make_member(GroupName), gen_server2:cast(self(), join), - Self = self(), {ok, #state { self = Self, left = {Self, undefined}, right = {Self, undefined}, @@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); @@ -604,7 +606,8 @@ handle_cast({broadcast, Msg}, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + State}); handle_cast({broadcast, Msg}, State) -> internal_broadcast(Msg, none, State); @@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self, State1 = check_neighbours(State #state { view = View, members_state = MembersState }), handle_callback_result( - {Module:joined(Args, all_known_members(View)), State1}); + {Module:joined(Args, get_pids(all_known_members(View))), State1}); handle_cast(leave, State) -> {stop, normal, State}. @@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self, confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer }) -> - Result = Module:handle_msg(Args, Self, Msg), + Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; @@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> end, try case gen_server2:call( - Left, {add_on_right, Self}, infinity) of + get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); not_ready -> join_group(Self, GroupName) end @@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) -> mnesia:sync_transaction( fun () -> GroupNew = #gm_group { name = GroupName, members = [Self], - version = 0 }, + version = ?VERSION_START }, case mnesia:read({?GROUP_TABLE, GroupName}) of [] -> mnesia:write(GroupNew), @@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false. ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> {Self, undefined}; ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> - ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}), + ok = gen_server2:cast(get_pid(RealNeighbour), + {?TAG, Ver, check_neighbours}), {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> true = erlang:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, - ok = gen_server2:cast(RealNeighbour, Msg), + ok = gen_server2:cast(get_pid(RealNeighbour), Msg), ok = case Neighbour of Self -> ok; - _ -> gen_server2:cast(Neighbour, Msg) + _ -> gen_server2:cast(get_pid(Neighbour), Msg) end, {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor(Self, Self) -> undefined; maybe_monitor(Other, _Self) -> - erlang:monitor(process, Other). + erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) -> build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +make_member(GroupName) -> + {case read_group(GroupName) of + #gm_group { version = Version } -> Version; + {error, not_found} -> ?VERSION_START + end, self()}. + +get_pid({_Version, Pid}) -> Pid. + +get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% --------------------------------------------------------------------------- %% Activity assembly @@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self, send_right(Right, View, {activity, Self, Activity}). send_right(Right, View, Msg) -> - ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}). + ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> lists:foldl( fun ({Id, Pubs, _Acks}, ok) -> lists:foldl(fun ({_PubNum, Pub}, ok) -> - Module:handle_msg(Args, Id, Pub); + Module:handle_msg(Args, get_pid(Id), Pub); (_, Error) -> Error end, ok, Pubs); @@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) -> Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> ok; - _ -> Module:members_changed(Args, Births, Deaths) + _ -> Module:members_changed(Args, get_pids(Births), + get_pids(Deaths)) end. handle_callback_result({Result, State}) -> diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 8dfe39f890..3ba8f50d2e 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -144,32 +144,17 @@ -type child() :: pid() | 'undefined'. -type child_id() :: term(). --type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}. -type modules() :: [module()] | 'dynamic'. --type restart() :: 'permanent' | 'transient' | 'temporary'. --type shutdown() :: 'brutal_kill' | timeout(). -type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. -type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: atom()} | pid(). --type child_spec() :: {Id :: child_id(), - StartFunc :: mfargs(), - Restart :: restart(), - Shutdown :: shutdown(), - Type :: worker(), - Modules :: modules()}. -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. --type startchild_err() :: 'already_present' - | {'already_started', Child :: child()} | term(). --type startchild_ret() :: {'ok', Child :: child()} - | {'ok', Child :: child(), Info :: term()} - | {'error', startchild_err()}. - -type group_name() :: any(). -spec start_link(GroupName, Module, Args) -> startlink_ret() when @@ -183,9 +168,9 @@ Module :: module(), Args :: term(). --spec start_child(SupRef, ChildSpec) -> startchild_ret() when +-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when SupRef :: sup_ref(), - ChildSpec :: child_spec() | (List :: [term()]). + ChildSpec :: supervisor:child_spec() | (List :: [term()]). -spec restart_child(SupRef, Id) -> Result when SupRef :: sup_ref(), @@ -215,12 +200,12 @@ Modules :: modules(). -spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: 'ok' | {'error', Error :: term()}. -spec start_internal(Group, ChildSpecs) -> Result when Group :: group_name(), - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: startlink_ret(). -spec create_tables() -> Result when @@ -242,8 +227,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) -> start_link0(Prefix, Group, Init) -> case apply(?SUPERVISOR, start_link, Prefix ++ [?MODULE, {overall, Group, Init}]) of - {ok, Pid} -> call(Pid, {init, Pid}), - {ok, Pid}; + {ok, Pid} -> case catch call(Pid, {init, Pid}) of + ok -> {ok, Pid}; + E -> E + end; Other -> Other end. @@ -346,13 +333,20 @@ handle_call({init, Overall}, _From, end || Pid <- Rest], Delegate = child(Overall, delegate), erlang:monitor(process, Delegate), - [maybe_start(Group, Delegate, S) || S <- ChildSpecs], - {reply, ok, State#state{overall = Overall, delegate = Delegate}}; + State1 = State#state{overall = Overall, delegate = Delegate}, + case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + true -> {reply, ok, State1}; + false -> {stop, shutdown, State1} + end; handle_call({start_child, ChildSpec}, _From, State = #state{delegate = Delegate, group = Group}) -> - {reply, maybe_start(Group, Delegate, ChildSpec), State}; + {reply, case maybe_start(Group, Delegate, ChildSpec) of + already_in_mnesia -> {error, already_present}; + {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; + Else -> Else + end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, group = Group}) -> @@ -400,13 +394,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. Self = self(), - case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, ChildSpecs} = - mnesia:transaction(fun() -> update_all(Pid) end), - [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; - _ -> ok - end, - {noreply, State}; + R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of + [Self | _] -> {atomic, ChildSpecs} = + mnesia:transaction(fun() -> update_all(Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] + end, + case all_started(R) of + true -> {noreply, State}; + false -> {stop, shutdown, State} + end; handle_info(Info, State) -> {stop, {unexpected_info, Info}, State}. @@ -428,8 +425,8 @@ maybe_start(Group, Delegate, ChildSpec) -> check_start(Group, Delegate, ChildSpec) end) of {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> {error, already_present}; - {atomic, Pid} -> {error, {already_started, Pid}}; + {atomic, undefined} -> already_in_mnesia; + {atomic, Pid} -> {already_in_mnesia, Pid}; %% If we are torn down while in the transaction... {aborted, E} -> {error, E} end. @@ -499,6 +496,8 @@ delete_all(Group) -> [delete(Group, id(C)) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. +all_started(Results) -> [] =:= [R || R = {error, _} <- Results]. + %%---------------------------------------------------------------------------- create_tables() -> diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 6c91bc4fd5..d48a9ca51a 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -43,6 +43,7 @@ all_tests() -> passed = test_start_idempotence(), passed = test_unsupported(), passed = test_ignore(), + passed = test_startup_failure(), passed. %% Simplest test @@ -195,6 +196,22 @@ test_ignore() -> {sup, fake_strategy_for_ignore, []}), passed. +test_startup_failure() -> + [test_startup_failure(F) || F <- [want_error, want_exit]], + passed. + +test_startup_failure(Fail) -> + process_flag(trap_exit, true), + ?MS:start_link(get_group(group), ?MODULE, + {sup, one_for_one, [childspec(Fail)]}), + receive + {'EXIT', _, shutdown} -> + ok + after 1000 -> + exit({did_not_exit, Fail}) + end, + process_flag(trap_exit, false). + %% --------------------------------------------------------------------------- with_sups(Fun, Sups) -> @@ -228,6 +245,12 @@ start_sup0(Name, Group, ChildSpecs) -> childspec(Id) -> {Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. +start_gs(want_error) -> + {error, foo}; + +start_gs(want_exit) -> + exit(foo); + start_gs(Id) -> gen_server:start_link({local, Id}, ?MODULE, server, []). diff --git a/src/rabbit.erl b/src/rabbit.erl index 0a2681a219..9609eb0400 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -132,7 +132,7 @@ -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, - {requires, empty_db_check}, + {requires, core_initialized}, {enables, routing_ready}]}). -rabbit_boot_step({mirror_queue_slave_sup, @@ -158,8 +158,9 @@ {enables, networking}]}). -rabbit_boot_step({direct_client, - [{mfa, {rabbit_direct, boot, []}}, - {requires, log_relay}]}). + [{description, "direct client"}, + {mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, @@ -308,17 +309,28 @@ stop_and_halt() -> ok. status() -> - [{pid, list_to_integer(os:getpid())}, - {running_applications, application:which_applications(infinity)}, - {os, os:type()}, - {erlang_version, erlang:system_info(system_version)}, - {memory, erlang:memory()}] ++ - rabbit_misc:filter_exit_map( - fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, - [{vm_memory_high_watermark, {vm_memory_monitor, - get_vm_memory_high_watermark, []}}, - {vm_memory_limit, {vm_memory_monitor, - get_memory_limit, []}}]). + S1 = [{pid, list_to_integer(os:getpid())}, + {running_applications, application:which_applications(infinity)}, + {os, os:type()}, + {erlang_version, erlang:system_info(system_version)}, + {memory, erlang:memory()}], + S2 = rabbit_misc:filter_exit_map( + fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, + [{vm_memory_high_watermark, {vm_memory_monitor, + get_vm_memory_high_watermark, []}}, + {vm_memory_limit, {vm_memory_monitor, + get_memory_limit, []}}]), + S3 = rabbit_misc:with_exit_handler( + fun () -> [] end, + fun () -> [{file_descriptors, file_handle_cache:info()}] end), + S4 = [{processes, [{limit, erlang:system_info(process_limit)}, + {used, erlang:system_info(process_count)}]}, + {run_queue, erlang:statistics(run_queue)}, + {uptime, begin + {T,_} = erlang:statistics(wall_clock), + T div 1000 + end}], + S1 ++ S2 ++ S3 ++ S4. is_running() -> is_running(node()). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index fd03ca85b3..517dd4ec60 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -31,10 +31,9 @@ -ifdef(use_specs). --type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 041f0b4a23..523a7bba2e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -44,17 +44,17 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0]). +-export_type([name/0, qmsg/0, routing_result/0]). -type(name() :: rabbit_types:r('queue')). - +-type(qpids() :: [pid()]). -type(qlen() :: rabbit_types:ok(non_neg_integer())). -type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())). -type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}). -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -69,7 +69,8 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | - rabbit_types:error('not_found')). + rabbit_types:error('not_found'); + ([name()]) -> [rabbit_types:amqqueue()]). -spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). @@ -117,12 +118,13 @@ rabbit_types:error('in_use') | rabbit_types:error('not_empty')). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()). +-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). --spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) -> +-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). +-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). @@ -134,7 +136,7 @@ (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). +-spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -244,7 +246,7 @@ determine_queue_nodes(Args) -> case [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] of [Node] -> {Node, undefined}; - [First | Rest] -> {First, Rest} + [First | Rest] -> {First, [First | Rest]} end; {{_Type, <<"all">>}, _} -> {node(), all}; @@ -264,6 +266,10 @@ add_default_binding(#amqqueue{name = QueueName}) -> key = RoutingKey, args = []}). +lookup(Names) when is_list(Names) -> + %% Normally we'd call mnesia:dirty_read/1 here, but that is quite + %% expensive for reasons explained in rabbit_misc:dirty_read/1. + lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]); lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -432,14 +438,39 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver(QPid, Delivery = #delivery{immediate = true}) -> - gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); -deliver(QPid, Delivery = #delivery{mandatory = true}) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity), - true; -deliver(QPid, Delivery) -> - gen_server2:cast(QPid, {deliver, Delivery}), - true. +deliver([], #delivery{mandatory = false, immediate = false}) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + delegate:invoke_no_result( + QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -531,6 +562,9 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +qpids(Qs) -> lists:append([[QPid | SPids] || + #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). + safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 019280f17a..b539ff628b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1061,9 +1061,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery}, _From, State) -> - %% Synchronous, "immediate" delivery mode - %% +handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% FIXME: Is this correct semantics? %% %% I'm worried in particular about the case where an exchange has @@ -1081,8 +1079,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> false -> discard_delivery(Delivery, State1) end); -handle_call({deliver, Delivery}, From, State) -> - %% Synchronous, "mandatory" delivery mode. Reply asap. +handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index bc2f6ea994..4196db7dd4 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -30,7 +30,7 @@ -type(properties_input() :: (rabbit_framing:amqp_property_record() | [{atom(), any()}])). -type(publish_result() :: - ({ok, rabbit_router:routing_result(), [pid()]} + ({ok, rabbit_amqqueue:routing_result(), [pid()]} | rabbit_types:error('not_found'))). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). @@ -89,8 +89,8 @@ publish(Delivery = #delivery{ end. publish(X, Delivery) -> - {RoutingRes, DeliveredQPids} = - rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), + Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), {ok, RoutingRes, DeliveredQPids}. delivery(Mandatory, Immediate, Message, MsgSeqNo) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e625a427e4..655bbb7369 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -277,6 +277,7 @@ has_for_source(SrcName) -> contains(rabbit_semi_durable_route, Match). remove_for_source(SrcName) -> + lock_route_tables(), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, Routes = lists:usort( mnesia:match_object(rabbit_route, Match, write) ++ @@ -351,7 +352,28 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). +%% For bulk operations we lock the tables we are operating on in order +%% to reduce the time complexity. Without the table locks we end up +%% with num_tables*num_bulk_bindings row-level locks. Taking each lock +%% takes time proportional to the number of existing locks, thus +%% resulting in O(num_bulk_bindings^2) complexity. +%% +%% The locks need to be write locks since ultimately we end up +%% removing all these rows. +%% +%% The downside of all this is that no other binding operations except +%% lookup/routing (which uses dirty ops) can take place +%% concurrently. However, that is the case already since the bulk +%% operations involve mnesia:match_object calls with a partial key, +%% which entails taking a table lock. +lock_route_tables() -> + [mnesia:lock({table, T}, write) || T <- [rabbit_route, + rabbit_reverse_route, + rabbit_semi_durable_route, + rabbit_durable_route]]. + remove_for_destination(DstName, DeleteFun) -> + lock_route_tables(), Match = reverse_route( #route{binding = #binding{destination = DstName, _ = '_'}}), ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9df862c635..4c4ba230bc 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1358,7 +1358,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, QNames}, State) -> - {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery), + {RoutingRes, DeliveredQPids} = + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery), State1 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State), maybe_incr_stats([{XName, 1} | diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index dfb400e37f..4ba01b4f4d 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -28,8 +28,9 @@ -ifdef(use_specs). --spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()). --spec(start_link/2 :: ({'local', atom()}, mfa()) -> +-spec(start_link/1 :: (rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 20486af52b..22b57b1ae9 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -79,6 +79,12 @@ start() -> io:format(Format ++ " ...~n", Args1) end end, + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, + %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values case catch action(Command, Node, Args, Opts, Inform) of @@ -88,9 +94,11 @@ start() -> false -> io:format("...done.~n") end, rabbit_misc:quit(0); - {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]), + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15 + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15 + PrintInvalidCommandError(), usage(); {'EXIT', {badarg, _}} -> print_error("invalid parameter: ~p", [Args]), diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 348655b101..91c7b5d3ef 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -52,6 +52,7 @@ validate(_X) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> + trie_remove_all_nodes(X), trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; @@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) -> add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(transaction, #exchange{name = X}, Bs) -> - %% The remove process is split into two distinct phases. In the - %% first phase we gather the lists of bindings and edges to - %% delete, then in the second phase we process all the - %% deletions. This is to prevent interleaving of read/write - %% operations in mnesia that can adversely affect performance. - {ToDelete, Paths} = - lists:foldl( - fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> - Path = [{FinalNode, _} | _] = - follow_down_get_path(S, split_topic_key(K)), - {[{FinalNode, D} | Acc], - decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} - end, {[], gb_trees:empty()}, Bs), - - [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], - [trie_remove_edge(X, Parent, Node, W) || - {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], +remove_bindings(transaction, _X, Bs) -> + %% See rabbit_binding:lock_route_tables for the rationale for + %% taking table locks. + case Bs of + [_] -> ok; + _ -> [mnesia:lock({table, T}, write) || + T <- [rabbit_topic_trie_node, + rabbit_topic_trie_edge, + rabbit_topic_trie_binding]] + end, + [begin + Path = [{FinalNode, _} | _] = + follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path) + end || #binding{source = X, key = K, destination = D} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. -maybe_add_path(_X, [{root, none}], PathAcc) -> - PathAcc; -maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> - case gb_trees:is_defined(Node, PathAcc) of - true -> PathAcc; - false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), - trie_child_count(X, Node)}}, - PathAcc) - end. - -decrement_bindings(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, - Path, PathAcc). - -decrement_edges(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, - Path, PathAcc). - -with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> - PathAcc; -with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> - {Parent, W, Counts} = gb_trees:get(Node, PathAcc), - NewCounts = Fun(Counts), - NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), - case NewCounts of - {0, 0} -> decrement_edges(X, ParentPath, - maybe_add_path(X, ParentPath, NewPathAcc)); - _ -> NewPathAcc - end. - - assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, node_id = Node}, write) of + [] -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath); + _ -> ok + end. + trie_child(X, Node, Word) -> case mnesia:read({rabbit_topic_trie_edge, #trie_edge{exchange_name = X, @@ -199,10 +177,30 @@ trie_bindings(X, Node) -> destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). +trie_update_node_counts(X, Node, Field, Delta) -> + E = case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, + node_id = Node}, write) of + [] -> #topic_trie_node{trie_node = #trie_node{ + exchange_name = X, + node_id = Node}, + edge_count = 0, + binding_count = 0}; + [E0] -> E0 + end, + case setelement(Field, E, element(Field, E) + Delta) of + #topic_trie_node{edge_count = 0, binding_count = 0} -> + ok = mnesia:delete_object(rabbit_topic_trie_node, E, write); + EN -> + ok = mnesia:write(rabbit_topic_trie_node, EN, write) + end. + trie_add_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). trie_remove_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). trie_edge_op(X, FromNode, ToNode, W, Op) -> @@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> write). trie_add_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), trie_binding_op(X, Node, D, fun mnesia:write/3). trie_remove_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), trie_binding_op(X, Node, D, fun mnesia:delete_object/3). trie_binding_op(X, Node, D, Op) -> @@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_child_count(X, Node) -> - count(rabbit_topic_trie_edge, - #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -trie_binding_count(X, Node) -> - count(rabbit_topic_trie_binding, - #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -count(Table, Match) -> - length(mnesia:match_object(Table, Match, read)). +trie_remove_all_nodes(X) -> + remove_all(rabbit_topic_trie_node, + #topic_trie_node{trie_node = #trie_node{exchange_name = X, + _ = '_'}, + _ = '_'}). trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 02f3158f3b..c25a177b06 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -178,11 +178,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%---------------------------------------------------------------------------- -zero_clamp(Sum) -> - case Sum < ?EPSILON of - true -> 0.0; - false -> Sum - end. +zero_clamp(Sum) when Sum < ?EPSILON -> 0.0; +zero_clamp(Sum) -> Sum. internal_deregister(Pid, Demonitor, State = #state { queue_duration_sum = Sum, @@ -240,26 +237,21 @@ internal_update(State = #state { queue_durations = Durations, fun (Proc = #process { reported = QueueDuration, sent = PrevSendDuration, callback = {M, F, A} }, true) -> - case (case {QueueDuration, PrevSendDuration} of - {infinity, infinity} -> - true; - {infinity, D} -> - DesiredDurationAvg1 < D; - {D, infinity} -> - DesiredDurationAvg1 < D; - {D1, D2} -> - DesiredDurationAvg1 < - lists:min([D1,D2]) - end) of - true -> - ok = erlang:apply( - M, F, A ++ [DesiredDurationAvg1]), - ets:insert( - Durations, - Proc #process {sent = DesiredDurationAvg1}); - false -> - true + case should_send(QueueDuration, PrevSendDuration, + DesiredDurationAvg1) of + true -> ok = erlang:apply( + M, F, A ++ [DesiredDurationAvg1]), + ets:insert( + Durations, + Proc #process { + sent = DesiredDurationAvg1}); + false -> true end end, true, Durations) end, State1. + +should_send(infinity, infinity, _) -> true; +should_send(infinity, D, DD) -> DD < D; +should_send(D, infinity, DD) -> DD < D; +should_send(D1, D2, DD) -> DD < lists:min([D1, D2]). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a8c2006d14..db1f056f4c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -148,9 +148,8 @@ init([#amqqueue { name = QueueName } = Q]) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> - %% Synchronous, "immediate" delivery mode - +handle_call({deliver, Delivery = #delivery { immediate = true }}, + From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the %% msg via gm, or b) the master dies before we receive the msg via %% gm. In the case of (a), we will eventually receive the msg via @@ -166,8 +165,8 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> gen_server2:reply(From, false), %% master may deliver it, not us noreply(maybe_enqueue_message(Delivery, false, State)); -handle_call({deliver, Delivery = #delivery {}}, From, State) -> - %% Synchronous, "mandatory" delivery mode +handle_call({deliver, Delivery = #delivery { mandatory = true }}, + From, State) -> gen_server2:reply(From, true), %% amqqueue throws away the result anyway noreply(maybe_enqueue_message(Delivery, true, State)); @@ -526,9 +525,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, SS, MonitoringPids), - MTC = dict:from_list( - [{MsgId, {ChPid, MsgSeqNo}} || - {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_, MTC0) -> + MTC0 + end, gb_trees:empty(), MSList), NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), @@ -725,7 +726,7 @@ process_instruction( never -> {MQ2, PendingCh, MS}; eventually -> - {MQ2, sets:add_element(MsgId, PendingCh), + {MQ2, PendingCh, dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index a9a9a4c418..2676ddc63b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -253,18 +253,23 @@ assert_args_equivalence(Orig, New, Name, Keys) -> ok. assert_args_equivalence1(Orig, New, Name, Key) -> - case {table_lookup(Orig, Key), table_lookup(New, Key)} of + {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)}, + FailureFun = fun () -> + protocol_error(precondition_failed, "inequivalent arg '~s'" + "for ~s: received ~s but current is ~s", + [Key, rs(Name), val(New1), val(Orig1)]) + end, + case {Orig1, New1} of {Same, Same} -> ok; - {{OrigType, OrigVal} = Orig1, {NewType, NewVal} = New1} -> + {{OrigType, OrigVal}, {NewType, NewVal}} -> case type_class(OrigType) == type_class(NewType) andalso OrigVal == NewVal of true -> ok; - false -> protocol_error(precondition_failed, "inequivalent arg" - " '~s' for ~s: received ~s but current" - " is ~s", - [Key, rs(Name), val(New1), val(Orig1)]) - end + false -> FailureFun() + end; + {_, _} -> + FailureFun() end. val(undefined) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c8c18843a8..bf997a6f8b 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -268,6 +268,11 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, {rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, record_info(fields, topic_trie_edge)}, @@ -314,12 +319,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_node_match() -> + #trie_node{ exchange_name = exchange_name_match(), _='_'}. trie_edge_match() -> - #trie_edge{exchange_name = exchange_name_match(), - _='_'}. + #trie_edge{ exchange_name = exchange_name_match(), _='_'}. trie_binding_match() -> - #trie_binding{exchange_name = exchange_name_match(), - _='_'}. + #trie_binding{exchange_name = exchange_name_match(), _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2c0912dfd6..e81f8134f8 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -24,7 +24,7 @@ close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter --export([check_tcp_listener_address/2, +-export([tcp_listener_addresses/1, tcp_listener_spec/6, ensure_ssl/0, ssl_transform_fun/1]). -export([tcp_listener_started/3, tcp_listener_stopped/3, @@ -47,12 +47,16 @@ -export_type([ip_port/0, hostname/0]). -type(hostname() :: inet:hostname()). --type(ip_port() :: inet:ip_port()). +-type(ip_port() :: inet:port_number()). -type(family() :: atom()). -type(listener_config() :: ip_port() | {hostname(), ip_port()} | {hostname(), ip_port(), family()}). +-type(address() :: {inet:ip_address(), ip_port(), family()}). +-type(name_prefix() :: atom()). +-type(protocol() :: atom()). +-type(label() :: string()). -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). @@ -76,8 +80,10 @@ -spec(force_connection_event_refresh/0 :: () -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(check_tcp_listener_address/2 :: (atom(), listener_config()) - -> [{inet:ip_address(), ip_port(), family(), atom()}]). +-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). +-spec(tcp_listener_spec/6 :: + (name_prefix(), address(), [gen_tcp:listen_option()], protocol(), + label(), rabbit_types:mfargs()) -> supervisor:child_spec()). -spec(ensure_ssl/0 :: () -> rabbit_types:infos()). -spec(ssl_transform_fun/1 :: (rabbit_types:infos()) @@ -140,39 +146,6 @@ start() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. -%% inet_parse:address takes care of ip string, like "0.0.0.0" -%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, -%% and runs 'inet_gethost' port process for dns lookups. -%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. - -getaddr(Host, Family) -> - case inet_parse:address(Host) of - {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; - {error, _} -> gethostaddr(Host, Family) - end. - -gethostaddr(Host, auto) -> - Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], - case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of - [] -> host_lookup_error(Host, Lookups); - IPs -> IPs - end; - -gethostaddr(Host, Family) -> - case inet:getaddr(Host, Family) of - {ok, IPAddress} -> [{IPAddress, Family}]; - {error, Reason} -> host_lookup_error(Host, Reason) - end. - -host_lookup_error(Host, Reason) -> - error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}). - -resolve_family({_,_,_,_}, auto) -> inet; -resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; -resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); -resolve_family(_, F) -> F. - ensure_ssl() -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), @@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) -> end end. -check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) -> - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {"auto", Port}) -> +tcp_listener_addresses(Port) when is_integer(Port) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({"auto", Port}) -> %% Variant to prevent lots of hacking around in bash and batch files - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {Host, Port}) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({Host, Port}) -> %% auto: determine family IPv4 / IPv6 after converting to IP address - check_tcp_listener_address(NamePrefix, {Host, Port, auto}); - -check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) -> - if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; - true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", - [Port]), - throw({error, {invalid_port, Port}}) - end, - [{IPAddress, Port, Family, - rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} || - {IPAddress, Family} <- getaddr(Host, Family0)]. - -check_tcp_listener_address_auto(NamePrefix, Port) -> - lists:append([check_tcp_listener_address(NamePrefix, Listener) || + tcp_listener_addresses({Host, Port, auto}); +tcp_listener_addresses({Host, Port, Family0}) + when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> + [{IPAddress, Port, Family} || + {IPAddress, Family} <- getaddr(Host, Family0)]; +tcp_listener_addresses({_Host, Port, _Family0}) -> + error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), + throw({error, {invalid_port, Port}}). + +tcp_listener_addresses_auto(Port) -> + lists:append([tcp_listener_addresses(Listener) || Listener <- port_to_listeners(Port)]). +tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, + Protocol, Label, OnConnect) -> + {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), + {tcp_listener_sup, start_link, + [IPAddress, Port, [Family | SocketOpts], + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, + OnConnect, Label]}, + transient, infinity, supervisor, [tcp_listener_sup]}. + start_tcp_listener(Listener) -> start_listener(Listener, amqp, "TCP Listener", {?MODULE, start_client, []}). @@ -235,27 +213,21 @@ start_ssl_listener(Listener, SslOpts) -> {?MODULE, start_ssl_client, [SslOpts]}). start_listener(Listener, Protocol, Label, OnConnect) -> - [start_listener0(Spec, Protocol, Label, OnConnect) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [start_listener0(Address, Protocol, Label, OnConnect) || + Address <- tcp_listener_addresses(Listener)], ok. -start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> - {ok,_} = supervisor:start_child( - rabbit_sup, - {Name, - {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | tcp_opts()], - {?MODULE, tcp_listener_started, [Protocol]}, - {?MODULE, tcp_listener_stopped, [Protocol]}, - OnConnect, Label]}, - transient, infinity, supervisor, [tcp_listener_sup]}). +start_listener0(Address, Protocol, Label, OnConnect) -> + Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(), + Protocol, Label, OnConnect), + {ok,_} = supervisor:start_child(rabbit_sup, Spec). stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Spec) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [stop_tcp_listener0(Address) || + Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family, Name}) -> +stop_tcp_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). @@ -307,9 +279,15 @@ connections() -> rabbit_networking, connections_local, []). connections_local() -> - [rabbit_connection_sup:reader(ConnSup) || + [Reader || {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup)]. + <- supervisor:which_children(rabbit_tcp_client_sup), + Reader <- [try + rabbit_connection_sup:reader(ConnSup) + catch exit:{noproc, _} -> + noproc + end], + Reader =/= noproc]. connection_info_keys() -> rabbit_reader:info_keys(). @@ -357,6 +335,38 @@ tcp_opts() -> {ok, Opts} = application:get_env(rabbit, tcp_listen_options), Opts. +%% inet_parse:address takes care of ip string, like "0.0.0.0" +%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, +%% and runs 'inet_gethost' port process for dns lookups. +%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. +getaddr(Host, Family) -> + case inet_parse:address(Host) of + {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; + {error, _} -> gethostaddr(Host, Family) + end. + +gethostaddr(Host, auto) -> + Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], + case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of + [] -> host_lookup_error(Host, Lookups); + IPs -> IPs + end; + +gethostaddr(Host, Family) -> + case inet:getaddr(Host, Family) of + {ok, IPAddress} -> [{IPAddress, Family}]; + {error, Reason} -> host_lookup_error(Host, Reason) + end. + +host_lookup_error(Host, Reason) -> + error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}). + +resolve_family({_,_,_,_}, auto) -> inet; +resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; +resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); +resolve_family(_, F) -> F. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 0862f1b2c7..de2ba8adbb 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -55,13 +55,20 @@ start() -> CmdArgsAndOpts -> CmdArgsAndOpts end, Command = list_to_atom(Command0), + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]), + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> + PrintInvalidCommandError(), usage(); {error, Reason} -> print_error("~p", [Reason]), @@ -111,8 +118,7 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", NewImplicitlyEnabled -- ImplicitlyEnabled), - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n") + report_change() end; action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> @@ -140,8 +146,7 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> print_list("The following plugins have been disabled:", ImplicitlyEnabled -- NewImplicitlyEnabled), write_enabled_plugins(PluginsFile, NewEnabled), - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n") + report_change() end. %%---------------------------------------------------------------------------- @@ -327,6 +332,9 @@ lookup_plugins(Names, AllPlugins) -> read_enabled_plugins(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; + {ok, []} -> []; + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, PluginsFile, Reason}}) @@ -374,3 +382,17 @@ maybe_warn_mochiweb(Enabled) -> false -> ok end. + +report_change() -> + io:format("Plugin configuration has changed. " + "Restart RabbitMQ for changes to take effect.~n"), + case os:type() of + {win32, _OsName} -> + io:format("If you have RabbitMQ running as a service then you must" + " reinstall by running~n rabbitmq-service.bat stop~n" + " rabbitmq-service.bat install~n" + " rabbitmq-service.bat start~n~n"); + _ -> + ok + end. + diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4b54546619..f03c1d1c76 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -505,7 +505,10 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> [begin ok = gatherer:fork(Gatherer), ok = worker_pool:submit_async( - fun () -> queue_index_walker_reader(QueueName, Gatherer) + fun () -> link(Gatherer), + ok = queue_index_walker_reader(QueueName, Gatherer), + unlink(Gatherer), + ok end) end || QueueName <- DurableQueues], queue_index_walker({next, Gatherer}); @@ -837,13 +840,16 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> + Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of - false -> {array_new(), 0}; + false -> Empty; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - {ok, SegData} = file_handle_cache:read( - Hdl, ?SEGMENT_TOTAL_SIZE), - Res = load_segment_entries(KeepAcked, SegData, array_new(), 0), + Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of + {ok, SegData} -> load_segment_entries( + KeepAcked, SegData, Empty); + eof -> Empty + end, ok = file_handle_cache:close(Hdl), Res end. @@ -853,15 +859,15 @@ load_segment_entries(KeepAcked, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, SegData/binary>>, - SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}) -> {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, SegData, SegEntries1, UnackedCount + 1); + load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1}); load_segment_entries(KeepAcked, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS, SegData/binary>>, - SegEntries, UnackedCount) -> + {SegEntries, UnackedCount}) -> {UnackedCountDelta, SegEntries1} = case array:get(RelSeq, SegEntries) of {Pub, no_del, no_ack} -> @@ -871,10 +877,10 @@ load_segment_entries(KeepAcked, {_Pub, del, no_ack} -> {-1, array:reset(RelSeq, SegEntries)} end, - load_segment_entries(KeepAcked, SegData, SegEntries1, - UnackedCount + UnackedCountDelta); -load_segment_entries(_KeepAcked, _SegData, SegEntries, UnackedCount) -> - {SegEntries, UnackedCount}. + load_segment_entries(KeepAcked, SegData, + {SegEntries1, UnackedCount + UnackedCountDelta}); +load_segment_entries(_KeepAcked, _SegData, Res) -> + Res. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 694abd9e49..fce61129fa 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -31,7 +31,7 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). --define(CLOSING_TIMEOUT, 1). +-define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). @@ -329,10 +329,14 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> catch Error -> {error, Error} end), mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> +handle_other({'$gen_cast', force_event_refresh}, Deb, State) + when ?IS_RUNNING(State) -> rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), mainloop(Deb, State); +handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + %% Ignore, we will emit a created event once we start running. + mainloop(Deb, State); handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> @@ -341,10 +345,6 @@ handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). -switch_callback(State = #v1{connection_state = blocked, - heartbeater = Heartbeater}, Callback, Length) -> - ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = Callback, recv_len = Length}; switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. @@ -376,28 +376,22 @@ close_connection(State = #v1{queue_collector = Collector, rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. - TimeoutMillisec = - 1000 * if TimeoutSec > 0 andalso - TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; - true -> ?CLOSING_TIMEOUT - end, - erlang:send_after(TimeoutMillisec, self(), terminate_connection), + erlang:send_after((if TimeoutSec > 0 andalso + TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; + true -> ?CLOSING_TIMEOUT + end) * 1000, self(), terminate_connection), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> - case termination_kind(Reason) of - controlled -> - channel_cleanup(ChPid), + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, uncontrolled} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_Channel, controlled} -> maybe_close(State); - uncontrolled -> - case channel_cleanup(ChPid) of - undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> rabbit_log:error( - "connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), - maybe_close( - handle_exception(State, Channel, Reason)) - end + {Channel, uncontrolled} -> + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close(handle_exception(State, Channel, Reason)) end. channel_cleanup(ChPid) -> @@ -432,19 +426,15 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case channel_cleanup(ChPid) of - undefined -> + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, _} -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> - case termination_kind(Reason) of - controlled -> - ok; - uncontrolled -> - rabbit_log:error( - "connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]) - end, + {_Channel, controlled} -> + wait_for_channel_termination(N-1, TimerRef); + {Channel, uncontrolled} -> + rabbit_log:error("connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), wait_for_channel_termination(N-1, TimerRef) end; cancel_wait -> @@ -521,7 +511,9 @@ post_process_frame({method, MethodName, _}, _ChPid, case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), case State#v1.connection_state of - blocking -> State#v1{connection_state = blocked}; + blocking -> ok = rabbit_heartbeat:pause_monitor( + State#v1.heartbeater), + State#v1{connection_state = blocked}; _ -> State end; false -> State diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index cda3ccbe0f..1a08efed41 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -28,7 +28,8 @@ -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 31f5ad14ea..219833b726 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -18,21 +18,17 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([deliver/2, match_bindings/2, match_routing_key/2]). +-export([match_bindings/2, match_routing_key/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([routing_key/0, routing_result/0, match_result/0]). +-export_type([routing_key/0, match_result/0]). -type(routing_key() :: binary()). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(qpids() :: [pid()]). -type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). -spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> match_result()). @@ -44,38 +40,6 @@ %%---------------------------------------------------------------------------- -deliver([], #delivery{mandatory = false, - immediate = false}) -> - %% /dev/null optimisation - {routed, []}; - -deliver(QNames, Delivery = #delivery{mandatory = false, - immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. - QPids = lookup_qpids(QNames), - delegate:invoke_no_result( - QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - {routed, QPids}; - -deliver(QNames, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> - QPids = lookup_qpids(QNames), - {Success, _} = - delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) - end), - {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Mandatory, Immediate, {Routed, Handled}). - - %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source match_bindings(SrcName, Match) -> @@ -104,26 +68,6 @@ match_routing_key(SrcName, [_|_] = RoutingKeys) -> %%-------------------------------------------------------------------- -fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; -fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. - -%% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) -check_delivery(true, _ , {false, []}) -> {unroutable, []}; -check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; -check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. - -%% Normally we'd call mnesia:dirty_read/1 here, but that is quite -%% expensive for the reasons explained in rabbit_misc:dirty_read/1. -lookup_qpids(QNames) -> - lists:foldl(fun (QName, QPids) -> - case ets:lookup(rabbit_queue, QName) of - [#amqqueue{pid = QPid, slave_pids = SPids}] -> - [QPid | SPids ++ QPids]; - [] -> - QPids - end - end, [], QNames). - %% Normally we'd call mnesia:dirty_select/2 here, but that is quite %% expensive for the same reasons as above, and, additionally, due to %% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index e0defa9e96..e524446ea8 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -21,7 +21,7 @@ -include_lib("public_key/include/public_key.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). --export([peer_cert_subject_item/2]). +-export([peer_cert_subject_items/2]). %%-------------------------------------------------------------------------- @@ -34,8 +34,8 @@ -spec(peer_cert_issuer/1 :: (certificate()) -> string()). -spec(peer_cert_subject/1 :: (certificate()) -> string()). -spec(peer_cert_validity/1 :: (certificate()) -> string()). --spec(peer_cert_subject_item/2 :: - (certificate(), tuple()) -> string() | 'not_found'). +-spec(peer_cert_subject_items/2 :: + (certificate(), tuple()) -> [string()] | 'not_found'). -endif. @@ -59,8 +59,8 @@ peer_cert_subject(Cert) -> format_rdn_sequence(Subject) end, Cert). -%% Return a part of the certificate's subject. -peer_cert_subject_item(Cert, Type) -> +%% Return the parts of the certificate's subject. +peer_cert_subject_items(Cert, Type) -> cert_info(fun(#'OTPCertificate' { tbsCertificate = #'OTPTBSCertificate' { subject = Subject }}) -> @@ -89,8 +89,8 @@ find_by_type(Type, {rdnSequence, RDNs}) -> case [V || #'AttributeTypeAndValue'{type = T, value = V} <- lists:flatten(RDNs), T == Type] of - [Val] -> format_asn1_value(Val); - [] -> not_found + [] -> not_found; + L -> [format_asn1_value(V) || V <- L] end. %%-------------------------------------------------------------------------- @@ -150,9 +150,11 @@ escape_rdn_value([$ ], middle) -> escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;; C =:= $<; C =:= $>; C =:= $\\ -> [$\\, C | escape_rdn_value(S, middle)]; -escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 -> - %% only U+0000 needs escaping, but for display purposes it's handy - %% to escape all non-printable chars +escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 -> + %% Of ASCII characters only U+0000 needs escaping, but for display + %% purposes it's handy to escape all non-printable chars. All non-ASCII + %% characters get converted to UTF-8 sequences and then escaped. We've + %% already got a UTF-8 sequence here, so just escape it. lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++ escape_rdn_value(S, middle); escape_rdn_value([C | S], middle) -> @@ -167,6 +169,10 @@ format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2, $Z]}) -> io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); +%% We appear to get an untagged value back for an ia5string +%% (e.g. domainComponent). +format_asn1_value(V) when is_list(V) -> + V; format_asn1_value(V) -> io_lib:format("~p", [V]). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2f03b2fb0b..ff55194707 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2232,7 +2232,7 @@ with_fresh_variable_queue(Fun) -> _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. -publish_and_confirm(QPid, Payload, Count) -> +publish_and_confirm(Q, Payload, Count) -> Seqs = lists:seq(1, Count), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2240,7 +2240,7 @@ publish_and_confirm(QPid, Payload, Count) -> Payload), Delivery = #delivery{mandatory = false, immediate = false, sender = self(), message = Msg, msg_seq_no = Seq}, - true = rabbit_amqqueue:deliver(QPid, Delivery) + {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). @@ -2484,7 +2484,7 @@ test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - publish_and_confirm(QPid, <<>>, Count), + publish_and_confirm(Q, <<>>, Count), exit(QPid, kill), MRef = erlang:monitor(process, QPid), @@ -2514,7 +2514,7 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:declare(test_queue(), true, false, [], none), Payload = <<0:8388608>>, %% 1MB Count = 30, - publish_and_confirm(QPid, Payload, Count), + publish_and_confirm(Q, Payload, Count), rabbit_amqqueue:set_ram_duration_target(QPid, 0), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 2db960acc9..ae2b5d3f46 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -28,12 +28,9 @@ binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, internal_user/0, - username/0, password/0, password_hash/0, ok/1, error/1, - ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, - connection_exit/0]). - --type(channel_exit() :: no_return()). --type(connection_exit() :: no_return()). + username/0, password/0, password_hash/0, + ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, + channel_exit/0, connection_exit/0, mfargs/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +153,9 @@ -type(ok_or_error2(A, B) :: ok(A) | error(B)). -type(ok_pid_or_error() :: ok_or_error2(pid(), any())). +-type(channel_exit() :: no_return()). +-type(connection_exit() :: no_return()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -endif. % use_specs diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index e0ca8cbb72..f164035ee5 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -35,6 +35,7 @@ -rabbit_upgrade({gm, mnesia, []}). -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). -rabbit_upgrade({mirrored_supervisor, mnesia, []}). +-rabbit_upgrade({topic_trie_node, mnesia, []}). %% ------------------------------------------------------------------- @@ -54,6 +55,7 @@ -spec(gm/0 :: () -> 'ok'). -spec(exchange_scratch/0 :: () -> 'ok'). -spec(mirrored_supervisor/0 :: () -> 'ok'). +-spec(topic_trie_node/0 :: () -> 'ok'). -endif. @@ -177,6 +179,12 @@ mirrored_supervisor() -> [{record_name, mirrored_sup_childspec}, {attributes, [key, mirroring_pid, childspec]}]). +topic_trie_node() -> + create(rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, [trie_node, edge_count, binding_count]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index f75da87221..26ea502ced 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined -> ok; {error, normal} -> case Child#child.restart_type of - permanent -> ReportError(normal); - {permanent, _Delay} -> ReportError(normal); + permanent -> ReportError(normal, Child); + {permanent, _Delay} -> ReportError(normal, Child); _ -> ok end; {error, OtherReason} -> diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 0d50683db7..8678c2c9e0 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -86,6 +86,13 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, %% know this will fail. {stop, normal, State}; +handle_info({inet_async, LSock, Ref, {error, Reason}}, + State=#state{sock=LSock, ref=Ref}) -> + {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", + [rabbit_misc:ntoab(Address), Port, Reason]), + accept(State); + handle_info(_Info, State) -> {noreply, State}. diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index 4c835598e0..cb3dd02c42 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -25,7 +25,11 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + +-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()). + -endif. %%---------------------------------------------------------------------------- diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index ad2a0d02d0..9a82ac88c1 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -28,9 +28,14 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(), - atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + integer(), atom(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). + -endif. %%-------------------------------------------------------------------- diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 5bff5c2701..74297b6dce 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -26,12 +26,16 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/7 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), integer(), string()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 456ff39f47..fcb07a16f4 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -37,10 +37,11 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). --spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/1 :: - (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). +-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). -spec(idle/1 :: (any()) -> 'ok'). -endif. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 78ab4df3ea..b42530e269 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -29,12 +29,12 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). --spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/2 :: - (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). --spec(run/1 :: (fun (() -> A)) -> A; - ({atom(), atom(), [any()]}) -> any()). +-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). +-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). +-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. |
