diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-03-11 16:17:25 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2015-03-11 16:17:25 +0100 |
| commit | 17a5da8f085e4d4acb349fc41ceca0e923ac67d3 (patch) | |
| tree | 6d3b51561ea384f3b51f477b05d91237ebe1f3e7 | |
| parent | e03e7da3115c54e8a7ca54ad3cc2c4c74f7ed417 (diff) | |
| parent | 578cfc1916a4b6a8202b2f4698e35eb76942f061 (diff) | |
| download | rabbitmq-server-git-17a5da8f085e4d4acb349fc41ceca0e923ac67d3.tar.gz | |
Merge branch 'master' into stable
65 files changed, 3284 insertions, 1081 deletions
diff --git a/.gitignore b/.gitignore index 366e71ae44..1b1e92af4d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ erl_crash.dump /cover/ /dist/ /ebin/ +/etc/ /plugins/ /priv/plugins/ /deps.mk diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000000..69a4b4a437 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,51 @@ +## Overview + +RabbitMQ projects use pull requests to discuss, collaborate on and accept code contributions. +Pull requests are the primary place for discussing code changes. + +## How to Contribute + +The process is fairly standard: + + * Fork the repository or repositories you plan on contributing to + * Clone [RabbitMQ umbrella repository](https://github.com/rabbitmq/rabbitmq-public-umbrella) + * `cd umbrella`, `make co` + * Create a branch with a descriptive name in the relevant repositories + * Make your changes, run tests, commit with a [descriptive message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html), push to your fork + * Submit pull requests with an explanation of what has been changed and **why** + * Submit a filled out and signed [Contributor Agreement](https://github.com/rabbitmq/ca#how-to-submit) if needed (see below) + * Be patient. We will get to your pull request eventually + +If what you are going to work on is a substantial change, please first ask the core team +for their opinion on [RabbitMQ mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users). + + +## (Brief) Code of Conduct + +In one line: don't be a dick. + +Be respectful to the maintainers and other contributors. Open source +contributors put long hours into developing projects and doing user +support. Those projects and user support are available for free. We +believe this deserves some respect. + +Be respectful to people of all races, genders, religious beliefs and +political views. Regardless of how brilliant a pull request is +technically, we will not tolerate disrespectful or aggressive +behaviour.
+ +Contributors who violate this straightforward Code of Conduct will see +their pull requests closed and locked. + + +## Contributor Agreement + +If you want to contribute a non-trivial change, please submit a signed copy of our +[Contributor Agreement](https://github.com/rabbitmq/ca#how-to-submit) around the time +you submit your pull request. This will make it much easier (in some cases, possible) +for the RabbitMQ team at Pivotal to merge your contribution. + + +## Where to Ask Questions + +If something isn't clear, feel free to ask on our [mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users). diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index b49befcc79..c9329cf2c4 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -33,8 +33,8 @@ %% {handshake_timeout, 10000}, %% Log levels (currently just used for connection logging). - %% One of 'info', 'warning', 'error' or 'none', in decreasing order - %% of verbosity. Defaults to 'info'. + %% One of 'debug', 'info', 'warning', 'error' or 'none', in decreasing + %% order of verbosity. Defaults to 'info'. %% %% {log_levels, [{connection, info}]}, @@ -108,7 +108,7 @@ %% This pertains to both the rabbitmq_auth_mechanism_ssl plugin and %% STOMP ssl_cert_login configurations. See the rabbitmq_stomp - %% configuration section later in this fail and the README in + %% configuration section later in this file and the README in %% https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further %% details. %% @@ -220,6 +220,13 @@ %% %% {cluster_nodes, {['rabbit@my.host.com'], disc}}, + %% Interval (in milliseconds) at which we send keepalive messages + %% to other cluster members. Note that this is not the same thing + %% as net_ticktime; missed keepalive messages will not cause nodes + %% to be considered down. + %% + %% {cluster_keepalive_interval, 10000}, + %% Set (internal) statistics collection granularity. 
%% %% {collect_statistics, none}, @@ -235,7 +242,12 @@ %% Timeout used when waiting for Mnesia tables in a cluster to %% become available. %% - %% {mnesia_table_loading_timeout, 30000} + %% {mnesia_table_loading_timeout, 30000}, + + %% Size in bytes below which to embed messages in the queue index. See + %% http://www.rabbitmq.com/persistence-conf.html + %% + %% {queue_index_embed_msgs_below, 4096} ]}, diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 8e89d7f0de..40d8978e9b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -426,6 +426,41 @@ </listitem> </varlistentry> <varlistentry> + <term><cmdsynopsis><command>rename_cluster_node</command> <arg choice="req">oldnode1</arg> <arg choice="req">newnode1</arg> <arg choice="opt">oldnode2</arg> <arg choice="opt">newnode2 ...</arg></cmdsynopsis></term> + <listitem> + <para> + Supports renaming of cluster nodes in the local database. + </para> + <para> + This subcommand causes rabbitmqctl to temporarily become + the node in order to make the change. The local cluster + node must therefore be completely stopped; other nodes + can be online or offline. + </para> + <para> + This subcommand takes an even number of arguments, in + pairs representing the old and new names for nodes. You + must specify the old and new names for this node and for + any other nodes that are stopped and being renamed at + the same time. + </para> + <para> + It is possible to stop all nodes and rename them all + simultaneously (in which case old and new names for all + nodes must be given to every node) or stop and rename + nodes one at a time (in which case each node only needs + to be told how its own name is changing). 
+ </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia</screen> + <para role="example"> + This command will rename the node + <command>rabbit@misshelpful</command> to the node + <command>rabbit@cordelia</command>. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis> </term> <listitem> @@ -1226,6 +1261,14 @@ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are persistent.</para></listitem> </varlistentry> <varlistentry> + <term>disk_reads</term> + <listitem><para>Total number of times messages have been read from disk by this queue since it started.</para></listitem> + </varlistentry> + <varlistentry> + <term>disk_writes</term> + <listitem><para>Total number of times messages have been written to disk by this queue since it started.</para></listitem> + </varlistentry> + <varlistentry> <term>consumers</term> <listitem><para>Number of consumers.</para></listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9e5584a1bf..918741438c 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,6 +29,7 @@ {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, + {queue_index_embed_msgs_below, 4096}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 74e165cd9b..7627ed431e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -14,12 +14,17 @@ %% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. %% +%% Passed around most places -record(user, {username, tags, - auth_backend, %% Module this user came from - impl %% Scratch space for that module - }). + authz_backends}). 
%% List of {Module, AuthUserImpl} pairs +%% Passed to auth backends +-record(auth_user, {username, + tags, + impl}). + +%% Implementation for the internal auth backend -record(internal_user, {username, password_hash, tags}). -record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). @@ -52,7 +57,7 @@ arguments, %% immutable pid, %% durable (just so we know home node) slave_pids, sync_slave_pids, %% transient - down_slave_nodes, %% durable + recoverable_slaves, %% durable policy, %% durable, implicit update as above gm_pids, %% transient decorators, %% transient, recalculated as above @@ -83,7 +88,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, confirm, sender, message, msg_seq_no}). +-record(delivery, {mandatory, confirm, sender, message, msg_seq_no, flow}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, reference = undefined, timestamp}). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 05781f8b85..962e4c879a 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -128,6 +128,9 @@ done rm -rf %{buildroot} %changelog +* Wed Mar 11 2015 jean-sebastien@rabbitmq.com 3.5.0-1 +- New Upstream Release + * Wed Feb 11 2015 michael@rabbitmq.com 3.4.4-1 - New Upstream Release diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 1949dacae8..c9508847f9 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -30,14 +30,14 @@ cd /var/lib/rabbitmq SCRIPT=`basename $0` if [ `id -u` = `id -u rabbitmq` -a "$SCRIPT" = "rabbitmq-server" ] ; then - /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@ + exec /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@ elif [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; 
then if [ -f $PWD/.erlang.cookie ] ; then export HOME=. fi - /usr/lib/rabbitmq/bin/${SCRIPT} "$@" + exec /usr/lib/rabbitmq/bin/${SCRIPT} "$@" elif [ `id -u` = 0 ] ; then - @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" + exec @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" else /usr/lib/rabbitmq/bin/${SCRIPT} echo diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 242ac9d701..b2c805b3ad 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (3.5.0-1) unstable; urgency=low + + * New Upstream Release + + -- Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> Wed, 11 Mar 2015 13:56:19 +0000 + rabbitmq-server (3.4.4-1) unstable; urgency=low * New Upstream Release diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 84ed9fdc53..d595620042 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -19,12 +19,12 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env +RABBITMQ_USE_LONGNAME=${RABBITMQ_USE_LONGNAME} \ exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_PLUGINS_ERL_ARGS} \ - ${RABBITMQ_NAME_TYPE} rabbitmq-plugins$$ \ -boot "${CLEAN_BOOT_FILE}" \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 8a3c1f330f..01a64c99e5 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -23,14 +23,6 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
if "!RABBITMQ_SERVICENAME!"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
@@ -52,7 +44,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( echo Please either set ERLANG_HOME to point to your Erlang installation or place the
echo RabbitMQ server distribution in the Erlang lib folder.
echo.
- exit /B
+ exit /B 1
)
if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
@@ -68,7 +60,6 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" ( -noinput ^
-hidden ^
!RABBITMQ_CTL_ERL_ARGS! ^
-!RABBITMQ_NAME_TYPE! rabbitmq-plugins!RANDOM!!TIME:~9! ^
-s rabbit_plugins_main ^
-enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
-plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" ^
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 0f27006560..b72b9ee0d5 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -70,7 +70,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( echo Please either set ERLANG_HOME to point to your Erlang installation or place the
echo RabbitMQ server distribution in the Erlang lib folder.
echo.
- exit /B
+ exit /B 1
)
if "!RABBITMQ_MNESIA_BASE!"=="" (
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index d437e251ab..2237275c47 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -19,11 +19,6 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -# rabbitmqctl starts distribution itself, so we need to make sure epmd -# is running. -${ERL_DIR}erl ${RABBITMQ_NAME_TYPE} rabbitmqctl-prelaunch-$$ -noinput \ --eval 'erlang:halt().' -boot "${CLEAN_BOOT_FILE}" - # We specify Mnesia dir and sasl error logger since some actions # (e.g. forget_cluster_node --offline) require us to impersonate the # real node. diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 6eb2530a4e..380b57a2b7 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -27,14 +27,6 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
if "!COMPUTERNAME!"=="" (
set COMPUTERNAME=localhost
)
@@ -60,13 +52,9 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( echo Please either set ERLANG_HOME to point to your Erlang installation or place the
echo RabbitMQ server distribution in the Erlang lib folder.
echo.
- exit /B
+ exit /B 1
)
-rem rabbitmqctl starts distribution itself, so we need to make sure epmd
-rem is running.
-"!ERLANG_HOME!\bin\erl.exe" !RABBITMQ_NAME_TYPE! rabbitmqctl-prelaunch-!RANDOM!!TIME:~9! -noinput -eval "erlang:halt()."
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!TDP0!..\ebin" ^
-noinput ^
diff --git a/src/app_utils.erl b/src/app_utils.erl index ad270518ad..87e6fa0b69 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -62,13 +62,7 @@ start_applications(Apps, ErrorHandler) -> stop_applications(Apps, ErrorHandler) -> manage_applications(fun lists:foldr/3, - %% Mitigation for bug 26467. TODO remove when we fix it. - fun (mnesia) -> - timer:sleep(1000), - application:stop(mnesia); - (App) -> - application:stop(App) - end, + fun application:stop/1, fun application:start/1, not_started, ErrorHandler, diff --git a/src/delegate.erl b/src/delegate.erl index 378759a64b..1a0df98220 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -16,6 +16,35 @@ -module(delegate). +%% delegate is an alternative way of doing remote calls. Compared to +%% the rpc module, it reduces inter-node communication. For example, +%% if a message is routed to 1,000 queues on node A and needs to be +%% propagated to nodes B and C, it would be nice to avoid doing 2,000 +%% remote casts to queue processes. +%% +%% An important issue here is preserving order - we need to make sure +%% that messages from a certain channel to a certain queue take a +%% consistent route, to prevent them being reordered. In fact all +%% AMQP-ish things (such as queue declaration results and basic.get) +%% must take the same route as well, to ensure that clients see causal +%% ordering correctly. Therefore we have a rather generic mechanism +%% here rather than just a message-reflector. That's also why we pick +%% the delegate process to use based on a hash of the source pid. +%% +%% When a function is invoked using delegate:invoke/2, delegate:call/2 +%% or delegate:cast/2 on a group of pids, the pids are first split +%% into local and remote ones. Remote processes are then grouped by +%% node. The function is then invoked locally and on every node (using +%% gen_server2:multi/4) as many times as there are processes on that +%% node, sequentially. 
+%% +%% Errors returned when executing functions on remote nodes are re-raised +%% in the caller. +%% +%% RabbitMQ starts a pool of delegate processes on boot. The size of +%% the pool is configurable, the aim is to make sure we don't have too +%% few delegates and thus limit performance on many-CPU machines. + -behaviour(gen_server2). -export([start_link/1, invoke_no_result/2, invoke/2, diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3a7a692c5c..5a916c75bc 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -178,6 +178,10 @@ write_buffer_size, write_buffer_size_limit, write_buffer, + read_buffer, + read_buffer_pos, + read_buffer_rem, %% Num of bytes from pos to end + read_buffer_size_limit, at_eof, path, mode, @@ -237,7 +241,8 @@ -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: (file:filename(), [any()], - [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) + [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} | + {'read_buffer', (non_neg_integer() | 'unbuffered')}]) -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). 
-spec(read/2 :: (ref(), non_neg_integer()) -> @@ -331,16 +336,48 @@ close(Ref) -> read(Ref, Count) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; - ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case prim_file:read(Hdl, Count) of - {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), - {Obj, - [Handle #handle { offset = Offset1 }]}; - eof -> {eof, [Handle #handle { at_eof = true }]}; - Error -> {Error, [Handle]} + ([Handle = #handle{read_buffer = Buf, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + offset = Offset}]) when BufRem >= Count -> + <<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf, + {{ok, Res}, [Handle#handle{offset = Offset + Count, + read_buffer_pos = BufPos + Count, + read_buffer_rem = BufRem - Count}]}; + ([Handle = #handle{read_buffer = Buf, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + read_buffer_size_limit = BufSzLimit, + hdl = Hdl, + offset = Offset}]) -> + WantedCount = Count - BufRem, + case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of + {ok, Data} -> + <<_:BufPos/binary, BufTl/binary>> = Buf, + ReadCount = size(Data), + case ReadCount < WantedCount of + true -> + OffSet1 = Offset + BufRem + ReadCount, + {{ok, <<BufTl/binary, Data/binary>>}, + [reset_read_buffer( + Handle#handle{offset = OffSet1})]}; + false -> + <<Hd:WantedCount/binary, _/binary>> = Data, + OffSet1 = Offset + BufRem + WantedCount, + BufRem1 = ReadCount - WantedCount, + {{ok, <<BufTl/binary, Hd/binary>>}, + [Handle#handle{offset = OffSet1, + read_buffer = Data, + read_buffer_pos = WantedCount, + read_buffer_rem = BufRem1}]} + end; + eof -> + {eof, [Handle #handle { at_eof = true }]}; + Error -> + {Error, [reset_read_buffer(Handle)]} end end). 
@@ -355,7 +392,7 @@ append(Ref, Data) -> write_buffer_size_limit = 0, at_eof = true } = Handle1} -> Offset1 = Offset + iolist_size(Data), - {prim_file:write(Hdl, Data), + {prim_file_write(Hdl, Data), [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; {{ok, _Offset}, #handle { write_buffer = WriteBuffer, write_buffer_size = Size, @@ -377,12 +414,12 @@ append(Ref, Data) -> sync(Ref) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([#handle { is_dirty = false, write_buffer = [] }]) -> ok; ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> - case prim_file:sync(Hdl) of + case prim_file_sync(Hdl) of ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end @@ -397,7 +434,7 @@ needs_sync(Ref) -> position(Ref, NewOffset) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle), {Result, [Handle1]} end). @@ -465,8 +502,8 @@ clear(Ref) -> fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> ok; ([Handle]) -> - case maybe_seek(bof, Handle #handle { write_buffer = [], - write_buffer_size = 0 }) of + case maybe_seek(bof, Handle#handle{write_buffer = [], + write_buffer_size = 0}) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; @@ -539,6 +576,21 @@ info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). %% Internal functions %%---------------------------------------------------------------------------- +prim_file_read(Hdl, Size) -> + file_handle_cache_stats:update( + io_read, Size, fun() -> prim_file:read(Hdl, Size) end). + +prim_file_write(Hdl, Bytes) -> + file_handle_cache_stats:update( + io_write, iolist_size(Bytes), fun() -> prim_file:write(Hdl, Bytes) end). + +prim_file_sync(Hdl) -> + file_handle_cache_stats:update(io_sync, fun() -> prim_file:sync(Hdl) end). 
+ +prim_file_position(Hdl, NewOffset) -> + file_handle_cache_stats:update( + io_seek, fun() -> prim_file:position(Hdl, NewOffset) end). + is_reader(Mode) -> lists:member(read, Mode). is_writer(Mode) -> lists:member(write, Mode). @@ -550,8 +602,15 @@ append_to_write(Mode) -> end. with_handles(Refs, Fun) -> + with_handles(Refs, reset, Fun). + +with_handles(Refs, ReadBuffer, Fun) -> case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of - {ok, Handles} -> + {ok, Handles0} -> + Handles = case ReadBuffer of + reset -> [reset_read_buffer(H) || H <- Handles0]; + keep -> Handles0 + end, case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), @@ -564,8 +623,11 @@ with_handles(Refs, Fun) -> end. with_flushed_handles(Refs, Fun) -> + with_flushed_handles(Refs, reset, Fun). + +with_flushed_handles(Refs, ReadBuffer, Fun) -> with_handles( - Refs, + Refs, ReadBuffer, fun (Handles) -> case lists:foldl( fun (Handle, {ok, HandlesAcc}) -> @@ -611,20 +673,23 @@ reopen([], Tree, RefHdls) -> {ok, lists:reverse(RefHdls)}; reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, path = Path, - mode = Mode, + mode = Mode0, offset = Offset, last_used_at = undefined }} | RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - case prim_file:open(Path, case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end) of + Mode = case NewOrReopen of + new -> Mode0; + reopen -> file_handle_cache_stats:update(io_reopen), + [read | Mode0] + end, + case prim_file:open(Path, Mode) of {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = - maybe_seek(Offset, Handle #handle { hdl = Hdl, - offset = 0, - last_used_at = Now }), + maybe_seek(Offset, reset_read_buffer( + Handle#handle{hdl = Hdl, + offset = 0, + last_used_at = Now})), put({Ref, fhc_handle}, Handle1), reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), [{Ref, Handle1} | RefHdls]); @@ -709,6 +774,11 @@ new_closed_handle(Path, Mode, Options) -> infinity -> infinity; N when 
is_integer(N) -> N end, + ReadBufferSize = + case proplists:get_value(read_buffer, Options, unbuffered) of + unbuffered -> 0; + N2 when is_integer(N2) -> N2 + end, Ref = make_ref(), put({Ref, fhc_handle}, #handle { hdl = closed, offset = 0, @@ -716,6 +786,10 @@ new_closed_handle(Path, Mode, Options) -> write_buffer_size = 0, write_buffer_size_limit = WriteBufferSize, write_buffer = [], + read_buffer = <<>>, + read_buffer_pos = 0, + read_buffer_rem = 0, + read_buffer_size_limit = ReadBufferSize, at_eof = false, path = Path, mode = Mode, @@ -742,7 +816,7 @@ soft_close(Handle) -> is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of - true -> prim_file:sync(Hdl); + true -> prim_file_sync(Hdl); false -> ok end, ok = prim_file:close(Hdl), @@ -776,17 +850,31 @@ hard_close(Handle) -> Result end. -maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, - at_eof = AtEoF }) -> - {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), - case (case NeedsSeek of - true -> prim_file:position(Hdl, NewOffset); - false -> {ok, Offset} - end) of - {ok, Offset1} = Result -> - {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }}; - {error, _} = Error -> - {Error, Handle} +maybe_seek(New, Handle = #handle{hdl = Hdl, + offset = Old, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + at_eof = AtEoF}) -> + {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Old, New), + case NeedsSeek of + true when is_number(New) andalso + ((New >= Old andalso New =< BufRem + Old) + orelse (New < Old andalso Old - New =< BufPos)) -> + Diff = New - Old, + {{ok, New}, Handle#handle{offset = New, + at_eof = AtEoF1, + read_buffer_pos = BufPos + Diff, + read_buffer_rem = BufRem - Diff}}; + true -> + case prim_file_position(Hdl, New) of + {ok, Offset1} = Result -> + {Result, reset_read_buffer(Handle#handle{offset = Offset1, + at_eof = AtEoF1})}; + {error, _} = Error -> + {Error, Handle} + end; + false -> + {{ok, Old}, Handle} end. 
needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false}; @@ -817,7 +905,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, at_eof = true }) -> - case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of + case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, is_dirty = true, @@ -826,6 +914,11 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +reset_read_buffer(Handle) -> + Handle#handle{read_buffer = <<>>, + read_buffer_pos = 0, + read_buffer_rem = 0}. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(total_limit, #fhc_state{limit = Limit}) -> Limit; @@ -843,6 +936,7 @@ used(#fhc_state{open_count = C1, %%---------------------------------------------------------------------------- init([AlarmSet, AlarmClear]) -> + file_handle_cache_stats:init(), Limit = case application:get_env(file_handles_high_watermark) of {ok, Watermark} when (is_integer(Watermark) andalso Watermark > 0) -> diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl new file mode 100644 index 0000000000..de2f90c67a --- /dev/null +++ b/src/file_handle_cache_stats.erl @@ -0,0 +1,67 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(file_handle_cache_stats). 
+ +%% stats about read / write operations that go through the fhc. + +-export([init/0, update/3, update/2, update/1, get/0]). + +-define(TABLE, ?MODULE). + +-define(COUNT, + [io_reopen, mnesia_ram_tx, mnesia_disk_tx, + msg_store_read, msg_store_write, + queue_index_journal_write, queue_index_write, queue_index_read]). +-define(COUNT_TIME, [io_sync, io_seek]). +-define(COUNT_TIME_BYTES, [io_read, io_write]). + +init() -> + ets:new(?TABLE, [public, named_table]), + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES, + Counter <- [count, bytes, time]], + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME, + Counter <- [count, time]], + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT, + Counter <- [count]]. + +update(Op, Bytes, Thunk) -> + {Time, Res} = timer_tc(Thunk), + ets:update_counter(?TABLE, {Op, count}, 1), + ets:update_counter(?TABLE, {Op, bytes}, Bytes), + ets:update_counter(?TABLE, {Op, time}, Time), + Res. + +update(Op, Thunk) -> + {Time, Res} = timer_tc(Thunk), + ets:update_counter(?TABLE, {Op, count}, 1), + ets:update_counter(?TABLE, {Op, time}, Time), + Res. + +update(Op) -> + ets:update_counter(?TABLE, {Op, count}, 1), + ok. + +get() -> + lists:sort(ets:tab2list(?TABLE)). + +%% TODO timer:tc/1 was introduced in R14B03; use that function once we +%% require that version. +timer_tc(Thunk) -> + T1 = os:timestamp(), + Res = Thunk(), + T2 = os:timestamp(), + {timer:now_diff(T2, T1), Res}. diff --git a/src/gatherer.erl b/src/gatherer.erl index 8bce170754..6a71021686 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -16,6 +16,20 @@ -module(gatherer). +%% Gatherer is a queue which has producer and consumer processes. Before producers +%% push items to the queue using gatherer:in/2 they need to declare their intent +%% to do so with gatherer:fork/1. When a publisher's work is done, it states so +%% using gatherer:finish/1. +%% +%% Consumers pop messages off queues with gatherer:out/1. 
If a queue is empty +%% and there are producers that haven't finished working, the caller is blocked +%% until an item is available. If there are no active producers, gatherer:out/1 +%% immediately returns 'empty'. +%% +%% This module is primarily used to collect results from asynchronous tasks +%% running in a worker pool, e.g. when recovering bindings or rebuilding +%% message store indices. + -behaviour(gen_server2). -export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]). diff --git a/src/lqueue.erl b/src/lqueue.erl index 62f60d5ffb..941941de2e 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -16,6 +16,10 @@ -module(lqueue). +%% lqueue implements a subset of Erlang's queue module. lqueues +%% maintain their own length, so lqueue:len/1 +%% is an O(1) operation, in contrast with queue:len/1 which is O(n). + -export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]). diff --git a/src/pmon.erl b/src/pmon.erl index a94f5a67af..7981742284 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -16,6 +16,19 @@ -module(pmon). +%% Process Monitor +%% ================ +%% +%% This module monitors processes so that every process has at most +%% 1 monitor. +%% Processes monitored can be dynamically added and removed. +%% +%% Unlike erlang:[de]monitor* functions, this module +%% provides basic querying capability and avoids contacting down nodes. +%% +%% It is used to monitor nodes, queue mirrors, and by +%% the queue collector, among other things. + -export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, monitored/1, is_empty/1]). diff --git a/src/rabbit.erl b/src/rabbit.erl index 7fdb02d09e..b55c129a91 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -118,6 +118,13 @@ {requires, [rabbit_alarm, guid_generator]}, {enables, core_initialized}]}). 
+-rabbit_boot_step({rabbit_epmd_monitor, + [{description, "epmd monitor"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_epmd_monitor]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + -rabbit_boot_step({core_initialized, [{description, "core initialized"}, {requires, kernel_ready}]}). @@ -243,15 +250,19 @@ maybe_hipe_compile() -> {ok, Want} = application:get_env(rabbit, hipe_compile), Can = code:which(hipe) =/= non_existing, case {Want, Can} of - {true, true} -> hipe_compile(), - true; + {true, true} -> hipe_compile(); {true, false} -> false; - {false, _} -> true + {false, _} -> {ok, disabled} end. -warn_if_hipe_compilation_failed(true) -> +log_hipe_result({ok, disabled}) -> ok; -warn_if_hipe_compilation_failed(false) -> +log_hipe_result({ok, Count, Duration}) -> + rabbit_log:info( + "HiPE in use: compiled ~B modules in ~Bs.~n", [Count, Duration]); +log_hipe_result(false) -> + io:format( + "~nNot HiPE compiling: HiPE not found in this Erlang installation.~n"), rabbit_log:warning( "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). @@ -276,8 +287,9 @@ hipe_compile() -> {'DOWN', MRef, process, _, Reason} -> exit(Reason) end || {_Pid, MRef} <- PidMRefs], T2 = erlang:now(), - io:format("|~n~nCompiled ~B modules in ~Bs~n", - [Count, timer:now_diff(T2, T1) div 1000000]). + Duration = timer:now_diff(T2, T1) div 1000000, + io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]), + {ok, Count, Duration}. split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]). 
@@ -307,9 +319,9 @@ start() -> boot() -> start_it(fun() -> ok = ensure_application_loaded(), - Success = maybe_hipe_compile(), + HipeResult = maybe_hipe_compile(), ok = ensure_working_log_handlers(), - warn_if_hipe_compilation_failed(Success), + log_hipe_result(HipeResult), rabbit_node_monitor:prepare_cluster_status_files(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), %% It's important that the consistency check happens after @@ -323,6 +335,11 @@ broker_start() -> Plugins = rabbit_plugins:setup(), ToBeLoaded = Plugins ++ ?APPS, start_apps(ToBeLoaded), + case code:load_file(sd_notify) of + {module, sd_notify} -> SDNotify = sd_notify, + SDNotify:sd_notify(0, "READY=1"); + {error, _} -> ok + end, ok = log_broker_started(rabbit_plugins:active()). start_it(StartFun) -> @@ -590,13 +607,19 @@ sort_boot_steps(UnsortedSteps) -> boot_error({could_not_start, rabbit, {{timeout_waiting_for_tables, _}, _}}, _Stacktrace) -> AllNodes = rabbit_mnesia:cluster_nodes(all), + Suffix = "~nBACKGROUND~n==========~n~n" + "This cluster node was shut down while other nodes were still running.~n" + "To avoid losing data, you should start the other nodes first, then~n" + "start this one. To force this node to start, first invoke~n" + "\"rabbitmqctl force_boot\". If you do so, any changes made on other~n" + "cluster nodes after this one was shut down may be lost.~n", {Err, Nodes} = case AllNodes -- [node()] of [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" " shut down forcefully~nit cannot determine which nodes" - " are timing out.~n", []}; + " are timing out.~n" ++ Suffix, []}; Ns -> {rabbit_misc:format( - "Timeout contacting cluster nodes: ~p.~n", [Ns]), + "Timeout contacting cluster nodes: ~p.~n" ++ Suffix, [Ns]), Ns} end, log_boot_error_and_exit( diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b0a9c0d807..41c54b07a2 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). 
-export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2, - check_vhost_access/2, check_resource_access/3]). + check_vhost_access/3, check_resource_access/3]). %%---------------------------------------------------------------------------- @@ -31,15 +31,17 @@ -spec(check_user_pass_login/2 :: (rabbit_types:username(), rabbit_types:password()) - -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). + -> {'ok', rabbit_types:user()} | + {'refused', rabbit_types:username(), string(), [any()]}). -spec(check_user_login/2 :: (rabbit_types:username(), [{atom(), any()}]) - -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). + -> {'ok', rabbit_types:user()} | + {'refused', rabbit_types:username(), string(), [any()]}). -spec(check_user_loopback/2 :: (rabbit_types:username(), rabbit_net:socket() | inet:ip_address()) -> 'ok' | 'not_allowed'). --spec(check_vhost_access/2 :: - (rabbit_types:user(), rabbit_types:vhost()) +-spec(check_vhost_access/3 :: + (rabbit_types:user(), rabbit_types:vhost(), rabbit_net:socket()) -> 'ok' | rabbit_types:channel_exit()). -spec(check_resource_access/3 :: (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) @@ -55,36 +57,71 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), R = lists:foldl( - fun ({ModN, ModZ}, {refused, _, _}) -> + fun ({ModN, ModZs0}, {refused, _, _, _}) -> + ModZs = case ModZs0 of + A when is_atom(A) -> [A]; + L when is_list(L) -> L + end, %% Different modules for authN vs authZ. So authenticate %% with authN module, then if that succeeds do - %% passwordless (i.e pre-authenticated) login with authZ - %% module, and use the #user{} the latter gives us. - case try_login(ModN, Username, AuthProps) of - {ok, _} -> try_login(ModZ, Username, []); - Else -> Else + %% passwordless (i.e pre-authenticated) login with authZ. 
+ case try_authenticate(ModN, Username, AuthProps) of + {ok, ModNUser = #auth_user{username = Username2}} -> + user(ModNUser, try_authorize(ModZs, Username2)); + Else -> + Else end; - (Mod, {refused, _, _}) -> + (Mod, {refused, _, _, _}) -> %% Same module for authN and authZ. Just take the result %% it gives us - try_login(Mod, Username, AuthProps); + case try_authenticate(Mod, Username, AuthProps) of + {ok, ModNUser = #auth_user{impl = Impl}} -> + user(ModNUser, {ok, [{Mod, Impl}]}); + Else -> + Else + end; (_, {ok, User}) -> %% We've successfully authenticated. Skip to the end... {ok, User} - end, {refused, "No modules checked '~s'", [Username]}, Modules), - rabbit_event:notify(case R of - {ok, _User} -> user_authentication_success; - _ -> user_authentication_failure - end, [{name, Username}]), + end, + {refused, Username, "No modules checked '~s'", [Username]}, Modules), R. -try_login(Module, Username, AuthProps) -> - case Module:check_user_login(Username, AuthProps) of - {error, E} -> {refused, "~s failed authenticating ~s: ~p~n", - [Module, Username, E]}; - Else -> Else +try_authenticate(Module, Username, AuthProps) -> + case Module:user_login_authentication(Username, AuthProps) of + {ok, AuthUser} -> {ok, AuthUser}; + {error, E} -> {refused, Username, + "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + {refused, F, A} -> {refused, Username, F, A} end. +try_authorize(Modules, Username) -> + lists:foldr( + fun (Module, {ok, ModsImpls}) -> + case Module:user_login_authorization(Username) of + {ok, Impl} -> {ok, [{Module, Impl} | ModsImpls]}; + {error, E} -> {refused, Username, + "~s failed authorizing ~s: ~p~n", + [Module, Username, E]}; + {refused, F, A} -> {refused, Username, F, A} + end; + (_, {refused, F, A}) -> + {refused, Username, F, A} + end, {ok, []}, Modules). 
+ +user(#auth_user{username = Username, tags = Tags}, {ok, ModZImpls}) -> + {ok, #user{username = Username, + tags = Tags, + authz_backends = ModZImpls}}; +user(_AuthUser, Error) -> + Error. + +auth_user(#user{username = Username, tags = Tags}, Impl) -> + #auth_user{username = Username, + tags = Tags, + impl = Impl}. + check_user_loopback(Username, SockOrAddr) -> {ok, Users} = application:get_env(rabbit, loopback_users), case rabbit_net:is_loopback(SockOrAddr) @@ -93,29 +130,38 @@ check_user_loopback(Username, SockOrAddr) -> false -> not_allowed end. -check_vhost_access(User = #user{ username = Username, - auth_backend = Module }, VHostPath) -> - check_access( - fun() -> - %% TODO this could be an andalso shortcut under >R13A - case rabbit_vhost:exists(VHostPath) of - false -> false; - true -> Module:check_vhost_access(User, VHostPath) - end - end, - Module, "access to vhost '~s' refused for user '~s'", - [VHostPath, Username]). +check_vhost_access(User = #user{username = Username, + authz_backends = Modules}, VHostPath, Sock) -> + lists:foldl( + fun({Mod, Impl}, ok) -> + check_access( + fun() -> + rabbit_vhost:exists(VHostPath) andalso + Mod:check_vhost_access( + auth_user(User, Impl), VHostPath, Sock) + end, + Mod, "access to vhost '~s' refused for user '~s'", + [VHostPath, Username]); + (_, Else) -> + Else + end, ok, Modules). check_resource_access(User, R = #resource{kind = exchange, name = <<"">>}, Permission) -> check_resource_access(User, R#resource{name = <<"amq.default">>}, Permission); -check_resource_access(User = #user{username = Username, auth_backend = Module}, +check_resource_access(User = #user{username = Username, + authz_backends = Modules}, Resource, Permission) -> - check_access( - fun() -> Module:check_resource_access(User, Resource, Permission) end, - Module, "access to ~s refused for user '~s'", - [rabbit_misc:rs(Resource), Username]). 
+ lists:foldl( + fun({Module, Impl}, ok) -> + check_access( + fun() -> Module:check_resource_access( + auth_user(User, Impl), Resource, Permission) end, + Module, "access to ~s refused for user '~s'", + [rabbit_misc:rs(Resource), Username]); + (_, Else) -> Else + end, ok, Modules). check_access(Fun, Module, ErrStr, ErrArgs) -> Allow = case Fun() of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0dfca854af..2ab03ccec4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -23,7 +23,7 @@ -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([list_down/1]). -export([force_event_refresh/1, notify_policy_changed/1]). @@ -149,8 +149,6 @@ -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> qpids()). --spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - qpids()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). 
@@ -265,17 +263,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), Q = rabbit_queue_decorator:set( - rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - down_slave_nodes = [], - gm_pids = [], - state = live})), + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + recoverable_slaves = [], + gm_pids = [], + state = live})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call( rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), @@ -469,7 +467,8 @@ declare_args() -> {<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_non_neg_int_arg/2}, - {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, + {<<"x-max-priority">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -560,12 +559,12 @@ info_down(Q, DownReason) -> info_down(Q, Items, DownReason) -> [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. 
-i_down(name, #amqqueue{name = Name}, _) -> Name; -i_down(durable, #amqqueue{durable = Durable},_) -> Durable; -i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; -i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; -i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; -i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}, _) -> DSN; +i_down(name, #amqqueue{name = Name}, _) -> Name; +i_down(durable, #amqqueue{durable = Dur}, _) -> Dur; +i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; +i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; +i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; +i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS; i_down(state, _Q, DownReason) -> DownReason; i_down(K, _Q, _DownReason) -> case lists:member(K, rabbit_amqqueue_process:info_keys()) of @@ -623,10 +622,6 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }) -> purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). -deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). - -deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). - requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). @@ -724,24 +719,48 @@ forget_all_durable(Node) -> fun () -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), - [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs, + [forget_node_for_queue(Node, Q) || + #amqqueue{pid = Pid} = Q <- Qs, node(Pid) =:= Node], ok end), ok. -forget_node_for_queue(#amqqueue{name = Name, - down_slave_nodes = []}) -> +%% Try to promote a slave while down - it should recover as a +%% master. We try to take the oldest slave here for best chance of +%% recovery. +forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) -> + forget_node_for_queue(DeadNode, RS, Q). + +forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) -> %% No slaves to recover from, queue is gone. 
%% Don't process_deletions since that just calls callbacks and we %% are not really up. internal_delete1(Name, true); -forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) -> - %% Promote a slave while down - it'll happily recover as a master - Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H), - down_slave_nodes = T}, - ok = mnesia:write(rabbit_durable_queue, Q1, write). +%% Should not happen, but let's be conservative. +forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> + forget_node_for_queue(DeadNode, T, Q); + +forget_node_for_queue(DeadNode, [H|T], Q) -> + case node_permits_offline_promotion(H) of + false -> forget_node_for_queue(DeadNode, T, Q); + true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, + ok = mnesia:write(rabbit_durable_queue, Q1, write) + end. + +node_permits_offline_promotion(Node) -> + case node() of + Node -> not rabbit:is_running(); %% [1] + _ -> Running = rabbit_mnesia:cluster_nodes(running), + not lists:member(Node, Running) %% [2] + end. +%% [1] In this case if we are a real running node (i.e. rabbitmqctl +%% has RPCed into us) then we cannot allow promotion. If on the other +%% hand we *are* rabbitmqctl impersonating the node for offline +%% node-forgetting then we can. +%% +%% [2] This is simpler; as long as it's down that's OK run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). @@ -763,12 +782,12 @@ on_node_up(Node) -> fun () -> Qs = mnesia:match_object(rabbit_queue, #amqqueue{_ = '_'}, write), - [case lists:member(Node, DSNs) of - true -> DSNs1 = DSNs -- [Node], + [case lists:member(Node, RSs) of + true -> RSs1 = RSs -- [Node], store_queue( - Q#amqqueue{down_slave_nodes = DSNs1}); + Q#amqqueue{recoverable_slaves = RSs1}); false -> ok - end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs], + end || #amqqueue{recoverable_slaves = RSs} = Q <- Qs], ok end). @@ -807,24 +826,29 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. 
-immutable(Q) -> Q#amqqueue{pid = none, - slave_pids = none, - sync_slave_pids = none, - down_slave_nodes = none, - gm_pids = none, - policy = none, - decorators = none, - state = none}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + recoverable_slaves = none, + gm_pids = none, + policy = none, + decorators = none, + state = none}. -deliver([], _Delivery, _Flow) -> +deliver([], _Delivery) -> %% /dev/null optimisation []; -deliver(Qs, Delivery, Flow) -> +deliver(Qs, Delivery = #delivery{flow = Flow}) -> {MPids, SPids} = qpids(Qs), QPids = MPids ++ SPids, + %% We use up two credits to send to a slave since the message + %% arrives at the slave from two directions. We will ack one when + %% the slave receives the message direct from the channel, and the + %% other when it receives it via GM. case Flow of - flow -> [credit_flow:send(QPid) || QPid <- QPids]; + flow -> [credit_flow:send(QPid) || QPid <- QPids], + [credit_flow:send(QPid) || QPid <- SPids]; noflow -> ok end, @@ -833,8 +857,8 @@ deliver(Qs, Delivery, Flow) -> %% after they have become master they should mark the message as %% 'delivered' since they do not know what the master may have %% done with it. - MMsg = {deliver, Delivery, false, Flow}, - SMsg = {deliver, Delivery, true, Flow}, + MMsg = {deliver, Delivery, false}, + SMsg = {deliver, Delivery, true}, delegate:cast(MPids, MMsg), delegate:cast(SPids, SMsg), QPids. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 16ead64af9..c6030a090e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -83,7 +83,7 @@ memory, slave_pids, synchronised_slave_pids, - down_slave_nodes, + recoverable_slaves, state ]). 
@@ -498,12 +498,13 @@ send_mandatory(#delivery{mandatory = true, discard(#delivery{confirm = Confirm, sender = SenderPid, + flow = Flow, message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> MTC1 = case Confirm of true -> confirm_messages([MsgId], MTC); false -> MTC end, - BQS1 = BQ:discard(MsgId, SenderPid, BQS), + BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -525,14 +526,17 @@ run_message_queue(ActiveConsumersChanged, State) -> end end. -attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, +attempt_delivery(Delivery = #delivery{sender = SenderPid, + flow = Flow, + message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS, msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), - {AckTag, BQS1} = BQ:publish_delivered( - Message, Props, SenderPid, BQS), + {AckTag, BQS1} = + BQ:publish_delivered( + Message, Props, SenderPid, Flow, BQS), {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, discard(Delivery, BQ, BQS, MTC)} @@ -549,7 +553,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, State#q{consumers = Consumers})} end. 
-deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, +deliver_or_enqueue(Delivery = #delivery{message = Message, + sender = SenderPid, + flow = Flow}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms @@ -570,7 +576,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> - BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = maybe_drop_head(State3#q{backing_queue_state = BQS3}), QLen = BQ:len(BQS4), @@ -855,9 +861,9 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; -i(down_slave_nodes, #q{q = #amqqueue{name = Name, - durable = Durable}}) -> - {ok, Q = #amqqueue{down_slave_nodes = Nodes}} = +i(recoverable_slaves, #q{q = #amqqueue{name = Name, + durable = Durable}}) -> + {ok, Q = #amqqueue{recoverable_slaves = Nodes}} = rabbit_amqqueue:lookup(Name), case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of false -> ''; @@ -1100,15 +1106,23 @@ handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender, + flow = Flow}, SlaveWhenPublished}, State = #q{senders = Senders}) -> Senders1 = case Flow of flow -> credit_flow:ack(Sender), + case SlaveWhenPublished of + true -> credit_flow:ack(Sender); %% [0] + false -> ok + end, pmon:monitor(Sender, Senders); noflow -> Senders end, State1 = State#q{senders = Senders1}, - 
noreply(deliver_or_enqueue(Delivery, Delivered, State1)); + noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1)); +%% [0] The second ack is since the channel thought we were a slave at +%% the time it published this message, so it used two credits (see +%% rabbit_amqqueue:deliver/2). handle_cast({ack, AckTags, ChPid}, State) -> noreply(ack(AckTags, ChPid, State)); diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl index 5daca368eb..d2f07c1d57 100644 --- a/src/rabbit_auth_backend_dummy.erl +++ b/src/rabbit_auth_backend_dummy.erl @@ -17,11 +17,12 @@ -module(rabbit_auth_backend_dummy). -include("rabbit.hrl"). --behaviour(rabbit_auth_backend). +-behaviour(rabbit_authn_backend). +-behaviour(rabbit_authz_backend). --export([description/0]). -export([user/0]). --export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). +-export([user_login_authentication/2, user_login_authorization/1, + check_vhost_access/3, check_resource_access/3]). -ifdef(use_specs). @@ -31,19 +32,17 @@ %% A user to be used by the direct client when permission checks are %% not needed. This user can do anything AMQPish. -user() -> #user{username = <<"none">>, - tags = [], - auth_backend = ?MODULE, - impl = none}. +user() -> #user{username = <<"none">>, + tags = [], + authz_backends = [{?MODULE, none}]}. %% Implementation of rabbit_auth_backend -description() -> - [{name, <<"Dummy">>}, - {description, <<"Database for the dummy user">>}]. +user_login_authentication(_, _) -> + {refused, "cannot log in conventionally as dummy user", []}. -check_user_login(_, _) -> +user_login_authorization(_) -> {refused, "cannot log in conventionally as dummy user", []}. -check_vhost_access(#user{}, _VHostPath) -> true. -check_resource_access(#user{}, #resource{}, _Permission) -> true. +check_vhost_access(#auth_user{}, _VHostPath, _Sock) -> true. +check_resource_access(#auth_user{}, #resource{}, _Permission) -> true. 
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index fd1c4e8ee6..20a5766d20 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -17,10 +17,11 @@ -module(rabbit_auth_backend_internal). -include("rabbit.hrl"). --behaviour(rabbit_auth_backend). +-behaviour(rabbit_authn_backend). +-behaviour(rabbit_authz_backend). --export([description/0]). --export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). +-export([user_login_authentication/2, user_login_authorization/1, + check_vhost_access/3, check_resource_access/3]). -export([add_user/2, delete_user/1, lookup_user/1, change_password/2, clear_password/1, @@ -76,13 +77,9 @@ %%---------------------------------------------------------------------------- %% Implementation of rabbit_auth_backend -description() -> - [{name, <<"Internal">>}, - {description, <<"Internal user / password database">>}]. - -check_user_login(Username, []) -> +user_login_authentication(Username, []) -> internal_check_user_login(Username, fun(_) -> true end); -check_user_login(Username, [{password, Cleartext}]) -> +user_login_authentication(Username, [{password, Cleartext}]) -> internal_check_user_login( Username, fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) -> @@ -90,25 +87,30 @@ check_user_login(Username, [{password, Cleartext}]) -> (#internal_user{}) -> false end); -check_user_login(Username, AuthProps) -> +user_login_authentication(Username, AuthProps) -> exit({unknown_auth_props, Username, AuthProps}). +user_login_authorization(Username) -> + case user_login_authentication(Username, []) of + {ok, #auth_user{impl = Impl}} -> {ok, Impl}; + Else -> Else + end. 
+ internal_check_user_login(Username, Fun) -> Refused = {refused, "user '~s' - invalid credentials", [Username]}, case lookup_user(Username) of {ok, User = #internal_user{tags = Tags}} -> case Fun(User) of - true -> {ok, #user{username = Username, - tags = Tags, - auth_backend = ?MODULE, - impl = User}}; + true -> {ok, #auth_user{username = Username, + tags = Tags, + impl = none}}; _ -> Refused end; {error, not_found} -> Refused end. -check_vhost_access(#user{username = Username}, VHostPath) -> +check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) -> case mnesia:dirty_read({rabbit_user_permission, #user_vhost{username = Username, virtual_host = VHostPath}}) of @@ -116,7 +118,7 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [_R] -> true end. -check_resource_access(#user{username = Username}, +check_resource_access(#auth_user{username = Username}, #resource{virtual_host = VHostPath, name = Name}, Permission) -> case mnesia:dirty_read({rabbit_user_permission, diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index d11af09552..c8e23a75d5 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -36,13 +36,13 @@ %% Another round is needed. Here's the state I want next time. %% {protocol_error, Msg, Args} %% Client got the protocol wrong. Log and die. -%% {refused, Msg, Args} +%% {refused, Username, Msg, Args} %% Client failed authentication. Log and die. -callback handle_response(binary(), any()) -> {'ok', rabbit_types:user()} | {'challenge', binary(), any()} | {'protocol_error', string(), [any()]} | - {'refused', string(), [any()]}. + {'refused', rabbit_types:username() | none, string(), [any()]}. -else. 
diff --git a/src/rabbit_authn_backend.erl b/src/rabbit_authn_backend.erl new file mode 100644 index 0000000000..cfc3f5db51 --- /dev/null +++ b/src/rabbit_authn_backend.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_authn_backend). + +-include("rabbit.hrl"). + +-ifdef(use_specs). + +%% Check a user can log in, given a username and a proplist of +%% authentication information (e.g. [{password, Password}]). If your +%% backend is not to be used for authentication, this should always +%% refuse access. +%% +%% Possible responses: +%% {ok, User} +%% Authentication succeeded, and here's the user record. +%% {error, Error} +%% Something went wrong. Log and die. +%% {refused, Msg, Args} +%% Client failed authentication. Log and die. +-callback user_login_authentication(rabbit_types:username(), [term()]) -> + {'ok', rabbit_types:auth_user()} | + {'refused', string(), [any()]} | + {'error', any()}. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{user_login_authentication, 2}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_authz_backend.erl index a7dd6494b1..ff5f014e37 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_authz_backend.erl @@ -14,47 +14,49 @@ %% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. 
%% --module(rabbit_auth_backend). +-module(rabbit_authz_backend). --ifdef(use_specs). +-include("rabbit.hrl"). -%% A description proplist as with auth mechanisms, -%% exchanges. Currently unused. --callback description() -> [proplists:property()]. +-ifdef(use_specs). -%% Check a user can log in, given a username and a proplist of -%% authentication information (e.g. [{password, Password}]). +%% Check a user can log in, when this backend is being used for +%% authorisation only. Authentication has already taken place +%% successfully, but we need to check that the user exists in this +%% backend, and initialise any impl field we will want to have passed +%% back in future calls to check_vhost_access/3 and +%% check_resource_access/3. %% %% Possible responses: -%% {ok, User} -%% Authentication succeeded, and here's the user record. +%% {ok, Impl} +%% User authorisation succeeded, and here's the impl field. %% {error, Error} %% Something went wrong. Log and die. %% {refused, Msg, Args} -%% Client failed authentication. Log and die. --callback check_user_login(rabbit_types:username(), [term()]) -> - {'ok', rabbit_types:user()} | +%% User authorisation failed. Log and die. +-callback user_login_authorization(rabbit_types:username()) -> + {'ok', any()} | {'refused', string(), [any()]} | {'error', any()}. -%% Given #user and vhost, can a user log in to a vhost? +%% Given #auth_user and vhost, can a user log in to a vhost? %% Possible responses: %% true %% false %% {error, Error} %% Something went wrong. Log and die. --callback check_vhost_access(rabbit_types:user(), rabbit_types:vhost()) -> +-callback check_vhost_access(rabbit_types:auth_user(), + rabbit_types:vhost(), rabbit_net:socket()) -> boolean() | {'error', any()}. - -%% Given #user, resource and permission, can a user access a resource? +%% Given #auth_user, resource and permission, can a user access a resource? %% %% Possible responses: %% true %% false %% {error, Error} %% Something went wrong. Log and die. 
--callback check_resource_access(rabbit_types:user(), +-callback check_resource_access(rabbit_types:auth_user(), rabbit_types:r(atom()), rabbit_access_control:permission_atom()) -> boolean() | {'error', any()}. @@ -64,8 +66,8 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {check_user_login, 2}, {check_vhost_access, 2}, - {check_resource_access, 3}]; + [{user_login_authorization, 1}, + {check_vhost_access, 3}, {check_resource_access, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index 09e9aa6ae9..566c8bb836 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -16,13 +16,16 @@ -module(rabbit_autoheal). --export([init/0, maybe_start/1, rabbit_down/2, node_down/2, handle_msg/3]). +-export([init/0, enabled/0, maybe_start/1, rabbit_down/2, node_down/2, + handle_msg/3]). %% The named process we are running in. -define(SERVER, rabbit_node_monitor). -define(MNESIA_STOPPED_PING_INTERNAL, 200). +-define(AUTOHEAL_STATE_AFTER_RESTART, rabbit_autoheal_state_after_restart). + %%---------------------------------------------------------------------------- %% In order to autoheal we want to: @@ -45,9 +48,20 @@ %% stops - if a node stops for any other reason it just gets a message %% it will ignore, and otherwise we carry on. %% +%% Meanwhile, the leader may continue to receive new autoheal requests: +%% all of them are ignored. The winner notifies the leader when the +%% current autoheal process is finished (ie. when all losers stopped and +%% were asked to start again) or was aborted. When the leader receives +%% the notification or if it looses contact with the winner, it can +%% accept new autoheal requests. +%% %% The winner and the leader are not necessarily the same node. %% -%% Possible states: +%% The leader can be a loser and will restart in this case. 
It remembers +%% there is an autoheal in progress by temporarily saving the autoheal +%% state to the application environment. +%% +%% == Possible states == %% %% not_healing %% - the default @@ -56,31 +70,73 @@ %% - we are the winner and are waiting for all losing nodes to stop %% before telling them they can restart %% -%% about_to_heal -%% - we are the leader, and have already assigned the winner and -%% losers. We are part of the losers and we wait for the winner_is -%% announcement. This leader-specific state differs from not_healing -%% (the state other losers are in), because the leader could still -%% receive request_start messages: those subsequent requests must be -%% ignored. -%% -%% {leader_waiting, OutstandingStops} +%% {leader_waiting, Winner, Notify} %% - we are the leader, and have already assigned the winner and losers. -%% We are neither but need to ignore further requests to autoheal. +%% We are waiting for a confirmation from the winner that the autoheal +%% process has ended. Meanwhile we can ignore autoheal requests. +%% Because we may be a loser too, this state is saved to the application +%% environment and restored on startup. %% %% restarting %% - we are restarting. Of course the node monitor immediately dies %% then so this state does not last long. We therefore send the %% autoheal_safe_to_start message to the rabbit_outside_app_process %% instead. +%% +%% == Message flow == +%% +%% 1. Any node (leader included) >> {request_start, node()} >> Leader +%% When Mnesia detects it is running partitioned or +%% when a remote node starts, rabbit_node_monitor calls +%% rabbit_autoheal:maybe_start/1. The message above is sent to the +%% leader so the leader can take a decision. +%% +%% 2. Leader >> {become_winner, Losers} >> Winner +%% The leader notifies the winner so the latter can proceed with +%% the autoheal. +%% +%% 3. Winner >> {winner_is, Winner} >> All losers +%% The winner notifies losers they must stop. +%% +%% 4. 
Winner >> autoheal_safe_to_start >> All losers +%% When either all losers stopped or the autoheal process was +%% aborted, the winner notifies losers they can start again. +%% +%% 5. Leader >> report_autoheal_status >> Winner +%% The leader asks the autoheal status to the winner. This only +%% happens when the leader is a loser too. If this is not the case, +%% this message is never sent. +%% +%% 6. Winner >> {autoheal_finished, Winner} >> Leader +%% The winner notifies the leader that the autoheal process was +%% either finished or aborted (ie. autoheal_safe_to_start was sent +%% to losers). %%---------------------------------------------------------------------------- -init() -> not_healing. +init() -> + %% We check the application environment for a saved autoheal state + %% saved during a restart. If this node is a leader, it is used + %% to determine if it needs to ask the winner to report about the + %% autoheal progress. + State = case application:get_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART) of + {ok, S} -> S; + undefined -> not_healing + end, + ok = application:unset_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART), + case State of + {leader_waiting, Winner, _} -> + rabbit_log:info( + "Autoheal: in progress, requesting report from ~p~n", [Winner]), + send(Winner, report_autoheal_status); + _ -> + ok + end, + State. maybe_start(not_healing) -> case enabled() of - true -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)), + true -> Leader = leader(), send(Leader, {request_start, node()}), rabbit_log:info("Autoheal request sent to ~p~n", [Leader]), not_healing; @@ -90,8 +146,15 @@ maybe_start(State) -> State. enabled() -> - {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling). + case application:get_env(rabbit, cluster_partition_handling) of + {ok, autoheal} -> true; + {ok, {pause_if_all_down, _, autoheal}} -> true; + _ -> false + end. +leader() -> + [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)), + Leader. 
%% This is the winner receiving its last notification that a node has %% stopped - all nodes can now start again @@ -102,14 +165,13 @@ rabbit_down(Node, {winner_waiting, [Node], Notify}) -> rabbit_down(Node, {winner_waiting, WaitFor, Notify}) -> {winner_waiting, WaitFor -- [Node], Notify}; -rabbit_down(Node, {leader_waiting, [Node]}) -> - not_healing; - -rabbit_down(Node, {leader_waiting, WaitFor}) -> - {leader_waiting, WaitFor -- [Node]}; +rabbit_down(Winner, {leader_waiting, Winner, Losers}) -> + abort([Winner], Losers); rabbit_down(_Node, State) -> - %% ignore, we already cancelled the autoheal process + %% Ignore. Either: + %% o we already cancelled the autoheal process; + %% o we are still waiting the winner's report. State. node_down(_Node, not_healing) -> @@ -141,15 +203,10 @@ handle_msg({request_start, Node}, case node() =:= Winner of true -> handle_msg({become_winner, Losers}, not_healing, Partitions); - false -> send(Winner, {become_winner, Losers}), %% [0] - case lists:member(node(), Losers) of - true -> about_to_heal; - false -> {leader_waiting, Losers} - end + false -> send(Winner, {become_winner, Losers}), + {leader_waiting, Winner, Losers} end end; -%% [0] If we are a loser we will never receive this message - but it -%% won't stick in the mailbox as we are restarting anyway handle_msg({request_start, Node}, State, _Partitions) -> @@ -170,27 +227,49 @@ handle_msg({become_winner, Losers}, _ -> abort(Down, Losers) end; -handle_msg({winner_is, Winner}, - State, _Partitions) - when State =:= not_healing orelse State =:= about_to_heal -> - rabbit_log:warning( - "Autoheal: we were selected to restart; winner is ~p~n", [Winner]), - rabbit_node_monitor:run_outside_applications( - fun () -> - MRef = erlang:monitor(process, {?SERVER, Winner}), - rabbit:stop(), - receive - {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok; - autoheal_safe_to_start -> ok - end, - erlang:demonitor(MRef, [flush]), - rabbit:start() - end), +handle_msg({winner_is, Winner}, 
State = not_healing, + _Partitions) -> + %% This node is a loser, nothing else. + restart_loser(State, Winner), + restarting; +handle_msg({winner_is, Winner}, State = {leader_waiting, Winner, _}, + _Partitions) -> + %% This node is the leader and a loser at the same time. + restart_loser(State, Winner), restarting; handle_msg(_, restarting, _Partitions) -> %% ignore, we can contribute no further - restarting. + restarting; + +handle_msg(report_autoheal_status, not_healing, _Partitions) -> + %% The leader is asking about the autoheal status to us (the + %% winner). This happens when the leader is a loser and it just + %% restarted. We are in the "not_healing" state, so the previous + %% autoheal process ended: let's tell this to the leader. + send(leader(), {autoheal_finished, node()}), + not_healing; + +handle_msg(report_autoheal_status, State, _Partitions) -> + %% Like above, the leader is asking about the autoheal status. We + %% are not finished with it. There is no need to send anything yet + %% to the leader: we will send the notification when it is over. + State; + +handle_msg({autoheal_finished, Winner}, + {leader_waiting, Winner, _}, _Partitions) -> + %% The winner is finished with the autoheal process and notified us + %% (the leader). We can transition to the "not_healing" state and + %% accept new requests. + rabbit_log:info("Autoheal finished according to winner ~p~n", [Winner]), + not_healing; + +handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) + when Winner =:= node() -> + %% We are the leader and the winner. The state already transitioned + %% to "not_healing" at the end of the autoheal process. + rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]), + not_healing. %%---------------------------------------------------------------------------- @@ -215,6 +294,7 @@ winner_finish(Notify) -> %% losing nodes before sending the "autoheal_safe_to_start" signal. 
wait_for_mnesia_shutdown(Notify), [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], + send(leader(), {autoheal_finished, node()}), not_healing. wait_for_mnesia_shutdown([Node | Rest] = AllNodes) -> @@ -233,6 +313,35 @@ wait_for_mnesia_shutdown([Node | Rest] = AllNodes) -> wait_for_mnesia_shutdown([]) -> ok. +restart_loser(State, Winner) -> + rabbit_log:warning( + "Autoheal: we were selected to restart; winner is ~p~n", [Winner]), + rabbit_node_monitor:run_outside_applications( + fun () -> + MRef = erlang:monitor(process, {?SERVER, Winner}), + rabbit:stop(), + NextState = receive + {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> + not_healing; + autoheal_safe_to_start -> + State + end, + erlang:demonitor(MRef, [flush]), + %% During the restart, the autoheal state is lost so we + %% store it in the application environment temporarily so + %% init/0 can pick it up. + %% + %% This is useful to the leader which is a loser at the + %% same time: because the leader is restarting, there + %% is a great chance it misses the "autoheal finished!" + %% notification from the winner. Thanks to the saved + %% state, it knows it needs to ask the winner if the + %% autoheal process is finished or not. + application:set_env(rabbit, + ?AUTOHEAL_STATE_AFTER_RESTART, NextState), + rabbit:start() + end, true). + make_decision(AllPartitions) -> Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 4ce133c3e3..55c8c971a0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -22,7 +22,8 @@ messages_unacknowledged_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, - message_bytes_persistent, backing_queue_status]). + message_bytes_persistent, + disk_reads, disk_writes, backing_queue_status]). -ifdef(use_specs). 
@@ -30,6 +31,7 @@ -type(ack() :: any()). -type(state() :: any()). +-type(flow() :: 'flow' | 'noflow'). -type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). @@ -99,19 +101,20 @@ %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), boolean(), pid(), + rabbit_types:message_properties(), boolean(), pid(), flow(), state()) -> state(). %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). -callback publish_delivered(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) + rabbit_types:message_properties(), pid(), flow(), + state()) -> {ack(), state()}. %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. --callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). +-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). 
@@ -249,8 +252,8 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, - {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 6}, + {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 67109e7d5a..cd2846c00b 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -114,7 +114,7 @@ publish(X, Delivery) -> delivery(Mandatory, Confirm, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), - message = Message, msg_seq_no = MsgSeqNo}. + message = Message, msg_seq_no = MsgSeqNo, flow = noflow}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> build_content(Properties, [BodyBin]); diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 3ab82cad78..ee8147f45f 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -41,48 +41,90 @@ %% parse_table supports the AMQP 0-8/0-9 standard types, S, I, D, T %% and F, as well as the QPid extensions b, d, f, l, s, t, x, and V. +-define(SIMPLE_PARSE_TABLE(BType, Pattern, RType), + parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + BType, Pattern, Rest/binary>>) -> + [{NameString, RType, Value} | parse_table(Rest)]). + +%% Note that we try to put these in approximately the order we expect +%% to hit them, that's why the empty binary is half way through. 
+ +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, longstr, Value} | parse_table(Rest)]; + +?SIMPLE_PARSE_TABLE($I, Value:32/signed, signedint); +?SIMPLE_PARSE_TABLE($T, Value:64/unsigned, timestamp); + parse_table(<<>>) -> []; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, ValueAndRest/binary>>) -> - {Type, Value, Rest} = parse_field_value(ValueAndRest), - [{NameString, Type, Value} | parse_table(Rest)]. -parse_array(<<>>) -> - []; -parse_array(<<ValueAndRest/binary>>) -> - {Type, Value, Rest} = parse_field_value(ValueAndRest), - [{Type, Value} | parse_array(Rest)]. +?SIMPLE_PARSE_TABLE($b, Value:8/signed, byte); +?SIMPLE_PARSE_TABLE($d, Value:64/float, double); +?SIMPLE_PARSE_TABLE($f, Value:32/float, float); +?SIMPLE_PARSE_TABLE($l, Value:64/signed, long); +?SIMPLE_PARSE_TABLE($s, Value:16/signed, short); + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $t, Value:8/unsigned, Rest/binary>>) -> + [{NameString, bool, (Value /= 0)} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> + [{NameString, decimal, {Before, After}} | parse_table(Rest)]; -parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> - {longstr, V, R}; +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, table, parse_table(Value)} | parse_table(Rest)]; -parse_field_value(<<$I, V:32/signed, R/binary>>) -> - {signedint, V, R}; +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, array, parse_array(Value)} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, binary, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, 
NameString:NLen/binary, + $V, Rest/binary>>) -> + [{NameString, void, undefined} | parse_table(Rest)]. + +-define(SIMPLE_PARSE_ARRAY(BType, Pattern, RType), + parse_array(<<BType, Pattern, Rest/binary>>) -> + [{RType, Value} | parse_array(Rest)]). + +parse_array(<<$S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{longstr, Value} | parse_array(Rest)]; + +?SIMPLE_PARSE_ARRAY($I, Value:32/signed, signedint); +?SIMPLE_PARSE_ARRAY($T, Value:64/unsigned, timestamp); + +parse_array(<<>>) -> + []; -parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) -> - {decimal, {Before, After}, R}; +?SIMPLE_PARSE_ARRAY($b, Value:8/signed, byte); +?SIMPLE_PARSE_ARRAY($d, Value:64/float, double); +?SIMPLE_PARSE_ARRAY($f, Value:32/float, float); +?SIMPLE_PARSE_ARRAY($l, Value:64/signed, long); +?SIMPLE_PARSE_ARRAY($s, Value:16/signed, short); -parse_field_value(<<$T, V:64/unsigned, R/binary>>) -> - {timestamp, V, R}; +parse_array(<<$t, Value:8/unsigned, Rest/binary>>) -> + [{bool, (Value /= 0)} | parse_array(Rest)]; -parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> - {table, parse_table(Table), R}; +parse_array(<<$D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> + [{decimal, {Before, After}} | parse_array(Rest)]; -parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> - {array, parse_array(Array), R}; +parse_array(<<$F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{table, parse_table(Value)} | parse_array(Rest)]; -parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R}; -parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R}; -parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R}; -parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R}; -parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R}; -parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; +parse_array(<<$A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) 
-> + [{array, parse_array(Value)} | parse_array(Rest)]; -parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> - {binary, V, R}; +parse_array(<<$x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{binary, Value} | parse_array(Rest)]; -parse_field_value(<<$V, R/binary>>) -> - {void, undefined, R}. +parse_array(<<$V, Rest/binary>>) -> + [{void, undefined} | parse_array(Rest)]. ensure_content_decoded(Content = #content{properties = Props}) when Props =/= none -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8632e1b3f7..63a5eb7e79 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -581,7 +581,8 @@ check_user_id_header(#'P_basic'{user_id = Username}, #ch{user = #user{username = Username}}) -> ok; check_user_id_header( - #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) -> + #'P_basic'{}, #ch{user = #user{authz_backends = + [{rabbit_auth_backend_dummy, _}]}}) -> ok; check_user_id_header(#'P_basic'{user_id = Claimed}, #ch{user = #user{username = Actual, @@ -660,7 +661,7 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> check_not_default_exchange(_) -> ok. 
-check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>, +check_exchange_deletion(XName = #resource{name = <<"amq.", _/binary>>, kind = exchange}) -> rabbit_misc:protocol_error( access_refused, "deletion of system ~s not allowed", @@ -789,12 +790,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_in(Message, ConnName, ChannelNum, - Username, TraceState), Delivery = rabbit_basic:delivery( Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), - DQ = {Delivery, QNames}, + rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, + Username, TraceState), + DQ = {Delivery#delivery{flow = flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), @@ -1665,7 +1666,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> Qs = rabbit_amqqueue:lookup(DelQNames), - DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery), + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), %% The pmon:monitor_all/2 monitors all queues to which we %% delivered. 
But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean @@ -1735,7 +1736,7 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx = none, confirmed = []}) -> State; send_confirms(State = #ch{tx = none, confirmed = C}) -> - case rabbit_node_monitor:pause_minority_guard() of + case rabbit_node_monitor:pause_partition_guard() of ok -> MsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> @@ -1747,7 +1748,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) -> pausing -> State end; send_confirms(State) -> - case rabbit_node_monitor:pause_minority_guard() of + case rabbit_node_monitor:pause_partition_guard() of ok -> maybe_complete_tx(State); pausing -> State end. diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl index 47505b3df3..cf7b608399 100644 --- a/src/rabbit_cli.erl +++ b/src/rabbit_cli.erl @@ -17,7 +17,8 @@ -module(rabbit_cli). -include("rabbit_cli.hrl"). --export([main/3, parse_arguments/4, rpc_call/4]). +-export([main/3, start_distribution/0, start_distribution/1, + parse_arguments/4, rpc_call/4]). %%---------------------------------------------------------------------------- @@ -31,6 +32,8 @@ -spec(main/3 :: (fun (([string()], string()) -> parse_result()), fun ((atom(), atom(), [any()], [any()]) -> any()), atom()) -> no_return()). +-spec(start_distribution/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_distribution/1 :: (string()) -> {'ok', pid()} | {'error', any()}). -spec(usage/1 :: (atom()) -> no_return()). -spec(parse_arguments/4 :: ([{atom(), [{string(), optdef()}]} | atom()], @@ -42,6 +45,8 @@ %%---------------------------------------------------------------------------- main(ParseFun, DoFun, UsageMod) -> + error_logger:tty(false), + start_distribution(), {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {Command, Opts, Args} = case ParseFun(init:get_plain_arguments(), NodeStr) of @@ -101,6 +106,20 @@ main(ParseFun, DoFun, UsageMod) -> rabbit_misc:quit(2) end. 
+start_distribution() -> + start_distribution(list_to_atom( + rabbit_misc:format("rabbitmq-cli-~s", [os:getpid()]))). + +start_distribution(Name) -> + rabbit_nodes:ensure_epmd(), + net_kernel:start([Name, name_type()]). + +name_type() -> + case os:getenv("RABBITMQ_USE_LONGNAME") of + "true" -> longnames; + _ -> shortnames + end. + usage(Mod) -> io:format("~s", [Mod:usage()]), rabbit_misc:quit(1). diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index a931eef009..810f5f6f33 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -19,7 +19,7 @@ -include("rabbit_cli.hrl"). -export([start/0, stop/0, parse_arguments/2, action/5, - sync_queue/1, cancel_sync_queue/1]). + sync_queue/1, cancel_sync_queue/1, become/1]). -import(rabbit_cli, [rpc_call/4]). @@ -40,6 +40,7 @@ change_cluster_node_type, update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, + rename_cluster_node, force_boot, cluster_status, {sync_queue, [?VHOST_DEF]}, @@ -104,8 +105,8 @@ -define(COMMANDS_NOT_REQUIRING_APP, [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, join_cluster, change_cluster_node_type, update_cluster_nodes, - forget_cluster_node, cluster_status, status, environment, eval, - force_boot]). + forget_cluster_node, rename_cluster_node, cluster_status, status, + environment, eval, force_boot]). 
%%---------------------------------------------------------------------------- @@ -123,7 +124,6 @@ %%---------------------------------------------------------------------------- start() -> - start_distribution(), rabbit_cli:main( fun (Args, NodeStr) -> parse_arguments(Args, NodeStr) @@ -234,6 +234,13 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> [ClusterNode, false]) end; +action(rename_cluster_node, Node, NodesS, _Opts, Inform) -> + Nodes = split_list([list_to_atom(N) || N <- NodesS]), + Inform("Renaming cluster nodes:~n~s~n", + [lists:flatten([rabbit_misc:format(" ~s -> ~s~n", [F, T]) || + {F, T} <- Nodes])]), + rabbit_mnesia_rename:rename(Node, Nodes); + action(force_boot, Node, [], _Opts, Inform) -> Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]), case rabbit:is_running(Node) of @@ -518,7 +525,7 @@ wait_for_startup(Node, Pid) -> Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end). while_process_is_alive(Node, Pid, Activity) -> - case process_up(Pid) of + case rabbit_misc:is_os_process_alive(Pid) of true -> case Activity() of true -> ok; false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), @@ -528,7 +535,7 @@ while_process_is_alive(Node, Pid, Activity) -> end. wait_for_process_death(Pid) -> - case process_up(Pid) of + case rabbit_misc:is_os_process_alive(Pid) of true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), wait_for_process_death(Pid); false -> ok @@ -552,62 +559,18 @@ read_pid_file(PidFile, Wait) -> exit({error, {could_not_read_pid, E}}) end. -% Test using some OS clunkiness since we shouldn't trust -% rpc:call(os, getpid, []) at this point -process_up(Pid) -> - with_os([{unix, fun () -> - run_ps(Pid) =:= 0 - end}, - {win32, fun () -> - Cmd = "tasklist /nh /fi \"pid eq " ++ Pid ++ "\" ", - Res = rabbit_misc:os_cmd(Cmd ++ "2>&1"), - case re:run(Res, "erl\\.exe", [{capture, none}]) of - match -> true; - _ -> false - end - end}]). 
- -with_os(Handlers) -> - {OsFamily, _} = os:type(), - case proplists:get_value(OsFamily, Handlers) of - undefined -> throw({unsupported_os, OsFamily}); - Handler -> Handler() - end. - -run_ps(Pid) -> - Port = erlang:open_port({spawn, "ps -p " ++ Pid}, - [exit_status, {line, 16384}, - use_stdio, stderr_to_stdout]), - exit_loop(Port). - -exit_loop(Port) -> - receive - {Port, {exit_status, Rc}} -> Rc; - {Port, _} -> exit_loop(Port) - end. - -start_distribution() -> - CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]), - {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), name_type()]). - become(BecomeNode) -> + error_logger:tty(false), + ok = net_kernel:stop(), case net_adm:ping(BecomeNode) of pong -> exit({node_running, BecomeNode}); pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]), - error_logger:tty(false), - ok = net_kernel:stop(), - {ok, _} = net_kernel:start([BecomeNode, name_type()]), + {ok, _} = rabbit_cli:start_distribution(BecomeNode), io:format(" done~n", []), Dir = mnesia:system_info(directory), io:format(" * Mnesia directory : ~s~n", [Dir]) end. -name_type() -> - case os:getenv("RABBITMQ_USE_LONGNAME") of - "true" -> longnames; - _ -> shortnames - end. - %%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> @@ -720,3 +683,7 @@ prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value); prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; prettify_typed_amqp_value(_Type, Value) -> Value. + +split_list([]) -> []; +split_list([_]) -> exit(even_list_needed); +split_list([A, B | T]) -> [{A, B} | split_list(T)]. 
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 749a67b1d5..11233e7eb8 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -83,16 +83,27 @@ connect({Username, Password}, VHost, Protocol, Pid, Infos) -> connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of true -> case AuthFun() of - {ok, User} -> + {ok, User = #user{username = Username}} -> + notify_auth_result(Username, + user_authentication_success, []), connect1(User, VHost, Protocol, Pid, Infos); - {refused, _M, _A} -> + {refused, Username, Msg, Args} -> + notify_auth_result(Username, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}]), {error, {auth_failure, "Refused"}} end; false -> {error, broker_not_found_on_node} end. +notify_auth_result(Username, AuthResult, ExtraProps) -> + EventProps = [{connection_type, direct}, + {name, case Username of none -> ''; _ -> Username end}] ++ + ExtraProps, + rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). + connect1(User, VHost, Protocol, Pid, Infos) -> - try rabbit_access_control:check_vhost_access(User, VHost) of + try rabbit_access_control:check_vhost_access(User, VHost, undefined) of ok -> ok = pg_local:join(rabbit_direct, Pid), rabbit_event:notify(connection_created, Infos), {ok, {User, rabbit_reader:server_properties(Protocol)}} diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl new file mode 100644 index 0000000000..261554921a --- /dev/null +++ b/src/rabbit_epmd_monitor.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_epmd_monitor). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-record(state, {timer, mod, me, host, port}). + +-define(SERVER, ?MODULE). +-define(CHECK_FREQUENCY, 60000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- +%% It's possible for epmd to be killed out from underneath us. If that +%% happens, then obviously clustering and rabbitmqctl stop +%% working. This process checks up on epmd and restarts it / +%% re-registers us with it if it has gone away. +%% +%% How could epmd be killed? +%% +%% 1) The most popular way for this to happen is when running as a +%% Windows service. The user starts rabbitmqctl first, and this starts +%% epmd under the user's account. When they log out epmd is killed. +%% +%% 2) Some packagings of (non-RabbitMQ?) Erlang apps might do "killall +%% epmd" as a shutdown or uninstall step. +%% ---------------------------------------------------------------------------- + +start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + {Me, Host} = rabbit_nodes:parts(node()), + Mod = net_kernel:epmd_module(), + {port, Port, _Version} = Mod:port_please(Me, Host), + {ok, ensure_timer(#state{mod = Mod, + me = Me, + host = Host, + port = Port})}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. 
+ +handle_info(check, State) -> + check_epmd(State), + {noreply, ensure_timer(State#state{timer = undefined})}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +ensure_timer(State) -> + rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check). + +check_epmd(#state{mod = Mod, + me = Me, + host = Host, + port = Port}) -> + case Mod:port_please(Me, Host) of + noport -> rabbit_log:warning( + "epmd does not know us, re-registering ~s at port ~b~n", + [Me, Port]), + rabbit_nodes:ensure_epmd(), + erl_epmd:register_node(Me, Port); + _ -> ok + end. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index f32a187d32..82f4275a3e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -22,7 +22,7 @@ %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with -%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4. +%% rabbit_amqqueue:basic_consume/10, and rabbit_amqqueue:basic_get/4. %% The latter isn't strictly necessary, since basic.get is not %% subject to limiting, but it means that whenever a queue knows about %% a channel, it also knows about its limiter, which is less fiddly. diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index e05ef05af4..6f0865b300 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -16,7 +16,8 @@ -module(rabbit_log). --export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). +-export([log/3, log/4, debug/1, debug/2, info/1, info/2, warning/1, + warning/2, error/1, error/2]). -export([with_local_io/1]). %%---------------------------------------------------------------------------- @@ -26,11 +27,13 @@ -export_type([level/0]). -type(category() :: atom()). --type(level() :: 'info' | 'warning' | 'error'). 
+-type(level() :: 'debug' | 'info' | 'warning' | 'error'). -spec(log/3 :: (category(), level(), string()) -> 'ok'). -spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). +-spec(debug/1 :: (string()) -> 'ok'). +-spec(debug/2 :: (string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -50,6 +53,7 @@ log(Category, Level, Fmt, Args) when is_list(Args) -> case level(Level) =< catlevel(Category) of false -> ok; true -> F = case Level of + debug -> fun error_logger:info_msg/2; info -> fun error_logger:info_msg/2; warning -> fun error_logger:warning_msg/2; error -> fun error_logger:error_msg/2 @@ -57,6 +61,8 @@ log(Category, Level, Fmt, Args) when is_list(Args) -> with_local_io(fun () -> F(Fmt, Args) end) end. +debug(Fmt) -> log(default, debug, Fmt). +debug(Fmt, Args) -> log(default, debug, Fmt, Args). info(Fmt) -> log(default, info, Fmt). info(Fmt, Args) -> log(default, info, Fmt, Args). warning(Fmt) -> log(default, warning, Fmt). 
@@ -75,6 +81,7 @@ catlevel(Category) -> %%-------------------------------------------------------------------- +level(debug) -> 4; level(info) -> 3; level(warning) -> 2; level(error) -> 1; diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index d328488298..38273e1c28 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -353,9 +353,10 @@ handle_cast({gm_deaths, DeadGMPids}, when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue( QueueName, MPid, DeadGMPids) of - {ok, MPid, DeadPids} -> + {ok, MPid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index aa1e1ab9ee..0c05729292 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,8 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, purge_acks/1, publish/5, publish_delivered/4, - discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, + purge/1, purge_acks/1, publish/6, publish_delivered/5, + discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, @@ -230,37 +230,38 @@ purge(State = #state { gm = GM, purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). 
-publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, +publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), - BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, - ChPid, State = #state { gm = GM, - seen_status = SS, - backing_queue = BQ, - backing_queue_state = BQS }) -> + ChPid, Flow, State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), - {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. -discard(MsgId, ChPid, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> +discard(MsgId, ChPid, Flow, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {discard, ChPid, MsgId}), - ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, BQS) }). 
+ ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), + ensure_monitoring(ChPid, + State #state { backing_queue_state = + BQ:discard(MsgId, ChPid, Flow, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 826b6927a1..ce63f7af1d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -49,7 +49,7 @@ -spec(remove_from_queue/3 :: (rabbit_amqqueue:name(), pid(), [pid()]) - -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). -spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> 'ok'). @@ -70,7 +70,7 @@ %%---------------------------------------------------------------------------- -%% Returns {ok, NewMPid, DeadPids} +%% Returns {ok, NewMPid, DeadPids, ExtraNodes} remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -78,10 +78,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% get here. 
case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - gm_pids = GMPids, - down_slave_nodes = DSNs}] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids, + gm_pids = GMPids }] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> lists:member(GM, DeadGMPids) @@ -90,36 +89,35 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> AlivePids = [Pid || {_GM, Pid} <- AliveGM], Alive = [Pid || Pid <- [QPid | SPids], lists:member(Pid, AlivePids)], - DSNs1 = [node(Pid) || - Pid <- SPids, - not lists:member(Pid, AlivePids)] ++ DSNs, {QPid1, SPids1} = promote_slave(Alive), - case {{QPid, SPids}, {QPid1, SPids1}} of - {Same, Same} -> - ok; - _ when QPid =:= QPid1 orelse QPid1 =:= Self -> - %% Either master hasn't changed, so - %% we're ok to update mnesia; or we have - %% become the master. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM, - down_slave_nodes = DSNs1}, - store_updated_slaves(Q1), - %% If we add and remove nodes at the same time we - %% might tell the old master we need to sync and - %% then shut it down. So let's check if the new - %% master needs to sync. - maybe_auto_sync(Q1); + Extra = + case {{QPid, SPids}, {QPid1, SPids1}} of + {Same, Same} -> + []; + _ when QPid =:= QPid1 orelse QPid1 =:= Self -> + %% Either master hasn't changed, so + %% we're ok to update mnesia; or we have + %% become the master. + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM}, + store_updated_slaves(Q1), + %% If we add and remove nodes at the + %% same time we might tell the old + %% master we need to sync and then + %% shut it down. So let's check if + %% the new master needs to sync. + maybe_auto_sync(Q1), + slaves_to_start_on_failure(Q1, DeadGMPids); _ -> - %% Master has changed, and we're not it. - %% [1]. 
- Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM, - down_slave_nodes = DSNs1}, - store_updated_slaves(Q1) - end, - {ok, QPid1, DeadPids} + %% Master has changed, and we're not it. + %% [1]. + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM}, + store_updated_slaves(Q1), + [] + end, + {ok, QPid1, DeadPids, Extra} end end). %% [1] We still update mnesia here in case the slave that is supposed @@ -145,6 +143,17 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% aforementioned restriction on updating the master pid, that pid may %% not be present in gm_pids, but only if said master has died. +%% Sometimes a slave dying means we need to start more on other +%% nodes - "exactly" mode can cause this to happen. +slaves_to_start_on_failure(Q, DeadGMPids) -> + %% In case Mnesia has not caught up yet, filter out nodes we know + %% to be dead.. + ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- + [node(P) || P <- DeadGMPids], + {_, OldNodes, _} = actual_queue_nodes(Q), + {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes), + NewNodes -- OldNodes. + on_node_up() -> QNames = rabbit_misc:execute_mnesia_transaction( @@ -234,22 +243,39 @@ log(Level, QName, Fmt, Args) -> rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). -store_updated_slaves(Q = #amqqueue{pid = MPid, - slave_pids = SPids, - sync_slave_pids = SSPids, - down_slave_nodes = DSNs}) -> +store_updated_slaves(Q = #amqqueue{slave_pids = SPids, + sync_slave_pids = SSPids, + recoverable_slaves = RS}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, %% do we still need this filtering? 
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], - DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], - Q1 = Q#amqqueue{sync_slave_pids = SSPids1, - down_slave_nodes = DSNs1, - state = live}, + Q1 = Q#amqqueue{sync_slave_pids = SSPids1, + recoverable_slaves = update_recoverable(SPids, RS), + state = live}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), Q1. +%% Recoverable nodes are those which we could promote if the whole +%% cluster were to suddenly stop and we then lose the master; i.e. all +%% nodes with running slaves, and all stopped nodes which had running +%% slaves when they were up. +%% +%% Therefore we aim here to add new nodes with slaves, and remove +%% running nodes without slaves, We also try to keep the order +%% constant, and similar to the live SPids field (i.e. oldest +%% first). That's not necessarily optimal if nodes spend a long time +%% down, but we don't have a good way to predict what the optimal is +%% in that case anyway, and we assume nodes will not just be down for +%% a long time without being removed. +update_recoverable(SPids, RS) -> + SNodes = [node(SPid) || SPid <- SPids], + RunningNodes = rabbit_mnesia:cluster_nodes(running), + AddNodes = SNodes -- RS, + DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave + (RS -- DelNodes) ++ AddNodes. + %%---------------------------------------------------------------------------- promote_slave([SPid | SPids]) -> @@ -344,6 +370,13 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], + %% When a mirror dies, remove_from_queue/2 might have to add new + %% slaves (in "exactly" mode). It will check mnesia to see which + %% slaves there currently are. 
If drop_mirror/2 is invoked first + %% then when we end up in remove_from_queue/2 it will not see the + %% slaves that add_mirror/2 will add, and also want to add them + %% (even though we are not responding to the death of a + %% mirror). Breakage ensues. add_mirrors (QName, NewNodes -- OldNodes, async), drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c64e35599c..96515f5c35 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -206,21 +206,28 @@ handle_call({gm_deaths, DeadGMPids}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids} -> + {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids), case Pid of MPid -> %% master hasn't changed gen_server2:reply(From, ok), + rabbit_mirror_queue_misc:add_mirrors( + QName, ExtraNodes, async), noreply(State); Self -> %% we've become master QueueState = promote_me(From, State), + rabbit_mirror_queue_misc:add_mirrors( + QName, ExtraNodes, async), {become, rabbit_amqqueue_process, QueueState, hibernate}; _ -> %% master has changed to not us gen_server2:reply(From, ok), + %% assertion, we don't need to add_mirrors/2 in this + %% branch, see last clause in remove_from_queue/2 + [] = ExtraNodes, %% Since GM is by nature lazy we need to make sure %% there is some traffic when a master dies, to %% make sure all slaves get informed of the @@ -246,7 +253,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> %% Asynchronous, non-"mandatory", deliver mode. 
case Flow of @@ -631,7 +638,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery#delivery{mandatory = false} || %% [0] + Deliveries = [promote_delivery(Delivery) || {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], @@ -643,8 +650,16 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). -%% [0] We reset mandatory to false here because we will have sent the -%% mandatory_received already as soon as we got the message +%% We reset mandatory to false here because we will have sent the +%% mandatory_received already as soon as we got the message. We also +%% need to send an ack for these messages since the channel is waiting +%% for one for the via-GM case and we will not now receive one. +promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, + Delivery#delivery{mandatory = false}. noreply(State) -> {NewState, Timeout} = next_state(State), @@ -826,24 +841,27 @@ publish_or_discard(Status, ChPid, MsgId, State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. 
-process_instruction({publish, ChPid, MsgProps, +process_instruction({publish, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; -process_instruction({publish_delivered, ChPid, MsgProps, +process_instruction({publish_delivered, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), true = BQ:is_empty(BQS), - {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), {ok, maybe_store_ack(true, MsgId, AckTag, State1 #state { backing_queue_state = BQS1 })}; -process_instruction({discard, ChPid, MsgId}, State) -> +process_instruction({discard, ChPid, Flow, MsgId}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(MsgId, ChPid, BQS), + BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, @@ -902,6 +920,9 @@ process_instruction({delete_and_terminate, Reason}, BQ:delete_and_terminate(Reason, BQS), {stop, State #state { backing_queue_state = undefined }}. +maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid); +maybe_flow_ack(_ChPid, noflow) -> ok. 
+ msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = lists:foldl( diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index ee1d210556..9a8d55f94b 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -263,9 +263,10 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, Props1 = Props#message_properties{needs_confirming = false}, {MA1, BQS1} = case Unacked of - false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)}; + false -> {MA, + BQ:publish(Msg, Props1, true, none, noflow, BQS)}; true -> {AckTag, BQS2} = BQ:publish_delivered( - Msg, Props1, none, BQS), + Msg, Props1, none, noflow, BQS), {[{Msg#basic_message.id, AckTag} | MA], BQS2} end, slave_sync_loop(Args, {MA1, TRef, BQS1}); diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3e2c88ee34..5e9c7ceb40 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -44,7 +44,8 @@ -export([format/2, format_many/1, format_stderr/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). --export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]). +-export([pid_to_string/1, string_to_pid/1, + pid_change_node/2, node_to_fake_pid/1]). -export([version_compare/2, version_compare/3]). -export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). @@ -58,6 +59,7 @@ -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). -export([os_cmd/1]). +-export([is_os_process_alive/1]). -export([gb_sets_difference/2]). -export([version/0, otp_release/0, which_applications/0]). -export([sequence_error/1]). @@ -196,6 +198,7 @@ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(pid_change_node/2 :: (pid(), node()) -> pid()). -spec(node_to_fake_pid/1 :: (atom()) -> pid()). -spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). 
-spec(version_compare/3 :: @@ -230,6 +233,7 @@ -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). -spec(os_cmd/1 :: (string()) -> string()). +-spec(is_os_process_alive/1 :: (non_neg_integer()) -> boolean()). -spec(gb_sets_difference/2 :: (gb_sets:set(), gb_sets:set()) -> gb_sets:set()). -spec(version/0 :: () -> string()). -spec(otp_release/0 :: () -> string()). @@ -520,8 +524,12 @@ execute_mnesia_transaction(TxFun) -> Res = mnesia:sync_transaction(TxFun), DiskLogAfter = mnesia_dumper:get_log_writes(), case DiskLogAfter == DiskLogBefore of - true -> Res; - false -> {sync, Res} + true -> file_handle_cache_stats:update( + mnesia_ram_tx), + Res; + false -> file_handle_cache_stats:update( + mnesia_disk_tx), + {sync, Res} end; true -> mnesia:sync_transaction(TxFun) end @@ -686,11 +694,7 @@ sort_field_table(Arguments) -> %% regardless of what node we are running on. The representation also %% permits easy identification of the pid's node. pid_to_string(Pid) when is_pid(Pid) -> - %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and - %% 8.7) - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> - = term_to_binary(Pid), - Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + {Node, Cre, Id, Ser} = decompose_pid(Pid), format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]). %% inverse of above @@ -701,17 +705,32 @@ string_to_pid(Str) -> case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$", [{capture,all_but_first,list}]) of {match, [NodeStr, CreStr, IdStr, SerStr]} -> - <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)), [Cre, Id, Ser] = lists:map(fun list_to_integer/1, [CreStr, IdStr, SerStr]), - binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>); + compose_pid(list_to_atom(NodeStr), Cre, Id, Ser); nomatch -> throw(Err) end. 
+pid_change_node(Pid, NewNode) -> + {_OldNode, Cre, Id, Ser} = decompose_pid(Pid), + compose_pid(NewNode, Cre, Id, Ser). + %% node(node_to_fake_pid(Node)) =:= Node. node_to_fake_pid(Node) -> - string_to_pid(format("<~s.0.0.0>", [Node])). + compose_pid(Node, 0, 0, 0). + +decompose_pid(Pid) when is_pid(Pid) -> + %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and + %% 8.7) + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + {Node, Cre, Id, Ser}. + +compose_pid(Node, Cre, Id, Ser) -> + <<131,NodeEnc/binary>> = term_to_binary(Node), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>). version_compare(A, B, lte) -> case version_compare(A, B) of @@ -915,6 +934,38 @@ os_cmd(Command) -> end end. +is_os_process_alive(Pid) -> + with_os([{unix, fun () -> + run_ps(Pid) =:= 0 + end}, + {win32, fun () -> + Cmd = "tasklist /nh /fi \"pid eq " ++ Pid ++ "\" ", + Res = os_cmd(Cmd ++ "2>&1"), + case re:run(Res, "erl\\.exe", [{capture, none}]) of + match -> true; + _ -> false + end + end}]). + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() + end. + +run_ps(Pid) -> + Port = erlang:open_port({spawn, "ps -p " ++ Pid}, + [exit_status, {line, 16384}, + use_stdio, stderr_to_stdout]), + exit_loop(Port). + +exit_loop(Port) -> + receive + {Port, {exit_status, Rc}} -> Rc; + {Port, _} -> exit_loop(Port) + end. + gb_sets_difference(S1, S2) -> gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 80fbcd6835..bde4221f0c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -109,27 +109,33 @@ init() -> %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - %% let's make it so. 
- ok = global:sync(), + ok = rabbit_node_monitor:global_sync(), ok. init_from_config() -> + FindBadNodeNames = fun + (Name, BadNames) when is_atom(Name) -> BadNames; + (Name, BadNames) -> [Name | BadNames] + end, {TryNodes, NodeType} = case application:get_env(rabbit, cluster_nodes) of + {ok, {Nodes, Type} = Config} + when is_list(Nodes) andalso (Type == disc orelse Type == ram) -> + case lists:foldr(FindBadNodeNames, [], Nodes) of + [] -> Config; + BadNames -> e({invalid_cluster_node_names, BadNames}) + end; + {ok, {_, BadType}} when BadType /= disc andalso BadType /= ram -> + e({invalid_cluster_node_type, BadType}); {ok, Nodes} when is_list(Nodes) -> - Config = {Nodes -- [node()], case lists:member(node(), Nodes) of - true -> disc; - false -> ram - end}, - rabbit_log:warning( - "Converting legacy 'cluster_nodes' configuration~n ~w~n" - "to~n ~w.~n~n" - "Please update the configuration to the new format " - "{Nodes, NodeType}, where Nodes contains the nodes that the " - "node will try to cluster with, and NodeType is either " - "'disc' or 'ram'~n", [Nodes, Config]), - Config; - {ok, Config} -> - Config + %% The legacy syntax (a nodes list without the node + %% type) is unsupported. + case lists:foldr(FindBadNodeNames, [], Nodes) of + [] -> e(cluster_node_type_mandatory); + _ -> e(invalid_cluster_nodes_conf) + end; + {ok, _} -> + e(invalid_cluster_nodes_conf) end, case TryNodes of [] -> init_db_and_upgrade([node()], disc, false); @@ -850,6 +856,20 @@ nodes_excl_me(Nodes) -> Nodes -- [node()]. e(Tag) -> throw({error, {Tag, error_description(Tag)}}). 
+error_description({invalid_cluster_node_names, BadNames}) -> + "In the 'cluster_nodes' configuration key, the following node names " + "are invalid: " ++ lists:flatten(io_lib:format("~p", [BadNames])); +error_description({invalid_cluster_node_type, BadType}) -> + "In the 'cluster_nodes' configuration key, the node type is invalid " + "(expected 'disc' or 'ram'): " ++ + lists:flatten(io_lib:format("~p", [BadType])); +error_description(cluster_node_type_mandatory) -> + "The 'cluster_nodes' configuration key must indicate the node type: " + "either {[...], disc} or {[...], ram}"; +error_description(invalid_cluster_nodes_conf) -> + "The 'cluster_nodes' configuration key is invalid, it must be of the " + "form {[Nodes], Type}, where Nodes is a list of node names and " + "Type is either 'disc' or 'ram'"; error_description(clustering_only_disc_node) -> "You cannot cluster a node if it is the only disc node in its existing " " cluster. If new nodes joined while this node was offline, use " diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl new file mode 100644 index 0000000000..2787cb743c --- /dev/null +++ b/src/rabbit_mnesia_rename.erl @@ -0,0 +1,267 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_mnesia_rename). +-include("rabbit.hrl"). + +-export([rename/2]). +-export([maybe_finish/1]). 
+ +-define(CONVERT_TABLES, [schema, rabbit_durable_queue]). + +%% Supports renaming the nodes in the Mnesia database. In order to do +%% this, we take a backup of the database, traverse the backup +%% changing node names and pids as we go, then restore it. +%% +%% That's enough for a standalone node, for clusters the story is more +%% complex. We can take pairs of nodes From and To, but backing up and +%% restoring the database changes schema cookies, so if we just do +%% this on all nodes the cluster will refuse to re-form with +%% "Incompatible schema cookies.". Therefore we do something similar +%% to what we do for upgrades - the first node in the cluster to +%% restart becomes the authority, and other nodes wipe their own +%% Mnesia state and rejoin. They also need to tell Mnesia the old node +%% is not coming back. +%% +%% If we are renaming nodes one at a time then the running cluster +%% might not be aware that a rename has taken place, so after we wipe +%% and rejoin we then update any tables (in practice just +%% rabbit_durable_queue) which should be aware that we have changed. + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(rename/2 :: (node(), [{node(), node()}]) -> 'ok'). +-spec(maybe_finish/1 :: ([node()]) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +rename(Node, NodeMapList) -> + try + %% Check everything is correct and figure out what we are + %% changing from and to. + {FromNode, ToNode, NodeMap} = prepare(Node, NodeMapList), + + %% We backup and restore Mnesia even if other nodes are + %% running at the time, and defer the final decision about + %% whether to use our mutated copy or rejoin the cluster until + %% we restart. That means we might be mutating our copy of the + %% database while the cluster is running. *Do not* contact the + %% cluster while this is happening, we are likely to get + %% confused. 
+ application:set_env(kernel, dist_auto_connect, never), + + %% Take a copy we can restore from if we abandon the + %% rename. We don't restore from the "backup" since restoring + %% that changes schema cookies and might stop us rejoining the + %% cluster. + ok = rabbit_mnesia:copy_db(mnesia_copy_dir()), + + %% And make the actual changes + rabbit_control_main:become(FromNode), + take_backup(before_backup_name()), + convert_backup(NodeMap, before_backup_name(), after_backup_name()), + ok = rabbit_file:write_term_file(rename_config_name(), + [{FromNode, ToNode}]), + convert_config_files(NodeMap), + rabbit_control_main:become(ToNode), + restore_backup(after_backup_name()), + ok + after + stop_mnesia() + end. + +prepare(Node, NodeMapList) -> + %% If we have a previous rename and haven't started since, give up. + case rabbit_file:is_dir(dir()) of + true -> exit({rename_in_progress, + "Restart node under old name to roll back"}); + false -> ok = rabbit_file:ensure_dir(mnesia_copy_dir()) + end, + + %% Check we don't have two nodes mapped to the same node + {FromNodes, ToNodes} = lists:unzip(NodeMapList), + case length(FromNodes) - length(lists:usort(ToNodes)) of + 0 -> ok; + _ -> exit({duplicate_node, ToNodes}) + end, + + %% Figure out which node we are before and after the change + FromNode = case [From || {From, To} <- NodeMapList, + To =:= Node] of + [N] -> N; + [] -> Node + end, + NodeMap = dict:from_list(NodeMapList), + ToNode = case dict:find(FromNode, NodeMap) of + {ok, N2} -> N2; + error -> FromNode + end, + + %% Check that we are in the cluster, all old nodes are in the + %% cluster, and no new nodes are. + Nodes = rabbit_mnesia:cluster_nodes(all), + case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes), + lists:member(Node, Nodes ++ ToNodes)} of + {[], [], true} -> ok; + {[], [], false} -> exit({i_am_not_involved, Node}); + {F, [], _} -> exit({nodes_not_in_cluster, F}); + {_, T, _} -> exit({nodes_already_in_cluster, T}) + end, + {FromNode, ToNode, NodeMap}. 
+ +take_backup(Backup) -> + start_mnesia(), + ok = mnesia:backup(Backup), + stop_mnesia(). + +restore_backup(Backup) -> + ok = mnesia:install_fallback(Backup, [{scope, local}]), + start_mnesia(), + stop_mnesia(), + rabbit_mnesia:force_load_next_boot(). + +maybe_finish(AllNodes) -> + case rabbit_file:read_term_file(rename_config_name()) of + {ok, [{FromNode, ToNode}]} -> finish(FromNode, ToNode, AllNodes); + _ -> ok + end. + +finish(FromNode, ToNode, AllNodes) -> + case node() of + ToNode -> + case rabbit_upgrade:nodes_running(AllNodes) of + [] -> finish_primary(FromNode, ToNode); + _ -> finish_secondary(FromNode, ToNode, AllNodes) + end; + FromNode -> + rabbit_log:info( + "Abandoning rename from ~s to ~s since we are still ~s~n", + [FromNode, ToNode, FromNode]), + [{ok, _} = file:copy(backup_of_conf(F), F) || F <- config_files()], + ok = rabbit_file:recursive_delete([rabbit_mnesia:dir()]), + ok = rabbit_file:recursive_copy( + mnesia_copy_dir(), rabbit_mnesia:dir()), + delete_rename_files(); + _ -> + %% Boot will almost certainly fail but we might as + %% well just log this + rabbit_log:info( + "Rename attempted from ~s to ~s but we are ~s - ignoring.~n", + [FromNode, ToNode, node()]) + end. + +finish_primary(FromNode, ToNode) -> + rabbit_log:info("Restarting as primary after rename from ~s to ~s~n", + [FromNode, ToNode]), + delete_rename_files(), + ok. + +finish_secondary(FromNode, ToNode, AllNodes) -> + rabbit_log:info("Restarting as secondary after rename from ~s to ~s~n", + [FromNode, ToNode]), + rabbit_upgrade:secondary_upgrade(AllNodes), + rename_in_running_mnesia(FromNode, ToNode), + delete_rename_files(), + ok. + +dir() -> rabbit_mnesia:dir() ++ "-rename". +before_backup_name() -> dir() ++ "/backup-before". +after_backup_name() -> dir() ++ "/backup-after". +rename_config_name() -> dir() ++ "/pending.config". +mnesia_copy_dir() -> dir() ++ "/mnesia-copy". + +delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]). 
+ +start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + rabbit_table:force_load(), + rabbit_table:wait_for_replicated(). +stop_mnesia() -> stopped = mnesia:stop(). + +convert_backup(NodeMap, FromBackup, ToBackup) -> + mnesia:traverse_backup( + FromBackup, ToBackup, + fun + (Row, Acc) -> + case lists:member(element(1, Row), ?CONVERT_TABLES) of + true -> {[update_term(NodeMap, Row)], Acc}; + false -> {[Row], Acc} + end + end, switched). + +config_files() -> + [rabbit_node_monitor:running_nodes_filename(), + rabbit_node_monitor:cluster_status_filename()]. + +backup_of_conf(Path) -> + filename:join([dir(), filename:basename(Path)]). + +convert_config_files(NodeMap) -> + [convert_config_file(NodeMap, Path) || Path <- config_files()]. + +convert_config_file(NodeMap, Path) -> + {ok, Term} = rabbit_file:read_term_file(Path), + {ok, _} = file:copy(Path, backup_of_conf(Path)), + ok = rabbit_file:write_term_file(Path, update_term(NodeMap, Term)). + +lookup_node(OldNode, NodeMap) -> + case dict:find(OldNode, NodeMap) of + {ok, NewNode} -> NewNode; + error -> OldNode + end. + +mini_map(FromNode, ToNode) -> dict:from_list([{FromNode, ToNode}]). + +update_term(NodeMap, L) when is_list(L) -> + [update_term(NodeMap, I) || I <- L]; +update_term(NodeMap, T) when is_tuple(T) -> + list_to_tuple(update_term(NodeMap, tuple_to_list(T))); +update_term(NodeMap, Node) when is_atom(Node) -> + lookup_node(Node, NodeMap); +update_term(NodeMap, Pid) when is_pid(Pid) -> + rabbit_misc:pid_change_node(Pid, lookup_node(node(Pid), NodeMap)); +update_term(_NodeMap, Term) -> + Term. 
+ +rename_in_running_mnesia(FromNode, ToNode) -> + All = rabbit_mnesia:cluster_nodes(all), + Running = rabbit_mnesia:cluster_nodes(running), + case {lists:member(FromNode, Running), lists:member(ToNode, All)} of + {false, true} -> ok; + {true, _} -> exit({old_node_running, FromNode}); + {_, false} -> exit({new_node_not_in_cluster, ToNode}) + end, + {atomic, ok} = mnesia:del_table_copy(schema, FromNode), + Map = mini_map(FromNode, ToNode), + {atomic, _} = transform_table(rabbit_durable_queue, Map), + ok. + +transform_table(Table, Map) -> + mnesia:sync_transaction( + fun () -> + mnesia:lock({table, Table}, write), + transform_table(Table, Map, mnesia:first(Table)) + end). + +transform_table(_Table, _Map, '$end_of_table') -> + ok; +transform_table(Table, Map, Key) -> + [Term] = mnesia:read(Table, Key, write), + ok = mnesia:write(Table, update_term(Map, Term), write), + transform_table(Table, Map, mnesia:next(Table, Key)). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b829ae9476..9e3dd1e7c4 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -473,6 +473,7 @@ write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + file_handle_cache_stats:update(msg_store_read), %% Check the cur file cache case ets:lookup(CurFileCacheEts, MsgId) of [] -> @@ -507,6 +508,7 @@ server_cast(#client_msstate { server = Server }, Msg) -> client_write(MsgId, Msg, Flow, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> + file_handle_cache_stats:update(msg_store_write), ok = client_update_flying(+1, MsgId, CState), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), ok = server_cast(CState, {write, CRef, MsgId, Flow}). 
@@ -1299,7 +1301,8 @@ should_mask_action(CRef, MsgId, open_file(Dir, FileName, Mode) -> file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, - [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}, + {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1a2883748f..f5deaef388 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -394,7 +394,11 @@ node_listeners(Node) -> mnesia:dirty_read(rabbit_listener, Node). on_node_down(Node) -> - ok = mnesia:dirty_delete(rabbit_listener, Node). + case lists:member(Node, nodes()) of + false -> ok = mnesia:dirty_delete(rabbit_listener, Node); + true -> rabbit_log:info( + "Keep ~s listeners: the node is already back~n", [Node]) + end. start_client(Sock, SockTransform) -> {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 2fab29996b..0f00e66e55 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -25,14 +25,15 @@ update_cluster_status/0, reset_cluster_status/0]). -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). -export([partitions/0, partitions/1, status/1, subscribe/1]). --export([pause_minority_guard/0]). +-export([pause_partition_guard/0]). +-export([global_sync/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% Utils --export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0, +-export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0, alive_nodes/1, alive_rabbit_nodes/1]). -define(SERVER, ?MODULE). @@ -64,10 +65,10 @@ -spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]). 
-spec(status/1 :: ([node()]) -> {[{node(), [node()]}], [node()]}). -spec(subscribe/1 :: (pid()) -> 'ok'). --spec(pause_minority_guard/0 :: () -> 'ok' | 'pausing'). +-spec(pause_partition_guard/0 :: () -> 'ok' | 'pausing'). -spec(all_rabbit_nodes_up/0 :: () -> boolean()). --spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). +-spec(run_outside_applications/2 :: (fun (() -> any()), boolean()) -> pid()). -spec(ping_all/0 :: () -> 'ok'). -spec(alive_nodes/1 :: ([node()]) -> [node()]). -spec(alive_rabbit_nodes/1 :: ([node()]) -> [node()]). @@ -194,34 +195,43 @@ subscribe(Pid) -> gen_server:cast(?SERVER, {subscribe, Pid}). %%---------------------------------------------------------------------------- -%% pause_minority safety +%% pause_minority/pause_if_all_down safety %%---------------------------------------------------------------------------- %% If we are in a minority and pause_minority mode then a) we are %% going to shut down imminently and b) we should not confirm anything %% until then, since anything we confirm is likely to be lost. %% -%% We could confirm something by having an HA queue see the minority +%% The same principles apply to a node which isn't part of the preferred +%% partition when we are in pause_if_all_down mode. +%% +%% We could confirm something by having an HA queue see the pausing %% state (and fail over into it) before the node monitor stops us, or %% by using unmirrored queues and just having them vanish (and %% confirming messages as thrown away). %% %% So we have channels call in here before issuing confirms, to do a -%% lightweight check that we have not entered a minority state. +%% lightweight check that we have not entered a pausing state. 
-pause_minority_guard() -> - case get(pause_minority_guard) of - not_minority_mode -> +pause_partition_guard() -> + case get(pause_partition_guard) of + not_pause_mode -> ok; undefined -> {ok, M} = application:get_env(rabbit, cluster_partition_handling), case M of - pause_minority -> pause_minority_guard([], ok); - _ -> put(pause_minority_guard, not_minority_mode), - ok + pause_minority -> + pause_minority_guard([], ok); + {pause_if_all_down, PreferredNodes, _} -> + pause_if_all_down_guard(PreferredNodes, [], ok); + _ -> + put(pause_partition_guard, not_pause_mode), + ok end; {minority_mode, Nodes, LastState} -> - pause_minority_guard(Nodes, LastState) + pause_minority_guard(Nodes, LastState); + {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} -> + pause_if_all_down_guard(PreferredNodes, Nodes, LastState) end. pause_minority_guard(LastNodes, LastState) -> @@ -231,11 +241,89 @@ pause_minority_guard(LastNodes, LastState) -> false -> pausing; true -> ok end, - put(pause_minority_guard, + put(pause_partition_guard, {minority_mode, nodes(), NewState}), NewState end. +pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) -> + case nodes() of + LastNodes -> LastState; + _ -> NewState = case in_preferred_partition(PreferredNodes) of + false -> pausing; + true -> ok + end, + put(pause_partition_guard, + {pause_if_all_down_mode, PreferredNodes, nodes(), + NewState}), + NewState + end. + +%%---------------------------------------------------------------------------- +%% "global" hang workaround. +%%---------------------------------------------------------------------------- + +%% This code works around a possible inconsistency in the "global" +%% state, causing global:sync/0 to never return. +%% +%% 1. A process is spawned. +%% 2. If after 15", global:sync() didn't return, the "global" +%% state is parsed. +%% 3. 
If it detects that a sync is blocked for more than 10", +%% the process sends fake nodedown/nodeup events to the two +%% nodes involved (one local, one remote). +%% 4. Both "global" instances restart their synchronisation. +%% 5. global:sync() finally returns. +%% +%% FIXME: Remove this workaround, once we got rid of the change to +%% "dist_auto_connect" and fixed the bugs uncovered. + +global_sync() -> + Pid = spawn(fun workaround_global_hang/0), + ok = global:sync(), + Pid ! global_sync_done, + ok. + +workaround_global_hang() -> + receive + global_sync_done -> + ok + after 15000 -> + find_blocked_global_peers() + end. + +find_blocked_global_peers() -> + {status, _, _, [Dict | _]} = sys:get_status(global_name_server), + find_blocked_global_peers1(Dict). + +find_blocked_global_peers1([{{sync_tag_his, Peer}, Timestamp} | Rest]) -> + Diff = timer:now_diff(erlang:now(), Timestamp), + if + Diff >= 10000 -> unblock_global_peer(Peer); + true -> ok + end, + find_blocked_global_peers1(Rest); +find_blocked_global_peers1([_ | Rest]) -> + find_blocked_global_peers1(Rest); +find_blocked_global_peers1([]) -> + ok. + +unblock_global_peer(PeerNode) -> + ThisNode = node(), + PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]), + error_logger:info_msg( + "Global hang workaround: global state on ~s seems broken~n" + " * Peer global state: ~p~n" + " * Local global state: ~p~n" + "Faking nodedown/nodeup between ~s and ~s~n", + [PeerNode, PeerState, sys:get_status(global_name_server), + PeerNode, ThisNode]), + {global_name_server, ThisNode} ! {nodedown, PeerNode}, + {global_name_server, PeerNode} ! {nodedown, ThisNode}, + {global_name_server, ThisNode} ! {nodeup, PeerNode}, + {global_name_server, PeerNode} ! {nodeup, ThisNode}, + ok. 
+ %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -291,8 +379,9 @@ handle_cast(notify_node_up, State = #state{guid = GUID}) -> %% 'check_partial_partition' to all the nodes it still thinks are %% alive. If any of those (intermediate) nodes still see the "down" %% node as up, they inform it that this has happened. The original -%% node (in 'ignore' or 'autoheal' mode) will then disconnect from the -%% intermediate node to "upgrade" to a full partition. +%% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then +%% disconnect from the intermediate node to "upgrade" to a full +%% partition. %% %% In pause_minority mode it will instead immediately pause until all %% nodes come back. This is because the contract for pause_minority is @@ -357,12 +446,22 @@ handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID}, ArgsBase), await_cluster_recovery(fun all_nodes_up/0), {noreply, State}; + {ok, {pause_if_all_down, PreferredNodes, _}} -> + case in_preferred_partition(PreferredNodes) of + true -> rabbit_log:error( + FmtBase ++ "We will therefore intentionally " + "disconnect from ~s~n", ArgsBase ++ [Proxy]), + upgrade_to_full_partition(Proxy); + false -> rabbit_log:info( + FmtBase ++ "We are about to pause, no need " + "for further actions~n", ArgsBase) + end, + {noreply, State}; {ok, _} -> rabbit_log:error( FmtBase ++ "We will therefore intentionally disconnect from ~s~n", ArgsBase ++ [Proxy]), - cast(Proxy, {partial_partition_disconnect, node()}), - disconnect(Proxy), + upgrade_to_full_partition(Proxy), {noreply, State} end; @@ -527,17 +626,29 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> %% that we can respond in the same way to "rabbitmqctl stop_app" %% and "rabbitmqctl stop" as much as possible. 
%% - %% However, for pause_minority mode we can't do this, since we - %% depend on looking at whether other nodes are up to decide - %% whether to come back up ourselves - if we decide that based on - %% the rabbit application we would go down and never come back. + %% However, for pause_minority and pause_if_all_down modes we can't do + %% this, since we depend on looking at whether other nodes are up + %% to decide whether to come back up ourselves - if we decide that + %% based on the rabbit application we would go down and never come + %% back. case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> - case majority() of + case majority([Node]) of true -> ok; false -> await_cluster_recovery(fun majority/0) end, State; + {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} -> + case in_preferred_partition(PreferredNodes, [Node]) of + true -> ok; + false -> await_cluster_recovery( + fun in_preferred_partition/0) + end, + case HowToRecover of + autoheal -> State#state{autoheal = + rabbit_autoheal:node_down(Node, Autoheal)}; + _ -> State + end; {ok, ignore} -> State; {ok, autoheal} -> @@ -549,35 +660,56 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> end. await_cluster_recovery(Condition) -> - rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", - []), + rabbit_log:warning("Cluster minority/secondary status detected - " + "awaiting recovery~n", []), run_outside_applications(fun () -> rabbit:stop(), wait_for_cluster_recovery(Condition) - end), + end, false), ok. -run_outside_applications(Fun) -> +run_outside_applications(Fun, WaitForExistingProcess) -> spawn(fun () -> %% If our group leader is inside an application we are about %% to stop, application:stop/1 does not return. 
group_leader(whereis(init), self()), - %% Ensure only one such process at a time, the - %% exit(badarg) is harmless if one is already running - try register(rabbit_outside_app_process, self()) of - true -> - try - Fun() - catch _:E -> - rabbit_log:error( - "rabbit_outside_app_process:~n~p~n~p~n", - [E, erlang:get_stacktrace()]) - end - catch error:badarg -> - ok - end + register_outside_app_process(Fun, WaitForExistingProcess) end). +register_outside_app_process(Fun, WaitForExistingProcess) -> + %% Ensure only one such process at a time, the exit(badarg) is + %% harmless if one is already running. + %% + %% If WaitForExistingProcess is false, the given fun is simply not + %% executed at all and the process exits. + %% + %% If WaitForExistingProcess is true, we wait for the end of the + %% currently running process before executing the given function. + try register(rabbit_outside_app_process, self()) of + true -> + do_run_outside_app_fun(Fun) + catch + error:badarg when WaitForExistingProcess -> + MRef = erlang:monitor(process, rabbit_outside_app_process), + receive + {'DOWN', MRef, _, _, _} -> + %% The existing process exited, let's try to + %% register again. + register_outside_app_process(Fun, WaitForExistingProcess) + end; + error:badarg -> + ok + end. + +do_run_outside_app_fun(Fun) -> + try + Fun() + catch _:E -> + rabbit_log:error( + "rabbit_outside_app_process:~n~p~n~p~n", + [E, erlang:get_stacktrace()]) + end. + wait_for_cluster_recovery(Condition) -> ping_all(), case Condition() of @@ -600,7 +732,9 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, %% that we do not attempt to deal with individual (other) partitions %% going away. It's only safe to forget anything about partitions when %% there are no partitions. 
- Partitions1 = case Partitions -- (Partitions -- alive_rabbit_nodes()) of + Down = Partitions -- alive_rabbit_nodes(), + NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running), + Partitions1 = case Partitions -- Down -- NoLongerPartitioned of [] -> []; _ -> Partitions end, @@ -661,6 +795,10 @@ del_node(Node, Nodes) -> Nodes -- [Node]. cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg). +upgrade_to_full_partition(Proxy) -> + cast(Proxy, {partial_partition_disconnect, node()}), + disconnect(Proxy). + %% When we call this, it's because we want to force Mnesia to detect a %% partition. But if we just disconnect_node/1 then Mnesia won't %% detect a very short partition. So we want to force a slightly @@ -683,14 +821,32 @@ disconnect(Node) -> %% here. "rabbit" in a function's name implies we test if the rabbit %% application is up, not just the node. -%% As we use these functions to decide what to do in pause_minority -%% state, they *must* be fast, even in the case where TCP connections -%% are timing out. So that means we should be careful about whether we -%% connect to nodes which are currently disconnected. +%% As we use these functions to decide what to do in pause_minority or +%% pause_if_all_down states, they *must* be fast, even in the case where +%% TCP connections are timing out. So that means we should be careful +%% about whether we connect to nodes which are currently disconnected. majority() -> + majority([]). + +majority(NodesDown) -> + Nodes = rabbit_mnesia:cluster_nodes(all), + AliveNodes = alive_nodes(Nodes) -- NodesDown, + length(AliveNodes) / length(Nodes) > 0.5. + +in_preferred_partition() -> + {ok, {pause_if_all_down, PreferredNodes, _}} = + application:get_env(rabbit, cluster_partition_handling), + in_preferred_partition(PreferredNodes). + +in_preferred_partition(PreferredNodes) -> + in_preferred_partition(PreferredNodes, []). 
+ +in_preferred_partition(PreferredNodes, NodesDown) -> Nodes = rabbit_mnesia:cluster_nodes(all), - length(alive_nodes(Nodes)) / length(Nodes) > 0.5. + RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)], + AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown, + RealPreferredNodes =:= [] orelse AliveNodes =/= []. all_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 7f7fcc3126..bbe0d35719 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -18,7 +18,7 @@ -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2, is_process_running/2, - cluster_name/0, set_cluster_name/1]). + cluster_name/0, set_cluster_name/1, ensure_epmd/0]). -include_lib("kernel/include/inet.hrl"). @@ -41,6 +41,7 @@ -spec(is_process_running/2 :: (node(), atom()) -> boolean()). -spec(cluster_name/0 :: () -> binary()). -spec(set_cluster_name/1 :: (binary()) -> 'ok'). +-spec(ensure_epmd/0 :: () -> 'ok'). -endif. @@ -197,3 +198,19 @@ cluster_name_default() -> set_cluster_name(Name) -> rabbit_runtime_parameters:set_global(cluster_name, Name). + +ensure_epmd() -> + {ok, Prog} = init:get_argument(progname), + ID = random:uniform(1000000000), + Port = open_port( + {spawn_executable, os:find_executable(Prog)}, + [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]), + "-noshell", "-eval", "halt()."]}, + exit_status, stderr_to_stdout, use_stdio]), + port_shutdown_loop(Port). + +port_shutdown_loop(Port) -> + receive + {Port, {exit_status, _Rc}} -> ok; + {Port, _} -> port_shutdown_loop(Port) + end. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl new file mode 100644 index 0000000000..1d9522f613 --- /dev/null +++ b/src/rabbit_priority_queue.erl @@ -0,0 +1,574 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. 
You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_priority_queue). + +-include_lib("rabbit.hrl"). +-include_lib("rabbit_framing.hrl"). +-behaviour(rabbit_backing_queue). + +%% enabled unconditionally. Disabling priority queueing after +%% it has been enabled is dangerous. +-rabbit_boot_step({?MODULE, + [{description, "enable priority queue"}, + {mfa, {?MODULE, enable, []}}, + {requires, pre_boot}, + {enables, kernel_ready}]}). + +-export([enable/0]). + +-export([start/1, stop/0]). + +-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, + purge/1, purge_acks/1, + publish/6, publish_delivered/5, discard/4, drain_confirmed/1, + dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, + ackfold/4, fold/3, len/1, is_empty/1, depth/1, + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, + handle_pre_hibernate/1, resume/1, msg_rates/1, + info/2, invoke/3, is_duplicate/2]). + +-record(state, {bq, bqss}). +-record(passthrough, {bq, bqs}). + +%% See 'note on suffixes' below +-define(passthrough1(F), State#passthrough{bqs = BQ:F}). +-define(passthrough2(F), + {Res, BQS1} = BQ:F, {Res, State#passthrough{bqs = BQS1}}). +-define(passthrough3(F), + {Res1, Res2, BQS1} = BQ:F, {Res1, Res2, State#passthrough{bqs = BQS1}}). + +%% This module adds support for priority queues. +%% +%% Priority queues have one backing queue per priority. Backing queue functions +%% then produce a list of results for each BQ and fold over them, sorting +%% by priority. 
+%% +%% For queues that do not + have priorities enabled, the functions in this module delegate to + their "regular" backing queue module counterparts. See the `passthrough` + record and passthrough{1,2,3} macros. + +%% Delivery to consumers happens by first "running" the queue with + the highest priority until there are no more messages to deliver, + then the next one, and so on. This offers good prioritisation + but may result in lower priority messages not being delivered + when there's a high ingress rate of messages with higher priority. + +enable() -> + {ok, RealBQ} = application:get_env(rabbit, backing_queue_module), + case RealBQ of + ?MODULE -> ok; + _ -> rabbit_log:info("Priority queues enabled, real BQ is ~s~n", + [RealBQ]), + application:set_env( + rabbitmq_priority_queue, backing_queue_module, RealBQ), + application:set_env(rabbit, backing_queue_module, ?MODULE) + end. + +%%---------------------------------------------------------------------------- + +start(QNames) -> + BQ = bq(), + %% TODO this expand-collapse dance is a bit ridiculous but it's what + %% rabbit_amqqueue:recover/0 expects. We could probably simplify + %% this if we rejigged recovery a bit. + {DupNames, ExpNames} = expand_queues(QNames), + case BQ:start(ExpNames) of + {ok, ExpRecovery} -> + {ok, collapse_recovery(QNames, DupNames, ExpRecovery)}; + Else -> + Else + end. + +stop() -> + BQ = bq(), + BQ:stop(). + +%%---------------------------------------------------------------------------- + +mutate_name(P, Q = #amqqueue{name = QName = #resource{name = QNameBin}}) -> + Q#amqqueue{name = QName#resource{name = mutate_name_bin(P, QNameBin)}}. + +mutate_name_bin(P, NameBin) -> <<NameBin/binary, 0, P:8>>. + +expand_queues(QNames) -> + lists:unzip( + lists:append([expand_queue(QName) || QName <- QNames])). 
+ +expand_queue(QName = #resource{name = QNameBin}) -> + {ok, Q} = rabbit_misc:dirty_read({rabbit_durable_queue, QName}), + case priorities(Q) of + none -> [{QName, QName}]; + Ps -> [{QName, QName#resource{name = mutate_name_bin(P, QNameBin)}} + || P <- Ps] + end. + +collapse_recovery(QNames, DupNames, Recovery) -> + NameToTerms = lists:foldl(fun({Name, RecTerm}, Dict) -> + dict:append(Name, RecTerm, Dict) + end, dict:new(), lists:zip(DupNames, Recovery)), + [dict:fetch(Name, NameToTerms) || Name <- QNames]. + +priorities(#amqqueue{arguments = Args}) -> + Ints = [long, short, signedint, byte], + case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of + {Type, Max} -> case lists:member(Type, Ints) of + false -> none; + true -> lists:reverse(lists:seq(0, Max)) + end; + _ -> none + end. + +%%---------------------------------------------------------------------------- + +init(Q, Recover, AsyncCallback) -> + BQ = bq(), + case priorities(Q) of + none -> RealRecover = case Recover of + [R] -> R; %% [0] + R -> R + end, + #passthrough{bq = BQ, + bqs = BQ:init(Q, RealRecover, AsyncCallback)}; + Ps -> Init = fun (P, Term) -> + BQ:init( + mutate_name(P, Q), Term, + fun (M, F) -> AsyncCallback(M, {P, F}) end) + end, + BQSs = case have_recovery_terms(Recover) of + false -> [{P, Init(P, Recover)} || P <- Ps]; + _ -> PsTerms = lists:zip(Ps, Recover), + [{P, Init(P, Term)} || {P, Term} <- PsTerms] + end, + #state{bq = BQ, + bqss = BQSs} + end. +%% [0] collapse_recovery has the effect of making a list of recovery +%% terms in priority order, even for non priority queues. It's easier +%% to do that and "unwrap" in init/3 than to have collapse_recovery be +%% aware of non-priority queues. + +have_recovery_terms(new) -> false; +have_recovery_terms(non_clean_shutdown) -> false; +have_recovery_terms(_) -> true. 
+ +terminate(Reason, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:terminate(Reason, BQSN) end, State); +terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(terminate(Reason, BQS)). + +delete_and_terminate(Reason, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> + BQ:delete_and_terminate(Reason, BQSN) + end, State); +delete_and_terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(delete_and_terminate(Reason, BQS)). + +delete_crashed(Q) -> + BQ = bq(), + case priorities(Q) of + none -> BQ:delete_crashed(Q); + Ps -> [BQ:delete_crashed(mutate_name(P, Q)) || P <- Ps] + end. + +purge(State = #state{bq = BQ}) -> + fold_add2(fun (_P, BQSN) -> BQ:purge(BQSN) end, State); +purge(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(purge(BQS)). + +purge_acks(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:purge_acks(BQSN) end, State); +purge_acks(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(purge_acks(BQS)). + +publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #state{bq = BQ}) -> + pick1(fun (_P, BQSN) -> + BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQSN) + end, Msg, State); +publish(Msg, MsgProps, IsDelivered, ChPid, Flow, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). + +publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) -> + pick2(fun (P, BQSN) -> + {AckTag, BQSN1} = BQ:publish_delivered( + Msg, MsgProps, ChPid, Flow, BQSN), + {{P, AckTag}, BQSN1} + end, Msg, State); +publish_delivered(Msg, MsgProps, ChPid, Flow, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). + +%% TODO this is a hack. The BQ api does not give us enough information +%% here - if we had the Msg we could look at its priority and forward +%% to the appropriate sub-BQ. But we don't so we are stuck. 
+%% +%% But fortunately VQ ignores discard/4, so we can too, *assuming we +%% are talking to VQ*. discard/4 is used by HA, but that's "above" us +%% (if in use) so we don't break that either, just some hypothetical +%% alternate BQ implementation. +discard(_MsgId, _ChPid, _Flow, State = #state{}) -> + State; + %% We should have something a bit like this here: + %% pick1(fun (_P, BQSN) -> + %% BQ:discard(MsgId, ChPid, Flow, BQSN) + %% end, Msg, State); +discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(discard(MsgId, ChPid, Flow, BQS)). + +drain_confirmed(State = #state{bq = BQ}) -> + fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State); +drain_confirmed(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(drain_confirmed(BQS)). + +dropwhile(Pred, State = #state{bq = BQ}) -> + find2(fun (_P, BQSN) -> BQ:dropwhile(Pred, BQSN) end, undefined, State); +dropwhile(Pred, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(dropwhile(Pred, BQS)). + +%% TODO this is a bit nasty. In the one place where fetchwhile/4 is +%% actually used the accumulator is a list of acktags, which of course +%% we need to mutate - so we do that although we are encoding an +%% assumption here. +fetchwhile(Pred, Fun, Acc, State = #state{bq = BQ}) -> + findfold3( + fun (P, BQSN, AccN) -> + {Res, AccN1, BQSN1} = BQ:fetchwhile(Pred, Fun, AccN, BQSN), + {Res, priority_on_acktags(P, AccN1), BQSN1} + end, Acc, undefined, State); +fetchwhile(Pred, Fun, Acc, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough3(fetchwhile(Pred, Fun, Acc, BQS)). + +fetch(AckRequired, State = #state{bq = BQ}) -> + find2( + fun (P, BQSN) -> + case BQ:fetch(AckRequired, BQSN) of + {empty, BQSN1} -> {empty, BQSN1}; + {{Msg, Del, ATag}, BQSN1} -> {{Msg, Del, {P, ATag}}, BQSN1} + end + end, empty, State); +fetch(AckRequired, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(fetch(AckRequired, BQS)). 
+ +drop(AckRequired, State = #state{bq = BQ}) -> + find2(fun (P, BQSN) -> + case BQ:drop(AckRequired, BQSN) of + {empty, BQSN1} -> {empty, BQSN1}; + {{MsgId, AckTag}, BQSN1} -> {{MsgId, {P, AckTag}}, BQSN1} + end + end, empty, State); +drop(AckRequired, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(drop(AckRequired, BQS)). + +ack(AckTags, State = #state{bq = BQ}) -> + fold_by_acktags2(fun (AckTagsN, BQSN) -> + BQ:ack(AckTagsN, BQSN) + end, AckTags, State); +ack(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(ack(AckTags, BQS)). + +requeue(AckTags, State = #state{bq = BQ}) -> + fold_by_acktags2(fun (AckTagsN, BQSN) -> + BQ:requeue(AckTagsN, BQSN) + end, AckTags, State); +requeue(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(requeue(AckTags, BQS)). + +%% Similar problem to fetchwhile/4 +ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) -> + AckTagsByPriority = partition_acktags(AckTags), + fold2( + fun (P, BQSN, AccN) -> + case orddict:find(P, AckTagsByPriority) of + {ok, ATagsN} -> {AccN1, BQSN1} = + BQ:ackfold(MsgFun, AccN, BQSN, ATagsN), + {priority_on_acktags(P, AccN1), BQSN1}; + error -> {AccN, BQSN} + end + end, Acc, State); +ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags) -> + ?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags)). + +fold(Fun, Acc, State = #state{bq = BQ}) -> + fold2(fun (_P, BQSN, AccN) -> BQ:fold(Fun, AccN, BQSN) end, Acc, State); +fold(Fun, Acc, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(fold(Fun, Acc, BQS)). + +len(#state{bq = BQ, bqss = BQSs}) -> + add0(fun (_P, BQSN) -> BQ:len(BQSN) end, BQSs); +len(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:len(BQS). + +is_empty(#state{bq = BQ, bqss = BQSs}) -> + all0(fun (_P, BQSN) -> BQ:is_empty(BQSN) end, BQSs); +is_empty(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:is_empty(BQS). 
+ +depth(#state{bq = BQ, bqss = BQSs}) -> + add0(fun (_P, BQSN) -> BQ:depth(BQSN) end, BQSs); +depth(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:depth(BQS). + +set_ram_duration_target(DurationTarget, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> + BQ:set_ram_duration_target(DurationTarget, BQSN) + end, State); +set_ram_duration_target(DurationTarget, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(set_ram_duration_target(DurationTarget, BQS)). + +ram_duration(State = #state{bq = BQ}) -> + fold_add2(fun (_P, BQSN) -> BQ:ram_duration(BQSN) end, State); +ram_duration(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(ram_duration(BQS)). + +needs_timeout(#state{bq = BQ, bqss = BQSs}) -> + fold0(fun (_P, _BQSN, timed) -> timed; + (_P, BQSN, idle) -> case BQ:needs_timeout(BQSN) of + timed -> timed; + _ -> idle + end; + (_P, BQSN, false) -> BQ:needs_timeout(BQSN) + end, false, BQSs); +needs_timeout(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:needs_timeout(BQS). + +timeout(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:timeout(BQSN) end, State); +timeout(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(timeout(BQS)). + +handle_pre_hibernate(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> + BQ:handle_pre_hibernate(BQSN) + end, State); +handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(handle_pre_hibernate(BQS)). + +resume(State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State); +resume(State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(resume(BQS)). + +msg_rates(#state{bq = BQ, bqss = BQSs}) -> + fold0(fun(_P, BQSN, {InN, OutN}) -> + {In, Out} = BQ:msg_rates(BQSN), + {InN + In, OutN + Out} + end, {0.0, 0.0}, BQSs); +msg_rates(#passthrough{bq = BQ, bqs = BQS}) -> + BQ:msg_rates(BQS). 
+ +info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) -> + fold0(fun (P, BQSN, Acc) -> + combine_status(P, BQ:info(backing_queue_status, BQSN), Acc) + end, nothing, BQSs); +info(Item, #state{bq = BQ, bqss = BQSs}) -> + fold0(fun (_P, BQSN, Acc) -> + Acc + BQ:info(Item, BQSN) + end, 0, BQSs); +info(Item, #passthrough{bq = BQ, bqs = BQS}) -> + BQ:info(Item, BQS). + +invoke(Mod, {P, Fun}, State = #state{bq = BQ}) -> + pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State); +invoke(Mod, Fun, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(invoke(Mod, Fun, BQS)). + +is_duplicate(Msg, State = #state{bq = BQ}) -> + pick2(fun (_P, BQSN) -> BQ:is_duplicate(Msg, BQSN) end, Msg, State); +is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(is_duplicate(Msg, BQS)). + +%%---------------------------------------------------------------------------- + +bq() -> + {ok, RealBQ} = application:get_env( + rabbitmq_priority_queue, backing_queue_module), + RealBQ. + +%% Note on suffixes: Many utility functions here have suffixes telling +%% you the arity of the return type of the BQ function they are +%% designed to work with. +%% +%% 0 - BQ function returns a value and does not modify state +%% 1 - BQ function just returns a new state +%% 2 - BQ function returns a 2-tuple of {Result, NewState} +%% 3 - BQ function returns a 3-tuple of {Result1, Result2, NewState} + +%% Fold over results +fold0(Fun, Acc, [{P, BQSN} | Rest]) -> fold0(Fun, Fun(P, BQSN, Acc), Rest); +fold0(_Fun, Acc, []) -> Acc. + +%% Do all BQs match? +all0(Pred, BQSs) -> fold0(fun (_P, _BQSN, false) -> false; + (P, BQSN, true) -> Pred(P, BQSN) + end, true, BQSs). + +%% Sum results +add0(Fun, BQSs) -> fold0(fun (P, BQSN, Acc) -> Acc + Fun(P, BQSN) end, 0, BQSs). + +%% Apply for all states +foreach1(Fun, State = #state{bqss = BQSs}) -> + a(State#state{bqss = foreach1(Fun, BQSs, [])}). 
+foreach1(Fun, [{P, BQSN} | Rest], BQSAcc) -> + BQSN1 = Fun(P, BQSN), + foreach1(Fun, Rest, [{P, BQSN1} | BQSAcc]); +foreach1(_Fun, [], BQSAcc) -> + lists:reverse(BQSAcc). + +%% For a given thing, just go to its BQ +pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) -> + {P, BQSN} = priority(Prioritisable, BQSs), + a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}). + +%% Fold over results +fold2(Fun, Acc, State = #state{bqss = BQSs}) -> + {Res, BQSs1} = fold2(Fun, Acc, BQSs, []), + {Res, a(State#state{bqss = BQSs1})}. +fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) -> + {Acc1, BQSN1} = Fun(P, BQSN, Acc), + fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]); +fold2(_Fun, Acc, [], BQSAcc) -> + {Acc, lists:reverse(BQSAcc)}. + +%% Fold over results assuming results are lists and we want to append them +fold_append2(Fun, State) -> + fold2(fun (P, BQSN, Acc) -> + {Res, BQSN1} = Fun(P, BQSN), + {Res ++ Acc, BQSN1} + end, [], State). + +%% Fold over results assuming results are numbers and we want to sum them +fold_add2(Fun, State) -> + fold2(fun (P, BQSN, Acc) -> + {Res, BQSN1} = Fun(P, BQSN), + {add_maybe_infinity(Res, Acc), BQSN1} + end, 0, State). + +%% Fold over results assuming results are lists and we want to append +%% them, and also that we have some AckTags we want to pass in to each +%% invocation. +fold_by_acktags2(Fun, AckTags, State) -> + AckTagsByPriority = partition_acktags(AckTags), + fold_append2(fun (P, BQSN) -> + case orddict:find(P, AckTagsByPriority) of + {ok, AckTagsN} -> Fun(AckTagsN, BQSN); + error -> {[], BQSN} + end + end, State). + +%% For a given thing, just go to its BQ +pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) -> + {P, BQSN} = priority(Prioritisable, BQSs), + {Res, BQSN1} = Fun(P, BQSN), + {Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}. + +%% Run through BQs in priority order until one does not return +%% {NotFound, NewState} or we have gone through them all. 
+find2(Fun, NotFound, State = #state{bqss = BQSs}) -> + {Res, BQSs1} = find2(Fun, NotFound, BQSs, []), + {Res, a(State#state{bqss = BQSs1})}. +find2(Fun, NotFound, [{P, BQSN} | Rest], BQSAcc) -> + case Fun(P, BQSN) of + {NotFound, BQSN1} -> find2(Fun, NotFound, Rest, [{P, BQSN1} | BQSAcc]); + {Res, BQSN1} -> {Res, lists:reverse([{P, BQSN1} | BQSAcc]) ++ Rest} + end; +find2(_Fun, NotFound, [], BQSAcc) -> + {NotFound, lists:reverse(BQSAcc)}. + +%% Run through BQs in priority order like find2 but also folding as we go. +findfold3(Fun, Acc, NotFound, State = #state{bqss = BQSs}) -> + {Res, Acc1, BQSs1} = findfold3(Fun, Acc, NotFound, BQSs, []), + {Res, Acc1, a(State#state{bqss = BQSs1})}. +findfold3(Fun, Acc, NotFound, [{P, BQSN} | Rest], BQSAcc) -> + case Fun(P, BQSN, Acc) of + {NotFound, Acc1, BQSN1} -> + findfold3(Fun, Acc1, NotFound, Rest, [{P, BQSN1} | BQSAcc]); + {Res, Acc1, BQSN1} -> + {Res, Acc1, lists:reverse([{P, BQSN1} | BQSAcc]) ++ Rest} + end; +findfold3(_Fun, Acc, NotFound, [], BQSAcc) -> + {NotFound, Acc, lists:reverse(BQSAcc)}. + +bq_fetch(P, []) -> exit({not_found, P}); +bq_fetch(P, [{P, BQSN} | _]) -> BQSN; +bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T). + +bq_store(P, BQS, BQSs) -> + [{PN, case PN of + P -> BQS; + _ -> BQSN + end} || {PN, BQSN} <- BQSs]. + +%% +a(State = #state{bqss = BQSs}) -> + Ps = [P || {P, _} <- BQSs], + case lists:reverse(lists:usort(Ps)) of + Ps -> State; + _ -> exit({bad_order, Ps}) + end. + +%%---------------------------------------------------------------------------- + +priority(P, BQSs) when is_integer(P) -> + {P, bq_fetch(P, BQSs)}; +priority(_Msg, [{P, BQSN}]) -> + {P, BQSN}; +priority(Msg = #basic_message{content = #content{properties = Props}}, + [{P, BQSN} | Rest]) -> + #'P_basic'{priority = Priority0} = Props, + Priority = case Priority0 of + undefined -> 0; + _ when is_integer(Priority0) -> Priority0 + end, + case Priority >= P of + true -> {P, BQSN}; + false -> priority(Msg, Rest) + end. 
+ +add_maybe_infinity(infinity, _) -> infinity; +add_maybe_infinity(_, infinity) -> infinity; +add_maybe_infinity(A, B) -> A + B. + +partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()). + +partition_acktags([], Partitioned) -> + orddict:map(fun (_P, RevAckTags) -> + lists:reverse(RevAckTags) + end, Partitioned); +partition_acktags([{P, AckTag} | Rest], Partitioned) -> + partition_acktags(Rest, rabbit_misc:orddict_cons(P, AckTag, Partitioned)). + +priority_on_acktags(P, AckTags) -> + [case Tag of + _ when is_integer(Tag) -> {P, Tag}; + _ -> Tag + end || Tag <- AckTags]. + +combine_status(P, New, nothing) -> + [{priority_lengths, [{P, proplists:get_value(len, New)}]} | New]; +combine_status(P, New, Old) -> + Combined = [{K, cse(V, proplists:get_value(K, Old))} || {K, V} <- New], + Lens = [{P, proplists:get_value(len, New)} | + proplists:get_value(priority_lengths, Old)], + [{priority_lengths, Lens} | Combined]. + +cse(infinity, _) -> infinity; +cse(_, infinity) -> infinity; +cse(A, B) when is_number(A) -> A + B; +cse({delta, _, _, _}, _) -> {delta, todo, todo, todo}; +cse(A, B) -> exit({A, B}). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0a2c88d441..24dcd23cc8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,19 +16,27 @@ -module(rabbit_queue_index). --export([erase/1, init/2, recover/5, +-export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, + publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). -define(CLEAN_FILENAME, "clean.dot"). 
%%---------------------------------------------------------------------------- %% The queue index is responsible for recording the order of messages -%% within a queue on disk. +%% within a queue on disk. As such it contains records of messages +%% being published, delivered and acknowledged. The publish record +%% includes the sequence ID, message ID and a small quantity of +%% metadata about the message; the delivery and acknowledgement +%% records just contain the sequence ID. A publish record may also +%% contain the complete message if provided to publish/5; this allows +%% the message store to be avoided altogether for small messages. In +%% either case the publish record is stored in memory in the same +%% serialised format it will take on disk. %% %% Because of the fact that the queue can decide at any point to send %% a queue entry to disk, you can not rely on publishes appearing in @@ -36,7 +44,7 @@ %% then delivered, then ack'd. %% %% In order to be able to clean up ack'd messages, we write to segment -%% files. These files have a fixed maximum size: ?SEGMENT_ENTRY_COUNT +%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT %% publishes, delivers and acknowledgements. They are numbered, and so %% it is known that the 0th segment contains messages 0 -> %% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages @@ -85,7 +93,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}), +%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -128,8 +136,8 @@ -define(REL_SEQ_ONLY_RECORD_BYTES, 2). 
%% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits -%% of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of +%% size and then 128 bits of md5sum msg id. -define(PUB_PREFIX, 1). -define(PUB_PREFIX_BITS, 1). @@ -140,32 +148,37 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). +%% This is the size of the message body content, for stats -define(SIZE_BYTES, 4). -define(SIZE_BITS, (?SIZE_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry + 4 for size +%% This is the size of the message record embedded in the queue +%% index. If 0, the message can be found in the message store. +-define(EMBEDDED_SIZE_BYTES, 4). +-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry -define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). -%% + 2 for seq, bits and prefix --define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). +%% + 4 for size +-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)). -%% 1 publish, 1 deliver, 1 ack per msg --define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))). +%% + 2 for seq, bits and prefix +-define(PUB_RECORD_PREFIX_BYTES, 2). %% ---- misc ---- --define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent} +-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin} -define(READ_MODE, [binary, raw, read]). --define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). -define(WRITE_MODE, [write | ?READ_MODE]). %%---------------------------------------------------------------------------- --record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unconfirmed }). 
+-record(qistate, {dir, segments, journal_handle, dirty_count, + max_journal_entries, on_sync, on_sync_msg, + unconfirmed, unconfirmed_msg}). --record(segment, { num, path, journal_entries, unacked }). +-record(segment, {num, path, journal_entries, unacked}). -include("rabbit.hrl"). @@ -174,6 +187,7 @@ -rabbit_upgrade({add_queue_ttl, local, []}). -rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). +-rabbit_upgrade({store_msg, local, [store_msg_size]}). -ifdef(use_specs). @@ -193,7 +207,9 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unconfirmed :: gb_sets:set() + on_sync_msg :: on_sync_fun(), + unconfirmed :: gb_sets:set(), + unconfirmed_msg :: gb_sets:set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -201,16 +217,18 @@ -type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). -spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok'). --spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). --spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - contains_predicate(), on_sync_fun()) -> +-spec(init/3 :: (rabbit_amqqueue:name(), + on_sync_fun(), on_sync_fun()) -> qistate()). +-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + contains_predicate(), + on_sync_fun(), on_sync_fun()) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), - rabbit_types:message_properties(), boolean(), qistate()) - -> qistate()). +-spec(publish/6 :: (rabbit_types:msg_id(), seq_id(), + rabbit_types:message_properties(), boolean(), + non_neg_integer(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). 
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/1 :: (qistate()) -> qistate()). @@ -241,14 +259,17 @@ erase(Name) -> false -> ok end. -init(Name, OnSyncFun) -> +init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir - State #qistate { on_sync = OnSyncFun }. + State#qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, + OnSyncFun, OnSyncMsgFun) -> State = blank_state(Name), - State1 = State #qistate { on_sync = OnSyncFun }, + State1 = State #qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}, CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), @@ -267,26 +288,35 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. 
-publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unconfirmed = Unconfirmed }) - when is_binary(MsgId) -> +publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM}) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - case MsgProps#message_properties.needs_confirming of - true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), - State #qistate { unconfirmed = Unconfirmed1 }; - false -> State + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State end), + file_handle_cache_stats:update(queue_index_journal_write), + {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, - SeqId:?SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]), + SeqId:?SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]), maybe_flush_journal( - add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). + JournalSizeHint, + add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -302,10 +332,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). 
-needs_sync(#qistate { journal_handle = undefined }) -> +needs_sync(#qistate{journal_handle = undefined}) -> false; -needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) -> - case gb_sets:is_empty(UC) of +needs_sync(#qistate{journal_handle = JournalHdl, + unconfirmed = UC, + unconfirmed_msg = UCM}) -> + case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of true -> case file_handle_cache:needs_sync(JournalHdl) of true -> other; false -> false @@ -409,7 +441,9 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unconfirmed = gb_sets:new() }. + on_sync_msg = fun (_) -> ok end, + unconfirmed = gb_sets:new(), + unconfirmed_msg = gb_sets:new() }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -479,9 +513,10 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack}, + fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack}, {SegmentAndDirtyCount, Bytes}) -> - {recover_message(ContainsCheckFun(MsgId), CleanShutdown, + {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), + {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown, Del, RelSeq, SegmentAndDirtyCount), Bytes + case IsPersistent of true -> MsgProps#message_properties.size; @@ -541,7 +576,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = blank_state(QueueName), ok = scan_segments( - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) + when is_binary(MsgId) -> gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> @@ -555,9 +591,9 @@ scan_segments(Fun, Acc, State) -> Result = lists:foldr( fun (Seg, AccN) -> 
segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, IsAcked}, AccM) -> - Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps, + Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, IsPersistent, IsDelivered, IsAcked, AccM) end, AccN, segment_find_or_new(Seg, Dir, Segments)) end, Acc, all_segment_nums(State1)), @@ -568,24 +604,35 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry, - size = Size }) -> - [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. +create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, + size = Size }) -> + ExpiryBin = expiry_to_binary(Expiry), + case MsgOrId of + MsgId when is_binary(MsgId) -> + {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>}; + #basic_message{id = MsgId} -> + MsgBin = term_to_binary(MsgOrId), + {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin} + end. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, - Size:?SIZE_BITS>>) -> + Size:?SIZE_BITS>>, MsgBin) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {MsgId, #message_properties { expiry = Exp, - size = Size }}. + Props = #message_properties{expiry = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + size = Size}, + case MsgBin of + <<>> -> {MsgId, Props}; + _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin), + {Msg, Props} + end. 
%%---------------------------------------------------------------------------- %% journal manipulation @@ -628,11 +675,14 @@ add_to_journal(RelSeq, Action, JEntries) -> array:reset(RelSeq, JEntries) end. -maybe_flush_journal(State = #qistate { dirty_count = DCount, - max_journal_entries = MaxJournal }) - when DCount > MaxJournal -> - flush_journal(State); maybe_flush_journal(State) -> + maybe_flush_journal(infinity, State). + +maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount, + max_journal_entries = MaxJournal }) + when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) -> + flush_journal(State); +maybe_flush_journal(_Hint, State) -> State. flush_journal(State = #qistate { segments = Segments }) -> @@ -656,9 +706,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, + _ -> Seg = array:sparse_foldr( + fun entry_to_segment/3, [], JEntries), + file_handle_cache_stats:update(queue_index_write), + + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), - array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), + file_handle_cache:append(Hdl, Seg), ok = file_handle_cache:close(Hdl), Segment #segment { journal_entries = array_new() } end. @@ -677,10 +731,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. 
load_journal(State = #qistate { dir = Dir }) -> - case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of + Path = filename:join(Dir, ?JOURNAL_FILENAME), + case rabbit_file:is_file(Path) of true -> {JournalHdl, State1} = get_journal_handle(State), + Size = rabbit_file:file_size(Path), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1); + {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size), + parse_journal_entries(JournalBin, State1); false -> State end. @@ -704,41 +761,37 @@ recover_journal(State) -> end, Segments), State1 #qistate { segments = Segments1 }. -load_journal_entries(State = #qistate { journal_handle = Hdl }) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> - case Prefix of - ?DEL_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, del, State)); - ?ACK_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, ack, State)); - _ -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of - %% Journal entry composed only of zeroes was probably - %% produced during a dirty shutdown so stop reading - {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> - State; - {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgId, MsgProps} = parse_pub_record_body(Bin), - IsPersistent = case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end, - load_journal_entries( - add_to_journal( - SeqId, {MsgId, MsgProps, IsPersistent}, State)); - _ErrOrEoF -> %% err, we've lost at least a publish - State - end - end; - _ErrOrEoF -> State - end. 
+parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, del, State)); + +parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, ack, State)); +parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS, + 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) -> + %% Journal entry composed only of zeroes was probably + %% produced during a dirty shutdown so stop reading + State; +parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary, + Rest/binary>>, State) -> + IsPersistent = case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, + parse_journal_entries( + Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State)); +parse_journal_entries(_ErrOrEoF, State) -> + State. deliver_or_ack(_Kind, [], State) -> State; deliver_or_ack(Kind, SeqIds, State) -> JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end, {JournalHdl, State1} = get_journal_handle(State), + file_handle_cache_stats:update(queue_index_journal_write), ok = file_handle_cache:append( JournalHdl, [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]), @@ -746,11 +799,19 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). 
-notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> - case gb_sets:is_empty(UC) of - true -> State; - false -> OnSyncFun(UC), - State #qistate { unconfirmed = gb_sets:new() } +notify_sync(State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}) -> + State1 = case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State#qistate{unconfirmed = gb_sets:new()} + end, + case gb_sets:is_empty(UCM) of + true -> State1; + false -> OnSyncMsgFun(UCM), + State1#qistate{unconfirmed_msg = gb_sets:new()} end. %%---------------------------------------------------------------------------- @@ -823,42 +884,42 @@ segment_nums({Segments, CachedSegments}) -> segments_new() -> {dict:new(), []}. -write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) -> - Hdl; -write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> - ok = case Pub of - no_pub -> - ok; - {MsgId, MsgProps, IsPersistent} -> - file_handle_cache:append( - Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]) - end, - ok = case {Del, Ack} of - {no_del, no_ack} -> - ok; - _ -> - Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - file_handle_cache:append( - Hdl, case {Del, Ack} of - {del, ack} -> [Binary, Binary]; - _ -> Binary - end) - end, - Hdl. +entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> + Buf; +entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> + %% NB: we are assembling the segment in reverse order here, so + %% del/ack comes first. 
+ Buf1 = case {Del, Ack} of + {no_del, no_ack} -> + Buf; + _ -> + Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + case {Del, Ack} of + {del, ack} -> [[Binary, Binary] | Buf]; + _ -> [Binary | Buf] + end + end, + case Pub of + no_pub -> + Buf1; + {IsPersistent, Bin, MsgBin} -> + [[<<?PUB_PREFIX:?PUB_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1] + end. read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack}, + Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, - IsPersistent, IsDelivered == del} | Acc ]; + [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + IsPersistent, IsDelivered == del} | Acc]; (_RelSeq, _Value, Acc) -> Acc end, Messages, Segment), @@ -868,7 +929,11 @@ segment_entries_foldr(Fun, Init, Segment = #segment { journal_entries = JEntries }) -> {SegEntries, _UnackedCount} = load_segment(false, Segment), {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries), - array:sparse_foldr(Fun, Init, SegEntries1). + array:sparse_foldr( + fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) -> + {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), + Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc) + end, Init, SegEntries1). 
%% Loading segments %% @@ -877,44 +942,48 @@ load_segment(KeepAcked, #segment { path = Path }) -> Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of false -> Empty; - true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), + true -> Size = rabbit_file:file_size(Path), + file_handle_cache_stats:update(queue_index_read), + {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of - {ok, SegData} -> load_segment_entries( - KeepAcked, SegData, Empty); - eof -> Empty - end, + {ok, SegBin} = file_handle_cache:read(Hdl, Size), ok = file_handle_cache:close(Hdl), + Res = parse_segment_entries(SegBin, KeepAcked, Empty), Res end. -load_segment_entries(KeepAcked, - <<?PUB_PREFIX:?PUB_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, - PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, - SegData/binary>>, - {SegEntries, UnackedCount}) -> - {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), - Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, +parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>, + KeepAcked, Acc) -> + parse_segment_publish_entry( + Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc); +parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) -> + parse_segment_entries( + Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); +parse_segment_entries(<<>>, _KeepAcked, Acc) -> + Acc. 
+ +parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, + MsgBin:MsgSize/binary, Rest/binary>>, + IsPersistent, RelSeq, KeepAcked, + {SegEntries, Unacked}) -> + Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1}); -load_segment_entries(KeepAcked, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS, SegData/binary>>, - {SegEntries, UnackedCount}) -> - {UnackedCountDelta, SegEntries1} = - case array:get(RelSeq, SegEntries) of - {Pub, no_del, no_ack} -> - { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcked -> - {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; - {_Pub, del, no_ack} -> - {-1, array:reset(RelSeq, SegEntries)} - end, - load_segment_entries(KeepAcked, SegData, - {SegEntries1, UnackedCount + UnackedCountDelta}); -load_segment_entries(_KeepAcked, _SegData, Res) -> - Res. + parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1}); +parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) -> + parse_segment_entries(Rest, KeepAcked, Acc). + +add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked}; + {Pub, del, no_ack} when KeepAcked -> + {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1}; + {_Pub, del, no_ack} -> + {array:reset(RelSeq, SegEntries), Unacked - 1} + end. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). @@ -1121,6 +1190,40 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, store_msg_size_segment(_) -> stop. +store_msg() -> + foreach_queue_index({fun store_msg_journal/1, + fun store_msg_segment/1}). 
+ +store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?EMBEDDED_SIZE_BITS>>, Rest}; +store_msg_journal(_) -> + stop. + +store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?EMBEDDED_SIZE_BITS>>, Rest}; +store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_segment(_) -> + stop. + + + %%---------------------------------------------------------------------------- @@ -1157,7 +1260,7 @@ transform_file(Path, Fun) when is_function(Fun)-> [{write_buffer, infinity}]), {ok, PathHdl} = file_handle_cache:open( - Path, [{read_ahead, Size} | ?READ_MODE], []), + Path, ?READ_MODE, [{read_buffer, Size}]), {ok, Content} = file_handle_cache:read(PathHdl, Size), ok = file_handle_cache:close(PathHdl), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index eb50eeb163..e3247439ef 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,6 +58,11 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). 
+-define(AUTH_NOTIFICATION_INFO_KEYS, + [host, vhost, name, peer_host, peer_port, protocol, auth_mechanism, + ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject, + peer_cert_validity]). + -define(IS_RUNNING(State), (State#v1.connection_state =:= running orelse State#v1.connection_state =:= blocking orelse @@ -214,7 +219,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> rabbit_net:fast_close(Sock), exit(normal) end, - log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(HandshakeTimeout, self(), handshake_timeout), @@ -260,8 +264,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of - connection_closed_abruptly -> warning; - _ -> error + connection_closed_with_no_data_received -> debug; + connection_closed_abruptly -> warning; + _ -> error end, "closing AMQP connection ~p (~s):~n~p~n", [self(), Name, Ex]) after @@ -313,8 +318,28 @@ binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> binlist_split(Len, [H|T], Acc) -> binlist_split(Len - size(H), T, [H|Acc]). -mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> - case rabbit_net:recv(Sock) of +mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, + connection_state = CS, + connection = #connection{ + name = ConnName}}) -> + Recv = rabbit_net:recv(Sock), + case CS of + pre_init when Buf =:= [] -> + %% We only log incoming connections when either the + %% first byte was received or there was an error (eg. a + %% timeout). + %% + %% The goal is to not log TCP healthchecks (a connection + %% with no data received) unless specified otherwise. 
+ log(case Recv of + closed -> debug; + _ -> info + end, "accepting AMQP connection ~p (~s)~n", + [self(), ConnName]); + _ -> + ok + end, + case Recv of {data, Data} -> recvloop(Deb, [Data | Buf], BufLen + size(Data), State#v1{pending_recv = false}); @@ -334,10 +359,18 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> end end. -stop(closed, State) -> maybe_emit_stats(State), - throw(connection_closed_abruptly); -stop(Reason, State) -> maybe_emit_stats(State), - throw({inet_error, Reason}). +stop(closed, #v1{connection_state = pre_init} = State) -> + %% The connection was closed before any packet was received. It's + %% probably a load-balancer healthcheck: don't consider this a + %% failure. + maybe_emit_stats(State), + throw(connection_closed_with_no_data_received); +stop(closed, State) -> + maybe_emit_stats(State), + throw(connection_closed_abruptly); +stop(Reason, State) -> + maybe_emit_stats(State), + throw({inet_error, Reason}). handle_other({conserve_resources, Source, Conserve}, State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> @@ -944,7 +977,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> - ok = rabbit_access_control:check_vhost_access(User, VHostPath), + ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), @@ -1046,9 +1079,12 @@ auth_phase(Response, auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of - {refused, Msg, Args} -> - auth_fail(Msg, Args, Name, State); + {refused, Username, Msg, Args} -> + auth_fail(Username, Msg, Args, Name, State); {protocol_error, Msg, Args} -> + notify_auth_result(none, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], + State), 
rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> Secure = #'connection.secure'{challenge = Challenge}, @@ -1057,9 +1093,12 @@ auth_phase(Response, auth_state = AuthState1}}; {ok, User = #user{username = Username}} -> case rabbit_access_control:check_user_loopback(Username, Sock) of - ok -> ok; - not_allowed -> auth_fail("user '~s' can only connect via " - "localhost", [Username], Name, State) + ok -> + notify_auth_result(Username, user_authentication_success, + [], State); + not_allowed -> + auth_fail(Username, "user '~s' can only connect via " + "localhost", [Username], Name, State) end, Tune = #'connection.tune'{frame_max = get_env(frame_max), channel_max = get_env(channel_max), @@ -1071,11 +1110,15 @@ auth_phase(Response, end. -ifdef(use_specs). --spec(auth_fail/4 :: (string(), [any()], binary(), #v1{}) -> no_return()). +-spec(auth_fail/5 :: + (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) -> + no_return()). -endif. -auth_fail(Msg, Args, AuthName, +auth_fail(Username, Msg, Args, AuthName, State = #v1{connection = #connection{protocol = Protocol, capabilities = Capabilities}}) -> + notify_auth_result(Username, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], State), AmqpError = rabbit_misc:amqp_error( access_refused, "~s login refused: ~s", [AuthName, io_lib:format(Msg, Args)], none), @@ -1094,6 +1137,16 @@ auth_fail(Msg, Args, AuthName, end, rabbit_misc:protocol_error(AmqpError). +notify_auth_result(Username, AuthResult, ExtraProps, State) -> + EventProps = [{connection_type, network}, + {name, case Username of none -> ''; _ -> Username end}] ++ + [case Item of + name -> {connection_name, i(name, State)}; + _ -> {Item, i(Item, State)} + end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++ + ExtraProps, + rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). 
+ %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index dbc2856d0c..9292068cea 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,7 +16,7 @@ -module(rabbit_trace). --export([init/1, enabled/1, tap_in/5, tap_out/5, start/1, stop/1]). +-export([init/1, enabled/1, tap_in/6, tap_out/5, start/1, stop/1]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -32,8 +32,8 @@ -spec(init/1 :: (rabbit_types:vhost()) -> state()). -spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()). --spec(tap_in/5 :: (rabbit_types:basic_message(), binary(), - rabbit_channel:channel_number(), +-spec(tap_in/6 :: (rabbit_types:basic_message(), [rabbit_amqqueue:name()], + binary(), rabbit_channel:channel_number(), rabbit_types:username(), state()) -> 'ok'). -spec(tap_out/5 :: (rabbit_amqqueue:qmsg(), binary(), rabbit_channel:channel_number(), @@ -58,15 +58,17 @@ enabled(VHost) -> {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), lists:member(VHost, VHosts). -tap_in(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok; +tap_in(_Msg, _QNames, _ConnName, _ChannelNum, _Username, none) -> ok; tap_in(Msg = #basic_message{exchange_name = #resource{name = XName, virtual_host = VHost}}, - ConnName, ChannelNum, Username, TraceX) -> + QNames, ConnName, ChannelNum, Username, TraceX) -> trace(TraceX, Msg, <<"publish">>, XName, - [{<<"vhost">>, longstr, VHost}, - {<<"connection">>, longstr, ConnName}, - {<<"channel">>, signedint, ChannelNum}, - {<<"user">>, longstr, Username}]). + [{<<"vhost">>, longstr, VHost}, + {<<"connection">>, longstr, ConnName}, + {<<"channel">>, signedint, ChannelNum}, + {<<"user">>, longstr, Username}, + {<<"routed_queues">>, array, + [{longstr, QName#resource.name} || QName <- QNames]}]). 
tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok; tap_out({#resource{name = QName, virtual_host = VHost}, diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index ba48867ad0..039568df8e 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -27,7 +27,7 @@ vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, - connection/0, protocol/0, user/0, internal_user/0, + connection/0, protocol/0, auth_user/0, user/0, internal_user/0, username/0, password/0, password_hash/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0, mfargs/0, proc_name/0, @@ -131,11 +131,15 @@ -type(protocol() :: rabbit_framing:protocol()). +-type(auth_user() :: + #auth_user{username :: username(), + tags :: [atom()], + impl :: any()}). + -type(user() :: - #user{username :: username(), - tags :: [atom()], - auth_backend :: atom(), - impl :: any()}). + #user{username :: username(), + tags :: [atom()], + authz_backends :: [{atom(), any()}]}). -type(internal_user() :: #internal_user{username :: username(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 72bf7855c4..2ab6545911 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,7 +16,8 @@ -module(rabbit_upgrade). --export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0, + nodes_running/1, secondary_upgrade/1]). -include("rabbit.hrl"). 
@@ -122,6 +123,7 @@ remove_backup() -> maybe_upgrade_mnesia() -> AllNodes = rabbit_mnesia:cluster_nodes(all), + ok = rabbit_mnesia_rename:maybe_finish(AllNodes), case rabbit_version:upgrades_required(mnesia) of {error, starting_from_scratch} -> ok; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 9f6dc21aa3..16f0b21b1b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -50,6 +50,7 @@ -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). +-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}). %% ------------------------------------------------------------------- @@ -82,6 +83,7 @@ -spec(cluster_name/0 :: () -> 'ok'). -spec(down_slave_nodes/0 :: () -> 'ok'). -spec(queue_state/0 :: () -> 'ok'). +-spec(recoverable_slaves/0 :: () -> 'ok'). -endif. @@ -418,6 +420,18 @@ queue_state(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]). +recoverable_slaves() -> + ok = recoverable_slaves(rabbit_queue), + ok = recoverable_slaves(rabbit_durable_queue). + +recoverable_slaves(Table) -> + transform( + Table, fun (Q) -> Q end, %% Don't change shape of record + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, + state]). 
+ + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1da3de2671..d4e0a04f39 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, - publish/5, publish_delivered/4, discard/3, drain_confirmed/1, + publish/6, publish_delivered/5, discard/4, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -28,16 +28,34 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- +%% Messages, and their position in the queue, can be in memory or on +%% disk, or both. Persistent messages will have both message and +%% position pushed to disk as soon as they arrive; transient messages +%% can be written to disk (and thus both types can be evicted from +%% memory) under memory pressure. The question of whether a message is +%% in RAM and whether it is persistent are orthogonal. +%% +%% Messages are persisted using the queue index and the message +%% store. Normally the queue index holds the position of the message +%% *within this queue* along with a couple of small bits of metadata, +%% while the message store holds the message itself (including headers +%% and other properties). +%% +%% However, as an optimisation, small messages can be embedded +%% directly in the queue index and bypass the message store +%% altogether. 
+%% %% Definitions: - +%% %% alpha: this is a message where both the message itself, and its %% position within the queue are held in RAM %% -%% beta: this is a message where the message itself is only held on -%% disk, but its position within the queue is held in RAM. +%% beta: this is a message where the message itself is only held on +%% disk (if persisted to the message store) but its position +%% within the queue is held in RAM. %% %% gamma: this is a message where the message itself is only held on %% disk, but its position is both in RAM and on disk. @@ -248,8 +266,9 @@ q3, q4, next_seq_id, - ram_pending_ack, - disk_pending_ack, + ram_pending_ack, %% msgs using store, still in RAM + disk_pending_ack, %% msgs in store, paged out + qi_pending_ack, %% msgs using qi, *can't* be paged out index_state, msg_store_clients, durable, @@ -274,7 +293,11 @@ unconfirmed, confirmed, ack_out_counter, - ack_in_counter + ack_in_counter, + %% Unlike the other counters these two do not feed into + %% #rates{} and get reset + disk_read_count, + disk_write_count }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -285,8 +308,9 @@ msg, is_persistent, is_delivered, - msg_on_disk, + msg_in_store, index_on_disk, + persist_to, msg_props }). @@ -300,11 +324,13 @@ %% betas, the IO_BATCH_SIZE sets the number of betas that we must be %% due to write indices for before we do any work at all. -define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND +-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(QUEUE, lqueue). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). 
%%---------------------------------------------------------------------------- @@ -341,6 +367,7 @@ next_seq_id :: seq_id(), ram_pending_ack :: gb_trees:tree(), disk_pending_ack :: gb_trees:tree(), + qi_pending_ack :: gb_trees:tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -367,7 +394,9 @@ unconfirmed :: gb_sets:set(), confirmed :: gb_sets:set(), ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer() }). + ack_in_counter :: non_neg_integer(), + disk_read_count :: non_neg_integer(), + disk_write_count :: non_neg_integer() }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -426,16 +455,19 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover, AsyncCallback) -> - init(Queue, Recover, AsyncCallback, - fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) - end, - fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). +init(Queue, Recover, Callback) -> + init( + Queue, Recover, Callback, + fun (MsgIds, ActionTaken) -> + msgs_written_to_disk(Callback, MsgIds, ActionTaken) + end, + fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, + fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). 
init(#amqqueue { name = QueueName, durable = IsDurable }, new, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> + IndexState = rabbit_queue_index:init(QueueName, + MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, @@ -446,13 +478,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback), - {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end}; + {C, fun (MsgId) when is_binary(MsgId) -> + rabbit_msg_store:contains(MsgId, C); + (#basic_message{is_persistent = Persistent}) -> + Persistent + end}; false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, @@ -461,7 +497,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - ContainsCheckFun, MsgIdxOnDiskFun), + ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -514,51 +550,30 @@ delete_and_terminate(_Reason, State) -> delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). 
-purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - ram_bytes = RamBytes, - persistent_count = PCount, - persistent_bytes = PBytes }) -> +purge(State = #vqstate { q4 = Q4, + len = Len }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {RamBytes, PCount, PBytes}, - {Stats1, IndexState1} = - remove_queue_entries(Q4, Stats, IndexState, MSCState), - - {Stats2, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = - - purge_betas_and_deltas( - Stats1, State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - - {{RamBytes3, PCount3, PBytes3}, IndexState3} = - remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), - - {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), - index_state = IndexState3, - len = 0, - bytes = 0, - ram_msg_count = 0, - ram_bytes = RamBytes3, - persistent_count = PCount3, - persistent_bytes = PBytes3 })}. + State1 = remove_queue_entries(Q4, State), + + State2 = #vqstate { q1 = Q1 } = + purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }), + + State3 = remove_queue_entries(Q1, State2), + + {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}. purge_acks(State) -> a(purge_pending_ack(false, State)). 
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + IsDelivered, _ChPid, _Flow, + State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), @@ -567,42 +582,36 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, InCount1 = InCount + 1, - PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes( - 1, 0, MsgStatus1, - inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount1, - persistent_count = PCount1, - unconfirmed = UC1 })), + State3 = stats({1, 0}, {none, MsgStatus1}, + State2#vqstate{ next_seq_id = SeqId + 1, + in_counter = InCount1, + unconfirmed = UC1 }), a(reduce_memory_use(maybe_update_rates(State3))). 
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + _ChPid, _Flow, + State = #vqstate { next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), - PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes(0, 1, MsgStatus, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. -discard(_MsgId, _ChPid, State) -> State. +discard(_MsgId, _ChPid, _Flow, State) -> State. 
drain_confirmed(State = #vqstate { confirmed = C }) -> case gb_sets:is_empty(C) of @@ -664,7 +673,7 @@ ack([], State) -> ack([SeqId], State) -> {#msg_status { msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, @@ -674,7 +683,7 @@ ack([SeqId], State) -> true -> rabbit_queue_index:ack([SeqId], IndexState); false -> IndexState end, - case MsgOnDisk of + case MsgInStore of true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, @@ -733,15 +742,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, [msg_iterator(State), disk_ack_iterator(State), - ram_ack_iterator(State)]), + ram_ack_iterator(State), + qi_ack_iterator(State)]), ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> - len(State) + gb_trees:size(RPA) + gb_trees:size(DPA). +depth(State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). 
set_ram_duration_target( DurationTarget, State = #vqstate { @@ -807,10 +819,11 @@ ram_duration(State) -> ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev, ram_pending_ack = RPA, + qi_pending_ack = QPA, ram_ack_count_prev = RamAckCountPrev } = update_rates(State), - RamAckCount = gb_trees:size(RPA), + RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA), Duration = %% msgs+acks / (msgs+acks/sec) == sec case lists:all(fun (X) -> X < 0.01 end, @@ -846,8 +859,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> RamMsgCount; -info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> - gb_trees:size(RPA); +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA, + qi_pending_ack = QPA}) -> + gb_trees:size(RPA) + gb_trees:size(QPA); info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> @@ -863,6 +877,10 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; +info(disk_reads, #vqstate{disk_read_count = Count}) -> + Count; +info(disk_writes, #vqstate{disk_write_count = Count}) -> + Count; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, @@ -933,14 +951,11 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) when Start + Count =< End -> Delta. -m(MsgStatus = #msg_status { msg = Msg, - is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, +m(MsgStatus = #msg_status { is_persistent = IsPersistent, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }) -> true = (not IsPersistent) or IndexOnDisk, - true = (not IndexOnDisk) or MsgOnDisk, - true = (Msg =/= undefined) or MsgOnDisk, - + true = msg_in_ram(MsgStatus) or MsgInStore, MsgStatus. 
one_if(true ) -> 1; @@ -959,21 +974,39 @@ msg_status(IsPersistent, IsDelivered, SeqId, msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = false, + msg_in_store = false, index_on_disk = false, + persist_to = determine_persist_to(Msg, MsgProps), msg_props = MsgProps}. +beta_msg_status({Msg = #basic_message{id = MsgId}, + SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = Msg, + persist_to = queue_index, + msg_in_store = false}; + beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = undefined, + persist_to = msg_store, + msg_in_store = true}. + +beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> #msg_status{seq_id = SeqId, - msg_id = MsgId, msg = undefined, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = true, index_on_disk = true, msg_props = MsgProps}. -trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. +trim_msg_status(MsgStatus) -> + case persist_to(MsgStatus) of + msg_store -> MsgStatus#msg_status{msg = undefined}; + queue_index -> MsgStatus + end. with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), @@ -1035,26 +1068,36 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). 
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> - {Filtered, Delivers, Acks} = +betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> + {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( - fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1} = Acc) -> + fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, + {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1]}; - false -> case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA)) of - false -> {?QUEUE:in_r(m(beta_msg_status(M)), - Filtered1), - Delivers1, Acks1}; - true -> Acc - end + [SeqId | Acks1], RRC, RB}; + false -> MsgStatus = m(beta_msg_status(M)), + HaveMsg = msg_in_ram(MsgStatus), + Size = msg_size(MsgStatus), + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)) of + false -> {?QUEUE:in_r(MsgStatus, Filtered1), + Delivers1, Acks1, + RRC + one_if(HaveMsg), + RB + one_if(HaveMsg) * Size}; + true -> Acc %% [0] + end end - end, {?QUEUE:new(), [], []}, List), - {Filtered, rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. + end, {?QUEUE:new(), [], [], 0, 0}, List), + {Filtered, RamReadyCount, RamBytes, + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. +%% [0] We don't increase RamBytes here, even though it pertains to +%% unacked messages too, since if HaveMsg then the message must have +%% been stored in the QI, thus the message must have been in +%% qi_pending_ack, thus it must already have been in RAM. 
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); @@ -1101,6 +1144,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, next_seq_id = NextSeqId, ram_pending_ack = gb_trees:empty(), disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1125,7 +1169,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, unconfirmed = gb_sets:new(), confirmed = gb_sets:new(), ack_out_counter = 0, - ack_in_counter = 0 }, + ack_in_counter = 0, + disk_read_count = 0, + disk_write_count = 0 }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -1141,11 +1187,9 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - upd_ram_bytes( - 1, MsgStatus, - inc_ram_msg_count( - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) })) + MsgStatus1 = MsgStatus#msg_status{msg = Msg}, + stats(ready0, {MsgStatus, MsgStatus1}, + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1168,33 +1212,57 @@ read_msg(#msg_status{msg = undefined, read_msg(#msg_status{msg = Msg}, State) -> {Msg, State}. -read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> +read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, + disk_read_count = Count}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1}}. - -inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> - State#vqstate{ram_msg_count = RamMsgCount + 1}. 
- -upd_bytes(SignReady, SignUnacked, - MsgStatus = #msg_status{msg = undefined}, State) -> - upd_bytes0(SignReady, SignUnacked, MsgStatus, State); -upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) -> - upd_ram_bytes(SignReady + SignUnacked, MsgStatus, - upd_bytes0(SignReady, SignUnacked, MsgStatus, State)). - -upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, - State = #vqstate{bytes = Bytes, - unacked_bytes = UBytes, - persistent_bytes = PBytes}) -> + {Msg, State #vqstate {msg_store_clients = MSCState1, + disk_read_count = Count + 1}}. + +stats(Signs, Statuses, State) -> + stats0(expand_signs(Signs), expand_statuses(Statuses), State). + +expand_signs(ready0) -> {0, 0, true}; +expand_signs({A, B}) -> {A, B, false}. + +expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; +expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; +expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. + +%% In this function at least, we are religious: the variable name +%% contains "Ready" or "Unacked" iff that is what it counts. If +%% neither is present it counts both. +stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, + {InRamBefore, InRamAfter, MsgStatus}, + State = #vqstate{len = ReadyCount, + bytes = ReadyBytes, + ram_msg_count = RamReadyCount, + persistent_count = PersistentCount, + unacked_bytes = UnackedBytes, + ram_bytes = RamBytes, + persistent_bytes = PersistentBytes}) -> S = msg_size(MsgStatus), - SignTotal = SignReady + SignUnacked, - State#vqstate{bytes = Bytes + SignReady * S, - unacked_bytes = UBytes + SignUnacked * S, - persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. - -upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> - State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. 
+ DeltaTotal = DeltaReady + DeltaUnacked, + DeltaRam = case {InRamBefore, InRamAfter} of + {false, false} -> 0; + {false, true} -> 1; + {true, false} -> -1; + {true, true} -> 0 + end, + DeltaRamReady = case DeltaReady of + 1 -> one_if(InRamAfter); + -1 -> -one_if(InRamBefore); + 0 when ReadyMsgPaged -> DeltaRam; + 0 -> 0 + end, + DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent), + State#vqstate{len = ReadyCount + DeltaReady, + ram_msg_count = RamReadyCount + DeltaRamReady, + persistent_count = PersistentCount + DeltaPersistent, + bytes = ReadyBytes + DeltaReady * S, + unacked_bytes = UnackedBytes + DeltaUnacked * S, + ram_bytes = RamBytes + DeltaRam * S, + persistent_bytes = PersistentBytes + DeltaPersistent * S}. msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. @@ -1203,17 +1271,13 @@ msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, - msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, + State = #vqstate {out_counter = OutCount, index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount}) -> + msg_store_clients = MSCState}) -> %% 1. 
Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1224,10 +1288,11 @@ remove(AckRequired, MsgStatus = #msg_status { ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of - {false, true, false} -> Rem(), IndexState1; - {false, true, true} -> Rem(), Ack(); - _ -> IndexState1 + IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of + {false, true, false} -> Rem(), IndexState1; + {false, true, true} -> Rem(), Ack(); + {false, false, true} -> Ack(); + _ -> IndexState1 end, %% 3. If an ack is required, add something sensible to PA @@ -1238,166 +1303,215 @@ remove(AckRequired, MsgStatus = #msg_status { {SeqId, StateN}; false -> {undefined, State} end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), State2 = case AckRequired of - false -> upd_bytes(-1, 0, MsgStatus, State1); - true -> upd_bytes(-1, 1, MsgStatus, State1) + false -> stats({-1, 0}, {MsgStatus, none}, State1); + true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1) end, {AckTag, maybe_update_rates( - State2 #vqstate {ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1})}. - -purge_betas_and_deltas(Stats, - State = #vqstate { q3 = Q3, - index_state = IndexState, - msg_store_clients = MSCState }) -> + State2 #vqstate {out_counter = OutCount + 1, + index_state = IndexState2})}. 
+ +purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> case ?QUEUE:is_empty(Q3) of - true -> {Stats, State}; - false -> {Stats1, IndexState1} = remove_queue_entries( - Q3, Stats, IndexState, MSCState), - purge_betas_and_deltas(Stats1, - maybe_deltas_to_betas( - State #vqstate { - q3 = ?QUEUE:new(), - index_state = IndexState1 })) + true -> State; + false -> State1 = remove_queue_entries(Q3, State), + purge_betas_and_deltas(maybe_deltas_to_betas( + State1#vqstate{q3 = ?QUEUE:new()})) end. -remove_queue_entries(Q, {RamBytes, PCount, PBytes}, - IndexState, MSCState) -> - {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = +remove_queue_entries(Q, State = #vqstate{index_state = IndexState, + msg_store_clients = MSCState}) -> + {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), RamBytes, PBytes, [], []}, Q), + {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{RamBytes1, - PCount - case orddict:find(true, MsgIdsByStore) of - error -> 0; - {ok, Ids} -> length(Ids) - end, - PBytes1}, - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState))}. + IndexState1 = rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)), + State1#vqstate{index_state = IndexState1}. 
remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, - is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent, - msg_props = #message_properties { size = Size } }, - {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> - {case MsgOnDisk of + #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, + is_persistent = IsPersistent} = MsgStatus, + {MsgIdsByStore, Delivers, Acks, State}) -> + {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - RamBytes - Size * one_if(Msg =/= undefined), - PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - cons_if(IndexOnDisk, SeqId, Acks)}. + cons_if(IndexOnDisk, SeqId, Acks), + stats({-1, 0}, {MsgStatus, none}, State)}. %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, _MSCState) -> - MsgStatus; + msg_in_store = true }, State) -> + {MsgStatus, State}; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, - is_persistent = IsPersistent }, MSCState) + is_persistent = IsPersistent }, + State = #vqstate{ msg_store_clients = MSCState, + disk_write_count = Count}) when Force orelse IsPersistent -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), - MsgStatus #msg_status { msg_on_disk = true }; -maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> - MsgStatus. 
+ case persist_to(MsgStatus) of + msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, + prepare_to_store(Msg)), + {MsgStatus#msg_status{msg_in_store = true}, + State#vqstate{disk_write_count = Count + 1}}; + queue_index -> {MsgStatus, State} + end; +maybe_write_msg_to_disk(_Force, MsgStatus, State) -> + {MsgStatus, State}. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { - index_on_disk = true }, IndexState) -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - {MsgStatus, IndexState}; + index_on_disk = true }, State) -> + {MsgStatus, State}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_props = MsgProps}, IndexState) + msg_props = MsgProps}, + State = #vqstate{target_ram_count = TargetRamCount, + disk_write_count = DiskWriteCount, + index_state = IndexState}) when Force orelse IsPersistent -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION + {MsgOrId, DiskWriteCount1} = + case persist_to(MsgStatus) of + msg_store -> {MsgId, DiskWriteCount}; + queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} + end, IndexState1 = rabbit_queue_index:publish( - MsgId, SeqId, MsgProps, IsPersistent, IndexState), - {MsgStatus #msg_status { index_on_disk = true }, - maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; -maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> - {MsgStatus, IndexState}. - -maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> - MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = - maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1 }}. 
+ MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount, + IndexState), + IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1), + {MsgStatus#msg_status{index_on_disk = true}, + State#vqstate{index_state = IndexState2, + disk_write_count = DiskWriteCount1}}; + +maybe_write_index_to_disk(_Force, MsgStatus, State) -> + {MsgStatus, State}. + +maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> + {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), + maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). + +determine_persist_to(#basic_message{ + content = #content{properties = Props, + properties_bin = PropsBin}}, + #message_properties{size = BodySize}) -> + {ok, IndexMaxSize} = application:get_env( + rabbit, queue_index_embed_msgs_below), + %% The >= is so that you can set the env to 0 and never persist + %% to the index. + %% + %% We want this to be fast, so we avoid size(term_to_binary()) + %% here, or using the term size estimation from truncate.erl, both + %% of which are too slow. So instead, if the message body size + %% goes over the limit then we avoid any other checks. + %% + %% If it doesn't we need to decide if the properties will push + %% it past the limit. If we have the encoded properties (usual + %% case) we can just check their size. If we don't (message came + %% via the direct client), we make a guess based on the number of + %% headers. + case BodySize >= IndexMaxSize of + true -> msg_store; + false -> Est = case is_binary(PropsBin) of + true -> BodySize + size(PropsBin); + false -> #'P_basic'{headers = Hs} = Props, + case Hs of + undefined -> 0; + _ -> length(Hs) + end * ?HEADER_GUESS_SIZE + BodySize + end, + case Est >= IndexMaxSize of + true -> msg_store; + false -> queue_index + end + end. + +persist_to(#msg_status{persist_to = To}) -> To. 
+ +prepare_to_store(Msg) -> + Msg#basic_message{ + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}. %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, +record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, ack_in_counter = AckInCount}) -> - {RPA1, DPA1} = - case Msg of - undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)}; - _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA} + Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, + {RPA1, DPA1, QPA1} = + case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of + {false, _} -> {RPA, Insert(DPA), QPA}; + {_, queue_index} -> {RPA, DPA, Insert(QPA)}; + {_, msg_store} -> {Insert(RPA), DPA, QPA} end, State #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1, + qi_pending_ack = QPA1, ack_in_counter = AckInCount + 1}. lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> V; - none -> gb_trees:get(SeqId, DPA) + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> V; + none -> gb_trees:get(SeqId, QPA) + end end. 
-%% First parameter = UpdatePersistentCount +%% First parameter = UpdateStats remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = - remove_pending_ack(false, SeqId, State), - PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), - {MsgStatus, upd_bytes(0, -1, MsgStatus, - State1 # vqstate{ persistent_count = PCount1 })}; -remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), + {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; +remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> DPA1 = gb_trees:delete(SeqId, DPA), - {gb_trees:get(SeqId, DPA), - State #vqstate { disk_pending_ack = DPA1 }} + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> + DPA1 = gb_trees:delete(SeqId, DPA), + {V, State#vqstate{disk_pending_ack = DPA1}}; + none -> + QPA1 = gb_trees:delete(SeqId, QPA), + {gb_trees:get(SeqId, QPA), + State#vqstate{qi_pending_ack = QPA1}} + end end. 
purge_pending_ack(KeepPersistent, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, index_state = IndexState, msg_store_clients = MSCState }) -> F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA), + F, rabbit_misc:gb_trees_fold( + F, rabbit_misc:gb_trees_fold( + F, accumulate_ack_init(), RPA), DPA), QPA), State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty() }, + disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty()}, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of @@ -1418,11 +1532,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - case MsgOnDisk of + case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, @@ -1469,29 +1583,25 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> gb_sets:union(MIOD, Confirmed) }) end). +msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). 
+ %%---------------------------------------------------------------------------- %% Internal plumbing for requeue %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, - upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] + MsgStatus1 = MsgStatus#msg_status { msg = Msg }, + {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; publish_alpha(MsgStatus, State) -> - {MsgStatus, inc_ram_msg_count(State)}. -%% [1] We increase the ram_bytes here because we paged the message in -%% to requeue it, not purely because we requeued it. Hence in the -%% second head it's already accounted for as already in memory. OTOH -%% ram_msg_count does not include unacked messages, so it needs -%% incrementing in both heads. + {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of - true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; - _ -> {MsgStatus2, State1} - end. + {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. 
%% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1513,7 +1623,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2)) + Limit, PubFun, State2) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1527,13 +1637,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - State3 = - case msg_in_ram(MsgStatus) of - false -> State2; - true -> upd_ram_bytes(-1, MsgStatus, State2) - end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - upd_bytes(1, -1, MsgStatus, State3)} + stats({1, -1}, {MsgStatus, none}, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1563,6 +1668,9 @@ ram_ack_iterator(State) -> disk_ack_iterator(State) -> {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. +qi_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}. + msg_iterator(State) -> istate(start, State). 
istate(start, State) -> {q4, State#vqstate.q4, State}; @@ -1592,7 +1700,8 @@ next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of false -> Next = {delta, Delta, Rest, State}, {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) @@ -1689,12 +1798,12 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, - upd_ram_bytes( - -1, MsgStatus1, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) + stats({0, 0}, {MsgStatus, MsgStatus2}, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. 
permitted_beta_count(#vqstate { len = 0 }) -> @@ -1755,8 +1864,12 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes, ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, + disk_read_count = DiskReadCount, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, @@ -1766,9 +1879,13 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, IndexState1), - State1 = State #vqstate { index_state = IndexState2 }, + {Q3a, RamCountsInc, RamBytesInc, IndexState2} = + betas_from_index_entries(List, TransientThreshold, + RPA, DPA, QPA, IndexState1), + State1 = State #vqstate { index_state = IndexState2, + ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc, + disk_read_count = DiskReadCount + RamCountsInc}, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being @@ -1826,26 +1943,21 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, - State1 = #vqstate { ram_msg_count = RamMsgCount }} = + {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer( - MsgStatus2, Qa, - upd_ram_bytes( - -1, MsgStatus2, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1})), + State2 = stats( + ready0, {MsgStatus, MsgStatus2}, State1), + State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State2) + Qa, State3) end end. 
-push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState }) -> - PushState = {Quota, Delta, IndexState}, +push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3}) -> + PushState = {Quota, Delta, State}, {Q3a, PushState1} = push_betas_to_deltas( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, @@ -1854,11 +1966,10 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, Q2, PushState1), - {_, Delta1, IndexState1} = PushState2, - State #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a, - index_state = IndexState1 }. + {_, Delta1, State1} = PushState2, + State1 #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a }. push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of @@ -1874,11 +1985,9 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end end. -push_betas_to_deltas1(_Generator, _Limit, Q, - {0, _Delta, _IndexState} = PushState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) -> {Q, PushState}; -push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, IndexState} = PushState) -> +push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) -> case Generator(Q) of {empty, _Q} -> {Q, PushState}; @@ -1886,11 +1995,12 @@ push_betas_to_deltas1(Generator, Limit, Q, when SeqId < Limit -> {Q, PushState}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(true, MsgStatus, IndexState), + {#msg_status { index_on_disk = true }, State1} = + maybe_write_index_to_disk(true, MsgStatus, State), + State2 = stats(ready0, {MsgStatus, none}, State1), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, IndexState1}) + {Quota - 1, Delta1, State2}) end. 
%%---------------------------------------------------------------------------- diff --git a/src/truncate.erl b/src/truncate.erl index 820af1bf86..1b4d957d0b 100644 --- a/src/truncate.erl +++ b/src/truncate.erl @@ -45,7 +45,7 @@ report(List, Params) when is_list(List) -> [case Item of report(Other, Params) -> term(Other, Params). term(Thing, {Max, {Content, Struct, ContentDec, StructDec}}) -> - case term_limit(Thing, Max) of + case exceeds_size(Thing, Max) of true -> term(Thing, true, #params{content = Content, struct = Struct, content_dec = ContentDec, @@ -93,7 +93,7 @@ shrink_list([H|T], #params{content = Content, %% sizes. This is all going to be rather approximate though, these %% sizes are probably not very "fair" but we are just trying to see if %% we reach a fairly arbitrary limit anyway though. -term_limit(Thing, Max) -> +exceeds_size(Thing, Max) -> case term_size(Thing, Max, erlang:system_info(wordsize)) of limit_exceeded -> true; _ -> false diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 608cea9166..71359aa5ed 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -18,13 +18,34 @@ %% Generic worker pool manager. %% -%% Supports nested submission of jobs (nested jobs always run -%% immediately in current worker process). +%% Submitted jobs are functions. They can be executed asynchronously +%% (using worker_pool:submit/1, worker_pool:submit/2) or synchronously +%% (using worker_pool:submit_async/1). %% -%% Possible future enhancements: +%% We typically use the worker pool if we want to limit the maximum +%% parallelism of some job. We are not trying to dodge the cost of +%% creating Erlang processes. %% -%% 1. Allow priorities (basically, change the pending queue to a -%% priority_queue). +%% Supports nested submission of jobs and two execution modes: +%% 'single' and 'reuse'. Jobs executed in 'single' mode are invoked in +%% a one-off process. Those executed in 'reuse' mode are invoked in a +%% worker process out of the pool. 
Nested jobs are always executed +%% immediately in current worker process. +%% +%% 'single' mode is offered to work around a bug in Mnesia: after +%% network partitions reply messages for prior failed requests can be +%% sent to Mnesia clients - a reused worker pool process can crash on +%% receiving one. +%% +%% Caller submissions are enqueued internally. When the next worker +%% process is available, it communicates it to the pool and is +%% assigned a job to execute. If job execution fails with an error, no +%% response is returned to the caller. +%% +%% Worker processes prioritise certain command-and-control messages +%% from the pool. +%% +%% Future improvement points: job prioritisation. -behaviour(gen_server2). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 819a6ae8ce..c2d058923d 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -16,6 +16,11 @@ -module(worker_pool_worker). +%% Executes jobs (functions) submitted to a worker pool with worker_pool:submit/1, +%% worker_pool:submit/2 or worker_pool:submit_async/1. +%% +%% See worker_pool for an overview. + -behaviour(gen_server2). -export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]). diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index f54a486058..0774dd9ab5 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -1290,11 +1290,9 @@ test_spawn_remote() -> end. user(Username) -> - #user{username = Username, - tags = [administrator], - auth_backend = rabbit_auth_backend_internal, - impl = #internal_user{username = Username, - tags = [administrator]}}. + #user{username = Username, + tags = [administrator], + authz_backends = [{rabbit_auth_backend_internal, none}]}. 
test_confirms() -> {_Writer, Ch} = test_spawn(), @@ -1888,11 +1886,15 @@ test_backing_queue() -> passed = test_msg_store(), application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit), - passed = test_queue_index(), - passed = test_queue_index_props(), - passed = test_variable_queue(), - passed = test_variable_queue_delete_msg_store_files_callback(), - passed = test_queue_recover(), + [begin + application:set_env( + rabbit, queue_index_embed_msgs_below, Bytes), + passed = test_queue_index(), + passed = test_queue_index_props(), + passed = test_variable_queue(), + passed = test_variable_queue_delete_msg_store_files_callback(), + passed = test_queue_recover() + end || Bytes <- [0, 1024]], application:set_env(rabbit, queue_index_max_journal_entries, MaxJournal), %% We will have restarted the message store, and thus changed @@ -2219,7 +2221,7 @@ init_test_queue() -> fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, - fun nop/1), + fun nop/1, fun nop/1), ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), Res. @@ -2258,7 +2260,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( MsgId, SeqId, #message_properties{size = 10}, - Persistent, QiN), + Persistent, infinity, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} end, {Qi, []}, SeqIds), @@ -2281,7 +2283,8 @@ test_queue_index_props() -> fun(Qi0) -> MsgId = rabbit_guid:gen(), Props = #message_properties{expiry=12345, size = 10}, - Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), + Qi1 = rabbit_queue_index:publish( + MsgId, 1, Props, true, infinity, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), Qi2 @@ -2422,7 +2425,7 @@ variable_queue_init(Q, Recover) -> Q, case Recover of true -> non_clean_shutdown; false -> new - end, fun nop/2, fun nop/2, fun nop/1). + end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). 
variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). @@ -2444,7 +2447,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> end}, PayloadFun(N)), PropFun(N, #message_properties{size = 10}), - false, self(), VQN) + false, self(), noflow, VQN) end, VQ, lists:seq(Start, Start + Count - 1))). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2462,7 +2465,10 @@ variable_queue_set_ram_duration_target(Duration, VQ) -> rabbit_variable_queue:set_ram_duration_target(Duration, VQ)). assert_prop(List, Prop, Value) -> - Value = proplists:get_value(Prop, List). + case proplists:get_value(Prop, List)of + Value -> ok; + _ -> {exit, Prop, exp, Value, List} + end. assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. @@ -2485,12 +2491,18 @@ with_fresh_variable_queue(Fun) -> {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate( - shutdown, Fun(VQ)), - Me ! Ref + try + _ = rabbit_variable_queue:delete_and_terminate( + shutdown, Fun(VQ)), + Me ! Ref + catch + Type:Error -> + Me ! {Ref, Type, Error, erlang:get_stacktrace()} + end end), receive - Ref -> ok + Ref -> ok; + {Ref, Type, Error, ST} -> exit({Type, Error, ST}) end, passed. @@ -2501,7 +2513,8 @@ publish_and_confirm(Q, Payload, Count) -> <<>>, #'P_basic'{delivery_mode = 2}, Payload), Delivery = #delivery{mandatory = false, sender = self(), - confirm = true, message = Msg, msg_seq_no = Seq}, + confirm = true, message = Msg, msg_seq_no = Seq, + flow = noflow}, _QPids = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). 
@@ -2787,8 +2800,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> VQ7 = lists:foldl( fun (Duration1, VQ4) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), - io:format("~p:~n~p~n", - [Duration1, variable_queue_status(VQ5)]), VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) @@ -2849,7 +2860,6 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> check_variable_queue_status(VQ0, Props) -> VQ1 = variable_queue_wait_for_shuffling_end(VQ0), S = variable_queue_status(VQ1), - io:format("~p~n", [S]), assert_props(S, Props), VQ1. |
