diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-02-23 12:43:54 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-02-23 12:43:54 +0000 |
| commit | 2725b8ce01c2636ac6fbfe375d1c936653363d8e (patch) | |
| tree | f24a0cee1e615e8daf56dc049700de1975414df7 /src | |
| parent | 56f4ccba310376979e4a59028ef710383d6dd04f (diff) | |
| parent | 4230ebb9c24e607e240192894ab32408b8372634 (diff) | |
| download | rabbitmq-server-git-2725b8ce01c2636ac6fbfe375d1c936653363d8e.tar.gz | |
Merge bug24482
Diffstat (limited to 'src')
106 files changed, 1783 insertions, 1172 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl new file mode 100644 index 0000000000..072f4d9dc5 --- /dev/null +++ b/src/credit_flow.erl @@ -0,0 +1,127 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(credit_flow). + +%% Credit flow is controlled by a credit specification - a +%% {InitialCredit, MoreCreditAfter} tuple. For the message sender, +%% credit starts at InitialCredit and is decremented with every +%% message sent. The message receiver grants more credit to the sender +%% by sending it a {bump_credit, ...} control message after receiving +%% MoreCreditAfter messages. The sender should pass this message in to +%% handle_bump_msg/1. The sender should block when it goes below 0 +%% (check by invoking blocked/0). If a process is both a sender and a +%% receiver it will not grant any more credit to its senders when it +%% is itself blocked - thus the only processes that need to check +%% blocked/0 are ones that read from network sockets. + +-define(DEFAULT_CREDIT, {200, 50}). + +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). +-export([peer_down/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-opaque(bump_msg() :: {pid(), non_neg_integer()}). +-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}). + +-spec(send/1 :: (pid()) -> 'ok'). +-spec(send/2 :: (pid(), credit_spec()) -> 'ok'). +-spec(ack/1 :: (pid()) -> 'ok'). +-spec(ack/2 :: (pid(), credit_spec()) -> 'ok'). +-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). +-spec(blocked/0 :: () -> boolean()). +-spec(peer_down/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +%% There are two "flows" here; of messages and of credit, going in +%% opposite directions. The variable names "From" and "To" refer to +%% the flow of credit, but the function names refer to the flow of +%% messages. This is the clearest I can make it (since the function +%% names form the API and want to make sense externally, while the +%% variable names are used in credit bookkeeping and want to make +%% sense internally). + +%% For any given pair of processes, ack/2 and send/2 must always be +%% called with the same credit_spec(). + +send(From) -> send(From, ?DEFAULT_CREDIT). + +send(From, {InitialCredit, _MoreCreditAfter}) -> + update({credit_from, From}, InitialCredit, + fun (1) -> block(From), + 0; + (C) -> C - 1 + end). + +ack(To) -> ack(To, ?DEFAULT_CREDIT). + +ack(To, {_InitialCredit, MoreCreditAfter}) -> + update({credit_to, To}, MoreCreditAfter, + fun (1) -> grant(To, MoreCreditAfter), + MoreCreditAfter; + (C) -> C - 1 + end). + +handle_bump_msg({From, MoreCredit}) -> + update({credit_from, From}, 0, + fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From), + C + MoreCredit; + (C) -> C + MoreCredit + end). + +blocked() -> get(credit_blocked, []) =/= []. + +peer_down(Peer) -> + %% In theory we could also remove it from credit_deferred here, but it + %% doesn't really matter; at some point later we will drain + %% credit_deferred and thus send messages into the void... + unblock(Peer), + erase({credit_from, Peer}), + erase({credit_to, Peer}). + +%% -------------------------------------------------------------------------- + +grant(To, Quantity) -> + Msg = {bump_credit, {self(), Quantity}}, + case blocked() of + false -> To ! Msg; + true -> update(credit_deferred, [], + fun (Deferred) -> [{To, Msg} | Deferred] end) + end. + +block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end). + +unblock(From) -> + update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end), + case blocked() of + false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], + erase(credit_deferred); + true -> ok + end. + +get(Key, Default) -> + case get(Key) of + undefined -> Default; + Value -> Value + end. + +update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok. diff --git a/src/delegate.erl b/src/delegate.erl index edb4eba4ae..d595e4819e 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(delegate). diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 4c131a6c59..2a8b915b89 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(delegate_sup). diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6c3f1b5f50..59a0ab1cc0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(file_handle_cache). @@ -125,8 +125,7 @@ %% requesting process is considered to 'own' one more %% descriptor. release/0 is the inverse operation and releases a %% previously obtained descriptor. transfer/1 transfers ownership of a -%% file descriptor between processes. It is non-blocking. Obtain is -%% used to obtain permission to accept file descriptors. Obtain has a +%% file descriptor between processes. It is non-blocking. Obtain has a %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use %% the entire limit, but will be evicted by obtain calls up to the %% point at which no more obtain calls can be satisfied by the obtains @@ -262,7 +261,7 @@ -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [obtain_count, obtain_limit]). +-define(INFO_KEYS, [total_limit, total_used, sockets_limit, sockets_used]). %%---------------------------------------------------------------------------- %% Public API @@ -790,8 +789,10 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; -i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(total_limit, #fhc_state{limit = Limit}) -> Limit; +i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2; +i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(sockets_used, #fhc_state{obtain_count = Count}) -> Count; i(Item, _) -> throw({bad_argument, Item}). %%---------------------------------------------------------------------------- diff --git a/src/gatherer.erl b/src/gatherer.erl index fe976b50a2..98b360389a 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gatherer). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ab6c4e64f3..f8537487cd 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -73,7 +73,7 @@ %% but where the second argument is specifically the priority_queue %% which contains the prioritised message_queue. -%% All modifications are (C) 2009-2011 VMware, Inc. +%% All modifications are (C) 2009-2012 VMware, Inc. %% ``The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -1079,7 +1079,7 @@ get_proc_name({local, Name}) -> exit(process_not_registered) end; get_proc_name({global, Name}) -> - case global:safe_whereis_name(Name) of + case whereis_name(Name) of undefined -> exit(process_not_registered_globally); Pid when Pid =:= self() -> @@ -1101,7 +1101,7 @@ get_parent() -> name_to_pid(Name) -> case whereis(Name) of undefined -> - case global:safe_whereis_name(Name) of + case whereis_name(Name) of undefined -> exit(could_not_find_registerd_name); Pid -> @@ -1111,6 +1111,20 @@ name_to_pid(Name) -> Pid end. +whereis_name(Name) -> + case ets:lookup(global_names, Name) of + [{_Name, Pid, _Method, _RPid, _Ref}] -> + if node(Pid) == node() -> + case is_process_alive(Pid) of + true -> Pid; + false -> undefined + end; + true -> + Pid + end; + [] -> undefined + end. + find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> PrioriCall = function_exported_or_default( Mod, 'prioritise_call', 3, diff --git a/src/gm.erl b/src/gm.erl index 8c838a7056..6f9ff5642b 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm). @@ -386,6 +386,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -515,8 +516,8 @@ group_members(Server) -> init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), + Self = make_member(GroupName), gen_server2:cast(self(), join), - Self = self(), {ok, #state { self = Self, left = {Self, undefined}, right = {Self, undefined}, @@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); @@ -604,7 +606,8 @@ handle_cast({broadcast, Msg}, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + State}); handle_cast({broadcast, Msg}, State) -> internal_broadcast(Msg, none, State); @@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self, State1 = check_neighbours(State #state { view = View, members_state = MembersState }), handle_callback_result( - {Module:joined(Args, all_known_members(View)), State1}); + {Module:joined(Args, get_pids(all_known_members(View))), State1}); handle_cast(leave, State) -> {stop, normal, State}. @@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self, confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer }) -> - Result = Module:handle_msg(Args, Self, Msg), + Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; @@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> end, try case gen_server2:call( - Left, {add_on_right, Self}, infinity) of + get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); not_ready -> join_group(Self, GroupName) end @@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) -> mnesia:sync_transaction( fun () -> GroupNew = #gm_group { name = GroupName, members = [Self], - version = 0 }, + version = ?VERSION_START }, case mnesia:read({?GROUP_TABLE, GroupName}) of [] -> mnesia:write(GroupNew), @@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false. ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> {Self, undefined}; ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> - ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}), + ok = gen_server2:cast(get_pid(RealNeighbour), + {?TAG, Ver, check_neighbours}), {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> true = erlang:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, - ok = gen_server2:cast(RealNeighbour, Msg), + ok = gen_server2:cast(get_pid(RealNeighbour), Msg), ok = case Neighbour of Self -> ok; - _ -> gen_server2:cast(Neighbour, Msg) + _ -> gen_server2:cast(get_pid(Neighbour), Msg) end, {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor(Self, Self) -> undefined; maybe_monitor(Other, _Self) -> - erlang:monitor(process, Other). + erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) -> build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +make_member(GroupName) -> + {case read_group(GroupName) of + #gm_group { version = Version } -> Version; + {error, not_found} -> ?VERSION_START + end, self()}. + +get_pid({_Version, Pid}) -> Pid. + +get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% --------------------------------------------------------------------------- %% Activity assembly @@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self, send_right(Right, View, {activity, Self, Activity}). send_right(Right, View, Msg) -> - ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}). + ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> lists:foldl( fun ({Id, Pubs, _Acks}, ok) -> lists:foldl(fun ({_PubNum, Pub}, ok) -> - Module:handle_msg(Args, Id, Pub); + Module:handle_msg(Args, get_pid(Id), Pub); (_, Error) -> Error end, ok, Pubs); @@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) -> Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> ok; - _ -> Module:members_changed(Args, Births, Deaths) + _ -> Module:members_changed(Args, get_pids(Births), + get_pids(Deaths)) end. handle_callback_result({Result, State}) -> diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 5e5a3a5a6f..572175410d 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_soak_test). diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index defb0f29b8..dad75bd447 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_speed_test). diff --git a/src/gm_tests.erl b/src/gm_tests.erl index ca0ffd6483..0a2d420469 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_tests). diff --git a/src/lqueue.erl b/src/lqueue.erl index 04b4070621..c4e046b521 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(lqueue). diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 8dfe39f890..a599effa91 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor). @@ -144,32 +144,17 @@ -type child() :: pid() | 'undefined'. -type child_id() :: term(). --type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}. -type modules() :: [module()] | 'dynamic'. --type restart() :: 'permanent' | 'transient' | 'temporary'. --type shutdown() :: 'brutal_kill' | timeout(). -type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. -type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: atom()} | pid(). --type child_spec() :: {Id :: child_id(), - StartFunc :: mfargs(), - Restart :: restart(), - Shutdown :: shutdown(), - Type :: worker(), - Modules :: modules()}. -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. --type startchild_err() :: 'already_present' - | {'already_started', Child :: child()} | term(). --type startchild_ret() :: {'ok', Child :: child()} - | {'ok', Child :: child(), Info :: term()} - | {'error', startchild_err()}. - -type group_name() :: any(). -spec start_link(GroupName, Module, Args) -> startlink_ret() when @@ -183,9 +168,9 @@ Module :: module(), Args :: term(). --spec start_child(SupRef, ChildSpec) -> startchild_ret() when +-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when SupRef :: sup_ref(), - ChildSpec :: child_spec() | (List :: [term()]). + ChildSpec :: supervisor:child_spec() | (List :: [term()]). -spec restart_child(SupRef, Id) -> Result when SupRef :: sup_ref(), @@ -215,12 +200,12 @@ Modules :: modules(). -spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: 'ok' | {'error', Error :: term()}. -spec start_internal(Group, ChildSpecs) -> Result when Group :: group_name(), - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: startlink_ret(). -spec create_tables() -> Result when @@ -242,8 +227,10 @@ start_link({global, _SupName}, _Group, _Mod, _Args) -> start_link0(Prefix, Group, Init) -> case apply(?SUPERVISOR, start_link, Prefix ++ [?MODULE, {overall, Group, Init}]) of - {ok, Pid} -> call(Pid, {init, Pid}), - {ok, Pid}; + {ok, Pid} -> case catch call(Pid, {init, Pid}) of + ok -> {ok, Pid}; + E -> E + end; Other -> Other end. @@ -346,13 +333,20 @@ handle_call({init, Overall}, _From, end || Pid <- Rest], Delegate = child(Overall, delegate), erlang:monitor(process, Delegate), - [maybe_start(Group, Delegate, S) || S <- ChildSpecs], - {reply, ok, State#state{overall = Overall, delegate = Delegate}}; + State1 = State#state{overall = Overall, delegate = Delegate}, + case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of + true -> {reply, ok, State1}; + false -> {stop, shutdown, State1} + end; handle_call({start_child, ChildSpec}, _From, State = #state{delegate = Delegate, group = Group}) -> - {reply, maybe_start(Group, Delegate, ChildSpec), State}; + {reply, case maybe_start(Group, Delegate, ChildSpec) of + already_in_mnesia -> {error, already_present}; + {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; + Else -> Else + end, State}; handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, group = Group}) -> @@ -400,13 +394,16 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. Self = self(), - case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, ChildSpecs} = - mnesia:transaction(fun() -> update_all(Pid) end), - [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; - _ -> ok - end, - {noreply, State}; + R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of + [Self | _] -> {atomic, ChildSpecs} = + mnesia:transaction(fun() -> update_all(Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] + end, + case all_started(R) of + true -> {noreply, State}; + false -> {stop, shutdown, State} + end; handle_info(Info, State) -> {stop, {unexpected_info, Info}, State}. @@ -428,8 +425,8 @@ maybe_start(Group, Delegate, ChildSpec) -> check_start(Group, Delegate, ChildSpec) end) of {atomic, start} -> start(Delegate, ChildSpec); - {atomic, undefined} -> {error, already_present}; - {atomic, Pid} -> {error, {already_started, Pid}}; + {atomic, undefined} -> already_in_mnesia; + {atomic, Pid} -> {already_in_mnesia, Pid}; %% If we are torn down while in the transaction... {aborted, E} -> {error, E} end. @@ -499,6 +496,8 @@ delete_all(Group) -> [delete(Group, id(C)) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. +all_started(Results) -> [] =:= [R || R = {error, _} <- Results]. + %%---------------------------------------------------------------------------- create_tables() -> diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index b8d52ae86b..e8baabe8ff 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor_tests). @@ -36,15 +36,14 @@ all_tests() -> passed = test_already_there(), passed = test_delete_restart(), passed = test_which_children(), -%% commented out in order to determine whether this is the only test -%% that is failing - see bug 24362 -%% passed = test_large_group(), + passed = test_large_group(), passed = test_childspecs_at_init(), passed = test_anonymous_supervisors(), passed = test_no_migration_on_shutdown(), passed = test_start_idempotence(), passed = test_unsupported(), passed = test_ignore(), + passed = test_startup_failure(), passed. %% Simplest test @@ -197,6 +196,22 @@ test_ignore() -> {sup, fake_strategy_for_ignore, []}), passed. +test_startup_failure() -> + [test_startup_failure(F) || F <- [want_error, want_exit]], + passed. + +test_startup_failure(Fail) -> + process_flag(trap_exit, true), + ?MS:start_link(get_group(group), ?MODULE, + {sup, one_for_one, [childspec(Fail)]}), + receive + {'EXIT', _, shutdown} -> + ok + after 1000 -> + exit({did_not_exit, Fail}) + end, + process_flag(trap_exit, false). + %% --------------------------------------------------------------------------- with_sups(Fun, Sups) -> @@ -230,6 +245,12 @@ start_sup0(Name, Group, ChildSpecs) -> childspec(Id) -> {Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. +start_gs(want_error) -> + {error, foo}; + +start_gs(want_exit) -> + exit(foo); + start_gs(Id) -> gen_server:start_link({local, Id}, ?MODULE, server, []). diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl new file mode 100644 index 0000000000..a3773d90b2 --- /dev/null +++ b/src/mnesia_sync.erl @@ -0,0 +1,77 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(mnesia_sync). + +%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk +%% at commit. This module is an attempt to minimise the risk of data loss by +%% performing a coalesced log fsync. Unfortunately this is performed regardless +%% of whether or not the log was appended to. + +-behaviour(gen_server). + +-export([sync/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {waiting, disc_node}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(sync/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +sync() -> + gen_server:call(?SERVER, sync, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}. + +handle_call(sync, _From, #state{disc_node = false} = State) -> + {reply, ok, State}; +handle_call(sync, From, #state{waiting = Waiting} = State) -> + {noreply, State#state{waiting = [From | Waiting]}, 0}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(timeout, #state{waiting = Waiting} = State) -> + ok = disk_log:sync(latest_log), + [gen_server:reply(From, ok) || From <- Waiting], + {noreply, State#state{waiting = []}}; +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/pg_local.erl b/src/pg_local.erl index c9c3a3a715..e2e82f1f4b 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -13,7 +13,7 @@ %% versions of Erlang/OTP. The remaining type specs have been %% removed. -%% All modifications are (C) 2010-2011 VMware, Inc. +%% All modifications are (C) 2010-2012 VMware, Inc. %% %CopyrightBegin% %% diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 4fc8b46972..780fa2e92f 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% Priority queues have essentially the same interface as ordinary diff --git a/src/rabbit.erl b/src/rabbit.erl index 0a2681a219..0a0ca90a63 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit). @@ -45,6 +45,12 @@ {requires, file_handle_cache}, {enables, external_infrastructure}]}). +-rabbit_boot_step({database_sync, + [{description, "database sync"}, + {mfa, {rabbit_sup, start_child, [mnesia_sync]}}, + {requires, database}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({file_handle_cache, [{description, "file handle cache server"}, {mfa, {rabbit_sup, start_restartable_child, @@ -132,7 +138,7 @@ -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, - {requires, empty_db_check}, + {requires, core_initialized}, {enables, routing_ready}]}). -rabbit_boot_step({mirror_queue_slave_sup, @@ -158,8 +164,9 @@ {enables, networking}]}). -rabbit_boot_step({direct_client, - [{mfa, {rabbit_direct, boot, []}}, - {requires, log_relay}]}). + [{description, "direct client"}, + {mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, @@ -190,7 +197,7 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -308,17 +315,28 @@ stop_and_halt() -> ok. status() -> - [{pid, list_to_integer(os:getpid())}, - {running_applications, application:which_applications(infinity)}, - {os, os:type()}, - {erlang_version, erlang:system_info(system_version)}, - {memory, erlang:memory()}] ++ - rabbit_misc:filter_exit_map( - fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, - [{vm_memory_high_watermark, {vm_memory_monitor, - get_vm_memory_high_watermark, []}}, - {vm_memory_limit, {vm_memory_monitor, - get_memory_limit, []}}]). + S1 = [{pid, list_to_integer(os:getpid())}, + {running_applications, application:which_applications(infinity)}, + {os, os:type()}, + {erlang_version, erlang:system_info(system_version)}, + {memory, erlang:memory()}], + S2 = rabbit_misc:filter_exit_map( + fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, + [{vm_memory_high_watermark, {vm_memory_monitor, + get_vm_memory_high_watermark, []}}, + {vm_memory_limit, {vm_memory_monitor, + get_memory_limit, []}}]), + S3 = rabbit_misc:with_exit_handler( + fun () -> [] end, + fun () -> [{file_descriptors, file_handle_cache:info()}] end), + S4 = [{processes, [{limit, erlang:system_info(process_limit)}, + {used, erlang:system_info(process_count)}]}, + {run_queue, erlang:statistics(run_queue)}, + {uptime, begin + {T,_} = erlang:statistics(wall_clock), + T div 1000 + end}], + S1 ++ S2 ++ S3 ++ S4. is_running() -> is_running(node()). @@ -348,10 +366,8 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> - ok = rabbit_mnesia:delete_previously_running_nodes(), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), - print_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), @@ -430,8 +446,7 @@ run_boot_step({StepName, Attributes}) -> [try apply(M,F,A) catch - _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n", - [Reason, erlang:get_stacktrace()]) + _:Reason -> boot_step_error(Reason, erlang:get_stacktrace()) end || {M,F,A} <- MFAs], io:format("done~n"), ok @@ -490,8 +505,27 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> + {Err, Nodes} = + case rabbit_mnesia:read_previously_running_nodes() of + [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" + " shut down forcefully~nit cannot determine which nodes" + " are timing out. Details on all nodes will~nfollow.~n", + rabbit_mnesia:all_clustered_nodes() -- [node()]}; + Ns -> {rabbit_misc:format( + "Timeout contacting cluster nodes: ~p.~n", [Ns]), + Ns} + end, + boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); + +boot_step_error(Reason, Stacktrace) -> + boot_error("Error description:~n ~p~n~n" + "Log files (may contain more information):~n ~s~n ~s~n~n" + "Stack trace:~n ~p~n~n", + [Reason, log_location(kernel), log_location(sasl), Stacktrace]). + boot_error(Format, Args) -> - io:format("BOOT ERROR: " ++ Format, Args), + io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), error_logger:error_msg(Format, Args), timer:sleep(1000), exit({?MODULE, failure_during_boot}). @@ -645,7 +679,7 @@ print_banner() -> {"app descriptor", app_location()}, {"home dir", home_dir()}, {"config file(s)", config_files()}, - {"cookie hash", rabbit_misc:cookie_hash()}, + {"cookie hash", rabbit_nodes:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, {"database dir", rabbit_mnesia:dir()}, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index ca28d68637..75c5351182 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_access_control). @@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) -> check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> - ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index fd03ca85b3..187ec1ab6c 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_alarm). @@ -31,10 +31,9 @@ -ifdef(use_specs). --type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 96017df812..a7dfd535c8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue). @@ -20,12 +20,12 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, unblock/2, flush_all/2]). +-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([store_queue/1]). @@ -40,21 +40,23 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). +-define(MORE_CONSUMER_CREDIT_AFTER, 50). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([name/0, qmsg/0]). +-export_type([name/0, qmsg/0, routing_result/0]). -type(name() :: rabbit_types:r('queue')). - +-type(qpids() :: [pid()]). -type(qlen() :: rabbit_types:ok(non_neg_integer())). -type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())). -type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}). -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -69,7 +71,8 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | - rabbit_types:error('not_found')). + rabbit_types:error('not_found'); + ([name()]) -> [rabbit_types:amqqueue()]). -spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). @@ -117,12 +120,15 @@ rabbit_types:error('in_use') | rabbit_types:error('not_empty')). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()). +-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). +-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). --spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) -> +-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). +-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). @@ -133,8 +139,9 @@ -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). +-spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -264,6 +271,10 @@ add_default_binding(#amqqueue{name = QueueName}) -> key = RoutingKey, args = []}). +lookup(Names) when is_list(Names) -> + %% Normally we'd call mnesia:dirty_read/1 here, but that is quite + %% expensive for reasons explained in rabbit_misc:dirty_read/1. + lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]); lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -419,14 +430,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver(QPid, Delivery = #delivery{immediate = true}) -> - gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); -deliver(QPid, Delivery = #delivery{mandatory = true}) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity), - true; -deliver(QPid, Delivery) -> - gen_server2:cast(QPid, {deliver, Delivery}), - true. +deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). + +deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -458,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + Key = {consumer_credit_to, QPid}, + put(Key, case get(Key) of + 1 -> gen_server2:cast( + QPid, {notify_sent, ChPid, + ?MORE_CONSUMER_CREDIT_AFTER}), + ?MORE_CONSUMER_CREDIT_AFTER; + undefined -> erlang:monitor(process, QPid), + ?MORE_CONSUMER_CREDIT_AFTER - 1; + C -> C - 1 + end), + ok. + +notify_sent_queue_down(QPid) -> + erase({consumer_credit_to, QPid}), + ok. unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). @@ -518,6 +538,49 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + case Flow of + flow -> [credit_flow:send(QPid) || QPid <- QPids]; + noflow -> ok + end, + delegate:invoke_no_result( + QPids, fun (QPid) -> + gen_server2:cast(QPid, {deliver, Delivery, Flow}) + end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, + _Flow) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. + +qpids(Qs) -> lists:append([[QPid | SPids] || + #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). + safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ba20b35524..12cd0c93ff 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_process). @@ -20,7 +20,7 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- init(Q) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), State = #q{q = Q#amqqueue{pid = self()}, @@ -135,7 +134,6 @@ init(Q) -> init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, AckTags, Deliveries, MTC) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -390,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) -> {_, _} -> ok end. -deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, +deliver_msgs_to_consumers(_DeliverFun, true, State) -> + {true, State}; +deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case PredFun(FunAcc, State) of - false -> {FunAcc, State}; - true -> case queue:out(ActiveConsumers) of - {empty, _} -> - {FunAcc, State}; - {{value, QEntry}, Tail} -> - {FunAcc1, State1} = - deliver_msg_to_consumer( + case queue:out(ActiveConsumers) of + {empty, _} -> + {false, State}; + {{value, QEntry}, Tail} -> + {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, - FunAcc, State#q{active_consumers = Tail}), - deliver_msgs_to_consumers(Funs, FunAcc1, State1) - end + State#q{active_consumers = Tail}), + deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {FunAcc, State}; + {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, self(), Consumer#consumer.ack_required) of false -> block_consumer(C#cr{is_limit_active = true}, E), - {FunAcc, State}; + {false, State}; true -> AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( - DeliverFun, Consumer, C, FunAcc, + DeliverFun, Consumer, C, State#q{active_consumers = AC1}) end end. @@ -428,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, - FunAcc, State = #q{q = #amqqueue{name = QName}}) -> - {{Message, IsDelivered, AckTag}, FunAcc1, State1} = - DeliverFun(AckRequired, FunAcc, State), + State = #q{q = #amqqueue{name = QName}}) -> + {{Message, IsDelivered, AckTag}, Stop, State1} = + DeliverFun(AckRequired, State), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of @@ -439,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun, end, update_ch_record(C#cr{acktags = ChAckTags1, unsent_message_count = Count + 1}), - {FunAcc1, State1}. - -deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. + {Stop, State1}. -deliver_from_queue_deliver(AckRequired, false, State) -> +deliver_from_queue_deliver(AckRequired, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. @@ -487,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) -> State. run_message_queue(State) -> - Funs = {fun deliver_from_queue_pred/2, - fun deliver_from_queue_deliver/3}, State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), - IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + {_IsEmpty1, State2} = deliver_msgs_to_consumers( + fun deliver_from_queue_deliver/2, + BQ:is_empty(BQS), State1), State2. attempt_delivery(Delivery = #delivery{sender = ChPid, @@ -506,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = - fun (AckRequired, false, - State1 = #q{backing_queue_state = BQS2}) -> + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> %% we don't need an expiry here because %% messages are not being enqueued, so we use %% an empty message_properties. @@ -523,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, State1#q{backing_queue_state = BQS3}} end, {Delivered, State2} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + deliver_msgs_to_consumers(DeliverFun, false, State#q{backing_queue_state = BQS1}), {Delivered, Confirm, State2}; {Duplicate, BQS1} -> @@ -598,6 +589,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> + case get({ch_publisher, DownPid}) of + undefined -> ok; + MRef -> erlang:demonitor(MRef), + erase({ch_publisher, DownPid}), + credit_flow:peer_down(DownPid) + end, case lookup_ch(DownPid) of not_found -> {ok, State}; @@ -713,15 +710,12 @@ infos(Items, State) -> || Item <- (Items1 -- [synchronised_slave_pids])]. slaves_status(#q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - _ -> + {ok, #amqqueue{slave_pids = SPids}} -> {Results, _Bad} = - delegate:invoke( - SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), {SPids1, SSPids} = lists:foldl( fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> @@ -765,11 +759,9 @@ i(memory, _) -> {memory, M} = process_info(self(), memory), M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, - slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> []; - _ -> SPids + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> []; + {ok, #amqqueue{slave_pids = SPids}} -> SPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -826,7 +818,7 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; + {notify_sent, _ChPid, _Credit} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 @@ -877,9 +869,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery}, _From, State) -> - %% Synchronous, "immediate" delivery mode - %% +handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> %% FIXME: Is this correct semantics? %% %% I'm worried in particular about the case where an exchange has @@ -897,8 +887,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> false -> discard_delivery(Delivery, State1) end); -handle_call({deliver, Delivery}, From, State) -> - %% Synchronous, "mandatory" delivery mode. Reply asap. +handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); @@ -1021,8 +1010,17 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + case Flow of + flow -> Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, Sender)); + _ -> ok + end, + credit_flow:ack(Sender); + noflow -> ok + end, noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> @@ -1054,11 +1052,11 @@ handle_cast({unblock, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C) -> C#cr{is_limit_active = false} end)); -handle_cast({notify_sent, ChPid}, State) -> +handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - 1} + C#cr{unsent_message_count = Count - Credit} end)); handle_cast({limit, ChPid, Limiter}, State) -> @@ -1102,8 +1100,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), - {stop, normal, State}; + true -> {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; @@ -1150,8 +1147,11 @@ handle_info(timeout, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(Info, State) -> - ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 7b3ebcf266..a4305e5f5b 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_sup). diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index ade158bb8c..e0e252b898 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend). diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 086a90b49f..3ef81d3208 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend_internal). diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index 897199ee78..0c8251b803 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism). diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index b8682a465e..3de6e7a679 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_amqplain). diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index acbb6e48f6..64b01d8e5d 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_cr_demo). diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 2448acb660..19fb587567 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_plain). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c3b322ee6d..364eb8f646 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue). diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index c61184a601..7b00fa5f13 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue_qc). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b266d3664d..b8211d4332 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_basic). @@ -29,7 +29,7 @@ -type(properties_input() :: (rabbit_framing:amqp_property_record() | [{atom(), any()}])). -type(publish_result() :: - ({ok, rabbit_router:routing_result(), [pid()]} + ({ok, rabbit_amqqueue:routing_result(), [pid()]} | rabbit_types:error('not_found'))). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). @@ -88,8 +88,8 @@ publish(Delivery = #delivery{ end. publish(X, Delivery) -> - {RoutingRes, DeliveredQPids} = - rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery), + Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), + {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), {ok, RoutingRes, DeliveredQPids}. delivery(Mandatory, Immediate, Message, MsgSeqNo) -> @@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> {ok, #basic_message{ exchange_name = XName, content = strip_header(DecodedContent, ?DELETED_HEADER), - id = rabbit_guid:guid(), + id = rabbit_guid:gen(), is_persistent = is_message_persistent(DecodedContent), routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}} diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 494f3203c8..d69376fb75 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binary_generator). diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index f3ca4e9897..5f0016b60b 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binary_parser). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e625a427e4..bb44797e4d 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binding). @@ -277,6 +277,7 @@ has_for_source(SrcName) -> contains(rabbit_semi_durable_route, Match). remove_for_source(SrcName) -> + lock_route_tables(), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, Routes = lists:usort( mnesia:match_object(rabbit_route, Match, write) ++ @@ -351,7 +352,28 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). +%% For bulk operations we lock the tables we are operating on in order +%% to reduce the time complexity. Without the table locks we end up +%% with num_tables*num_bulk_bindings row-level locks. Taking each lock +%% takes time proportional to the number of existing locks, thus +%% resulting in O(num_bulk_bindings^2) complexity. +%% +%% The locks need to be write locks since ultimately we end up +%% removing all these rows. +%% +%% The downside of all this is that no other binding operations except +%% lookup/routing (which uses dirty ops) can take place +%% concurrently. However, that is the case already since the bulk +%% operations involve mnesia:match_object calls with a partial key, +%% which entails taking a table lock. +lock_route_tables() -> + [mnesia:lock({table, T}, write) || T <- [rabbit_route, + rabbit_reverse_route, + rabbit_semi_durable_route, + rabbit_durable_route]]. + remove_for_destination(DstName, DeleteFun) -> + lock_route_tables(), Match = reverse_route( #route{binding = #binding{destination = DstName, _ = '_'}}), ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9b2fe28ce8..a101886f26 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel). @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/10, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -33,9 +33,9 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_acks, - user, virtual_host, most_recently_declared_queue, queue_monitors, + limiter, tx_status, next_tag, unacked_message_q, + uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -78,6 +78,8 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -111,7 +113,11 @@ do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - gen_server2:cast(Pid, {method, Method, Content}). + gen_server2:cast(Pid, {method, Method, Content, noflow}). + +do_flow(Pid, Method, Content) -> + credit_flow:send(Pid), + gen_server2:cast(Pid, {method, Method, Content, flow}). flush(Pid) -> gen_server2:call(Pid, flush, infinity). @@ -185,10 +191,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), uncommitted_acks = [], + uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = dict:new(), + queue_monitors = sets:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -244,7 +251,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, State) -> +handle_cast({method, Method, Content, Flow}, + State = #ch{reader_pid = Reader}) -> + case Flow of + flow -> credit_flow:ack(Reader); + noflow -> ok + end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), @@ -284,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> - State1 = lock_message(AckRequired, - ack_record(DeliveryTag, ConsumerTag, Msg), - State), - - M = #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, - rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), - rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State3#ch{next_tag = DeliveryTag + 1}); - + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> + ok = rabbit_writer:send_command_and_notify( + WriterPid, QPid, self(), + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -315,6 +317,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(timeout, State) -> noreply(State); @@ -327,9 +333,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = - dict:erase(QPid, State3#ch.queue_monitors)}); + sets:del_element(QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -527,7 +534,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> #'channel.flow_ok'{active = false}); _ -> ok end, - demonitor_queue(QPid, State#ch{blocking = Blocking1}) + State#ch{blocking = Blocking1} end. record_confirm(undefined, _, State) -> @@ -565,8 +572,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), case gb_sets:is_empty(MsgSeqNos1) of true -> UQM1 = gb_trees:delete(QPid, UQM), - demonitor_queue( - QPid, State#ch{unconfirmed_qm = UQM1}); + State#ch{unconfirmed_qm = UQM1}; false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), State#ch{unconfirmed_qm = UQM1} end; @@ -672,45 +678,36 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of - none -> ack(Acked, State1); + none -> ack(Acked, State1), + State1; in_progress -> State1#ch{uncommitted_acks = Acked ++ State1#ch.uncommitted_acks} end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> + _, State = #ch{writer_pid = WriterPid, + conn_pid = ConnPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}} -> - State1 = lock_message(not(NoAck), - ack_record(DeliveryTag, none, Msg), - State), - State2 = maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), - rabbit_trace:tap_trace_out(Msg, TraceState), + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}} -> ok = rabbit_writer:send_command( WriterPid, - #'basic.get_ok'{delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey, + #'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State3#ch{next_tag = DeliveryTag + 1}}; + {noreply, record_sent(none, not(NoAck), Msg, State)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -730,7 +727,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of - <<>> -> rabbit_guid:binstring_guid("amq.ctag"); + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.ctag"); Other -> Other end, @@ -787,9 +785,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, false -> dict:store(QPid, CTags1, QCons) end end, - NewState = demonitor_queue( - Q, State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = QCons1}), + NewState = State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}, %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -960,7 +957,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin, false -> none end, ActualNameBin = case QueueNameBin of - <<>> -> rabbit_guid:binstring_guid("amq.gen"); + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), @@ -1068,19 +1066,26 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL}) -> - State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, - State, TMQ))), - {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; +handle_method(#'tx.commit'{}, _, + State = #ch{uncommitted_message_q = TMQ, + uncommitted_acks = TAL, + uncommitted_nacks = TNL, + limiter = Limiter}) -> + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), + ack(TAL, State1), + lists:foreach( + fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), + {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL}) -> - UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))), + uncommitted_acks = TAL, + uncommitted_nacks = TNL}) -> + TNL1 = lists:append([L || {_, L} <- TNL]), + UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> @@ -1111,10 +1116,7 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> State2 = lists:foldl(fun monitor_queue/2, - State1#ch{blocking = - sets:from_list(QPids)}, - QPids), + QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)}, ok = rabbit_amqqueue:flush_all(QPids, self()), {noreply, State2} end; @@ -1145,31 +1147,12 @@ consumer_monitor(ConsumerTag, end. monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (not dict:is_key(QPid, QMons) andalso - queue_monitor_needed(QPid, State)) of - true -> MRef = erlang:monitor(process, QPid), - State#ch{queue_monitors = dict:store(QPid, MRef, QMons)}; - false -> State - end. - -demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (dict:is_key(QPid, QMons) andalso - not queue_monitor_needed(QPid, State)) of - true -> true = erlang:demonitor(dict:fetch(QPid, QMons)), - State#ch{queue_monitors = dict:erase(QPid, QMons)}; + case not sets:is_element(QPid, QMons) of + true -> erlang:monitor(process, QPid), + State#ch{queue_monitors = sets:add_element(QPid, QMons)}; false -> State end. -queue_monitor_needed(QPid, #ch{queue_consumers = QCons, - blocking = Blocking, - unconfirmed_qm = UQM} = State) -> - StatsEnabled = rabbit_event:stats_level( - State, #ch.stats_timer) =:= fine, - ConsumerMonitored = dict:is_key(QPid, QCons), - QueueBlocked = sets:is_element(QPid, Blocking), - ConfirmMonitored = gb_trees:is_defined(QPid, UQM), - StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1266,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). -reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> +reject(DeliveryTag, Requeue, Multiple, + State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, + case TxStatus of + none -> + reject(Requeue, Acked, State1#ch.limiter), + State1; + in_progress -> + State1#ch{uncommitted_nacks = + [{Requeue, Acked} | State1#ch.uncommitted_nacks]} + end}. + +reject(Requeue, Acked, Limiter) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}. - -ack_record(DeliveryTag, ConsumerTag, - _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> - {DeliveryTag, ConsumerTag, {QPid, MsgId}}. + ok = notify_limiter(Limiter, Acked). + +record_sent(ConsumerTag, AckRequired, + Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + State = #ch{unacked_message_q = UAMQ, + next_tag = DeliveryTag, + trace_state = TraceState}) -> + maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), + rabbit_trace:tap_trace_out(Msg, TraceState), + UAMQ1 = case AckRequired of + true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, + UAMQ); + false -> UAMQ + end, + State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. collect_acks(Q, 0, true) -> {queue:to_list(Q), queue:new()}; @@ -1312,7 +1323,8 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = []}. + uncommitted_acks = [], + uncommitted_nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1362,22 +1374,25 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, msg_seq_no = MsgSeqNo}, QNames}, State) -> - {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery), - State1 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State), + {RoutingRes, DeliveredQPids} = + rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), + State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State1). + QPid <- DeliveredQPids]], publish, State2), + State2. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State)); + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{XName, 1}], return_not_delivered, State)); + maybe_incr_stats([{XName, 1}], return_not_delivered, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> @@ -1395,15 +1410,10 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State0#ch{unconfirmed_qm = UQM1}; none -> UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1}) + State0#ch{unconfirmed_qm = UQM1} end end, State#ch{unconfirmed_mq = UMQ1}, QPids). -lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> - State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; -lock_message(false, _MsgStruct, State) -> - State. - send_nacks([], State) -> State; send_nacks(MXs, State = #ch{tx_status = none}) -> @@ -1419,13 +1429,12 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx_status = none, confirmed = []}) -> State; send_confirms(State = #ch{tx_status = none, confirmed = C}) -> - {MsgSeqNos, State1} = - lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> - {[MsgSeqNo | MSNs], - maybe_incr_stats([{ExchangeName, 1}], confirm, - State0)} - end, {[], State}, lists:append(C)), - send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); + MsgSeqNos = + lists:foldl(fun ({MsgSeqNo, XName}, MSNs) -> + maybe_incr_stats([{XName, 1}], confirm, State), + [MsgSeqNo | MSNs] + end, [], lists:append(C)), + send_confirms(MsgSeqNos, State#ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1505,26 +1514,21 @@ i(Item, _) -> maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_stats([{QPid, 1}], redeliver, State); -maybe_incr_redeliver_stats(_, _, State) -> - State. +maybe_incr_redeliver_stats(_, _, _State) -> + ok. maybe_incr_stats(QXIncs, Measure, State) -> case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> lists:foldl(fun ({QX, Inc}, State0) -> - incr_stats(QX, Inc, Measure, State0) - end, State, QXIncs); - _ -> State + fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + _ -> ok end. -incr_stats({QPid, _} = QX, Inc, Measure, State) -> - update_measures(queue_exchange_stats, QX, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> - update_measures(queue_stats, QPid, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(X, Inc, Measure, State) -> - update_measures(exchange_stats, X, Inc, Measure), - State. +incr_stats({_, _} = QX, Inc, Measure) -> + update_measures(queue_exchange_stats, QX, Inc, Measure); +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index a19b6bfdad..dc262b4948 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup). diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index e2561c802e..995c41fbc2 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup_sup). diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index dfb400e37f..c508f1b996 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_client_sup). @@ -28,8 +28,9 @@ -ifdef(use_specs). --spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()). --spec(start_link/2 :: ({'local', atom()}, mfa()) -> +-spec(start_link/1 :: (rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl index a0953eab95..adf6e41702 100644 --- a/src/rabbit_command_assembler.erl +++ b/src/rabbit_command_assembler.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_command_assembler). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b2aba2eeb0..12a532b6fc 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_connection_sup). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 20486af52b..6a775adfb0 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -11,13 +11,13 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, diagnostics/1]). +-export([start/0, stop/0, action/5]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -49,7 +49,6 @@ (atom(), node(), [string()], [{string(), any()}], fun ((string(), [any()]) -> 'ok')) -> 'ok'). --spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). -endif. @@ -67,7 +66,7 @@ start() -> CmdArgsAndOpts -> CmdArgsAndOpts end, Opts1 = [case K of - ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; _ -> {K, V} end || {K, V} <- Opts], Command = list_to_atom(Command0), @@ -79,6 +78,12 @@ start() -> io:format(Format ++ " ...~n", Args1) end end, + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, + %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values case catch action(Command, Node, Args, Opts, Inform) of @@ -88,9 +93,11 @@ start() -> false -> io:format("...done.~n") end, rabbit_misc:quit(0); - {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]), + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> %% < R15 + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> %% >= R15 + PrintInvalidCommandError(), usage(); {'EXIT', {badarg, _}} -> print_error("invalid parameter: ~p", [Args]), @@ -135,26 +142,7 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) -> print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). print_badrpc_diagnostics(Node) -> - [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)]. - -diagnostics(Node) -> - {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - [{"diagnostics:", []}, - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - {"- unable to connect to epmd on ~s: ~w", - [NodeHost, EpmdReason]}; - {ok, NamePorts} -> - {"- nodes and their ports on ~s: ~p", - [NodeHost, [{list_to_atom(Name), Port} || - {Name, Port} <- NamePorts]]} - end, - {"- current node: ~w", [node()]}, - case init:get_argument(home) of - {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; - Other -> {"- no current node home dir: ~p", [Other]} - end, - {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}]. + fmt_stderr(rabbit_nodes:diagnostics([Node]), []). stop() -> ok. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 6f9a46504f..e2928cae7e 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_direct). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 6e29ace71a..f1672f4e7b 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger). diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 7b6e07c19f..042ab23cbc 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger_file_h). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 5ae40c784a..4ec141cfd8 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_event). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a15b9be490..83e28c44a8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange). @@ -355,11 +355,21 @@ peek_serial(XName) -> _ -> undefined end. +invalid_module(T) -> + rabbit_log:warning( + "Could not find exchange type ~s.~n", [T]), + put({xtype_to_module, T}, rabbit_exchange_type_invalid), + rabbit_exchange_type_invalid. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> case get({xtype_to_module, T}) of - undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), - put({xtype_to_module, T}, Module), - Module; - Module -> Module + undefined -> + case rabbit_registry:lookup_module(exchange, T) of + {ok, Module} -> put({xtype_to_module, T}, Module), + Module; + {error, not_found} -> invalid_module(T) + end; + Module -> + Module end. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index ab3d00dc28..44a08e2437 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index b485e31f33..4bce42d4da 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_direct). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 3c02972278..cc3fb87c21 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_fanout). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index f09e4aae73..de9979b422 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_headers). diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl new file mode 100644 index 0000000000..8f60f7d898 --- /dev/null +++ b/src/rabbit_exchange_type_invalid.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_exchange_type_invalid). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, serialise_events/0, route/2]). +-export([validate/1, create/2, delete/3, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-include("rabbit_exchange_type_spec.hrl"). + +description() -> + [{name, <<"invalid">>}, + {description, + <<"Dummy exchange type, to be used when the intended one is not found.">> + }]. + +serialise_events() -> false. + +route(#exchange{name = Name, type = Type}, _) -> + rabbit_misc:protocol_error( + precondition_failed, + "Cannot route message through ~s: exchange type ~s not found", + [rabbit_misc:rs(Name), Type]). + +validate(_X) -> ok. +create(_Tx, _X) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 348655b101..84f4f8a9de 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_topic). @@ -52,6 +52,7 @@ validate(_X) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> + trie_remove_all_nodes(X), trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; @@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) -> add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(transaction, #exchange{name = X}, Bs) -> - %% The remove process is split into two distinct phases. In the - %% first phase we gather the lists of bindings and edges to - %% delete, then in the second phase we process all the - %% deletions. This is to prevent interleaving of read/write - %% operations in mnesia that can adversely affect performance. - {ToDelete, Paths} = - lists:foldl( - fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> - Path = [{FinalNode, _} | _] = - follow_down_get_path(S, split_topic_key(K)), - {[{FinalNode, D} | Acc], - decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} - end, {[], gb_trees:empty()}, Bs), - - [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], - [trie_remove_edge(X, Parent, Node, W) || - {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], +remove_bindings(transaction, _X, Bs) -> + %% See rabbit_binding:lock_route_tables for the rationale for + %% taking table locks. + case Bs of + [_] -> ok; + _ -> [mnesia:lock({table, T}, write) || + T <- [rabbit_topic_trie_node, + rabbit_topic_trie_edge, + rabbit_topic_trie_binding]] + end, + [begin + Path = [{FinalNode, _} | _] = + follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path) + end || #binding{source = X, key = K, destination = D} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. -maybe_add_path(_X, [{root, none}], PathAcc) -> - PathAcc; -maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> - case gb_trees:is_defined(Node, PathAcc) of - true -> PathAcc; - false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), - trie_child_count(X, Node)}}, - PathAcc) - end. - -decrement_bindings(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, - Path, PathAcc). - -decrement_edges(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, - Path, PathAcc). - -with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> - PathAcc; -with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> - {Parent, W, Counts} = gb_trees:get(Node, PathAcc), - NewCounts = Fun(Counts), - NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), - case NewCounts of - {0, 0} -> decrement_edges(X, ParentPath, - maybe_add_path(X, ParentPath, NewPathAcc)); - _ -> NewPathAcc - end. - - assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, node_id = Node}, write) of + [] -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath); + _ -> ok + end. + trie_child(X, Node, Word) -> case mnesia:read({rabbit_topic_trie_edge, #trie_edge{exchange_name = X, @@ -199,10 +177,30 @@ trie_bindings(X, Node) -> destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). +trie_update_node_counts(X, Node, Field, Delta) -> + E = case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, + node_id = Node}, write) of + [] -> #topic_trie_node{trie_node = #trie_node{ + exchange_name = X, + node_id = Node}, + edge_count = 0, + binding_count = 0}; + [E0] -> E0 + end, + case setelement(Field, E, element(Field, E) + Delta) of + #topic_trie_node{edge_count = 0, binding_count = 0} -> + ok = mnesia:delete_object(rabbit_topic_trie_node, E, write); + EN -> + ok = mnesia:write(rabbit_topic_trie_node, EN, write) + end. + trie_add_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). trie_remove_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). trie_edge_op(X, FromNode, ToNode, W, Op) -> @@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> write). trie_add_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), trie_binding_op(X, Node, D, fun mnesia:write/3). trie_remove_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), trie_binding_op(X, Node, D, fun mnesia:delete_object/3). trie_binding_op(X, Node, D, Op) -> @@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_child_count(X, Node) -> - count(rabbit_topic_trie_edge, - #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -trie_binding_count(X, Node) -> - count(rabbit_topic_trie_binding, - #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -count(Table, Match) -> - length(mnesia:match_object(Table, Match, read)). +trie_remove_all_nodes(X) -> + remove_all(rabbit_topic_trie_node, + #topic_trie_node{trie_node = #trie_node{exchange_name = X, + _ = '_'}, + _ = '_'}). trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, @@ -262,7 +250,7 @@ remove_all(Table, Pattern) -> mnesia:match_object(Table, Pattern, write)). new_node_id() -> - rabbit_guid:guid(). + rabbit_guid:gen(). split_topic_key(Key) -> split_topic_key(Key, [], []). @@ -275,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [C | RevWordAcc], RevResAcc). - diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 5cb8e7b69d..59df14f318 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_file). diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl index da1a6a49ec..a79188abaf 100644 --- a/src/rabbit_framing.erl +++ b/src/rabbit_framing.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% TODO auto-generate diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2d0f5014f2..f4c425ca20 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_guid). @@ -19,7 +19,7 @@ -behaviour(gen_server). -export([start_link/0]). --export([guid/0, string_guid/1, binstring_guid/1]). +-export([gen/0, gen_secure/0, string/2, binary/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -38,9 +38,10 @@ -type(guid() :: binary()). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(guid/0 :: () -> guid()). --spec(string_guid/1 :: (any()) -> string()). --spec(binstring_guid/1 :: (any()) -> binary()). +-spec(gen/0 :: () -> guid()). +-spec(gen_secure/0 :: () -> guid()). +-spec(string/2 :: (guid(), any()) -> string()). +-spec(binary/2 :: (guid(), any()) -> binary()). -endif. @@ -65,11 +66,8 @@ update_disk_serial() -> end, Serial. -%% generate a GUID. -%% -%% The id is only unique within a single cluster and as long as the -%% serial store hasn't been deleted. -guid() -> +%% Generate an un-hashed guid. +fresh() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a %% restart, or ids were generated at a high rate (which causes @@ -78,29 +76,74 @@ guid() -> %% %% A persisted serial number, the node, and a unique reference %% (per node incarnation) uniquely identifies a process in space - %% and time. We combine that with a process-local counter to give - %% us a GUID. - G = case get(guid) of - undefined -> Serial = gen_server:call(?SERVER, serial, infinity), - {{Serial, node(), make_ref()}, 0}; + %% and time. + Serial = gen_server:call(?SERVER, serial, infinity), + {Serial, node(), make_ref()}. + +advance_blocks({B1, B2, B3, B4}, I) -> + %% To produce a new set of blocks, we create a new 32bit block + %% hashing {B5, I}. The new hash is used as last block, and the + %% other three blocks are XORed with it. + %% + %% Doing this is convenient because it avoids cascading conflits, + %% while being very fast. The conflicts are avoided by propagating + %% the changes through all the blocks at each round by XORing, so + %% the only occasion in which a collision will take place is when + %% all 4 blocks are the same and the counter is the same. + %% + %% The range (2^32) is provided explicitly since phash uses 2^27 + %% by default. + B5 = erlang:phash2({B1, I}, 4294967296), + {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}. + +blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>. + +%% generate a GUID. This function should be used when performance is a +%% priority and predictability is not an issue. Otherwise use +%% gen_secure/0. +gen() -> + %% We hash a fresh GUID with md5, split it in 4 blocks, and each + %% time we need a new guid we rotate them producing a new hash + %% with the aid of the counter. Look at the comments in + %% advance_blocks/2 for details. + {BS, I} = case get(guid) of + undefined -> <<B1:32, B2:32, B3:32, B4:32>> = + erlang:md5(term_to_binary(fresh())), + {{B1,B2,B3,B4}, 0}; + {BS0, I0} -> advance_blocks(BS0, I0) + end, + put(guid, {BS, I}), + blocks_to_binary(BS). + +%% generate a non-predictable GUID. +%% +%% The id is only unique within a single cluster and as long as the +%% serial store hasn't been deleted. +%% +%% If you are not concerned with predictability, gen/0 is faster. +gen_secure() -> + %% Here instead of hashing once we hash the GUID and the counter + %% each time, so that the GUID is not predictable. + G = case get(guid_secure) of + undefined -> {fresh(), 0}; {S, I} -> {S, I+1} end, - put(guid, G), + put(guid_secure, G), erlang:md5(term_to_binary(G)). %% generate a readable string representation of a GUID. %% %% employs base64url encoding, which is safer in more contexts than %% plain base64. -string_guid(Prefix) -> +string(G, Prefix) -> Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; ($\/, Acc) -> [$\_ | Acc]; ($\=, Acc) -> Acc; (Chr, Acc) -> [Chr | Acc] - end, [], base64:encode_to_string(guid())). + end, [], base64:encode_to_string(G)). -binstring_guid(Prefix) -> - list_to_binary(string_guid(Prefix)). +binary(G, Prefix) -> + list_to_binary(string(G, Prefix)). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 177ae86859..80b4e768a3 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_heartbeat). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8a08d4b673..9fa6213b94 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_limiter). diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 558e095751..a6b4eeb05f 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_log). @@ -23,8 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([debug/1, debug/2, message/4, info/1, info/2, - warning/1, warning/2, error/1, error/2]). +-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). -define(SERVER, ?MODULE). @@ -32,9 +31,15 @@ -ifdef(use_specs). +-export_type([level/0]). + +-type(category() :: atom()). +-type(level() :: 'info' | 'warning' | 'error'). + -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(debug/1 :: (string()) -> 'ok'). --spec(debug/2 :: (string(), [any()]) -> 'ok'). + +-spec(log/3 :: (category(), level(), string()) -> 'ok'). +-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -42,84 +47,47 @@ -spec(error/1 :: (string()) -> 'ok'). -spec(error/2 :: (string(), [any()]) -> 'ok'). --spec(message/4 :: (_,_,_,_) -> 'ok'). - -endif. %%---------------------------------------------------------------------------- - start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). -debug(Fmt) -> - gen_server:cast(?SERVER, {debug, Fmt}). - -debug(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {debug, Fmt, Args}). - -message(Direction, Channel, MethodRecord, Content) -> - gen_server:cast(?SERVER, - {message, Direction, Channel, MethodRecord, Content}). +log(Category, Level, Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}). -info(Fmt) -> - gen_server:cast(?SERVER, {info, Fmt}). - -info(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {info, Fmt, Args}). - -warning(Fmt) -> - gen_server:cast(?SERVER, {warning, Fmt}). - -warning(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {warning, Fmt, Args}). - -error(Fmt) -> - gen_server:cast(?SERVER, {error, Fmt}). - -error(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {error, Fmt, Args}). +info(Fmt) -> log(default, info, Fmt). +info(Fmt, Args) -> log(default, info, Fmt, Args). +warning(Fmt) -> log(default, warning, Fmt). +warning(Fmt, Args) -> log(default, warning, Fmt, Args). +error(Fmt) -> log(default, error, Fmt). +error(Fmt, Args) -> log(default, error, Fmt, Args). %%-------------------------------------------------------------------- -init([]) -> {ok, none}. +init([]) -> + {ok, CatLevelList} = application:get_env(log_levels), + CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList], + {ok, orddict:from_list(CatLevels)}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({debug, Fmt}, State) -> - io:format("debug:: "), io:format(Fmt), - error_logger:info_msg("debug:: " ++ Fmt), - {noreply, State}; -handle_cast({debug, Fmt, Args}, State) -> - io:format("debug:: "), io:format(Fmt, Args), - error_logger:info_msg("debug:: " ++ Fmt, Args), - {noreply, State}; -handle_cast({message, Direction, Channel, MethodRecord, Content}, State) -> - io:format("~s ch~p ~p~n", - [case Direction of - in -> "-->"; - out -> "<--" end, - Channel, - {MethodRecord, Content}]), - {noreply, State}; -handle_cast({info, Fmt}, State) -> - error_logger:info_msg(Fmt), - {noreply, State}; -handle_cast({info, Fmt, Args}, State) -> - error_logger:info_msg(Fmt, Args), - {noreply, State}; -handle_cast({warning, Fmt}, State) -> - error_logger:warning_msg(Fmt), - {noreply, State}; -handle_cast({warning, Fmt, Args}, State) -> - error_logger:warning_msg(Fmt, Args), - {noreply, State}; -handle_cast({error, Fmt}, State) -> - error_logger:error_msg(Fmt), - {noreply, State}; -handle_cast({error, Fmt, Args}, State) -> - error_logger:error_msg(Fmt, Args), - {noreply, State}; +handle_cast({log, Category, Level, Fmt, Args}, CatLevels) -> + CatLevel = case orddict:find(Category, CatLevels) of + {ok, L} -> L; + error -> level(info) + end, + case level(Level) =< CatLevel of + false -> ok; + true -> (case Level of + info -> fun error_logger:info_msg/2; + warning -> fun error_logger:warning_msg/2; + error -> fun error_logger:error_msg/2 + end)(Fmt, Args) + end, + {noreply, CatLevels}; handle_cast(_Msg, State) -> {noreply, State}. @@ -132,3 +100,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%-------------------------------------------------------------------- + +level(info) -> 3; +level(warning) -> 2; +level(error) -> 1; +level(none) -> 0. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 02f3158f3b..f22ad874e3 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% @@ -178,11 +178,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%---------------------------------------------------------------------------- -zero_clamp(Sum) -> - case Sum < ?EPSILON of - true -> 0.0; - false -> Sum - end. +zero_clamp(Sum) when Sum < ?EPSILON -> 0.0; +zero_clamp(Sum) -> Sum. internal_deregister(Pid, Demonitor, State = #state { queue_duration_sum = Sum, @@ -240,26 +237,21 @@ internal_update(State = #state { queue_durations = Durations, fun (Proc = #process { reported = QueueDuration, sent = PrevSendDuration, callback = {M, F, A} }, true) -> - case (case {QueueDuration, PrevSendDuration} of - {infinity, infinity} -> - true; - {infinity, D} -> - DesiredDurationAvg1 < D; - {D, infinity} -> - DesiredDurationAvg1 < D; - {D1, D2} -> - DesiredDurationAvg1 < - lists:min([D1,D2]) - end) of - true -> - ok = erlang:apply( - M, F, A ++ [DesiredDurationAvg1]), - ets:insert( - Durations, - Proc #process {sent = DesiredDurationAvg1}); - false -> - true + case should_send(QueueDuration, PrevSendDuration, + DesiredDurationAvg1) of + true -> ok = erlang:apply( + M, F, A ++ [DesiredDurationAvg1]), + ets:insert( + Durations, + Proc #process { + sent = DesiredDurationAvg1}); + false -> true end end, true, Durations) end, State1. + +should_send(infinity, infinity, _) -> true; +should_send(infinity, D, DD) -> DD < D; +should_send(D, infinity, DD) -> DD < D; +should_send(D1, D2, DD) -> DD < lists:min([D1, D2]). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 8ed2bede32..d0b5bab779 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_coordinator). @@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> true = link(GM), GM end, - {ok, _TRef} = - timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), + ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, monitors = dict:new(), @@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids}, end, Monitors, Pids), noreply(State #state { monitors = Monitors1 }). +handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> + gm:broadcast(GM, heartbeat), + ensure_gm_heartbeat(), + noreply(State); + handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Monitors, death_fun = DeathFun }) -> @@ -419,3 +423,6 @@ noreply(State) -> reply(Reply, State) -> {reply, Reply, State, hibernate}. + +ensure_gm_heartbeat() -> + erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index f60562efe6..64a4a7371e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_master). @@ -280,8 +280,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. -status(#state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:status(BQS). +status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:status(BQS) ++ + [ {mirror_seen, dict:size(State #state.seen_status)}, + {mirror_senders, sets:size(State #state.known_senders)} ]. invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index baebc52b27..db7d8eccec 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_misc). @@ -136,12 +136,16 @@ add_mirror(Queue, MirrorNode) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), - rabbit_log:info( - "Adding mirror of queue ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, Result]), case Result of - {ok, _Pid} -> ok; - _ -> Result + {ok, undefined} -> %% Already running + ok; + {ok, _Pid} -> + rabbit_log:info( + "Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, Result]), + ok; + _ -> + Result end; [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} end diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 73eaed1476..9bf89bce3f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave). @@ -90,7 +90,7 @@ }). start_link(Q) -> - gen_server2:start_link(?MODULE, [Q], []). + gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). @@ -98,59 +98,64 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init([#amqqueue { name = QueueName } = Q]) -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, +init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), - {ok, MPid} = - rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - %% ASSERTION - [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], - MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {ok, QPid} - end), - erlang:monitor(process, MPid), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [Self]), - ok = rabbit_memory_monitor:register( - Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), - {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new(), - - synchronised = false + case rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), + {new, QPid}; + [SPid] -> true = rabbit_misc:is_process_alive(SPid), + existing + end + end) of + {new, MPid} -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + erlang:monitor(process, MPid), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [Self]), + ok = rabbit_memory_monitor:register( + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), - ok = gm:broadcast(GM, request_length), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> - %% Synchronous, "immediate" delivery mode + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + {ok, State, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}; + existing -> + ignore + end. +handle_call({deliver, Delivery = #delivery { immediate = true }}, + From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the %% msg via gm, or b) the master dies before we receive the msg via %% gm. In the case of (a), we will eventually receive the msg via @@ -166,8 +171,8 @@ handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> gen_server2:reply(From, false), %% master may deliver it, not us noreply(maybe_enqueue_message(Delivery, false, State)); -handle_call({deliver, Delivery = #delivery {}}, From, State) -> - %% Synchronous, "mandatory" delivery mode +handle_call({deliver, Delivery = #delivery { mandatory = true }}, + From, State) -> gen_server2:reply(From, true), %% amqqueue throws away the result anyway noreply(maybe_enqueue_message(Delivery, true, State)); @@ -208,8 +213,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery {}}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> @@ -250,6 +259,10 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -447,7 +460,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - MonitoringPids = [begin true = erlang:demonitor(MRef), + MonitoringPids = [begin put({ch_publisher, Pid}, MRef), Pid end || {Pid, MRef} <- dict:to_list(KS)], ok = rabbit_mirror_queue_coordinator:ensure_monitoring( @@ -526,9 +539,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, SS, MonitoringPids), - MTC = dict:from_list( - [{MsgId, {ChPid, MsgSeqNo}} || - {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_, MTC0) -> + MTC0 + end, gb_trees:empty(), MSList), NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), @@ -599,7 +614,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case dict:is_key(ChPid, KS) of false -> ok; - true -> confirm_sender_death(ChPid) + true -> credit_flow:peer_down(ChPid), + confirm_sender_death(ChPid) end, State. diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index fc04ec79ca..8eacb1f3c4 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave_sup). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0578cf7d9e..b6d38172b5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_misc). @@ -34,11 +34,11 @@ -export([execute_mnesia_transaction/2]). -export([execute_mnesia_tx_with_tail/1]). -export([ensure_ok/2]). --export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). +-export([tcp_name/3]). -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([format_stderr/2, with_local_io/1, local_info_msg/2]). +-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). @@ -141,9 +141,6 @@ -spec(execute_mnesia_tx_with_tail/1 :: (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). --spec(makenode/1 :: ({string(), string()} | string()) -> node()). --spec(nodeparts/1 :: (node() | string()) -> {string(), string()}). --spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), inet:ip_address(), rabbit_networking:ip_port()) -> atom()). @@ -155,6 +152,7 @@ -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()). +-spec(format/2 :: (string(), [any()]) -> string()). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(with_local_io/1 :: (fun (() -> A)) -> A). -spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). @@ -222,7 +220,7 @@ frame_error(MethodName, BinaryFields) -> protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName). amqp_error(Name, ExplanationFormat, Params, Method) -> - Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)), + Explanation = format(ExplanationFormat, Params), #amqp_error{name = Name, explanation = Explanation, method = Method}. protocol_error(Name, ExplanationFormat, Params) -> @@ -276,8 +274,7 @@ val({Type, Value}) -> true -> "~s"; false -> "~w" end, - lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'", - [Value, Type])). + format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]). %% Normally we'd call mnesia:dirty_read/1 here, but that is quite %% expensive due to general mnesia overheads (figuring out table types @@ -320,8 +317,7 @@ r_arg(VHostPath, Kind, Table, Key) -> end. rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> - lists:flatten(io_lib:format("~s '~s' in vhost '~s'", - [Kind, Name, VHostPath])). + format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]). enable_cover() -> enable_cover(["."]). @@ -337,7 +333,7 @@ enable_cover(Dirs) -> end, ok, Dirs). start_cover(NodesS) -> - {ok, _} = cover:start([makenode(N) || N <- NodesS]), + {ok, _} = cover:start([rabbit_nodes:make(N) || N <- NodesS]), ok. report_cover() -> report_cover(["."]). @@ -418,12 +414,25 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of - {atomic, Result} -> Result; - {aborted, Reason} -> throw({error, Reason}) + case worker_pool:submit( + fun () -> + case mnesia:is_transaction() of + false -> DiskLogBefore = mnesia_dumper:get_log_writes(), + Res = mnesia:sync_transaction(TxFun), + DiskLogAfter = mnesia_dumper:get_log_writes(), + case DiskLogAfter == DiskLogBefore of + true -> Res; + false -> {sync, Res} + end; + true -> mnesia:sync_transaction(TxFun) + end + end) of + {sync, {atomic, Result}} -> mnesia_sync:sync(), Result; + {sync, {aborted, Reason}} -> throw({error, Reason}); + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) end. - %% Like execute_mnesia_transaction/1 with additional Pre- and Post- %% commit function execute_mnesia_transaction(TxFun, PrePostCommitFun) -> @@ -450,29 +459,10 @@ execute_mnesia_tx_with_tail(TxFun) -> ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). -makenode({Prefix, Suffix}) -> - list_to_atom(lists:append([Prefix, "@", Suffix])); -makenode(NodeStr) -> - makenode(nodeparts(NodeStr)). - -nodeparts(Node) when is_atom(Node) -> - nodeparts(atom_to_list(Node)); -nodeparts(NodeStr) -> - case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of - {Prefix, []} -> {_, Suffix} = nodeparts(node()), - {Prefix, Suffix}; - {Prefix, Suffix} -> {Prefix, tl(Suffix)} - end. - -cookie_hash() -> - base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). - tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> list_to_atom( - lists:flatten( - io_lib:format("~w_~s:~w", - [Prefix, inet_parse:ntoa(IPAddress), Port]))). + format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])). %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about @@ -541,6 +531,8 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]), dirty_dump_log1(LH, disk_log:chunk(LH, K)). +format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). + format_stderr(Fmt, Args) -> case os:type() of {unix, _} -> @@ -636,7 +628,7 @@ pid_to_string(Pid) when is_pid(Pid) -> <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])). + format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]). %% inverse of above string_to_pid(Str) -> @@ -728,13 +720,14 @@ gb_trees_foreach(Fun, Tree) -> %% [{"-q",true},{"-p","/"}]} get_options(Defs, As) -> lists:foldl(fun(Def, {AsIn, RsIn}) -> - {AsOut, Value} = case Def of - {flag, Key} -> - get_flag(Key, AsIn); - {option, Key, Default} -> - get_option(Key, Default, AsIn) - end, - {AsOut, [{Key, Value} | RsIn]} + {K, {AsOut, V}} = + case Def of + {flag, Key} -> + {Key, get_flag(Key, AsIn)}; + {option, Key, Default} -> + {Key, get_option(Key, Default, AsIn)} + end, + {AsOut, [{K, V} | RsIn]} end, {As, []}, Defs). get_option(K, _Default, [K, V | As]) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c8c18843a8..4d419fd9a9 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% @@ -23,8 +23,8 @@ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_nodes/0, read_previously_running_nodes/0, - delete_previously_running_nodes/0, running_nodes_filename/0, - is_disc_node/0, on_node_down/1, on_node_up/1]). + running_nodes_filename/0, is_disc_node/0, on_node_down/1, + on_node_up/1]). -export([table_names/0]). @@ -64,7 +64,6 @@ -spec(read_cluster_nodes_config/0 :: () -> [node()]). -spec(record_running_nodes/0 :: () -> 'ok'). -spec(read_previously_running_nodes/0 :: () -> [node()]). --spec(delete_previously_running_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). -spec(is_disc_node/0 :: () -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). @@ -98,12 +97,13 @@ status() -> init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config(), true, - fun maybe_upgrade_local_or_record_desired/0), + Nodes = read_cluster_nodes_config(), + ok = init_db(Nodes, should_be_disc_node(Nodes)), %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - let's %% make it so. ok = global:sync(), + ok = delete_previously_running_nodes(), ok. is_db_empty() -> @@ -174,8 +174,7 @@ cluster(ClusterNodes, Force) -> %% Join the cluster start_mnesia(), try - ok = init_db(ClusterNodes, Force, - fun maybe_upgrade_local_or_record_desired/0), + ok = init_db(ClusterNodes, Force), ok = create_cluster_nodes_config(ClusterNodes) after stop_mnesia() @@ -268,6 +267,11 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, {rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, record_info(fields, topic_trie_edge)}, @@ -314,12 +318,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_node_match() -> + #trie_node{ exchange_name = exchange_name_match(), _='_'}. trie_edge_match() -> - #trie_edge{exchange_name = exchange_name_match(), - _='_'}. + #trie_edge{ exchange_name = exchange_name_match(), _='_'}. trie_binding_match() -> - #trie_binding{exchange_name = exchange_name_match(), - _='_'}. + #trie_binding{exchange_name = exchange_name_match(), _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> @@ -496,6 +500,18 @@ delete_previously_running_nodes() -> FileName, Reason}}) end. +init_db(ClusterNodes, Force) -> + init_db( + ClusterNodes, Force, + fun () -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + %% If we're just starting up a new node we won't have a + %% version + version_not_available -> ok = rabbit_version:record_desired() + end + end). + %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow @@ -504,20 +520,12 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of + {ok, []} when not Force andalso ProperClusterNodes =/= [] -> + throw({error, {failed_to_cluster_with, ProperClusterNodes, + "Mnesia could not connect to any disc nodes."}}); {ok, Nodes} -> - case Force of - false -> FailedClusterNodes = ProperClusterNodes -- Nodes, - case FailedClusterNodes of - [] -> ok; - _ -> throw({error, {failed_to_cluster_with, - FailedClusterNodes, - "Mnesia could not connect " - "to some nodes."}}) - end; - true -> ok - end, - WantDiscNode = should_be_disc_node(ClusterNodes), WasDiscNode = is_disc_node(), + WantDiscNode = should_be_disc_node(ClusterNodes), %% We create a new db (on disk, or in ram) in the first %% two cases and attempt to upgrade the in the other two case {Nodes, WasDiscNode, WantDiscNode} of @@ -567,14 +575,6 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. -maybe_upgrade_local_or_record_desired() -> - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ok; - %% If we're just starting up a new node we won't have a - %% version - version_not_available -> ok = rabbit_version:record_desired() - end. - schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -622,10 +622,9 @@ move_db() -> stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), - BackupDir = lists:flatten( - io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", - [MnesiaDir, - Year, Month, Day, Hour, Minute, Second])), + BackupDir = rabbit_misc:format( + "~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", + [MnesiaDir, Year, Month, Day, Hour, Minute, Second]), case file:rename(MnesiaDir, BackupDir) of ok -> %% NB: we cannot use rabbit_log here since it may not have @@ -733,16 +732,18 @@ reset(Force) -> false -> ok end, Node = node(), + Nodes = all_clustered_nodes() -- [Node], case Force of true -> ok; false -> ensure_mnesia_dir(), start_mnesia(), - {Nodes, RunningNodes} = + RunningNodes = try - ok = init(), - {all_clustered_nodes() -- [Node], - running_clustered_nodes() -- [Node]} + %% Force=true here so that reset still works when clustered + %% with a node which is down + ok = init_db(read_cluster_nodes_config(), true), + running_clustered_nodes() -- [Node] after stop_mnesia() end, @@ -750,6 +751,10 @@ reset(Force) -> rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), cannot_delete_schema) end, + %% We need to make sure that we don't end up in a distributed + %% Erlang system with nodes while not being in an Mnesia cluster + %% with them. We don't handle that well. + [erlang:disconnect_node(N) || N <- Nodes], ok = delete_cluster_nodes_config(), %% remove persisted messages and any other garbage we find ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index b7de27d4b5..f685b109a9 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_file). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e6a32b9023..56265136dd 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store). @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2]). + write/3, write_flow/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -152,6 +152,7 @@ -spec(close_all_indicated/1 :: (client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). +-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). @@ -436,7 +437,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( - Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), + Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun}, + infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -460,12 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write(MsgId, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - client_ref = CRef }) -> - ok = client_update_flying(+1, MsgId, CState), - ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), - ok = server_cast(CState, {write, CRef, MsgId}). +write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) -> + credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND), + client_write(MsgId, Msg, flow, CState). + +write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> @@ -500,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) -> server_cast(#client_msstate { server = Server }, Msg) -> gen_server2:cast(Server, Msg). +client_write(MsgId, Msg, Flow, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), + ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), + ok = server_cast(CState, {write, CRef, MsgId, Flow}). + client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of @@ -666,7 +674,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Server), Clients = dict:from_list( - [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]), + [{CRef, {undefined, undefined, undefined}} || + CRef <- ClientRefs1]), %% CleanShutdown => msg location index and file_summary both %% recovered correctly. true = case {FileSummaryRecovered, CleanShutdown} of @@ -731,10 +740,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; - {read, _MsgId} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7; + {read, _MsgId} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -755,7 +764,7 @@ prioritise_info(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, +handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From, State = #msstate { dir = Dir, index_state = IndexState, index_module = IndexModule, @@ -765,7 +774,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> - Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), + Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts}, State #msstate { clients = Clients1 }); @@ -789,11 +798,19 @@ handle_cast({client_dying, CRef}, handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> + {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:peer_down(CPid), State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, MsgId}, - State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> +handle_cast({write, CRef, MsgId, Flow}, + State = #msstate { cur_file_cache_ets = CurFileCacheEts, + clients = Clients }) -> + case Flow of + flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:ack(CPid, ?CREDIT_DISC_BOUND); + noflow -> ok + end, true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), case update_flying(-1, MsgId, CRef, State) of process -> @@ -1204,10 +1221,10 @@ update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, cref_to_msg_ids = CTM }) -> case dict:fetch(CRef, Clients) of - {undefined, _CloseFDsFun} -> State; - {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), - State #msstate { - cref_to_msg_ids = CTM1 } + {_CPid, undefined, _CloseFDsFun} -> State; + {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), + State #msstate { + cref_to_msg_ids = CTM1 } end. record_pending_confirm(CRef, MsgId, State) -> @@ -1294,8 +1311,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> case (ets:update_element(FileHandlesEts, Key, {2, close}) andalso Invoke) of true -> case dict:fetch(Ref, ClientRefs) of - {_MsgOnDiskFun, undefined} -> ok; - {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun() + {_CPid, _MsgOnDiskFun, undefined} -> + ok; + {_CPid, _MsgOnDiskFun, CloseFDsFun} -> + ok = CloseFDsFun() end; false -> ok end diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index d6dc556878..9c31439f51 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_ets_index). diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 77f1f04ee2..3b61ed0bd7 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_gc). diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl index ef8b7cdfed..2f36256c61 100644 --- a/src/rabbit_msg_store_index.erl +++ b/src/rabbit_msg_store_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_index). diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index b944ec81a1..02889b93a3 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_net). @@ -19,7 +19,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - sockname/1, peername/1, peercert/1]). + sockname/1, peername/1, peercert/1, connection_string/2]). %%--------------------------------------------------------------------------- @@ -62,6 +62,8 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). +-spec(connection_string/2 :: + (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). -endif. @@ -141,3 +143,19 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. + +connection_string(Sock, Direction) -> + {From, To} = case Direction of + inbound -> {fun peername/1, fun sockname/1}; + outbound -> {fun sockname/1, fun peername/1} + end, + case {From(Sock), To(Sock)} of + {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> + {ok, rabbit_misc:format("~s:~p -> ~s:~p", + [rabbit_misc:ntoab(FromAddress), FromPort, + rabbit_misc:ntoab(ToAddress), ToPort])}; + {{error, _Reason} = Error, _} -> + Error; + {_, {error, _Reason} = Error} -> + Error + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 045ab89a70..825d1bb1e0 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_networking). @@ -24,7 +24,7 @@ close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter --export([check_tcp_listener_address/2, +-export([tcp_listener_addresses/1, tcp_listener_spec/6, ensure_ssl/0, ssl_transform_fun/1]). -export([tcp_listener_started/3, tcp_listener_stopped/3, @@ -47,12 +47,16 @@ -export_type([ip_port/0, hostname/0]). -type(hostname() :: inet:hostname()). --type(ip_port() :: inet:ip_port()). +-type(ip_port() :: inet:port_number()). -type(family() :: atom()). -type(listener_config() :: ip_port() | {hostname(), ip_port()} | {hostname(), ip_port(), family()}). +-type(address() :: {inet:ip_address(), ip_port(), family()}). +-type(name_prefix() :: atom()). +-type(protocol() :: atom()). +-type(label() :: string()). -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). @@ -76,8 +80,10 @@ -spec(force_connection_event_refresh/0 :: () -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(check_tcp_listener_address/2 :: (atom(), listener_config()) - -> [{inet:ip_address(), ip_port(), family(), atom()}]). +-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). +-spec(tcp_listener_spec/6 :: + (name_prefix(), address(), [gen_tcp:listen_option()], protocol(), + label(), rabbit_types:mfargs()) -> supervisor:child_spec()). -spec(ensure_ssl/0 :: () -> rabbit_types:infos()). -spec(ssl_transform_fun/1 :: (rabbit_types:infos()) @@ -140,39 +146,6 @@ start() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. -%% inet_parse:address takes care of ip string, like "0.0.0.0" -%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, -%% and runs 'inet_gethost' port process for dns lookups. -%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. - -getaddr(Host, Family) -> - case inet_parse:address(Host) of - {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; - {error, _} -> gethostaddr(Host, Family) - end. - -gethostaddr(Host, auto) -> - Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], - case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of - [] -> host_lookup_error(Host, Lookups); - IPs -> IPs - end; - -gethostaddr(Host, Family) -> - case inet:getaddr(Host, Family) of - {ok, IPAddress} -> [{IPAddress, Family}]; - {error, Reason} -> host_lookup_error(Host, Reason) - end. - -host_lookup_error(Host, Reason) -> - error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}). - -resolve_family({_,_,_,_}, auto) -> inet; -resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; -resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); -resolve_family(_, F) -> F. - ensure_ssl() -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), @@ -191,8 +164,6 @@ ssl_transform_fun(SslOpts) -> fun (Sock) -> case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection ~p to SSL~n", - [self()]), {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; {error, Reason} -> {error, {ssl_upgrade_error, Reason}}; @@ -201,31 +172,36 @@ ssl_transform_fun(SslOpts) -> end end. -check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) -> - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {"auto", Port}) -> +tcp_listener_addresses(Port) when is_integer(Port) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({"auto", Port}) -> %% Variant to prevent lots of hacking around in bash and batch files - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {Host, Port}) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({Host, Port}) -> %% auto: determine family IPv4 / IPv6 after converting to IP address - check_tcp_listener_address(NamePrefix, {Host, Port, auto}); - -check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) -> - if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; - true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", - [Port]), - throw({error, {invalid_port, Port}}) - end, - [{IPAddress, Port, Family, - rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} || - {IPAddress, Family} <- getaddr(Host, Family0)]. - -check_tcp_listener_address_auto(NamePrefix, Port) -> - lists:append([check_tcp_listener_address(NamePrefix, Listener) || + tcp_listener_addresses({Host, Port, auto}); +tcp_listener_addresses({Host, Port, Family0}) + when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> + [{IPAddress, Port, Family} || + {IPAddress, Family} <- getaddr(Host, Family0)]; +tcp_listener_addresses({_Host, Port, _Family0}) -> + error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), + throw({error, {invalid_port, Port}}). + +tcp_listener_addresses_auto(Port) -> + lists:append([tcp_listener_addresses(Listener) || Listener <- port_to_listeners(Port)]). +tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, + Protocol, Label, OnConnect) -> + {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), + {tcp_listener_sup, start_link, + [IPAddress, Port, [Family | SocketOpts], + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, + OnConnect, Label]}, + transient, infinity, supervisor, [tcp_listener_sup]}. + start_tcp_listener(Listener) -> start_listener(Listener, amqp, "TCP Listener", {?MODULE, start_client, []}). @@ -235,27 +211,26 @@ start_ssl_listener(Listener, SslOpts) -> {?MODULE, start_ssl_client, [SslOpts]}). start_listener(Listener, Protocol, Label, OnConnect) -> - [start_listener0(Spec, Protocol, Label, OnConnect) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [start_listener0(Address, Protocol, Label, OnConnect) || + Address <- tcp_listener_addresses(Listener)], ok. -start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> - {ok,_} = supervisor:start_child( - rabbit_sup, - {Name, - {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | tcp_opts()], - {?MODULE, tcp_listener_started, [Protocol]}, - {?MODULE, tcp_listener_stopped, [Protocol]}, - OnConnect, Label]}, - transient, infinity, supervisor, [tcp_listener_sup]}). +start_listener0(Address, Protocol, Label, OnConnect) -> + Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(), + Protocol, Label, OnConnect), + case supervisor:start_child(rabbit_sup, Spec) of + {ok, _} -> ok; + {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address, + exit({could_not_start_tcp_listener, + {rabbit_misc:ntoa(IPAddress), Port}}) + end. stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Spec) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [stop_tcp_listener0(Address) || + Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family, Name}) -> +stop_tcp_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). @@ -294,6 +269,16 @@ start_client(Sock, SockTransform) -> {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Reader), Reader ! {go, Sock, SockTransform}, + + %% In the event that somebody floods us with connections, the + %% reader processes can spew log events at error_logger faster + %% than it can keep up, causing its mailbox to grow unbounded + %% until we eat all the memory available and crash. So here is a + %% meaningless synchronous call to the underlying gen_event + %% mechanism. When it returns the mailbox is drained, and we + %% return to our caller to accept more connetions. + gen_event:which_handlers(error_logger), + Reader. start_client(Sock) -> @@ -363,6 +348,38 @@ tcp_opts() -> {ok, Opts} = application:get_env(rabbit, tcp_listen_options), Opts. +%% inet_parse:address takes care of ip string, like "0.0.0.0" +%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, +%% and runs 'inet_gethost' port process for dns lookups. +%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. +getaddr(Host, Family) -> + case inet_parse:address(Host) of + {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; + {error, _} -> gethostaddr(Host, Family) + end. + +gethostaddr(Host, auto) -> + Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], + case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of + [] -> host_lookup_error(Host, Lookups); + IPs -> IPs + end; + +gethostaddr(Host, Family) -> + case inet:getaddr(Host, Family) of + {ok, IPAddress} -> [{IPAddress, Family}]; + {error, Reason} -> host_lookup_error(Host, Reason) + end. + +host_lookup_error(Host, Reason) -> + error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}). + +resolve_family({_,_,_,_}, auto) -> inet; +resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; +resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); +resolve_family(_, F) -> F. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8aa24ab53e..323cf0ce9e 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_node_monitor). @@ -55,29 +55,32 @@ notify_cluster() -> {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) end, %% register other active rabbits with this rabbit - [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ], + [ rabbit_running_on(N) || N <- Nodes ], ok. %%-------------------------------------------------------------------- init([]) -> - {ok, no_state}. + {ok, ordsets:new()}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node}, State) -> - rabbit_log:info("rabbit on ~p up~n", [Node]), - erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node), - {noreply, State}; +handle_cast({rabbit_running_on, Node}, Nodes) -> + case ordsets:is_element(Node, Nodes) of + true -> {noreply, Nodes}; + false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + ok = handle_live_rabbit(Node), + {noreply, ordsets:add_element(Node, Nodes)} + end; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> - rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) -> + rabbit_log:info("rabbit on node ~p down~n", [Node]), ok = handle_dead_rabbit(Node), - {noreply, State}; + {noreply, ordsets:del_element(Node, Nodes)}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl new file mode 100644 index 0000000000..329c07dc9f --- /dev/null +++ b/src/rabbit_nodes.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_nodes). + +-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]). + +-define(EPMD_TIMEOUT, 30000). + +%%---------------------------------------------------------------------------- +%% Specs +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(names/1 :: (string()) -> rabbit_types:ok_or_error2( + [{string(), integer()}], term())). +-spec(diagnostics/1 :: ([node()]) -> string()). +-spec(make/1 :: ({string(), string()} | string()) -> node()). +-spec(parts/1 :: (node() | string()) -> {string(), string()}). +-spec(cookie_hash/0 :: () -> string()). + +-endif. + +%%---------------------------------------------------------------------------- + +names(Hostname) -> + Self = self(), + process_flag(trap_exit, true), + Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), + timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), + Res = receive + {names, Names} -> Names; + {'EXIT', Pid, Reason} -> {error, Reason} + end, + process_flag(trap_exit, false), + Res. + +diagnostics(Nodes) -> + Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]), + NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n" + "nodes in question: ~p~n~n" + "hosts, their running nodes and ports:", [Nodes]}] ++ + [diagnostics_host(Host) || Host <- Hosts] ++ + diagnostics0(), + lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags, + {F, A} <- [NodeDiag]]). + +diagnostics0() -> + [{"~ncurrent node details:~n- node name: ~w", [node()]}, + case init:get_argument(home) of + {ok, [[Home]]} -> {"- home dir: ~s", [Home]}; + Other -> {"- no home dir: ~p", [Other]} + end, + {"- cookie hash: ~s", [cookie_hash()]}]. + +diagnostics_host(Host) -> + case names(Host) of + {error, EpmdReason} -> + {"- unable to connect to epmd on ~s: ~w", + [Host, EpmdReason]}; + {ok, NamePorts} -> + {"- ~s: ~p", + [Host, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]} + end. + +make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix])); +make(NodeStr) -> make(parts(NodeStr)). + +parts(Node) when is_atom(Node) -> + parts(atom_to_list(Node)); +parts(NodeStr) -> + case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of + {Prefix, []} -> {_, Suffix} = parts(node()), + {Prefix, Suffix}; + {Prefix, Suffix} -> {Prefix, tl(Suffix)} + end. + +cookie_hash() -> + base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 0862f1b2c7..7b85ab15fc 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_plugins). @@ -55,13 +55,20 @@ start() -> CmdArgsAndOpts -> CmdArgsAndOpts end, Command = list_to_atom(Command0), + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]), + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> + PrintInvalidCommandError(), usage(); {error, Reason} -> print_error("~p", [Reason]), @@ -111,8 +118,7 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", NewImplicitlyEnabled -- ImplicitlyEnabled), - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n") + report_change() end; action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> @@ -140,8 +146,7 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> print_list("The following plugins have been disabled:", ImplicitlyEnabled -- NewImplicitlyEnabled), write_enabled_plugins(PluginsFile, NewEnabled), - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n") + report_change() end. %%---------------------------------------------------------------------------- @@ -327,6 +332,9 @@ lookup_plugins(Names, AllPlugins) -> read_enabled_plugins(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; + {ok, []} -> []; + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, PluginsFile, Reason}}) @@ -374,3 +382,17 @@ maybe_warn_mochiweb(Enabled) -> false -> ok end. + +report_change() -> + io:format("Plugin configuration has changed. " + "Restart RabbitMQ for changes to take effect.~n"), + case os:type() of + {win32, _OsName} -> + io:format("If you have RabbitMQ running as a service then you must" + " reinstall by running~n rabbitmq-service.bat stop~n" + " rabbitmq-service.bat install~n" + " rabbitmq-service.bat start~n~n"); + _ -> + ok + end. + diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 50444dc49d..162d44f13e 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_prelaunch). @@ -22,7 +22,6 @@ -define(BaseApps, [rabbit]). -define(ERROR_CODE, 1). --define(EPMD_TIMEOUT, 30000). %%---------------------------------------------------------------------------- %% Specs @@ -244,16 +243,15 @@ duplicate_node_check([]) -> %% Ignore running node while installing windows service ok; duplicate_node_check(NodeStr) -> - Node = rabbit_misc:makenode(NodeStr), - {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - case names(NodeHost) of + Node = rabbit_nodes:make(NodeStr), + {NodeName, NodeHost} = rabbit_nodes:parts(Node), + case rabbit_nodes:names(NodeHost) of {ok, NamePorts} -> case proplists:is_defined(NodeName, NamePorts) of true -> io:format("node with name ~p " "already running on ~p~n", [NodeName, NodeHost]), - [io:format(Fmt ++ "~n", Args) || - {Fmt, Args} <- rabbit_control:diagnostics(Node)], + io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"), terminate(?ERROR_CODE); false -> ok end; @@ -279,15 +277,3 @@ terminate(Status) -> after infinity -> ok end end. - -names(Hostname) -> - Self = self(), - process_flag(trap_exit, true), - Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), - timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), - Res = receive - {names, Names} -> Names; - {'EXIT', Pid, Reason} -> {error, Reason} - end, - process_flag(trap_exit, false), - Res. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 9b45e79869..df957d883c 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_queue_collector). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f03c1d1c76..4c8793f1ad 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_queue_index). @@ -491,7 +491,7 @@ recover_message(false, _, no_del, RelSeq, Segment) -> queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), - lists:flatten(io_lib:format("~.36B", [Num])). + rabbit_misc:format("~.36B", [Num]). queues_dir() -> filename:join(rabbit_mnesia:dir(), "queues"). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 694abd9e49..01242e81b8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_reader). @@ -27,11 +27,9 @@ -export([conserve_memory/2, server_properties/1]). --export([process_channel_frame/5]). %% used by erlang-client - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). --define(CLOSING_TIMEOUT, 1). +-define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). @@ -40,10 +38,12 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, conserve_memory, + last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, channels]). + send_pend, state, last_blocked_by, last_blocked_age, + channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, @@ -90,10 +90,6 @@ -spec(system_continue/3 :: (_,_,#v1{}) -> any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). --spec(process_channel_frame/5 :: - (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(), - tuple()) -> tuple()). - -endif. %%-------------------------------------------------------------------------- @@ -177,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> server_capabilities(_) -> []. +log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; - {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Reason]), - rabbit_log:info("closing TCP connection ~p~n", - [self()]), + {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", + [self(), Reason]), exit(normal) end. start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), - PeerAddressS = rabbit_misc:ntoab(PeerAddress), - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), + ConnStr = socket_op(Sock, fun (Sock0) -> + rabbit_net:connection_string( + Sock0, inbound) + end), + log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), @@ -220,21 +217,22 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, auth_mechanism = none, - auth_state = none}, + auth_state = none, + conserve_memory = false, + last_blocked_by = none, + last_blocked_at = never}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), - handshake, 8)) + handshake, 8)), + log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]) catch - Ex -> (if Ex == connection_closed_abruptly -> - fun rabbit_log:warning/2; - true -> - fun rabbit_log:error/2 - end)("exception on TCP connection ~p from ~s:~p~n~p~n", - [self(), PeerAddressS, PeerPort, Ex]) + Ex -> log(case Ex of + connection_closed_abruptly -> warning; + _ -> error + end, "closing AMQP connection ~p (~s):~n~p~n", + [self(), ConnStr, Ex]) after - rabbit_log:info("closing TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), %% We don't close the socket explicitly. The reader is the %% controlling process and hence its termination will close %% the socket. Furthermore, gen_tcp:close/1 waits for pending @@ -267,21 +265,20 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], buf_len = BufLen + size(Data), pending_recv = false}); - closed -> if State#v1.connection_state =:= closed -> - State; - true -> - throw(connection_closed_abruptly) + closed -> case State#v1.connection_state of + closed -> State; + _ -> throw(connection_closed_abruptly) end; {error, Reason} -> throw({inet_error, Reason}); {other, Other} -> handle_other(Other, Deb, State) end. handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, internal_conserve_memory(Conserve, State)); + recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(State)); + mainloop(Deb, maybe_close(control_throttle(State))); handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -329,22 +326,25 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> catch Error -> {error, Error} end), mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> +handle_other({'$gen_cast', force_event_refresh}, Deb, State) + when ?IS_RUNNING(State) -> rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), mainloop(Deb, State); +handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + %% Ignore, we will emit a created event once we start running. + mainloop(Deb, State); handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other({bump_credit, Msg}, Deb, State) -> + credit_flow:handle_bump_msg(Msg), + recvloop(Deb, control_throttle(State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). -switch_callback(State = #v1{connection_state = blocked, - heartbeater = Heartbeater}, Callback, Length) -> - ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = Callback, recv_len = Length}; switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. @@ -355,17 +355,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(true, State = #v1{connection_state = running}) -> - State#v1{connection_state = blocking}; -internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> - State#v1{connection_state = running}; -internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater}) -> - ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State#v1{connection_state = running}; -internal_conserve_memory(_Conserve, State) -> +control_throttle(State = #v1{connection_state = CS, + conserve_memory = Mem}) -> + case {CS, Mem orelse credit_flow:blocked()} of + {running, true} -> State#v1{connection_state = blocking}; + {blocking, false} -> State#v1{connection_state = running}; + {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( + State#v1.heartbeater), + State#v1{connection_state = running}; + {blocked, true} -> update_last_blocked_by(State); + {_, _} -> State + end. + +maybe_block(State = #v1{connection_state = blocking}) -> + ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + update_last_blocked_by(State#v1{connection_state = blocked, + last_blocked_at = erlang:now()}); +maybe_block(State) -> State. +update_last_blocked_by(State = #v1{conserve_memory = true}) -> + State#v1{last_blocked_by = mem}; +update_last_blocked_by(State = #v1{conserve_memory = false}) -> + State#v1{last_blocked_by = flow}. + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -376,34 +389,30 @@ close_connection(State = #v1{queue_collector = Collector, rabbit_queue_collector:delete_all(Collector), %% We terminate the connection after the specified interval, but %% no later than ?CLOSING_TIMEOUT seconds. - TimeoutMillisec = - 1000 * if TimeoutSec > 0 andalso - TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; - true -> ?CLOSING_TIMEOUT - end, - erlang:send_after(TimeoutMillisec, self(), terminate_connection), + erlang:send_after((if TimeoutSec > 0 andalso + TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; + true -> ?CLOSING_TIMEOUT + end) * 1000, self(), terminate_connection), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> - case termination_kind(Reason) of - controlled -> - channel_cleanup(ChPid), - maybe_close(State); - uncontrolled -> - case channel_cleanup(ChPid) of - undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> rabbit_log:error( - "connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), - maybe_close( - handle_exception(State, Channel, Reason)) - end + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, uncontrolled} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_Channel, controlled} -> + maybe_close(control_throttle(State)); + {Channel, uncontrolled} -> + log(error, "AMQP connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close(handle_exception(control_throttle(State), + Channel, Reason)) end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of undefined -> undefined; - {Channel, MRef} -> erase({channel, Channel}), + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), Channel @@ -432,19 +441,16 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case channel_cleanup(ChPid) of - undefined -> + case {channel_cleanup(ChPid), termination_kind(Reason)} of + {undefined, _} -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> - case termination_kind(Reason) of - controlled -> - ok; - uncontrolled -> - rabbit_log:error( - "connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]) - end, + {_Channel, controlled} -> + wait_for_channel_termination(N-1, TimerRef); + {Channel, uncontrolled} -> + log(error, + "AMQP connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), wait_for_channel_termination(N-1, TimerRef) end; cancel_wait -> @@ -493,41 +499,38 @@ handle_frame(Type, Channel, Payload, case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - AnalyzedFrame -> - case get({channel, Channel}) of - {ChPid, FramingState} -> - NewAState = process_channel_frame( - AnalyzedFrame, self(), - Channel, ChPid, FramingState), - put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(AnalyzedFrame, ChPid, State); - undefined -> - case ?IS_RUNNING(State) of - true -> send_to_new_channel( - Channel, AnalyzedFrame, State); - false -> throw({channel_frame_while_starting, - Channel, State#v1.connection_state, - AnalyzedFrame}) - end - end + AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State) + end. + +process_frame(Frame, Channel, State) -> + case get({channel, Channel}) of + {ChPid, AState} -> + case process_channel_frame(Frame, ChPid, AState) of + {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {error, Reason} -> handle_exception(State, Channel, Reason) + end; + undefined when ?IS_RUNNING(State) -> + ok = create_channel(Channel, State), + process_frame(Frame, Channel, State); + undefined -> + throw({channel_frame_while_starting, + Channel, State#v1.connection_state, Frame}) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - State; + control_throttle(State); post_process_frame({method, MethodName, _}, _ChPid, State = #v1{connection = #connection{ protocol = Protocol}}) -> case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), - case State#v1.connection_state of - blocking -> State#v1{connection_state = blocked}; - _ -> State - end; - false -> State + maybe_block(control_throttle(State)); + false -> control_throttle(State) end; post_process_frame(_Frame, _ChPid, State) -> - State. + control_throttle(State). handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( @@ -695,10 +698,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - State1 = internal_conserve_memory( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State1 = control_throttle( State#v1{connection_state = running, - connection = NewConnection}), + connection = NewConnection, + conserve_memory = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -830,6 +834,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; fun ([{_, I}]) -> I end); i(state, #v1{connection_state = S}) -> S; +i(last_blocked_by, #v1{last_blocked_by = By}) -> + By; +i(last_blocked_age, #v1{last_blocked_at = never}) -> + infinity; +i(last_blocked_age, #v1{last_blocked_at = T}) -> + timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); i(protocol, #v1{connection = #connection{protocol = none}}) -> @@ -885,7 +895,7 @@ cert_info(F, Sock) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State) -> +create_channel(Channel, State) -> #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, connection = #connection{protocol = Protocol, @@ -898,23 +908,19 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - NewAState = process_channel_frame(AnalyzedFrame, self(), - Channel, ChPid, AState), - put({channel, Channel}, {ChPid, NewAState}), put({ch_pid, ChPid}, {Channel, MRef}), - State. + put({channel, Channel}, {ChPid, AState}), + ok. -process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> +process_channel_frame(Frame, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> NewAState; + {ok, NewAState} -> {ok, NewAState}; {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - NewAState; - {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, - Method, Content), - NewAState; - {error, Reason} -> ErrPid ! {channel_exit, Channel, - Reason}, - AState + {ok, NewAState}; + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), + {ok, NewAState}; + {error, Reason} -> {error, Reason} end. handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 9821ae7b86..8c0ebcbe94 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_registry). diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index cda3ccbe0f..237ab78cac 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_restartable_sup). @@ -28,7 +28,8 @@ -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 31f5ad14ea..f4bbda0f7a 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -11,28 +11,24 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_router). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([deliver/2, match_bindings/2, match_routing_key/2]). +-export([match_bindings/2, match_routing_key/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([routing_key/0, routing_result/0, match_result/0]). +-export_type([routing_key/0, match_result/0]). -type(routing_key() :: binary()). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). --type(qpids() :: [pid()]). -type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). -spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> match_result()). @@ -44,38 +40,6 @@ %%---------------------------------------------------------------------------- -deliver([], #delivery{mandatory = false, - immediate = false}) -> - %% /dev/null optimisation - {routed, []}; - -deliver(QNames, Delivery = #delivery{mandatory = false, - immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. - QPids = lookup_qpids(QNames), - delegate:invoke_no_result( - QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - {routed, QPids}; - -deliver(QNames, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> - QPids = lookup_qpids(QNames), - {Success, _} = - delegate:invoke(QPids, - fun (Pid) -> - rabbit_amqqueue:deliver(Pid, Delivery) - end), - {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Mandatory, Immediate, {Routed, Handled}). - - %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source match_bindings(SrcName, Match) -> @@ -104,26 +68,6 @@ match_routing_key(SrcName, [_|_] = RoutingKeys) -> %%-------------------------------------------------------------------- -fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; -fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. - -%% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) -check_delivery(true, _ , {false, []}) -> {unroutable, []}; -check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; -check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. - -%% Normally we'd call mnesia:dirty_read/1 here, but that is quite -%% expensive for the reasons explained in rabbit_misc:dirty_read/1. -lookup_qpids(QNames) -> - lists:foldl(fun (QName, QPids) -> - case ets:lookup(rabbit_queue, QName) of - [#amqqueue{pid = QPid, slave_pids = SPids}] -> - [QPid | SPids ++ QPids]; - [] -> - QPids - end - end, [], QNames). - %% Normally we'd call mnesia:dirty_select/2 here, but that is quite %% expensive for the same reasons as above, and, additionally, due to %% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 963294d924..e8beecfe0a 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_sasl_report_file_h). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index e0defa9e96..3025d981d4 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_ssl). @@ -21,7 +21,7 @@ -include_lib("public_key/include/public_key.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). --export([peer_cert_subject_item/2]). +-export([peer_cert_subject_items/2]). %%-------------------------------------------------------------------------- @@ -34,8 +34,8 @@ -spec(peer_cert_issuer/1 :: (certificate()) -> string()). -spec(peer_cert_subject/1 :: (certificate()) -> string()). -spec(peer_cert_validity/1 :: (certificate()) -> string()). --spec(peer_cert_subject_item/2 :: - (certificate(), tuple()) -> string() | 'not_found'). +-spec(peer_cert_subject_items/2 :: + (certificate(), tuple()) -> [string()] | 'not_found'). -endif. @@ -59,8 +59,8 @@ peer_cert_subject(Cert) -> format_rdn_sequence(Subject) end, Cert). -%% Return a part of the certificate's subject. -peer_cert_subject_item(Cert, Type) -> +%% Return the parts of the certificate's subject. +peer_cert_subject_items(Cert, Type) -> cert_info(fun(#'OTPCertificate' { tbsCertificate = #'OTPTBSCertificate' { subject = Subject }}) -> @@ -72,9 +72,8 @@ peer_cert_validity(Cert) -> cert_info(fun(#'OTPCertificate' { tbsCertificate = #'OTPTBSCertificate' { validity = {'Validity', Start, End} }}) -> - lists:flatten( - io_lib:format("~s - ~s", [format_asn1_value(Start), - format_asn1_value(End)])) + rabbit_misc:format("~s - ~s", [format_asn1_value(Start), + format_asn1_value(End)]) end, Cert). %%-------------------------------------------------------------------------- @@ -89,8 +88,8 @@ find_by_type(Type, {rdnSequence, RDNs}) -> case [V || #'AttributeTypeAndValue'{type = T, value = V} <- lists:flatten(RDNs), T == Type] of - [Val] -> format_asn1_value(Val); - [] -> not_found + [] -> not_found; + L -> [format_asn1_value(V) || V <- L] end. %%-------------------------------------------------------------------------- @@ -150,11 +149,12 @@ escape_rdn_value([$ ], middle) -> escape_rdn_value([C | S], middle) when C =:= $"; C =:= $+; C =:= $,; C =:= $;; C =:= $<; C =:= $>; C =:= $\\ -> [$\\, C | escape_rdn_value(S, middle)]; -escape_rdn_value([C | S], middle) when C < 32 ; C =:= 127 -> - %% only U+0000 needs escaping, but for display purposes it's handy - %% to escape all non-printable chars - lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++ - escape_rdn_value(S, middle); +escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 -> + %% Of ASCII characters only U+0000 needs escaping, but for display + %% purposes it's handy to escape all non-printable chars. All non-ASCII + %% characters get converted to UTF-8 sequences and then escaped. We've + %% already got a UTF-8 sequence here, so just escape it. + rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle); escape_rdn_value([C | S], middle) -> [C | escape_rdn_value(S, middle)]. @@ -167,6 +167,10 @@ format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2, $Z]}) -> io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); +%% We appear to get an untagged value back for an ia5string +%% (e.g. domainComponent). +format_asn1_value(V) when is_list(V) -> + V; format_asn1_value(V) -> io_lib:format("~p", [V]). diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 802ea5e2e7..0965e3b36e 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_sup). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 00d46f5a56..433ed9cb59 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_tests). @@ -59,7 +59,7 @@ all_tests() -> passed. maybe_run_cluster_dependent_tests() -> - SecondaryNode = rabbit_misc:makenode("hare"), + SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = run_cluster_dependent_tests(SecondaryNode); @@ -71,10 +71,13 @@ maybe_run_cluster_dependent_tests() -> run_cluster_dependent_tests(SecondaryNode) -> SecondaryNodeS = atom_to_list(SecondaryNode), + cover:stop(SecondaryNode), ok = control_action(stop_app, []), ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), + cover:start(SecondaryNode), + ok = control_action(start_app, SecondaryNode, [], []), io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), @@ -859,7 +862,7 @@ test_cluster_management() -> "invalid2@invalid"]), ok = assert_ram_node(), - SecondaryNode = rabbit_misc:makenode("hare"), + SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = test_cluster_management2(SecondaryNode); pang -> io:format("Skipping clustering tests with node ~p~n", @@ -889,6 +892,14 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), ok = assert_ram_node(), + %% ram node will not start by itself + ok = control_action(stop_app, []), + ok = control_action(stop_app, SecondaryNode, [], []), + {error, _} = control_action(start_app, []), + ok = control_action(start_app, SecondaryNode, [], []), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + %% change cluster config while remaining in same cluster ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), ok = control_action(start_app, []), @@ -897,8 +908,7 @@ test_cluster_management2(SecondaryNode) -> %% join non-existing cluster as a ram node ok = control_action(force_cluster, ["invalid1@invalid", "invalid2@invalid"]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), + {error, _} = control_action(start_app, []), ok = assert_ram_node(), %% join empty cluster as a ram node (converts to disc) @@ -953,7 +963,9 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), + cover:stop(SecondaryNode), ok = control_action(reset, []), + cover:start(SecondaryNode), %% attempt to leave cluster when no other node is alive ok = control_action(cluster, [SecondaryNodeS, NodeS]), @@ -970,7 +982,15 @@ test_cluster_management2(SecondaryNode) -> %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), ok = control_action(start_app, []), - ok = control_action(force_reset, SecondaryNode, [], []), + %% Yes, this is rather ugly. But since we're a clustered Mnesia + %% node and we're telling another clustered node to reset itself, + %% we will get disconnected half way through causing a + %% badrpc. This never happens in real life since rabbitmqctl is + %% not a clustered Mnesia node. + cover:stop(SecondaryNode), + {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []), + pong = net_adm:ping(SecondaryNode), + cover:start(SecondaryNode), ok = control_action(cluster, SecondaryNode, [NodeS], []), ok = control_action(start_app, SecondaryNode, [], []), @@ -1779,10 +1799,10 @@ test_msg_store() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds), - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), {Cap, MSCState} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref), - Ref2 = rabbit_guid:guid(), + Ref2 = rabbit_guid:gen(), {Cap2, MSC2State} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref2), %% check we don't contain any of the msgs we're about to publish @@ -1934,7 +1954,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) -> passed. test_msg_store_confirm_timer() -> - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), MSCState = rabbit_msg_store:client_init( @@ -1963,7 +1983,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) -> test_msg_store_client_delete_and_terminate() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), ok = msg_store_write(MsgIds, MSCState), %% test the 'dying client' fast path for writes @@ -1979,7 +1999,7 @@ test_queue() -> init_test_queue() -> TestQueue = test_queue(), Terms = rabbit_queue_index:shutdown_terms(TestQueue), - PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), + PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( TestQueue, Terms, false, @@ -2013,7 +2033,7 @@ restart_app() -> rabbit:start(). queue_index_publish(SeqIds, Persistent, Qi) -> - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MsgStore = case Persistent of true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE @@ -2022,7 +2042,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {A, B = [{_SeqId, LastMsgIdWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> - MsgId = rabbit_guid:guid(), + MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( MsgId, SeqId, #message_properties{}, Persistent, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), @@ -2045,7 +2065,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> - MsgId = rabbit_guid:guid(), + MsgId = rabbit_guid:gen(), Props = #message_properties{expiry=12345}, Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = @@ -2222,17 +2242,29 @@ test_amqqueue(Durable) -> #amqqueue { durable = Durable }. with_fresh_variable_queue(Fun) -> - ok = empty_test_queue(), - VQ = variable_queue_init(test_amqqueue(true), false), - S0 = rabbit_variable_queue:status(VQ), - assert_props(S0, [{q1, 0}, {q2, 0}, - {delta, {delta, undefined, 0, undefined}}, - {q3, 0}, {q4, 0}, - {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), + Ref = make_ref(), + Me = self(), + %% Run in a separate process since rabbit_msg_store will send + %% bump_credit messages and we want to ignore them + spawn_link(fun() -> + ok = empty_test_queue(), + VQ = variable_queue_init(test_amqqueue(true), false), + S0 = rabbit_variable_queue:status(VQ), + assert_props(S0, [{q1, 0}, {q2, 0}, + {delta, + {delta, undefined, 0, undefined}}, + {q3, 0}, {q4, 0}, + {len, 0}]), + _ = rabbit_variable_queue:delete_and_terminate( + shutdown, Fun(VQ)), + Me ! Ref + end), + receive + Ref -> ok + end, passed. -publish_and_confirm(QPid, Payload, Count) -> +publish_and_confirm(Q, Payload, Count) -> Seqs = lists:seq(1, Count), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2240,7 +2272,7 @@ publish_and_confirm(QPid, Payload, Count) -> Payload), Delivery = #delivery{mandatory = false, immediate = false, sender = self(), message = Msg, msg_seq_no = Seq}, - true = rabbit_amqqueue:deliver(QPid, Delivery) + {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). @@ -2477,7 +2509,7 @@ test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - publish_and_confirm(QPid, <<>>, Count), + publish_and_confirm(Q, <<>>, Count), exit(QPid, kill), MRef = erlang:monitor(process, QPid), @@ -2507,7 +2539,7 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:declare(test_queue(), true, false, [], none), Payload = <<0:8388608>>, %% 1MB Count = 30, - publish_and_confirm(QPid, Payload, Count), + publish_and_confirm(Q, Payload, Count), rabbit_amqqueue:set_ram_duration_target(QPid, 0), diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl index abcbe0b6d5..72c07b51ec 100644 --- a/src/rabbit_tests_event_receiver.erl +++ b/src/rabbit_tests_event_receiver.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_tests_event_receiver). diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 58079ccf47..3a5b96de4f 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_trace). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 2db960acc9..732c29b6b6 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_types). @@ -28,12 +28,9 @@ binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, internal_user/0, - username/0, password/0, password_hash/0, ok/1, error/1, - ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, - connection_exit/0]). - --type(channel_exit() :: no_return()). --type(connection_exit() :: no_return()). + username/0, password/0, password_hash/0, + ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, + channel_exit/0, connection_exit/0, mfargs/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +153,9 @@ -type(ok_or_error2(A, B) :: ok(A) | error(B)). -type(ok_pid_or_error() :: ok_or_error2(pid(), any())). +-type(channel_exit() :: no_return()). +-type(connection_exit() :: no_return()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -endif. % use_specs diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 717d94a802..80f50b38b3 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index e0ca8cbb72..9f2535bd04 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade_functions). @@ -35,6 +35,7 @@ -rabbit_upgrade({gm, mnesia, []}). -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). -rabbit_upgrade({mirrored_supervisor, mnesia, []}). +-rabbit_upgrade({topic_trie_node, mnesia, []}). %% ------------------------------------------------------------------- @@ -54,6 +55,7 @@ -spec(gm/0 :: () -> 'ok'). -spec(exchange_scratch/0 :: () -> 'ok'). -spec(mirrored_supervisor/0 :: () -> 'ok'). +-spec(topic_trie_node/0 :: () -> 'ok'). -endif. @@ -177,6 +179,12 @@ mirrored_supervisor() -> [{record_name, mirrored_sup_childspec}, {attributes, [key, mirroring_pid, childspec]}]). +topic_trie_node() -> + create(rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, [trie_node, edge_count, binding_count]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 63a0927f77..52eb168a42 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_variable_queue). @@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, Terms1} = case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:guid(), []}; + undefined -> {rabbit_guid:gen(), []}; PRef1 -> {PRef1, Terms} end, PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, @@ -860,7 +860,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> Res. msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> - msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback). + msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, + Callback). msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), @@ -870,17 +871,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) + end). msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:read(MsgId, MSCState1) + end). msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). + fun (MCSState1) -> + rabbit_msg_store:remove(MsgIds, MCSState1) + end). msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index f6bcbb7fd5..7545d81362 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_version). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 38bb76b03b..5548ef6d05 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_vhost). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 091b50e4c6..dc74b2f5b0 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_writer). @@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; +handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> + rabbit_amqqueue:notify_sent_queue_down(QPid), + State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> @@ -169,12 +172,10 @@ call(Pid, Msg) -> %%--------------------------------------------------------------------------- assemble_frame(Channel, MethodRecord, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, none), rabbit_binary_generator:build_simple_method_frame( Channel, MethodRecord, Protocol). assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( diff --git a/src/supervisor2.erl b/src/supervisor2.erl index f75da87221..a2f4fae980 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -41,7 +41,7 @@ %% 5) normal, and {shutdown, _} exit reasons are all treated the same %% (i.e. are regarded as normal exits) %% -%% All modifications are (C) 2010-2011 VMware, Inc. +%% All modifications are (C) 2010-2012 VMware, Inc. %% %% %CopyrightBegin% %% @@ -717,8 +717,8 @@ do_terminate(Child, SupName) when Child#child.pid =/= undefined -> ok; {error, normal} -> case Child#child.restart_type of - permanent -> ReportError(normal); - {permanent, _Delay} -> ReportError(normal); + permanent -> ReportError(normal, Child); + {permanent, _Delay} -> ReportError(normal, Child); _ -> ok end; {error, OtherReason} -> diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 0d50683db7..43a6bc994b 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_acceptor). @@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - try - %% report - {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), - {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [rabbit_misc:ntoab(Address), Port, - rabbit_misc:ntoab(PeerAddress), PeerPort]), - %% In the event that somebody floods us with connections we can spew - %% the above message at error_logger faster than it can keep up. - %% So error_logger's mailbox grows unbounded until we eat all the - %% memory available and crash. So here's a meaningless synchronous call - %% to the underlying gen_event mechanism - when it returns the mailbox - %% is drained. - gen_event:which_handlers(error_logger), - %% handle - file_handle_cache:transfer(apply(M, F, A ++ [Sock])), - ok = file_handle_cache:obtain() - catch {inet_error, Reason} -> - gen_tcp:close(Sock), - error_logger:error_msg("unable to accept TCP connection: ~p~n", - [Reason]) - end, + %% handle + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain(), %% accept more accept(State); @@ -86,6 +67,16 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, %% know this will fail. {stop, normal, State}; +handle_info({inet_async, LSock, Ref, {error, Reason}}, + State=#state{sock=LSock, ref=Ref}) -> + {AddressS, Port} = case inet:sockname(LSock) of + {ok, {A, P}} -> {rabbit_misc:ntoab(A), P}; + {error, _} -> {"unknown", unknown} + end, + error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", + [AddressS, Port, Reason]), + accept(State); + handle_info(_Info, State) -> {noreply, State}. @@ -97,8 +88,6 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). - accept(State = #state{sock=LSock}) -> case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index 4c835598e0..d8844441d2 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_acceptor_sup). @@ -25,7 +25,11 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + +-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()). + -endif. %%---------------------------------------------------------------------------- diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index ad2a0d02d0..fb01c792f4 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_listener). @@ -28,9 +28,14 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(), - atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + integer(), atom(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). + -endif. %%-------------------------------------------------------------------- @@ -67,8 +72,9 @@ init({IPAddress, Port, SocketOpts, label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start ~s on ~s:~p - ~p~n", - [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p (~s)~n", + [Label, rabbit_misc:ntoab(IPAddress), Port, + Reason, inet:format_error(Reason)]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 5bff5c2701..9ee921b423 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_listener_sup). @@ -26,12 +26,16 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/7 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), integer(), string()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/test_sup.erl b/src/test_sup.erl index 5feb146f64..7f4b5049fe 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(test_sup). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 8973a4f750..fca55f02f1 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% In practice Erlang shouldn't be allowed to grow to more than a half diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 456ff39f47..c9ecccd6b6 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool). @@ -37,10 +37,11 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). --spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/1 :: - (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). +-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). -spec(idle/1 :: (any()) -> 'ok'). -endif. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index d37c3a0fd6..ff356366ac 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool_sup). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 78ab4df3ea..1ddcebb23d 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool_worker). @@ -29,12 +29,12 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). --spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/2 :: - (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). --spec(run/1 :: (fun (() -> A)) -> A; - ({atom(), atom(), [any()]}) -> any()). +-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). +-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). +-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. |
