summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-07-28 21:37:42 +0300
committerGitHub <noreply@github.com>2017-07-28 21:37:42 +0300
commite84ba4ca9329243879f51bf147f079519171c81f (patch)
tree566e8a702fe706a0e4cfaa67f5ed7b2adc932d83
parent118666d7caba0e494ec3f8144c0a35e12130a9a1 (diff)
parent77101e7fadaac0bf12b862c6c6cf48e08eb97e4b (diff)
downloadrabbitmq-server-git-e84ba4ca9329243879f51bf147f079519171c81f.tar.gz
Merge branch 'master' into fix-travis-ci-build
-rw-r--r--.travis.yml8
-rw-r--r--Makefile10
-rw-r--r--docs/rabbitmq.conf.example12
-rw-r--r--docs/rabbitmq.config.example12
-rw-r--r--priv/schema/rabbit.schema6
-rw-r--r--rabbitmq-components.mk2
-rwxr-xr-xscripts/rabbitmq-env2
-rw-r--r--scripts/rabbitmq-env.bat4
-rw-r--r--src/gm.erl34
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_amqqueue.erl35
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl33
-rw-r--r--src/rabbit_channel.erl607
-rw-r--r--src/rabbit_connection_tracking.erl12
-rw-r--r--src/rabbit_connection_tracking_handler.erl9
-rw-r--r--src/rabbit_core_metrics_gc.erl2
-rw-r--r--src/rabbit_direct.erl40
-rw-r--r--src/rabbit_looking_glass.erl5
-rw-r--r--src/rabbit_mirror_queue_sync.erl2
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_parameter_validation.erl4
-rw-r--r--src/rabbit_reader.erl23
-rw-r--r--src/rabbit_recovery_terms.erl78
-rw-r--r--src/rabbit_variable_queue.erl31
-rw-r--r--src/rabbit_vhost.erl45
-rw-r--r--src/rabbit_vhost_limit.erl47
-rw-r--r--src/rabbit_vhost_msg_store.erl36
-rw-r--r--src/rabbit_vhost_process.erl99
-rw-r--r--src/rabbit_vhost_sup.erl11
-rw-r--r--src/rabbit_vhost_sup_sup.erl173
-rw-r--r--src/rabbit_vhost_sup_watcher.erl66
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl40
-rw-r--r--src/rabbit_vm.erl34
-rw-r--r--test/channel_operation_timeout_test_queue.erl2275
-rw-r--r--test/per_vhost_connection_limit_SUITE.erl57
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl120
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl46
-rw-r--r--test/vhost_SUITE.erl376
39 files changed, 1578 insertions, 2835 deletions
diff --git a/.travis.yml b/.travis.yml
index 72810e630a..d9de14a7a4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,6 +10,14 @@ addons:
- unixodbc-dev
- libwxgtk2.8-dev
+otp_release:
+ - "19.2"
+ - "19.3"
+ - "20.0"
+
+services:
+ - docker
+
env:
- OTP_TAG_NAME=OTP-20.0 TEST_SUITE=0
- OTP_TAG_NAME=OTP-20.0 TEST_SUITE=1
diff --git a/Makefile b/Makefile
index 61c8fa4b53..9aeaf80c72 100644
--- a/Makefile
+++ b/Makefile
@@ -112,17 +112,19 @@ define PROJECT_ENV
{passphrase, undefined}
]},
- %% rabbitmq-server-973
+ %% rabbitmq-server#973
{queue_explicit_gc_run_operation_threshold, 1000},
{lazy_queue_explicit_gc_run_operation_threshold, 1000},
{background_gc_enabled, false},
{background_gc_target_interval, 60000},
- %% rabbitmq-server-589
+ %% rabbitmq-server#589
{proxy_protocol, false},
{disk_monitor_failure_retries, 10},
{disk_monitor_failure_retry_interval, 120000},
- %% either "stop_node" or "ignore"
- {vhost_restart_strategy, stop_node}
+ %% either "stop_node" or "continue".
+ %% by default we choose to not terminate the entire node if one
+ %% vhost had to shut down, see server#1158 and server#1280
+ {vhost_restart_strategy, continue}
]
endef
diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example
index e4c2fff92a..68ce59326e 100644
--- a/docs/rabbitmq.conf.example
+++ b/docs/rabbitmq.conf.example
@@ -252,8 +252,18 @@
## Fraction of the high watermark limit at which queues start to
## page message out to disc in order to free up memory.
+## For example, when vm_memory_high_watermark is set to 0.4 and this value is set to 0.5,
+## paging can begin as early as when 20% of total available RAM is used by the node.
##
-## Values greater than 0.9 can be dangerous and should be used carefully.
+## Values greater than 1.0 can be dangerous and should be used carefully.
+##
+## One alternative to this is to use durable queues and publish messages
+## as persistent (delivery mode = 2). With this combination queues will
+## move messages to disk much more rapidly.
+##
+## Another alternative is to configure queues to page all messages (both
+## persistent and transient) to disk as quickly
+## as possible, see http://www.rabbitmq.com/lazy-queues.html.
##
# vm_memory_high_watermark_paging_ratio = 0.5
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 4135c5053c..b7f08afc78 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -236,8 +236,18 @@
%% Fraction of the high watermark limit at which queues start to
%% page message out to disc in order to free up memory.
+ %% For example, when vm_memory_high_watermark is set to 0.4 and this value is set to 0.5,
+ %% paging can begin as early as when 20% of total available RAM is used by the node.
%%
- %% Values greater than 0.9 can be dangerous and should be used carefully.
+ %% Values greater than 1.0 can be dangerous and should be used carefully.
+ %%
+ %% One alternative to this is to use durable queues and publish messages
+ %% as persistent (delivery mode = 2). With this combination queues will
+ %% move messages to disk much more rapidly.
+ %%
+ %% Another alternative is to configure queues to page all messages (both
+ %% persistent and transient) to disk as quickly
+ %% as possible, see http://www.rabbitmq.com/lazy-queues.html.
%%
%% {vm_memory_high_watermark_paging_ratio, 0.5},
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index c503548187..70351f116c 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -976,11 +976,11 @@ end}.
{mapping, "proxy_protocol", "rabbit.proxy_protocol",
[{datatype, {enum, [true, false]}}]}.
-%% Whether to stop the rabbit application if VHost data
-%% cannot be recovered.
+%% Whether to stop the rabbit application if a vhost has
+%% to terminate for any reason.
{mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy",
- [{datatype, {enum, [stop_node, ignore]}}]}.
+ [{datatype, {enum, [stop_node, continue, transient, persistent]}}]}.
% ==========================
% Lager section
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index 63bc1c52bd..f15c9e504e 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -107,7 +107,7 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre
# all projects use the same versions. It avoids conflicts and makes it
# possible to work with rabbitmq-public-umbrella.
-dep_cowboy_commit = 1.1.0
+dep_cowboy_commit = 1.1.2
dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2
dep_ranch_commit = 1.3.2
dep_sockjs = git https://github.com/rabbitmq/sockjs-erlang.git 405990ea62353d98d36dbf5e1e64942d9b0a1daf
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index d74a3b173a..1549a05524 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -80,7 +80,7 @@ ESCRIPT_DIR="${RABBITMQ_HOME}/escript"
DEFAULT_SCHEDULER_BIND_TYPE="db"
[ "x" = "x$RABBITMQ_SCHEDULER_BIND_TYPE" ] && RABBITMQ_SCHEDULER_BIND_TYPE=${DEFAULT_SCHEDULER_BIND_TYPE}
-DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
[ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
## Common defaults
diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat
index e3fddbb3eb..9bf6f1cdc4 100644
--- a/scripts/rabbitmq-env.bat
+++ b/scripts/rabbitmq-env.bat
@@ -38,10 +38,10 @@ if "!RABBITMQ_SCHEDULER_BIND_TYPE!"=="" (
set RABBITMQ_SCHEDULER_BIND_TYPE=!DEFAULT_SCHEDULER_BIND_TYPE!
)
-REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
REM set the VM distribution buffer size
REM [ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
-set DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+set DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
if "!RABBITMQ_DISTRIBUTION_BUFFER_SIZE!"=="" (
set RABBITMQ_DISTRIBUTION_BUFFER_SIZE=!DEFAULT_DISTRIBUTION_BUFFER_SIZE!
)
diff --git a/src/gm.erl b/src/gm.erl
index f67050affb..0da190a57d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -395,9 +395,8 @@
-define(GROUP_TABLE, gm_group).
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(FORCE_GC_TIMER, 250).
-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -416,6 +415,7 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
+ force_gc_timer,
txn_executor,
shutting_down
}).
@@ -508,7 +508,8 @@ table_definitions() ->
[{Name, [?TABLE_MATCH | Attributes]}].
start_link(GroupName, Module, Args, TxnFun) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
+ [{spawn_opt, [{fullsweep_after, 0}]}]).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
+ force_gc_timer = undefined,
txn_executor = TxnFun,
- shutting_down = false }, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ shutting_down = false }}.
handle_call({confirmed_broadcast, _Msg}, _From,
@@ -708,6 +709,10 @@ handle_cast(leave, State) ->
{stop, normal, State}.
+handle_info(force_gc, State) ->
+ garbage_collect(),
+ noreply(State #state { force_gc_timer = undefined });
+
handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
@@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {noreply, ensure_timers(State), flush_timeout(State)}.
reply(Reply, State) ->
- {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
+ {reply, Reply, ensure_timers(State), flush_timeout(State)}.
+
+ensure_timers(State) ->
+ ensure_force_gc_timer(ensure_broadcast_timer(State)).
-flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
+flush_timeout(#state{broadcast_buffer = []}) -> infinity;
flush_timeout(_) -> 0.
+ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
+ when is_reference(TRef) ->
+ State;
+ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
+ TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
+ State #state { force_gc_timer = TRef }.
+
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
State;
@@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self,
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [],
- broadcast_buffer_sz = 0}.
-
+ broadcast_buffer_sz = 0 }.
%% ---------------------------------------------------------------------------
%% View construction and inspection
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 254dc02d73..b166e079f4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -705,7 +705,8 @@ status() ->
{erlang_version, erlang:system_info(system_version)},
{memory, rabbit_vm:memory()},
{alarms, alarms()},
- {listeners, listeners()}],
+ {listeners, listeners()},
+ {vm_memory_calculation_strategy, vm_memory_monitor:get_memory_calculation_strategy()}],
S2 = rabbit_misc:filter_exit_map(
fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
[{vm_memory_high_watermark, {vm_memory_monitor,
@@ -802,6 +803,16 @@ start(normal, []) ->
warn_if_disc_io_options_dubious(),
rabbit_boot_steps:run_boot_steps(),
{ok, SupPid};
+ {error, {erlang_version_too_old,
+ {found, OTPRel, ERTSVer},
+ {required, ?OTP_MINIMUM, ?ERTS_MINIMUM}}} ->
+ Msg = "This RabbitMQ version cannot run on Erlang ~s (erts ~s): "
+ "minimum required version is ~s (erts ~s)",
+ Args = [OTPRel, ERTSVer, ?OTP_MINIMUM, ?ERTS_MINIMUM],
+ rabbit_log:error(Msg, Args),
+ %% also print to stderr to make this more visible
+ io:format(standard_error, "Error: " ++ Msg ++ "~n", Args),
+ {error, {erlang_version_too_old, rabbit_misc:format("Erlang ~s or later is required, started on ~s", [?OTP_MINIMUM, OTPRel])}};
Error ->
Error
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4eead35c1d..ff57593374 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
--export([list_down/1, count/1, list_names/0]).
+-export([list_down/1, count/1, list_names/0, list_local_names/0]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/11, basic_cancel/5, notify_decorators/1]).
@@ -234,8 +234,13 @@ recover(VHost) ->
%% for further processing in recover_durable_queues.
{ok, OrderedRecoveryTerms} =
BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]),
- {ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost),
- recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)).
+ case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
+ {ok, _} ->
+ recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
+ {error, Reason} ->
+ rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
+ throw({error, Reason})
+ end.
stop(VHost) ->
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
@@ -587,7 +592,13 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
-list(VHostPath) -> list(VHostPath, rabbit_queue).
+list_local_names() ->
+ [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
+ State =/= crashed,
+ node() =:= node(QPid) ].
+
+list(VHostPath) ->
+ list(VHostPath, rabbit_queue).
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
@@ -601,12 +612,16 @@ list(VHostPath, TableName) ->
end).
list_down(VHostPath) ->
- Present = list(VHostPath),
- Durable = list(VHostPath, rabbit_durable_queue),
- PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]),
- sets:to_list(sets:filter(fun (#amqqueue{name = N}) ->
- not sets:is_element(N, PresentS)
- end, sets:from_list(Durable))).
+ case rabbit_vhost:exists(VHostPath) of
+ false -> [];
+ true ->
+ Present = list(VHostPath),
+ Durable = list(VHostPath, rabbit_durable_queue),
+ PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]),
+ sets:to_list(sets:filter(fun (#amqqueue{name = N}) ->
+ not sets:is_element(N, PresentS)
+ end, sets:from_list(Durable)))
+ end.
count(VHost) ->
try
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c52d329392..4e43104de2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -266,11 +266,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3.
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index 347dbbb48a..b5ef86255d 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -65,15 +65,30 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- supervisor2:start_child(
- VHostSup,
- {rabbit_amqqueue_sup_sup,
- {rabbit_amqqueue_sup_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ supervisor2:start_child(
+ VHostSup,
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to start a queue process supervisor for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ {error, {no_such_vhost, VHost}}
+ end.
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
- ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup). \ No newline at end of file
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
+ ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
+ %% see start/1
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a queue process supervisor for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ ok
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 378c1e583b..00a6607dfb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -70,6 +70,9 @@
%% For testing
-export([build_topic_variable_map/3]).
+%% Mgmt HTTP API refactor
+-export([handle_method/5]).
+
-record(ch, {
%% starting | running | flow | closing
state,
@@ -741,20 +744,20 @@ clear_permission_cache() -> erase(permission_cache),
erase(topic_permission_cache),
ok.
-check_configure_permitted(Resource, #ch{user = User}) ->
+check_configure_permitted(Resource, User) ->
check_resource_access(User, Resource, configure).
-check_write_permitted(Resource, #ch{user = User}) ->
+check_write_permitted(Resource, User) ->
check_resource_access(User, Resource, write).
-check_read_permitted(Resource, #ch{user = User}) ->
+check_read_permitted(Resource, User) ->
check_resource_access(User, Resource, read).
-check_write_permitted_on_topic(Resource, Channel, RoutingKey) ->
- check_topic_authorisation(Resource, Channel, RoutingKey, write).
+check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
-check_read_permitted_on_topic(Resource, Channel, RoutingKey) ->
- check_topic_authorisation(Resource, Channel, RoutingKey, read).
+check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -790,12 +793,19 @@ check_internal_exchange(_) ->
ok.
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
- #ch{user = User = #user{username = Username}, conn_pid = ConnPid},
+ User = #user{username = Username},
+ ConnPid,
RoutingKey,
Permission) ->
Resource = Name#resource{kind = topic},
- Timeout = get_operation_timeout(),
- AmqpParams = rabbit_amqp_connection:amqp_params(ConnPid, Timeout),
+ Timeout = get_operation_timeout(),
+ AmqpParams = case ConnPid of
+ none ->
+ %% Called from outside the channel by mgmt API
+ [];
+ _ ->
+ rabbit_amqp_connection:amqp_params(ConnPid, Timeout)
+ end,
VariableMap = build_topic_variable_map(AmqpParams, VHost, Username),
Context = #{routing_key => RoutingKey,
variable_map => VariableMap
@@ -811,7 +821,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
end;
-check_topic_authorisation(_, _, _, _) ->
+check_topic_authorisation(_, _, _, _, _) ->
ok.
build_topic_variable_map(AmqpParams, VHost, Username) ->
@@ -844,10 +854,10 @@ check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
end.
-qbin_to_resource(QueueNameBin, State) ->
- name_to_resource(queue, QueueNameBin, State).
+qbin_to_resource(QueueNameBin, VHostPath) ->
+ name_to_resource(queue, QueueNameBin, VHostPath).
-name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
+name_to_resource(Type, NameBin, VHostPath) ->
rabbit_misc:r(VHostPath, Type, NameBin).
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
@@ -1002,15 +1012,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
- user = #user{username = Username},
+ user = #user{username = Username} = User,
conn_name = ConnName,
- delivery_flow = Flow}) ->
+ delivery_flow = Flow,
+ conn_pid = ConnPid}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, State),
+ check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, State, RoutingKey),
+ check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1063,9 +1074,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
limiter = Limiter,
- next_tag = DeliveryTag}) ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
+ next_tag = DeliveryTag,
+ user = User,
+ virtual_host = VHostPath}) ->
+ QueueName = qbin_to_resource(QueueNameBin, VHostPath),
+ check_read_permitted(QueueName, User),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(
@@ -1148,11 +1161,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args},
_, State = #ch{consumer_prefetch = ConsumerPrefetch,
- consumer_mapping = ConsumerMapping}) ->
+ consumer_mapping = ConsumerMapping,
+ user = User,
+ virtual_host = VHostPath}) ->
case maps:find(ConsumerTag, ConsumerMapping) of
error ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
+ QueueName = qbin_to_resource(QueueNameBin, VHostPath),
+ check_read_permitted(QueueName, User),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
@@ -1273,251 +1288,81 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
_, State) ->
reject(DeliveryTag, Requeue, false, State);
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
- passive = false,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- nowait = NoWait,
- arguments = Args},
+handle_method(#'exchange.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
- user = #user{username = Username}}) ->
- CheckedType = rabbit_exchange:check_type(TypeNameBin),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
- check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, State),
- X = case rabbit_exchange:lookup(ExchangeName) of
- {ok, FoundX} -> FoundX;
- {error, not_found} ->
- check_name('exchange', strip_cr_lf(ExchangeNameBin)),
- AeKey = <<"alternate-exchange">>,
- case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
- undefined -> ok;
- {error, {invalid_type, Type}} ->
- precondition_failed(
- "invalid type '~s' for arg '~s' in ~s",
- [Type, AeKey, rabbit_misc:rs(ExchangeName)]);
- AName -> check_read_permitted(ExchangeName, State),
- check_write_permitted(AName, State),
- ok
- end,
- rabbit_exchange:declare(ExchangeName,
- CheckedType,
- Durable,
- AutoDelete,
- Internal,
- Args,
- Username)
- end,
- ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
- AutoDelete, Internal, Args),
- return_ok(State, NoWait, #'exchange.declare_ok'{});
-
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- passive = true,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath}) ->
- ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
- check_not_default_exchange(ExchangeName),
- _ = rabbit_exchange:lookup_or_die(ExchangeName),
+ user = User,
+ queue_collector_pid = CollectorPid,
+ conn_pid = ConnPid}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
-handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
- if_unused = IfUnused,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath,
- user = #user{username = Username}}) ->
- StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
- check_not_default_exchange(ExchangeName),
- check_exchange_deletion(ExchangeName),
- check_configure_permitted(ExchangeName, State),
- case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
- {error, not_found} ->
- return_ok(State, NoWait, #'exchange.delete_ok'{});
- {error, in_use} ->
- precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
- ok ->
- return_ok(State, NoWait, #'exchange.delete_ok'{})
- end;
-
-handle_method(#'exchange.bind'{destination = DestinationNameBin,
- source = SourceNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:add/3,
- strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey,
- Arguments, #'exchange.bind_ok'{}, NoWait, State);
-
-handle_method(#'exchange.unbind'{destination = DestinationNameBin,
- source = SourceNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:remove/3,
- strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey,
- Arguments, #'exchange.unbind_ok'{}, NoWait, State);
-
-%% Note that all declares to these are effectively passive. If it
-%% exists it by definition has one consumer.
-handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
- _/binary>> = QueueNameBin,
- nowait = NoWait}, _,
- State = #ch{virtual_host = VHost}) ->
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
- case declare_fast_reply_to(StrippedQueueNameBin) of
- exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State);
- not_found -> rabbit_misc:not_found(QueueName)
- end;
+handle_method(#'exchange.delete'{nowait = NoWait} = Method,
+ _, State = #ch{conn_pid = ConnPid,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'exchange.delete_ok'{});
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = DurableDeclare,
- exclusive = ExclusiveDeclare,
- auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args} = Declare,
+handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
queue_collector_pid = CollectorPid,
- user = #user{username = Username}}) ->
- Owner = case ExclusiveDeclare of
- true -> ConnPid;
- false -> none
- end,
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- Durable = DurableDeclare andalso not ExclusiveDeclare,
- ActualNameBin = case StrippedQueueNameBin of
- <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
- "amq.gen");
- Other -> check_name('queue', Other)
- end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
- Q, Durable, AutoDelete, Args, Owner),
- maybe_stat(NoWait, Q)
- end) of
- {ok, MessageCount, ConsumerCount} ->
- return_queue_declare_ok(QueueName, NoWait, MessageCount,
- ConsumerCount, State);
- {error, not_found} ->
- %% enforce the limit for newly declared queues only
- check_vhost_queue_limit(QueueName, VHostPath),
- DlxKey = <<"x-dead-letter-exchange">>,
- case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
- undefined ->
- ok;
- {error, {invalid_type, Type}} ->
- precondition_failed(
- "invalid type '~s' for arg '~s' in ~s",
- [Type, DlxKey, rabbit_misc:rs(QueueName)]);
- DLX ->
- check_read_permitted(QueueName, State),
- check_write_permitted(DLX, State),
- ok
- end,
- case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner, Username) of
- {new, #amqqueue{pid = QPid}} ->
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as
- %% the connection shuts down.
- ok = case Owner of
- none -> ok;
- _ -> rabbit_queue_collector:register(
- CollectorPid, QPid)
- end,
- return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
- {existing, _Q} ->
- %% must have been created between the stat and the
- %% declare. Loop around again.
- handle_method(Declare, none, State);
- {absent, Q, Reason} ->
- rabbit_misc:absent(Q, Reason);
- {owner_died, _Q} ->
- %% Presumably our own days are numbered since the
- %% connection has died. Pretend the queue exists though,
- %% just so nothing fails.
- return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
- end;
- {error, {absent, Q, Reason}} ->
- rabbit_misc:absent(Q, Reason)
- end;
+ user = User}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'exchange.bind_ok'{});
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = true,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid}) ->
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
- {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
- rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
- ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
- return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
- State);
+handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
+ _, State = #ch{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'exchange.unbind_ok'{});
-handle_method(#'queue.delete'{queue = QueueNameBin,
- if_unused = IfUnused,
- if_empty = IfEmpty,
- nowait = NoWait},
+handle_method(#'queue.declare'{nowait = NoWait} = Method,
+ _, State = #ch{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ {ok, QueueName, MessageCount, ConsumerCount} =
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+
+handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
+ State = #ch{conn_pid = ConnPid,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}) ->
+ {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid,
+ VHostPath, User),
+ return_ok(State, NoWait,
+ #'queue.delete_ok'{message_count = PurgedMessageCount});
+
+handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
+ State = #ch{conn_pid = ConnPid,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, NoWait, #'queue.bind_ok'{});
+
+handle_method(#'queue.unbind'{} = Method, _,
+ State = #ch{conn_pid = ConnPid,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}) ->
+ handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ return_ok(State, false, #'queue.unbind_ok'{});
+
+handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
- user = #user{username = Username}}) ->
- StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
- QueueName = qbin_to_resource(StrippedQueueNameBin, State),
- check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
- rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username)
- end,
- fun (not_found) -> {ok, 0};
- ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
- {ok, 0};
- ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
- end) of
- {error, in_use} ->
- precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
- {error, not_empty} ->
- precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
- {ok, PurgedMessageCount} ->
- return_ok(State, NoWait,
- #'queue.delete_ok'{message_count = PurgedMessageCount})
- end;
-
-handle_method(#'queue.bind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:add/3,
- strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments,
- #'queue.bind_ok'{}, NoWait, State);
-
-handle_method(#'queue.unbind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
- routing_key = RoutingKey,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:remove/3,
- strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments,
- #'queue.unbind_ok'{}, false, State);
-
-handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
- _, State = #ch{conn_pid = ConnPid}) ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:purge(Q) end),
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}) ->
+ {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid,
+ VHostPath, User),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -1713,23 +1558,23 @@ queue_down_consumer_action(CTag, CMap) ->
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
- RoutingKey, Arguments, ReturnMethod, NoWait,
- State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- user = #user{username = Username}}) ->
- DestinationName = name_to_resource(DestinationType, DestinationNameBin, State),
- check_write_permitted(DestinationName, State),
+binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
+ RoutingKey, Arguments, VHostPath, ConnPid,
+ #user{username = Username} = User) ->
+ ExchangeNameBin = strip_cr_lf(SourceNameBin0),
+ DestinationNameBin = strip_cr_lf(DestinationNameBin0),
+ DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath),
+ check_write_permitted(DestinationName, User),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
- check_read_permitted(ExchangeName, State),
+ check_read_permitted(ExchangeName, User),
ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
case ExchangeLookup of
{error, not_found} ->
%% no-op
ExchangeLookup;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, State, RoutingKey),
+ check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
ExchangeLookup
end,
case Fun(#binding{source = ExchangeName,
@@ -1757,7 +1602,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
- ok -> return_ok(State, NoWait, ReturnMethod)
+ ok ->
+ ok
end.
basic_return(#basic_message{exchange_name = ExchangeName,
@@ -2148,3 +1994,226 @@ put_operation_timeout() ->
get_operation_timeout() ->
get(channel_operation_timeout).
+
+%% Refactored and exported to allow direct calls from the HTTP API,
+%% avoiding the use of AMQP 0-9-1 from the management plugin.
+handle_method(#'exchange.bind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:add/3,
+ SourceNameBin, exchange, DestinationNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+handle_method(#'exchange.unbind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:remove/3,
+ SourceNameBin, exchange, DestinationNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:remove/3,
+ ExchangeNameBin, queue, QueueNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+handle_method(#'queue.bind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments},
+ ConnPid, _CollectorId, VHostPath, User) ->
+ binding_action(fun rabbit_binding:add/3,
+ ExchangeNameBin, queue, QueueNameBin,
+ RoutingKey, Arguments, VHostPath, ConnPid, User);
+%% Note that all declares to these are effectively passive. If it
+%% exists it by definition has one consumer.
+handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
+ _/binary>> = QueueNameBin},
+ _ConnPid, _CollectorPid, VHost, _User) ->
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
+ case declare_fast_reply_to(StrippedQueueNameBin) of
+ exists -> {ok, QueueName, 0, 1};
+ not_found -> rabbit_misc:not_found(QueueName)
+ end;
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = DurableDeclare,
+ exclusive = ExclusiveDeclare,
+ auto_delete = AutoDelete,
+ nowait = NoWait,
+ arguments = Args} = Declare,
+ ConnPid, CollectorPid, VHostPath, #user{username = Username} = User) ->
+ Owner = case ExclusiveDeclare of
+ true -> ConnPid;
+ false -> none
+ end,
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ Durable = DurableDeclare andalso not ExclusiveDeclare,
+ ActualNameBin = case StrippedQueueNameBin of
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
+ Other -> check_name('queue', Other)
+ end,
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, User),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ maybe_stat(NoWait, Q)
+ end) of
+ {ok, MessageCount, ConsumerCount} ->
+ {ok, QueueName, MessageCount, ConsumerCount};
+ {error, not_found} ->
+ %% enforce the limit for newly declared queues only
+ check_vhost_queue_limit(QueueName, VHostPath),
+ DlxKey = <<"x-dead-letter-exchange">>,
+ case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
+ undefined ->
+ ok;
+ {error, {invalid_type, Type}} ->
+ precondition_failed(
+ "invalid type '~s' for arg '~s' in ~s",
+ [Type, DlxKey, rabbit_misc:rs(QueueName)]);
+ DLX ->
+ check_read_permitted(QueueName, User),
+ check_write_permitted(DLX, User),
+ ok
+ end,
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner, Username) of
+ {new, #amqqueue{pid = QPid}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case {Owner, CollectorPid} of
+ {none, _} -> ok;
+ {_, none} -> ok; %% Supports call from mgmt API
+ _ -> rabbit_queue_collector:register(
+ CollectorPid, QPid)
+ end,
+ {ok, QueueName, 0, 0};
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, ConnPid, CollectorPid, VHostPath, User);
+ {absent, Q, Reason} ->
+ rabbit_misc:absent(Q, Reason);
+ {owner_died, _Q} ->
+ %% Presumably our own days are numbered since the
+ %% connection has died. Pretend the queue exists though,
+ %% just so nothing fails.
+ {ok, QueueName, 0, 0}
+ end;
+ {error, {absent, Q, Reason}} ->
+ rabbit_misc:absent(Q, Reason)
+ end;
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ nowait = NoWait,
+ passive = true},
+ ConnPid, _CollectorPid, VHostPath, _User) ->
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
+ {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ {ok, QueueName, MessageCount, ConsumerCount};
+handle_method(#'queue.delete'{queue = QueueNameBin,
+ if_unused = IfUnused,
+ if_empty = IfEmpty},
+ ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) ->
+ StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
+ QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
+
+ check_configure_permitted(QueueName, User),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username)
+ end,
+ fun (not_found) -> {ok, 0};
+ ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username),
+ {ok, 0};
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
+ end) of
+ {error, in_use} ->
+ precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
+ {error, not_empty} ->
+ precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
+ {ok, _Count} = OK ->
+ OK
+ end;
+handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
+ if_unused = IfUnused},
+ _ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) ->
+ StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
+ check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
+ check_configure_permitted(ExchangeName, User),
+ case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of
+ {error, not_found} ->
+ ok;
+ {error, in_use} ->
+ precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
+ ok ->
+ ok
+ end;
+handle_method(#'queue.purge'{queue = QueueNameBin},
+ ConnPid, _CollectorPid, VHostPath, User) ->
+ QueueName = qbin_to_resource(QueueNameBin, VHostPath),
+ check_read_permitted(QueueName, User),
+ rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) -> rabbit_amqqueue:purge(Q) end);
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ type = TypeNameBin,
+ passive = false,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args},
+ _ConnPid, _CollectorPid, VHostPath, #user{username = Username} = User) ->
+ CheckedType = rabbit_exchange:check_type(TypeNameBin),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
+ check_not_default_exchange(ExchangeName),
+ check_configure_permitted(ExchangeName, User),
+ X = case rabbit_exchange:lookup(ExchangeName) of
+ {ok, FoundX} -> FoundX;
+ {error, not_found} ->
+ check_name('exchange', strip_cr_lf(ExchangeNameBin)),
+ AeKey = <<"alternate-exchange">>,
+ case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
+ undefined -> ok;
+ {error, {invalid_type, Type}} ->
+ precondition_failed(
+ "invalid type '~s' for arg '~s' in ~s",
+ [Type, AeKey, rabbit_misc:rs(ExchangeName)]);
+ AName -> check_read_permitted(ExchangeName, User),
+ check_write_permitted(AName, User),
+ ok
+ end,
+ rabbit_exchange:declare(ExchangeName,
+ CheckedType,
+ Durable,
+ AutoDelete,
+ Internal,
+ Args,
+ Username)
+ end,
+ ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
+ AutoDelete, Internal, Args);
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ passive = true},
+ _ConnPid, _CollectorPid, VHostPath, _User) ->
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
+ check_not_default_exchange(ExchangeName),
+ _ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index f8c4c6541b..30932bb1b0 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -34,7 +34,7 @@
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
clear_tracked_connection_tables_for_this_node/0,
register_connection/1, unregister_connection/1,
- list/0, list/1, list_on_node/1, list_of_user/1,
+ list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
count_connections_in/1]).
@@ -217,6 +217,16 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.
+-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+
+list_on_node(Node, VHost) ->
+ try mnesia:dirty_match_object(
+ tracked_connection_table_name_for(Node),
+ #tracked_connection{vhost = VHost, _ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
list_of_user(Username) ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index f1b844c60c..3ae17677e0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+handle_event(#event{type = vhost_down, props = Details}, State) ->
+ VHost = pget(name, Details),
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
+ " because the vhost database has stopped working",
+ [VHost, Node]),
+ close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
+ rabbit_misc:format("vhost '~s' is down", [VHost])),
+ {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
Username = pget(name, Details),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 3141fdc301..3321f2b5de 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -70,7 +70,7 @@ gc_channels() ->
ok.
gc_queues() ->
- Queues = rabbit_amqqueue:list_names(),
+ Queues = rabbit_amqqueue:list_local_names(),
GbSet = gb_sets:from_list(Queues),
gc_entity(queue_metrics, GbSet),
gc_entity(queue_coarse_metrics, GbSet),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 4b7f06305a..26e8f4d452 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -90,16 +90,21 @@ connect(Creds, VHost, Protocol, Pid, Infos) ->
true ->
{error, not_allowed};
false ->
- case AuthFun() of
- {ok, User = #user{username = Username}} ->
- notify_auth_result(Username,
- user_authentication_success, []),
- connect1(User, VHost, Protocol, Pid, Infos);
- {refused, Username, Msg, Args} ->
- notify_auth_result(Username,
- user_authentication_failure,
- [{error, rabbit_misc:format(Msg, Args)}]),
- {error, {auth_failure, "Refused"}}
+ case is_vhost_alive(VHost, Creds, Pid) of
+ false ->
+ {error, {internal_error, vhost_is_down}};
+ true ->
+ case AuthFun() of
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
+ connect1(User, VHost, Protocol, Pid, Infos);
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
+ {error, {auth_failure, "Refused"}}
+ end
end
end;
false -> {error, broker_not_found_on_node}
@@ -140,6 +145,21 @@ maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) ->
[]
end.
+is_vhost_alive(VHost, {Username, _Password}, Pid) ->
+ PrintedUsername = case Username of
+ none -> "";
+ _ -> Username
+ end,
+ case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
+ true -> true;
+ false ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "access to vhost '~s' refused for user '~s': "
+ "vhost '~s' is down",
+ [Pid, VHost, PrintedUsername, VHost]),
+ false
+ end.
is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
PrintedUsername = case Username of
diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl
index c6c353d552..702fb41eb8 100644
--- a/src/rabbit_looking_glass.erl
+++ b/src/rabbit_looking_glass.erl
@@ -29,7 +29,10 @@ boot() ->
Input = parse_value(Value),
rabbit_log:info("Enabling Looking Glass profiler, input value: ~p", [Input]),
{ok, _} = application:ensure_all_started(looking_glass),
- lg:trace(Input, lg_file_tracer, "traces.lz4", #{mode => profile, running => true})
+ lg:trace(Input, lg_file_tracer, "traces.lz4", #{
+ mode => profile,
+ running => true,
+ send => true})
end.
parse_value(Value) ->
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 2994e8cbcf..fd64fd61b3 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_sync).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 275a9127d1..fe78075d0f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
ok;
{error, RTErr} ->
rabbit_log:error("Unable to save message store recovery terms"
- "for directory ~p~nError: ~p~n",
+ " for directory ~p~nError: ~p~n",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index d93104d02c..f28e1281c3 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -22,13 +22,13 @@ number(_Name, Term) when is_number(Term) ->
ok;
number(Name, Term) ->
- {error, "~s should be number, actually was ~p", [Name, Term]}.
+ {error, "~s should be a number, actually was ~p", [Name, Term]}.
integer(_Name, Term) when is_integer(Term) ->
ok;
integer(Name, Term) ->
- {error, "~s should be number, actually was ~p", [Name, Term]}.
+ {error, "~s should be a number, actually was ~p", [Name, Term]}.
binary(_Name, Term) when is_binary(Term) ->
ok;
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e23d382d6e..77914a00bf 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -567,7 +567,7 @@ handle_other(handshake_timeout, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
State;
-handle_other(heartbeat_timeout,
+handle_other(heartbeat_timeout,
State = #v1{connection = #connection{timeout_sec = T}}) ->
maybe_emit_stats(State),
throw({heartbeat_timeout, T});
@@ -623,7 +623,7 @@ send_blocked(#v1{connection = #connection{protocol = Protocol,
sock = Sock}, Reason) ->
case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
{bool, true} ->
-
+
ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
Protocol);
_ ->
@@ -1164,6 +1164,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
ok = is_over_connection_limit(VHost, User),
ok = rabbit_access_control:check_vhost_access(User, VHost, Sock),
+ ok = is_vhost_alive(VHost, User),
NewConnection = Connection#connection{vhost = VHost},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -1209,6 +1210,16 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
+is_vhost_alive(VHostPath, User) ->
+ case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of
+ true -> ok;
+ false ->
+ rabbit_misc:protocol_error(internal_error,
+ "access to vhost '~s' refused for user '~s': "
+ "vhost '~s' is down",
+ [VHostPath, User#user.username, VHostPath])
+ end.
+
is_over_connection_limit(VHostPath, User) ->
try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
false -> ok;
@@ -1567,7 +1578,7 @@ maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) ->
State1 = State#v1{connection_state = blocked,
throttle = update_last_blocked_at(Throttle)},
case CS of
- running ->
+ running ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater);
_ -> ok
end,
@@ -1589,7 +1600,7 @@ maybe_send_unblocked(State = #v1{throttle = Throttle}) ->
case should_send_unblocked(Throttle) of
true ->
ok = send_unblocked(State),
- State#v1{throttle =
+ State#v1{throttle =
Throttle#throttle{connection_blocked_message_sent = false}};
false -> State
end.
@@ -1598,7 +1609,7 @@ maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) ->
case should_send_blocked(Throttle) of
true ->
ok = send_blocked(State, blocked_by_message(Throttle)),
- State#v1{throttle =
+ State#v1{throttle =
Throttle#throttle{connection_blocked_message_sent = true}};
false -> maybe_send_unblocked(State)
end.
@@ -1624,7 +1635,7 @@ control_throttle(State = #v1{connection_state = CS,
running -> maybe_block(State1);
%% unblock or re-enable blocking
blocked -> maybe_block(maybe_unblock(State1));
- _ -> State1
+ _ -> State1
end.
augment_connection_log_name(#connection{client_properties = ClientProperties,
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index be9b1b6227..73fc9c7449 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,20 +48,35 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- {ok, _} = supervisor2:start_child(
- VHostSup,
- {?MODULE,
- {?MODULE, start_link, [VHost]},
- transient, ?WORKER_WAIT, worker,
- [?MODULE]}),
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ {ok, _} = supervisor2:start_child(
+ VHostSup,
+ {?MODULE,
+ {?MODULE, start_link, [VHost]},
+ transient, ?WORKER_WAIT, worker,
+ [?MODULE]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to start a recovery terms manager for vhost ~s: vhost no longer exists!",
+ [VHost])
+ end,
ok.
stop(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- case supervisor:terminate_child(VHostSup, ?MODULE) of
- ok -> supervisor:delete_child(VHostSup, ?MODULE);
- E -> E
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ case supervisor:terminate_child(VHostSup, ?MODULE) of
+ ok -> supervisor:delete_child(VHostSup, ?MODULE);
+ E -> E
+ end;
+ %% see start/1
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a recovery terms manager for vhost ~s: vhost no longer exists!",
+ [VHost]),
+
+ ok
end.
store(VHost, DirBaseName, Terms) ->
@@ -74,7 +89,14 @@ read(VHost, DirBaseName) ->
end.
clear(VHost) ->
- ok = dets:delete_all_objects(VHost),
+ try
+ dets:delete_all_objects(VHost)
+ %% see start/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to clear recovery terms for vhost ~s: table no longer exists!",
+ [VHost]),
+ ok
+ end,
flush(VHost).
start_link(VHost) ->
@@ -126,8 +148,15 @@ open_global_table() ->
ok.
close_global_table() ->
- ok = dets:sync(?MODULE),
- ok = dets:close(?MODULE).
+ try
+ dets:sync(?MODULE),
+ dets:close(?MODULE)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to clear global recovery terms: table no longer exists!",
+ []),
+ ok
+ end.
read_global(DirBaseName) ->
read(?MODULE, DirBaseName).
@@ -163,8 +192,23 @@ open_table(VHost) ->
{ram_file, true},
{auto_save, infinity}]).
-flush(VHost) -> ok = dets:sync(VHost).
+flush(VHost) ->
+ try
+ dets:sync(VHost)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to sync recovery terms table for vhost ~s: the table no longer exists!",
+ [VHost]),
+ ok
+ end.
close_table(VHost) ->
- ok = flush(VHost),
- ok = dets:close(VHost).
+ try
+ ok = flush(VHost),
+ ok = dets:close(VHost)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to close recovery terms table for vhost ~s: the table no longer exists!",
+ [VHost]),
+ ok
+ end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 40967e316e..b4945fe3d3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -496,15 +496,26 @@ stop(VHost) ->
start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?TRANSIENT_MSG_STORE,
- undefined,
- ?EMPTY_START_FUN_STATE),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?PERSISTENT_MSG_STORE,
- Refs,
- StartFunState),
- rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
+ do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE),
+ do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState),
+ ok.
+
+do_start_msg_store(VHost, Type, Refs, StartFunState) ->
+ case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of
+ {ok, _} ->
+ rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]);
+ {error, {no_such_vhost, VHost}} = Err ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n",
+ [Type, VHost]),
+ exit(Err);
+ {error, Error} ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n",
+ [Type, VHost, Error]),
+ exit({error, Error})
+ end.
+
+abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
+abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
stop_msg_store(VHost) ->
rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
@@ -2906,7 +2917,7 @@ run_old_persistent_store(Refs, StartFunState) ->
OldStoreName.
start_new_store(VHosts) ->
- %% Ensure vhost supervisor is started, so we can add vhsots to it.
+ %% Ensure vhost supervisor is started, so we can add vhosts to it.
lists:map(fun(VHost) ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
{ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 4dc2ec86d0..30557fc7be 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -26,9 +26,10 @@
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
+-export([vhost_down/1]).
--spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
--spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
+-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()].
@@ -54,8 +55,9 @@ recover() ->
%% rabbit_vhost_sup_sup will start the actual recovery.
%% So recovery will be run every time a vhost supervisor is restarted.
ok = rabbit_vhost_sup_sup:start(),
- [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost)
- || VHost <- rabbit_vhost:list()],
+
+ [ ok = rabbit_vhost_sup_sup:init_vhost(VHost)
+ || VHost <- rabbit_vhost:list()],
ok.
recover(VHost) ->
@@ -73,7 +75,7 @@ recover(VHost) ->
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, tracing]).
+-define(INFO_KEYS, [name, tracing, state]).
add(VHostPath, ActingUser) ->
rabbit_log:info("Adding vhost '~s'~n", [VHostPath]),
@@ -104,10 +106,20 @@ add(VHostPath, ActingUser) ->
{<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
- ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath),
- rabbit_event:notify(vhost_created, info(VHostPath)
- ++ [{user_who_performed_action, ActingUser}]),
- R.
+ case rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath) of
+ ok ->
+ rabbit_event:notify(vhost_created, info(VHostPath)
+ ++ [{user_who_performed_action, ActingUser}]),
+ R;
+ {error, {no_such_vhost, VHostPath}} ->
+ Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!",
+ [VHostPath]),
+ {error, Msg};
+ {error, Reason} ->
+ Msg = rabbit_misc:format("failed to set up vhost '~s': ~p",
+ [VHostPath, Reason]),
+ {error, Msg}
+ end.
delete(VHostPath, ActingUser) ->
%% FIXME: We are forced to delete the queues and exchanges outside
@@ -125,12 +137,21 @@ delete(VHostPath, ActingUser) ->
with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)),
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath},
{user_who_performed_action, ActingUser}]),
- [ok = Fun() || Fun <- Funs],
+ [case Fun() of
+ ok -> ok;
+ {error, {no_such_vhost, VHostPath}} -> ok
+ end || Fun <- Funs],
%% After vhost was deleted from mnesia DB, we try to stop vhost supervisors
%% on all the nodes.
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.
+vhost_down(VHostPath) ->
+ ok = rabbit_event:notify(vhost_down,
+ [{name, VHostPath},
+ {node, node()},
+ {user_who_performed_action, ?INTERNAL_USER}]).
+
delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
@@ -240,6 +261,10 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
i(tracing, VHost) -> rabbit_trace:enabled(VHost);
+i(state, VHost) -> case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
+ true -> running;
+ false -> down
+ end;
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 58c0ce065f..7b797e46b2 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -26,7 +26,8 @@
-export([update_limit/4, clear_limit/3, get_limit/2]).
-export([validate/5, notify/5, notify_clear/4]).
-export([connection_limit/1, queue_limit/1,
- is_over_queue_limit/1, is_over_connection_limit/1]).
+ is_over_queue_limit/1, would_exceed_queue_limit/2,
+ is_over_connection_limit/1]).
-import(rabbit_misc, [pget/2, pget/3]).
@@ -106,28 +107,40 @@ is_over_connection_limit(VirtualHost) ->
{ok, _Limit} -> false
end.
+-spec would_exceed_queue_limit(non_neg_integer(), rabbit_types:vhost()) ->
+ {true, non_neg_integer(), non_neg_integer()} | false.
--spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
-
-is_over_queue_limit(VirtualHost) ->
+would_exceed_queue_limit(AdditionalCount, VirtualHost) ->
case queue_limit(VirtualHost) of
- %% no limit configured
- undefined -> false;
- %% with limit = 0, no queues can be declared (perhaps not very
- %% useful but consistent with the connection limit)
- {ok, 0} -> {true, 0};
+ undefined ->
+ %% no limit configured
+ false;
+ {ok, 0} ->
+ %% with limit = 0, no queues can be declared (perhaps not very
+ %% useful but consistent with the connection limit)
+ {true, 0, 0};
{ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
QueueCount = rabbit_amqqueue:count(VirtualHost),
- case QueueCount >= Limit of
+ case (AdditionalCount + QueueCount) > Limit of
false -> false;
- true -> {true, Limit}
+ true -> {true, Limit, QueueCount}
end;
- %% any negative value means "no limit". Note that parameter validation
- %% will replace negative integers with 'undefined', so this is to be
- %% explicit and extra defensive
- {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
- %% ignore non-integer limits
- {ok, _Limit} -> false
+ {ok, Limit} when is_integer(Limit) andalso Limit < 0 ->
+ %% any negative value means "no limit". Note that parameter validation
+ %% will replace negative integers with 'undefined', so this is to be
+ %% explicit and extra defensive
+ false;
+ {ok, _Limit} ->
+ %% ignore non-integer limits
+ false
+ end.
+
+-spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
+
+is_over_queue_limit(VirtualHost) ->
+ case would_exceed_queue_limit(1, VirtualHost) of
+ {true, Limit, _QueueCount} -> {true, Limit};
+ false -> false
end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 482ad082b8..3c633875bc 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,17 +23,33 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
- supervisor2:start_child(VHostSup,
- {Type, {rabbit_msg_store, start_link,
- [Type, VHostDir, ClientRefs, StartupFunState]},
- transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ supervisor2:start_child(VHostSup,
+ {Type, {rabbit_msg_store, start_link,
+ [Type, VHostDir, ClientRefs, StartupFunState]},
+ transient, ?WORKER_WAIT, worker, [rabbit_msg_store]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} = E ->
+ rabbit_log:error("Failed to start a message store for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ E
+ end.
stop(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- ok = supervisor2:terminate_child(VHostSup, Type),
- ok = supervisor2:delete_child(VHostSup, Type).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ ok = supervisor2:terminate_child(VHostSup, Type),
+ ok = supervisor2:delete_child(VHostSup, Type);
+ %% see start/4
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a message store for vhost ~s: vhost no longer exists!",
+ [VHost]),
+
+ ok
+ end.
client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) ->
with_vhost_store(VHost, Type, fun(StorePid) ->
@@ -58,4 +74,4 @@ vhost_store_pid(VHost, Type) ->
successfully_recovered_state(VHost, Type) ->
with_vhost_store(VHost, Type, fun(StorePid) ->
rabbit_msg_store:successfully_recovered_state(StorePid)
- end). \ No newline at end of file
+ end).
diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl
new file mode 100644
index 0000000000..e3c815a727
--- /dev/null
+++ b/src/rabbit_vhost_process.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% This module implements a vhost identity process.
+
+%% On start this process will try to recover the vhost data and
+%% processes structure (queues and message stores).
+%% If recovered successfully, the process will save its PID
+%% to vhost process registry. If vhost process PID is in the registry and the
+%% process is alive - the vhost is considered running.
+
+%% On termination, the process will notify of the vhost going down.
+
+%% The process will also check periodically if the vhost is still
+%% present in mnesia DB and stop the vhost supervision tree when it
+%% disappears.
+
+-module(rabbit_vhost_process).
+
+-include("rabbit.hrl").
+
+-define(TICKTIME_RATIO, 4).
+
+-behaviour(gen_server2).
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+start_link(VHost) ->
+ gen_server2:start_link(?MODULE, [VHost], []).
+
+
+init([VHost]) ->
+ process_flag(trap_exit, true),
+ rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]),
+ try
+ %% Recover the vhost data and save it to vhost registry.
+ ok = rabbit_vhost:recover(VHost),
+ rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
+ Interval = interval(),
+ timer:send_interval(Interval, check_vhost),
+ {ok, VHost}
+ catch _:Reason ->
+ rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
+ " Stacktrace ~p",
+ [VHost, Reason, erlang:get_stacktrace()]),
+ {stop, Reason}
+ end.
+
+handle_call(_,_,VHost) ->
+ {reply, ok, VHost}.
+
+handle_cast(_, VHost) ->
+ {noreply, VHost}.
+
+handle_info(check_vhost, VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ true -> {noreply, VHost};
+ false ->
+ rabbit_log:warning("Virtual host '~s' is gone. "
+ "Stopping its top level supervisor.",
+ [VHost]),
+ %% Stop vhost's top supervisor in a one-off process to avoid a deadlock:
+ %% us (a child process) waiting for supervisor shutdown and our supervisor(s)
+ %% waiting for us to shutdown.
+ spawn(
+ fun() ->
+ rabbit_vhost_sup_sup:stop_and_delete_vhost(VHost)
+ end),
+ {noreply, VHost}
+ end;
+handle_info(_, VHost) ->
+ {noreply, VHost}.
+
+terminate(shutdown, VHost) ->
+ %% Notify that vhost is stopped.
+ rabbit_vhost:vhost_down(VHost);
+terminate(_, _VHost) ->
+ ok.
+
+code_change(_OldVsn, VHost, _Extra) ->
+ {ok, VHost}.
+
+interval() ->
+ application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl
index b8c7a649e5..82899f8236 100644
--- a/src/rabbit_vhost_sup.erl
+++ b/src/rabbit_vhost_sup.erl
@@ -18,7 +18,8 @@
-include("rabbit.hrl").
-%% Supervisor is a per-vhost supervisor to contain queues and message stores
+%% Each vhost gets an instance of this supervisor that supervises
+%% message stores and queues (via rabbit_amqqueue_sup_sup).
-behaviour(supervisor2).
-export([init/1]).
-export([start_link/1]).
@@ -26,9 +27,5 @@
start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).
-init([VHost]) ->
- {ok, {{one_for_all, 0, 1},
- [{rabbit_vhost_sup_watcher,
- {rabbit_vhost_sup_watcher, start_link, [VHost]},
- intrinsic, ?WORKER_WAIT, worker,
- [rabbit_vhost_sup]}]}}.
+init([_VHost]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index e528d64e0e..1d5db93fda 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,18 +23,23 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
+-export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
--export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]).
+-export([start_on_all_nodes/1]).
+
+-export([save_vhost_process/2]).
+-export([is_vhost_alive/1]).
%% Internal
--export([stop_and_delete_vhost/1]).
+-export([stop_and_delete_vhost/1, start_vhost/1]).
--record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
+-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
start() ->
- case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []},
- permanent, infinity, supervisor, [?MODULE]}) of
+ case supervisor:start_child(rabbit_sup, {?MODULE,
+ {?MODULE, start_link, []},
+ permanent, infinity, supervisor,
+ [?MODULE]}) of
{ok, _} -> ok;
{error, Err} -> {error, Err}
end.
@@ -43,68 +48,43 @@ start_link() ->
supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of
- ignore -> transient;
- stop_node -> permanent;
- transient -> transient;
- permanent -> permanent
- end,
-
+ %% This assumes that a single vhost termination should not shut down nodes
+ %% unless the operator opts in.
+ RestartStrategy = vhost_restart_strategy(),
ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]),
+
{ok, {{simple_one_for_one, 0, 5},
[{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []},
- VhostRestart, infinity, supervisor,
+ RestartStrategy, ?SUPERVISOR_WAIT, supervisor,
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
start_on_all_nodes(VHost) ->
- [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
- ok.
+ NodesStart = [ {Node, start_vhost(VHost, Node)}
+ || Node <- rabbit_nodes:all_running() ],
+ Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart),
+ case Failures of
+ [] -> ok;
+ Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}}
+ end.
delete_on_all_nodes(VHost) ->
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
ok.
-start_vhost(VHost, Node) when Node == node(self()) ->
- start_vhost(VHost);
-start_vhost(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {badrpc, RpcErr} ->
- {error, RpcErr}
- end.
-
-start_vhost(VHost) ->
- case rabbit_vhost:exists(VHost) of
- false -> {error, {no_such_vhost, VHost}};
- true ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, _} -> ok;
- {error, {already_started, _}} -> ok;
- Error -> throw(Error)
- end,
- {ok, _} = vhost_sup_pid(VHost);
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
- end
- end.
-
stop_and_delete_vhost(VHost) ->
case get_vhost_sup(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
- vhost_sup_pid = VHostSupPid} = VHostSup ->
+ vhost_sup_pid = VHostSupPid} ->
case is_process_alive(WrapperPid) of
false -> ok;
true ->
rabbit_log:info("Stopping vhost supervisor ~p"
- " for vhost ~p~n",
+ " for vhost '~s'~n",
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete_object(?MODULE, VHostSup),
+ ets:delete(?MODULE, VHost),
ok = rabbit_vhost:delete_storage(VHost);
Other ->
Other
@@ -126,9 +106,31 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}.
-vhost_sup(VHost, Local) when Local == node(self()) ->
- vhost_sup(VHost);
+-spec init_vhost(rabbit_types:vhost()) -> ok.
+init_vhost(VHost) ->
+ case start_vhost(VHost) of
+ {ok, _} -> ok;
+ {error, {no_such_vhost, VHost}} ->
+ {error, {no_such_vhost, VHost}};
+ {error, Reason} ->
+ case vhost_restart_strategy() of
+ permanent ->
+ rabbit_log:error(
+ "Unable to initialize vhost data store for vhost '~s'."
+ " Reason: ~p",
+ [VHost, Reason]),
+ throw({error, Reason});
+ transient ->
+ rabbit_log:warning(
+ "Unable to initialize vhost data store for vhost '~s'."
+ " The vhost will be stopped for this node. "
+ " Reason: ~p",
+ [VHost, Reason]),
+ ok
+ end
+ end.
+
+-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
vhost_sup(VHost, Node) ->
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
@@ -137,9 +139,63 @@ vhost_sup(VHost, Node) ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
+-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
vhost_sup(VHost) ->
- start_vhost(VHost).
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ case start_vhost(VHost) of
+ {ok, Pid} ->
+ true = is_vhost_alive(VHost),
+ {ok, Pid};
+ {error, {no_such_vhost, VHost}} ->
+ {error, {no_such_vhost, VHost}};
+ Error ->
+ throw(Error)
+ end;
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end.
+
+-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
+start_vhost(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ {badrpc, RpcErr} ->
+ {error, RpcErr}
+ end.
+
+-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
+start_vhost(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false -> {error, {no_such_vhost, VHost}};
+ true ->
+ case supervisor2:start_child(?MODULE, [VHost]) of
+ {ok, Pid} -> {ok, Pid};
+ {error, {already_started, Pid}} -> {ok, Pid};
+ {error, Err} -> {error, Err}
+ end
+ end.
+
+-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
+is_vhost_alive(VHost) ->
+%% A vhost is considered alive if its supervision tree is alive and
+%% saved in the ETS table
+ case get_vhost_sup(VHost) of
+ #vhost_sup{wrapper_pid = WrapperPid,
+ vhost_sup_pid = VHostSupPid,
+ vhost_process_pid = VHostProcessPid}
+ when is_pid(WrapperPid),
+ is_pid(VHostSupPid),
+ is_pid(VHostProcessPid) ->
+ is_process_alive(WrapperPid)
+ andalso
+ is_process_alive(VHostSupPid)
+ andalso
+ is_process_alive(VHostProcessPid);
+ _ -> false
+ end.
+
-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
save_vhost_sup(VHost, WrapperPid, VHostPid) ->
@@ -148,6 +204,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) ->
wrapper_pid = WrapperPid}),
ok.
+-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok.
+save_vhost_process(VHost, VHostProcessPid) ->
+ true = ets:update_element(?MODULE, VHost,
+ {#vhost_sup.vhost_process_pid, VHostProcessPid}),
+ ok.
+
-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
get_vhost_sup(VHost) ->
case ets:lookup(?MODULE, VHost) of
@@ -169,3 +231,12 @@ vhost_sup_pid(VHost) ->
end
end.
+vhost_restart_strategy() ->
+ %% This assumes that a single vhost termination should not shut down nodes
+ %% unless the operator opts in.
+ case application:get_env(rabbit, vhost_restart_strategy, continue) of
+ continue -> transient;
+ stop_node -> permanent;
+ transient -> transient;
+ permanent -> permanent
+ end.
diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl
deleted file mode 100644
index 3ce726621f..0000000000
--- a/src/rabbit_vhost_sup_watcher.erl
+++ /dev/null
@@ -1,66 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
-%%
-
-%% This module implements a watcher process which should stop
-%% the parent supervisor if its vhost is missing from the mnesia DB
-
--module(rabbit_vhost_sup_watcher).
-
--include("rabbit.hrl").
-
--define(TICKTIME_RATIO, 4).
-
--behaviour(gen_server2).
--export([start_link/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
-
-start_link(VHost) ->
- gen_server2:start_link(?MODULE, [VHost], []).
-
-
-init([VHost]) ->
- Interval = interval(),
- timer:send_interval(Interval, check_vhost),
- {ok, VHost}.
-
-handle_call(_,_,VHost) ->
- {reply, ok, VHost}.
-
-handle_cast(_, VHost) ->
- {noreply, VHost}.
-
-handle_info(check_vhost, VHost) ->
- case rabbit_vhost:exists(VHost) of
- true -> {noreply, VHost};
- false ->
- rabbit_log:error(" Vhost \"~p\" is gone."
- " Stopping message store supervisor.",
- [VHost]),
- {stop, normal, VHost}
- end;
-handle_info(_, VHost) ->
- {noreply, VHost}.
-
-terminate(_, _) -> ok.
-
-code_change(_OldVsn, VHost, _Extra) ->
- {ok, VHost}.
-
-interval() ->
- application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. \ No newline at end of file
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 3396f71fa9..8e23389bb9 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -14,6 +14,9 @@
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
%%
+%% This module is a wrapper around vhost supervisor to
+%% provide exactly once restart semantics.
+
-module(rabbit_vhost_sup_wrapper).
-include("rabbit.hrl").
@@ -24,30 +27,35 @@
-export([start_vhost_sup/1]).
start_link(VHost) ->
- supervisor2:start_link(?MODULE, [VHost]).
-
-%% This module is a wrapper around vhost supervisor to
-%% provide exactly once restart.
-
-%% rabbit_vhost_sup supervisor children are added dynamically,
-%% so one_for_all strategy cannot be used.
+ %% Using supervisor, because supervisor2 does not stop a started child when
+ %% another one fails to start. Bug?
+ supervisor:start_link(?MODULE, [VHost]).
init([VHost]) ->
- %% Two restarts in 1 hour. One per message store.
- {ok, {{one_for_all, 2, 3600000},
- [{rabbit_vhost_sup,
- {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
- permanent, infinity, supervisor,
- [rabbit_vhost_sup]}]}}.
+ %% 2 restarts in 5 minutes. One per message store.
+ {ok, {{one_for_all, 2, 300},
+ [
+ %% rabbit_vhost_sup is an empty supervisor container for
+ %% all data processes.
+ {rabbit_vhost_sup,
+ {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
+ permanent, infinity, supervisor,
+ [rabbit_vhost_sup]},
+ %% rabbit_vhost_process is a vhost identity process, which
+ %% is responsible for data recovery and vhost aliveness status.
+ %% See the module comments for more info.
+ {rabbit_vhost_process,
+ {rabbit_vhost_process, start_link, [VHost]},
+ permanent, ?WORKER_WAIT, worker,
+ [rabbit_vhost_process]}]}}.
+
start_vhost_sup(VHost) ->
case rabbit_vhost_sup:start_link(VHost) of
{ok, Pid} ->
%% Save vhost sup record with wrapper pid and vhost sup pid.
ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
- %% We can start recover as soon as we have vhost_sup record saved
- ok = rabbit_vhost:recover(VHost),
{ok, Pid};
Other ->
Other
- end.
+ end. \ No newline at end of file
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index c2353ab85f..88062dc32a 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -40,19 +40,18 @@ memory() ->
[aggregate(Names, Sums, memory, fun (X) -> X end)
|| Names <- distinguished_interesting_sups()],
- MnesiaETS = mnesia_memory(),
- MsgIndexETS = ets_memory(msg_stores()),
- MetricsETS = ets_memory([rabbit_metrics]),
- MetricsProc =
- try
- [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
- M
- catch
- error:badarg ->
- 0
- end,
- MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
- OsTotal = vm_memory_monitor:get_process_memory(),
+ MnesiaETS = mnesia_memory(),
+ MsgIndexETS = ets_memory(msg_stores()),
+ MetricsETS = ets_memory([rabbit_metrics]),
+ MetricsProc = try
+ [{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
+ M
+ catch
+ error:badarg ->
+ 0
+ end,
+ MgmtDbETS = ets_memory([rabbit_mgmt_storage]),
+ VMTotal = vm_memory_monitor:get_process_memory(),
[{total, ErlangTotal},
{processes, Processes},
@@ -63,6 +62,11 @@ memory() ->
{system, System}] =
erlang:memory([total, processes, ets, atom, binary, code, system]),
+ Unaccounted = case VMTotal - ErlangTotal of
+ GTZ when GTZ > 0 -> GTZ;
+ _LTZ -> 0
+ end,
+
OtherProc = Processes
- ConnsReader - ConnsWriter - ConnsChannel - ConnsOther
- Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc,
@@ -97,9 +101,9 @@ memory() ->
%% System
{code, Code},
{atom, Atom},
- {other_system, System - ETS - Bin - Code - Atom + (OsTotal - ErlangTotal)},
+ {other_system, System - ETS - Bin - Code - Atom + Unaccounted},
- {total, OsTotal}
+ {total, VMTotal}
].
%% [1] - erlang:memory(processes) can be less than the sum of its
%% parts. Rather than display something nonsensical, just silence any
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
index 09d7ab4e95..247477bf40 100644
--- a/test/channel_operation_timeout_test_queue.erl
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -26,13 +26,9 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
- zip_msgs_and_acks/4, multiple_routing_keys/0]).
-
+ zip_msgs_and_acks/4]).
-export([start/2, stop/1]).
-%% exported for testing only
--export([start_msg_store/3, stop_msg_store/1, init/6]).
-
%%----------------------------------------------------------------------------
%% This test backing queue follows the variable queue implementation, with
%% the exception that it will introduce infinite delays on some operations if
@@ -64,6 +60,7 @@
unacked_bytes,
persistent_count, %% w unacked
persistent_bytes, %% w unacked
+ delta_transient_bytes, %%
target_ram_count,
ram_msg_count, %% w/o unacked
@@ -88,6 +85,12 @@
%% default queue or lazy queue
mode,
+ %% number of reduce_memory_usage executions, once it
+ %% reaches a threshold the queue will manually trigger a runtime GC
+ %% see: maybe_execute_gc/1
+ memory_reduction_run_count,
+ %% Queue data is grouped by VHost. We need to store it
+ %% to work with queue index.
virtual_host
}).
@@ -108,22 +111,18 @@
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
+ transient,
end_seq_id %% end_seq_id is exclusive
}).
--define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
--define(QUEUE, lqueue).
--define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
--define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-%%----------------------------------------------------------------------------
+-define(QUEUE, lqueue).
+-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
--rabbit_upgrade({multiple_routing_keys, local, []}).
+%%----------------------------------------------------------------------------
-type seq_id() :: non_neg_integer().
@@ -191,2220 +190,118 @@
%% Duplicated from rabbit_backing_queue
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
--spec multiple_routing_keys() -> 'ok'.
-
--define(BLANK_DELTA, #delta { start_seq_id = undefined,
- count = 0,
- end_seq_id = undefined }).
--define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
- count = 0,
- end_seq_id = Z }).
-
--define(MICROS_PER_SECOND, 1000000.0).
-
-%% We're sampling every 5s for RAM duration; a half life that is of
-%% the same order of magnitude is probably about right.
--define(RATE_AVG_HALF_LIFE, 5.0).
-
-%% We will recalculate the #rates{} every time we get asked for our
-%% RAM duration, or every N messages published, whichever is
-%% sooner. We do this since the priority calculations in
-%% rabbit_amqqueue_process need fairly fresh rates.
--define(MSGS_PER_RATE_CALC, 100).
-
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
start(VHost, DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
- %% Group recovery terms by vhost.
- ClientRefs = [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- start_msg_store(VHost, ClientRefs, StartFunState),
- {ok, AllTerms}.
+ rabbit_variable_queue:start(VHost, DurableQueues).
stop(VHost) ->
- ok = stop_msg_store(VHost),
- ok = rabbit_queue_index:stop(VHost).
-
-start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
- rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?TRANSIENT_MSG_STORE,
- undefined,
- ?EMPTY_START_FUN_STATE),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?PERSISTENT_MSG_STORE,
- Refs,
- StartFunState),
- rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
-
-stop_msg_store(VHost) ->
- rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
- rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
- ok.
-
+ rabbit_variable_queue:stop(VHost).
init(Queue, Recover, Callback) ->
- init(
- Queue, Recover, Callback,
- fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(Callback, MsgIds, ActionTaken)
- end,
- fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
- fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
-
-init(#amqqueue { name = QueueName, durable = IsDurable }, new,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
- IndexState = rabbit_queue_index:init(QueueName,
- MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
- VHost = QueueName#resource.virtual_host,
- init(IsDurable, IndexState, 0, 0, [],
- case IsDurable of
- true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun, AsyncCallback,
- VHost);
- false -> undefined
- end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), VHost);
-
-%% We can be recovering a transient queue if it crashed
-init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
- {PRef, RecoveryTerms} = process_recovery_terms(Terms),
- VHost = QueueName#resource.virtual_host,
- {PersistentClient, ContainsCheckFun} =
- case IsDurable of
- true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback,
- VHost),
- {C, fun (MsgId) when is_binary(MsgId) ->
- rabbit_msg_store:contains(MsgId, C);
- (#basic_message{is_persistent = Persistent}) ->
- Persistent
- end};
- false -> {undefined, fun(_MsgId) -> false end}
- end,
- TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
- undefined, AsyncCallback,
- VHost),
- {DeltaCount, DeltaBytes, IndexState} =
- rabbit_queue_index:recover(
- QueueName, RecoveryTerms,
- rabbit_vhost_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost),
- ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
- init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
- PersistentClient, TransientClient, VHost).
-
-process_recovery_terms(Terms=non_clean_shutdown) ->
- {rabbit_guid:gen(), Terms};
-process_recovery_terms(Terms) ->
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), []};
- PRef -> {PRef, Terms}
- end.
-
-terminate(_Reason, State) ->
- State1 = #vqstate { persistent_count = PCount,
- persistent_bytes = PBytes,
- index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT},
- virtual_host = VHost } =
- purge_pending_ack(true, State),
- PRef = case MSCStateP of
- undefined -> undefined;
- _ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
- rabbit_msg_store:client_ref(MSCStateP)
- end,
- ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef},
- {persistent_count, PCount},
- {persistent_bytes, PBytes}],
- a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
- VHost, Terms, IndexState),
- msg_store_clients = undefined }).
-
-%% the only difference between purge and delete is that delete also
-%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(_Reason, State) ->
- %% Normally when we purge messages we interact with the qi by
- %% issues delivers and acks for every purged message. In this case
- %% we don't need to do that, so we just delete the qi.
- State1 = purge_and_index_reset(State),
- State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
- purge_pending_ack_delete_and_terminate(State1),
- case MSCStateP of
- undefined -> ok;
- _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
- end,
- rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- a(State2 #vqstate { msg_store_clients = undefined }).
-
-delete_crashed(#amqqueue{name = QName}) ->
- ok = rabbit_queue_index:erase(QName).
-
-purge(State = #vqstate { len = Len, qi_pending_ack= QPA }) ->
+ rabbit_variable_queue:init(Queue, Recover, Callback).
+
+terminate(Reason, State) ->
+ rabbit_variable_queue:terminate(Reason, State).
+
+delete_and_terminate(Reason, State) ->
+ rabbit_variable_queue:delete_and_terminate(Reason, State).
+
+delete_crashed(Q) ->
+ rabbit_variable_queue:delete_crashed(Q).
+
+purge(State = #vqstate { qi_pending_ack= QPA }) ->
maybe_delay(QPA),
- case is_pending_ack_empty(State) of
- true ->
- {Len, purge_and_index_reset(State)};
- false ->
- {Len, purge_when_pending_acks(State)}
- end.
+ rabbit_variable_queue:purge(State).
-purge_acks(State) -> a(purge_pending_ack(false, State)).
+purge_acks(State) ->
+ rabbit_variable_queue:purge_acks(State).
publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
- State1 =
- publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
- fun maybe_write_to_disk/4,
- State),
- a(reduce_memory_use(maybe_update_rates(State1))).
+ rabbit_variable_queue:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State).
batch_publish(Publishes, ChPid, Flow, State) ->
- {ChPid, Flow, State1} =
- lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
- State2 = ui(State1),
- a(reduce_memory_use(maybe_update_rates(State2))).
+ rabbit_variable_queue:batch_publish(Publishes, ChPid, Flow, State).
publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
- {SeqId, State1} =
- publish_delivered1(Msg, MsgProps, ChPid, Flow,
- fun maybe_write_to_disk/4,
- State),
- {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+ rabbit_variable_queue:publish_delivered(Msg, MsgProps, ChPid, Flow, State).
batch_publish_delivered(Publishes, ChPid, Flow, State) ->
- {ChPid, Flow, SeqIds, State1} =
- lists:foldl(fun batch_publish_delivered1/2,
- {ChPid, Flow, [], State}, Publishes),
- State2 = ui(State1),
- {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
+ rabbit_variable_queue:batch_publish_delivered(Publishes, ChPid, Flow, State).
discard(_MsgId, _ChPid, _Flow, State) -> State.
-drain_confirmed(State = #vqstate { confirmed = C }) ->
- case gb_sets:is_empty(C) of
- true -> {[], State}; %% common case
- false -> {gb_sets:to_list(C), State #vqstate {
- confirmed = gb_sets:new() }}
- end.
+drain_confirmed(State) ->
+ rabbit_variable_queue:drain_confirmed(State).
dropwhile(Pred, State) ->
- {MsgProps, State1} =
- remove_by_predicate(Pred, State),
- {MsgProps, a(State1)}.
+ rabbit_variable_queue:dropwhile(Pred, State).
fetchwhile(Pred, Fun, Acc, State) ->
- {MsgProps, Acc1, State1} =
- fetch_by_predicate(Pred, Fun, Acc, State),
- {MsgProps, Acc1, a(State1)}.
+ rabbit_variable_queue:fetchwhile(Pred, Fun, Acc, State).
fetch(AckRequired, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {empty, a(State1)};
- {{value, MsgStatus}, State1} ->
- %% it is possible that the message wasn't read from disk
- %% at this point, so read it in.
- {Msg, State2} = read_msg(MsgStatus, State1),
- {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
- {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
- end.
+ rabbit_variable_queue:fetch(AckRequired, State).
drop(AckRequired, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {empty, a(State1)};
- {{value, MsgStatus}, State1} ->
- {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
- {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
- end.
-
-ack([], State) ->
- {[], State};
-%% optimisation: this head is essentially a partial evaluation of the
-%% general case below, for the single-ack case.
-ack([SeqId], State) ->
- {#msg_status { msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- ack_out_counter = AckOutCount }} =
- remove_pending_ack(true, SeqId, State),
- IndexState1 = case IndexOnDisk of
- true -> rabbit_queue_index:ack([SeqId], IndexState);
- false -> IndexState
- end,
- case MsgInStore of
- true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
- false -> ok
- end,
- {[MsgId],
- a(State1 #vqstate { index_state = IndexState1,
- ack_out_counter = AckOutCount + 1 })};
-ack(AckTags, State) ->
- {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- ack_out_counter = AckOutCount }} =
- lists:foldl(
- fun (SeqId, {Acc, State2}) ->
- {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2),
- {accumulate_ack(MsgStatus, Acc), State3}
- end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- {lists:reverse(AllMsgIds),
- a(State1 #vqstate { index_state = IndexState1,
- ack_out_counter = AckOutCount + length(AckTags) })}.
-
-requeue(AckTags, #vqstate { mode = default,
- delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len,
- qi_pending_ack = QPA } = State) ->
- maybe_delay(QPA),
- {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
- beta_limit(Q3),
- fun publish_alpha/2, State),
- {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
- delta_limit(Delta),
- fun publish_beta/2, State1),
- {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
- State2),
- MsgCount = length(MsgIds2),
- {MsgIds2, a(reduce_memory_use(
- maybe_update_rates(
- State3 #vqstate { delta = Delta1,
- q3 = Q3a,
- q4 = Q4a,
- in_counter = InCounter + MsgCount,
- len = Len + MsgCount })))};
-requeue(AckTags, #vqstate { mode = lazy,
- delta = Delta,
- q3 = Q3,
- in_counter = InCounter,
- len = Len,
- qi_pending_ack = QPA } = State) ->
+ rabbit_variable_queue:drop(AckRequired, State).
+
+ack(List, State) ->
+ rabbit_variable_queue:ack(List, State).
+
+requeue(AckTags, #vqstate { qi_pending_ack = QPA } = State) ->
maybe_delay(QPA),
- {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [],
- delta_limit(Delta),
- fun publish_beta/2, State),
- {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
- State1),
- MsgCount = length(MsgIds1),
- {MsgIds1, a(reduce_memory_use(
- maybe_update_rates(
- State2 #vqstate { delta = Delta1,
- q3 = Q3a,
- in_counter = InCounter + MsgCount,
- len = Len + MsgCount })))}.
+ rabbit_variable_queue:requeue(AckTags, State).
ackfold(MsgFun, Acc, State, AckTags) ->
- {AccN, StateN} =
- lists:foldl(fun(SeqId, {Acc0, State0}) ->
- MsgStatus = lookup_pending_ack(SeqId, State0),
- {Msg, State1} = read_msg(MsgStatus, State0),
- {MsgFun(Msg, SeqId, Acc0), State1}
- end, {Acc, State}, AckTags),
- {AccN, a(StateN)}.
-
-fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
- {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
- [msg_iterator(State),
- disk_ack_iterator(State),
- ram_ack_iterator(State),
- qi_ack_iterator(State)]),
- ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
-
-len(#vqstate { len = Len, qi_pending_ack = QPA }) ->
+ rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags).
+
+fold(Fun, Acc, State) ->
+ rabbit_variable_queue:fold(Fun, Acc, State).
+
+len(#vqstate { qi_pending_ack = QPA } = State) ->
maybe_delay(QPA),
- Len.
+ rabbit_variable_queue:len(State).
is_empty(State) -> 0 == len(State).
depth(State) ->
- len(State) + count_pending_acks(State).
-
-set_ram_duration_target(
- DurationTarget, State = #vqstate {
- rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate,
- ack_in = AvgAckIngressRate,
- ack_out = AvgAckEgressRate },
- target_ram_count = TargetRamCount }) ->
- Rate =
- AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
- TargetRamCount1 =
- case DurationTarget of
- infinity -> infinity;
- _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
- end,
- State1 = State #vqstate { target_ram_count = TargetRamCount1 },
- a(case TargetRamCount1 == infinity orelse
- (TargetRamCount =/= infinity andalso
- TargetRamCount1 >= TargetRamCount) of
- true -> State1;
- false -> reduce_memory_use(State1)
- end).
-
-maybe_update_rates(State = #vqstate{ in_counter = InCount,
- out_counter = OutCount })
- when InCount + OutCount > ?MSGS_PER_RATE_CALC ->
- update_rates(State);
-maybe_update_rates(State) ->
- State.
-
-update_rates(State = #vqstate{ in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount,
- rates = #rates{ in = InRate,
- out = OutRate,
- ack_in = AckInRate,
- ack_out = AckOutRate,
- timestamp = TS }}) ->
- Now = erlang:monotonic_time(),
-
- Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
- out = update_rate(Now, TS, OutCount, OutRate),
- ack_in = update_rate(Now, TS, AckInCount, AckInRate),
- ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
- timestamp = Now },
-
- State#vqstate{ in_counter = 0,
- out_counter = 0,
- ack_in_counter = 0,
- ack_out_counter = 0,
- rates = Rates }.
-
-update_rate(Now, TS, Count, Rate) ->
- Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) /
- ?MICROS_PER_SECOND,
- if
- Time == 0 -> Rate;
- true -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE,
- Count / Time, Rate)
- end.
-
-ram_duration(State) ->
- State1 = #vqstate { rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate,
- ack_in = AvgAckIngressRate,
- ack_out = AvgAckEgressRate },
- ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev,
- ram_pending_ack = RPA,
- qi_pending_ack = QPA,
- ram_ack_count_prev = RamAckCountPrev } =
- update_rates(State),
-
- RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
-
- Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case lists:all(fun (X) -> X < 0.01 end,
- [AvgEgressRate, AvgIngressRate,
- AvgAckEgressRate, AvgAckIngressRate]) of
- true -> infinity;
- false -> (RamMsgCountPrev + RamMsgCount +
- RamAckCount + RamAckCountPrev) /
- (4 * (AvgEgressRate + AvgIngressRate +
- AvgAckEgressRate + AvgAckIngressRate))
- end,
-
- {Duration, State1}.
-
-needs_timeout(#vqstate { index_state = IndexState }) ->
- case rabbit_queue_index:needs_sync(IndexState) of
- confirms -> timed;
- other -> idle;
- false -> false
- end.
-
-timeout(State = #vqstate { index_state = IndexState }) ->
- State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
-
-handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
- State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-
-resume(State) -> a(reduce_memory_use(State)).
-
-msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate } }) ->
- {AvgIngressRate, AvgEgressRate}.
-
-info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
- RamMsgCount;
-info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
- qi_pending_ack = QPA}) ->
- gb_trees:size(RPA) + gb_trees:size(QPA);
-info(messages_ram, State) ->
- info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
-info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
- PersistentCount;
-info(message_bytes, #vqstate{bytes = Bytes,
- unacked_bytes = UBytes}) ->
- Bytes + UBytes;
-info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
- Bytes;
-info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
- UBytes;
-info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
- RamBytes;
-info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
- PersistentBytes;
-info(head_message_timestamp, #vqstate{
- q3 = Q3,
- q4 = Q4,
- ram_pending_ack = RPA,
- qi_pending_ack = QPA}) ->
- head_message_timestamp(Q3, Q4, RPA, QPA);
-info(disk_reads, #vqstate{disk_read_count = Count}) ->
- Count;
-info(disk_writes, #vqstate{disk_write_count = Count}) ->
- Count;
-info(backing_queue_status, #vqstate {
- q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- mode = Mode,
- len = Len,
- target_ram_count = TargetRamCount,
- next_seq_id = NextSeqId,
- rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate,
- ack_in = AvgAckIngressRate,
- ack_out = AvgAckEgressRate }}) ->
-
- [ {mode , Mode},
- {q1 , ?QUEUE:len(Q1)},
- {q2 , ?QUEUE:len(Q2)},
- {delta , Delta},
- {q3 , ?QUEUE:len(Q3)},
- {q4 , ?QUEUE:len(Q4)},
- {len , Len},
- {target_ram_count , TargetRamCount},
- {next_seq_id , NextSeqId},
- {avg_ingress_rate , AvgIngressRate},
- {avg_egress_rate , AvgEgressRate},
- {avg_ack_ingress_rate, AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ];
-info(Item, _) ->
- throw({bad_argument, Item}).
-
-invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
-invoke( _, _, State) -> State.
-
-is_duplicate(_Msg, State) -> {false, State}.
-
-set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
- State;
-set_queue_mode(lazy, State = #vqstate {
- target_ram_count = TargetRamCount }) ->
- %% To become a lazy queue we need to page everything to disk first.
- State1 = convert_to_lazy(State),
- %% restore the original target_ram_count
- a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount });
-set_queue_mode(default, State) ->
- %% becoming a default queue means loading messages from disk like
- %% when a queue is recovered.
- a(maybe_deltas_to_betas(State #vqstate { mode = default }));
-set_queue_mode(_, State) ->
- State.
-
-zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
- lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) ->
- [{Id, AckTag} | Acc]
- end, Accumulator, lists:zip(Msgs, AckTags)).
-
-convert_to_lazy(State) ->
- State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
- set_ram_duration_target(0, State),
- case Delta#delta.count + ?QUEUE:len(Q3) == Len of
- true ->
- State1;
- false ->
- %% When pushing messages to disk, we might have been
- %% blocked by the msg_store, so we need to see if we have
- %% to wait for more credit, and then keep paging messages.
- %%
- %% The amqqueue_process could have taken care of this, but
- %% between the time it receives the bump_credit msg and
- %% calls BQ:resume to keep paging messages to disk, some
- %% other request may arrive to the BQ which at this moment
- %% is not in a proper state for a lazy BQ (unless all
- %% messages have been paged to disk already).
- wait_for_msg_store_credit(),
- convert_to_lazy(State1)
- end.
-
-wait_for_msg_store_credit() ->
- case credit_flow:blocked() of
- true -> receive
- {bump_credit, Msg} ->
- credit_flow:handle_bump_msg(Msg)
- end;
- false -> ok
- end.
-
-%% Get the Timestamp property of the first msg, if present. This is
-%% the one with the oldest timestamp among the heads of the pending
-%% acks and unread queues. We can't check disk_pending_acks as these
-%% are paged out - we assume some will soon be paged in rather than
-%% forcing it to happen. Pending ack msgs are included as they are
-%% regarded as unprocessed until acked, this also prevents the result
-%% apparently oscillating during repeated rejects. Q3 is only checked
-%% when Q4 is empty as any Q4 msg will be earlier.
-head_message_timestamp(Q3, Q4, RPA, QPA) ->
- HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
- HeadMsgStatus <-
- [ get_qs_head([Q4, Q3]),
- get_pa_head(RPA),
- get_pa_head(QPA) ],
- HeadMsgStatus /= undefined,
- HeadMsgStatus#msg_status.msg /= undefined ],
-
- Timestamps =
- [Timestamp || HeadMsg <- HeadMsgs,
- Timestamp <- [rabbit_basic:extract_timestamp(
- HeadMsg#basic_message.content)],
- Timestamp /= undefined
- ],
-
- case Timestamps == [] of
- true -> '';
- false -> lists:min(Timestamps)
- end.
-
-get_qs_head(Qs) ->
- catch lists:foldl(
- fun (Q, Acc) ->
- case get_q_head(Q) of
- undefined -> Acc;
- Val -> throw(Val)
- end
- end, undefined, Qs).
-
-get_q_head(Q) ->
- get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1).
-
-get_pa_head(PA) ->
- get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1).
-
-get_collection_head(Col, IsEmpty, GetVal) ->
- case IsEmpty(Col) of
- false ->
- {_, MsgStatus} = GetVal(Col),
- MsgStatus;
- true -> undefined
- end.
-
-%%----------------------------------------------------------------------------
-%% Minor helpers
-%%----------------------------------------------------------------------------
-a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- mode = default,
- len = Len,
- bytes = Bytes,
- unacked_bytes = UnackedBytes,
- persistent_count = PersistentCount,
- persistent_bytes = PersistentBytes,
- ram_msg_count = RamMsgCount,
- ram_bytes = RamBytes}) ->
- E1 = ?QUEUE:is_empty(Q1),
- E2 = ?QUEUE:is_empty(Q2),
- ED = Delta#delta.count == 0,
- E3 = ?QUEUE:is_empty(Q3),
- E4 = ?QUEUE:is_empty(Q4),
- LZ = Len == 0,
-
- %% if q1 has messages then q3 cannot be empty. See publish/6.
- true = E1 or not E3,
- %% if q2 has messages then we have messages in delta (paged to
- %% disk). See push_alphas_to_betas/2.
- true = E2 or not ED,
- %% if delta has messages then q3 cannot be empty. This is enforced
- %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages
- %% are always kept on RAM.
- true = ED or not E3,
- %% if the queue length is 0, then q3 and q4 must be empty.
- true = LZ == (E3 and E4),
-
- true = Len >= 0,
- true = Bytes >= 0,
- true = UnackedBytes >= 0,
- true = PersistentCount >= 0,
- true = PersistentBytes >= 0,
- true = RamMsgCount >= 0,
- true = RamMsgCount =< Len,
- true = RamBytes >= 0,
- true = RamBytes =< Bytes + UnackedBytes,
-
- State;
-a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- mode = lazy,
- len = Len,
- bytes = Bytes,
- unacked_bytes = UnackedBytes,
- persistent_count = PersistentCount,
- persistent_bytes = PersistentBytes,
- ram_msg_count = RamMsgCount,
- ram_bytes = RamBytes}) ->
- E1 = ?QUEUE:is_empty(Q1),
- E2 = ?QUEUE:is_empty(Q2),
- ED = Delta#delta.count == 0,
- E3 = ?QUEUE:is_empty(Q3),
- E4 = ?QUEUE:is_empty(Q4),
- LZ = Len == 0,
- L3 = ?QUEUE:len(Q3),
-
- %% q1 must always be empty, since q1 only gets messages during
- %% publish, but for lazy queues messages go straight to delta.
- true = E1,
-
- %% q2 only gets messages from q1 when push_alphas_to_betas is
- %% called for a non empty delta, which won't be the case for a
- %% lazy queue. This means q2 must always be empty.
- true = E2,
-
- %% q4 must always be empty, since q1 only gets messages during
- %% publish, but for lazy queues messages go straight to delta.
- true = E4,
-
- %% if the queue is empty, then delta is empty and q3 is empty.
- true = LZ == (ED and E3),
-
- %% There should be no messages in q1, q2, and q4
- true = Delta#delta.count + L3 == Len,
-
- true = Len >= 0,
- true = Bytes >= 0,
- true = UnackedBytes >= 0,
- true = PersistentCount >= 0,
- true = PersistentBytes >= 0,
- true = RamMsgCount >= 0,
- true = RamMsgCount =< Len,
- true = RamBytes >= 0,
- true = RamBytes =< Bytes + UnackedBytes,
-
- State.
-
-d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
- when Start + Count =< End ->
- Delta.
-
-m(MsgStatus = #msg_status { is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk }) ->
- true = (not IsPersistent) or IndexOnDisk,
- true = msg_in_ram(MsgStatus) or MsgInStore,
- MsgStatus.
-
-one_if(true ) -> 1;
-one_if(false) -> 0.
-
-cons_if(true, E, L) -> [E | L];
-cons_if(false, _E, L) -> L.
-
-gb_sets_maybe_insert(false, _Val, Set) -> Set;
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-
-msg_status(IsPersistent, IsDelivered, SeqId,
- Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
- #msg_status{seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_in_store = false,
- index_on_disk = false,
- persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize),
- msg_props = MsgProps}.
-
-beta_msg_status({Msg = #basic_message{id = MsgId},
- SeqId, MsgProps, IsPersistent, IsDelivered}) ->
- MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
- MS0#msg_status{msg_id = MsgId,
- msg = Msg,
- persist_to = queue_index,
- msg_in_store = false};
-
-beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
- MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
- MS0#msg_status{msg_id = MsgId,
- msg = undefined,
- persist_to = msg_store,
- msg_in_store = true}.
-
-beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
- #msg_status{seq_id = SeqId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = true,
- msg_props = MsgProps}.
-
-trim_msg_status(MsgStatus) ->
- case persist_to(MsgStatus) of
- msg_store -> MsgStatus#msg_status{msg = undefined};
- queue_index -> MsgStatus
- end.
-
-with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
- {Result, MSCStateP1} = Fun(MSCStateP),
- {Result, {MSCStateP1, MSCStateT}};
-with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) ->
- {Result, MSCStateT1} = Fun(MSCStateT),
- {Result, {MSCStateP, MSCStateT1}}.
-
-with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
- {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
- fun (MSCState1) ->
- {Fun(MSCState1), MSCState1}
- end),
- Res.
-
-msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
- msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
- Callback, VHost).
-
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
- CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_vhost_msg_store:client_init(VHost,
- MsgStore, Ref, MsgOnDiskFun,
- fun () -> Callback(?MODULE, CloseFDsFun) end).
-
-msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) ->
- rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
- end).
-
-msg_store_read(MSCState, IsPersistent, MsgId) ->
- with_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) ->
- rabbit_msg_store:read(MsgId, MSCState1)
- end).
-
-msg_store_remove(MSCState, IsPersistent, MsgIds) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MCSState1) ->
- rabbit_msg_store:remove(MsgIds, MCSState1)
- end).
-
-msg_store_close_fds(MSCState, IsPersistent) ->
- with_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
-
-msg_store_close_fds_fun(IsPersistent) ->
- fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
- State #vqstate { msg_store_clients = MSCState1 }
- end.
-
-maybe_write_delivered(false, _SeqId, IndexState) ->
- IndexState;
-maybe_write_delivered(true, SeqId, IndexState) ->
- rabbit_queue_index:deliver([SeqId], IndexState).
-
-betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
- {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
- lists:foldr(
- fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
- case SeqId < TransientThreshold andalso not IsPersistent of
- true -> {Filtered1,
- cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1], RRC, RB};
- false -> MsgStatus = m(beta_msg_status(M)),
- HaveMsg = msg_in_ram(MsgStatus),
- Size = msg_size(MsgStatus),
- case is_msg_in_pending_acks(SeqId, State) of
- false -> {?QUEUE:in_r(MsgStatus, Filtered1),
- Delivers1, Acks1,
- RRC + one_if(HaveMsg),
- RB + one_if(HaveMsg) * Size};
- true -> Acc %% [0]
- end
- end
- end, {?QUEUE:new(), [], [], 0, 0}, List),
- {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}.
-%% [0] We don't increase RamBytes here, even though it pertains to
-%% unacked messages too, since if HaveMsg then the message must have
-%% been stored in the QI, thus the message must have been in
-%% qi_pending_ack, thus it must already have been in RAM.
-
-is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA) orelse
- gb_trees:is_defined(SeqId, QPA)).
-
-expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
- d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
-expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
- count = Count } = Delta)
- when SeqId < StartSeqId ->
- d(Delta #delta { start_seq_id = SeqId, count = Count + 1 });
-expand_delta(SeqId, #delta { count = Count,
- end_seq_id = EndSeqId } = Delta)
- when SeqId >= EndSeqId ->
- d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 });
-expand_delta(_SeqId, #delta { count = Count } = Delta) ->
- d(Delta #delta { count = Count + 1 }).
-
-%%----------------------------------------------------------------------------
-%% Internal major helpers for Public API
-%%----------------------------------------------------------------------------
-
-init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
- PersistentClient, TransientClient, VHost) ->
- {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
-
- {DeltaCount1, DeltaBytes1} =
- case Terms of
- non_clean_shutdown -> {DeltaCount, DeltaBytes};
- _ -> {proplists:get_value(persistent_count,
- Terms, DeltaCount),
- proplists:get_value(persistent_bytes,
- Terms, DeltaBytes)}
- end,
- Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
- true -> ?BLANK_DELTA;
- false -> d(#delta { start_seq_id = LowSeqId,
- count = DeltaCount1,
- end_seq_id = NextSeqId })
- end,
- Now = erlang:monotonic_time(),
- IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
- ?IO_BATCH_SIZE),
-
- {ok, IndexMaxSize} = application:get_env(
- rabbit, queue_index_embed_msgs_below),
- State = #vqstate {
- q1 = ?QUEUE:new(),
- q2 = ?QUEUE:new(),
- delta = Delta,
- q3 = ?QUEUE:new(),
- q4 = ?QUEUE:new(),
- next_seq_id = NextSeqId,
- ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty(),
- qi_pending_ack = gb_trees:empty(),
- index_state = IndexState1,
- msg_store_clients = {PersistentClient, TransientClient},
- durable = IsDurable,
- transient_threshold = NextSeqId,
- qi_embed_msgs_below = IndexMaxSize,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
- bytes = DeltaBytes1,
- persistent_bytes = DeltaBytes1,
-
- target_ram_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_ack_count_prev = 0,
- ram_bytes = 0,
- unacked_bytes = 0,
- out_counter = 0,
- in_counter = 0,
- rates = blank_rates(Now),
- msgs_on_disk = gb_sets:new(),
- msg_indices_on_disk = gb_sets:new(),
- unconfirmed = gb_sets:new(),
- confirmed = gb_sets:new(),
- ack_out_counter = 0,
- ack_in_counter = 0,
- disk_read_count = 0,
- disk_write_count = 0,
-
- io_batch_size = IoBatchSize,
-
- mode = default,
- virtual_host = VHost },
- a(maybe_deltas_to_betas(State)).
-
-blank_rates(Now) ->
- #rates { in = 0.0,
- out = 0.0,
- ack_in = 0.0,
- ack_out = 0.0,
- timestamp = Now}.
-
-in_r(MsgStatus = #msg_status { msg = undefined },
- State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
- case ?QUEUE:is_empty(Q4) of
- true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- MsgStatus1 = MsgStatus#msg_status{msg = Msg},
- stats(ready0, {MsgStatus, MsgStatus1},
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
- end;
-in_r(MsgStatus,
- State = #vqstate { mode = default, q4 = Q4 }) ->
- State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) };
-%% lazy queues
-in_r(MsgStatus = #msg_status { seq_id = SeqId },
- State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) ->
- case ?QUEUE:is_empty(Q3) of
- true ->
- {_MsgStatus1, State1} =
- maybe_write_to_disk(true, true, MsgStatus, State),
- State2 = stats(ready0, {MsgStatus, none}, State1),
- Delta1 = expand_delta(SeqId, Delta),
- State2 #vqstate{ delta = Delta1 };
- false ->
- State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
- end.
-
-queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
- case ?QUEUE:out(Q4) of
- {empty, _Q4} ->
- case fetch_from_q3(State) of
- {empty, _State1} = Result -> Result;
- {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
- end;
- {{value, MsgStatus}, Q4a} ->
- {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
- end;
-%% lazy queues
-queue_out(State = #vqstate { mode = lazy }) ->
- case fetch_from_q3(State) of
- {empty, _State1} = Result -> Result;
- {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
- end.
-
-read_msg(#msg_status{msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent}, State) ->
- read_msg(MsgId, IsPersistent, State);
-read_msg(#msg_status{msg = Msg}, State) ->
- {Msg, State}.
-
-read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
- disk_read_count = Count}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate {msg_store_clients = MSCState1,
- disk_read_count = Count + 1}}.
-
-stats(Signs, Statuses, State) ->
- stats0(expand_signs(Signs), expand_statuses(Statuses), State).
-
-expand_signs(ready0) -> {0, 0, true};
-expand_signs(lazy_pub) -> {1, 0, true};
-expand_signs({A, B}) -> {A, B, false}.
-
-expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
-expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
-expand_statuses({lazy, A}) -> {false , false, A};
-expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
-
-%% In this function at least, we are religious: the variable name
-%% contains "Ready" or "Unacked" iff that is what it counts. If
-%% neither is present it counts both.
-stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
- {InRamBefore, InRamAfter, MsgStatus},
- State = #vqstate{len = ReadyCount,
- bytes = ReadyBytes,
- ram_msg_count = RamReadyCount,
- persistent_count = PersistentCount,
- unacked_bytes = UnackedBytes,
- ram_bytes = RamBytes,
- persistent_bytes = PersistentBytes}) ->
- S = msg_size(MsgStatus),
- DeltaTotal = DeltaReady + DeltaUnacked,
- DeltaRam = case {InRamBefore, InRamAfter} of
- {false, false} -> 0;
- {false, true} -> 1;
- {true, false} -> -1;
- {true, true} -> 0
- end,
- DeltaRamReady = case DeltaReady of
- 1 -> one_if(InRamAfter);
- -1 -> -one_if(InRamBefore);
- 0 when ReadyMsgPaged -> DeltaRam;
- 0 -> 0
- end,
- DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent),
- State#vqstate{len = ReadyCount + DeltaReady,
- ram_msg_count = RamReadyCount + DeltaRamReady,
- persistent_count = PersistentCount + DeltaPersistent,
- bytes = ReadyBytes + DeltaReady * S,
- unacked_bytes = UnackedBytes + DeltaUnacked * S,
- ram_bytes = RamBytes + DeltaRam * S,
- persistent_bytes = PersistentBytes + DeltaPersistent * S}.
-
-msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
-
-msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
-
-%% first param: AckRequired
-remove(true, MsgStatus = #msg_status {
- seq_id = SeqId,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk },
- State = #vqstate {out_counter = OutCount,
- index_state = IndexState}) ->
- %% Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- State1 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
-
- State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1),
-
- {SeqId, maybe_update_rates(
- State2 #vqstate {out_counter = OutCount + 1,
- index_state = IndexState1})};
-
-%% This function body has the same behaviour as remove_queue_entries/3
-%% but instead of removing messages based on a ?QUEUE, this removes
-%% just one message, the one referenced by the MsgStatus provided.
-remove(false, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- State = #vqstate {out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState}) ->
- %% Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- %% Remove from msg_store and queue index, if necessary
- case MsgInStore of
- true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
- false -> ok
- end,
-
- IndexState2 =
- case IndexOnDisk of
- true -> rabbit_queue_index:ack([SeqId], IndexState1);
- false -> IndexState1
- end,
-
- State1 = stats({-1, 0}, {MsgStatus, none}, State),
-
- {undefined, maybe_update_rates(
- State1 #vqstate {out_counter = OutCount + 1,
- index_state = IndexState2})}.
-
-%% This function exists as a way to improve dropwhile/2
-%% performance. The idea of having this function is to optimise calls
-%% to rabbit_queue_index by batching delivers and acks, instead of
-%% sending them one by one.
-%%
-%% Instead of removing every message as their are popped from the
-%% queue, it first accumulates them and then removes them by calling
-%% remove_queue_entries/3, since the behaviour of
-%% remove_queue_entries/3 when used with
-%% process_delivers_and_acks_fun(deliver_and_ack) is the same as
-%% calling remove(false, MsgStatus, State).
-%%
-%% remove/3 also updates the out_counter in every call, but here we do
-%% it just once at the end.
-remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
- {MsgProps, QAcc, State1} =
- collect_by_predicate(Pred, ?QUEUE:new(), State),
- State2 =
- remove_queue_entries(
- QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1),
- %% maybe_update_rates/1 is called in remove/2 for every
- %% message. Since we update out_counter only once, we call it just
- %% there.
- {MsgProps, maybe_update_rates(
- State2 #vqstate {
- out_counter = OutCount + ?QUEUE:len(QAcc)})}.
-
-%% This function exists as a way to improve fetchwhile/4
-%% performance. The idea of having this function is to optimise calls
-%% to rabbit_queue_index by batching delivers, instead of sending them
-%% one by one.
-%%
-%% Fun is the function passed to fetchwhile/4 that's
-%% applied to every fetched message and used to build the fetchwhile/4
-%% result accumulator FetchAcc.
-fetch_by_predicate(Pred, Fun, FetchAcc,
- State = #vqstate {
- index_state = IndexState,
- out_counter = OutCount}) ->
- {MsgProps, QAcc, State1} =
- collect_by_predicate(Pred, ?QUEUE:new(), State),
-
- {Delivers, FetchAcc1, State2} =
- process_queue_entries(QAcc, Fun, FetchAcc, State1),
-
- IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),
-
- {MsgProps, FetchAcc1, maybe_update_rates(
- State2 #vqstate {
- index_state = IndexState1,
- out_counter = OutCount + ?QUEUE:len(QAcc)})}.
-
-%% We try to do here the same as what remove(true, State) does but
-%% processing several messages at the same time. The idea is to
-%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
-%% SeqIds instead of one by one, thus process_queue_entries1 will
-%% accumulate the required deliveries, will record_pending_ack for
-%% each message, and will update stats, like remove/2 does.
-%%
-%% For the meaning of Fun and FetchAcc arguments see
-%% fetch_by_predicate/4 above.
-process_queue_entries(Q, Fun, FetchAcc, State) ->
- ?QUEUE:foldl(fun (MsgStatus, Acc) ->
- process_queue_entries1(MsgStatus, Fun, Acc)
- end,
- {[], FetchAcc, State}, Q).
-
-process_queue_entries1(
- #msg_status { seq_id = SeqId, is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk} = MsgStatus,
- Fun,
- {Delivers, FetchAcc, State}) ->
- {Msg, State1} = read_msg(MsgStatus, State),
- State2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State1),
- {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
- Fun(Msg, SeqId, FetchAcc),
- stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}.
-
-collect_by_predicate(Pred, QAcc, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {undefined, QAcc, State1};
- {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc),
- State1);
- false -> {MsgProps, QAcc, in_r(MsgStatus, State1)}
- end
- end.
-
-%%----------------------------------------------------------------------------
-%% Helpers for Public API purge/1 function
-%%----------------------------------------------------------------------------
+ rabbit_variable_queue:depth(State).
-%% The difference between purge_when_pending_acks/1
-%% vs. purge_and_index_reset/1 is that the first one issues a deliver
-%% and an ack to the queue index for every message that's being
-%% removed, while the later just resets the queue index state.
-purge_when_pending_acks(State) ->
- State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State),
- a(State1).
+set_ram_duration_target(DurationTarget, State) ->
+ rabbit_variable_queue:set_ram_duration_target(DurationTarget, State).
-purge_and_index_reset(State) ->
- State1 = purge1(process_delivers_and_acks_fun(none), State),
- a(reset_qi_state(State1)).
-
-%% This function removes messages from each of {q1, q2, q3, q4}.
-%%
-%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
-%% are specially handled by purge_betas_and_deltas/2.
-%%
-%% purge_betas_and_deltas/2 loads messages from the queue index,
-%% filling up q3 and in some cases moving messages form q2 to q3 while
-%% reseting q2 to an empty queue (see maybe_deltas_to_betas/2). The
-%% messages loaded into q3 are removed by calling
-%% remove_queue_entries/3 until there are no more messages to be read
-%% from the queue index. Messages are read in batches from the queue
-%% index.
-purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
- State1 = remove_queue_entries(Q4, AfterFun, State),
-
- State2 = #vqstate {q1 = Q1} =
- purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}),
-
- State3 = remove_queue_entries(Q1, AfterFun, State2),
-
- a(State3#vqstate{q1 = ?QUEUE:new()}).
-
-reset_qi_state(State = #vqstate{index_state = IndexState}) ->
- State#vqstate{index_state =
- rabbit_queue_index:reset_state(IndexState)}.
-
-is_pending_ack_empty(State) ->
- count_pending_acks(State) =:= 0.
-
-count_pending_acks(#vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
-
-purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) ->
- State0 = #vqstate { q3 = Q3 } =
- case Mode of
- lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State);
- _ -> State
- end,
-
- case ?QUEUE:is_empty(Q3) of
- true -> State0;
- false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State0),
- purge_betas_and_deltas(DelsAndAcksFun,
- maybe_deltas_to_betas(
- DelsAndAcksFun,
- State1#vqstate{q3 = ?QUEUE:new()}))
- end.
-
-remove_queue_entries(Q, DelsAndAcksFun,
- State = #vqstate{msg_store_clients = MSCState}) ->
- {MsgIdsByStore, Delivers, Acks, State1} =
- ?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), [], [], State}, Q),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- DelsAndAcksFun(Delivers, Acks, State1).
-
-remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
- msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
- is_persistent = IsPersistent} = MsgStatus,
- {MsgIdsByStore, Delivers, Acks, State}) ->
- {case MsgInStore of
- true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
- false -> MsgIdsByStore
- end,
- cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
- cons_if(IndexOnDisk, SeqId, Acks),
- stats({-1, 0}, {MsgStatus, none}, State)}.
-
-process_delivers_and_acks_fun(deliver_and_ack) ->
- fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
- IndexState1 =
- rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
- State #vqstate { index_state = IndexState1 }
- end;
-process_delivers_and_acks_fun(_) ->
- fun (_, _, State) ->
- State
- end.
-
-%%----------------------------------------------------------------------------
-%% Internal gubbins for publishing
-%%----------------------------------------------------------------------------
-
-publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, _Flow, PersistFun,
- State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- mode = default,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
- State2 = case ?QUEUE:is_empty(Q3) of
- false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
- end,
- InCount1 = InCount + 1,
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- stats({1, 0}, {none, MsgStatus1},
- State2#vqstate{ next_seq_id = SeqId + 1,
- in_counter = InCount1,
- unconfirmed = UC1 });
-publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, _Flow, PersistFun,
- State = #vqstate { mode = lazy,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC,
- delta = Delta }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
- Delta1 = expand_delta(SeqId, Delta),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- stats(lazy_pub, {lazy, m(MsgStatus1)},
- State1#vqstate{ delta = Delta1,
- next_seq_id = SeqId + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }).
-
-batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
- {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
- fun maybe_prepare_write_to_disk/4, State)}.
-
-publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
- MsgProps = #message_properties {
- needs_confirming = NeedsConfirming },
- _ChPid, _Flow, PersistFun,
- State = #vqstate { mode = default,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
- State2 = record_pending_ack(m(MsgStatus1), State1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1},
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }),
- {SeqId, State3};
-publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
- MsgProps = #message_properties {
- needs_confirming = NeedsConfirming },
- _ChPid, _Flow, PersistFun,
- State = #vqstate { mode = lazy,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
- State2 = record_pending_ack(m(MsgStatus1), State1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1},
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }),
- {SeqId, State3}.
-
-batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
- {SeqId, State1} =
- publish_delivered1(Msg, MsgProps, ChPid, Flow,
- fun maybe_prepare_write_to_disk/4,
- State),
- {ChPid, Flow, [SeqId | SeqIds], State1}.
-
-maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_in_store = true }, State) ->
- {MsgStatus, State};
-maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg, msg_id = MsgId,
- is_persistent = IsPersistent },
- State = #vqstate{ msg_store_clients = MSCState,
- disk_write_count = Count})
- when Force orelse IsPersistent ->
- case persist_to(MsgStatus) of
- msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
- prepare_to_store(Msg)),
- {MsgStatus#msg_status{msg_in_store = true},
- State#vqstate{disk_write_count = Count + 1}};
- queue_index -> {MsgStatus, State}
- end;
-maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
- {MsgStatus, State}.
-
-%% Due to certain optimizations made inside
-%% rabbit_queue_index:pre_publish/7 we need to have two separate
-%% functions for index persistence. This one is only used when paging
-%% during memory pressure. We didn't want to modify
-%% maybe_write_index_to_disk/3 because that function is used in other
-%% places.
-maybe_batch_write_index_to_disk(_Force,
- MsgStatus = #msg_status {
- index_on_disk = true }, State) ->
- {MsgStatus, State};
-maybe_batch_write_index_to_disk(Force,
- MsgStatus = #msg_status {
- msg = Msg,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_props = MsgProps},
- State = #vqstate {
- target_ram_count = TargetRamCount,
- disk_write_count = DiskWriteCount,
- index_state = IndexState})
- when Force orelse IsPersistent ->
- {MsgOrId, DiskWriteCount1} =
- case persist_to(MsgStatus) of
- msg_store -> {MsgId, DiskWriteCount};
- queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
- end,
- IndexState1 = rabbit_queue_index:pre_publish(
- MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
- TargetRamCount, IndexState),
- {MsgStatus#msg_status{index_on_disk = true},
- State#vqstate{index_state = IndexState1,
- disk_write_count = DiskWriteCount1}};
-maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
- {MsgStatus, State}.
-
-maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
- index_on_disk = true }, State) ->
- {MsgStatus, State};
-maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_props = MsgProps},
- State = #vqstate{target_ram_count = TargetRamCount,
- disk_write_count = DiskWriteCount,
- index_state = IndexState})
- when Force orelse IsPersistent ->
- {MsgOrId, DiskWriteCount1} =
- case persist_to(MsgStatus) of
- msg_store -> {MsgId, DiskWriteCount};
- queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
- end,
- IndexState1 = rabbit_queue_index:publish(
- MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
- IndexState),
- IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
- {MsgStatus#msg_status{index_on_disk = true},
- State#vqstate{index_state = IndexState2,
- disk_write_count = DiskWriteCount1}};
-
-maybe_write_index_to_disk(_Force, MsgStatus, State) ->
- {MsgStatus, State}.
-
-maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
- maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
-
-maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
- maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
-
-determine_persist_to(#basic_message{
- content = #content{properties = Props,
- properties_bin = PropsBin}},
- #message_properties{size = BodySize},
- IndexMaxSize) ->
- %% The >= is so that you can set the env to 0 and never persist
- %% to the index.
- %%
- %% We want this to be fast, so we avoid size(term_to_binary())
- %% here, or using the term size estimation from truncate.erl, both
- %% of which are too slow. So instead, if the message body size
- %% goes over the limit then we avoid any other checks.
- %%
- %% If it doesn't we need to decide if the properties will push
- %% it past the limit. If we have the encoded properties (usual
- %% case) we can just check their size. If we don't (message came
- %% via the direct client), we make a guess based on the number of
- %% headers.
- case BodySize >= IndexMaxSize of
- true -> msg_store;
- false -> Est = case is_binary(PropsBin) of
- true -> BodySize + size(PropsBin);
- false -> #'P_basic'{headers = Hs} = Props,
- case Hs of
- undefined -> 0;
- _ -> length(Hs)
- end * ?HEADER_GUESS_SIZE + BodySize
- end,
- case Est >= IndexMaxSize of
- true -> msg_store;
- false -> queue_index
- end
- end.
-
-persist_to(#msg_status{persist_to = To}) -> To.
-
-prepare_to_store(Msg) ->
- Msg#basic_message{
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)}.
+ram_duration(State) ->
+ rabbit_variable_queue:ram_duration(State).
-%%----------------------------------------------------------------------------
-%% Internal gubbins for acks
-%%----------------------------------------------------------------------------
+needs_timeout(State) ->
+ rabbit_variable_queue:needs_timeout(State).
-record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
- State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA,
- ack_in_counter = AckInCount}) ->
- Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
- {RPA1, DPA1, QPA1} =
- case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
- {false, _} -> {RPA, Insert(DPA), QPA};
- {_, queue_index} -> {RPA, DPA, Insert(QPA)};
- {_, msg_store} -> {Insert(RPA), DPA, QPA}
- end,
- State #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1,
- qi_pending_ack = QPA1,
- ack_in_counter = AckInCount + 1}.
-
-lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA}) ->
- case gb_trees:lookup(SeqId, RPA) of
- {value, V} -> V;
- none -> case gb_trees:lookup(SeqId, DPA) of
- {value, V} -> V;
- none -> gb_trees:get(SeqId, QPA)
- end
- end.
-
-%% First parameter = UpdateStats
-remove_pending_ack(true, SeqId, State) ->
- {MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
- {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)};
-remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA}) ->
- case gb_trees:lookup(SeqId, RPA) of
- {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
- {V, State #vqstate { ram_pending_ack = RPA1 }};
- none -> case gb_trees:lookup(SeqId, DPA) of
- {value, V} ->
- DPA1 = gb_trees:delete(SeqId, DPA),
- {V, State#vqstate{disk_pending_ack = DPA1}};
- none ->
- QPA1 = gb_trees:delete(SeqId, QPA),
- {gb_trees:get(SeqId, QPA),
- State#vqstate{qi_pending_ack = QPA1}}
- end
- end.
-
-purge_pending_ack(KeepPersistent,
- State = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState }) ->
- {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State),
- case KeepPersistent of
- true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
- State1;
- false -> IndexState1 =
- rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- State1 #vqstate { index_state = IndexState1 }
- end.
-
-purge_pending_ack_delete_and_terminate(
- State = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState }) ->
- {_, MsgIdsByStore, State1} = purge_pending_ack1(State),
- IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- State1 #vqstate { index_state = IndexState1 }.
-
-purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
- {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(
- F, accumulate_ack_init(), RPA), DPA), QPA),
- State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty(),
- qi_pending_ack = gb_trees:empty()},
- {IndexOnDiskSeqIds, MsgIdsByStore, State1}.
-
-%% MsgIdsByStore is an orddict with two keys:
-%%
-%% true: holds a list of Persistent Message Ids.
-%% false: holds a list of Transient Message Ids.
-%%
-%% When we call orddict:to_list/1 we get two sets of msg ids, where
-%% IsPersistent is either true for persistent messages or false for
-%% transient ones. The msg_store_remove/3 function takes this boolean
-%% flag to determine from which store the messages should be removed
-%% from.
-remove_msgs_by_id(MsgIdsByStore, MSCState) ->
- [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)].
-
-remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
- case orddict:find(false, MsgIdsByStore) of
- error -> ok;
- {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
- end.
-
-accumulate_ack_init() -> {[], orddict:new(), []}.
-
-accumulate_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
- case MsgInStore of
- true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
- false -> MsgIdsByStore
- end,
- [MsgId | AllMsgIds]}.
+timeout(State) ->
+ rabbit_variable_queue:timeout(State).
-%%----------------------------------------------------------------------------
-%% Internal plumbing for confirms (aka publisher acks)
-%%----------------------------------------------------------------------------
+handle_pre_hibernate(State) ->
+ rabbit_variable_queue:handle_pre_hibernate(State).
-record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC,
- confirmed = C }) ->
- State #vqstate {
- msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
- msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
- unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
- confirmed = gb_sets:union(C, MsgIdSet) }.
-
-msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
- Callback(?MODULE,
- fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
-msgs_written_to_disk(Callback, MsgIdSet, written) ->
- Callback(?MODULE,
- fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Confirmed = gb_sets:intersection(UC, MsgIdSet),
- record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(MOD, Confirmed) })
- end).
-
-msg_indices_written_to_disk(Callback, MsgIdSet) ->
- Callback(?MODULE,
- fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Confirmed = gb_sets:intersection(UC, MsgIdSet),
- record_confirms(gb_sets:intersection(MsgIdSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(MIOD, Confirmed) })
- end).
-
-msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
- Callback(?MODULE,
- fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+resume(State) -> rabbit_variable_queue:resume(State).
-%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
-%%----------------------------------------------------------------------------
+msg_rates(State) ->
+ rabbit_variable_queue:msg_rates(State).
-publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- {Msg, State1} = read_msg(MsgStatus, State),
- MsgStatus1 = MsgStatus#msg_status { msg = Msg },
- {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)};
-publish_alpha(MsgStatus, State) ->
- {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
-
-publish_beta(MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
-
-%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
- queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
- Limit, PubFun, State).
-
-queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
- Limit, PubFun, State)
- when Limit == undefined orelse SeqId < Limit ->
- case ?QUEUE:out(Q) of
- {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
- when SeqIdQ < SeqId ->
- %% enqueue from the remaining queue
- queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
- Limit, PubFun, State);
- {_, _Q1} ->
- %% enqueue from the remaining list of sequence ids
- {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
- {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
- PubFun(MsgStatus, State1),
- queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
- end;
-queue_merge(SeqIds, Q, Front, MsgIds,
- _Limit, _PubFun, State) ->
- {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
-
-delta_merge([], Delta, MsgIds, State) ->
- {Delta, MsgIds, State};
-delta_merge(SeqIds, Delta, MsgIds, State) ->
- lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
- {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
- msg_from_pending_ack(SeqId, State0),
- {_MsgStatus, State2} =
- maybe_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- stats({1, -1}, {MsgStatus, none}, State2)}
- end, {Delta, MsgIds, State}, SeqIds).
-
-%% Mostly opposite of record_pending_ack/2
-msg_from_pending_ack(SeqId, State) ->
- {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
- remove_pending_ack(false, SeqId, State),
- {MsgStatus #msg_status {
- msg_props = MsgProps #message_properties { needs_confirming = false } },
- State1}.
-
-beta_limit(Q) ->
- case ?QUEUE:peek(Q) of
- {value, #msg_status { seq_id = SeqId }} -> SeqId;
- empty -> undefined
- end.
-
-delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
-delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+info(Info, State) ->
+ rabbit_variable_queue:info(Info, State).
-%%----------------------------------------------------------------------------
-%% Iterator
-%%----------------------------------------------------------------------------
+invoke(Module, Fun, State) -> rabbit_variable_queue:invoke(Module, Fun, State).
-ram_ack_iterator(State) ->
- {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
-
-disk_ack_iterator(State) ->
- {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
-
-qi_ack_iterator(State) ->
- {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
-
-msg_iterator(State) -> istate(start, State).
-
-istate(start, State) -> {q4, State#vqstate.q4, State};
-istate(q4, State) -> {q3, State#vqstate.q3, State};
-istate(q3, State) -> {delta, State#vqstate.delta, State};
-istate(delta, State) -> {q2, State#vqstate.q2, State};
-istate(q2, State) -> {q1, State#vqstate.q1, State};
-istate(q1, _State) -> done.
-
-next({ack, It}, IndexState) ->
- case gb_trees:next(It) of
- none -> {empty, IndexState};
- {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
- {value, MsgStatus, true, Next, IndexState}
- end;
-next(done, IndexState) -> {empty, IndexState};
-next({delta, #delta{start_seq_id = SeqId,
- end_seq_id = SeqId}, State}, IndexState) ->
- next(istate(delta, State), IndexState);
-next({delta, #delta{start_seq_id = SeqId,
- end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
- SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
- SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
- next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
-next({delta, Delta, [], State}, IndexState) ->
- next({delta, Delta, State}, IndexState);
-next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
- case is_msg_in_pending_acks(SeqId, State) of
- false -> Next = {delta, Delta, Rest, State},
- {value, beta_msg_status(M), false, Next, IndexState};
- true -> next({delta, Delta, Rest, State}, IndexState)
- end;
-next({Key, Q, State}, IndexState) ->
- case ?QUEUE:out(Q) of
- {empty, _Q} -> next(istate(Key, State), IndexState);
- {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
- {value, MsgStatus, false, Next, IndexState}
- end.
-
-inext(It, {Its, IndexState}) ->
- case next(It, IndexState) of
- {empty, IndexState1} ->
- {Its, IndexState1};
- {value, MsgStatus1, Unacked, It1, IndexState1} ->
- {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
- end.
-
-ifold(_Fun, Acc, [], State) ->
- {Acc, State};
-ifold(Fun, Acc, Its, State) ->
- [{MsgStatus, Unacked, It} | Rest] =
- lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
- {#msg_status{seq_id = SeqId2}, _, _}) ->
- SeqId1 =< SeqId2
- end, Its),
- {Msg, State1} = read_msg(MsgStatus, State),
- case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
- {stop, Acc1} ->
- {Acc1, State};
- {cont, Acc1} ->
- {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
- ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
- end.
+is_duplicate(Msg, State) -> rabbit_variable_queue:is_duplicate(Msg, State).
-%%----------------------------------------------------------------------------
-%% Phase changes
-%%----------------------------------------------------------------------------
+set_queue_mode(Mode, State) ->
+ rabbit_variable_queue:set_queue_mode(Mode, State).
-reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
- State;
-reduce_memory_use(State = #vqstate {
- mode = default,
- ram_pending_ack = RPA,
- ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount,
- io_batch_size = IoBatchSize,
- rates = #rates { in = AvgIngress,
- out = AvgEgress,
- ack_in = AvgAckIngress,
- ack_out = AvgAckEgress } }) ->
-
- State1 = #vqstate { q2 = Q2, q3 = Q3 } =
- case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> State;
- %% Reduce memory of pending acks and alphas. The order is
- %% determined based on which is growing faster. Whichever
- %% comes second may very well get a quota of 0 if the
- %% first manages to push out the max number of messages.
- S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
- (AvgIngress - AvgEgress)) of
- true -> [fun limit_ram_acks/2,
- fun push_alphas_to_betas/2];
- false -> [fun push_alphas_to_betas/2,
- fun limit_ram_acks/2]
- end,
- {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
- ReduceFun(QuotaN, StateN)
- end, {S1, State}, Funs),
- State2
- end,
-
- State3 =
- case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- S2 when S2 >= IoBatchSize ->
- %% There is an implicit, but subtle, upper bound here. We
- %% may shuffle a lot of messages from Q2/3 into delta, but
- %% the number of these that require any disk operation,
- %% namely index writing, i.e. messages that are genuine
- %% betas and not gammas, is bounded by the credit_flow
- %% limiting of the alpha->beta conversion above.
- push_betas_to_deltas(S2, State1);
- _ ->
- State1
- end,
- %% See rabbitmq-server-290 for the reasons behind this GC call.
- garbage_collect(),
- State3;
-%% When using lazy queues, there are no alphas, so we don't need to
-%% call push_alphas_to_betas/2.
-reduce_memory_use(State = #vqstate {
- mode = lazy,
- ram_pending_ack = RPA,
- ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount }) ->
- State1 = #vqstate { q3 = Q3 } =
- case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> State;
- S1 -> {_, State2} = limit_ram_acks(S1, State),
- State2
- end,
-
- State3 =
- case chunk_size(?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- 0 ->
- State1;
- S2 ->
- push_betas_to_deltas(S2, State1)
- end,
- garbage_collect(),
- State3.
-
-limit_ram_acks(0, State) ->
- {0, ui(State)};
-limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
- case gb_trees:is_empty(RPA) of
- true ->
- {Quota, ui(State)};
- false ->
- {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
- {MsgStatus1, State1} =
- maybe_prepare_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
- limit_ram_acks(Quota - 1,
- stats({0, 0}, {MsgStatus, MsgStatus2},
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 }))
- end.
-
-permitted_beta_count(#vqstate { len = 0 }) ->
- infinity;
-permitted_beta_count(#vqstate { mode = lazy,
- target_ram_count = TargetRamCount}) ->
- TargetRamCount;
-permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
- lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
-permitted_beta_count(#vqstate { q1 = Q1,
- q4 = Q4,
- target_ram_count = TargetRamCount,
- len = Len }) ->
- BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
- lists:max([rabbit_queue_index:next_segment_boundary(0),
- BetaDelta - ((BetaDelta * BetaDelta) div
- (BetaDelta + TargetRamCount))]).
-
-chunk_size(Current, Permitted)
- when Permitted =:= infinity orelse Permitted >= Current ->
- 0;
-chunk_size(Current, Permitted) ->
- Current - Permitted.
-
-fetch_from_q3(State = #vqstate { mode = default,
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4 }) ->
- case ?QUEUE:out(Q3) of
- {empty, _Q3} ->
- {empty, State};
- {{value, MsgStatus}, Q3a} ->
- State1 = State #vqstate { q3 = Q3a },
- State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
- {true, true} ->
- %% q3 is now empty, it wasn't before;
- %% delta is still empty. So q2 must be
- %% empty, and we know q4 is empty
- %% otherwise we wouldn't be loading from
- %% q3. As such, we can just set q4 to Q1.
- true = ?QUEUE:is_empty(Q2), %% ASSERTION
- true = ?QUEUE:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
- {true, false} ->
- maybe_deltas_to_betas(State1);
- {false, _} ->
- %% q3 still isn't empty, we've not
- %% touched delta, so the invariants
- %% between q1, q2, delta and q3 are
- %% maintained
- State1
- end,
- {loaded, {MsgStatus, State2}}
- end;
-%% lazy queues
-fetch_from_q3(State = #vqstate { mode = lazy,
- delta = #delta { count = DeltaCount },
- q3 = Q3 }) ->
- case ?QUEUE:out(Q3) of
- {empty, _Q3} when DeltaCount =:= 0 ->
- {empty, State};
- {empty, _Q3} ->
- fetch_from_q3(maybe_deltas_to_betas(State));
- {{value, MsgStatus}, Q3a} ->
- State1 = State #vqstate { q3 = Q3a },
- {loaded, {MsgStatus, State1}}
- end.
-
-maybe_deltas_to_betas(State) ->
- AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
- maybe_deltas_to_betas(AfterFun, State).
-
-maybe_deltas_to_betas(_DelsAndAcksFun,
- State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
- State;
-maybe_deltas_to_betas(DelsAndAcksFun,
- State = #vqstate {
- q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState,
- ram_msg_count = RamMsgCount,
- ram_bytes = RamBytes,
- disk_read_count = DiskReadCount,
- transient_threshold = TransientThreshold }) ->
- #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
- end_seq_id = DeltaSeqIdEnd } = Delta,
- DeltaSeqId1 =
- lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
- {Q3a, RamCountsInc, RamBytesInc, State1} =
- betas_from_index_entries(List, TransientThreshold,
- DelsAndAcksFun,
- State #vqstate { index_state = IndexState1 }),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
- ram_bytes = RamBytes + RamBytesInc,
- disk_read_count = DiskReadCount + RamCountsInc },
- case ?QUEUE:len(Q3a) of
- 0 ->
- %% we ignored every message in the segment due to it being
- %% transient and below the threshold
- maybe_deltas_to_betas(
- DelsAndAcksFun,
- State2 #vqstate {
- delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
- Q3aLen ->
- Q3b = ?QUEUE:join(Q3, Q3a),
- case DeltaCount - Q3aLen of
- 0 ->
- %% delta is now empty, but it wasn't before, so
- %% can now join q2 onto q3
- State2 #vqstate { q2 = ?QUEUE:new(),
- delta = ?BLANK_DELTA,
- q3 = ?QUEUE:join(Q3b, Q2) };
- N when N > 0 ->
- Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
- count = N,
- end_seq_id = DeltaSeqIdEnd }),
- State2 #vqstate { delta = Delta1,
- q3 = Q3b }
- end
- end.
-
-push_alphas_to_betas(Quota, State) ->
- {Quota1, State1} =
- push_alphas_to_betas(
- fun ?QUEUE:out/1,
- fun (MsgStatus, Q1a,
- State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
- State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
- (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
- State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
- end, Quota, State #vqstate.q1, State),
- {Quota2, State2} =
- push_alphas_to_betas(
- fun ?QUEUE:out_r/1,
- fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
- State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
- end, Quota1, State1 #vqstate.q4, State1),
- {Quota2, State2}.
-
-push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
- State = #vqstate { ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount })
- when Quota =:= 0 orelse
- TargetRamCount =:= infinity orelse
- TargetRamCount >= RamMsgCount ->
- {Quota, ui(State)};
-push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
- %% We consume credits from the message_store whenever we need to
- %% persist a message to disk. See:
- %% rabbit_variable_queue:msg_store_write/4. So perhaps the
- %% msg_store is trying to throttle down our queue.
- case credit_flow:blocked() of
- true -> {Quota, ui(State)};
- false -> case Generator(Q) of
- {empty, _Q} ->
- {Quota, ui(State)};
- {{value, MsgStatus}, Qa} ->
- {MsgStatus1, State1} =
- maybe_prepare_write_to_disk(true, false, MsgStatus,
- State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = stats(
- ready0, {MsgStatus, MsgStatus2}, State1),
- State3 = Consumer(MsgStatus2, Qa, State2),
- push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State3)
- end
- end.
-
-push_betas_to_deltas(Quota, State = #vqstate { mode = default,
- q2 = Q2,
- delta = Delta,
- q3 = Q3}) ->
- PushState = {Quota, Delta, State},
- {Q3a, PushState1} = push_betas_to_deltas(
- fun ?QUEUE:out_r/1,
- fun rabbit_queue_index:next_segment_boundary/1,
- Q3, PushState),
- {Q2a, PushState2} = push_betas_to_deltas(
- fun ?QUEUE:out/1,
- fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, PushState1),
- {_, Delta1, State1} = PushState2,
- State1 #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a };
-%% In the case of lazy queues we want to page as many messages as
-%% possible from q3.
-push_betas_to_deltas(Quota, State = #vqstate { mode = lazy,
- delta = Delta,
- q3 = Q3}) ->
- PushState = {Quota, Delta, State},
- {Q3a, PushState1} = push_betas_to_deltas(
- fun ?QUEUE:out_r/1,
- fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q3, PushState),
- {_, Delta1, State1} = PushState1,
- State1 #vqstate { delta = Delta1,
- q3 = Q3a }.
-
-
-push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
- case ?QUEUE:is_empty(Q) of
- true ->
- {Q, PushState};
- false ->
- {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
- {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
- Limit = LimitFun(MinSeqId),
- case MaxSeqId < Limit of
- true -> {Q, PushState};
- false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
- end
- end.
-
-push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
- {Q, {0, Delta, ui(State)}};
-push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
- case Generator(Q) of
- {empty, _Q} ->
- {Q, {Quota, Delta, ui(State)}};
- {{value, #msg_status { seq_id = SeqId }}, _Qa}
- when SeqId < Limit ->
- {Q, {Quota, Delta, ui(State)}};
- {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
- {#msg_status { index_on_disk = true }, State1} =
- maybe_batch_write_index_to_disk(true, MsgStatus, State),
- State2 = stats(ready0, {MsgStatus, none}, State1),
- Delta1 = expand_delta(SeqId, Delta),
- push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, State2})
- end.
-
-%% Flushes queue index batch caches and updates queue index state.
-ui(#vqstate{index_state = IndexState,
- target_ram_count = TargetRamCount} = State) ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- State#vqstate{index_state = IndexState1}.
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, State) ->
+ rabbit_variable_queue:zip_msgs_and_acks(Msgs, AckTags, Accumulator, State).
%% Delay
maybe_delay(QPA) ->
@@ -2430,27 +327,3 @@ is_timeout_test([#msg_status{
_ -> is_timeout_test(Rem)
end;
is_timeout_test([_|Rem]) -> is_timeout_test(Rem).
-
-%%----------------------------------------------------------------------------
-%% Upgrading
-%%----------------------------------------------------------------------------
-
-multiple_routing_keys() ->
- transform_storage(
- fun ({basic_message, ExchangeName, Routing_Key, Content,
- MsgId, Persistent}) ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- MsgId, Persistent}};
- (_) -> {error, corrupt_message}
- end),
- ok.
-
-
-%% Assumes message store is not running
-transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE, TransformFun),
- transform_store(?TRANSIENT_MSG_STORE, TransformFun).
-
-transform_store(Store, TransformFun) ->
- rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
- rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl
index efc5ca830e..68deec12cb 100644
--- a/test/per_vhost_connection_limit_SUITE.erl
+++ b/test/per_vhost_connection_limit_SUITE.erl
@@ -39,8 +39,7 @@ groups() ->
single_node_single_vhost_limit,
single_node_single_vhost_zero_limit,
single_node_multiple_vhosts_limit,
- single_node_multiple_vhosts_zero_limit,
- single_node_vhost_deletion_forces_connection_closure
+ single_node_multiple_vhosts_zero_limit
],
ClusterSize2Tests = [
most_basic_cluster_connection_count,
@@ -51,8 +50,7 @@ groups() ->
cluster_single_vhost_limit,
cluster_single_vhost_limit2,
cluster_single_vhost_zero_limit,
- cluster_multiple_vhosts_zero_limit,
- cluster_vhost_deletion_forces_connection_closure
+ cluster_multiple_vhosts_zero_limit
],
[
{cluster_size_1_network, [], ClusterSize1Tests},
@@ -639,57 +637,6 @@ cluster_multiple_vhosts_zero_limit(Config) ->
set_vhost_connection_limit(Config, VHost1, -1),
set_vhost_connection_limit(Config, VHost2, -1).
-
-single_node_vhost_deletion_forces_connection_closure(Config) ->
- VHost1 = <<"vhost1">>,
- VHost2 = <<"vhost2">>,
-
- set_up_vhost(Config, VHost1),
- set_up_vhost(Config, VHost2),
-
- ?assertEqual(0, count_connections_in(Config, VHost1)),
- ?assertEqual(0, count_connections_in(Config, VHost2)),
-
- [Conn1] = open_connections(Config, [{0, VHost1}]),
- ?assertEqual(1, count_connections_in(Config, VHost1)),
-
- [_Conn2] = open_connections(Config, [{0, VHost2}]),
- ?assertEqual(1, count_connections_in(Config, VHost2)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- timer:sleep(200),
- ?assertEqual(0, count_connections_in(Config, VHost2)),
-
- close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
-
-cluster_vhost_deletion_forces_connection_closure(Config) ->
- VHost1 = <<"vhost1">>,
- VHost2 = <<"vhost2">>,
-
- set_up_vhost(Config, VHost1),
- set_up_vhost(Config, VHost2),
-
- ?assertEqual(0, count_connections_in(Config, VHost1)),
- ?assertEqual(0, count_connections_in(Config, VHost2)),
-
- [Conn1] = open_connections(Config, [{0, VHost1}]),
- ?assertEqual(1, count_connections_in(Config, VHost1)),
-
- [_Conn2] = open_connections(Config, [{1, VHost2}]),
- ?assertEqual(1, count_connections_in(Config, VHost2)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
- timer:sleep(200),
- ?assertEqual(0, count_connections_in(Config, VHost2)),
-
- close_connections([Conn1]),
- ?assertEqual(0, count_connections_in(Config, VHost1)),
-
- rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
-
vhost_limit_after_node_renamed(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 0ef86717ac..47d47290a4 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -24,7 +24,8 @@
all() ->
[
- {group, non_parallel_tests}
+ {group, non_parallel_tests},
+ {group, cluster_tests}
].
groups() ->
@@ -37,7 +38,8 @@ groups() ->
gen_server2_metrics,
consumer_metrics
]
- }
+ },
+ {cluster_tests, [], [cluster_queue_metrics]}
].
%% -------------------------------------------------------------------
@@ -45,33 +47,27 @@ groups() ->
%% -------------------------------------------------------------------
merge_app_env(Config) ->
- rabbit_ct_helpers:merge_app_env(Config,
- {rabbit, [
- {core_metrics_gc_interval, 6000000},
- {collect_statistics_interval, 100},
- {collect_statistics, fine}
- ]}).
-
-init_per_suite(Config) ->
+ AppEnv = {rabbit, [{core_metrics_gc_interval, 6000000},
+ {collect_statistics_interval, 100},
+ {collect_statistics, fine}]},
+ rabbit_ct_helpers:merge_app_env(Config, AppEnv).
+
+init_per_group(cluster_tests, Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Conf = [{rmq_nodename_suffix, cluster_tests}, {rmq_nodes_count, 2}],
+ Config1 = rabbit_ct_helpers:set_config(Config, Conf),
+ rabbit_ct_helpers:run_setup_steps(Config1, setup_steps());
+init_per_group(non_parallel_tests, Config) ->
rabbit_ct_helpers:log_environment(),
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, ?MODULE}
- ]),
- rabbit_ct_helpers:run_setup_steps(
- Config1,
- [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()).
-
-end_per_suite(Config) ->
+ Conf = [{rmq_nodename_suffix, non_parallel_tests}],
+ Config1 = rabbit_ct_helpers:set_config(Config, Conf),
+ rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()).
+
+end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_broker_helpers:teardown_steps()).
-init_per_group(_, Config) ->
- Config.
-
-end_per_group(_, Config) ->
- Config.
-
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_helpers:run_steps(Config,
@@ -83,8 +79,11 @@ end_per_testcase(Testcase, Config) ->
Config,
rabbit_ct_client_helpers:teardown_steps()).
+setup_steps() ->
+ [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps().
+
%% -------------------------------------------------------------------
-%% Testcases.
+%% Single-node Testcases.
%% -------------------------------------------------------------------
queue_metrics(Config) ->
@@ -329,3 +328,74 @@ x(Name) ->
#resource{ virtual_host = <<"/">>,
kind = exchange,
name = Name }.
+
+%% -------------------------------------------------------------------
+%% Cluster Testcases.
+%% -------------------------------------------------------------------
+
+cluster_queue_metrics(Config) ->
+ VHost = <<"/">>,
+ QueueName = <<"cluster_queue_metrics">>,
+ PolicyName = <<"ha-policy-1">>,
+ PolicyPattern = <<".*">>,
+ PolicyAppliesTo = <<"queues">>,
+
+ Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Node0),
+
+ Node0Name = rabbit_data_coercion:to_binary(Node0),
+ Definition0 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node0Name]}],
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
+ PolicyName, PolicyPattern,
+ PolicyAppliesTo, Definition0),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueName}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"hello">>}),
+
+ % Update policy to point to other node
+ Node1Name = rabbit_data_coercion:to_binary(Node1),
+ Definition1 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node1Name]}],
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
+ PolicyName, PolicyPattern,
+ PolicyAppliesTo, Definition1),
+
+ % Synchronize
+ Name = rabbit_misc:r(VHost, queue, QueueName),
+ [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
+ ets, lookup,
+ [rabbit_queue, Name]),
+ ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
+ sync_mirrors, [QPid]),
+
+ % Check ETS table for data
+ wait_for(fun () ->
+ [] =:= rabbit_ct_broker_helpers:rpc(
+ Config, Node0, ets, tab2list,
+ [queue_coarse_metrics])
+ end, 60),
+
+ wait_for(fun () ->
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, Node1, ets, tab2list,
+ [queue_coarse_metrics]),
+ case Ret of
+ [{Name, 1, 0, 1, _}] -> true;
+ _ -> false
+ end
+ end, 60),
+
+ amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ Config.
+
+wait_for(_Fun, 0) -> false;
+wait_for(Fun, Seconds) ->
+ case Fun() of
+ true -> ok;
+ false ->
+ timer:sleep(1000),
+ wait_for(Fun, Seconds - 1)
+ end.
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 453f4b2e72..dd8cd48b5a 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -46,6 +46,8 @@ groups() ->
password_hashing,
change_password
]},
+ set_disk_free_limit_command,
+ set_vm_memory_high_watermark_command,
topic_matching
]}
].
@@ -972,6 +974,50 @@ configurable_server_properties1(_Config) ->
application:set_env(rabbit, server_properties, ServerProperties),
passed.
+set_disk_free_limit_command(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, set_disk_free_limit_command1, [Config]).
+
+set_disk_free_limit_command1(_Config) ->
+ rabbit_disk_monitor:set_disk_free_limit("2000kiB"),
+ 2048000 = rabbit_disk_monitor:get_disk_free_limit(),
+
+ %% Use an integer
+ rabbit_disk_monitor:set_disk_free_limit({mem_relative, 1}),
+ disk_free_limit_to_total_memory_ratio_is(1),
+
+ %% Use a float
+ rabbit_disk_monitor:set_disk_free_limit({mem_relative, 1.5}),
+ disk_free_limit_to_total_memory_ratio_is(1.5),
+
+ rabbit_disk_monitor:set_disk_free_limit("50MB"),
+ passed.
+
+disk_free_limit_to_total_memory_ratio_is(MemRatio) ->
+ ExpectedLimit = MemRatio * vm_memory_monitor:get_total_memory(),
+ % Total memory is unstable, so checking order
+ true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2,
+ true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98.
+
+set_vm_memory_high_watermark_command(Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, set_vm_memory_high_watermark_command1, [Config]).
+
+set_vm_memory_high_watermark_command1(_Config) ->
+ MemLimitRatio = 1.0,
+ MemTotal = vm_memory_monitor:get_total_memory(),
+
+ vm_memory_monitor:set_vm_memory_high_watermark(MemLimitRatio),
+ MemLimit = vm_memory_monitor:get_memory_limit(),
+ case MemLimit of
+ MemTotal -> ok;
+ _ -> MemTotalToMemLimitRatio = MemLimit * 100.0 / MemTotal / 100,
+ ct:fail(
+ "Expected memory high watermark to be ~p (~s), but it was ~p (~.1f)",
+ [MemTotal, MemLimitRatio, MemLimit, MemTotalToMemLimitRatio]
+ )
+ end.
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
new file mode 100644
index 0000000000..a519d01af5
--- /dev/null
+++ b/test/vhost_SUITE.erl
@@ -0,0 +1,376 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(vhost_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_1_network},
+ {group, cluster_size_2_network},
+ {group, cluster_size_1_direct},
+ {group, cluster_size_2_direct}
+ ].
+
+groups() ->
+ ClusterSize1Tests = [
+ single_node_vhost_deletion_forces_connection_closure,
+ vhost_failure_forces_connection_closure,
+ dead_vhost_connection_refused
+ ],
+ ClusterSize2Tests = [
+ cluster_vhost_deletion_forces_connection_closure,
+ vhost_failure_forces_connection_closure,
+ dead_vhost_connection_refused,
+ vhost_failure_forces_connection_closure_on_failure_node,
+ dead_vhost_connection_refused_on_failure_node
+ ],
+ [
+ {cluster_size_1_network, [], ClusterSize1Tests},
+ {cluster_size_2_network, [], ClusterSize2Tests},
+ {cluster_size_1_direct, [], ClusterSize1Tests},
+ {cluster_size_2_direct, [], ClusterSize2Tests}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% see partitions_SUITE
+-define(DELAY, 9000).
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_1_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_1_network, Config1, 1);
+init_per_group(cluster_size_2_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_2_network, Config1, 2);
+init_per_group(cluster_size_1_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_1_direct, Config1, 1);
+init_per_group(cluster_size_2_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_2_direct, Config1, 2).
+
+init_per_multinode_group(Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ clear_all_connection_tracking_tables(Config),
+ Config.
+
+end_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+clear_all_connection_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_connection_tracking,
+ clear_tracked_connection_tables_for_this_node,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+single_node_vhost_deletion_forces_connection_closure(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [_Conn2] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+vhost_failure_forces_connection_closure(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [_Conn2] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ force_vhost_failure(Config, VHost2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+dead_vhost_connection_refused(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ force_vhost_failure(Config, VHost2),
+ timer:sleep(200),
+
+ [_Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [_Conn2] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ expect_that_client_connection_is_rejected(Config, 0, VHost2),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+
+vhost_failure_forces_connection_closure_on_failure_node(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [_Conn20] = open_connections(Config, [{0, VHost2}]),
+ [_Conn21] = open_connections(Config, [{1, VHost2}]),
+ ?assertEqual(2, count_connections_in(Config, VHost2)),
+
+ force_vhost_failure(Config, 0, VHost2),
+ timer:sleep(200),
+ %% Vhost2 connection on node 1 is still alive
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+ %% Vhost1 connection on node 0 is still alive
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+dead_vhost_connection_refused_on_failure_node(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ force_vhost_failure(Config, 0, VHost2),
+ timer:sleep(200),
+ %% Can open connections to vhost1 on node 0 and 1
+ [_Conn10] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+ [_Conn11] = open_connections(Config, [{1, VHost1}]),
+ ?assertEqual(2, count_connections_in(Config, VHost1)),
+
+ %% Connection on vhost2 on node 0 is refused
+ [_Conn20] = open_connections(Config, [{0, VHost2}]),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ expect_that_client_connection_is_rejected(Config, 0, VHost2),
+
+ %% Can open connections to vhost2 on node 1
+ [_Conn21] = open_connections(Config, [{1, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost).
+
+force_vhost_failure(Config, Node, VHost) ->
+ force_vhost_failure(Config, Node, VHost, 10).
+
+force_vhost_failure(_Config, _Node, VHost, 0) ->
+ error({failed_to_force_vhost_failure, no_more_attempts_left, VHost});
+force_vhost_failure(Config, Node, VHost, Attempts) ->
+ MessageStorePid = get_message_store_pid(Config, VHost),
+ rabbit_ct_broker_helpers:rpc(Config, Node,
+ erlang, exit,
+ [MessageStorePid, force_vhost_failure]),
+ %% Give it a time to fail
+ timer:sleep(200),
+ case rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_vhost_sup_sup, is_vhost_alive,
+ [VHost]) of
+ true -> force_vhost_failure(Config, Node, VHost, Attempts - 1);
+ false -> ok
+ end.
+
+get_message_store_pid(Config, VHost) ->
+ {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_vhost_sup_sup, vhost_sup, [VHost]),
+ Children = rabbit_ct_broker_helpers:rpc(Config, 0,
+ supervisor, which_children,
+ [VHostSup]),
+ [MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children,
+ Name == msg_store_persistent],
+ MsgStorePid.
+
+cluster_vhost_deletion_forces_connection_closure(Config) ->
+ VHost1 = <<"vhost1">>,
+ VHost2 = <<"vhost2">>,
+
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ [Conn1] = open_connections(Config, [{0, VHost1}]),
+ ?assertEqual(1, count_connections_in(Config, VHost1)),
+
+ [_Conn2] = open_connections(Config, [{1, VHost2}]),
+ ?assertEqual(1, count_connections_in(Config, VHost2)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, VHost2)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, VHost1)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1).
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+open_connections(Config, NodesAndVHosts) ->
+ % Randomly select connection type
+ OpenConnectionFun = case ?config(connection_type, Config) of
+ network -> open_unmanaged_connection;
+ direct -> open_unmanaged_connection_direct
+ end,
+ Conns = lists:map(fun
+ ({Node, VHost}) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
+ VHost);
+ (Node) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
+ end, NodesAndVHosts),
+ timer:sleep(500),
+ Conns.
+
+close_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ rabbit_ct_client_helpers:close_connection(Conn)
+ end, Conns),
+ timer:sleep(500).
+
+count_connections_in(Config, VHost) ->
+ count_connections_in(Config, VHost, 0).
+count_connections_in(Config, VHost, NodeIndex) ->
+ timer:sleep(200),
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_connection_tracking,
+ count_connections_in, [VHost]).
+
+set_up_vhost(Config, VHost) ->
+ rabbit_ct_broker_helpers:add_vhost(Config, VHost),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
+ set_vhost_connection_limit(Config, VHost, -1).
+
+set_vhost_connection_limit(Config, VHost, Count) ->
+ set_vhost_connection_limit(Config, 0, VHost, Count).
+
+set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ set_vhost_limits, Node,
+ ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
+ [{"-p", binary_to_list(VHost)}]).
+
+expect_that_client_connection_is_rejected(Config) ->
+ expect_that_client_connection_is_rejected(Config, 0).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex) ->
+ {error, _} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
+ {error, _} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost).