summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-06-02 18:54:42 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-06-26 13:30:46 +0200
commit9cf0cfea9eb087b9df1c0e6ab636601be6c9a7cb (patch)
tree99cb308eb182b0e2ce31692bbed5c85d4d29e105 /src
parent3ad84520cb23e4bf05662e43384870de529d21a6 (diff)
downloadrabbitmq-server-git-9cf0cfea9eb087b9df1c0e6ab636601be6c9a7cb.tar.gz
Move modules which don't belong to rabbitmq-common here
Those modules depend on rabbitmq-server but they are used neither by the Erlang client nor by rabbitmq-common itself. They are imported as-is. Some may need further changes after the import. This resolves the dependency of rabbitmq-common on rabbitmq-server. [#118490793]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl1010
-rw-r--r--src/rabbit_auth_backend_internal.erl421
-rw-r--r--src/rabbit_basic.erl302
-rw-r--r--src/rabbit_channel.erl2034
-rw-r--r--src/rabbit_channel_interceptor.erl104
-rw-r--r--src/rabbit_exchange_decorator.erl113
-rw-r--r--src/rabbit_health_check.erl95
-rw-r--r--src/rabbit_nodes.erl230
-rw-r--r--src/rabbit_queue_collector.erl89
-rw-r--r--src/rabbit_queue_decorator.erl66
10 files changed, 4464 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
new file mode 100644
index 0000000000..3eaf7613ee
--- /dev/null
+++ b/src/rabbit_amqqueue.erl
@@ -0,0 +1,1010 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_amqqueue).
+
+-export([recover/0, stop/0, start/1, declare/5, declare/6,
+ delete_immediately/1, delete/3, purge/1, forget_all_durable/1,
+ delete_crashed/1, delete_crashed_internal/1]).
+-export([pseudo_queue/2, immutable/1]).
+-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
+ assert_equivalence/5,
+ check_exclusive_access/2, with_exclusive_access_or_die/3,
+ stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
+ info_all/5, info_local/1, list_names/0]).
+-export([list_down/1]).
+-export([force_event_refresh/1, notify_policy_changed/1]).
+-export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
+-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
+-export([on_node_up/1, on_node_down/1]).
+-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
+-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1, is_mirrored/1]).
+
+-export([pid_of/1, pid_of/2]).
+
+%% internal
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+ set_ram_duration_target/2, set_maximum_since_use/2]).
+
+-include("rabbit.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+
+-define(INTEGER_ARG_TYPES, [byte, short, signedint, long,
+ unsignedbyte, unsignedshort, unsignedint]).
+
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
+%%----------------------------------------------------------------------------
+
+-export_type([name/0, qmsg/0, absent_reason/0]).
+
+-type name() :: rabbit_types:r('queue').
+-type qpids() :: [pid()].
+-type qlen() :: rabbit_types:ok(non_neg_integer()).
+-type qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return()).
+-type qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}.
+-type msg_id() :: non_neg_integer().
+-type ok_or_errors() ::
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
+-type absent_reason() :: 'nodedown' | 'crashed'.
+-type queue_or_absent() :: rabbit_types:amqqueue() |
+ {'absent', rabbit_types:amqqueue(),absent_reason()}.
+-type not_found_or_absent() ::
+ 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}.
+-spec recover() -> [rabbit_types:amqqueue()].
+-spec stop() -> 'ok'.
+-spec start([rabbit_types:amqqueue()]) -> 'ok'.
+-spec declare
+ (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
+ rabbit_types:maybe(pid())) ->
+ {'new' | 'existing' | 'absent' | 'owner_died',
+ rabbit_types:amqqueue()} |
+ rabbit_types:channel_exit().
+-spec declare
+ (name(), boolean(), boolean(), rabbit_framing:amqp_table(),
+ rabbit_types:maybe(pid()), node()) ->
+ {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} |
+ {'absent', rabbit_types:amqqueue(), absent_reason()} |
+ rabbit_types:channel_exit().
+-spec internal_declare(rabbit_types:amqqueue(), boolean()) ->
+ queue_or_absent() | rabbit_misc:thunk(queue_or_absent()).
+-spec update
+ (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) ->
+ 'not_found' | rabbit_types:amqqueue().
+-spec lookup
+ (name()) ->
+ rabbit_types:ok(rabbit_types:amqqueue()) |
+ rabbit_types:error('not_found');
+ ([name()]) ->
+ [rabbit_types:amqqueue()].
+-spec not_found_or_absent(name()) -> not_found_or_absent().
+-spec with(name(), qfun(A)) ->
+ A | rabbit_types:error(not_found_or_absent()).
+-spec with(name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B.
+-spec with_or_die(name(), qfun(A)) -> A | rabbit_types:channel_exit().
+-spec assert_equivalence
+ (rabbit_types:amqqueue(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) ->
+ 'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit().
+-spec check_exclusive_access(rabbit_types:amqqueue(), pid()) ->
+ 'ok' | rabbit_types:channel_exit().
+-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
+ A | rabbit_types:channel_exit().
+-spec list() -> [rabbit_types:amqqueue()].
+-spec list(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
+-spec list_names() -> [rabbit_amqqueue:name()].
+-spec list_down(rabbit_types:vhost()) -> [rabbit_types:amqqueue()].
+-spec info_keys() -> rabbit_types:info_keys().
+-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
+-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
+ [rabbit_types:infos()].
+-type info_all_filter() :: 'all' | 'online' | 'offline' | 'local'.
+-spec info_all
+ (rabbit_types:vhost(), rabbit_types:info_keys(), info_all_filter(),
+ reference(), pid()) ->
+ 'ok'.
+-spec force_event_refresh(reference()) -> 'ok'.
+-spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'.
+-spec consumers(rabbit_types:amqqueue()) ->
+ [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(),
+ rabbit_framing:amqp_table()}].
+-spec consumer_info_keys() -> rabbit_types:info_keys().
+-spec consumers_all(rabbit_types:vhost()) ->
+ [{name(), pid(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table()}].
+-spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'.
+-spec stat(rabbit_types:amqqueue()) ->
+ {'ok', non_neg_integer(), non_neg_integer()}.
+-spec delete_immediately(qpids()) -> 'ok'.
+-spec delete
+ (rabbit_types:amqqueue(), 'false', 'false') ->
+ qlen();
+ (rabbit_types:amqqueue(), 'true' , 'false') ->
+ qlen() | rabbit_types:error('in_use');
+ (rabbit_types:amqqueue(), 'false', 'true' ) ->
+ qlen() | rabbit_types:error('not_empty');
+ (rabbit_types:amqqueue(), 'true' , 'true' ) ->
+ qlen() |
+ rabbit_types:error('in_use') |
+ rabbit_types:error('not_empty').
+-spec delete_crashed(rabbit_types:amqqueue()) -> 'ok'.
+-spec delete_crashed_internal(rabbit_types:amqqueue()) -> 'ok'.
+-spec purge(rabbit_types:amqqueue()) -> qlen().
+-spec forget_all_durable(node()) -> 'ok'.
+-spec deliver([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ qpids().
+-spec requeue(pid(), [msg_id()], pid()) -> 'ok'.
+-spec ack(pid(), [msg_id()], pid()) -> 'ok'.
+-spec reject(pid(), [msg_id()], boolean(), pid()) -> 'ok'.
+-spec notify_down_all(qpids(), pid()) -> ok_or_errors().
+-spec notify_down_all(qpids(), pid(), non_neg_integer()) ->
+ ok_or_errors().
+-spec activate_limit_all(qpids(), pid()) -> ok_or_errors().
+-spec basic_get(rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
+ {'ok', non_neg_integer(), qmsg()} | 'empty'.
+-spec credit
+ (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(),
+ boolean()) ->
+ 'ok'.
+-spec basic_consume
+ (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
+ non_neg_integer(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table(), any()) ->
+ rabbit_types:ok_or_error('exclusive_consume_unavailable').
+-spec basic_cancel
+ (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'.
+-spec notify_decorators(rabbit_types:amqqueue()) -> 'ok'.
+-spec resume(pid(), pid()) -> 'ok'.
+-spec internal_delete(name()) ->
+ rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() ->
+ rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit()).
+-spec run_backing_queue
+ (pid(), atom(), (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) ->
+ 'ok'.
+-spec set_ram_duration_target(pid(), number() | 'infinity') -> 'ok'.
+-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
+-spec on_node_up(node()) -> 'ok'.
+-spec on_node_down(node()) -> 'ok'.
+-spec pseudo_queue(name(), pid()) -> rabbit_types:amqqueue().
+-spec immutable(rabbit_types:amqqueue()) -> rabbit_types:amqqueue().
+-spec store_queue(rabbit_types:amqqueue()) -> 'ok'.
+-spec update_decorators(name()) -> 'ok'.
+-spec policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+ 'ok'.
+-spec update_mirroring(pid()) -> 'ok'.
+-spec sync_mirrors(rabbit_types:amqqueue() | pid()) ->
+ 'ok' | rabbit_types:error('not_mirrored').
+-spec cancel_sync_mirrors(rabbit_types:amqqueue() | pid()) ->
+ 'ok' | {'ok', 'not_syncing'}.
+-spec is_mirrored(rabbit_types:amqqueue()) -> boolean().
+
+-spec pid_of(rabbit_types:amqqueue()) ->
+ {'ok', pid()} | rabbit_types:error('not_found').
+-spec pid_of(rabbit_types:vhost(), rabbit_misc:resource_name()) ->
+ {'ok', pid()} | rabbit_types:error('not_found').
+
+%%----------------------------------------------------------------------------
+
+-define(CONSUMER_INFO_KEYS,
+ [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
+ arguments]).
+
+recover() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ on_node_down(node()),
+ DurableQueues = find_durable_queues(),
+ L = length(DurableQueues),
+
+ %% if there are not enough file handles, the server might hang
+ %% when trying to recover queues, warn the user:
+ case file_handle_cache:get_limit() < L of
+ true ->
+ rabbit_log:warning(
+ "Recovering ~p queues, available file handles: ~p. Please increase max open file handles limit to at least ~p!~n",
+ [L, file_handle_cache:get_limit(), L]);
+ false ->
+ ok
+ end,
+
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+
+ %% We rely on BQ:start/1 returning the recovery terms in the same
+ %% order as the supplied queue names, so that we can zip them together
+ %% for further processing in recover_durable_queues.
+ {ok, OrderedRecoveryTerms} =
+ BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
+ {ok,_} = supervisor:start_child(
+ rabbit_sup,
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
+ recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
+
+stop() ->
+ ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
+ ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup),
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+ ok = BQ:stop().
+
+start(Qs) ->
+ %% At this point all recovered queues and their bindings are
+ %% visible to routing, so now it is safe for them to complete
+ %% their initialisation (which may involve interacting with other
+ %% queues).
+ [Pid ! {self(), go} || #amqqueue{pid = Pid} <- Qs],
+ ok.
+
+find_durable_queues() ->
+ Node = node(),
+ mnesia:async_dirty(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{name = Name,
+ pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node andalso
+ %% Terminations on node down will not remove the rabbit_queue
+ %% record if it is a mirrored queue (such info is now obtained from
+ %% the policy). Thus, we must check if the local pid is alive
+ %% - if the record is present - in order to restart.
+ (mnesia:read(rabbit_queue, Name, read) =:= []
+ orelse not erlang:is_process_alive(Pid))]))
+ end).
+
+recover_durable_queues(QueuesAndRecoveryTerms) ->
+ {Results, Failures} =
+ gen_server2:mcall(
+ [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery),
+ {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]),
+ [rabbit_log:error("Queue ~p failed to initialise: ~p~n",
+ [Pid, Error]) || {Pid, Error} <- Failures],
+ [Q || {_, {new, Q}} <- Results].
+
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
+ declare(QueueName, Durable, AutoDelete, Args, Owner, node()).
+
+
+%% The Node argument suggests where the queue (master if mirrored)
+%% should be. Note that in some cases (e.g. with "nodes" policy in
+%% effect) this might not be possible to satisfy.
+declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
+ ok = check_declare_arguments(QueueName, Args),
+ Q = rabbit_queue_decorator:set(
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ recoverable_slaves = [],
+ gm_pids = [],
+ state = live,
+ policy_version = 0 })),
+
+ Node1 = case rabbit_queue_master_location_misc:get_location(Q) of
+ {ok, Node0} -> Node0;
+ {error, _} -> Node
+ end,
+
+ Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
+ {init, new}, infinity).
+
+internal_declare(Q, true) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () ->
+ ok = store_queue(Q#amqqueue{state = live}),
+ rabbit_misc:const(Q)
+ end);
+internal_declare(Q = #amqqueue{name = QueueName}, false) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ case not_found_or_absent(QueueName) of
+ not_found -> Q1 = rabbit_policy:set(Q),
+ Q2 = Q1#amqqueue{state = live},
+ ok = store_queue(Q2),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
+ {absent, _Q, _} = R -> rabbit_misc:const(R)
+ end;
+ [ExistingQ] ->
+ rabbit_misc:const(ExistingQ)
+ end
+ end).
+
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q = #amqqueue{durable = Durable}] ->
+ Q1 = Fun(Q),
+ ok = mnesia:write(rabbit_queue, Q1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ _ -> ok
+ end,
+ Q1;
+ [] ->
+ not_found
+ end.
+
+store_queue(Q = #amqqueue{durable = true}) ->
+ ok = mnesia:write(rabbit_durable_queue,
+ Q#amqqueue{slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = [],
+ decorators = undefined}, write),
+ store_queue_ram(Q);
+store_queue(Q = #amqqueue{durable = false}) ->
+ store_queue_ram(Q).
+
+store_queue_ram(Q) ->
+ ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
+
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q] -> store_queue_ram(Q),
+ ok;
+ [] -> ok
+ end
+ end).
+
+policy_changed(Q1 = #amqqueue{decorators = Decorators1},
+ Q2 = #amqqueue{decorators = Decorators2}) ->
+ rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ D1 = rabbit_queue_decorator:select(Decorators1),
+ D2 = rabbit_queue_decorator:select(Decorators2),
+ [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)],
+ %% Make sure we emit a stats event even if nothing
+ %% mirroring-related has changed - the policy may have changed anyway.
+ notify_policy_changed(Q1).
+
+add_default_binding(#amqqueue{name = QueueName}) ->
+ ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
+ RoutingKey = QueueName#resource.name,
+ rabbit_binding:add(#binding{source = ExchangeName,
+ destination = QueueName,
+ key = RoutingKey,
+ args = []}).
+
+lookup([]) -> []; %% optimisation
+lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
+lookup(Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_misc:dirty_read/1.
+ lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]);
+lookup(Name) ->
+ rabbit_misc:dirty_read({rabbit_queue, Name}).
+
+not_found_or_absent(Name) ->
+ %% NB: we assume that the caller has already performed a lookup on
+ %% rabbit_queue and not found anything
+ case mnesia:read({rabbit_durable_queue, Name}) of
+ [] -> not_found;
+ [Q] -> {absent, Q, nodedown} %% Q exists on stopped node
+ end.
+
+not_found_or_absent_dirty(Name) ->
+ %% We should read from both tables inside a tx, to get a
+ %% consistent view. But the chances of an inconsistency are small,
+ %% and only affect the error kind.
+ case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of
+ {error, not_found} -> not_found;
+ {ok, Q} -> {absent, Q, nodedown}
+ end.
+
+with(Name, F, E) ->
+ with(Name, F, E, 2000).
+
+with(Name, F, E, RetriesLeft) ->
+ case lookup(Name) of
+ {ok, Q = #amqqueue{}} when RetriesLeft =:= 0 ->
+ %% Something bad happened to that queue, we are bailing out
+ %% on processing current request.
+ E({absent, Q, timeout});
+ {ok, Q = #amqqueue{state = crashed}} ->
+ E({absent, Q, crashed});
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ %% We check is_process_alive(QPid) in case we receive a
+ %% nodedown (for example) in F() that has nothing to do
+ %% with the QPid. F() should be written s.t. that this
+ %% cannot happen, so we bail if it does since that
+ %% indicates a code bug and we don't want to get stuck in
+ %% the retry loop.
+ rabbit_misc:with_exit_handler(
+ fun () -> false = rabbit_mnesia:is_process_alive(QPid),
+ timer:sleep(30),
+ with(Name, F, E, RetriesLeft - 1)
+ end, fun () -> F(Q) end);
+ {error, not_found} ->
+ E(not_found_or_absent_dirty(Name))
+ end.
+
+with(Name, F) -> with(Name, F, fun (E) -> {error, E} end).
+
+with_or_die(Name, F) ->
+ with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name);
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
+ end).
+
+assert_equivalence(#amqqueue{name = QName,
+ durable = Durable,
+ auto_delete = AD} = Q,
+ Durable1, AD1, Args1, Owner) ->
+ rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable),
+ rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete),
+ assert_args_equivalence(Q, Args1),
+ check_exclusive_access(Q, Owner, strict).
+
+check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
+
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]).
+
+with_exclusive_access_or_die(Name, ReaderPid, F) ->
+ with_or_die(Name,
+ fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+
+assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
+ RequiredArgs) ->
+ rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
+ [Key || {Key, _Fun} <- declare_args()]).
+
+check_declare_arguments(QueueName, Args) ->
+ check_arguments(QueueName, Args, declare_args()).
+
+check_consume_arguments(QueueName, Args) ->
+ check_arguments(QueueName, Args, consume_args()).
+
+check_arguments(QueueName, Args, Validators) ->
+ [case rabbit_misc:table_lookup(Args, Key) of
+ undefined -> ok;
+ TypeVal -> case Fun(TypeVal, Args) of
+ ok -> ok;
+ {error, Error} -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "invalid arg '~s' for ~s: ~255p",
+ [Key, rabbit_misc:rs(QueueName),
+ Error])
+ end
+ end || {Key, Fun} <- Validators],
+ ok.
+
+declare_args() ->
+ [{<<"x-expires">>, fun check_expires_arg/2},
+ {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
+ {<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-priority">>, fun check_non_neg_int_arg/2},
+ {<<"x-queue-mode">>, fun check_queue_mode/2}].
+
+consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
+ {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
+
+check_int_arg({Type, _}, _) ->
+ case lists:member(Type, ?INTEGER_ARG_TYPES) of
+ true -> ok;
+ false -> {error, {unacceptable_type, Type}}
+ end.
+
+check_bool_arg({bool, _}, _) -> ok;
+check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
+
+check_non_neg_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
+ end.
+
+check_expires_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val == 0 -> {error, {value_zero, Val}};
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
+ end.
+
+check_message_ttl_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
+ end.
+
+%% Note that the validity of x-dead-letter-exchange is already verified
+%% by rabbit_channel's queue.declare handler.
+check_dlxname_arg({longstr, _}, _) -> ok;
+check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
+
+check_dlxrk_arg({longstr, _}, Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
+ undefined -> {error, routing_key_but_no_dlx_defined};
+ _ -> ok
+ end;
+check_dlxrk_arg({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
+check_queue_mode({longstr, Val}, _Args) ->
+ case lists:member(Val, [<<"default">>, <<"lazy">>]) of
+ true -> ok;
+ false -> {error, invalid_queue_mode}
+ end;
+check_queue_mode({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
+list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+
+list_names() -> mnesia:dirty_all_keys(rabbit_queue).
+
+list(VHostPath) -> list(VHostPath, rabbit_queue).
+
+%% Not dirty_match_object since that would not be transactional when used in a
+%% tx context
+list(VHostPath, TableName) ->
+ mnesia:async_dirty(
+ fun () ->
+ mnesia:match_object(
+ TableName,
+ #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'},
+ read)
+ end).
+
+list_down(VHostPath) ->
+ Present = list(VHostPath),
+ Durable = list(VHostPath, rabbit_durable_queue),
+ PresentS = sets:from_list([N || #amqqueue{name = N} <- Present]),
+ sets:to_list(sets:filter(fun (#amqqueue{name = N}) ->
+ not sets:is_element(N, PresentS)
+ end, sets:from_list(Durable))).
+
+info_keys() -> rabbit_amqqueue_process:info_keys().
+
+map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
+
+info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
+info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
+
+info(Q = #amqqueue{ state = crashed }, Items) ->
+ info_down(Q, Items, crashed);
+info(#amqqueue{ pid = QPid }, Items) ->
+ case delegate:call(QPid, {info, Items}) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_down(Q, DownReason) ->
+ info_down(Q, rabbit_amqqueue_process:info_keys(), DownReason).
+
+info_down(Q, Items, DownReason) ->
+ [{Item, i_down(Item, Q, DownReason)} || Item <- Items].
+
+i_down(name, #amqqueue{name = Name}, _) -> Name;
+i_down(durable, #amqqueue{durable = Dur}, _) -> Dur;
+i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD;
+i_down(arguments, #amqqueue{arguments = Args}, _) -> Args;
+i_down(pid, #amqqueue{pid = QPid}, _) -> QPid;
+i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS;
+i_down(state, _Q, DownReason) -> DownReason;
+i_down(K, _Q, _DownReason) ->
+ case lists:member(K, rabbit_amqqueue_process:info_keys()) of
+ true -> '';
+ false -> throw({bad_argument, K})
+ end.
+
+info_all(VHostPath) ->
+ map(list(VHostPath), fun (Q) -> info(Q) end) ++
+ map(list_down(VHostPath), fun (Q) -> info_down(Q, down) end).
+
+info_all(VHostPath, Items) ->
+ map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
+ map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
+
+info_all_partial_emit(VHostPath, Items, all, Ref, AggregatorPid) ->
+ info_all_partial_emit(VHostPath, Items, online, Ref, AggregatorPid),
+ info_all_partial_emit(VHostPath, Items, offline, Ref, AggregatorPid);
+info_all_partial_emit(VHostPath, Items, online, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Q) -> info(Q, Items) end,
+ list(VHostPath),
+ continue);
+info_all_partial_emit(VHostPath, Items, offline, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
+ list_down(VHostPath),
+ continue);
+info_all_partial_emit(VHostPath, Items, local, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Q) -> info(Q, Items) end,
+ list_local(VHostPath),
+ continue).
+
+info_all(VHostPath, Items, Filter, Ref, AggregatorPid) ->
+ info_all_partial_emit(VHostPath, Items, Filter, Ref, AggregatorPid),
+ %% Previous map(s) are incomplete, finalize emission
+ rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []).
+
+info_local(VHostPath) ->
+ map(list_local(VHostPath), fun (Q) -> info(Q, [name]) end).
+
+list_local(VHostPath) ->
+ [ Q || #amqqueue{state = State, pid=QPid} = Q <- list(VHostPath),
+ State =/= crashed,
+ node() =:= node(QPid) ].
+
+force_event_refresh(Ref) ->
+ [gen_server2:cast(Q#amqqueue.pid,
+ {force_event_refresh, Ref}) || Q <- list()],
+ ok.
+
+notify_policy_changed(#amqqueue{pid = QPid}) ->
+ gen_server2:cast(QPid, policy_changed).
+
+consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
+
+consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
+
+consumers_all(VHostPath) ->
+ ConsumerInfoKeys = consumer_info_keys(),
+ lists:append(
+ map(list(VHostPath),
+ fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).
+
+consumers_all(VHostPath, Ref, AggregatorPid) ->
+ ConsumerInfoKeys = consumer_info_keys(),
+ rabbit_control_misc:emitting_map(
+ AggregatorPid, Ref,
+ fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
+ list(VHostPath)).
+
+get_queue_consumer_info(Q, ConsumerInfoKeys) ->
+ [lists:zip(ConsumerInfoKeys,
+ [Q#amqqueue.name, ChPid, CTag,
+ AckRequired, Prefetch, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)].
+
+stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
+
+pid_of(#amqqueue{pid = Pid}) -> Pid.
+pid_of(VHost, QueueName) ->
+ case lookup(rabbit_misc:r(VHost, queue, QueueName)) of
+ {ok, Q} -> pid_of(Q);
+ {error, not_found} = E -> E
+ end.
+
+delete_immediately(QPids) ->
+ [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
+ ok.
+
+delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
+ delegate:call(QPid, {delete, IfUnused, IfEmpty}).
+
+delete_crashed(#amqqueue{ pid = QPid } = Q) ->
+ ok = rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]).
+
+delete_crashed_internal(Q = #amqqueue{ name = QName }) ->
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+ BQ:delete_crashed(Q),
+ ok = internal_delete(QName).
+
+purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
+
+requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
+
+ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
+
+reject(QPid, Requeue, MsgIds, ChPid) ->
+ delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
+
+notify_down_all(QPids, ChPid) ->
+ notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT).
+
+notify_down_all(QPids, ChPid, Timeout) ->
+ case rpc:call(node(), delegate, call,
+ [QPids, {notify_down, ChPid}], Timeout) of
+ {badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}};
+ {badrpc, Reason} -> {error, Reason};
+ {_, Bads} ->
+ case lists:filter(
+ fun ({_Pid, {exit, {R, _}, _}}) ->
+ rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) -> false
+ end, Bads) of
+ [] -> ok;
+ Bads1 -> {error, Bads1}
+ end;
+ Error -> {error, Error}
+ end.
+
+activate_limit_all(QPids, ChPid) ->
+ delegate:cast(QPids, {activate_limit, ChPid}).
+
+credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
+ delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
+
+basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
+ delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
+
+basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
+ LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+ ExclusiveConsume, Args, OkMsg) ->
+ ok = check_consume_arguments(QName, Args),
+ delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg}).
+
+basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
+ delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+
+notify_decorators(#amqqueue{pid = QPid}) ->
+ delegate:cast(QPid, notify_decorators).
+
+notify_sent(QPid, ChPid) ->
+ rabbit_amqqueue_common:notify_sent(QPid, ChPid).
+
+notify_sent_queue_down(QPid) ->
+ rabbit_amqqueue_common:notify_sent_queue_down(QPid).
+
+resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
+
+internal_delete1(QueueName, OnlyDurable) ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_queue, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
+ end,
+ %% we want to execute some things, as decided by rabbit_exchange,
+ %% after the transaction.
+ rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
+
+internal_delete(QueueName) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () ->
+ case {mnesia:wread({rabbit_queue, QueueName}),
+ mnesia:wread({rabbit_durable_queue, QueueName})} of
+ {[], []} ->
+ rabbit_misc:const({error, not_found});
+ _ ->
+ Deletions = internal_delete1(QueueName, false),
+ T = rabbit_binding:process_deletions(Deletions),
+ fun() ->
+ ok = T(),
+ rabbit_core_metrics:queue_deleted(QueueName),
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, QueueName}])
+ end
+ end
+ end).
+
+forget_all_durable(Node) ->
+ %% Note rabbit is not running so we avoid e.g. the worker pool. Also why
+ %% we don't invoke the return from rabbit_binding:process_deletions/1.
+ {atomic, ok} =
+ mnesia:sync_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_durable_queue,
+ #amqqueue{_ = '_'}, write),
+ [forget_node_for_queue(Node, Q) ||
+ #amqqueue{pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node],
+ ok
+ end),
+ ok.
+
+%% Try to promote a slave while down - it should recover as a
+%% master. We try to take the oldest slave here for best chance of
+%% recovery.
+forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) ->
+ forget_node_for_queue(DeadNode, RS, Q).
+
+forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) ->
+ %% No slaves to recover from, queue is gone.
+ %% Don't process_deletions since that just calls callbacks and we
+ %% are not really up.
+ internal_delete1(Name, true);
+
+%% Should not happen, but let's be conservative.
+forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
+ forget_node_for_queue(DeadNode, T, Q);
+
+forget_node_for_queue(DeadNode, [H|T], Q) ->
+ case node_permits_offline_promotion(H) of
+ false -> forget_node_for_queue(DeadNode, T, Q);
+ true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write)
+ end.
+
+node_permits_offline_promotion(Node) ->
+ case node() of
+ Node -> not rabbit:is_running(); %% [1]
+ _ -> All = rabbit_mnesia:cluster_nodes(all),
+ Running = rabbit_mnesia:cluster_nodes(running),
+ lists:member(Node, All) andalso
+ not lists:member(Node, Running) %% [2]
+ end.
+%% [1] In this case if we are a real running node (i.e. rabbitmqctl
+%% has RPCed into us) then we cannot allow promotion. If on the other
+%% hand we *are* rabbitmqctl impersonating the node for offline
+%% node-forgetting then we can.
+%%
+%% [2] This is simpler; as long as it's down that's OK
+
+run_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
+
+set_ram_duration_target(QPid, Duration) ->
+ gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+
+update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring).
+
+sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors);
+sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors);
+cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+
+is_mirrored(Q) ->
+ rabbit_mirror_queue_misc:is_mirrored(Q).
+
+on_node_up(Node) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_queue,
+ #amqqueue{_ = '_'}, write),
+ [maybe_clear_recoverable_node(Node, Q) || Q <- Qs],
+ ok
+ end).
+
+maybe_clear_recoverable_node(Node,
+ #amqqueue{sync_slave_pids = SPids,
+ recoverable_slaves = RSs} = Q) ->
+ case lists:member(Node, RSs) of
+ true ->
+ %% There is a race with
+ %% rabbit_mirror_queue_slave:record_synchronised/1 called
+ %% by the incoming slave node and this function, called
+ %% by the master node. If this function is executed after
+ %% record_synchronised/1, the node is erroneously removed
+ %% from the recoverable slaves list.
+ %%
+ %% We check if the slave node's queue PID is alive. If it is
+ %% the case, then this function is executed after. In this
+ %% situation, we don't touch the queue record, it is already
+ %% correct.
+ DoClearNode =
+ case [SP || SP <- SPids, node(SP) =:= Node] of
+ [SPid] -> not rabbit_misc:is_process_alive(SPid);
+ _ -> true
+ end,
+ if
+ DoClearNode -> RSs1 = RSs -- [Node],
+ store_queue(
+ Q#amqqueue{recoverable_slaves = RSs1});
+ true -> ok
+ end;
+ false ->
+ ok
+ end.
+
+on_node_down(Node) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> QsDels =
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
+ #amqqueue{name = QName, pid = Pid} = Q
+ <- mnesia:table(rabbit_queue),
+ not rabbit_amqqueue:is_mirrored(Q) andalso
+ node(Pid) == Node andalso
+ not rabbit_mnesia:is_process_alive(Pid)])),
+ {Qs, Dels} = lists:unzip(QsDels),
+ T = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels)),
+ fun () ->
+ T(),
+ lists:foreach(
+ fun(QName) ->
+ rabbit_core_metrics:queue_deleted(QName),
+ ok = rabbit_event:notify(queue_deleted,
+ [{name, QName}])
+ end, Qs)
+ end
+ end).
+
+delete_queue(QueueName) ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ rabbit_binding:remove_transient_for_destination(QueueName).
+
+pseudo_queue(QueueName, Pid) ->
+ #amqqueue{name = QueueName,
+ durable = false,
+ auto_delete = false,
+ arguments = [],
+ pid = Pid,
+ slave_pids = []}.
+
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ recoverable_slaves = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none,
+ state = none}.
+
+deliver([], _Delivery) ->
+ %% /dev/null optimisation
+ [];
+
+deliver(Qs, Delivery = #delivery{flow = Flow}) ->
+ {MPids, SPids} = qpids(Qs),
+ QPids = MPids ++ SPids,
+ %% We use up two credits to send to a slave since the message
+ %% arrives at the slave from two directions. We will ack one when
+ %% the slave receives the message direct from the channel, and the
+ %% other when it receives it via GM.
+ case Flow of
+ %% Here we are tracking messages sent by the rabbit_channel
+ %% process. We are accessing the rabbit_channel process
+ %% dictionary.
+ flow -> [credit_flow:send(QPid) || QPid <- QPids],
+ [credit_flow:send(QPid) || QPid <- SPids];
+ noflow -> ok
+ end,
+
+ %% We let slaves know that they were being addressed as slaves at
+ %% the time - if they receive such a message from the channel
+ %% after they have become master they should mark the message as
+ %% 'delivered' since they do not know what the master may have
+ %% done with it.
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
+ delegate:cast(MPids, MMsg),
+ delegate:cast(SPids, SMsg),
+ QPids.
+
+qpids([]) -> {[], []}; %% optimisation
+qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt
+qpids(Qs) ->
+ {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids},
+ {MPidAcc, SPidAcc}) ->
+ {[QPid | MPidAcc], [SPids | SPidAcc]}
+ end, {[], []}, Qs),
+ {MPids, lists:append(SPids)}.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
new file mode 100644
index 0000000000..d8c5289997
--- /dev/null
+++ b/src/rabbit_auth_backend_internal.erl
@@ -0,0 +1,421 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_auth_backend_internal).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_authn_backend).
+-behaviour(rabbit_authz_backend).
+
+-export([user_login_authentication/2, user_login_authorization/1,
+ check_vhost_access/3, check_resource_access/3]).
+
+-export([add_user/2, delete_user/1, lookup_user/1,
+ change_password/2, clear_password/1,
+ hash_password/2, change_password_hash/2, change_password_hash/3,
+ set_tags/2, set_permissions/5, clear_permissions/2,
+ add_user_sans_validation/2]).
+-export([user_info_keys/0, perms_info_keys/0,
+ user_perms_info_keys/0, vhost_perms_info_keys/0,
+ user_vhost_perms_info_keys/0,
+ list_users/0, list_users/2, list_permissions/0,
+ list_user_permissions/1, list_user_permissions/3,
+ list_vhost_permissions/1, list_vhost_permissions/3,
+ list_user_vhost_permissions/2]).
+
+%% for testing
+-export([hashing_module_for_user/1]).
+
+%%----------------------------------------------------------------------------
+
+-type regexp() :: binary().
+
+-spec add_user(rabbit_types:username(), rabbit_types:password()) -> 'ok' | {'error', string()}.
+-spec delete_user(rabbit_types:username()) -> 'ok'.
+-spec lookup_user
+ (rabbit_types:username()) ->
+ rabbit_types:ok(rabbit_types:internal_user()) |
+ rabbit_types:error('not_found').
+-spec change_password
+ (rabbit_types:username(), rabbit_types:password()) -> 'ok'.
+-spec clear_password(rabbit_types:username()) -> 'ok'.
+-spec hash_password
+ (module(), rabbit_types:password()) -> rabbit_types:password_hash().
+-spec change_password_hash
+ (rabbit_types:username(), rabbit_types:password_hash()) -> 'ok'.
+-spec set_tags(rabbit_types:username(), [atom()]) -> 'ok'.
+-spec set_permissions
+ (rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(),
+ regexp()) ->
+ 'ok'.
+-spec clear_permissions
+ (rabbit_types:username(), rabbit_types:vhost()) -> 'ok'.
+-spec user_info_keys() -> rabbit_types:info_keys().
+-spec perms_info_keys() -> rabbit_types:info_keys().
+-spec user_perms_info_keys() -> rabbit_types:info_keys().
+-spec vhost_perms_info_keys() -> rabbit_types:info_keys().
+-spec user_vhost_perms_info_keys() -> rabbit_types:info_keys().
+-spec list_users() -> [rabbit_types:infos()].
+-spec list_users(reference(), pid()) -> 'ok'.
+-spec list_permissions() -> [rabbit_types:infos()].
+-spec list_user_permissions
+ (rabbit_types:username()) -> [rabbit_types:infos()].
+-spec list_user_permissions
+ (rabbit_types:username(), reference(), pid()) -> 'ok'.
+-spec list_vhost_permissions
+ (rabbit_types:vhost()) -> [rabbit_types:infos()].
+-spec list_vhost_permissions
+ (rabbit_types:vhost(), reference(), pid()) -> 'ok'.
+-spec list_user_vhost_permissions
+ (rabbit_types:username(), rabbit_types:vhost()) -> [rabbit_types:infos()].
+
+%%----------------------------------------------------------------------------
+%% Implementation of rabbit_auth_backend
+
+%% Returns a password hashing module for the user record provided. If
+%% there is no information in the record, we consider it to be legacy
+%% (inserted by a version older than 3.6.0) and fall back to MD5, the
+%% now obsolete hashing function.
+hashing_module_for_user(#internal_user{
+ hashing_algorithm = ModOrUndefined}) ->
+ rabbit_password:hashing_mod(ModOrUndefined).
+
+user_login_authentication(Username, []) ->
+ internal_check_user_login(Username, fun(_) -> true end);
+user_login_authentication(Username, AuthProps) ->
+ case lists:keyfind(password, 1, AuthProps) of
+ {password, Cleartext} ->
+ internal_check_user_login(
+ Username,
+ fun (#internal_user{
+ password_hash = <<Salt:4/binary, Hash/binary>>
+ } = U) ->
+ Hash =:= rabbit_password:salted_hash(
+ hashing_module_for_user(U), Salt, Cleartext);
+ (#internal_user{}) ->
+ false
+ end);
+ false -> exit({unknown_auth_props, Username, AuthProps})
+ end.
+
+user_login_authorization(Username) ->
+ case user_login_authentication(Username, []) of
+ {ok, #auth_user{impl = Impl, tags = Tags}} -> {ok, Impl, Tags};
+ Else -> Else
+ end.
+
+internal_check_user_login(Username, Fun) ->
+ Refused = {refused, "user '~s' - invalid credentials", [Username]},
+ case lookup_user(Username) of
+ {ok, User = #internal_user{tags = Tags}} ->
+ case Fun(User) of
+ true -> {ok, #auth_user{username = Username,
+ tags = Tags,
+ impl = none}};
+ _ -> Refused
+ end;
+ {error, not_found} ->
+ Refused
+ end.
+
+check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) ->
+ case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] -> false;
+ [_R] -> true
+ end.
+
+check_resource_access(#auth_user{username = Username},
+ #resource{virtual_host = VHostPath, name = Name},
+ Permission) ->
+ case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] ->
+ false;
+ [#user_permission{permission = P}] ->
+ PermRegexp = case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
+ end.
+
+permission_index(configure) -> #permission.configure;
+permission_index(write) -> #permission.write;
+permission_index(read) -> #permission.read.
+
+%%----------------------------------------------------------------------------
+%% Manipulation of the user database
+
+validate_credentials(Username, Password) ->
+ rabbit_credential_validation:validate(Username, Password).
+
+validate_and_alternate_credentials(Username, Password, Fun) ->
+ case validate_credentials(Username, Password) of
+ ok ->
+ Fun(Username, Password);
+ {error, Err} ->
+ rabbit_log:error("Credential validation for '~s' failed!~n", [Username]),
+ {error, Err}
+ end.
+
+add_user(Username, Password) ->
+ validate_and_alternate_credentials(Username, Password, fun add_user_sans_validation/2).
+
+add_user_sans_validation(Username, Password) ->
+ rabbit_log:info("Creating user '~s'~n", [Username]),
+ %% hash_password will pick the hashing function configured for us
+ %% but we also need to store a hint as part of the record, so we
+ %% retrieve it here one more time
+ HashingMod = rabbit_password:hashing_mod(),
+ User = #internal_user{username = Username,
+ password_hash = hash_password(HashingMod, Password),
+ tags = [],
+ hashing_algorithm = HashingMod},
+ R = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_user, Username}) of
+ [] ->
+ ok = mnesia:write(rabbit_user, User, write);
+ _ ->
+ mnesia:abort({user_already_exists, Username})
+ end
+ end),
+ rabbit_event:notify(user_created, [{name, Username}]),
+ R.
+
+delete_user(Username) ->
+ rabbit_log:info("Deleting user '~s'~n", [Username]),
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
+ end)),
+ rabbit_event:notify(user_deleted, [{name, Username}]),
+ R.
+
+lookup_user(Username) ->
+ rabbit_misc:dirty_read({rabbit_user, Username}).
+
+change_password(Username, Password) ->
+ validate_and_alternate_credentials(Username, Password, fun change_password_sans_validation/2).
+
+change_password_sans_validation(Username, Password) ->
+ rabbit_log:info("Changing password for '~s'~n", [Username]),
+ HashingAlgorithm = rabbit_password:hashing_mod(),
+ R = change_password_hash(Username,
+ hash_password(rabbit_password:hashing_mod(),
+ Password),
+ HashingAlgorithm),
+ rabbit_event:notify(user_password_changed, [{name, Username}]),
+ R.
+
+clear_password(Username) ->
+ rabbit_log:info("Clearing password for '~s'~n", [Username]),
+ R = change_password_hash(Username, <<"">>),
+ rabbit_event:notify(user_password_cleared, [{name, Username}]),
+ R.
+
+hash_password(HashingMod, Cleartext) ->
+ rabbit_password:hash(HashingMod, Cleartext).
+
+change_password_hash(Username, PasswordHash) ->
+ change_password_hash(Username, PasswordHash, rabbit_password:hashing_mod()).
+
+
+change_password_hash(Username, PasswordHash, HashingAlgorithm) ->
+ update_user(Username, fun(User) ->
+ User#internal_user{
+ password_hash = PasswordHash,
+ hashing_algorithm = HashingAlgorithm }
+ end).
+
+set_tags(Username, Tags) ->
+ rabbit_log:info("Setting user tags for user '~s' to ~p~n",
+ [Username, Tags]),
+ R = update_user(Username, fun(User) ->
+ User#internal_user{tags = Tags}
+ end),
+ rabbit_event:notify(user_tags_set, [{name, Username}, {tags, Tags}]),
+ R.
+
+set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ rabbit_log:info("Setting permissions for "
+ "'~s' in '~s' to '~s', '~s', '~s'~n",
+ [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]),
+ lists:map(
+ fun (RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case re:compile(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp,
+ Regexp, Reason}})
+ end
+ end, [ConfigurePerm, WritePerm, ReadPerm]),
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
+ end)),
+ rabbit_event:notify(permission_created, [{user, Username},
+ {vhost, VHostPath},
+ {configure, ConfigurePerm},
+ {write, WritePerm},
+ {read, ReadPerm}]),
+ R.
+
+clear_permissions(Username, VHostPath) ->
+ R = rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ fun () ->
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
+ end)),
+ rabbit_event:notify(permission_deleted, [{user, Username},
+ {vhost, VHostPath}]),
+ R.
+
+
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
+%%----------------------------------------------------------------------------
+%% Listing
+
+-define(PERMS_INFO_KEYS, [configure, write, read]).
+-define(USER_INFO_KEYS, [user, tags]).
+
+user_info_keys() -> ?USER_INFO_KEYS.
+
+perms_info_keys() -> [user, vhost | ?PERMS_INFO_KEYS].
+vhost_perms_info_keys() -> [user | ?PERMS_INFO_KEYS].
+user_perms_info_keys() -> [vhost | ?PERMS_INFO_KEYS].
+user_vhost_perms_info_keys() -> ?PERMS_INFO_KEYS.
+
+list_users() ->
+ [extract_internal_user_params(U) ||
+ U <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+
+list_users(Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map(
+ AggregatorPid, Ref,
+ fun(U) -> extract_internal_user_params(U) end,
+ mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})).
+
+list_permissions() ->
+ list_permissions(perms_info_keys(), match_user_vhost('_', '_')).
+
+list_permissions(Keys, QueryThunk) ->
+ [extract_user_permission_params(Keys, U) ||
+ %% TODO: use dirty ops instead
+ U <- rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+
+list_permissions(Keys, QueryThunk, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map(
+ AggregatorPid, Ref, fun(U) -> extract_user_permission_params(Keys, U) end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(QueryThunk)).
+
+filter_props(Keys, Props) -> [T || T = {K, _} <- Props, lists:member(K, Keys)].
+
+list_user_permissions(Username) ->
+ list_permissions(
+ user_perms_info_keys(),
+ rabbit_misc:with_user(Username, match_user_vhost(Username, '_'))).
+
+list_user_permissions(Username, Ref, AggregatorPid) ->
+ list_permissions(
+ user_perms_info_keys(),
+ rabbit_misc:with_user(Username, match_user_vhost(Username, '_')),
+ Ref, AggregatorPid).
+
+list_vhost_permissions(VHostPath) ->
+ list_permissions(
+ vhost_perms_info_keys(),
+ rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath))).
+
+list_vhost_permissions(VHostPath, Ref, AggregatorPid) ->
+ list_permissions(
+ vhost_perms_info_keys(),
+ rabbit_vhost:with(VHostPath, match_user_vhost('_', VHostPath)),
+ Ref, AggregatorPid).
+
+list_user_vhost_permissions(Username, VHostPath) ->
+ list_permissions(
+ user_vhost_perms_info_keys(),
+ rabbit_misc:with_user_and_vhost(
+ Username, VHostPath, match_user_vhost(Username, VHostPath))).
+
+extract_user_permission_params(Keys, #user_permission{
+ user_vhost =
+ #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}}) ->
+ filter_props(Keys, [{user, Username},
+ {vhost, VHostPath},
+ {configure, ConfigurePerm},
+ {write, WritePerm},
+ {read, ReadPerm}]).
+
+extract_internal_user_params(#internal_user{username = Username, tags = Tags}) ->
+ [{user, Username}, {tags, Tags}].
+
+match_user_vhost(Username, VHostPath) ->
+ fun () -> mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = '_'},
+ read)
+ end.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
new file mode 100644
index 0000000000..d474e9cad3
--- /dev/null
+++ b/src/rabbit_basic.erl
@@ -0,0 +1,302 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_basic).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-export([publish/4, publish/5, publish/1,
+ message/3, message/4, properties/1, prepend_table_header/3,
+ extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
+ header_routes/1, parse_expiration/1, header/2, header/3]).
+-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
+
+%%----------------------------------------------------------------------------
+
+-type properties_input() ::
+ rabbit_framing:amqp_property_record() | [{atom(), any()}].
+-type publish_result() ::
+ {ok, [pid()]} | rabbit_types:error('not_found').
+-type header() :: any().
+-type headers() :: rabbit_framing:amqp_table() | 'undefined'.
+
+-type exchange_input() :: rabbit_types:exchange() | rabbit_exchange:name().
+-type body_input() :: binary() | [binary()].
+
+-spec publish
+ (exchange_input(), rabbit_router:routing_key(), properties_input(),
+ body_input()) ->
+ publish_result().
+-spec publish
+ (exchange_input(), rabbit_router:routing_key(), boolean(),
+ properties_input(), body_input()) ->
+ publish_result().
+-spec publish(rabbit_types:delivery()) -> publish_result().
+-spec delivery
+ (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
+ rabbit_types:delivery().
+-spec message
+ (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(),
+ binary()) ->
+ rabbit_types:message().
+-spec message
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ rabbit_types:decoded_content()) ->
+ rabbit_types:ok_or_error2(rabbit_types:message(), any()).
+-spec properties
+ (properties_input()) -> rabbit_framing:amqp_property_record().
+
+-spec prepend_table_header
+ (binary(), rabbit_framing:amqp_table(), headers()) -> headers().
+
+-spec header(header(), headers()) -> 'undefined' | any().
+-spec header(header(), headers(), any()) -> 'undefined' | any().
+
+-spec extract_headers(rabbit_types:content()) -> headers().
+
+-spec map_headers
+ (fun((headers()) -> headers()), rabbit_types:content()) ->
+ rabbit_types:content().
+
+-spec header_routes(undefined | rabbit_framing:amqp_table()) -> [string()].
+-spec build_content
+ (rabbit_framing:amqp_property_record(), binary() | [binary()]) ->
+ rabbit_types:content().
+-spec from_content
+ (rabbit_types:content()) ->
+ {rabbit_framing:amqp_property_record(), binary()}.
+-spec parse_expiration
+ (rabbit_framing:amqp_property_record()) ->
+ rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any()).
+
+%%----------------------------------------------------------------------------
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(Exchange, RoutingKeyBin, Properties, Body) ->
+ publish(Exchange, RoutingKeyBin, false, Properties, Body).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(X, delivery(Mandatory, false, Message, undefined));
+publish(XName, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(delivery(Mandatory, false, Message, undefined)).
+
+publish(Delivery = #delivery{
+ message = #basic_message{exchange_name = XName}}) ->
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} -> publish(X, Delivery);
+ Err -> Err
+ end.
+
+publish(X, Delivery) ->
+ Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
+ {ok, DeliveredQPids}.
+
+delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
+ message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
+
+build_content(Properties, BodyBin) when is_binary(BodyBin) ->
+ build_content(Properties, [BodyBin]);
+
+build_content(Properties, PFR) ->
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
+ #content{class_id = ClassId,
+ properties = Properties,
+ properties_bin = none,
+ protocol = none,
+ payload_fragments_rev = PFR}.
+
+from_content(Content) ->
+ #content{class_id = ClassId,
+ properties = Props,
+ payload_fragments_rev = FragmentsRev} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
+ {Props, list_to_binary(lists:reverse(FragmentsRev))}.
+
+%% This breaks the spec rule forbidding message modification
+strip_header(#content{properties = #'P_basic'{headers = undefined}}
+ = DecodedContent, _Key) ->
+ DecodedContent;
+strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
+ = DecodedContent, Key) ->
+ case lists:keysearch(Key, 1, Headers) of
+ false -> DecodedContent;
+ {value, Found} -> Headers0 = lists:delete(Found, Headers),
+ rabbit_binary_generator:clear_encoded_content(
+ DecodedContent#content{
+ properties = Props#'P_basic'{
+ headers = Headers0}})
+ end.
+
+message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
+ try
+ {ok, #basic_message{
+ exchange_name = XName,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ id = rabbit_guid:gen(),
+ is_persistent = is_message_persistent(DecodedContent),
+ routing_keys = [RoutingKey |
+ header_routes(Props#'P_basic'.headers)]}}
+ catch
+ {error, _Reason} = Error -> Error
+ end.
+
+message(XName, RoutingKey, RawProperties, Body) ->
+ Properties = properties(RawProperties),
+ Content = build_content(Properties, Body),
+ {ok, Msg} = message(XName, RoutingKey, Content),
+ Msg.
+
+properties(P = #'P_basic'{}) ->
+ P;
+properties(P) when is_list(P) ->
+ %% Yes, this is O(length(P) * record_info(size, 'P_basic') / 2),
+ %% i.e. slow. Use the definition of 'P_basic' directly if
+ %% possible!
+ lists:foldl(fun ({Key, Value}, Acc) ->
+ case indexof(record_info(fields, 'P_basic'), Key) of
+ 0 -> throw({unknown_basic_property, Key});
+ N -> setelement(N + 1, Acc, Value)
+ end
+ end, #'P_basic'{}, P).
+
+prepend_table_header(Name, Info, undefined) ->
+ prepend_table_header(Name, Info, []);
+prepend_table_header(Name, Info, Headers) ->
+ case rabbit_misc:table_lookup(Headers, Name) of
+ {array, Existing} ->
+ prepend_table(Name, Info, Existing, Headers);
+ undefined ->
+ prepend_table(Name, Info, [], Headers);
+ Other ->
+ Headers2 = prepend_table(Name, Info, [], Headers),
+ set_invalid_header(Name, Other, Headers2)
+ end.
+
+prepend_table(Name, Info, Prior, Headers) ->
+ rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+
+set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
+ case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
+ undefined ->
+ set_invalid([{Name, array, [Value]}], Headers);
+ {table, ExistingHdr} ->
+ update_invalid(Name, Value, ExistingHdr, Headers);
+ Other ->
+ %% somehow the x-invalid-headers header is corrupt
+ Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
+ set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
+ end.
+
+set_invalid(NewHdr, Headers) ->
+ rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
+
+update_invalid(Name, Value, ExistingHdr, Header) ->
+ Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
+ undefined -> [Value];
+ {array, Prior} -> [Value | Prior]
+ end,
+ NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
+ set_invalid(NewHdr, Header).
+
+header(_Header, undefined) ->
+ undefined;
+header(_Header, []) ->
+ undefined;
+header(Header, Headers) ->
+ header(Header, Headers, undefined).
+
+header(Header, Headers, Default) ->
+ case lists:keysearch(Header, 1, Headers) of
+ false -> Default;
+ {value, Val} -> Val
+ end.
+
+extract_headers(Content) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers.
+
+extract_timestamp(Content) ->
+ #content{properties = #'P_basic'{timestamp = Timestamp}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Timestamp.
+
+map_headers(F, Content) ->
+ Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
+ #content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
+ Headers1 = F(Headers),
+ rabbit_binary_generator:clear_encoded_content(
+ Content1#content{properties = Props#'P_basic'{headers = Headers1}}).
+
+indexof(L, Element) -> indexof(L, Element, 1).
+
+indexof([], _Element, _N) -> 0;
+indexof([Element | _Rest], Element, N) -> N;
+indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
+
+is_message_persistent(#content{properties = #'P_basic'{
+ delivery_mode = Mode}}) ->
+ case Mode of
+ 1 -> false;
+ 2 -> true;
+ undefined -> false;
+ Other -> throw({error, {delivery_mode_unknown, Other}})
+ end.
+
+%% Extract CC routes from headers
+header_routes(undefined) ->
+ [];
+header_routes(HeadersTable) ->
+ lists:append(
+ [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ undefined -> [];
+ {Type, _Val} -> throw({error, {unacceptable_type_in_header,
+ binary_to_list(HeaderKey), Type}})
+ end || HeaderKey <- ?ROUTING_HEADERS]).
+
+parse_expiration(#'P_basic'{expiration = undefined}) ->
+ {ok, undefined};
+parse_expiration(#'P_basic'{expiration = Expiration}) ->
+ case string:to_integer(binary_to_list(Expiration)) of
+ {error, no_integer} = E ->
+ E;
+ {N, ""} ->
+ case rabbit_misc:check_expiry(N) of
+ ok -> {ok, N};
+ E = {error, _} -> E
+ end;
+ {_, S} ->
+ {error, {leftover_string, S}}
+ end.
+
+maybe_gc_large_msg(Content) ->
+ rabbit_writer:maybe_gc_large_msg(Content).
+
+msg_size(Content) ->
+ rabbit_writer:msg_size(Content).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
new file mode 100644
index 0000000000..dae0612e03
--- /dev/null
+++ b/src/rabbit_channel.erl
@@ -0,0 +1,2034 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_channel).
+
+%% rabbit_channel processes represent an AMQP 0-9-1 channels.
+%%
+%% Connections parse protocol frames coming from clients and
+%% dispatch them to channel processes.
+%% Channels are responsible for implementing the logic behind
+%% the various protocol methods, involving other processes as
+%% needed:
+%%
+%% * Routing messages (using functions in various exchange type
+%% modules) to queue processes.
+%% * Managing queues, exchanges, and bindings.
+%% * Keeping track of consumers
+%% * Keeping track of unacknowledged deliveries to consumers
+%% * Keeping track of publisher confirms
+%% * Keeping track of mandatory message routing confirmations
+%% and returns
+%% * Transaction management
+%% * Authorisation (enforcing permissions)
+%% * Publishing trace events if tracing is enabled
+%%
+%% Every channel has a number of dependent processes:
+%%
+%% * A writer which is responsible for sending frames to clients.
+%% * A limiter which controls how many messages can be delivered
+%% to consumers according to active QoS prefetch and internal
+%% flow control logic.
+%%
+%% Channels are also aware of their connection's queue collector.
+%% When a queue is declared as exclusive on a channel, the channel
+%% will notify queue collector of that queue.
+
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
+-export([send_command/2, deliver/4, deliver_reply/2,
+ send_credit_reply/2, send_drained/2]).
+-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
+ info_all/3, info_local/1]).
+-export([refresh_config_local/0, ready_for_close/1]).
+-export([force_event_refresh/1]).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+%% Internal
+-export([list_local/0, deliver_reply_local/3]).
+-export([get_vhost/1, get_user/1]).
+
+-record(ch, {
+ %% starting | running | flow | closing
+ state,
+ %% same as reader's protocol. Used when instantiating
+ %% (protocol) exceptions.
+ protocol,
+ %% channel number
+ channel,
+ %% reader process
+ reader_pid,
+ %% writer process
+ writer_pid,
+ %%
+ conn_pid,
+ %% same as reader's name, see #v1.name
+ %% in rabbit_reader
+ conn_name,
+ %% limiter pid, see rabbit_limiter
+ limiter,
+ %% none | {Msgs, Acks} | committing | failed |
+ tx,
+ %% (consumer) delivery tag sequence
+ next_tag,
+ %% messages pending consumer acknowledgement
+ unacked_message_q,
+ %% same as #v1.user in the reader, used in
+ %% authorisation checks
+ user,
+ %% same as #v1.user in the reader
+ virtual_host,
+ %% when queue.bind's queue field is empty,
+ %% this name will be used instead
+ most_recently_declared_queue,
+ %% a dictionary of queue pid to queue name
+ queue_names,
+ %% queue processes are monitored to update
+ %% queue names
+ queue_monitors,
+ %% a dictionary of consumer tags to
+ %% consumer details: #amqqueue record, acknowledgement mode,
+ %% consumer exclusivity, etc
+ consumer_mapping,
+ %% a dictionary of queue pids to consumer tag lists
+ queue_consumers,
+ %% a set of pids of queues that have unacknowledged
+ %% deliveries
+ delivering_queues,
+ %% when a queue is declared as exclusive, queue
+ %% collector must be notified.
+ %% see rabbit_queue_collector for more info.
+ queue_collector_pid,
+ %% timer used to emit statistics
+ stats_timer,
+ %% are publisher confirms enabled for this channel?
+ confirm_enabled,
+ %% publisher confirm delivery tag sequence
+ publish_seqno,
+ %% a dtree used to track unconfirmed
+ %% (to publishers) messages
+ unconfirmed,
+ %% a list of tags for published messages that were
+ %% delivered but are yet to be confirmed to the client
+ confirmed,
+ %% a dtree used to track oustanding notifications
+ %% for messages published as mandatory
+ mandatory,
+ %% same as capabilities in the reader
+ capabilities,
+ %% tracing exchange resource if tracing is enabled,
+ %% 'none' otherwise
+ trace_state,
+ consumer_prefetch,
+ %% used by "one shot RPC" (amq.
+ reply_consumer,
+ %% flow | noflow, see rabbitmq-server#114
+ delivery_flow,
+ interceptor_state
+}).
+
+
+-define(MAX_PERMISSION_CACHE_SIZE, 12).
+
+-define(STATISTICS_KEYS,
+ [reductions,
+ pid,
+ transactional,
+ confirm,
+ consumer_count,
+ messages_unacknowledged,
+ messages_unconfirmed,
+ messages_uncommitted,
+ acks_uncommitted,
+ prefetch_count,
+ global_prefetch_count,
+ state,
+ garbage_collection]).
+
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ connection,
+ number,
+ user,
+ vhost]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
+-define(INCR_STATS(Incs, Measure, State),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine -> incr_stats(Incs, Measure);
+ _ -> ok
+ end).
+
+%%----------------------------------------------------------------------------
+
+-export_type([channel_number/0]).
+
+-type channel_number() :: non_neg_integer().
+
+-export_type([channel/0]).
+
+-type channel() :: #ch{}.
+
+-spec start_link
+ (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid(), pid()) ->
+ rabbit_types:ok_pid_or_error().
+-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
+-spec do
+ (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) ->
+ 'ok'.
+-spec do_flow
+ (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) ->
+ 'ok'.
+-spec flush(pid()) -> 'ok'.
+-spec shutdown(pid()) -> 'ok'.
+-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
+-spec deliver
+ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'.
+-spec deliver_reply(binary(), rabbit_types:delivery()) -> 'ok'.
+-spec deliver_reply_local(pid(), binary(), rabbit_types:delivery()) -> 'ok'.
+-spec send_credit_reply(pid(), non_neg_integer()) -> 'ok'.
+-spec send_drained(pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'.
+-spec list() -> [pid()].
+-spec list_local() -> [pid()].
+-spec info_keys() -> rabbit_types:info_keys().
+-spec info(pid()) -> rabbit_types:infos().
+-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
+-spec info_all() -> [rabbit_types:infos()].
+-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
+-spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'.
+-spec refresh_config_local() -> 'ok'.
+-spec ready_for_close(pid()) -> 'ok'.
+-spec force_event_refresh(reference()) -> 'ok'.
+
+%%----------------------------------------------------------------------------
+
+start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
+ VHost, Capabilities, CollectorPid, Limiter) ->
+ gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
+ User, VHost, Capabilities, CollectorPid, Limiter], []).
+
+do(Pid, Method) ->
+ do(Pid, Method, none).
+
+do(Pid, Method, Content) ->
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ %% Here we are tracking messages sent by the rabbit_reader
+ %% process. We are accessing the rabbit_reader process dictionary.
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
+
+flush(Pid) ->
+ gen_server2:call(Pid, flush, infinity).
+
+shutdown(Pid) ->
+ gen_server2:cast(Pid, terminate).
+
+send_command(Pid, Msg) ->
+ gen_server2:cast(Pid, {command, Msg}).
+
+deliver(Pid, ConsumerTag, AckRequired, Msg) ->
+ gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+
+deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
+ case decode_fast_reply_to(Rest) of
+ {ok, Pid, Key} ->
+ delegate:invoke_no_result(
+ Pid, {?MODULE, deliver_reply_local, [Key, Delivery]});
+ error ->
+ ok
+ end.
+
+%% We want to ensure people can't use this mechanism to send a message
+%% to an arbitrary process and kill it!
+deliver_reply_local(Pid, Key, Delivery) ->
+ case pg_local:in_group(rabbit_channels, Pid) of
+ true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery});
+ false -> ok
+ end.
+
+declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
+ exists;
+declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) ->
+ case decode_fast_reply_to(Rest) of
+ {ok, Pid, Key} ->
+ Msg = {declare_fast_reply_to, Key},
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const(not_found),
+ fun() -> gen_server2:call(Pid, Msg, infinity) end);
+ error ->
+ not_found
+ end;
+declare_fast_reply_to(_) ->
+ not_found.
+
+decode_fast_reply_to(Rest) ->
+ case string:tokens(binary_to_list(Rest), ".") of
+ [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
+ {ok, Pid, Key};
+ _ -> error
+ end.
+
+send_credit_reply(Pid, Len) ->
+ gen_server2:cast(Pid, {send_credit_reply, Len}).
+
+send_drained(Pid, CTagCredit) ->
+ gen_server2:cast(Pid, {send_drained, CTagCredit}).
+
+list() ->
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
+ rabbit_channel, list_local, []).
+
+list_local() ->
+ pg_local:get_members(rabbit_channels).
+
+info_keys() -> ?INFO_KEYS.
+
+info(Pid) ->
+ gen_server2:call(Pid, info, infinity).
+
+info(Pid, Items) ->
+ case gen_server2:call(Pid, {info, Items}, infinity) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+info_all() ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()).
+
+info_all(Items) ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+
+info_local(Items) ->
+ rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list_local()).
+
+info_all(Items, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()).
+
+refresh_config_local() ->
+ rabbit_misc:upmap(
+ fun (C) -> gen_server2:call(C, refresh_config, infinity) end,
+ list_local()),
+ ok.
+
+ready_for_close(Pid) ->
+ gen_server2:cast(Pid, ready_for_close).
+
+force_event_refresh(Ref) ->
+ [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
+ ok.
+
+%%---------------------------------------------------------------------------
+
+init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
+ Capabilities, CollectorPid, LimiterPid]) ->
+ process_flag(trap_exit, true),
+ ?store_proc_name({ConnName, Channel}),
+ ok = pg_local:join(rabbit_channels, self()),
+ Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
+ true -> flow;
+ false -> noflow
+ end,
+ State = #ch{state = starting,
+ protocol = Protocol,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ limiter = rabbit_limiter:new(LimiterPid),
+ tx = none,
+ next_tag = 1,
+ unacked_message_q = queue:new(),
+ user = User,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
+ queue_monitors = pmon:new(),
+ consumer_mapping = dict:new(),
+ queue_consumers = dict:new(),
+ delivering_queues = sets:new(),
+ queue_collector_pid = CollectorPid,
+ confirm_enabled = false,
+ publish_seqno = 1,
+ unconfirmed = dtree:empty(),
+ confirmed = [],
+ mandatory = dtree:empty(),
+ capabilities = Capabilities,
+ trace_state = rabbit_trace:init(VHost),
+ consumer_prefetch = 0,
+ reply_consumer = none,
+ delivery_flow = Flow,
+ interceptor_state = undefined},
+ State1 = State#ch{
+ interceptor_state = rabbit_channel_interceptor:init(State)},
+ State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
+ Infos = infos(?CREATION_EVENT_KEYS, State2),
+ rabbit_core_metrics:channel_created(self(), Infos),
+ rabbit_event:notify(channel_created, Infos),
+ rabbit_event:if_enabled(State2, #ch.stats_timer,
+ fun() -> emit_stats(State2) end),
+ put(channel_operation_timeout, ?CHANNEL_OPERATION_TIMEOUT),
+ {ok, State2, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+prioritise_call(Msg, _From, _Len, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _Len, _State) ->
+ case Msg of
+ {confirm, _MsgSeqNos, _QPid} -> 5;
+ {mandatory_received, _MsgSeqNo, _QPid} -> 5;
+ _ -> 0
+ end.
+
+prioritise_info(Msg, _Len, _State) ->
+ case Msg of
+ emit_stats -> 7;
+ _ -> 0
+ end.
+
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
+
+handle_call({info, Items}, _From, State) ->
+ try
+ reply({ok, infos(Items, State)}, State)
+ catch Error -> reply({error, Error}, State)
+ end;
+
+handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
+ reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
+
+handle_call({declare_fast_reply_to, Key}, _From,
+ State = #ch{reply_consumer = Consumer}) ->
+ reply(case Consumer of
+ {_, _, Key} -> exists;
+ _ -> not_found
+ end, State);
+
+handle_call(_Request, _From, State) ->
+ noreply(State).
+
+handle_cast({method, Method, Content, Flow},
+ State = #ch{reader_pid = Reader,
+ interceptor_state = IState}) ->
+ case Flow of
+ %% We are going to process a message from the rabbit_reader
+ %% process, so here we ack it. In this case we are accessing
+ %% the rabbit_channel process dictionary.
+ flow -> credit_flow:ack(Reader);
+ noflow -> ok
+ end,
+
+ try handle_method(rabbit_channel_interceptor:intercept_in(
+ expand_shortcuts(Method, State), Content, IState),
+ State) of
+ {reply, Reply, NewState} ->
+ ok = send(Reply, NewState),
+ noreply(NewState);
+ {noreply, NewState} ->
+ noreply(NewState);
+ stop ->
+ {stop, normal, State}
+ catch
+ exit:Reason = #amqp_error{} ->
+ MethodName = rabbit_misc:method_record_type(Method),
+ handle_exception(Reason#amqp_error{method = MethodName}, State);
+ _:Reason ->
+ {stop, {Reason, erlang:get_stacktrace()}, State}
+ end;
+
+handle_cast(ready_for_close, State = #ch{state = closing,
+ writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+ {stop, normal, State};
+
+handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:flush(WriterPid),
+ {stop, normal, State};
+
+handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) ->
+ ok = send(Msg, State),
+ noreply(consumer_monitor(CTag, State));
+
+handle_cast({command, Msg}, State) ->
+ ok = send(Msg, State),
+ noreply(State);
+
+handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) ->
+ noreply(State);
+handle_cast({deliver, ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ rabbit_basic:maybe_gc_large_msg(Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+
+handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) ->
+ noreply(State);
+handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) ->
+ noreply(State);
+handle_cast({deliver_reply, Key, #delivery{message =
+ #basic_message{exchange_name = ExchangeName,
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag,
+ reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = false,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(State);
+handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) ->
+ noreply(State);
+
+handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_ok'{available = Len}),
+ noreply(State);
+
+handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
+ [ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
+ credit_drained = CreditDrained})
+ || {ConsumerTag, CreditDrained} <- CTagCredit],
+ noreply(State);
+
+handle_cast({force_event_refresh, Ref}, State) ->
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
+ Ref),
+ noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
+
+handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
+
+handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
+
+handle_info({bump_credit, Msg}, State) ->
+ %% A rabbit_amqqueue_process is granting credit to our channel. If
+ %% our channel was being blocked by this process, and no other
+ %% process is blocking our channel, then this channel will be
+ %% unblocked. This means that any credit that was deferred will be
+ %% sent to rabbit_reader processs that might be blocked by this
+ %% particular channel.
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
+handle_info(timeout, State) ->
+ noreply(State);
+
+handle_info(emit_stats, State) ->
+ emit_stats(State),
+ State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
+ %% NB: don't call noreply/1 since we don't want to kick off the
+ %% stats timer.
+ {noreply, send_confirms(State1), hibernate};
+
+handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
+ State1 = handle_publishing_queue_down(QPid, Reason, State),
+ State3 = handle_consuming_queue_down(QPid, State1),
+ State4 = handle_delivering_queue_down(QPid, State3),
+ %% A rabbit_amqqueue_process has died. If our channel was being
+ %% blocked by this process, and no other process is blocking our
+ %% channel, then this channel will be unblocked. This means that
+ %% any credit that was deferred will be sent to the rabbit_reader
+ %% processs that might be blocked by this particular channel.
+ credit_flow:peer_down(QPid),
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+
+handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel})
+ when is_reference(Ref) ->
+ log(warning, "Channel ~p ignoring late answer ~p from ~p",
+ [Channel, LateAnswer, Node]),
+ noreply(State).
+
+handle_pre_hibernate(State) ->
+ ok = clear_permission_cache(),
+ rabbit_event:if_enabled(
+ State, #ch.stats_timer,
+ fun () -> emit_stats(State,
+ [{idle_since,
+ time_compat:os_system_time(milli_seconds)}])
+ end),
+ {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
+
+terminate(_Reason, State = #ch{}) ->
+ {_Res, _State1} = notify_queues(State),
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_event:if_enabled(State, #ch.stats_timer,
+ fun() -> emit_stats(State) end),
+ [delete_stats(Tag) || {Tag, _} <- get()],
+ rabbit_core_metrics:channel_closed(self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
+%%---------------------------------------------------------------------------
+
+log(Level, Fmt, Args) -> rabbit_log:log(channel, Level, Fmt, Args).
+
+reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
+
+noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
+
+next_state(State) -> ensure_stats_timer(send_confirms(State)).
+
+noreply_coalesce(State = #ch{confirmed = C}) ->
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ {noreply, ensure_stats_timer(State), Timeout}.
+
+ensure_stats_timer(State) ->
+ rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
+
+return_ok(State, true, _Msg) -> {noreply, State};
+return_ok(State, false, Msg) -> {reply, Msg, State}.
+
+ok_msg(true, _Msg) -> undefined;
+ok_msg(false, Msg) -> Msg.
+
+send(_Command, #ch{state = closing}) ->
+ ok;
+send(Command, #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Command).
+
+format_soft_error(#amqp_error{name = N, explanation = E, method = M}) ->
+ io_lib:format("operation ~s caused a channel exception ~s: ~ts", [M, N, E]);
+format_soft_error(Reason) ->
+ Reason.
+
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ virtual_host = VHost,
+ user = User}) ->
+ %% something bad's happened: notify_queues may not be 'ok'
+ {_Result, State1} = notify_queues(State),
+ case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
+ {Channel, CloseMethod} ->
+ log(error, "Channel error on connection ~p (~s, vhost: '~s',"
+ " user: '~s'), channel ~p:~n~s~n",
+ [ConnPid, ConnName, VHost, User#user.username,
+ Channel, format_soft_error(Reason)]),
+ ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ {0, _} ->
+ ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
+ end.
+
+-spec precondition_failed(string()) -> no_return().
+
+precondition_failed(Format) -> precondition_failed(Format, []).
+
+-spec precondition_failed(string(), [any()]) -> no_return().
+
+precondition_failed(Format, Params) ->
+ rabbit_misc:protocol_error(precondition_failed, Format, Params).
+
+return_queue_declare_ok(#resource{name = ActualName},
+ NoWait, MessageCount, ConsumerCount, State) ->
+ return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
+ #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount}).
+
+check_resource_access(User, Resource, Perm) ->
+ V = {Resource, Perm},
+ Cache = case get(permission_cache) of
+ undefined -> [];
+ Other -> Other
+ end,
+ case lists:member(V, Cache) of
+ true -> ok;
+ false -> ok = rabbit_access_control:check_resource_access(
+ User, Resource, Perm),
+ CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
+ put(permission_cache, [V | CacheTail])
+ end.
+
+clear_permission_cache() -> erase(permission_cache),
+ ok.
+
+check_configure_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, configure).
+
+check_write_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, write).
+
+check_read_permitted(Resource, #ch{user = User}) ->
+ check_resource_access(User, Resource, read).
+
+check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Username},
+ #ch{user = #user{username = Username}}) ->
+ ok;
+check_user_id_header(
+ #'P_basic'{}, #ch{user = #user{authz_backends =
+ [{rabbit_auth_backend_dummy, _}]}}) ->
+ ok;
+check_user_id_header(#'P_basic'{user_id = Claimed},
+ #ch{user = #user{username = Actual,
+ tags = Tags}}) ->
+ case lists:member(impersonator, Tags) of
+ true -> ok;
+ false -> precondition_failed(
+ "user_id property set to '~s' but authenticated user was "
+ "'~s'", [Claimed, Actual])
+ end.
+
+check_expiration_header(Props) ->
+ case rabbit_basic:parse_expiration(Props) of
+ {ok, _} -> ok;
+ {error, E} -> precondition_failed("invalid expiration '~s': ~p",
+ [Props#'P_basic'.expiration, E])
+ end.
+
+check_internal_exchange(#exchange{name = Name, internal = true}) ->
+ rabbit_misc:protocol_error(access_refused,
+ "cannot publish to internal ~s",
+ [rabbit_misc:rs(Name)]);
+check_internal_exchange(_) ->
+ ok.
+
+check_msg_size(Content) ->
+ Size = rabbit_basic:maybe_gc_large_msg(Content),
+ case Size > ?MAX_MSG_SIZE of
+ true -> precondition_failed("message size ~B larger than max size ~B",
+ [Size, ?MAX_MSG_SIZE]);
+ false -> ok
+ end.
+
+qbin_to_resource(QueueNameBin, State) ->
+ name_to_resource(queue, QueueNameBin, State).
+
+name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
+ rabbit_misc:r(VHostPath, Type, NameBin).
+
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
+ MRDQ;
+expand_queue_name_shortcut(QueueNameBin, _) ->
+ QueueNameBin.
+
+expand_routing_key_shortcut(<<>>, <<>>,
+ #ch{most_recently_declared_queue = <<>>}) ->
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
+expand_routing_key_shortcut(<<>>, <<>>,
+ #ch{most_recently_declared_queue = MRDQ}) ->
+ MRDQ;
+expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
+ RoutingKey.
+
+expand_shortcuts(#'basic.get' {queue = Q} = M, State) ->
+ M#'basic.get' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'basic.consume'{queue = Q} = M, State) ->
+ M#'basic.consume'{queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.delete' {queue = Q} = M, State) ->
+ M#'queue.delete' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.purge' {queue = Q} = M, State) ->
+ M#'queue.purge' {queue = expand_queue_name_shortcut(Q, State)};
+expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, State) ->
+ M#'queue.bind' {queue = expand_queue_name_shortcut(Q, State),
+ routing_key = expand_routing_key_shortcut(Q, K, State)};
+expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, State) ->
+ M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, State),
+ routing_key = expand_routing_key_shortcut(Q, K, State)};
+expand_shortcuts(M, _State) ->
+ M.
+
+check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "operation not permitted on the default exchange", []);
+check_not_default_exchange(_) ->
+ ok.
+
+check_exchange_deletion(XName = #resource{name = <<"amq.", _/binary>>,
+ kind = exchange}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "deletion of system ~s not allowed",
+ [rabbit_misc:rs(XName)]);
+check_exchange_deletion(_) ->
+ ok.
+
+%% check that an exchange/queue name does not contain the reserved
+%% "amq." prefix.
+%%
+%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
+%% only applies on actual creation, and not in the cases where the
+%% entity already exists or passive=true.
+%%
+%% NB: We deliberately do not enforce the other constraints on names
+%% required by the spec.
+check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
+ rabbit_misc:protocol_error(
+ access_refused,
+ "~s name '~s' contains reserved prefix 'amq.*'",[Kind, NameBin]);
+check_name(_Kind, NameBin) ->
+ NameBin.
+
+maybe_set_fast_reply_to(
+ C = #content{properties = P = #'P_basic'{reply_to =
+ <<"amq.rabbitmq.reply-to">>}},
+ #ch{reply_consumer = ReplyConsumer}) ->
+ case ReplyConsumer of
+ none -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "fast reply consumer does not exist", []);
+ {_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>,
+ rabbit_binary_generator:clear_encoded_content(
+ C#content{properties = P#'P_basic'{reply_to = Rep}})
+ end;
+maybe_set_fast_reply_to(C, _State) ->
+ C.
+
+record_confirms([], State) ->
+ State;
+record_confirms(MXs, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MXs | C]}.
+
+handle_method({Method, Content}, State) ->
+ handle_method(Method, Content, State).
+
+handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
+ %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
+ State1 = State#ch{state = running},
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {reply, #'channel.open_ok'{}, State1};
+
+handle_method(#'channel.open'{}, _, _State) ->
+ rabbit_misc:protocol_error(
+ channel_error, "second 'channel.open' seen", []);
+
+handle_method(_Method, _, #ch{state = starting}) ->
+ rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
+
+handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
+ stop;
+
+handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid,
+ state = closing}) ->
+ ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
+ {noreply, State};
+
+handle_method(_Method, _, State = #ch{state = closing}) ->
+ {noreply, State};
+
+handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
+ {_Result, State1} = notify_queues(State),
+ %% We issue the channel.close_ok response after a handshake with
+ %% the reader, the other half of which is ready_for_close. That
+ %% way the reader forgets about the channel before we send the
+ %% response (and this channel process terminates). If we didn't do
+ %% that, a channel.open for the same channel number, which a
+ %% client is entitled to send as soon as it has received the
+ %% close_ok, might be received by the reader before it has seen
+ %% the termination and hence be sent to the old, now dead/dying
+ %% channel process, instead of a new process, and thus lost.
+ ReaderPid ! {channel_closing, self()},
+ {noreply, State1};
+
+%% Even though the spec prohibits the client from sending commands
+%% while waiting for the reply to a synchronous command, we generally
+%% do allow this...except in the case of a pending tx.commit, where
+%% it could wreak havoc.
+handle_method(_Method, _, #ch{tx = Tx})
+ when Tx =:= committing orelse Tx =:= failed ->
+ rabbit_misc:protocol_error(
+ channel_error, "unexpected command while processing 'tx.commit'", []);
+
+handle_method(#'access.request'{},_, State) ->
+ {reply, #'access.request_ok'{ticket = 1}, State};
+
+handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "immediate=true", []);
+
+handle_method(#'basic.publish'{exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ mandatory = Mandatory},
+ Content, State = #ch{virtual_host = VHostPath,
+ tx = Tx,
+ channel = ChannelNum,
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName,
+ delivery_flow = Flow}) ->
+ check_msg_size(Content),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
+ Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ check_internal_exchange(Exchange),
+ %% We decode the content's properties here because we're almost
+ %% certain to want to look at delivery-mode and priority.
+ DecodedContent = #content {properties = Props} =
+ maybe_set_fast_reply_to(
+ rabbit_binary_parser:ensure_content_decoded(Content), State),
+ check_user_id_header(Props, State),
+ check_expiration_header(Props),
+ DoConfirm = Tx =/= none orelse ConfirmEnabled,
+ {MsgSeqNo, State1} =
+ case DoConfirm orelse Mandatory of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ end,
+ case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
+ {ok, Message} ->
+ Delivery = rabbit_basic:delivery(
+ Mandatory, DoConfirm, Message, MsgSeqNo),
+ QNames = rabbit_exchange:route(Exchange, Delivery),
+ rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
+ Username, TraceState),
+ DQ = {Delivery#delivery{flow = Flow}, QNames},
+ {noreply, case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = {Msgs1, Acks}}
+ end};
+ {error, Reason} ->
+ precondition_failed("invalid message: ~p", [Reason])
+ end;
+
+handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = Multiple,
+ requeue = Requeue}, _, State) ->
+ reject(DeliveryTag, Requeue, Multiple, State);
+
+handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = Multiple},
+ _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply, case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end};
+
+handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ limiter = Limiter,
+ next_tag = DeliveryTag}) ->
+ QueueName = qbin_to_resource(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) -> rabbit_amqqueue:basic_get(
+ Q, self(), NoAck, rabbit_limiter:pid(Limiter))
+ end) of
+ {ok, MessageCount,
+ Msg = {QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
+ message_count = MessageCount},
+ Content),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
+ {noreply, record_sent(none, not(NoAck), Msg, State1)};
+ empty ->
+ {reply, #'basic.get_empty'{}, State}
+ end;
+
+handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
+ consumer_tag = CTag0,
+ no_ack = NoAck,
+ nowait = NoWait},
+ _, State = #ch{reply_consumer = ReplyConsumer,
+ consumer_mapping = ConsumerMapping}) ->
+ case dict:find(CTag0, ConsumerMapping) of
+ error ->
+ case {ReplyConsumer, NoAck} of
+ {none, true} ->
+ CTag = case CTag0 of
+ <<>> -> rabbit_guid:binary(
+ rabbit_guid:gen_secure(), "amq.ctag");
+ Other -> Other
+ end,
+ %% Precalculate both suffix and key; base64 encoding is
+ %% expensive
+ Key = base64:encode(rabbit_guid:gen_secure()),
+ PidEnc = base64:encode(term_to_binary(self())),
+ Suffix = <<PidEnc/binary, ".", Key/binary>>,
+ Consumer = {CTag, Suffix, binary_to_list(Key)},
+ State1 = State#ch{reply_consumer = Consumer},
+ case NoWait of
+ true -> {noreply, State1};
+ false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
+ {reply, Rep, State1}
+ end;
+ {_, false} ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "reply consumer cannot acknowledge", []);
+ _ ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "reply consumer already set", [])
+ end;
+ {ok, _} ->
+ %% Attempted reuse of consumer tag.
+ rabbit_misc:protocol_error(
+ not_allowed, "attempt to reuse consumer tag '~s'", [CTag0])
+ end;
+
+handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
+ _, State = #ch{reply_consumer = {ConsumerTag, _, _}}) ->
+ State1 = State#ch{reply_consumer = none},
+ case NoWait of
+ true -> {noreply, State1};
+ false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
+ {reply, Rep, State1}
+ end;
+
+handle_method(#'basic.consume'{queue = QueueNameBin,
+ consumer_tag = ConsumerTag,
+ no_local = _, % FIXME: implement
+ no_ack = NoAck,
+ exclusive = ExclusiveConsume,
+ nowait = NoWait,
+ arguments = Args},
+ _, State = #ch{consumer_prefetch = ConsumerPrefetch,
+ consumer_mapping = ConsumerMapping}) ->
+ case dict:find(ConsumerTag, ConsumerMapping) of
+ error ->
+ QueueName = qbin_to_resource(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
+ ActualConsumerTag =
+ case ConsumerTag of
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.ctag");
+ Other -> Other
+ end,
+ case basic_consume(
+ QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait, State) of
+ {ok, State1} ->
+ {noreply, State1};
+ {error, exclusive_consume_unavailable} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s in exclusive use",
+ [rabbit_misc:rs(QueueName)])
+ end;
+ {ok, _} ->
+ %% Attempted reuse of consumer tag.
+ rabbit_misc:protocol_error(
+ not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
+ end;
+
+handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
+ _, State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QCons}) ->
+ OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
+ case dict:find(ConsumerTag, ConsumerMapping) of
+ error ->
+ %% Spec requires we ignore this situation.
+ return_ok(State, NoWait, OkMsg);
+ {ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ QCons1 =
+ case dict:find(QPid, QCons) of
+ error -> QCons;
+ {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
+ case gb_sets:is_empty(CTags1) of
+ true -> dict:erase(QPid, QCons);
+ false -> dict:store(QPid, CTags1, QCons)
+ end
+ end,
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
+ %% In order to ensure that no more messages are sent to
+ %% the consumer after the cancel_ok has been sent, we get
+ %% the queue process to send the cancel_ok on our
+ %% behalf. If we were sending the cancel_ok ourselves it
+ %% might overtake a message sent previously by the queue.
+ case rabbit_misc:with_exit_handler(
+ fun () -> {error, not_found} end,
+ fun () ->
+ rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg))
+ end) of
+ ok ->
+ {noreply, NewState};
+ {error, not_found} ->
+ %% Spec requires we ignore this situation.
+ return_ok(NewState, NoWait, OkMsg)
+ end
+ end;
+
+handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
+ rabbit_misc:protocol_error(not_implemented,
+ "prefetch_size!=0 (~w)", [Size]);
+
+handle_method(#'basic.qos'{global = false,
+ prefetch_count = PrefetchCount}, _, State) ->
+ {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
+
+handle_method(#'basic.qos'{global = true,
+ prefetch_count = 0},
+ _, State = #ch{limiter = Limiter}) ->
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
+
+handle_method(#'basic.qos'{global = true,
+ prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
+ %% TODO queue:len(UAMQ) is not strictly right since that counts
+ %% unacked messages from basic.get too. Pretty obscure though.
+ Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
+ PrefetchCount, queue:len(UAMQ)),
+ case ((not rabbit_limiter:is_active(Limiter)) andalso
+ rabbit_limiter:is_active(Limiter1)) of
+ true -> rabbit_amqqueue:activate_limit_all(
+ consumer_queues(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
+
+handle_method(#'basic.recover_async'{requeue = true},
+ _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
+ OkFun = fun () -> ok end,
+ UAMQL = queue:to_list(UAMQ),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_misc:with_exit_handler(
+ OkFun,
+ fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ end, lists:reverse(UAMQL)),
+ ok = notify_limiter(Limiter, UAMQL),
+ %% No answer required - basic.recover is the newer, synchronous
+ %% variant of this method
+ {noreply, State#ch{unacked_message_q = queue:new()}};
+
+handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
+
+handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
+ {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content, State),
+ {reply, #'basic.recover_ok'{}, State1};
+
+handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
+ _, State) ->
+ reject(DeliveryTag, Requeue, false, State);
+
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ type = TypeNameBin,
+ passive = false,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ nowait = NoWait,
+ arguments = Args},
+ _, State = #ch{virtual_host = VHostPath}) ->
+ CheckedType = rabbit_exchange:check_type(TypeNameBin),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_not_default_exchange(ExchangeName),
+ check_configure_permitted(ExchangeName, State),
+ X = case rabbit_exchange:lookup(ExchangeName) of
+ {ok, FoundX} -> FoundX;
+ {error, not_found} ->
+ check_name('exchange', ExchangeNameBin),
+ AeKey = <<"alternate-exchange">>,
+ case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
+ undefined -> ok;
+ {error, {invalid_type, Type}} ->
+ precondition_failed(
+ "invalid type '~s' for arg '~s' in ~s",
+ [Type, AeKey, rabbit_misc:rs(ExchangeName)]);
+ AName -> check_read_permitted(ExchangeName, State),
+ check_write_permitted(AName, State),
+ ok
+ end,
+ rabbit_exchange:declare(ExchangeName,
+ CheckedType,
+ Durable,
+ AutoDelete,
+ Internal,
+ Args)
+ end,
+ ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
+ AutoDelete, Internal, Args),
+ return_ok(State, NoWait, #'exchange.declare_ok'{});
+
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
+ passive = true,
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath}) ->
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_not_default_exchange(ExchangeName),
+ _ = rabbit_exchange:lookup_or_die(ExchangeName),
+ return_ok(State, NoWait, #'exchange.declare_ok'{});
+
+handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
+ if_unused = IfUnused,
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath}) ->
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
+ check_configure_permitted(ExchangeName, State),
+ case rabbit_exchange:delete(ExchangeName, IfUnused) of
+ {error, not_found} ->
+ return_ok(State, NoWait, #'exchange.delete_ok'{});
+ {error, in_use} ->
+ precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
+ ok ->
+ return_ok(State, NoWait, #'exchange.delete_ok'{})
+ end;
+
+handle_method(#'exchange.bind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:add/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.bind_ok'{}, NoWait, State);
+
+handle_method(#'exchange.unbind'{destination = DestinationNameBin,
+ source = SourceNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:remove/2,
+ SourceNameBin, exchange, DestinationNameBin, RoutingKey,
+ Arguments, #'exchange.unbind_ok'{}, NoWait, State);
+
+%% Note that all declares to these are effectively passive. If it
+%% exists it by definition has one consumer.
+handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
+ _/binary>> = QueueNameBin,
+ nowait = NoWait}, _,
+ State = #ch{virtual_host = VHost}) ->
+ QueueName = rabbit_misc:r(VHost, queue, QueueNameBin),
+ case declare_fast_reply_to(QueueNameBin) of
+ exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State);
+ not_found -> rabbit_misc:not_found(QueueName)
+ end;
+
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = DurableDeclare,
+ exclusive = ExclusiveDeclare,
+ auto_delete = AutoDelete,
+ nowait = NoWait,
+ arguments = Args} = Declare,
+ _, State = #ch{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ queue_collector_pid = CollectorPid}) ->
+ Owner = case ExclusiveDeclare of
+ true -> ConnPid;
+ false -> none
+ end,
+ Durable = DurableDeclare andalso not ExclusiveDeclare,
+ ActualNameBin = case QueueNameBin of
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
+ Other -> check_name('queue', Other)
+ end,
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ maybe_stat(NoWait, Q)
+ end) of
+ {ok, MessageCount, ConsumerCount} ->
+ return_queue_declare_ok(QueueName, NoWait, MessageCount,
+ ConsumerCount, State);
+ {error, not_found} ->
+ DlxKey = <<"x-dead-letter-exchange">>,
+ case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
+ undefined ->
+ ok;
+ {error, {invalid_type, Type}} ->
+ precondition_failed(
+ "invalid type '~s' for arg '~s' in ~s",
+ [Type, DlxKey, rabbit_misc:rs(QueueName)]);
+ DLX ->
+ check_read_permitted(QueueName, State),
+ check_write_permitted(DLX, State),
+ ok
+ end,
+ case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ {new, #amqqueue{pid = QPid}} ->
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as
+ %% the connection shuts down.
+ ok = case Owner of
+ none -> ok;
+ _ -> rabbit_queue_collector:register(
+ CollectorPid, QPid)
+ end,
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ {existing, _Q} ->
+ %% must have been created between the stat and the
+ %% declare. Loop around again.
+ handle_method(Declare, none, State);
+ {absent, Q, Reason} ->
+ rabbit_misc:absent(Q, Reason);
+ {owner_died, _Q} ->
+ %% Presumably our own days are numbered since the
+ %% connection has died. Pretend the queue exists though,
+ %% just so nothing fails.
+ return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
+ end;
+ {error, {absent, Q, Reason}} ->
+ rabbit_misc:absent(Q, Reason)
+ end;
+
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = true,
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ conn_pid = ConnPid}) ->
+ QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ rabbit_amqqueue:with_or_die(
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
+ State);
+
+handle_method(#'queue.delete'{queue = QueueNameBin,
+ if_unused = IfUnused,
+ if_empty = IfEmpty,
+ nowait = NoWait},
+ _, State = #ch{conn_pid = ConnPid}) ->
+ QueueName = qbin_to_resource(QueueNameBin, State),
+ check_configure_permitted(QueueName, State),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
+ end,
+ fun (not_found) -> {ok, 0};
+ ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q),
+ {ok, 0};
+ ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
+ end) of
+ {error, in_use} ->
+ precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
+ {error, not_empty} ->
+ precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
+ {ok, PurgedMessageCount} ->
+ return_ok(State, NoWait,
+ #'queue.delete_ok'{message_count = PurgedMessageCount})
+ end;
+
+handle_method(#'queue.bind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ nowait = NoWait,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:add/2,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
+ #'queue.bind_ok'{}, NoWait, State);
+
+handle_method(#'queue.unbind'{queue = QueueNameBin,
+ exchange = ExchangeNameBin,
+ routing_key = RoutingKey,
+ arguments = Arguments}, _, State) ->
+ binding_action(fun rabbit_binding:remove/2,
+ ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
+ #'queue.unbind_ok'{}, false, State);
+
+handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
+ _, State = #ch{conn_pid = ConnPid}) ->
+ QueueName = qbin_to_resource(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
+ {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) -> rabbit_amqqueue:purge(Q) end),
+ return_ok(State, NoWait,
+ #'queue.purge_ok'{message_count = PurgedMessageCount});
+
+handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
+ precondition_failed("cannot switch from confirm to tx mode");
+
+handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
+
+handle_method(#'tx.select'{}, _, State) ->
+ {reply, #'tx.select_ok'{}, State};
+
+handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
+ precondition_failed("channel is not transactional");
+
+handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
+ lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1);
+ ({Requeue, A}) -> reject(Requeue, Rev(A), Limiter)
+ end, lists:reverse(Acks)),
+ {noreply, maybe_complete_tx(State1#ch{tx = committing})};
+
+handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
+ precondition_failed("channel is not transactional");
+
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ tx = {_Msgs, Acks}}) ->
+ AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
+ UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
+ tx = new_tx()}};
+
+handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
+ precondition_failed("cannot switch from tx to confirm mode");
+
+handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
+ return_ok(State#ch{confirm_enabled = true},
+ NoWait, #'confirm.select_ok'{});
+
+handle_method(#'channel.flow'{active = true}, _, State) ->
+ {reply, #'channel.flow_ok'{active = true}, State};
+
+handle_method(#'channel.flow'{active = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "active=false", []);
+
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ drain = Drain},
+ _, State = #ch{consumer_mapping = Consumers}) ->
+ case dict:find(CTag, Consumers) of
+ {ok, {Q, _CParams}} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed(
+ "unknown consumer tag '~s'", [CTag])
+ end;
+
+handle_method(_MethodRecord, _Content, _State) ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unimplemented method", []).
+
+%%----------------------------------------------------------------------------
+
+%% We get the queue process to send the consume_ok on our behalf. This
+%% is for symmetry with basic.cancel - see the comment in that method
+%% for why.
+basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait,
+ State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_mapping = ConsumerMapping}) ->
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) ->
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
+ ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
+ end) of
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
+ CM1 = dict:store(
+ ActualConsumerTag,
+ {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
+ ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
+ {ok, case NoWait of
+ true -> consumer_monitor(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable} = E, _Q} ->
+ E
+ end.
+
+maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q);
+maybe_stat(true, _Q) -> {ok, 0, 0}.
+
+consumer_monitor(ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_monitors = QMons,
+ queue_consumers = QCons}) ->
+ {#amqqueue{pid = QPid}, _CParams} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
+ QCons1 = dict:update(QPid, fun (CTags) ->
+ gb_sets:insert(ConsumerTag, CTags)
+ end,
+ gb_sets:singleton(ConsumerTag), QCons),
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1}.
+
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
+ delivering_queues = case NoAck of
+ true -> DQ;
+ false -> sets:add_element(QPid, DQ)
+ end}.
+
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
+ mandatory = Mand}) ->
+ {MMsgs, Mand1} = dtree:take(QPid, Mand),
+ [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
+ State1 = State#ch{mandatory = Mand1},
+ case rabbit_misc:is_abnormal_exit(Reason) of
+ true -> {MXs, UC1} = dtree:take_all(QPid, UC),
+ send_nacks(MXs, State1#ch{unconfirmed = UC1});
+ false -> {MXs, UC1} = dtree:take(QPid, UC),
+ record_confirms(MXs, State1#ch{unconfirmed = UC1})
+
+ end.
+
+handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
+ ConsumerTags = case dict:find(QPid, QCons) of
+ error -> gb_sets:new();
+ {ok, CTags} -> CTags
+ end,
+ gb_sets:fold(
+ fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
+ QName = dict:fetch(QPid, QNames),
+ case queue_down_consumer_action(CTag, CMap) of
+ remove ->
+ cancel_consumer(CTag, QName, StateN);
+ {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} ->
+ case catch basic_consume( %% [0]
+ QName, NoAck, ConsumerPrefetch, CTag,
+ Exclusive, Args, true, StateN) of
+ {ok, StateN1} -> StateN1;
+ _ -> cancel_consumer(CTag, QName, StateN)
+ end
+ end
+ end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags).
+
+%% [0] There is a slight danger here that if a queue is deleted and
+%% then recreated again the reconsume will succeed even though it was
+%% not an HA failover. But the likelihood is not great and most users
+%% are unlikely to care.
+
+cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
+ consumer_mapping = CMap}) ->
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true}, State);
+ _ -> ok
+ end,
+ rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag},
+ {channel, self()},
+ {queue, QName}]),
+ State#ch{consumer_mapping = dict:erase(CTag, CMap)}.
+
+queue_down_consumer_action(CTag, CMap) ->
+ {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
+ case rabbit_misc:table_lookup(Args, <<"x-cancel-on-ha-failover">>) of
+ {bool, true} -> remove;
+ _ -> {recover, ConsumeSpec}
+ end.
+
+handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+
+binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
+ RoutingKey, Arguments, ReturnMethod, NoWait,
+ State = #ch{virtual_host = VHostPath,
+ conn_pid = ConnPid }) ->
+ DestinationName = name_to_resource(DestinationType, DestinationNameBin, State),
+ check_write_permitted(DestinationName, State),
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
+ check_read_permitted(ExchangeName, State),
+ case Fun(#binding{source = ExchangeName,
+ destination = DestinationName,
+ key = RoutingKey,
+ args = Arguments},
+ fun (_X, Q = #amqqueue{}) ->
+ try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
+ catch exit:Reason -> {error, Reason}
+ end;
+ (_X, #exchange{}) ->
+ ok
+ end) of
+ {error, {resources_missing, [{not_found, Name} | _]}} ->
+ rabbit_misc:not_found(Name);
+ {error, {resources_missing, [{absent, Q, Reason} | _]}} ->
+ rabbit_misc:absent(Q, Reason);
+ {error, binding_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no binding ~s between ~s and ~s",
+ [RoutingKey, rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(DestinationName)]);
+ {error, {binding_invalid, Fmt, Args}} ->
+ rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
+ {error, #amqp_error{} = Error} ->
+ rabbit_misc:protocol_error(Error);
+ ok -> return_ok(State, NoWait, ReturnMethod)
+ end.
+
+basic_return(#basic_message{exchange_name = ExchangeName,
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content},
+ State = #ch{protocol = Protocol, writer_pid = WriterPid},
+ Reason) ->
+ ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State),
+ {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.return'{reply_code = ReplyCode,
+ reply_text = ReplyText,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content).
+
+reject(DeliveryTag, Requeue, Multiple,
+ State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply, case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end}.
+
+%% NB: Acked is in youngest-first order
+reject(Requeue, Acked, Limiter) ->
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self())
+ end, Acked),
+ ok = notify_limiter(Limiter, Acked).
+
+record_sent(ConsumerTag, AckRequired,
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName,
+ channel = ChannelNum}) ->
+ ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ case Redelivered of
+ true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
+ false -> ok
+ end,
+ rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
+
+%% NB: returns acks in youngest-first order
+collect_acks(Q, 0, true) ->
+ {lists:reverse(queue:to_list(Q)), queue:new()};
+collect_acks(Q, DeliveryTag, Multiple) ->
+ collect_acks([], [], Q, DeliveryTag, Multiple).
+
+collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
+ case queue:out(Q) of
+ {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
+ QTail} ->
+ if CurrentDeliveryTag == DeliveryTag ->
+ {[UnackedMsg | ToAcc],
+ case PrefixAcc of
+ [] -> QTail;
+ _ -> queue:join(
+ queue:from_list(lists:reverse(PrefixAcc)),
+ QTail)
+ end};
+ Multiple ->
+ collect_acks([UnackedMsg | ToAcc], PrefixAcc,
+ QTail, DeliveryTag, Multiple);
+ true ->
+ collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
+ QTail, DeliveryTag, Multiple)
+ end;
+ {empty, _} ->
+ precondition_failed("unknown delivery tag ~w", [DeliveryTag])
+ end.
+
+%% NB: Acked is in youngest-first order
+ack(Acked, State = #ch{queue_names = QNames}) ->
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ ?INCR_STATS(case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count}];
+ error -> []
+ end, ack, State)
+ end, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked).
+
+%% {Msgs, Acks}
+%%
+%% Msgs is a queue.
+%%
+%% Acks looks s.t. like this:
+%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
+%%
+%% Each element is a pair consisting of a tag and a list of
+%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
+%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
+%% well as the list overall, are in "most-recent (generally youngest)
+%% ack first" order.
+new_tx() -> {queue:new(), []}.
+
+notify_queues(State = #ch{state = closing}) ->
+ {ok, State};
+notify_queues(State = #ch{consumer_mapping = Consumers,
+ delivering_queues = DQ }) ->
+ QPids = sets:to_list(
+ sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ {rabbit_amqqueue:notify_down_all(QPids, self(),
+ get(channel_operation_timeout)),
+ State#ch{state = closing}}.
+
+foreach_per_queue(_F, []) ->
+ ok;
+foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId]);
+%% NB: UAL should be in youngest-first order; the tree values will
+%% then be in oldest-first order
+foreach_per_queue(F, UAL) ->
+ T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ end, gb_trees:empty(), UAL),
+ rabbit_misc:gb_trees_foreach(F, T).
+
+consumer_queues(Consumers) ->
+ lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
+ <- dict:to_list(Consumers)]).
+
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, but not acks for
+%% messages sent in a response to a basic.get (identified by their
+%% 'none' consumer tag)
+notify_limiter(Limiter, Acked) ->
+ %% optimisation: avoid the potentially expensive 'foldl' in the
+ %% common case.
+ case rabbit_limiter:is_active(Limiter) of
+ false -> ok;
+ true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(Limiter, Count)
+ end
+ end.
+
+deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
+ confirm = false,
+ mandatory = false},
+ []}, State) -> %% optimisation
+ ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
+ State;
+deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
+ exchange_name = XName},
+ mandatory = Mandatory,
+ confirm = Confirm,
+ msg_seq_no = MsgSeqNo},
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = State#ch{queue_names = QNames1,
+ queue_monitors = QMons1},
+ %% NB: the order here is important since basic.returns must be
+ %% sent before confirms.
+ State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo,
+ Message, State1),
+ State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
+ XName, State2),
+ ?INCR_STATS([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State3),
+ State3.
+
+process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
+ State;
+process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ State;
+process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
+ State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
+ State#ch.mandatory)}.
+
+process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
+ State;
+process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
+ record_confirms([{MsgSeqNo, XName}], State);
+process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+ State#ch.unconfirmed)}.
+
+send_nacks([], State) ->
+ State;
+send_nacks(_MXs, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
+ State;
+send_nacks(MXs, State = #ch{tx = none}) ->
+ coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
+ fun(MsgSeqNo, Multiple) ->
+ #'basic.nack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State);
+send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
+ State#ch{tx = failed};
+send_nacks(_, State) ->
+ maybe_complete_tx(State#ch{tx = failed}).
+
+send_confirms(State = #ch{tx = none, confirmed = []}) ->
+ State;
+send_confirms(State = #ch{tx = none, confirmed = C}) ->
+ case rabbit_node_monitor:pause_partition_guard() of
+ ok -> MsgSeqNos =
+ lists:foldl(
+ fun ({MsgSeqNo, XName}, MSNs) ->
+ ?INCR_STATS([{exchange_stats, XName, 1}],
+ confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
+ pausing -> State
+ end;
+send_confirms(State) ->
+ case rabbit_node_monitor:pause_partition_guard() of
+ ok -> maybe_complete_tx(State);
+ pausing -> State
+ end.
+
+send_confirms([], State) ->
+ State;
+send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation
+ State;
+send_confirms([MsgSeqNo], State) ->
+ ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
+ State;
+send_confirms(Cs, State) ->
+ coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
+ #'basic.ack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+
+coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
+ SMsgSeqNos = lists:usort(MsgSeqNos),
+ CutOff = case dtree:is_empty(UC) of
+ true -> lists:last(SMsgSeqNos) + 1;
+ false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
+ end,
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
+ case Ms of
+ [] -> ok;
+ _ -> ok = send(MkMsgFun(lists:last(Ms), true), State)
+ end,
+ [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss],
+ State.
+
+ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
+ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
+
+ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
+
+maybe_complete_tx(State = #ch{tx = {_, _}}) ->
+ State;
+maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
+ case dtree:is_empty(UC) of
+ false -> State;
+ true -> complete_tx(State#ch{confirmed = []})
+ end.
+
+complete_tx(State = #ch{tx = committing}) ->
+ ok = send(#'tx.commit_ok'{}, State),
+ State#ch{tx = new_tx()};
+complete_tx(State = #ch{tx = failed}) ->
+ {noreply, State1} = handle_exception(
+ rabbit_misc:amqp_error(
+ precondition_failed, "partial tx completion", [],
+ 'tx.commit'),
+ State),
+ State1#ch{tx = new_tx()}.
+
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _) -> self();
+i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{user = User}) -> User#user.username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{tx = Tx}) -> Tx =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(name, State) -> name(State);
+i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{}) -> 0;
+i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
+i(acks_uncommitted, #ch{}) -> 0;
+i(state, #ch{state = running}) -> credit_flow:state();
+i(state, #ch{state = State}) -> State;
+i(prefetch_count, #ch{consumer_prefetch = C}) -> C;
+i(global_prefetch_count, #ch{limiter = Limiter}) ->
+ rabbit_limiter:get_prefetch_limit(Limiter);
+i(garbage_collection, _State) ->
+ rabbit_misc:get_gc_info(self());
+i(reductions, _State) ->
+ {reductions, Reductions} = erlang:process_info(self(), reductions),
+ Reductions;
+i(Item, _) ->
+ throw({bad_argument, Item}).
+
+name(#ch{conn_name = ConnName, channel = Channel}) ->
+ list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
+
+incr_stats(Incs, Measure) ->
+ [begin
+ rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
+ %% Keys in the process dictionary are used to clean up the core metrics
+ put({Type, Key}, none)
+ end || {Type, Key, Inc} <- Incs].
+
+emit_stats(State) -> emit_stats(State, []).
+
+emit_stats(State, Extra) ->
+ [{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State),
+ rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0),
+ rabbit_core_metrics:channel_stats(reductions, self(), Red).
+
+erase_queue_stats(QName) ->
+ rabbit_core_metrics:channel_queue_down({self(), QName}),
+ erase({queue_stats, QName}),
+ [begin
+ rabbit_core_metrics:channel_queue_exchange_down({self(), QX}),
+ erase({queue_exchange_stats, QX})
+ end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
+
+get_vhost(#ch{virtual_host = VHost}) -> VHost.
+
+get_user(#ch{user = User}) -> User.
+
+delete_stats({queue_stats, QName}) ->
+ rabbit_core_metrics:channel_queue_down({self(), QName});
+delete_stats({exchange_stats, XName}) ->
+ rabbit_core_metrics:channel_exchange_down({self(), XName});
+delete_stats({queue_exchange_stats, QX}) ->
+ rabbit_core_metrics:channel_queue_exchange_down({self(), QX});
+delete_stats(_) ->
+ ok.
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
new file mode 100644
index 0000000000..b237a5f691
--- /dev/null
+++ b/src/rabbit_channel_interceptor.erl
@@ -0,0 +1,104 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_channel_interceptor).
+
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([init/1, intercept_in/3]).
+
+-type(method_name() :: rabbit_framing:amqp_method_name()).
+-type(original_method() :: rabbit_framing:amqp_method_record()).
+-type(processed_method() :: rabbit_framing:amqp_method_record()).
+-type(original_content() :: rabbit_types:maybe(rabbit_types:content())).
+-type(processed_content() :: rabbit_types:maybe(rabbit_types:content())).
+-type(interceptor_state() :: term()).
+
+-callback description() -> [proplists:property()].
+%% Derive some initial state from the channel. This will be passed back
+%% as the third argument of intercept/3.
+-callback init(rabbit_channel:channel()) -> interceptor_state().
+-callback intercept(original_method(), original_content(),
+ interceptor_state()) ->
+ {processed_method(), processed_content()} |
+ rabbit_misc:channel_or_connection_exit().
+-callback applies_to() -> list(method_name()).
+
+init(Ch) ->
+ Mods = [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor)],
+ check_no_overlap(Mods),
+ [{Mod, Mod:init(Ch)} || Mod <- Mods].
+
+check_no_overlap(Mods) ->
+ check_no_overlap1([sets:from_list(Mod:applies_to()) || Mod <- Mods]).
+
+%% Check no non-empty pairwise intersection in a list of sets
+check_no_overlap1(Sets) ->
+ lists:foldl(fun(Set, Union) ->
+ Is = sets:intersection(Set, Union),
+ case sets:size(Is) of
+ 0 -> ok;
+ _ ->
+ internal_error("Interceptor: more than one "
+ "module handles ~p~n", [Is])
+ end,
+ sets:union(Set, Union)
+ end,
+ sets:new(),
+ Sets),
+ ok.
+
+intercept_in(M, C, Mods) ->
+ lists:foldl(fun({Mod, ModState}, {M1, C1}) ->
+ call_module(Mod, ModState, M1, C1)
+ end,
+ {M, C},
+ Mods).
+
+call_module(Mod, St, M, C) ->
+ % this little dance is because Mod might be unloaded at any point
+ case (catch {ok, Mod:intercept(M, C, St)}) of
+ {ok, R} -> validate_response(Mod, M, C, R);
+ {'EXIT', {undef, [{Mod, intercept, _, _} | _]}} -> {M, C}
+ end.
+
+validate_response(Mod, M1, C1, R = {M2, C2}) ->
+ case {validate_method(M1, M2), validate_content(C1, C2)} of
+ {true, true} -> R;
+ {false, _} ->
+ internal_error("Interceptor: ~p expected to return "
+ "method: ~p but returned: ~p",
+ [Mod, rabbit_misc:method_record_type(M1),
+ rabbit_misc:method_record_type(M2)]);
+ {_, false} ->
+ internal_error("Interceptor: ~p expected to return "
+ "content iff content is provided but "
+ "content in = ~p; content out = ~p",
+ [Mod, C1, C2])
+ end.
+
+validate_method(M, M2) ->
+ rabbit_misc:method_record_type(M) =:= rabbit_misc:method_record_type(M2).
+
+validate_content(none, none) -> true;
+validate_content(#content{}, #content{}) -> true;
+validate_content(_, _) -> false.
+
+%% keep dialyzer happy
+-spec internal_error(string(), [any()]) -> no_return().
+internal_error(Format, Args) ->
+ rabbit_misc:protocol_error(internal_error, Format, Args).
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
new file mode 100644
index 0000000000..4a545dfdbf
--- /dev/null
+++ b/src/rabbit_exchange_decorator.erl
@@ -0,0 +1,113 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_decorator).
+
+-include("rabbit.hrl").
+
+-export([select/2, set/1, register/2, unregister/1]).
+
+%% This is like an exchange type except that:
+%%
+%% 1) It applies to all exchanges as soon as it is installed, therefore
+%% 2) It is not allowed to affect validation, so no validate/1 or
+%% assert_args_equivalence/2
+%%
+%% It's possible in the future we might make decorators
+%% able to manipulate messages as they are published.
+
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
+
+-callback description() -> [proplists:property()].
+
+%% Should Rabbit ensure that all binding events that are
+%% delivered to an individual exchange can be serialised? (they
+%% might still be delivered out of order, but there'll be a
+%% serial number).
+-callback serialise_events(rabbit_types:exchange()) -> boolean().
+
+%% called after declaration and recovery
+-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
+
+%% called after exchange (auto)deletion.
+-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
+ 'ok'.
+
+%% called when the policy attached to this exchange changes.
+-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
+ 'ok'.
+
+%% called after a binding has been added or recovered
+-callback add_binding(serial(), rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok'.
+
+%% called after bindings have been deleted.
+-callback remove_bindings(serial(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok'.
+
+%% Allows additional destinations to be added to the routing decision.
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ [rabbit_amqqueue:name() | rabbit_exchange:name()].
+
+%% Whether the decorator wishes to receive callbacks for the exchange
+%% none:no callbacks, noroute:all callbacks except route, all:all callbacks
+-callback active_for(rabbit_types:exchange()) -> 'none' | 'noroute' | 'all'.
+
+%%----------------------------------------------------------------------------
+
+%% select a subset of active decorators
+select(all, {Route, NoRoute}) -> filter(Route ++ NoRoute);
+select(route, {Route, _NoRoute}) -> filter(Route);
+select(raw, {Route, NoRoute}) -> Route ++ NoRoute.
+
+filter(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(X) ->
+ Decs = lists:foldl(fun (D, {Route, NoRoute}) ->
+ ActiveFor = D:active_for(X),
+ {cons_if_eq(all, ActiveFor, D, Route),
+ cons_if_eq(noroute, ActiveFor, D, NoRoute)}
+ end, {[], []}, list()),
+ X#exchange{decorators = Decs}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+
+cons_if_eq(Select, Select, Item, List) -> [Item | List];
+cons_if_eq(_Select, _Other, _Item, List) -> List.
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(exchange_decorator, TypeName, ModuleName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(exchange_decorator, TypeName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+maybe_recover(X = #exchange{name = Name,
+ decorators = Decs}) ->
+ #exchange{decorators = Decs1} = set(X),
+ Old = lists:sort(select(all, Decs)),
+ New = lists:sort(select(all, Decs1)),
+ case New of
+ Old -> ok;
+ _ -> %% TODO create a tx here for non-federation decorators
+ [M:create(none, X) || M <- New -- Old],
+ rabbit_exchange:update_decorators(Name)
+ end.
diff --git a/src/rabbit_health_check.erl b/src/rabbit_health_check.erl
new file mode 100644
index 0000000000..d8d4cc240b
--- /dev/null
+++ b/src/rabbit_health_check.erl
@@ -0,0 +1,95 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbit_health_check).
+
+%% External API
+-export([node/1, node/2]).
+
+%% Internal API
+-export([local/0]).
+
+-spec node(node(), timeout()) -> ok | {badrpc, term()} | {error_string, string()}.
+-spec local() -> ok | {error_string, string()}.
+
+%%----------------------------------------------------------------------------
+%% External functions
+%%----------------------------------------------------------------------------
+
+node(Node) ->
+ %% same default as in CLI
+ node(Node, 70000).
+node(Node, Timeout) ->
+ rabbit_misc:rpc_call(Node, rabbit_health_check, local, [], Timeout).
+
+local() ->
+ run_checks([list_channels, list_queues, alarms, rabbit_node_monitor]).
+
+%%----------------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------------
+run_checks([]) ->
+ ok;
+run_checks([C|Cs]) ->
+ case node_health_check(C) of
+ ok ->
+ run_checks(Cs);
+ Error ->
+ Error
+ end.
+
+node_health_check(list_channels) ->
+ case rabbit_channel:info_local([pid]) of
+ L when is_list(L) ->
+ ok;
+ Other ->
+ ErrorMsg = io_lib:format("list_channels unexpected output: ~p",
+ [Other]),
+ {error_string, ErrorMsg}
+ end;
+
+node_health_check(list_queues) ->
+ health_check_queues(rabbit_vhost:list());
+
+node_health_check(rabbit_node_monitor) ->
+ case rabbit_node_monitor:partitions() of
+ L when is_list(L) ->
+ ok;
+ Other ->
+ ErrorMsg = io_lib:format("rabbit_node_monitor reports unexpected partitions value: ~p",
+ [Other]),
+ {error_string, ErrorMsg}
+ end;
+
+node_health_check(alarms) ->
+ case proplists:get_value(alarms, rabbit:status()) of
+ [] ->
+ ok;
+ Alarms ->
+ ErrorMsg = io_lib:format("resource alarm(s) in effect:~p", [Alarms]),
+ {error_string, ErrorMsg}
+ end.
+
+health_check_queues([]) ->
+ ok;
+health_check_queues([VHost|RestVHosts]) ->
+ case rabbit_amqqueue:info_local(VHost) of
+ L when is_list(L) ->
+ health_check_queues(RestVHosts);
+ Other ->
+ ErrorMsg = io_lib:format("list_queues unexpected output for vhost ~s: ~p",
+ [VHost, Other]),
+ {error_string, ErrorMsg}
+ end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
new file mode 100644
index 0000000000..cb7f181121
--- /dev/null
+++ b/src/rabbit_nodes.erl
@@ -0,0 +1,230 @@
+%% The contents of this file are subject to the Mozilla Public License
+
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_nodes).
+
+-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
+ is_running/2, is_process_running/2,
+ cluster_name/0, set_cluster_name/1, ensure_epmd/0,
+ all_running/0]).
+
+-include_lib("kernel/include/inet.hrl").
+
+-define(EPMD_TIMEOUT, 30000).
+-define(TCP_DIAGNOSTIC_TIMEOUT, 5000).
+-define(ERROR_LOGGER_HANDLER, rabbit_error_logger_handler).
+
+%%----------------------------------------------------------------------------
+%% Specs
+%%----------------------------------------------------------------------------
+
+-spec names(string()) ->
+ rabbit_types:ok_or_error2([{string(), integer()}], term()).
+-spec diagnostics([node()]) -> string().
+-spec cookie_hash() -> string().
+-spec is_running(node(), atom()) -> boolean().
+-spec is_process_running(node(), atom()) -> boolean().
+-spec cluster_name() -> binary().
+-spec set_cluster_name(binary()) -> 'ok'.
+-spec ensure_epmd() -> 'ok'.
+-spec all_running() -> [node()].
+
+%%----------------------------------------------------------------------------
+
+names(Hostname) ->
+ Self = self(),
+ Ref = make_ref(),
+ {Pid, MRef} = spawn_monitor(
+ fun () -> Self ! {Ref, net_adm:names(Hostname)} end),
+ timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
+ receive
+ {Ref, Names} -> erlang:demonitor(MRef, [flush]),
+ Names;
+ {'DOWN', MRef, process, Pid, Reason} -> {error, Reason}
+ end.
+
+diagnostics(Nodes) ->
+ verbose_erlang_distribution(true),
+ NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n"
+ "attempted to contact: ~p~n", [Nodes]}] ++
+ [diagnostics_node(Node) || Node <- Nodes] ++
+ current_node_details(),
+ verbose_erlang_distribution(false),
+ rabbit_misc:format_many(lists:flatten(NodeDiags)).
+
+verbose_erlang_distribution(true) ->
+ net_kernel:verbose(1),
+ error_logger:add_report_handler(?ERROR_LOGGER_HANDLER);
+verbose_erlang_distribution(false) ->
+ net_kernel:verbose(0),
+ error_logger:delete_report_handler(?ERROR_LOGGER_HANDLER).
+
+current_node_details() ->
+ [{"~ncurrent node details:~n- node name: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- home dir: ~s", [Home]};
+ Other -> {"- no home dir: ~p", [Other]}
+ end,
+ {"- cookie hash: ~s", [cookie_hash()]}].
+
+diagnostics_node(Node) ->
+ {Name, Host} = parts(Node),
+ [{"~s:", [Node]} |
+ case names(Host) of
+ {error, Reason} ->
+ [{" * unable to connect to epmd (port ~s) on ~s: ~s~n",
+ [epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}];
+ {ok, NamePorts} ->
+ [{" * connected to epmd (port ~s) on ~s",
+ [epmd_port(), Host]}] ++
+ case net_adm:ping(Node) of
+ pong -> dist_working_diagnostics(Node);
+ pang -> dist_broken_diagnostics(Name, Host, NamePorts)
+ end
+ end].
+
+epmd_port() ->
+ case init:get_argument(epmd_port) of
+ {ok, [[Port | _] | _]} when is_list(Port) -> Port;
+ error -> "4369"
+ end.
+
+dist_working_diagnostics(Node) ->
+ case rabbit:is_running(Node) of
+ true -> [{" * node ~s up, 'rabbit' application running", [Node]}];
+ false -> [{" * node ~s up, 'rabbit' application not running~n"
+ " * running applications on ~s: ~p~n"
+ " * suggestion: start_app on ~s",
+ [Node, Node, remote_apps(Node), Node]}]
+ end.
+
+remote_apps(Node) ->
+ %% We want a timeout here because really, we don't trust the node,
+ %% the last thing we want to do is hang.
+ case rpc:call(Node, application, which_applications, [5000]) of
+ {badrpc, _} = E -> E;
+ Apps -> [App || {App, _, _} <- Apps]
+ end.
+
+dist_broken_diagnostics(Name, Host, NamePorts) ->
+ case [{N, P} || {N, P} <- NamePorts, N =:= Name] of
+ [] ->
+ {SelfName, SelfHost} = parts(node()),
+ Others = [list_to_atom(N) || {N, _} <- NamePorts,
+ N =/= case SelfHost of
+ Host -> SelfName;
+ _ -> never_matches
+ end],
+ OthersDiag = case Others of
+ [] -> [{" no other nodes on ~s",
+ [Host]}];
+ _ -> [{" other nodes on ~s: ~p",
+ [Host, Others]}]
+ end,
+ [{" * epmd reports: node '~s' not running at all", [Name]},
+ OthersDiag, {" * suggestion: start the node", []}];
+ [{Name, Port}] ->
+ [{" * epmd reports node '~s' running on port ~b", [Name, Port]} |
+ case diagnose_connect(Host, Port) of
+ ok ->
+ connection_succeeded_diagnostics();
+ {error, Reason} ->
+ [{" * can't establish TCP connection, reason: ~s~n"
+ " * suggestion: blocked by firewall?",
+ [rabbit_misc:format_inet_error(Reason)]}]
+ end]
+ end.
+
+connection_succeeded_diagnostics() ->
+ case gen_event:call(error_logger, ?ERROR_LOGGER_HANDLER, get_connection_report) of
+ [] ->
+ [{" * TCP connection succeeded but Erlang distribution "
+ "failed~n"
+ " * suggestion: hostname mismatch?~n"
+ " * suggestion: is the cookie set correctly?~n"
+ " * suggestion: is the Erlang distribution using TLS?", []}];
+ Report ->
+ [{" * TCP connection succeeded but Erlang distribution "
+ "failed~n", []}]
+ ++ Report
+ end.
+
+diagnose_connect(Host, Port) ->
+ case inet:gethostbyname(Host) of
+ {ok, #hostent{h_addrtype = Family}} ->
+ case gen_tcp:connect(Host, Port, [Family],
+ ?TCP_DIAGNOSTIC_TIMEOUT) of
+ {ok, Socket} -> gen_tcp:close(Socket),
+ ok;
+ {error, _} = E -> E
+ end;
+ {error, _} = E ->
+ E
+ end.
+
+make(NodeStr) ->
+ rabbit_nodes_common:make(NodeStr).
+
+parts(NodeStr) ->
+ rabbit_nodes_common:parts(NodeStr).
+
+cookie_hash() ->
+ base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
+
+is_running(Node, Application) ->
+ case rpc:call(Node, rabbit_misc, which_applications, []) of
+ {badrpc, _} -> false;
+ Apps -> proplists:is_defined(Application, Apps)
+ end.
+
+is_process_running(Node, Process) ->
+ case rpc:call(Node, erlang, whereis, [Process]) of
+ {badrpc, _} -> false;
+ undefined -> false;
+ P when is_pid(P) -> true
+ end.
+
+cluster_name() ->
+ rabbit_runtime_parameters:value_global(
+ cluster_name, cluster_name_default()).
+
+cluster_name_default() ->
+ {ID, _} = rabbit_nodes:parts(node()),
+ {ok, Host} = inet:gethostname(),
+ {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
+ list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
+
+set_cluster_name(Name) ->
+ rabbit_runtime_parameters:set_global(cluster_name, Name).
+
+ensure_epmd() ->
+ {ok, Prog} = init:get_argument(progname),
+ ID = rabbit_misc:random(1000000000),
+ Port = open_port(
+ {spawn_executable, os:find_executable(Prog)},
+ [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]),
+ "-noshell", "-eval", "halt()."]},
+ exit_status, stderr_to_stdout, use_stdio]),
+ port_shutdown_loop(Port).
+
+port_shutdown_loop(Port) ->
+ receive
+ {Port, {exit_status, _Rc}} -> ok;
+ {Port, _} -> port_shutdown_loop(Port)
+ end.
+
+all_running() -> rabbit_mnesia:cluster_nodes(running).
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
new file mode 100644
index 0000000000..6329b0bdaa
--- /dev/null
+++ b/src/rabbit_queue_collector.erl
@@ -0,0 +1,89 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_collector).
+
+%% Queue collector keeps track of exclusive queues and cleans them
+%% up e.g. when their connection is closed.
+
+-behaviour(gen_server).
+
+-export([start_link/1, register/2, delete_all/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {monitors, delete_from}).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-spec start_link(rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error().
+-spec register(pid(), pid()) -> 'ok'.
+
+%%----------------------------------------------------------------------------
+
+start_link(ProcName) ->
+ gen_server:start_link(?MODULE, [ProcName], []).
+
+register(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register, Q}, infinity).
+
+delete_all(CollectorPid) ->
+ rabbit_queue_collector_common:delete_all(CollectorPid).
+
+%%----------------------------------------------------------------------------
+
+init([ProcName]) ->
+ ?store_proc_name(ProcName),
+ {ok, #state{monitors = pmon:new(), delete_from = undefined}}.
+
+%%--------------------------------------------------------------------------
+
+handle_call({register, QPid}, _From,
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
+ case Deleting of
+ undefined -> ok;
+ _ -> ok = rabbit_amqqueue:delete_immediately([QPid])
+ end,
+ {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}};
+
+handle_call(delete_all, From, State = #state{monitors = QMons,
+ delete_from = undefined}) ->
+ case pmon:monitored(QMons) of
+ [] -> {reply, ok, State#state{delete_from = From}};
+ QPids -> ok = rabbit_amqqueue:delete_immediately(QPids),
+ {noreply, State#state{delete_from = From}}
+ end.
+
+handle_cast(Msg, State) ->
+ {stop, {unhandled_cast, Msg}, State}.
+
+handle_info({'DOWN', _MRef, process, DownPid, _Reason},
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
+ QMons1 = pmon:erase(DownPid, QMons),
+ case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
+ true -> gen_server:reply(Deleting, ok);
+ false -> ok
+ end,
+ {noreply, State#state{monitors = QMons1}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
new file mode 100644
index 0000000000..fa0bafa4c3
--- /dev/null
+++ b/src/rabbit_queue_decorator.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_queue_decorator).
+
+-include("rabbit.hrl").
+
+-export([select/1, set/1, register/2, unregister/1]).
+
+%%----------------------------------------------------------------------------
+
+-callback startup(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+ 'ok'.
+
+-callback active_for(rabbit_types:amqqueue()) -> boolean().
+
+%% called with Queue, MaxActivePriority, IsEmpty
+-callback consumer_state_changed(
+ rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'.
+
+%%----------------------------------------------------------------------------
+
+select(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(queue_decorator, TypeName, ModuleName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(queue_decorator, TypeName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+maybe_recover(Q = #amqqueue{name = Name,
+ decorators = Decs}) ->
+ #amqqueue{decorators = Decs1} = set(Q),
+ Old = lists:sort(select(Decs)),
+ New = lists:sort(select(Decs1)),
+ case New of
+ Old -> ok;
+ _ -> [M:startup(Q) || M <- New -- Old],
+ rabbit_amqqueue:update_decorators(Name)
+ end.