diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-13 15:59:16 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-13 15:59:16 +0100 |
| commit | 5af9979478c311c6c63d3bace6f219e659aa7c85 (patch) | |
| tree | f558eadfc931bb34a3bd41a8bbfa814dc8a07dfd | |
| parent | a345670b49d2f9aaaf2e862b04faaf2b9b147143 (diff) | |
| parent | 7cd5c26949fe9c60d5a609bb05f9244e74f618bd (diff) | |
| download | rabbitmq-server-git-5af9979478c311c6c63d3bace6f219e659aa7c85.tar.gz | |
Merge bug26306
28 files changed, 518 insertions, 291 deletions
@@ -127,7 +127,7 @@ plugins: # Not building plugins check-xref: - $(info xref checks are disabled) + $(info xref checks are disabled as there is no plugins-src directory) endif diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index e8b5666098..63540568f1 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -49,6 +49,14 @@ %% ============== %% + %% The default "guest" user is only permitted to access the server + %% via a loopback interface (e.g. localhost). + %% {loopback_users, [<<"guest">>]}, + %% + %% Uncomment the following line if you want to allow access to the + %% guest user from anywhere on the network. + %% {loopback_users, []}, + %% Configuring SSL. %% See http://www.rabbitmq.com/ssl.html for full documentation. %% diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 328926df18..eb3c7ef38d 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1190,6 +1190,42 @@ (queue depth).</para></listitem> </varlistentry> <varlistentry> + <term>messages_ready_ram</term> + <listitem><para>Number of messages from messages_ready which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_unacknowledged_ram</term> + <listitem><para>Number of messages from messages_unacknowledged which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_ram</term> + <listitem><para>Total number of messages which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_persistent</term> + <listitem><para>Total number of persistent messages in the queue (will always be 0 for transient queues).</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes</term> + <listitem><para>Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_ready</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages ready to be delivered to clients.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_unacknowledged</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages delivered to clients but not yet acknowledged.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_ram</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages which are in RAM.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_persistent</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages which are persistent.</para></listitem> + </varlistentry> + <varlistentry> <term>consumers</term> <listitem><para>Number of consumers.</para></listitem> </varlistentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 7a40f9ebf0..5e41ea93bf 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -87,7 +87,7 @@ -record(event, {type, props, reference = undefined, timestamp}). --record(message_properties, {expiry, needs_confirming = false}). +-record(message_properties, {expiry, needs_confirming = false, size}). -record(plugin, {name, %% atom() version, %% string() diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 324040579d..971e7241df 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -130,6 +130,9 @@ done rm -rf %{buildroot} %changelog +* Mon Aug 11 2014 simon@rabbitmq.com 3.3.5-1 +- New Upstream Release + * Tue Jun 24 2014 simon@rabbitmq.com 3.3.4-1 - New Upstream Release diff --git a/packaging/debs/Debian/changelog_comments/additional_changelog_comments_3.3.5 b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_3.3.5 new file mode 100644 index 0000000000..b9a9551c6b --- /dev/null +++ b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_3.3.5 @@ -0,0 +1,9 @@ +# This file contains additional comments for the debian/changelog to be +# appended within the current version's changelog entry. +# Each line will be a separate comment. Do not begin with an *, dch will +# add that. +# For comments longer than one line do not put a line break and dch will +# neatly format it. +# Shell comments are ignored. +# +Changed Uploaders from Emile Joubert to Blair Hester diff --git a/packaging/debs/Debian/changelog_comments/additional_changelog_comments_x.x.x b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_x.x.x new file mode 100644 index 0000000000..bbab75965e --- /dev/null +++ b/packaging/debs/Debian/changelog_comments/additional_changelog_comments_x.x.x @@ -0,0 +1,11 @@ +# This file contains additional comments for the debian/changelog to be +# appended within the current version's changelog entry. +# Each line will be a separate comment. Do not begin with an *, dch will +# add that. +# For comments longer than one line do not put a line break and dch will +# neatly format it. +# Shell comments are ignored. +# +# Examples: +#Remove parts made of undercooked chicken +#This is a long line which is the beginning of a long two line comment which I am sure is going to be needed if the script cannot handle it diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index d26991e437..8fde9087be 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,10 @@ +rabbitmq-server (3.3.5-1) unstable; urgency=low + + * New Upstream Release + * Changed Uploaders from Emile Joubert to Blair Hester + + -- Simon MacMullen <simon@rabbitmq.com> Mon, 11 Aug 2014 12:23:31 +0100 + rabbitmq-server (3.3.4-1) unstable; urgency=low * New Upstream Release diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 9a578aa953..a3573bbd04 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -139,7 +139,7 @@ out({queue, [V], [], 1}) -> {{value, V}, {queue, [], [], 0}}; out({queue, [Y|In], [], Len}) -> [V|Out] = lists:reverse(In, []), - {{value, V}, {queue, [Y], Out}, Len - 1}; + {{value, V}, {queue, [Y], Out, Len - 1}}; out({queue, In, [V], Len}) when is_list(In) -> {{value,V}, r2f(In, Len - 1)}; out({queue, In,[V|Out], Len}) when is_list(In) -> diff --git a/src/rabbit.erl b/src/rabbit.erl index 191f04a425..4b8af870f4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -74,13 +74,6 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). --rabbit_boot_step({rabbit_log, - [{description, "logging server"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_log]}}, - {requires, external_infrastructure}, - {enables, kernel_ready}]}). - -rabbit_boot_step({rabbit_event, [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, @@ -266,7 +259,7 @@ maybe_hipe_compile() -> warn_if_hipe_compilation_failed(true) -> ok; warn_if_hipe_compilation_failed(false) -> - error_logger:warning_msg( + rabbit_log:warning( "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). %% HiPE compilation happens before we have log handlers and can take a @@ -367,7 +360,7 @@ stop() -> undefined -> ok; _ -> await_startup(true) end, - rabbit_log:info("Stopping RabbitMQ~n"), + rabbit_misc:local_info_msg("Stopping RabbitMQ~n", []), Apps = ?APPS ++ rabbit_plugins:active(), stop_apps(app_utils:app_dependency_order(Apps, true)). @@ -502,9 +495,9 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, Vsn} = application:get_key(rabbit, vsn), - error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n", - [Vsn, erlang:system_info(otp_release), - ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), + rabbit_log:info("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n", + [Vsn, erlang:system_info(otp_release), + ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), print_banner(), @@ -773,7 +766,7 @@ log_broker_started(Plugins) -> fun() -> PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P]) || P <- Plugins]), - error_logger:info_msg( + rabbit_log:info( "Server startup complete; ~b plugins started.~n~s", [length(Plugins), PluginList]), io:format(" completed with ~p plugins.~n", [length(Plugins)]) @@ -822,18 +815,18 @@ log_banner() -> {K, V} -> Format(K, V) end || S <- Settings]), - error_logger:info_msg("~s", [Banner]). + rabbit_log:info("~s", [Banner]). warn_if_kernel_config_dubious() -> case erlang:system_info(kernel_poll) of true -> ok; - false -> error_logger:warning_msg( + false -> rabbit_log:warning( "Kernel poll (epoll, kqueue, etc) is disabled. Throughput " "and CPU utilization may worsen.~n") end, AsyncThreads = erlang:system_info(thread_pool_size), case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of - true -> error_logger:warning_msg( + true -> rabbit_log:warning( "Erlang VM is running with ~b I/O threads, " "file I/O performance may worsen~n", [AsyncThreads]); false -> ok @@ -843,9 +836,8 @@ warn_if_kernel_config_dubious() -> {ok, Val} -> Val end, case proplists:get_value(nodelay, IDCOpts, false) of - false -> error_logger:warning_msg( - "Nagle's algorithm is enabled for sockets, " - "network I/O latency will be higher~n"); + false -> rabbit_log:warning("Nagle's algorithm is enabled for sockets, " + "network I/O latency will be higher~n"); true -> ok end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4e23dbd242..a33a8fcc09 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -461,7 +461,8 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 63b1865570..db297c1daa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ dlx, dlx_routing_key, max_length, + max_bytes, args_policy_version, status }). @@ -265,7 +266,8 @@ process_args_policy(State = #q{q = Q, {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, - {<<"max-length">>, fun res_min/2, fun init_max_length/2}], + {<<"max-length">>, fun res_min/2, fun init_max_length/2}, + {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) @@ -304,6 +306,10 @@ init_max_length(MaxLen, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}), State1. +init_max_bytes(MaxBytes, State) -> + {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), + State1. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS, consumers = Consumers} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -392,10 +398,8 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ttl_timer_expiry = TExpiry}) when Expiry + 1000 < TExpiry -> - case rabbit_misc:cancel_timer(TRef) of - false -> State; - _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) - end; + rabbit_misc:cancel_timer(TRef), + ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}); ensure_ttl_timer(_Expiry, State) -> State. @@ -545,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% remains unchanged, or if the newly published message %% has no expiry and becomes the head of the queue then %% the call is unnecessary. - case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + case {Dropped, QLen =:= 1, Props#message_properties.expiry} of {false, false, _} -> State4; {true, true, undefined} -> State4; {_, _, _} -> drop_expired_msgs(State4) end end. -maybe_drop_head(State = #q{max_length = undefined}) -> - {0, State}; -maybe_drop_head(State = #q{max_length = MaxLen, - backing_queue = BQ, - backing_queue_state = BQS}) -> - case BQ:len(BQS) - MaxLen of - Excess when Excess > 0 -> - {Excess, - with_dlx( - State#q.dlx, - fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, - fun () -> - {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> - BQ:drop(false, BQS0) - end, {ok, BQS}, - lists:seq(1, Excess)), - State#q{backing_queue_state = BQS1} - end)}; - _ -> {0, State} +maybe_drop_head(State = #q{max_length = undefined, + max_bytes = undefined}) -> + {false, State}; +maybe_drop_head(State) -> + maybe_drop_head(false, State). + +maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case over_max_length(State) of + true -> + maybe_drop_head(true, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msg(X, State) end, + fun () -> + {_, BQS1} = BQ:drop(false, BQS), + State#q{backing_queue_state = BQS1} + end)); + false -> + {AlreadyDropped, State} end. +over_max_length(#q{max_length = MaxLen, + max_bytes = MaxBytes, + backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes_ready, BQS) > MaxBytes. + requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> WasEmpty = BQ:is_empty(BQS), @@ -666,9 +677,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> run_message_queue(true, Fun(State1)) end. -message_properties(Message, Confirm, #q{ttl = TTL}) -> +message_properties(Message = #basic_message{content = Content}, + Confirm, #q{ttl = TTL}) -> + #content{payload_fragments_rev = PFR} = Content, #message_properties{expiry = calculate_msg_expiry(Message, TTL), - needs_confirming = Confirm == eventually}. + needs_confirming = Confirm == eventually, + size = iolist_size(PFR)}. calculate_msg_expiry(#basic_message{content = Content}, TTL) -> #content{properties = Props} = @@ -725,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. -dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> +dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) -> {ok, State1} = dead_letter_msgs( fun (DLFun, Acc, BQS) -> - lists:foldl(fun (_, {ok, Acc0, BQS0}) -> - {{Msg, _, AckTag}, BQS1} = - BQ:fetch(true, BQS0), - {ok, DLFun(Msg, AckTag, Acc0), BQS1} - end, {ok, Acc, BQS}, lists:seq(1, Excess)) + {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS), + {ok, DLFun(Msg, AckTag, Acc), BQS1} end, maxlen, X, State), State1. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 9e5f081353..098f5f4342 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -20,7 +20,9 @@ -define(INFO_KEYS, [messages_ram, messages_ready_ram, messages_unacknowledged_ram, messages_persistent, - backing_queue_status]). + message_bytes, message_bytes_ready, + message_bytes_unacknowledged, message_bytes_ram, + message_bytes_persistent, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 49b71122a8..622b1b161f 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -116,7 +116,8 @@ qc_publish(#state{bqstate = BQ}) -> [qc_message(), #message_properties{needs_confirming = frequency([{1, true}, {20, false}]), - expiry = oneof([undefined | lists:seq(1, 10)])}, + expiry = oneof([undefined | lists:seq(1, 10)]), + size = 10}, false, self(), BQ]}. qc_publish_multiple(#state{}) -> @@ -124,7 +125,7 @@ qc_publish_multiple(#state{}) -> qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, - [qc_message(), #message_properties{}, self(), BQ]}. + [qc_message(), #message_properties{size = 10}, self(), BQ]}. qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl index 8b76ee1711..4eafada3f3 100644 --- a/src/rabbit_diagnostics.erl +++ b/src/rabbit_diagnostics.erl @@ -16,29 +16,37 @@ -module(rabbit_diagnostics). --export([maybe_stuck_processes/1]). +-define(PROCESS_INFO, + [current_stacktrace, initial_call, dictionary, message_queue_len, + links, monitors, monitored_by, heap_size]). -maybe_stuck_processes(Timeout) -> +-export([maybe_stuck/0, maybe_stuck/1]). + +maybe_stuck() -> maybe_stuck(5000). + +maybe_stuck(Timeout) -> Pids = processes(), io:format("There are ~p processes.~n", [length(Pids)]), - maybe_stuck_processes(Pids, Timeout). + maybe_stuck(Pids, Timeout). -maybe_stuck_processes(Pids, Timeout) when Timeout =< 0 -> +maybe_stuck(Pids, Timeout) when Timeout =< 0 -> io:format("Found ~p suspicious processes.~n", [length(Pids)]), - [io:format("~p~n", [{Pid, erlang:process_info(Pid), - erlang:process_info(Pid, current_stacktrace)}]) - || Pid <- Pids], + [io:format("~p~n", [info(Pid)]) || Pid <- Pids], ok; -maybe_stuck_processes(Pids, Timeout) -> +maybe_stuck(Pids, Timeout) -> Pids2 = [P || P <- Pids, looks_stuck(P)], io:format("Investigated ~p processes this round, ~pms to go.~n", [length(Pids2), Timeout]), - timer:sleep(100), - maybe_stuck_processes(Pids2, Timeout - 100). + timer:sleep(500), + maybe_stuck(Pids2, Timeout - 500). looks_stuck(Pid) -> case process_info(Pid, status) of {status, waiting} -> + %% It's tempting to just check for message_queue_len > 0 + %% here rather than mess around with stack traces and + %% heuristics. But really, sometimes freshly stuck + %% processes can have 0 messages... case erlang:process_info(Pid, current_stacktrace) of {current_stacktrace, [H|_]} -> maybe_stuck_stacktrace(H); @@ -66,3 +74,6 @@ maybe_stuck_stacktrace({_M, F, _A}) -> 0 -> true; _ -> false end. + +info(Pid) -> + [{pid, Pid} | process_info(Pid, ?PROCESS_INFO)]. diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f4df0e76d5..2ca5260c12 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -16,17 +16,8 @@ -module(rabbit_log). --behaviour(gen_server). - --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). --define(SERVER, ?MODULE). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -36,8 +27,6 @@ -type(category() :: atom()). -type(level() :: 'info' | 'warning' | 'error'). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). - -spec(log/3 :: (category(), level(), string()) -> 'ok'). -spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). @@ -51,56 +40,34 @@ -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). log(Category, Level, Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}). - -info(Fmt) -> log(default, info, Fmt). -info(Fmt, Args) -> log(default, info, Fmt, Args). -warning(Fmt) -> log(default, warning, Fmt). -warning(Fmt, Args) -> log(default, warning, Fmt, Args). -error(Fmt) -> log(default, error, Fmt). -error(Fmt, Args) -> log(default, error, Fmt, Args). - -%%-------------------------------------------------------------------- - -init([]) -> - {ok, CatLevelList} = application:get_env(log_levels), - CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList], - {ok, orddict:from_list(CatLevels)}. - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({log, Category, Level, Fmt, Args}, CatLevels) -> - CatLevel = case orddict:find(Category, CatLevels) of - {ok, L} -> L; - error -> level(info) - end, - case level(Level) =< CatLevel of + case level(Level) =< catlevel(Category) of false -> ok; true -> (case Level of info -> fun error_logger:info_msg/2; warning -> fun error_logger:warning_msg/2; error -> fun error_logger:error_msg/2 end)(Fmt, Args) - end, - {noreply, CatLevels}; -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. + end. -terminate(_Reason, _State) -> - ok. +info(Fmt) -> log(default, info, Fmt). +info(Fmt, Args) -> log(default, info, Fmt, Args). +warning(Fmt) -> log(default, warning, Fmt). +warning(Fmt, Args) -> log(default, warning, Fmt, Args). +error(Fmt) -> log(default, error, Fmt). +error(Fmt, Args) -> log(default, error, Fmt, Args). -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +catlevel(Category) -> + %% We can get here as part of rabbitmqctl when it is impersonating + %% a node; in which case the env will not be defined. + CatLevelList = case application:get_env(rabbit, log_levels) of + {ok, L} -> L; + undefined -> [] + end, + level(proplists:get_value(Category, CatLevelList, info)). %%-------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index cc06ae442d..6d0064abe7 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -271,8 +271,8 @@ handle_cast({sync_start, Ref, Syncer}, DD, Ref, TRef, Syncer, BQ, BQS, fun (BQN, BQSN) -> BQSN1 = update_ram_duration(BQN, BQSN), - TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, - self(), update_ram_duration), + TRefN = rabbit_misc:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), {TRefN, BQSN1} end) of denied -> noreply(State1); diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 09355f3ffc..7ff88f04c1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -92,9 +92,9 @@ :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). -type(digraph_label() :: term()). -type(graph_vertex_fun() :: - fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). + fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: - fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). + fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph:vertex()}])). -type(tref() :: {'erlang', reference()} | {timer, timer:tref()}). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) @@ -656,10 +656,10 @@ with_local_io(Fun) -> end. %% Log an info message on the local node using the standard logger. -%% Use this if rabbit isn't running and the call didn't originate on -%% the local node (e.g. rabbitmqctl calls). +%% Use this if the call didn't originate on the local node (e.g. +%% rabbitmqctl calls). local_info_msg(Format, Args) -> - with_local_io(fun () -> error_logger:info_msg(Format, Args) end). + with_local_io(fun () -> rabbit_log:info(Format, Args) end). unfold(Fun, Init) -> unfold(Fun, [], Init). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 630d9853bb..a499686f52 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -116,7 +116,7 @@ init_from_config() -> true -> disc; false -> ram end}, - error_logger:warning_msg( + rabbit_log:warning( "Converting legacy 'cluster_nodes' configuration~n ~w~n" "to~n ~w.~n~n" "Please update the configuration to the new format " @@ -127,17 +127,22 @@ init_from_config() -> {ok, Config} -> Config end, - case find_good_node(nodes_excl_me(TryNodes)) of + case TryNodes of + [] -> init_db_and_upgrade([node()], disc, false); + _ -> auto_cluster(TryNodes, NodeType) + end. + +auto_cluster(TryNodes, NodeType) -> + case find_auto_cluster_node(nodes_excl_me(TryNodes)) of {ok, Node} -> - rabbit_log:info("Node '~p' selected for clustering from " - "configuration~n", [Node]), + rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]), {ok, {_, DiscNodes, _}} = discover_cluster0(Node), init_db_and_upgrade(DiscNodes, NodeType, true), rabbit_node_monitor:notify_joined_cluster(); none -> - rabbit_log:warning("Could not find any suitable node amongst the " - "ones provided in the configuration: ~p~n", - [TryNodes]), + rabbit_log:warning( + "Could not find any node for auto-clustering from: ~p~n" + "Starting blank node...~n", [TryNodes]), init_db_and_upgrade([node()], disc, false) end. @@ -619,10 +624,10 @@ schema_ok_or_move() -> {error, Reason} -> %% NB: we cannot use rabbit_log here since it may not have been %% started yet - error_logger:warning_msg("schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), + rabbit_log:warning("schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), ok = move_db(), ok = create_schema() end. @@ -648,8 +653,8 @@ move_db() -> ok -> %% NB: we cannot use rabbit_log here since it may not have %% been started yet - error_logger:warning_msg("moved database from ~s to ~s~n", - [MnesiaDir, BackupDir]), + rabbit_log:warning("moved database from ~s to ~s~n", + [MnesiaDir, BackupDir]), ok; {error, Reason} -> throw({error, {cannot_backup_mnesia, MnesiaDir, BackupDir, Reason}}) @@ -695,7 +700,7 @@ leave_cluster(Node) -> end. wait_for(Condition) -> - error_logger:info_msg("Waiting for ~p...~n", [Condition]), + rabbit_log:info("Waiting for ~p...~n", [Condition]), timer:sleep(1000). start_mnesia(CheckConsistency) -> @@ -788,17 +793,24 @@ is_virgin_node() -> false end. -find_good_node([]) -> +find_auto_cluster_node([]) -> none; -find_good_node([Node | Nodes]) -> +find_auto_cluster_node([Node | Nodes]) -> + Fail = fun (Fmt, Args) -> + rabbit_log:warning( + "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]), + find_auto_cluster_node(Nodes) + end, case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _Reason} -> find_good_node(Nodes); + {badrpc, _} = Reason -> Diag = rabbit_nodes:diagnostics([Node]), + Fail("~p~n~s~n", [Reason, Diag]); %% old delegate hash check - {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes); - {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of - {error, _} -> find_good_node(Nodes); - ok -> {ok, Node} - end + {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]); + {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of + {error, _} -> Fail("versions ~p~n", + [{OTP, Rabbit}]); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 96448f3227..4e92bf394e 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -208,7 +208,7 @@ tcp_listener_addresses({Host, Port, Family0}) [{IPAddress, Port, Family} || {IPAddress, Family} <- getaddr(Host, Family0)]; tcp_listener_addresses({_Host, Port, _Family0}) -> - error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), + rabbit_log:error("invalid port ~p - not 0..65535~n", [Port]), throw({error, {invalid_port, Port}}). tcp_listener_addresses_auto(Port) -> @@ -395,7 +395,7 @@ gethostaddr(Host, Family) -> end. host_lookup_error(Host, Reason) -> - error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + rabbit_log:error("invalid host ~p - ~p~n", [Host, Reason]), throw({error, {invalid_host, Host, Reason}}). resolve_family({_,_,_,_}, auto) -> inet; diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 9acaa1d418..04db5d5d21 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -97,7 +97,7 @@ list(PluginsDir) -> [plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]), case Problems of [] -> ok; - _ -> error_logger:warning_msg( + _ -> rabbit_log:warning( "Problem reading some plugins: ~p~n", [Problems]) end, ensure_dependencies(Plugins). diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 3558cf98df..cc88765f12 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -34,7 +34,8 @@ register() -> {policy_validator, <<"dead-letter-routing-key">>}, {policy_validator, <<"message-ttl">>}, {policy_validator, <<"expires">>}, - {policy_validator, <<"max-length">>}]], + {policy_validator, <<"max-length">>}, + {policy_validator, <<"max-length-bytes">>}]], ok. validate_policy(Terms) -> @@ -76,6 +77,10 @@ validate_policy0(<<"max-length">>, Value) when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"max-length">>, Value) -> - {error, "~p is not a valid maximum length", [Value]}. - + {error, "~p is not a valid maximum length", [Value]}; +validate_policy0(<<"max-length-bytes">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-length-bytes">>, Value) -> + {error, "~p is not a valid maximum length in bytes", [Value]}. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 56c19d3f51..0f57286656 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,7 +21,7 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -140,8 +140,11 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)). +-define(SIZE_BYTES, 4). +-define(SIZE_BITS, (?SIZE_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry + 4 for size +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). %% + 2 for seq, bits and prefix -define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). @@ -168,8 +171,9 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, local, []}). --rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({add_queue_ttl, local, []}). +-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). -ifdef(use_specs). @@ -199,7 +203,8 @@ -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> - {'undefined' | non_neg_integer(), qistate()}). + {'undefined' | non_neg_integer(), + 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), @@ -415,7 +420,7 @@ init_clean(RecoveredCounts, State) -> end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return - {undefined, State1 # qistate { segments = Segments1 }}. + {undefined, undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments @@ -424,7 +429,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count, DirtyCount} = + {Segments1, Count, Bytes, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we @@ -433,16 +438,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% dirty count here, so we can call maybe_flush_journal below %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc, DirtyCount}) -> - {Segment = #segment { unacked = UnackedCount }, Dirty} = + fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) -> + {{Segment = #segment { unacked = UnackedCount }, Dirty}, + UnackedBytes} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), {segment_store(Segment, Segments2), - CountAcc + UnackedCount, DirtyCount + Dirty} - end, {Segments, 0, 0}, all_segment_nums(State1)), + CountAcc + UnackedCount, + BytesAcc + UnackedBytes, DirtyCount + Dirty} + end, {Segments, 0, 0, 0}, all_segment_nums(State1)), State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, dirty_count = DirtyCount }), - {Count, State2}. + {Count, Bytes, State2}. terminate(State = #qistate { journal_handle = JournalHdl, segments = Segments }) -> @@ -464,12 +471,16 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, - SegmentAndDirtyCount) -> - recover_message(ContainsCheckFun(MsgId), CleanShutdown, - Del, RelSeq, SegmentAndDirtyCount) + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack}, + {SegmentAndDirtyCount, Bytes}) -> + {recover_message(ContainsCheckFun(MsgId), CleanShutdown, + Del, RelSeq, SegmentAndDirtyCount), + Bytes + case IsPersistent of + true -> MsgProps#message_properties.size; + false -> 0 + end} end, - {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, + {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0}, SegEntries1). recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> @@ -549,13 +560,15 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) -> - [MsgId, expiry_to_binary(Expiry)]. +create_pub_record_body(MsgId, #message_properties { expiry = Expiry, + size = Size }) -> + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> +parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Size:?SIZE_BITS>>) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, @@ -563,7 +576,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> ?NO_EXPIRY -> undefined; X -> X end, - {MsgId, #message_properties { expiry = Exp }}. + {MsgId, #message_properties { expiry = Exp, + size = Size }}. %%---------------------------------------------------------------------------- %% journal manipulation @@ -1064,6 +1078,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS, avoid_zeroes_segment(_) -> stop. +%% At upgrade time we just define every message's size as 0 - that +%% will save us a load of faff with the message store, and means we +%% can actually use the clean recovery terms in VQ. It does mean we +%% don't count message bodies from before the migration, but we can +%% live with that. +store_msg_size() -> + foreach_queue_index({fun store_msg_size_journal/1, + fun store_msg_size_segment/1}). + +store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_journal(_) -> + stop. + +store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_size_segment(_) -> + stop. + + %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2ac24f9765..b2930f88d7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -611,7 +611,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol}, State1 = close_connection(terminate_channels(State)), ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), State1; -handle_exception(State = #v1{connection_state = tuning}, Channel, Reason) -> +handle_exception(State, Channel, Reason) -> %% We don't trust the client at this point - force them to wait %% for a bit so they can't DOS us with repeated failed logins etc. timer:sleep(?SILENT_CLOSE_DELAY * 1000), @@ -873,7 +873,7 @@ handle_method0(MethodName, FieldsBin, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) - catch throw:{inet_error, closed} -> + catch throw:{inet_error, E} when E =:= closed; E =:= enotconn -> maybe_emit_stats(State), throw(connection_closed_abruptly); exit:#amqp_error{method = none} = Reason -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index bbf38f58c1..9f837cce20 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -23,11 +23,14 @@ -export([start/0, stop/0, store/2, read/1, clear/0]). --export([upgrade_recovery_terms/0, start_link/0]). +-export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([upgrade_recovery_terms/0, persistent_bytes/0]). + -rabbit_upgrade({upgrade_recovery_terms, local, []}). +-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}). %%---------------------------------------------------------------------------- @@ -58,9 +61,11 @@ read(DirBaseName) -> end. clear() -> - dets:delete_all_objects(?MODULE), + ok = dets:delete_all_objects(?MODULE), flush(). +start_link() -> gen_server:start_link(?MODULE, [], []). + %%---------------------------------------------------------------------------- upgrade_recovery_terms() -> @@ -84,7 +89,20 @@ upgrade_recovery_terms() -> close_table() end. -start_link() -> gen_server:start_link(?MODULE, [], []). +persistent_bytes() -> dets_upgrade(fun persistent_bytes/1). +persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}]. + +dets_upgrade(Fun)-> + open_table(), + try + ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) -> + store(DirBaseName, Fun(Terms)), + Acc + end, ok, ?MODULE), + ok + after + close_table() + end. %%---------------------------------------------------------------------------- @@ -113,9 +131,8 @@ open_table() -> {ram_file, true}, {auto_save, infinity}]). -flush() -> dets:sync(?MODULE). +flush() -> ok = dets:sync(?MODULE). close_table() -> ok = flush(), ok = dets:close(?MODULE). - diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 34a8cc5c2a..a186fb7a68 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -37,7 +37,7 @@ all_tests() -> ok = supervisor2_tests:test_all(), passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), - application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + application:set_env(rabbit, file_handles_high_watermark, 10), ok = file_handle_cache:set_limit(10), passed = test_version_equivalance(), passed = test_file_handle_cache(), @@ -1870,22 +1870,20 @@ test_backing_queue() -> {ok, rabbit_variable_queue} -> {ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit), - application:set_env(rabbit, msg_store_file_size_limit, 512, - infinity), + application:set_env(rabbit, msg_store_file_size_limit, 512), {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), - application:set_env(rabbit, queue_index_max_journal_entries, 128, - infinity), + application:set_env(rabbit, queue_index_max_journal_entries, 128), passed = test_msg_store(), application:set_env(rabbit, msg_store_file_size_limit, - FileSizeLimit, infinity), + FileSizeLimit), passed = test_queue_index(), passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_variable_queue_delete_msg_store_files_callback(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, - MaxJournal, infinity), + MaxJournal), %% We will have restarted the message store, and thus changed %% the order of the children of rabbit_sup. This will cause %% problems if there are subsequent failures - see bug 24262. @@ -2210,13 +2208,13 @@ restart_test_queue(Qi) -> empty_test_queue() -> ok = rabbit_variable_queue:stop(), {ok, _} = rabbit_variable_queue:start([]), - {0, Qi} = init_test_queue(), + {0, 0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. with_empty_test_queue(Fun) -> ok = empty_test_queue(), - {0, Qi} = init_test_queue(), + {0, 0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). restart_app() -> @@ -2235,7 +2233,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( - MsgId, SeqId, #message_properties{}, Persistent, QiN), + MsgId, SeqId, #message_properties{size = 10}, + Persistent, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} end, {Qi, []}, SeqIds), @@ -2257,7 +2256,7 @@ test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> MsgId = rabbit_guid:gen(), - Props = #message_properties{expiry=12345}, + Props = #message_properties{expiry=12345, size = 10}, Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), @@ -2287,7 +2286,7 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsMsgIdsA)), %% should get length back as 0, as all the msgs were transient - {0, Qi6} = restart_test_queue(Qi4), + {0, 0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), @@ -2296,7 +2295,8 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), - {LenB, Qi12} = restart_test_queue(Qi10), + BytesB = LenB * 10, + {LenB, BytesB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), @@ -2308,7 +2308,7 @@ test_queue_index() -> {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), %% should get length back as 0 because all persistent %% msgs have been acked - {0, Qi19} = restart_test_queue(Qi18), + {0, 0, Qi19} = restart_test_queue(Qi18), Qi19 end), @@ -2380,11 +2380,11 @@ test_queue_index() -> true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, Qi4} = restart_test_queue(Qi3), + {5, 50, Qi4} = restart_test_queue(Qi3), {Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, Qi8} = restart_test_queue(Qi7), + {5, 50, Qi8} = restart_test_queue(Qi7), Qi8 end), @@ -2419,7 +2419,8 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> false -> 1 end}, PayloadFun(N)), - PropFun(N, #message_properties{}), false, self(), VQN) + PropFun(N, #message_properties{size = 10}), + false, self(), VQN) end, VQ, lists:seq(Start, Start + Count - 1))). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 8ab35a89dd..72bf7855c4 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -190,7 +190,7 @@ die(Msg, Args) -> %% We don't throw or exit here since that gets thrown %% straight out into do_boot, generating an erl_crash.dump %% and displaying any error message in a confusing way. - error_logger:error_msg(Msg, Args), + rabbit_log:error(Msg, Args), Str = rabbit_misc:format( "~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), io:format(Str), @@ -281,6 +281,4 @@ node_type_legacy() -> false -> ram end. -%% NB: we cannot use rabbit_log here since it may not have been -%% started yet -info(Msg, Args) -> error_logger:info_msg(Msg, Args). +info(Msg, Args) -> rabbit_log:info(Msg, Args). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 03b9956227..e97ed491a9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -254,13 +254,17 @@ durable, transient_threshold, - len, - persistent_count, + len, %% w/o unacked + bytes, %% w/o unacked + unacked_bytes, + persistent_count, %% w unacked + persistent_bytes, %% w unacked target_ram_count, - ram_msg_count, + ram_msg_count, %% w/o unacked ram_msg_count_prev, ram_ack_count_prev, + ram_bytes, %% w unacked out_counter, in_counter, rates, @@ -343,11 +347,17 @@ transient_threshold :: non_neg_integer(), len :: non_neg_integer(), + bytes :: non_neg_integer(), + unacked_bytes :: non_neg_integer(), + persistent_count :: non_neg_integer(), + persistent_bytes :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), + ram_ack_count_prev :: non_neg_integer(), + ram_bytes :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), @@ -425,7 +435,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -440,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), - {DeltaCount, IndexState} = + {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), @@ -448,7 +458,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, RecoveryTerms, + init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). process_recovery_terms(Terms=non_clean_shutdown) -> @@ -461,6 +471,7 @@ process_recovery_terms(Terms) -> terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, + persistent_bytes = PBytes, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = purge_pending_ack(true, State), @@ -470,7 +481,9 @@ terminate(_Reason, State) -> rabbit_msg_store:client_ref(MSCStateP) end, ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, {persistent_count, PCount}], + Terms = [{persistent_ref, PRef}, + {persistent_count, PCount}, + {persistent_bytes, PBytes}], a(State1 #vqstate { index_state = rabbit_queue_index:terminate( Terms, IndexState), msg_store_clients = undefined }). @@ -498,28 +511,35 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount }) -> + ram_bytes = RamBytes, + persistent_count = PCount, + persistent_bytes = PBytes }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - {LensByStore, IndexState1} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q4, - orddict:new(), IndexState, MSCState), - {LensByStore1, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = - purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - {LensByStore2, IndexState3} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q1, - LensByStore1, IndexState2, MSCState1), - PCount1 = PCount - find_persistent_count(LensByStore2), + Stats = {RamBytes, PCount, PBytes}, + {Stats1, IndexState1} = + remove_queue_entries(Q4, Stats, IndexState, MSCState), + + {Stats2, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = + + purge_betas_and_deltas( + Stats1, State #vqstate { q4 = ?QUEUE:new(), + index_state = IndexState1 }), + + {{RamBytes3, PCount3, PBytes3}, IndexState3} = + remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, + bytes = 0, ram_msg_count = 0, - persistent_count = PCount1 })}. + ram_bytes = RamBytes3, + persistent_count = PCount3, + persistent_bytes = PBytes3 })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -542,11 +562,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, InCount1 = InCount + 1, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount1, - persistent_count = PCount1, - unconfirmed = UC1 }), + State3 = upd_bytes( + 1, 0, MsgStatus1, + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount1, + persistent_count = PCount1, + unconfirmed = UC1 })), a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, @@ -565,11 +587,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }, + State3 = upd_bytes(0, 1, MsgStatus, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, State) -> State. @@ -638,9 +661,8 @@ ack([SeqId], State) -> index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = - remove_pending_ack(SeqId, State), + remove_pending_ack(true, SeqId, State), IndexState1 = case IndexOnDisk of true -> rabbit_queue_index:ack([SeqId], IndexState); false -> IndexState @@ -649,30 +671,24 @@ ack([SeqId], State) -> true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, - PCount1 = PCount - one_if(IsPersistent), {[MsgId], a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> - {MsgStatus, State3} = remove_pending_ack(SeqId, State2), + {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2), {accumulate_ack(MsgStatus, Acc), State3} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( - orddict:new(), MsgIdsByStore)), {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. requeue(AckTags, #vqstate { delta = Delta, @@ -829,6 +845,17 @@ info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> PersistentCount; +info(message_bytes, #vqstate{bytes = Bytes, + unacked_bytes = UBytes}) -> + Bytes + UBytes; +info(message_bytes_ready, #vqstate{bytes = Bytes}) -> + Bytes; +info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) -> + UBytes; +info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> + RamBytes; +info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> + PersistentBytes; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, @@ -865,8 +892,12 @@ is_duplicate(_Msg, State) -> {false, State}. a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, + bytes = Bytes, + unacked_bytes = UnackedBytes, persistent_count = PersistentCount, - ram_msg_count = RamMsgCount }) -> + persistent_bytes = PersistentBytes, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes}) -> E1 = ?QUEUE:is_empty(Q1), E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, @@ -880,9 +911,14 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = LZ == (E3 and E4), true = Len >= 0, + true = Bytes >= 0, + true = UnackedBytes >= 0, true = PersistentCount >= 0, + true = PersistentBytes >= 0, true = RamMsgCount >= 0, true = RamMsgCount =< Len, + true = RamBytes >= 0, + true = RamBytes =< Bytes + UnackedBytes, State. @@ -1030,15 +1066,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, +init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - DeltaCount1 = + {DeltaCount1, DeltaBytes1} = case Terms of - non_clean_shutdown -> DeltaCount; - _ -> proplists:get_value(persistent_count, - Terms, DeltaCount) + non_clean_shutdown -> {DeltaCount, DeltaBytes}; + _ -> {proplists:get_value(persistent_count, + Terms, DeltaCount), + proplists:get_value(persistent_bytes, + Terms, DeltaBytes)} end, Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; @@ -1063,11 +1101,15 @@ init(IsDurable, IndexState, DeltaCount, Terms, len = DeltaCount1, persistent_count = DeltaCount1, + bytes = DeltaBytes1, + persistent_bytes = DeltaBytes1, target_ram_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, + ram_bytes = 0, + unacked_bytes = 0, out_counter = 0, in_counter = 0, rates = blank_rates(Now), @@ -1092,9 +1134,11 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - inc_ram_msg_count( - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) }) + upd_ram_bytes( + 1, MsgStatus, + inc_ram_msg_count( + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) })) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1125,6 +1169,28 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. +upd_bytes(SignReady, SignUnacked, + MsgStatus = #msg_status{msg = undefined}, State) -> + upd_bytes0(SignReady, SignUnacked, MsgStatus, State); +upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) -> + upd_ram_bytes(SignReady + SignUnacked, MsgStatus, + upd_bytes0(SignReady, SignUnacked, MsgStatus, State)). + +upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, + State = #vqstate{bytes = Bytes, + unacked_bytes = UBytes, + persistent_bytes = PBytes}) -> + S = msg_size(MsgStatus), + SignTotal = SignReady + SignUnacked, + State#vqstate{bytes = Bytes + SignReady * S, + unacked_bytes = UBytes + SignUnacked * S, + persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. + +upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> + State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. + +msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. + remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1166,58 +1232,64 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - + State2 = case AckRequired of + false -> upd_bytes(-1, 0, MsgStatus, State1); + true -> upd_bytes(-1, 1, MsgStatus, State1) + end, {AckTag, maybe_update_rates( - State1 #vqstate {ram_msg_count = RamMsgCount1, + State2 #vqstate {ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len - 1, persistent_count = PCount1})}. -purge_betas_and_deltas(LensByStore, +purge_betas_and_deltas(Stats, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> case ?QUEUE:is_empty(Q3) of - true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = - remove_queue_entries(fun ?QUEUE:foldl/3, Q3, - LensByStore, IndexState, MSCState), - purge_betas_and_deltas(LensByStore1, + true -> {Stats, State}; + false -> {Stats1, IndexState1} = remove_queue_entries( + Q3, Stats, IndexState, MSCState), + purge_betas_and_deltas(Stats1, maybe_deltas_to_betas( State #vqstate { q3 = ?QUEUE:new(), index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> - {MsgIdsByStore, Delivers, Acks} = - Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), +remove_queue_entries(Q, {RamBytes, PCount, PBytes}, + IndexState, MSCState) -> + {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = + ?QUEUE:foldl(fun remove_queue_entries1/2, + {orddict:new(), RamBytes, PBytes, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore), + {{RamBytes1, + PCount - case orddict:find(true, MsgIdsByStore) of + error -> 0; + {ok, Ids} -> length(Ids) + end, + PBytes1}, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, + #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {MsgIdsByStore, Delivers, Acks}) -> + index_on_disk = IndexOnDisk, is_persistent = IsPersistent, + msg_props = #message_properties { size = Size } }, + {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> {case MsgOnDisk of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, + RamBytes - Size * one_if(Msg =/= undefined), + PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. -sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> - orddict:fold( - fun (IsPersistent, MsgIds, LensByStore1) -> - orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1) - end, LensByStore, MsgIdsByStore). - %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1289,8 +1361,15 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, none -> gb_trees:get(SeqId, DPA) end. -remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +%% First parameter = UpdatePersistentCount +remove_pending_ack(true, SeqId, State) -> + {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = + remove_pending_ack(false, SeqId, State), + PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), + {MsgStatus, upd_bytes(0, -1, MsgStatus, + State1 # vqstate{ persistent_count = PCount1 })}; +remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; @@ -1340,12 +1419,6 @@ accumulate_ack(#msg_status { seq_id = SeqId, end, [MsgId | AllMsgIds]}. -find_persistent_count(LensByStore) -> - case orddict:find(true, LensByStore) of - error -> 0; - {ok, Len} -> Len - end. - %%---------------------------------------------------------------------------- %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- @@ -1393,9 +1466,15 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)}; + {MsgStatus#msg_status { msg = Msg }, + upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] publish_alpha(MsgStatus, State) -> {MsgStatus, inc_ram_msg_count(State)}. +%% [1] We increase the ram_bytes here because we paged the message in +%% to requeue it, not purely because we requeued it. Hence in the +%% second head it's already accounted for as already in memory. OTOH +%% ram_msg_count does not include unacked messages, so it needs +%% incrementing in both heads. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), @@ -1421,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2)) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1435,13 +1514,14 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + upd_bytes(1, -1, MsgStatus, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(SeqId, State), + remove_pending_ack(false, SeqId, State), {MsgStatus #msg_status { msg_props = MsgProps #message_properties { needs_confirming = false } }, State1}. @@ -1593,8 +1673,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, maybe_write_to_disk(true, false, MsgStatus, State), DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA), limit_ram_acks(Quota - 1, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 }) + upd_ram_bytes( + -1, MsgStatus1, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. permitted_beta_count(#vqstate { len = 0 }) -> @@ -1730,9 +1812,12 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State1 = #vqstate { ram_msg_count = RamMsgCount }} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer(MsgStatus2, Qa, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1 }), + State2 = Consumer( + MsgStatus2, Qa, + upd_ram_bytes( + -1, MsgStatus2, + State1 #vqstate { + ram_msg_count = RamMsgCount - 1})), push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, State2) end |
