summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-22 12:06:39 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-22 12:06:39 +0000
commit358ccf68180357b4471ce06f87828b9f4d89a214 (patch)
treee30d74bedee55a2aeea649af1e8325e3a2510e45
parent86895beb9c059fd9a3f56cf2f014a64dc2c8c7aa (diff)
parent9b833f66a6323566dc4a9e8e9395696e6a2c36b1 (diff)
downloadrabbitmq-server-git-358ccf68180357b4471ce06f87828b9f4d89a214.tar.gz
stable to default
-rw-r--r--docs/rabbitmq.config.example4
-rw-r--r--docs/rabbitmqctl.1.xml35
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl13
-rw-r--r--packaging/windows/Makefile4
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat10
-rwxr-xr-xscripts/rabbitmqctl5
-rwxr-xr-xscripts/rabbitmqctl.bat12
-rw-r--r--src/file_handle_cache.erl170
-rw-r--r--src/file_handle_cache_stats.erl67
-rw-r--r--src/rabbit.erl30
-rw-r--r--src/rabbit_access_control.erl126
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl30
-rw-r--r--src/rabbit_auth_backend_dummy.erl25
-rw-r--r--src/rabbit_auth_backend_internal.erl34
-rw-r--r--src/rabbit_auth_mechanism.erl4
-rw-r--r--src/rabbit_authn_backend.erl49
-rw-r--r--src/rabbit_authz_backend.erl (renamed from src/rabbit_auth_backend.erl)40
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_binary_parser.erl102
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_cli.erl21
-rw-r--r--src/rabbit_control_main.erl35
-rw-r--r--src/rabbit_direct.erl17
-rw-r--r--src/rabbit_epmd_monitor.erl101
-rw-r--r--src/rabbit_log.erl11
-rw-r--r--src/rabbit_mirror_queue_master.erl37
-rw-r--r--src/rabbit_mirror_queue_slave.erl34
-rw-r--r--src/rabbit_mirror_queue_sync.erl5
-rw-r--r--src/rabbit_misc.erl39
-rw-r--r--src/rabbit_mnesia.erl48
-rw-r--r--src/rabbit_mnesia_rename.erl267
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_nodes.erl20
-rw-r--r--src/rabbit_queue_index.erl411
-rw-r--r--src/rabbit_reader.erl87
-rw-r--r--src/rabbit_types.erl14
-rw-r--r--src/rabbit_upgrade.erl4
-rw-r--r--src/rabbit_variable_queue.erl458
-rw-r--r--src/truncate.erl4
-rw-r--r--test/src/rabbit_tests.erl53
44 files changed, 1806 insertions, 672 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 63540568f1..fdb166ea70 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -33,8 +33,8 @@
%% {handshake_timeout, 10000},
%% Log levels (currently just used for connection logging).
- %% One of 'info', 'warning', 'error' or 'none', in decreasing order
- %% of verbosity. Defaults to 'info'.
+ %% One of 'debug', 'info', 'warning', 'error' or 'none', in decreasing
+ %% order of verbosity. Defaults to 'info'.
%%
%% {log_levels, [{connection, info}]},
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 8d04f28a26..8d042670a8 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -426,6 +426,41 @@
</listitem>
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>rename_cluster_node</command> <arg choice="req">oldnode1</arg> <arg choice="req">newnode1</arg> <arg choice="opt">oldnode2</arg> <arg choice="opt">newnode2 ...</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Supports renaming of cluster nodes in the local database.
+ </para>
+ <para>
+ This subcommand causes rabbitmqctl to temporarily become
+ the node in order to make the change. The local cluster
+ node must therefore be completely stopped; other nodes
+ can be online or offline.
+ </para>
+ <para>
+ This subcommand takes an even number of arguments, in
+ pairs representing the old and new names for nodes. You
+ must specify the old and new names for this node and for
+ any other nodes that are stopped and being renamed at
+ the same time.
+ </para>
+ <para>
+ It is possible to stop all nodes and rename them all
+ simultaneously (in which case old and new names for all
+ nodes must be given to every node) or stop and rename
+ nodes one at a time (in which case each node only needs
+ to be told how its own name is changing).
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia</screen>
+ <para role="example">
+ This command will rename the node
+ <command>rabbit@misshelpful</command> to the node
+ <command>rabbit@cordelia</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
<term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis>
</term>
<listitem>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9e5584a1bf..5ebef60828 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -29,6 +29,7 @@
{heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
+ {queue_index_embed_msgs_below, 1024},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_user_tags, [administrator]},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 74e165cd9b..b925dffc01 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -14,12 +14,17 @@
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%
+%% Passed around most places
-record(user, {username,
tags,
- auth_backend, %% Module this user came from
- impl %% Scratch space for that module
- }).
+ authz_backends}). %% List of {Module, AuthUserImpl} pairs
+%% Passed to auth backends
+-record(auth_user, {username,
+ tags,
+ impl}).
+
+%% Implementation for the internal auth backend
-record(internal_user, {username, password_hash, tags}).
-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
@@ -83,7 +88,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, confirm, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, confirm, sender, message, msg_seq_no, flow}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, reference = undefined, timestamp}).
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index 53fc31fcfd..5dc802a8c2 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -7,9 +7,9 @@ dist:
tar -zxf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR)
- mkdir $(SOURCE_DIR)/sbin
+ mkdir -p $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/*.bat $(SOURCE_DIR)/sbin
- mkdir $(SOURCE_DIR)/etc
+ mkdir -p $(SOURCE_DIR)/etc
cp $(SOURCE_DIR)/docs/rabbitmq.config.example $(SOURCE_DIR)/etc/rabbitmq.config.example
cp README-etc $(SOURCE_DIR)/etc/README.txt
rm -rf $(SOURCE_DIR)/scripts
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 29c4b0c86a..22fec32f0a 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -19,11 +19,11 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
+RABBITMQ_USE_LONGNAME=${RABBITMQ_USE_LONGNAME} \
exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
- ${RABBITMQ_NAME_TYPE} rabbitmq-plugins$$ \
-boot "${CLEAN_BOOT_FILE}" \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 1ed648b0bc..b27a8586b2 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -23,14 +23,6 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
if "!RABBITMQ_SERVICENAME!"=="" (
set RABBITMQ_SERVICENAME=RabbitMQ
)
@@ -63,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_NAME_TYPE! rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index d437e251ab..2237275c47 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -19,11 +19,6 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-# rabbitmqctl starts distribution itself, so we need to make sure epmd
-# is running.
-${ERL_DIR}erl ${RABBITMQ_NAME_TYPE} rabbitmqctl-prelaunch-$$ -noinput \
--eval 'erlang:halt().' -boot "${CLEAN_BOOT_FILE}"
-
# We specify Mnesia dir and sasl error logger since some actions
# (e.g. forget_cluster_node --offline) require us to impersonate the
# real node.
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 6eb2530a4e..a3734088f6 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -27,14 +27,6 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
-if "!RABBITMQ_USE_LONGNAME!"=="" (
- set RABBITMQ_NAME_TYPE="-sname"
-)
-
-if "!RABBITMQ_USE_LONGNAME!"=="true" (
- set RABBITMQ_NAME_TYPE="-name"
-)
-
if "!COMPUTERNAME!"=="" (
set COMPUTERNAME=localhost
)
@@ -63,10 +55,6 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-rem rabbitmqctl starts distribution itself, so we need to make sure epmd
-rem is running.
-"!ERLANG_HOME!\bin\erl.exe" !RABBITMQ_NAME_TYPE! rabbitmqctl-prelaunch-!RANDOM!!TIME:~9! -noinput -eval "erlang:halt()."
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!TDP0!..\ebin" ^
-noinput ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3a7a692c5c..5a916c75bc 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -178,6 +178,10 @@
write_buffer_size,
write_buffer_size_limit,
write_buffer,
+ read_buffer,
+ read_buffer_pos,
+ read_buffer_rem, %% Num of bytes from pos to end
+ read_buffer_size_limit,
at_eof,
path,
mode,
@@ -237,7 +241,8 @@
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
(file:filename(), [any()],
- [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
+ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} |
+ {'read_buffer', (non_neg_integer() | 'unbuffered')}])
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
@@ -331,16 +336,48 @@ close(Ref) ->
read(Ref, Count) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
- ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case prim_file:read(Hdl, Count) of
- {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
- {Obj,
- [Handle #handle { offset = Offset1 }]};
- eof -> {eof, [Handle #handle { at_eof = true }]};
- Error -> {Error, [Handle]}
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ offset = Offset}]) when BufRem >= Count ->
+ <<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf,
+ {{ok, Res}, [Handle#handle{offset = Offset + Count,
+ read_buffer_pos = BufPos + Count,
+ read_buffer_rem = BufRem - Count}]};
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ read_buffer_size_limit = BufSzLimit,
+ hdl = Hdl,
+ offset = Offset}]) ->
+ WantedCount = Count - BufRem,
+ case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of
+ {ok, Data} ->
+ <<_:BufPos/binary, BufTl/binary>> = Buf,
+ ReadCount = size(Data),
+ case ReadCount < WantedCount of
+ true ->
+ OffSet1 = Offset + BufRem + ReadCount,
+ {{ok, <<BufTl/binary, Data/binary>>},
+ [reset_read_buffer(
+ Handle#handle{offset = OffSet1})]};
+ false ->
+ <<Hd:WantedCount/binary, _/binary>> = Data,
+ OffSet1 = Offset + BufRem + WantedCount,
+ BufRem1 = ReadCount - WantedCount,
+ {{ok, <<BufTl/binary, Hd/binary>>},
+ [Handle#handle{offset = OffSet1,
+ read_buffer = Data,
+ read_buffer_pos = WantedCount,
+ read_buffer_rem = BufRem1}]}
+ end;
+ eof ->
+ {eof, [Handle #handle { at_eof = true }]};
+ Error ->
+ {Error, [reset_read_buffer(Handle)]}
end
end).
@@ -355,7 +392,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {prim_file:write(Hdl, Data),
+ {prim_file_write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -377,12 +414,12 @@ append(Ref, Data) ->
sync(Ref) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case prim_file:sync(Hdl) of
+ case prim_file_sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -397,7 +434,7 @@ needs_sync(Ref) ->
position(Ref, NewOffset) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
{Result, [Handle1]}
end).
@@ -465,8 +502,8 @@ clear(Ref) ->
fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
ok;
([Handle]) ->
- case maybe_seek(bof, Handle #handle { write_buffer = [],
- write_buffer_size = 0 }) of
+ case maybe_seek(bof, Handle#handle{write_buffer = [],
+ write_buffer_size = 0}) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
@@ -539,6 +576,21 @@ info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
%% Internal functions
%%----------------------------------------------------------------------------
+prim_file_read(Hdl, Size) ->
+ file_handle_cache_stats:update(
+ io_read, Size, fun() -> prim_file:read(Hdl, Size) end).
+
+prim_file_write(Hdl, Bytes) ->
+ file_handle_cache_stats:update(
+ io_write, iolist_size(Bytes), fun() -> prim_file:write(Hdl, Bytes) end).
+
+prim_file_sync(Hdl) ->
+ file_handle_cache_stats:update(io_sync, fun() -> prim_file:sync(Hdl) end).
+
+prim_file_position(Hdl, NewOffset) ->
+ file_handle_cache_stats:update(
+ io_seek, fun() -> prim_file:position(Hdl, NewOffset) end).
+
is_reader(Mode) -> lists:member(read, Mode).
is_writer(Mode) -> lists:member(write, Mode).
@@ -550,8 +602,15 @@ append_to_write(Mode) ->
end.
with_handles(Refs, Fun) ->
+ with_handles(Refs, reset, Fun).
+
+with_handles(Refs, ReadBuffer, Fun) ->
case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
- {ok, Handles} ->
+ {ok, Handles0} ->
+ Handles = case ReadBuffer of
+ reset -> [reset_read_buffer(H) || H <- Handles0];
+ keep -> Handles0
+ end,
case Fun(Handles) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
@@ -564,8 +623,11 @@ with_handles(Refs, Fun) ->
end.
with_flushed_handles(Refs, Fun) ->
+ with_flushed_handles(Refs, reset, Fun).
+
+with_flushed_handles(Refs, ReadBuffer, Fun) ->
with_handles(
- Refs,
+ Refs, ReadBuffer,
fun (Handles) ->
case lists:foldl(
fun (Handle, {ok, HandlesAcc}) ->
@@ -611,20 +673,23 @@ reopen([], Tree, RefHdls) ->
{ok, lists:reverse(RefHdls)};
reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
path = Path,
- mode = Mode,
+ mode = Mode0,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case prim_file:open(Path, case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end) of
+ Mode = case NewOrReopen of
+ new -> Mode0;
+ reopen -> file_handle_cache_stats:update(io_reopen),
+ [read | Mode0]
+ end,
+ case prim_file:open(Path, Mode) of
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
- maybe_seek(Offset, Handle #handle { hdl = Hdl,
- offset = 0,
- last_used_at = Now }),
+ maybe_seek(Offset, reset_read_buffer(
+ Handle#handle{hdl = Hdl,
+ offset = 0,
+ last_used_at = Now})),
put({Ref, fhc_handle}, Handle1),
reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
[{Ref, Handle1} | RefHdls]);
@@ -709,6 +774,11 @@ new_closed_handle(Path, Mode, Options) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
+ ReadBufferSize =
+ case proplists:get_value(read_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ N2 when is_integer(N2) -> N2
+ end,
Ref = make_ref(),
put({Ref, fhc_handle}, #handle { hdl = closed,
offset = 0,
@@ -716,6 +786,10 @@ new_closed_handle(Path, Mode, Options) ->
write_buffer_size = 0,
write_buffer_size_limit = WriteBufferSize,
write_buffer = [],
+ read_buffer = <<>>,
+ read_buffer_pos = 0,
+ read_buffer_rem = 0,
+ read_buffer_size_limit = ReadBufferSize,
at_eof = false,
path = Path,
mode = Mode,
@@ -742,7 +816,7 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> prim_file:sync(Hdl);
+ true -> prim_file_sync(Hdl);
false -> ok
end,
ok = prim_file:close(Hdl),
@@ -776,17 +850,31 @@ hard_close(Handle) ->
Result
end.
-maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
- at_eof = AtEoF }) ->
- {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
- case (case NeedsSeek of
- true -> prim_file:position(Hdl, NewOffset);
- false -> {ok, Offset}
- end) of
- {ok, Offset1} = Result ->
- {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
- {error, _} = Error ->
- {Error, Handle}
+maybe_seek(New, Handle = #handle{hdl = Hdl,
+ offset = Old,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ at_eof = AtEoF}) ->
+ {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Old, New),
+ case NeedsSeek of
+ true when is_number(New) andalso
+ ((New >= Old andalso New =< BufRem + Old)
+ orelse (New < Old andalso Old - New =< BufPos)) ->
+ Diff = New - Old,
+ {{ok, New}, Handle#handle{offset = New,
+ at_eof = AtEoF1,
+ read_buffer_pos = BufPos + Diff,
+ read_buffer_rem = BufRem - Diff}};
+ true ->
+ case prim_file_position(Hdl, New) of
+ {ok, Offset1} = Result ->
+ {Result, reset_read_buffer(Handle#handle{offset = Offset1,
+ at_eof = AtEoF1})};
+ {error, _} = Error ->
+ {Error, Handle}
+ end;
+ false ->
+ {{ok, Old}, Handle}
end.
needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};
@@ -817,7 +905,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
@@ -826,6 +914,11 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+reset_read_buffer(Handle) ->
+ Handle#handle{read_buffer = <<>>,
+ read_buffer_pos = 0,
+ read_buffer_rem = 0}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;
@@ -843,6 +936,7 @@ used(#fhc_state{open_count = C1,
%%----------------------------------------------------------------------------
init([AlarmSet, AlarmClear]) ->
+ file_handle_cache_stats:init(),
Limit = case application:get_env(file_handles_high_watermark) of
{ok, Watermark} when (is_integer(Watermark) andalso
Watermark > 0) ->
diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl
new file mode 100644
index 0000000000..de2f90c67a
--- /dev/null
+++ b/src/file_handle_cache_stats.erl
@@ -0,0 +1,67 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(file_handle_cache_stats).
+
+%% stats about read / write operations that go through the fhc.
+
+-export([init/0, update/3, update/2, update/1, get/0]).
+
+-define(TABLE, ?MODULE).
+
+-define(COUNT,
+ [io_reopen, mnesia_ram_tx, mnesia_disk_tx,
+ msg_store_read, msg_store_write,
+ queue_index_journal_write, queue_index_write, queue_index_read]).
+-define(COUNT_TIME, [io_sync, io_seek]).
+-define(COUNT_TIME_BYTES, [io_read, io_write]).
+
+init() ->
+ ets:new(?TABLE, [public, named_table]),
+ [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES,
+ Counter <- [count, bytes, time]],
+ [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME,
+ Counter <- [count, time]],
+ [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT,
+ Counter <- [count]].
+
+update(Op, Bytes, Thunk) ->
+ {Time, Res} = timer_tc(Thunk),
+ ets:update_counter(?TABLE, {Op, count}, 1),
+ ets:update_counter(?TABLE, {Op, bytes}, Bytes),
+ ets:update_counter(?TABLE, {Op, time}, Time),
+ Res.
+
+update(Op, Thunk) ->
+ {Time, Res} = timer_tc(Thunk),
+ ets:update_counter(?TABLE, {Op, count}, 1),
+ ets:update_counter(?TABLE, {Op, time}, Time),
+ Res.
+
+update(Op) ->
+ ets:update_counter(?TABLE, {Op, count}, 1),
+ ok.
+
+get() ->
+ lists:sort(ets:tab2list(?TABLE)).
+
+%% TODO timer:tc/1 was introduced in R14B03; use that function once we
+%% require that version.
+timer_tc(Thunk) ->
+ T1 = os:timestamp(),
+ Res = Thunk(),
+ T2 = os:timestamp(),
+ {timer:now_diff(T2, T1), Res}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 664da20688..ba78b1eee0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -118,6 +118,13 @@
{requires, [rabbit_alarm, guid_generator]},
{enables, core_initialized}]}).
+-rabbit_boot_step({rabbit_epmd_monitor,
+ [{description, "epmd monitor"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_epmd_monitor]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({core_initialized,
[{description, "core initialized"},
{requires, kernel_ready}]}).
@@ -243,15 +250,19 @@ maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
Can = code:which(hipe) =/= non_existing,
case {Want, Can} of
- {true, true} -> hipe_compile(),
- true;
+ {true, true} -> hipe_compile();
{true, false} -> false;
- {false, _} -> true
+ {false, _} -> {ok, disabled}
end.
-warn_if_hipe_compilation_failed(true) ->
+log_hipe_result({ok, disabled}) ->
ok;
-warn_if_hipe_compilation_failed(false) ->
+log_hipe_result({ok, Count, Duration}) ->
+ rabbit_log:info(
+ "HiPE in use: compiled ~B modules in ~Bs.~n", [Count, Duration]);
+log_hipe_result(false) ->
+ io:format(
+ "~nNot HiPE compiling: HiPE not found in this Erlang installation.~n"),
rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
@@ -276,8 +287,9 @@ hipe_compile() ->
{'DOWN', MRef, process, _, Reason} -> exit(Reason)
end || {_Pid, MRef} <- PidMRefs],
T2 = erlang:now(),
- io:format("|~n~nCompiled ~B modules in ~Bs~n",
- [Count, timer:now_diff(T2, T1) div 1000000]).
+ Duration = timer:now_diff(T2, T1) div 1000000,
+ io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]),
+ {ok, Count, Duration}.
split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
@@ -307,9 +319,9 @@ start() ->
boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
- Success = maybe_hipe_compile(),
+ HipeResult = maybe_hipe_compile(),
ok = ensure_working_log_handlers(),
- warn_if_hipe_compilation_failed(Success),
+ log_hipe_result(HipeResult),
rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b0a9c0d807..41c54b07a2 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
-export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2,
- check_vhost_access/2, check_resource_access/3]).
+ check_vhost_access/3, check_resource_access/3]).
%%----------------------------------------------------------------------------
@@ -31,15 +31,17 @@
-spec(check_user_pass_login/2 ::
(rabbit_types:username(), rabbit_types:password())
- -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+ -> {'ok', rabbit_types:user()} |
+ {'refused', rabbit_types:username(), string(), [any()]}).
-spec(check_user_login/2 ::
(rabbit_types:username(), [{atom(), any()}])
- -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+ -> {'ok', rabbit_types:user()} |
+ {'refused', rabbit_types:username(), string(), [any()]}).
-spec(check_user_loopback/2 :: (rabbit_types:username(),
rabbit_net:socket() | inet:ip_address())
-> 'ok' | 'not_allowed').
--spec(check_vhost_access/2 ::
- (rabbit_types:user(), rabbit_types:vhost())
+-spec(check_vhost_access/3 ::
+ (rabbit_types:user(), rabbit_types:vhost(), rabbit_net:socket())
-> 'ok' | rabbit_types:channel_exit()).
-spec(check_resource_access/3 ::
(rabbit_types:user(), rabbit_types:r(atom()), permission_atom())
@@ -55,36 +57,71 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
R = lists:foldl(
- fun ({ModN, ModZ}, {refused, _, _}) ->
+ fun ({ModN, ModZs0}, {refused, _, _, _}) ->
+ ModZs = case ModZs0 of
+ A when is_atom(A) -> [A];
+ L when is_list(L) -> L
+ end,
%% Different modules for authN vs authZ. So authenticate
%% with authN module, then if that succeeds do
- %% passwordless (i.e pre-authenticated) login with authZ
- %% module, and use the #user{} the latter gives us.
- case try_login(ModN, Username, AuthProps) of
- {ok, _} -> try_login(ModZ, Username, []);
- Else -> Else
+ %% passwordless (i.e pre-authenticated) login with authZ.
+ case try_authenticate(ModN, Username, AuthProps) of
+ {ok, ModNUser = #auth_user{username = Username2}} ->
+ user(ModNUser, try_authorize(ModZs, Username2));
+ Else ->
+ Else
end;
- (Mod, {refused, _, _}) ->
+ (Mod, {refused, _, _, _}) ->
%% Same module for authN and authZ. Just take the result
%% it gives us
- try_login(Mod, Username, AuthProps);
+ case try_authenticate(Mod, Username, AuthProps) of
+ {ok, ModNUser = #auth_user{impl = Impl}} ->
+ user(ModNUser, {ok, [{Mod, Impl}]});
+ Else ->
+ Else
+ end;
(_, {ok, User}) ->
%% We've successfully authenticated. Skip to the end...
{ok, User}
- end, {refused, "No modules checked '~s'", [Username]}, Modules),
- rabbit_event:notify(case R of
- {ok, _User} -> user_authentication_success;
- _ -> user_authentication_failure
- end, [{name, Username}]),
+ end,
+ {refused, Username, "No modules checked '~s'", [Username]}, Modules),
R.
-try_login(Module, Username, AuthProps) ->
- case Module:check_user_login(Username, AuthProps) of
- {error, E} -> {refused, "~s failed authenticating ~s: ~p~n",
- [Module, Username, E]};
- Else -> Else
+try_authenticate(Module, Username, AuthProps) ->
+ case Module:user_login_authentication(Username, AuthProps) of
+ {ok, AuthUser} -> {ok, AuthUser};
+ {error, E} -> {refused, Username,
+ "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ {refused, F, A} -> {refused, Username, F, A}
end.
+try_authorize(Modules, Username) ->
+ lists:foldr(
+ fun (Module, {ok, ModsImpls}) ->
+ case Module:user_login_authorization(Username) of
+ {ok, Impl} -> {ok, [{Module, Impl} | ModsImpls]};
+ {error, E} -> {refused, Username,
+ "~s failed authorizing ~s: ~p~n",
+ [Module, Username, E]};
+ {refused, F, A} -> {refused, Username, F, A}
+ end;
+ (_, {refused, F, A}) ->
+ {refused, Username, F, A}
+ end, {ok, []}, Modules).
+
+user(#auth_user{username = Username, tags = Tags}, {ok, ModZImpls}) ->
+ {ok, #user{username = Username,
+ tags = Tags,
+ authz_backends = ModZImpls}};
+user(_AuthUser, Error) ->
+ Error.
+
+auth_user(#user{username = Username, tags = Tags}, Impl) ->
+ #auth_user{username = Username,
+ tags = Tags,
+ impl = Impl}.
+
check_user_loopback(Username, SockOrAddr) ->
{ok, Users} = application:get_env(rabbit, loopback_users),
case rabbit_net:is_loopback(SockOrAddr)
@@ -93,29 +130,38 @@ check_user_loopback(Username, SockOrAddr) ->
false -> not_allowed
end.
-check_vhost_access(User = #user{ username = Username,
- auth_backend = Module }, VHostPath) ->
- check_access(
- fun() ->
- %% TODO this could be an andalso shortcut under >R13A
- case rabbit_vhost:exists(VHostPath) of
- false -> false;
- true -> Module:check_vhost_access(User, VHostPath)
- end
- end,
- Module, "access to vhost '~s' refused for user '~s'",
- [VHostPath, Username]).
+check_vhost_access(User = #user{username = Username,
+ authz_backends = Modules}, VHostPath, Sock) ->
+ lists:foldl(
+ fun({Mod, Impl}, ok) ->
+ check_access(
+ fun() ->
+ rabbit_vhost:exists(VHostPath) andalso
+ Mod:check_vhost_access(
+ auth_user(User, Impl), VHostPath, Sock)
+ end,
+ Mod, "access to vhost '~s' refused for user '~s'",
+ [VHostPath, Username]);
+ (_, Else) ->
+ Else
+ end, ok, Modules).
check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
Permission) ->
check_resource_access(User, R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(User = #user{username = Username, auth_backend = Module},
+check_resource_access(User = #user{username = Username,
+ authz_backends = Modules},
Resource, Permission) ->
- check_access(
- fun() -> Module:check_resource_access(User, Resource, Permission) end,
- Module, "access to ~s refused for user '~s'",
- [rabbit_misc:rs(Resource), Username]).
+ lists:foldl(
+ fun({Module, Impl}, ok) ->
+ check_access(
+ fun() -> Module:check_resource_access(
+ auth_user(User, Impl), Resource, Permission) end,
+ Module, "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(Resource), Username]);
+ (_, Else) -> Else
+ end, ok, Modules).
check_access(Fun, Module, ErrStr, ErrArgs) ->
Allow = case Fun() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0dfca854af..ca5eb34874 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -23,7 +23,7 @@
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
@@ -149,8 +149,6 @@
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
qpids()).
--spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
- qpids()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -623,10 +621,6 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }) ->
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
-deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-
-deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-
requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
@@ -816,15 +810,20 @@ immutable(Q) -> Q#amqqueue{pid = none,
decorators = none,
state = none}.
-deliver([], _Delivery, _Flow) ->
+deliver([], _Delivery) ->
%% /dev/null optimisation
[];
-deliver(Qs, Delivery, Flow) ->
+deliver(Qs, Delivery = #delivery{flow = Flow}) ->
{MPids, SPids} = qpids(Qs),
QPids = MPids ++ SPids,
+ %% We use up two credits to send to a slave since the message
+ %% arrives at the slave from two directions. We will ack one when
+ %% the slave receives the message direct from the channel, and the
+ %% other when it receives it via GM.
case Flow of
- flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ flow -> [credit_flow:send(QPid) || QPid <- QPids],
+ [credit_flow:send(QPid) || QPid <- SPids];
noflow -> ok
end,
@@ -833,8 +832,8 @@ deliver(Qs, Delivery, Flow) ->
%% after they have become master they should mark the message as
%% 'delivered' since they do not know what the master may have
%% done with it.
- MMsg = {deliver, Delivery, false, Flow},
- SMsg = {deliver, Delivery, true, Flow},
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
delegate:cast(MPids, MMsg),
delegate:cast(SPids, SMsg),
QPids.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a18df22522..1b9427da1f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -498,12 +498,13 @@ send_mandatory(#delivery{mandatory = true,
discard(#delivery{confirm = Confirm,
sender = SenderPid,
+ flow = Flow,
message = #basic_message{id = MsgId}}, BQ, BQS, MTC) ->
MTC1 = case Confirm of
true -> confirm_messages([MsgId], MTC);
false -> MTC
end,
- BQS1 = BQ:discard(MsgId, SenderPid, BQS),
+ BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
{BQS1, MTC1}.
run_message_queue(State) -> run_message_queue(false, State).
@@ -525,14 +526,17 @@ run_message_queue(ActiveConsumersChanged, State) ->
end
end.
-attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+attempt_delivery(Delivery = #delivery{sender = SenderPid,
+ flow = Flow,
+ message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
case rabbit_queue_consumers:deliver(
fun (true) -> true = BQ:is_empty(BQS),
- {AckTag, BQS1} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS),
+ {AckTag, BQS1} =
+ BQ:publish_delivered(
+ Message, Props, SenderPid, Flow, BQS),
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
@@ -549,7 +553,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
State#q{consumers = Consumers})}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ sender = SenderPid,
+ flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
@@ -570,7 +576,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
+ BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
QLen = BQ:len(BQS4),
@@ -1100,15 +1106,23 @@ handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ flow = Flow}, SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
+ case SlaveWhenPublished of
+ true -> credit_flow:ack(Sender); %% [0]
+ false -> ok
+ end,
pmon:monitor(Sender, Senders);
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, Delivered, State1));
+ noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
+%% [0] The second ack is since the channel thought we were a slave at
+%% the time it published this message, so it used two credits (see
+%% rabbit_amqqueue:deliver/2).
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(ack(AckTags, ChPid, State));
diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl
index 5daca368eb..d2f07c1d57 100644
--- a/src/rabbit_auth_backend_dummy.erl
+++ b/src/rabbit_auth_backend_dummy.erl
@@ -17,11 +17,12 @@
-module(rabbit_auth_backend_dummy).
-include("rabbit.hrl").
--behaviour(rabbit_auth_backend).
+-behaviour(rabbit_authn_backend).
+-behaviour(rabbit_authz_backend).
--export([description/0]).
-export([user/0]).
--export([check_user_login/2, check_vhost_access/2, check_resource_access/3]).
+-export([user_login_authentication/2, user_login_authorization/1,
+ check_vhost_access/3, check_resource_access/3]).
-ifdef(use_specs).
@@ -31,19 +32,17 @@
%% A user to be used by the direct client when permission checks are
%% not needed. This user can do anything AMQPish.
-user() -> #user{username = <<"none">>,
- tags = [],
- auth_backend = ?MODULE,
- impl = none}.
+user() -> #user{username = <<"none">>,
+ tags = [],
+ authz_backends = [{?MODULE, none}]}.
%% Implementation of rabbit_auth_backend
-description() ->
- [{name, <<"Dummy">>},
- {description, <<"Database for the dummy user">>}].
+user_login_authentication(_, _) ->
+ {refused, "cannot log in conventionally as dummy user", []}.
-check_user_login(_, _) ->
+user_login_authorization(_) ->
{refused, "cannot log in conventionally as dummy user", []}.
-check_vhost_access(#user{}, _VHostPath) -> true.
-check_resource_access(#user{}, #resource{}, _Permission) -> true.
+check_vhost_access(#auth_user{}, _VHostPath, _Sock) -> true.
+check_resource_access(#auth_user{}, #resource{}, _Permission) -> true.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index fd1c4e8ee6..20a5766d20 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -17,10 +17,11 @@
-module(rabbit_auth_backend_internal).
-include("rabbit.hrl").
--behaviour(rabbit_auth_backend).
+-behaviour(rabbit_authn_backend).
+-behaviour(rabbit_authz_backend).
--export([description/0]).
--export([check_user_login/2, check_vhost_access/2, check_resource_access/3]).
+-export([user_login_authentication/2, user_login_authorization/1,
+ check_vhost_access/3, check_resource_access/3]).
-export([add_user/2, delete_user/1, lookup_user/1,
change_password/2, clear_password/1,
@@ -76,13 +77,9 @@
%%----------------------------------------------------------------------------
%% Implementation of rabbit_auth_backend
-description() ->
- [{name, <<"Internal">>},
- {description, <<"Internal user / password database">>}].
-
-check_user_login(Username, []) ->
+user_login_authentication(Username, []) ->
internal_check_user_login(Username, fun(_) -> true end);
-check_user_login(Username, [{password, Cleartext}]) ->
+user_login_authentication(Username, [{password, Cleartext}]) ->
internal_check_user_login(
Username,
fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) ->
@@ -90,25 +87,30 @@ check_user_login(Username, [{password, Cleartext}]) ->
(#internal_user{}) ->
false
end);
-check_user_login(Username, AuthProps) ->
+user_login_authentication(Username, AuthProps) ->
exit({unknown_auth_props, Username, AuthProps}).
+user_login_authorization(Username) ->
+ case user_login_authentication(Username, []) of
+ {ok, #auth_user{impl = Impl}} -> {ok, Impl};
+ Else -> Else
+ end.
+
internal_check_user_login(Username, Fun) ->
Refused = {refused, "user '~s' - invalid credentials", [Username]},
case lookup_user(Username) of
{ok, User = #internal_user{tags = Tags}} ->
case Fun(User) of
- true -> {ok, #user{username = Username,
- tags = Tags,
- auth_backend = ?MODULE,
- impl = User}};
+ true -> {ok, #auth_user{username = Username,
+ tags = Tags,
+ impl = none}};
_ -> Refused
end;
{error, not_found} ->
Refused
end.
-check_vhost_access(#user{username = Username}, VHostPath) ->
+check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) ->
case mnesia:dirty_read({rabbit_user_permission,
#user_vhost{username = Username,
virtual_host = VHostPath}}) of
@@ -116,7 +118,7 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[_R] -> true
end.
-check_resource_access(#user{username = Username},
+check_resource_access(#auth_user{username = Username},
#resource{virtual_host = VHostPath, name = Name},
Permission) ->
case mnesia:dirty_read({rabbit_user_permission,
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index d11af09552..c8e23a75d5 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -36,13 +36,13 @@
%% Another round is needed. Here's the state I want next time.
%% {protocol_error, Msg, Args}
%% Client got the protocol wrong. Log and die.
-%% {refused, Msg, Args}
+%% {refused, Username, Msg, Args}
%% Client failed authentication. Log and die.
-callback handle_response(binary(), any()) ->
{'ok', rabbit_types:user()} |
{'challenge', binary(), any()} |
{'protocol_error', string(), [any()]} |
- {'refused', string(), [any()]}.
+ {'refused', rabbit_types:username() | none, string(), [any()]}.
-else.
diff --git a/src/rabbit_authn_backend.erl b/src/rabbit_authn_backend.erl
new file mode 100644
index 0000000000..cfc3f5db51
--- /dev/null
+++ b/src/rabbit_authn_backend.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_authn_backend).
+
+-include("rabbit.hrl").
+
+-ifdef(use_specs).
+
+%% Check a user can log in, given a username and a proplist of
+%% authentication information (e.g. [{password, Password}]). If your
+%% backend is not to be used for authentication, this should always
+%% refuse access.
+%%
+%% Possible responses:
+%% {ok, User}
+%% Authentication succeeded, and here's the user record.
+%% {error, Error}
+%% Something went wrong. Log and die.
+%% {refused, Msg, Args}
+%% Client failed authentication. Log and die.
+-callback user_login_authentication(rabbit_types:username(), [term()]) ->
+ {'ok', rabbit_types:auth_user()} |
+ {'refused', string(), [any()]} |
+ {'error', any()}.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{user_login_authentication, 2}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_authz_backend.erl
index a7dd6494b1..ff5f014e37 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_authz_backend.erl
@@ -14,47 +14,49 @@
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%
--module(rabbit_auth_backend).
+-module(rabbit_authz_backend).
--ifdef(use_specs).
+-include("rabbit.hrl").
-%% A description proplist as with auth mechanisms,
-%% exchanges. Currently unused.
--callback description() -> [proplists:property()].
+-ifdef(use_specs).
-%% Check a user can log in, given a username and a proplist of
-%% authentication information (e.g. [{password, Password}]).
+%% Check a user can log in, when this backend is being used for
+%% authorisation only. Authentication has already taken place
+%% successfully, but we need to check that the user exists in this
+%% backend, and initialise any impl field we will want to have passed
+%% back in future calls to check_vhost_access/3 and
+%% check_resource_access/3.
%%
%% Possible responses:
-%% {ok, User}
-%% Authentication succeeded, and here's the user record.
+%% {ok, Impl}
+%% User authorisation succeeded, and here's the impl field.
%% {error, Error}
%% Something went wrong. Log and die.
%% {refused, Msg, Args}
-%% Client failed authentication. Log and die.
--callback check_user_login(rabbit_types:username(), [term()]) ->
- {'ok', rabbit_types:user()} |
+%% User authorisation failed. Log and die.
+-callback user_login_authorization(rabbit_types:username()) ->
+ {'ok', any()} |
{'refused', string(), [any()]} |
{'error', any()}.
-%% Given #user and vhost, can a user log in to a vhost?
+%% Given #auth_user and vhost, can a user log in to a vhost?
%% Possible responses:
%% true
%% false
%% {error, Error}
%% Something went wrong. Log and die.
--callback check_vhost_access(rabbit_types:user(), rabbit_types:vhost()) ->
+-callback check_vhost_access(rabbit_types:auth_user(),
+ rabbit_types:vhost(), rabbit_net:socket()) ->
boolean() | {'error', any()}.
-
-%% Given #user, resource and permission, can a user access a resource?
+%% Given #auth_user, resource and permission, can a user access a resource?
%%
%% Possible responses:
%% true
%% false
%% {error, Error}
%% Something went wrong. Log and die.
--callback check_resource_access(rabbit_types:user(),
+-callback check_resource_access(rabbit_types:auth_user(),
rabbit_types:r(atom()),
rabbit_access_control:permission_atom()) ->
boolean() | {'error', any()}.
@@ -64,8 +66,8 @@
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {check_user_login, 2}, {check_vhost_access, 2},
- {check_resource_access, 3}];
+ [{user_login_authorization, 1},
+ {check_vhost_access, 3}, {check_resource_access, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 4ce133c3e3..0d00ced756 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -30,6 +30,7 @@
-type(ack() :: any()).
-type(state() :: any()).
+-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
@@ -99,19 +100,20 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), boolean(), pid(),
+ rabbit_types:message_properties(), boolean(), pid(), flow(),
state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
-callback publish_delivered(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
+ rabbit_types:message_properties(), pid(), flow(),
+ state())
-> {ack(), state()}.
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
--callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
+-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
@@ -249,8 +251,8 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 6},
+ {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 67109e7d5a..cd2846c00b 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -114,7 +114,7 @@ publish(X, Delivery) ->
delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
- message = Message, msg_seq_no = MsgSeqNo}.
+ message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 3ab82cad78..ee8147f45f 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -41,48 +41,90 @@
%% parse_table supports the AMQP 0-8/0-9 standard types, S, I, D, T
%% and F, as well as the QPid extensions b, d, f, l, s, t, x, and V.
+-define(SIMPLE_PARSE_TABLE(BType, Pattern, RType),
+ parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ BType, Pattern, Rest/binary>>) ->
+ [{NameString, RType, Value} | parse_table(Rest)]).
+
+%% Note that we try to put these in approximately the order we expect
+%% to hit them, that's why the empty binary is half way through.
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, longstr, Value} | parse_table(Rest)];
+
+?SIMPLE_PARSE_TABLE($I, Value:32/signed, signedint);
+?SIMPLE_PARSE_TABLE($T, Value:64/unsigned, timestamp);
+
parse_table(<<>>) ->
[];
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, ValueAndRest/binary>>) ->
- {Type, Value, Rest} = parse_field_value(ValueAndRest),
- [{NameString, Type, Value} | parse_table(Rest)].
-parse_array(<<>>) ->
- [];
-parse_array(<<ValueAndRest/binary>>) ->
- {Type, Value, Rest} = parse_field_value(ValueAndRest),
- [{Type, Value} | parse_array(Rest)].
+?SIMPLE_PARSE_TABLE($b, Value:8/signed, byte);
+?SIMPLE_PARSE_TABLE($d, Value:64/float, double);
+?SIMPLE_PARSE_TABLE($f, Value:32/float, float);
+?SIMPLE_PARSE_TABLE($l, Value:64/signed, long);
+?SIMPLE_PARSE_TABLE($s, Value:16/signed, short);
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $t, Value:8/unsigned, Rest/binary>>) ->
+ [{NameString, bool, (Value /= 0)} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
+ [{NameString, decimal, {Before, After}} | parse_table(Rest)];
-parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
- {longstr, V, R};
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, table, parse_table(Value)} | parse_table(Rest)];
-parse_field_value(<<$I, V:32/signed, R/binary>>) ->
- {signedint, V, R};
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, array, parse_array(Value)} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, binary, Value} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $V, Rest/binary>>) ->
+ [{NameString, void, undefined} | parse_table(Rest)].
+
+-define(SIMPLE_PARSE_ARRAY(BType, Pattern, RType),
+ parse_array(<<BType, Pattern, Rest/binary>>) ->
+ [{RType, Value} | parse_array(Rest)]).
+
+parse_array(<<$S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{longstr, Value} | parse_array(Rest)];
+
+?SIMPLE_PARSE_ARRAY($I, Value:32/signed, signedint);
+?SIMPLE_PARSE_ARRAY($T, Value:64/unsigned, timestamp);
+
+parse_array(<<>>) ->
+ [];
-parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) ->
- {decimal, {Before, After}, R};
+?SIMPLE_PARSE_ARRAY($b, Value:8/signed, byte);
+?SIMPLE_PARSE_ARRAY($d, Value:64/float, double);
+?SIMPLE_PARSE_ARRAY($f, Value:32/float, float);
+?SIMPLE_PARSE_ARRAY($l, Value:64/signed, long);
+?SIMPLE_PARSE_ARRAY($s, Value:16/signed, short);
-parse_field_value(<<$T, V:64/unsigned, R/binary>>) ->
- {timestamp, V, R};
+parse_array(<<$t, Value:8/unsigned, Rest/binary>>) ->
+ [{bool, (Value /= 0)} | parse_array(Rest)];
-parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
- {table, parse_table(Table), R};
+parse_array(<<$D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
+ [{decimal, {Before, After}} | parse_array(Rest)];
-parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
- {array, parse_array(Array), R};
+parse_array(<<$F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{table, parse_table(Value)} | parse_array(Rest)];
-parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R};
-parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R};
-parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R};
-parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R};
-parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R};
-parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
+parse_array(<<$A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{array, parse_array(Value)} | parse_array(Rest)];
-parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
- {binary, V, R};
+parse_array(<<$x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{binary, Value} | parse_array(Rest)];
-parse_field_value(<<$V, R/binary>>) ->
- {void, undefined, R}.
+parse_array(<<$V, Rest/binary>>) ->
+ [{void, undefined} | parse_array(Rest)].
ensure_content_decoded(Content = #content{properties = Props})
when Props =/= none ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8632e1b3f7..3450fd4e91 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -581,7 +581,8 @@ check_user_id_header(#'P_basic'{user_id = Username},
#ch{user = #user{username = Username}}) ->
ok;
check_user_id_header(
- #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) ->
+ #'P_basic'{}, #ch{user = #user{authz_backends =
+ [{rabbit_auth_backend_dummy, _}]}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
#ch{user = #user{username = Actual,
@@ -794,7 +795,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
- DQ = {Delivery, QNames},
+ DQ = {Delivery#delivery{flow = flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
@@ -1665,7 +1666,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
- DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
%% The pmon:monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 47505b3df3..cf7b608399 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -17,7 +17,8 @@
-module(rabbit_cli).
-include("rabbit_cli.hrl").
--export([main/3, parse_arguments/4, rpc_call/4]).
+-export([main/3, start_distribution/0, start_distribution/1,
+ parse_arguments/4, rpc_call/4]).
%%----------------------------------------------------------------------------
@@ -31,6 +32,8 @@
-spec(main/3 :: (fun (([string()], string()) -> parse_result()),
fun ((atom(), atom(), [any()], [any()]) -> any()),
atom()) -> no_return()).
+-spec(start_distribution/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_distribution/1 :: (string()) -> {'ok', pid()} | {'error', any()}).
-spec(usage/1 :: (atom()) -> no_return()).
-spec(parse_arguments/4 ::
([{atom(), [{string(), optdef()}]} | atom()],
@@ -42,6 +45,8 @@
%%----------------------------------------------------------------------------
main(ParseFun, DoFun, UsageMod) ->
+ error_logger:tty(false),
+ start_distribution(),
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{Command, Opts, Args} =
case ParseFun(init:get_plain_arguments(), NodeStr) of
@@ -101,6 +106,20 @@ main(ParseFun, DoFun, UsageMod) ->
rabbit_misc:quit(2)
end.
+start_distribution() ->
+ start_distribution(list_to_atom(
+ rabbit_misc:format("rabbitmq-cli-~s", [os:getpid()]))).
+
+start_distribution(Name) ->
+ rabbit_nodes:ensure_epmd(),
+ net_kernel:start([Name, name_type()]).
+
+name_type() ->
+ case os:getenv("RABBITMQ_USE_LONGNAME") of
+ "true" -> longnames;
+ _ -> shortnames
+ end.
+
usage(Mod) ->
io:format("~s", [Mod:usage()]),
rabbit_misc:quit(1).
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index a931eef009..751dbd298d 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -19,7 +19,7 @@
-include("rabbit_cli.hrl").
-export([start/0, stop/0, parse_arguments/2, action/5,
- sync_queue/1, cancel_sync_queue/1]).
+ sync_queue/1, cancel_sync_queue/1, become/1]).
-import(rabbit_cli, [rpc_call/4]).
@@ -40,6 +40,7 @@
change_cluster_node_type,
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
+ rename_cluster_node,
force_boot,
cluster_status,
{sync_queue, [?VHOST_DEF]},
@@ -104,8 +105,8 @@
-define(COMMANDS_NOT_REQUIRING_APP,
[stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
join_cluster, change_cluster_node_type, update_cluster_nodes,
- forget_cluster_node, cluster_status, status, environment, eval,
- force_boot]).
+ forget_cluster_node, rename_cluster_node, cluster_status, status,
+ environment, eval, force_boot]).
%%----------------------------------------------------------------------------
@@ -123,7 +124,6 @@
%%----------------------------------------------------------------------------
start() ->
- start_distribution(),
rabbit_cli:main(
fun (Args, NodeStr) ->
parse_arguments(Args, NodeStr)
@@ -234,6 +234,13 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
[ClusterNode, false])
end;
+action(rename_cluster_node, Node, NodesS, _Opts, Inform) ->
+ Nodes = split_list([list_to_atom(N) || N <- NodesS]),
+ Inform("Renaming cluster nodes:~n~s~n",
+ [lists:flatten([rabbit_misc:format(" ~s -> ~s~n", [F, T]) ||
+ {F, T} <- Nodes])]),
+ rabbit_mnesia_rename:rename(Node, Nodes);
+
action(force_boot, Node, [], _Opts, Inform) ->
Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]),
case rabbit:is_running(Node) of
@@ -586,28 +593,18 @@ exit_loop(Port) ->
{Port, _} -> exit_loop(Port)
end.
-start_distribution() ->
- CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]),
- {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), name_type()]).
-
become(BecomeNode) ->
+ error_logger:tty(false),
+ ok = net_kernel:stop(),
case net_adm:ping(BecomeNode) of
pong -> exit({node_running, BecomeNode});
pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]),
- error_logger:tty(false),
- ok = net_kernel:stop(),
- {ok, _} = net_kernel:start([BecomeNode, name_type()]),
+ {ok, _} = rabbit_cli:start_distribution(BecomeNode),
io:format(" done~n", []),
Dir = mnesia:system_info(directory),
io:format(" * Mnesia directory : ~s~n", [Dir])
end.
-name_type() ->
- case os:getenv("RABBITMQ_USE_LONGNAME") of
- "true" -> longnames;
- _ -> shortnames
- end.
-
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
@@ -720,3 +717,7 @@ prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value);
prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) ||
{T, V} <- Value];
prettify_typed_amqp_value(_Type, Value) -> Value.
+
+split_list([]) -> [];
+split_list([_]) -> exit(even_list_needed);
+split_list([A, B | T]) -> [{A, B} | split_list(T)].
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 749a67b1d5..11233e7eb8 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -83,16 +83,27 @@ connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true -> case AuthFun() of
- {ok, User} ->
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
connect1(User, VHost, Protocol, Pid, Infos);
- {refused, _M, _A} ->
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
{error, {auth_failure, "Refused"}}
end;
false -> {error, broker_not_found_on_node}
end.
+notify_auth_result(Username, AuthResult, ExtraProps) ->
+ EventProps = [{connection_type, direct},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
connect1(User, VHost, Protocol, Pid, Infos) ->
- try rabbit_access_control:check_vhost_access(User, VHost) of
+ try rabbit_access_control:check_vhost_access(User, VHost, undefined) of
ok -> ok = pg_local:join(rabbit_direct, Pid),
rabbit_event:notify(connection_created, Infos),
{ok, {User, rabbit_reader:server_properties(Protocol)}}
diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl
new file mode 100644
index 0000000000..261554921a
--- /dev/null
+++ b/src/rabbit_epmd_monitor.erl
@@ -0,0 +1,101 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_epmd_monitor).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-record(state, {timer, mod, me, host, port}).
+
+-define(SERVER, ?MODULE).
+-define(CHECK_FREQUENCY, 60000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% It's possible for epmd to be killed out from underneath us. If that
+%% happens, then obviously clustering and rabbitmqctl stop
+%% working. This process checks up on epmd and restarts it /
+%% re-registers us with it if it has gone away.
+%%
+%% How could epmd be killed?
+%%
+%% 1) The most popular way for this to happen is when running as a
+%% Windows service. The user starts rabbitmqctl first, and this starts
+%% epmd under the user's account. When they log out epmd is killed.
+%%
+%% 2) Some packagings of (non-RabbitMQ?) Erlang apps might do "killall
+%% epmd" as a shutdown or uninstall step.
+%% ----------------------------------------------------------------------------
+
+start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ {Me, Host} = rabbit_nodes:parts(node()),
+ Mod = net_kernel:epmd_module(),
+ {port, Port, _Version} = Mod:port_please(Me, Host),
+ {ok, ensure_timer(#state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port})}.
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(check, State) ->
+ check_epmd(State),
+ {noreply, ensure_timer(State#state{timer = undefined})};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+ensure_timer(State) ->
+ rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check).
+
+check_epmd(#state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port}) ->
+ case Mod:port_please(Me, Host) of
+ noport -> rabbit_log:warning(
+ "epmd does not know us, re-registering ~s at port ~b~n",
+ [Me, Port]),
+ rabbit_nodes:ensure_epmd(),
+ erl_epmd:register_node(Me, Port);
+ _ -> ok
+ end.
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index e05ef05af4..6f0865b300 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -16,7 +16,8 @@
-module(rabbit_log).
--export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
+-export([log/3, log/4, debug/1, debug/2, info/1, info/2, warning/1,
+ warning/2, error/1, error/2]).
-export([with_local_io/1]).
%%----------------------------------------------------------------------------
@@ -26,11 +27,13 @@
-export_type([level/0]).
-type(category() :: atom()).
--type(level() :: 'info' | 'warning' | 'error').
+-type(level() :: 'debug' | 'info' | 'warning' | 'error').
-spec(log/3 :: (category(), level(), string()) -> 'ok').
-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
+-spec(debug/1 :: (string()) -> 'ok').
+-spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -50,6 +53,7 @@ log(Category, Level, Fmt, Args) when is_list(Args) ->
case level(Level) =< catlevel(Category) of
false -> ok;
true -> F = case Level of
+ debug -> fun error_logger:info_msg/2;
info -> fun error_logger:info_msg/2;
warning -> fun error_logger:warning_msg/2;
error -> fun error_logger:error_msg/2
@@ -57,6 +61,8 @@ log(Category, Level, Fmt, Args) when is_list(Args) ->
with_local_io(fun () -> F(Fmt, Args) end)
end.
+debug(Fmt) -> log(default, debug, Fmt).
+debug(Fmt, Args) -> log(default, debug, Fmt, Args).
info(Fmt) -> log(default, info, Fmt).
info(Fmt, Args) -> log(default, info, Fmt, Args).
warning(Fmt) -> log(default, warning, Fmt).
@@ -75,6 +81,7 @@ catlevel(Category) ->
%%--------------------------------------------------------------------
+level(debug) -> 4;
level(info) -> 3;
level(warning) -> 2;
level(error) -> 1;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index aa1e1ab9ee..0c05729292 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,8 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, purge_acks/1, publish/5, publish_delivered/4,
- discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ purge/1, purge_acks/1, publish/6, publish_delivered/5,
+ discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
@@ -230,37 +230,38 @@ purge(State = #state { gm = GM,
purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
-publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
- BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
- ChPid, State = #state { gm = GM,
- seen_status = SS,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ ChPid, Flow, State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
- {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
-discard(MsgId, ChPid, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
+discard(MsgId, ChPid, Flow, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
- ensure_monitoring(ChPid, State #state { backing_queue_state =
- BQ:discard(MsgId, ChPid, BQS) }).
+ ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
+ ensure_monitoring(ChPid,
+ State #state { backing_queue_state =
+ BQ:discard(MsgId, ChPid, Flow, BQS) }).
dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 92e1cc2736..2450501a8b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -246,7 +246,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->
%% Asynchronous, non-"mandatory", deliver mode.
case Flow of
@@ -628,7 +628,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery#delivery{mandatory = false} || %% [0]
+ Deliveries = [promote_delivery(Delivery) ||
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
@@ -640,8 +640,16 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
-%% [0] We reset mandatory to false here because we will have sent the
-%% mandatory_received already as soon as we got the message
+%% We reset mandatory to false here because we will have sent the
+%% mandatory_received already as soon as we got the message. We also
+%% need to send an ack for these messages since the channel is waiting
+%% for one for the via-GM case and we will not now receive one.
+promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
+ Delivery#delivery{mandatory = false}.
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -823,24 +831,27 @@ publish_or_discard(Status, ChPid, MsgId,
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
-process_instruction({publish, ChPid, MsgProps,
+process_instruction({publish, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
-process_instruction({publish_delivered, ChPid, MsgProps,
+process_instruction({publish_delivered, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
true = BQ:is_empty(BQS),
- {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
{ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
-process_instruction({discard, ChPid, MsgId}, State) ->
+process_instruction({discard, ChPid, Flow, MsgId}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(discarded, ChPid, MsgId, State),
- BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
@@ -899,6 +910,9 @@ process_instruction({delete_and_terminate, Reason},
BQ:delete_and_terminate(Reason, BQS),
{stop, State #state { backing_queue_state = undefined }}.
+maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid);
+maybe_flow_ack(_ChPid, noflow) -> ok.
+
msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
lists:foldl(
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index ee1d210556..9a8d55f94b 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -263,9 +263,10 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
Props1 = Props#message_properties{needs_confirming = false},
{MA1, BQS1} =
case Unacked of
- false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ false -> {MA,
+ BQ:publish(Msg, Props1, true, none, noflow, BQS)};
true -> {AckTag, BQS2} = BQ:publish_delivered(
- Msg, Props1, none, BQS),
+ Msg, Props1, none, noflow, BQS),
{[{Msg#basic_message.id, AckTag} | MA], BQS2}
end,
slave_sync_loop(Args, {MA1, TRef, BQS1});
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3e2c88ee34..626341b5f5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -44,7 +44,8 @@
-export([format/2, format_many/1, format_stderr/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
--export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
+-export([pid_to_string/1, string_to_pid/1,
+ pid_change_node/2, node_to_fake_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
@@ -196,6 +197,7 @@
(rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(pid_change_node/2 :: (pid(), node()) -> pid()).
-spec(node_to_fake_pid/1 :: (atom()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
-spec(version_compare/3 ::
@@ -520,8 +522,12 @@ execute_mnesia_transaction(TxFun) ->
Res = mnesia:sync_transaction(TxFun),
DiskLogAfter = mnesia_dumper:get_log_writes(),
case DiskLogAfter == DiskLogBefore of
- true -> Res;
- false -> {sync, Res}
+ true -> file_handle_cache_stats:update(
+ mnesia_ram_tx),
+ Res;
+ false -> file_handle_cache_stats:update(
+ mnesia_disk_tx),
+ {sync, Res}
end;
true -> mnesia:sync_transaction(TxFun)
end
@@ -686,11 +692,7 @@ sort_field_table(Arguments) ->
%% regardless of what node we are running on. The representation also
%% permits easy identification of the pid's node.
pid_to_string(Pid) when is_pid(Pid) ->
- %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
- %% 8.7)
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
- = term_to_binary(Pid),
- Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ {Node, Cre, Id, Ser} = decompose_pid(Pid),
format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
@@ -701,17 +703,32 @@ string_to_pid(Str) ->
case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, CreStr, IdStr, SerStr]} ->
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
[Cre, Id, Ser] = lists:map(fun list_to_integer/1,
[CreStr, IdStr, SerStr]),
- binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
+ compose_pid(list_to_atom(NodeStr), Cre, Id, Ser);
nomatch ->
throw(Err)
end.
+pid_change_node(Pid, NewNode) ->
+ {_OldNode, Cre, Id, Ser} = decompose_pid(Pid),
+ compose_pid(NewNode, Cre, Id, Ser).
+
%% node(node_to_fake_pid(Node)) =:= Node.
node_to_fake_pid(Node) ->
- string_to_pid(format("<~s.0.0.0>", [Node])).
+ compose_pid(Node, 0, 0, 0).
+
+decompose_pid(Pid) when is_pid(Pid) ->
+ %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
+ %% 8.7)
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ {Node, Cre, Id, Ser}.
+
+compose_pid(Node, Cre, Id, Ser) ->
+ <<131,NodeEnc/binary>> = term_to_binary(Node),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>).
version_compare(A, B, lte) ->
case version_compare(A, B) of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 80fbcd6835..f9110e5868 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -113,23 +113,29 @@ init() ->
ok.
init_from_config() ->
+ FindBadNodeNames = fun
+ (Name, BadNames) when is_atom(Name) -> BadNames;
+ (Name, BadNames) -> [Name | BadNames]
+ end,
{TryNodes, NodeType} =
case application:get_env(rabbit, cluster_nodes) of
+ {ok, {Nodes, Type} = Config}
+ when is_list(Nodes) andalso (Type == disc orelse Type == ram) ->
+ case lists:foldr(FindBadNodeNames, [], Nodes) of
+ [] -> Config;
+ BadNames -> e({invalid_cluster_node_names, BadNames})
+ end;
+ {ok, {_, BadType}} when BadType /= disc andalso BadType /= ram ->
+ e({invalid_cluster_node_type, BadType});
{ok, Nodes} when is_list(Nodes) ->
- Config = {Nodes -- [node()], case lists:member(node(), Nodes) of
- true -> disc;
- false -> ram
- end},
- rabbit_log:warning(
- "Converting legacy 'cluster_nodes' configuration~n ~w~n"
- "to~n ~w.~n~n"
- "Please update the configuration to the new format "
- "{Nodes, NodeType}, where Nodes contains the nodes that the "
- "node will try to cluster with, and NodeType is either "
- "'disc' or 'ram'~n", [Nodes, Config]),
- Config;
- {ok, Config} ->
- Config
+ %% The legacy syntax (a nodes list without the node
+ %% type) is unsupported.
+ case lists:foldr(FindBadNodeNames, [], Nodes) of
+ [] -> e(cluster_node_type_mandatory);
+ _ -> e(invalid_cluster_nodes_conf)
+ end;
+ {ok, _} ->
+ e(invalid_cluster_nodes_conf)
end,
case TryNodes of
[] -> init_db_and_upgrade([node()], disc, false);
@@ -850,6 +856,20 @@ nodes_excl_me(Nodes) -> Nodes -- [node()].
e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
+error_description({invalid_cluster_node_names, BadNames}) ->
+ "In the 'cluster_nodes' configuration key, the following node names "
+ "are invalid: " ++ lists:flatten(io_lib:format("~p", [BadNames]));
+error_description({invalid_cluster_node_type, BadType}) ->
+ "In the 'cluster_nodes' configuration key, the node type is invalid "
+ "(expected 'disc' or 'ram'): " ++
+ lists:flatten(io_lib:format("~p", [BadType]));
+error_description(cluster_node_type_mandatory) ->
+ "The 'cluster_nodes' configuration key must indicate the node type: "
+ "either {[...], disc} or {[...], ram}";
+error_description(invalid_cluster_nodes_conf) ->
+ "The 'cluster_nodes' configuration key is invalid, it must be of the "
+ "form {[Nodes], Type}, where Nodes is a list of node names and "
+ "Type is either 'disc' or 'ram'";
error_description(clustering_only_disc_node) ->
"You cannot cluster a node if it is the only disc node in its existing "
" cluster. If new nodes joined while this node was offline, use "
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
new file mode 100644
index 0000000000..2787cb743c
--- /dev/null
+++ b/src/rabbit_mnesia_rename.erl
@@ -0,0 +1,267 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_mnesia_rename).
+-include("rabbit.hrl").
+
+-export([rename/2]).
+-export([maybe_finish/1]).
+
+-define(CONVERT_TABLES, [schema, rabbit_durable_queue]).
+
+%% Supports renaming the nodes in the Mnesia database. In order to do
+%% this, we take a backup of the database, traverse the backup
+%% changing node names and pids as we go, then restore it.
+%%
+%% That's enough for a standalone node, for clusters the story is more
+%% complex. We can take pairs of nodes From and To, but backing up and
+%% restoring the database changes schema cookies, so if we just do
+%% this on all nodes the cluster will refuse to re-form with
+%% "Incompatible schema cookies.". Therefore we do something similar
+%% to what we do for upgrades - the first node in the cluster to
+%% restart becomes the authority, and other nodes wipe their own
+%% Mnesia state and rejoin. They also need to tell Mnesia the old node
+%% is not coming back.
+%%
+%% If we are renaming nodes one at a time then the running cluster
+%% might not be aware that a rename has taken place, so after we wipe
+%% and rejoin we then update any tables (in practice just
+%% rabbit_durable_queue) which should be aware that we have changed.
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(rename/2 :: (node(), [{node(), node()}]) -> 'ok').
+-spec(maybe_finish/1 :: ([node()]) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+rename(Node, NodeMapList) ->
+ try
+ %% Check everything is correct and figure out what we are
+ %% changing from and to.
+ {FromNode, ToNode, NodeMap} = prepare(Node, NodeMapList),
+
+ %% We backup and restore Mnesia even if other nodes are
+ %% running at the time, and defer the final decision about
+ %% whether to use our mutated copy or rejoin the cluster until
+ %% we restart. That means we might be mutating our copy of the
+ %% database while the cluster is running. *Do not* contact the
+ %% cluster while this is happening, we are likely to get
+ %% confused.
+ application:set_env(kernel, dist_auto_connect, never),
+
+ %% Take a copy we can restore from if we abandon the
+ %% rename. We don't restore from the "backup" since restoring
+ %% that changes schema cookies and might stop us rejoining the
+ %% cluster.
+ ok = rabbit_mnesia:copy_db(mnesia_copy_dir()),
+
+ %% And make the actual changes
+ rabbit_control_main:become(FromNode),
+ take_backup(before_backup_name()),
+ convert_backup(NodeMap, before_backup_name(), after_backup_name()),
+ ok = rabbit_file:write_term_file(rename_config_name(),
+ [{FromNode, ToNode}]),
+ convert_config_files(NodeMap),
+ rabbit_control_main:become(ToNode),
+ restore_backup(after_backup_name()),
+ ok
+ after
+ stop_mnesia()
+ end.
+
+prepare(Node, NodeMapList) ->
+ %% If we have a previous rename and haven't started since, give up.
+ case rabbit_file:is_dir(dir()) of
+ true -> exit({rename_in_progress,
+ "Restart node under old name to roll back"});
+ false -> ok = rabbit_file:ensure_dir(mnesia_copy_dir())
+ end,
+
+ %% Check we don't have two nodes mapped to the same node
+ {FromNodes, ToNodes} = lists:unzip(NodeMapList),
+ case length(FromNodes) - length(lists:usort(ToNodes)) of
+ 0 -> ok;
+ _ -> exit({duplicate_node, ToNodes})
+ end,
+
+ %% Figure out which node we are before and after the change
+ FromNode = case [From || {From, To} <- NodeMapList,
+ To =:= Node] of
+ [N] -> N;
+ [] -> Node
+ end,
+ NodeMap = dict:from_list(NodeMapList),
+ ToNode = case dict:find(FromNode, NodeMap) of
+ {ok, N2} -> N2;
+ error -> FromNode
+ end,
+
+ %% Check that we are in the cluster, all old nodes are in the
+ %% cluster, and no new nodes are.
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes),
+ lists:member(Node, Nodes ++ ToNodes)} of
+ {[], [], true} -> ok;
+ {[], [], false} -> exit({i_am_not_involved, Node});
+ {F, [], _} -> exit({nodes_not_in_cluster, F});
+ {_, T, _} -> exit({nodes_already_in_cluster, T})
+ end,
+ {FromNode, ToNode, NodeMap}.
+
+take_backup(Backup) ->
+ start_mnesia(),
+ ok = mnesia:backup(Backup),
+ stop_mnesia().
+
+restore_backup(Backup) ->
+ ok = mnesia:install_fallback(Backup, [{scope, local}]),
+ start_mnesia(),
+ stop_mnesia(),
+ rabbit_mnesia:force_load_next_boot().
+
+maybe_finish(AllNodes) ->
+ case rabbit_file:read_term_file(rename_config_name()) of
+ {ok, [{FromNode, ToNode}]} -> finish(FromNode, ToNode, AllNodes);
+ _ -> ok
+ end.
+
+finish(FromNode, ToNode, AllNodes) ->
+ case node() of
+ ToNode ->
+ case rabbit_upgrade:nodes_running(AllNodes) of
+ [] -> finish_primary(FromNode, ToNode);
+ _ -> finish_secondary(FromNode, ToNode, AllNodes)
+ end;
+ FromNode ->
+ rabbit_log:info(
+ "Abandoning rename from ~s to ~s since we are still ~s~n",
+ [FromNode, ToNode, FromNode]),
+ [{ok, _} = file:copy(backup_of_conf(F), F) || F <- config_files()],
+ ok = rabbit_file:recursive_delete([rabbit_mnesia:dir()]),
+ ok = rabbit_file:recursive_copy(
+ mnesia_copy_dir(), rabbit_mnesia:dir()),
+ delete_rename_files();
+ _ ->
+ %% Boot will almost certainly fail but we might as
+ %% well just log this
+ rabbit_log:info(
+ "Rename attempted from ~s to ~s but we are ~s - ignoring.~n",
+ [FromNode, ToNode, node()])
+ end.
+
+finish_primary(FromNode, ToNode) ->
+ rabbit_log:info("Restarting as primary after rename from ~s to ~s~n",
+ [FromNode, ToNode]),
+ delete_rename_files(),
+ ok.
+
+finish_secondary(FromNode, ToNode, AllNodes) ->
+ rabbit_log:info("Restarting as secondary after rename from ~s to ~s~n",
+ [FromNode, ToNode]),
+ rabbit_upgrade:secondary_upgrade(AllNodes),
+ rename_in_running_mnesia(FromNode, ToNode),
+ delete_rename_files(),
+ ok.
+
+dir() -> rabbit_mnesia:dir() ++ "-rename".
+before_backup_name() -> dir() ++ "/backup-before".
+after_backup_name() -> dir() ++ "/backup-after".
+rename_config_name() -> dir() ++ "/pending.config".
+mnesia_copy_dir() -> dir() ++ "/mnesia-copy".
+
+delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]).
+
+start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ rabbit_table:force_load(),
+ rabbit_table:wait_for_replicated().
+stop_mnesia() -> stopped = mnesia:stop().
+
+convert_backup(NodeMap, FromBackup, ToBackup) ->
+ mnesia:traverse_backup(
+ FromBackup, ToBackup,
+ fun
+ (Row, Acc) ->
+ case lists:member(element(1, Row), ?CONVERT_TABLES) of
+ true -> {[update_term(NodeMap, Row)], Acc};
+ false -> {[Row], Acc}
+ end
+ end, switched).
+
+config_files() ->
+ [rabbit_node_monitor:running_nodes_filename(),
+ rabbit_node_monitor:cluster_status_filename()].
+
+backup_of_conf(Path) ->
+ filename:join([dir(), filename:basename(Path)]).
+
+convert_config_files(NodeMap) ->
+ [convert_config_file(NodeMap, Path) || Path <- config_files()].
+
+convert_config_file(NodeMap, Path) ->
+ {ok, Term} = rabbit_file:read_term_file(Path),
+ {ok, _} = file:copy(Path, backup_of_conf(Path)),
+ ok = rabbit_file:write_term_file(Path, update_term(NodeMap, Term)).
+
+lookup_node(OldNode, NodeMap) ->
+ case dict:find(OldNode, NodeMap) of
+ {ok, NewNode} -> NewNode;
+ error -> OldNode
+ end.
+
+mini_map(FromNode, ToNode) -> dict:from_list([{FromNode, ToNode}]).
+
+update_term(NodeMap, L) when is_list(L) ->
+ [update_term(NodeMap, I) || I <- L];
+update_term(NodeMap, T) when is_tuple(T) ->
+ list_to_tuple(update_term(NodeMap, tuple_to_list(T)));
+update_term(NodeMap, Node) when is_atom(Node) ->
+ lookup_node(Node, NodeMap);
+update_term(NodeMap, Pid) when is_pid(Pid) ->
+ rabbit_misc:pid_change_node(Pid, lookup_node(node(Pid), NodeMap));
+update_term(_NodeMap, Term) ->
+ Term.
+
+rename_in_running_mnesia(FromNode, ToNode) ->
+ All = rabbit_mnesia:cluster_nodes(all),
+ Running = rabbit_mnesia:cluster_nodes(running),
+ case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
+ {false, true} -> ok;
+ {true, _} -> exit({old_node_running, FromNode});
+ {_, false} -> exit({new_node_not_in_cluster, ToNode})
+ end,
+ {atomic, ok} = mnesia:del_table_copy(schema, FromNode),
+ Map = mini_map(FromNode, ToNode),
+ {atomic, _} = transform_table(rabbit_durable_queue, Map),
+ ok.
+
+transform_table(Table, Map) ->
+ mnesia:sync_transaction(
+ fun () ->
+ mnesia:lock({table, Table}, write),
+ transform_table(Table, Map, mnesia:first(Table))
+ end).
+
+transform_table(_Table, _Map, '$end_of_table') ->
+ ok;
+transform_table(Table, Map, Key) ->
+ [Term] = mnesia:read(Table, Key, write),
+ ok = mnesia:write(Table, update_term(Map, Term), write),
+ transform_table(Table, Map, mnesia:next(Table, Key)).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b829ae9476..9e3dd1e7c4 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -473,6 +473,7 @@ write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ file_handle_cache_stats:update(msg_store_read),
%% Check the cur file cache
case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
@@ -507,6 +508,7 @@ server_cast(#client_msstate { server = Server }, Msg) ->
client_write(MsgId, Msg, Flow,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
+ file_handle_cache_stats:update(msg_store_write),
ok = client_update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId, Flow}).
@@ -1299,7 +1301,8 @@ should_mask_action(CRef, MsgId,
open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
- [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
+ {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 7f7fcc3126..d3c0e55ff8 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -18,7 +18,7 @@
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
is_running/2, is_process_running/2,
- cluster_name/0, set_cluster_name/1]).
+ cluster_name/0, set_cluster_name/1, ensure_epmd/0]).
-include_lib("kernel/include/inet.hrl").
@@ -41,6 +41,7 @@
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
-spec(cluster_name/0 :: () -> binary()).
-spec(set_cluster_name/1 :: (binary()) -> 'ok').
+-spec(ensure_epmd/0 :: () -> 'ok').
-endif.
@@ -197,3 +198,20 @@ cluster_name_default() ->
set_cluster_name(Name) ->
rabbit_runtime_parameters:set_global(cluster_name, Name).
+
+ensure_epmd() ->
+ {ok, Root} = init:get_argument(root),
+ {ok, Prog} = init:get_argument(progname),
+ ID = random:uniform(1000000000),
+ Port = open_port(
+ {spawn_executable, filename:join([Root, "bin", Prog])},
+ [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]),
+ "-noshell", "-eval", "halt()."]},
+ exit_status, stderr_to_stdout, use_stdio]),
+ port_shutdown_loop(Port).
+
+port_shutdown_loop(Port) ->
+ receive
+ {Port, {exit_status, _Rc}} -> ok;
+ {Port, _} -> port_shutdown_loop(Port)
+ end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0a2c88d441..c58f00ce0f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,19 +16,27 @@
-module(rabbit_queue_index).
--export([erase/1, init/2, recover/5,
+-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
%% The queue index is responsible for recording the order of messages
-%% within a queue on disk.
+%% within a queue on disk. As such it contains records of messages
+%% being published, delivered and acknowledged. The publish record
+%% includes the sequence ID, message ID and a small quantity of
+%% metadata about the message; the delivery and acknowledgement
+%% records just contain the sequence ID. A publish record may also
+%% contain the complete message if provided to publish/5; this allows
+%% the message store to be avoided altogether for small messages. In
+%% either case the publish record is stored in memory in the same
+%% serialised format it will take on disk.
%%
%% Because of the fact that the queue can decide at any point to send
%% a queue entry to disk, you can not rely on publishes appearing in
@@ -36,7 +44,7 @@
%% then delivered, then ack'd.
%%
%% In order to be able to clean up ack'd messages, we write to segment
-%% files. These files have a fixed maximum size: ?SEGMENT_ENTRY_COUNT
+%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT
%% publishes, delivers and acknowledgements. They are numbered, and so
%% it is known that the 0th segment contains messages 0 ->
%% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages
@@ -85,7 +93,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}),
+%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -128,8 +136,8 @@
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
-%% of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of
+%% size and then 128 bits of md5sum msg id.
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).
@@ -140,32 +148,37 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry + 4 for size
+%% This is the size of the message record embedded in the queue
+%% index. If 0, the message can be found in the message store.
+-define(EMBEDDED_SIZE_BYTES, 4).
+-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry + 4 for size
-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
-%% + 2 for seq, bits and prefix
--define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
+%% + 4 for the embedded-message size (?EMBEDDED_SIZE_BYTES)
+-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)).
-%% 1 publish, 1 deliver, 1 ack per msg
--define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
+%% + 2 for seq, bits and prefix
+-define(PUB_RECORD_PREFIX_BYTES, 2).
%% ---- misc ----
--define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
+-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin}
-define(READ_MODE, [binary, raw, read]).
--define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
-define(WRITE_MODE, [write | ?READ_MODE]).
%%----------------------------------------------------------------------------
--record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unconfirmed }).
+-record(qistate, {dir, segments, journal_handle, dirty_count,
+ max_journal_entries, on_sync, on_sync_msg,
+ unconfirmed, unconfirmed_msg}).
--record(segment, { num, path, journal_entries, unacked }).
+-record(segment, {num, path, journal_entries, unacked}).
-include("rabbit.hrl").
@@ -174,6 +187,7 @@
-rabbit_upgrade({add_queue_ttl, local, []}).
-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
+-rabbit_upgrade({store_msg, local, [store_msg_size]}).
-ifdef(use_specs).
@@ -193,7 +207,9 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unconfirmed :: gb_sets:set()
+ on_sync_msg :: on_sync_fun(),
+ unconfirmed :: gb_sets:set(),
+ unconfirmed_msg :: gb_sets:set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -201,9 +217,11 @@
-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
--spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
--spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- contains_predicate(), on_sync_fun()) ->
+-spec(init/3 :: (rabbit_amqqueue:name(),
+ on_sync_fun(), on_sync_fun()) -> qistate()).
+-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ contains_predicate(),
+ on_sync_fun(), on_sync_fun()) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
@@ -241,14 +259,17 @@ erase(Name) ->
false -> ok
end.
-init(Name, OnSyncFun) ->
+init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
- State #qistate { on_sync = OnSyncFun }.
+ State#qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
+ OnSyncFun, OnSyncMsgFun) ->
State = blank_state(Name),
- State1 = State #qistate { on_sync = OnSyncFun },
+ State1 = State #qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun},
CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
@@ -267,26 +288,34 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unconfirmed = Unconfirmed })
- when is_binary(MsgId) ->
+publish(MsgOrId, SeqId, MsgProps, IsPersistent,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- case MsgProps#message_properties.needs_confirming of
- true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
- State #qistate { unconfirmed = Unconfirmed1 };
- false -> State
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
end),
+ file_handle_cache_stats:update(queue_index_journal_write),
+ {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
- SeqId:?SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)]),
+ SeqId:?SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
maybe_flush_journal(
- add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
+ add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -302,10 +331,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-needs_sync(#qistate { journal_handle = undefined }) ->
+needs_sync(#qistate{journal_handle = undefined}) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
- case gb_sets:is_empty(UC) of
+needs_sync(#qistate{journal_handle = JournalHdl,
+ unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
true -> case file_handle_cache:needs_sync(JournalHdl) of
true -> other;
false -> false
@@ -409,7 +440,9 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unconfirmed = gb_sets:new() }.
+ on_sync_msg = fun (_) -> ok end,
+ unconfirmed = gb_sets:new(),
+ unconfirmed_msg = gb_sets:new() }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -479,9 +512,10 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack},
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack},
{SegmentAndDirtyCount, Bytes}) ->
- {recover_message(ContainsCheckFun(MsgId), CleanShutdown,
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
+ {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
Del, RelSeq, SegmentAndDirtyCount),
Bytes + case IsPersistent of
true -> MsgProps#message_properties.size;
@@ -541,7 +575,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
+ when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
@@ -555,9 +590,9 @@ scan_segments(Fun, Acc, State) ->
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
IsDelivered, IsAcked}, AccM) ->
- Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
@@ -568,24 +603,35 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
- size = Size }) ->
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
+create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ ExpiryBin = expiry_to_binary(Expiry),
+ case MsgOrId of
+ MsgId when is_binary(MsgId) ->
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>};
+ #basic_message{id = MsgId} ->
+ MsgBin = term_to_binary(MsgOrId),
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin}
+ end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS>>) ->
+ Size:?SIZE_BITS>>, MsgBin) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties { expiry = Exp,
- size = Size }}.
+ Props = #message_properties{expiry = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ size = Size},
+ case MsgBin of
+ <<>> -> {MsgId, Props};
+ _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin),
+ {Msg, Props}
+ end.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -656,9 +702,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
+ _ -> Seg = array:sparse_foldr(
+ fun entry_to_segment/3, [], JEntries),
+ file_handle_cache_stats:update(queue_index_write),
+
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
- array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
+ file_handle_cache:append(Hdl, Seg),
ok = file_handle_cache:close(Hdl),
Segment #segment { journal_entries = array_new() }
end.
@@ -677,10 +727,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State = #qistate { dir = Dir }) ->
- case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ case rabbit_file:is_file(Path) of
true -> {JournalHdl, State1} = get_journal_handle(State),
+ Size = rabbit_file:file_size(Path),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- load_journal_entries(State1);
+ {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size),
+ parse_journal_entries(JournalBin, State1);
false -> State
end.
@@ -704,41 +757,37 @@ recover_journal(State) ->
end, Segments),
State1 #qistate { segments = Segments1 }.
-load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
- case Prefix of
- ?DEL_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, del, State));
- ?ACK_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, ack, State));
- _ ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
- %% Journal entry composed only of zeroes was probably
- %% produced during a dirty shutdown so stop reading
- {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
- State;
- {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgId, MsgProps} = parse_pub_record_body(Bin),
- IsPersistent = case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end,
- load_journal_entries(
- add_to_journal(
- SeqId, {MsgId, MsgProps, IsPersistent}, State));
- _ErrOrEoF -> %% err, we've lost at least a publish
- State
- end
- end;
- _ErrOrEoF -> State
- end.
+parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, del, State));
+
+parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
+parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
+ 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ State;
+parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
+ Rest/binary>>, State) ->
+ IsPersistent = case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
+ parse_journal_entries(
+ Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
+parse_journal_entries(_ErrOrEoF, State) ->
+ State.
deliver_or_ack(_Kind, [], State) ->
State;
deliver_or_ack(Kind, SeqIds, State) ->
JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end,
{JournalHdl, State1} = get_journal_handle(State),
+ file_handle_cache_stats:update(queue_index_journal_write),
ok = file_handle_cache:append(
JournalHdl,
[<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
@@ -746,11 +795,19 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
- case gb_sets:is_empty(UC) of
- true -> State;
- false -> OnSyncFun(UC),
- State #qistate { unconfirmed = gb_sets:new() }
+notify_sync(State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}) ->
+ State1 = case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State#qistate{unconfirmed = gb_sets:new()}
+ end,
+ case gb_sets:is_empty(UCM) of
+ true -> State1;
+ false -> OnSyncMsgFun(UCM),
+ State1#qistate{unconfirmed_msg = gb_sets:new()}
end.
%%----------------------------------------------------------------------------
@@ -823,42 +880,42 @@ segment_nums({Segments, CachedSegments}) ->
segments_new() ->
{dict:new(), []}.
-write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) ->
- Hdl;
-write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
- ok = case Pub of
- no_pub ->
- ok;
- {MsgId, MsgProps, IsPersistent} ->
- file_handle_cache:append(
- Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)])
- end,
- ok = case {Del, Ack} of
- {no_del, no_ack} ->
- ok;
- _ ->
- Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- file_handle_cache:append(
- Hdl, case {Del, Ack} of
- {del, ack} -> [Binary, Binary];
- _ -> Binary
- end)
- end,
- Hdl.
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
+ Buf;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
+ %% NB: we are assembling the segment in reverse order here, so
+ %% del/ack comes first.
+ Buf1 = case {Del, Ack} of
+ {no_del, no_ack} ->
+ Buf;
+ _ ->
+ Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ case {Del, Ack} of
+ {del, ack} -> [[Binary, Binary] | Buf];
+ _ -> [Binary | Buf]
+ end
+ end,
+ case Pub of
+ no_pub ->
+ Buf1;
+ {IsPersistent, Bin, MsgBin} ->
+ [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
+ end.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
- IsPersistent, IsDelivered == del} | Acc ];
+ [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc];
(_RelSeq, _Value, Acc) ->
Acc
end, Messages, Segment),
@@ -868,7 +925,11 @@ segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
{SegEntries, _UnackedCount} = load_segment(false, Segment),
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
- array:sparse_foldr(Fun, Init, SegEntries1).
+ array:sparse_foldr(
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) ->
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
+ Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
+ end, Init, SegEntries1).
%% Loading segments
%%
@@ -877,44 +938,48 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
+ true -> Size = rabbit_file:file_size(Path),
+ file_handle_cache_stats:update(queue_index_read),
+ {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
- {ok, SegData} -> load_segment_entries(
- KeepAcked, SegData, Empty);
- eof -> Empty
- end,
+ {ok, SegBin} = file_handle_cache:read(Hdl, Size),
ok = file_handle_cache:close(Hdl),
+ Res = parse_segment_entries(SegBin, KeepAcked, Empty),
Res
end.
-load_segment_entries(KeepAcked,
- <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
- PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
- SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
+ KeepAcked, Acc) ->
+ parse_segment_publish_entry(
+ Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
+parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
+ parse_segment_entries(
+ Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+parse_segment_entries(<<>>, _KeepAcked, Acc) ->
+ Acc.
+
+parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS,
+ MsgBin:MsgSize/binary, Rest/binary>>,
+ IsPersistent, RelSeq, KeepAcked,
+ {SegEntries, Unacked}) ->
+ Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
-load_segment_entries(KeepAcked,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, SegData,
- {SegEntries1, UnackedCount + UnackedCountDelta});
-load_segment_entries(_KeepAcked, _SegData, Res) ->
- Res.
+ parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
+parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
+ parse_segment_entries(Rest, KeepAcked, Acc).
+
+add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
+ {Pub, del, no_ack} when KeepAcked ->
+ {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
+ {_Pub, del, no_ack} ->
+ {array:reset(RelSeq, SegEntries), Unacked - 1}
+ end.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1121,6 +1186,40 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
store_msg_size_segment(_) ->
stop.
+store_msg() ->
+ foreach_queue_index({fun store_msg_journal/1,
+ fun store_msg_segment/1}).
+
+store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
+store_msg_journal(_) ->
+ stop.
+
+store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
+store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_segment(_) ->
+ stop.
+
+
+
%%----------------------------------------------------------------------------
@@ -1157,7 +1256,7 @@ transform_file(Path, Fun) when is_function(Fun)->
[{write_buffer, infinity}]),
{ok, PathHdl} = file_handle_cache:open(
- Path, [{read_ahead, Size} | ?READ_MODE], []),
+ Path, ?READ_MODE, [{read_buffer, Size}]),
{ok, Content} = file_handle_cache:read(PathHdl, Size),
ok = file_handle_cache:close(PathHdl),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ca73006aed..8c545467ab 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -58,6 +58,11 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(AUTH_NOTIFICATION_INFO_KEYS,
+ [host, vhost, name, peer_host, peer_port, protocol, auth_mechanism,
+ ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
+ peer_cert_validity]).
+
-define(IS_RUNNING(State),
(State#v1.connection_state =:= running orelse
State#v1.connection_state =:= blocking orelse
@@ -214,7 +219,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
rabbit_net:fast_close(Sock),
exit(normal)
end,
- log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
{ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
@@ -260,8 +264,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
- connection_closed_abruptly -> warning;
- _ -> error
+ connection_closed_with_no_data_received -> debug;
+ connection_closed_abruptly -> warning;
+ _ -> error
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), Name, Ex])
after
@@ -313,8 +318,28 @@ binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
binlist_split(Len, [H|T], Acc) ->
binlist_split(Len - size(H), T, [H|Acc]).
-mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
- case rabbit_net:recv(Sock) of
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
+ connection_state = CS,
+ connection = #connection{
+ name = ConnName}}) ->
+ Recv = rabbit_net:recv(Sock),
+ case CS of
+ pre_init when Buf =:= [] ->
            %% We only log new incoming connections when either the
+ %% first byte was received or there was an error (eg. a
+ %% timeout).
+ %%
+ %% The goal is to not log TCP healthchecks (a connection
+ %% with no data received) unless specified otherwise.
+ log(case Recv of
+ closed -> debug;
+ _ -> info
+ end, "accepting AMQP connection ~p (~s)~n",
+ [self(), ConnName]);
+ _ ->
+ ok
+ end,
+ case Recv of
{data, Data} ->
recvloop(Deb, [Data | Buf], BufLen + size(Data),
State#v1{pending_recv = false});
@@ -334,10 +359,18 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
end
end.
-stop(closed, State) -> maybe_emit_stats(State),
- throw(connection_closed_abruptly);
-stop(Reason, State) -> maybe_emit_stats(State),
- throw({inet_error, Reason}).
+stop(closed, #v1{connection_state = pre_init} = State) ->
+ %% The connection was closed before any packet was received. It's
+ %% probably a load-balancer healthcheck: don't consider this a
+ %% failure.
+ maybe_emit_stats(State),
+ throw(connection_closed_with_no_data_received);
+stop(closed, State) ->
+ maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+stop(Reason, State) ->
+ maybe_emit_stats(State),
+ throw({inet_error, Reason}).
handle_other({conserve_resources, Source, Conserve},
State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) ->
@@ -944,7 +977,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
- ok = rabbit_access_control:check_vhost_access(User, VHostPath),
+ ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
@@ -1046,9 +1079,12 @@ auth_phase(Response,
auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
- {refused, Msg, Args} ->
- auth_fail(Msg, Args, Name, State);
+ {refused, Username, Msg, Args} ->
+ auth_fail(Username, Msg, Args, Name, State);
{protocol_error, Msg, Args} ->
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ State),
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
@@ -1057,9 +1093,12 @@ auth_phase(Response,
auth_state = AuthState1}};
{ok, User = #user{username = Username}} ->
case rabbit_access_control:check_user_loopback(Username, Sock) of
- ok -> ok;
- not_allowed -> auth_fail("user '~s' can only connect via "
- "localhost", [Username], Name, State)
+ ok ->
+ notify_auth_result(Username, user_authentication_success,
+ [], State);
+ not_allowed ->
+ auth_fail(Username, "user '~s' can only connect via "
+ "localhost", [Username], Name, State)
end,
Tune = #'connection.tune'{frame_max = get_env(frame_max),
channel_max = get_env(channel_max),
@@ -1071,11 +1110,15 @@ auth_phase(Response,
end.
-ifdef(use_specs).
--spec(auth_fail/4 :: (string(), [any()], binary(), #v1{}) -> no_return()).
+-spec(auth_fail/5 ::
+ (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) ->
+ no_return()).
-endif.
-auth_fail(Msg, Args, AuthName,
+auth_fail(Username, Msg, Args, AuthName,
State = #v1{connection = #connection{protocol = Protocol,
capabilities = Capabilities}}) ->
+ notify_auth_result(Username, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}], State),
AmqpError = rabbit_misc:amqp_error(
access_refused, "~s login refused: ~s",
[AuthName, io_lib:format(Msg, Args)], none),
@@ -1094,6 +1137,16 @@ auth_fail(Msg, Args, AuthName,
end,
rabbit_misc:protocol_error(AmqpError).
+notify_auth_result(Username, AuthResult, ExtraProps, State) ->
+ EventProps = [{connection_type, network},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ [case Item of
+ name -> {connection_name, i(name, State)};
+ _ -> {Item, i(Item, State)}
+ end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index ba48867ad0..039568df8e 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -27,7 +27,7 @@
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
- connection/0, protocol/0, user/0, internal_user/0,
+ connection/0, protocol/0, auth_user/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
@@ -131,11 +131,15 @@
-type(protocol() :: rabbit_framing:protocol()).
+-type(auth_user() ::
+ #auth_user{username :: username(),
+ tags :: [atom()],
+ impl :: any()}).
+
-type(user() ::
- #user{username :: username(),
- tags :: [atom()],
- auth_backend :: atom(),
- impl :: any()}).
+ #user{username :: username(),
+ tags :: [atom()],
+ authz_backends :: [{atom(), any()}]}).
-type(internal_user() ::
#internal_user{username :: username(),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 72bf7855c4..2ab6545911 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -16,7 +16,8 @@
-module(rabbit_upgrade).
--export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]).
+-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0,
+ nodes_running/1, secondary_upgrade/1]).
-include("rabbit.hrl").
@@ -122,6 +123,7 @@ remove_backup() ->
maybe_upgrade_mnesia() ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
+ ok = rabbit_mnesia_rename:maybe_finish(AllNodes),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1da3de2671..be3404c91f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
- publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -28,16 +28,34 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
%%----------------------------------------------------------------------------
+%% Messages, and their position in the queue, can be in memory or on
+%% disk, or both. Persistent messages will have both message and
+%% position pushed to disk as soon as they arrive; transient messages
+%% can be written to disk (and thus both types can be evicted from
+%% memory) under memory pressure. The question of whether a message is
+%% in RAM and whether it is persistent are orthogonal.
+%%
+%% Messages are persisted using the queue index and the message
+%% store. Normally the queue index holds the position of the message
+%% *within this queue* along with a couple of small bits of metadata,
+%% while the message store holds the message itself (including headers
+%% and other properties).
+%%
+%% However, as an optimisation, small messages can be embedded
+%% directly in the queue index and bypass the message store
+%% altogether.
+%%
%% Definitions:
-
+%%
%% alpha: this is a message where both the message itself, and its
%% position within the queue are held in RAM
%%
-%% beta: this is a message where the message itself is only held on
-%% disk, but its position within the queue is held in RAM.
+%% beta: this is a message where the message itself is only held on
+%% disk (if persisted to the message store) but its position
+%% within the queue is held in RAM.
%%
%% gamma: this is a message where the message itself is only held on
%% disk, but its position is both in RAM and on disk.
@@ -248,8 +266,9 @@
q3,
q4,
next_seq_id,
- ram_pending_ack,
- disk_pending_ack,
+ ram_pending_ack, %% msgs using store, still in RAM
+ disk_pending_ack, %% msgs in store, paged out
+ qi_pending_ack, %% msgs using qi, *can't* be paged out
index_state,
msg_store_clients,
durable,
@@ -285,8 +304,9 @@
msg,
is_persistent,
is_delivered,
- msg_on_disk,
+ msg_in_store,
index_on_disk,
+ persist_to,
msg_props
}).
@@ -300,11 +320,13 @@
%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
%% due to write indices for before we do any work at all.
-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
+-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(QUEUE, lqueue).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
%%----------------------------------------------------------------------------
@@ -341,6 +363,7 @@
next_seq_id :: seq_id(),
ram_pending_ack :: gb_trees:tree(),
disk_pending_ack :: gb_trees:tree(),
+ qi_pending_ack :: gb_trees:tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -426,16 +449,19 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(Queue, Recover, AsyncCallback) ->
- init(Queue, Recover, AsyncCallback,
- fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
- end,
- fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
+init(Queue, Recover, Callback) ->
+ init(
+ Queue, Recover, Callback,
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Callback, MsgIds, ActionTaken)
+ end,
+ fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
+ fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+ IndexState = rabbit_queue_index:init(QueueName,
+ MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
@@ -446,13 +472,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
- {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end};
+ {C, fun (MsgId) when is_binary(MsgId) ->
+ rabbit_msg_store:contains(MsgId, C);
+ (#basic_message{is_persistent = Persistent}) ->
+ Persistent
+ end};
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
@@ -461,7 +491,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- ContainsCheckFun, MsgIdxOnDiskFun),
+ ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -517,26 +547,25 @@ delete_crashed(#amqqueue{name = QName}) ->
purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
- len = Len,
- ram_bytes = RamBytes,
- persistent_count = PCount,
- persistent_bytes = PBytes }) ->
+ len = Len }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {RamBytes, PCount, PBytes},
+ Stats = {0, 0, 0},
{Stats1, IndexState1} =
remove_queue_entries(Q4, Stats, IndexState, MSCState),
{Stats2, State1 = #vqstate { q1 = Q1,
index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
-
+ msg_store_clients = MSCState1,
+ ram_bytes = RamBytes,
+ persistent_count = PCount,
+ persistent_bytes = PBytes }} =
purge_betas_and_deltas(
Stats1, State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
- {{RamBytes3, PCount3, PBytes3}, IndexState3} =
+ {{RamBytesDec, PCountDec, PBytesDec}, IndexState3} =
remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
@@ -544,21 +573,22 @@ purge(State = #vqstate { q4 = Q4,
len = 0,
bytes = 0,
ram_msg_count = 0,
- ram_bytes = RamBytes3,
- persistent_count = PCount3,
- persistent_bytes = PBytes3 })}.
+ ram_bytes = RamBytes - RamBytesDec,
+ persistent_count = PCount - PCountDec,
+ persistent_bytes = PBytes - PBytesDec })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, _Flow,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
@@ -582,12 +612,13 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ _ChPid, _Flow,
+ State = #vqstate { next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
@@ -602,7 +633,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
-discard(_MsgId, _ChPid, State) -> State.
+discard(_MsgId, _ChPid, _Flow, State) -> State.
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
@@ -664,7 +695,7 @@ ack([], State) ->
ack([SeqId], State) ->
{#msg_status { msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
@@ -674,7 +705,7 @@ ack([SeqId], State) ->
true -> rabbit_queue_index:ack([SeqId], IndexState);
false -> IndexState
end,
- case MsgOnDisk of
+ case MsgInStore of
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
@@ -733,15 +764,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
{Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
[msg_iterator(State),
disk_ack_iterator(State),
- ram_ack_iterator(State)]),
+ ram_ack_iterator(State),
+ qi_ack_iterator(State)]),
ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
- len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
+depth(State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -807,10 +841,11 @@ ram_duration(State) ->
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
ram_pending_ack = RPA,
+ qi_pending_ack = QPA,
ram_ack_count_prev = RamAckCountPrev } =
update_rates(State),
- RamAckCount = gb_trees:size(RPA),
+ RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case lists:all(fun (X) -> X < 0.01 end,
@@ -846,8 +881,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
RamMsgCount;
-info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
- gb_trees:size(RPA);
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ gb_trees:size(RPA) + gb_trees:size(QPA);
info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
@@ -933,14 +969,11 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
when Start + Count =< End ->
Delta.
-m(MsgStatus = #msg_status { msg = Msg,
- is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+m(MsgStatus = #msg_status { is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk }) ->
true = (not IsPersistent) or IndexOnDisk,
- true = (not IndexOnDisk) or MsgOnDisk,
- true = (Msg =/= undefined) or MsgOnDisk,
-
+ true = msg_in_ram(MsgStatus) or MsgInStore,
MsgStatus.
one_if(true ) -> 1;
@@ -959,21 +992,39 @@ msg_status(IsPersistent, IsDelivered, SeqId,
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = false,
+ msg_in_store = false,
index_on_disk = false,
+ persist_to = determine_persist_to(Msg, MsgProps),
msg_props = MsgProps}.
+beta_msg_status({Msg = #basic_message{id = MsgId},
+ SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg,
+ persist_to = queue_index,
+ msg_in_store = false};
+
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined,
+ persist_to = msg_store,
+ msg_in_store = true}.
+
+beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
#msg_status{seq_id = SeqId,
- msg_id = MsgId,
msg = undefined,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = true,
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+trim_msg_status(MsgStatus) ->
+ case persist_to(MsgStatus) of
+ msg_store -> MsgStatus#msg_status{msg = undefined};
+ queue_index -> MsgStatus
+ end.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
@@ -1035,26 +1086,36 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
- {Filtered, Delivers, Acks} =
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
+ {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
- fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1} = Acc) ->
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1]};
- false -> case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA)) of
- false -> {?QUEUE:in_r(m(beta_msg_status(M)),
- Filtered1),
- Delivers1, Acks1};
- true -> Acc
- end
+ [SeqId | Acks1], RRC, RB};
+ false -> MsgStatus = m(beta_msg_status(M)),
+ HaveMsg = msg_in_ram(MsgStatus),
+ Size = msg_size(MsgStatus),
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)) of
+ false -> {?QUEUE:in_r(MsgStatus, Filtered1),
+ Delivers1, Acks1,
+ RRC + one_if(HaveMsg),
+ RB + one_if(HaveMsg) * Size};
+ true -> Acc %% [0]
+ end
end
- end, {?QUEUE:new(), [], []}, List),
- {Filtered, rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+ end, {?QUEUE:new(), [], [], 0, 0}, List),
+ {Filtered, RamReadyCount, RamBytes,
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+%% [0] We don't increase RamBytes here, even though it pertains to
+%% unacked messages too, since if HaveMsg then the message must have
+%% been stored in the QI, thus the message must have been in
+%% qi_pending_ack, thus it must already have been in RAM.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
@@ -1101,6 +1162,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
next_seq_id = NextSeqId,
ram_pending_ack = gb_trees:empty(),
disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1141,11 +1203,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- upd_ram_bytes(
+ upd_ram_bytes_count(
1, MsgStatus,
- inc_ram_msg_count(
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) }))
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1193,6 +1254,9 @@ upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
unacked_bytes = UBytes + SignUnacked * S,
persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
+upd_ram_bytes_count(Sign, MsgStatus, State = #vqstate{ram_msg_count = Count}) ->
+ upd_ram_bytes(Sign, MsgStatus, State#vqstate{ram_msg_count = Count + Sign}).
+
upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
@@ -1203,10 +1267,9 @@ msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
- msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
State = #vqstate {ram_msg_count = RamMsgCount,
out_counter = OutCount,
@@ -1224,10 +1287,11 @@ remove(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
+ IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ {false, false, true} -> Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1240,7 +1304,7 @@ remove(AckRequired, MsgStatus = #msg_status {
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
+ RamMsgCount1 = RamMsgCount - one_if(msg_in_ram(MsgStatus)),
State2 = case AckRequired of
false -> upd_bytes(-1, 0, MsgStatus, State1);
true -> upd_bytes(-1, 1, MsgStatus, State1)
@@ -1267,35 +1331,31 @@ purge_betas_and_deltas(Stats,
index_state = IndexState1 }))
end.
-remove_queue_entries(Q, {RamBytes, PCount, PBytes},
+remove_queue_entries(Q, {RamBytesDec, PCountDec, PBytesDec},
IndexState, MSCState) ->
- {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
+ {MsgIdsByStore, RamBytesDec1, PCountDec1, PBytesDec1, Delivers, Acks} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), RamBytes, PBytes, [], []}, Q),
+ {orddict:new(), RamBytesDec, PCountDec, PBytesDec, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{RamBytes1,
- PCount - case orddict:find(true, MsgIdsByStore) of
- error -> 0;
- {ok, Ids} -> length(Ids)
- end,
- PBytes1},
+ {{RamBytesDec1, PCountDec1, PBytesDec1},
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
- is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
+ #msg_status { msg_id = MsgId, seq_id = SeqId,
+ is_delivered = IsDelivered, msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
- msg_props = #message_properties { size = Size } },
- {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
- {case MsgOnDisk of
+ msg_props = #message_properties { size = Size } } = MsgStatus,
+ {MsgIdsByStore, RamBytesDec, PCountDec, PBytesDec, Delivers, Acks}) ->
+ {case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- RamBytes - Size * one_if(Msg =/= undefined),
- PBytes - Size * one_if(IsPersistent),
+ RamBytesDec + Size * one_if(msg_in_ram(MsgStatus)),
+ PCountDec + one_if(IsPersistent),
+ PBytesDec + Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
@@ -1304,36 +1364,38 @@ remove_queue_entries1(
%%----------------------------------------------------------------------------
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_on_disk = true }, _MSCState) ->
+ msg_in_store = true }, _MSCState) ->
MsgStatus;
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
- MsgStatus #msg_status { msg_on_disk = true };
+ case persist_to(MsgStatus) of
+ msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
+ prepare_to_store(Msg)),
+ MsgStatus#msg_status{msg_in_store = true};
+ queue_index -> MsgStatus
+ end;
maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
MsgStatus.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, IndexState) ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg,
msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_props = MsgProps}, IndexState)
when Force orelse IsPersistent ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:publish(
- MsgId, SeqId, MsgProps, IsPersistent, IndexState),
- {MsgStatus #msg_status { index_on_disk = true },
+ case persist_to(MsgStatus) of
+ msg_store -> MsgId;
+ queue_index -> prepare_to_store(Msg)
+ end, SeqId, MsgProps, IsPersistent, IndexState),
+ {MsgStatus#msg_status{index_on_disk = true},
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
{MsgStatus, IndexState}.
@@ -1346,28 +1408,79 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
{MsgStatus2, State #vqstate { index_state = IndexState1 }}.
+determine_persist_to(#basic_message{
+ content = #content{properties = Props,
+ properties_bin = PropsBin}},
+ #message_properties{size = BodySize}) ->
+ {ok, IndexMaxSize} = application:get_env(
+ rabbit, queue_index_embed_msgs_below),
+ %% The >= is so that you can set the env to 0 and never persist
+ %% to the index.
+ %%
+ %% We want this to be fast, so we avoid size(term_to_binary())
+ %% here, or using the term size estimation from truncate.erl, both
+ %% of which are too slow. So instead, if the message body size
+ %% goes over the limit then we avoid any other checks.
+ %%
+ %% If it doesn't we need to decide if the properties will push
+ %% it past the limit. If we have the encoded properties (usual
+ %% case) we can just check their size. If we don't (message came
+ %% via the direct client), we make a guess based on the number of
+ %% headers.
+ case BodySize >= IndexMaxSize of
+ true -> msg_store;
+ false -> Est = case is_binary(PropsBin) of
+ true -> BodySize + size(PropsBin);
+ false -> #'P_basic'{headers = Hs} = Props,
+ case Hs of
+ undefined -> 0;
+ _ -> length(Hs)
+ end * ?HEADER_GUESS_SIZE + BodySize
+ end,
+ case Est >= IndexMaxSize of
+ true -> msg_store;
+ false -> queue_index
+ end
+ end.
+
+persist_to(#msg_status{persist_to = To}) -> To.
+
+prepare_to_store(Msg) ->
+ Msg#basic_message{
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)}.
+
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
+record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
ack_in_counter = AckInCount}) ->
- {RPA1, DPA1} =
- case Msg of
- undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
- _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
+ Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
+ {RPA1, DPA1, QPA1} =
+ case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
+ {false, _} -> {RPA, Insert(DPA), QPA};
+ {_, queue_index} -> {RPA, DPA, Insert(QPA)};
+ {_, msg_store} -> {Insert(RPA), DPA, QPA}
end,
State #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1,
+ qi_pending_ack = QPA1,
ack_in_counter = AckInCount + 1}.
lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> V;
- none -> gb_trees:get(SeqId, DPA)
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, QPA)
+ end
end.
%% First parameter = UpdatePersistentCount
@@ -1377,27 +1490,38 @@ remove_pending_ack(true, SeqId, State) ->
PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
{MsgStatus, upd_bytes(0, -1, MsgStatus,
State1 # vqstate{ persistent_count = PCount1 })};
-remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
- none -> DPA1 = gb_trees:delete(SeqId, DPA),
- {gb_trees:get(SeqId, DPA),
- State #vqstate { disk_pending_ack = DPA1 }}
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} ->
+ DPA1 = gb_trees:delete(SeqId, DPA),
+ {V, State#vqstate{disk_pending_ack = DPA1}};
+ none ->
+ QPA1 = gb_trees:delete(SeqId, QPA),
+ {gb_trees:get(SeqId, QPA),
+ State#vqstate{qi_pending_ack = QPA1}}
+ end
end.
purge_pending_ack(KeepPersistent,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ F, rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(
+ F, accumulate_ack_init(), RPA), DPA), QPA),
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty() },
+ disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty()},
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1418,11 +1542,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
- case MsgOnDisk of
+ case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
@@ -1469,6 +1593,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
gb_sets:union(MIOD, Confirmed) })
end).
+msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+
%%----------------------------------------------------------------------------
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
@@ -1476,7 +1604,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
{MsgStatus#msg_status { msg = Msg },
- upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
+ upd_ram_bytes_count(1, MsgStatus, State1)}; %% [1]
publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
%% [1] We increase the ram_bytes here because we paged the message in
@@ -1488,10 +1616,11 @@ publish_alpha(MsgStatus, State) ->
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of
- true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)};
- _ -> {MsgStatus2, State1}
- end.
+ {MsgStatus2, case {msg_in_ram(MsgStatus1), msg_in_ram(MsgStatus2)} of
+ {true, false} -> upd_ram_bytes(-1, MsgStatus, State1);
+ {_, true} -> inc_ram_msg_count(State1);
+ _ -> State1
+ end}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1563,6 +1692,9 @@ ram_ack_iterator(State) ->
disk_ack_iterator(State) ->
{ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+qi_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
+
msg_iterator(State) -> istate(start, State).
istate(start, State) -> {q4, State#vqstate.q4, State};
@@ -1592,7 +1724,8 @@ next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of
false -> Next = {delta, Delta, Rest, State},
{value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
@@ -1755,8 +1888,11 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1766,9 +1902,12 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
+ {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
+ betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, QPA, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2,
+ ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
@@ -1826,18 +1965,17 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true },
- State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ {MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(
- MsgStatus2, Qa,
- upd_ram_bytes(
- -1, MsgStatus2,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1})),
+ State2 = case msg_in_ram(MsgStatus2) of
+ false -> upd_ram_bytes_count(
+ -1, MsgStatus2, State1);
+ true -> State1
+ end,
+ State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State2)
+ Qa, State3)
end
end.
@@ -1845,7 +1983,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
delta = Delta,
q3 = Q3,
index_state = IndexState }) ->
- PushState = {Quota, Delta, IndexState},
+ PushState = {Quota, Delta, IndexState, State},
{Q3a, PushState1} = push_betas_to_deltas(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
@@ -1854,11 +1992,11 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
Q2, PushState1),
- {_, Delta1, IndexState1} = PushState2,
- State #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1 }.
+ {_, Delta1, IndexState1, State1} = PushState2,
+ State1 #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a,
+ index_state = IndexState1 }.
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
@@ -1875,10 +2013,10 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end.
push_betas_to_deltas1(_Generator, _Limit, Q,
- {0, _Delta, _IndexState} = PushState) ->
+ {0, _Delta, _IndexState, _State} = PushState) ->
{Q, PushState};
push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, IndexState} = PushState) ->
+ {Quota, Delta, IndexState, State} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1888,9 +2026,13 @@ push_betas_to_deltas1(Generator, Limit, Q,
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus, IndexState),
+ State1 = case msg_in_ram(MsgStatus) of
+ false -> State;
+ true -> upd_ram_bytes_count(-1, MsgStatus, State)
+ end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, IndexState1})
+ {Quota - 1, Delta1, IndexState1, State1})
end.
%%----------------------------------------------------------------------------
diff --git a/src/truncate.erl b/src/truncate.erl
index 820af1bf86..1b4d957d0b 100644
--- a/src/truncate.erl
+++ b/src/truncate.erl
@@ -45,7 +45,7 @@ report(List, Params) when is_list(List) -> [case Item of
report(Other, Params) -> term(Other, Params).
term(Thing, {Max, {Content, Struct, ContentDec, StructDec}}) ->
- case term_limit(Thing, Max) of
+ case exceeds_size(Thing, Max) of
true -> term(Thing, true, #params{content = Content,
struct = Struct,
content_dec = ContentDec,
@@ -93,7 +93,7 @@ shrink_list([H|T], #params{content = Content,
%% sizes. This is all going to be rather approximate though, these
%% sizes are probably not very "fair" but we are just trying to see if
%% we reach a fairly arbitrary limit anyway though.
-term_limit(Thing, Max) ->
+exceeds_size(Thing, Max) ->
case term_size(Thing, Max, erlang:system_info(wordsize)) of
limit_exceeded -> true;
_ -> false
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index ef6b756b14..f19c361e94 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -1292,11 +1292,9 @@ test_spawn_remote() ->
end.
user(Username) ->
- #user{username = Username,
- tags = [administrator],
- auth_backend = rabbit_auth_backend_internal,
- impl = #internal_user{username = Username,
- tags = [administrator]}}.
+ #user{username = Username,
+ tags = [administrator],
+ authz_backends = [{rabbit_auth_backend_internal, none}]}.
test_confirms() ->
{_Writer, Ch} = test_spawn(),
@@ -1890,11 +1888,15 @@ test_backing_queue() ->
passed = test_msg_store(),
application:set_env(rabbit, msg_store_file_size_limit,
FileSizeLimit),
- passed = test_queue_index(),
- passed = test_queue_index_props(),
- passed = test_variable_queue(),
- passed = test_variable_queue_delete_msg_store_files_callback(),
- passed = test_queue_recover(),
+ [begin
+ application:set_env(
+ rabbit, queue_index_embed_msgs_below, Bytes),
+ passed = test_queue_index(),
+ passed = test_queue_index_props(),
+ passed = test_variable_queue(),
+ passed = test_variable_queue_delete_msg_store_files_callback(),
+ passed = test_queue_recover()
+ end || Bytes <- [0, 1024]],
application:set_env(rabbit, queue_index_max_journal_entries,
MaxJournal),
%% We will have restarted the message store, and thus changed
@@ -2221,7 +2223,7 @@ init_test_queue() ->
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
- fun nop/1),
+ fun nop/1, fun nop/1),
ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
Res.
@@ -2424,7 +2426,7 @@ variable_queue_init(Q, Recover) ->
Q, case Recover of
true -> non_clean_shutdown;
false -> new
- end, fun nop/2, fun nop/2, fun nop/1).
+ end, fun nop/2, fun nop/2, fun nop/1, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
@@ -2446,7 +2448,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
end},
PayloadFun(N)),
PropFun(N, #message_properties{size = 10}),
- false, self(), VQN)
+ false, self(), noflow, VQN)
end, VQ, lists:seq(Start, Start + Count - 1))).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -2464,7 +2466,10 @@ variable_queue_set_ram_duration_target(Duration, VQ) ->
rabbit_variable_queue:set_ram_duration_target(Duration, VQ)).
assert_prop(List, Prop, Value) ->
- Value = proplists:get_value(Prop, List).
+    case proplists:get_value(Prop, List) of
+ Value -> ok;
+ _ -> {exit, Prop, exp, Value, List}
+ end.
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
@@ -2487,12 +2492,18 @@ with_fresh_variable_queue(Fun) ->
{delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(
- shutdown, Fun(VQ)),
- Me ! Ref
+ try
+ _ = rabbit_variable_queue:delete_and_terminate(
+ shutdown, Fun(VQ)),
+ Me ! Ref
+ catch
+ Type:Error ->
+ Me ! {Ref, Type, Error, erlang:get_stacktrace()}
+ end
end),
receive
- Ref -> ok
+ Ref -> ok;
+ {Ref, Type, Error, ST} -> exit({Type, Error, ST})
end,
passed.
@@ -2503,7 +2514,8 @@ publish_and_confirm(Q, Payload, Count) ->
<<>>, #'P_basic'{delivery_mode = 2},
Payload),
Delivery = #delivery{mandatory = false, sender = self(),
- confirm = true, message = Msg, msg_seq_no = Seq},
+ confirm = true, message = Msg, msg_seq_no = Seq,
+ flow = noflow},
_QPids = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
@@ -2789,8 +2801,6 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
VQ7 = lists:foldl(
fun (Duration1, VQ4) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
- io:format("~p:~n~p~n",
- [Duration1, variable_queue_status(VQ5)]),
VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
@@ -2851,7 +2861,6 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
check_variable_queue_status(VQ0, Props) ->
VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
S = variable_queue_status(VQ1),
- io:format("~p~n", [S]),
assert_props(S, Props),
VQ1.