summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-20 18:29:36 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-20 18:29:36 +0100
commitba7b3357fae655597055bac121af67eeb3373560 (patch)
tree20f04b4ccc9d6b0cd9428de8fdf7dc1206716133 /src
parent7c11db0333b2ec7149ed9a5927ba841473f3f056 (diff)
parenta9e3887873b0556544abd24883ff5bef43c6892d (diff)
downloadrabbitmq-server-git-ba7b3357fae655597055bac121af67eeb3373560.tar.gz
Merge in default
Diffstat (limited to 'src')
-rw-r--r--src/pg_local.erl19
-rw-r--r--src/rabbit_channel.erl157
-rw-r--r--src/rabbit_exchange.erl19
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_reader.erl2
5 files changed, 185 insertions, 15 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl
index f535b1362b..4d9914d9b7 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -34,7 +34,7 @@
%%
-module(pg_local).
--export([join/2, leave/2, get_members/1]).
+-export([join/2, leave/2, get_members/1, in_group/2]).
-export([sync/0]). %% intended for testing only; not part of official API
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2]).
@@ -50,6 +50,7 @@
-spec(join/2 :: (name(), pid()) -> 'ok').
-spec(leave/2 :: (name(), pid()) -> 'ok').
-spec(get_members/1 :: (name()) -> [pid()]).
+-spec(in_group/2 :: (name(), pid()) -> boolean()).
-spec(sync/0 :: () -> 'ok').
@@ -81,6 +82,16 @@ get_members(Name) ->
ensure_started(),
group_members(Name).
+in_group(Name, Pid) ->
+ ensure_started(),
+ %% The join message is a cast and thus can race, but we want to
+ %% keep it that way to be fast in the common case.
+ case member_present(Name, Pid) of
+ true -> true;
+ false -> sync(),
+ member_present(Name, Pid)
+ end.
+
sync() ->
ensure_started(),
gen_server:call(?MODULE, sync, infinity).
@@ -199,6 +210,12 @@ member_in_group(Pid, Name) ->
[{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
lists:duplicate(N, Pid).
+member_present(Name, Pid) ->
+ case ets:lookup(pg_local_table, {member, Name, Pid}) of
+ [_] -> true;
+ [] -> false
+ end.
+
member_groups(Pid) ->
[Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d2f6719c75..418653dfb0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,8 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]).
+-export([send_command/2, deliver/4, deliver_reply/2,
+ send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/1]).
@@ -30,7 +31,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Internal
--export([list_local/0]).
+-export([list_local/0, deliver_reply_local/3]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
conn_name, limiter, tx, next_tag, unacked_message_q, user,
@@ -39,7 +40,7 @@
queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, mandatory, capabilities, trace_state,
- consumer_prefetch}).
+ consumer_prefetch, reply_consumer}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -96,6 +97,9 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
+-spec(deliver_reply/2 :: (binary(), rabbit_types:delivery()) -> 'ok').
+-spec(deliver_reply_local/3 ::
+ (pid(), binary(), rabbit_types:delivery()) -> 'ok').
-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
-> 'ok').
@@ -142,6 +146,41 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
+ {ok, Pid, Key} = decode_fast_reply_to(Rest),
+ delegate:invoke_no_result(
+ Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}).
+
+%% We want to ensure people can't use this mechanism to send a message
+%% to an arbitrary process and kill it!
+deliver_reply_local(Pid, Key, Delivery) ->
+ case pg_local:in_group(rabbit_channels, Pid) of
+ true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery});
+ false -> ok
+ end.
+
+declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
+ exists;
+declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) ->
+ case decode_fast_reply_to(Rest) of
+ {ok, Pid, Key} ->
+ Msg = {declare_fast_reply_to, Key},
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const(not_found),
+ fun() -> gen_server2:call(Pid, Msg, infinity) end);
+ error ->
+ not_found
+ end;
+declare_fast_reply_to(_) ->
+ not_found.
+
+decode_fast_reply_to(Suffix) ->
+ case binary:split(Suffix, <<".">>) of
+ [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
+ {ok, Pid, Key};
+ _ -> error
+ end.
+
send_credit_reply(Pid, Len) ->
gen_server2:cast(Pid, {send_credit_reply, Len}).
@@ -219,7 +258,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = 0},
+ consumer_prefetch = 0,
+ reply_consumer = none},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -262,6 +302,13 @@ handle_call({info, Items}, _From, State) ->
handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
+handle_call({declare_fast_reply_to, Key}, _From,
+ State = #ch{reply_consumer = Consumer}) ->
+ reply(case Consumer of
+ {_, _, Key} -> exists;
+ _ -> not_found
+ end, State);
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -326,6 +373,29 @@ handle_cast({deliver, ConsumerTag, AckRequired,
Content),
noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) ->
+ noreply(State);
+handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) ->
+ noreply(State);
+handle_cast({deliver_reply, Key, #delivery{message =
+ #basic_message{exchange_name = ExchangeName,
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag,
+ reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = false,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(State);
+handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) ->
+ noreply(State);
+
handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.credit_ok'{available = Len}),
@@ -609,6 +679,21 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+maybe_set_fast_reply_to(
+ C = #content{properties = P = #'P_basic'{reply_to =
+ <<"amq.rabbitmq.reply-to">>}},
+ #ch{reply_consumer = ReplyConsumer}) ->
+ case ReplyConsumer of
+ none -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "fast reply consumer does not exist", []);
+ {_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>,
+ rabbit_binary_generator:clear_encoded_content(
+ C#content{properties = P#'P_basic'{reply_to = Rep}})
+ end;
+maybe_set_fast_reply_to(C, _State) ->
+ C.
+
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -686,7 +771,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
- rabbit_binary_parser:ensure_content_decoded(Content),
+ maybe_set_fast_reply_to(
+ rabbit_binary_parser:ensure_content_decoded(Content), State),
check_user_id_header(Props, State),
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
@@ -761,6 +847,55 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
{reply, #'basic.get_empty'{}, State}
end;
+handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
+ consumer_tag = CTag0,
+ no_ack = NoAck,
+ nowait = NoWait},
+ _, State = #ch{reply_consumer = ReplyConsumer,
+ consumer_mapping = ConsumerMapping}) ->
+ case dict:find(CTag0, ConsumerMapping) of
+ error ->
+ case {ReplyConsumer, NoAck} of
+ {none, true} ->
+ CTag = case CTag0 of
+ <<>> -> rabbit_guid:binary(
+ rabbit_guid:gen_secure(), "amq.ctag");
+ Other -> Other
+ end,
+ %% Precalculate both suffix and key; base64 encoding is
+ %% expensive
+ Key = base64:encode(rabbit_guid:gen_secure()),
+ PidEnc = base64:encode(term_to_binary(self())),
+ Suffix = <<PidEnc/binary, ".", Key/binary>>,
+ State1 = State#ch{reply_consumer = {CTag, Suffix, Key}},
+ case NoWait of
+ true -> {noreply, State1};
+ false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
+ {reply, Rep, State1}
+ end;
+ {_, false} ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "reply consumer cannot acknowledge", []);
+ _ ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "reply consumer already set", [])
+ end;
+ {ok, _} ->
+ %% Attempted reuse of consumer tag.
+ rabbit_misc:protocol_error(
+ not_allowed, "attempt to reuse consumer tag '~s'", [CTag0])
+ end;
+
+handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
+ _, State = #ch{reply_consumer = {ConsumerTag, _, _}}) ->
+ State1 = State#ch{reply_consumer = none},
+ case NoWait of
+ true -> {noreply, State1};
+ false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
+ {reply, Rep, State1}
+ end;
+
handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
no_local = _, % FIXME: implement
@@ -975,6 +1110,18 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin,
SourceNameBin, exchange, DestinationNameBin, RoutingKey,
Arguments, #'exchange.unbind_ok'{}, NoWait, State);
+%% Note that all declares to these are effectively passive. If it
+%% exists it by definition has one consumer.
+handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
+ _/binary>> = QueueNameBin,
+ nowait = NoWait}, _,
+ State = #ch{virtual_host = VHost}) ->
+ QueueName = rabbit_misc:r(VHost, queue, QueueNameBin),
+ case declare_fast_reply_to(QueueNameBin) of
+ exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State);
+ not_found -> rabbit_misc:not_found(QueueName)
+ end;
+
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
durable = DurableDeclare,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 8b944763a8..f184174c8a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -344,14 +344,21 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
decorators = Decorators} = X,
#delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
- case {RName, rabbit_exchange_decorator:select(route, Decorators)} of
- {<<"">>, []} ->
- %% Optimisation
- [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
- {_, SelectedDecorators} ->
- lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []}))
+ case RName of
+ <<>> ->
+ RKsSorted = lists:usort(RKs),
+ [rabbit_channel:deliver_reply(RK, Delivery) ||
+ RK <- RKsSorted, virtual_reply_queue(RK)],
+ [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted,
+ not virtual_reply_queue(RK)];
+ _ ->
+ Decs = rabbit_exchange_decorator:select(route, Decorators),
+ lists:usort(route1(Delivery, Decs, {[X], XName, []}))
end.
+virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true;
+virtual_reply_queue(_) -> false.
+
route1(_, _, {[], _, QNames}) ->
QNames;
route1(Delivery, Decorators,
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d5dd9712f9..17fca7bbaf 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -801,8 +801,7 @@ find_auto_cluster_node([Node | Nodes]) ->
find_auto_cluster_node(Nodes)
end,
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _} = Reason -> Diag = rabbit_nodes:diagnostics([Node]),
- Fail("~p~n~s~n", [Reason, Diag]);
+ {badrpc, _} = Reason -> Fail("~p~n", [Reason]);
%% old delegate hash check
{_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]);
{OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b2930f88d7..ca73006aed 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -412,7 +412,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
rabbit_event:init_stats_timer(State, #v1.stats_timer);
-handle_other({'$gen_cast', force_event_refresh}, State) ->
+handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
%% Ignore, we will emit a created event once we start running.
State;
handle_other(ensure_stats, State) ->