diff options
46 files changed, 1964 insertions, 844 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 63540568f1..7c185e5652 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -33,8 +33,8 @@ %% {handshake_timeout, 10000}, %% Log levels (currently just used for connection logging). - %% One of 'info', 'warning', 'error' or 'none', in decreasing order - %% of verbosity. Defaults to 'info'. + %% One of 'debug', 'info', 'warning', 'error' or 'none', in decreasing + %% order of verbosity. Defaults to 'info'. %% %% {log_levels, [{connection, info}]}, @@ -235,7 +235,12 @@ %% Timeout used when waiting for Mnesia tables in a cluster to %% become available. %% - %% {mnesia_table_loading_timeout, 30000} + %% {mnesia_table_loading_timeout, 30000}, + + %% Size in bytes below which to embed messages in the queue index. See + %% http://www.rabbitmq.com/persistence-conf.html + %% + %% {queue_index_embed_msgs_below, 4096} ]}, diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 8e89d7f0de..ad5f73514f 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -426,6 +426,41 @@ </listitem> </varlistentry> <varlistentry> + <term><cmdsynopsis><command>rename_cluster_node</command> <arg choice="req">oldnode1</arg> <arg choice="req">newnode1</arg> <arg choice="opt">oldnode2</arg> <arg choice="opt">newnode2 ...</arg></cmdsynopsis></term> + <listitem> + <para> + Supports renaming of cluster nodes in the local database. + </para> + <para> + This subcommand causes rabbitmqctl to temporarily become + the node in order to make the change. The local cluster + node must therefore be completely stopped; other nodes + can be online or offline. + </para> + <para> + This subcommand takes an even number of arguments, in + pairs representing the old and new names for nodes. You + must specify the old and new names for this node and for + any other nodes that are stopped and being renamed at + the same time. + </para> + <para> + It is possible to stop all nodes and rename them all + simultaneously (in which case old and new names for all + nodes must be given to every node) or stop and rename + nodes one at a time (in which case each node only needs + to be told how its own name is changing). + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia</screen> + <para role="example"> + This command will rename the node + <command>rabbit@misshelpful</command> to the node + <command>rabbit@cordelia</command>. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis> </term> <listitem> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 9e5584a1bf..918741438c 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -29,6 +29,7 @@ {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, + {queue_index_embed_msgs_below, 4096}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 74e165cd9b..b925dffc01 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -14,12 +14,17 @@ %% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. %% +%% Passed around most places -record(user, {username, tags, - auth_backend, %% Module this user came from - impl %% Scratch space for that module - }). + authz_backends}). %% List of {Module, AuthUserImpl} pairs +%% Passed to auth backends +-record(auth_user, {username, + tags, + impl}). + +%% Implementation for the internal auth backend -record(internal_user, {username, password_hash, tags}). -record(permission, {configure, write, read}). -record(user_vhost, {username, virtual_host}). @@ -83,7 +88,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, confirm, sender, message, msg_seq_no}). +-record(delivery, {mandatory, confirm, sender, message, msg_seq_no, flow}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, reference = undefined, timestamp}). diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 53fc31fcfd..5dc802a8c2 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -7,9 +7,9 @@ dist: tar -zxf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) - mkdir $(SOURCE_DIR)/sbin + mkdir -p $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/*.bat $(SOURCE_DIR)/sbin - mkdir $(SOURCE_DIR)/etc + mkdir -p $(SOURCE_DIR)/etc cp $(SOURCE_DIR)/docs/rabbitmq.config.example $(SOURCE_DIR)/etc/rabbitmq.config.example cp README-etc $(SOURCE_DIR)/etc/README.txt rm -rf $(SOURCE_DIR)/scripts diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 29c4b0c86a..22fec32f0a 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -19,11 +19,11 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env +RABBITMQ_USE_LONGNAME=${RABBITMQ_USE_LONGNAME} \ exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ - ${RABBITMQ_NAME_TYPE} rabbitmq-plugins$$ \ -boot "${CLEAN_BOOT_FILE}" \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 1ed648b0bc..b27a8586b2 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -23,14 +23,6 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
if "!RABBITMQ_SERVICENAME!"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
@@ -63,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_NAME_TYPE! rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index d437e251ab..2237275c47 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -19,11 +19,6 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -# rabbitmqctl starts distribution itself, so we need to make sure epmd -# is running. -${ERL_DIR}erl ${RABBITMQ_NAME_TYPE} rabbitmqctl-prelaunch-$$ -noinput \ --eval 'erlang:halt().' -boot "${CLEAN_BOOT_FILE}" - # We specify Mnesia dir and sasl error logger since some actions # (e.g. forget_cluster_node --offline) require us to impersonate the # real node. diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 6eb2530a4e..a3734088f6 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -27,14 +27,6 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
if "!COMPUTERNAME!"=="" (
set COMPUTERNAME=localhost
)
@@ -63,10 +55,6 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-rem rabbitmqctl starts distribution itself, so we need to make sure epmd
-rem is running.
-"!ERLANG_HOME!\bin\erl.exe" !RABBITMQ_NAME_TYPE! rabbitmqctl-prelaunch-!RANDOM!!TIME:~9! -noinput -eval "erlang:halt()."
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!TDP0!..\ebin" ^
-noinput ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3a7a692c5c..5a916c75bc 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -178,6 +178,10 @@ write_buffer_size, write_buffer_size_limit, write_buffer, + read_buffer, + read_buffer_pos, + read_buffer_rem, %% Num of bytes from pos to end + read_buffer_size_limit, at_eof, path, mode, @@ -237,7 +241,8 @@ -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: (file:filename(), [any()], - [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) + [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} | + {'read_buffer', (non_neg_integer() | 'unbuffered')}]) -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). -spec(read/2 :: (ref(), non_neg_integer()) -> @@ -331,16 +336,48 @@ close(Ref) -> read(Ref, Count) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; - ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> - case prim_file:read(Hdl, Count) of - {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), - {Obj, - [Handle #handle { offset = Offset1 }]}; - eof -> {eof, [Handle #handle { at_eof = true }]}; - Error -> {Error, [Handle]} + ([Handle = #handle{read_buffer = Buf, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + offset = Offset}]) when BufRem >= Count -> + <<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf, + {{ok, Res}, [Handle#handle{offset = Offset + Count, + read_buffer_pos = BufPos + Count, + read_buffer_rem = BufRem - Count}]}; + ([Handle = #handle{read_buffer = Buf, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + read_buffer_size_limit = BufSzLimit, + hdl = Hdl, + offset = Offset}]) -> + WantedCount = Count - BufRem, + case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of + {ok, Data} -> + <<_:BufPos/binary, BufTl/binary>> = Buf, + ReadCount = size(Data), + case ReadCount < WantedCount of + true -> + OffSet1 = Offset + BufRem + ReadCount, + {{ok, <<BufTl/binary, Data/binary>>}, + [reset_read_buffer( + Handle#handle{offset = OffSet1})]}; + false -> + <<Hd:WantedCount/binary, _/binary>> = Data, + OffSet1 = Offset + BufRem + WantedCount, + BufRem1 = ReadCount - WantedCount, + {{ok, <<BufTl/binary, Hd/binary>>}, + [Handle#handle{offset = OffSet1, + read_buffer = Data, + read_buffer_pos = WantedCount, + read_buffer_rem = BufRem1}]} + end; + eof -> + {eof, [Handle #handle { at_eof = true }]}; + Error -> + {Error, [reset_read_buffer(Handle)]} end end). @@ -355,7 +392,7 @@ append(Ref, Data) -> write_buffer_size_limit = 0, at_eof = true } = Handle1} -> Offset1 = Offset + iolist_size(Data), - {prim_file:write(Hdl, Data), + {prim_file_write(Hdl, Data), [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; {{ok, _Offset}, #handle { write_buffer = WriteBuffer, write_buffer_size = Size, @@ -377,12 +414,12 @@ append(Ref, Data) -> sync(Ref) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([#handle { is_dirty = false, write_buffer = [] }]) -> ok; ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> - case prim_file:sync(Hdl) of + case prim_file_sync(Hdl) of ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end @@ -397,7 +434,7 @@ needs_sync(Ref) -> position(Ref, NewOffset) -> with_flushed_handles( - [Ref], + [Ref], keep, fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle), {Result, [Handle1]} end). @@ -465,8 +502,8 @@ clear(Ref) -> fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> ok; ([Handle]) -> - case maybe_seek(bof, Handle #handle { write_buffer = [], - write_buffer_size = 0 }) of + case maybe_seek(bof, Handle#handle{write_buffer = [], + write_buffer_size = 0}) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> case prim_file:truncate(Hdl) of ok -> {ok, [Handle1 #handle { at_eof = true }]}; @@ -539,6 +576,21 @@ info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity). %% Internal functions %%---------------------------------------------------------------------------- +prim_file_read(Hdl, Size) -> + file_handle_cache_stats:update( + io_read, Size, fun() -> prim_file:read(Hdl, Size) end). + +prim_file_write(Hdl, Bytes) -> + file_handle_cache_stats:update( + io_write, iolist_size(Bytes), fun() -> prim_file:write(Hdl, Bytes) end). + +prim_file_sync(Hdl) -> + file_handle_cache_stats:update(io_sync, fun() -> prim_file:sync(Hdl) end). + +prim_file_position(Hdl, NewOffset) -> + file_handle_cache_stats:update( + io_seek, fun() -> prim_file:position(Hdl, NewOffset) end). + is_reader(Mode) -> lists:member(read, Mode). is_writer(Mode) -> lists:member(write, Mode). @@ -550,8 +602,15 @@ append_to_write(Mode) -> end. with_handles(Refs, Fun) -> + with_handles(Refs, reset, Fun). + +with_handles(Refs, ReadBuffer, Fun) -> case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of - {ok, Handles} -> + {ok, Handles0} -> + Handles = case ReadBuffer of + reset -> [reset_read_buffer(H) || H <- Handles0]; + keep -> Handles0 + end, case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), @@ -564,8 +623,11 @@ with_handles(Refs, Fun) -> end. with_flushed_handles(Refs, Fun) -> + with_flushed_handles(Refs, reset, Fun). + +with_flushed_handles(Refs, ReadBuffer, Fun) -> with_handles( - Refs, + Refs, ReadBuffer, fun (Handles) -> case lists:foldl( fun (Handle, {ok, HandlesAcc}) -> @@ -611,20 +673,23 @@ reopen([], Tree, RefHdls) -> {ok, lists:reverse(RefHdls)}; reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, path = Path, - mode = Mode, + mode = Mode0, offset = Offset, last_used_at = undefined }} | RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> - case prim_file:open(Path, case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end) of + Mode = case NewOrReopen of + new -> Mode0; + reopen -> file_handle_cache_stats:update(io_reopen), + [read | Mode0] + end, + case prim_file:open(Path, Mode) of {ok, Hdl} -> Now = now(), {{ok, _Offset}, Handle1} = - maybe_seek(Offset, Handle #handle { hdl = Hdl, - offset = 0, - last_used_at = Now }), + maybe_seek(Offset, reset_read_buffer( + Handle#handle{hdl = Hdl, + offset = 0, + last_used_at = Now})), put({Ref, fhc_handle}, Handle1), reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), [{Ref, Handle1} | RefHdls]); @@ -709,6 +774,11 @@ new_closed_handle(Path, Mode, Options) -> infinity -> infinity; N when is_integer(N) -> N end, + ReadBufferSize = + case proplists:get_value(read_buffer, Options, unbuffered) of + unbuffered -> 0; + N2 when is_integer(N2) -> N2 + end, Ref = make_ref(), put({Ref, fhc_handle}, #handle { hdl = closed, offset = 0, @@ -716,6 +786,10 @@ new_closed_handle(Path, Mode, Options) -> write_buffer_size = 0, write_buffer_size_limit = WriteBufferSize, write_buffer = [], + read_buffer = <<>>, + read_buffer_pos = 0, + read_buffer_rem = 0, + read_buffer_size_limit = ReadBufferSize, at_eof = false, path = Path, mode = Mode, @@ -742,7 +816,7 @@ soft_close(Handle) -> is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of - true -> prim_file:sync(Hdl); + true -> prim_file_sync(Hdl); false -> ok end, ok = prim_file:close(Hdl), @@ -776,17 +850,31 @@ hard_close(Handle) -> Result end. -maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, - at_eof = AtEoF }) -> - {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), - case (case NeedsSeek of - true -> prim_file:position(Hdl, NewOffset); - false -> {ok, Offset} - end) of - {ok, Offset1} = Result -> - {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }}; - {error, _} = Error -> - {Error, Handle} +maybe_seek(New, Handle = #handle{hdl = Hdl, + offset = Old, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + at_eof = AtEoF}) -> + {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Old, New), + case NeedsSeek of + true when is_number(New) andalso + ((New >= Old andalso New =< BufRem + Old) + orelse (New < Old andalso Old - New =< BufPos)) -> + Diff = New - Old, + {{ok, New}, Handle#handle{offset = New, + at_eof = AtEoF1, + read_buffer_pos = BufPos + Diff, + read_buffer_rem = BufRem - Diff}}; + true -> + case prim_file_position(Hdl, New) of + {ok, Offset1} = Result -> + {Result, reset_read_buffer(Handle#handle{offset = Offset1, + at_eof = AtEoF1})}; + {error, _} = Error -> + {Error, Handle} + end; + false -> + {{ok, Old}, Handle} end. needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false}; @@ -817,7 +905,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, write_buffer = WriteBuffer, write_buffer_size = DataSize, at_eof = true }) -> - case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of + case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of ok -> Offset1 = Offset + DataSize, {ok, Handle #handle { offset = Offset1, is_dirty = true, @@ -826,6 +914,11 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +reset_read_buffer(Handle) -> + Handle#handle{read_buffer = <<>>, + read_buffer_pos = 0, + read_buffer_rem = 0}. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(total_limit, #fhc_state{limit = Limit}) -> Limit; @@ -843,6 +936,7 @@ used(#fhc_state{open_count = C1, %%---------------------------------------------------------------------------- init([AlarmSet, AlarmClear]) -> + file_handle_cache_stats:init(), Limit = case application:get_env(file_handles_high_watermark) of {ok, Watermark} when (is_integer(Watermark) andalso Watermark > 0) -> diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl new file mode 100644 index 0000000000..de2f90c67a --- /dev/null +++ b/src/file_handle_cache_stats.erl @@ -0,0 +1,67 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(file_handle_cache_stats). + +%% stats about read / write operations that go through the fhc. + +-export([init/0, update/3, update/2, update/1, get/0]). + +-define(TABLE, ?MODULE). + +-define(COUNT, + [io_reopen, mnesia_ram_tx, mnesia_disk_tx, + msg_store_read, msg_store_write, + queue_index_journal_write, queue_index_write, queue_index_read]). +-define(COUNT_TIME, [io_sync, io_seek]). +-define(COUNT_TIME_BYTES, [io_read, io_write]). + +init() -> + ets:new(?TABLE, [public, named_table]), + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES, + Counter <- [count, bytes, time]], + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME, + Counter <- [count, time]], + [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT, + Counter <- [count]]. + +update(Op, Bytes, Thunk) -> + {Time, Res} = timer_tc(Thunk), + ets:update_counter(?TABLE, {Op, count}, 1), + ets:update_counter(?TABLE, {Op, bytes}, Bytes), + ets:update_counter(?TABLE, {Op, time}, Time), + Res. + +update(Op, Thunk) -> + {Time, Res} = timer_tc(Thunk), + ets:update_counter(?TABLE, {Op, count}, 1), + ets:update_counter(?TABLE, {Op, time}, Time), + Res. + +update(Op) -> + ets:update_counter(?TABLE, {Op, count}, 1), + ok. + +get() -> + lists:sort(ets:tab2list(?TABLE)). + +%% TODO timer:tc/1 was introduced in R14B03; use that function once we +%% require that version. +timer_tc(Thunk) -> + T1 = os:timestamp(), + Res = Thunk(), + T2 = os:timestamp(), + {timer:now_diff(T2, T1), Res}. diff --git a/src/rabbit.erl b/src/rabbit.erl index 664da20688..ba78b1eee0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -118,6 +118,13 @@ {requires, [rabbit_alarm, guid_generator]}, {enables, core_initialized}]}). +-rabbit_boot_step({rabbit_epmd_monitor, + [{description, "epmd monitor"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_epmd_monitor]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + -rabbit_boot_step({core_initialized, [{description, "core initialized"}, {requires, kernel_ready}]}). @@ -243,15 +250,19 @@ maybe_hipe_compile() -> {ok, Want} = application:get_env(rabbit, hipe_compile), Can = code:which(hipe) =/= non_existing, case {Want, Can} of - {true, true} -> hipe_compile(), - true; + {true, true} -> hipe_compile(); {true, false} -> false; - {false, _} -> true + {false, _} -> {ok, disabled} end. -warn_if_hipe_compilation_failed(true) -> +log_hipe_result({ok, disabled}) -> ok; -warn_if_hipe_compilation_failed(false) -> +log_hipe_result({ok, Count, Duration}) -> + rabbit_log:info( + "HiPE in use: compiled ~B modules in ~Bs.~n", [Count, Duration]); +log_hipe_result(false) -> + io:format( + "~nNot HiPE compiling: HiPE not found in this Erlang installation.~n"), rabbit_log:warning( "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). @@ -276,8 +287,9 @@ hipe_compile() -> {'DOWN', MRef, process, _, Reason} -> exit(Reason) end || {_Pid, MRef} <- PidMRefs], T2 = erlang:now(), - io:format("|~n~nCompiled ~B modules in ~Bs~n", - [Count, timer:now_diff(T2, T1) div 1000000]). + Duration = timer:now_diff(T2, T1) div 1000000, + io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]), + {ok, Count, Duration}. split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]). @@ -307,9 +319,9 @@ start() -> boot() -> start_it(fun() -> ok = ensure_application_loaded(), - Success = maybe_hipe_compile(), + HipeResult = maybe_hipe_compile(), ok = ensure_working_log_handlers(), - warn_if_hipe_compilation_failed(Success), + log_hipe_result(HipeResult), rabbit_node_monitor:prepare_cluster_status_files(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), %% It's important that the consistency check happens after diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b0a9c0d807..41c54b07a2 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). -export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2, - check_vhost_access/2, check_resource_access/3]). + check_vhost_access/3, check_resource_access/3]). %%---------------------------------------------------------------------------- @@ -31,15 +31,17 @@ -spec(check_user_pass_login/2 :: (rabbit_types:username(), rabbit_types:password()) - -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). + -> {'ok', rabbit_types:user()} | + {'refused', rabbit_types:username(), string(), [any()]}). -spec(check_user_login/2 :: (rabbit_types:username(), [{atom(), any()}]) - -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). + -> {'ok', rabbit_types:user()} | + {'refused', rabbit_types:username(), string(), [any()]}). -spec(check_user_loopback/2 :: (rabbit_types:username(), rabbit_net:socket() | inet:ip_address()) -> 'ok' | 'not_allowed'). --spec(check_vhost_access/2 :: - (rabbit_types:user(), rabbit_types:vhost()) +-spec(check_vhost_access/3 :: + (rabbit_types:user(), rabbit_types:vhost(), rabbit_net:socket()) -> 'ok' | rabbit_types:channel_exit()). -spec(check_resource_access/3 :: (rabbit_types:user(), rabbit_types:r(atom()), permission_atom()) @@ -55,36 +57,71 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), R = lists:foldl( - fun ({ModN, ModZ}, {refused, _, _}) -> + fun ({ModN, ModZs0}, {refused, _, _, _}) -> + ModZs = case ModZs0 of + A when is_atom(A) -> [A]; + L when is_list(L) -> L + end, %% Different modules for authN vs authZ. So authenticate %% with authN module, then if that succeeds do - %% passwordless (i.e pre-authenticated) login with authZ - %% module, and use the #user{} the latter gives us. - case try_login(ModN, Username, AuthProps) of - {ok, _} -> try_login(ModZ, Username, []); - Else -> Else + %% passwordless (i.e pre-authenticated) login with authZ. + case try_authenticate(ModN, Username, AuthProps) of + {ok, ModNUser = #auth_user{username = Username2}} -> + user(ModNUser, try_authorize(ModZs, Username2)); + Else -> + Else end; - (Mod, {refused, _, _}) -> + (Mod, {refused, _, _, _}) -> %% Same module for authN and authZ. Just take the result %% it gives us - try_login(Mod, Username, AuthProps); + case try_authenticate(Mod, Username, AuthProps) of + {ok, ModNUser = #auth_user{impl = Impl}} -> + user(ModNUser, {ok, [{Mod, Impl}]}); + Else -> + Else + end; (_, {ok, User}) -> %% We've successfully authenticated. Skip to the end... {ok, User} - end, {refused, "No modules checked '~s'", [Username]}, Modules), - rabbit_event:notify(case R of - {ok, _User} -> user_authentication_success; - _ -> user_authentication_failure - end, [{name, Username}]), + end, + {refused, Username, "No modules checked '~s'", [Username]}, Modules), R. -try_login(Module, Username, AuthProps) -> - case Module:check_user_login(Username, AuthProps) of - {error, E} -> {refused, "~s failed authenticating ~s: ~p~n", - [Module, Username, E]}; - Else -> Else +try_authenticate(Module, Username, AuthProps) -> + case Module:user_login_authentication(Username, AuthProps) of + {ok, AuthUser} -> {ok, AuthUser}; + {error, E} -> {refused, Username, + "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + {refused, F, A} -> {refused, Username, F, A} end. +try_authorize(Modules, Username) -> + lists:foldr( + fun (Module, {ok, ModsImpls}) -> + case Module:user_login_authorization(Username) of + {ok, Impl} -> {ok, [{Module, Impl} | ModsImpls]}; + {error, E} -> {refused, Username, + "~s failed authorizing ~s: ~p~n", + [Module, Username, E]}; + {refused, F, A} -> {refused, Username, F, A} + end; + (_, {refused, F, A}) -> + {refused, Username, F, A} + end, {ok, []}, Modules). + +user(#auth_user{username = Username, tags = Tags}, {ok, ModZImpls}) -> + {ok, #user{username = Username, + tags = Tags, + authz_backends = ModZImpls}}; +user(_AuthUser, Error) -> + Error. + +auth_user(#user{username = Username, tags = Tags}, Impl) -> + #auth_user{username = Username, + tags = Tags, + impl = Impl}. + check_user_loopback(Username, SockOrAddr) -> {ok, Users} = application:get_env(rabbit, loopback_users), case rabbit_net:is_loopback(SockOrAddr) @@ -93,29 +130,38 @@ check_user_loopback(Username, SockOrAddr) -> false -> not_allowed end. -check_vhost_access(User = #user{ username = Username, - auth_backend = Module }, VHostPath) -> - check_access( - fun() -> - %% TODO this could be an andalso shortcut under >R13A - case rabbit_vhost:exists(VHostPath) of - false -> false; - true -> Module:check_vhost_access(User, VHostPath) - end - end, - Module, "access to vhost '~s' refused for user '~s'", - [VHostPath, Username]). +check_vhost_access(User = #user{username = Username, + authz_backends = Modules}, VHostPath, Sock) -> + lists:foldl( + fun({Mod, Impl}, ok) -> + check_access( + fun() -> + rabbit_vhost:exists(VHostPath) andalso + Mod:check_vhost_access( + auth_user(User, Impl), VHostPath, Sock) + end, + Mod, "access to vhost '~s' refused for user '~s'", + [VHostPath, Username]); + (_, Else) -> + Else + end, ok, Modules). check_resource_access(User, R = #resource{kind = exchange, name = <<"">>}, Permission) -> check_resource_access(User, R#resource{name = <<"amq.default">>}, Permission); -check_resource_access(User = #user{username = Username, auth_backend = Module}, +check_resource_access(User = #user{username = Username, + authz_backends = Modules}, Resource, Permission) -> - check_access( - fun() -> Module:check_resource_access(User, Resource, Permission) end, - Module, "access to ~s refused for user '~s'", - [rabbit_misc:rs(Resource), Username]). + lists:foldl( + fun({Module, Impl}, ok) -> + check_access( + fun() -> Module:check_resource_access( + auth_user(User, Impl), Resource, Permission) end, + Module, "access to ~s refused for user '~s'", + [rabbit_misc:rs(Resource), Username]); + (_, Else) -> Else + end, ok, Modules). check_access(Fun, Module, ErrStr, ErrArgs) -> Allow = case Fun() of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0dfca854af..ca5eb34874 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -23,7 +23,7 @@ -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([list_down/1]). -export([force_event_refresh/1, notify_policy_changed/1]). @@ -149,8 +149,6 @@ -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> qpids()). --spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - qpids()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -623,10 +621,6 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }) -> purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). -deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). - -deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). - requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). @@ -816,15 +810,20 @@ immutable(Q) -> Q#amqqueue{pid = none, decorators = none, state = none}. -deliver([], _Delivery, _Flow) -> +deliver([], _Delivery) -> %% /dev/null optimisation []; -deliver(Qs, Delivery, Flow) -> +deliver(Qs, Delivery = #delivery{flow = Flow}) -> {MPids, SPids} = qpids(Qs), QPids = MPids ++ SPids, + %% We use up two credits to send to a slave since the message + %% arrives at the slave from two directions. We will ack one when + %% the slave receives the message direct from the channel, and the + %% other when it receives it via GM. case Flow of - flow -> [credit_flow:send(QPid) || QPid <- QPids]; + flow -> [credit_flow:send(QPid) || QPid <- QPids], + [credit_flow:send(QPid) || QPid <- SPids]; noflow -> ok end, @@ -833,8 +832,8 @@ deliver(Qs, Delivery, Flow) -> %% after they have become master they should mark the message as %% 'delivered' since they do not know what the master may have %% done with it. - MMsg = {deliver, Delivery, false, Flow}, - SMsg = {deliver, Delivery, true, Flow}, + MMsg = {deliver, Delivery, false}, + SMsg = {deliver, Delivery, true}, delegate:cast(MPids, MMsg), delegate:cast(SPids, SMsg), QPids. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a18df22522..1b9427da1f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -498,12 +498,13 @@ send_mandatory(#delivery{mandatory = true, discard(#delivery{confirm = Confirm, sender = SenderPid, + flow = Flow, message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> MTC1 = case Confirm of true -> confirm_messages([MsgId], MTC); false -> MTC end, - BQS1 = BQ:discard(MsgId, SenderPid, BQS), + BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -525,14 +526,17 @@ run_message_queue(ActiveConsumersChanged, State) -> end end. -attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, +attempt_delivery(Delivery = #delivery{sender = SenderPid, + flow = Flow, + message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS, msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), - {AckTag, BQS1} = BQ:publish_delivered( - Message, Props, SenderPid, BQS), + {AckTag, BQS1} = + BQ:publish_delivered( + Message, Props, SenderPid, Flow, BQS), {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, discard(Delivery, BQ, BQS, MTC)} @@ -549,7 +553,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, State#q{consumers = Consumers})} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, +deliver_or_enqueue(Delivery = #delivery{message = Message, + sender = SenderPid, + flow = Flow}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms @@ -570,7 +576,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> - BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = maybe_drop_head(State3#q{backing_queue_state = BQS3}), QLen = BQ:len(BQS4), @@ -1100,15 +1106,23 @@ handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender, + flow = Flow}, SlaveWhenPublished}, State = #q{senders = Senders}) -> Senders1 = case Flow of flow -> credit_flow:ack(Sender), + case SlaveWhenPublished of + true -> credit_flow:ack(Sender); %% [0] + false -> ok + end, pmon:monitor(Sender, Senders); noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, Delivered, State1)); + noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1)); +%% [0] The second ack is since the channel thought we were a slave at +%% the time it published this message, so it used two credits (see +%% rabbit_amqqueue:deliver/2). handle_cast({ack, AckTags, ChPid}, State) -> noreply(ack(AckTags, ChPid, State)); diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl index 5daca368eb..d2f07c1d57 100644 --- a/src/rabbit_auth_backend_dummy.erl +++ b/src/rabbit_auth_backend_dummy.erl @@ -17,11 +17,12 @@ -module(rabbit_auth_backend_dummy). -include("rabbit.hrl"). --behaviour(rabbit_auth_backend). +-behaviour(rabbit_authn_backend). +-behaviour(rabbit_authz_backend). --export([description/0]). -export([user/0]). --export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). +-export([user_login_authentication/2, user_login_authorization/1, + check_vhost_access/3, check_resource_access/3]). -ifdef(use_specs). @@ -31,19 +32,17 @@ %% A user to be used by the direct client when permission checks are %% not needed. This user can do anything AMQPish. -user() -> #user{username = <<"none">>, - tags = [], - auth_backend = ?MODULE, - impl = none}. +user() -> #user{username = <<"none">>, + tags = [], + authz_backends = [{?MODULE, none}]}. %% Implementation of rabbit_auth_backend -description() -> - [{name, <<"Dummy">>}, - {description, <<"Database for the dummy user">>}]. +user_login_authentication(_, _) -> + {refused, "cannot log in conventionally as dummy user", []}. -check_user_login(_, _) -> +user_login_authorization(_) -> {refused, "cannot log in conventionally as dummy user", []}. -check_vhost_access(#user{}, _VHostPath) -> true. -check_resource_access(#user{}, #resource{}, _Permission) -> true. +check_vhost_access(#auth_user{}, _VHostPath, _Sock) -> true. +check_resource_access(#auth_user{}, #resource{}, _Permission) -> true. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index fd1c4e8ee6..20a5766d20 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -17,10 +17,11 @@ -module(rabbit_auth_backend_internal). -include("rabbit.hrl"). --behaviour(rabbit_auth_backend). +-behaviour(rabbit_authn_backend). +-behaviour(rabbit_authz_backend). --export([description/0]). --export([check_user_login/2, check_vhost_access/2, check_resource_access/3]). +-export([user_login_authentication/2, user_login_authorization/1, + check_vhost_access/3, check_resource_access/3]). -export([add_user/2, delete_user/1, lookup_user/1, change_password/2, clear_password/1, @@ -76,13 +77,9 @@ %%---------------------------------------------------------------------------- %% Implementation of rabbit_auth_backend -description() -> - [{name, <<"Internal">>}, - {description, <<"Internal user / password database">>}]. - -check_user_login(Username, []) -> +user_login_authentication(Username, []) -> internal_check_user_login(Username, fun(_) -> true end); -check_user_login(Username, [{password, Cleartext}]) -> +user_login_authentication(Username, [{password, Cleartext}]) -> internal_check_user_login( Username, fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) -> @@ -90,25 +87,30 @@ check_user_login(Username, [{password, Cleartext}]) -> (#internal_user{}) -> false end); -check_user_login(Username, AuthProps) -> +user_login_authentication(Username, AuthProps) -> exit({unknown_auth_props, Username, AuthProps}). +user_login_authorization(Username) -> + case user_login_authentication(Username, []) of + {ok, #auth_user{impl = Impl}} -> {ok, Impl}; + Else -> Else + end. + internal_check_user_login(Username, Fun) -> Refused = {refused, "user '~s' - invalid credentials", [Username]}, case lookup_user(Username) of {ok, User = #internal_user{tags = Tags}} -> case Fun(User) of - true -> {ok, #user{username = Username, - tags = Tags, - auth_backend = ?MODULE, - impl = User}}; + true -> {ok, #auth_user{username = Username, + tags = Tags, + impl = none}}; _ -> Refused end; {error, not_found} -> Refused end. -check_vhost_access(#user{username = Username}, VHostPath) -> +check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) -> case mnesia:dirty_read({rabbit_user_permission, #user_vhost{username = Username, virtual_host = VHostPath}}) of @@ -116,7 +118,7 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [_R] -> true end. -check_resource_access(#user{username = Username}, +check_resource_access(#auth_user{username = Username}, #resource{virtual_host = VHostPath, name = Name}, Permission) -> case mnesia:dirty_read({rabbit_user_permission, diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index d11af09552..c8e23a75d5 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -36,13 +36,13 @@ %% Another round is needed. Here's the state I want next time. %% {protocol_error, Msg, Args} %% Client got the protocol wrong. Log and die. -%% {refused, Msg, Args} +%% {refused, Username, Msg, Args} %% Client failed authentication. Log and die. -callback handle_response(binary(), any()) -> {'ok', rabbit_types:user()} | {'challenge', binary(), any()} | {'protocol_error', string(), [any()]} | - {'refused', string(), [any()]}. + {'refused', rabbit_types:username() | none, string(), [any()]}. -else. diff --git a/src/rabbit_authn_backend.erl b/src/rabbit_authn_backend.erl new file mode 100644 index 0000000000..cfc3f5db51 --- /dev/null +++ b/src/rabbit_authn_backend.erl @@ -0,0 +1,49 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_authn_backend). + +-include("rabbit.hrl"). + +-ifdef(use_specs). + +%% Check a user can log in, given a username and a proplist of +%% authentication information (e.g. [{password, Password}]). If your +%% backend is not to be used for authentication, this should always +%% refuse access. +%% +%% Possible responses: +%% {ok, User} +%% Authentication succeeded, and here's the user record. +%% {error, Error} +%% Something went wrong. Log and die. +%% {refused, Msg, Args} +%% Client failed authentication. Log and die. +-callback user_login_authentication(rabbit_types:username(), [term()]) -> + {'ok', rabbit_types:auth_user()} | + {'refused', string(), [any()]} | + {'error', any()}. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{user_login_authentication, 2}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_authz_backend.erl index a7dd6494b1..ff5f014e37 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_authz_backend.erl @@ -14,47 +14,49 @@ %% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. %% --module(rabbit_auth_backend). +-module(rabbit_authz_backend). --ifdef(use_specs). +-include("rabbit.hrl"). -%% A description proplist as with auth mechanisms, -%% exchanges. Currently unused. --callback description() -> [proplists:property()]. +-ifdef(use_specs). -%% Check a user can log in, given a username and a proplist of -%% authentication information (e.g. [{password, Password}]). +%% Check a user can log in, when this backend is being used for +%% authorisation only. Authentication has already taken place +%% successfully, but we need to check that the user exists in this +%% backend, and initialise any impl field we will want to have passed +%% back in future calls to check_vhost_access/3 and +%% check_resource_access/3. %% %% Possible responses: -%% {ok, User} -%% Authentication succeeded, and here's the user record. +%% {ok, Impl} +%% User authorisation succeeded, and here's the impl field. %% {error, Error} %% Something went wrong. Log and die. %% {refused, Msg, Args} -%% Client failed authentication. Log and die. --callback check_user_login(rabbit_types:username(), [term()]) -> - {'ok', rabbit_types:user()} | +%% User authorisation failed. Log and die. +-callback user_login_authorization(rabbit_types:username()) -> + {'ok', any()} | {'refused', string(), [any()]} | {'error', any()}. -%% Given #user and vhost, can a user log in to a vhost? +%% Given #auth_user and vhost, can a user log in to a vhost? %% Possible responses: %% true %% false %% {error, Error} %% Something went wrong. Log and die. --callback check_vhost_access(rabbit_types:user(), rabbit_types:vhost()) -> +-callback check_vhost_access(rabbit_types:auth_user(), + rabbit_types:vhost(), rabbit_net:socket()) -> boolean() | {'error', any()}. - -%% Given #user, resource and permission, can a user access a resource? +%% Given #auth_user, resource and permission, can a user access a resource? %% %% Possible responses: %% true %% false %% {error, Error} %% Something went wrong. Log and die. --callback check_resource_access(rabbit_types:user(), +-callback check_resource_access(rabbit_types:auth_user(), rabbit_types:r(atom()), rabbit_access_control:permission_atom()) -> boolean() | {'error', any()}. @@ -64,8 +66,8 @@ -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {check_user_login, 2}, {check_vhost_access, 2}, - {check_resource_access, 3}]; + [{user_login_authorization, 1}, + {check_vhost_access, 3}, {check_resource_access, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 4ce133c3e3..0d00ced756 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -30,6 +30,7 @@ -type(ack() :: any()). -type(state() :: any()). +-type(flow() :: 'flow' | 'noflow'). -type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). @@ -99,19 +100,20 @@ %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), boolean(), pid(), + rabbit_types:message_properties(), boolean(), pid(), flow(), state()) -> state(). %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). -callback publish_delivered(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) + rabbit_types:message_properties(), pid(), flow(), + state()) -> {ack(), state()}. %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. --callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). +-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). @@ -249,8 +251,8 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, - {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 6}, + {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 67109e7d5a..cd2846c00b 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -114,7 +114,7 @@ publish(X, Delivery) -> delivery(Mandatory, Confirm, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), - message = Message, msg_seq_no = MsgSeqNo}. + message = Message, msg_seq_no = MsgSeqNo, flow = noflow}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> build_content(Properties, [BodyBin]); diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 3ab82cad78..ee8147f45f 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -41,48 +41,90 @@ %% parse_table supports the AMQP 0-8/0-9 standard types, S, I, D, T %% and F, as well as the QPid extensions b, d, f, l, s, t, x, and V. +-define(SIMPLE_PARSE_TABLE(BType, Pattern, RType), + parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + BType, Pattern, Rest/binary>>) -> + [{NameString, RType, Value} | parse_table(Rest)]). + +%% Note that we try to put these in approximately the order we expect +%% to hit them, that's why the empty binary is half way through. + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, longstr, Value} | parse_table(Rest)]; + +?SIMPLE_PARSE_TABLE($I, Value:32/signed, signedint); +?SIMPLE_PARSE_TABLE($T, Value:64/unsigned, timestamp); + parse_table(<<>>) -> []; -parse_table(<<NLen:8/unsigned, NameString:NLen/binary, ValueAndRest/binary>>) -> - {Type, Value, Rest} = parse_field_value(ValueAndRest), - [{NameString, Type, Value} | parse_table(Rest)]. -parse_array(<<>>) -> - []; -parse_array(<<ValueAndRest/binary>>) -> - {Type, Value, Rest} = parse_field_value(ValueAndRest), - [{Type, Value} | parse_array(Rest)]. +?SIMPLE_PARSE_TABLE($b, Value:8/signed, byte); +?SIMPLE_PARSE_TABLE($d, Value:64/float, double); +?SIMPLE_PARSE_TABLE($f, Value:32/float, float); +?SIMPLE_PARSE_TABLE($l, Value:64/signed, long); +?SIMPLE_PARSE_TABLE($s, Value:16/signed, short); + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $t, Value:8/unsigned, Rest/binary>>) -> + [{NameString, bool, (Value /= 0)} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> + [{NameString, decimal, {Before, After}} | parse_table(Rest)]; -parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> - {longstr, V, R}; +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, table, parse_table(Value)} | parse_table(Rest)]; -parse_field_value(<<$I, V:32/signed, R/binary>>) -> - {signedint, V, R}; +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, array, parse_array(Value)} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{NameString, binary, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, + $V, Rest/binary>>) -> + [{NameString, void, undefined} | parse_table(Rest)]. + +-define(SIMPLE_PARSE_ARRAY(BType, Pattern, RType), + parse_array(<<BType, Pattern, Rest/binary>>) -> + [{RType, Value} | parse_array(Rest)]). + +parse_array(<<$S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{longstr, Value} | parse_array(Rest)]; + +?SIMPLE_PARSE_ARRAY($I, Value:32/signed, signedint); +?SIMPLE_PARSE_ARRAY($T, Value:64/unsigned, timestamp); + +parse_array(<<>>) -> + []; -parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) -> - {decimal, {Before, After}, R}; +?SIMPLE_PARSE_ARRAY($b, Value:8/signed, byte); +?SIMPLE_PARSE_ARRAY($d, Value:64/float, double); +?SIMPLE_PARSE_ARRAY($f, Value:32/float, float); +?SIMPLE_PARSE_ARRAY($l, Value:64/signed, long); +?SIMPLE_PARSE_ARRAY($s, Value:16/signed, short); -parse_field_value(<<$T, V:64/unsigned, R/binary>>) -> - {timestamp, V, R}; +parse_array(<<$t, Value:8/unsigned, Rest/binary>>) -> + [{bool, (Value /= 0)} | parse_array(Rest)]; -parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> - {table, parse_table(Table), R}; +parse_array(<<$D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> + [{decimal, {Before, After}} | parse_array(Rest)]; -parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> - {array, parse_array(Array), R}; +parse_array(<<$F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{table, parse_table(Value)} | parse_array(Rest)]; -parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R}; -parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R}; -parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R}; -parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R}; -parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R}; -parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; +parse_array(<<$A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{array, parse_array(Value)} | parse_array(Rest)]; -parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> - {binary, V, R}; +parse_array(<<$x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) -> + [{binary, Value} | parse_array(Rest)]; -parse_field_value(<<$V, R/binary>>) -> - {void, undefined, R}. +parse_array(<<$V, Rest/binary>>) -> + [{void, undefined} | parse_array(Rest)]. ensure_content_decoded(Content = #content{properties = Props}) when Props =/= none -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8632e1b3f7..3450fd4e91 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -581,7 +581,8 @@ check_user_id_header(#'P_basic'{user_id = Username}, #ch{user = #user{username = Username}}) -> ok; check_user_id_header( - #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) -> + #'P_basic'{}, #ch{user = #user{authz_backends = + [{rabbit_auth_backend_dummy, _}]}}) -> ok; check_user_id_header(#'P_basic'{user_id = Claimed}, #ch{user = #user{username = Actual, @@ -794,7 +795,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Delivery = rabbit_basic:delivery( Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), - DQ = {Delivery, QNames}, + DQ = {Delivery#delivery{flow = flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), @@ -1665,7 +1666,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> Qs = rabbit_amqqueue:lookup(DelQNames), - DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery), + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), %% The pmon:monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl index 47505b3df3..cf7b608399 100644 --- a/src/rabbit_cli.erl +++ b/src/rabbit_cli.erl @@ -17,7 +17,8 @@ -module(rabbit_cli). -include("rabbit_cli.hrl"). --export([main/3, parse_arguments/4, rpc_call/4]). +-export([main/3, start_distribution/0, start_distribution/1, + parse_arguments/4, rpc_call/4]). %%---------------------------------------------------------------------------- @@ -31,6 +32,8 @@ -spec(main/3 :: (fun (([string()], string()) -> parse_result()), fun ((atom(), atom(), [any()], [any()]) -> any()), atom()) -> no_return()). +-spec(start_distribution/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_distribution/1 :: (string()) -> {'ok', pid()} | {'error', any()}). -spec(usage/1 :: (atom()) -> no_return()). -spec(parse_arguments/4 :: ([{atom(), [{string(), optdef()}]} | atom()], @@ -42,6 +45,8 @@ %%---------------------------------------------------------------------------- main(ParseFun, DoFun, UsageMod) -> + error_logger:tty(false), + start_distribution(), {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {Command, Opts, Args} = case ParseFun(init:get_plain_arguments(), NodeStr) of @@ -101,6 +106,20 @@ main(ParseFun, DoFun, UsageMod) -> rabbit_misc:quit(2) end. +start_distribution() -> + start_distribution(list_to_atom( + rabbit_misc:format("rabbitmq-cli-~s", [os:getpid()]))). + +start_distribution(Name) -> + rabbit_nodes:ensure_epmd(), + net_kernel:start([Name, name_type()]). + +name_type() -> + case os:getenv("RABBITMQ_USE_LONGNAME") of + "true" -> longnames; + _ -> shortnames + end. + usage(Mod) -> io:format("~s", [Mod:usage()]), rabbit_misc:quit(1). diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index a931eef009..751dbd298d 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -19,7 +19,7 @@ -include("rabbit_cli.hrl"). -export([start/0, stop/0, parse_arguments/2, action/5, - sync_queue/1, cancel_sync_queue/1]). + sync_queue/1, cancel_sync_queue/1, become/1]). -import(rabbit_cli, [rpc_call/4]). @@ -40,6 +40,7 @@ change_cluster_node_type, update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, + rename_cluster_node, force_boot, cluster_status, {sync_queue, [?VHOST_DEF]}, @@ -104,8 +105,8 @@ -define(COMMANDS_NOT_REQUIRING_APP, [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, join_cluster, change_cluster_node_type, update_cluster_nodes, - forget_cluster_node, cluster_status, status, environment, eval, - force_boot]). + forget_cluster_node, rename_cluster_node, cluster_status, status, + environment, eval, force_boot]). %%---------------------------------------------------------------------------- @@ -123,7 +124,6 @@ %%---------------------------------------------------------------------------- start() -> - start_distribution(), rabbit_cli:main( fun (Args, NodeStr) -> parse_arguments(Args, NodeStr) @@ -234,6 +234,13 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> [ClusterNode, false]) end; +action(rename_cluster_node, Node, NodesS, _Opts, Inform) -> + Nodes = split_list([list_to_atom(N) || N <- NodesS]), + Inform("Renaming cluster nodes:~n~s~n", + [lists:flatten([rabbit_misc:format(" ~s -> ~s~n", [F, T]) || + {F, T} <- Nodes])]), + rabbit_mnesia_rename:rename(Node, Nodes); + action(force_boot, Node, [], _Opts, Inform) -> Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]), case rabbit:is_running(Node) of @@ -586,28 +593,18 @@ exit_loop(Port) -> {Port, _} -> exit_loop(Port) end. -start_distribution() -> - CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]), - {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), name_type()]). - become(BecomeNode) -> + error_logger:tty(false), + ok = net_kernel:stop(), case net_adm:ping(BecomeNode) of pong -> exit({node_running, BecomeNode}); pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]), - error_logger:tty(false), - ok = net_kernel:stop(), - {ok, _} = net_kernel:start([BecomeNode, name_type()]), + {ok, _} = rabbit_cli:start_distribution(BecomeNode), io:format(" done~n", []), Dir = mnesia:system_info(directory), io:format(" * Mnesia directory : ~s~n", [Dir]) end. -name_type() -> - case os:getenv("RABBITMQ_USE_LONGNAME") of - "true" -> longnames; - _ -> shortnames - end. - %%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> @@ -720,3 +717,7 @@ prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value); prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; prettify_typed_amqp_value(_Type, Value) -> Value. + +split_list([]) -> []; +split_list([_]) -> exit(even_list_needed); +split_list([A, B | T]) -> [{A, B} | split_list(T)]. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 749a67b1d5..11233e7eb8 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -83,16 +83,27 @@ connect({Username, Password}, VHost, Protocol, Pid, Infos) -> connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of true -> case AuthFun() of - {ok, User} -> + {ok, User = #user{username = Username}} -> + notify_auth_result(Username, + user_authentication_success, []), connect1(User, VHost, Protocol, Pid, Infos); - {refused, _M, _A} -> + {refused, Username, Msg, Args} -> + notify_auth_result(Username, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}]), {error, {auth_failure, "Refused"}} end; false -> {error, broker_not_found_on_node} end. +notify_auth_result(Username, AuthResult, ExtraProps) -> + EventProps = [{connection_type, direct}, + {name, case Username of none -> ''; _ -> Username end}] ++ + ExtraProps, + rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). + connect1(User, VHost, Protocol, Pid, Infos) -> - try rabbit_access_control:check_vhost_access(User, VHost) of + try rabbit_access_control:check_vhost_access(User, VHost, undefined) of ok -> ok = pg_local:join(rabbit_direct, Pid), rabbit_event:notify(connection_created, Infos), {ok, {User, rabbit_reader:server_properties(Protocol)}} diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl new file mode 100644 index 0000000000..261554921a --- /dev/null +++ b/src/rabbit_epmd_monitor.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_epmd_monitor). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-record(state, {timer, mod, me, host, port}). + +-define(SERVER, ?MODULE). +-define(CHECK_FREQUENCY, 60000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- +%% It's possible for epmd to be killed out from underneath us. If that +%% happens, then obviously clustering and rabbitmqctl stop +%% working. This process checks up on epmd and restarts it / +%% re-registers us with it if it has gone away. +%% +%% How could epmd be killed? +%% +%% 1) The most popular way for this to happen is when running as a +%% Windows service. The user starts rabbitmqctl first, and this starts +%% epmd under the user's account. When they log out epmd is killed. +%% +%% 2) Some packagings of (non-RabbitMQ?) Erlang apps might do "killall +%% epmd" as a shutdown or uninstall step. +%% ---------------------------------------------------------------------------- + +start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + {Me, Host} = rabbit_nodes:parts(node()), + Mod = net_kernel:epmd_module(), + {port, Port, _Version} = Mod:port_please(Me, Host), + {ok, ensure_timer(#state{mod = Mod, + me = Me, + host = Host, + port = Port})}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(check, State) -> + check_epmd(State), + {noreply, ensure_timer(State#state{timer = undefined})}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +ensure_timer(State) -> + rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check). + +check_epmd(#state{mod = Mod, + me = Me, + host = Host, + port = Port}) -> + case Mod:port_please(Me, Host) of + noport -> rabbit_log:warning( + "epmd does not know us, re-registering ~s at port ~b~n", + [Me, Port]), + rabbit_nodes:ensure_epmd(), + erl_epmd:register_node(Me, Port); + _ -> ok + end. diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index e05ef05af4..6f0865b300 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -16,7 +16,8 @@ -module(rabbit_log). --export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). +-export([log/3, log/4, debug/1, debug/2, info/1, info/2, warning/1, + warning/2, error/1, error/2]). -export([with_local_io/1]). %%---------------------------------------------------------------------------- @@ -26,11 +27,13 @@ -export_type([level/0]). -type(category() :: atom()). --type(level() :: 'info' | 'warning' | 'error'). +-type(level() :: 'debug' | 'info' | 'warning' | 'error'). -spec(log/3 :: (category(), level(), string()) -> 'ok'). -spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). +-spec(debug/1 :: (string()) -> 'ok'). +-spec(debug/2 :: (string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -50,6 +53,7 @@ log(Category, Level, Fmt, Args) when is_list(Args) -> case level(Level) =< catlevel(Category) of false -> ok; true -> F = case Level of + debug -> fun error_logger:info_msg/2; info -> fun error_logger:info_msg/2; warning -> fun error_logger:warning_msg/2; error -> fun error_logger:error_msg/2 @@ -57,6 +61,8 @@ log(Category, Level, Fmt, Args) when is_list(Args) -> with_local_io(fun () -> F(Fmt, Args) end) end. +debug(Fmt) -> log(default, debug, Fmt). +debug(Fmt, Args) -> log(default, debug, Fmt, Args). info(Fmt) -> log(default, info, Fmt). info(Fmt, Args) -> log(default, info, Fmt, Args). warning(Fmt) -> log(default, warning, Fmt). @@ -75,6 +81,7 @@ catlevel(Category) -> %%-------------------------------------------------------------------- +level(debug) -> 4; level(info) -> 3; level(warning) -> 2; level(error) -> 1; diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 3d4605280f..a8f5e74899 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -353,9 +353,10 @@ handle_cast({gm_deaths, DeadGMPids}, when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue( QueueName, MPid, DeadGMPids) of - {ok, MPid, DeadPids} -> + {ok, MPid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index aa1e1ab9ee..0c05729292 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,8 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, purge_acks/1, publish/5, publish_delivered/4, - discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, + purge/1, purge_acks/1, publish/6, publish_delivered/5, + discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, @@ -230,37 +230,38 @@ purge(State = #state { gm = GM, purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). -publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, +publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), - BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, - ChPid, State = #state { gm = GM, - seen_status = SS, - backing_queue = BQ, - backing_queue_state = BQS }) -> + ChPid, Flow, State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), - {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. -discard(MsgId, ChPid, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> +discard(MsgId, ChPid, Flow, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {discard, ChPid, MsgId}), - ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, BQS) }). + ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), + ensure_monitoring(ChPid, + State #state { backing_queue_state = + BQ:discard(MsgId, ChPid, Flow, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 826b6927a1..c2095cebbe 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -49,7 +49,7 @@ -spec(remove_from_queue/3 :: (rabbit_amqqueue:name(), pid(), [pid()]) - -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). -spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async') -> 'ok'). @@ -70,7 +70,7 @@ %%---------------------------------------------------------------------------- -%% Returns {ok, NewMPid, DeadPids} +%% Returns {ok, NewMPid, DeadPids, ExtraNodes} remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -94,32 +94,36 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> Pid <- SPids, not lists:member(Pid, AlivePids)] ++ DSNs, {QPid1, SPids1} = promote_slave(Alive), - case {{QPid, SPids}, {QPid1, SPids1}} of - {Same, Same} -> - ok; - _ when QPid =:= QPid1 orelse QPid1 =:= Self -> - %% Either master hasn't changed, so - %% we're ok to update mnesia; or we have - %% become the master. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM, - down_slave_nodes = DSNs1}, - store_updated_slaves(Q1), - %% If we add and remove nodes at the same time we - %% might tell the old master we need to sync and - %% then shut it down. So let's check if the new - %% master needs to sync. - maybe_auto_sync(Q1); + Extra = + case {{QPid, SPids}, {QPid1, SPids1}} of + {Same, Same} -> + []; + _ when QPid =:= QPid1 orelse QPid1 =:= Self -> + %% Either master hasn't changed, so + %% we're ok to update mnesia; or we have + %% become the master. + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, + store_updated_slaves(Q1), + %% If we add and remove nodes at the + %% same time we might tell the old + %% master we need to sync and then + %% shut it down. So let's check if + %% the new master needs to sync. + maybe_auto_sync(Q1), + slaves_to_start_on_failure(Q1, DeadGMPids); _ -> - %% Master has changed, and we're not it. - %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM, - down_slave_nodes = DSNs1}, - store_updated_slaves(Q1) - end, - {ok, QPid1, DeadPids} + %% Master has changed, and we're not it. + %% [1]. + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, + store_updated_slaves(Q1), + [] + end, + {ok, QPid1, DeadPids, Extra} end end). %% [1] We still update mnesia here in case the slave that is supposed @@ -145,6 +149,17 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% aforementioned restriction on updating the master pid, that pid may %% not be present in gm_pids, but only if said master has died. +%% Sometimes a slave dying means we need to start more on other +%% nodes - "exactly" mode can cause this to happen. +slaves_to_start_on_failure(Q, DeadGMPids) -> + %% In case Mnesia has not caught up yet, filter out nodes we know + %% to be dead.. + ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- + [node(P) || P <- DeadGMPids], + {_, OldNodes, _} = actual_queue_nodes(Q), + {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes), + NewNodes -- OldNodes. + on_node_up() -> QNames = rabbit_misc:execute_mnesia_transaction( @@ -344,6 +359,13 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], + %% When a mirror dies, remove_from_queue/2 might have to add new + %% slaves (in "exactly" mode). It will check mnesia to see which + %% slaves there currently are. If drop_mirror/2 is invoked first + %% then when we end up in remove_from_queue/2 it will not see the + %% slaves that add_mirror/2 will add, and also want to add them + %% (even though we are not responding to the death of a + %% mirror). Breakage ensues. add_mirrors (QName, NewNodes -- OldNodes, async), drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 92e1cc2736..45167706b4 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -206,21 +206,28 @@ handle_call({gm_deaths, DeadGMPids}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids} -> + {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids), case Pid of MPid -> %% master hasn't changed gen_server2:reply(From, ok), + rabbit_mirror_queue_misc:add_mirrors( + QName, ExtraNodes, async), noreply(State); Self -> %% we've become master QueueState = promote_me(From, State), + rabbit_mirror_queue_misc:add_mirrors( + QName, ExtraNodes, async), {become, rabbit_amqqueue_process, QueueState, hibernate}; _ -> %% master has changed to not us gen_server2:reply(From, ok), + %% assertion, we don't need to add_mirrors/2 in this + %% branch, see last clause in remove_from_queue/2 + [] = ExtraNodes, %% Since GM is by nature lazy we need to make sure %% there is some traffic when a master dies, to %% make sure all slaves get informed of the @@ -246,7 +253,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> %% Asynchronous, non-"mandatory", deliver mode. case Flow of @@ -628,7 +635,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery#delivery{mandatory = false} || %% [0] + Deliveries = [promote_delivery(Delivery) || {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], @@ -640,8 +647,16 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). -%% [0] We reset mandatory to false here because we will have sent the -%% mandatory_received already as soon as we got the message +%% We reset mandatory to false here because we will have sent the +%% mandatory_received already as soon as we got the message. We also +%% need to send an ack for these messages since the channel is waiting +%% for one for the via-GM case and we will not now receive one. +promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, + Delivery#delivery{mandatory = false}. noreply(State) -> {NewState, Timeout} = next_state(State), @@ -823,24 +838,27 @@ publish_or_discard(Status, ChPid, MsgId, State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. -process_instruction({publish, ChPid, MsgProps, +process_instruction({publish, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; -process_instruction({publish_delivered, ChPid, MsgProps, +process_instruction({publish_delivered, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), true = BQ:is_empty(BQS), - {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), {ok, maybe_store_ack(true, MsgId, AckTag, State1 #state { backing_queue_state = BQS1 })}; -process_instruction({discard, ChPid, MsgId}, State) -> +process_instruction({discard, ChPid, Flow, MsgId}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(MsgId, ChPid, BQS), + BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, @@ -899,6 +917,9 @@ process_instruction({delete_and_terminate, Reason}, BQ:delete_and_terminate(Reason, BQS), {stop, State #state { backing_queue_state = undefined }}. +maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid); +maybe_flow_ack(_ChPid, noflow) -> ok. + msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = lists:foldl( diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index ee1d210556..9a8d55f94b 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -263,9 +263,10 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, Props1 = Props#message_properties{needs_confirming = false}, {MA1, BQS1} = case Unacked of - false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)}; + false -> {MA, + BQ:publish(Msg, Props1, true, none, noflow, BQS)}; true -> {AckTag, BQS2} = BQ:publish_delivered( - Msg, Props1, none, BQS), + Msg, Props1, none, noflow, BQS), {[{Msg#basic_message.id, AckTag} | MA], BQS2} end, slave_sync_loop(Args, {MA1, TRef, BQS1}); diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3e2c88ee34..626341b5f5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -44,7 +44,8 @@ -export([format/2, format_many/1, format_stderr/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). --export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]). +-export([pid_to_string/1, string_to_pid/1, + pid_change_node/2, node_to_fake_pid/1]). -export([version_compare/2, version_compare/3]). -export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). @@ -196,6 +197,7 @@ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(pid_change_node/2 :: (pid(), node()) -> pid()). -spec(node_to_fake_pid/1 :: (atom()) -> pid()). -spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). -spec(version_compare/3 :: @@ -520,8 +522,12 @@ execute_mnesia_transaction(TxFun) -> Res = mnesia:sync_transaction(TxFun), DiskLogAfter = mnesia_dumper:get_log_writes(), case DiskLogAfter == DiskLogBefore of - true -> Res; - false -> {sync, Res} + true -> file_handle_cache_stats:update( + mnesia_ram_tx), + Res; + false -> file_handle_cache_stats:update( + mnesia_disk_tx), + {sync, Res} end; true -> mnesia:sync_transaction(TxFun) end @@ -686,11 +692,7 @@ sort_field_table(Arguments) -> %% regardless of what node we are running on. The representation also %% permits easy identification of the pid's node. pid_to_string(Pid) when is_pid(Pid) -> - %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and - %% 8.7) - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> - = term_to_binary(Pid), - Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + {Node, Cre, Id, Ser} = decompose_pid(Pid), format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]). %% inverse of above @@ -701,17 +703,32 @@ string_to_pid(Str) -> case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$", [{capture,all_but_first,list}]) of {match, [NodeStr, CreStr, IdStr, SerStr]} -> - <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)), [Cre, Id, Ser] = lists:map(fun list_to_integer/1, [CreStr, IdStr, SerStr]), - binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>); + compose_pid(list_to_atom(NodeStr), Cre, Id, Ser); nomatch -> throw(Err) end. +pid_change_node(Pid, NewNode) -> + {_OldNode, Cre, Id, Ser} = decompose_pid(Pid), + compose_pid(NewNode, Cre, Id, Ser). + %% node(node_to_fake_pid(Node)) =:= Node. node_to_fake_pid(Node) -> - string_to_pid(format("<~s.0.0.0>", [Node])). + compose_pid(Node, 0, 0, 0). + +decompose_pid(Pid) when is_pid(Pid) -> + %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and + %% 8.7) + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + {Node, Cre, Id, Ser}. + +compose_pid(Node, Cre, Id, Ser) -> + <<131,NodeEnc/binary>> = term_to_binary(Node), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>). version_compare(A, B, lte) -> case version_compare(A, B) of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 80fbcd6835..f9110e5868 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -113,23 +113,29 @@ init() -> ok. init_from_config() -> + FindBadNodeNames = fun + (Name, BadNames) when is_atom(Name) -> BadNames; + (Name, BadNames) -> [Name | BadNames] + end, {TryNodes, NodeType} = case application:get_env(rabbit, cluster_nodes) of + {ok, {Nodes, Type} = Config} + when is_list(Nodes) andalso (Type == disc orelse Type == ram) -> + case lists:foldr(FindBadNodeNames, [], Nodes) of + [] -> Config; + BadNames -> e({invalid_cluster_node_names, BadNames}) + end; + {ok, {_, BadType}} when BadType /= disc andalso BadType /= ram -> + e({invalid_cluster_node_type, BadType}); {ok, Nodes} when is_list(Nodes) -> - Config = {Nodes -- [node()], case lists:member(node(), Nodes) of - true -> disc; - false -> ram - end}, - rabbit_log:warning( - "Converting legacy 'cluster_nodes' configuration~n ~w~n" - "to~n ~w.~n~n" - "Please update the configuration to the new format " - "{Nodes, NodeType}, where Nodes contains the nodes that the " - "node will try to cluster with, and NodeType is either " - "'disc' or 'ram'~n", [Nodes, Config]), - Config; - {ok, Config} -> - Config + %% The legacy syntax (a nodes list without the node + %% type) is unsupported. + case lists:foldr(FindBadNodeNames, [], Nodes) of + [] -> e(cluster_node_type_mandatory); + _ -> e(invalid_cluster_nodes_conf) + end; + {ok, _} -> + e(invalid_cluster_nodes_conf) end, case TryNodes of [] -> init_db_and_upgrade([node()], disc, false); @@ -850,6 +856,20 @@ nodes_excl_me(Nodes) -> Nodes -- [node()]. e(Tag) -> throw({error, {Tag, error_description(Tag)}}). +error_description({invalid_cluster_node_names, BadNames}) -> + "In the 'cluster_nodes' configuration key, the following node names " + "are invalid: " ++ lists:flatten(io_lib:format("~p", [BadNames])); +error_description({invalid_cluster_node_type, BadType}) -> + "In the 'cluster_nodes' configuration key, the node type is invalid " + "(expected 'disc' or 'ram'): " ++ + lists:flatten(io_lib:format("~p", [BadType])); +error_description(cluster_node_type_mandatory) -> + "The 'cluster_nodes' configuration key must indicate the node type: " + "either {[...], disc} or {[...], ram}"; +error_description(invalid_cluster_nodes_conf) -> + "The 'cluster_nodes' configuration key is invalid, it must be of the " + "form {[Nodes], Type}, where Nodes is a list of node names and " + "Type is either 'disc' or 'ram'"; error_description(clustering_only_disc_node) -> "You cannot cluster a node if it is the only disc node in its existing " " cluster. If new nodes joined while this node was offline, use " diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl new file mode 100644 index 0000000000..2787cb743c --- /dev/null +++ b/src/rabbit_mnesia_rename.erl @@ -0,0 +1,267 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_mnesia_rename). +-include("rabbit.hrl"). + +-export([rename/2]). +-export([maybe_finish/1]). + +-define(CONVERT_TABLES, [schema, rabbit_durable_queue]). + +%% Supports renaming the nodes in the Mnesia database. In order to do +%% this, we take a backup of the database, traverse the backup +%% changing node names and pids as we go, then restore it. +%% +%% That's enough for a standalone node, for clusters the story is more +%% complex. We can take pairs of nodes From and To, but backing up and +%% restoring the database changes schema cookies, so if we just do +%% this on all nodes the cluster will refuse to re-form with +%% "Incompatible schema cookies.". Therefore we do something similar +%% to what we do for upgrades - the first node in the cluster to +%% restart becomes the authority, and other nodes wipe their own +%% Mnesia state and rejoin. They also need to tell Mnesia the old node +%% is not coming back. +%% +%% If we are renaming nodes one at a time then the running cluster +%% might not be aware that a rename has taken place, so after we wipe +%% and rejoin we then update any tables (in practice just +%% rabbit_durable_queue) which should be aware that we have changed. + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(rename/2 :: (node(), [{node(), node()}]) -> 'ok'). +-spec(maybe_finish/1 :: ([node()]) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +rename(Node, NodeMapList) -> + try + %% Check everything is correct and figure out what we are + %% changing from and to. + {FromNode, ToNode, NodeMap} = prepare(Node, NodeMapList), + + %% We backup and restore Mnesia even if other nodes are + %% running at the time, and defer the final decision about + %% whether to use our mutated copy or rejoin the cluster until + %% we restart. That means we might be mutating our copy of the + %% database while the cluster is running. *Do not* contact the + %% cluster while this is happening, we are likely to get + %% confused. + application:set_env(kernel, dist_auto_connect, never), + + %% Take a copy we can restore from if we abandon the + %% rename. We don't restore from the "backup" since restoring + %% that changes schema cookies and might stop us rejoining the + %% cluster. + ok = rabbit_mnesia:copy_db(mnesia_copy_dir()), + + %% And make the actual changes + rabbit_control_main:become(FromNode), + take_backup(before_backup_name()), + convert_backup(NodeMap, before_backup_name(), after_backup_name()), + ok = rabbit_file:write_term_file(rename_config_name(), + [{FromNode, ToNode}]), + convert_config_files(NodeMap), + rabbit_control_main:become(ToNode), + restore_backup(after_backup_name()), + ok + after + stop_mnesia() + end. + +prepare(Node, NodeMapList) -> + %% If we have a previous rename and haven't started since, give up. + case rabbit_file:is_dir(dir()) of + true -> exit({rename_in_progress, + "Restart node under old name to roll back"}); + false -> ok = rabbit_file:ensure_dir(mnesia_copy_dir()) + end, + + %% Check we don't have two nodes mapped to the same node + {FromNodes, ToNodes} = lists:unzip(NodeMapList), + case length(FromNodes) - length(lists:usort(ToNodes)) of + 0 -> ok; + _ -> exit({duplicate_node, ToNodes}) + end, + + %% Figure out which node we are before and after the change + FromNode = case [From || {From, To} <- NodeMapList, + To =:= Node] of + [N] -> N; + [] -> Node + end, + NodeMap = dict:from_list(NodeMapList), + ToNode = case dict:find(FromNode, NodeMap) of + {ok, N2} -> N2; + error -> FromNode + end, + + %% Check that we are in the cluster, all old nodes are in the + %% cluster, and no new nodes are. + Nodes = rabbit_mnesia:cluster_nodes(all), + case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes), + lists:member(Node, Nodes ++ ToNodes)} of + {[], [], true} -> ok; + {[], [], false} -> exit({i_am_not_involved, Node}); + {F, [], _} -> exit({nodes_not_in_cluster, F}); + {_, T, _} -> exit({nodes_already_in_cluster, T}) + end, + {FromNode, ToNode, NodeMap}. + +take_backup(Backup) -> + start_mnesia(), + ok = mnesia:backup(Backup), + stop_mnesia(). + +restore_backup(Backup) -> + ok = mnesia:install_fallback(Backup, [{scope, local}]), + start_mnesia(), + stop_mnesia(), + rabbit_mnesia:force_load_next_boot(). + +maybe_finish(AllNodes) -> + case rabbit_file:read_term_file(rename_config_name()) of + {ok, [{FromNode, ToNode}]} -> finish(FromNode, ToNode, AllNodes); + _ -> ok + end. + +finish(FromNode, ToNode, AllNodes) -> + case node() of + ToNode -> + case rabbit_upgrade:nodes_running(AllNodes) of + [] -> finish_primary(FromNode, ToNode); + _ -> finish_secondary(FromNode, ToNode, AllNodes) + end; + FromNode -> + rabbit_log:info( + "Abandoning rename from ~s to ~s since we are still ~s~n", + [FromNode, ToNode, FromNode]), + [{ok, _} = file:copy(backup_of_conf(F), F) || F <- config_files()], + ok = rabbit_file:recursive_delete([rabbit_mnesia:dir()]), + ok = rabbit_file:recursive_copy( + mnesia_copy_dir(), rabbit_mnesia:dir()), + delete_rename_files(); + _ -> + %% Boot will almost certainly fail but we might as + %% well just log this + rabbit_log:info( + "Rename attempted from ~s to ~s but we are ~s - ignoring.~n", + [FromNode, ToNode, node()]) + end. + +finish_primary(FromNode, ToNode) -> + rabbit_log:info("Restarting as primary after rename from ~s to ~s~n", + [FromNode, ToNode]), + delete_rename_files(), + ok. + +finish_secondary(FromNode, ToNode, AllNodes) -> + rabbit_log:info("Restarting as secondary after rename from ~s to ~s~n", + [FromNode, ToNode]), + rabbit_upgrade:secondary_upgrade(AllNodes), + rename_in_running_mnesia(FromNode, ToNode), + delete_rename_files(), + ok. + +dir() -> rabbit_mnesia:dir() ++ "-rename". +before_backup_name() -> dir() ++ "/backup-before". +after_backup_name() -> dir() ++ "/backup-after". +rename_config_name() -> dir() ++ "/pending.config". +mnesia_copy_dir() -> dir() ++ "/mnesia-copy". + +delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]). + +start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + rabbit_table:force_load(), + rabbit_table:wait_for_replicated(). +stop_mnesia() -> stopped = mnesia:stop(). + +convert_backup(NodeMap, FromBackup, ToBackup) -> + mnesia:traverse_backup( + FromBackup, ToBackup, + fun + (Row, Acc) -> + case lists:member(element(1, Row), ?CONVERT_TABLES) of + true -> {[update_term(NodeMap, Row)], Acc}; + false -> {[Row], Acc} + end + end, switched). + +config_files() -> + [rabbit_node_monitor:running_nodes_filename(), + rabbit_node_monitor:cluster_status_filename()]. + +backup_of_conf(Path) -> + filename:join([dir(), filename:basename(Path)]). + +convert_config_files(NodeMap) -> + [convert_config_file(NodeMap, Path) || Path <- config_files()]. + +convert_config_file(NodeMap, Path) -> + {ok, Term} = rabbit_file:read_term_file(Path), + {ok, _} = file:copy(Path, backup_of_conf(Path)), + ok = rabbit_file:write_term_file(Path, update_term(NodeMap, Term)). + +lookup_node(OldNode, NodeMap) -> + case dict:find(OldNode, NodeMap) of + {ok, NewNode} -> NewNode; + error -> OldNode + end. + +mini_map(FromNode, ToNode) -> dict:from_list([{FromNode, ToNode}]). + +update_term(NodeMap, L) when is_list(L) -> + [update_term(NodeMap, I) || I <- L]; +update_term(NodeMap, T) when is_tuple(T) -> + list_to_tuple(update_term(NodeMap, tuple_to_list(T))); +update_term(NodeMap, Node) when is_atom(Node) -> + lookup_node(Node, NodeMap); +update_term(NodeMap, Pid) when is_pid(Pid) -> + rabbit_misc:pid_change_node(Pid, lookup_node(node(Pid), NodeMap)); +update_term(_NodeMap, Term) -> + Term. + +rename_in_running_mnesia(FromNode, ToNode) -> + All = rabbit_mnesia:cluster_nodes(all), + Running = rabbit_mnesia:cluster_nodes(running), + case {lists:member(FromNode, Running), lists:member(ToNode, All)} of + {false, true} -> ok; + {true, _} -> exit({old_node_running, FromNode}); + {_, false} -> exit({new_node_not_in_cluster, ToNode}) + end, + {atomic, ok} = mnesia:del_table_copy(schema, FromNode), + Map = mini_map(FromNode, ToNode), + {atomic, _} = transform_table(rabbit_durable_queue, Map), + ok. + +transform_table(Table, Map) -> + mnesia:sync_transaction( + fun () -> + mnesia:lock({table, Table}, write), + transform_table(Table, Map, mnesia:first(Table)) + end). + +transform_table(_Table, _Map, '$end_of_table') -> + ok; +transform_table(Table, Map, Key) -> + [Term] = mnesia:read(Table, Key, write), + ok = mnesia:write(Table, update_term(Map, Term), write), + transform_table(Table, Map, mnesia:next(Table, Key)). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b829ae9476..9e3dd1e7c4 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -473,6 +473,7 @@ write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + file_handle_cache_stats:update(msg_store_read), %% Check the cur file cache case ets:lookup(CurFileCacheEts, MsgId) of [] -> @@ -507,6 +508,7 @@ server_cast(#client_msstate { server = Server }, Msg) -> client_write(MsgId, Msg, Flow, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> + file_handle_cache_stats:update(msg_store_write), ok = client_update_flying(+1, MsgId, CState), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), ok = server_cast(CState, {write, CRef, MsgId, Flow}). @@ -1299,7 +1301,8 @@ should_mask_action(CRef, MsgId, open_file(Dir, FileName, Mode) -> file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, - [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}, + {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 7f7fcc3126..d3c0e55ff8 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -18,7 +18,7 @@ -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2, is_process_running/2, - cluster_name/0, set_cluster_name/1]). + cluster_name/0, set_cluster_name/1, ensure_epmd/0]). -include_lib("kernel/include/inet.hrl"). @@ -41,6 +41,7 @@ -spec(is_process_running/2 :: (node(), atom()) -> boolean()). -spec(cluster_name/0 :: () -> binary()). -spec(set_cluster_name/1 :: (binary()) -> 'ok'). +-spec(ensure_epmd/0 :: () -> 'ok'). -endif. @@ -197,3 +198,20 @@ cluster_name_default() -> set_cluster_name(Name) -> rabbit_runtime_parameters:set_global(cluster_name, Name). + +ensure_epmd() -> + {ok, Root} = init:get_argument(root), + {ok, Prog} = init:get_argument(progname), + ID = random:uniform(1000000000), + Port = open_port( + {spawn_executable, filename:join([Root, "bin", Prog])}, + [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]), + "-noshell", "-eval", "halt()."]}, + exit_status, stderr_to_stdout, use_stdio]), + port_shutdown_loop(Port). + +port_shutdown_loop(Port) -> + receive + {Port, {exit_status, _Rc}} -> ok; + {Port, _} -> port_shutdown_loop(Port) + end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0a2c88d441..24dcd23cc8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,19 +16,27 @@ -module(rabbit_queue_index). --export([erase/1, init/2, recover/5, +-export([erase/1, init/3, recover/6, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, + publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). -define(CLEAN_FILENAME, "clean.dot"). %%---------------------------------------------------------------------------- %% The queue index is responsible for recording the order of messages -%% within a queue on disk. +%% within a queue on disk. As such it contains records of messages +%% being published, delivered and acknowledged. The publish record +%% includes the sequence ID, message ID and a small quantity of +%% metadata about the message; the delivery and acknowledgement +%% records just contain the sequence ID. A publish record may also +%% contain the complete message if provided to publish/5; this allows +%% the message store to be avoided altogether for small messages. In +%% either case the publish record is stored in memory in the same +%% serialised format it will take on disk. %% %% Because of the fact that the queue can decide at any point to send %% a queue entry to disk, you can not rely on publishes appearing in @@ -36,7 +44,7 @@ %% then delivered, then ack'd. %% %% In order to be able to clean up ack'd messages, we write to segment -%% files. These files have a fixed maximum size: ?SEGMENT_ENTRY_COUNT +%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT %% publishes, delivers and acknowledgements. They are numbered, and so %% it is known that the 0th segment contains messages 0 -> %% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages @@ -85,7 +93,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}), +%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -128,8 +136,8 @@ -define(REL_SEQ_ONLY_RECORD_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits -%% of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of +%% size and then 128 bits of md5sum msg id. -define(PUB_PREFIX, 1). -define(PUB_PREFIX_BITS, 1). @@ -140,32 +148,37 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). +%% This is the size of the message body content, for stats -define(SIZE_BYTES, 4). -define(SIZE_BITS, (?SIZE_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry + 4 for size +%% This is the size of the message record embedded in the queue +%% index. If 0, the message can be found in the message store. +-define(EMBEDDED_SIZE_BYTES, 4). +-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry -define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). -%% + 2 for seq, bits and prefix --define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). +%% + 4 for size +-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)). -%% 1 publish, 1 deliver, 1 ack per msg --define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))). +%% + 2 for seq, bits and prefix +-define(PUB_RECORD_PREFIX_BYTES, 2). %% ---- misc ---- --define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent} +-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin} -define(READ_MODE, [binary, raw, read]). --define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). -define(WRITE_MODE, [write | ?READ_MODE]). %%---------------------------------------------------------------------------- --record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unconfirmed }). +-record(qistate, {dir, segments, journal_handle, dirty_count, + max_journal_entries, on_sync, on_sync_msg, + unconfirmed, unconfirmed_msg}). --record(segment, { num, path, journal_entries, unacked }). +-record(segment, {num, path, journal_entries, unacked}). -include("rabbit.hrl"). @@ -174,6 +187,7 @@ -rabbit_upgrade({add_queue_ttl, local, []}). -rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). +-rabbit_upgrade({store_msg, local, [store_msg_size]}). -ifdef(use_specs). @@ -193,7 +207,9 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unconfirmed :: gb_sets:set() + on_sync_msg :: on_sync_fun(), + unconfirmed :: gb_sets:set(), + unconfirmed_msg :: gb_sets:set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -201,16 +217,18 @@ -type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). -spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok'). --spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). --spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - contains_predicate(), on_sync_fun()) -> +-spec(init/3 :: (rabbit_amqqueue:name(), + on_sync_fun(), on_sync_fun()) -> qistate()). +-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + contains_predicate(), + on_sync_fun(), on_sync_fun()) -> {'undefined' | non_neg_integer(), 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), - rabbit_types:message_properties(), boolean(), qistate()) - -> qistate()). +-spec(publish/6 :: (rabbit_types:msg_id(), seq_id(), + rabbit_types:message_properties(), boolean(), + non_neg_integer(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/1 :: (qistate()) -> qistate()). @@ -241,14 +259,17 @@ erase(Name) -> false -> ok end. -init(Name, OnSyncFun) -> +init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir - State #qistate { on_sync = OnSyncFun }. + State#qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, + OnSyncFun, OnSyncMsgFun) -> State = blank_state(Name), - State1 = State #qistate { on_sync = OnSyncFun }, + State1 = State #qistate{on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}, CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), @@ -267,26 +288,35 @@ delete_and_terminate(State) -> ok = rabbit_file:recursive_delete([Dir]), State1. -publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unconfirmed = Unconfirmed }) - when is_binary(MsgId) -> +publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, + State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM}) -> + MsgId = case MsgOrId of + #basic_message{id = Id} -> Id; + Id when is_binary(Id) -> Id + end, ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - case MsgProps#message_properties.needs_confirming of - true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), - State #qistate { unconfirmed = Unconfirmed1 }; - false -> State + case {MsgProps#message_properties.needs_confirming, MsgOrId} of + {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC), + State#qistate{unconfirmed = UC1}; + {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM), + State#qistate{unconfirmed_msg = UCM1}; + {false, _} -> State end), + file_handle_cache_stats:update(queue_index_journal_write), + {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, - SeqId:?SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]), + SeqId:?SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]), maybe_flush_journal( - add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)). + JournalSizeHint, + add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -302,10 +332,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), notify_sync(State). -needs_sync(#qistate { journal_handle = undefined }) -> +needs_sync(#qistate{journal_handle = undefined}) -> false; -needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) -> - case gb_sets:is_empty(UC) of +needs_sync(#qistate{journal_handle = JournalHdl, + unconfirmed = UC, + unconfirmed_msg = UCM}) -> + case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of true -> case file_handle_cache:needs_sync(JournalHdl) of true -> other; false -> false @@ -409,7 +441,9 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unconfirmed = gb_sets:new() }. + on_sync_msg = fun (_) -> ok end, + unconfirmed = gb_sets:new(), + unconfirmed_msg = gb_sets:new() }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -479,9 +513,10 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack}, + fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack}, {SegmentAndDirtyCount, Bytes}) -> - {recover_message(ContainsCheckFun(MsgId), CleanShutdown, + {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), + {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown, Del, RelSeq, SegmentAndDirtyCount), Bytes + case IsPersistent of true -> MsgProps#message_properties.size; @@ -541,7 +576,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = blank_state(QueueName), ok = scan_segments( - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) + when is_binary(MsgId) -> gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> @@ -555,9 +591,9 @@ scan_segments(Fun, Acc, State) -> Result = lists:foldr( fun (Seg, AccN) -> segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, IsAcked}, AccM) -> - Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps, + Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, IsPersistent, IsDelivered, IsAcked, AccM) end, AccN, segment_find_or_new(Seg, Dir, Segments)) end, Acc, all_segment_nums(State1)), @@ -568,24 +604,35 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry, - size = Size }) -> - [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. +create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, + size = Size }) -> + ExpiryBin = expiry_to_binary(Expiry), + case MsgOrId of + MsgId when is_binary(MsgId) -> + {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>}; + #basic_message{id = MsgId} -> + MsgBin = term_to_binary(MsgOrId), + {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin} + end. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, - Size:?SIZE_BITS>>) -> + Size:?SIZE_BITS>>, MsgBin) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {MsgId, #message_properties { expiry = Exp, - size = Size }}. + Props = #message_properties{expiry = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + size = Size}, + case MsgBin of + <<>> -> {MsgId, Props}; + _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin), + {Msg, Props} + end. %%---------------------------------------------------------------------------- %% journal manipulation @@ -628,11 +675,14 @@ add_to_journal(RelSeq, Action, JEntries) -> array:reset(RelSeq, JEntries) end. -maybe_flush_journal(State = #qistate { dirty_count = DCount, - max_journal_entries = MaxJournal }) - when DCount > MaxJournal -> - flush_journal(State); maybe_flush_journal(State) -> + maybe_flush_journal(infinity, State). + +maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount, + max_journal_entries = MaxJournal }) + when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) -> + flush_journal(State); +maybe_flush_journal(_Hint, State) -> State. flush_journal(State = #qistate { segments = Segments }) -> @@ -656,9 +706,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, + _ -> Seg = array:sparse_foldr( + fun entry_to_segment/3, [], JEntries), + file_handle_cache_stats:update(queue_index_write), + + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), - array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), + file_handle_cache:append(Hdl, Seg), ok = file_handle_cache:close(Hdl), Segment #segment { journal_entries = array_new() } end. @@ -677,10 +731,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. load_journal(State = #qistate { dir = Dir }) -> - case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of + Path = filename:join(Dir, ?JOURNAL_FILENAME), + case rabbit_file:is_file(Path) of true -> {JournalHdl, State1} = get_journal_handle(State), + Size = rabbit_file:file_size(Path), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1); + {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size), + parse_journal_entries(JournalBin, State1); false -> State end. @@ -704,41 +761,37 @@ recover_journal(State) -> end, Segments), State1 #qistate { segments = Segments1 }. -load_journal_entries(State = #qistate { journal_handle = Hdl }) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> - case Prefix of - ?DEL_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, del, State)); - ?ACK_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, ack, State)); - _ -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of - %% Journal entry composed only of zeroes was probably - %% produced during a dirty shutdown so stop reading - {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> - State; - {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> - {MsgId, MsgProps} = parse_pub_record_body(Bin), - IsPersistent = case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end, - load_journal_entries( - add_to_journal( - SeqId, {MsgId, MsgProps, IsPersistent}, State)); - _ErrOrEoF -> %% err, we've lost at least a publish - State - end - end; - _ErrOrEoF -> State - end. +parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, del, State)); + +parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, ack, State)); +parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS, + 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) -> + %% Journal entry composed only of zeroes was probably + %% produced during a dirty shutdown so stop reading + State; +parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary, + Rest/binary>>, State) -> + IsPersistent = case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, + parse_journal_entries( + Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State)); +parse_journal_entries(_ErrOrEoF, State) -> + State. deliver_or_ack(_Kind, [], State) -> State; deliver_or_ack(Kind, SeqIds, State) -> JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end, {JournalHdl, State1} = get_journal_handle(State), + file_handle_cache_stats:update(queue_index_journal_write), ok = file_handle_cache:append( JournalHdl, [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]), @@ -746,11 +799,19 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> - case gb_sets:is_empty(UC) of - true -> State; - false -> OnSyncFun(UC), - State #qistate { unconfirmed = gb_sets:new() } +notify_sync(State = #qistate{unconfirmed = UC, + unconfirmed_msg = UCM, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun}) -> + State1 = case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State#qistate{unconfirmed = gb_sets:new()} + end, + case gb_sets:is_empty(UCM) of + true -> State1; + false -> OnSyncMsgFun(UCM), + State1#qistate{unconfirmed_msg = gb_sets:new()} end. %%---------------------------------------------------------------------------- @@ -823,42 +884,42 @@ segment_nums({Segments, CachedSegments}) -> segments_new() -> {dict:new(), []}. -write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) -> - Hdl; -write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> - ok = case Pub of - no_pub -> - ok; - {MsgId, MsgProps, IsPersistent} -> - file_handle_cache:append( - Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(MsgId, MsgProps)]) - end, - ok = case {Del, Ack} of - {no_del, no_ack} -> - ok; - _ -> - Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - file_handle_cache:append( - Hdl, case {Del, Ack} of - {del, ack} -> [Binary, Binary]; - _ -> Binary - end) - end, - Hdl. +entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> + Buf; +entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> + %% NB: we are assembling the segment in reverse order here, so + %% del/ack comes first. + Buf1 = case {Del, Ack} of + {no_del, no_ack} -> + Buf; + _ -> + Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + case {Del, Ack} of + {del, ack} -> [[Binary, Binary] | Buf]; + _ -> [Binary | Buf] + end + end, + case Pub of + no_pub -> + Buf1; + {IsPersistent, Bin, MsgBin} -> + [[<<?PUB_PREFIX:?PUB_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1] + end. read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack}, + Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, - IsPersistent, IsDelivered == del} | Acc ]; + [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, + IsPersistent, IsDelivered == del} | Acc]; (_RelSeq, _Value, Acc) -> Acc end, Messages, Segment), @@ -868,7 +929,11 @@ segment_entries_foldr(Fun, Init, Segment = #segment { journal_entries = JEntries }) -> {SegEntries, _UnackedCount} = load_segment(false, Segment), {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries), - array:sparse_foldr(Fun, Init, SegEntries1). + array:sparse_foldr( + fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) -> + {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin), + Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc) + end, Init, SegEntries1). %% Loading segments %% @@ -877,44 +942,48 @@ load_segment(KeepAcked, #segment { path = Path }) -> Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of false -> Empty; - true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), + true -> Size = rabbit_file:file_size(Path), + file_handle_cache_stats:update(queue_index_read), + {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of - {ok, SegData} -> load_segment_entries( - KeepAcked, SegData, Empty); - eof -> Empty - end, + {ok, SegBin} = file_handle_cache:read(Hdl, Size), ok = file_handle_cache:close(Hdl), + Res = parse_segment_entries(SegBin, KeepAcked, Empty), Res end. -load_segment_entries(KeepAcked, - <<?PUB_PREFIX:?PUB_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, - PubRecordBody:?PUB_RECORD_BODY_BYTES/binary, - SegData/binary>>, - {SegEntries, UnackedCount}) -> - {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody), - Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, +parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>, + KeepAcked, Acc) -> + parse_segment_publish_entry( + Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc); +parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) -> + parse_segment_entries( + Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); +parse_segment_entries(<<>>, _KeepAcked, Acc) -> + Acc. + +parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, + MsgBin:MsgSize/binary, Rest/binary>>, + IsPersistent, RelSeq, KeepAcked, + {SegEntries, Unacked}) -> + Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), - load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1}); -load_segment_entries(KeepAcked, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS, SegData/binary>>, - {SegEntries, UnackedCount}) -> - {UnackedCountDelta, SegEntries1} = - case array:get(RelSeq, SegEntries) of - {Pub, no_del, no_ack} -> - { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)}; - {Pub, del, no_ack} when KeepAcked -> - {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)}; - {_Pub, del, no_ack} -> - {-1, array:reset(RelSeq, SegEntries)} - end, - load_segment_entries(KeepAcked, SegData, - {SegEntries1, UnackedCount + UnackedCountDelta}); -load_segment_entries(_KeepAcked, _SegData, Res) -> - Res. + parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1}); +parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) -> + parse_segment_entries(Rest, KeepAcked, Acc). + +add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> + case array:get(RelSeq, SegEntries) of + {Pub, no_del, no_ack} -> + {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked}; + {Pub, del, no_ack} when KeepAcked -> + {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1}; + {_Pub, del, no_ack} -> + {array:reset(RelSeq, SegEntries), Unacked - 1} + end. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). @@ -1121,6 +1190,40 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, store_msg_size_segment(_) -> stop. +store_msg() -> + foreach_queue_index({fun store_msg_journal/1, + fun store_msg_segment/1}). + +store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?EMBEDDED_SIZE_BITS>>, Rest}; +store_msg_journal(_) -> + stop. + +store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, + 0:?EMBEDDED_SIZE_BITS>>, Rest}; +store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_segment(_) -> + stop. + + + %%---------------------------------------------------------------------------- @@ -1157,7 +1260,7 @@ transform_file(Path, Fun) when is_function(Fun)-> [{write_buffer, infinity}]), {ok, PathHdl} = file_handle_cache:open( - Path, [{read_ahead, Size} | ?READ_MODE], []), + Path, ?READ_MODE, [{read_buffer, Size}]), {ok, Content} = file_handle_cache:read(PathHdl, Size), ok = file_handle_cache:close(PathHdl), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ca73006aed..11c2d8e5df 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,6 +58,11 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(AUTH_NOTIFICATION_INFO_KEYS, + [host, vhost, name, peer_host, peer_port, protocol, auth_mechanism, + ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject, + peer_cert_validity]). + -define(IS_RUNNING(State), (State#v1.connection_state =:= running orelse State#v1.connection_state =:= blocking orelse @@ -214,7 +219,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> rabbit_net:fast_close(Sock), exit(normal) end, - log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(HandshakeTimeout, self(), handshake_timeout), @@ -260,8 +264,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of - connection_closed_abruptly -> warning; - _ -> error + connection_closed_with_no_data_received -> debug; + connection_closed_abruptly -> warning; + _ -> error end, "closing AMQP connection ~p (~s):~n~p~n", [self(), Name, Ex]) after @@ -313,8 +318,28 @@ binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> binlist_split(Len, [H|T], Acc) -> binlist_split(Len - size(H), T, [H|Acc]). -mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> - case rabbit_net:recv(Sock) of +mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, + connection_state = CS, + connection = #connection{ + name = ConnName}}) -> + Recv = rabbit_net:recv(Sock), + case CS of + pre_init when Buf =:= [] -> + %% We only log incoming connections when either the + %% first byte was received or there was an error (eg. a + %% timeout). + %% + %% The goal is to not log TCP healthchecks (a connection + %% with no data received) unless specified otherwise. + log(case Recv of + closed -> debug; + _ -> info + end, "accepting AMQP connection ~p (~s)~n", + [self(), ConnName]); + _ -> + ok + end, + case Recv of {data, Data} -> recvloop(Deb, [Data | Buf], BufLen + size(Data), State#v1{pending_recv = false}); @@ -334,10 +359,18 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> end end. -stop(closed, State) -> maybe_emit_stats(State), - throw(connection_closed_abruptly); -stop(Reason, State) -> maybe_emit_stats(State), - throw({inet_error, Reason}). +stop(closed, #v1{connection_state = pre_init} = State) -> + %% The connection was closed before any packet was received. It's + %% probably a load-balancer healthcheck: don't consider this a + %% failure. + maybe_emit_stats(State), + throw(connection_closed_with_no_data_received); +stop(closed, State) -> + maybe_emit_stats(State), + throw(connection_closed_abruptly); +stop(Reason, State) -> + maybe_emit_stats(State), + throw({inet_error, Reason}). handle_other({conserve_resources, Source, Conserve}, State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> @@ -944,7 +977,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> - ok = rabbit_access_control:check_vhost_access(User, VHostPath), + ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), @@ -1046,9 +1079,12 @@ auth_phase(Response, auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of - {refused, Msg, Args} -> - auth_fail(Msg, Args, Name, State); + {refused, Username, Msg, Args} -> + auth_fail(Username, Msg, Args, Name, State); {protocol_error, Msg, Args} -> + notify_auth_result(none, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], + State), rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> Secure = #'connection.secure'{challenge = Challenge}, @@ -1057,9 +1093,12 @@ auth_phase(Response, auth_state = AuthState1}}; {ok, User = #user{username = Username}} -> case rabbit_access_control:check_user_loopback(Username, Sock) of - ok -> ok; - not_allowed -> auth_fail("user '~s' can only connect via " - "localhost", [Username], Name, State) + ok -> + notify_auth_result(Username, user_authentication_success, + [], State); + not_allowed -> + auth_fail(Username, "user '~s' can only connect via " + "localhost", [Username], Name, State) end, Tune = #'connection.tune'{frame_max = get_env(frame_max), channel_max = get_env(channel_max), @@ -1071,11 +1110,15 @@ auth_phase(Response, end. -ifdef(use_specs). --spec(auth_fail/4 :: (string(), [any()], binary(), #v1{}) -> no_return()). +-spec(auth_fail/5 :: + (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) -> + no_return()). -endif. -auth_fail(Msg, Args, AuthName, +auth_fail(Username, Msg, Args, AuthName, State = #v1{connection = #connection{protocol = Protocol, capabilities = Capabilities}}) -> + notify_auth_result(Username, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], State), AmqpError = rabbit_misc:amqp_error( access_refused, "~s login refused: ~s", [AuthName, io_lib:format(Msg, Args)], none), @@ -1094,6 +1137,16 @@ auth_fail(Msg, Args, AuthName, end, rabbit_misc:protocol_error(AmqpError). +notify_auth_result(Username, AuthResult, ExtraProps, State) -> + EventProps = [{connection_type, network}, + {name, case Username of none -> ''; _ -> Username end}] ++ + [case Item of + name -> {connection_name, i(name, State)}; + _ -> {Item, i(Item, State)} + end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++ + ExtraProps, + rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). + %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index ba48867ad0..039568df8e 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -27,7 +27,7 @@ vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, - connection/0, protocol/0, user/0, internal_user/0, + connection/0, protocol/0, auth_user/0, user/0, internal_user/0, username/0, password/0, password_hash/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0, mfargs/0, proc_name/0, @@ -131,11 +131,15 @@ -type(protocol() :: rabbit_framing:protocol()). +-type(auth_user() :: + #auth_user{username :: username(), + tags :: [atom()], + impl :: any()}). + -type(user() :: - #user{username :: username(), - tags :: [atom()], - auth_backend :: atom(), - impl :: any()}). + #user{username :: username(), + tags :: [atom()], + authz_backends :: [{atom(), any()}]}). -type(internal_user() :: #internal_user{username :: username(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 72bf7855c4..2ab6545911 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,7 +16,8 @@ -module(rabbit_upgrade). --export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0, + nodes_running/1, secondary_upgrade/1]). -include("rabbit.hrl"). @@ -122,6 +123,7 @@ remove_backup() -> maybe_upgrade_mnesia() -> AllNodes = rabbit_mnesia:cluster_nodes(all), + ok = rabbit_mnesia_rename:maybe_finish(AllNodes), case rabbit_version:upgrades_required(mnesia) of {error, starting_from_scratch} -> ok; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1da3de2671..cf18e8ea6a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, - publish/5, publish_delivered/4, discard/3, drain_confirmed/1, + publish/6, publish_delivered/5, discard/4, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -28,16 +28,34 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- +%% Messages, and their position in the queue, can be in memory or on +%% disk, or both. Persistent messages will have both message and +%% position pushed to disk as soon as they arrive; transient messages +%% can be written to disk (and thus both types can be evicted from +%% memory) under memory pressure. The question of whether a message is +%% in RAM and whether it is persistent are orthogonal. +%% +%% Messages are persisted using the queue index and the message +%% store. Normally the queue index holds the position of the message +%% *within this queue* along with a couple of small bits of metadata, +%% while the message store holds the message itself (including headers +%% and other properties). +%% +%% However, as an optimisation, small messages can be embedded +%% directly in the queue index and bypass the message store +%% altogether. +%% %% Definitions: - +%% %% alpha: this is a message where both the message itself, and its %% position within the queue are held in RAM %% -%% beta: this is a message where the message itself is only held on -%% disk, but its position within the queue is held in RAM. +%% beta: this is a message where the message itself is only held on +%% disk (if persisted to the message store) but its position +%% within the queue is held in RAM. %% %% gamma: this is a message where the message itself is only held on %% disk, but its position is both in RAM and on disk. @@ -248,8 +266,9 @@ q3, q4, next_seq_id, - ram_pending_ack, - disk_pending_ack, + ram_pending_ack, %% msgs using store, still in RAM + disk_pending_ack, %% msgs in store, paged out + qi_pending_ack, %% msgs using qi, *can't* be paged out index_state, msg_store_clients, durable, @@ -285,8 +304,9 @@ msg, is_persistent, is_delivered, - msg_on_disk, + msg_in_store, index_on_disk, + persist_to, msg_props }). @@ -300,11 +320,13 @@ %% betas, the IO_BATCH_SIZE sets the number of betas that we must be %% due to write indices for before we do any work at all. -define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND +-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(QUEUE, lqueue). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). %%---------------------------------------------------------------------------- @@ -341,6 +363,7 @@ next_seq_id :: seq_id(), ram_pending_ack :: gb_trees:tree(), disk_pending_ack :: gb_trees:tree(), + qi_pending_ack :: gb_trees:tree(), index_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, @@ -426,16 +449,19 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(Queue, Recover, AsyncCallback) -> - init(Queue, Recover, AsyncCallback, - fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) - end, - fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). +init(Queue, Recover, Callback) -> + init( + Queue, Recover, Callback, + fun (MsgIds, ActionTaken) -> + msgs_written_to_disk(Callback, MsgIds, ActionTaken) + end, + fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, + fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). init(#amqqueue { name = QueueName, durable = IsDurable }, new, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> + IndexState = rabbit_queue_index:init(QueueName, + MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, @@ -446,13 +472,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, %% We can be recovering a transient queue if it crashed init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> + AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), {PersistentClient, ContainsCheckFun} = case IsDurable of true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback), - {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end}; + {C, fun (MsgId) when is_binary(MsgId) -> + rabbit_msg_store:contains(MsgId, C); + (#basic_message{is_persistent = Persistent}) -> + Persistent + end}; false -> {undefined, fun(_MsgId) -> false end} end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, @@ -461,7 +491,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - ContainsCheckFun, MsgIdxOnDiskFun), + ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -514,51 +544,30 @@ delete_and_terminate(_Reason, State) -> delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). -purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - ram_bytes = RamBytes, - persistent_count = PCount, - persistent_bytes = PBytes }) -> +purge(State = #vqstate { q4 = Q4, + len = Len }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {RamBytes, PCount, PBytes}, - {Stats1, IndexState1} = - remove_queue_entries(Q4, Stats, IndexState, MSCState), - - {Stats2, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = - - purge_betas_and_deltas( - Stats1, State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - - {{RamBytes3, PCount3, PBytes3}, IndexState3} = - remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), - - {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), - index_state = IndexState3, - len = 0, - bytes = 0, - ram_msg_count = 0, - ram_bytes = RamBytes3, - persistent_count = PCount3, - persistent_bytes = PBytes3 })}. + State1 = remove_queue_entries(Q4, State), + + State2 = #vqstate { q1 = Q1 } = + purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }), + + State3 = remove_queue_entries(Q1, State2), + + {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}. purge_acks(State) -> a(purge_pending_ack(false, State)). publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + IsDelivered, _ChPid, _Flow, + State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), @@ -567,42 +576,36 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } end, InCount1 = InCount + 1, - PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes( - 1, 0, MsgStatus1, - inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount1, - persistent_count = PCount1, - unconfirmed = UC1 })), + State3 = stats({1, 0}, {none, MsgStatus1}, + State2#vqstate{ next_seq_id = SeqId + 1, + in_counter = InCount1, + unconfirmed = UC1 }), a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + _ChPid, _Flow, + State = #vqstate { next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), - PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes(0, 1, MsgStatus, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. -discard(_MsgId, _ChPid, State) -> State. +discard(_MsgId, _ChPid, _Flow, State) -> State. drain_confirmed(State = #vqstate { confirmed = C }) -> case gb_sets:is_empty(C) of @@ -664,7 +667,7 @@ ack([], State) -> ack([SeqId], State) -> {#msg_status { msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, @@ -674,7 +677,7 @@ ack([SeqId], State) -> true -> rabbit_queue_index:ack([SeqId], IndexState); false -> IndexState end, - case MsgOnDisk of + case MsgInStore of true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, @@ -733,15 +736,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, [msg_iterator(State), disk_ack_iterator(State), - ram_ack_iterator(State)]), + ram_ack_iterator(State), + qi_ack_iterator(State)]), ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> - len(State) + gb_trees:size(RPA) + gb_trees:size(DPA). +depth(State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). set_ram_duration_target( DurationTarget, State = #vqstate { @@ -807,10 +813,11 @@ ram_duration(State) -> ram_msg_count = RamMsgCount, ram_msg_count_prev = RamMsgCountPrev, ram_pending_ack = RPA, + qi_pending_ack = QPA, ram_ack_count_prev = RamAckCountPrev } = update_rates(State), - RamAckCount = gb_trees:size(RPA), + RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA), Duration = %% msgs+acks / (msgs+acks/sec) == sec case lists:all(fun (X) -> X < 0.01 end, @@ -846,8 +853,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> RamMsgCount; -info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> - gb_trees:size(RPA); +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA, + qi_pending_ack = QPA}) -> + gb_trees:size(RPA) + gb_trees:size(QPA); info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> @@ -933,14 +941,11 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) when Start + Count =< End -> Delta. -m(MsgStatus = #msg_status { msg = Msg, - is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, +m(MsgStatus = #msg_status { is_persistent = IsPersistent, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }) -> true = (not IsPersistent) or IndexOnDisk, - true = (not IndexOnDisk) or MsgOnDisk, - true = (Msg =/= undefined) or MsgOnDisk, - + true = msg_in_ram(MsgStatus) or MsgInStore, MsgStatus. one_if(true ) -> 1; @@ -959,21 +964,39 @@ msg_status(IsPersistent, IsDelivered, SeqId, msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = false, + msg_in_store = false, index_on_disk = false, + persist_to = determine_persist_to(Msg, MsgProps), msg_props = MsgProps}. +beta_msg_status({Msg = #basic_message{id = MsgId}, + SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = Msg, + persist_to = queue_index, + msg_in_store = false}; + beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> + MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), + MS0#msg_status{msg_id = MsgId, + msg = undefined, + persist_to = msg_store, + msg_in_store = true}. + +beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> #msg_status{seq_id = SeqId, - msg_id = MsgId, msg = undefined, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = true, index_on_disk = true, msg_props = MsgProps}. -trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. +trim_msg_status(MsgStatus) -> + case persist_to(MsgStatus) of + msg_store -> MsgStatus#msg_status{msg = undefined}; + queue_index -> MsgStatus + end. with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), @@ -1035,26 +1058,36 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> - {Filtered, Delivers, Acks} = +betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> + {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( - fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1} = Acc) -> + fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, + {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1]}; - false -> case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA)) of - false -> {?QUEUE:in_r(m(beta_msg_status(M)), - Filtered1), - Delivers1, Acks1}; - true -> Acc - end + [SeqId | Acks1], RRC, RB}; + false -> MsgStatus = m(beta_msg_status(M)), + HaveMsg = msg_in_ram(MsgStatus), + Size = msg_size(MsgStatus), + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)) of + false -> {?QUEUE:in_r(MsgStatus, Filtered1), + Delivers1, Acks1, + RRC + one_if(HaveMsg), + RB + one_if(HaveMsg) * Size}; + true -> Acc %% [0] + end end - end, {?QUEUE:new(), [], []}, List), - {Filtered, rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. + end, {?QUEUE:new(), [], [], 0, 0}, List), + {Filtered, RamReadyCount, RamBytes, + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. +%% [0] We don't increase RamBytes here, even though it pertains to +%% unacked messages too, since if HaveMsg then the message must have +%% been stored in the QI, thus the message must have been in +%% qi_pending_ack, thus it must already have been in RAM. expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); @@ -1101,6 +1134,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, next_seq_id = NextSeqId, ram_pending_ack = gb_trees:empty(), disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty(), index_state = IndexState1, msg_store_clients = {PersistentClient, TransientClient}, durable = IsDurable, @@ -1141,11 +1175,9 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - upd_ram_bytes( - 1, MsgStatus, - inc_ram_msg_count( - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) })) + MsgStatus1 = MsgStatus#msg_status{msg = Msg}, + stats(ready0, {MsgStatus, MsgStatus1}, + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1173,28 +1205,50 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> msg_store_read(MSCState, IsPersistent, MsgId), {Msg, State #vqstate {msg_store_clients = MSCState1}}. -inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> - State#vqstate{ram_msg_count = RamMsgCount + 1}. - -upd_bytes(SignReady, SignUnacked, - MsgStatus = #msg_status{msg = undefined}, State) -> - upd_bytes0(SignReady, SignUnacked, MsgStatus, State); -upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) -> - upd_ram_bytes(SignReady + SignUnacked, MsgStatus, - upd_bytes0(SignReady, SignUnacked, MsgStatus, State)). - -upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, - State = #vqstate{bytes = Bytes, - unacked_bytes = UBytes, - persistent_bytes = PBytes}) -> +stats(Signs, Statuses, State) -> + stats0(expand_signs(Signs), expand_statuses(Statuses), State). + +expand_signs(ready0) -> {0, 0, true}; +expand_signs({A, B}) -> {A, B, false}. + +expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; +expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; +expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. + +%% In this function at least, we are religious: the variable name +%% contains "Ready" or "Unacked" iff that is what it counts. If +%% neither is present it counts both. +stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, + {InRamBefore, InRamAfter, MsgStatus}, + State = #vqstate{len = ReadyCount, + bytes = ReadyBytes, + ram_msg_count = RamReadyCount, + persistent_count = PersistentCount, + unacked_bytes = UnackedBytes, + ram_bytes = RamBytes, + persistent_bytes = PersistentBytes}) -> S = msg_size(MsgStatus), - SignTotal = SignReady + SignUnacked, - State#vqstate{bytes = Bytes + SignReady * S, - unacked_bytes = UBytes + SignUnacked * S, - persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. - -upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> - State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. + DeltaTotal = DeltaReady + DeltaUnacked, + DeltaRam = case {InRamBefore, InRamAfter} of + {false, false} -> 0; + {false, true} -> 1; + {true, false} -> -1; + {true, true} -> 0 + end, + DeltaRamReady = case DeltaReady of + 1 -> one_if(InRamAfter); + -1 -> -one_if(InRamBefore); + 0 when ReadyMsgPaged -> DeltaRam; + 0 -> 0 + end, + DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent), + State#vqstate{len = ReadyCount + DeltaReady, + ram_msg_count = RamReadyCount + DeltaRamReady, + persistent_count = PersistentCount + DeltaPersistent, + bytes = ReadyBytes + DeltaReady * S, + unacked_bytes = UnackedBytes + DeltaUnacked * S, + ram_bytes = RamBytes + DeltaRam * S, + persistent_bytes = PersistentBytes + DeltaPersistent * S}. msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. @@ -1203,17 +1257,13 @@ msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, - msg = Msg, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, - State = #vqstate {ram_msg_count = RamMsgCount, - out_counter = OutCount, + State = #vqstate {out_counter = OutCount, index_state = IndexState, - msg_store_clients = MSCState, - len = Len, - persistent_count = PCount}) -> + msg_store_clients = MSCState}) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1224,10 +1274,11 @@ remove(AckRequired, MsgStatus = #msg_status { ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of - {false, true, false} -> Rem(), IndexState1; - {false, true, true} -> Rem(), Ack(); - _ -> IndexState1 + IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of + {false, true, false} -> Rem(), IndexState1; + {false, true, true} -> Rem(), Ack(); + {false, false, true} -> Ack(); + _ -> IndexState1 end, %% 3. If an ack is required, add something sensible to PA @@ -1238,166 +1289,209 @@ remove(AckRequired, MsgStatus = #msg_status { {SeqId, StateN}; false -> {undefined, State} end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), State2 = case AckRequired of - false -> upd_bytes(-1, 0, MsgStatus, State1); - true -> upd_bytes(-1, 1, MsgStatus, State1) + false -> stats({-1, 0}, {MsgStatus, none}, State1); + true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1) end, {AckTag, maybe_update_rates( - State2 #vqstate {ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1})}. - -purge_betas_and_deltas(Stats, - State = #vqstate { q3 = Q3, - index_state = IndexState, - msg_store_clients = MSCState }) -> + State2 #vqstate {out_counter = OutCount + 1, + index_state = IndexState2})}. + +purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> case ?QUEUE:is_empty(Q3) of - true -> {Stats, State}; - false -> {Stats1, IndexState1} = remove_queue_entries( - Q3, Stats, IndexState, MSCState), - purge_betas_and_deltas(Stats1, - maybe_deltas_to_betas( - State #vqstate { - q3 = ?QUEUE:new(), - index_state = IndexState1 })) + true -> State; + false -> State1 = remove_queue_entries(Q3, State), + purge_betas_and_deltas(maybe_deltas_to_betas( + State1#vqstate{q3 = ?QUEUE:new()})) end. -remove_queue_entries(Q, {RamBytes, PCount, PBytes}, - IndexState, MSCState) -> - {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = +remove_queue_entries(Q, State = #vqstate{index_state = IndexState, + msg_store_clients = MSCState}) -> + {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), RamBytes, PBytes, [], []}, Q), + {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{RamBytes1, - PCount - case orddict:find(true, MsgIdsByStore) of - error -> 0; - {ok, Ids} -> length(Ids) - end, - PBytes1}, - rabbit_queue_index:ack(Acks, - rabbit_queue_index:deliver(Delivers, IndexState))}. + IndexState1 = rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)), + State1#vqstate{index_state = IndexState1}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, - is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent, - msg_props = #message_properties { size = Size } }, - {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> - {case MsgOnDisk of + #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, + is_persistent = IsPersistent} = MsgStatus, + {MsgIdsByStore, Delivers, Acks, State}) -> + {case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - RamBytes - Size * one_if(Msg =/= undefined), - PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - cons_if(IndexOnDisk, SeqId, Acks)}. + cons_if(IndexOnDisk, SeqId, Acks), + stats({-1, 0}, {MsgStatus, none}, State)}. %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_on_disk = true }, _MSCState) -> + msg_in_store = true }, _MSCState) -> MsgStatus; maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1), - MsgStatus #msg_status { msg_on_disk = true }; + case persist_to(MsgStatus) of + msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, + prepare_to_store(Msg)), + MsgStatus#msg_status{msg_in_store = true}; + queue_index -> MsgStatus + end; maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) -> MsgStatus. maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { - index_on_disk = true }, IndexState) -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - {MsgStatus, IndexState}; + index_on_disk = true }, State) -> + {MsgStatus, State}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_props = MsgProps}, IndexState) + msg_props = MsgProps}, + State = #vqstate{target_ram_count = TargetRamCount, + index_state = IndexState}) when Force orelse IsPersistent -> - true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:publish( - MsgId, SeqId, MsgProps, IsPersistent, IndexState), - {MsgStatus #msg_status { index_on_disk = true }, - maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; -maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> - {MsgStatus, IndexState}. + case persist_to(MsgStatus) of + msg_store -> MsgId; + queue_index -> prepare_to_store(Msg) + end, SeqId, MsgProps, IsPersistent, TargetRamCount, + IndexState), + IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1), + {MsgStatus#msg_status{index_on_disk = true}, + State#vqstate{index_state = IndexState2}}; + +maybe_write_index_to_disk(_Force, MsgStatus, State) -> + {MsgStatus, State}. maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> + State = #vqstate { msg_store_clients = MSCState }) -> MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = - maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1 }}. + maybe_write_index_to_disk(ForceIndex, MsgStatus1, State). + +determine_persist_to(#basic_message{ + content = #content{properties = Props, + properties_bin = PropsBin}}, + #message_properties{size = BodySize}) -> + {ok, IndexMaxSize} = application:get_env( + rabbit, queue_index_embed_msgs_below), + %% The >= is so that you can set the env to 0 and never persist + %% to the index. + %% + %% We want this to be fast, so we avoid size(term_to_binary()) + %% here, or using the term size estimation from truncate.erl, both + %% of which are too slow. So instead, if the message body size + %% goes over the limit then we avoid any other checks. + %% + %% If it doesn't we need to decide if the properties will push + %% it past the limit. If we have the encoded properties (usual + %% case) we can just check their size. If we don't (message came + %% via the direct client), we make a guess based on the number of + %% headers. + case BodySize >= IndexMaxSize of + true -> msg_store; + false -> Est = case is_binary(PropsBin) of + true -> BodySize + size(PropsBin); + false -> #'P_basic'{headers = Hs} = Props, + case Hs of + undefined -> 0; + _ -> length(Hs) + end * ?HEADER_GUESS_SIZE + BodySize + end, + case Est >= IndexMaxSize of + true -> msg_store; + false -> queue_index + end + end. + +persist_to(#msg_status{persist_to = To}) -> To. + +prepare_to_store(Msg) -> + Msg#basic_message{ + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}. %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus, +record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, ack_in_counter = AckInCount}) -> - {RPA1, DPA1} = - case Msg of - undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)}; - _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA} + Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, + {RPA1, DPA1, QPA1} = + case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of + {false, _} -> {RPA, Insert(DPA), QPA}; + {_, queue_index} -> {RPA, DPA, Insert(QPA)}; + {_, msg_store} -> {Insert(RPA), DPA, QPA} end, State #vqstate { ram_pending_ack = RPA1, disk_pending_ack = DPA1, + qi_pending_ack = QPA1, ack_in_counter = AckInCount + 1}. lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> V; - none -> gb_trees:get(SeqId, DPA) + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> V; + none -> gb_trees:get(SeqId, QPA) + end end. -%% First parameter = UpdatePersistentCount +%% First parameter = UpdateStats remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = - remove_pending_ack(false, SeqId, State), - PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), - {MsgStatus, upd_bytes(0, -1, MsgStatus, - State1 # vqstate{ persistent_count = PCount1 })}; -remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), + {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; +remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> DPA1 = gb_trees:delete(SeqId, DPA), - {gb_trees:get(SeqId, DPA), - State #vqstate { disk_pending_ack = DPA1 }} + none -> case gb_trees:lookup(SeqId, DPA) of + {value, V} -> + DPA1 = gb_trees:delete(SeqId, DPA), + {V, State#vqstate{disk_pending_ack = DPA1}}; + none -> + QPA1 = gb_trees:delete(SeqId, QPA), + {gb_trees:get(SeqId, QPA), + State#vqstate{qi_pending_ack = QPA1}} + end end. purge_pending_ack(KeepPersistent, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, index_state = IndexState, msg_store_clients = MSCState }) -> F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA), + F, rabbit_misc:gb_trees_fold( + F, rabbit_misc:gb_trees_fold( + F, accumulate_ack_init(), RPA), DPA), QPA), State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty() }, + disk_pending_ack = gb_trees:empty(), + qi_pending_ack = gb_trees:empty()}, case KeepPersistent of true -> case orddict:find(false, MsgIdsByStore) of @@ -1418,11 +1512,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, + msg_in_store = MsgInStore, index_on_disk = IndexOnDisk }, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - case MsgOnDisk of + case MsgInStore of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, @@ -1469,29 +1563,25 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> gb_sets:union(MIOD, Confirmed) }) end). +msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). + %%---------------------------------------------------------------------------- %% Internal plumbing for requeue %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, - upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] + MsgStatus1 = MsgStatus#msg_status { msg = Msg }, + {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; publish_alpha(MsgStatus, State) -> - {MsgStatus, inc_ram_msg_count(State)}. -%% [1] We increase the ram_bytes here because we paged the message in -%% to requeue it, not purely because we requeued it. Hence in the -%% second head it's already accounted for as already in memory. OTOH -%% ram_msg_count does not include unacked messages, so it needs -%% incrementing in both heads. + {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of - true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; - _ -> {MsgStatus2, State1} - end. + {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1513,7 +1603,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2)) + Limit, PubFun, State2) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1527,13 +1617,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - State3 = - case msg_in_ram(MsgStatus) of - false -> State2; - true -> upd_ram_bytes(-1, MsgStatus, State2) - end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - upd_bytes(1, -1, MsgStatus, State3)} + stats({1, -1}, {MsgStatus, none}, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1563,6 +1648,9 @@ ram_ack_iterator(State) -> disk_ack_iterator(State) -> {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. +qi_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}. + msg_iterator(State) -> istate(start, State). istate(start, State) -> {q4, State#vqstate.q4, State}; @@ -1592,7 +1680,8 @@ next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of false -> Next = {delta, Delta, Rest, State}, {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) @@ -1689,12 +1778,12 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, - upd_ram_bytes( - -1, MsgStatus1, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) + stats({0, 0}, {MsgStatus, MsgStatus2}, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. permitted_beta_count(#vqstate { len = 0 }) -> @@ -1755,8 +1844,11 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes, ram_pending_ack = RPA, disk_pending_ack = DPA, + qi_pending_ack = QPA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, @@ -1766,9 +1858,12 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, IndexState1), - State1 = State #vqstate { index_state = IndexState2 }, + {Q3a, RamCountsInc, RamBytesInc, IndexState2} = + betas_from_index_entries(List, TransientThreshold, + RPA, DPA, QPA, IndexState1), + State1 = State #vqstate { index_state = IndexState2, + ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc }, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being @@ -1826,26 +1921,21 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, - State1 = #vqstate { ram_msg_count = RamMsgCount }} = + {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer( - MsgStatus2, Qa, - upd_ram_bytes( - -1, MsgStatus2, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1})), + State2 = stats( + ready0, {MsgStatus, MsgStatus2}, State1), + State3 = Consumer(MsgStatus2, Qa, State2), push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State2) + Qa, State3) end end. -push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState }) -> - PushState = {Quota, Delta, IndexState}, +push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3}) -> + PushState = {Quota, Delta, State}, {Q3a, PushState1} = push_betas_to_deltas( fun ?QUEUE:out_r/1, fun rabbit_queue_index:next_segment_boundary/1, @@ -1854,11 +1944,10 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, fun ?QUEUE:out/1, fun (Q2MinSeqId) -> Q2MinSeqId end, Q2, PushState1), - {_, Delta1, IndexState1} = PushState2, - State #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a, - index_state = IndexState1 }. + {_, Delta1, State1} = PushState2, + State1 #vqstate { q2 = Q2a, + delta = Delta1, + q3 = Q3a }. push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of @@ -1874,11 +1963,9 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> end end. -push_betas_to_deltas1(_Generator, _Limit, Q, - {0, _Delta, _IndexState} = PushState) -> +push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) -> {Q, PushState}; -push_betas_to_deltas1(Generator, Limit, Q, - {Quota, Delta, IndexState} = PushState) -> +push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) -> case Generator(Q) of {empty, _Q} -> {Q, PushState}; @@ -1886,11 +1973,12 @@ push_betas_to_deltas1(Generator, Limit, Q, when SeqId < Limit -> {Q, PushState}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(true, MsgStatus, IndexState), + {#msg_status { index_on_disk = true }, State1} = + maybe_write_index_to_disk(true, MsgStatus, State), + State2 = stats(ready0, {MsgStatus, none}, State1), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, IndexState1}) + {Quota - 1, Delta1, State2}) end. %%---------------------------------------------------------------------------- diff --git a/src/truncate.erl b/src/truncate.erl index 820af1bf86..1b4d957d0b 100644 --- a/src/truncate.erl +++ b/src/truncate.erl @@ -45,7 +45,7 @@ report(List, Params) when is_list(List) -> [case Item of report(Other, Params) -> term(Other, Params). term(Thing, {Max, {Content, Struct, ContentDec, StructDec}}) -> - case term_limit(Thing, Max) of + case exceeds_size(Thing, Max) of true -> term(Thing, true, #params{content = Content, struct = Struct, content_dec = ContentDec, @@ -93,7 +93,7 @@ shrink_list([H|T], #params{content = Content, %% sizes. This is all going to be rather approximate though, these %% sizes are probably not very "fair" but we are just trying to see if %% we reach a fairly arbitrary limit anyway though. -term_limit(Thing, Max) -> +exceeds_size(Thing, Max) -> case term_size(Thing, Max, erlang:system_info(wordsize)) of limit_exceeded -> true; _ -> false diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index ef6b756b14..c26dcf32aa 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -1292,11 +1292,9 @@ test_spawn_remote() -> end. user(Username) -> - #user{username = Username, - tags = [administrator], - auth_backend = rabbit_auth_backend_internal, - impl = #internal_user{username = Username, - tags = [administrator]}}. + #user{username = Username, + tags = [administrator], + authz_backends = [{rabbit_auth_backend_internal, none}]}. test_confirms() -> {_Writer, Ch} = test_spawn(), @@ -1890,11 +1888,15 @@ test_backing_queue() -> passed = test_msg_store(), application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit), - passed = test_queue_index(), - passed = test_queue_index_props(), - passed = test_variable_queue(), - passed = test_variable_queue_delete_msg_store_files_callback(), - passed = test_queue_recover(), + [begin + application:set_env( + rabbit, queue_index_embed_msgs_below, Bytes), + passed = test_queue_index(), + passed = test_queue_index_props(), + passed = test_variable_queue(), + passed = test_variable_queue_delete_msg_store_files_callback(), + passed = test_queue_recover() + end || Bytes <- [0, 1024]], application:set_env(rabbit, queue_index_max_journal_entries, MaxJournal), %% We will have restarted the message store, and thus changed @@ -2221,7 +2223,7 @@ init_test_queue() -> fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, - fun nop/1), + fun nop/1, fun nop/1), ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), Res. @@ -2260,7 +2262,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( MsgId, SeqId, #message_properties{size = 10}, - Persistent, QiN), + Persistent, infinity, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} end, {Qi, []}, SeqIds), @@ -2283,7 +2285,8 @@ test_queue_index_props() -> fun(Qi0) -> MsgId = rabbit_guid:gen(), Props = #message_properties{expiry=12345, size = 10}, - Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), + Qi1 = rabbit_queue_index:publish( + MsgId, 1, Props, true, infinity, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), Qi2 @@ -2424,7 +2427,7 @@ variable_queue_init(Q, Recover) -> Q, case Recover of true -> non_clean_shutdown; false -> new - end, fun nop/2, fun nop/2, fun nop/1). + end, fun nop/2, fun nop/2, fun nop/1, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). @@ -2446,7 +2449,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> end}, PayloadFun(N)), PropFun(N, #message_properties{size = 10}), - false, self(), VQN) + false, self(), noflow, VQN) end, VQ, lists:seq(Start, Start + Count - 1))). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2464,7 +2467,10 @@ variable_queue_set_ram_duration_target(Duration, VQ) -> rabbit_variable_queue:set_ram_duration_target(Duration, VQ)). assert_prop(List, Prop, Value) -> - Value = proplists:get_value(Prop, List). + case proplists:get_value(Prop, List)of + Value -> ok; + _ -> {exit, Prop, exp, Value, List} + end. assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. @@ -2487,12 +2493,18 @@ with_fresh_variable_queue(Fun) -> {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), - _ = rabbit_variable_queue:delete_and_terminate( - shutdown, Fun(VQ)), - Me ! Ref + try + _ = rabbit_variable_queue:delete_and_terminate( + shutdown, Fun(VQ)), + Me ! Ref + catch + Type:Error -> + Me ! {Ref, Type, Error, erlang:get_stacktrace()} + end end), receive - Ref -> ok + Ref -> ok; + {Ref, Type, Error, ST} -> exit({Type, Error, ST}) end, passed. @@ -2503,7 +2515,8 @@ publish_and_confirm(Q, Payload, Count) -> <<>>, #'P_basic'{delivery_mode = 2}, Payload), Delivery = #delivery{mandatory = false, sender = self(), - confirm = true, message = Msg, msg_seq_no = Seq}, + confirm = true, message = Msg, msg_seq_no = Seq, + flow = noflow}, _QPids = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). @@ -2789,8 +2802,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> VQ7 = lists:foldl( fun (Duration1, VQ4) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), - io:format("~p:~n~p~n", - [Duration1, variable_queue_status(VQ5)]), VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) @@ -2851,7 +2862,6 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> check_variable_queue_status(VQ0, Props) -> VQ1 = variable_queue_wait_for_shuffling_end(VQ0), S = variable_queue_status(VQ1), - io:format("~p~n", [S]), assert_props(S, Props), VQ1. |
