diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-07-28 21:37:42 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-07-28 21:37:42 +0300 |
| commit | e84ba4ca9329243879f51bf147f079519171c81f (patch) | |
| tree | 566e8a702fe706a0e4cfaa67f5ed7b2adc932d83 | |
| parent | 118666d7caba0e494ec3f8144c0a35e12130a9a1 (diff) | |
| parent | 77101e7fadaac0bf12b862c6c6cf48e08eb97e4b (diff) | |
| download | rabbitmq-server-git-e84ba4ca9329243879f51bf147f079519171c81f.tar.gz | |
Merge branch 'master' into fix-travis-ci-build
39 files changed, 1578 insertions, 2835 deletions
diff --git a/.travis.yml b/.travis.yml index 72810e630a..d9de14a7a4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,14 @@ addons: - unixodbc-dev - libwxgtk2.8-dev +otp_release: + - "19.2" + - "19.3" + - "20.0" + +services: + - docker + env: - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=0 - OTP_TAG_NAME=OTP-20.0 TEST_SUITE=1 @@ -112,17 +112,19 @@ define PROJECT_ENV {passphrase, undefined} ]}, - %% rabbitmq-server-973 + %% rabbitmq-server#973 {queue_explicit_gc_run_operation_threshold, 1000}, {lazy_queue_explicit_gc_run_operation_threshold, 1000}, {background_gc_enabled, false}, {background_gc_target_interval, 60000}, - %% rabbitmq-server-589 + %% rabbitmq-server#589 {proxy_protocol, false}, {disk_monitor_failure_retries, 10}, {disk_monitor_failure_retry_interval, 120000}, - %% either "stop_node" or "ignore" - {vhost_restart_strategy, stop_node} + %% either "stop_node" or "continue". + %% by default we choose to not terminate the entire node if one + %% vhost had to shut down, see server#1158 and server#1280 + {vhost_restart_strategy, continue} ] endef diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index e4c2fff92a..68ce59326e 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -252,8 +252,18 @@ ## Fraction of the high watermark limit at which queues start to ## page message out to disc in order to free up memory. +## For example, when vm_memory_high_watermark is set to 0.4 and this value is set to 0.5, +## paging can begin as early as when 20% of total available RAM is used by the node. ## -## Values greater than 0.9 can be dangerous and should be used carefully. +## Values greater than 1.0 can be dangerous and should be used carefully. +## +## One alternative to this is to use durable queues and publish messages +## as persistent (delivery mode = 2). With this combination queues will +## move messages to disk much more rapidly. +## +## Another alternative is to configure queues to page all messages (both +## persistent and transient) to disk as quickly +## as possible, see http://www.rabbitmq.com/lazy-queues.html. ## # vm_memory_high_watermark_paging_ratio = 0.5 diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 4135c5053c..b7f08afc78 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -236,8 +236,18 @@ %% Fraction of the high watermark limit at which queues start to %% page message out to disc in order to free up memory. + %% For example, when vm_memory_high_watermark is set to 0.4 and this value is set to 0.5, + %% paging can begin as early as when 20% of total available RAM is used by the node. %% - %% Values greater than 0.9 can be dangerous and should be used carefully. + %% Values greater than 1.0 can be dangerous and should be used carefully. + %% + %% One alternative to this is to use durable queues and publish messages + %% as persistent (delivery mode = 2). With this combination queues will + %% move messages to disk much more rapidly. + %% + %% Another alternative is to configure queues to page all messages (both + %% persistent and transient) to disk as quickly + %% as possible, see http://www.rabbitmq.com/lazy-queues.html. %% %% {vm_memory_high_watermark_paging_ratio, 0.5}, diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index c503548187..70351f116c 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -976,11 +976,11 @@ end}. {mapping, "proxy_protocol", "rabbit.proxy_protocol", [{datatype, {enum, [true, false]}}]}. -%% Whether to stop the rabbit application if VHost data -%% cannot be recovered. +%% Whether to stop the rabbit application if a vhost has +%% to terminate for any reason. {mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy", - [{datatype, {enum, [stop_node, ignore]}}]}. + [{datatype, {enum, [stop_node, continue, transient, persistent]}}]}. % ========================== % Lager section diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 63bc1c52bd..f15c9e504e 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -107,7 +107,7 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre # all projects use the same versions. It avoids conflicts and makes it # possible to work with rabbitmq-public-umbrella. -dep_cowboy_commit = 1.1.0 +dep_cowboy_commit = 1.1.2 dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2 dep_ranch_commit = 1.3.2 dep_sockjs = git https://github.com/rabbitmq/sockjs-erlang.git 405990ea62353d98d36dbf5e1e64942d9b0a1daf diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index d74a3b173a..1549a05524 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -80,7 +80,7 @@ ESCRIPT_DIR="${RABBITMQ_HOME}/escript" DEFAULT_SCHEDULER_BIND_TYPE="db" [ "x" = "x$RABBITMQ_SCHEDULER_BIND_TYPE" ] && RABBITMQ_SCHEDULER_BIND_TYPE=${DEFAULT_SCHEDULER_BIND_TYPE} -DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000 +DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000 [ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE} ## Common defaults diff --git a/scripts/rabbitmq-env.bat b/scripts/rabbitmq-env.bat index e3fddbb3eb..9bf6f1cdc4 100644 --- a/scripts/rabbitmq-env.bat +++ b/scripts/rabbitmq-env.bat @@ -38,10 +38,10 @@ if "!RABBITMQ_SCHEDULER_BIND_TYPE!"=="" ( set RABBITMQ_SCHEDULER_BIND_TYPE=!DEFAULT_SCHEDULER_BIND_TYPE!
)
-REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+REM DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
REM set the VM distribution buffer size
REM [ "x" = "x$RABBITMQ_DISTRIBUTION_BUFFER_SIZE" ] && RABBITMQ_DISTRIBUTION_BUFFER_SIZE=${DEFAULT_DISTRIBUTION_BUFFER_SIZE}
-set DEFAULT_DISTRIBUTION_BUFFER_SIZE=32000
+set DEFAULT_DISTRIBUTION_BUFFER_SIZE=128000
if "!RABBITMQ_DISTRIBUTION_BUFFER_SIZE!"=="" (
set RABBITMQ_DISTRIBUTION_BUFFER_SIZE=!DEFAULT_DISTRIBUTION_BUFFER_SIZE!
)
diff --git a/src/gm.erl b/src/gm.erl index f67050affb..0da190a57d 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -395,9 +395,8 @@ -define(GROUP_TABLE, gm_group). -define(MAX_BUFFER_SIZE, 100000000). %% 100MB --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -416,6 +415,7 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, + force_gc_timer, txn_executor, shutting_down }). @@ -508,7 +508,8 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], + [{spawn_opt, [{fullsweep_after, 0}]}]). leave(Server) -> gen_server2:cast(Server, leave). @@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, + force_gc_timer = undefined, txn_executor = TxnFun, - shutting_down = false }, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + shutting_down = false }}. handle_call({confirmed_broadcast, _Msg}, _From, @@ -708,6 +709,10 @@ handle_cast(leave, State) -> {stop, normal, State}. +handle_info(force_gc, State) -> + garbage_collect(), + noreply(State #state { force_gc_timer = undefined }); + handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); @@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. + {noreply, ensure_timers(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + {reply, Reply, ensure_timers(State), flush_timeout(State)}. + +ensure_timers(State) -> + ensure_force_gc_timer(ensure_broadcast_timer(State)). -flush_timeout(#state{broadcast_buffer = []}) -> hibernate; +flush_timeout(#state{broadcast_buffer = []}) -> infinity; flush_timeout(_) -> 0. +ensure_force_gc_timer(State = #state { force_gc_timer = TRef }) + when is_reference(TRef) -> + State; +ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) -> + TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc), + State #state { force_gc_timer = TRef }. + ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> State; @@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self, end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [], - broadcast_buffer_sz = 0}. - + broadcast_buffer_sz = 0 }. %% --------------------------------------------------------------------------- %% View construction and inspection diff --git a/src/rabbit.erl b/src/rabbit.erl index 254dc02d73..b166e079f4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -705,7 +705,8 @@ status() -> {erlang_version, erlang:system_info(system_version)}, {memory, rabbit_vm:memory()}, {alarms, alarms()}, - {listeners, listeners()}], + {listeners, listeners()}, + {vm_memory_calculation_strategy, vm_memory_monitor:get_memory_calculation_strategy()}], S2 = rabbit_misc:filter_exit_map( fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, [{vm_memory_high_watermark, {vm_memory_monitor, @@ -802,6 +803,16 @@ start(normal, []) -> warn_if_disc_io_options_dubious(), rabbit_boot_steps:run_boot_steps(), {ok, SupPid}; + {error, {erlang_version_too_old, + {found, OTPRel, ERTSVer}, + {required, ?OTP_MINIMUM, ?ERTS_MINIMUM}}} -> + Msg = "This RabbitMQ version cannot run on Erlang ~s (erts ~s): " + "minimum required version is ~s (erts ~s)", + Args = [OTPRel, ERTSVer, ?OTP_MINIMUM, ?ERTS_MINIMUM], + rabbit_log:error(Msg, Args), + %% also print to stderr to make this more visible + io:format(standard_error, "Error: " ++ Msg ++ "~n", Args), + {error, {erlang_version_too_old, rabbit_misc:format("Erlang ~s or later is required, started on ~s", [?OTP_MINIMUM, OTPRel])}}; Error -> Error end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4eead35c1d..ff57593374 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([list_down/1, count/1, list_names/0]). +-export([list_down/1, count/1, list_names/0, list_local_names/0]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/4, basic_consume/11, basic_cancel/5, notify_decorators/1]). @@ -234,8 +234,13 @@ recover(VHost) -> %% for further processing in recover_durable_queues. {ok, OrderedRecoveryTerms} = BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]), - {ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost), - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)). + case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of + {ok, _} -> + recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + {error, Reason} -> + rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), + throw({error, Reason}) + end. stop(VHost) -> ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), @@ -587,7 +592,13 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). -list(VHostPath) -> list(VHostPath, rabbit_queue). +list_local_names() -> + [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), + State =/= crashed, + node() =:= node(QPid) ]. + +list(VHostPath) -> + list(VHostPath, rabbit_queue). %% Not dirty_match_object since that would not be transactional when used in a %% tx context @@ -601,12 +612,16 @@ list(VHostPath, TableName) -> end). list_down(VHostPath) -> - Present = list(VHostPath), - Durable = list(VHostPath, rabbit_durable_queue), - PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), - sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> - not sets:is_element(N, PresentS) - end, sets:from_list(Durable))). + case rabbit_vhost:exists(VHostPath) of + false -> []; + true -> + Present = list(VHostPath), + Durable = list(VHostPath, rabbit_durable_queue), + PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), + sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> + not sets:is_element(N, PresentS) + end, sets:from_list(Durable))) + end. count(VHost) -> try diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c52d329392..4e43104de2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -266,11 +266,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3. terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, missing_owner} = Reason, State) -> %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(normal, State) -> %% delete case terminate_shutdown(terminate_delete(true, normal, State), State); diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index 347dbbb48a..b5ef86255d 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -65,15 +65,30 @@ find_for_vhost(VHost, Node) -> -spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - supervisor2:start_child( - VHostSup, - {rabbit_amqqueue_sup_sup, - {rabbit_amqqueue_sup_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + supervisor2:start_child( + VHostSup, + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + {error, {no_such_vhost, VHost}} + end. -spec stop_for_vhost(rabbit_types:vhost()) -> ok. stop_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), - ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
\ No newline at end of file + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), + ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup); + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 378c1e583b..00a6607dfb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -70,6 +70,9 @@ %% For testing -export([build_topic_variable_map/3]). +%% Mgmt HTTP API refactor +-export([handle_method/5]). + -record(ch, { %% starting | running | flow | closing state, @@ -741,20 +744,20 @@ clear_permission_cache() -> erase(permission_cache), erase(topic_permission_cache), ok. -check_configure_permitted(Resource, #ch{user = User}) -> +check_configure_permitted(Resource, User) -> check_resource_access(User, Resource, configure). -check_write_permitted(Resource, #ch{user = User}) -> +check_write_permitted(Resource, User) -> check_resource_access(User, Resource, write). -check_read_permitted(Resource, #ch{user = User}) -> +check_read_permitted(Resource, User) -> check_resource_access(User, Resource, read). -check_write_permitted_on_topic(Resource, Channel, RoutingKey) -> - check_topic_authorisation(Resource, Channel, RoutingKey, write). +check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write). -check_read_permitted_on_topic(Resource, Channel, RoutingKey) -> - check_topic_authorisation(Resource, Channel, RoutingKey, read). +check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) -> + check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read). check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; @@ -790,12 +793,19 @@ check_internal_exchange(_) -> ok. check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic}, - #ch{user = User = #user{username = Username}, conn_pid = ConnPid}, + User = #user{username = Username}, + ConnPid, RoutingKey, Permission) -> Resource = Name#resource{kind = topic}, - Timeout = get_operation_timeout(), - AmqpParams = rabbit_amqp_connection:amqp_params(ConnPid, Timeout), + Timeout = get_operation_timeout(), + AmqpParams = case ConnPid of + none -> + %% Called from outside the channel by mgmt API + []; + _ -> + rabbit_amqp_connection:amqp_params(ConnPid, Timeout) + end, VariableMap = build_topic_variable_map(AmqpParams, VHost, Username), Context = #{routing_key => RoutingKey, variable_map => VariableMap @@ -811,7 +821,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail]) end; -check_topic_authorisation(_, _, _, _) -> +check_topic_authorisation(_, _, _, _, _) -> ok. build_topic_variable_map(AmqpParams, VHost, Username) -> @@ -844,10 +854,10 @@ check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> end. -qbin_to_resource(QueueNameBin, State) -> - name_to_resource(queue, QueueNameBin, State). +qbin_to_resource(QueueNameBin, VHostPath) -> + name_to_resource(queue, QueueNameBin, VHostPath). -name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> +name_to_resource(Type, NameBin, VHostPath) -> rabbit_misc:r(VHostPath, Type, NameBin). expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> @@ -1002,15 +1012,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, channel = ChannelNum, confirm_enabled = ConfirmEnabled, trace_state = TraceState, - user = #user{username = Username}, + user = #user{username = Username} = User, conn_name = ConnName, - delivery_flow = Flow}) -> + delivery_flow = Flow, + conn_pid = ConnPid}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_write_permitted(ExchangeName, State), + check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), - check_write_permitted_on_topic(Exchange, State, RoutingKey), + check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = @@ -1063,9 +1074,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, limiter = Limiter, - next_tag = DeliveryTag}) -> - QueueName = qbin_to_resource(QueueNameBin, State), - check_read_permitted(QueueName, State), + next_tag = DeliveryTag, + user = User, + virtual_host = VHostPath}) -> + QueueName = qbin_to_resource(QueueNameBin, VHostPath), + check_read_permitted(QueueName, User), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get( @@ -1148,11 +1161,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait, arguments = Args}, _, State = #ch{consumer_prefetch = ConsumerPrefetch, - consumer_mapping = ConsumerMapping}) -> + consumer_mapping = ConsumerMapping, + user = User, + virtual_host = VHostPath}) -> case maps:find(ConsumerTag, ConsumerMapping) of error -> - QueueName = qbin_to_resource(QueueNameBin, State), - check_read_permitted(QueueName, State), + QueueName = qbin_to_resource(QueueNameBin, VHostPath), + check_read_permitted(QueueName, User), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), @@ -1273,251 +1288,81 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, _, State) -> reject(DeliveryTag, Requeue, false, State); -handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, - passive = false, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - nowait = NoWait, - arguments = Args}, +handle_method(#'exchange.declare'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, - user = #user{username = Username}}) -> - CheckedType = rabbit_exchange:check_type(TypeNameBin), - ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), - check_not_default_exchange(ExchangeName), - check_configure_permitted(ExchangeName, State), - X = case rabbit_exchange:lookup(ExchangeName) of - {ok, FoundX} -> FoundX; - {error, not_found} -> - check_name('exchange', strip_cr_lf(ExchangeNameBin)), - AeKey = <<"alternate-exchange">>, - case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of - undefined -> ok; - {error, {invalid_type, Type}} -> - precondition_failed( - "invalid type '~s' for arg '~s' in ~s", - [Type, AeKey, rabbit_misc:rs(ExchangeName)]); - AName -> check_read_permitted(ExchangeName, State), - check_write_permitted(AName, State), - ok - end, - rabbit_exchange:declare(ExchangeName, - CheckedType, - Durable, - AutoDelete, - Internal, - Args, - Username) - end, - ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, - AutoDelete, Internal, Args), - return_ok(State, NoWait, #'exchange.declare_ok'{}); - -handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - passive = true, - nowait = NoWait}, - _, State = #ch{virtual_host = VHostPath}) -> - ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), - check_not_default_exchange(ExchangeName), - _ = rabbit_exchange:lookup_or_die(ExchangeName), + user = User, + queue_collector_pid = CollectorPid, + conn_pid = ConnPid}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); -handle_method(#'exchange.delete'{exchange = ExchangeNameBin, - if_unused = IfUnused, - nowait = NoWait}, - _, State = #ch{virtual_host = VHostPath, - user = #user{username = Username}}) -> - StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), - ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), - check_not_default_exchange(ExchangeName), - check_exchange_deletion(ExchangeName), - check_configure_permitted(ExchangeName, State), - case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of - {error, not_found} -> - return_ok(State, NoWait, #'exchange.delete_ok'{}); - {error, in_use} -> - precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); - ok -> - return_ok(State, NoWait, #'exchange.delete_ok'{}) - end; - -handle_method(#'exchange.bind'{destination = DestinationNameBin, - source = SourceNameBin, - routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:add/3, - strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey, - Arguments, #'exchange.bind_ok'{}, NoWait, State); - -handle_method(#'exchange.unbind'{destination = DestinationNameBin, - source = SourceNameBin, - routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:remove/3, - strip_cr_lf(SourceNameBin), exchange, strip_cr_lf(DestinationNameBin), RoutingKey, - Arguments, #'exchange.unbind_ok'{}, NoWait, State); - -%% Note that all declares to these are effectively passive. If it -%% exists it by definition has one consumer. -handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", - _/binary>> = QueueNameBin, - nowait = NoWait}, _, - State = #ch{virtual_host = VHost}) -> - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), - case declare_fast_reply_to(StrippedQueueNameBin) of - exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State); - not_found -> rabbit_misc:not_found(QueueName) - end; +handle_method(#'exchange.delete'{nowait = NoWait} = Method, + _, State = #ch{conn_pid = ConnPid, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'exchange.delete_ok'{}); -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = DurableDeclare, - exclusive = ExclusiveDeclare, - auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args} = Declare, +handle_method(#'exchange.bind'{nowait = NoWait} = Method, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, queue_collector_pid = CollectorPid, - user = #user{username = Username}}) -> - Owner = case ExclusiveDeclare of - true -> ConnPid; - false -> none - end, - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - Durable = DurableDeclare andalso not ExclusiveDeclare, - ActualNameBin = case StrippedQueueNameBin of - <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), - "amq.gen"); - Other -> check_name('queue', Other) - end, - QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( - Q, Durable, AutoDelete, Args, Owner), - maybe_stat(NoWait, Q) - end) of - {ok, MessageCount, ConsumerCount} -> - return_queue_declare_ok(QueueName, NoWait, MessageCount, - ConsumerCount, State); - {error, not_found} -> - %% enforce the limit for newly declared queues only - check_vhost_queue_limit(QueueName, VHostPath), - DlxKey = <<"x-dead-letter-exchange">>, - case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of - undefined -> - ok; - {error, {invalid_type, Type}} -> - precondition_failed( - "invalid type '~s' for arg '~s' in ~s", - [Type, DlxKey, rabbit_misc:rs(QueueName)]); - DLX -> - check_read_permitted(QueueName, State), - check_write_permitted(DLX, State), - ok - end, - case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner, Username) of - {new, #amqqueue{pid = QPid}} -> - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as - %% the connection shuts down. - ok = case Owner of - none -> ok; - _ -> rabbit_queue_collector:register( - CollectorPid, QPid) - end, - return_queue_declare_ok(QueueName, NoWait, 0, 0, State); - {existing, _Q} -> - %% must have been created between the stat and the - %% declare. Loop around again. - handle_method(Declare, none, State); - {absent, Q, Reason} -> - rabbit_misc:absent(Q, Reason); - {owner_died, _Q} -> - %% Presumably our own days are numbered since the - %% connection has died. Pretend the queue exists though, - %% just so nothing fails. - return_queue_declare_ok(QueueName, NoWait, 0, 0, State) - end; - {error, {absent, Q, Reason}} -> - rabbit_misc:absent(Q, Reason) - end; + user = User}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'exchange.bind_ok'{}); -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = true, - nowait = NoWait}, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid}) -> - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), - {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = - rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), - ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), - return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, - State); +handle_method(#'exchange.unbind'{nowait = NoWait} = Method, + _, State = #ch{virtual_host = VHostPath, + conn_pid = ConnPid, + queue_collector_pid = CollectorPid, + user = User}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'exchange.unbind_ok'{}); -handle_method(#'queue.delete'{queue = QueueNameBin, - if_unused = IfUnused, - if_empty = IfEmpty, - nowait = NoWait}, +handle_method(#'queue.declare'{nowait = NoWait} = Method, + _, State = #ch{virtual_host = VHostPath, + conn_pid = ConnPid, + queue_collector_pid = CollectorPid, + user = User}) -> + {ok, QueueName, MessageCount, ConsumerCount} = + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + +handle_method(#'queue.delete'{nowait = NoWait} = Method, _, + State = #ch{conn_pid = ConnPid, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}) -> + {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid, + VHostPath, User), + return_ok(State, NoWait, + #'queue.delete_ok'{message_count = PurgedMessageCount}); + +handle_method(#'queue.bind'{nowait = NoWait} = Method, _, + State = #ch{conn_pid = ConnPid, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, NoWait, #'queue.bind_ok'{}); + +handle_method(#'queue.unbind'{} = Method, _, + State = #ch{conn_pid = ConnPid, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}) -> + handle_method(Method, ConnPid, CollectorPid, VHostPath, User), + return_ok(State, false, #'queue.unbind_ok'{}); + +handle_method(#'queue.purge'{nowait = NoWait} = Method, _, State = #ch{conn_pid = ConnPid, - user = #user{username = Username}}) -> - StrippedQueueNameBin = strip_cr_lf(QueueNameBin), - QueueName = qbin_to_resource(StrippedQueueNameBin, State), - check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:check_exclusive_access(Q, ConnPid), - rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username) - end, - fun (not_found) -> {ok, 0}; - ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), - {ok, 0}; - ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) - end) of - {error, in_use} -> - precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); - {error, not_empty} -> - precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]); - {ok, PurgedMessageCount} -> - return_ok(State, NoWait, - #'queue.delete_ok'{message_count = PurgedMessageCount}) - end; - -handle_method(#'queue.bind'{queue = QueueNameBin, - exchange = ExchangeNameBin, - routing_key = RoutingKey, - nowait = NoWait, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:add/3, - strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments, - #'queue.bind_ok'{}, NoWait, State); - -handle_method(#'queue.unbind'{queue = QueueNameBin, - exchange = ExchangeNameBin, - routing_key = RoutingKey, - arguments = Arguments}, _, State) -> - binding_action(fun rabbit_binding:remove/3, - strip_cr_lf(ExchangeNameBin), queue, strip_cr_lf(QueueNameBin), RoutingKey, Arguments, - #'queue.unbind_ok'{}, false, State); - -handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{conn_pid = ConnPid}) -> - QueueName = qbin_to_resource(QueueNameBin, State), - check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnPid, - fun (Q) -> rabbit_amqqueue:purge(Q) end), + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}) -> + {ok, PurgedMessageCount} = handle_method(Method, ConnPid, CollectorPid, + VHostPath, User), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -1713,23 +1558,23 @@ queue_down_consumer_action(CTag, CMap) -> handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, - RoutingKey, Arguments, ReturnMethod, NoWait, - State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - user = #user{username = Username}}) -> - DestinationName = name_to_resource(DestinationType, DestinationNameBin, State), - check_write_permitted(DestinationName, State), +binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, + RoutingKey, Arguments, VHostPath, ConnPid, + #user{username = Username} = User) -> + ExchangeNameBin = strip_cr_lf(SourceNameBin0), + DestinationNameBin = strip_cr_lf(DestinationNameBin0), + DestinationName = name_to_resource(DestinationType, DestinationNameBin, VHostPath), + check_write_permitted(DestinationName, User), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], - check_read_permitted(ExchangeName, State), + check_read_permitted(ExchangeName, User), ExchangeLookup = rabbit_exchange:lookup(ExchangeName), case ExchangeLookup of {error, not_found} -> %% no-op ExchangeLookup; {ok, Exchange} -> - check_read_permitted_on_topic(Exchange, State, RoutingKey), + check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey), ExchangeLookup end, case Fun(#binding{source = ExchangeName, @@ -1757,7 +1602,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); - ok -> return_ok(State, NoWait, ReturnMethod) + ok -> + ok end. basic_return(#basic_message{exchange_name = ExchangeName, @@ -2148,3 +1994,226 @@ put_operation_timeout() -> get_operation_timeout() -> get(channel_operation_timeout). + +%% Refactored and exported to allow direct calls from the HTTP API, +%% avoiding the usage of AMQP 0-9-1 from the management. +handle_method(#'exchange.bind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:add/3, + SourceNameBin, exchange, DestinationNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +handle_method(#'exchange.unbind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:remove/3, + SourceNameBin, exchange, DestinationNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:remove/3, + ExchangeNameBin, queue, QueueNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +handle_method(#'queue.bind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, + ConnPid, _CollectorId, VHostPath, User) -> + binding_action(fun rabbit_binding:add/3, + ExchangeNameBin, queue, QueueNameBin, + RoutingKey, Arguments, VHostPath, ConnPid, User); +%% Note that all declares to these are effectively passive. If it +%% exists it by definition has one consumer. +handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", + _/binary>> = QueueNameBin}, + _ConnPid, _CollectorPid, VHost, _User) -> + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin), + case declare_fast_reply_to(StrippedQueueNameBin) of + exists -> {ok, QueueName, 0, 1}; + not_found -> rabbit_misc:not_found(QueueName) + end; +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = DurableDeclare, + exclusive = ExclusiveDeclare, + auto_delete = AutoDelete, + nowait = NoWait, + arguments = Args} = Declare, + ConnPid, CollectorPid, VHostPath, #user{username = Username} = User) -> + Owner = case ExclusiveDeclare of + true -> ConnPid; + false -> none + end, + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + Durable = DurableDeclare andalso not ExclusiveDeclare, + ActualNameBin = case StrippedQueueNameBin of + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.gen"); + Other -> check_name('queue', Other) + end, + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, User), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + maybe_stat(NoWait, Q) + end) of + {ok, MessageCount, ConsumerCount} -> + {ok, QueueName, MessageCount, ConsumerCount}; + {error, not_found} -> + %% enforce the limit for newly declared queues only + check_vhost_queue_limit(QueueName, VHostPath), + DlxKey = <<"x-dead-letter-exchange">>, + case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of + undefined -> + ok; + {error, {invalid_type, Type}} -> + precondition_failed( + "invalid type '~s' for arg '~s' in ~s", + [Type, DlxKey, rabbit_misc:rs(QueueName)]); + DLX -> + check_read_permitted(QueueName, User), + check_write_permitted(DLX, User), + ok + end, + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner, Username) of + {new, #amqqueue{pid = QPid}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case {Owner, CollectorPid} of + {none, _} -> ok; + {_, none} -> ok; %% Supports call from mgmt API + _ -> rabbit_queue_collector:register( + CollectorPid, QPid) + end, + {ok, QueueName, 0, 0}; + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, ConnPid, CollectorPid, VHostPath, User); + {absent, Q, Reason} -> + rabbit_misc:absent(Q, Reason); + {owner_died, _Q} -> + %% Presumably our own days are numbered since the + %% connection has died. Pretend the queue exists though, + %% just so nothing fails. + {ok, QueueName, 0, 0} + end; + {error, {absent, Q, Reason}} -> + rabbit_misc:absent(Q, Reason) + end; +handle_method(#'queue.declare'{queue = QueueNameBin, + nowait = NoWait, + passive = true}, + ConnPid, _CollectorPid, VHostPath, _User) -> + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin), + {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), + ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), + {ok, QueueName, MessageCount, ConsumerCount}; +handle_method(#'queue.delete'{queue = QueueNameBin, + if_unused = IfUnused, + if_empty = IfEmpty}, + ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) -> + StrippedQueueNameBin = strip_cr_lf(QueueNameBin), + QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath), + + check_configure_permitted(QueueName, User), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ConnPid), + rabbit_amqqueue:delete(Q, IfUnused, IfEmpty, Username) + end, + fun (not_found) -> {ok, 0}; + ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q, Username), + {ok, 0}; + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) + end) of + {error, in_use} -> + precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); + {error, not_empty} -> + precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]); + {ok, _Count} = OK -> + OK + end; +handle_method(#'exchange.delete'{exchange = ExchangeNameBin, + if_unused = IfUnused}, + _ConnPid, _CollectorPid, VHostPath, User = #user{username = Username}) -> + StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin), + ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin), + check_not_default_exchange(ExchangeName), + check_exchange_deletion(ExchangeName), + check_configure_permitted(ExchangeName, User), + case rabbit_exchange:delete(ExchangeName, IfUnused, Username) of + {error, not_found} -> + ok; + {error, in_use} -> + precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); + ok -> + ok + end; +handle_method(#'queue.purge'{queue = QueueNameBin}, + ConnPid, _CollectorPid, VHostPath, User) -> + QueueName = qbin_to_resource(QueueNameBin, VHostPath), + check_read_permitted(QueueName, User), + rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> rabbit_amqqueue:purge(Q) end); +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + type = TypeNameBin, + passive = false, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args}, + _ConnPid, _CollectorPid, VHostPath, #user{username = Username} = User) -> + CheckedType = rabbit_exchange:check_type(TypeNameBin), + ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), + check_not_default_exchange(ExchangeName), + check_configure_permitted(ExchangeName, User), + X = case rabbit_exchange:lookup(ExchangeName) of + {ok, FoundX} -> FoundX; + {error, not_found} -> + check_name('exchange', strip_cr_lf(ExchangeNameBin)), + AeKey = <<"alternate-exchange">>, + case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of + undefined -> ok; + {error, {invalid_type, Type}} -> + precondition_failed( + "invalid type '~s' for arg '~s' in ~s", + [Type, AeKey, rabbit_misc:rs(ExchangeName)]); + AName -> check_read_permitted(ExchangeName, User), + check_write_permitted(AName, User), + ok + end, + rabbit_exchange:declare(ExchangeName, + CheckedType, + Durable, + AutoDelete, + Internal, + Args, + Username) + end, + ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, + AutoDelete, Internal, Args); +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + passive = true}, + _ConnPid, _CollectorPid, VHostPath, _User) -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)), + check_not_default_exchange(ExchangeName), + _ = rabbit_exchange:lookup_or_die(ExchangeName). diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index f8c4c6541b..30932bb1b0 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -34,7 +34,7 @@ delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, clear_tracked_connection_tables_for_this_node/0, register_connection/1, unregister_connection/1, - list/0, list/1, list_on_node/1, list_of_user/1, + list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, count_connections_in/1]). @@ -217,6 +217,16 @@ list_on_node(Node) -> catch exit:{aborted, {no_exists, _}} -> [] end. +-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node, VHost) -> + try mnesia:dirty_match_object( + tracked_connection_table_name_for(Node), + #tracked_connection{vhost = VHost, _ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + + -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()]. list_of_user(Username) -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index f1b844c60c..3ae17677e0 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) -> close_connections(rabbit_connection_tracking:list(VHost), rabbit_misc:format("vhost '~s' is deleted", [VHost])), {ok, State}; +handle_event(#event{type = vhost_down, props = Details}, State) -> + VHost = pget(name, Details), + Node = pget(node, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'" + " because the vhost database has stopped working", + [VHost, Node]), + close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), + rabbit_misc:format("vhost '~s' is down", [VHost])), + {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> Username = pget(name, Details), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index 3141fdc301..3321f2b5de 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -70,7 +70,7 @@ gc_channels() -> ok. gc_queues() -> - Queues = rabbit_amqqueue:list_names(), + Queues = rabbit_amqqueue:list_local_names(), GbSet = gb_sets:from_list(Queues), gc_entity(queue_metrics, GbSet), gc_entity(queue_coarse_metrics, GbSet), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 4b7f06305a..26e8f4d452 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -90,16 +90,21 @@ connect(Creds, VHost, Protocol, Pid, Infos) -> true -> {error, not_allowed}; false -> - case AuthFun() of - {ok, User = #user{username = Username}} -> - notify_auth_result(Username, - user_authentication_success, []), - connect1(User, VHost, Protocol, Pid, Infos); - {refused, Username, Msg, Args} -> - notify_auth_result(Username, - user_authentication_failure, - [{error, rabbit_misc:format(Msg, Args)}]), - {error, {auth_failure, "Refused"}} + case is_vhost_alive(VHost, Creds, Pid) of + false -> + {error, {internal_error, vhost_is_down}}; + true -> + case AuthFun() of + {ok, User = #user{username = Username}} -> + notify_auth_result(Username, + user_authentication_success, []), + connect1(User, VHost, Protocol, Pid, Infos); + {refused, Username, Msg, Args} -> + notify_auth_result(Username, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}]), + {error, {auth_failure, "Refused"}} + end end end; false -> {error, broker_not_found_on_node} @@ -140,6 +145,21 @@ maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) -> [] end. +is_vhost_alive(VHost, {Username, _Password}, Pid) -> + PrintedUsername = case Username of + none -> ""; + _ -> Username + end, + case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of + true -> true; + false -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "access to vhost '~s' refused for user '~s': " + "vhost '~s' is down", + [Pid, VHost, PrintedUsername, VHost]), + false + end. is_over_connection_limit(VHost, {Username, _Password}, Pid) -> PrintedUsername = case Username of diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl index c6c353d552..702fb41eb8 100644 --- a/src/rabbit_looking_glass.erl +++ b/src/rabbit_looking_glass.erl @@ -29,7 +29,10 @@ boot() -> Input = parse_value(Value), rabbit_log:info("Enabling Looking Glass profiler, input value: ~p", [Input]), {ok, _} = application:ensure_all_started(looking_glass), - lg:trace(Input, lg_file_tracer, "traces.lz4", #{mode => profile, running => true}) + lg:trace(Input, lg_file_tracer, "traces.lz4", #{ + mode => profile, + running => true, + send => true}) end. parse_value(Value) -> diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 2994e8cbcf..fd64fd61b3 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_sync). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 275a9127d1..fe78075d0f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, ok; {error, RTErr} -> rabbit_log:error("Unable to save message store recovery terms" - "for directory ~p~nError: ~p~n", + " for directory ~p~nError: ~p~n", [Dir, RTErr]) end, State3 #msstate { index_state = undefined, diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl index d93104d02c..f28e1281c3 100644 --- a/src/rabbit_parameter_validation.erl +++ b/src/rabbit_parameter_validation.erl @@ -22,13 +22,13 @@ number(_Name, Term) when is_number(Term) -> ok; number(Name, Term) -> - {error, "~s should be number, actually was ~p", [Name, Term]}. + {error, "~s should be a number, actually was ~p", [Name, Term]}. integer(_Name, Term) when is_integer(Term) -> ok; integer(Name, Term) -> - {error, "~s should be number, actually was ~p", [Name, Term]}. + {error, "~s should be a number, actually was ~p", [Name, Term]}. binary(_Name, Term) when is_binary(Term) -> ok; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e23d382d6e..77914a00bf 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -567,7 +567,7 @@ handle_other(handshake_timeout, State) -> throw({handshake_timeout, State#v1.callback}); handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> State; -handle_other(heartbeat_timeout, +handle_other(heartbeat_timeout, State = #v1{connection = #connection{timeout_sec = T}}) -> maybe_emit_stats(State), throw({heartbeat_timeout, T}); @@ -623,7 +623,7 @@ send_blocked(#v1{connection = #connection{protocol = Protocol, sock = Sock}, Reason) -> case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of {bool, true} -> - + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, Protocol); _ -> @@ -1164,6 +1164,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, ok = is_over_connection_limit(VHost, User), ok = rabbit_access_control:check_vhost_access(User, VHost, Sock), + ok = is_vhost_alive(VHost, User), NewConnection = Connection#connection{vhost = VHost}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -1209,6 +1210,16 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). +is_vhost_alive(VHostPath, User) -> + case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of + true -> ok; + false -> + rabbit_misc:protocol_error(internal_error, + "access to vhost '~s' refused for user '~s': " + "vhost '~s' is down", + [VHostPath, User#user.username, VHostPath]) + end. + is_over_connection_limit(VHostPath, User) -> try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of false -> ok; @@ -1567,7 +1578,7 @@ maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) -> State1 = State#v1{connection_state = blocked, throttle = update_last_blocked_at(Throttle)}, case CS of - running -> + running -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater); _ -> ok end, @@ -1589,7 +1600,7 @@ maybe_send_unblocked(State = #v1{throttle = Throttle}) -> case should_send_unblocked(Throttle) of true -> ok = send_unblocked(State), - State#v1{throttle = + State#v1{throttle = Throttle#throttle{connection_blocked_message_sent = false}}; false -> State end. @@ -1598,7 +1609,7 @@ maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) -> case should_send_blocked(Throttle) of true -> ok = send_blocked(State, blocked_by_message(Throttle)), - State#v1{throttle = + State#v1{throttle = Throttle#throttle{connection_blocked_message_sent = true}}; false -> maybe_send_unblocked(State) end. @@ -1624,7 +1635,7 @@ control_throttle(State = #v1{connection_state = CS, running -> maybe_block(State1); %% unblock or re-enable blocking blocked -> maybe_block(maybe_unblock(State1)); - _ -> State1 + _ -> State1 end. augment_connection_log_name(#connection{client_properties = ClientProperties, diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index be9b1b6227..73fc9c7449 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -48,20 +48,35 @@ %%---------------------------------------------------------------------------- start(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - {ok, _} = supervisor2:start_child( - VHostSup, - {?MODULE, - {?MODULE, start_link, [VHost]}, - transient, ?WORKER_WAIT, worker, - [?MODULE]}), + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + {ok, _} = supervisor2:start_child( + VHostSup, + {?MODULE, + {?MODULE, start_link, [VHost]}, + transient, ?WORKER_WAIT, worker, + [?MODULE]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]) + end, ok. stop(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - case supervisor:terminate_child(VHostSup, ?MODULE) of - ok -> supervisor:delete_child(VHostSup, ?MODULE); - E -> E + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + case supervisor:terminate_child(VHostSup, ?MODULE) of + ok -> supervisor:delete_child(VHostSup, ?MODULE); + E -> E + end; + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]), + + ok end. store(VHost, DirBaseName, Terms) -> @@ -74,7 +89,14 @@ read(VHost, DirBaseName) -> end. clear(VHost) -> - ok = dets:delete_all_objects(VHost), + try + dets:delete_all_objects(VHost) + %% see start/1 + catch _:badarg -> + rabbit_log:error("Failed to clear recovery terms for vhost ~s: table no longer exists!", + [VHost]), + ok + end, flush(VHost). start_link(VHost) -> @@ -126,8 +148,15 @@ open_global_table() -> ok. close_global_table() -> - ok = dets:sync(?MODULE), - ok = dets:close(?MODULE). + try + dets:sync(?MODULE), + dets:close(?MODULE) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to clear global recovery terms: table no longer exists!", + []), + ok + end. read_global(DirBaseName) -> read(?MODULE, DirBaseName). @@ -163,8 +192,23 @@ open_table(VHost) -> {ram_file, true}, {auto_save, infinity}]). -flush(VHost) -> ok = dets:sync(VHost). +flush(VHost) -> + try + dets:sync(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to sync recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. close_table(VHost) -> - ok = flush(VHost), - ok = dets:close(VHost). + try + ok = flush(VHost), + ok = dets:close(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to close recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 40967e316e..b4945fe3d3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -496,15 +496,26 @@ stop(VHost) -> start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?TRANSIENT_MSG_STORE, - undefined, - ?EMPTY_START_FUN_STATE), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?PERSISTENT_MSG_STORE, - Refs, - StartFunState), - rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). + do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE), + do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState), + ok. + +do_start_msg_store(VHost, Type, Refs, StartFunState) -> + case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of + {ok, _} -> + rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]); + {error, {no_such_vhost, VHost}} = Err -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n", + [Type, VHost]), + exit(Err); + {error, Error} -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n", + [Type, VHost, Error]), + exit({error, Error}) + end. + +abbreviated_type(?TRANSIENT_MSG_STORE) -> transient; +abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent. stop_msg_store(VHost) -> rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), @@ -2906,7 +2917,7 @@ run_old_persistent_store(Refs, StartFunState) -> OldStoreName. start_new_store(VHosts) -> - %% Ensure vhost supervisor is started, so we can add vhsots to it. + %% Ensure vhost supervisor is started, so we can add vhosts to it. lists:map(fun(VHost) -> VHostDir = rabbit_vhost:msg_store_dir_path(VHost), {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 4dc2ec86d0..30557fc7be 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -26,9 +26,10 @@ -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). +-export([vhost_down/1]). --spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. --spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. +-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). +-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). -spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec exists(rabbit_types:vhost()) -> boolean(). -spec list() -> [rabbit_types:vhost()]. @@ -54,8 +55,9 @@ recover() -> %% rabbit_vhost_sup_sup will start the actual recovery. %% So recovery will be run every time a vhost supervisor is restarted. ok = rabbit_vhost_sup_sup:start(), - [{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost) - || VHost <- rabbit_vhost:list()], + + [ ok = rabbit_vhost_sup_sup:init_vhost(VHost) + || VHost <- rabbit_vhost:list()], ok. recover(VHost) -> @@ -73,7 +75,7 @@ recover(VHost) -> %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, tracing]). +-define(INFO_KEYS, [name, tracing, state]). add(VHostPath, ActingUser) -> rabbit_log:info("Adding vhost '~s'~n", [VHostPath]), @@ -104,10 +106,20 @@ add(VHostPath, ActingUser) -> {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), - ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath), - rabbit_event:notify(vhost_created, info(VHostPath) - ++ [{user_who_performed_action, ActingUser}]), - R. + case rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath) of + ok -> + rabbit_event:notify(vhost_created, info(VHostPath) + ++ [{user_who_performed_action, ActingUser}]), + R; + {error, {no_such_vhost, VHostPath}} -> + Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!", + [VHostPath]), + {error, Msg}; + {error, Reason} -> + Msg = rabbit_misc:format("failed to set up vhost '~s': ~p", + [VHostPath, Reason]), + {error, Msg} + end. delete(VHostPath, ActingUser) -> %% FIXME: We are forced to delete the queues and exchanges outside @@ -125,12 +137,21 @@ delete(VHostPath, ActingUser) -> with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)), ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}, {user_who_performed_action, ActingUser}]), - [ok = Fun() || Fun <- Funs], + [case Fun() of + ok -> ok; + {error, {no_such_vhost, VHostPath}} -> ok + end || Fun <- Funs], %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors %% on all the nodes. rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), ok. +vhost_down(VHostPath) -> + ok = rabbit_event:notify(vhost_down, + [{name, VHostPath}, + {node, node()}, + {user_who_performed_action, ?INTERNAL_USER}]). + delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), @@ -240,6 +261,10 @@ infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, VHost) -> VHost; i(tracing, VHost) -> rabbit_trace:enabled(VHost); +i(state, VHost) -> case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of + true -> running; + false -> down + end; i(Item, _) -> throw({bad_argument, Item}). info(VHost) -> infos(?INFO_KEYS, VHost). diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl index 58c0ce065f..7b797e46b2 100644 --- a/src/rabbit_vhost_limit.erl +++ b/src/rabbit_vhost_limit.erl @@ -26,7 +26,8 @@ -export([update_limit/4, clear_limit/3, get_limit/2]). -export([validate/5, notify/5, notify_clear/4]). -export([connection_limit/1, queue_limit/1, - is_over_queue_limit/1, is_over_connection_limit/1]). + is_over_queue_limit/1, would_exceed_queue_limit/2, + is_over_connection_limit/1]). -import(rabbit_misc, [pget/2, pget/3]). @@ -106,28 +107,40 @@ is_over_connection_limit(VirtualHost) -> {ok, _Limit} -> false end. +-spec would_exceed_queue_limit(non_neg_integer(), rabbit_types:vhost()) -> + {true, non_neg_integer(), non_neg_integer()} | false. --spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. - -is_over_queue_limit(VirtualHost) -> +would_exceed_queue_limit(AdditionalCount, VirtualHost) -> case queue_limit(VirtualHost) of - %% no limit configured - undefined -> false; - %% with limit = 0, no queues can be declared (perhaps not very - %% useful but consistent with the connection limit) - {ok, 0} -> {true, 0}; + undefined -> + %% no limit configured + false; + {ok, 0} -> + %% with limit = 0, no queues can be declared (perhaps not very + %% useful but consistent with the connection limit) + {true, 0, 0}; {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> QueueCount = rabbit_amqqueue:count(VirtualHost), - case QueueCount >= Limit of + case (AdditionalCount + QueueCount) > Limit of false -> false; - true -> {true, Limit} + true -> {true, Limit, QueueCount} end; - %% any negative value means "no limit". Note that parameter validation - %% will replace negative integers with 'undefined', so this is to be - %% explicit and extra defensive - {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; - %% ignore non-integer limits - {ok, _Limit} -> false + {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> + %% any negative value means "no limit". Note that parameter validation + %% will replace negative integers with 'undefined', so this is to be + %% explicit and extra defensive + false; + {ok, _Limit} -> + %% ignore non-integer limits + false + end. + +-spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. + +is_over_queue_limit(VirtualHost) -> + case would_exceed_queue_limit(1, VirtualHost) of + {true, Limit, _QueueCount} -> {true, Limit}; + false -> false end. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl index 482ad082b8..3c633875bc 100644 --- a/src/rabbit_vhost_msg_store.erl +++ b/src/rabbit_vhost_msg_store.erl @@ -23,17 +23,33 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); ClientRefs == undefined -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - supervisor2:start_child(VHostSup, - {Type, {rabbit_msg_store, start_link, - [Type, VHostDir, ClientRefs, StartupFunState]}, - transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + supervisor2:start_child(VHostSup, + {Type, {rabbit_msg_store, start_link, + [Type, VHostDir, ClientRefs, StartupFunState]}, + transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} = E -> + rabbit_log:error("Failed to start a message store for vhost ~s: vhost no longer exists!", + [VHost]), + E + end. stop(VHost, Type) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, Type), - ok = supervisor2:delete_child(VHostSup, Type). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, Type), + ok = supervisor2:delete_child(VHostSup, Type); + %% see start/4 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a message store for vhost ~s: vhost no longer exists!", + [VHost]), + + ok + end. client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) -> with_vhost_store(VHost, Type, fun(StorePid) -> @@ -58,4 +74,4 @@ vhost_store_pid(VHost, Type) -> successfully_recovered_state(VHost, Type) -> with_vhost_store(VHost, Type, fun(StorePid) -> rabbit_msg_store:successfully_recovered_state(StorePid) - end).
\ No newline at end of file + end). diff --git a/src/rabbit_vhost_process.erl b/src/rabbit_vhost_process.erl new file mode 100644 index 0000000000..e3c815a727 --- /dev/null +++ b/src/rabbit_vhost_process.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +%% This module implements a vhost identity process. + +%% On start this process will try to recover the vhost data and +%% processes structure (queues and message stores). +%% If recovered successfully, the process will save it's PID +%% to vhost process registry. If vhost process PID is in the registry and the +%% process is alive - the vhost is considered running. + +%% On termination, the ptocess will notify of vhost going down. + +%% The process will also check periodically if the vhost still +%% present in mnesia DB and stop the vhost supervision tree when it +%% disappears. + +-module(rabbit_vhost_process). + +-include("rabbit.hrl"). + +-define(TICKTIME_RATIO, 4). + +-behaviour(gen_server2). +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +start_link(VHost) -> + gen_server2:start_link(?MODULE, [VHost], []). + + +init([VHost]) -> + process_flag(trap_exit, true), + rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]), + try + %% Recover the vhost data and save it to vhost registry. + ok = rabbit_vhost:recover(VHost), + rabbit_vhost_sup_sup:save_vhost_process(VHost, self()), + Interval = interval(), + timer:send_interval(Interval, check_vhost), + {ok, VHost} + catch _:Reason -> + rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" + " Stacktrace ~p", + [VHost, Reason, erlang:get_stacktrace()]), + {stop, Reason} + end. + +handle_call(_,_,VHost) -> + {reply, ok, VHost}. + +handle_cast(_, VHost) -> + {noreply, VHost}. + +handle_info(check_vhost, VHost) -> + case rabbit_vhost:exists(VHost) of + true -> {noreply, VHost}; + false -> + rabbit_log:warning("Virtual host '~s' is gone. " + "Stopping its top level supervisor.", + [VHost]), + %% Stop vhost's top supervisor in a one-off process to avoid a deadlock: + %% us (a child process) waiting for supervisor shutdown and our supervisor(s) + %% waiting for us to shutdown. + spawn( + fun() -> + rabbit_vhost_sup_sup:stop_and_delete_vhost(VHost) + end), + {noreply, VHost} + end; +handle_info(_, VHost) -> + {noreply, VHost}. + +terminate(shutdown, VHost) -> + %% Notify that vhost is stopped. + rabbit_vhost:vhost_down(VHost); +terminate(_, _VHost) -> + ok. + +code_change(_OldVsn, VHost, _Extra) -> + {ok, VHost}. + +interval() -> + application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl index b8c7a649e5..82899f8236 100644 --- a/src/rabbit_vhost_sup.erl +++ b/src/rabbit_vhost_sup.erl @@ -18,7 +18,8 @@ -include("rabbit.hrl"). -%% Supervisor is a per-vhost supervisor to contain queues and message stores +%% Each vhost gets an instance of this supervisor that supervises +%% message stores and queues (via rabbit_amqqueue_sup_sup). -behaviour(supervisor2). -export([init/1]). -export([start_link/1]). @@ -26,9 +27,5 @@ start_link(VHost) -> supervisor2:start_link(?MODULE, [VHost]). -init([VHost]) -> - {ok, {{one_for_all, 0, 1}, - [{rabbit_vhost_sup_watcher, - {rabbit_vhost_sup_watcher, start_link, [VHost]}, - intrinsic, ?WORKER_WAIT, worker, - [rabbit_vhost_sup]}]}}. +init([_VHost]) -> + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index e528d64e0e..1d5db93fda 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -23,18 +23,23 @@ -export([init/1]). -export([start_link/0, start/0]). --export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). +-export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). --export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]). +-export([start_on_all_nodes/1]). + +-export([save_vhost_process/2]). +-export([is_vhost_alive/1]). %% Internal --export([stop_and_delete_vhost/1]). +-export([stop_and_delete_vhost/1, start_vhost/1]). --record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). +-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}). start() -> - case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []}, - permanent, infinity, supervisor, [?MODULE]}) of + case supervisor:start_child(rabbit_sup, {?MODULE, + {?MODULE, start_link, []}, + permanent, infinity, supervisor, + [?MODULE]}) of {ok, _} -> ok; {error, Err} -> {error, Err} end. @@ -43,68 +48,43 @@ start_link() -> supervisor2:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of - ignore -> transient; - stop_node -> permanent; - transient -> transient; - permanent -> permanent - end, - + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + RestartStrategy = vhost_restart_strategy(), ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]), + {ok, {{simple_one_for_one, 0, 5}, [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []}, - VhostRestart, infinity, supervisor, + RestartStrategy, ?SUPERVISOR_WAIT, supervisor, [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. start_on_all_nodes(VHost) -> - [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], - ok. + NodesStart = [ {Node, start_vhost(VHost, Node)} + || Node <- rabbit_nodes:all_running() ], + Failures = lists:filter(fun({_, {ok, _}}) -> false; (_) -> true end, NodesStart), + case Failures of + [] -> ok; + Errors -> {error, {failed_to_start_vhost_on_nodes, Errors}} + end. delete_on_all_nodes(VHost) -> [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], ok. -start_vhost(VHost, Node) when Node == node(self()) -> - start_vhost(VHost); -start_vhost(VHost, Node) -> - case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {badrpc, RpcErr} -> - {error, RpcErr} - end. - -start_vhost(VHost) -> - case rabbit_vhost:exists(VHost) of - false -> {error, {no_such_vhost, VHost}}; - true -> - case vhost_sup_pid(VHost) of - no_pid -> - case supervisor2:start_child(?MODULE, [VHost]) of - {ok, _} -> ok; - {error, {already_started, _}} -> ok; - Error -> throw(Error) - end, - {ok, _} = vhost_sup_pid(VHost); - {ok, Pid} when is_pid(Pid) -> - {ok, Pid} - end - end. - stop_and_delete_vhost(VHost) -> case get_vhost_sup(VHost) of not_found -> ok; #vhost_sup{wrapper_pid = WrapperPid, - vhost_sup_pid = VHostSupPid} = VHostSup -> + vhost_sup_pid = VHostSupPid} -> case is_process_alive(WrapperPid) of false -> ok; true -> rabbit_log:info("Stopping vhost supervisor ~p" - " for vhost ~p~n", + " for vhost '~s'~n", [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> - ets:delete_object(?MODULE, VHostSup), + ets:delete(?MODULE, VHost), ok = rabbit_vhost:delete_storage(VHost); Other -> Other @@ -126,9 +106,31 @@ stop_and_delete_vhost(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}. -vhost_sup(VHost, Local) when Local == node(self()) -> - vhost_sup(VHost); +-spec init_vhost(rabbit_types:vhost()) -> ok. +init_vhost(VHost) -> + case start_vhost(VHost) of + {ok, _} -> ok; + {error, {no_such_vhost, VHost}} -> + {error, {no_such_vhost, VHost}}; + {error, Reason} -> + case vhost_restart_strategy() of + permanent -> + rabbit_log:error( + "Unable to initialize vhost data store for vhost '~s'." + " Reason: ~p", + [VHost, Reason]), + throw({error, Reason}); + transient -> + rabbit_log:warning( + "Unable to initialize vhost data store for vhost '~s'." + " The vhost will be stopped for this node. " + " Reason: ~p", + [VHost, Reason]), + ok + end + end. + +-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}. vhost_sup(VHost, Node) -> case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of {ok, Pid} when is_pid(Pid) -> @@ -137,9 +139,63 @@ vhost_sup(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}. +-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}. vhost_sup(VHost) -> - start_vhost(VHost). + case vhost_sup_pid(VHost) of + no_pid -> + case start_vhost(VHost) of + {ok, Pid} -> + true = is_vhost_alive(VHost), + {ok, Pid}; + {error, {no_such_vhost, VHost}} -> + {error, {no_such_vhost, VHost}}; + Error -> + throw(Error) + end; + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end. + +-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}. +start_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {badrpc, RpcErr} -> + {error, RpcErr} + end. + +-spec start_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. +start_vhost(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case supervisor2:start_child(?MODULE, [VHost]) of + {ok, Pid} -> {ok, Pid}; + {error, {already_started, Pid}} -> {ok, Pid}; + {error, Err} -> {error, Err} + end + end. + +-spec is_vhost_alive(rabbit_types:vhost()) -> boolean(). +is_vhost_alive(VHost) -> +%% A vhost is considered alive if it's supervision tree is alive and +%% saved in the ETS table + case get_vhost_sup(VHost) of + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid, + vhost_process_pid = VHostProcessPid} + when is_pid(WrapperPid), + is_pid(VHostSupPid), + is_pid(VHostProcessPid) -> + is_process_alive(WrapperPid) + andalso + is_process_alive(VHostSupPid) + andalso + is_process_alive(VHostProcessPid); + _ -> false + end. + -spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok. save_vhost_sup(VHost, WrapperPid, VHostPid) -> @@ -148,6 +204,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) -> wrapper_pid = WrapperPid}), ok. +-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok. +save_vhost_process(VHost, VHostProcessPid) -> + true = ets:update_element(?MODULE, VHost, + {#vhost_sup.vhost_process_pid, VHostProcessPid}), + ok. + -spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. get_vhost_sup(VHost) -> case ets:lookup(?MODULE, VHost) of @@ -169,3 +231,12 @@ vhost_sup_pid(VHost) -> end end. +vhost_restart_strategy() -> + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + case application:get_env(rabbit, vhost_restart_strategy, continue) of + continue -> transient; + stop_node -> permanent; + transient -> transient; + permanent -> permanent + end. diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl deleted file mode 100644 index 3ce726621f..0000000000 --- a/src/rabbit_vhost_sup_watcher.erl +++ /dev/null @@ -1,66 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. -%% - -%% This module implements a watcher process which should stop -%% the parent supervisor if its vhost is missing from the mnesia DB - --module(rabbit_vhost_sup_watcher). - --include("rabbit.hrl"). - --define(TICKTIME_RATIO, 4). - --behaviour(gen_server2). --export([start_link/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - - -start_link(VHost) -> - gen_server2:start_link(?MODULE, [VHost], []). - - -init([VHost]) -> - Interval = interval(), - timer:send_interval(Interval, check_vhost), - {ok, VHost}. - -handle_call(_,_,VHost) -> - {reply, ok, VHost}. - -handle_cast(_, VHost) -> - {noreply, VHost}. - -handle_info(check_vhost, VHost) -> - case rabbit_vhost:exists(VHost) of - true -> {noreply, VHost}; - false -> - rabbit_log:error(" Vhost \"~p\" is gone." - " Stopping message store supervisor.", - [VHost]), - {stop, normal, VHost} - end; -handle_info(_, VHost) -> - {noreply, VHost}. - -terminate(_, _) -> ok. - -code_change(_OldVsn, VHost, _Extra) -> - {ok, VHost}. - -interval() -> - application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
\ No newline at end of file diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 3396f71fa9..8e23389bb9 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -14,6 +14,9 @@ %% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. %% +%% This module is a wrapper around vhost supervisor to +%% provide exactly once restart semantics. + -module(rabbit_vhost_sup_wrapper). -include("rabbit.hrl"). @@ -24,30 +27,35 @@ -export([start_vhost_sup/1]). start_link(VHost) -> - supervisor2:start_link(?MODULE, [VHost]). - -%% This module is a wrapper around vhost supervisor to -%% provide exactly once restart. - -%% rabbit_vhost_sup supervisor children are added dynamically, -%% so one_for_all strategy cannot be used. + %% Using supervisor, because supervisor2 does not stop a started child when + %% another one fails to start. Bug? + supervisor:start_link(?MODULE, [VHost]). init([VHost]) -> - %% Two restarts in 1 hour. One per message store. - {ok, {{one_for_all, 2, 3600000}, - [{rabbit_vhost_sup, - {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, - permanent, infinity, supervisor, - [rabbit_vhost_sup]}]}}. + %% 2 restarts in 5 minutes. One per message store. + {ok, {{one_for_all, 2, 300}, + [ + %% rabbit_vhost_sup is an empty supervisor container for + %% all data processes. + {rabbit_vhost_sup, + {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, + permanent, infinity, supervisor, + [rabbit_vhost_sup]}, + %% rabbit_vhost_process is a vhost identity process, which + %% is responsible for data recovery and vhost aliveness status. + %% See the module comments for more info. + {rabbit_vhost_process, + {rabbit_vhost_process, start_link, [VHost]}, + permanent, ?WORKER_WAIT, worker, + [rabbit_vhost_process]}]}}. + start_vhost_sup(VHost) -> case rabbit_vhost_sup:start_link(VHost) of {ok, Pid} -> %% Save vhost sup record with wrapper pid and vhost sup pid. ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid), - %% We can start recover as soon as we have vhost_sup record saved - ok = rabbit_vhost:recover(VHost), {ok, Pid}; Other -> Other - end. + end.
\ No newline at end of file diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index c2353ab85f..88062dc32a 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -40,19 +40,18 @@ memory() -> [aggregate(Names, Sums, memory, fun (X) -> X end) || Names <- distinguished_interesting_sups()], - MnesiaETS = mnesia_memory(), - MsgIndexETS = ets_memory(msg_stores()), - MetricsETS = ets_memory([rabbit_metrics]), - MetricsProc = - try - [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), - M - catch - error:badarg -> - 0 - end, - MgmtDbETS = ets_memory([rabbit_mgmt_storage]), - OsTotal = vm_memory_monitor:get_process_memory(), + MnesiaETS = mnesia_memory(), + MsgIndexETS = ets_memory(msg_stores()), + MetricsETS = ets_memory([rabbit_metrics]), + MetricsProc = try + [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), + M + catch + error:badarg -> + 0 + end, + MgmtDbETS = ets_memory([rabbit_mgmt_storage]), + VMTotal = vm_memory_monitor:get_process_memory(), [{total, ErlangTotal}, {processes, Processes}, @@ -63,6 +62,11 @@ memory() -> {system, System}] = erlang:memory([total, processes, ets, atom, binary, code, system]), + Unaccounted = case VMTotal - ErlangTotal of + GTZ when GTZ > 0 -> GTZ; + _LTZ -> 0 + end, + OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, @@ -97,9 +101,9 @@ memory() -> %% System {code, Code}, {atom, Atom}, - {other_system, System - ETS - Bin - Code - Atom + (OsTotal - ErlangTotal)}, + {other_system, System - ETS - Bin - Code - Atom + Unaccounted}, - {total, OsTotal} + {total, VMTotal} ]. %% [1] - erlang:memory(processes) can be less than the sum of its %% parts. Rather than display something nonsensical, just silence any diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 09d7ab4e95..247477bf40 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -26,13 +26,9 @@ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - zip_msgs_and_acks/4, multiple_routing_keys/0]). - + zip_msgs_and_acks/4]). -export([start/2, stop/1]). -%% exported for testing only --export([start_msg_store/3, stop_msg_store/1, init/6]). - %%---------------------------------------------------------------------------- %% This test backing queue follows the variable queue implementation, with %% the exception that it will introduce infinite delays on some operations if @@ -64,6 +60,7 @@ unacked_bytes, persistent_count, %% w unacked persistent_bytes, %% w unacked + delta_transient_bytes, %% target_ram_count, ram_msg_count, %% w/o unacked @@ -88,6 +85,12 @@ %% default queue or lazy queue mode, + %% number of reduce_memory_usage executions, once it + %% reaches a threshold the queue will manually trigger a runtime GC + %% see: maybe_execute_gc/1 + memory_reduction_run_count, + %% Queue data is grouped by VHost. We need to store it + %% to work with queue index. virtual_host }). @@ -108,22 +111,18 @@ -record(delta, { start_seq_id, %% start_seq_id is inclusive count, + transient, end_seq_id %% end_seq_id is exclusive }). --define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). --define(QUEUE, lqueue). --define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). --define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). -%%---------------------------------------------------------------------------- +-define(QUEUE, lqueue). +-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>). --rabbit_upgrade({multiple_routing_keys, local, []}). +%%---------------------------------------------------------------------------- -type seq_id() :: non_neg_integer(). @@ -191,2220 +190,118 @@ %% Duplicated from rabbit_backing_queue -spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. --spec multiple_routing_keys() -> 'ok'. - --define(BLANK_DELTA, #delta { start_seq_id = undefined, - count = 0, - end_seq_id = undefined }). --define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, - count = 0, - end_seq_id = Z }). - --define(MICROS_PER_SECOND, 1000000.0). - -%% We're sampling every 5s for RAM duration; a half life that is of -%% the same order of magnitude is probably about right. --define(RATE_AVG_HALF_LIFE, 5.0). - -%% We will recalculate the #rates{} every time we get asked for our -%% RAM duration, or every N messages published, whichever is -%% sooner. We do this since the priority calculations in -%% rabbit_amqqueue_process need fairly fresh rates. --define(MSGS_PER_RATE_CALC, 100). - %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- start(VHost, DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues), - %% Group recovery terms by vhost. - ClientRefs = [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - start_msg_store(VHost, ClientRefs, StartFunState), - {ok, AllTerms}. + rabbit_variable_queue:start(VHost, DurableQueues). stop(VHost) -> - ok = stop_msg_store(VHost), - ok = rabbit_queue_index:stop(VHost). - -start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> - rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?TRANSIENT_MSG_STORE, - undefined, - ?EMPTY_START_FUN_STATE), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?PERSISTENT_MSG_STORE, - Refs, - StartFunState), - rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). - -stop_msg_store(VHost) -> - rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), - rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE), - ok. - + rabbit_variable_queue:stop(VHost). init(Queue, Recover, Callback) -> - init( - Queue, Recover, Callback, - fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(Callback, MsgIds, ActionTaken) - end, - fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, - fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). - -init(#amqqueue { name = QueueName, durable = IsDurable }, new, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> - IndexState = rabbit_queue_index:init(QueueName, - MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - VHost = QueueName#resource.virtual_host, - init(IsDurable, IndexState, 0, 0, [], - case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback, - VHost); - false -> undefined - end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback, VHost), VHost); - -%% We can be recovering a transient queue if it crashed -init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, - AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) -> - {PRef, RecoveryTerms} = process_recovery_terms(Terms), - VHost = QueueName#resource.virtual_host, - {PersistentClient, ContainsCheckFun} = - case IsDurable of - true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback, - VHost), - {C, fun (MsgId) when is_binary(MsgId) -> - rabbit_msg_store:contains(MsgId, C); - (#basic_message{is_persistent = Persistent}) -> - Persistent - end}; - false -> {undefined, fun(_MsgId) -> false end} - end, - TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback, - VHost), - {DeltaCount, DeltaBytes, IndexState} = - rabbit_queue_index:recover( - QueueName, RecoveryTerms, - rabbit_vhost_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), - ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, - PersistentClient, TransientClient, VHost). - -process_recovery_terms(Terms=non_clean_shutdown) -> - {rabbit_guid:gen(), Terms}; -process_recovery_terms(Terms) -> - case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:gen(), []}; - PRef -> {PRef, Terms} - end. - -terminate(_Reason, State) -> - State1 = #vqstate { persistent_count = PCount, - persistent_bytes = PBytes, - index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT}, - virtual_host = VHost } = - purge_pending_ack(true, State), - PRef = case MSCStateP of - undefined -> undefined; - _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), - rabbit_msg_store:client_ref(MSCStateP) - end, - ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, - {persistent_count, PCount}, - {persistent_bytes, PBytes}], - a(State1 #vqstate { index_state = rabbit_queue_index:terminate( - VHost, Terms, IndexState), - msg_store_clients = undefined }). - -%% the only difference between purge and delete is that delete also -%% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(_Reason, State) -> - %% Normally when we purge messages we interact with the qi by - %% issues delivers and acks for every purged message. In this case - %% we don't need to do that, so we just delete the qi. - State1 = purge_and_index_reset(State), - State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = - purge_pending_ack_delete_and_terminate(State1), - case MSCStateP of - undefined -> ok; - _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) - end, - rabbit_msg_store:client_delete_and_terminate(MSCStateT), - a(State2 #vqstate { msg_store_clients = undefined }). - -delete_crashed(#amqqueue{name = QName}) -> - ok = rabbit_queue_index:erase(QName). - -purge(State = #vqstate { len = Len, qi_pending_ack= QPA }) -> + rabbit_variable_queue:init(Queue, Recover, Callback). + +terminate(Reason, State) -> + rabbit_variable_queue:terminate(Reason, State). + +delete_and_terminate(Reason, State) -> + rabbit_variable_queue:delete_and_terminate(Reason, State). + +delete_crashed(Q) -> + rabbit_variable_queue:delete_crashed(Q). + +purge(State = #vqstate { qi_pending_ack= QPA }) -> maybe_delay(QPA), - case is_pending_ack_empty(State) of - true -> - {Len, purge_and_index_reset(State)}; - false -> - {Len, purge_when_pending_acks(State)} - end. + rabbit_variable_queue:purge(State). -purge_acks(State) -> a(purge_pending_ack(false, State)). +purge_acks(State) -> + rabbit_variable_queue:purge_acks(State). publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) -> - State1 = - publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, - fun maybe_write_to_disk/4, - State), - a(reduce_memory_use(maybe_update_rates(State1))). + rabbit_variable_queue:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State). batch_publish(Publishes, ChPid, Flow, State) -> - {ChPid, Flow, State1} = - lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes), - State2 = ui(State1), - a(reduce_memory_use(maybe_update_rates(State2))). + rabbit_variable_queue:batch_publish(Publishes, ChPid, Flow, State). publish_delivered(Msg, MsgProps, ChPid, Flow, State) -> - {SeqId, State1} = - publish_delivered1(Msg, MsgProps, ChPid, Flow, - fun maybe_write_to_disk/4, - State), - {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}. + rabbit_variable_queue:publish_delivered(Msg, MsgProps, ChPid, Flow, State). batch_publish_delivered(Publishes, ChPid, Flow, State) -> - {ChPid, Flow, SeqIds, State1} = - lists:foldl(fun batch_publish_delivered1/2, - {ChPid, Flow, [], State}, Publishes), - State2 = ui(State1), - {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}. + rabbit_variable_queue:batch_publish_delivered(Publishes, ChPid, Flow, State). discard(_MsgId, _ChPid, _Flow, State) -> State. -drain_confirmed(State = #vqstate { confirmed = C }) -> - case gb_sets:is_empty(C) of - true -> {[], State}; %% common case - false -> {gb_sets:to_list(C), State #vqstate { - confirmed = gb_sets:new() }} - end. +drain_confirmed(State) -> + rabbit_variable_queue:drain_confirmed(State). dropwhile(Pred, State) -> - {MsgProps, State1} = - remove_by_predicate(Pred, State), - {MsgProps, a(State1)}. + rabbit_variable_queue:dropwhile(Pred, State). fetchwhile(Pred, Fun, Acc, State) -> - {MsgProps, Acc1, State1} = - fetch_by_predicate(Pred, Fun, Acc, State), - {MsgProps, Acc1, a(State1)}. + rabbit_variable_queue:fetchwhile(Pred, Fun, Acc, State). fetch(AckRequired, State) -> - case queue_out(State) of - {empty, State1} -> - {empty, a(State1)}; - {{value, MsgStatus}, State1} -> - %% it is possible that the message wasn't read from disk - %% at this point, so read it in. - {Msg, State2} = read_msg(MsgStatus, State1), - {AckTag, State3} = remove(AckRequired, MsgStatus, State2), - {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} - end. + rabbit_variable_queue:fetch(AckRequired, State). drop(AckRequired, State) -> - case queue_out(State) of - {empty, State1} -> - {empty, a(State1)}; - {{value, MsgStatus}, State1} -> - {AckTag, State2} = remove(AckRequired, MsgStatus, State1), - {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} - end. - -ack([], State) -> - {[], State}; -%% optimisation: this head is essentially a partial evaluation of the -%% general case below, for the single-ack case. -ack([SeqId], State) -> - {#msg_status { msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - remove_pending_ack(true, SeqId, State), - IndexState1 = case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState); - false -> IndexState - end, - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - {[MsgId], - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + 1 })}; -ack(AckTags, State) -> - {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - lists:foldl( - fun (SeqId, {Acc, State2}) -> - {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2), - {accumulate_ack(MsgStatus, Acc), State3} - end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - {lists:reverse(AllMsgIds), - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + length(AckTags) })}. - -requeue(AckTags, #vqstate { mode = default, - delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len, - qi_pending_ack = QPA } = State) -> - maybe_delay(QPA), - {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], - beta_limit(Q3), - fun publish_alpha/2, State), - {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, - delta_limit(Delta), - fun publish_beta/2, State1), - {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, - State2), - MsgCount = length(MsgIds2), - {MsgIds2, a(reduce_memory_use( - maybe_update_rates( - State3 #vqstate { delta = Delta1, - q3 = Q3a, - q4 = Q4a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}; -requeue(AckTags, #vqstate { mode = lazy, - delta = Delta, - q3 = Q3, - in_counter = InCounter, - len = Len, - qi_pending_ack = QPA } = State) -> + rabbit_variable_queue:drop(AckRequired, State). + +ack(List, State) -> + rabbit_variable_queue:ack(List, State). + +requeue(AckTags, #vqstate { qi_pending_ack = QPA } = State) -> maybe_delay(QPA), - {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [], - delta_limit(Delta), - fun publish_beta/2, State), - {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds, - State1), - MsgCount = length(MsgIds1), - {MsgIds1, a(reduce_memory_use( - maybe_update_rates( - State2 #vqstate { delta = Delta1, - q3 = Q3a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}. + rabbit_variable_queue:requeue(AckTags, State). ackfold(MsgFun, Acc, State, AckTags) -> - {AccN, StateN} = - lists:foldl(fun(SeqId, {Acc0, State0}) -> - MsgStatus = lookup_pending_ack(SeqId, State0), - {Msg, State1} = read_msg(MsgStatus, State0), - {MsgFun(Msg, SeqId, Acc0), State1} - end, {Acc, State}, AckTags), - {AccN, a(StateN)}. - -fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> - {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, - [msg_iterator(State), - disk_ack_iterator(State), - ram_ack_iterator(State), - qi_ack_iterator(State)]), - ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). - -len(#vqstate { len = Len, qi_pending_ack = QPA }) -> + rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags). + +fold(Fun, Acc, State) -> + rabbit_variable_queue:fold(Fun, Acc, State). + +len(#vqstate { qi_pending_ack = QPA } = State) -> maybe_delay(QPA), - Len. + rabbit_variable_queue:len(State). is_empty(State) -> 0 == len(State). depth(State) -> - len(State) + count_pending_acks(State). - -set_ram_duration_target( - DurationTarget, State = #vqstate { - rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }, - target_ram_count = TargetRamCount }) -> - Rate = - AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, - TargetRamCount1 = - case DurationTarget of - infinity -> infinity; - _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec - end, - State1 = State #vqstate { target_ram_count = TargetRamCount1 }, - a(case TargetRamCount1 == infinity orelse - (TargetRamCount =/= infinity andalso - TargetRamCount1 >= TargetRamCount) of - true -> State1; - false -> reduce_memory_use(State1) - end). - -maybe_update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount }) - when InCount + OutCount > ?MSGS_PER_RATE_CALC -> - update_rates(State); -maybe_update_rates(State) -> - State. - -update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount, - rates = #rates{ in = InRate, - out = OutRate, - ack_in = AckInRate, - ack_out = AckOutRate, - timestamp = TS }}) -> - Now = erlang:monotonic_time(), - - Rates = #rates { in = update_rate(Now, TS, InCount, InRate), - out = update_rate(Now, TS, OutCount, OutRate), - ack_in = update_rate(Now, TS, AckInCount, AckInRate), - ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), - timestamp = Now }, - - State#vqstate{ in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0, - rates = Rates }. - -update_rate(Now, TS, Count, Rate) -> - Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) / - ?MICROS_PER_SECOND, - if - Time == 0 -> Rate; - true -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, - Count / Time, Rate) - end. - -ram_duration(State) -> - State1 = #vqstate { rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev, - ram_pending_ack = RPA, - qi_pending_ack = QPA, - ram_ack_count_prev = RamAckCountPrev } = - update_rates(State), - - RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA), - - Duration = %% msgs+acks / (msgs+acks/sec) == sec - case lists:all(fun (X) -> X < 0.01 end, - [AvgEgressRate, AvgIngressRate, - AvgAckEgressRate, AvgAckIngressRate]) of - true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount + - RamAckCount + RamAckCountPrev) / - (4 * (AvgEgressRate + AvgIngressRate + - AvgAckEgressRate + AvgAckIngressRate)) - end, - - {Duration, State1}. - -needs_timeout(#vqstate { index_state = IndexState }) -> - case rabbit_queue_index:needs_sync(IndexState) of - confirms -> timed; - other -> idle; - false -> false - end. - -timeout(State = #vqstate { index_state = IndexState }) -> - State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }. - -handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> - State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. - -resume(State) -> a(reduce_memory_use(State)). - -msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, - out = AvgEgressRate } }) -> - {AvgIngressRate, AvgEgressRate}. - -info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> - RamMsgCount; -info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA, - qi_pending_ack = QPA}) -> - gb_trees:size(RPA) + gb_trees:size(QPA); -info(messages_ram, State) -> - info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); -info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> - PersistentCount; -info(message_bytes, #vqstate{bytes = Bytes, - unacked_bytes = UBytes}) -> - Bytes + UBytes; -info(message_bytes_ready, #vqstate{bytes = Bytes}) -> - Bytes; -info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) -> - UBytes; -info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> - RamBytes; -info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> - PersistentBytes; -info(head_message_timestamp, #vqstate{ - q3 = Q3, - q4 = Q4, - ram_pending_ack = RPA, - qi_pending_ack = QPA}) -> - head_message_timestamp(Q3, Q4, RPA, QPA); -info(disk_reads, #vqstate{disk_read_count = Count}) -> - Count; -info(disk_writes, #vqstate{disk_write_count = Count}) -> - Count; -info(backing_queue_status, #vqstate { - q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = Mode, - len = Len, - target_ram_count = TargetRamCount, - next_seq_id = NextSeqId, - rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }}) -> - - [ {mode , Mode}, - {q1 , ?QUEUE:len(Q1)}, - {q2 , ?QUEUE:len(Q2)}, - {delta , Delta}, - {q3 , ?QUEUE:len(Q3)}, - {q4 , ?QUEUE:len(Q4)}, - {len , Len}, - {target_ram_count , TargetRamCount}, - {next_seq_id , NextSeqId}, - {avg_ingress_rate , AvgIngressRate}, - {avg_egress_rate , AvgEgressRate}, - {avg_ack_ingress_rate, AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]; -info(Item, _) -> - throw({bad_argument, Item}). - -invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); -invoke( _, _, State) -> State. - -is_duplicate(_Msg, State) -> {false, State}. - -set_queue_mode(Mode, State = #vqstate { mode = Mode }) -> - State; -set_queue_mode(lazy, State = #vqstate { - target_ram_count = TargetRamCount }) -> - %% To become a lazy queue we need to page everything to disk first. - State1 = convert_to_lazy(State), - %% restore the original target_ram_count - a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount }); -set_queue_mode(default, State) -> - %% becoming a default queue means loading messages from disk like - %% when a queue is recovered. - a(maybe_deltas_to_betas(State #vqstate { mode = default })); -set_queue_mode(_, State) -> - State. - -zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) -> - lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) -> - [{Id, AckTag} | Acc] - end, Accumulator, lists:zip(Msgs, AckTags)). - -convert_to_lazy(State) -> - State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } = - set_ram_duration_target(0, State), - case Delta#delta.count + ?QUEUE:len(Q3) == Len of - true -> - State1; - false -> - %% When pushing messages to disk, we might have been - %% blocked by the msg_store, so we need to see if we have - %% to wait for more credit, and then keep paging messages. - %% - %% The amqqueue_process could have taken care of this, but - %% between the time it receives the bump_credit msg and - %% calls BQ:resume to keep paging messages to disk, some - %% other request may arrive to the BQ which at this moment - %% is not in a proper state for a lazy BQ (unless all - %% messages have been paged to disk already). - wait_for_msg_store_credit(), - convert_to_lazy(State1) - end. - -wait_for_msg_store_credit() -> - case credit_flow:blocked() of - true -> receive - {bump_credit, Msg} -> - credit_flow:handle_bump_msg(Msg) - end; - false -> ok - end. - -%% Get the Timestamp property of the first msg, if present. This is -%% the one with the oldest timestamp among the heads of the pending -%% acks and unread queues. We can't check disk_pending_acks as these -%% are paged out - we assume some will soon be paged in rather than -%% forcing it to happen. Pending ack msgs are included as they are -%% regarded as unprocessed until acked, this also prevents the result -%% apparently oscillating during repeated rejects. Q3 is only checked -%% when Q4 is empty as any Q4 msg will be earlier. -head_message_timestamp(Q3, Q4, RPA, QPA) -> - HeadMsgs = [ HeadMsgStatus#msg_status.msg || - HeadMsgStatus <- - [ get_qs_head([Q4, Q3]), - get_pa_head(RPA), - get_pa_head(QPA) ], - HeadMsgStatus /= undefined, - HeadMsgStatus#msg_status.msg /= undefined ], - - Timestamps = - [Timestamp || HeadMsg <- HeadMsgs, - Timestamp <- [rabbit_basic:extract_timestamp( - HeadMsg#basic_message.content)], - Timestamp /= undefined - ], - - case Timestamps == [] of - true -> ''; - false -> lists:min(Timestamps) - end. - -get_qs_head(Qs) -> - catch lists:foldl( - fun (Q, Acc) -> - case get_q_head(Q) of - undefined -> Acc; - Val -> throw(Val) - end - end, undefined, Qs). - -get_q_head(Q) -> - get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1). - -get_pa_head(PA) -> - get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1). - -get_collection_head(Col, IsEmpty, GetVal) -> - case IsEmpty(Col) of - false -> - {_, MsgStatus} = GetVal(Col), - MsgStatus; - true -> undefined - end. - -%%---------------------------------------------------------------------------- -%% Minor helpers -%%---------------------------------------------------------------------------- -a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = default, - len = Len, - bytes = Bytes, - unacked_bytes = UnackedBytes, - persistent_count = PersistentCount, - persistent_bytes = PersistentBytes, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes}) -> - E1 = ?QUEUE:is_empty(Q1), - E2 = ?QUEUE:is_empty(Q2), - ED = Delta#delta.count == 0, - E3 = ?QUEUE:is_empty(Q3), - E4 = ?QUEUE:is_empty(Q4), - LZ = Len == 0, - - %% if q1 has messages then q3 cannot be empty. See publish/6. - true = E1 or not E3, - %% if q2 has messages then we have messages in delta (paged to - %% disk). See push_alphas_to_betas/2. - true = E2 or not ED, - %% if delta has messages then q3 cannot be empty. This is enforced - %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages - %% are always kept on RAM. - true = ED or not E3, - %% if the queue length is 0, then q3 and q4 must be empty. - true = LZ == (E3 and E4), - - true = Len >= 0, - true = Bytes >= 0, - true = UnackedBytes >= 0, - true = PersistentCount >= 0, - true = PersistentBytes >= 0, - true = RamMsgCount >= 0, - true = RamMsgCount =< Len, - true = RamBytes >= 0, - true = RamBytes =< Bytes + UnackedBytes, - - State; -a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = lazy, - len = Len, - bytes = Bytes, - unacked_bytes = UnackedBytes, - persistent_count = PersistentCount, - persistent_bytes = PersistentBytes, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes}) -> - E1 = ?QUEUE:is_empty(Q1), - E2 = ?QUEUE:is_empty(Q2), - ED = Delta#delta.count == 0, - E3 = ?QUEUE:is_empty(Q3), - E4 = ?QUEUE:is_empty(Q4), - LZ = Len == 0, - L3 = ?QUEUE:len(Q3), - - %% q1 must always be empty, since q1 only gets messages during - %% publish, but for lazy queues messages go straight to delta. - true = E1, - - %% q2 only gets messages from q1 when push_alphas_to_betas is - %% called for a non empty delta, which won't be the case for a - %% lazy queue. This means q2 must always be empty. - true = E2, - - %% q4 must always be empty, since q1 only gets messages during - %% publish, but for lazy queues messages go straight to delta. - true = E4, - - %% if the queue is empty, then delta is empty and q3 is empty. - true = LZ == (ED and E3), - - %% There should be no messages in q1, q2, and q4 - true = Delta#delta.count + L3 == Len, - - true = Len >= 0, - true = Bytes >= 0, - true = UnackedBytes >= 0, - true = PersistentCount >= 0, - true = PersistentBytes >= 0, - true = RamMsgCount >= 0, - true = RamMsgCount =< Len, - true = RamBytes >= 0, - true = RamBytes =< Bytes + UnackedBytes, - - State. - -d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) - when Start + Count =< End -> - Delta. - -m(MsgStatus = #msg_status { is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }) -> - true = (not IsPersistent) or IndexOnDisk, - true = msg_in_ram(MsgStatus) or MsgInStore, - MsgStatus. - -one_if(true ) -> 1; -one_if(false) -> 0. - -cons_if(true, E, L) -> [E | L]; -cons_if(false, _E, L) -> L. - -gb_sets_maybe_insert(false, _Val, Set) -> Set; -gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). - -msg_status(IsPersistent, IsDelivered, SeqId, - Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) -> - #msg_status{seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_in_store = false, - index_on_disk = false, - persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize), - msg_props = MsgProps}. - -beta_msg_status({Msg = #basic_message{id = MsgId}, - SeqId, MsgProps, IsPersistent, IsDelivered}) -> - MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = Msg, - persist_to = queue_index, - msg_in_store = false}; - -beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> - MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = undefined, - persist_to = msg_store, - msg_in_store = true}. - -beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> - #msg_status{seq_id = SeqId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = true, - msg_props = MsgProps}. - -trim_msg_status(MsgStatus) -> - case persist_to(MsgStatus) of - msg_store -> MsgStatus#msg_status{msg = undefined}; - queue_index -> MsgStatus - end. - -with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> - {Result, MSCStateP1} = Fun(MSCStateP), - {Result, {MSCStateP1, MSCStateT}}; -with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) -> - {Result, MSCStateT1} = Fun(MSCStateT), - {Result, {MSCStateP, MSCStateT1}}. - -with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> - {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, - fun (MSCState1) -> - {Fun(MSCState1), MSCState1} - end), - Res. - -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> - msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback, VHost). - -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> - CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_vhost_msg_store:client_init(VHost, - MsgStore, Ref, MsgOnDiskFun, - fun () -> Callback(?MODULE, CloseFDsFun) end). - -msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> - rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) - end). - -msg_store_read(MSCState, IsPersistent, MsgId) -> - with_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> - rabbit_msg_store:read(MsgId, MSCState1) - end). - -msg_store_remove(MSCState, IsPersistent, MsgIds) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MCSState1) -> - rabbit_msg_store:remove(MsgIds, MCSState1) - end). - -msg_store_close_fds(MSCState, IsPersistent) -> - with_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). - -msg_store_close_fds_fun(IsPersistent) -> - fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) -> - {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), - State #vqstate { msg_store_clients = MSCState1 } - end. - -maybe_write_delivered(false, _SeqId, IndexState) -> - IndexState; -maybe_write_delivered(true, SeqId, IndexState) -> - rabbit_queue_index:deliver([SeqId], IndexState). - -betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> - {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = - lists:foldr( - fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> - case SeqId < TransientThreshold andalso not IsPersistent of - true -> {Filtered1, - cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1], RRC, RB}; - false -> MsgStatus = m(beta_msg_status(M)), - HaveMsg = msg_in_ram(MsgStatus), - Size = msg_size(MsgStatus), - case is_msg_in_pending_acks(SeqId, State) of - false -> {?QUEUE:in_r(MsgStatus, Filtered1), - Delivers1, Acks1, - RRC + one_if(HaveMsg), - RB + one_if(HaveMsg) * Size}; - true -> Acc %% [0] - end - end - end, {?QUEUE:new(), [], [], 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. -%% [0] We don't increase RamBytes here, even though it pertains to -%% unacked messages too, since if HaveMsg then the message must have -%% been stored in the QI, thus the message must have been in -%% qi_pending_ack, thus it must already have been in RAM. - -is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)). - -expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> - d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); -expand_delta(SeqId, #delta { start_seq_id = StartSeqId, - count = Count } = Delta) - when SeqId < StartSeqId -> - d(Delta #delta { start_seq_id = SeqId, count = Count + 1 }); -expand_delta(SeqId, #delta { count = Count, - end_seq_id = EndSeqId } = Delta) - when SeqId >= EndSeqId -> - d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 }); -expand_delta(_SeqId, #delta { count = Count } = Delta) -> - d(Delta #delta { count = Count + 1 }). - -%%---------------------------------------------------------------------------- -%% Internal major helpers for Public API -%%---------------------------------------------------------------------------- - -init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, - PersistentClient, TransientClient, VHost) -> - {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - - {DeltaCount1, DeltaBytes1} = - case Terms of - non_clean_shutdown -> {DeltaCount, DeltaBytes}; - _ -> {proplists:get_value(persistent_count, - Terms, DeltaCount), - proplists:get_value(persistent_bytes, - Terms, DeltaBytes)} - end, - Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of - true -> ?BLANK_DELTA; - false -> d(#delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId }) - end, - Now = erlang:monotonic_time(), - IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, - ?IO_BATCH_SIZE), - - {ok, IndexMaxSize} = application:get_env( - rabbit, queue_index_embed_msgs_below), - State = #vqstate { - q1 = ?QUEUE:new(), - q2 = ?QUEUE:new(), - delta = Delta, - q3 = ?QUEUE:new(), - q4 = ?QUEUE:new(), - next_seq_id = NextSeqId, - ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty(), - qi_pending_ack = gb_trees:empty(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - durable = IsDurable, - transient_threshold = NextSeqId, - qi_embed_msgs_below = IndexMaxSize, - - len = DeltaCount1, - persistent_count = DeltaCount1, - bytes = DeltaBytes1, - persistent_bytes = DeltaBytes1, - - target_ram_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, - ram_bytes = 0, - unacked_bytes = 0, - out_counter = 0, - in_counter = 0, - rates = blank_rates(Now), - msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new(), - unconfirmed = gb_sets:new(), - confirmed = gb_sets:new(), - ack_out_counter = 0, - ack_in_counter = 0, - disk_read_count = 0, - disk_write_count = 0, - - io_batch_size = IoBatchSize, - - mode = default, - virtual_host = VHost }, - a(maybe_deltas_to_betas(State)). - -blank_rates(Now) -> - #rates { in = 0.0, - out = 0.0, - ack_in = 0.0, - ack_out = 0.0, - timestamp = Now}. - -in_r(MsgStatus = #msg_status { msg = undefined }, - State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) -> - case ?QUEUE:is_empty(Q4) of - true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; - false -> {Msg, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status{msg = Msg}, - stats(ready0, {MsgStatus, MsgStatus1}, - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) - end; -in_r(MsgStatus, - State = #vqstate { mode = default, q4 = Q4 }) -> - State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }; -%% lazy queues -in_r(MsgStatus = #msg_status { seq_id = SeqId }, - State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) -> - case ?QUEUE:is_empty(Q3) of - true -> - {_MsgStatus1, State1} = - maybe_write_to_disk(true, true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), - State2 #vqstate{ delta = Delta1 }; - false -> - State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } - end. - -queue_out(State = #vqstate { mode = default, q4 = Q4 }) -> - case ?QUEUE:out(Q4) of - {empty, _Q4} -> - case fetch_from_q3(State) of - {empty, _State1} = Result -> Result; - {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} - end; - {{value, MsgStatus}, Q4a} -> - {{value, MsgStatus}, State #vqstate { q4 = Q4a }} - end; -%% lazy queues -queue_out(State = #vqstate { mode = lazy }) -> - case fetch_from_q3(State) of - {empty, _State1} = Result -> Result; - {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} - end. - -read_msg(#msg_status{msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent}, State) -> - read_msg(MsgId, IsPersistent, State); -read_msg(#msg_status{msg = Msg}, State) -> - {Msg, State}. - -read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, - disk_read_count = Count}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1, - disk_read_count = Count + 1}}. - -stats(Signs, Statuses, State) -> - stats0(expand_signs(Signs), expand_statuses(Statuses), State). - -expand_signs(ready0) -> {0, 0, true}; -expand_signs(lazy_pub) -> {1, 0, true}; -expand_signs({A, B}) -> {A, B, false}. - -expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; -expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; -expand_statuses({lazy, A}) -> {false , false, A}; -expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. - -%% In this function at least, we are religious: the variable name -%% contains "Ready" or "Unacked" iff that is what it counts. If -%% neither is present it counts both. -stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, - {InRamBefore, InRamAfter, MsgStatus}, - State = #vqstate{len = ReadyCount, - bytes = ReadyBytes, - ram_msg_count = RamReadyCount, - persistent_count = PersistentCount, - unacked_bytes = UnackedBytes, - ram_bytes = RamBytes, - persistent_bytes = PersistentBytes}) -> - S = msg_size(MsgStatus), - DeltaTotal = DeltaReady + DeltaUnacked, - DeltaRam = case {InRamBefore, InRamAfter} of - {false, false} -> 0; - {false, true} -> 1; - {true, false} -> -1; - {true, true} -> 0 - end, - DeltaRamReady = case DeltaReady of - 1 -> one_if(InRamAfter); - -1 -> -one_if(InRamBefore); - 0 when ReadyMsgPaged -> DeltaRam; - 0 -> 0 - end, - DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent), - State#vqstate{len = ReadyCount + DeltaReady, - ram_msg_count = RamReadyCount + DeltaRamReady, - persistent_count = PersistentCount + DeltaPersistent, - bytes = ReadyBytes + DeltaReady * S, - unacked_bytes = UnackedBytes + DeltaUnacked * S, - ram_bytes = RamBytes + DeltaRam * S, - persistent_bytes = PersistentBytes + DeltaPersistent * S}. - -msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. - -msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. - -%% first param: AckRequired -remove(true, MsgStatus = #msg_status { - seq_id = SeqId, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }, - State = #vqstate {out_counter = OutCount, - index_state = IndexState}) -> - %% Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - State1 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - - State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1), - - {SeqId, maybe_update_rates( - State2 #vqstate {out_counter = OutCount + 1, - index_state = IndexState1})}; - -%% This function body has the same behaviour as remove_queue_entries/3 -%% but instead of removing messages based on a ?QUEUE, this removes -%% just one message, the one referenced by the MsgStatus provided. -remove(false, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State = #vqstate {out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState}) -> - %% Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% Remove from msg_store and queue index, if necessary - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - - IndexState2 = - case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState1); - false -> IndexState1 - end, - - State1 = stats({-1, 0}, {MsgStatus, none}, State), - - {undefined, maybe_update_rates( - State1 #vqstate {out_counter = OutCount + 1, - index_state = IndexState2})}. - -%% This function exists as a way to improve dropwhile/2 -%% performance. The idea of having this function is to optimise calls -%% to rabbit_queue_index by batching delivers and acks, instead of -%% sending them one by one. -%% -%% Instead of removing every message as their are popped from the -%% queue, it first accumulates them and then removes them by calling -%% remove_queue_entries/3, since the behaviour of -%% remove_queue_entries/3 when used with -%% process_delivers_and_acks_fun(deliver_and_ack) is the same as -%% calling remove(false, MsgStatus, State). -%% -%% remove/3 also updates the out_counter in every call, but here we do -%% it just once at the end. -remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) -> - {MsgProps, QAcc, State1} = - collect_by_predicate(Pred, ?QUEUE:new(), State), - State2 = - remove_queue_entries( - QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1), - %% maybe_update_rates/1 is called in remove/2 for every - %% message. Since we update out_counter only once, we call it just - %% there. - {MsgProps, maybe_update_rates( - State2 #vqstate { - out_counter = OutCount + ?QUEUE:len(QAcc)})}. - -%% This function exists as a way to improve fetchwhile/4 -%% performance. The idea of having this function is to optimise calls -%% to rabbit_queue_index by batching delivers, instead of sending them -%% one by one. -%% -%% Fun is the function passed to fetchwhile/4 that's -%% applied to every fetched message and used to build the fetchwhile/4 -%% result accumulator FetchAcc. -fetch_by_predicate(Pred, Fun, FetchAcc, - State = #vqstate { - index_state = IndexState, - out_counter = OutCount}) -> - {MsgProps, QAcc, State1} = - collect_by_predicate(Pred, ?QUEUE:new(), State), - - {Delivers, FetchAcc1, State2} = - process_queue_entries(QAcc, Fun, FetchAcc, State1), - - IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState), - - {MsgProps, FetchAcc1, maybe_update_rates( - State2 #vqstate { - index_state = IndexState1, - out_counter = OutCount + ?QUEUE:len(QAcc)})}. - -%% We try to do here the same as what remove(true, State) does but -%% processing several messages at the same time. The idea is to -%% optimize rabbit_queue_index:deliver/2 calls by sending a list of -%% SeqIds instead of one by one, thus process_queue_entries1 will -%% accumulate the required deliveries, will record_pending_ack for -%% each message, and will update stats, like remove/2 does. -%% -%% For the meaning of Fun and FetchAcc arguments see -%% fetch_by_predicate/4 above. -process_queue_entries(Q, Fun, FetchAcc, State) -> - ?QUEUE:foldl(fun (MsgStatus, Acc) -> - process_queue_entries1(MsgStatus, Fun, Acc) - end, - {[], FetchAcc, State}, Q). - -process_queue_entries1( - #msg_status { seq_id = SeqId, is_delivered = IsDelivered, - index_on_disk = IndexOnDisk} = MsgStatus, - Fun, - {Delivers, FetchAcc, State}) -> - {Msg, State1} = read_msg(MsgStatus, State), - State2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State1), - {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - Fun(Msg, SeqId, FetchAcc), - stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}. - -collect_by_predicate(Pred, QAcc, State) -> - case queue_out(State) of - {empty, State1} -> - {undefined, QAcc, State1}; - {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc), - State1); - false -> {MsgProps, QAcc, in_r(MsgStatus, State1)} - end - end. - -%%---------------------------------------------------------------------------- -%% Helpers for Public API purge/1 function -%%---------------------------------------------------------------------------- + rabbit_variable_queue:depth(State). -%% The difference between purge_when_pending_acks/1 -%% vs. purge_and_index_reset/1 is that the first one issues a deliver -%% and an ack to the queue index for every message that's being -%% removed, while the later just resets the queue index state. -purge_when_pending_acks(State) -> - State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State), - a(State1). +set_ram_duration_target(DurationTarget, State) -> + rabbit_variable_queue:set_ram_duration_target(DurationTarget, State). -purge_and_index_reset(State) -> - State1 = purge1(process_delivers_and_acks_fun(none), State), - a(reset_qi_state(State1)). - -%% This function removes messages from each of {q1, q2, q3, q4}. -%% -%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3 -%% are specially handled by purge_betas_and_deltas/2. -%% -%% purge_betas_and_deltas/2 loads messages from the queue index, -%% filling up q3 and in some cases moving messages form q2 to q3 while -%% reseting q2 to an empty queue (see maybe_deltas_to_betas/2). The -%% messages loaded into q3 are removed by calling -%% remove_queue_entries/3 until there are no more messages to be read -%% from the queue index. Messages are read in batches from the queue -%% index. -purge1(AfterFun, State = #vqstate { q4 = Q4}) -> - State1 = remove_queue_entries(Q4, AfterFun, State), - - State2 = #vqstate {q1 = Q1} = - purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}), - - State3 = remove_queue_entries(Q1, AfterFun, State2), - - a(State3#vqstate{q1 = ?QUEUE:new()}). - -reset_qi_state(State = #vqstate{index_state = IndexState}) -> - State#vqstate{index_state = - rabbit_queue_index:reset_state(IndexState)}. - -is_pending_ack_empty(State) -> - count_pending_acks(State) =:= 0. - -count_pending_acks(#vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). - -purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) -> - State0 = #vqstate { q3 = Q3 } = - case Mode of - lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State); - _ -> State - end, - - case ?QUEUE:is_empty(Q3) of - true -> State0; - false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State0), - purge_betas_and_deltas(DelsAndAcksFun, - maybe_deltas_to_betas( - DelsAndAcksFun, - State1#vqstate{q3 = ?QUEUE:new()})) - end. - -remove_queue_entries(Q, DelsAndAcksFun, - State = #vqstate{msg_store_clients = MSCState}) -> - {MsgIdsByStore, Delivers, Acks, State1} = - ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), [], [], State}, Q), - remove_msgs_by_id(MsgIdsByStore, MSCState), - DelsAndAcksFun(Delivers, Acks, State1). - -remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, - msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, - is_persistent = IsPersistent} = MsgStatus, - {MsgIdsByStore, Delivers, Acks, State}) -> - {case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); - false -> MsgIdsByStore - end, - cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - cons_if(IndexOnDisk, SeqId, Acks), - stats({-1, 0}, {MsgStatus, none}, State)}. - -process_delivers_and_acks_fun(deliver_and_ack) -> - fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> - IndexState1 = - rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState)), - State #vqstate { index_state = IndexState1 } - end; -process_delivers_and_acks_fun(_) -> - fun (_, _, State) -> - State - end. - -%%---------------------------------------------------------------------------- -%% Internal gubbins for publishing -%%---------------------------------------------------------------------------- - -publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, PersistFun, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - mode = default, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), - State2 = case ?QUEUE:is_empty(Q3) of - false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } - end, - InCount1 = InCount + 1, - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats({1, 0}, {none, MsgStatus1}, - State2#vqstate{ next_seq_id = SeqId + 1, - in_counter = InCount1, - unconfirmed = UC1 }); -publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, PersistFun, - State = #vqstate { mode = lazy, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC, - delta = Delta }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - Delta1 = expand_delta(SeqId, Delta), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats(lazy_pub, {lazy, m(MsgStatus1)}, - State1#vqstate{ delta = Delta1, - next_seq_id = SeqId + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }). - -batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> - {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, - fun maybe_prepare_write_to_disk/4, State)}. - -publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, PersistFun, - State = #vqstate { mode = default, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, State3}; -publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, PersistFun, - State = #vqstate { mode = lazy, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, State3}. - -batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> - {SeqId, State1} = - publish_delivered1(Msg, MsgProps, ChPid, Flow, - fun maybe_prepare_write_to_disk/4, - State), - {ChPid, Flow, [SeqId | SeqIds], State1}. - -maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_in_store = true }, State) -> - {MsgStatus, State}; -maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, - is_persistent = IsPersistent }, - State = #vqstate{ msg_store_clients = MSCState, - disk_write_count = Count}) - when Force orelse IsPersistent -> - case persist_to(MsgStatus) of - msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, - prepare_to_store(Msg)), - {MsgStatus#msg_status{msg_in_store = true}, - State#vqstate{disk_write_count = Count + 1}}; - queue_index -> {MsgStatus, State} - end; -maybe_write_msg_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -%% Due to certain optimizations made inside -%% rabbit_queue_index:pre_publish/7 we need to have two separate -%% functions for index persistence. This one is only used when paging -%% during memory pressure. We didn't want to modify -%% maybe_write_index_to_disk/3 because that function is used in other -%% places. -maybe_batch_write_index_to_disk(_Force, - MsgStatus = #msg_status { - index_on_disk = true }, State) -> - {MsgStatus, State}; -maybe_batch_write_index_to_disk(Force, - MsgStatus = #msg_status { - msg = Msg, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_props = MsgProps}, - State = #vqstate { - target_ram_count = TargetRamCount, - disk_write_count = DiskWriteCount, - index_state = IndexState}) - when Force orelse IsPersistent -> - {MsgOrId, DiskWriteCount1} = - case persist_to(MsgStatus) of - msg_store -> {MsgId, DiskWriteCount}; - queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} - end, - IndexState1 = rabbit_queue_index:pre_publish( - MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, - TargetRamCount, IndexState), - {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState1, - disk_write_count = DiskWriteCount1}}; -maybe_batch_write_index_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { - index_on_disk = true }, State) -> - {MsgStatus, State}; -maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_props = MsgProps}, - State = #vqstate{target_ram_count = TargetRamCount, - disk_write_count = DiskWriteCount, - index_state = IndexState}) - when Force orelse IsPersistent -> - {MsgOrId, DiskWriteCount1} = - case persist_to(MsgStatus) of - msg_store -> {MsgId, DiskWriteCount}; - queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} - end, - IndexState1 = rabbit_queue_index:publish( - MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount, - IndexState), - IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1), - {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState2, - disk_write_count = DiskWriteCount1}}; - -maybe_write_index_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), - maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). - -maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), - maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1). - -determine_persist_to(#basic_message{ - content = #content{properties = Props, - properties_bin = PropsBin}}, - #message_properties{size = BodySize}, - IndexMaxSize) -> - %% The >= is so that you can set the env to 0 and never persist - %% to the index. - %% - %% We want this to be fast, so we avoid size(term_to_binary()) - %% here, or using the term size estimation from truncate.erl, both - %% of which are too slow. So instead, if the message body size - %% goes over the limit then we avoid any other checks. - %% - %% If it doesn't we need to decide if the properties will push - %% it past the limit. If we have the encoded properties (usual - %% case) we can just check their size. If we don't (message came - %% via the direct client), we make a guess based on the number of - %% headers. - case BodySize >= IndexMaxSize of - true -> msg_store; - false -> Est = case is_binary(PropsBin) of - true -> BodySize + size(PropsBin); - false -> #'P_basic'{headers = Hs} = Props, - case Hs of - undefined -> 0; - _ -> length(Hs) - end * ?HEADER_GUESS_SIZE + BodySize - end, - case Est >= IndexMaxSize of - true -> msg_store; - false -> queue_index - end - end. - -persist_to(#msg_status{persist_to = To}) -> To. - -prepare_to_store(Msg) -> - Msg#basic_message{ - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}. +ram_duration(State) -> + rabbit_variable_queue:ram_duration(State). -%%---------------------------------------------------------------------------- -%% Internal gubbins for acks -%%---------------------------------------------------------------------------- +needs_timeout(State) -> + rabbit_variable_queue:needs_timeout(State). -record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus, - State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, - ack_in_counter = AckInCount}) -> - Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, - {RPA1, DPA1, QPA1} = - case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of - {false, _} -> {RPA, Insert(DPA), QPA}; - {_, queue_index} -> {RPA, DPA, Insert(QPA)}; - {_, msg_store} -> {Insert(RPA), DPA, QPA} - end, - State #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1, - qi_pending_ack = QPA1, - ack_in_counter = AckInCount + 1}. - -lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA}) -> - case gb_trees:lookup(SeqId, RPA) of - {value, V} -> V; - none -> case gb_trees:lookup(SeqId, DPA) of - {value, V} -> V; - none -> gb_trees:get(SeqId, QPA) - end - end. - -%% First parameter = UpdateStats -remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), - {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; -remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA}) -> - case gb_trees:lookup(SeqId, RPA) of - {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), - {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> case gb_trees:lookup(SeqId, DPA) of - {value, V} -> - DPA1 = gb_trees:delete(SeqId, DPA), - {V, State#vqstate{disk_pending_ack = DPA1}}; - none -> - QPA1 = gb_trees:delete(SeqId, QPA), - {gb_trees:get(SeqId, QPA), - State#vqstate{qi_pending_ack = QPA1}} - end - end. - -purge_pending_ack(KeepPersistent, - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> - {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State), - case KeepPersistent of - true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState), - State1; - false -> IndexState1 = - rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - State1 #vqstate { index_state = IndexState1 } - end. - -purge_pending_ack_delete_and_terminate( - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> - {_, MsgIdsByStore, State1} = purge_pending_ack1(State), - IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - State1 #vqstate { index_state = IndexState1 }. - -purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, - {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = - rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold( - F, accumulate_ack_init(), RPA), DPA), QPA), - State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty(), - qi_pending_ack = gb_trees:empty()}, - {IndexOnDiskSeqIds, MsgIdsByStore, State1}. - -%% MsgIdsByStore is an orddict with two keys: -%% -%% true: holds a list of Persistent Message Ids. -%% false: holds a list of Transient Message Ids. -%% -%% When we call orddict:to_list/1 we get two sets of msg ids, where -%% IsPersistent is either true for persistent messages or false for -%% transient ones. The msg_store_remove/3 function takes this boolean -%% flag to determine from which store the messages should be removed -%% from. -remove_msgs_by_id(MsgIdsByStore, MSCState) -> - [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. - -remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> - case orddict:find(false, MsgIdsByStore) of - error -> ok; - {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) - end. - -accumulate_ack_init() -> {[], orddict:new(), []}. - -accumulate_ack(#msg_status { seq_id = SeqId, - msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); - false -> MsgIdsByStore - end, - [MsgId | AllMsgIds]}. +timeout(State) -> + rabbit_variable_queue:timeout(State). -%%---------------------------------------------------------------------------- -%% Internal plumbing for confirms (aka publisher acks) -%%---------------------------------------------------------------------------- +handle_pre_hibernate(State) -> + rabbit_variable_queue:handle_pre_hibernate(State). -record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC, - confirmed = C }) -> - State #vqstate { - msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet), - msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet), - unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), - confirmed = gb_sets:union(C, MsgIdSet) }. - -msgs_written_to_disk(Callback, MsgIdSet, ignored) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end); -msgs_written_to_disk(Callback, MsgIdSet, written) -> - Callback(?MODULE, - fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union(MOD, Confirmed) }) - end). - -msg_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union(MIOD, Confirmed) }) - end). - -msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). +resume(State) -> rabbit_variable_queue:resume(State). -%%---------------------------------------------------------------------------- -%% Internal plumbing for requeue -%%---------------------------------------------------------------------------- +msg_rates(State) -> + rabbit_variable_queue:msg_rates(State). -publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - {Msg, State1} = read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status { msg = Msg }, - {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)}; -publish_alpha(MsgStatus, State) -> - {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. - -publish_beta(MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. - -%% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> - queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, - Limit, PubFun, State). - -queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, - Limit, PubFun, State) - when Limit == undefined orelse SeqId < Limit -> - case ?QUEUE:out(Q) of - {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} - when SeqIdQ < SeqId -> - %% enqueue from the remaining queue - queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, - Limit, PubFun, State); - {_, _Q1} -> - %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), - {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - PubFun(MsgStatus, State1), - queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) - end; -queue_merge(SeqIds, Q, Front, MsgIds, - _Limit, _PubFun, State) -> - {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. - -delta_merge([], Delta, MsgIds, State) -> - {Delta, MsgIds, State}; -delta_merge(SeqIds, Delta, MsgIds, State) -> - lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, State0), - {_MsgStatus, State2} = - maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, State2)} - end, {Delta, MsgIds, State}, SeqIds). - -%% Mostly opposite of record_pending_ack/2 -msg_from_pending_ack(SeqId, State) -> - {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(false, SeqId, State), - {MsgStatus #msg_status { - msg_props = MsgProps #message_properties { needs_confirming = false } }, - State1}. - -beta_limit(Q) -> - case ?QUEUE:peek(Q) of - {value, #msg_status { seq_id = SeqId }} -> SeqId; - empty -> undefined - end. - -delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; -delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. +info(Info, State) -> + rabbit_variable_queue:info(Info, State). -%%---------------------------------------------------------------------------- -%% Iterator -%%---------------------------------------------------------------------------- +invoke(Module, Fun, State) -> rabbit_variable_queue:invoke(Module, Fun, State). -ram_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. - -disk_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. - -qi_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}. - -msg_iterator(State) -> istate(start, State). - -istate(start, State) -> {q4, State#vqstate.q4, State}; -istate(q4, State) -> {q3, State#vqstate.q3, State}; -istate(q3, State) -> {delta, State#vqstate.delta, State}; -istate(delta, State) -> {q2, State#vqstate.q2, State}; -istate(q2, State) -> {q1, State#vqstate.q1, State}; -istate(q1, _State) -> done. - -next({ack, It}, IndexState) -> - case gb_trees:next(It) of - none -> {empty, IndexState}; - {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, - {value, MsgStatus, true, Next, IndexState} - end; -next(done, IndexState) -> {empty, IndexState}; -next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqId}, State}, IndexState) -> - next(istate(delta, State), IndexState); -next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> - SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), - SeqId1 = lists:min([SeqIdB, SeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), - next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); -next({delta, Delta, [], State}, IndexState) -> - next({delta, Delta, State}, IndexState); -next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> - case is_msg_in_pending_acks(SeqId, State) of - false -> Next = {delta, Delta, Rest, State}, - {value, beta_msg_status(M), false, Next, IndexState}; - true -> next({delta, Delta, Rest, State}, IndexState) - end; -next({Key, Q, State}, IndexState) -> - case ?QUEUE:out(Q) of - {empty, _Q} -> next(istate(Key, State), IndexState); - {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, - {value, MsgStatus, false, Next, IndexState} - end. - -inext(It, {Its, IndexState}) -> - case next(It, IndexState) of - {empty, IndexState1} -> - {Its, IndexState1}; - {value, MsgStatus1, Unacked, It1, IndexState1} -> - {[{MsgStatus1, Unacked, It1} | Its], IndexState1} - end. - -ifold(_Fun, Acc, [], State) -> - {Acc, State}; -ifold(Fun, Acc, Its, State) -> - [{MsgStatus, Unacked, It} | Rest] = - lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, - {#msg_status{seq_id = SeqId2}, _, _}) -> - SeqId1 =< SeqId2 - end, Its), - {Msg, State1} = read_msg(MsgStatus, State), - case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of - {stop, Acc1} -> - {Acc1, State}; - {cont, Acc1} -> - {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}), - ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1}) - end. +is_duplicate(Msg, State) -> rabbit_variable_queue:is_duplicate(Msg, State). -%%---------------------------------------------------------------------------- -%% Phase changes -%%---------------------------------------------------------------------------- +set_queue_mode(Mode, State) -> + rabbit_variable_queue:set_queue_mode(Mode, State). -reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> - State; -reduce_memory_use(State = #vqstate { - mode = default, - ram_pending_ack = RPA, - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount, - io_batch_size = IoBatchSize, - rates = #rates { in = AvgIngress, - out = AvgEgress, - ack_in = AvgAckIngress, - ack_out = AvgAckEgress } }) -> - - State1 = #vqstate { q2 = Q2, q3 = Q3 } = - case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; - %% Reduce memory of pending acks and alphas. The order is - %% determined based on which is growing faster. Whichever - %% comes second may very well get a quota of 0 if the - %% first manages to push out the max number of messages. - S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > - (AvgIngress - AvgEgress)) of - true -> [fun limit_ram_acks/2, - fun push_alphas_to_betas/2]; - false -> [fun push_alphas_to_betas/2, - fun limit_ram_acks/2] - end, - {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> - ReduceFun(QuotaN, StateN) - end, {S1, State}, Funs), - State2 - end, - - State3 = - case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - permitted_beta_count(State1)) of - S2 when S2 >= IoBatchSize -> - %% There is an implicit, but subtle, upper bound here. We - %% may shuffle a lot of messages from Q2/3 into delta, but - %% the number of these that require any disk operation, - %% namely index writing, i.e. messages that are genuine - %% betas and not gammas, is bounded by the credit_flow - %% limiting of the alpha->beta conversion above. - push_betas_to_deltas(S2, State1); - _ -> - State1 - end, - %% See rabbitmq-server-290 for the reasons behind this GC call. - garbage_collect(), - State3; -%% When using lazy queues, there are no alphas, so we don't need to -%% call push_alphas_to_betas/2. -reduce_memory_use(State = #vqstate { - mode = lazy, - ram_pending_ack = RPA, - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) -> - State1 = #vqstate { q3 = Q3 } = - case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; - S1 -> {_, State2} = limit_ram_acks(S1, State), - State2 - end, - - State3 = - case chunk_size(?QUEUE:len(Q3), - permitted_beta_count(State1)) of - 0 -> - State1; - S2 -> - push_betas_to_deltas(S2, State1) - end, - garbage_collect(), - State3. - -limit_ram_acks(0, State) -> - {0, ui(State)}; -limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> - case gb_trees:is_empty(RPA) of - true -> - {Quota, ui(State)}; - false -> - {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), - {MsgStatus1, State1} = - maybe_prepare_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), - limit_ram_acks(Quota - 1, - stats({0, 0}, {MsgStatus, MsgStatus2}, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) - end. - -permitted_beta_count(#vqstate { len = 0 }) -> - infinity; -permitted_beta_count(#vqstate { mode = lazy, - target_ram_count = TargetRamCount}) -> - TargetRamCount; -permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> - lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); -permitted_beta_count(#vqstate { q1 = Q1, - q4 = Q4, - target_ram_count = TargetRamCount, - len = Len }) -> - BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), - lists:max([rabbit_queue_index:next_segment_boundary(0), - BetaDelta - ((BetaDelta * BetaDelta) div - (BetaDelta + TargetRamCount))]). - -chunk_size(Current, Permitted) - when Permitted =:= infinity orelse Permitted >= Current -> - 0; -chunk_size(Current, Permitted) -> - Current - Permitted. - -fetch_from_q3(State = #vqstate { mode = default, - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4 }) -> - case ?QUEUE:out(Q3) of - {empty, _Q3} -> - {empty, State}; - {{value, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, - State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of - {true, true} -> - %% q3 is now empty, it wasn't before; - %% delta is still empty. So q2 must be - %% empty, and we know q4 is empty - %% otherwise we wouldn't be loading from - %% q3. As such, we can just set q4 to Q1. - true = ?QUEUE:is_empty(Q2), %% ASSERTION - true = ?QUEUE:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; - {true, false} -> - maybe_deltas_to_betas(State1); - {false, _} -> - %% q3 still isn't empty, we've not - %% touched delta, so the invariants - %% between q1, q2, delta and q3 are - %% maintained - State1 - end, - {loaded, {MsgStatus, State2}} - end; -%% lazy queues -fetch_from_q3(State = #vqstate { mode = lazy, - delta = #delta { count = DeltaCount }, - q3 = Q3 }) -> - case ?QUEUE:out(Q3) of - {empty, _Q3} when DeltaCount =:= 0 -> - {empty, State}; - {empty, _Q3} -> - fetch_from_q3(maybe_deltas_to_betas(State)); - {{value, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, - {loaded, {MsgStatus, State1}} - end. - -maybe_deltas_to_betas(State) -> - AfterFun = process_delivers_and_acks_fun(deliver_and_ack), - maybe_deltas_to_betas(AfterFun, State). - -maybe_deltas_to_betas(_DelsAndAcksFun, - State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) -> - State; -maybe_deltas_to_betas(DelsAndAcksFun, - State = #vqstate { - q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes, - disk_read_count = DiskReadCount, - transient_threshold = TransientThreshold }) -> - #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - end_seq_id = DeltaSeqIdEnd } = Delta, - DeltaSeqId1 = - lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {Q3a, RamCountsInc, RamBytesInc, State1} = - betas_from_index_entries(List, TransientThreshold, - DelsAndAcksFun, - State #vqstate { index_state = IndexState1 }), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, - ram_bytes = RamBytes + RamBytesInc, - disk_read_count = DiskReadCount + RamCountsInc }, - case ?QUEUE:len(Q3a) of - 0 -> - %% we ignored every message in the segment due to it being - %% transient and below the threshold - maybe_deltas_to_betas( - DelsAndAcksFun, - State2 #vqstate { - delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); - Q3aLen -> - Q3b = ?QUEUE:join(Q3, Q3a), - case DeltaCount - Q3aLen of - 0 -> - %% delta is now empty, but it wasn't before, so - %% can now join q2 onto q3 - State2 #vqstate { q2 = ?QUEUE:new(), - delta = ?BLANK_DELTA, - q3 = ?QUEUE:join(Q3b, Q2) }; - N when N > 0 -> - Delta1 = d(#delta { start_seq_id = DeltaSeqId1, - count = N, - end_seq_id = DeltaSeqIdEnd }), - State2 #vqstate { delta = Delta1, - q3 = Q3b } - end - end. - -push_alphas_to_betas(Quota, State) -> - {Quota1, State1} = - push_alphas_to_betas( - fun ?QUEUE:out/1, - fun (MsgStatus, Q1a, - State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> - State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; - (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> - State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } - end, Quota, State #vqstate.q1, State), - {Quota2, State2} = - push_alphas_to_betas( - fun ?QUEUE:out_r/1, - fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> - State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } - end, Quota1, State1 #vqstate.q4, State1), - {Quota2, State2}. - -push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, - State = #vqstate { ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) - when Quota =:= 0 orelse - TargetRamCount =:= infinity orelse - TargetRamCount >= RamMsgCount -> - {Quota, ui(State)}; -push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> - %% We consume credits from the message_store whenever we need to - %% persist a message to disk. See: - %% rabbit_variable_queue:msg_store_write/4. So perhaps the - %% msg_store is trying to throttle down our queue. - case credit_flow:blocked() of - true -> {Quota, ui(State)}; - false -> case Generator(Q) of - {empty, _Q} -> - {Quota, ui(State)}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1, State1} = - maybe_prepare_write_to_disk(true, false, MsgStatus, - State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = stats( - ready0, {MsgStatus, MsgStatus2}, State1), - State3 = Consumer(MsgStatus2, Qa, State2), - push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State3) - end - end. - -push_betas_to_deltas(Quota, State = #vqstate { mode = default, - q2 = Q2, - delta = Delta, - q3 = Q3}) -> - PushState = {Quota, Delta, State}, - {Q3a, PushState1} = push_betas_to_deltas( - fun ?QUEUE:out_r/1, - fun rabbit_queue_index:next_segment_boundary/1, - Q3, PushState), - {Q2a, PushState2} = push_betas_to_deltas( - fun ?QUEUE:out/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, PushState1), - {_, Delta1, State1} = PushState2, - State1 #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a }; -%% In the case of lazy queues we want to page as many messages as -%% possible from q3. -push_betas_to_deltas(Quota, State = #vqstate { mode = lazy, - delta = Delta, - q3 = Q3}) -> - PushState = {Quota, Delta, State}, - {Q3a, PushState1} = push_betas_to_deltas( - fun ?QUEUE:out_r/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q3, PushState), - {_, Delta1, State1} = PushState1, - State1 #vqstate { delta = Delta1, - q3 = Q3a }. - - -push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> - case ?QUEUE:is_empty(Q) of - true -> - {Q, PushState}; - false -> - {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q), - {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q), - Limit = LimitFun(MinSeqId), - case MaxSeqId < Limit of - true -> {Q, PushState}; - false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) - end - end. - -push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) -> - {Q, {0, Delta, ui(State)}}; -push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) -> - case Generator(Q) of - {empty, _Q} -> - {Q, {Quota, Delta, ui(State)}}; - {{value, #msg_status { seq_id = SeqId }}, _Qa} - when SeqId < Limit -> - {Q, {Quota, Delta, ui(State)}}; - {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true }, State1} = - maybe_batch_write_index_to_disk(true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, State1), - Delta1 = expand_delta(SeqId, Delta), - push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, State2}) - end. - -%% Flushes queue index batch caches and updates queue index state. -ui(#vqstate{index_state = IndexState, - target_ram_count = TargetRamCount} = State) -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - State#vqstate{index_state = IndexState1}. +zip_msgs_and_acks(Msgs, AckTags, Accumulator, State) -> + rabbit_variable_queue:zip_msgs_and_acks(Msgs, AckTags, Accumulator, State). %% Delay maybe_delay(QPA) -> @@ -2430,27 +327,3 @@ is_timeout_test([#msg_status{ _ -> is_timeout_test(Rem) end; is_timeout_test([_|Rem]) -> is_timeout_test(Rem). - -%%---------------------------------------------------------------------------- -%% Upgrading -%%---------------------------------------------------------------------------- - -multiple_routing_keys() -> - transform_storage( - fun ({basic_message, ExchangeName, Routing_Key, Content, - MsgId, Persistent}) -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - MsgId, Persistent}}; - (_) -> {error, corrupt_message} - end), - ok. - - -%% Assumes message store is not running -transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE, TransformFun), - transform_store(?TRANSIENT_MSG_STORE, TransformFun). - -transform_store(Store, TransformFun) -> - rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), - rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index efc5ca830e..68deec12cb 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -39,8 +39,7 @@ groups() -> single_node_single_vhost_limit, single_node_single_vhost_zero_limit, single_node_multiple_vhosts_limit, - single_node_multiple_vhosts_zero_limit, - single_node_vhost_deletion_forces_connection_closure + single_node_multiple_vhosts_zero_limit ], ClusterSize2Tests = [ most_basic_cluster_connection_count, @@ -51,8 +50,7 @@ groups() -> cluster_single_vhost_limit, cluster_single_vhost_limit2, cluster_single_vhost_zero_limit, - cluster_multiple_vhosts_zero_limit, - cluster_vhost_deletion_forces_connection_closure + cluster_multiple_vhosts_zero_limit ], [ {cluster_size_1_network, [], ClusterSize1Tests}, @@ -639,57 +637,6 @@ cluster_multiple_vhosts_zero_limit(Config) -> set_vhost_connection_limit(Config, VHost1, -1), set_vhost_connection_limit(Config, VHost2, -1). - -single_node_vhost_deletion_forces_connection_closure(Config) -> - VHost1 = <<"vhost1">>, - VHost2 = <<"vhost2">>, - - set_up_vhost(Config, VHost1), - set_up_vhost(Config, VHost2), - - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), - - [_Conn2] = open_connections(Config, [{0, VHost2}]), - ?assertEqual(1, count_connections_in(Config, VHost2)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). - -cluster_vhost_deletion_forces_connection_closure(Config) -> - VHost1 = <<"vhost1">>, - VHost2 = <<"vhost2">>, - - set_up_vhost(Config, VHost1), - set_up_vhost(Config, VHost2), - - ?assertEqual(0, count_connections_in(Config, VHost1)), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - [Conn1] = open_connections(Config, [{0, VHost1}]), - ?assertEqual(1, count_connections_in(Config, VHost1)), - - [_Conn2] = open_connections(Config, [{1, VHost2}]), - ?assertEqual(1, count_connections_in(Config, VHost2)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), - timer:sleep(200), - ?assertEqual(0, count_connections_in(Config, VHost2)), - - close_connections([Conn1]), - ?assertEqual(0, count_connections_in(Config, VHost1)), - - rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). - vhost_limit_after_node_renamed(Config) -> A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 0ef86717ac..47d47290a4 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -24,7 +24,8 @@ all() -> [ - {group, non_parallel_tests} + {group, non_parallel_tests}, + {group, cluster_tests} ]. groups() -> @@ -37,7 +38,8 @@ groups() -> gen_server2_metrics, consumer_metrics ] - } + }, + {cluster_tests, [], [cluster_queue_metrics]} ]. %% ------------------------------------------------------------------- @@ -45,33 +47,27 @@ groups() -> %% ------------------------------------------------------------------- merge_app_env(Config) -> - rabbit_ct_helpers:merge_app_env(Config, - {rabbit, [ - {core_metrics_gc_interval, 6000000}, - {collect_statistics_interval, 100}, - {collect_statistics, fine} - ]}). - -init_per_suite(Config) -> + AppEnv = {rabbit, [{core_metrics_gc_interval, 6000000}, + {collect_statistics_interval, 100}, + {collect_statistics, fine}]}, + rabbit_ct_helpers:merge_app_env(Config, AppEnv). + +init_per_group(cluster_tests, Config) -> + rabbit_ct_helpers:log_environment(), + Conf = [{rmq_nodename_suffix, cluster_tests}, {rmq_nodes_count, 2}], + Config1 = rabbit_ct_helpers:set_config(Config, Conf), + rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()); +init_per_group(non_parallel_tests, Config) -> rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - rabbit_ct_helpers:run_setup_steps( - Config1, - [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()). - -end_per_suite(Config) -> + Conf = [{rmq_nodename_suffix, non_parallel_tests}], + Config1 = rabbit_ct_helpers:set_config(Config, Conf), + rabbit_ct_helpers:run_setup_steps(Config1, setup_steps()). + +end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps( Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_group(_, Config) -> - Config. - -end_per_group(_, Config) -> - Config. - init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_helpers:run_steps(Config, @@ -83,8 +79,11 @@ end_per_testcase(Testcase, Config) -> Config, rabbit_ct_client_helpers:teardown_steps()). +setup_steps() -> + [ fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps(). + %% ------------------------------------------------------------------- -%% Testcases. +%% Single-node Testcases. %% ------------------------------------------------------------------- queue_metrics(Config) -> @@ -329,3 +328,74 @@ x(Name) -> #resource{ virtual_host = <<"/">>, kind = exchange, name = Name }. + +%% ------------------------------------------------------------------- +%% Cluster Testcases. +%% ------------------------------------------------------------------- + +cluster_queue_metrics(Config) -> + VHost = <<"/">>, + QueueName = <<"cluster_queue_metrics">>, + PolicyName = <<"ha-policy-1">>, + PolicyPattern = <<".*">>, + PolicyAppliesTo = <<"queues">>, + + Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Node0), + + Node0Name = rabbit_data_coercion:to_binary(Node0), + Definition0 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node0Name]}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, + PolicyName, PolicyPattern, + PolicyAppliesTo, Definition0), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"hello">>}), + + % Update policy to point to other node + Node1Name = rabbit_data_coercion:to_binary(Node1), + Definition1 = [{<<"ha-mode">>, <<"nodes">>}, {<<"ha-params">>, [Node1Name]}], + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, + PolicyName, PolicyPattern, + PolicyAppliesTo, Definition1), + + % Synchronize + Name = rabbit_misc:r(VHost, queue, QueueName), + [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0, + ets, lookup, + [rabbit_queue, Name]), + ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, + sync_mirrors, [QPid]), + + % Check ETS table for data + wait_for(fun () -> + [] =:= rabbit_ct_broker_helpers:rpc( + Config, Node0, ets, tab2list, + [queue_coarse_metrics]) + end, 60), + + wait_for(fun () -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, Node1, ets, tab2list, + [queue_coarse_metrics]), + case Ret of + [{Name, 1, 0, 1, _}] -> true; + _ -> false + end + end, 60), + + amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}), + rabbit_ct_client_helpers:close_channel(Ch), + Config. + +wait_for(_Fun, 0) -> false; +wait_for(Fun, Seconds) -> + case Fun() of + true -> ok; + false -> + timer:sleep(1000), + wait_for(Fun, Seconds - 1) + end. diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 453f4b2e72..dd8cd48b5a 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -46,6 +46,8 @@ groups() -> password_hashing, change_password ]}, + set_disk_free_limit_command, + set_vm_memory_high_watermark_command, topic_matching ]} ]. @@ -972,6 +974,50 @@ configurable_server_properties1(_Config) -> application:set_env(rabbit, server_properties, ServerProperties), passed. +set_disk_free_limit_command(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, set_disk_free_limit_command1, [Config]). + +set_disk_free_limit_command1(_Config) -> + rabbit_disk_monitor:set_disk_free_limit("2000kiB"), + 2048000 = rabbit_disk_monitor:get_disk_free_limit(), + + %% Use an integer + rabbit_disk_monitor:set_disk_free_limit({mem_relative, 1}), + disk_free_limit_to_total_memory_ratio_is(1), + + %% Use a float + rabbit_disk_monitor:set_disk_free_limit({mem_relative, 1.5}), + disk_free_limit_to_total_memory_ratio_is(1.5), + + rabbit_disk_monitor:set_disk_free_limit("50MB"), + passed. + +disk_free_limit_to_total_memory_ratio_is(MemRatio) -> + ExpectedLimit = MemRatio * vm_memory_monitor:get_total_memory(), + % Total memory is unstable, so checking order + true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() < 1.2, + true = ExpectedLimit/rabbit_disk_monitor:get_disk_free_limit() > 0.98. + +set_vm_memory_high_watermark_command(Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, set_vm_memory_high_watermark_command1, [Config]). + +set_vm_memory_high_watermark_command1(_Config) -> + MemLimitRatio = 1.0, + MemTotal = vm_memory_monitor:get_total_memory(), + + vm_memory_monitor:set_vm_memory_high_watermark(MemLimitRatio), + MemLimit = vm_memory_monitor:get_memory_limit(), + case MemLimit of + MemTotal -> ok; + _ -> MemTotalToMemLimitRatio = MemLimit * 100.0 / MemTotal / 100, + ct:fail( + "Expected memory high watermark to be ~p (~s), but it was ~p (~.1f)", + [MemTotal, MemLimitRatio, MemLimit, MemTotalToMemLimitRatio] + ) + end. + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl new file mode 100644 index 0000000000..a519d01af5 --- /dev/null +++ b/test/vhost_SUITE.erl @@ -0,0 +1,376 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(vhost_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + {group, cluster_size_1_network}, + {group, cluster_size_2_network}, + {group, cluster_size_1_direct}, + {group, cluster_size_2_direct} + ]. + +groups() -> + ClusterSize1Tests = [ + single_node_vhost_deletion_forces_connection_closure, + vhost_failure_forces_connection_closure, + dead_vhost_connection_refused + ], + ClusterSize2Tests = [ + cluster_vhost_deletion_forces_connection_closure, + vhost_failure_forces_connection_closure, + dead_vhost_connection_refused, + vhost_failure_forces_connection_closure_on_failure_node, + dead_vhost_connection_refused_on_failure_node + ], + [ + {cluster_size_1_network, [], ClusterSize1Tests}, + {cluster_size_2_network, [], ClusterSize2Tests}, + {cluster_size_1_direct, [], ClusterSize1Tests}, + {cluster_size_2_direct, [], ClusterSize2Tests} + ]. + +suite() -> + [ + %% If a test hangs, no need to wait for 30 minutes. + {timetrap, {minutes, 8}} + ]. + +%% see partitions_SUITE +-define(DELAY, 9000). + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_1_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_1_network, Config1, 1); +init_per_group(cluster_size_2_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_2_network, Config1, 2); +init_per_group(cluster_size_1_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_1_direct, Config1, 1); +init_per_group(cluster_size_2_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_2_direct, Config1, 2). + +init_per_multinode_group(Group, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + clear_all_connection_tracking_tables(Config), + Config. + +end_per_testcase(Testcase, Config) -> + clear_all_connection_tracking_tables(Config), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +clear_all_connection_tracking_tables(Config) -> + [rabbit_ct_broker_helpers:rpc(Config, + N, + rabbit_connection_tracking, + clear_tracked_connection_tables_for_this_node, + []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- +single_node_vhost_deletion_forces_connection_closure(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + [Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn2] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + +vhost_failure_forces_connection_closure(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + [Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn2] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, VHost2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + +dead_vhost_connection_refused(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, VHost2), + timer:sleep(200), + + [_Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn2] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + expect_that_client_connection_is_rejected(Config, 0, VHost2), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + + +vhost_failure_forces_connection_closure_on_failure_node(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + [Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn20] = open_connections(Config, [{0, VHost2}]), + [_Conn21] = open_connections(Config, [{1, VHost2}]), + ?assertEqual(2, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, 0, VHost2), + timer:sleep(200), + %% Vhost2 connection on node 1 is still alive + ?assertEqual(1, count_connections_in(Config, VHost2)), + %% Vhost1 connection on node 0 is still alive + ?assertEqual(1, count_connections_in(Config, VHost1)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + +dead_vhost_connection_refused_on_failure_node(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + force_vhost_failure(Config, 0, VHost2), + timer:sleep(200), + %% Can open connections to vhost1 on node 0 and 1 + [_Conn10] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + [_Conn11] = open_connections(Config, [{1, VHost1}]), + ?assertEqual(2, count_connections_in(Config, VHost1)), + + %% Connection on vhost2 on node 0 is refused + [_Conn20] = open_connections(Config, [{0, VHost2}]), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + expect_that_client_connection_is_rejected(Config, 0, VHost2), + + %% Can open connections to vhost2 on node 1 + [_Conn21] = open_connections(Config, [{1, VHost2}]), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + +force_vhost_failure(Config, VHost) -> force_vhost_failure(Config, 0, VHost). + +force_vhost_failure(Config, Node, VHost) -> + force_vhost_failure(Config, Node, VHost, 10). + +force_vhost_failure(_Config, _Node, VHost, 0) -> + error({failed_to_force_vhost_failure, no_more_attempts_left, VHost}); +force_vhost_failure(Config, Node, VHost, Attempts) -> + MessageStorePid = get_message_store_pid(Config, VHost), + rabbit_ct_broker_helpers:rpc(Config, Node, + erlang, exit, + [MessageStorePid, force_vhost_failure]), + %% Give it a time to fail + timer:sleep(200), + case rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_vhost_sup_sup, is_vhost_alive, + [VHost]) of + true -> force_vhost_failure(Config, Node, VHost, Attempts - 1); + false -> ok + end. + +get_message_store_pid(Config, VHost) -> + {ok, VHostSup} = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_vhost_sup_sup, vhost_sup, [VHost]), + Children = rabbit_ct_broker_helpers:rpc(Config, 0, + supervisor, which_children, + [VHostSup]), + [MsgStorePid] = [Pid || {Name, Pid, _, _} <- Children, + Name == msg_store_persistent], + MsgStorePid. + +cluster_vhost_deletion_forces_connection_closure(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + [Conn1] = open_connections(Config, [{0, VHost1}]), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + [_Conn2] = open_connections(Config, [{1, VHost2}]), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +open_connections(Config, NodesAndVHosts) -> + % Randomly select connection type + OpenConnectionFun = case ?config(connection_type, Config) of + network -> open_unmanaged_connection; + direct -> open_unmanaged_connection_direct + end, + Conns = lists:map(fun + ({Node, VHost}) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node, + VHost); + (Node) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) + end, NodesAndVHosts), + timer:sleep(500), + Conns. + +close_connections(Conns) -> + lists:foreach(fun + (Conn) -> + rabbit_ct_client_helpers:close_connection(Conn) + end, Conns), + timer:sleep(500). + +count_connections_in(Config, VHost) -> + count_connections_in(Config, VHost, 0). +count_connections_in(Config, VHost, NodeIndex) -> + timer:sleep(200), + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + count_connections_in, [VHost]). + +set_up_vhost(Config, VHost) -> + rabbit_ct_broker_helpers:add_vhost(Config, VHost), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost), + set_vhost_connection_limit(Config, VHost, -1). + +set_vhost_connection_limit(Config, VHost, Count) -> + set_vhost_connection_limit(Config, 0, VHost, Count). + +set_vhost_connection_limit(Config, NodeIndex, VHost, Count) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + ok = rabbit_ct_broker_helpers:control_action( + set_vhost_limits, Node, + ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"], + [{"-p", binary_to_list(VHost)}]). + +expect_that_client_connection_is_rejected(Config) -> + expect_that_client_connection_is_rejected(Config, 0). + +expect_that_client_connection_is_rejected(Config, NodeIndex) -> + {error, _} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex). + +expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) -> + {error, _} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost). |
