diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-08 01:28:25 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-08 01:28:25 +0100 |
| commit | 861a14118826c0edb58fc26521819da0841e65e7 (patch) | |
| tree | 1c88d20c6885abc1a6fe7ddcbe980aacb6d32f8e | |
| parent | 910debb20f4cc6ea4982c27495722bfeb4441a64 (diff) | |
| parent | bc6df57d4b683ebbb73f3866c086a1fd285770bd (diff) | |
| download | rabbitmq-server-git-861a14118826c0edb58fc26521819da0841e65e7.tar.gz | |
Merging default into bug 23133
41 files changed, 1696 insertions, 1391 deletions
@@ -237,7 +237,7 @@ distclean: clean %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \ + xmlto -o $(DOCS_DIR) $$opt man $<.tmp && \ gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 33552e17cc..be1ee70b5e 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -704,7 +704,7 @@ <variablelist> <varlistentry> <term>name</term> - <listitem><para>The name of the queue with non-ASCII characters URL-escaped.</para></listitem> + <listitem><para>The name of the queue with non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> <term>durable</term> @@ -795,7 +795,7 @@ <variablelist> <varlistentry> <term>name</term> - <listitem><para>The name of the exchange with non-ASCII characters URL-escaped.</para></listitem> + <listitem><para>The name of the exchange with non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> <term>type</term> @@ -830,22 +830,58 @@ </para> </listitem> </varlistentry> - </variablelist> - <variablelist> - <varlistentry> - <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <varlistentry role="usage-has-option-list"> + <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>bindinginfoitem</replaceable> ...</arg></cmdsynopsis></term> <listitem> <para> - By default the bindings for the <command>/</command> virtual - host are returned. The "-p" flag can be used to override - this default. Each result row will contain an exchange - name, queue name, routing key and binding arguments, in - that order. Non-ASCII characters will be URL-encoded. + Returns binding details. By default the bindings for + the <command>/</command> virtual host are returned. The + "-p" flag can be used to override this default. </para> - <para role="usage"> - The output format for "list_bindings" is a list of rows containing - exchange name, queue name, routing key and arguments, in that order. + <para> + The <command>bindinginfoitem</command> parameter is used + to indicate which binding information items to include + in the results. The column order in the results will + match the order of the parameters. + <command>bindinginfoitem</command> can take any value + from the list that follows: + </para> + <variablelist> + <varlistentry> + <term>exchange_name</term> + <listitem><para>The name of the exchange to which the + binding is attached. with non-ASCII characters + escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>queue_name</term> + <listitem><para>The name of the queue to which the + binding is attached. with non-ASCII characters + escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>routing_key</term> + <listitem><para>The binding's routing key, with + non-ASCII characters escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>arguments</term> + <listitem><para>The binding's arguments.</para></listitem> + </varlistentry> + </variablelist> + <para> + If no <command>bindinginfoitem</command>s are specified then + all above items are displayed. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl list_bindings -p /myvhost exchange_name queue_name</screen> + <para role="example"> + This command displays the exchange name and queue name + of the bindings in the virtual host + named <command>/myvhost</command>. </para> </listitem> </varlistentry> @@ -904,7 +940,7 @@ </varlistentry> <varlistentry> <term>vhost</term> - <listitem><para>Virtual host name with non-ASCII characters URL-escaped.</para></listitem> + <listitem><para>Virtual host name with non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> <term>timeout</term> diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index c0d9aedae1..17518ddf68 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -127,6 +127,9 @@ done rm -rf %{buildroot} %changelog +* Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1 +- New Upstream Release + * Wed Jul 14 2010 Emile Joubert <emile@rabbitmq.com> 1.8.1-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 0dccf93879..7ee6001630 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.0.0-1) karmic; urgency=low + + * New Upstream Release + + -- Michael Bridgen <mikeb@rabbitmq.com> Mon, 23 Aug 2010 14:55:39 +0100 + rabbitmq-server (1.8.1-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index 4eade6c744..c4e01f4a4b 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) \ diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index f47b534083..abe174e08e 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 74567d090f..aecfb09694 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -130,7 +130,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -148,7 +149,8 @@ -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). --define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). +-define(CLIENT_ETS_TABLE, ?MODULE). %%---------------------------------------------------------------------------- @@ -182,11 +184,26 @@ obtain_limit, obtain_count, obtain_pending, - callbacks, - counts, + clients, timer_ref }). +-record(cstate, + { pid, + callback, + opened, + obtained, + blocked, + pending_closes + }). + +-record(pending, + { kind, + pid, + requested, + from + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -224,6 +241,9 @@ -spec(clear/1 :: (ref()) -> ok_or_error()). -spec(obtain/0 :: () -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). +-spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). +-spec(get_limit/0 :: () -> non_neg_integer()). +-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. @@ -250,9 +270,9 @@ open(Path, Mode, Options) -> IsWriter = is_writer(Mode1), case IsWriter andalso HasWriter of true -> {error, writer_exists}; - false -> Ref = make_ref(), - case open1(Path1, Mode1, Options, Ref, bof, new) of - {ok, _Handle} -> + false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options), + case get_or_reopen([{Ref, new}]) of + {ok, [_Handle1]} -> RCount1 = case is_reader(Mode1) of true -> RCount + 1; false -> RCount @@ -263,6 +283,7 @@ open(Path, Mode, Options) -> has_writer = HasWriter1 }), {ok, Ref}; Error -> + erase({Ref, fhc_handle}), Error end end. @@ -433,8 +454,8 @@ set_maximum_since_use(MaximumAge) -> case lists:foldl( fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> - Age = timer:now_diff(Now, Then), - case Hdl =/= closed andalso Age >= MaximumAge of + case Hdl =/= closed andalso + timer:now_diff(Now, Then) >= MaximumAge of true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; @@ -451,6 +472,12 @@ obtain() -> transfer(Pid) -> gen_server:cast(?SERVER, {transfer, self(), Pid}). +set_limit(Limit) -> + gen_server:call(?SERVER, {set_limit, Limit}, infinity). + +get_limit() -> + gen_server:call(?SERVER, get_limit, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -466,18 +493,9 @@ append_to_write(Mode) -> end. with_handles(Refs, Fun) -> - ResHandles = lists:foldl( - fun (Ref, {ok, HandlesAcc}) -> - case get_or_reopen(Ref) of - {ok, Handle} -> {ok, [Handle | HandlesAcc]}; - Error -> Error - end; - (_Ref, Error) -> - Error - end, {ok, []}, Refs), - case ResHandles of + case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of {ok, Handles} -> - case Fun(lists:reverse(Handles)) of + case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), Result; @@ -506,17 +524,80 @@ with_flushed_handles(Refs, Fun) -> end end). -get_or_reopen(Ref) -> - case get({Ref, fhc_handle}) of - undefined -> - {error, not_open, Ref}; - #handle { hdl = closed, offset = Offset, - path = Path, mode = Mode, options = Options } -> - open1(Path, Mode, Options, Ref, Offset, reopen); - Handle -> - {ok, Handle} +get_or_reopen(RefNewOrReopens) -> + case partition_handles(RefNewOrReopens) of + {OpenHdls, []} -> + {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; + {OpenHdls, ClosedHdls} -> + Oldest = oldest(get_age_tree(), fun () -> now() end), + case gen_server:call(?SERVER, {open, self(), length(ClosedHdls), + Oldest}, infinity) of + ok -> + case reopen(ClosedHdls) of + {ok, RefHdls} -> sort_handles(RefNewOrReopens, + OpenHdls, RefHdls, []); + Error -> Error + end; + close -> + [soft_close(Ref, Handle) || + {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <- + get(), + Hdl =/= closed], + get_or_reopen(RefNewOrReopens) + end end. +reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []). + +reopen([], Tree, RefHdls) -> + put_age_tree(Tree), + {ok, lists:reverse(RefHdls)}; +reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, + path = Path, + mode = Mode, + offset = Offset, + last_used_at = undefined }} | + RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> + case file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of + {ok, Hdl} -> + Now = now(), + {{ok, Offset1}, Handle1} = + maybe_seek(Offset, Handle #handle { hdl = Hdl, + offset = 0, + last_used_at = Now }), + Handle2 = Handle1 #handle { trusted_offset = Offset1 }, + put({Ref, fhc_handle}, Handle2), + reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), + [{Ref, Handle2} | RefHdls]); + Error -> + %% NB: none of the handles in ToOpen are in the age tree + Oldest = oldest(Tree, fun () -> undefined end), + [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], + put_age_tree(Tree), + Error + end. + +partition_handles(RefNewOrReopens) -> + lists:foldr( + fun ({Ref, NewOrReopen}, {Open, Closed}) -> + case get({Ref, fhc_handle}) of + #handle { hdl = closed } = Handle -> + {Open, [{Ref, NewOrReopen, Handle} | Closed]}; + #handle {} = Handle -> + {[{Ref, Handle} | Open], Closed} + end + end, {[], []}, RefNewOrReopens). + +sort_handles([], [], [], Acc) -> + {ok, lists:reverse(Acc)}; +sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]); +sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). + put_handle(Ref, Handle = #handle { last_used_at = Then }) -> Now = now(), age_tree_update(Then, Now, Ref), @@ -532,21 +613,6 @@ get_age_tree() -> put_age_tree(Tree) -> put(fhc_age_tree, Tree). -age_tree_insert(Now, Ref) -> - Tree = get_age_tree(), - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - case gen_server:call(?SERVER, {open, self(), Oldest, - not gb_trees:is_empty(Tree)}, infinity) of - ok -> - put_age_tree(Tree1); - close -> - [soft_close(Ref1, Handle1) || - {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(), - Hdl1 =/= closed], - age_tree_insert(Now, Ref) - end. - age_tree_update(Then, Now, Ref) -> with_age_tree( fun (Tree) -> @@ -557,13 +623,7 @@ age_tree_delete(Then) -> with_age_tree( fun (Tree) -> Tree1 = gb_trees:delete_any(Then, Tree), - Oldest = case gb_trees:is_empty(Tree1) of - true -> - undefined; - false -> - {Oldest1, _Ref} = gb_trees:smallest(Tree1), - Oldest1 - end, + Oldest = oldest(Tree1, fun () -> undefined end), gen_server:cast(?SERVER, {close, self(), Oldest}), Tree1 end). @@ -579,44 +639,37 @@ age_tree_change() -> Tree end). -open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> - Mode1 = case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end, - Now = now(), - age_tree_insert(Now, Ref), - case file:open(Path, Mode1) of - {ok, Hdl} -> - WriteBufferSize = - case proplists:get_value(write_buffer, Options, unbuffered) of - unbuffered -> 0; - infinity -> infinity; - N when is_integer(N) -> N - end, - Handle = #handle { hdl = Hdl, - offset = 0, - trusted_offset = 0, - is_dirty = false, - write_buffer_size = 0, - write_buffer_size_limit = WriteBufferSize, - write_buffer = [], - at_eof = false, - path = Path, - mode = Mode, - options = Options, - is_write = is_writer(Mode), - is_read = is_reader(Mode), - last_used_at = Now }, - {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), - Handle2 = Handle1 #handle { trusted_offset = Offset1 }, - put({Ref, fhc_handle}, Handle2), - {ok, Handle2}; - {error, Reason} -> - age_tree_delete(Now), - {error, Reason} +oldest(Tree, DefaultFun) -> + case gb_trees:is_empty(Tree) of + true -> DefaultFun(); + false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + Oldest end. +new_closed_handle(Path, Mode, Options) -> + WriteBufferSize = + case proplists:get_value(write_buffer, Options, unbuffered) of + unbuffered -> 0; + infinity -> infinity; + N when is_integer(N) -> N + end, + Ref = make_ref(), + put({Ref, fhc_handle}, #handle { hdl = closed, + offset = 0, + trusted_offset = 0, + is_dirty = false, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, + write_buffer = [], + at_eof = false, + path = Path, + mode = Mode, + options = Options, + is_write = is_writer(Mode), + is_read = is_reader(Mode), + last_used_at = undefined }), + {ok, Ref}. + soft_close(Ref, Handle) -> {Res, Handle1} = soft_close(Handle), case Res of @@ -630,7 +683,9 @@ soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> case write_buffer(Handle) of - {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty, + {ok, #handle { hdl = Hdl, + offset = Offset, + is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of true -> file:sync(Hdl); @@ -638,8 +693,10 @@ soft_close(Handle) -> end, ok = file:close(Hdl), age_tree_delete(Then), - {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset, - is_dirty = false }}; + {ok, Handle1 #handle { hdl = closed, + trusted_offset = Offset, + is_dirty = false, + last_used_at = undefined }}; {_Error, _Handle} = Result -> Result end. @@ -726,150 +783,212 @@ init([]) -> Watermark > 0) -> Watermark; _ -> - ulimit() + case ulimit() of + infinity -> infinity; + unknown -> ?FILE_HANDLES_LIMIT_OTHER; + Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS]) + end end, - ObtainLimit = case Limit of - infinity -> infinity; - _ -> ?OBTAIN_LIMIT(Limit) - end, + ObtainLimit = obtain_limit(Limit), error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), + Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, - open_pending = [], + open_pending = pending_new(), obtain_limit = ObtainLimit, obtain_count = 0, - obtain_pending = [], - callbacks = dict:new(), - counts = dict:new(), + obtain_pending = pending_new(), + clients = Clients, timer_ref = undefined }}. -handle_call({open, Pid, EldestUnusedSince, CanClose}, From, +handle_call({open, Pid, Requested, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, - elders = Elders }) -> + elders = Elders, + clients = Clients }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - Item = {open, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - open_count = Count + 1, - elders = Elders1 })) of - {true, State1} -> - State2 = State1 #fhc_state { open_count = Count }, - case CanClose of - true -> {reply, close, State2}; - false -> {noreply, State2 #fhc_state { - open_pending = [Item | Pending], - elders = dict:erase(Pid, Elders1) }} - end; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} + Item = #pending { kind = open, + pid = Pid, + requested = Requested, + from = From }, + ok = track_client(Pid, Clients), + State1 = State #fhc_state { elders = Elders1 }, + case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> + true = ets:update_element( + Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State1 #fhc_state { + open_pending = pending_in(Item, Pending) })}; + [#cstate { opened = Opened }] -> + true = ets:update_element( + Clients, Pid, + {#cstate.pending_closes, Opened}), + {reply, close, State1} + end; + false -> {noreply, run_pending_item(Item, State1)} end; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - elders = Elders }) + clients = Clients }) when Limit =/= infinity andalso Count >= Limit -> - Item = {obtain, Pid, From}, - {noreply, ensure_mref(Pid, State #fhc_state { - obtain_pending = [Item | Pending], - elders = dict:erase(Pid, Elders) })}; + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - elders = Elders }) -> - Item = {obtain, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - obtain_count = Count + 1 })) of - {true, State1} -> - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = [Item | Pending], - elders = dict:erase(Pid, Elders) }}; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} - end. + clients = Clients }) -> + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + ok = track_client(Pid, Clients), + case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of + true -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + {noreply, reduce(State #fhc_state { + obtain_pending = pending_in(Item, Pending) })}; + false -> + {noreply, run_pending_item(Item, State)} + end; +handle_call({set_limit, Limit}, _From, State) -> + {reply, ok, maybe_reduce( + process_pending(State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) }))}; +handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> + {reply, Limit, State}. handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { callbacks = Callbacks }) -> - {noreply, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}; - -handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + State = #fhc_state { clients = Clients }) -> + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), + {noreply, State}; + +handle_cast({update, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages {noreply, State #fhc_state { elders = Elders1 }}; handle_cast({close, Pid, EldestUnusedSince}, - State = #fhc_state { open_count = Count, - counts = Counts, - elders = Elders }) -> + State = #fhc_state { elders = Elders, clients = Clients }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - Counts1 = update_counts(open, Pid, -1, Counts), - {noreply, process_pending(State #fhc_state { open_count = Count - 1, - counts = Counts1, - elders = Elders1 })}; + ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), + {noreply, process_pending( + update_counts(open, Pid, -1, + State #fhc_state { elders = Elders1 }))}; handle_cast({transfer, FromPid, ToPid}, State) -> - State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State), - Counts1 = update_counts(obtain, FromPid, -1, Counts), - Counts2 = update_counts(obtain, ToPid, +1, Counts1), - {noreply, process_pending(State1 #fhc_state { counts = Counts2 })}; + ok = track_client(ToPid, State#fhc_state.clients), + {noreply, process_pending( + update_counts(obtain, ToPid, +1, + update_counts(obtain, FromPid, -1, State)))}; handle_cast(check_counts, State) -> - {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), - {noreply, State1}. - -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = - #fhc_state { open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_pending = ObtainPending, - callbacks = Callbacks, - counts = Counts, - elders = Elders }) -> - FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, - OpenPending1 = lists:filter(FilterFun, OpenPending), - ObtainPending1 = lists:filter(FilterFun, ObtainPending), - {Opened, Obtained} = dict:fetch(Pid, Counts), - {noreply, process_pending(State #fhc_state { - open_count = OpenCount - Opened, - open_pending = OpenPending1, - obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, - callbacks = dict:erase(Pid, Callbacks), - counts = dict:erase(Pid, Counts), - elders = dict:erase(Pid, Elders) })}. - -terminate(_Reason, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { elders = Elders, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, + clients = Clients }) -> + [#cstate { opened = Opened, obtained = Obtained }] = + ets:lookup(Clients, Pid), + true = ets:delete(Clients, Pid), + FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, + {noreply, process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(FilterFun, OpenPending), + obtain_count = ObtainCount - Obtained, + obtain_pending = filter_pending(FilterFun, ObtainPending), + elders = dict:erase(Pid, Elders) })}. + +terminate(_Reason, State = #fhc_state { clients = Clients }) -> + ets:delete(Clients), State. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- +%% pending queue abstraction helpers +%%---------------------------------------------------------------------------- + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +filter_pending(Fun, {Count, Queue}) -> + {Delta, Queue1} = + queue_fold(fun (Item, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - requested(Item), QueueN} + end + end, {0, queue:new()}, Queue), + {Count + Delta, Queue1}. + +pending_new() -> + {0, queue:new()}. + +pending_in(Item = #pending { requested = Requested }, {Count, Queue}) -> + {Count + Requested, queue:in(Item, Queue)}. + +pending_out({0, _Queue} = Pending) -> + {empty, Pending}; +pending_out({N, Queue}) -> + {{value, #pending { requested = Requested }} = Result, Queue1} = + queue:out(Queue), + {Result, {N - Requested, Queue1}}. + +pending_count({Count, _Queue}) -> + Count. + +pending_is_empty({0, _Queue}) -> + true; +pending_is_empty({_N, _Queue}) -> + false. + +%%---------------------------------------------------------------------------- %% server helpers %%---------------------------------------------------------------------------- +obtain_limit(infinity) -> infinity; +obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of + OLimit when OLimit < 0 -> 0; + OLimit -> OLimit + end. + +requested({_Kind, _Pid, Requested, _From}) -> + Requested. + process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_obtain(process_open(State)). + process_open(process_obtain(State)). process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc, State1} = + {Pending1, State1} = process_pending(Pending, Limit - (ObtainCount + OpenCount), State), - State1 #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + State1 #fhc_state { open_pending = Pending1 }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -878,82 +997,139 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc, State1} = process_pending(Pending, Quota, State), - State1 #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. + {Pending1, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1 }. -process_pending([], _Quota, State) -> - {[], 0, State}; process_pending(Pending, Quota, State) when Quota =< 0 -> - {Pending, 0, State}; -process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) -> - PendingLen = length(Pending), - SatisfiableLen = lists:min([PendingLen, Quota]), - Take = PendingLen - SatisfiableLen, - {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev), - {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}. - -run_pending_item(Item, State = #fhc_state { counts = Counts }) -> - State #fhc_state { counts = run_pending_item1(Item, Counts) }. - -run_pending_item1({Kind, Pid, From}, Counts) -> + {Pending, State}; +process_pending(Pending, Quota, State) -> + case pending_out(Pending) of + {empty, _Pending} -> + {Pending, State}; + {{value, #pending { requested = Requested }}, _Pending1} + when Requested > Quota -> + {Pending, State}; + {{value, #pending { requested = Requested } = Item}, Pending1} -> + process_pending(Pending1, Quota - Requested, + run_pending_item(Item, State)) + end. + +run_pending_item(#pending { kind = Kind, + pid = Pid, + requested = Requested, + from = From }, + State = #fhc_state { clients = Clients }) -> gen_server:reply(From, ok), - update_counts(Kind, Pid, +1, Counts). - -update_counts(open, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) - when Opened >= 0 andalso Obtained >= 0 -> - {Opened + Delta, Obtained} end, - Counts); -update_counts(obtain, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) - when Opened >= 0 andalso Obtained >= 0 -> - {Opened, Obtained + Delta} end, - Counts). - -maybe_reduce(State = #fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending, - elders = Elders, - callbacks = Callbacks, - timer_ref = TRef }) - when Limit =/= infinity andalso - (((OpenCount + ObtainCount) > Limit) orelse - (OpenPending =/= []) orelse - (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), + update_counts(Kind, Pid, Requested, State). + +update_counts(Kind, Pid, Delta, + State = #fhc_state { open_count = OpenCount, + obtain_count = ObtainCount, + clients = Clients }) -> + {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + +update_counts1(open, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), + {Delta, 0}; +update_counts1(obtain, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), + {0, Delta}. + +maybe_reduce(State) -> + case needs_reduce(State) of + true -> reduce(State); + false -> State + end. + +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending }) -> + Limit =/= infinity + andalso ((OpenCount + ObtainCount > Limit) + orelse (not pending_is_empty(OpenPending)) + orelse (ObtainCount < ObtainLimit + andalso not pending_is_empty(ObtainPending))). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending = ObtainPending, + elders = Elders, + clients = Clients, + timer_ref = TRef }) -> Now = now(), - {Pids, Sum, ClientCount} = - dict:fold(fun (_Pid, undefined, Accs) -> - Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + {CStates, Sum, ClientCount} = + dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + [#cstate { pending_closes = PendingCloses, + opened = Opened, + blocked = Blocked } = CState] = + ets:lookup(Clients, Pid), + case Blocked orelse PendingCloses =:= Opened of + true -> Accs; + false -> {[CState | CStatesAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), - case Pids of + case CStates of [] -> ok; - _ -> AverageAge = Sum / ClientCount, - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end - end, Pids) + _ -> case (Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + notify_age(CStates, AverageAge); + _ -> + notify_age0(Clients, CStates, + pending_count(OpenPending) + + pending_count(ObtainPending)) + end end, - AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; - _ -> {AboveLimit, State} - end; -maybe_reduce(State) -> - {false, State}. + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. + +notify_age(CStates, AverageAge) -> + lists:foreach( + fun (#cstate { callback = undefined }) -> ok; + (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge]) + end, CStates). + +notify_age0(Clients, CStates, Required) -> + Notifications = + [CState || CState <- CStates, CState#cstate.callback =/= undefined], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1). + +notify(_Clients, _Required, []) -> + ok; +notify(_Clients, Required, _Notifications) when Required =< 0 -> + ok; +notify(Clients, Required, [#cstate{ pid = Pid, + callback = {M, F, A}, + opened = Opened } | Notifications]) -> + apply(M, F, A ++ [0]), + ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}), + notify(Clients, Required - Opened, Notifications). + +track_client(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, + pending_closes = 0 }) of + true -> _MRef = erlang:monitor(process, Pid), + ok; + false -> ok + end. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n @@ -961,7 +1137,7 @@ maybe_reduce(State) -> ulimit() -> case os:type() of {win32, _OsName} -> - ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS; + ?FILE_HANDLES_LIMIT_WINDOWS; {unix, _OsName} -> %% Under Linux, Solaris and FreeBSD, ulimit is a shell %% builtin, not a command. In OS X, it's a command. @@ -971,24 +1147,14 @@ ulimit() -> "unlimited" -> infinity; String = [C|_] when $0 =< C andalso C =< $9 -> - Num = list_to_integer( - lists:takewhile( - fun (D) -> $0 =< D andalso D =< $9 end, String)) - - ?RESERVED_FOR_OTHERS, - lists:max([1, Num]); + list_to_integer( + lists:takewhile( + fun (D) -> $0 =< D andalso D =< $9 end, String)); _ -> %% probably a variant of %% "/bin/sh: line 1: ulimit: command not found\n" - ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + unknown end; _ -> - ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS - end. - -ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> - case dict:find(Pid, Counts) of - {ok, _} -> State; - error -> _MRef = erlang:monitor(process, Pid), - State #fhc_state { - counts = dict:store(Pid, {0, 0}, Counts) } + unknown end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index f1c8eb4d9d..9fb9e2fea7 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -976,7 +976,7 @@ print_event(Dev, Event, Name) -> terminate(Reason, Name, Msg, Mod, State, Debug) -> case catch Mod:terminate(Reason, State) of {'EXIT', R} -> - error_info(R, Name, Msg, State, Debug), + error_info(R, Reason, Name, Msg, State, Debug), exit(R); _ -> case Reason of @@ -987,42 +987,44 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> {shutdown,_}=Shutdown -> exit(Shutdown); _ -> - error_info(Reason, Name, Msg, State, Debug), + error_info(Reason, undefined, Name, Msg, State, Debug), exit(Reason) end end. -error_info(_Reason, application_controller, _Msg, _State, _Debug) -> +error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> %% OTP-5811 Don't send an error report if it's the system process %% application_controller which is terminating - let init take care %% of it instead ok; -error_info(Reason, Name, Msg, State, Debug) -> - Reason1 = - case Reason of - {undef,[{M,F,A}|MFAs]} -> - case code:is_loaded(M) of - false -> - {'module could not be loaded',[{M,F,A}|MFAs]}; - _ -> - case erlang:function_exported(M, F, length(A)) of - true -> - Reason; - false -> - {'function not exported',[{M,F,A}|MFAs]} - end - end; - _ -> - Reason - end, - format("** Generic server ~p terminating \n" - "** Last message in was ~p~n" - "** When Server state == ~p~n" - "** Reason for termination == ~n** ~p~n", - [Name, Msg, State, Reason1]), +error_info(Reason, RootCause, Name, Msg, State, Debug) -> + Reason1 = error_reason(Reason), + Fmt = + "** Generic server ~p terminating~n" + "** Last message in was ~p~n" + "** When Server state == ~p~n" + "** Reason for termination == ~n** ~p~n", + case RootCause of + undefined -> format(Fmt, [Name, Msg, State, Reason1]); + _ -> format(Fmt ++ "** In 'terminate' callback " + "with reason ==~n** ~p~n", + [Name, Msg, State, Reason1, + error_reason(RootCause)]) + end, sys:print_log(Debug), ok. +error_reason({undef,[{M,F,A}|MFAs]} = Reason) -> + case code:is_loaded(M) of + false -> {'module could not be loaded',[{M,F,A}|MFAs]}; + _ -> case erlang:function_exported(M, F, length(A)) of + true -> Reason; + false -> {'function not exported',[{M,F,A}|MFAs]} + end + end; +error_reason(Reason) -> + Reason. + %%% --------------------------------------------------- %%% Misc. functions. %%% --------------------------------------------------- diff --git a/src/rabbit.erl b/src/rabbit.erl index 41c628a071..c257497070 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -83,12 +83,6 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). --rabbit_boot_step({rabbit_hooks, - [{description, "internal event notification system"}, - {mfa, {rabbit_hooks, start, []}}, - {requires, external_infrastructure}, - {enables, kernel_ready}]}). - -rabbit_boot_step({rabbit_event, [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, @@ -211,8 +205,7 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(), - ok = rabbit_mnesia:ensure_mnesia_dir(). + ok = ensure_working_log_handlers(). start() -> try diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 8d00f59124..fd57cbfc00 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -37,9 +37,10 @@ check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0]). +-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, set_permissions/6, clear_permissions/2, - list_vhost_permissions/1, list_user_permissions/1]). + list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, + list_user_vhost_permissions/2]). %%---------------------------------------------------------------------------- @@ -52,6 +53,7 @@ -type(password() :: binary()). -type(regexp() :: binary()). -type(scope() :: binary()). +-type(scope_atom() :: 'client' | 'all'). -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | @@ -72,22 +74,27 @@ -spec(lookup_user/1 :: (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). --spec(add_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). --spec(delete_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). +-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()). -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). +-spec(list_permissions/0 :: + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) - -> [{username(), regexp(), regexp(), regexp()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_user_permissions/1 :: - (username()) - -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). +-spec(list_user_vhost_permissions/2 :: + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), + scope_atom()}]). -endif. @@ -142,7 +149,7 @@ internal_lookup_vhost_access(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> not_found; [R] -> {ok, R} @@ -160,7 +167,6 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. -permission_index(scope) -> #permission.scope; permission_index(configure) -> #permission.configure; permission_index(write) -> #permission.write; permission_index(read) -> #permission.read. @@ -171,27 +177,29 @@ check_resource_access(Username, check_resource_access(Username, R#resource{name = <<"amq.default">>}, Permission); -check_resource_access(_Username, - #resource{name = <<"amq.gen",_/binary>>}, - #permission{scope = client}) -> - ok; check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, Permission) -> Res = case mnesia:dirty_read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> false; [#user_permission{permission = P}] -> - PermRegexp = case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false + case {Name, P} of + {<<"amq.gen",_/binary>>, #permission{scope = client}} -> + true; + _ -> + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false + end end end, if Res -> ok; @@ -300,7 +308,7 @@ delete_vhost(VHostPath) -> R. internal_delete_vhost(VHostPath) -> - lists:foreach(fun (#exchange{name=Name}) -> + lists:foreach(fun (#exchange{name = Name}) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), @@ -311,6 +319,9 @@ internal_delete_vhost(VHostPath) -> ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. +vhost_exists(VHostPath) -> + mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. + list_vhosts() -> mnesia:dirty_all_keys(rabbit_vhost). @@ -338,13 +349,13 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer fun () -> ok = mnesia:write( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, + scope = Scope, configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, + write = WritePerm, + read = ReadPerm}}, write) end)). @@ -355,10 +366,15 @@ clear_permissions(Username, VHostPath) -> Username, VHostPath, fun () -> ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) end)). +list_permissions() -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(match_user_vhost('_', '_'))]. + list_vhost_permissions(VHostPath) -> [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- @@ -371,15 +387,21 @@ list_user_permissions(Username) -> list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. +list_user_vhost_permissions(Username, VHostPath) -> + [{ConfigurePerm, WritePerm, ReadPerm, Scope} || + {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(rabbit_misc:with_user_and_vhost( + Username, VHostPath, + match_user_vhost(Username, VHostPath)))]. + list_permissions(QueryThunk) -> [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - #user_permission{user_vhost = #user_vhost{username = Username, + #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ - scope = Scope, - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}} <- + permission = #permission{ scope = Scope, + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. @@ -387,7 +409,7 @@ match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = '_'}, read) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0cdb4fff08..7116653c2a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -56,7 +56,7 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(EXPIRES_TYPE, long). +-define(EXPIRES_TYPES, [byte, short, signedint, long]). %%---------------------------------------------------------------------------- @@ -249,11 +249,12 @@ start_queue_process(Q) -> Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> - Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], - fun (_X, _Q) -> ok end), - ok. + rabbit_binding:add(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -313,13 +314,13 @@ check_declare_arguments(QueueName, Args) -> check_expires_argument(undefined) -> ok; -check_expires_argument({?EXPIRES_TYPE, Expires}) - when is_integer(Expires) andalso Expires > 0 -> - ok; -check_expires_argument({?EXPIRES_TYPE, _Expires}) -> - {error, expires_zero_or_less}; -check_expires_argument(_) -> - {error, expires_not_of_type_long}. +check_expires_argument({Type, Expires}) when Expires > 0 -> + case lists:member(Type, ?EXPIRES_TYPES) of + true -> ok; + false -> {error, {expires_not_of_acceptable_type, Type, Expires}} + end; +check_expires_argument({_Type, _Expires}) -> + {error, expires_zero_or_less}. list(VHostPath) -> mnesia:dirty_match_object( @@ -433,7 +434,7 @@ internal_delete1(QueueName) -> %% we want to execute some things, as %% decided by rabbit_exchange, after the %% transaction. - rabbit_exchange:delete_queue_bindings(QueueName). + rabbit_binding:remove_for_queue(QueueName). internal_delete(QueueName) -> case @@ -478,7 +479,7 @@ on_node_down(Node) -> ok. delete_queue(QueueName) -> - Post = rabbit_exchange:delete_transient_queue_bindings(QueueName), + Post = rabbit_binding:remove_transient_for_queue(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), Post. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2cab7136a6..0849586294 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -146,8 +146,8 @@ code_change(_OldVsn, State, _Extra) -> init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); - undefined -> State + {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); + undefined -> State end. declare(Recover, From, @@ -163,10 +163,8 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - rabbit_event:notify( - queue_created, - [{Item, i(Item, State)} || - Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State)), noreply(init_expires(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -587,8 +585,7 @@ i(Item, _) -> throw({bad_argument, Item}). emit_stats(State) -> - rabbit_event:notify(queue_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)). %--------------------------------------------------------------------------- diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl new file mode 100644 index 0000000000..bb29580fd4 --- /dev/null +++ b/src/rabbit_binding.erl @@ -0,0 +1,378 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_binding). +-include("rabbit.hrl"). + +-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). +-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). +%% these must all be run inside a mnesia tx +-export([has_for_exchange/1, remove_for_exchange/1, + remove_for_queue/1, remove_transient_for_queue/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([key/0]). + +-type(key() :: binary()). + +-type(bind_errors() :: rabbit_types:error('queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found')). +-type(bind_res() :: 'ok' | bind_errors()). +-type(inner_fun() :: + fun((rabbit_types:exchange(), queue()) -> + rabbit_types:ok_or_error(rabbit_types:amqp_error()))). +-type(bindings() :: [rabbit_types:binding()]). + +-spec(recover/0 :: () -> [rabbit_types:binding()]). +-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). +-spec(add/1 :: (rabbit_types:binding()) -> bind_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(list/1 :: (rabbit_types:vhost()) -> bindings()). +-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_exchange_and_queue/2 :: + (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). +-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> + [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). +-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). +-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(remove_for_queue/1 :: + (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(remove_transient_for_queue/1 :: + (rabbit_amqqueue:name()) -> fun (() -> any())). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). + +recover() -> + rabbit_misc:table_fold( + fun (Route = #route{binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, Route, write), + ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route). + +exists(Binding) -> + binding_action( + Binding, + fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + +add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). + +remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). + +add(Binding, InnerFun) -> + case binding_action( + Binding, + fun (X, Q, B) -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + case InnerFun(X, Q) of + ok -> + case mnesia:read({rabbit_route, B}) of + [] -> Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:write/3), + {new, X, B}; + [_] -> {existing, X, B} + end; + {error, _} = E -> + E + end + end) of + {new, Exchange = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Exchange, B), + rabbit_event:notify(binding_created, info(B)); + {existing, _, _} -> + ok; + {error, _} = Err -> + Err + end. + +remove(Binding, InnerFun) -> + case binding_action( + Binding, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + [_] -> case InnerFun(X, Q) of + ok -> + Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(X), + {{Deleted, X}, B}; + {error, _} = E -> + E + end + end + end) of + {error, _} = Err -> + Err; + {{IsDeleted, X = #exchange{ type = Type }}, B} -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> ok = Module:delete(X, [B]); + not_deleted -> ok = Module:remove_bindings(X, [B]) + end, + rabbit_event:notify(binding_deleted, info(B)), + ok + end. + +list(VHostPath) -> + Route = #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + queue_name = rabbit_misc:r(VHostPath, queue), + _ = '_'}, + _ = '_'}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +list_for_exchange(ExchangeName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +list_for_queue(QueueName) -> + Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, + [reverse_binding(B) || #reverse_route{reverse_binding = B} <- + mnesia:dirty_match_object(rabbit_reverse_route, + reverse_route(Route))]. + +list_for_exchange_and_queue(ExchangeName, QueueName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +info_keys() -> ?INFO_KEYS. + +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. + +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(queue_name, #binding{queue_name = QName}) -> QName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(B = #binding{}) -> infos(?INFO_KEYS, B). + +info(B = #binding{}, Items) -> infos(Items, B). + +info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). + +has_for_exchange(ExchangeName) -> + Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, + %% we need to check for durable routes here too in case a bunch of + %% routes to durable queues have been removed temporarily as a + %% result of a node failure + contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). + +remove_for_exchange(ExchangeName) -> + [begin + ok = mnesia:delete_object(rabbit_reverse_route, + reverse_route(Route), write), + ok = delete_forward_routes(Route), + Route#route.binding + end || Route <- mnesia:match_object( + rabbit_route, + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + write)]. + +remove_for_queue(QueueName) -> + remove_for_queue(QueueName, fun delete_forward_routes/1). + +remove_transient_for_queue(QueueName) -> + remove_for_queue(QueueName, fun delete_transient_forward_routes/1). + +%%---------------------------------------------------------------------------- + +binding_action(Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + args = Arguments}, Fun) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + SortedArgs = rabbit_misc:sort_field_table(Arguments), + Fun(X, Q, Binding#binding{args = SortedArgs}) + end). + +sync_binding(Binding, Durable, Fun) -> + ok = case Durable of + true -> Fun(rabbit_durable_route, + #route{binding = Binding}, write); + false -> ok + end, + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = Fun(rabbit_route, Route, write), + ok = Fun(rabbit_reverse_route, ReverseRoute, write), + ok. + +call_with_exchange_and_queue(Exchange, Queue, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> case {mnesia:read({rabbit_exchange, Exchange}), + mnesia:read({rabbit_queue, Queue})} of + {[X], [Q]} -> Fun(X, Q); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, queue_not_found}; + {[ ], [ ]} -> {error, exchange_and_queue_not_found} + end + end). + +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + Module. + +contains(Table, MatchHead) -> + continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). + +continue('$end_of_table') -> false; +continue({[_|_], _}) -> true; +continue({[], Continuation}) -> continue(mnesia:select(Continuation)). + +remove_for_queue(QueueName, FwdDeleteFun) -> + DeletedBindings = + [begin + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, + ReverseRoute, write), + Route#route.binding + end || ReverseRoute + <- mnesia:match_object( + rabbit_reverse_route, + reverse_route(#route{binding = #binding{ + queue_name = QueueName, + _ = '_'}}), + write)], + Grouped = group_bindings_and_auto_delete( + lists:keysort(#binding.exchange_name, DeletedBindings), []), + fun () -> + lists:foreach( + fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> Module:delete(X, Bs); + not_deleted -> Module:remove_bindings(X, Bs) + end + end, Grouped) + end. + +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% group_bindings_and_auto_delete1) works properly. +group_bindings_and_auto_delete([], Acc) -> + Acc; +group_bindings_and_auto_delete( + [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> + group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc). + +group_bindings_and_auto_delete( + ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], + Bindings, Acc) -> + group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching ExchangeName + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], + group_bindings_and_auto_delete(Removed, NewAcc). + +delete_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_durable_route, Route, write). + +delete_transient_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write). + +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> + Route = #route{binding = Binding}, + {Route, reverse_route(Route)}. + +reverse_route(#route{binding = Binding}) -> + #reverse_route{reverse_binding = reverse_binding(Binding)}; + +reverse_route(#reverse_route{reverse_binding = Binding}) -> + #route{binding = reverse_binding(Binding)}. + +reverse_binding(#reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}; + +reverse_binding(#binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 138df716e2..174eab4002 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, flush/1]). @@ -44,7 +44,7 @@ handle_info/2, handle_pre_hibernate/1]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, - transaction_id, tx_participants, next_tag, + start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer}). @@ -76,9 +76,11 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/6 :: +-spec(start_link/7 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> rabbit_types:ok_pid_or_error()). + rabbit_types:vhost(), pid(), + fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> + rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -100,9 +102,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost, CollectorPid], []). +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun) -> + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username, + VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -150,15 +153,16 @@ flush(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun]) -> process_flag(trap_exit, true), - link(WriterPid), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, + start_limiter_fun = StartLimiterFun, transaction_id = none, tx_participants = sets:new(), next_tag = 1, @@ -171,9 +175,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, stats_timer = rabbit_event:init_stats_timer()}, - rabbit_event:notify( - channel_created, - [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -239,12 +241,6 @@ handle_cast(emit_stats, State) -> internal_emit_stats(State), {noreply, State}. -handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, - State = #ch{writer_pid = WriterPid}) -> - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, normal, State}; -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. @@ -259,8 +255,10 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - normal -> ok = Res; - _ -> ok + normal -> ok = Res; + shutdown -> ok = Res; + {shutdown, _Term} -> ok = Res; + _ -> ok end, terminate(State). @@ -403,7 +401,7 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rollback_and_notify(State), - ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), stop; handle_method(#'access.request'{},_, State) -> @@ -807,17 +805,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, - QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, - NoWait, State); + binding_action(fun rabbit_binding:add/2, + ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + #'queue.bind_ok'{}, NoWait, State); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, - QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, - false, State); + binding_action(fun rabbit_binding:remove/2, + ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -895,7 +893,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + case Fun(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} @@ -1016,8 +1017,8 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)), +start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> + {ok, LPid} = SLF(queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid. @@ -1089,11 +1090,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> +terminate(_State) -> pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]), - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + rabbit_event:notify(channel_closed, [{pid, self()}]). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1150,7 +1149,7 @@ update_measures(Type, QX, Inc, Measure) -> orddict:store(Measure, Cur + Inc, Measures)). internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> - CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], + CoarseStats = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(StatsTimer) of coarse -> rabbit_event:notify(channel_stats, CoarseStats); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl new file mode 100644 index 0000000000..02199a6516 --- /dev/null +++ b/src/rabbit_channel_sup.erl @@ -0,0 +1,96 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup). + +-behaviour(supervisor2). + +-export([start_link/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([start_link_args/0]). + +-type(start_link_args() :: + {rabbit_types:protocol(), rabbit_net:socket(), + rabbit_channel:channel_number(), non_neg_integer(), pid(), + rabbit_access_control:username(), rabbit_types:vhost(), pid()}). + +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, WriterPid} = + supervisor2:start_child( + SupPid, + {writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ReaderPid, WriterPid, Username, VHost, + Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, FramingChannelPid} = + supervisor2:start_child( + SupPid, + {framing_channel, {rabbit_framing_channel, start_link, + [ReaderPid, ChannelPid, Protocol]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), + {ok, SupPid, FramingChannelPid}. + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + +start_limiter_fun(SupPid) -> + fun (UnackedCount) -> + Me = self(), + {ok, _Pid} = + supervisor2:start_child( + SupPid, + {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}) + end. diff --git a/src/rabbit_tracer.erl b/src/rabbit_channel_sup_sup.erl index 484249b1df..d193880555 100644 --- a/src/rabbit_tracer.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -29,22 +29,37 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_tracer). --export([start/0]). +-module(rabbit_channel_sup_sup). --import(erlang). +-behaviour(supervisor2). -start() -> - spawn(fun mainloop/0), - ok. +-export([start_link/0, start_channel/2]). -mainloop() -> - erlang:trace(new, true, [all]), - mainloop1(). +-export([init/1]). -mainloop1() -> - receive - Msg -> - rabbit_log:info("TRACE: ~p~n", [Msg]) - end, - mainloop1(). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link(?MODULE, []). + +start_channel(Pid, Args) -> + {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]), + link(ChSupPid), + Result. + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{channel_sup, {rabbit_channel_sup, start_link, []}, + temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl new file mode 100644 index 0000000000..b3821d3b8b --- /dev/null +++ b/src/rabbit_connection_sup.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_connection_sup). + +-behaviour(supervisor2). + +-export([start_link/0, reader/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid(), pid()}). +-spec(reader/1 :: (pid()) -> pid()). + +-endif. + +%%-------------------------------------------------------------------------- + +start_link() -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelSupSupPid} = + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + {ok, Collector} = + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + {ok, ReaderPid} = + supervisor2:start_child( + SupPid, + {reader, {rabbit_reader, start_link, + [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), + {ok, SupPid, ReaderPid}. + +reader(Pid) -> + hd(supervisor2:find_child(Pid, reader)). + +%%-------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + +start_heartbeat_fun(SupPid) -> + fun (_Sock, 0) -> + none; + (Sock, TimeoutSec) -> + Parent = self(), + {ok, Sender} = + supervisor2:start_child( + SupPid, {heartbeat_sender, + {rabbit_heartbeat, start_heartbeat_sender, + [Parent, Sock, TimeoutSec]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor2:start_child( + SupPid, {heartbeat_receiver, + {rabbit_heartbeat, start_heartbeat_receiver, + [Parent, Sock, TimeoutSec]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver} + end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index f0b623c2dc..3cdb0619cf 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -246,14 +246,14 @@ action(list_exchanges, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, _Args, Opts, Inform) -> +action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - InfoKeys = [exchange_name, queue_name, routing_key, args], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], - InfoKeys); + ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), @@ -304,9 +304,11 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || - X <- InfoItemKeys]) - end, Results), + lists:foreach( + fun (Result) -> display_row( + [format_info_item(proplists:get_value(X, Result)) || + X <- InfoItemKeys]) + end, Results), ok; display_info_list(Other, _) -> Other. @@ -315,25 +317,30 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Key, Items) -> - case proplists:get_value(Key, Items) of - #resource{name = Name} -> - escape(Name); - Value when Key =:= address; Key =:= peer_address andalso - is_tuple(Value) -> - inet_parse:ntoa(Value); - Value when is_pid(Value) -> - rabbit_misc:pid_to_string(Value); - Value when is_binary(Value) -> - escape(Value); - Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] - when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> - io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); - Value -> - io_lib:format("~w", [Value]) - end. +-define(IS_U8(X), (X >= 0 andalso X =< 255)). +-define(IS_U16(X), (X >= 0 andalso X =< 65535)). + +format_info_item(#resource{name = Name}) -> + escape(Name); +format_info_item({N1, N2, N3, N4} = Value) when + ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> + inet_parse:ntoa(Value); +format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when + ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), + ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> + inet_parse:ntoa(Value); +format_info_item(Value) when is_pid(Value) -> + rabbit_misc:pid_to_string(Value); +format_info_item(Value) when is_binary(Value) -> + escape(Value); +format_info_item(Value) when is_atom(Value) -> + escape(atom_to_list(Value)); +format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = + Value) when is_binary(TableEntryKey) andalso + is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item(Value) -> + io_lib:format("~w", [Value]). display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index af4eb1bd79..40bee25f8b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -34,38 +34,19 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, - info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/5, delete_binding/5, list_bindings/1]). --export([delete/2]). --export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([assert_equivalence/5]). --export([assert_args_equivalence/2]). --export([check_type/1]). - -%% EXTENDED API --export([list_exchange_bindings/1]). --export([list_queue_bindings/1]). - --import(mnesia). --import(sets). --import(lists). + info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). +%% this must be run inside a mnesia tx +-export([maybe_auto_delete/1]). +-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([name/0, type/0, binding_key/0]). +-export_type([name/0, type/0]). -type(name() :: rabbit_types:r('exchange')). -type(type() :: atom()). --type(binding_key() :: binary()). - --type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). --type(inner_fun() :: - fun((rabbit_types:exchange(), queue()) -> - rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: @@ -97,32 +78,12 @@ -> [[rabbit_types:info()]]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> {rabbit_router:routing_result(), [pid()]}). --spec(add_binding/5 :: - (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) -> bind_res()). --spec(delete_binding/5 :: - (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) - -> bind_res() | rabbit_types:error('binding_not_found')). --spec(list_bindings/1 :: - (rabbit_types:vhost()) - -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(delete_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(delete_transient_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). --spec(list_queue_bindings/1 :: - (rabbit_amqqueue:name()) - -> [{name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_exchange_bindings/1 :: - (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). +-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> + 'not_deleted' | 'auto_deleted'). -endif. @@ -136,19 +97,7 @@ recover() -> ok = mnesia:write(rabbit_exchange, Exchange, write), [Exchange | Acc] end, [], rabbit_durable_exchange), - Bs = rabbit_misc:table_fold( - fun (Route = #route{binding = B}, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - [B | Acc] - end, [], rabbit_durable_route), - recover_with_bindings(Bs, Exs), - ok. - -recover_with_bindings(Bs, Exs) -> + Bs = rabbit_binding:recover(), recover_with_bindings( lists:keysort(#binding.exchange_name, Bs), lists:keysort(#exchange.name, Exs), []). @@ -164,11 +113,11 @@ recover_with_bindings([], [], []) -> ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, + Exchange = #exchange{name = ExchangeName, + type = Type, + durable = Durable, auto_delete = AutoDelete, - arguments = Args}, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. @@ -192,9 +141,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> end end) of {new, X} -> TypeModule:create(X), - rabbit_event:notify( - exchange_created, - [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]), + rabbit_event:notify(exchange_created, info(X)), X; {existing, X} -> X; Err -> Err @@ -220,9 +167,9 @@ check_type(TypeBin) -> end end. -assert_equivalence(X = #exchange{ durable = Durable, +assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, - type = Type}, + type = Type}, Type, Durable, AutoDelete, RequiredArgs) -> (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, @@ -232,8 +179,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, "cannot redeclare ~s with different type, durable or autodelete value", [rabbit_misc:rs(Name)]). -assert_args_equivalence(#exchange{ name = Name, - arguments = Args }, +assert_args_equivalence(#exchange{ name = Name, arguments = Args }, RequiredArgs) -> %% The spec says "Arguments are compared for semantic %% equivalence". The only arg we care about is @@ -311,92 +257,6 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. -%% TODO: Should all of the route and binding management not be -%% refactored to its own module, especially seeing as unbind will have -%% to be implemented for 0.91 ? - -delete_exchange_bindings(ExchangeName) -> - [begin - ok = mnesia:delete_object(rabbit_reverse_route, - reverse_route(Route), write), - ok = delete_forward_routes(Route), - Route#route.binding - end || Route <- mnesia:match_object( - rabbit_route, - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - write)]. - -delete_queue_bindings(QueueName) -> - delete_queue_bindings(QueueName, fun delete_forward_routes/1). - -delete_transient_queue_bindings(QueueName) -> - delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). - -delete_queue_bindings(QueueName, FwdDeleteFun) -> - DeletedBindings = - [begin - Route = reverse_route(ReverseRoute), - ok = FwdDeleteFun(Route), - ok = mnesia:delete_object(rabbit_reverse_route, - ReverseRoute, write), - Route#route.binding - end || ReverseRoute - <- mnesia:match_object( - rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), - write)], - Cleanup = cleanup_deleted_queue_bindings( - lists:keysort(#binding.exchange_name, DeletedBindings), []), - fun () -> - lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) - end - end, Cleanup) - end. - -%% Requires that its input binding list is sorted in exchange-name -%% order, so that the grouping of bindings (for passing to -%% cleanup_deleted_queue_bindings1) works properly. -cleanup_deleted_queue_bindings([], Acc) -> - Acc; -cleanup_deleted_queue_bindings( - [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> - cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc). - -cleanup_deleted_queue_bindings( - ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], - Bindings, Acc) -> - cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc); -cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) -> - %% either Deleted is [], or its head has a non-matching ExchangeName - NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc], - cleanup_deleted_queue_bindings(Deleted, NewAcc). - -cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - {maybe_auto_delete(X), Bindings}. - -delete_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write), - ok = mnesia:delete_object(rabbit_durable_route, Route, write). - -delete_transient_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write). - -contains(Table, MatchHead) -> - continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). - -continue('$end_of_table') -> false; -continue({[_|_], _}) -> true; -continue({[], Continuation}) -> continue(mnesia:select(Continuation)). - call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, Exchange}) of @@ -405,156 +265,6 @@ call_with_exchange(Exchange, Fun) -> end end). -call_with_exchange_and_queue(Exchange, Queue, Fun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end - end). - -add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> - case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - %% this argument is used to check queue exclusivity; - %% in general, we want to fail on that in preference to - %% anything else - case InnerFun(X, Q) of - ok -> - case mnesia:read({rabbit_route, B}) of - [] -> - ok = sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - rabbit_event:notify( - binding_created, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}, - {routing_key, RoutingKey}, - {arguments, Arguments}]), - {new, X, B}; - [_R] -> - {existing, X, B} - end; - {error, _} = E -> - E - end - end) of - {new, Exchange = #exchange{ type = Type }, Binding} -> - (type_to_module(Type)):add_binding(Exchange, Binding); - {existing, _, _} -> - ok; - {error, _} = Err -> - Err - end. - -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> - case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> - {error, binding_not_found}; - _ -> - case InnerFun(X, Q) of - ok -> - ok = - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:delete_object/3), - rabbit_event:notify( - binding_deleted, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}]), - {maybe_auto_delete(X), B}; - {error, _} = E -> - E - end - end - end) of - {error, _} = Err -> - Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, [B]); - not_deleted -> Module:remove_bindings(X, [B]) - end - end. - -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> - call_with_exchange_and_queue( - ExchangeName, QueueName, - fun (X, Q) -> - Fun(X, Q, #binding{ - exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) - end). - -sync_binding(Binding, Durable, Fun) -> - ok = case Durable of - true -> Fun(rabbit_durable_route, - #route{binding = Binding}, write); - false -> ok - end, - {Route, ReverseRoute} = route_with_reverse(Binding), - ok = Fun(rabbit_route, Route, write), - ok = Fun(rabbit_reverse_route, ReverseRoute, write), - ok. - -list_bindings(VHostPath) -> - [{ExchangeName, QueueName, RoutingKey, Arguments} || - #route{binding = #binding{ - exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName, - args = Arguments}} - <- mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. - -route_with_reverse(#route{binding = Binding}) -> - route_with_reverse(Binding); -route_with_reverse(Binding = #binding{}) -> - Route = #route{binding = Binding}, - {Route, reverse_route(Route)}. - -reverse_route(#route{binding = Binding}) -> - #reverse_route{reverse_binding = reverse_binding(Binding)}; - -reverse_route(#reverse_route{reverse_binding = Binding}) -> - #route{binding = reverse_binding(Binding)}. - -reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> - #binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}. - delete(ExchangeName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; @@ -568,54 +278,23 @@ delete(ExchangeName, IfUnused) -> Error end. -maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {not_deleted, Exchange}; -maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> +maybe_auto_delete(#exchange{auto_delete = false}) -> + not_deleted; +maybe_auto_delete(#exchange{auto_delete = true} = Exchange) -> case conditional_delete(Exchange) of - {error, in_use} -> {not_deleted, Exchange}; - {deleted, Exchange, []} -> {auto_deleted, Exchange} + {error, in_use} -> not_deleted; + {deleted, Exchange, []} -> auto_deleted end. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - %% we need to check for durable routes here too in case a bunch of - %% routes to durable queues have been removed temporarily as a - %% result of a node failure - case contains(rabbit_route, Match) orelse - contains(rabbit_durable_route, Match) of + case rabbit_binding:has_for_exchange(ExchangeName) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = delete_exchange_bindings(ExchangeName), + Bindings = rabbit_binding:remove_for_exchange(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}), rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), {deleted, Exchange, Bindings}. - -%%---------------------------------------------------------------------------- -%% EXTENDED API -%% These are API calls that are not used by the server internally, -%% they are exported for embedded clients to use - -%% This is currently used in mod_rabbit.erl (XMPP) and expects this to -%% return {QueueName, RoutingKey, Arguments} tuples -list_exchange_bindings(ExchangeName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - [{QueueName, RoutingKey, Arguments} || - #route{binding = #binding{queue_name = QueueName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. - -% Refactoring is left as an exercise for the reader -list_queue_bindings(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, - _ = '_'}}, - [{ExchangeName, RoutingKey, Arguments} || - #route{binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 44607398cb..0a59a175cd 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -79,8 +79,8 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that route/3 and -%% rabbit_exchange:{add,delete}_binding/4 do. +%% (rabbit_misc:sort_field_table) that publish/1 and +%% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 553faaa814..cb53185f6b 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -39,16 +39,9 @@ %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs, Protocol) -> - Parent = self(), - {ok, spawn_link( - fun () -> - %% we trap exits so that a normal termination of - %% the channel or reader process terminates us too. - process_flag(trap_exit, true), - {ok, ChannelPid} = apply(StartFun, StartArgs), - mainloop(Parent, ChannelPid, Protocol) - end)}. +start_link(Parent, ChannelPid, Protocol) -> + {ok, proc_lib:spawn_link( + fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -62,12 +55,6 @@ shutdown(Pid) -> read_frame(ChannelPid) -> receive - %% converting the exit signal into one of our own ensures that - %% the reader sees the right pid (i.e. ours) when a channel - %% exits. Similarly in the other direction, though it is not - %% really relevant there since the channel is not specifically - %% watching out for reader exit signals. - {'EXIT', _Pid, Reason} -> exit(Reason); {frame, Frame} -> Frame; terminate -> rabbit_channel:shutdown(ChannelPid), read_frame(ChannelPid); diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ab50c28ca5..a9945af1d4 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,16 +31,26 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). +-export([start_heartbeat_sender/3, start_heartbeat_receiver/3, + pause_monitor/1, resume_monitor/1]). + +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([heartbeaters/0]). + -type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> - heartbeaters()). +-spec(start_heartbeat_sender/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -48,27 +58,26 @@ %%---------------------------------------------------------------------------- -start_heartbeat(_Sock, 0) -> - none; -start_heartbeat(Sock, TimeoutSec) -> - Parent = self(), +start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end}, Parent), + heartbeater( + {Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> + catch rabbit_net:send( + Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}). + +start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end}, Parent), - {Sender, Receiver}. + heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> + Parent ! timeout, + stop + end}). pause_monitor(none) -> ok; @@ -84,21 +93,15 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- -heartbeater(Params, Parent) -> - spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent), - {0, 0}) - end). +heartbeater(Params) -> + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - MonitorRef, {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, V) end, receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; pause -> receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; resume -> Recurse({0, 0}); Other -> diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl deleted file mode 100644 index 3fc84c1e09..0000000000 --- a/src/rabbit_hooks.erl +++ /dev/null @@ -1,73 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_hooks). - --export([start/0]). --export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]). - --define(TableName, rabbit_hooks). - --ifdef(use_specs). - --spec(start/0 :: () -> 'ok'). --spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok'). --spec(unsubscribe/2 :: (atom(), atom()) -> 'ok'). --spec(trigger/2 :: (atom(), list()) -> 'ok'). --spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok'). - --endif. - -start() -> - ets:new(?TableName, [bag, public, named_table]), - ok. - -subscribe(Hook, HandlerName, Handler) -> - ets:insert(?TableName, {Hook, HandlerName, Handler}), - ok. - -unsubscribe(Hook, HandlerName) -> - ets:match_delete(?TableName, {Hook, HandlerName, '_'}), - ok. - -trigger(Hook, Args) -> - Hooks = ets:lookup(?TableName, Hook), - [case catch apply(M, F, [Hook, Name, Args | A]) of - {'EXIT', Reason} -> - rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", - [Name, Hook, Reason]); - _ -> ok - end || {_, Name, {M, F, A}} <- Hooks], - ok. - -notify_remote(Hook, HandlerName, Args, Pid, PidArgs) -> - Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]}, - ok. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9894a85005..da7078f1ba 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/2, shutdown/1]). +-export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -47,7 +47,6 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok_pid_or_error()). --spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). @@ -77,17 +76,10 @@ start_link(ChPid, UnackedMsgCount) -> gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). -shutdown(undefined) -> - ok; -shutdown(LimiterPid) -> - true = unlink(LimiterPid), - gen_server2:cast(LimiterPid, shutdown). - limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, {limit, PrefetchCount})). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -125,8 +117,7 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, unblock, infinity)). + gen_server2:call(LimiterPid, unblock, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -165,9 +156,6 @@ handle_call(unblock, _From, State) -> {stop, State1} -> {stop, normal, stopped, State1} end. -handle_cast(shutdown, State) -> - {stop, normal, State}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count @@ -247,9 +235,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - -unlink_on_stopped(LimiterPid, stopped) -> - ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), - stopped; -unlink_on_stopped(_LimiterPid, Result) -> - Result. diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl deleted file mode 100644 index e0457b1e43..0000000000 --- a/src/rabbit_load.erl +++ /dev/null @@ -1,78 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_load). - --export([local_load/0, remote_loads/0, pick/0]). - --define(FUDGE_FACTOR, 0.98). --define(TIMEOUT, 100). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}). --spec(local_load/0 :: () -> load()). --spec(remote_loads/0 :: () -> [load()]). --spec(pick/0 :: () -> node()). - --endif. - -%%---------------------------------------------------------------------------- - -local_load() -> - LoadAvg = case whereis(cpu_sup) of - undefined -> unknown; - _ -> case cpu_sup:avg1() of - L when is_integer(L) -> L; - {error, timeout} -> unknown - end - end, - {{statistics(run_queue), LoadAvg}, node()}. - -remote_loads() -> - {ResL, _BadNodes} = - rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT), - ResL. - -pick() -> - RemoteLoads = remote_loads(), - {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node; we rely on Erlang's term order - %% of SomeFloat < local_unknown < unknown. - AdjustedLoadAvg = case LoadAvg of - unknown -> local_unknown; - _ -> LoadAvg * ?FUDGE_FACTOR - end, - Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], - {_, SelectedNode} = lists:min(Loads), - SelectedNode. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5fa3f8edb4..086d260e24 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -39,7 +39,6 @@ -export([die/1, frame_error/2, amqp_error/4, protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). --export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). -export([table_lookup/2]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -108,10 +107,6 @@ rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> 'ok' | rabbit_types:connection_exit()). --spec(get_config/1 :: - (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). --spec(get_config/2 :: (atom(), A) -> A). --spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(table_lookup/2 :: @@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) -> [Key, rabbit_misc:rs(Name), New1, Orig1]) end. -get_config(Key) -> - case dirty_read({rabbit_config, Key}) of - {ok, {rabbit_config, Key, V}} -> {ok, V}; - Other -> Other - end. - -get_config(Key, DefaultValue) -> - case get_config(Key) of - {ok, V} -> V; - {error, not_found} -> DefaultValue - end. - -set_config(Key, Value) -> - ok = mnesia:dirty_write({rabbit_config, Key, Value}). - dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4a5adfaeed..a321488897 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -169,10 +169,6 @@ table_definitions() -> {attributes, record_info(fields, vhost)}, {disc_copies, [node()]}, {match, #vhost{_='_'}}]}, - {rabbit_config, - [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}, - {match, {rabbit_config, '_', '_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 09c285670e..337f04ab62 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/1, + sync/3, client_init/2, client_terminate/2, client_delete_and_terminate/3, successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -136,7 +136,7 @@ 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: (client_msstate(), server(), binary()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). @@ -317,7 +317,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid, Msg}), CState}. + {gen_server2:cast(Server, {write, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -363,7 +363,7 @@ set_maximum_since_use(Server, Age) -> client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -374,16 +374,16 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState) -> +client_terminate(CState, Server) -> close_all_handles(CState), - ok. + ok = gen_server2:call(Server, client_terminate, infinity). client_delete_and_terminate(CState, Server, Ref) -> - ok = client_terminate(CState), - ok = gen_server2:call(Server, {delete_client, Ref}, infinity). + close_all_handles(CState), + ok = gen_server2:cast(Server, {client_delete, Ref}). successfully_recovered_state(Server) -> - gen_server2:call(Server, successfully_recovered_state, infinity). + gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). %%---------------------------------------------------------------------------- %% Client-side-only helpers @@ -607,16 +607,15 @@ handle_call({new_client_state, CRef}, _From, handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({delete_client, CRef}, _From, - State = #msstate { client_refs = ClientRefs }) -> - reply(ok, - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). +handle_call(client_terminate, _From, State) -> + reply(ok, State). -handle_cast({write, Guid, Msg}, +handle_cast({write, Guid}, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> write_message(Guid, Msg, 1, State); @@ -710,7 +709,12 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State). + noreply(State); + +handle_cast({client_delete, CRef}, + State = #msstate { client_refs = ClientRefs }) -> + noreply( + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). handle_info(timeout, State) -> noreply(internal_sync(State)); diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 3facef17f7..c7a5a60027 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -93,7 +93,14 @@ usage() -> action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), application:load(rabbit), - NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")), + {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts( + getenv("RABBITMQ_NODENAME")), + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + throw({cannot_connect_to_epmd, NodeHost, EpmdReason}); + {ok, _} -> + ok + end, {NodePids, Running} = case list_to_integer(NodeCount) of 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 6baa4b8864..6c5e656407 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -72,72 +72,58 @@ %%--------------------------------------------------------------------------- +-define(IS_SSL(Sock), is_record(Sock, ssl_socket)). -async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> +async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), spawn(fun () -> Pid ! {inet_async, Sock, Ref, - ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} - end), + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), {ok, Ref}; - async_recv(Sock, Length, infinity) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, -1); - async_recv(Sock, Length, Timeout) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, Timeout). -close(Sock) when is_record(Sock, ssl_socket) -> +close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); - close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). - -controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> +controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); - controlling_process(Sock, Pid) when is_port(Sock) -> gen_tcp:controlling_process(Sock, Pid). - -getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> +getstat(Sock, Stats) when ?IS_SSL(Sock) -> inet:getstat(Sock#ssl_socket.tcp, Stats); - getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). - -peername(Sock) when is_record(Sock, ssl_socket) -> +peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl); - peername(Sock) when is_port(Sock) -> inet:peername(Sock). - -port_command(Sock, Data) when is_record(Sock, ssl_socket) -> +port_command(Sock, Data) when ?IS_SSL(Sock) -> case ssl:send(Sock#ssl_socket.ssl, Data) of - ok -> - self() ! {inet_reply, Sock, ok}, - true; - {error, Reason} -> - erlang:error(Reason) + ok -> self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> erlang:error(Reason) end; - port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). -send(Sock, Data) when is_record(Sock, ssl_socket) -> +send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); - send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). -sockname(Sock) when is_record(Sock, ssl_socket) -> +sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); - sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index f968b0d890..08272afed4 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -107,7 +107,15 @@ boot_ssl() -> ok; {ok, SslListeners} -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), - {ok, SslOpts} = application:get_env(ssl_options), + {ok, SslOptsConfig} = application:get_env(ssl_options), + SslOpts = + case proplists:get_value(verify, SslOptsConfig, verify_none) of + verify_none -> SslOptsConfig; + verify_peer -> [{verify_fun, fun([]) -> true; + ([_|_]) -> false + end} + | SslOptsConfig] + end, [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], ok end. @@ -118,7 +126,7 @@ start() -> {rabbit_tcp_client_sup, {tcp_client_sup, start_link, [{local, rabbit_tcp_client_sup}, - {rabbit_reader,start_link,[]}]}, + {rabbit_connection_sup,start_link,[]}]}, transient, infinity, supervisor, [tcp_client_sup]}), ok. @@ -204,10 +212,10 @@ on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). start_client(Sock, SockTransform) -> - {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock, SockTransform}, - Child. + {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), + ok = rabbit_net:controlling_process(Sock, Reader), + Reader ! {go, Sock, SockTransform}, + Reader. start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). @@ -230,8 +238,9 @@ start_ssl_client(SslOpts, Sock) -> end). connections() -> - [Pid || {_, Pid, _, _} <- supervisor:which_children( - rabbit_tcp_client_sup)]. + [rabbit_connection_sup:reader(ConnSup) || + {_, ConnSup, supervisor, _} + <- supervisor:which_children(rabbit_tcp_client_sup)]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index c9f75be030..b23776cd74 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -51,7 +51,7 @@ %%---------------------------------------------------------------------------- start() -> - io:format("Activating RabbitMQ plugins ..."), + io:format("Activating RabbitMQ plugins ...~n"), %% Ensure Rabbit is loaded so we can access it's environment application:load(rabbit), @@ -77,7 +77,7 @@ start() -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), + RabbitVersion = proplists:get_value(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, @@ -130,8 +130,9 @@ start() -> ok -> ok; error -> error("failed to compile boot script file ~s", [ScriptFile]) end, - io:format("~n~w plugins activated:~n", [length(PluginApps)]), - [io:format("* ~w~n", [App]) || App <- PluginApps], + io:format("~w plugins activated:~n", [length(PluginApps)]), + [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) + || App <- PluginApps], io:nl(), halt(), ok. @@ -150,29 +151,33 @@ determine_version(App) -> {ok, Vsn} = application:get_key(App, vsn), {App, Vsn}. -assert_dir(Dir) -> - case filelib:is_dir(Dir) of - true -> ok; - false -> ok = filelib:ensure_dir(Dir), - ok = file:make_dir(Dir) - end. - -delete_dir(Dir) -> - case filelib:is_dir(Dir) of +delete_recursively(Fn) -> + case filelib:is_dir(Fn) and not(is_symlink(Fn)) of true -> - case file:list_dir(Dir) of + case file:list_dir(Fn) of {ok, Files} -> - [case Dir ++ "/" ++ F of - Fn -> - case filelib:is_dir(Fn) and not(is_symlink(Fn)) of - true -> delete_dir(Fn); - false -> file:delete(Fn) - end - end || F <- Files] - end, - ok = file:del_dir(Dir); + case lists:foldl(fun ( Fn1, ok) -> delete_recursively( + Fn ++ "/" ++ Fn1); + (_Fn1, Err) -> Err + end, ok, Files) of + ok -> case file:del_dir(Fn) of + ok -> ok; + {error, E} -> {error, + {cannot_delete, Fn, E}} + end; + Err -> Err + end; + {error, E} -> + {error, {cannot_list_files, Fn, E}} + end; false -> - ok + case filelib:is_file(Fn) of + true -> case file:delete(Fn) of + ok -> ok; + {error, E} -> {error, {cannot_delete, Fn, E}} + end; + false -> ok + end end. is_symlink(Name) -> @@ -181,13 +186,18 @@ is_symlink(Name) -> _ -> false end. -unpack_ez_plugins(PluginSrcDir, PluginDestDir) -> +unpack_ez_plugins(SrcDir, DestDir) -> %% Eliminate the contents of the destination directory - delete_dir(PluginDestDir), - - assert_dir(PluginDestDir), - [unpack_ez_plugin(PluginName, PluginDestDir) || - PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")]. + case delete_recursively(DestDir) of + ok -> ok; + {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E]) + end, + case filelib:ensure_dir(DestDir ++ "/") of + ok -> ok; + {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2]) + end, + [unpack_ez_plugin(PluginName, DestDir) || + PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")]. unpack_ez_plugin(PluginFn, PluginDestDir) -> zip:unzip(PluginFn, [{cwd, PluginDestDir}]), @@ -246,8 +256,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) -> - [Entry, {apply,{rabbit,prepare,[]}}]; +process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> + [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index b056d60bf6..0a49b94d09 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -33,7 +33,7 @@ -behaviour(gen_server). --export([start_link/0, register/2, delete_all/1, shutdown/1]). +-export([start_link/0, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -49,7 +49,6 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). --spec(shutdown/1 :: (pid()) -> 'ok'). -endif. @@ -64,9 +63,6 @@ register(CollectorPid, Q) -> delete_all(CollectorPid) -> gen_server:call(CollectorPid, delete_all, infinity). -shutdown(CollectorPid) -> - gen_server:cast(CollectorPid, shutdown). - %%---------------------------------------------------------------------------- init([]) -> @@ -90,8 +86,8 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> || {MonitorRef, Q} <- dict:to_list(Queues)], {reply, ok, State}. -handle_cast(shutdown, State) -> - {stop, normal, State}. +handle_cast(Msg, State) -> + {stop, {unhandled_cast, Msg}, State}. handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State = #state{queues = Queues}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a133bf450d..a21961b5e0 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/2]). +-export([init/4, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -46,7 +46,6 @@ -export([emit_stats/1]). -import(gen_tcp). --import(fprof). -import(inet). -import(prim_inet). @@ -60,7 +59,8 @@ %--------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, - connection_state, queue_collector, heartbeater, stats_timer}). + connection_state, queue_collector, heartbeater, stats_timer, + channel_sup_sup_pid, start_heartbeat_fun}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -160,6 +160,12 @@ -ifdef(use_specs). +-type(start_heartbeat_fun() :: + fun ((rabbit_networking:socket(), non_neg_integer()) -> + rabbit_heartbeat:heartbeaters())). + +-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> + rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -168,21 +174,33 @@ -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). +%% These specs only exists to add no_return() to keep dialyzer happy +-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(start_connection/7 :: + (pid(), pid(), pid(), start_heartbeat_fun(), any(), + rabbit_networking:socket(), + fun ((rabbit_networking:socket()) -> + rabbit_types:ok_or_error2( + rabbit_networking:socket(), any()))) -> no_return()). + -endif. %%-------------------------------------------------------------------------- -start_link() -> - {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, + Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent) -> +init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection(Parent, Deb, Sock, SockTransform) + start_connection( + Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + SockTransform) end. system_continue(Parent, Deb, State) -> @@ -208,33 +226,6 @@ info(Pid, Items) -> emit_stats(Pid) -> gen_server:cast(Pid, emit_stats). -setup_profiling() -> - Value = rabbit_misc:get_config(profiling_enabled, false), - case Value of - once -> - rabbit_log:info("Enabling profiling for this connection, " - "and disabling for subsequent.~n"), - rabbit_misc:set_config(profiling_enabled, false), - fprof:trace(start); - true -> - rabbit_log:info("Enabling profiling for this connection.~n"), - fprof:trace(start); - false -> - ok - end, - Value. - -teardown_profiling(Value) -> - case Value of - false -> - ok; - _ -> - rabbit_log:info("Completing profiling for this connection.~n"), - fprof:trace(stop), - fprof:profile(), - fprof:analyse([{dest, []}, {cols, 100}]) - end. - conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -261,7 +252,8 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, Deb, Sock, SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, + Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -270,28 +262,29 @@ start_connection(Parent, Deb, Sock, SockTransform) -> ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), - ProfilingValue = setup_profiling(), - {ok, Collector} = rabbit_queue_collector:start_link(), try mainloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none, - protocol = none}, - callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, - connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = - rabbit_event:init_stats_timer()}, - handshake, 8)) + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none}, + callback = uninitialized_callback, + recv_length = 0, + recv_ref = none, + connection_state = pre_init, + queue_collector = Collector, + heartbeater = none, + stats_timer = + rabbit_event:init_stats_timer(), + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = StartHeartbeatFun + }, + handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; @@ -308,9 +301,6 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue), - rabbit_misc:unlink_and_capture_exit(Collector), - rabbit_queue_collector:shutdown(Collector), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -347,10 +337,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> exit(Reason); {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_channel_exit(Channel, Reason, State)); - {'EXIT', Pid, Reason} -> - mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); + {channel_exit, ChannelOrFrPid, Reason} -> + mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); + {'EXIT', ChSupPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -443,33 +433,45 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> - {channel, Channel} = get({chpid, ChPid}), +handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> + {channel, Channel} = get({ch_fr_pid, ChFrPid}), handle_exception(State, Channel, Reason); handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, normal, State) -> - erase({chpid, Pid}), - maybe_close(State); -handle_dependent_exit(Pid, Reason, State) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) +handle_dependent_exit(ChSupPid, Reason, State) -> + case termination_kind(Reason) of + controlled -> + case erase({ch_sup_pid, ChSupPid}) of + undefined -> ok; + {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) + end, + maybe_close(State); + uncontrolled -> + case channel_cleanup(ChSupPid) of + undefined -> + exit({abnormal_dependent_exit, ChSupPid, Reason}); + Channel -> + maybe_close(handle_exception(State, Channel, Reason)) + end end. -channel_cleanup(Pid) -> - case get({chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> erase({channel, Channel}), - erase({chpid, Pid}), - Channel +channel_cleanup(ChSupPid) -> + case get({ch_sup_pid, ChSupPid}) of + undefined -> undefined; + {{channel, Channel}, ChFr} -> erase({channel, Channel}), + erase(ChFr), + erase({ch_sup_pid, ChSupPid}), + Channel end. -all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. +all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, + {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. terminate_channels() -> - NChannels = length([exit(Pid, normal) || Pid <- all_channels()]), + NChannels = + length([rabbit_framing_channel:shutdown(ChFrPid) + || ChFrPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -487,14 +489,15 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', Pid, Reason} -> - case channel_cleanup(Pid) of + {'EXIT', ChSupPid, Reason} -> + case channel_cleanup(ChSupPid) of undefined -> - exit({abnormal_dependent_exit, Pid, Reason}); + exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> - case Reason of - normal -> ok; - _ -> + case termination_kind(Reason) of + controlled -> + ok; + uncontrolled -> rabbit_log:error( "connection ~p, channel ~p - " "error while terminating:~n~p~n", @@ -519,6 +522,11 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. +termination_kind(normal) -> controlled; +termination_kind(shutdown) -> controlled; +termination_kind({shutdown, _Term}) -> controlled; +termination_kind(_) -> uncontrolled. + handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) @@ -548,8 +556,8 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of - {chpid, ChPid} -> - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), + {ch_fr_pid, ChFrPid} -> + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), @@ -732,7 +740,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, - sock = Sock}) -> + sock = Sock, + start_heartbeat_fun = SHF}) -> if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w < ~w min size", @@ -742,8 +751,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = rabbit_heartbeat:start_heartbeat( - Sock, ClientHeartbeat), + Heartbeater = SHF(Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, @@ -765,9 +773,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State#v1{connection_state = running, connection = NewConnection}), - rabbit_event:notify( - connection_created, - [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(connection_created, + infos(?CREATION_EVENT_KEYS, State1)), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -849,21 +856,21 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> - #v1{sock = Sock, connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost, - protocol = Protocol}} = State, - {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), - {ok, ChPid} = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector], - Protocol), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). +send_to_new_channel(Channel, AnalyzedFrame, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + {ok, ChSupPid, ChFrPid} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector}), + put({channel, Channel}, {ch_fr_pid, ChFrPid}), + put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), + put({ch_fr_pid, ChFrPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", @@ -931,5 +938,4 @@ amqp_exception_explanation(Text, Expl) -> end. internal_emit_stats(State) -> - rabbit_event:notify(connection_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ec049a1a2c..bfccb0daa5 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,9 +33,7 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([deliver/2, - match_bindings/2, - match_routing_key/2]). +-export([deliver/2, match_bindings/2, match_routing_key/2]). %%---------------------------------------------------------------------------- @@ -45,9 +43,15 @@ -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(qpids() :: [pid()]). -spec(deliver/2 :: - ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). + (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_exchange:name(), + fun ((rabbit_types:binding()) -> boolean())) -> + qpids()). +-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> + qpids()). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c07055af40..b541f0f70f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,9 +35,6 @@ -export([all_tests/0, test_parsing/0]). -%% Exported so the hook mechanism can call back --export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]). - -import(lists). -include("rabbit.hrl"). @@ -55,6 +52,8 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + ok = file_handle_cache:set_limit(10), + passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), @@ -1020,7 +1019,8 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - <<"user">>, <<"/">>, self()), + <<"user">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1037,7 +1037,15 @@ test_server_status() -> ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true), %% list bindings - ok = control_action(list_bindings, []), + ok = info_action(list_bindings, rabbit_binding:info_keys(), true), + %% misc binding listing APIs + [_|_] = rabbit_binding:list_for_exchange( + rabbit_misc:r(<<"/">>, exchange, <<"">>)), + [_] = rabbit_binding:list_for_queue( + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), + [_] = rabbit_binding:list_for_exchange_and_queue( + rabbit_misc:r(<<"/">>, exchange, <<"">>), + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), %% list connections [#listener{host = H, port = P} | _] = @@ -1061,67 +1069,23 @@ test_server_status() -> %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + + unlink(Ch), ok = rabbit_channel:shutdown(Ch), passed. -test_hooks() -> - %% Firing of hooks calls all hooks in an isolated manner - rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}), - rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}), - rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}), - rabbit_hooks:trigger(test_hook, [arg1, arg2]), - [arg1, arg2] = get(test_hook_test_fired), - [arg1, arg2] = get(test_hook_test2_fired), - undefined = get(test_hook2_test2_fired), - - %% Hook Deletion works - put(test_hook_test_fired, undefined), - put(test_hook_test2_fired, undefined), - rabbit_hooks:unsubscribe(test_hook, test), - rabbit_hooks:trigger(test_hook, [arg3, arg4]), - undefined = get(test_hook_test_fired), - [arg3, arg4] = get(test_hook_test2_fired), - undefined = get(test_hook2_test2_fired), - - %% Catches exceptions from bad hooks - rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}), - ok = rabbit_hooks:trigger(test_hook3, []), - - %% Passing extra arguments to hooks - rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}), - rabbit_hooks:trigger(arg_hook, [arg1, arg2]), - {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), - - %% Invoking Pids - Remote = fun () -> - receive - {rabbitmq_hook,[remote_test,test,[],Target]} -> - Target ! invoked - end - end, - P = spawn(Remote), - rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), - rabbit_hooks:trigger(remote_test, []), - receive - invoked -> ok - after 100 -> - io:format("Remote hook not invoked"), - throw(timeout) - end, - passed. - test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, - <<"/">>, self()), + {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, + <<"guest">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), - MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, - {Writer, Ch, MRef}. + {Writer, Ch}. test_statistics_receiver(Pid) -> receive @@ -1160,7 +1124,7 @@ test_statistics() -> %% by far the most complex code though. %% Set up a channel and queue - {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1), + {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1), rabbit_channel:do(Ch, #'queue.declare'{}), QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 @@ -1404,17 +1368,35 @@ delete_log_handlers(Handlers) -> Handler <- Handlers], ok. -handle_hook(HookName, Handler, Args) -> - A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired", - put(list_to_atom(A), Args). -bad_handle_hook(_, _, _) -> - exit(bad_handle_hook_called). -extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> - handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). - test_supervisor_delayed_restart() -> test_sup:test_supervisor_delayed_restart(). +test_file_handle_cache() -> + %% test copying when there is just one spare handle + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores + TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"), + ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")), + Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open( + filename:join(TmpDir, "file3"), + [write], []), + receive close -> ok end, + file_handle_cache:delete(Hdl) + end), + Src = filename:join(TmpDir, "file1"), + Dst = filename:join(TmpDir, "file2"), + Content = <<"foo">>, + ok = file:write_file(Src, Content), + {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), + {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), + Size = size(Content), + {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), + ok = file_handle_cache:delete(SrcHdl), + file_handle_cache:delete(DstHdl), + Pid ! close, + ok = file_handle_cache:set_limit(Limit), + passed. + test_backing_queue() -> case application:get_env(rabbit, backing_queue_module) of {ok, rabbit_variable_queue} -> @@ -1486,7 +1468,7 @@ msg_store_remove(Guids) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L)). + rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). test_msg_store() -> restart_msg_store_empty(), @@ -1549,7 +1531,7 @@ test_msg_store() -> ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), %% read the second half again, just for fun (aka code coverage) MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1574,7 +1556,7 @@ test_msg_store() -> {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9)), + msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 47e8bb0161..9dfd33bd87 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -118,7 +118,8 @@ -type(binding() :: #binding{exchange_name :: rabbit_exchange:name(), queue_name :: rabbit_amqqueue:name(), - key :: rabbit_exchange:binding_key()}). + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: #amqqueue{name :: rabbit_amqqueue:name(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee84f..30d3a8aec1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -439,9 +439,10 @@ terminate(State) -> remove_pending_ack(true, tx_commit_index(State)), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_terminate(MSCStateP) + _ -> rabbit_msg_store:client_terminate( + MSCStateP, ?PERSISTENT_MSG_STORE) end, - rabbit_msg_store:client_terminate(MSCStateT), + rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -464,8 +465,7 @@ delete_and_terminate(State) -> case MSCStateP of undefined -> ok; _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef), - rabbit_msg_store:client_terminate(MSCStateP) + MSCStateP, ?PERSISTENT_MSG_STORE, PRef) end, rabbit_msg_store:client_delete_and_terminate( MSCStateT, ?TRANSIENT_MSG_STORE, TRef), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f90ee734a8..aa986e542b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/4, start_link/4, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, send_command_and_signal_back/3, - send_command_and_signal_back/4, send_command_and_notify/5]). +-export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([send_command/2, send_command/3, send_command_sync/2, + send_command_sync/3, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). @@ -48,24 +48,23 @@ -ifdef(use_specs). --spec(start/4 :: +-spec(start/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). --spec(start_link/4 :: +-spec(start_link/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(send_command_and_signal_back/3 :: - (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok'). --spec(send_command_and_signal_back/4 :: - (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid()) - -> 'ok'). +-spec(send_command_sync/2 :: + (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(send_command_sync/3 :: + (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) @@ -84,68 +83,61 @@ %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}])}. - -start_link(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, +start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. -mainloop(State) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. + +mainloop(ReaderPid, State) -> + try + mainloop1(ReaderPid, State) + catch + exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + end, + done. + +mainloop1(ReaderPid, State) -> receive - Message -> ?MODULE:mainloop(handle_message(Message, State)) + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) end. -handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), +handle_message({send_command, MethodRecord}, State) -> + ok = internal_send_command_async(MethodRecord, State), State; -handle_message({send_command, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), +handle_message({send_command, MethodRecord, Content}, State) -> + ok = internal_send_command_async(MethodRecord, Content, State), State; -handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), - Parent ! rabbit_writer_send_command_signal, +handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> + ok = internal_send_command_async(MethodRecord, State), + gen_server:reply(From, ok), State; -handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), - Parent ! rabbit_writer_send_command_signal, +handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), + gen_server:reply(From, ok), State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message(shutdown, _State) -> - exit(normal); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). @@ -159,29 +151,28 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. -send_command_and_signal_back(W, MethodRecord, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Parent}, - ok. +send_command_sync(W, MethodRecord) -> + call(W, {send_command_sync, MethodRecord}). -send_command_and_signal_back(W, MethodRecord, Content, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, - ok. +send_command_sync(W, MethodRecord, Content) -> + call(W, {send_command_sync, MethodRecord, Content}). send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -shutdown(W) -> - W ! shutdown, - rabbit_misc:unlink_and_capture_exit(W), - ok. +%--------------------------------------------------------------------------- + +call(Pid, Msg) -> + {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity), + Res. %--------------------------------------------------------------------------- assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, - Protocol). + rabbit_binary_generator:build_simple_method_frame( + Channel, MethodRecord, Protocol). assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), @@ -223,12 +214,18 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> +internal_send_command_async(MethodRecord, + #wstate{sock = Sock, + channel = Channel, + protocol = Protocol}) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, - Protocol) -> +internal_send_command_async(MethodRecord, Content, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol)), ok. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 8788303752..93adfcb1b5 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -34,7 +34,9 @@ %% 4) Added an 'intrinsic' restart type. Like the transient type, this %% type means the child should only be restarted if the child exits %% abnormally. Unlike the transient type, if the child exits -%% normally, the supervisor itself also exits normally. +%% normally, the supervisor itself also exits normally. If the +%% child is a supervisor and it exits normally (i.e. with reason of +%% 'shutdown') then the child's parent also exits normally. %% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% @@ -63,7 +65,7 @@ -export([start_link/2,start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, + which_children/1, find_child/2, check_childspecs/1]). -export([behaviour_info/1]). @@ -138,6 +140,10 @@ terminate_child(Supervisor, Name) -> which_children(Supervisor) -> call(Supervisor, which_children). +find_child(Supervisor, Name) -> + [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), + Name1 =:= Name]. + call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). @@ -541,6 +547,9 @@ do_restart(permanent, Reason, Child, State) -> restart(Child, State); do_restart(intrinsic, normal, Child, State) -> {shutdown, state_del_child(Child, State)}; +do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor}, + State) -> + {shutdown, state_del_child(Child, State)}; do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl index 1b78584384..02d7e0e40d 100644 --- a/src/tcp_client_sup.erl +++ b/src/tcp_client_sup.erl @@ -31,19 +31,19 @@ -module(tcp_client_sup). --behaviour(supervisor). +-behaviour(supervisor2). -export([start_link/1, start_link/2]). -export([init/1]). start_link(Callback) -> - supervisor:start_link(?MODULE, Callback). + supervisor2:start_link(?MODULE, Callback). start_link(SupName, Callback) -> - supervisor:start_link(SupName, ?MODULE, Callback). + supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one_terminate, 10, 10}, [{tcp_client, {M,F,A}, - temporary, brutal_kill, worker, [M]}]}}. + temporary, infinity, supervisor, [M]}]}}. |
