summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-13 15:59:16 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-13 15:59:16 +0100
commit5af9979478c311c6c63d3bace6f219e659aa7c85 (patch)
treef558eadfc931bb34a3bd41a8bbfa814dc8a07dfd
parenta345670b49d2f9aaaf2e862b04faaf2b9b147143 (diff)
parent7cd5c26949fe9c60d5a609bb05f9244e74f618bd (diff)
downloadrabbitmq-server-git-5af9979478c311c6c63d3bace6f219e659aa7c85.tar.gz
Merge bug26306
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmq.config.example8
-rw-r--r--docs/rabbitmqctl.1.xml36
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/changelog_comments/additional_changelog_comments_3.3.59
-rw-r--r--packaging/debs/Debian/changelog_comments/additional_changelog_comments_x.x.x11
-rw-r--r--packaging/debs/Debian/debian/changelog7
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl30
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_backing_queue_qc.erl5
-rw-r--r--src/rabbit_diagnostics.erl31
-rw-r--r--src/rabbit_log.erl65
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_mnesia.erl56
-rw-r--r--src/rabbit_networking.erl4
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_policies.erl11
-rw-r--r--src/rabbit_queue_index.erl94
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/rabbit_recovery_terms.erl27
-rw-r--r--src/rabbit_tests.erl35
-rw-r--r--src/rabbit_upgrade.erl6
-rw-r--r--src/rabbit_variable_queue.erl261
28 files changed, 518 insertions, 291 deletions
diff --git a/Makefile b/Makefile
index 6dbb650e68..ffb4cdfed9 100644
--- a/Makefile
+++ b/Makefile
@@ -127,7 +127,7 @@ plugins:
# Not building plugins
check-xref:
- $(info xref checks are disabled)
+ $(info xref checks are disabled as there is no plugins-src directory)
endif
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index e8b5666098..63540568f1 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -49,6 +49,14 @@
%% ==============
%%
+ %% The default "guest" user is only permitted to access the server
+ %% via a loopback interface (e.g. localhost).
+ %% {loopback_users, [<<"guest">>]},
+ %%
+ %% Uncomment the following line if you want to allow access to the
+ %% guest user from anywhere on the network.
+ %% {loopback_users, []},
+
%% Configuring SSL.
%% See http://www.rabbitmq.com/ssl.html for full documentation.
%%
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 328926df18..eb3c7ef38d 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1190,6 +1190,42 @@
(queue depth).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>messages_ready_ram</term>
+ <listitem><para>Number of messages from messages_ready which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_unacknowledged_ram</term>
+ <listitem><para>Number of messages from messages_unacknowledged which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_ram</term>
+ <listitem><para>Total number of messages which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_persistent</term>
+ <listitem><para>Total number of persistent messages in the queue (will always be 0 for transient queues).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes</term>
+ <listitem><para>Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_ready</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages ready to be delivered to clients.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_unacknowledged</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages delivered to clients but not yet acknowledged.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_ram</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are in RAM.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_persistent</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are persistent.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>consumers</term>
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 7a40f9ebf0..5e41ea93bf 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -87,7 +87,7 @@
-record(event, {type, props, reference = undefined, timestamp}).
--record(message_properties, {expiry, needs_confirming = false}).
+-record(message_properties, {expiry, needs_confirming = false, size}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 324040579d..971e7241df 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Mon Aug 11 2014 simon@rabbitmq.com 3.3.5-1
+- New Upstream Release
+
* Tue Jun 24 2014 simon@rabbitmq.com 3.3.4-1
- New Upstream Release
diff --git a/packaging/debs/Debian/changelog_comments/additional_changelog_comments_3.3.5 b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_3.3.5
new file mode 100644
index 0000000000..b9a9551c6b
--- /dev/null
+++ b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_3.3.5
@@ -0,0 +1,9 @@
+# This file contains additional comments for the debian/changelog to be
+# appended within the current version's changelog entry.
+# Each line will be a separate comment. Do not begin with an *, dch will
+# add that.
+# For comments longer than one line do not put a line break and dch will
+# neatly format it.
+# Shell comments are ignored.
+#
+Changed Uploaders from Emile Joubert to Blair Hester
diff --git a/packaging/debs/Debian/changelog_comments/additional_changelog_comments_x.x.x b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_x.x.x
new file mode 100644
index 0000000000..bbab75965e
--- /dev/null
+++ b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_x.x.x
@@ -0,0 +1,11 @@
+# This file contains additional comments for the debian/changelog to be
+# appended within the current version's changelog entry.
+# Each line will be a separate comment. Do not begin with an *, dch will
+# add that.
+# For comments longer than one line do not put a line break and dch will
+# neatly format it.
+# Shell comments are ignored.
+#
+# Examples:
+#Remove parts made of undercooked chicken
+#This is a long line which is the beginning of a long two line comment which I am sure is going to be needed if the script cannot handle it
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index d26991e437..8fde9087be 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,10 @@
+rabbitmq-server (3.3.5-1) unstable; urgency=low
+
+ * New Upstream Release
+ * Changed Uploaders from Emile Joubert to Blair Hester
+
+ -- Simon MacMullen <simon@rabbitmq.com> Mon, 11 Aug 2014 12:23:31 +0100
+
rabbitmq-server (3.3.4-1) unstable; urgency=low
* New Upstream Release
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 9a578aa953..a3573bbd04 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -139,7 +139,7 @@ out({queue, [V], [], 1}) ->
{{value, V}, {queue, [], [], 0}};
out({queue, [Y|In], [], Len}) ->
[V|Out] = lists:reverse(In, []),
- {{value, V}, {queue, [Y], Out}, Len - 1};
+ {{value, V}, {queue, [Y], Out, Len - 1}};
out({queue, In, [V], Len}) when is_list(In) ->
{{value,V}, r2f(In, Len - 1)};
out({queue, In,[V|Out], Len}) when is_list(In) ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 191f04a425..4b8af870f4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -74,13 +74,6 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
--rabbit_boot_step({rabbit_log,
- [{description, "logging server"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_log]}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -266,7 +259,7 @@ maybe_hipe_compile() ->
warn_if_hipe_compilation_failed(true) ->
ok;
warn_if_hipe_compilation_failed(false) ->
- error_logger:warning_msg(
+ rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
%% HiPE compilation happens before we have log handlers and can take a
@@ -367,7 +360,7 @@ stop() ->
undefined -> ok;
_ -> await_startup(true)
end,
- rabbit_log:info("Stopping RabbitMQ~n"),
+ rabbit_misc:local_info_msg("Stopping RabbitMQ~n", []),
Apps = ?APPS ++ rabbit_plugins:active(),
stop_apps(app_utils:app_dependency_order(Apps, true)).
@@ -502,9 +495,9 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, Vsn} = application:get_key(rabbit, vsn),
- error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
- [Vsn, erlang:system_info(otp_release),
- ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ rabbit_log:info("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
+ [Vsn, erlang:system_info(otp_release),
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
@@ -773,7 +766,7 @@ log_broker_started(Plugins) ->
fun() ->
PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
|| P <- Plugins]),
- error_logger:info_msg(
+ rabbit_log:info(
"Server startup complete; ~b plugins started.~n~s",
[length(Plugins), PluginList]),
io:format(" completed with ~p plugins.~n", [length(Plugins)])
@@ -822,18 +815,18 @@ log_banner() ->
{K, V} ->
Format(K, V)
end || S <- Settings]),
- error_logger:info_msg("~s", [Banner]).
+ rabbit_log:info("~s", [Banner]).
warn_if_kernel_config_dubious() ->
case erlang:system_info(kernel_poll) of
true -> ok;
- false -> error_logger:warning_msg(
+ false -> rabbit_log:warning(
"Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
"and CPU utilization may worsen.~n")
end,
AsyncThreads = erlang:system_info(thread_pool_size),
case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
- true -> error_logger:warning_msg(
+ true -> rabbit_log:warning(
"Erlang VM is running with ~b I/O threads, "
"file I/O performance may worsen~n", [AsyncThreads]);
false -> ok
@@ -843,9 +836,8 @@ warn_if_kernel_config_dubious() ->
{ok, Val} -> Val
end,
case proplists:get_value(nodelay, IDCOpts, false) of
- false -> error_logger:warning_msg(
- "Nagle's algorithm is enabled for sockets, "
- "network I/O latency will be higher~n");
+ false -> rabbit_log:warning("Nagle's algorithm is enabled for sockets, "
+ "network I/O latency will be higher~n");
true -> ok
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4e23dbd242..a33a8fcc09 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -461,7 +461,8 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 63b1865570..db297c1daa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,6 +52,7 @@
dlx,
dlx_routing_key,
max_length,
+ max_bytes,
args_policy_version,
status
}).
@@ -265,7 +266,8 @@ process_args_policy(State = #q{q = Q,
{<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
- {<<"max-length">>, fun res_min/2, fun init_max_length/2}],
+ {<<"max-length">>, fun res_min/2, fun init_max_length/2},
+ {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
@@ -304,6 +306,10 @@ init_max_length(MaxLen, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}),
State1.
+init_max_bytes(MaxBytes, State) ->
+ {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
+ State1.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -392,10 +398,8 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
when Expiry + 1000 < TExpiry ->
- case rabbit_misc:cancel_timer(TRef) of
- false -> State;
- _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
- end;
+ rabbit_misc:cancel_timer(TRef),
+ ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined});
ensure_ttl_timer(_Expiry, State) ->
State.
@@ -545,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% remains unchanged, or if the newly published message
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
- case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
{false, false, _} -> State4;
{true, true, undefined} -> State4;
{_, _, _} -> drop_expired_msgs(State4)
end
end.
-maybe_drop_head(State = #q{max_length = undefined}) ->
- {0, State};
-maybe_drop_head(State = #q{max_length = MaxLen,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case BQ:len(BQS) - MaxLen of
- Excess when Excess > 0 ->
- {Excess,
- with_dlx(
- State#q.dlx,
- fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
- fun () ->
- {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
- BQ:drop(false, BQS0)
- end, {ok, BQS},
- lists:seq(1, Excess)),
- State#q{backing_queue_state = BQS1}
- end)};
- _ -> {0, State}
+maybe_drop_head(State = #q{max_length = undefined,
+ max_bytes = undefined}) ->
+ {false, State};
+maybe_drop_head(State) ->
+ maybe_drop_head(false, State).
+
+maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case over_max_length(State) of
+ true ->
+ maybe_drop_head(true,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msg(X, State) end,
+ fun () ->
+ {_, BQS1} = BQ:drop(false, BQS),
+ State#q{backing_queue_state = BQS1}
+ end));
+ false ->
+ {AlreadyDropped, State}
end.
+over_max_length(#q{max_length = MaxLen,
+ max_bytes = MaxBytes,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes_ready, BQS) > MaxBytes.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
WasEmpty = BQ:is_empty(BQS),
@@ -666,9 +677,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) ->
run_message_queue(true, Fun(State1))
end.
-message_properties(Message, Confirm, #q{ttl = TTL}) ->
+message_properties(Message = #basic_message{content = Content},
+ Confirm, #q{ttl = TTL}) ->
+ #content{payload_fragments_rev = PFR} = Content,
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually}.
+ needs_confirming = Confirm == eventually,
+ size = iolist_size(PFR)}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
@@ -725,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
-dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) ->
{ok, State1} =
dead_letter_msgs(
fun (DLFun, Acc, BQS) ->
- lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
- {{Msg, _, AckTag}, BQS1} =
- BQ:fetch(true, BQS0),
- {ok, DLFun(Msg, AckTag, Acc0), BQS1}
- end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS),
+ {ok, DLFun(Msg, AckTag, Acc), BQS1}
end, maxlen, X, State),
State1.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9e5f081353..098f5f4342 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -20,7 +20,9 @@
-define(INFO_KEYS, [messages_ram, messages_ready_ram,
messages_unacknowledged_ram, messages_persistent,
- backing_queue_status]).
+ message_bytes, message_bytes_ready,
+ message_bytes_unacknowledged, message_bytes_ram,
+ message_bytes_persistent, backing_queue_status]).
-ifdef(use_specs).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 49b71122a8..622b1b161f 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -116,7 +116,8 @@ qc_publish(#state{bqstate = BQ}) ->
[qc_message(),
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
- expiry = oneof([undefined | lists:seq(1, 10)])},
+ expiry = oneof([undefined | lists:seq(1, 10)]),
+ size = 10},
false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
@@ -124,7 +125,7 @@ qc_publish_multiple(#state{}) ->
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
- [qc_message(), #message_properties{}, self(), BQ]}.
+ [qc_message(), #message_properties{size = 10}, self(), BQ]}.
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl
index 8b76ee1711..4eafada3f3 100644
--- a/src/rabbit_diagnostics.erl
+++ b/src/rabbit_diagnostics.erl
@@ -16,29 +16,37 @@
-module(rabbit_diagnostics).
--export([maybe_stuck_processes/1]).
+-define(PROCESS_INFO,
+ [current_stacktrace, initial_call, dictionary, message_queue_len,
+ links, monitors, monitored_by, heap_size]).
-maybe_stuck_processes(Timeout) ->
+-export([maybe_stuck/0, maybe_stuck/1]).
+
+maybe_stuck() -> maybe_stuck(5000).
+
+maybe_stuck(Timeout) ->
Pids = processes(),
io:format("There are ~p processes.~n", [length(Pids)]),
- maybe_stuck_processes(Pids, Timeout).
+ maybe_stuck(Pids, Timeout).
-maybe_stuck_processes(Pids, Timeout) when Timeout =< 0 ->
+maybe_stuck(Pids, Timeout) when Timeout =< 0 ->
io:format("Found ~p suspicious processes.~n", [length(Pids)]),
- [io:format("~p~n", [{Pid, erlang:process_info(Pid),
- erlang:process_info(Pid, current_stacktrace)}])
- || Pid <- Pids],
+ [io:format("~p~n", [info(Pid)]) || Pid <- Pids],
ok;
-maybe_stuck_processes(Pids, Timeout) ->
+maybe_stuck(Pids, Timeout) ->
Pids2 = [P || P <- Pids, looks_stuck(P)],
io:format("Investigated ~p processes this round, ~pms to go.~n",
[length(Pids2), Timeout]),
- timer:sleep(100),
- maybe_stuck_processes(Pids2, Timeout - 100).
+ timer:sleep(500),
+ maybe_stuck(Pids2, Timeout - 500).
looks_stuck(Pid) ->
case process_info(Pid, status) of
{status, waiting} ->
+ %% It's tempting to just check for message_queue_len > 0
+ %% here rather than mess around with stack traces and
+ %% heuristics. But really, sometimes freshly stuck
+ %% processes can have 0 messages...
case erlang:process_info(Pid, current_stacktrace) of
{current_stacktrace, [H|_]} ->
maybe_stuck_stacktrace(H);
@@ -66,3 +74,6 @@ maybe_stuck_stacktrace({_M, F, _A}) ->
0 -> true;
_ -> false
end.
+
+info(Pid) ->
+ [{pid, Pid} | process_info(Pid, ?PROCESS_INFO)].
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index f4df0e76d5..2ca5260c12 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -16,17 +16,8 @@
-module(rabbit_log).
--behaviour(gen_server).
-
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
--define(SERVER, ?MODULE).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -36,8 +27,6 @@
-type(category() :: atom()).
-type(level() :: 'info' | 'warning' | 'error').
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-
-spec(log/3 :: (category(), level(), string()) -> 'ok').
-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
@@ -51,56 +40,34 @@
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
log(Category, Level, Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
-
-info(Fmt) -> log(default, info, Fmt).
-info(Fmt, Args) -> log(default, info, Fmt, Args).
-warning(Fmt) -> log(default, warning, Fmt).
-warning(Fmt, Args) -> log(default, warning, Fmt, Args).
-error(Fmt) -> log(default, error, Fmt).
-error(Fmt, Args) -> log(default, error, Fmt, Args).
-
-%%--------------------------------------------------------------------
-
-init([]) ->
- {ok, CatLevelList} = application:get_env(log_levels),
- CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList],
- {ok, orddict:from_list(CatLevels)}.
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast({log, Category, Level, Fmt, Args}, CatLevels) ->
- CatLevel = case orddict:find(Category, CatLevels) of
- {ok, L} -> L;
- error -> level(info)
- end,
- case level(Level) =< CatLevel of
+ case level(Level) =< catlevel(Category) of
false -> ok;
true -> (case Level of
info -> fun error_logger:info_msg/2;
warning -> fun error_logger:warning_msg/2;
error -> fun error_logger:error_msg/2
end)(Fmt, Args)
- end,
- {noreply, CatLevels};
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
+ end.
-terminate(_Reason, _State) ->
- ok.
+info(Fmt) -> log(default, info, Fmt).
+info(Fmt, Args) -> log(default, info, Fmt, Args).
+warning(Fmt) -> log(default, warning, Fmt).
+warning(Fmt, Args) -> log(default, warning, Fmt, Args).
+error(Fmt) -> log(default, error, Fmt).
+error(Fmt, Args) -> log(default, error, Fmt, Args).
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+catlevel(Category) ->
+ %% We can get here as part of rabbitmqctl when it is impersonating
+ %% a node; in which case the env will not be defined.
+ CatLevelList = case application:get_env(rabbit, log_levels) of
+ {ok, L} -> L;
+ undefined -> []
+ end,
+ level(proplists:get_value(Category, CatLevelList, info)).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index cc06ae442d..6d0064abe7 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -271,8 +271,8 @@ handle_cast({sync_start, Ref, Syncer},
DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
BQSN1 = update_ram_duration(BQN, BQSN),
- TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
- self(), update_ram_duration),
+ TRefN = rabbit_misc:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
{TRefN, BQSN1}
end) of
denied -> noreply(State1);
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 09355f3ffc..7ff88f04c1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -92,9 +92,9 @@
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-type(digraph_label() :: term()).
-type(graph_vertex_fun() ::
- fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
+ fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
- fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+ fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph:vertex()}])).
-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
@@ -656,10 +656,10 @@ with_local_io(Fun) ->
end.
%% Log an info message on the local node using the standard logger.
-%% Use this if rabbit isn't running and the call didn't originate on
-%% the local node (e.g. rabbitmqctl calls).
+%% Use this if the call didn't originate on the local node (e.g.
+%% rabbitmqctl calls).
local_info_msg(Format, Args) ->
- with_local_io(fun () -> error_logger:info_msg(Format, Args) end).
+ with_local_io(fun () -> rabbit_log:info(Format, Args) end).
unfold(Fun, Init) ->
unfold(Fun, [], Init).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 630d9853bb..a499686f52 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -116,7 +116,7 @@ init_from_config() ->
true -> disc;
false -> ram
end},
- error_logger:warning_msg(
+ rabbit_log:warning(
"Converting legacy 'cluster_nodes' configuration~n ~w~n"
"to~n ~w.~n~n"
"Please update the configuration to the new format "
@@ -127,17 +127,22 @@ init_from_config() ->
{ok, Config} ->
Config
end,
- case find_good_node(nodes_excl_me(TryNodes)) of
+ case TryNodes of
+ [] -> init_db_and_upgrade([node()], disc, false);
+ _ -> auto_cluster(TryNodes, NodeType)
+ end.
+
+auto_cluster(TryNodes, NodeType) ->
+ case find_auto_cluster_node(nodes_excl_me(TryNodes)) of
{ok, Node} ->
- rabbit_log:info("Node '~p' selected for clustering from "
- "configuration~n", [Node]),
+ rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true),
rabbit_node_monitor:notify_joined_cluster();
none ->
- rabbit_log:warning("Could not find any suitable node amongst the "
- "ones provided in the configuration: ~p~n",
- [TryNodes]),
+ rabbit_log:warning(
+ "Could not find any node for auto-clustering from: ~p~n"
+ "Starting blank node...~n", [TryNodes]),
init_db_and_upgrade([node()], disc, false)
end.
@@ -619,10 +624,10 @@ schema_ok_or_move() ->
{error, Reason} ->
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
- error_logger:warning_msg("schema integrity check failed: ~p~n"
- "moving database to backup location "
- "and recreating schema from scratch~n",
- [Reason]),
+ rabbit_log:warning("schema integrity check failed: ~p~n"
+ "moving database to backup location "
+ "and recreating schema from scratch~n",
+ [Reason]),
ok = move_db(),
ok = create_schema()
end.
@@ -648,8 +653,8 @@ move_db() ->
ok ->
%% NB: we cannot use rabbit_log here since it may not have
%% been started yet
- error_logger:warning_msg("moved database from ~s to ~s~n",
- [MnesiaDir, BackupDir]),
+ rabbit_log:warning("moved database from ~s to ~s~n",
+ [MnesiaDir, BackupDir]),
ok;
{error, Reason} -> throw({error, {cannot_backup_mnesia,
MnesiaDir, BackupDir, Reason}})
@@ -695,7 +700,7 @@ leave_cluster(Node) ->
end.
wait_for(Condition) ->
- error_logger:info_msg("Waiting for ~p...~n", [Condition]),
+ rabbit_log:info("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
start_mnesia(CheckConsistency) ->
@@ -788,17 +793,24 @@ is_virgin_node() ->
false
end.
-find_good_node([]) ->
+find_auto_cluster_node([]) ->
none;
-find_good_node([Node | Nodes]) ->
+find_auto_cluster_node([Node | Nodes]) ->
+ Fail = fun (Fmt, Args) ->
+ rabbit_log:warning(
+ "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]),
+ find_auto_cluster_node(Nodes)
+ end,
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} -> find_good_node(Nodes);
+ {badrpc, _} = Reason -> Diag = rabbit_nodes:diagnostics([Node]),
+ Fail("~p~n~s~n", [Reason, Diag]);
%% old delegate hash check
- {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes);
- {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> Fail("versions ~p~n",
+ [{OTP, Rabbit}]);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 96448f3227..4e92bf394e 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -208,7 +208,7 @@ tcp_listener_addresses({Host, Port, Family0})
[{IPAddress, Port, Family} ||
{IPAddress, Family} <- getaddr(Host, Family0)];
tcp_listener_addresses({_Host, Port, _Family0}) ->
- error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
+ rabbit_log:error("invalid port ~p - not 0..65535~n", [Port]),
throw({error, {invalid_port, Port}}).
tcp_listener_addresses_auto(Port) ->
@@ -395,7 +395,7 @@ gethostaddr(Host, Family) ->
end.
host_lookup_error(Host, Reason) ->
- error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ rabbit_log:error("invalid host ~p - ~p~n", [Host, Reason]),
throw({error, {invalid_host, Host, Reason}}).
resolve_family({_,_,_,_}, auto) -> inet;
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 9acaa1d418..04db5d5d21 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -97,7 +97,7 @@ list(PluginsDir) ->
[plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]),
case Problems of
[] -> ok;
- _ -> error_logger:warning_msg(
+ _ -> rabbit_log:warning(
"Problem reading some plugins: ~p~n", [Problems])
end,
ensure_dependencies(Plugins).
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 3558cf98df..cc88765f12 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -34,7 +34,8 @@ register() ->
{policy_validator, <<"dead-letter-routing-key">>},
{policy_validator, <<"message-ttl">>},
{policy_validator, <<"expires">>},
- {policy_validator, <<"max-length">>}]],
+ {policy_validator, <<"max-length">>},
+ {policy_validator, <<"max-length-bytes">>}]],
ok.
validate_policy(Terms) ->
@@ -76,6 +77,10 @@ validate_policy0(<<"max-length">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"max-length">>, Value) ->
- {error, "~p is not a valid maximum length", [Value]}.
-
+ {error, "~p is not a valid maximum length", [Value]};
+validate_policy0(<<"max-length-bytes">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-length-bytes">>, Value) ->
+ {error, "~p is not a valid maximum length in bytes", [Value]}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 56c19d3f51..0f57286656 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,7 +21,7 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -140,8 +140,11 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)).
+-define(SIZE_BYTES, 4).
+-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry + 4 for size
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
%% + 2 for seq, bits and prefix
-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
@@ -168,8 +171,9 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, local, []}).
--rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
-ifdef(use_specs).
@@ -199,7 +203,8 @@
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(), on_sync_fun()) ->
- {'undefined' | non_neg_integer(), qistate()}).
+ {'undefined' | non_neg_integer(),
+ 'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
@@ -415,7 +420,7 @@ init_clean(RecoveredCounts, State) ->
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
%% wrong thing to return
- {undefined, State1 # qistate { segments = Segments1 }}.
+ {undefined, undefined, State1 # qistate { segments = Segments1 }}.
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
@@ -424,7 +429,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
recover_journal(State),
- {Segments1, Count, DirtyCount} =
+ {Segments1, Count, Bytes, DirtyCount} =
%% Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
@@ -433,16 +438,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% dirty count here, so we can call maybe_flush_journal below
%% and avoid unnecessary file system operations.
lists:foldl(
- fun (Seg, {Segments2, CountAcc, DirtyCount}) ->
- {Segment = #segment { unacked = UnackedCount }, Dirty} =
+ fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) ->
+ {{Segment = #segment { unacked = UnackedCount }, Dirty},
+ UnackedBytes} =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
{segment_store(Segment, Segments2),
- CountAcc + UnackedCount, DirtyCount + Dirty}
- end, {Segments, 0, 0}, all_segment_nums(State1)),
+ CountAcc + UnackedCount,
+ BytesAcc + UnackedBytes, DirtyCount + Dirty}
+ end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
- {Count, State2}.
+ {Count, Bytes, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
segments = Segments }) ->
@@ -464,12 +471,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
- SegmentAndDirtyCount) ->
- recover_message(ContainsCheckFun(MsgId), CleanShutdown,
- Del, RelSeq, SegmentAndDirtyCount)
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack},
+ {SegmentAndDirtyCount, Bytes}) ->
+ {recover_message(ContainsCheckFun(MsgId), CleanShutdown,
+ Del, RelSeq, SegmentAndDirtyCount),
+ Bytes + case IsPersistent of
+ true -> MsgProps#message_properties.size;
+ false -> 0
+ end}
end,
- {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0},
+ {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
SegEntries1).
recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
@@ -549,13 +560,15 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) ->
- [MsgId, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS>>) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
@@ -563,7 +576,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
?NO_EXPIRY -> undefined;
X -> X
end,
- {MsgId, #message_properties { expiry = Exp }}.
+ {MsgId, #message_properties { expiry = Exp,
+ size = Size }}.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -1064,6 +1078,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
avoid_zeroes_segment(_) ->
stop.
+%% At upgrade time we just define every message's size as 0 - that
+%% will save us a load of faff with the message store, and means we
+%% can actually use the clean recovery terms in VQ. It does mean we
+%% don't count message bodies from before the migration, but we can
+%% live with that.
+store_msg_size() ->
+ foreach_queue_index({fun store_msg_size_journal/1,
+ fun store_msg_size_segment/1}).
+
+store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_journal(_) ->
+ stop.
+
+store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_size_segment(_) ->
+ stop.
+
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2ac24f9765..b2930f88d7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -611,7 +611,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
State1 = close_connection(terminate_channels(State)),
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
State1;
-handle_exception(State = #v1{connection_state = tuning}, Channel, Reason) ->
+handle_exception(State, Channel, Reason) ->
%% We don't trust the client at this point - force them to wait
%% for a bit so they can't DOS us with repeated failed logins etc.
timer:sleep(?SILENT_CLOSE_DELAY * 1000),
@@ -873,7 +873,7 @@ handle_method0(MethodName, FieldsBin,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
- catch throw:{inet_error, closed} ->
+ catch throw:{inet_error, E} when E =:= closed; E =:= enotconn ->
maybe_emit_stats(State),
throw(connection_closed_abruptly);
exit:#amqp_error{method = none} = Reason ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index bbf38f58c1..9f837cce20 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -23,11 +23,14 @@
-export([start/0, stop/0, store/2, read/1, clear/0]).
--export([upgrade_recovery_terms/0, start_link/0]).
+-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([upgrade_recovery_terms/0, persistent_bytes/0]).
+
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
+-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
%%----------------------------------------------------------------------------
@@ -58,9 +61,11 @@ read(DirBaseName) ->
end.
clear() ->
- dets:delete_all_objects(?MODULE),
+ ok = dets:delete_all_objects(?MODULE),
flush().
+start_link() -> gen_server:start_link(?MODULE, [], []).
+
%%----------------------------------------------------------------------------
upgrade_recovery_terms() ->
@@ -84,7 +89,20 @@ upgrade_recovery_terms() ->
close_table()
end.
-start_link() -> gen_server:start_link(?MODULE, [], []).
+persistent_bytes() -> dets_upgrade(fun persistent_bytes/1).
+persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}].
+
+dets_upgrade(Fun)->
+ open_table(),
+ try
+ ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) ->
+ store(DirBaseName, Fun(Terms)),
+ Acc
+ end, ok, ?MODULE),
+ ok
+ after
+ close_table()
+ end.
%%----------------------------------------------------------------------------
@@ -113,9 +131,8 @@ open_table() ->
{ram_file, true},
{auto_save, infinity}]).
-flush() -> dets:sync(?MODULE).
+flush() -> ok = dets:sync(?MODULE).
close_table() ->
ok = flush(),
ok = dets:close(?MODULE).
-
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 34a8cc5c2a..a186fb7a68 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -37,7 +37,7 @@ all_tests() ->
ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
- application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
+ application:set_env(rabbit, file_handles_high_watermark, 10),
ok = file_handle_cache:set_limit(10),
passed = test_version_equivalance(),
passed = test_file_handle_cache(),
@@ -1870,22 +1870,20 @@ test_backing_queue() ->
{ok, rabbit_variable_queue} ->
{ok, FileSizeLimit} =
application:get_env(rabbit, msg_store_file_size_limit),
- application:set_env(rabbit, msg_store_file_size_limit, 512,
- infinity),
+ application:set_env(rabbit, msg_store_file_size_limit, 512),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
- application:set_env(rabbit, queue_index_max_journal_entries, 128,
- infinity),
+ application:set_env(rabbit, queue_index_max_journal_entries, 128),
passed = test_msg_store(),
application:set_env(rabbit, msg_store_file_size_limit,
- FileSizeLimit, infinity),
+ FileSizeLimit),
passed = test_queue_index(),
passed = test_queue_index_props(),
passed = test_variable_queue(),
passed = test_variable_queue_delete_msg_store_files_callback(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
- MaxJournal, infinity),
+ MaxJournal),
%% We will have restarted the message store, and thus changed
%% the order of the children of rabbit_sup. This will cause
%% problems if there are subsequent failures - see bug 24262.
@@ -2210,13 +2208,13 @@ restart_test_queue(Qi) ->
empty_test_queue() ->
ok = rabbit_variable_queue:stop(),
{ok, _} = rabbit_variable_queue:start([]),
- {0, Qi} = init_test_queue(),
+ {0, 0, Qi} = init_test_queue(),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
with_empty_test_queue(Fun) ->
ok = empty_test_queue(),
- {0, Qi} = init_test_queue(),
+ {0, 0, Qi} = init_test_queue(),
rabbit_queue_index:delete_and_terminate(Fun(Qi)).
restart_app() ->
@@ -2235,7 +2233,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
- MsgId, SeqId, #message_properties{}, Persistent, QiN),
+ MsgId, SeqId, #message_properties{size = 10},
+ Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
{QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
end, {Qi, []}, SeqIds),
@@ -2257,7 +2256,7 @@ test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
MsgId = rabbit_guid:gen(),
- Props = #message_properties{expiry=12345},
+ Props = #message_properties{expiry=12345, size = 10},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
rabbit_queue_index:read(1, 2, Qi1),
@@ -2287,7 +2286,7 @@ test_queue_index() ->
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsMsgIdsA)),
%% should get length back as 0, as all the msgs were transient
- {0, Qi6} = restart_test_queue(Qi4),
+ {0, 0, Qi6} = restart_test_queue(Qi4),
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
@@ -2296,7 +2295,8 @@ test_queue_index() ->
lists:reverse(SeqIdsMsgIdsB)),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
- {LenB, Qi12} = restart_test_queue(Qi10),
+ BytesB = LenB * 10,
+ {LenB, BytesB, Qi12} = restart_test_queue(Qi10),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
@@ -2308,7 +2308,7 @@ test_queue_index() ->
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
%% should get length back as 0 because all persistent
%% msgs have been acked
- {0, Qi19} = restart_test_queue(Qi18),
+ {0, 0, Qi19} = restart_test_queue(Qi18),
Qi19
end),
@@ -2380,11 +2380,11 @@ test_queue_index() ->
true, Qi0),
Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
Qi3 = rabbit_queue_index:ack([0], Qi2),
- {5, Qi4} = restart_test_queue(Qi3),
+ {5, 50, Qi4} = restart_test_queue(Qi3),
{Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4),
Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
- {5, Qi8} = restart_test_queue(Qi7),
+ {5, 50, Qi8} = restart_test_queue(Qi7),
Qi8
end),
@@ -2419,7 +2419,8 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
false -> 1
end},
PayloadFun(N)),
- PropFun(N, #message_properties{}), false, self(), VQN)
+ PropFun(N, #message_properties{size = 10}),
+ false, self(), VQN)
end, VQ, lists:seq(Start, Start + Count - 1))).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 8ab35a89dd..72bf7855c4 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -190,7 +190,7 @@ die(Msg, Args) ->
%% We don't throw or exit here since that gets thrown
%% straight out into do_boot, generating an erl_crash.dump
%% and displaying any error message in a confusing way.
- error_logger:error_msg(Msg, Args),
+ rabbit_log:error(Msg, Args),
Str = rabbit_misc:format(
"~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
io:format(Str),
@@ -281,6 +281,4 @@ node_type_legacy() ->
false -> ram
end.
-%% NB: we cannot use rabbit_log here since it may not have been
-%% started yet
-info(Msg, Args) -> error_logger:info_msg(Msg, Args).
+info(Msg, Args) -> rabbit_log:info(Msg, Args).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 03b9956227..e97ed491a9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -254,13 +254,17 @@
durable,
transient_threshold,
- len,
- persistent_count,
+ len, %% w/o unacked
+ bytes, %% w/o unacked
+ unacked_bytes,
+ persistent_count, %% w unacked
+ persistent_bytes, %% w unacked
target_ram_count,
- ram_msg_count,
+ ram_msg_count, %% w/o unacked
ram_msg_count_prev,
ram_ack_count_prev,
+ ram_bytes, %% w unacked
out_counter,
in_counter,
rates,
@@ -343,11 +347,17 @@
transient_threshold :: non_neg_integer(),
len :: non_neg_integer(),
+ bytes :: non_neg_integer(),
+ unacked_bytes :: non_neg_integer(),
+
persistent_count :: non_neg_integer(),
+ persistent_bytes :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
+ ram_ack_count_prev :: non_neg_integer(),
+ ram_bytes :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -425,7 +435,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -440,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
- {DeltaCount, IndexState} =
+ {DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
@@ -448,7 +458,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, RecoveryTerms,
+ init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
process_recovery_terms(Terms=non_clean_shutdown) ->
@@ -461,6 +471,7 @@ process_recovery_terms(Terms) ->
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
+ persistent_bytes = PBytes,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(true, State),
@@ -470,7 +481,9 @@ terminate(_Reason, State) ->
rabbit_msg_store:client_ref(MSCStateP)
end,
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef}, {persistent_count, PCount}],
+ Terms = [{persistent_ref, PRef},
+ {persistent_count, PCount},
+ {persistent_bytes, PBytes}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
Terms, IndexState),
msg_store_clients = undefined }).
@@ -498,28 +511,35 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount }) ->
+ ram_bytes = RamBytes,
+ persistent_count = PCount,
+ persistent_bytes = PBytes }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- {LensByStore, IndexState1} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q4,
- orddict:new(), IndexState, MSCState),
- {LensByStore1, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
- purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
- {LensByStore2, IndexState3} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q1,
- LensByStore1, IndexState2, MSCState1),
- PCount1 = PCount - find_persistent_count(LensByStore2),
+ Stats = {RamBytes, PCount, PBytes},
+ {Stats1, IndexState1} =
+ remove_queue_entries(Q4, Stats, IndexState, MSCState),
+
+ {Stats2, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
+
+ purge_betas_and_deltas(
+ Stats1, State #vqstate { q4 = ?QUEUE:new(),
+ index_state = IndexState1 }),
+
+ {{RamBytes3, PCount3, PBytes3}, IndexState3} =
+ remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
+
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
+ bytes = 0,
ram_msg_count = 0,
- persistent_count = PCount1 })}.
+ ram_bytes = RamBytes3,
+ persistent_count = PCount3,
+ persistent_bytes = PBytes3 })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -542,11 +562,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
InCount1 = InCount + 1,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount1,
- persistent_count = PCount1,
- unconfirmed = UC1 }),
+ State3 = upd_bytes(
+ 1, 0, MsgStatus1,
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 })),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
@@ -565,11 +587,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 },
+ State3 = upd_bytes(0, 1, MsgStatus,
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, State) -> State.
@@ -638,9 +661,8 @@ ack([SeqId], State) ->
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
- remove_pending_ack(SeqId, State),
+ remove_pending_ack(true, SeqId, State),
IndexState1 = case IndexOnDisk of
true -> rabbit_queue_index:ack([SeqId], IndexState);
false -> IndexState
@@ -649,30 +671,24 @@ ack([SeqId], State) ->
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
- PCount1 = PCount - one_if(IsPersistent),
{[MsgId],
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + 1 })};
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
lists:foldl(
fun (SeqId, {Acc, State2}) ->
- {MsgStatus, State3} = remove_pending_ack(SeqId, State2),
+ {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2),
{accumulate_ack(MsgStatus, Acc), State3}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
- orddict:new(), MsgIdsByStore)),
{lists:reverse(AllMsgIds),
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
requeue(AckTags, #vqstate { delta = Delta,
@@ -829,6 +845,17 @@ info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
PersistentCount;
+info(message_bytes, #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes}) ->
+ Bytes + UBytes;
+info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
+ Bytes;
+info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
+ UBytes;
+info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
+ RamBytes;
+info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
+ PersistentBytes;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
@@ -865,8 +892,12 @@ is_duplicate(_Msg, State) -> {false, State}.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
+ bytes = Bytes,
+ unacked_bytes = UnackedBytes,
persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount }) ->
+ persistent_bytes = PersistentBytes,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes}) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -880,9 +911,14 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = LZ == (E3 and E4),
true = Len >= 0,
+ true = Bytes >= 0,
+ true = UnackedBytes >= 0,
true = PersistentCount >= 0,
+ true = PersistentBytes >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
+ true = RamBytes >= 0,
+ true = RamBytes =< Bytes + UnackedBytes,
State.
@@ -1030,15 +1066,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms,
+init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
- DeltaCount1 =
+ {DeltaCount1, DeltaBytes1} =
case Terms of
- non_clean_shutdown -> DeltaCount;
- _ -> proplists:get_value(persistent_count,
- Terms, DeltaCount)
+ non_clean_shutdown -> {DeltaCount, DeltaBytes};
+ _ -> {proplists:get_value(persistent_count,
+ Terms, DeltaCount),
+ proplists:get_value(persistent_bytes,
+ Terms, DeltaBytes)}
end,
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
@@ -1063,11 +1101,15 @@ init(IsDurable, IndexState, DeltaCount, Terms,
len = DeltaCount1,
persistent_count = DeltaCount1,
+ bytes = DeltaBytes1,
+ persistent_bytes = DeltaBytes1,
target_ram_count = infinity,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
+ ram_bytes = 0,
+ unacked_bytes = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
@@ -1092,9 +1134,11 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- inc_ram_msg_count(
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) })
+ upd_ram_bytes(
+ 1, MsgStatus,
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }))
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1125,6 +1169,28 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
+upd_bytes(SignReady, SignUnacked,
+ MsgStatus = #msg_status{msg = undefined}, State) ->
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State);
+upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) ->
+ upd_ram_bytes(SignReady + SignUnacked, MsgStatus,
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State)).
+
+upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
+ State = #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes,
+ persistent_bytes = PBytes}) ->
+ S = msg_size(MsgStatus),
+ SignTotal = SignReady + SignUnacked,
+ State#vqstate{bytes = Bytes + SignReady * S,
+ unacked_bytes = UBytes + SignUnacked * S,
+ persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
+
+upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
+ State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
+
+msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1166,58 +1232,64 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
-
+ State2 = case AckRequired of
+ false -> upd_bytes(-1, 0, MsgStatus, State1);
+ true -> upd_bytes(-1, 1, MsgStatus, State1)
+ end,
{AckTag, maybe_update_rates(
- State1 #vqstate {ram_msg_count = RamMsgCount1,
+ State2 #vqstate {ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len - 1,
persistent_count = PCount1})}.
-purge_betas_and_deltas(LensByStore,
+purge_betas_and_deltas(Stats,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {LensByStore, State};
- false -> {LensByStore1, IndexState1} =
- remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
- LensByStore, IndexState, MSCState),
- purge_betas_and_deltas(LensByStore1,
+ true -> {Stats, State};
+ false -> {Stats1, IndexState1} = remove_queue_entries(
+ Q3, Stats, IndexState, MSCState),
+ purge_betas_and_deltas(Stats1,
maybe_deltas_to_betas(
State #vqstate {
q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
- {MsgIdsByStore, Delivers, Acks} =
- Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
+remove_queue_entries(Q, {RamBytes, PCount, PBytes},
+ IndexState, MSCState) ->
+ {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
+ ?QUEUE:foldl(fun remove_queue_entries1/2,
+ {orddict:new(), RamBytes, PBytes, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
+ {{RamBytes1,
+ PCount - case orddict:find(true, MsgIdsByStore) of
+ error -> 0;
+ {ok, Ids} -> length(Ids)
+ end,
+ PBytes1},
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {MsgIdsByStore, Delivers, Acks}) ->
+ index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
+ msg_props = #message_properties { size = Size } },
+ {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
{case MsgOnDisk of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
+ RamBytes - Size * one_if(Msg =/= undefined),
+ PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
-sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
- orddict:fold(
- fun (IsPersistent, MsgIds, LensByStore1) ->
- orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
- end, LensByStore, MsgIdsByStore).
-
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1289,8 +1361,15 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
none -> gb_trees:get(SeqId, DPA)
end.
-remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+%% First parameter = UpdatePersistentCount
+remove_pending_ack(true, SeqId, State) ->
+ {MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
+ remove_pending_ack(false, SeqId, State),
+ PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
+ {MsgStatus, upd_bytes(0, -1, MsgStatus,
+ State1 # vqstate{ persistent_count = PCount1 })};
+remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
@@ -1340,12 +1419,6 @@ accumulate_ack(#msg_status { seq_id = SeqId,
end,
[MsgId | AllMsgIds]}.
-find_persistent_count(LensByStore) ->
- case orddict:find(true, LensByStore) of
- error -> 0;
- {ok, Len} -> Len
- end.
-
%%----------------------------------------------------------------------------
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
@@ -1393,9 +1466,15 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+ {MsgStatus#msg_status { msg = Msg },
+ upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
+%% [1] We increase the ram_bytes here because we paged the message in
+%% to requeue it, not purely because we requeued it. Hence in the
+%% second head it's already accounted for as already in memory. OTOH
+%% ram_msg_count does not include unacked messages, so it needs
+%% incrementing in both heads.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
@@ -1421,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
+ Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2))
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1435,13 +1514,14 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+ upd_bytes(1, -1, MsgStatus, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
- remove_pending_ack(SeqId, State),
+ remove_pending_ack(false, SeqId, State),
{MsgStatus #msg_status {
msg_props = MsgProps #message_properties { needs_confirming = false } },
State1}.
@@ -1593,8 +1673,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
maybe_write_to_disk(true, false, MsgStatus, State),
DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 })
+ upd_ram_bytes(
+ -1, MsgStatus1,
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1730,9 +1812,12 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(MsgStatus2, Qa,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1 }),
+ State2 = Consumer(
+ MsgStatus2, Qa,
+ upd_ram_bytes(
+ -1, MsgStatus2,
+ State1 #vqstate {
+ ram_msg_count = RamMsgCount - 1})),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
Qa, State2)
end