diff options
33 files changed, 1969 insertions, 654 deletions
@@ -11,6 +11,7 @@ syntax: regexp ^include/rabbit_framing.hrl$ ^src/rabbit_framing.erl$ ^rabbit.plt$ +^ebin/rabbit.app$ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ @@ -7,7 +7,8 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -39,25 +40,31 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl +$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app + escript generate_app $(EBIN_DIR) < $< > $@ + +$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl erlc $(ERLC_OPTS) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $< + +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam + erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< +# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' -dialyze: $(TARGETS) +dialyze: $(BEAM_TARGETS) dialyzer -c $? clean: cleandb rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -120,10 +127,10 @@ srcdist: distclean cp INSTALL.in $(TARGET_SRC_DIR)/INSTALL elinks -dump -no-references -no-numbering $(WEB_URL)install.html \ >> $(TARGET_SRC_DIR)/INSTALL - cp BUILD.in $(TARGET_SRC_DIR)/BUILD + cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ >> $(TARGET_SRC_DIR)/BUILD - sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app + sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile $(TARGET_SRC_DIR) diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index b9edd5847f..e9b9514ef6 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -26,7 +26,7 @@ B<-n> I<node> startup time). The output of hostname -s is usually the correct suffix to use after the "@" sign. See rabbitmq-server(1) for details of configuring the RabbitMQ broker. - + B<-q> quiet output mode is selected with the B<-q> flag. Informational messages are suppressed when quiet mode is in effect. @@ -43,32 +43,32 @@ stop_app This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g. I<reset>. - + start_app start the RabbitMQ application. This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g. I<reset>. - + status display various information about the RabbitMQ broker, such as whether the RabbitMQ application on the current node, its version number, what nodes are part of the broker, which of these are running. - + force return a RabbitMQ node to its virgin state. Removes the node from any cluster it belongs to, removes all data from the management database, such as configured users, vhosts and deletes all persistent messages. - + force_reset the same as I<force> command, but resets the node unconditionally, regardless of the current management database state and cluster configuration. It should only be used as a last resort if the database or cluster configuration has been corrupted. - + rotate_logs [suffix] instruct the RabbitMQ node to rotate the log files. The RabbitMQ broker will attempt to append the current contents of the log file @@ -81,48 +81,57 @@ rotate_logs [suffix] specified. This command might be helpful when you are e.g. writing your own logrotate script and you do not want to restart the RabbitMQ node. - + cluster I<clusternode> ... instruct the node to become member of a cluster with the specified nodes determined by I<clusternode> option(s). See http://www.rabbitmq.com/clustering.html for more information about clustering. - + =head2 USER MANAGEMENT - + add_user I<username> I<password> create a user named I<username> with (initial) password I<password>. - + +delete_user I<username> + delete the user named I<username>. + change_password I<username> I<newpassword> change the password for the user named I<username> to I<newpassword>. list_users list all users. - + =head2 ACCESS CONTROL add_vhost I<vhostpath> create a new virtual host called I<vhostpath>. - + delete_vhost I<vhostpath> delete a virtual host I<vhostpath>. That command deletes also all its exchanges, queues and user mappings. - + list_vhosts list all virtual hosts. - -map_user_vhost I<username> I<vhostpath> - grant the user named I<username> access to the virtual host called - I<vhostpath>. - -unmap_user_vhost I<username> I<vhostpath> - deny the user named I<username> access to the virtual host called + +set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> + set the permissions for the user named I<username> in the virtual + host I<vhostpath>, granting them configuration access to resources + with names matching the first I<regexp> and messaging access to + resources with names matching the second I<regexp>. + +clear_permissions [-p I<vhostpath>] I<username> + remove the permissions for the user named I<username> in the + virtual host I<vhostpath>. + +list_permissions [-p I<vhostpath>] + list all the users and their permissions in the virtual host I<vhostpath>. -list_user_vhost I<username> - list all the virtual hosts to which the user named I<username> has - been granted access. - +list_user_permissions I<username> + list the permissions of the user named I<username> across all + virtual hosts. + =head2 SERVER STATUS list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] @@ -146,23 +155,24 @@ auto_delete arguments queue arguments -pid - Erlang process identifier associated with the queue +node + node on which the process associated with the queue resides messages_ready - number of ready messages + number of messages ready to be delivered to clients messages_unacknowledged - number of unacknowledged messages + number of messages delivered to clients but not yet acknowledged messages_uncommitted - number of uncommitted messages + number of messages published in as yet uncommitted transactions messages sum of ready, unacknowledged and uncommitted messages acks_uncommitted - number of uncommitted acknowledgements + number of acknowledgements received in as yet uncommitted + transactions consumers number of consumers @@ -214,8 +224,8 @@ list_connections [I<connectioninfoitem> ...] =over 4 -pid - Erlang process id associated with the connection +node + node on which the process associated with the connection resides address server IP number @@ -228,7 +238,7 @@ peer_address peer_port peer port - + state connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, B<running>, B<closing>, B<closed>) @@ -262,7 +272,7 @@ send_cnt send_pend send queue size - + =back The list_queues, list_exchanges and list_bindings commands accept an @@ -276,12 +286,12 @@ Create a user named foo with (initial) password bar at the Erlang node rabbit@test: rabbitmqctl -n rabbit@test add_user foo bar - + Grant user named foo access to the virtual host called test at the default Erlang node: rabbitmqctl map_user_vhost foo test - + Append the current logs' content to the files with ".1" suffix and reopen them: diff --git a/ebin/rabbit.app b/ebin/rabbit.app deleted file mode 100644 index 9e78d619f8..0000000000 --- a/ebin/rabbit.app +++ /dev/null @@ -1,60 +0,0 @@ -{application, rabbit, %% -*- erlang -*- - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, - {modules, [buffering_proxy, - rabbit_access_control, - rabbit_alarm, - rabbit_amqqueue, - rabbit_amqqueue_process, - rabbit_amqqueue_sup, - rabbit_binary_generator, - rabbit_binary_parser, - rabbit_channel, - rabbit_control, - rabbit, - rabbit_error_logger, - rabbit_error_logger_file_h, - rabbit_exchange, - rabbit_framing_channel, - rabbit_framing, - rabbit_heartbeat, - rabbit_load, - rabbit_log, - rabbit_memsup_linux, - rabbit_misc, - rabbit_mnesia, - rabbit_multi, - rabbit_networking, - rabbit_net, - rabbit_node_monitor, - rabbit_persister, - rabbit_reader, - rabbit_router, - rabbit_sasl_report_file_h, - rabbit_sup, - rabbit_tests, - rabbit_tracer, - rabbit_writer, - tcp_acceptor, - tcp_acceptor_sup, - tcp_client_sup, - tcp_listener, - tcp_listener_sup]}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_persister, - rabbit_router, - rabbit_sup, - rabbit_tcp_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_vhost, <<"/">>}, - {ssl_listeners, []}, - {ssl_options, []}, - {memory_alarms, auto}]}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in new file mode 100644 index 0000000000..5dd701b028 --- /dev/null +++ b/ebin/rabbit_app.in @@ -0,0 +1,23 @@ +{application, rabbit, %% -*- erlang -*- + [{description, "RabbitMQ"}, + {id, "RabbitMQ"}, + {vsn, "%%VERSION%%"}, + {modules, []}, + {registered, [rabbit_amqqueue_sup, + rabbit_log, + rabbit_node_monitor, + rabbit_persister, + rabbit_router, + rabbit_sup, + rabbit_tcp_client_sup]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {mod, {rabbit, []}}, + {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {extra_startup_steps, []}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_vhost, <<"/">>}, + {default_permissions, [<<".*">>, <<".*">>]}, + {ssl_listeners, []}, + {ssl_options, []}, + {memory_alarms, auto}]}]}. diff --git a/generate_app b/generate_app new file mode 100644 index 0000000000..623012927e --- /dev/null +++ b/generate_app @@ -0,0 +1,10 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +main([BeamDir]) -> + Modules = [list_to_atom(filename:basename(F, ".beam")) || + F <- filelib:wildcard("*.beam", BeamDir)], + {ok, {application, Application, Properties}} = io:read(''), + NewProperties = lists:keyreplace(modules, 1, Properties, + {modules, Modules}), + io:format("~p.", [{application, Application, NewProperties}]). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b6faf4f06f..c42df1eab3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -30,7 +30,9 @@ %% -record(user, {username, password}). +-record(permission, {configuration, messaging}). -record(user_vhost, {username, virtual_host}). +-record(user_permission, {user_vhost, permission}). -record(vhost, {virtual_host, dummy}). @@ -77,6 +79,7 @@ -type(thunk(T) :: fun(() -> T)). -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). +-type(regexp() :: binary()). %% this is really an abstract type, but dialyzer does not support them -type(guid() :: any()). @@ -91,6 +94,9 @@ -type(user() :: #user{username :: username(), password :: password()}). +-type(permission() :: + #permission{configuration :: regexp(), + messaging :: regexp()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), durable :: bool(), diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index ffcd11ac61..a006a5a7a2 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -16,7 +16,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON_NAME=rabbitmq-multi DAEMON=/usr/lib/rabbitmq/bin/$DAEMON_NAME NAME=rabbitmq-server diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 241afd713c..fc109bdbcc 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -118,6 +118,9 @@ fi rm -rf %{buildroot} %changelog +* Mon Jan 19 2009 Ben Hood <0x6e6562@gmail.com> 1.5.1-1 +- Maintenance release for the 1.5.x series + * Wed Dec 17 2008 Matthias Radestock <matthias@lshift.net> 1.5.0-1 - New upstream release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index e8be8d8d8c..37b01dabb8 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.5.1-1) hardy; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@lshift.net> Mon, 19 Jan 2009 15:46:13 +0000 + rabbitmq-server (1.5.0-1) testing; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d index ace474c59f..70dd0adf5f 100644 --- a/packaging/debs/Debian/debian/init.d +++ b/packaging/debs/Debian/debian/init.d @@ -9,7 +9,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON=/usr/lib/rabbitmq/bin/rabbitmq-multi NAME=rabbitmq-server DESC=rabbitmq-server diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 495b8331f0..05fb179cbf 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -25,8 +25,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then - adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq --no-create-home rabbitmq - usermod -c "RabbitMQ messaging server" rabbitmq + adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ + --no-create-home --gecos "RabbitMQ messaging server" rabbitmq fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq diff --git a/packaging/debs/Debian/debian/watch b/packaging/debs/Debian/debian/watch new file mode 100644 index 0000000000..b41aff9aed --- /dev/null +++ b/packaging/debs/Debian/debian/watch @@ -0,0 +1,4 @@ +version=3 + +http://www.rabbitmq.com/releases/rabbitmq-server/v(.*)/rabbitmq-server-(\d.*)\.tar\.gz \ + debian uupdate diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 9d16fd9fb3..59101cb2c6 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -15,7 +15,7 @@ dist: mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile - rm -f $(SOURCE_DIR)/BUILD + rm -f $(SOURCE_DIR)/README rm -rf $(SOURCE_DIR)/docs mv $(SOURCE_DIR) $(TARGET_DIR) diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl deleted file mode 100644 index 344b719a3c..0000000000 --- a/src/buffering_proxy.erl +++ /dev/null @@ -1,108 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(buffering_proxy). - --export([start_link/2]). - -%% internal - --export([mainloop/4, drain/2]). --export([proxy_loop/3]). - --define(HIBERNATE_AFTER, 5000). - -%%---------------------------------------------------------------------------- - -start_link(M, A) -> - spawn_link( - fun () -> process_flag(trap_exit, true), - ProxyPid = self(), - Ref = make_ref(), - Pid = spawn_link( - fun () -> ProxyPid ! Ref, - mainloop(ProxyPid, Ref, M, - M:init(ProxyPid, A)) end), - proxy_loop(Ref, Pid, empty) - end). - -%%---------------------------------------------------------------------------- - -mainloop(ProxyPid, Ref, M, State) -> - NewState = - receive - {Ref, Messages} -> - NewSt = - lists:foldl(fun (Msg, S) -> - drain(M, M:handle_message(Msg, S)) - end, State, lists:reverse(Messages)), - ProxyPid ! Ref, - NewSt; - Msg -> M:handle_message(Msg, State) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, - [ProxyPid, Ref, M, State]) - end, - ?MODULE:mainloop(ProxyPid, Ref, M, NewState). - -drain(M, State) -> - receive - Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) - after 0 -> - State - end. - -proxy_loop(Ref, Pid, State) -> - receive - Ref -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> waiting; - waiting -> exit(duplicate_next); - Messages -> Pid ! {Ref, Messages}, empty - end); - {'EXIT', Pid, Reason} -> - exit(Reason); - {'EXIT', _, Reason} -> - exit(Pid, Reason), - ?MODULE:proxy_loop(Ref, Pid, State); - Msg -> - ?MODULE:proxy_loop( - Ref, Pid, - case State of - empty -> [Msg]; - waiting -> Pid ! {Ref, [Msg]}, empty; - Messages -> [Msg | Messages] - end) - after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State]) - end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl new file mode 100644 index 0000000000..11bb66d743 --- /dev/null +++ b/src/gen_server2.erl @@ -0,0 +1,854 @@ +%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) the module name is gen_server2 +%% +%% 2) more efficient handling of selective receives in callbacks +%% gen_server2 processes drain their message queue into an internal +%% buffer before invoking any callback module functions. Messages are +%% dequeued from the buffer for processing. Thus the effective message +%% queue of a gen_server2 process is the concatenation of the internal +%% buffer and the real message queue. +%% As a result of the draining, any selective receive invoked inside a +%% callback is less likely to have to scan a large message queue. +%% +%% 3) gen_server2:cast is guaranteed to be order-preserving +%% The original code could reorder messages when communicating with a +%% process on a remote node that was not currently connected. +%% +%% All modifications are (C) 2009 LShift Ltd. + +%% ``The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved via the world wide web at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. +%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings +%% AB. All Rights Reserved.'' +%% +%% $Id$ +%% +-module(gen_server2). + +%%% --------------------------------------------------- +%%% +%%% The idea behind THIS server is that the user module +%%% provides (different) functions to handle different +%%% kind of inputs. +%%% If the Parent process terminates the Module:terminate/2 +%%% function is called. +%%% +%%% The user module should export: +%%% +%%% init(Args) +%%% ==> {ok, State} +%%% {ok, State, Timeout} +%%% ignore +%%% {stop, Reason} +%%% +%%% handle_call(Msg, {From, Tag}, State) +%%% +%%% ==> {reply, Reply, State} +%%% {reply, Reply, State, Timeout} +%%% {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, Reply, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_cast(Msg, State) +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term terminate(State) is called +%%% +%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... +%%% +%%% ==> {noreply, State} +%%% {noreply, State, Timeout} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% terminate(Reason, State) Let the user module clean up +%%% always called when server terminates +%%% +%%% ==> ok +%%% +%%% +%%% The work flow (of the server) can be described as follows: +%%% +%%% User module Generic +%%% ----------- ------- +%%% start -----> start +%%% init <----- . +%%% +%%% loop +%%% handle_call <----- . +%%% -----> reply +%%% +%%% handle_cast <----- . +%%% +%%% handle_info <----- . +%%% +%%% terminate <----- . +%%% +%%% -----> reply +%%% +%%% +%%% --------------------------------------------------- + +%% API +-export([start/3, start/4, + start_link/3, start_link/4, + call/2, call/3, + cast/2, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5]). + +-export([behaviour_info/1]). + +%% System exports +-export([system_continue/3, + system_terminate/4, + system_code_change/4, + format_status/2]). + +%% Internal exports +-export([init_it/6, print_event/3]). + +-import(error_logger, [format/2]). + +%%%========================================================================= +%%% API +%%%========================================================================= + +behaviour_info(callbacks) -> + [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, + {terminate,2},{code_change,3}]; +behaviour_info(_Other) -> + undefined. + +%%% ----------------------------------------------------------------- +%%% Starts a generic server. +%%% start(Mod, Args, Options) +%%% start(Name, Mod, Args, Options) +%%% start_link(Mod, Args, Options) +%%% start_link(Name, Mod, Args, Options) where: +%%% Name ::= {local, atom()} | {global, atom()} +%%% Mod ::= atom(), callback module implementing the 'real' server +%%% Args ::= term(), init arguments (to Mod:init/1) +%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}] +%%% Flag ::= trace | log | {logfile, File} | statistics | debug +%%% (debug == log && statistics) +%%% Returns: {ok, Pid} | +%%% {error, {already_started, Pid}} | +%%% {error, Reason} +%%% ----------------------------------------------------------------- +start(Mod, Args, Options) -> + gen:start(?MODULE, nolink, Mod, Args, Options). + +start(Name, Mod, Args, Options) -> + gen:start(?MODULE, nolink, Name, Mod, Args, Options). + +start_link(Mod, Args, Options) -> + gen:start(?MODULE, link, Mod, Args, Options). + +start_link(Name, Mod, Args, Options) -> + gen:start(?MODULE, link, Name, Mod, Args, Options). + + +%% ----------------------------------------------------------------- +%% Make a call to a generic server. +%% If the server is located at another node, that node will +%% be monitored. +%% If the client is trapping exits and is linked server termination +%% is handled here (? Shall we do that here (or rely on timeouts) ?). +%% ----------------------------------------------------------------- +call(Name, Request) -> + case catch gen:call(Name, '$gen_call', Request) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) + end. + +call(Name, Request, Timeout) -> + case catch gen:call(Name, '$gen_call', Request, Timeout) of + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) + end. + +%% ----------------------------------------------------------------- +%% Make a cast to a generic server. +%% ----------------------------------------------------------------- +cast({global,Name}, Request) -> + catch global:send(Name, cast_msg(Request)), + ok; +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_atom(Dest) -> + do_cast(Dest, Request); +cast(Dest, Request) when is_pid(Dest) -> + do_cast(Dest, Request). + +do_cast(Dest, Request) -> + do_send(Dest, cast_msg(Request)), + ok. + +cast_msg(Request) -> {'$gen_cast',Request}. + +%% ----------------------------------------------------------------- +%% Send a reply to the client. +%% ----------------------------------------------------------------- +reply({To, Tag}, Reply) -> + catch To ! {Tag, Reply}. + +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n prey +%%----------------------------------------------------------------- +abcast(Name, Request) when is_atom(Name) -> + do_abcast([node() | nodes()], Name, cast_msg(Request)). + +abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) -> + do_abcast(Nodes, Name, cast_msg(Request)). + +do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) -> + do_send({Name,Node},Msg), + do_abcast(Nodes, Name, Msg); +do_abcast([], _,_) -> abcast. + +%%% ----------------------------------------------------------------- +%%% Make a call to servers at several nodes. +%%% Returns: {[Replies],[BadNodes]} +%%% A Timeout can be given +%%% +%%% A middleman process is used in case late answers arrives after +%%% the timeout. If they would be allowed to glog the callers message +%%% queue, it would probably become confused. Late answers will +%%% now arrive to the terminated middleman and so be discarded. +%%% ----------------------------------------------------------------- +multi_call(Name, Req) + when is_atom(Name) -> + do_multi_call([node() | nodes()], Name, Req, infinity). + +multi_call(Nodes, Name, Req) + when is_list(Nodes), is_atom(Name) -> + do_multi_call(Nodes, Name, Req, infinity). + +multi_call(Nodes, Name, Req, infinity) -> + do_multi_call(Nodes, Name, Req, infinity); +multi_call(Nodes, Name, Req, Timeout) + when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> + do_multi_call(Nodes, Name, Req, Timeout). + + +%%----------------------------------------------------------------- +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive +%% loop and become a gen_server process. +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the +%% process, including registering a name for it. +%%----------------------------------------------------------------- +enter_loop(Mod, Options, State) -> + enter_loop(Mod, Options, State, self(), infinity). + +enter_loop(Mod, Options, State, ServerName = {_, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity); + +enter_loop(Mod, Options, State, Timeout) -> + enter_loop(Mod, Options, State, self(), Timeout). + +enter_loop(Mod, Options, State, ServerName, Timeout) -> + Name = get_proc_name(ServerName), + Parent = get_parent(), + Debug = debug_options(Name, Options), + Queue = queue:new(), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + +%%%======================================================================== +%%% Gen-callback functions +%%%======================================================================== + +%%% --------------------------------------------------- +%%% Initiate the new process. +%%% Register the name using the Rfunc function +%%% Calls the Mod:init/Args function. +%%% Finally an acknowledge is sent to Parent and the main +%%% loop is entered. +%%% --------------------------------------------------- +init_it(Starter, self, Name, Mod, Args, Options) -> + init_it(Starter, self(), Name, Mod, Args, Options); +init_it(Starter, Parent, Name, Mod, Args, Options) -> + Debug = debug_options(Name, Options), + Queue = queue:new(), + case catch Mod:init(Args) of + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, infinity, Queue, Debug); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + {stop, Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) + end. + +%%%======================================================================== +%%% Internal functions +%%%======================================================================== +%%% --------------------------------------------------- +%%% The MAIN loop. +%%% --------------------------------------------------- +loop(Parent, Name, State, Mod, Time, Queue, Debug) -> + receive + Input -> loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue), Debug) + after 0 -> + case queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, Msg); + {empty, Queue1} -> + receive + Input -> + loop(Parent, Name, State, Mod, + Time, queue:in(Input, Queue1), Debug) + after Time -> + process_msg(Parent, Name, State, Mod, + Time, Queue1, Debug, timeout) + end + end + end. + +process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> + case Msg of + {system, From, Req} -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, Queue]); + {'EXIT', Parent, Reason} -> + terminate(Reason, Name, Msg, Mod, State, Debug); + _Msg when Debug =:= [] -> + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + end. + +%%% --------------------------------------------------- +%%% Send/recive functions +%%% --------------------------------------------------- +do_send(Dest, Msg) -> + catch erlang:send(Dest, Msg). + +do_multi_call(Nodes, Name, Req, infinity) -> + Tag = make_ref(), + Monitors = send_nodes(Nodes, Name, Tag, Req), + rec_nodes(Tag, Monitors, Name, undefined); +do_multi_call(Nodes, Name, Req, Timeout) -> + Tag = make_ref(), + Caller = self(), + Receiver = + spawn( + fun() -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), + Mref = erlang:monitor(process, Receiver), + Receiver ! {self(),Tag}, + receive + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) + end. + +send_nodes(Nodes, Name, Tag, Req) -> + send_nodes(Nodes, Name, Tag, Req, []). + +send_nodes([Node|Tail], Name, Tag, Req, Monitors) + when is_atom(Node) -> + Monitor = start_monitor(Node, Name), + %% Handle non-existing names in rec_nodes. + catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req}, + send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]); +send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> + %% Skip non-atom Node + send_nodes(Tail, Name, Tag, Req, Monitors); +send_nodes([], _Name, _Tag, _Req, Monitors) -> + Monitors. + +%% Against old nodes: +%% If no reply has been delivered within 2 secs. (per node) check that +%% the server really exists and wait for ever for the answer. +%% +%% Against contemporary nodes: +%% Wait for reply, server 'DOWN', or timeout from TimerId. + +rec_nodes(Tag, Nodes, Name, TimerId) -> + rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). + +rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + after Time -> + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end + end; +rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> + case catch erlang:cancel_timer(TimerId) of + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok + end, + {Replies, Badnodes}. + +%% Collect all replies that already have arrived +rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> + receive + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> + %% R6 node + receive + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + after 0 -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + end; +rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> + {Replies, Badnodes}. + + +%%% --------------------------------------------------- +%%% Monitor functions +%%% --------------------------------------------------- + +start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> + if node() =:= nonode@nohost, Node =/= nonode@nohost -> + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; + true -> + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end + end. + +%% Cancels a monitor started with Ref=erlang:monitor(_, _). +unmonitor(Ref) when is_reference(Ref) -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, _, _, _} -> + true + after 0 -> + true + end. + +%%% --------------------------------------------------- +%%% Message handling functions +%%% --------------------------------------------------- + +dispatch({'$gen_cast', Msg}, Mod, State) -> + Mod:handle_cast(Msg, State); +dispatch(Info, Mod, State) -> + Mod:handle_info(Info, State). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {reply, Reply, NState, Time1} -> + reply(From, Reply), + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, [])), + reply(From, Reply), + exit(R); + Other -> handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + +handle_msg({'$gen_call', From, Msg}, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + case catch Mod:handle_call(Msg, From, State) of + {reply, Reply, NState} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {reply, Reply, NState, Time1} -> + Debug1 = reply(Name, From, Reply, NState, Debug), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, + Parent, Name, Msg, Mod, State, Queue, Debug) + end; +handle_msg(Msg, + Parent, Name, State, Mod, _Time, Queue, Debug) -> + Reply = (catch dispatch(Msg, Mod, State)), + handle_common_reply(Reply, + Parent, Name, Msg, Mod, State, Queue, Debug). + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> + case Reply of + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, Queue, []); + {noreply, NState, Time1} -> + loop(Parent, Name, NState, Mod, Time1, Queue, []); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, []); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + end. + +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> + case Reply of + {noreply, NState} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + {noreply, NState, Time1} -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Name, Msg, Mod, NState, Debug); + {'EXIT', What} -> + terminate(What, Name, Msg, Mod, State, Debug); + _ -> + terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + end. + +reply(Name, {To, Tag}, Reply, State, Debug) -> + reply({To, Tag}, Reply), + sys:handle_debug(Debug, {?MODULE, print_event}, Name, + {out, Reply, To, State} ). + + +%%----------------------------------------------------------------- +%% Callback functions for system messages handling. +%%----------------------------------------------------------------- +system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> + loop(Parent, Name, State, Mod, Time, Queue, Debug). + +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> + terminate(Reason, Name, [], Mod, State, Debug). + +system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> + case catch Mod:code_change(OldVsn, State, Extra) of + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; + Else -> Else + end. + +%%----------------------------------------------------------------- +%% Format debug messages. Print them as the call-back module sees +%% them, not as the real erlang messages. Use trace for that. +%%----------------------------------------------------------------- +print_event(Dev, {in, Msg}, Name) -> + case Msg of + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + end; +print_event(Dev, {out, Msg, To, State}, Name) -> + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); +print_event(Dev, {noreply, State}, Name) -> + io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); +print_event(Dev, Event, Name) -> + io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]). + + +%%% --------------------------------------------------- +%%% Terminate the server. +%%% --------------------------------------------------- + +terminate(Reason, Name, Msg, Mod, State, Debug) -> + case catch Mod:terminate(Reason, State) of + {'EXIT', R} -> + error_info(R, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + _ -> + error_info(Reason, Name, Msg, State, Debug), + exit(Reason) + end + end. + +error_info(_Reason, application_controller, _Msg, _State, _Debug) -> + %% OTP-5811 Don't send an error report if it's the system process + %% application_controller which is terminating - let init take care + %% of it instead + ok; +error_info(Reason, Name, Msg, State, Debug) -> + Reason1 = + case Reason of + {undef,[{M,F,A}|MFAs]} -> + case code:is_loaded(M) of + false -> + {'module could not be loaded',[{M,F,A}|MFAs]}; + _ -> + case erlang:function_exported(M, F, length(A)) of + true -> + Reason; + false -> + {'function not exported',[{M,F,A}|MFAs]} + end + end; + _ -> + Reason + end, + format("** Generic server ~p terminating \n" + "** Last message in was ~p~n" + "** When Server state == ~p~n" + "** Reason for termination == ~n** ~p~n", + [Name, Msg, State, Reason1]), + sys:print_log(Debug), + ok. + +%%% --------------------------------------------------- +%%% Misc. functions. +%%% --------------------------------------------------- + +opt(Op, [{Op, Value}|_]) -> + {ok, Value}; +opt(Op, [_|Options]) -> + opt(Op, Options); +opt(_, []) -> + false. + +debug_options(Name, Opts) -> + case opt(debug, Opts) of + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) + end. + +dbg_options(Name, []) -> + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, + dbg_opts(Name, Opts); +dbg_options(Name, Opts) -> + dbg_opts(Name, Opts). + +dbg_opts(Name, Opts) -> + case catch sys:debug_options(Opts) of + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg + end. + +get_proc_name(Pid) when is_pid(Pid) -> + Pid; +get_proc_name({local, Name}) -> + case process_info(self(), registered_name) of + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; +get_proc_name({global, Name}) -> + case global:safe_whereis_name(Name) of + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) + end. + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid(Parent)-> + Parent; + [Parent | _] when is_atom(Parent)-> + name_to_pid(Parent); + _ -> + exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +%%----------------------------------------------------------------- +%% Status information +%%----------------------------------------------------------------- +format_status(Opt, StatusData) -> + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = + StatusData, + NameTag = if is_pid(Name) -> + pid_to_list(Name); + is_atom(Name) -> + Name + end, + Header = lists:concat(["Status for generic server ", NameTag]), + Log = sys:get_debug(log, Debug, []), + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> + case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> + [{data, [{"State", State}]}] + end, + [{header, Header}, + {data, [{"Status", SysState}, + {"Parent", Parent}, + {"Logged events", Log}, + {"Queued messages", queue:to_list(Queue)}]} | + Specfic]. diff --git a/src/rabbit.erl b/src/rabbit.erl index 9831b0c460..c513ddd791 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -87,7 +87,8 @@ stop() -> stop_and_halt() -> spawn(fun () -> SleepTime = 1000, - rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n", + rabbit_log:info("Stop-and-halt request received; " + "halting in ~p milliseconds~n", [SleepTime]), timer:sleep(SleepTime), init:stop() @@ -282,9 +283,13 @@ insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), + {ok, [DefaultConfigurationPerm, DefaultMessagingPerm]} = + application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), - ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost), + ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, + DefaultConfigurationPerm, + DefaultMessagingPerm), ok. start_builtin_amq_applications() -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b73090fc44..394eb2b124 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -34,11 +34,12 @@ -include("rabbit.hrl"). -export([check_login/2, user_pass_login/2, - check_vhost_access/2]). + check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]). --export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]). +-export([add_vhost/1, delete_vhost/1, list_vhosts/0]). +-export([set_permissions/4, clear_permissions/2, + list_vhost_permissions/1, list_user_permissions/1]). %%---------------------------------------------------------------------------- @@ -47,6 +48,8 @@ -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). +-spec(check_resource_access/3 :: + (username(), r(atom()), non_neg_integer()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -55,10 +58,12 @@ -spec(add_vhost/1 :: (vhost()) -> 'ok'). -spec(delete_vhost/1 :: (vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [vhost()]). --spec(list_vhost_users/1 :: (vhost()) -> [username()]). --spec(list_user_vhosts/1 :: (username()) -> [vhost()]). --spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok'). --spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok'). +-spec(set_permissions/4 :: (username(), vhost(), regexp(), regexp()) -> 'ok'). +-spec(clear_permissions/2 :: (username(), vhost()) -> 'ok'). +-spec(list_vhost_permissions/1 :: + (vhost()) -> [{username(), regexp(), regexp()}]). +-spec(list_user_permissions/1 :: + (username()) -> [{vhost(), regexp(), regexp()}]). -endif. @@ -112,9 +117,9 @@ internal_lookup_vhost_access(Username, VHostPath) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:match_object( - #user_vhost{username = Username, - virtual_host = VHostPath}) of + case mnesia:read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of [] -> not_found; [R] -> {ok, R} end @@ -131,13 +136,47 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. +check_resource_access(Username, + R = #resource{kind = exchange, name = <<"">>}, + Permission) -> + check_resource_access(Username, + R#resource{name = <<"amq.default">>}, + Permission); +check_resource_access(_Username, + #resource{name = <<"amq.gen",_/binary>>}, + _Permission) -> + ok; +check_resource_access(Username, + R = #resource{virtual_host = VHostPath, name = Name}, + Permission) -> + Res = case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> + false; + [#user_permission{permission = P}] -> + case regexp:match( + binary_to_list(Name), + binary_to_list(element(Permission, P))) of + {match, _, _} -> true; + nomatch -> false + end + end, + if Res -> ok; + true -> rabbit_misc:protocol_error( + access_refused, "access to ~s refused for user '~s'", + [rabbit_misc:rs(R), Username]) + end. + add_user(Username, Password) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({user, Username}) of + case mnesia:wread({rabbit_user, Username}) of [] -> - ok = mnesia:write(#user{username = Username, - password = Password}); + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write); _ -> mnesia:abort({user_already_exists, Username}) end @@ -150,8 +189,17 @@ delete_user(Username) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:delete({user, Username}), - ok = mnesia:delete({user_vhost, Username}) + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permissions, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok end)), rabbit_log:info("Deleted user ~p~n", [Username]), R. @@ -161,24 +209,28 @@ change_password(Username, Password) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:write(#user{username = Username, - password = Password}) + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write) end)), rabbit_log:info("Changed password for user ~p~n", [Username]), R. list_users() -> - mnesia:dirty_all_keys(user). + mnesia:dirty_all_keys(rabbit_user). lookup_user(Username) -> - rabbit_misc:dirty_read({user, Username}). + rabbit_misc:dirty_read({rabbit_user, Username}). add_vhost(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({vhost, VHostPath}) of + case mnesia:wread({rabbit_vhost, VHostPath}) of [] -> - ok = mnesia:write(#vhost{virtual_host = VHostPath}), + ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || @@ -186,6 +238,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> @@ -218,53 +272,78 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun (Username) -> - ok = unmap_user_vhost(Username, VHostPath) + lists:foreach(fun ({Username, _, _}) -> + ok = clear_permissions(Username, VHostPath) end, - list_vhost_users(VHostPath)), - ok = mnesia:delete({vhost, VHostPath}), + list_vhost_permissions(VHostPath)), + ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. list_vhosts() -> - mnesia:dirty_all_keys(vhost). + mnesia:dirty_all_keys(rabbit_vhost). -list_vhost_users(VHostPath) -> - [Username || - #user_vhost{username = Username} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - fun () -> mnesia:index_read(user_vhost, VHostPath, - #user_vhost.virtual_host) - end))]. - -list_user_vhosts(Username) -> - [VHostPath || - #user_vhost{virtual_host = VHostPath} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> mnesia:read({user_vhost, Username}) end))]. +validate_regexp(RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case regexp:parse(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) + end. -map_user_vhost(Username, VHostPath) -> +set_permissions(Username, VHostPath, ConfigurationPerm, MessagingPerm) -> + validate_regexp(ConfigurationPerm), + validate_regexp(MessagingPerm), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, - fun () -> - ok = mnesia:write( - #user_vhost{username = Username, - virtual_host = VHostPath}) + fun () -> ok = mnesia:write( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configuration = ConfigurationPerm, + messaging = MessagingPerm}}, + write) end)). -unmap_user_vhost(Username, VHostPath) -> +clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, fun () -> - ok = mnesia:delete_object( - #user_vhost{username = Username, - virtual_host = VHostPath}) + ok = mnesia:delete({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) end)). +list_vhost_permissions(VHostPath) -> + [{Username, ConfigurationPerm, MessagingPerm} || + {Username, _, ConfigurationPerm, MessagingPerm} <- + list_permissions(rabbit_misc:with_vhost( + VHostPath, match_user_vhost('_', VHostPath)))]. + +list_user_permissions(Username) -> + [{VHostPath, ConfigurationPerm, MessagingPerm} || + {_, VHostPath, ConfigurationPerm, MessagingPerm} <- + list_permissions(rabbit_misc:with_user( + Username, match_user_vhost(Username, '_')))]. + +list_permissions(QueryThunk) -> + [{Username, VHostPath, ConfigurationPerm, MessagingPerm} || + #user_permission{user_vhost = #user_vhost{username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configuration = ConfigurationPerm, + messaging = MessagingPerm}} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction(QueryThunk)]. + +match_user_vhost(Username, VHostPath) -> + fun () -> mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = '_'}, + read) + end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b9abb2990..3018582f94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,13 +37,13 @@ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([notify_sent/2, unblock/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). --import(gen_server). +-import(gen_server2). -import(lists). -import(queue). @@ -91,15 +91,17 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -126,7 +128,7 @@ recover_durable_queues() -> R = rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), + <- mnesia:table(rabbit_durable_queue), node(Pid) == Node])) end), Queues = lists:map(fun start_queue_process/1, R), @@ -144,7 +146,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), ok = add_default_binding(Q), Q; @@ -157,11 +159,11 @@ declare(QueueName, Durable, AutoDelete, Args) -> end. store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(durable_queues, Q, write), - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_durable_queue, Q, write), + ok = mnesia:write(rabbit_queue, Q, write), ok; store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_queue, Q, write), ok. start_queue_process(Q) -> @@ -175,7 +177,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ok. lookup(Name) -> - rabbit_misc:dirty_read({amqqueue, Name}). + rabbit_misc:dirty_read({rabbit_queue, Name}). with(Name, F, E) -> case lookup(Name) of @@ -192,15 +194,16 @@ with_or_die(Name, F) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server:call(QPid, info). + gen_server2:call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server:call(QPid, {info, Items}) of + case gen_server2:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -209,45 +212,45 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). + lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server:cast(QPid, {deliver, Txn, Message}), + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server:cast(QPid, {redeliver, Messages}). + gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> @@ -256,41 +259,50 @@ notify_down_all(QPids, ChPid) -> %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server:cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). + +unblock(QPid, ChPid) -> + gen_server2:cast(QPid, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; [Q] -> ok = delete_queue(Q), - ok = mnesia:delete({durable_queues, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), ok end end). delete_queue(#amqqueue{name = QueueName}) -> ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({amqqueue, QueueName}), + ok = mnesia:delete({rabbit_queue, QueueName}), ok. on_node_down(Node) -> @@ -300,7 +312,7 @@ on_node_down(Node) -> fun (Q, Acc) -> ok = delete_queue(Q), Acc end, ok, qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(amqqueue), + <- mnesia:table(rabbit_queue), node(Pid) == Node])) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6282a8fb6f..c390b2b7e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -62,9 +62,10 @@ %% These are held in our process dictionary -record(cr, {consumers, ch_pid, + limiter_pid, monitor_ref, unacked_messages, - is_overload_protection_active, + is_limit_active, unsent_message_count}). -define(INFO_KEYS, @@ -85,7 +86,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server:start_link(?MODULE, Q, []). + gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -131,7 +132,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_overload_protection_active = false, + is_limit_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, - unsent_message_count = Count}) -> - {Result, NewActive} = - if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; - true -> - {ok, Active} - end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), - Result. +is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> + Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + +ch_record_state_transition(OldCR, NewCR) -> + BlockedOld = is_ch_blocked(OldCR), + BlockedNew = is_ch_blocked(NewCR), + if BlockedOld andalso not(BlockedNew) -> unblock; + BlockedNew andalso not(BlockedOld) -> block; + true -> ok + end. deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, @@ -168,26 +165,37 @@ deliver_immediately(Message, Delivered, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, + C = #cr{limiter_pid = LimiterPid, + unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + case not(AckRequired) orelse rabbit_limiter:can_send( + LimiterPid, self()) of + true -> + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; + false -> + store_ch_record(C#cr{is_limit_active = true}), + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) + end; {empty, _} -> - not_offered + {not_offered, State} end. attempt_delivery(none, Message, State) -> @@ -198,8 +206,8 @@ attempt_delivery(none, Message, State) -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - not_offered -> - {false, State} + {not_offered, State1} -> + {false, State1} end; attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), @@ -237,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, - State = #q{round_robin = RoundRobin}) -> - case update_store_and_maybe_block_ch(C) of - ok -> +possibly_unblock(State, ChPid, Update) -> + case lookup_ch(ChPid) of + not_found -> State; - unblock_ch -> - run_poke_burst(State#q{round_robin = - unblock_consumers(ChPid, Consumers, RoundRobin)}) + C -> + NewC = Update(C), + store_ch_record(NewC), + case ch_record_state_transition(C, NewC) of + ok -> State; + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) + end end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -301,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -334,8 +348,8 @@ run_poke_burst(MessageBuffer, State) -> {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - not_offered -> - State#q{message_buffer = MessageBuffer} + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} @@ -500,8 +514,8 @@ i(messages_uncommitted, _) -> #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); + messages_unacknowledged, + messages_uncommitted]]); i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); @@ -552,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) -> handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -586,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -601,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers]}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}), + if Consumers == [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = if @@ -622,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers} -> + C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - C1 = C#cr{consumers = NewConsumers}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = NewConsumers}), + if NewConsumers == [] -> + ok = rabbit_limiter:unregister(LimiterPid, self()); + true -> + ok + end, ok = maybe_send_reply(ChPid, OkMsg), case check_auto_delete( State#q{exclusive_consumer = cancel_holder(ChPid, @@ -730,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end)); + handle_cast({notify_sent, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> noreply(State); - T = #cr{unsent_message_count =Count} -> - noreply(possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)) - end. + noreply( + possibly_unblock(State, ChPid, + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - 1} + end)); + +handle_cast({limit, ChPid, LimiterPid}, State) -> + noreply( + possibly_unblock( + State, ChPid, + fun (C = #cr{consumers = Consumers, + limiter_pid = OldLimiterPid, + is_limit_active = Limited}) -> + if Consumers =/= [] andalso OldLimiterPid == undefined -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + NewLimited = Limited andalso LimiterPid =/= undefined, + C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -758,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca2782c77d..5a1c095266 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,18 +33,23 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). +-behaviour(gen_server2). + -export([start_link/4, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -%% callbacks --export([init/2, handle_message/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, +-record(ch, {state, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping}). +-define(HIBERNATE_AFTER, 1000). + +-define(MAX_PERMISSION_CACHE_SIZE, 12). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -62,111 +67,120 @@ %%---------------------------------------------------------------------------- start_link(ReaderPid, WriterPid, Username, VHost) -> - buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, - Username, VHost]). + {ok, Pid} = gen_server2:start_link( + ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + Pid. do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - Pid ! {method, Method, Content}, - ok. + gen_server2:cast(Pid, {method, Method, Content}). shutdown(Pid) -> - Pid ! terminate, - ok. + gen_server2:cast(Pid, terminate). send_command(Pid, Msg) -> - Pid ! {command, Msg}, - ok. + gen_server2:cast(Pid, {command, Msg}). deliver(Pid, ConsumerTag, AckRequired, Msg) -> - Pid ! {deliver, ConsumerTag, AckRequired, Msg}, - ok. + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, - ok. + gen_server2:cast(Pid, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- -init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> +init([ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - %% this is bypassing the proxy so alarms can "jump the queue" and - %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - #ch{state = starting, - proxy_pid = ProxyPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}. - -handle_message({method, Method, Content}, State) -> + {ok, #ch{state = starting, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}}. + +handle_call(_Request, _From, State) -> + noreply(State). + +handle_cast({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; + noreply(NewState); {noreply, NewState} -> - NewState; + noreply(NewState); stop -> - exit(normal) + {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - terminate({amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State); + {stop, {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, State}; exit:normal -> - terminate(normal, State); + {stop, normal, State}; _:Reason -> - terminate({Reason, erlang:get_stacktrace()}, State) + {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_message(terminate, State) -> - terminate(normal, State); +handle_cast(terminate, State) -> + {stop, normal, State}; -handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - State; + noreply(State); -handle_message({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_cast({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, ProxyPid, - true, ConsumerTag, DeliveryTag, Msg), - State1#ch{next_tag = DeliveryTag + 1}; + ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_message({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> + ok = clear_permission_cache(), ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - State; + noreply(State). -handle_message({'EXIT', _Pid, Reason}, State) -> - terminate(Reason, State); +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; -handle_message(Other, State) -> - terminate({unexpected_channel_message, Other}, State). +handle_info(timeout, State) -> + ok = clear_permission_cache(), + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). -%%--------------------------------------------------------------------------- +terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, + state = terminating}) -> + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid); -terminate(Reason, State = #ch{writer_pid = WriterPid}) -> +terminate(Reason, State = #ch{writer_pid = WriterPid, + limiter_pid = LimiterPid}) -> Res = notify_queues(internal_rollback(State)), case Reason of normal -> ok = Res; _ -> ok end, rabbit_writer:shutdown(WriterPid), - exit(Reason). + rabbit_limiter:shutdown(LimiterPid). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------------------------------------------------------- + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -190,6 +204,32 @@ return_queue_declare_ok(State, NoWait, Q) -> {reply, Reply, NewState} end. +check_resource_access(Username, Resource, Perm) -> + V = {Resource, Perm}, + Cache = case get(permission_cache) of + undefined -> []; + Other -> Other + end, + CacheTail = + case lists:member(V, Cache) of + true -> lists:delete(V, Cache); + false -> ok = rabbit_access_control:check_resource_access( + Username, Resource, Perm), + lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1) + end, + put(permission_cache, [V | CacheTail]), + ok. + +clear_permission_cache() -> + erase(permission_cache), + ok. + +check_configuration_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.configuration). + +check_messaging_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.messaging). + expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -248,7 +288,6 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - ok = rabbit_writer:shutdown(WriterPid), stop; handle_method(#'access.request'{},_, State) -> @@ -260,6 +299,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_messaging_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. @@ -273,7 +313,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -286,9 +326,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + Participants = ack(TxnKey, Acked), {noreply, case TxnKey of - none -> State#ch{unacked_message_q = Remaining}; + none -> ok = notify_limiter(State#ch.limiter_pid, Acked), + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -299,12 +340,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_messaging_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -330,12 +372,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - reader_pid = ReaderPid, + _, State = #ch{ reader_pid = ReaderPid, + limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_messaging_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_misc:binstring_guid("amq.ctag"); @@ -349,7 +392,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -380,8 +423,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -402,7 +444,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, ProxyPid, ConsumerTag, + Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -414,13 +456,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS - {reply, #'basic.qos_ok'{}, State}; +handle_method(#'basic.qos'{global = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "global=true", []); + +handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> + rabbit_misc:protocol_error(not_implemented, + "prefetch_size!=0 (~w)", [Size]); + +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{ limiter_pid = LimiterPid }) -> + NewLimiterPid = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> + undefined; + {undefined, _} -> + LPid = rabbit_limiter:start_link(self()), + ok = limit_queues(LPid, State), + LPid; + {_, 0} -> + ok = rabbit_limiter:shutdown(LimiterPid), + ok = limit_queues(undefined, State), + undefined; + {_, _} -> + LimiterPid + end, + ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -429,14 +492,13 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), ProxyPid) + QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -454,8 +516,7 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, ProxyPid, - false, ConsumerTag, DeliveryTag, + WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -476,6 +537,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configuration_permitted(ExchangeName, State), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -495,6 +557,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configuration_permitted(ExchangeName, State), X = rabbit_exchange:lookup_or_die(ExchangeName), ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), return_ok(State, NoWait, #'exchange.declare_ok'{}); @@ -504,6 +567,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch { virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configuration_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> rabbit_misc:protocol_error( @@ -554,9 +618,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configuration_permitted(QueueName, State), Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args)); - Other -> Other + Other = #amqqueue{name = QueueName} -> + check_configuration_permitted(QueueName, State), + Other end, return_queue_declare_ok(State, NoWait, Q); @@ -565,6 +632,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), + check_configuration_permitted(QueueName, State), Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); @@ -575,6 +643,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, }, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_configuration_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of @@ -611,6 +680,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_messaging_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:purge(Q) end), @@ -660,9 +730,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_configuration_permitted(QueueName, State), ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configuration_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, queue_not_found} -> rabbit_misc:protocol_error( @@ -744,10 +816,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(ProxyPid, TxnKey, UAQ) -> +ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), [QPid | L] end, [], UAQ). @@ -762,7 +834,9 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> new_tx(State); + ok -> ok = notify_limiter(State#ch.limiter_pid, + State#ch.uncommitted_ack_q), + new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -799,19 +873,37 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all( - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end], - ProxyPid). +notify_queues(#ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). + +limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). + +consumer_queues(Consumers) -> + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end]. + +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, but not acks for +%% messages sent in a response to a basic.get (identified by their +%% 'none' consumer tag) +notify_limiter(undefined, _Acked) -> + ok; +notify_limiter(LimiterPid, Acked) -> + case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, queue:to_list(Acked)) of + 0 -> ok; + Count -> rabbit_limiter:ack(LimiterPid, Count) + end. is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> @@ -819,7 +911,8 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n", + Other -> rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", [Other]), false end. @@ -829,7 +922,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -841,6 +934,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, ChPid, M, Content); + WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ecc285a57f..293cd79751 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -57,7 +57,7 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( @@ -114,10 +114,10 @@ Available commands: delete_vhost <VHostPath> list_vhosts - map_user_vhost <UserName> <VHostPath> - unmap_user_vhost <UserName> <VHostPath> - list_user_vhosts <UserName> - list_vhost_users <VHostPath> + set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> + clear_permissions [-p <VHostPath>] <UserName> + list_permissions [-p <VHostPath>] + list_user_permissions <UserName> list_queues [-p <VHostPath>] [<QueueInfoItem> ...] list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] @@ -138,7 +138,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -148,7 +148,7 @@ auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, routing key, queue name and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [pid, address, port, +<ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address and peer_port. @@ -223,33 +223,23 @@ action(list_vhosts, Node, [], Inform) -> Inform("Listing vhosts", []), display_list(call(Node, {rabbit_access_control, list_vhosts, []})); -action(map_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> - Inform("Mapping user ~p to vhost ~p", Args), - call(Node, {rabbit_access_control, map_user_vhost, Args}); - -action(unmap_user_vhost, Node, Args = [_Username, _VHostPath], Inform) -> - Inform("Unmapping user ~p from vhost ~p", Args), - call(Node, {rabbit_access_control, unmap_user_vhost, Args}); - -action(list_user_vhosts, Node, Args = [_Username], Inform) -> - Inform("Listing vhosts for user ~p", Args), - display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args})); - -action(list_vhost_users, Node, Args = [_VHostPath], Inform) -> - Inform("Listing users for vhosts ~p", Args), - display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})); +action(list_user_permissions, Node, Args = [_Username], Inform) -> + Inform("Listing permissions for user ~p", Args), + display_list(call(Node, {rabbit_access_control, list_user_permissions, + Args})); action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), - {VHostArg, RemainingArgs} = parse_vhost_flag(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), + {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), + ArgAtoms = list_replace(node, pid, + default_if_empty(RemainingArgs, [name, messages])), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); action(list_exchanges, Node, Args, Inform) -> Inform("Listing exchanges", []), - {VHostArg, RemainingArgs} = parse_vhost_flag(Args), + {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), ArgAtoms = default_if_empty(RemainingArgs, [name, type]), display_info_list(rpc_call(Node, rabbit_exchange, info_all, [VHostArg, ArgAtoms]), @@ -257,7 +247,7 @@ action(list_exchanges, Node, Args, Inform) -> action(list_bindings, Node, Args, Inform) -> Inform("Listing bindings", []), - {VHostArg, _} = parse_vhost_flag(Args), + {VHostArg, _} = parse_vhost_flag_bin(Args), InfoKeys = [exchange_name, routing_key, queue_name, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || @@ -267,18 +257,41 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port]), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, peer_port])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), - ArgAtoms). + ArgAtoms); + +action(Command, Node, Args, Inform) -> + {VHost, RemainingArgs} = parse_vhost_flag(Args), + action(Command, Node, VHost, RemainingArgs, Inform). + +action(set_permissions, Node, VHost, [Username, CPerm, MPerm], Inform) -> + Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), + call(Node, {rabbit_access_control, set_permissions, + [Username, VHost, CPerm, MPerm]}); + +action(clear_permissions, Node, VHost, [Username], Inform) -> + Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), + call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); + +action(list_permissions, Node, VHost, [], Inform) -> + Inform("Listing permissions in vhost ~p", [VHost]), + display_list(call(Node, {rabbit_access_control, list_vhost_permissions, + [VHost]})). parse_vhost_flag(Args) when is_list(Args) -> - case Args of - ["-p", VHost | RemainingArgs] -> - {list_to_binary(VHost), RemainingArgs}; - RemainingArgs -> - {<<"/">>, RemainingArgs} - end. + case Args of + ["-p", VHost | RemainingArgs] -> + {VHost, RemainingArgs}; + RemainingArgs -> + {"/", RemainingArgs} + end. + +parse_vhost_flag_bin(Args) -> + {VHost, RemainingArgs} = parse_vhost_flag(Args), + {list_to_binary(VHost), RemainingArgs}. default_if_empty(List, Default) when is_list(List) -> if List == [] -> @@ -288,29 +301,26 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach( - fun (Result) -> - io:fwrite( - lists:flatten( - rabbit_misc:intersperse( - "\t", - [format_info_item(Result, X) || X <- InfoItemKeys]))), - io:nl() - end, - Results), + lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || + X <- InfoItemKeys]) + end, Results), ok; - display_info_list(Other, _) -> Other. +display_row(Row) -> + io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), + io:nl(). + format_info_item(Items, Key) -> {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), case Info of {_, #resource{name = Name}} -> url_encode(Name); - {Key, IpAddress} when Key =:= address; Key =:= peer_address - andalso is_tuple(IpAddress) -> - inet_parse:ntoa(IpAddress); + _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> + inet_parse:ntoa(Value); + _ when is_pid(Value) -> + atom_to_list(node(Value)); _ when is_binary(Value) -> url_encode(Value); _ -> @@ -318,8 +328,10 @@ format_info_item(Items, Key) -> end. display_list(L) when is_list(L) -> - lists:foreach(fun (I) -> - io:format("~s~n", [binary_to_list(I)]) + lists:foreach(fun (I) when is_binary(I) -> + io:format("~s~n", [url_encode(I)]); + (I) when is_tuple(I) -> + display_row([url_encode(V) || V <- tuple_to_list(I)]) end, lists:sort(L)), ok; @@ -357,3 +369,6 @@ url_encode_char([], Acc) -> d2h(N) when N<10 -> N+$0; d2h(N) -> N+$a-10. +list_replace(Find, Replace, List) -> + [case X of Find -> Replace; _ -> X end || X <- List]. + diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 925c335cee..19efd9fc22 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -77,7 +77,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -88,6 +88,7 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -106,16 +107,18 @@ recover() -> fun () -> mnesia:foldl( fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), + ok = mnesia:write(rabbit_exchange, Exchange, write), Acc - end, ok, durable_exchanges), + end, ok, rabbit_durable_exchange), mnesia:foldl( fun (Route, Acc) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(Route), - ok = mnesia:write(ReverseRoute), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write), Acc - end, ok, durable_routes), + end, ok, rabbit_durable_route), ok end). @@ -127,11 +130,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> ok = mnesia:write(Exchange), + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), if Durable -> - ok = mnesia:write( - durable_exchanges, Exchange, write); + ok = mnesia:write(rabbit_durable_exchange, + Exchange, write); true -> ok end, Exchange; @@ -145,6 +148,8 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -158,7 +163,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> [rabbit_misc:rs(Name), ActualType, RequiredType]). lookup(Name) -> - rabbit_misc:dirty_read({exchange, Name}). + rabbit_misc:dirty_read({rabbit_exchange, Name}). lookup_or_die(Name) -> case lookup(Name) of @@ -170,6 +175,7 @@ lookup_or_die(Name) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). map(VHostPath, F) -> @@ -211,64 +217,80 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{exchange_name = Name, _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); - -route(X = #exchange{type = fanout}, _) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). -route_internal(#exchange{name = Name}, RoutingKey) -> +match_routing_key(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, _ = '_'}}, - lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> sets:fold( fun(Key, Acc) -> - case mnesia:dirty_read({amqqueue, Key}) of + case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end @@ -279,33 +301,37 @@ lookup_qpids(Queues) -> %% to be implemented for 0.91 ? delete_bindings_for_exchange(ExchangeName) -> - indexed_delete( - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - fun delete_forward_routes/1, fun mnesia:delete_object/1). + [begin + ok = mnesia:delete_object(rabbit_reverse_route, + reverse_route(Route), write), + ok = delete_forward_routes(Route) + end || Route <- mnesia:match_object( + rabbit_route, + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + write)], + ok. delete_bindings_for_queue(QueueName) -> Exchanges = exchanges_for_queue(QueueName), - indexed_delete( - reverse_route(#route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - fun mnesia:delete_object/1, fun delete_forward_routes/1), [begin - [X] = mnesia:read({exchange, ExchangeName}), + ok = delete_forward_routes(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write) + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), ok = maybe_auto_delete(X) end || ExchangeName <- Exchanges], ok. -indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> - [begin - ok = ReverseDeleteFun(reverse_route(Route)), - ok = ForwardsDeleteFun(Route) - end || Route <- mnesia:match_object(Match)], - ok. - delete_forward_routes(Route) -> - ok = mnesia:delete_object(Route), - ok = mnesia:delete_object(durable_routes, Route, write). + ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_durable_route, Route, write). exchanges_for_queue(QueueName) -> MatchHead = reverse_route( @@ -314,17 +340,18 @@ exchanges_for_queue(QueueName) -> _ = '_'}}), sets:to_list( sets:from_list( - mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). has_bindings(ExchangeName) -> MatchHead = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, try - continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read)) + continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}], + 1, read)) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - case mnesia:match_object(MatchHead) of + case mnesia:match_object(rabbit_route, MatchHead, read) of [] -> false; [_|_] -> true end @@ -336,7 +363,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case mnesia:read({exchange, Exchange}) of + fun() -> case mnesia:read({rabbit_exchange, Exchange}) of [] -> {error, exchange_not_found}; [X] -> Fun(X) end @@ -345,7 +372,7 @@ call_with_exchange(Exchange, Fun) -> call_with_exchange_and_queue(Exchange, Queue, Fun) -> call_with_exchange( Exchange, - fun(X) -> case mnesia:read({amqqueue, Queue}) of + fun(X) -> case mnesia:read({rabbit_queue, Queue}) of [] -> {error, queue_not_found}; [Q] -> Fun(X, Q) end @@ -377,13 +404,15 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of - true -> Fun(durable_routes, #route{binding = Binding}, write); + true -> Fun(rabbit_durable_route, + #route{binding = Binding}, write); false -> ok end, - [ok, ok] = [Fun(element(1, R), R, write) || - R <- tuple_to_list(route_with_reverse(Binding))], + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = Fun(rabbit_route, Route, write), + ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. list_bindings(VHostPath) -> @@ -394,6 +423,7 @@ list_bindings(VHostPath) -> queue_name = QueueName, args = Arguments}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), _ = '_'}, @@ -429,6 +459,67 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. @@ -475,8 +566,8 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_bindings_for_exchange(ExchangeName), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}). + ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API @@ -492,7 +583,7 @@ list_exchange_bindings(ExchangeName) -> #route{binding = #binding{queue_name = QueueName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. % Refactoring is left as an exercise for the reader list_queue_bindings(QueueName) -> @@ -502,4 +593,4 @@ list_queue_bindings(QueueName) -> #route{binding = #binding{exchange_name = ExchangeName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 060bed4893..5c447792a2 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -95,13 +95,15 @@ collect_content(ChannelPid, MethodName) -> true -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, got one for class ~w instead", + "expected content header for class ~w, " + "got one for class ~w instead", [ClassId, HeaderClassId]) end; _ -> rabbit_misc:protocol_error( command_invalid, - "expected content header for class ~w, got non content header frame instead", + "expected content header for class ~w, " + "got non content header frame instead", [ClassId]) end. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl new file mode 100644 index 0000000000..532be26d8e --- /dev/null +++ b/src/rabbit_limiter.erl @@ -0,0 +1,195 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_limiter). + +-behaviour(gen_server). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). +-export([start_link/1, shutdown/1]). +-export([limit/2, can_send/2, ack/2, register/2, unregister/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(maybe_pid() :: pid() | 'undefined'). + +-spec(start_link/1 :: (pid()) -> pid()). +-spec(shutdown/1 :: (maybe_pid()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). +-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-record(lim, {prefetch_count = 0, + ch_pid, + queues = dict:new(), % QPid -> {MonitorRef, Notify} + volume = 0}). +%% 'Notify' is a boolean that indicates whether a queue should be +%% notified of a change in the limit or volume that may allow it to +%% deliver more messages via the limiter's channel. + +%%---------------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------------- + +start_link(ChPid) -> + {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + Pid. + +shutdown(undefined) -> + ok; +shutdown(LimiterPid) -> + unlink(LimiterPid), + gen_server2:cast(LimiterPid, shutdown). + +limit(undefined, 0) -> + ok; +limit(LimiterPid, PrefetchCount) -> + gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + +%% Ask the limiter whether the queue can deliver a message without +%% breaching a limit +can_send(undefined, _QPid) -> + true; +can_send(LimiterPid, QPid) -> + rabbit_misc:with_exit_handler( + fun () -> true end, + fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end). + +%% Let the limiter know that the channel has received some acks from a +%% consumer +ack(undefined, _Count) -> ok; +ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). + +register(undefined, _QPid) -> ok; +register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). + +unregister(undefined, _QPid) -> ok; +unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([ChPid]) -> + {ok, #lim{ch_pid = ChPid} }. + +handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> + case limit_reached(State) of + true -> {reply, false, limit_queue(QPid, State)}; + false -> {reply, true, State#lim{volume = Volume + 1}} + end. + +handle_cast(shutdown, State) -> + {stop, normal, State}; + +handle_cast({limit, PrefetchCount}, State) -> + {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; + +handle_cast({ack, Count}, State = #lim{volume = Volume}) -> + NewVolume = if Volume == 0 -> 0; + true -> Volume - Count + end, + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + +handle_cast({register, QPid}, State) -> + {noreply, remember_queue(QPid, State)}; + +handle_cast({unregister, QPid}, State) -> + {noreply, forget_queue(QPid, State)}. + +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) -> + {noreply, forget_queue(QPid, State)}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> + State. + +%%---------------------------------------------------------------------------- +%% Internal plumbing +%%---------------------------------------------------------------------------- + +maybe_notify(OldState, NewState) -> + case limit_reached(OldState) andalso not(limit_reached(NewState)) of + true -> notify_queues(NewState); + false -> NewState + end. + +limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> + Limit =/= 0 andalso Volume >= Limit. + +remember_queue(QPid, State = #lim{queues = Queues}) -> + case dict:is_key(QPid, Queues) of + false -> MRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + true -> State + end. + +forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> + case dict:find(QPid, Queues) of + {ok, {MRef, _}} -> + true = erlang:demonitor(MRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = dict:erase(QPid, Queues)}; + error -> State + end. + +limit_queue(QPid, State = #lim{queues = Queues}) -> + UpdateFun = fun ({MRef, _}) -> {MRef, true} end, + State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + +notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + {QList, NewQueues} = + dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], dict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), + case length(QList) of + 0 -> ok; + L -> + %% We randomly vary the position of queues in the list, + %% thus ensuring that each queue has an equal chance of + %% being notified first. + {L1, L2} = lists:split(random:uniform(L), QList), + [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], + ok + end, + State#lim{queues = NewQueues}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 85db50d79d..214c952834 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -237,7 +237,7 @@ filter_exit_map(F, L) -> with_user(Username, Thunk) -> fun () -> - case mnesia:read({user, Username}) of + case mnesia:read({rabbit_user, Username}) of [] -> mnesia:abort({no_such_user, Username}); [_U] -> @@ -247,7 +247,7 @@ with_user(Username, Thunk) -> with_vhost(VHostPath, Thunk) -> fun () -> - case mnesia:read({vhost, VHostPath}) of + case mnesia:read({rabbit_vhost, VHostPath}) of [] -> mnesia:abort({no_such_vhost, VHostPath}); [_V] -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index eebb38fa61..c0bf3d254e 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -100,33 +100,51 @@ force_reset() -> reset(true). %%-------------------------------------------------------------------- table_definitions() -> - [{user, [{disc_copies, [node()]}, - {attributes, record_info(fields, user)}]}, - {user_vhost, [{type, bag}, - {disc_copies, [node()]}, - {attributes, record_info(fields, user_vhost)}, - {index, [virtual_host]}]}, - {vhost, [{disc_copies, [node()]}, - {attributes, record_info(fields, vhost)}]}, - {rabbit_config, [{disc_copies, [node()]}]}, - {listener, [{type, bag}, - {attributes, record_info(fields, listener)}]}, - {durable_routes, [{disc_copies, [node()]}, - {record_name, route}, - {attributes, record_info(fields, route)}]}, - {route, [{type, ordered_set}, - {attributes, record_info(fields, route)}]}, - {reverse_route, [{type, ordered_set}, - {attributes, record_info(fields, reverse_route)}]}, - {durable_exchanges, [{disc_copies, [node()]}, - {record_name, exchange}, - {attributes, record_info(fields, exchange)}]}, - {exchange, [{attributes, record_info(fields, exchange)}]}, - {durable_queues, [{disc_copies, [node()]}, - {record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}]}, - {amqqueue, [{attributes, record_info(fields, amqqueue)}, - {index, [pid]}]}]. + [{rabbit_user, + [{record_name, user}, + {attributes, record_info(fields, user)}, + {disc_copies, [node()]}]}, + {rabbit_user_permission, + [{record_name, user_permission}, + {attributes, record_info(fields, user_permission)}, + {disc_copies, [node()]}]}, + {rabbit_vhost, + [{record_name, vhost}, + {attributes, record_info(fields, vhost)}, + {disc_copies, [node()]}]}, + {rabbit_config, + [{disc_copies, [node()]}]}, + {rabbit_listener, + [{record_name, listener}, + {attributes, record_info(fields, listener)}, + {type, bag}]}, + {rabbit_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {disc_copies, [node()]}]}, + {rabbit_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}]}, + {rabbit_reverse_route, + [{record_name, reverse_route}, + {attributes, record_info(fields, reverse_route)}, + {type, ordered_set}]}, + {rabbit_durable_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}, + {disc_copies, [node()]}]}, + {rabbit_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}]}, + {rabbit_durable_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {disc_copies, [node()]}]}, + {rabbit_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {index, [pid]}]}]. table_names() -> [Tab || {Tab, _} <- table_definitions()]. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 7f6eaa8e93..5e8edd53a1 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -50,7 +50,7 @@ start() -> case catch action(Command, Args, RpcTimeout) of ok -> io:format("done.~n"), - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 0fb869d3ca..30fb7b1f60 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -134,6 +134,7 @@ stop_tcp_listener(Host, Port) -> tcp_listener_started(IPAddress, Port) -> ok = mnesia:dirty_write( + rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), @@ -142,19 +143,20 @@ tcp_listener_started(IPAddress, Port) -> tcp_listener_stopped(IPAddress, Port) -> ok = mnesia:dirty_delete_object( + rabbit_listener, #listener{node = node(), protocol = tcp, host = tcp_host(IPAddress), port = Port}). active_listeners() -> - rabbit_misc:dirty_read_all(listener). + rabbit_misc:dirty_read_all(rabbit_listener). node_listeners(Node) -> - mnesia:dirty_read(listener, Node). + mnesia:dirty_read(rabbit_listener, Node). on_node_down(Node) -> - ok = mnesia:dirty_delete(listener, Node). + ok = mnesia:dirty_delete(rabbit_listener, Node). start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bb8c174cc1..a6094ebc58 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -173,7 +173,8 @@ setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of once -> - rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"), + rabbit_log:info("Enabling profiling for this connection, " + "and disabling for subsequent.~n"), rabbit_misc:set_config(profiling_enabled, false), fprof:trace(start); true -> @@ -404,7 +405,8 @@ wait_for_channel_termination(N, TimerRef) -> normal -> ok; _ -> rabbit_log:error( - "connection ~p, channel ~p - error while terminating:~n~p~n", + "connection ~p, channel ~p - " + "error while terminating:~n~p~n", [self(), Channel, Reason]) end, wait_for_channel_termination(N-1, TimerRef) diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ad653a2f2a..26d857bef0 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% than the non-immediate case below. {ok, lists:flatmap( fun ({Node, QPids}) -> - gen_server:cast( + gen_server2:cast( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}), QPids @@ -110,7 +110,7 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server:call( + try gen_server2:call( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}) catch diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index df2e71d9e6..ef390e4de6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -444,17 +444,18 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(delete_vhost, ["/testhost"]), {error, {no_such_user, _}} = - control_action(map_user_vhost, ["foo", "/"]), + control_action(set_permissions, ["foo", ".*", ".*"]), {error, {no_such_user, _}} = - control_action(unmap_user_vhost, ["foo", "/"]), + control_action(clear_permissions, ["foo"]), {error, {no_such_user, _}} = - control_action(list_user_vhosts, ["foo"]), + control_action(list_user_permissions, ["foo"]), {error, {no_such_vhost, _}} = - control_action(map_user_vhost, ["guest", "/testhost"]), - {error, {no_such_vhost, _}} = - control_action(unmap_user_vhost, ["guest", "/testhost"]), - {error, {no_such_vhost, _}} = - control_action(list_vhost_users, ["/testhost"]), + control_action(list_permissions, ["-p", "/testhost"]), + {error, {invalid_regexp, _, _}} = + control_action(set_permissions, ["guest", "+foo", ".*"]), + {error, {invalid_regexp, _, _}} = + control_action(set_permissions, ["guest", ".*", "+foo"]), + %% user creation ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = @@ -469,13 +470,16 @@ test_user_management() -> ok = control_action(list_vhosts, []), %% user/vhost mapping - ok = control_action(map_user_vhost, ["foo", "/testhost"]), - ok = control_action(map_user_vhost, ["foo", "/testhost"]), - ok = control_action(list_user_vhosts, ["foo"]), + ok = control_action(set_permissions, ["-p", "/testhost", + "foo", ".*", ".*"]), + ok = control_action(set_permissions, ["-p", "/testhost", + "foo", ".*", ".*"]), + ok = control_action(list_permissions, ["-p", "/testhost"]), + ok = control_action(list_user_permissions, ["foo"]), %% user/vhost unmapping - ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), - ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), + ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), + ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]), %% vhost deletion ok = control_action(delete_vhost, ["/testhost"]), @@ -484,7 +488,8 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), - ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(set_permissions, ["-p", "/testhost", + "foo", ".*", ".*"]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion |
