diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2017-06-02 18:54:42 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2017-06-26 13:30:46 +0200 |
| commit | 9cf0cfea9eb087b9df1c0e6ab636601be6c9a7cb (patch) | |
| tree | 99cb308eb182b0e2ce31692bbed5c85d4d29e105 /src | |
| parent | 3ad84520cb23e4bf05662e43384870de529d21a6 (diff) | |
| download | rabbitmq-server-git-9cf0cfea9eb087b9df1c0e6ab636601be6c9a7cb.tar.gz | |
Move modules which don't belong to rabbitmq-common here
Those modules depend on rabbitmq-server but they are used neither by the
Erlang client nor by rabbitmq-common itself.
They are imported as-is. Some may need further changes after the import.
This resolves the dependency of rabbitmq-common on rabbitmq-server.
[#118490793]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 1010 | ||||
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 421 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 302 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2034 | ||||
| -rw-r--r-- | src/rabbit_channel_interceptor.erl | 104 | ||||
| -rw-r--r-- | src/rabbit_exchange_decorator.erl | 113 | ||||
| -rw-r--r-- | src/rabbit_health_check.erl | 95 | ||||
| -rw-r--r-- | src/rabbit_nodes.erl | 230 | ||||
| -rw-r--r-- | src/rabbit_queue_collector.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_queue_decorator.erl | 66 |
10 files changed, 4464 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl new file mode 100644 index 0000000000..3eaf7613ee --- /dev/null +++ b/src/rabbit_amqqueue.erl @@ -0,0 +1,1010 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_amqqueue). + +-export([recover/0, stop/0, start/1, declare/5, declare/6, + delete_immediately/1, delete/3, purge/1, forget_all_durable/1, + delete_crashed/1, delete_crashed_internal/1]). +-export([pseudo_queue/2, immutable/1]). +-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, + assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, + stat/1, deliver/2, requeue/3, ack/3, reject/4]). +-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, + info_all/5, info_local/1, list_names/0]). +-export([list_down/1]). +-export([force_event_refresh/1, notify_policy_changed/1]). +-export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2]). +-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]). +-export([on_node_up/1, on_node_down/1]). +-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). +-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]). + +-export([pid_of/1, pid_of/2]). + +%% internal +-export([internal_declare/2, internal_delete/1, run_backing_queue/3, + set_ram_duration_target/2, set_maximum_since_use/2]). + +-include("rabbit.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(INTEGER_ARG_TYPES, [byte, short, signedint, long, + unsignedbyte, unsignedshort, unsignedint]). + +-define(MORE_CONSUMER_CREDIT_AFTER, 50). + +%%---------------------------------------------------------------------------- + +-export_type([name/0, qmsg/0, absent_reason/0]). + +-type name() :: rabbit_types:r('queue'). +-type qpids() :: [pid()]. +-type qlen() :: rabbit_types:ok(non_neg_integer()). +-type qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return()). +-type qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}. +-type msg_id() :: non_neg_integer(). +-type ok_or_errors() :: + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}. +-type absent_reason() :: 'nodedown' | 'crashed'. +-type queue_or_absent() :: rabbit_types:amqqueue() | + {'absent', rabbit_types:amqqueue(),absent_reason()}. +-type not_found_or_absent() :: + 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}. +-spec recover() -> [rabbit_types:amqqueue()]. +-spec stop() -> 'ok'. +-spec start([rabbit_types:amqqueue()]) -> 'ok'. +-spec declare + (name(), boolean(), boolean(), rabbit_framing:amqp_table(), + rabbit_types:maybe(pid())) -> + {'new' | 'existing' | 'absent' | 'owner_died', + rabbit_types:amqqueue()} | + rabbit_types:channel_exit(). +-spec declare + (name(), boolean(), boolean(), rabbit_framing:amqp_table(), + rabbit_types:maybe(pid()), node()) -> + {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | + {'absent', rabbit_types:amqqueue(), absent_reason()} | + rabbit_types:channel_exit(). +-spec internal_declare(rabbit_types:amqqueue(), boolean()) -> + queue_or_absent() | rabbit_misc:thunk(queue_or_absent()). +-spec update + (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> + 'not_found' | rabbit_types:amqqueue(). +-spec lookup + (name()) -> + rabbit_types:ok(rabbit_types:amqqueue()) | + rabbit_types:error('not_found'); + ([name()]) -> + [rabbit_types:amqqueue()]. +-spec not_found_or_absent(name()) -> not_found_or_absent(). +-spec with(name(), qfun(A)) -> + A | rabbit_types:error(not_found_or_absent()). +-spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B. +-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit(). +-spec assert_equivalence + (rabbit_types:amqqueue(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> + 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit(). +-spec check_exclusive_access(rabbit_types:amqqueue(), pid()) -> + 'ok' | rabbit_types:channel_exit(). +-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) -> + A | rabbit_types:channel_exit(). +-spec list() -> [rabbit_types:amqqueue()]. +-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. +-spec list_names() -> [rabbit_amqqueue:name()]. +-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()]. +-spec info_keys() -> rabbit_types:info_keys(). +-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). +-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> + rabbit_types:infos(). +-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. +-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> + [rabbit_types:infos()]. +-type info_all_filter() :: 'all' | 'online' | 'offline' | 'local'. +-spec info_all + (rabbit_types:vhost(), rabbit_types:info_keys(), info_all_filter(), + reference(), pid()) -> + 'ok'. +-spec force_event_refresh(reference()) -> 'ok'. +-spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'. +-spec consumers(rabbit_types:amqqueue()) -> + [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), + rabbit_framing:amqp_table()}]. +-spec consumer_info_keys() -> rabbit_types:info_keys(). +-spec consumers_all(rabbit_types:vhost()) -> + [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]. +-spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'. +-spec stat(rabbit_types:amqqueue()) -> + {'ok', non_neg_integer(), non_neg_integer()}. +-spec delete_immediately(qpids()) -> 'ok'. +-spec delete + (rabbit_types:amqqueue(), 'false', 'false') -> + qlen(); + (rabbit_types:amqqueue(), 'true' , 'false') -> + qlen() | rabbit_types:error('in_use'); + (rabbit_types:amqqueue(), 'false', 'true' ) -> + qlen() | rabbit_types:error('not_empty'); + (rabbit_types:amqqueue(), 'true' , 'true' ) -> + qlen() | + rabbit_types:error('in_use') | + rabbit_types:error('not_empty'). +-spec delete_crashed(rabbit_types:amqqueue()) -> 'ok'. +-spec delete_crashed_internal(rabbit_types:amqqueue()) -> 'ok'. +-spec purge(rabbit_types:amqqueue()) -> qlen(). +-spec forget_all_durable(node()) -> 'ok'. +-spec deliver([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + qpids(). +-spec requeue(pid(), [msg_id()], pid()) -> 'ok'. +-spec ack(pid(), [msg_id()], pid()) -> 'ok'. +-spec reject(pid(), [msg_id()], boolean(), pid()) -> 'ok'. +-spec notify_down_all(qpids(), pid()) -> ok_or_errors(). +-spec notify_down_all(qpids(), pid(), non_neg_integer()) -> + ok_or_errors(). +-spec activate_limit_all(qpids(), pid()) -> ok_or_errors(). +-spec basic_get(rabbit_types:amqqueue(), pid(), boolean(), pid()) -> + {'ok', non_neg_integer(), qmsg()} | 'empty'. +-spec credit + (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), + boolean()) -> + 'ok'. +-spec basic_consume + (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), + non_neg_integer(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table(), any()) -> + rabbit_types:ok_or_error('exclusive_consume_unavailable'). +-spec basic_cancel + (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'. +-spec notify_decorators(rabbit_types:amqqueue()) -> 'ok'. +-spec resume(pid(), pid()) -> 'ok'. +-spec internal_delete(name()) -> + rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit() | + fun (() -> + rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit()). +-spec run_backing_queue + (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> + 'ok'. +-spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'. +-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. +-spec on_node_up(node()) -> 'ok'. +-spec on_node_down(node()) -> 'ok'. +-spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue(). +-spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue(). +-spec store_queue(rabbit_types:amqqueue()) -> 'ok'. +-spec update_decorators(name()) -> 'ok'. +-spec policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> + 'ok'. +-spec update_mirroring(pid()) -> 'ok'. +-spec sync_mirrors(rabbit_types:amqqueue() | pid()) -> + 'ok' | rabbit_types:error('not_mirrored'). +-spec cancel_sync_mirrors(rabbit_types:amqqueue() | pid()) -> + 'ok' | {'ok', 'not_syncing'}. +-spec is_mirrored(rabbit_types:amqqueue()) -> boolean(). + +-spec pid_of(rabbit_types:amqqueue()) -> + {'ok', pid()} | rabbit_types:error('not_found'). +-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) -> + {'ok', pid()} | rabbit_types:error('not_found'). + +%%---------------------------------------------------------------------------- + +-define(CONSUMER_INFO_KEYS, + [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, + arguments]). + +recover() -> + %% Clear out remnants of old incarnation, in case we restarted + %% faster than other nodes handled DOWN messages from us. + on_node_down(node()), + DurableQueues = find_durable_queues(), + L = length(DurableQueues), + + %% if there are not enough file handles, the server might hang + %% when trying to recover queues, warn the user: + case file_handle_cache:get_limit() < L of + true -> + rabbit_log:warning( + "Recovering ~p queues, available file handles: ~p. Please increase max open file handles limit to at least ~p!~n", + [L, file_handle_cache:get_limit(), L]); + false -> + ok + end, + + {ok, BQ} = application:get_env(rabbit, backing_queue_module), + + %% We rely on BQ:start/1 returning the recovery terms in the same + %% order as the supplied queue names, so that we can zip them together + %% for further processing in recover_durable_queues. + {ok, OrderedRecoveryTerms} = + BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), + {ok,_} = supervisor:start_child( + rabbit_sup, + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}), + recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). + +stop() -> + ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup), + ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup), + {ok, BQ} = application:get_env(rabbit, backing_queue_module), + ok = BQ:stop(). + +start(Qs) -> + %% At this point all recovered queues and their bindings are + %% visible to routing, so now it is safe for them to complete + %% their initialisation (which may involve interacting with other + %% queues). + [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs], + ok. + +find_durable_queues() -> + Node = node(), + mnesia:async_dirty( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{name = Name, + pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node andalso + %% Terminations on node down will not remove the rabbit_queue + %% record if it is a mirrored queue (such info is now obtained from + %% the policy). Thus, we must check if the local pid is alive + %% - if the record is present - in order to restart. + (mnesia:read(rabbit_queue, Name, read) =:= [] + orelse not erlang:is_process_alive(Pid))])) + end). + +recover_durable_queues(QueuesAndRecoveryTerms) -> + {Results, Failures} = + gen_server2:mcall( + [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery), + {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), + [rabbit_log:error("Queue ~p failed to initialise: ~p~n", + [Pid, Error]) || {Pid, Error} <- Failures], + [Q || {_, {new, Q}} <- Results]. + +declare(QueueName, Durable, AutoDelete, Args, Owner) -> + declare(QueueName, Durable, AutoDelete, Args, Owner, node()). + + +%% The Node argument suggests where the queue (master if mirrored) +%% should be. Note that in some cases (e.g. with "nodes" policy in +%% effect) this might not be possible to satisfy. +declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> + ok = check_declare_arguments(QueueName, Args), + Q = rabbit_queue_decorator:set( + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + recoverable_slaves = [], + gm_pids = [], + state = live, + policy_version = 0 })), + + Node1 = case rabbit_queue_master_location_misc:get_location(Q) of + {ok, Node0} -> Node0; + {error, _} -> Node + end, + + Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1), + gen_server2:call( + rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare), + {init, new}, infinity). + +internal_declare(Q, true) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + ok = store_queue(Q#amqqueue{state = live}), + rabbit_misc:const(Q) + end); +internal_declare(Q = #amqqueue{name = QueueName}, false) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case not_found_or_absent(QueueName) of + not_found -> Q1 = rabbit_policy:set(Q), + Q2 = Q1#amqqueue{state = live}, + ok = store_queue(Q2), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; + {absent, _Q, _} = R -> rabbit_misc:const(R) + end; + [ExistingQ] -> + rabbit_misc:const(ExistingQ) + end + end). + +update(Name, Fun) -> + case mnesia:wread({rabbit_queue, Name}) of + [Q = #amqqueue{durable = Durable}] -> + Q1 = Fun(Q), + ok = mnesia:write(rabbit_queue, Q1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); + _ -> ok + end, + Q1; + [] -> + not_found + end. + +store_queue(Q = #amqqueue{durable = true}) -> + ok = mnesia:write(rabbit_durable_queue, + Q#amqqueue{slave_pids = [], + sync_slave_pids = [], + gm_pids = [], + decorators = undefined}, write), + store_queue_ram(Q); +store_queue(Q = #amqqueue{durable = false}) -> + store_queue_ram(Q). + +store_queue_ram(Q) -> + ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). + +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> store_queue_ram(Q), + ok; + [] -> ok + end + end). + +policy_changed(Q1 = #amqqueue{decorators = Decorators1}, + Q2 = #amqqueue{decorators = Decorators2}) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), + D1 = rabbit_queue_decorator:select(Decorators1), + D2 = rabbit_queue_decorator:select(Decorators2), + [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)], + %% Make sure we emit a stats event even if nothing + %% mirroring-related has changed - the policy may have changed anyway. + notify_policy_changed(Q1). + +add_default_binding(#amqqueue{name = QueueName}) -> + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), + RoutingKey = QueueName#resource.name, + rabbit_binding:add(#binding{source = ExchangeName, + destination = QueueName, + key = RoutingKey, + args = []}). + +lookup([]) -> []; %% optimisation +lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation +lookup(Names) when is_list(Names) -> + %% Normally we'd call mnesia:dirty_read/1 here, but that is quite + %% expensive for reasons explained in rabbit_misc:dirty_read/1. + lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]); +lookup(Name) -> + rabbit_misc:dirty_read({rabbit_queue, Name}). + +not_found_or_absent(Name) -> + %% NB: we assume that the caller has already performed a lookup on + %% rabbit_queue and not found anything + case mnesia:read({rabbit_durable_queue, Name}) of + [] -> not_found; + [Q] -> {absent, Q, nodedown} %% Q exists on stopped node + end. + +not_found_or_absent_dirty(Name) -> + %% We should read from both tables inside a tx, to get a + %% consistent view. But the chances of an inconsistency are small, + %% and only affect the error kind. + case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of + {error, not_found} -> not_found; + {ok, Q} -> {absent, Q, nodedown} + end. + +with(Name, F, E) -> + with(Name, F, E, 2000). + +with(Name, F, E, RetriesLeft) -> + case lookup(Name) of + {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 -> + %% Something bad happened to that queue, we are bailing out + %% on processing current request. + E({absent, Q, timeout}); + {ok, Q = #amqqueue{state = crashed}} -> + E({absent, Q, crashed}); + {ok, Q = #amqqueue{pid = QPid}} -> + %% We check is_process_alive(QPid) in case we receive a + %% nodedown (for example) in F() that has nothing to do + %% with the QPid. F() should be written s.t. that this + %% cannot happen, so we bail if it does since that + %% indicates a code bug and we don't want to get stuck in + %% the retry loop. + rabbit_misc:with_exit_handler( + fun () -> false = rabbit_mnesia:is_process_alive(QPid), + timer:sleep(30), + with(Name, F, E, RetriesLeft - 1) + end, fun () -> F(Q) end); + {error, not_found} -> + E(not_found_or_absent_dirty(Name)) + end. + +with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). + +with_or_die(Name, F) -> + with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) + end). + +assert_equivalence(#amqqueue{name = QName, + durable = Durable, + auto_delete = AD} = Q, + Durable1, AD1, Args1, Owner) -> + rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable), + rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete), + assert_args_equivalence(Q, Args1), + check_exclusive_access(Q, Owner, strict). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + +assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, + RequiredArgs) -> + rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, + [Key || {Key, _Fun} <- declare_args()]). + +check_declare_arguments(QueueName, Args) -> + check_arguments(QueueName, Args, declare_args()). + +check_consume_arguments(QueueName, Args) -> + check_arguments(QueueName, Args, consume_args()). + +check_arguments(QueueName, Args, Validators) -> + [case rabbit_misc:table_lookup(Args, Key) of + undefined -> ok; + TypeVal -> case Fun(TypeVal, Args) of + ok -> ok; + {error, Error} -> rabbit_misc:protocol_error( + precondition_failed, + "invalid arg '~s' for ~s: ~255p", + [Key, rabbit_misc:rs(QueueName), + Error]) + end + end || {Key, Fun} <- Validators], + ok. + +declare_args() -> + [{<<"x-expires">>, fun check_expires_arg/2}, + {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, + {<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2}, + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, + {<<"x-max-priority">>, fun check_non_neg_int_arg/2}, + {<<"x-queue-mode">>, fun check_queue_mode/2}]. + +consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, + {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. + +check_int_arg({Type, _}, _) -> + case lists:member(Type, ?INTEGER_ARG_TYPES) of + true -> ok; + false -> {error, {unacceptable_type, Type}} + end. + +check_bool_arg({bool, _}, _) -> ok; +check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. + +check_non_neg_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + +check_expires_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val == 0 -> {error, {value_zero, Val}}; + ok -> rabbit_misc:check_expiry(Val); + Error -> Error + end. + +check_message_ttl_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok -> rabbit_misc:check_expiry(Val); + Error -> Error + end. + +%% Note that the validity of x-dead-letter-exchange is already verified +%% by rabbit_channel's queue.declare handler. +check_dlxname_arg({longstr, _}, _) -> ok; +check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. + +check_dlxrk_arg({longstr, _}, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of + undefined -> {error, routing_key_but_no_dlx_defined}; + _ -> ok + end; +check_dlxrk_arg({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +check_queue_mode({longstr, Val}, _Args) -> + case lists:member(Val, [<<"default">>, <<"lazy">>]) of + true -> ok; + false -> {error, invalid_queue_mode} + end; +check_queue_mode({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + +list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). + +list_names() -> mnesia:dirty_all_keys(rabbit_queue). + +list(VHostPath) -> list(VHostPath, rabbit_queue). + +%% Not dirty_match_object since that would not be transactional when used in a +%% tx context +list(VHostPath, TableName) -> + mnesia:async_dirty( + fun () -> + mnesia:match_object( + TableName, + #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}, + read) + end). + +list_down(VHostPath) -> + Present = list(VHostPath), + Durable = list(VHostPath, rabbit_durable_queue), + PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]), + sets:to_list(sets:filter(fun (#amqqueue{name = N}) -> + not sets:is_element(N, PresentS) + end, sets:from_list(Durable))). + +info_keys() -> rabbit_amqqueue_process:info_keys(). + +map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). + +info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); +info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info). + +info(Q = #amqqueue{ state = crashed }, Items) -> + info_down(Q, Items, crashed); +info(#amqqueue{ pid = QPid }, Items) -> + case delegate:call(QPid, {info, Items}) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_down(Q, DownReason) -> + info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason). + +info_down(Q, Items, DownReason) -> + [{Item, i_down(Item, Q, DownReason)} || Item <- Items]. + +i_down(name, #amqqueue{name = Name}, _) -> Name; +i_down(durable, #amqqueue{durable = Dur}, _) -> Dur; +i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD; +i_down(arguments, #amqqueue{arguments = Args}, _) -> Args; +i_down(pid, #amqqueue{pid = QPid}, _) -> QPid; +i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS; +i_down(state, _Q, DownReason) -> DownReason; +i_down(K, _Q, _DownReason) -> + case lists:member(K, rabbit_amqqueue_process:info_keys()) of + true -> ''; + false -> throw({bad_argument, K}) + end. + +info_all(VHostPath) -> + map(list(VHostPath), fun (Q) -> info(Q) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end). + +info_all(VHostPath, Items) -> + map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). + +info_all_partial_emit(VHostPath, Items, all, Ref, AggregatorPid) -> + info_all_partial_emit(VHostPath, Items, online, Ref, AggregatorPid), + info_all_partial_emit(VHostPath, Items, offline, Ref, AggregatorPid); +info_all_partial_emit(VHostPath, Items, online, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, + list(VHostPath), + continue); +info_all_partial_emit(VHostPath, Items, offline, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, + list_down(VHostPath), + continue); +info_all_partial_emit(VHostPath, Items, local, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, + list_local(VHostPath), + continue). + +info_all(VHostPath, Items, Filter, Ref, AggregatorPid) -> + info_all_partial_emit(VHostPath, Items, Filter, Ref, AggregatorPid), + %% Previous map(s) are incomplete, finalize emission + rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []). + +info_local(VHostPath) -> + map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end). + +list_local(VHostPath) -> + [ Q || #amqqueue{state = State, pid=QPid} = Q <- list(VHostPath), + State =/= crashed, + node() =:= node(QPid) ]. + +force_event_refresh(Ref) -> + [gen_server2:cast(Q#amqqueue.pid, + {force_event_refresh, Ref}) || Q <- list()], + ok. + +notify_policy_changed(#amqqueue{pid = QPid}) -> + gen_server2:cast(QPid, policy_changed). + +consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). + +consumer_info_keys() -> ?CONSUMER_INFO_KEYS. + +consumers_all(VHostPath) -> + ConsumerInfoKeys = consumer_info_keys(), + lists:append( + map(list(VHostPath), + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)). + +consumers_all(VHostPath, Ref, AggregatorPid) -> + ConsumerInfoKeys = consumer_info_keys(), + rabbit_control_misc:emitting_map( + AggregatorPid, Ref, + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end, + list(VHostPath)). + +get_queue_consumer_info(Q, ConsumerInfoKeys) -> + [lists:zip(ConsumerInfoKeys, + [Q#amqqueue.name, ChPid, CTag, + AckRequired, Prefetch, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]. + +stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). + +pid_of(#amqqueue{pid = Pid}) -> Pid. +pid_of(VHost, QueueName) -> + case lookup(rabbit_misc:r(VHost, queue, QueueName)) of + {ok, Q} -> pid_of(Q); + {error, not_found} = E -> E + end. + +delete_immediately(QPids) -> + [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], + ok. + +delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> + delegate:call(QPid, {delete, IfUnused, IfEmpty}). + +delete_crashed(#amqqueue{ pid = QPid } = Q) -> + ok = rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]). + +delete_crashed_internal(Q = #amqqueue{ name = QName }) -> + {ok, BQ} = application:get_env(rabbit, backing_queue_module), + BQ:delete_crashed(Q), + ok = internal_delete(QName). + +purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). + +requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). + +ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). + +reject(QPid, Requeue, MsgIds, ChPid) -> + delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}). + +notify_down_all(QPids, ChPid) -> + notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT). + +notify_down_all(QPids, ChPid, Timeout) -> + case rpc:call(node(), delegate, call, + [QPids, {notify_down, ChPid}], Timeout) of + {badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}}; + {badrpc, Reason} -> {error, Reason}; + {_, Bads} -> + case lists:filter( + fun ({_Pid, {exit, {R, _}, _}}) -> + rabbit_misc:is_abnormal_exit(R); + ({_Pid, _}) -> false + end, Bads) of + [] -> ok; + Bads1 -> {error, Bads1} + end; + Error -> {error, Error} + end. + +activate_limit_all(QPids, ChPid) -> + delegate:cast(QPids, {activate_limit, ChPid}). + +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). + +basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> + delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). + +basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, + LimiterActive, ConsumerPrefetchCount, ConsumerTag, + ExclusiveConsume, Args, OkMsg) -> + ok = check_consume_arguments(QName, Args), + delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg}). + +basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> + delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + +notify_decorators(#amqqueue{pid = QPid}) -> + delegate:cast(QPid, notify_decorators). + +notify_sent(QPid, ChPid) -> + rabbit_amqqueue_common:notify_sent(QPid, ChPid). + +notify_sent_queue_down(QPid) -> + rabbit_amqqueue_common:notify_sent_queue_down(QPid). + +resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). + +internal_delete1(QueueName, OnlyDurable) -> + ok = mnesia:delete({rabbit_queue, QueueName}), + %% this 'guarded' delete prevents unnecessary writes to the mnesia + %% disk log + case mnesia:wread({rabbit_durable_queue, QueueName}) of + [] -> ok; + [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName}) + end, + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. + rabbit_binding:remove_for_destination(QueueName, OnlyDurable). + +internal_delete(QueueName) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case {mnesia:wread({rabbit_queue, QueueName}), + mnesia:wread({rabbit_durable_queue, QueueName})} of + {[], []} -> + rabbit_misc:const({error, not_found}); + _ -> + Deletions = internal_delete1(QueueName, false), + T = rabbit_binding:process_deletions(Deletions), + fun() -> + ok = T(), + rabbit_core_metrics:queue_deleted(QueueName), + ok = rabbit_event:notify(queue_deleted, + [{name, QueueName}]) + end + end + end). + +forget_all_durable(Node) -> + %% Note rabbit is not running so we avoid e.g. the worker pool. Also why + %% we don't invoke the return from rabbit_binding:process_deletions/1. + {atomic, ok} = + mnesia:sync_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_durable_queue, + #amqqueue{_ = '_'}, write), + [forget_node_for_queue(Node, Q) || + #amqqueue{pid = Pid} = Q <- Qs, + node(Pid) =:= Node], + ok + end), + ok. + +%% Try to promote a slave while down - it should recover as a +%% master. We try to take the oldest slave here for best chance of +%% recovery. +forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) -> + forget_node_for_queue(DeadNode, RS, Q). + +forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) -> + %% No slaves to recover from, queue is gone. + %% Don't process_deletions since that just calls callbacks and we + %% are not really up. + internal_delete1(Name, true); + +%% Should not happen, but let's be conservative. +forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> + forget_node_for_queue(DeadNode, T, Q); + +forget_node_for_queue(DeadNode, [H|T], Q) -> + case node_permits_offline_promotion(H) of + false -> forget_node_for_queue(DeadNode, T, Q); + true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, + ok = mnesia:write(rabbit_durable_queue, Q1, write) + end. + +node_permits_offline_promotion(Node) -> + case node() of + Node -> not rabbit:is_running(); %% [1] + _ -> All = rabbit_mnesia:cluster_nodes(all), + Running = rabbit_mnesia:cluster_nodes(running), + lists:member(Node, All) andalso + not lists:member(Node, Running) %% [2] + end. +%% [1] In this case if we are a real running node (i.e. rabbitmqctl +%% has RPCed into us) then we cannot allow promotion. If on the other +%% hand we *are* rabbitmqctl impersonating the node for offline +%% node-forgetting then we can. +%% +%% [2] This is simpler; as long as it's down that's OK + +run_backing_queue(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). + +set_ram_duration_target(QPid, Duration) -> + gen_server2:cast(QPid, {set_ram_duration_target, Duration}). + +set_maximum_since_use(QPid, Age) -> + gen_server2:cast(QPid, {set_maximum_since_use, Age}). + +update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring). + +sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors); +sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors); +cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). + +is_mirrored(Q) -> + rabbit_mirror_queue_misc:is_mirrored(Q). + +on_node_up(Node) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_queue, + #amqqueue{_ = '_'}, write), + [maybe_clear_recoverable_node(Node, Q) || Q <- Qs], + ok + end). + +maybe_clear_recoverable_node(Node, + #amqqueue{sync_slave_pids = SPids, + recoverable_slaves = RSs} = Q) -> + case lists:member(Node, RSs) of + true -> + %% There is a race with + %% rabbit_mirror_queue_slave:record_synchronised/1 called + %% by the incoming slave node and this function, called + %% by the master node. If this function is executed after + %% record_synchronised/1, the node is erroneously removed + %% from the recoverable slaves list. + %% + %% We check if the slave node's queue PID is alive. If it is + %% the case, then this function is executed after. In this + %% situation, we don't touch the queue record, it is already + %% correct. + DoClearNode = + case [SP || SP <- SPids, node(SP) =:= Node] of + [SPid] -> not rabbit_misc:is_process_alive(SPid); + _ -> true + end, + if + DoClearNode -> RSs1 = RSs -- [Node], + store_queue( + Q#amqqueue{recoverable_slaves = RSs1}); + true -> ok + end; + false -> + ok + end. + +on_node_down(Node) -> + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> QsDels = + qlc:e(qlc:q([{QName, delete_queue(QName)} || + #amqqueue{name = QName, pid = Pid} = Q + <- mnesia:table(rabbit_queue), + not rabbit_amqqueue:is_mirrored(Q) andalso + node(Pid) == Node andalso + not rabbit_mnesia:is_process_alive(Pid)])), + {Qs, Dels} = lists:unzip(QsDels), + T = rabbit_binding:process_deletions( + lists:foldl(fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), Dels)), + fun () -> + T(), + lists:foreach( + fun(QName) -> + rabbit_core_metrics:queue_deleted(QName), + ok = rabbit_event:notify(queue_deleted, + [{name, QName}]) + end, Qs) + end + end). + +delete_queue(QueueName) -> + ok = mnesia:delete({rabbit_queue, QueueName}), + rabbit_binding:remove_transient_for_destination(QueueName). + +pseudo_queue(QueueName, Pid) -> + #amqqueue{name = QueueName, + durable = false, + auto_delete = false, + arguments = [], + pid = Pid, + slave_pids = []}. + +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + recoverable_slaves = none, + gm_pids = none, + policy = none, + decorators = none, + state = none}. + +deliver([], _Delivery) -> + %% /dev/null optimisation + []; + +deliver(Qs, Delivery = #delivery{flow = Flow}) -> + {MPids, SPids} = qpids(Qs), + QPids = MPids ++ SPids, + %% We use up two credits to send to a slave since the message + %% arrives at the slave from two directions. We will ack one when + %% the slave receives the message direct from the channel, and the + %% other when it receives it via GM. + case Flow of + %% Here we are tracking messages sent by the rabbit_channel + %% process. We are accessing the rabbit_channel process + %% dictionary. + flow -> [credit_flow:send(QPid) || QPid <- QPids], + [credit_flow:send(QPid) || QPid <- SPids]; + noflow -> ok + end, + + %% We let slaves know that they were being addressed as slaves at + %% the time - if they receive such a message from the channel + %% after they have become master they should mark the message as + %% 'delivered' since they do not know what the master may have + %% done with it. + MMsg = {deliver, Delivery, false}, + SMsg = {deliver, Delivery, true}, + delegate:cast(MPids, MMsg), + delegate:cast(SPids, SMsg), + QPids. + +qpids([]) -> {[], []}; %% optimisation +qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt +qpids(Qs) -> + {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids}, + {MPidAcc, SPidAcc}) -> + {[QPid | MPidAcc], [SPids | SPidAcc]} + end, {[], []}, Qs), + {MPids, lists:append(SPids)}. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl new file mode 100644 index 0000000000..d8c5289997 --- /dev/null +++ b/src/rabbit_auth_backend_internal.erl @@ -0,0 +1,421 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_auth_backend_internal). +-include("rabbit.hrl"). + +-behaviour(rabbit_authn_backend). +-behaviour(rabbit_authz_backend). + +-export([user_login_authentication/2, user_login_authorization/1, + check_vhost_access/3, check_resource_access/3]). + +-export([add_user/2, delete_user/1, lookup_user/1, + change_password/2, clear_password/1, + hash_password/2, change_password_hash/2, change_password_hash/3, + set_tags/2, set_permissions/5, clear_permissions/2, + add_user_sans_validation/2]). +-export([user_info_keys/0, perms_info_keys/0, + user_perms_info_keys/0, vhost_perms_info_keys/0, + user_vhost_perms_info_keys/0, + list_users/0, list_users/2, list_permissions/0, + list_user_permissions/1, list_user_permissions/3, + list_vhost_permissions/1, list_vhost_permissions/3, + list_user_vhost_permissions/2]). + +%% for testing +-export([hashing_module_for_user/1]). + +%%---------------------------------------------------------------------------- + +-type regexp() :: binary(). + +-spec add_user(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}. +-spec delete_user(rabbit_types:username()) -> 'ok'. +-spec lookup_user + (rabbit_types:username()) -> + rabbit_types:ok(rabbit_types:internal_user()) | + rabbit_types:error('not_found'). +-spec change_password + (rabbit_types:username(), rabbit_types:password()) -> 'ok'. +-spec clear_password(rabbit_types:username()) -> 'ok'. +-spec hash_password + (module(), rabbit_types:password()) -> rabbit_types:password_hash(). +-spec change_password_hash + (rabbit_types:username(), rabbit_types:password_hash()) -> 'ok'. +-spec set_tags(rabbit_types:username(), [atom()]) -> 'ok'. +-spec set_permissions + (rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), + regexp()) -> + 'ok'. +-spec clear_permissions + (rabbit_types:username(), rabbit_types:vhost()) -> 'ok'. +-spec user_info_keys() -> rabbit_types:info_keys(). +-spec perms_info_keys() -> rabbit_types:info_keys(). +-spec user_perms_info_keys() -> rabbit_types:info_keys(). +-spec vhost_perms_info_keys() -> rabbit_types:info_keys(). +-spec user_vhost_perms_info_keys() -> rabbit_types:info_keys(). +-spec list_users() -> [rabbit_types:infos()]. +-spec list_users(reference(), pid()) -> 'ok'. +-spec list_permissions() -> [rabbit_types:infos()]. +-spec list_user_permissions + (rabbit_types:username()) -> [rabbit_types:infos()]. +-spec list_user_permissions + (rabbit_types:username(), reference(), pid()) -> 'ok'. +-spec list_vhost_permissions + (rabbit_types:vhost()) -> [rabbit_types:infos()]. +-spec list_vhost_permissions + (rabbit_types:vhost(), reference(), pid()) -> 'ok'. +-spec list_user_vhost_permissions + (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()]. + +%%---------------------------------------------------------------------------- +%% Implementation of rabbit_auth_backend + +%% Returns a password hashing module for the user record provided. If +%% there is no information in the record, we consider it to be legacy +%% (inserted by a version older than 3.6.0) and fall back to MD5, the +%% now obsolete hashing function. +hashing_module_for_user(#internal_user{ + hashing_algorithm = ModOrUndefined}) -> + rabbit_password:hashing_mod(ModOrUndefined). + +user_login_authentication(Username, []) -> + internal_check_user_login(Username, fun(_) -> true end); +user_login_authentication(Username, AuthProps) -> + case lists:keyfind(password, 1, AuthProps) of + {password, Cleartext} -> + internal_check_user_login( + Username, + fun (#internal_user{ + password_hash = <<Salt:4/binary, Hash/binary>> + } = U) -> + Hash =:= rabbit_password:salted_hash( + hashing_module_for_user(U), Salt, Cleartext); + (#internal_user{}) -> + false + end); + false -> exit({unknown_auth_props, Username, AuthProps}) + end. + +user_login_authorization(Username) -> + case user_login_authentication(Username, []) of + {ok, #auth_user{impl = Impl, tags = Tags}} -> {ok, Impl, Tags}; + Else -> Else + end. + +internal_check_user_login(Username, Fun) -> + Refused = {refused, "user '~s' - invalid credentials", [Username]}, + case lookup_user(Username) of + {ok, User = #internal_user{tags = Tags}} -> + case Fun(User) of + true -> {ok, #auth_user{username = Username, + tags = Tags, + impl = none}}; + _ -> Refused + end; + {error, not_found} -> + Refused + end. + +check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) -> + case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> false; + [_R] -> true + end. + +check_resource_access(#auth_user{username = Username}, + #resource{virtual_host = VHostPath, name = Name}, + Permission) -> + case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> + false; + [#user_permission{permission = P}] -> + PermRegexp = case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false + end + end. + +permission_index(configure) -> #permission.configure; +permission_index(write) -> #permission.write; +permission_index(read) -> #permission.read. + +%%---------------------------------------------------------------------------- +%% Manipulation of the user database + +validate_credentials(Username, Password) -> + rabbit_credential_validation:validate(Username, Password). + +validate_and_alternate_credentials(Username, Password, Fun) -> + case validate_credentials(Username, Password) of + ok -> + Fun(Username, Password); + {error, Err} -> + rabbit_log:error("Credential validation for '~s' failed!~n", [Username]), + {error, Err} + end. + +add_user(Username, Password) -> + validate_and_alternate_credentials(Username, Password, fun add_user_sans_validation/2). + +add_user_sans_validation(Username, Password) -> + rabbit_log:info("Creating user '~s'~n", [Username]), + %% hash_password will pick the hashing function configured for us + %% but we also need to store a hint as part of the record, so we + %% retrieve it here one more time + HashingMod = rabbit_password:hashing_mod(), + User = #internal_user{username = Username, + password_hash = hash_password(HashingMod, Password), + tags = [], + hashing_algorithm = HashingMod}, + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_user, Username}) of + [] -> + ok = mnesia:write(rabbit_user, User, write); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end), + rabbit_event:notify(user_created, [{name, Username}]), + R. + +delete_user(Username) -> + rabbit_log:info("Deleting user '~s'~n", [Username]), + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({rabbit_user, Username}), + [ok = mnesia:delete_object( + rabbit_user_permission, R, write) || + R <- mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = '_'}, + permission = '_'}, + write)], + ok + end)), + rabbit_event:notify(user_deleted, [{name, Username}]), + R. + +lookup_user(Username) -> + rabbit_misc:dirty_read({rabbit_user, Username}). + +change_password(Username, Password) -> + validate_and_alternate_credentials(Username, Password, fun change_password_sans_validation/2). + +change_password_sans_validation(Username, Password) -> + rabbit_log:info("Changing password for '~s'~n", [Username]), + HashingAlgorithm = rabbit_password:hashing_mod(), + R = change_password_hash(Username, + hash_password(rabbit_password:hashing_mod(), + Password), + HashingAlgorithm), + rabbit_event:notify(user_password_changed, [{name, Username}]), + R. + +clear_password(Username) -> + rabbit_log:info("Clearing password for '~s'~n", [Username]), + R = change_password_hash(Username, <<"">>), + rabbit_event:notify(user_password_cleared, [{name, Username}]), + R. + +hash_password(HashingMod, Cleartext) -> + rabbit_password:hash(HashingMod, Cleartext). + +change_password_hash(Username, PasswordHash) -> + change_password_hash(Username, PasswordHash, rabbit_password:hashing_mod()). + + +change_password_hash(Username, PasswordHash, HashingAlgorithm) -> + update_user(Username, fun(User) -> + User#internal_user{ + password_hash = PasswordHash, + hashing_algorithm = HashingAlgorithm } + end). + +set_tags(Username, Tags) -> + rabbit_log:info("Setting user tags for user '~s' to ~p~n", + [Username, Tags]), + R = update_user(Username, fun(User) -> + User#internal_user{tags = Tags} + end), + rabbit_event:notify(user_tags_set, [{name, Username}, {tags, Tags}]), + R. + +set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + rabbit_log:info("Setting permissions for " + "'~s' in '~s' to '~s', '~s', '~s'~n", + [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]), + lists:map( + fun (RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case re:compile(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, + Regexp, Reason}}) + end + end, [ConfigurePerm, WritePerm, ReadPerm]), + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> ok = mnesia:write( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, + write) + end)), + rabbit_event:notify(permission_created, [{user, Username}, + {vhost, VHostPath}, + {configure, ConfigurePerm}, + {write, WritePerm}, + {read, ReadPerm}]), + R. + +clear_permissions(Username, VHostPath) -> + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> + ok = mnesia:delete({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) + end)), + rabbit_event:notify(permission_deleted, [{user, Username}, + {vhost, VHostPath}]), + R. + + +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + +%%---------------------------------------------------------------------------- +%% Listing + +-define(PERMS_INFO_KEYS, [configure, write, read]). +-define(USER_INFO_KEYS, [user, tags]). + +user_info_keys() -> ?USER_INFO_KEYS. + +perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS]. +vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS]. +user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS]. +user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS. + +list_users() -> + [extract_internal_user_params(U) || + U <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. + +list_users(Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map( + AggregatorPid, Ref, + fun(U) -> extract_internal_user_params(U) end, + mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})). + +list_permissions() -> + list_permissions(perms_info_keys(), match_user_vhost('_', '_')). + +list_permissions(Keys, QueryThunk) -> + [extract_user_permission_params(Keys, U) || + %% TODO: use dirty ops instead + U <- rabbit_misc:execute_mnesia_transaction(QueryThunk)]. + +list_permissions(Keys, QueryThunk, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map( + AggregatorPid, Ref, fun(U) -> extract_user_permission_params(Keys, U) end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction(QueryThunk)). + +filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)]. + +list_user_permissions(Username) -> + list_permissions( + user_perms_info_keys(), + rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))). + +list_user_permissions(Username, Ref, AggregatorPid) -> + list_permissions( + user_perms_info_keys(), + rabbit_misc:with_user(Username, match_user_vhost(Username, '_')), + Ref, AggregatorPid). + +list_vhost_permissions(VHostPath) -> + list_permissions( + vhost_perms_info_keys(), + rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))). + +list_vhost_permissions(VHostPath, Ref, AggregatorPid) -> + list_permissions( + vhost_perms_info_keys(), + rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath)), + Ref, AggregatorPid). + +list_user_vhost_permissions(Username, VHostPath) -> + list_permissions( + user_vhost_perms_info_keys(), + rabbit_misc:with_user_and_vhost( + Username, VHostPath, match_user_vhost(Username, VHostPath))). + +extract_user_permission_params(Keys, #user_permission{ + user_vhost = + #user_vhost{username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}) -> + filter_props(Keys, [{user, Username}, + {vhost, VHostPath}, + {configure, ConfigurePerm}, + {write, WritePerm}, + {read, ReadPerm}]). + +extract_internal_user_params(#internal_user{username = Username, tags = Tags}) -> + [{user, Username}, {tags, Tags}]. + +match_user_vhost(Username, VHostPath) -> + fun () -> mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = '_'}, + read) + end. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl new file mode 100644 index 0000000000..d474e9cad3 --- /dev/null +++ b/src/rabbit_basic.erl @@ -0,0 +1,302 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_basic). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([publish/4, publish/5, publish/1, + message/3, message/4, properties/1, prepend_table_header/3, + extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, + header_routes/1, parse_expiration/1, header/2, header/3]). +-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). + +%%---------------------------------------------------------------------------- + +-type properties_input() :: + rabbit_framing:amqp_property_record() | [{atom(), any()}]. +-type publish_result() :: + {ok, [pid()]} | rabbit_types:error('not_found'). +-type header() :: any(). +-type headers() :: rabbit_framing:amqp_table() | 'undefined'. + +-type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name(). +-type body_input() :: binary() | [binary()]. + +-spec publish + (exchange_input(), rabbit_router:routing_key(), properties_input(), + body_input()) -> + publish_result(). +-spec publish + (exchange_input(), rabbit_router:routing_key(), boolean(), + properties_input(), body_input()) -> + publish_result(). +-spec publish(rabbit_types:delivery()) -> publish_result(). +-spec delivery + (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery(). +-spec message + (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), + binary()) -> + rabbit_types:message(). +-spec message + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message(), any()). +-spec properties + (properties_input()) -> rabbit_framing:amqp_property_record(). + +-spec prepend_table_header + (binary(), rabbit_framing:amqp_table(), headers()) -> headers(). + +-spec header(header(), headers()) -> 'undefined' | any(). +-spec header(header(), headers(), any()) -> 'undefined' | any(). + +-spec extract_headers(rabbit_types:content()) -> headers(). + +-spec map_headers + (fun((headers()) -> headers()), rabbit_types:content()) -> + rabbit_types:content(). + +-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()]. +-spec build_content + (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> + rabbit_types:content(). +-spec from_content + (rabbit_types:content()) -> + {rabbit_framing:amqp_property_record(), binary()}. +-spec parse_expiration + (rabbit_framing:amqp_property_record()) -> + rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()). + +%%---------------------------------------------------------------------------- + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(Exchange, RoutingKeyBin, Properties, Body) -> + publish(Exchange, RoutingKeyBin, false, Properties, Body). + +%% Convenience function, for avoiding round-trips in calls across the +%% erlang distributed network. +publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(X, delivery(Mandatory, false, Message, undefined)); +publish(XName, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(delivery(Mandatory, false, Message, undefined)). + +publish(Delivery = #delivery{ + message = #basic_message{exchange_name = XName}}) -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> publish(X, Delivery); + Err -> Err + end. + +publish(X, Delivery) -> + Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)), + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), + {ok, DeliveredQPids}. + +delivery(Mandatory, Confirm, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), + message = Message, msg_seq_no = MsgSeqNo, flow = noflow}. + +build_content(Properties, BodyBin) when is_binary(BodyBin) -> + build_content(Properties, [BodyBin]); + +build_content(Properties, PFR) -> + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), + #content{class_id = ClassId, + properties = Properties, + properties_bin = none, + protocol = none, + payload_fragments_rev = PFR}. + +from_content(Content) -> + #content{class_id = ClassId, + properties = Props, + payload_fragments_rev = FragmentsRev} = + rabbit_binary_parser:ensure_content_decoded(Content), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), + {Props, list_to_binary(lists:reverse(FragmentsRev))}. + +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = #'P_basic'{headers = undefined}} + = DecodedContent, _Key) -> + DecodedContent; +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} + = DecodedContent, Key) -> + case lists:keysearch(Key, 1, Headers) of + false -> DecodedContent; + {value, Found} -> Headers0 = lists:delete(Found, Headers), + rabbit_binary_generator:clear_encoded_content( + DecodedContent#content{ + properties = Props#'P_basic'{ + headers = Headers0}}) + end. + +message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> + try + {ok, #basic_message{ + exchange_name = XName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + id = rabbit_guid:gen(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} + catch + {error, _Reason} = Error -> Error + end. + +message(XName, RoutingKey, RawProperties, Body) -> + Properties = properties(RawProperties), + Content = build_content(Properties, Body), + {ok, Msg} = message(XName, RoutingKey, Content), + Msg. + +properties(P = #'P_basic'{}) -> + P; +properties(P) when is_list(P) -> + %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2), + %% i.e. slow. Use the definition of 'P_basic' directly if + %% possible! + lists:foldl(fun ({Key, Value}, Acc) -> + case indexof(record_info(fields, 'P_basic'), Key) of + 0 -> throw({unknown_basic_property, Key}); + N -> setelement(N + 1, Acc, Value) + end + end, #'P_basic'{}, P). + +prepend_table_header(Name, Info, undefined) -> + prepend_table_header(Name, Info, []); +prepend_table_header(Name, Info, Headers) -> + case rabbit_misc:table_lookup(Headers, Name) of + {array, Existing} -> + prepend_table(Name, Info, Existing, Headers); + undefined -> + prepend_table(Name, Info, [], Headers); + Other -> + Headers2 = prepend_table(Name, Info, [], Headers), + set_invalid_header(Name, Other, Headers2) + end. + +prepend_table(Name, Info, Prior, Headers) -> + rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). + +set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) -> + case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of + undefined -> + set_invalid([{Name, array, [Value]}], Headers); + {table, ExistingHdr} -> + update_invalid(Name, Value, ExistingHdr, Headers); + Other -> + %% somehow the x-invalid-headers header is corrupt + Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}], + set_invalid_header(Name, Value, set_invalid(Invalid, Headers)) + end. + +set_invalid(NewHdr, Headers) -> + rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr). + +update_invalid(Name, Value, ExistingHdr, Header) -> + Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of + undefined -> [Value]; + {array, Prior} -> [Value | Prior] + end, + NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), + set_invalid(NewHdr, Header). + +header(_Header, undefined) -> + undefined; +header(_Header, []) -> + undefined; +header(Header, Headers) -> + header(Header, Headers, undefined). + +header(Header, Headers, Default) -> + case lists:keysearch(Header, 1, Headers) of + false -> Default; + {value, Val} -> Val + end. + +extract_headers(Content) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers. + +extract_timestamp(Content) -> + #content{properties = #'P_basic'{timestamp = Timestamp}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Timestamp. + +map_headers(F, Content) -> + Content1 = rabbit_binary_parser:ensure_content_decoded(Content), + #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, + Headers1 = F(Headers), + rabbit_binary_generator:clear_encoded_content( + Content1#content{properties = Props#'P_basic'{headers = Headers1}}). + +indexof(L, Element) -> indexof(L, Element, 1). + +indexof([], _Element, _N) -> 0; +indexof([Element | _Rest], Element, N) -> N; +indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> throw({error, {delivery_mode_unknown, Other}}) + end. + +%% Extract CC routes from headers +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:append( + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + undefined -> []; + {Type, _Val} -> throw({error, {unacceptable_type_in_header, + binary_to_list(HeaderKey), Type}}) + end || HeaderKey <- ?ROUTING_HEADERS]). + +parse_expiration(#'P_basic'{expiration = undefined}) -> + {ok, undefined}; +parse_expiration(#'P_basic'{expiration = Expiration}) -> + case string:to_integer(binary_to_list(Expiration)) of + {error, no_integer} = E -> + E; + {N, ""} -> + case rabbit_misc:check_expiry(N) of + ok -> {ok, N}; + E = {error, _} -> E + end; + {_, S} -> + {error, {leftover_string, S}} + end. + +maybe_gc_large_msg(Content) -> + rabbit_writer:maybe_gc_large_msg(Content). + +msg_size(Content) -> + rabbit_writer:msg_size(Content). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl new file mode 100644 index 0000000000..dae0612e03 --- /dev/null +++ b/src/rabbit_channel.erl @@ -0,0 +1,2034 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_channel). + +%% rabbit_channel processes represent an AMQP 0-9-1 channels. +%% +%% Connections parse protocol frames coming from clients and +%% dispatch them to channel processes. +%% Channels are responsible for implementing the logic behind +%% the various protocol methods, involving other processes as +%% needed: +%% +%% * Routing messages (using functions in various exchange type +%% modules) to queue processes. +%% * Managing queues, exchanges, and bindings. +%% * Keeping track of consumers +%% * Keeping track of unacknowledged deliveries to consumers +%% * Keeping track of publisher confirms +%% * Keeping track of mandatory message routing confirmations +%% and returns +%% * Transaction management +%% * Authorisation (enforcing permissions) +%% * Publishing trace events if tracing is enabled +%% +%% Every channel has a number of dependent processes: +%% +%% * A writer which is responsible for sending frames to clients. +%% * A limiter which controls how many messages can be delivered +%% to consumers according to active QoS prefetch and internal +%% flow control logic. +%% +%% Channels are also aware of their connection's queue collector. +%% When a queue is declared as exclusive on a channel, the channel +%% will notify queue collector of that queue. + +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-behaviour(gen_server2). + +-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). +-export([send_command/2, deliver/4, deliver_reply/2, + send_credit_reply/2, send_drained/2]). +-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, + info_all/3, info_local/1]). +-export([refresh_config_local/0, ready_for_close/1]). +-export([force_event_refresh/1]). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). +%% Internal +-export([list_local/0, deliver_reply_local/3]). +-export([get_vhost/1, get_user/1]). + +-record(ch, { + %% starting | running | flow | closing + state, + %% same as reader's protocol. Used when instantiating + %% (protocol) exceptions. + protocol, + %% channel number + channel, + %% reader process + reader_pid, + %% writer process + writer_pid, + %% + conn_pid, + %% same as reader's name, see #v1.name + %% in rabbit_reader + conn_name, + %% limiter pid, see rabbit_limiter + limiter, + %% none | {Msgs, Acks} | committing | failed | + tx, + %% (consumer) delivery tag sequence + next_tag, + %% messages pending consumer acknowledgement + unacked_message_q, + %% same as #v1.user in the reader, used in + %% authorisation checks + user, + %% same as #v1.user in the reader + virtual_host, + %% when queue.bind's queue field is empty, + %% this name will be used instead + most_recently_declared_queue, + %% a dictionary of queue pid to queue name + queue_names, + %% queue processes are monitored to update + %% queue names + queue_monitors, + %% a dictionary of consumer tags to + %% consumer details: #amqqueue record, acknowledgement mode, + %% consumer exclusivity, etc + consumer_mapping, + %% a dictionary of queue pids to consumer tag lists + queue_consumers, + %% a set of pids of queues that have unacknowledged + %% deliveries + delivering_queues, + %% when a queue is declared as exclusive, queue + %% collector must be notified. + %% see rabbit_queue_collector for more info. + queue_collector_pid, + %% timer used to emit statistics + stats_timer, + %% are publisher confirms enabled for this channel? + confirm_enabled, + %% publisher confirm delivery tag sequence + publish_seqno, + %% a dtree used to track unconfirmed + %% (to publishers) messages + unconfirmed, + %% a list of tags for published messages that were + %% delivered but are yet to be confirmed to the client + confirmed, + %% a dtree used to track oustanding notifications + %% for messages published as mandatory + mandatory, + %% same as capabilities in the reader + capabilities, + %% tracing exchange resource if tracing is enabled, + %% 'none' otherwise + trace_state, + consumer_prefetch, + %% used by "one shot RPC" (amq. + reply_consumer, + %% flow | noflow, see rabbitmq-server#114 + delivery_flow, + interceptor_state +}). + + +-define(MAX_PERMISSION_CACHE_SIZE, 12). + +-define(STATISTICS_KEYS, + [reductions, + pid, + transactional, + confirm, + consumer_count, + messages_unacknowledged, + messages_unconfirmed, + messages_uncommitted, + acks_uncommitted, + prefetch_count, + global_prefetch_count, + state, + garbage_collection]). + +-define(CREATION_EVENT_KEYS, + [pid, + name, + connection, + number, + user, + vhost]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). + +-define(INCR_STATS(Incs, Measure, State), + case rabbit_event:stats_level(State, #ch.stats_timer) of + fine -> incr_stats(Incs, Measure); + _ -> ok + end). + +%%---------------------------------------------------------------------------- + +-export_type([channel_number/0]). + +-type channel_number() :: non_neg_integer(). + +-export_type([channel/0]). + +-type channel() :: #ch{}. + +-spec start_link + (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid(), pid()) -> + rabbit_types:ok_pid_or_error(). +-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. +-spec do + (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> + 'ok'. +-spec do_flow + (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> + 'ok'. +-spec flush(pid()) -> 'ok'. +-spec shutdown(pid()) -> 'ok'. +-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. +-spec deliver + (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'. +-spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'. +-spec deliver_reply_local(pid(), binary(), rabbit_types:delivery()) -> 'ok'. +-spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'. +-spec send_drained(pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'. +-spec list() -> [pid()]. +-spec list_local() -> [pid()]. +-spec info_keys() -> rabbit_types:info_keys(). +-spec info(pid()) -> rabbit_types:infos(). +-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). +-spec info_all() -> [rabbit_types:infos()]. +-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. +-spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'. +-spec refresh_config_local() -> 'ok'. +-spec ready_for_close(pid()) -> 'ok'. +-spec force_event_refresh(reference()) -> 'ok'. + +%%---------------------------------------------------------------------------- + +start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, + VHost, Capabilities, CollectorPid, Limiter) -> + gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, + User, VHost, Capabilities, CollectorPid, Limiter], []). + +do(Pid, Method) -> + do(Pid, Method, none). + +do(Pid, Method, Content) -> + gen_server2:cast(Pid, {method, Method, Content, noflow}). + +do_flow(Pid, Method, Content) -> + %% Here we are tracking messages sent by the rabbit_reader + %% process. We are accessing the rabbit_reader process dictionary. + credit_flow:send(Pid), + gen_server2:cast(Pid, {method, Method, Content, flow}). + +flush(Pid) -> + gen_server2:call(Pid, flush, infinity). + +shutdown(Pid) -> + gen_server2:cast(Pid, terminate). + +send_command(Pid, Msg) -> + gen_server2:cast(Pid, {command, Msg}). + +deliver(Pid, ConsumerTag, AckRequired, Msg) -> + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). + +deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> + case decode_fast_reply_to(Rest) of + {ok, Pid, Key} -> + delegate:invoke_no_result( + Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}); + error -> + ok + end. + +%% We want to ensure people can't use this mechanism to send a message +%% to an arbitrary process and kill it! +deliver_reply_local(Pid, Key, Delivery) -> + case pg_local:in_group(rabbit_channels, Pid) of + true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery}); + false -> ok + end. + +declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> + exists; +declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) -> + case decode_fast_reply_to(Rest) of + {ok, Pid, Key} -> + Msg = {declare_fast_reply_to, Key}, + rabbit_misc:with_exit_handler( + rabbit_misc:const(not_found), + fun() -> gen_server2:call(Pid, Msg, infinity) end); + error -> + not_found + end; +declare_fast_reply_to(_) -> + not_found. + +decode_fast_reply_to(Rest) -> + case string:tokens(binary_to_list(Rest), ".") of + [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)), + {ok, Pid, Key}; + _ -> error + end. + +send_credit_reply(Pid, Len) -> + gen_server2:cast(Pid, {send_credit_reply, Len}). + +send_drained(Pid, CTagCredit) -> + gen_server2:cast(Pid, {send_drained, CTagCredit}). + +list() -> + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), + rabbit_channel, list_local, []). + +list_local() -> + pg_local:get_members(rabbit_channels). + +info_keys() -> ?INFO_KEYS. + +info(Pid) -> + gen_server2:call(Pid, info, infinity). + +info(Pid, Items) -> + case gen_server2:call(Pid, {info, Items}, infinity) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end. + +info_all() -> + rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()). + +info_all(Items) -> + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). + +info_local(Items) -> + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list_local()). + +info_all(Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()). + +refresh_config_local() -> + rabbit_misc:upmap( + fun (C) -> gen_server2:call(C, refresh_config, infinity) end, + list_local()), + ok. + +ready_for_close(Pid) -> + gen_server2:cast(Pid, ready_for_close). + +force_event_refresh(Ref) -> + [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], + ok. + +%%--------------------------------------------------------------------------- + +init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, + Capabilities, CollectorPid, LimiterPid]) -> + process_flag(trap_exit, true), + ?store_proc_name({ConnName, Channel}), + ok = pg_local:join(rabbit_channels, self()), + Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of + true -> flow; + false -> noflow + end, + State = #ch{state = starting, + protocol = Protocol, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + conn_pid = ConnPid, + conn_name = ConnName, + limiter = rabbit_limiter:new(LimiterPid), + tx = none, + next_tag = 1, + unacked_message_q = queue:new(), + user = User, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + queue_names = dict:new(), + queue_monitors = pmon:new(), + consumer_mapping = dict:new(), + queue_consumers = dict:new(), + delivering_queues = sets:new(), + queue_collector_pid = CollectorPid, + confirm_enabled = false, + publish_seqno = 1, + unconfirmed = dtree:empty(), + confirmed = [], + mandatory = dtree:empty(), + capabilities = Capabilities, + trace_state = rabbit_trace:init(VHost), + consumer_prefetch = 0, + reply_consumer = none, + delivery_flow = Flow, + interceptor_state = undefined}, + State1 = State#ch{ + interceptor_state = rabbit_channel_interceptor:init(State)}, + State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), + Infos = infos(?CREATION_EVENT_KEYS, State2), + rabbit_core_metrics:channel_created(self(), Infos), + rabbit_event:notify(channel_created, Infos), + rabbit_event:if_enabled(State2, #ch.stats_timer, + fun() -> emit_stats(State2) end), + put(channel_operation_timeout, ?CHANNEL_OPERATION_TIMEOUT), + {ok, State2, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +prioritise_call(Msg, _From, _Len, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + _ -> 0 + end. + +prioritise_cast(Msg, _Len, _State) -> + case Msg of + {confirm, _MsgSeqNos, _QPid} -> 5; + {mandatory_received, _MsgSeqNo, _QPid} -> 5; + _ -> 0 + end. + +prioritise_info(Msg, _Len, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + +handle_call(flush, _From, State) -> + reply(ok, State); + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); + +handle_call({info, Items}, _From, State) -> + try + reply({ok, infos(Items, State)}, State) + catch Error -> reply({error, Error}, State) + end; + +handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> + reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); + +handle_call({declare_fast_reply_to, Key}, _From, + State = #ch{reply_consumer = Consumer}) -> + reply(case Consumer of + {_, _, Key} -> exists; + _ -> not_found + end, State); + +handle_call(_Request, _From, State) -> + noreply(State). + +handle_cast({method, Method, Content, Flow}, + State = #ch{reader_pid = Reader, + interceptor_state = IState}) -> + case Flow of + %% We are going to process a message from the rabbit_reader + %% process, so here we ack it. In this case we are accessing + %% the rabbit_channel process dictionary. + flow -> credit_flow:ack(Reader); + noflow -> ok + end, + + try handle_method(rabbit_channel_interceptor:intercept_in( + expand_shortcuts(Method, State), Content, IState), + State) of + {reply, Reply, NewState} -> + ok = send(Reply, NewState), + noreply(NewState); + {noreply, NewState} -> + noreply(NewState); + stop -> + {stop, normal, State} + catch + exit:Reason = #amqp_error{} -> + MethodName = rabbit_misc:method_record_type(Method), + handle_exception(Reason#amqp_error{method = MethodName}, State); + _:Reason -> + {stop, {Reason, erlang:get_stacktrace()}, State} + end; + +handle_cast(ready_for_close, State = #ch{state = closing, + writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), + {stop, normal, State}; + +handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:flush(WriterPid), + {stop, normal, State}; + +handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) -> + ok = send(Msg, State), + noreply(consumer_monitor(CTag, State)); + +handle_cast({command, Msg}, State) -> + ok = send(Msg, State), + noreply(State); + +handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) -> + noreply(State); +handle_cast({deliver, ConsumerTag, AckRequired, + Msg = {_QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> + ok = rabbit_writer:send_command_and_notify( + WriterPid, QPid, self(), + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + rabbit_basic:maybe_gc_large_msg(Content), + noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); + +handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) -> + noreply(State); +handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) -> + noreply(State); +handle_cast({deliver_reply, Key, #delivery{message = + #basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag, + reply_consumer = {ConsumerTag, _Suffix, Key}}) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = false, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(State); +handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) -> + noreply(State); + +handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_ok'{available = Len}), + noreply(State); + +handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> + [ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}) + || {ConsumerTag, CreditDrained} <- CTagCredit], + noreply(State); + +handle_cast({force_event_refresh, Ref}, State) -> + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), + Ref), + noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer)); + +handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> + %% NB: don't call noreply/1 since we don't want to send confirms. + noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)}); + +handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), + %% NB: don't call noreply/1 since we don't want to send confirms. + noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})). + +handle_info({bump_credit, Msg}, State) -> + %% A rabbit_amqqueue_process is granting credit to our channel. If + %% our channel was being blocked by this process, and no other + %% process is blocking our channel, then this channel will be + %% unblocked. This means that any credit that was deferred will be + %% sent to rabbit_reader processs that might be blocked by this + %% particular channel. + credit_flow:handle_bump_msg(Msg), + noreply(State); + +handle_info(timeout, State) -> + noreply(State); + +handle_info(emit_stats, State) -> + emit_stats(State), + State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer), + %% NB: don't call noreply/1 since we don't want to kick off the + %% stats timer. + {noreply, send_confirms(State1), hibernate}; + +handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> + State1 = handle_publishing_queue_down(QPid, Reason, State), + State3 = handle_consuming_queue_down(QPid, State1), + State4 = handle_delivering_queue_down(QPid, State3), + %% A rabbit_amqqueue_process has died. If our channel was being + %% blocked by this process, and no other process is blocking our + %% channel, then this channel will be unblocked. This means that + %% any credit that was deferred will be sent to the rabbit_reader + %% processs that might be blocked by this particular channel. + credit_flow:peer_down(QPid), + #ch{queue_names = QNames, queue_monitors = QMons} = State4, + case dict:find(QPid, QNames) of + {ok, QName} -> erase_queue_stats(QName); + error -> ok + end, + noreply(State4#ch{queue_names = dict:erase(QPid, QNames), + queue_monitors = pmon:erase(QPid, QMons)}); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + +handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel}) + when is_reference(Ref) -> + log(warning, "Channel ~p ignoring late answer ~p from ~p", + [Channel, LateAnswer, Node]), + noreply(State). + +handle_pre_hibernate(State) -> + ok = clear_permission_cache(), + rabbit_event:if_enabled( + State, #ch.stats_timer, + fun () -> emit_stats(State, + [{idle_since, + time_compat:os_system_time(milli_seconds)}]) + end), + {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. + +terminate(_Reason, State = #ch{}) -> + {_Res, _State1} = notify_queues(State), + pg_local:leave(rabbit_channels, self()), + rabbit_event:if_enabled(State, #ch.stats_timer, + fun() -> emit_stats(State) end), + [delete_stats(Tag) || {Tag, _} <- get()], + rabbit_core_metrics:channel_closed(self()), + rabbit_event:notify(channel_closed, [{pid, self()}]). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + +%%--------------------------------------------------------------------------- + +log(Level, Fmt, Args) -> rabbit_log:log(channel, Level, Fmt, Args). + +reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. + +noreply(NewState) -> {noreply, next_state(NewState), hibernate}. + +next_state(State) -> ensure_stats_timer(send_confirms(State)). + +noreply_coalesce(State = #ch{confirmed = C}) -> + Timeout = case C of [] -> hibernate; _ -> 0 end, + {noreply, ensure_stats_timer(State), Timeout}. + +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats). + +return_ok(State, true, _Msg) -> {noreply, State}; +return_ok(State, false, Msg) -> {reply, Msg, State}. + +ok_msg(true, _Msg) -> undefined; +ok_msg(false, Msg) -> Msg. + +send(_Command, #ch{state = closing}) -> + ok; +send(Command, #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command(WriterPid, Command). + +format_soft_error(#amqp_error{name = N, explanation = E, method = M}) -> + io_lib:format("operation ~s caused a channel exception ~s: ~ts", [M, N, E]); +format_soft_error(Reason) -> + Reason. + +handle_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User}) -> + %% something bad's happened: notify_queues may not be 'ok' + {_Result, State1} = notify_queues(State), + case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of + {Channel, CloseMethod} -> + log(error, "Channel error on connection ~p (~s, vhost: '~s'," + " user: '~s'), channel ~p:~n~s~n", + [ConnPid, ConnName, VHost, User#user.username, + Channel, format_soft_error(Reason)]), + ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + {0, _} -> + ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} + end. + +-spec precondition_failed(string()) -> no_return(). + +precondition_failed(Format) -> precondition_failed(Format, []). + +-spec precondition_failed(string(), [any()]) -> no_return(). + +precondition_failed(Format, Params) -> + rabbit_misc:protocol_error(precondition_failed, Format, Params). + +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> + return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, + #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}). + +check_resource_access(User, Resource, Perm) -> + V = {Resource, Perm}, + Cache = case get(permission_cache) of + undefined -> []; + Other -> Other + end, + case lists:member(V, Cache) of + true -> ok; + false -> ok = rabbit_access_control:check_resource_access( + User, Resource, Perm), + CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1), + put(permission_cache, [V | CacheTail]) + end. + +clear_permission_cache() -> erase(permission_cache), + ok. + +check_configure_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, configure). + +check_write_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, write). + +check_read_permitted(Resource, #ch{user = User}) -> + check_resource_access(User, Resource, read). + +check_user_id_header(#'P_basic'{user_id = undefined}, _) -> + ok; +check_user_id_header(#'P_basic'{user_id = Username}, + #ch{user = #user{username = Username}}) -> + ok; +check_user_id_header( + #'P_basic'{}, #ch{user = #user{authz_backends = + [{rabbit_auth_backend_dummy, _}]}}) -> + ok; +check_user_id_header(#'P_basic'{user_id = Claimed}, + #ch{user = #user{username = Actual, + tags = Tags}}) -> + case lists:member(impersonator, Tags) of + true -> ok; + false -> precondition_failed( + "user_id property set to '~s' but authenticated user was " + "'~s'", [Claimed, Actual]) + end. + +check_expiration_header(Props) -> + case rabbit_basic:parse_expiration(Props) of + {ok, _} -> ok; + {error, E} -> precondition_failed("invalid expiration '~s': ~p", + [Props#'P_basic'.expiration, E]) + end. + +check_internal_exchange(#exchange{name = Name, internal = true}) -> + rabbit_misc:protocol_error(access_refused, + "cannot publish to internal ~s", + [rabbit_misc:rs(Name)]); +check_internal_exchange(_) -> + ok. + +check_msg_size(Content) -> + Size = rabbit_basic:maybe_gc_large_msg(Content), + case Size > ?MAX_MSG_SIZE of + true -> precondition_failed("message size ~B larger than max size ~B", + [Size, ?MAX_MSG_SIZE]); + false -> ok + end. + +qbin_to_resource(QueueNameBin, State) -> + name_to_resource(queue, QueueNameBin, State). + +name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> + rabbit_misc:r(VHostPath, Type, NameBin). + +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> + MRDQ; +expand_queue_name_shortcut(QueueNameBin, _) -> + QueueNameBin. + +expand_routing_key_shortcut(<<>>, <<>>, + #ch{most_recently_declared_queue = <<>>}) -> + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); +expand_routing_key_shortcut(<<>>, <<>>, + #ch{most_recently_declared_queue = MRDQ}) -> + MRDQ; +expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> + RoutingKey. + +expand_shortcuts(#'basic.get' {queue = Q} = M, State) -> + M#'basic.get' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'basic.consume'{queue = Q} = M, State) -> + M#'basic.consume'{queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.delete' {queue = Q} = M, State) -> + M#'queue.delete' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.purge' {queue = Q} = M, State) -> + M#'queue.purge' {queue = expand_queue_name_shortcut(Q, State)}; +expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, State) -> + M#'queue.bind' {queue = expand_queue_name_shortcut(Q, State), + routing_key = expand_routing_key_shortcut(Q, K, State)}; +expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, State) -> + M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, State), + routing_key = expand_routing_key_shortcut(Q, K, State)}; +expand_shortcuts(M, _State) -> + M. + +check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> + rabbit_misc:protocol_error( + access_refused, "operation not permitted on the default exchange", []); +check_not_default_exchange(_) -> + ok. + +check_exchange_deletion(XName = #resource{name = <<"amq.", _/binary>>, + kind = exchange}) -> + rabbit_misc:protocol_error( + access_refused, "deletion of system ~s not allowed", + [rabbit_misc:rs(XName)]); +check_exchange_deletion(_) -> + ok. + +%% check that an exchange/queue name does not contain the reserved +%% "amq." prefix. +%% +%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names +%% only applies on actual creation, and not in the cases where the +%% entity already exists or passive=true. +%% +%% NB: We deliberately do not enforce the other constraints on names +%% required by the spec. +check_name(Kind, NameBin = <<"amq.", _/binary>>) -> + rabbit_misc:protocol_error( + access_refused, + "~s name '~s' contains reserved prefix 'amq.*'",[Kind, NameBin]); +check_name(_Kind, NameBin) -> + NameBin. + +maybe_set_fast_reply_to( + C = #content{properties = P = #'P_basic'{reply_to = + <<"amq.rabbitmq.reply-to">>}}, + #ch{reply_consumer = ReplyConsumer}) -> + case ReplyConsumer of + none -> rabbit_misc:protocol_error( + precondition_failed, + "fast reply consumer does not exist", []); + {_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>, + rabbit_binary_generator:clear_encoded_content( + C#content{properties = P#'P_basic'{reply_to = Rep}}) + end; +maybe_set_fast_reply_to(C, _State) -> + C. + +record_confirms([], State) -> + State; +record_confirms(MXs, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MXs | C]}. + +handle_method({Method, Content}, State) -> + handle_method(Method, Content, State). + +handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> + %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? + State1 = State#ch{state = running}, + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {reply, #'channel.open_ok'{}, State1}; + +handle_method(#'channel.open'{}, _, _State) -> + rabbit_misc:protocol_error( + channel_error, "second 'channel.open' seen", []); + +handle_method(_Method, _, #ch{state = starting}) -> + rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); + +handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> + stop; + +handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid, + state = closing}) -> + ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), + {noreply, State}; + +handle_method(_Method, _, State = #ch{state = closing}) -> + {noreply, State}; + +handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> + {_Result, State1} = notify_queues(State), + %% We issue the channel.close_ok response after a handshake with + %% the reader, the other half of which is ready_for_close. That + %% way the reader forgets about the channel before we send the + %% response (and this channel process terminates). If we didn't do + %% that, a channel.open for the same channel number, which a + %% client is entitled to send as soon as it has received the + %% close_ok, might be received by the reader before it has seen + %% the termination and hence be sent to the old, now dead/dying + %% channel process, instead of a new process, and thus lost. + ReaderPid ! {channel_closing, self()}, + {noreply, State1}; + +%% Even though the spec prohibits the client from sending commands +%% while waiting for the reply to a synchronous command, we generally +%% do allow this...except in the case of a pending tx.commit, where +%% it could wreak havoc. +handle_method(_Method, _, #ch{tx = Tx}) + when Tx =:= committing orelse Tx =:= failed -> + rabbit_misc:protocol_error( + channel_error, "unexpected command while processing 'tx.commit'", []); + +handle_method(#'access.request'{},_, State) -> + {reply, #'access.request_ok'{ticket = 1}, State}; + +handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> + rabbit_misc:protocol_error(not_implemented, "immediate=true", []); + +handle_method(#'basic.publish'{exchange = ExchangeNameBin, + routing_key = RoutingKey, + mandatory = Mandatory}, + Content, State = #ch{virtual_host = VHostPath, + tx = Tx, + channel = ChannelNum, + confirm_enabled = ConfirmEnabled, + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName, + delivery_flow = Flow}) -> + check_msg_size(Content), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_write_permitted(ExchangeName, State), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + check_internal_exchange(Exchange), + %% We decode the content's properties here because we're almost + %% certain to want to look at delivery-mode and priority. + DecodedContent = #content {properties = Props} = + maybe_set_fast_reply_to( + rabbit_binary_parser:ensure_content_decoded(Content), State), + check_user_id_header(Props, State), + check_expiration_header(Props), + DoConfirm = Tx =/= none orelse ConfirmEnabled, + {MsgSeqNo, State1} = + case DoConfirm orelse Mandatory of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, State#ch{publish_seqno = SeqNo + 1}} + end, + case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of + {ok, Message} -> + Delivery = rabbit_basic:delivery( + Mandatory, DoConfirm, Message, MsgSeqNo), + QNames = rabbit_exchange:route(Exchange, Delivery), + rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, + Username, TraceState), + DQ = {Delivery#delivery{flow = Flow}, QNames}, + {noreply, case Tx of + none -> deliver_to_queues(DQ, State1); + {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), + State1#ch{tx = {Msgs1, Acks}} + end}; + {error, Reason} -> + precondition_failed("invalid message: ~p", [Reason]) + end; + +handle_method(#'basic.nack'{delivery_tag = DeliveryTag, + multiple = Multiple, + requeue = Requeue}, _, State) -> + reject(DeliveryTag, Requeue, Multiple, State); + +handle_method(#'basic.ack'{delivery_tag = DeliveryTag, + multiple = Multiple}, + _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, case Tx of + none -> ack(Acked, State1), + State1; + {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks), + State1#ch{tx = {Msgs, Acks1}} + end}; + +handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, + _, State = #ch{writer_pid = WriterPid, + conn_pid = ConnPid, + limiter = Limiter, + next_tag = DeliveryTag}) -> + QueueName = qbin_to_resource(QueueNameBin, State), + check_read_permitted(QueueName, State), + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> rabbit_amqqueue:basic_get( + Q, self(), NoAck, rabbit_limiter:pid(Limiter)) + end) of + {ok, MessageCount, + Msg = {QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}} -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey, + message_count = MessageCount}, + Content), + State1 = monitor_delivering_queue(NoAck, QPid, QName, State), + {noreply, record_sent(none, not(NoAck), Msg, State1)}; + empty -> + {reply, #'basic.get_empty'{}, State} + end; + +handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, + consumer_tag = CTag0, + no_ack = NoAck, + nowait = NoWait}, + _, State = #ch{reply_consumer = ReplyConsumer, + consumer_mapping = ConsumerMapping}) -> + case dict:find(CTag0, ConsumerMapping) of + error -> + case {ReplyConsumer, NoAck} of + {none, true} -> + CTag = case CTag0 of + <<>> -> rabbit_guid:binary( + rabbit_guid:gen_secure(), "amq.ctag"); + Other -> Other + end, + %% Precalculate both suffix and key; base64 encoding is + %% expensive + Key = base64:encode(rabbit_guid:gen_secure()), + PidEnc = base64:encode(term_to_binary(self())), + Suffix = <<PidEnc/binary, ".", Key/binary>>, + Consumer = {CTag, Suffix, binary_to_list(Key)}, + State1 = State#ch{reply_consumer = Consumer}, + case NoWait of + true -> {noreply, State1}; + false -> Rep = #'basic.consume_ok'{consumer_tag = CTag}, + {reply, Rep, State1} + end; + {_, false} -> + rabbit_misc:protocol_error( + precondition_failed, + "reply consumer cannot acknowledge", []); + _ -> + rabbit_misc:protocol_error( + precondition_failed, "reply consumer already set", []) + end; + {ok, _} -> + %% Attempted reuse of consumer tag. + rabbit_misc:protocol_error( + not_allowed, "attempt to reuse consumer tag '~s'", [CTag0]) + end; + +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, + _, State = #ch{reply_consumer = {ConsumerTag, _, _}}) -> + State1 = State#ch{reply_consumer = none}, + case NoWait of + true -> {noreply, State1}; + false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, + {reply, Rep, State1} + end; + +handle_method(#'basic.consume'{queue = QueueNameBin, + consumer_tag = ConsumerTag, + no_local = _, % FIXME: implement + no_ack = NoAck, + exclusive = ExclusiveConsume, + nowait = NoWait, + arguments = Args}, + _, State = #ch{consumer_prefetch = ConsumerPrefetch, + consumer_mapping = ConsumerMapping}) -> + case dict:find(ConsumerTag, ConsumerMapping) of + error -> + QueueName = qbin_to_resource(QueueNameBin, State), + check_read_permitted(QueueName, State), + ActualConsumerTag = + case ConsumerTag of + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.ctag"); + Other -> Other + end, + case basic_consume( + QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, State) of + {ok, State1} -> + {noreply, State1}; + {error, exclusive_consume_unavailable} -> + rabbit_misc:protocol_error( + access_refused, "~s in exclusive use", + [rabbit_misc:rs(QueueName)]) + end; + {ok, _} -> + %% Attempted reuse of consumer tag. + rabbit_misc:protocol_error( + not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag]) + end; + +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, + _, State = #ch{consumer_mapping = ConsumerMapping, + queue_consumers = QCons}) -> + OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, + case dict:find(ConsumerTag, ConsumerMapping) of + error -> + %% Spec requires we ignore this situation. + return_ok(State, NoWait, OkMsg); + {ok, {Q = #amqqueue{pid = QPid}, _CParams}} -> + ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), + QCons1 = + case dict:find(QPid, QCons) of + error -> QCons; + {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), + case gb_sets:is_empty(CTags1) of + true -> dict:erase(QPid, QCons); + false -> dict:store(QPid, CTags1, QCons) + end + end, + NewState = State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}, + %% In order to ensure that no more messages are sent to + %% the consumer after the cancel_ok has been sent, we get + %% the queue process to send the cancel_ok on our + %% behalf. If we were sending the cancel_ok ourselves it + %% might overtake a message sent previously by the queue. + case rabbit_misc:with_exit_handler( + fun () -> {error, not_found} end, + fun () -> + rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg)) + end) of + ok -> + {noreply, NewState}; + {error, not_found} -> + %% Spec requires we ignore this situation. + return_ok(NewState, NoWait, OkMsg) + end + end; + +handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> + rabbit_misc:protocol_error(not_implemented, + "prefetch_size!=0 (~w)", [Size]); + +handle_method(#'basic.qos'{global = false, + prefetch_count = PrefetchCount}, _, State) -> + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + +handle_method(#'basic.qos'{global = true, + prefetch_count = 0}, + _, State = #ch{limiter = Limiter}) -> + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; + +handle_method(#'basic.qos'{global = true, + prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> + %% TODO queue:len(UAMQ) is not strictly right since that counts + %% unacked messages from basic.get too. Pretty obscure though. + Limiter1 = rabbit_limiter:limit_prefetch(Limiter, + PrefetchCount, queue:len(UAMQ)), + case ((not rabbit_limiter:is_active(Limiter)) andalso + rabbit_limiter:is_active(Limiter1)) of + true -> rabbit_amqqueue:activate_limit_all( + consumer_queues(State#ch.consumer_mapping), self()); + false -> ok + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; + +handle_method(#'basic.recover_async'{requeue = true}, + _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> + OkFun = fun () -> ok end, + UAMQL = queue:to_list(UAMQ), + foreach_per_queue( + fun (QPid, MsgIds) -> + rabbit_misc:with_exit_handler( + OkFun, + fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end) + end, lists:reverse(UAMQL)), + ok = notify_limiter(Limiter, UAMQL), + %% No answer required - basic.recover is the newer, synchronous + %% variant of this method + {noreply, State#ch{unacked_message_q = queue:new()}}; + +handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "requeue=false", []); + +handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> + {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue}, + Content, State), + {reply, #'basic.recover_ok'{}, State1}; + +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, + _, State) -> + reject(DeliveryTag, Requeue, false, State); + +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + type = TypeNameBin, + passive = false, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + nowait = NoWait, + arguments = Args}, + _, State = #ch{virtual_host = VHostPath}) -> + CheckedType = rabbit_exchange:check_type(TypeNameBin), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_not_default_exchange(ExchangeName), + check_configure_permitted(ExchangeName, State), + X = case rabbit_exchange:lookup(ExchangeName) of + {ok, FoundX} -> FoundX; + {error, not_found} -> + check_name('exchange', ExchangeNameBin), + AeKey = <<"alternate-exchange">>, + case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of + undefined -> ok; + {error, {invalid_type, Type}} -> + precondition_failed( + "invalid type '~s' for arg '~s' in ~s", + [Type, AeKey, rabbit_misc:rs(ExchangeName)]); + AName -> check_read_permitted(ExchangeName, State), + check_write_permitted(AName, State), + ok + end, + rabbit_exchange:declare(ExchangeName, + CheckedType, + Durable, + AutoDelete, + Internal, + Args) + end, + ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, + AutoDelete, Internal, Args), + return_ok(State, NoWait, #'exchange.declare_ok'{}); + +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, + passive = true, + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath}) -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_not_default_exchange(ExchangeName), + _ = rabbit_exchange:lookup_or_die(ExchangeName), + return_ok(State, NoWait, #'exchange.declare_ok'{}); + +handle_method(#'exchange.delete'{exchange = ExchangeNameBin, + if_unused = IfUnused, + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath}) -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_not_default_exchange(ExchangeName), + check_exchange_deletion(ExchangeName), + check_configure_permitted(ExchangeName, State), + case rabbit_exchange:delete(ExchangeName, IfUnused) of + {error, not_found} -> + return_ok(State, NoWait, #'exchange.delete_ok'{}); + {error, in_use} -> + precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]); + ok -> + return_ok(State, NoWait, #'exchange.delete_ok'{}) + end; + +handle_method(#'exchange.bind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:add/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.bind_ok'{}, NoWait, State); + +handle_method(#'exchange.unbind'{destination = DestinationNameBin, + source = SourceNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:remove/2, + SourceNameBin, exchange, DestinationNameBin, RoutingKey, + Arguments, #'exchange.unbind_ok'{}, NoWait, State); + +%% Note that all declares to these are effectively passive. If it +%% exists it by definition has one consumer. +handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", + _/binary>> = QueueNameBin, + nowait = NoWait}, _, + State = #ch{virtual_host = VHost}) -> + QueueName = rabbit_misc:r(VHost, queue, QueueNameBin), + case declare_fast_reply_to(QueueNameBin) of + exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State); + not_found -> rabbit_misc:not_found(QueueName) + end; + +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = DurableDeclare, + exclusive = ExclusiveDeclare, + auto_delete = AutoDelete, + nowait = NoWait, + arguments = Args} = Declare, + _, State = #ch{virtual_host = VHostPath, + conn_pid = ConnPid, + queue_collector_pid = CollectorPid}) -> + Owner = case ExclusiveDeclare of + true -> ConnPid; + false -> none + end, + Durable = DurableDeclare andalso not ExclusiveDeclare, + ActualNameBin = case QueueNameBin of + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.gen"); + Other -> check_name('queue', Other) + end, + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + maybe_stat(NoWait, Q) + end) of + {ok, MessageCount, ConsumerCount} -> + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {error, not_found} -> + DlxKey = <<"x-dead-letter-exchange">>, + case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of + undefined -> + ok; + {error, {invalid_type, Type}} -> + precondition_failed( + "invalid type '~s' for arg '~s' in ~s", + [Type, DlxKey, rabbit_misc:rs(QueueName)]); + DLX -> + check_read_permitted(QueueName, State), + check_write_permitted(DLX, State), + ok + end, + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, #amqqueue{pid = QPid}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_queue_collector:register( + CollectorPid, QPid) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, none, State); + {absent, Q, Reason} -> + rabbit_misc:absent(Q, Reason); + {owner_died, _Q} -> + %% Presumably our own days are numbered since the + %% connection has died. Pretend the queue exists though, + %% just so nothing fails. + return_queue_declare_ok(QueueName, NoWait, 0, 0, State) + end; + {error, {absent, Q, Reason}} -> + rabbit_misc:absent(Q, Reason) + end; + +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = true, + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath, + conn_pid = ConnPid}) -> + QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), + {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), + ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); + +handle_method(#'queue.delete'{queue = QueueNameBin, + if_unused = IfUnused, + if_empty = IfEmpty, + nowait = NoWait}, + _, State = #ch{conn_pid = ConnPid}) -> + QueueName = qbin_to_resource(QueueNameBin, State), + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ConnPid), + rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) + end, + fun (not_found) -> {ok, 0}; + ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q), + {ok, 0}; + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) + end) of + {error, in_use} -> + precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); + {error, not_empty} -> + precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]); + {ok, PurgedMessageCount} -> + return_ok(State, NoWait, + #'queue.delete_ok'{message_count = PurgedMessageCount}) + end; + +handle_method(#'queue.bind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:add/2, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, + #'queue.bind_ok'{}, NoWait, State); + +handle_method(#'queue.unbind'{queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + arguments = Arguments}, _, State) -> + binding_action(fun rabbit_binding:remove/2, + ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments, + #'queue.unbind_ok'{}, false, State); + +handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, + _, State = #ch{conn_pid = ConnPid}) -> + QueueName = qbin_to_resource(QueueNameBin, State), + check_read_permitted(QueueName, State), + {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> rabbit_amqqueue:purge(Q) end), + return_ok(State, NoWait, + #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + precondition_failed("cannot switch from confirm to tx mode"); + +handle_method(#'tx.select'{}, _, State = #ch{tx = none}) -> + {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}}; + +handle_method(#'tx.select'{}, _, State) -> + {reply, #'tx.select_ok'{}, State}; + +handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> + precondition_failed("channel is not transactional"); + +handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks}, + limiter = Limiter}) -> + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), + Rev = fun (X) -> lists:reverse(lists:sort(X)) end, + lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1); + ({Requeue, A}) -> reject(Requeue, Rev(A), Limiter) + end, lists:reverse(Acks)), + {noreply, maybe_complete_tx(State1#ch{tx = committing})}; + +handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> + precondition_failed("channel is not transactional"); + +handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, + tx = {_Msgs, Acks}}) -> + AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])), + UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), + {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, + tx = new_tx()}}; + +handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) -> + precondition_failed("cannot switch from tx to confirm mode"); + +handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> + return_ok(State#ch{confirm_enabled = true}, + NoWait, #'confirm.select_ok'{}); + +handle_method(#'channel.flow'{active = true}, _, State) -> + {reply, #'channel.flow_ok'{active = true}, State}; + +handle_method(#'channel.flow'{active = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "active=false", []); + +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + drain = Drain}, + _, State = #ch{consumer_mapping = Consumers}) -> + case dict:find(CTag, Consumers) of + {ok, {Q, _CParams}} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed( + "unknown consumer tag '~s'", [CTag]) + end; + +handle_method(_MethodRecord, _Content, _State) -> + rabbit_misc:protocol_error( + command_invalid, "unimplemented method", []). + +%%---------------------------------------------------------------------------- + +%% We get the queue process to send the consume_ok on our behalf. This +%% is for symmetry with basic.cancel - see the comment in that method +%% for why. +basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, + State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_mapping = ConsumerMapping}) -> + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> + {rabbit_amqqueue:basic_consume( + Q, NoAck, self(), + rabbit_limiter:pid(Limiter), + rabbit_limiter:is_active(Limiter), + ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, + ok_msg(NoWait, #'basic.consume_ok'{ + consumer_tag = ActualConsumerTag})), + Q} + end) of + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> + CM1 = dict:store( + ActualConsumerTag, + {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, + ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), + {ok, case NoWait of + true -> consumer_monitor(ActualConsumerTag, State1); + false -> State1 + end}; + {{error, exclusive_consume_unavailable} = E, _Q} -> + E + end. + +maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q); +maybe_stat(true, _Q) -> {ok, 0, 0}. + +consumer_monitor(ConsumerTag, + State = #ch{consumer_mapping = ConsumerMapping, + queue_monitors = QMons, + queue_consumers = QCons}) -> + {#amqqueue{pid = QPid}, _CParams} = + dict:fetch(ConsumerTag, ConsumerMapping), + QCons1 = dict:update(QPid, fun (CTags) -> + gb_sets:insert(ConsumerTag, CTags) + end, + gb_sets:singleton(ConsumerTag), QCons), + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}. + +monitor_delivering_queue(NoAck, QPid, QName, + State = #ch{queue_names = QNames, + queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_names = dict:store(QPid, QName, QNames), + queue_monitors = pmon:monitor(QPid, QMons), + delivering_queues = case NoAck of + true -> DQ; + false -> sets:add_element(QPid, DQ) + end}. + +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, + mandatory = Mand}) -> + {MMsgs, Mand1} = dtree:take(QPid, Mand), + [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs], + State1 = State#ch{mandatory = Mand1}, + case rabbit_misc:is_abnormal_exit(Reason) of + true -> {MXs, UC1} = dtree:take_all(QPid, UC), + send_nacks(MXs, State1#ch{unconfirmed = UC1}); + false -> {MXs, UC1} = dtree:take(QPid, UC), + record_confirms(MXs, State1#ch{unconfirmed = UC1}) + + end. + +handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, + queue_names = QNames}) -> + ConsumerTags = case dict:find(QPid, QCons) of + error -> gb_sets:new(); + {ok, CTags} -> CTags + end, + gb_sets:fold( + fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> + QName = dict:fetch(QPid, QNames), + case queue_down_consumer_action(CTag, CMap) of + remove -> + cancel_consumer(CTag, QName, StateN); + {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> + case catch basic_consume( %% [0] + QName, NoAck, ConsumerPrefetch, CTag, + Exclusive, Args, true, StateN) of + {ok, StateN1} -> StateN1; + _ -> cancel_consumer(CTag, QName, StateN) + end + end + end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags). + +%% [0] There is a slight danger here that if a queue is deleted and +%% then recreated again the reconsume will succeed even though it was +%% not an HA failover. But the likelihood is not great and most users +%% are unlikely to care. + +cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, + consumer_mapping = CMap}) -> + case rabbit_misc:table_lookup( + Capabilities, <<"consumer_cancel_notify">>) of + {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, State); + _ -> ok + end, + rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag}, + {channel, self()}, + {queue, QName}]), + State#ch{consumer_mapping = dict:erase(CTag, CMap)}. + +queue_down_consumer_action(CTag, CMap) -> + {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), + case rabbit_misc:table_lookup(Args, <<"x-cancel-on-ha-failover">>) of + {bool, true} -> remove; + _ -> {recover, ConsumeSpec} + end. + +handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QPid, DQ)}. + +binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, + RoutingKey, Arguments, ReturnMethod, NoWait, + State = #ch{virtual_host = VHostPath, + conn_pid = ConnPid }) -> + DestinationName = name_to_resource(DestinationType, DestinationNameBin, State), + check_write_permitted(DestinationName, State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], + check_read_permitted(ExchangeName, State), + case Fun(#binding{source = ExchangeName, + destination = DestinationName, + key = RoutingKey, + args = Arguments}, + fun (_X, Q = #amqqueue{}) -> + try rabbit_amqqueue:check_exclusive_access(Q, ConnPid) + catch exit:Reason -> {error, Reason} + end; + (_X, #exchange{}) -> + ok + end) of + {error, {resources_missing, [{not_found, Name} | _]}} -> + rabbit_misc:not_found(Name); + {error, {resources_missing, [{absent, Q, Reason} | _]}} -> + rabbit_misc:absent(Q, Reason); + {error, binding_not_found} -> + rabbit_misc:protocol_error( + not_found, "no binding ~s between ~s and ~s", + [RoutingKey, rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(DestinationName)]); + {error, {binding_invalid, Fmt, Args}} -> + rabbit_misc:protocol_error(precondition_failed, Fmt, Args); + {error, #amqp_error{} = Error} -> + rabbit_misc:protocol_error(Error); + ok -> return_ok(State, NoWait, ReturnMethod) + end. + +basic_return(#basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content}, + State = #ch{protocol = Protocol, writer_pid = WriterPid}, + Reason) -> + ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), + {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.return'{reply_code = ReplyCode, + reply_text = ReplyText, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content). + +reject(DeliveryTag, Requeue, Multiple, + State = #ch{unacked_message_q = UAMQ, tx = Tx}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, case Tx of + none -> reject(Requeue, Acked, State1#ch.limiter), + State1; + {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks), + State1#ch{tx = {Msgs, Acks1}} + end}. + +%% NB: Acked is in youngest-first order +reject(Requeue, Acked, Limiter) -> + foreach_per_queue( + fun (QPid, MsgIds) -> + rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self()) + end, Acked), + ok = notify_limiter(Limiter, Acked). + +record_sent(ConsumerTag, AckRequired, + Msg = {QName, QPid, MsgId, Redelivered, _Message}, + State = #ch{unacked_message_q = UAMQ, + next_tag = DeliveryTag, + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName, + channel = ChannelNum}) -> + ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), + case Redelivered of + true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); + false -> ok + end, + rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), + UAMQ1 = case AckRequired of + true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, + UAMQ); + false -> UAMQ + end, + State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. + +%% NB: returns acks in youngest-first order +collect_acks(Q, 0, true) -> + {lists:reverse(queue:to_list(Q)), queue:new()}; +collect_acks(Q, DeliveryTag, Multiple) -> + collect_acks([], [], Q, DeliveryTag, Multiple). + +collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> + case queue:out(Q) of + {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, + QTail} -> + if CurrentDeliveryTag == DeliveryTag -> + {[UnackedMsg | ToAcc], + case PrefixAcc of + [] -> QTail; + _ -> queue:join( + queue:from_list(lists:reverse(PrefixAcc)), + QTail) + end}; + Multiple -> + collect_acks([UnackedMsg | ToAcc], PrefixAcc, + QTail, DeliveryTag, Multiple); + true -> + collect_acks(ToAcc, [UnackedMsg | PrefixAcc], + QTail, DeliveryTag, Multiple) + end; + {empty, _} -> + precondition_failed("unknown delivery tag ~w", [DeliveryTag]) + end. + +%% NB: Acked is in youngest-first order +ack(Acked, State = #ch{queue_names = QNames}) -> + foreach_per_queue( + fun (QPid, MsgIds) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + ?INCR_STATS(case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + [{queue_stats, QName, Count}]; + error -> [] + end, ack, State) + end, Acked), + ok = notify_limiter(State#ch.limiter, Acked). + +%% {Msgs, Acks} +%% +%% Msgs is a queue. +%% +%% Acks looks s.t. like this: +%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...] +%% +%% Each element is a pair consisting of a tag and a list of +%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true' +%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as +%% well as the list overall, are in "most-recent (generally youngest) +%% ack first" order. +new_tx() -> {queue:new(), []}. + +notify_queues(State = #ch{state = closing}) -> + {ok, State}; +notify_queues(State = #ch{consumer_mapping = Consumers, + delivering_queues = DQ }) -> + QPids = sets:to_list( + sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + {rabbit_amqqueue:notify_down_all(QPids, self(), + get(channel_operation_timeout)), + State#ch{state = closing}}. + +foreach_per_queue(_F, []) -> + ok; +foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId]); +%% NB: UAL should be in youngest-first order; the tree values will +%% then be in oldest-first order +foreach_per_queue(F, UAL) -> + T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons(QPid, MsgId, T) + end, gb_trees:empty(), UAL), + rabbit_misc:gb_trees_foreach(F, T). + +consumer_queues(Consumers) -> + lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}} + <- dict:to_list(Consumers)]). + +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, but not acks for +%% messages sent in a response to a basic.get (identified by their +%% 'none' consumer tag) +notify_limiter(Limiter, Acked) -> + %% optimisation: avoid the potentially expensive 'foldl' in the + %% common case. + case rabbit_limiter:is_active(Limiter) of + false -> ok; + true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of + 0 -> ok; + Count -> rabbit_limiter:ack(Limiter, Count) + end + end. + +deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, + confirm = false, + mandatory = false}, + []}, State) -> %% optimisation + ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), + State; +deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ + exchange_name = XName}, + mandatory = Mandatory, + confirm = Confirm, + msg_seq_no = MsgSeqNo}, + DelQNames}, State = #ch{queue_names = QNames, + queue_monitors = QMons}) -> + Qs = rabbit_amqqueue:lookup(DelQNames), + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), + %% The pmon:monitor_all/2 monitors all queues to which we + %% delivered. But we want to monitor even queues we didn't deliver + %% to, since we need their 'DOWN' messages to clean + %% queue_names. So we also need to monitor each QPid from + %% queues. But that only gets the masters (which is fine for + %% cleaning queue_names), so we need the union of both. + %% + %% ...and we need to add even non-delivered queues to queue_names + %% since alternative algorithms to update queue_names less + %% frequently would in fact be more expensive in the common case. + {QNames1, QMons1} = + lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, + {QNames0, QMons0}) -> + {case dict:is_key(QPid, QNames0) of + true -> QNames0; + false -> dict:store(QPid, QName, QNames0) + end, pmon:monitor(QPid, QMons0)} + end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs), + State1 = State#ch{queue_names = QNames1, + queue_monitors = QMons1}, + %% NB: the order here is important since basic.returns must be + %% sent before confirms. + State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo, + Message, State1), + State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, + XName, State2), + ?INCR_STATS([{exchange_stats, XName, 1} | + [{queue_exchange_stats, {QName, XName}, 1} || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]], + publish, State3), + State3. + +process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> + State; +process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State, no_route), + State; +process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg, + State#ch.mandatory)}. + +process_routing_confirm(false, _, _MsgSeqNo, _XName, State) -> + State; +process_routing_confirm(true, [], MsgSeqNo, XName, State) -> + record_confirms([{MsgSeqNo, XName}], State); +process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, + State#ch.unconfirmed)}. + +send_nacks([], State) -> + State; +send_nacks(_MXs, State = #ch{state = closing, + tx = none}) -> %% optimisation + State; +send_nacks(MXs, State = #ch{tx = none}) -> + coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], + fun(MsgSeqNo, Multiple) -> + #'basic.nack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State); +send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation + State#ch{tx = failed}; +send_nacks(_, State) -> + maybe_complete_tx(State#ch{tx = failed}). + +send_confirms(State = #ch{tx = none, confirmed = []}) -> + State; +send_confirms(State = #ch{tx = none, confirmed = C}) -> + case rabbit_node_monitor:pause_partition_guard() of + ok -> MsgSeqNos = + lists:foldl( + fun ({MsgSeqNo, XName}, MSNs) -> + ?INCR_STATS([{exchange_stats, XName, 1}], + confirm, State), + [MsgSeqNo | MSNs] + end, [], lists:append(C)), + send_confirms(MsgSeqNos, State#ch{confirmed = []}); + pausing -> State + end; +send_confirms(State) -> + case rabbit_node_monitor:pause_partition_guard() of + ok -> maybe_complete_tx(State); + pausing -> State + end. + +send_confirms([], State) -> + State; +send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation + State; +send_confirms([MsgSeqNo], State) -> + ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), + State; +send_confirms(Cs, State) -> + coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> + #'basic.ack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + +coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> + SMsgSeqNos = lists:usort(MsgSeqNos), + CutOff = case dtree:is_empty(UC) of + true -> lists:last(SMsgSeqNos) + 1; + false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo + end, + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), + case Ms of + [] -> ok; + _ -> ok = send(MkMsgFun(lists:last(Ms), true), State) + end, + [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss], + State. + +ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L]; +ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks]. + +ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]). + +maybe_complete_tx(State = #ch{tx = {_, _}}) -> + State; +maybe_complete_tx(State = #ch{unconfirmed = UC}) -> + case dtree:is_empty(UC) of + false -> State; + true -> complete_tx(State#ch{confirmed = []}) + end. + +complete_tx(State = #ch{tx = committing}) -> + ok = send(#'tx.commit_ok'{}, State), + State#ch{tx = new_tx()}; +complete_tx(State = #ch{tx = failed}) -> + {noreply, State1} = handle_exception( + rabbit_misc:amqp_error( + precondition_failed, "partial tx completion", [], + 'tx.commit'), + State), + State1#ch{tx = new_tx()}. + +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _) -> self(); +i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; +i(number, #ch{channel = Channel}) -> Channel; +i(user, #ch{user = User}) -> User#user.username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{tx = Tx}) -> Tx =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(name, State) -> name(State); +i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); +i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); +i(messages_uncommitted, #ch{}) -> 0; +i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); +i(acks_uncommitted, #ch{}) -> 0; +i(state, #ch{state = running}) -> credit_flow:state(); +i(state, #ch{state = State}) -> State; +i(prefetch_count, #ch{consumer_prefetch = C}) -> C; +i(global_prefetch_count, #ch{limiter = Limiter}) -> + rabbit_limiter:get_prefetch_limit(Limiter); +i(garbage_collection, _State) -> + rabbit_misc:get_gc_info(self()); +i(reductions, _State) -> + {reductions, Reductions} = erlang:process_info(self(), reductions), + Reductions; +i(Item, _) -> + throw({bad_argument, Item}). + +name(#ch{conn_name = ConnName, channel = Channel}) -> + list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). + +incr_stats(Incs, Measure) -> + [begin + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), + %% Keys in the process dictionary are used to clean up the core metrics + put({Type, Key}, none) + end || {Type, Key, Inc} <- Incs]. + +emit_stats(State) -> emit_stats(State, []). + +emit_stats(State, Extra) -> + [{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State), + rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0), + rabbit_core_metrics:channel_stats(reductions, self(), Red). + +erase_queue_stats(QName) -> + rabbit_core_metrics:channel_queue_down({self(), QName}), + erase({queue_stats, QName}), + [begin + rabbit_core_metrics:channel_queue_exchange_down({self(), QX}), + erase({queue_exchange_stats, QX}) + end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), + QName0 =:= QName]. + +get_vhost(#ch{virtual_host = VHost}) -> VHost. + +get_user(#ch{user = User}) -> User. + +delete_stats({queue_stats, QName}) -> + rabbit_core_metrics:channel_queue_down({self(), QName}); +delete_stats({exchange_stats, XName}) -> + rabbit_core_metrics:channel_exchange_down({self(), XName}); +delete_stats({queue_exchange_stats, QX}) -> + rabbit_core_metrics:channel_queue_exchange_down({self(), QX}); +delete_stats(_) -> + ok. diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl new file mode 100644 index 0000000000..b237a5f691 --- /dev/null +++ b/src/rabbit_channel_interceptor.erl @@ -0,0 +1,104 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_channel_interceptor). + +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([init/1, intercept_in/3]). + +-type(method_name() :: rabbit_framing:amqp_method_name()). +-type(original_method() :: rabbit_framing:amqp_method_record()). +-type(processed_method() :: rabbit_framing:amqp_method_record()). +-type(original_content() :: rabbit_types:maybe(rabbit_types:content())). +-type(processed_content() :: rabbit_types:maybe(rabbit_types:content())). +-type(interceptor_state() :: term()). + +-callback description() -> [proplists:property()]. +%% Derive some initial state from the channel. This will be passed back +%% as the third argument of intercept/3. +-callback init(rabbit_channel:channel()) -> interceptor_state(). +-callback intercept(original_method(), original_content(), + interceptor_state()) -> + {processed_method(), processed_content()} | + rabbit_misc:channel_or_connection_exit(). +-callback applies_to() -> list(method_name()). + +init(Ch) -> + Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)], + check_no_overlap(Mods), + [{Mod, Mod:init(Ch)} || Mod <- Mods]. + +check_no_overlap(Mods) -> + check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]). + +%% Check no non-empty pairwise intersection in a list of sets +check_no_overlap1(Sets) -> + lists:foldl(fun(Set, Union) -> + Is = sets:intersection(Set, Union), + case sets:size(Is) of + 0 -> ok; + _ -> + internal_error("Interceptor: more than one " + "module handles ~p~n", [Is]) + end, + sets:union(Set, Union) + end, + sets:new(), + Sets), + ok. + +intercept_in(M, C, Mods) -> + lists:foldl(fun({Mod, ModState}, {M1, C1}) -> + call_module(Mod, ModState, M1, C1) + end, + {M, C}, + Mods). + +call_module(Mod, St, M, C) -> + % this little dance is because Mod might be unloaded at any point + case (catch {ok, Mod:intercept(M, C, St)}) of + {ok, R} -> validate_response(Mod, M, C, R); + {'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C} + end. + +validate_response(Mod, M1, C1, R = {M2, C2}) -> + case {validate_method(M1, M2), validate_content(C1, C2)} of + {true, true} -> R; + {false, _} -> + internal_error("Interceptor: ~p expected to return " + "method: ~p but returned: ~p", + [Mod, rabbit_misc:method_record_type(M1), + rabbit_misc:method_record_type(M2)]); + {_, false} -> + internal_error("Interceptor: ~p expected to return " + "content iff content is provided but " + "content in = ~p; content out = ~p", + [Mod, C1, C2]) + end. + +validate_method(M, M2) -> + rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2). + +validate_content(none, none) -> true; +validate_content(#content{}, #content{}) -> true; +validate_content(_, _) -> false. + +%% keep dialyzer happy +-spec internal_error(string(), [any()]) -> no_return(). +internal_error(Format, Args) -> + rabbit_misc:protocol_error(internal_error, Format, Args). diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl new file mode 100644 index 0000000000..4a545dfdbf --- /dev/null +++ b/src/rabbit_exchange_decorator.erl @@ -0,0 +1,113 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_exchange_decorator). + +-include("rabbit.hrl"). + +-export([select/2, set/1, register/2, unregister/1]). + +%% This is like an exchange type except that: +%% +%% 1) It applies to all exchanges as soon as it is installed, therefore +%% 2) It is not allowed to affect validation, so no validate/1 or +%% assert_args_equivalence/2 +%% +%% It's possible in the future we might make decorators +%% able to manipulate messages as they are published. + +-type(tx() :: 'transaction' | 'none'). +-type(serial() :: pos_integer() | tx()). + +-callback description() -> [proplists:property()]. + +%% Should Rabbit ensure that all binding events that are +%% delivered to an individual exchange can be serialised? (they +%% might still be delivered out of order, but there'll be a +%% serial number). +-callback serialise_events(rabbit_types:exchange()) -> boolean(). + +%% called after declaration and recovery +-callback create(tx(), rabbit_types:exchange()) -> 'ok'. + +%% called after exchange (auto)deletion. +-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> + 'ok'. + +%% called when the policy attached to this exchange changes. +-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> + 'ok'. + +%% called after a binding has been added or recovered +-callback add_binding(serial(), rabbit_types:exchange(), + rabbit_types:binding()) -> 'ok'. + +%% called after bindings have been deleted. +-callback remove_bindings(serial(), rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'. + +%% Allows additional destinations to be added to the routing decision. +-callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> + [rabbit_amqqueue:name() | rabbit_exchange:name()]. + +%% Whether the decorator wishes to receive callbacks for the exchange +%% none:no callbacks, noroute:all callbacks except route, all:all callbacks +-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'. + +%%---------------------------------------------------------------------------- + +%% select a subset of active decorators +select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute); +select(route, {Route, _NoRoute}) -> filter(Route); +select(raw, {Route, NoRoute}) -> Route ++ NoRoute. + +filter(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +set(X) -> + Decs = lists:foldl(fun (D, {Route, NoRoute}) -> + ActiveFor = D:active_for(X), + {cons_if_eq(all, ActiveFor, D, Route), + cons_if_eq(noroute, ActiveFor, D, NoRoute)} + end, {[], []}, list()), + X#exchange{decorators = Decs}. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. + +cons_if_eq(Select, Select, Item, List) -> [Item | List]; +cons_if_eq(_Select, _Other, _Item, List) -> List. + +register(TypeName, ModuleName) -> + rabbit_registry:register(exchange_decorator, TypeName, ModuleName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(exchange_decorator, TypeName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +maybe_recover(X = #exchange{name = Name, + decorators = Decs}) -> + #exchange{decorators = Decs1} = set(X), + Old = lists:sort(select(all, Decs)), + New = lists:sort(select(all, Decs1)), + case New of + Old -> ok; + _ -> %% TODO create a tx here for non-federation decorators + [M:create(none, X) || M <- New -- Old], + rabbit_exchange:update_decorators(Name) + end. diff --git a/src/rabbit_health_check.erl b/src/rabbit_health_check.erl new file mode 100644 index 0000000000..d8d4cc240b --- /dev/null +++ b/src/rabbit_health_check.erl @@ -0,0 +1,95 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbit_health_check). + +%% External API +-export([node/1, node/2]). + +%% Internal API +-export([local/0]). + +-spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}. +-spec local() -> ok | {error_string, string()}. + +%%---------------------------------------------------------------------------- +%% External functions +%%---------------------------------------------------------------------------- + +node(Node) -> + %% same default as in CLI + node(Node, 70000). +node(Node, Timeout) -> + rabbit_misc:rpc_call(Node, rabbit_health_check, local, [], Timeout). + +local() -> + run_checks([list_channels, list_queues, alarms, rabbit_node_monitor]). + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- +run_checks([]) -> + ok; +run_checks([C|Cs]) -> + case node_health_check(C) of + ok -> + run_checks(Cs); + Error -> + Error + end. + +node_health_check(list_channels) -> + case rabbit_channel:info_local([pid]) of + L when is_list(L) -> + ok; + Other -> + ErrorMsg = io_lib:format("list_channels unexpected output: ~p", + [Other]), + {error_string, ErrorMsg} + end; + +node_health_check(list_queues) -> + health_check_queues(rabbit_vhost:list()); + +node_health_check(rabbit_node_monitor) -> + case rabbit_node_monitor:partitions() of + L when is_list(L) -> + ok; + Other -> + ErrorMsg = io_lib:format("rabbit_node_monitor reports unexpected partitions value: ~p", + [Other]), + {error_string, ErrorMsg} + end; + +node_health_check(alarms) -> + case proplists:get_value(alarms, rabbit:status()) of + [] -> + ok; + Alarms -> + ErrorMsg = io_lib:format("resource alarm(s) in effect:~p", [Alarms]), + {error_string, ErrorMsg} + end. + +health_check_queues([]) -> + ok; +health_check_queues([VHost|RestVHosts]) -> + case rabbit_amqqueue:info_local(VHost) of + L when is_list(L) -> + health_check_queues(RestVHosts); + Other -> + ErrorMsg = io_lib:format("list_queues unexpected output for vhost ~s: ~p", + [VHost, Other]), + {error_string, ErrorMsg} + end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl new file mode 100644 index 0000000000..cb7f181121 --- /dev/null +++ b/src/rabbit_nodes.erl @@ -0,0 +1,230 @@ +%% The contents of this file are subject to the Mozilla Public License + +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_nodes). + +-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, + is_running/2, is_process_running/2, + cluster_name/0, set_cluster_name/1, ensure_epmd/0, + all_running/0]). + +-include_lib("kernel/include/inet.hrl"). + +-define(EPMD_TIMEOUT, 30000). +-define(TCP_DIAGNOSTIC_TIMEOUT, 5000). +-define(ERROR_LOGGER_HANDLER, rabbit_error_logger_handler). + +%%---------------------------------------------------------------------------- +%% Specs +%%---------------------------------------------------------------------------- + +-spec names(string()) -> + rabbit_types:ok_or_error2([{string(), integer()}], term()). +-spec diagnostics([node()]) -> string(). +-spec cookie_hash() -> string(). +-spec is_running(node(), atom()) -> boolean(). +-spec is_process_running(node(), atom()) -> boolean(). +-spec cluster_name() -> binary(). +-spec set_cluster_name(binary()) -> 'ok'. +-spec ensure_epmd() -> 'ok'. +-spec all_running() -> [node()]. + +%%---------------------------------------------------------------------------- + +names(Hostname) -> + Self = self(), + Ref = make_ref(), + {Pid, MRef} = spawn_monitor( + fun () -> Self ! {Ref, net_adm:names(Hostname)} end), + timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), + receive + {Ref, Names} -> erlang:demonitor(MRef, [flush]), + Names; + {'DOWN', MRef, process, Pid, Reason} -> {error, Reason} + end. + +diagnostics(Nodes) -> + verbose_erlang_distribution(true), + NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n" + "attempted to contact: ~p~n", [Nodes]}] ++ + [diagnostics_node(Node) || Node <- Nodes] ++ + current_node_details(), + verbose_erlang_distribution(false), + rabbit_misc:format_many(lists:flatten(NodeDiags)). + +verbose_erlang_distribution(true) -> + net_kernel:verbose(1), + error_logger:add_report_handler(?ERROR_LOGGER_HANDLER); +verbose_erlang_distribution(false) -> + net_kernel:verbose(0), + error_logger:delete_report_handler(?ERROR_LOGGER_HANDLER). + +current_node_details() -> + [{"~ncurrent node details:~n- node name: ~w", [node()]}, + case init:get_argument(home) of + {ok, [[Home]]} -> {"- home dir: ~s", [Home]}; + Other -> {"- no home dir: ~p", [Other]} + end, + {"- cookie hash: ~s", [cookie_hash()]}]. + +diagnostics_node(Node) -> + {Name, Host} = parts(Node), + [{"~s:", [Node]} | + case names(Host) of + {error, Reason} -> + [{" * unable to connect to epmd (port ~s) on ~s: ~s~n", + [epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}]; + {ok, NamePorts} -> + [{" * connected to epmd (port ~s) on ~s", + [epmd_port(), Host]}] ++ + case net_adm:ping(Node) of + pong -> dist_working_diagnostics(Node); + pang -> dist_broken_diagnostics(Name, Host, NamePorts) + end + end]. + +epmd_port() -> + case init:get_argument(epmd_port) of + {ok, [[Port | _] | _]} when is_list(Port) -> Port; + error -> "4369" + end. + +dist_working_diagnostics(Node) -> + case rabbit:is_running(Node) of + true -> [{" * node ~s up, 'rabbit' application running", [Node]}]; + false -> [{" * node ~s up, 'rabbit' application not running~n" + " * running applications on ~s: ~p~n" + " * suggestion: start_app on ~s", + [Node, Node, remote_apps(Node), Node]}] + end. + +remote_apps(Node) -> + %% We want a timeout here because really, we don't trust the node, + %% the last thing we want to do is hang. + case rpc:call(Node, application, which_applications, [5000]) of + {badrpc, _} = E -> E; + Apps -> [App || {App, _, _} <- Apps] + end. + +dist_broken_diagnostics(Name, Host, NamePorts) -> + case [{N, P} || {N, P} <- NamePorts, N =:= Name] of + [] -> + {SelfName, SelfHost} = parts(node()), + Others = [list_to_atom(N) || {N, _} <- NamePorts, + N =/= case SelfHost of + Host -> SelfName; + _ -> never_matches + end], + OthersDiag = case Others of + [] -> [{" no other nodes on ~s", + [Host]}]; + _ -> [{" other nodes on ~s: ~p", + [Host, Others]}] + end, + [{" * epmd reports: node '~s' not running at all", [Name]}, + OthersDiag, {" * suggestion: start the node", []}]; + [{Name, Port}] -> + [{" * epmd reports node '~s' running on port ~b", [Name, Port]} | + case diagnose_connect(Host, Port) of + ok -> + connection_succeeded_diagnostics(); + {error, Reason} -> + [{" * can't establish TCP connection, reason: ~s~n" + " * suggestion: blocked by firewall?", + [rabbit_misc:format_inet_error(Reason)]}] + end] + end. + +connection_succeeded_diagnostics() -> + case gen_event:call(error_logger, ?ERROR_LOGGER_HANDLER, get_connection_report) of + [] -> + [{" * TCP connection succeeded but Erlang distribution " + "failed~n" + " * suggestion: hostname mismatch?~n" + " * suggestion: is the cookie set correctly?~n" + " * suggestion: is the Erlang distribution using TLS?", []}]; + Report -> + [{" * TCP connection succeeded but Erlang distribution " + "failed~n", []}] + ++ Report + end. + +diagnose_connect(Host, Port) -> + case inet:gethostbyname(Host) of + {ok, #hostent{h_addrtype = Family}} -> + case gen_tcp:connect(Host, Port, [Family], + ?TCP_DIAGNOSTIC_TIMEOUT) of + {ok, Socket} -> gen_tcp:close(Socket), + ok; + {error, _} = E -> E + end; + {error, _} = E -> + E + end. + +make(NodeStr) -> + rabbit_nodes_common:make(NodeStr). + +parts(NodeStr) -> + rabbit_nodes_common:parts(NodeStr). + +cookie_hash() -> + base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). + +is_running(Node, Application) -> + case rpc:call(Node, rabbit_misc, which_applications, []) of + {badrpc, _} -> false; + Apps -> proplists:is_defined(Application, Apps) + end. + +is_process_running(Node, Process) -> + case rpc:call(Node, erlang, whereis, [Process]) of + {badrpc, _} -> false; + undefined -> false; + P when is_pid(P) -> true + end. + +cluster_name() -> + rabbit_runtime_parameters:value_global( + cluster_name, cluster_name_default()). + +cluster_name_default() -> + {ID, _} = rabbit_nodes:parts(node()), + {ok, Host} = inet:gethostname(), + {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), + list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). + +set_cluster_name(Name) -> + rabbit_runtime_parameters:set_global(cluster_name, Name). + +ensure_epmd() -> + {ok, Prog} = init:get_argument(progname), + ID = rabbit_misc:random(1000000000), + Port = open_port( + {spawn_executable, os:find_executable(Prog)}, + [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]), + "-noshell", "-eval", "halt()."]}, + exit_status, stderr_to_stdout, use_stdio]), + port_shutdown_loop(Port). + +port_shutdown_loop(Port) -> + receive + {Port, {exit_status, _Rc}} -> ok; + {Port, _} -> port_shutdown_loop(Port) + end. + +all_running() -> rabbit_mnesia:cluster_nodes(running). diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl new file mode 100644 index 0000000000..6329b0bdaa --- /dev/null +++ b/src/rabbit_queue_collector.erl @@ -0,0 +1,89 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_collector). + +%% Queue collector keeps track of exclusive queues and cleans them +%% up e.g. when their connection is closed. + +-behaviour(gen_server). + +-export([start_link/1, register/2, delete_all/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {monitors, delete_from}). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-spec start_link(rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error(). +-spec register(pid(), pid()) -> 'ok'. + +%%---------------------------------------------------------------------------- + +start_link(ProcName) -> + gen_server:start_link(?MODULE, [ProcName], []). + +register(CollectorPid, Q) -> + gen_server:call(CollectorPid, {register, Q}, infinity). + +delete_all(CollectorPid) -> + rabbit_queue_collector_common:delete_all(CollectorPid). + +%%---------------------------------------------------------------------------- + +init([ProcName]) -> + ?store_proc_name(ProcName), + {ok, #state{monitors = pmon:new(), delete_from = undefined}}. + +%%-------------------------------------------------------------------------- + +handle_call({register, QPid}, _From, + State = #state{monitors = QMons, delete_from = Deleting}) -> + case Deleting of + undefined -> ok; + _ -> ok = rabbit_amqqueue:delete_immediately([QPid]) + end, + {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}}; + +handle_call(delete_all, From, State = #state{monitors = QMons, + delete_from = undefined}) -> + case pmon:monitored(QMons) of + [] -> {reply, ok, State#state{delete_from = From}}; + QPids -> ok = rabbit_amqqueue:delete_immediately(QPids), + {noreply, State#state{delete_from = From}} + end. + +handle_cast(Msg, State) -> + {stop, {unhandled_cast, Msg}, State}. + +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, + State = #state{monitors = QMons, delete_from = Deleting}) -> + QMons1 = pmon:erase(DownPid, QMons), + case Deleting =/= undefined andalso pmon:is_empty(QMons1) of + true -> gen_server:reply(Deleting, ok); + false -> ok + end, + {noreply, State#state{monitors = QMons1}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl new file mode 100644 index 0000000000..fa0bafa4c3 --- /dev/null +++ b/src/rabbit_queue_decorator.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_queue_decorator). + +-include("rabbit.hrl"). + +-export([select/1, set/1, register/2, unregister/1]). + +%%---------------------------------------------------------------------------- + +-callback startup(rabbit_types:amqqueue()) -> 'ok'. + +-callback shutdown(rabbit_types:amqqueue()) -> 'ok'. + +-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> + 'ok'. + +-callback active_for(rabbit_types:amqqueue()) -> boolean(). + +%% called with Queue, MaxActivePriority, IsEmpty +-callback consumer_state_changed( + rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'. + +%%---------------------------------------------------------------------------- + +select(Modules) -> + [M || M <- Modules, code:which(M) =/= non_existing]. + +set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. + +list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. + +register(TypeName, ModuleName) -> + rabbit_registry:register(queue_decorator, TypeName, ModuleName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(queue_decorator, TypeName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +maybe_recover(Q = #amqqueue{name = Name, + decorators = Decs}) -> + #amqqueue{decorators = Decs1} = set(Q), + Old = lists:sort(select(Decs)), + New = lists:sort(select(Decs1)), + case New of + Old -> ok; + _ -> [M:startup(Q) || M <- New -- Old], + rabbit_amqqueue:update_decorators(Name) + end. |
