summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl43
-rw-r--r--src/rabbit_binding.erl39
-rw-r--r--src/rabbit_channel.erl121
-rw-r--r--src/rabbit_direct.erl1
-rw-r--r--src/rabbit_queue_index.erl8
-rw-r--r--src/rabbit_quorum_queue.erl108
-rw-r--r--src/rabbit_reader.erl1
-rw-r--r--src/rabbit_vhost.erl7
8 files changed, 223 insertions, 105 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 337786b571..5488a88836 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -108,12 +108,16 @@ warn_file_limit() ->
ok
end.
--spec recover(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
+-spec recover(rabbit_types:vhost()) ->
+ {RecoveredClassic :: [amqqueue:amqqueue()],
+ FailedClassic :: [amqqueue:amqqueue()],
+ Quorum :: [amqqueue:amqqueue()]}.
recover(VHost) ->
- Classic = find_local_durable_classic_queues(VHost),
+ AllClassic = find_local_durable_classic_queues(VHost),
Quorum = find_local_quorum_queues(VHost),
- recover_classic_queues(VHost, Classic) ++ rabbit_quorum_queue:recover(Quorum).
+ {RecoveredClassic, FailedClassic} = recover_classic_queues(VHost, AllClassic),
+ {RecoveredClassic, FailedClassic, rabbit_quorum_queue:recover(Quorum)}.
recover_classic_queues(VHost, Queues) ->
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
@@ -124,15 +128,16 @@ recover_classic_queues(VHost, Queues) ->
BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
{ok, _} ->
- recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
+ RecoveredQs = recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)),
+ RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs],
+ FailedQueues = [Q || Q <- Queues,
+ not lists:member(amqqueue:get_name(Q), RecoveredNames)],
+ {RecoveredQs, FailedQueues};
{error, Reason} ->
rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
throw({error, Reason})
end.
-filter_per_type(Queues) ->
- lists:partition(fun(Q) -> amqqueue:is_classic(Q) end, Queues).
-
filter_pid_per_type(QPids) ->
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
@@ -156,12 +161,14 @@ stop(VHost) ->
-spec start([amqqueue:amqqueue()]) -> 'ok'.
start(Qs) ->
- {Classic, _Quorum} = filter_per_type(Qs),
%% At this point all recovered queues and their bindings are
%% visible to routing, so now it is safe for them to complete
%% their initialisation (which may involve interacting with other
%% queues).
- _ = [amqqueue:get_pid(Q) ! {self(), go} || Q <- Classic],
+ _ = [amqqueue:get_pid(Q) ! {self(), go}
+ || Q <- Qs,
+ %% All queues are supposed to be classic here.
+ amqqueue:is_classic(Q)],
ok.
mark_local_durable_queues_stopped(VHost) ->
@@ -609,14 +616,14 @@ priv_absent(QueueName, _QPid, _IsDurable, timeout) ->
rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) ->
'ok' | rabbit_types:channel_exit() | rabbit_types:connection_exit().
-assert_equivalence(Q, Durable1, AD1, Args1, Owner) ->
+assert_equivalence(Q, DurableDeclare, AutoDeleteDeclare, Args1, Owner) ->
QName = amqqueue:get_name(Q),
- Durable = amqqueue:is_durable(Q),
- AD = amqqueue:is_auto_delete(Q),
- rabbit_misc:assert_field_equivalence(Durable, Durable1, QName, durable),
- rabbit_misc:assert_field_equivalence(AD, AD1, QName, auto_delete),
- assert_args_equivalence(Q, Args1),
- check_exclusive_access(Q, Owner, strict).
+ DurableQ = amqqueue:is_durable(Q),
+ AutoDeleteQ = amqqueue:is_auto_delete(Q),
+ ok = check_exclusive_access(Q, Owner, strict),
+ ok = rabbit_misc:assert_field_equivalence(DurableQ, DurableDeclare, QName, durable),
+ ok = rabbit_misc:assert_field_equivalence(AutoDeleteQ, AutoDeleteDeclare, QName, auto_delete),
+ ok = assert_args_equivalence(Q, Args1).
-spec check_exclusive_access(amqqueue:amqqueue(), pid()) ->
'ok' | rabbit_types:channel_exit().
@@ -633,7 +640,9 @@ check_exclusive_access(Q, _ReaderPid, _MatchType) ->
QueueName = amqqueue:get_name(Q),
rabbit_misc:protocol_error(
resource_locked,
- "cannot obtain exclusive access to locked ~s",
+ "cannot obtain exclusive access to locked ~s. It could be originally "
+ "declared on another connection or the exclusive property value does not "
+ "match that of the original declaration.",
[rabbit_misc:rs(QueueName)]).
-spec with_exclusive_access_or_die(name(), pid(), qfun(A)) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index ab3bc6c819..05db4188ba 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -49,7 +49,6 @@
-type bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error(
- 'binding_not_found' |
{'binding_invalid', string(), [any()]}).
-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
-type inner_fun() ::
@@ -178,19 +177,15 @@ add(Src, Dst, B, ActingUser) ->
lock_resource(Src),
lock_resource(Dst),
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
- case (SrcDurable andalso DstDurable andalso
- mnesia:read({rabbit_durable_route, B}) =/= []) of
- false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
- fun mnesia:write/3),
- x_callback(transaction, Src, add_binding, B),
- Serial = rabbit_exchange:serial(Src),
- fun () ->
- x_callback(Serial, Src, add_binding, B),
- ok = rabbit_event:notify(
- binding_created,
- info(B) ++ [{user_who_performed_action, ActingUser}])
- end;
- true -> rabbit_misc:const({error, binding_not_found})
+ ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ fun mnesia:write/3),
+ x_callback(transaction, Src, add_binding, B),
+ Serial = rabbit_exchange:serial(Src),
+ fun () ->
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(
+ binding_created,
+ info(B) ++ [{user_who_performed_action, ActingUser}])
end.
-spec remove(rabbit_types:binding()) -> bind_res().
@@ -208,7 +203,10 @@ remove(Binding, InnerFun, ActingUser) ->
case mnesia:read(rabbit_route, B, write) of
[] -> case mnesia:read(rabbit_durable_route, B, write) of
[] -> rabbit_misc:const(ok);
- _ -> rabbit_misc:const({error, binding_not_found})
+ %% We still delete the binding and run
+ %% all post-delete functions if there is only
+ %% a durable route in the database
+ _ -> remove(Src, Dst, B, ActingUser)
end;
_ -> case InnerFun(Src, Dst) of
ok -> remove(Src, Dst, B, ActingUser);
@@ -275,9 +273,8 @@ list_for_source(SrcName) ->
-spec list_for_destination
(rabbit_types:binding_destination()) -> bindings().
-list_for_destination(DstName) ->
- implicit_for_destination(DstName) ++
- mnesia:async_dirty(
+list_for_destination(DstName = #resource{virtual_host = VHostPath}) ->
+ AllBindings = mnesia:async_dirty(
fun() ->
Route = #route{binding = #binding{destination = DstName,
_ = '_'}},
@@ -285,7 +282,11 @@ list_for_destination(DstName) ->
#reverse_route{reverse_binding = B} <-
mnesia:match_object(rabbit_reverse_route,
reverse_route(Route), read)]
- end).
+ end),
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_for_destination(DstName) ++ Filtered.
implicit_bindings(VHostPath) ->
DstQueues = rabbit_amqqueue:list_names(VHostPath),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 036aa9a60c..8f5f159f88 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -63,6 +63,7 @@
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
-export([force_event_refresh/1]).
+-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -78,7 +79,7 @@
-export([list_queue_states/1, get_max_message_size/0]).
%% Mgmt HTTP API refactor
--export([handle_method/5]).
+-export([handle_method/6]).
-record(ch, {
%% starting | running | flow | closing
@@ -97,6 +98,9 @@
%% same as reader's name, see #v1.name
%% in rabbit_reader
conn_name,
+ %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
+ %% or any other channel creating/spawning entity
+ source,
%% limiter pid, see rabbit_limiter
limiter,
%% none | {Msgs, Acks} | committing | failed |
@@ -448,6 +452,14 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
+-spec source(pid(), any()) -> any().
+
+source(Pid, Source) when is_pid(Pid) ->
+ case erlang:is_process_alive(Pid) of
+ true -> Pid ! {channel_source, Source};
+ false -> {error, channel_terminated}
+ end.
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
@@ -805,7 +817,10 @@ handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup(QName)
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})).
+ noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
+
+handle_info({channel_source, Source}, State = #ch{}) ->
+ noreply(State#ch{source = Source}).
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -938,11 +953,11 @@ check_write_permitted(Resource, User) ->
check_read_permitted(Resource, User) ->
check_resource_access(User, Resource, read).
-check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, write).
+check_write_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, write).
-check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey) ->
- check_topic_authorisation(Resource, User, ConnPid, RoutingKey, read).
+check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
+ check_topic_authorisation(Resource, User, ConnPid, RoutingKey, ChSrc, read).
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
@@ -978,14 +993,17 @@ check_internal_exchange(_) ->
ok.
check_topic_authorisation(Resource = #exchange{type = topic},
- User, none, RoutingKey, Permission) ->
+ User, none, RoutingKey, _ChSrc, Permission) ->
%% Called from outside the channel by mgmt API
AmqpParams = [],
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
check_topic_authorisation(Resource = #exchange{type = topic},
- User, ConnPid, RoutingKey, Permission) when is_pid(ConnPid) ->
- AmqpParams = get_amqp_params(ConnPid),
+ User, ConnPid, RoutingKey, ChSrc, Permission) when is_pid(ConnPid) ->
+ AmqpParams = get_amqp_params(ConnPid, ChSrc),
check_topic_authorisation(Resource, User, AmqpParams, RoutingKey, Permission);
+check_topic_authorisation(_, _, _, _, _, _) ->
+ ok.
+
check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost}, type = topic},
User = #user{username = Username},
AmqpParams, RoutingKey, Permission) ->
@@ -1003,11 +1021,10 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
User, Resource, Permission, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(topic_permission_cache, [{Resource, Context, Permission} | CacheTail])
- end;
-check_topic_authorisation(_, _, _, _, _) ->
- ok.
+ end.
-get_amqp_params(ConnPid) when is_pid(ConnPid) ->
+get_amqp_params(_ConnPid, rabbit_reader) -> [];
+get_amqp_params(ConnPid, _Any) when is_pid(ConnPid) ->
Timeout = get_operation_timeout(),
get_amqp_params(ConnPid, rabbit_misc:is_process_alive(ConnPid), Timeout).
@@ -1227,13 +1244,14 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid,
+ source = ChSrc,
max_message_size = MaxMessageSize}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
- check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
+ check_write_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
@@ -1516,76 +1534,85 @@ handle_method(#'exchange.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
user = User,
queue_collector_pid = CollectorPid,
- conn_pid = ConnPid}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ conn_pid = ConnPid,
+ source = ChSrc}) ->
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
+ source = ChSrc,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
virtual_host = VHostPath,
queue_collector_pid = CollectorPid,
user = User}) ->
{ok, PurgedMessageCount} =
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- handle_method(Method, ConnPid, CollectorPid, VHostPath, User),
+ handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
_, State = #ch{conn_pid = ConnPid,
+ source = ChSrc,
user = User,
queue_collector_pid = CollectorPid,
virtual_host = VHostPath}) ->
- case handle_method(Method, ConnPid, CollectorPid,
+ case handle_method(Method, ConnPid, ChSrc, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
@@ -1811,7 +1838,7 @@ handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
- RoutingKey, Arguments, VHostPath, ConnPid,
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc,
#user{username = Username} = User) ->
ExchangeNameBin = strip_cr_lf(SourceNameBin0),
DestinationNameBin = strip_cr_lf(DestinationNameBin0),
@@ -1820,14 +1847,11 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, User),
- ExchangeLookup = rabbit_exchange:lookup(ExchangeName),
- case ExchangeLookup of
+ case rabbit_exchange:lookup(ExchangeName) of
{error, not_found} ->
- %% no-op
- ExchangeLookup;
+ ok;
{ok, Exchange} ->
- check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey),
- ExchangeLookup
+ check_read_permitted_on_topic(Exchange, User, ConnPid, RoutingKey, ChSrc)
end,
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
@@ -2226,6 +2250,7 @@ i(user_who_performed_action, Ch) -> i(user, Ch);
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(source, #ch{source = ChSrc}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2298,39 +2323,39 @@ handle_method(#'exchange.bind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'exchange.unbind'{destination = DestinationNameBin,
source = SourceNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
SourceNameBin, exchange, DestinationNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:remove/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments},
- ConnPid, _CollectorId, VHostPath, User) ->
+ ConnPid, ChSrc, _CollectorId, VHostPath, User) ->
binding_action(fun rabbit_binding:add/3,
ExchangeNameBin, queue, QueueNameBin,
- RoutingKey, Arguments, VHostPath, ConnPid, User);
+ RoutingKey, Arguments, VHostPath, ConnPid, ChSrc, User);
%% Note that all declares to these are effectively passive. If it
%% exists it by definition has one consumer.
handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
_/binary>> = QueueNameBin},
- _ConnPid, _CollectorPid, VHost, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHost, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHost, queue, StrippedQueueNameBin),
case declare_fast_reply_to(StrippedQueueNameBin) of
@@ -2344,7 +2369,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
- ConnPid, CollectorPid, VHostPath,
+ ConnPid, ChSrc, CollectorPid, VHostPath,
#user{username = Username} = User) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
@@ -2403,7 +2428,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, ConnPid, CollectorPid, VHostPath,
+ handle_method(Declare, ConnPid, ChSrc, CollectorPid, VHostPath,
User);
{absent, Q, Reason} ->
rabbit_amqqueue:absent(Q, Reason);
@@ -2419,7 +2444,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
passive = true},
- ConnPid, _CollectorPid, VHostPath, _User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = rabbit_misc:r(VHostPath, queue, StrippedQueueNameBin),
Fun = fun (Q0) ->
@@ -2433,7 +2458,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty},
- ConnPid, _CollectorPid, VHostPath,
+ ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
QueueName = qbin_to_resource(StrippedQueueNameBin, VHostPath),
@@ -2462,7 +2487,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
end;
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
User = #user{username = Username}) ->
StrippedExchangeNameBin = strip_cr_lf(ExchangeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, StrippedExchangeNameBin),
@@ -2478,7 +2503,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
ok
end;
handle_method(#'queue.purge'{queue = QueueNameBin},
- ConnPid, _CollectorPid, VHostPath, User) ->
+ ConnPid, _ChSrc, _CollectorPid, VHostPath, User) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User),
rabbit_amqqueue:with_exclusive_access_or_die(
@@ -2491,7 +2516,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- _ConnPid, _CollectorPid, VHostPath,
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath,
#user{username = Username} = User) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
@@ -2524,7 +2549,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete, Internal, Args);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
passive = true},
- _ConnPid, _CollectorPid, VHostPath, _User) ->
+ _ConnPid, _ChSrc, _CollectorPid, VHostPath, _User) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, strip_cr_lf(ExchangeNameBin)),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 50e8f3d2b0..e43bfba90c 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -211,6 +211,7 @@ start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
rabbit_direct_client_sup,
[{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}]),
+ _ = rabbit_channel:source(ChannelPid, ?MODULE),
{ok, ChannelPid}.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 61373e49c1..665eea12df 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -1472,14 +1472,18 @@ move_to_per_vhost_stores(#resource{} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
queue_name_to_dir_name_legacy(QueueName)]),
NewQueueDir = queue_dir(QueueName),
+ rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
+ [OldQueueDir, NewQueueDir]),
case rabbit_file:is_dir(OldQueueDir) of
true ->
ok = rabbit_file:ensure_dir(NewQueueDir),
ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
false ->
- rabbit_log:info("Queue index directory not found for queue ~p~n",
- [QueueName])
+ Msg = "Queue index directory '~s' not found for ~s~n",
+ Args = [OldQueueDir, rabbit_misc:rs(QueueName)],
+ rabbit_log_upgrade:error(Msg, Args),
+ rabbit_log:error(Msg, Args)
end,
ok.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e1c1c320f4..9e73981541 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -37,6 +37,8 @@
-export([requeue/3]).
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
+-export([shrink_all/1,
+ grow/4]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -288,8 +290,7 @@ reductions(Name) ->
0
end.
--spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue() |
- {'absent', amqqueue:amqqueue(), atom()}].
+-spec recover([amqqueue:amqqueue()]) -> [amqqueue:amqqueue()].
recover(Queues) ->
[begin
@@ -666,7 +667,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
case ra:start_server(RaName, ServerId, ra_machine(Q),
- [{RaName, N} || N <- QNodes]) of
+ [{RaName, N} || N <- QNodes]) of
ok ->
case ra:add_member(ServerRef, ServerId) of
{ok, _, Leader} ->
@@ -679,11 +680,15 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
+ timeout ->
+ {error, timeout};
E ->
%% TODO should we stop the ra process here?
E
end;
- {error, _} = E ->
+ timeout ->
+ {error, timeout};
+ E ->
E
end.
@@ -713,20 +718,91 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
QName = amqqueue:get_name(Q),
{RaName, _} = amqqueue:get_pid(Q),
ServerId = {RaName, Node},
- case ra:leave_and_delete_server(ServerId) of
- ok ->
- Fun = fun(Q1) ->
- amqqueue:set_quorum_nodes(
- Q1,
- lists:delete(Node, amqqueue:get_quorum_nodes(Q1)))
- end,
- rabbit_misc:execute_mnesia_transaction(
- fun() -> rabbit_amqqueue:update(QName, Fun) end),
- ok;
- E ->
- E
+ case amqqueue:get_quorum_nodes(Q) of
+ [Node] ->
+ %% deleting the last member is not allowed
+ {error, last_node};
+ _ ->
+ case ra:leave_and_delete_server(ServerId) of
+ ok ->
+ Fun = fun(Q1) ->
+ amqqueue:set_quorum_nodes(
+ Q1,
+ lists:delete(Node,
+ amqqueue:get_quorum_nodes(Q1)))
+ end,
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> rabbit_amqqueue:update(QName, Fun) end),
+ ok;
+ timeout ->
+ {error, timeout};
+ E ->
+ E
+ end
end.
+-spec shrink_all(node()) ->
+ [{rabbit_amqqueue:name(),
+ {ok, pos_integer()} | {error, pos_integer(), term()}}].
+shrink_all(Node) ->
+ [begin
+ QName = amqqueue:get_name(Q),
+ rabbit_log:info("~s: removing member (replica) on node ~w",
+ [rabbit_misc:rs(QName), Node]),
+ Size = length(amqqueue:get_quorum_nodes(Q)),
+ case delete_member(Q, Node) of
+ ok ->
+ {QName, {ok, Size-1}};
+ {error, Err} ->
+ rabbit_log:warning("~s: failed to remove member (replica) on node ~w, error: ~w",
+ [rabbit_misc:rs(QName), Node, Err]),
+ {QName, {error, Size, Err}}
+ end
+ end || Q <- rabbit_amqqueue:list(),
+ amqqueue:get_type(Q) == quorum,
+ lists:member(Node, amqqueue:get_quorum_nodes(Q))].
+
+-spec grow(node(), binary(), binary(), all | even) ->
+ [{rabbit_amqqueue:name(),
+ {ok, pos_integer()} | {error, pos_integer(), term()}}].
+grow(Node, VhostSpec, QueueSpec, Strategy) ->
+ Running = rabbit_mnesia:cluster_nodes(running),
+ [begin
+ Size = length(amqqueue:get_quorum_nodes(Q)),
+ QName = amqqueue:get_name(Q),
+ rabbit_log:info("~s: adding a new member (replica) on node ~w",
+ [rabbit_misc:rs(QName), Node]),
+ case add_member(Q, Node) of
+ ok ->
+ {QName, {ok, Size + 1}};
+ {error, Err} ->
+ rabbit_log:warning(
+ "~s: failed to add member (replica) on node ~w, error: ~w",
+ [rabbit_misc:rs(QName), Node, Err]),
+ {QName, {error, Size, Err}}
+ end
+ end
+ || Q <- rabbit_amqqueue:list(),
+ amqqueue:get_type(Q) == quorum,
+ %% don't add a member if there is already one on the node
+ not lists:member(Node, amqqueue:get_quorum_nodes(Q)),
+ %% node needs to be running
+ lists:member(Node, Running),
+ matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)),
+ is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
+ is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
+
+get_resource_name(#resource{name = Name}) ->
+ Name.
+
+matches_strategy(all, _) -> true;
+matches_strategy(even, Members) ->
+ length(Members) rem 2 == 0.
+
+is_match(Subj, E) ->
+ nomatch /= re:run(Subj, E).
+
+
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c0cb9c57d5..6f0b0a5ea5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -924,6 +924,7 @@ create_channel(Channel,
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
Protocol, User, VHost, Capabilities, Collector}),
+ _ = rabbit_channel:source(ChPid, ?MODULE),
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 9180f9ca0a..1721c9b806 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -53,10 +53,11 @@ recover(VHost) ->
VHostStubFile = filename:join(VHostDir, ".vhost"),
ok = rabbit_file:ensure_dir(VHostStubFile),
ok = file:write_file(VHostStubFile, VHost),
- Qs = rabbit_amqqueue:recover(VHost),
- QNames = [amqqueue:get_name(Q) || Q <- Qs],
+ {RecoveredClassic, FailedClassic, Quorum} = rabbit_amqqueue:recover(VHost),
+ AllQs = RecoveredClassic ++ FailedClassic ++ Quorum,
+ QNames = [amqqueue:get_name(Q) || Q <- AllQs],
ok = rabbit_binding:recover(rabbit_exchange:recover(VHost), QNames),
- ok = rabbit_amqqueue:start(Qs),
+ ok = rabbit_amqqueue:start(RecoveredClassic),
%% Start queue mirrors.
ok = rabbit_mirror_queue_misc:on_vhost_up(VHost),
ok.