diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-20 18:29:36 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-20 18:29:36 +0100 |
| commit | ba7b3357fae655597055bac121af67eeb3373560 (patch) | |
| tree | 20f04b4ccc9d6b0cd9428de8fdf7dc1206716133 /src | |
| parent | 7c11db0333b2ec7149ed9a5927ba841473f3f056 (diff) | |
| parent | a9e3887873b0556544abd24883ff5bef43c6892d (diff) | |
| download | rabbitmq-server-git-ba7b3357fae655597055bac121af67eeb3373560.tar.gz | |
Merge in default
Diffstat (limited to 'src')
| -rw-r--r-- | src/pg_local.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 157 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 |
5 files changed, 185 insertions, 15 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl index f535b1362b..4d9914d9b7 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -34,7 +34,7 @@ %% -module(pg_local). --export([join/2, leave/2, get_members/1]). +-export([join/2, leave/2, get_members/1, in_group/2]). -export([sync/0]). %% intended for testing only; not part of official API -export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -50,6 +50,7 @@ -spec(join/2 :: (name(), pid()) -> 'ok'). -spec(leave/2 :: (name(), pid()) -> 'ok'). -spec(get_members/1 :: (name()) -> [pid()]). +-spec(in_group/2 :: (name(), pid()) -> boolean()). -spec(sync/0 :: () -> 'ok'). @@ -81,6 +82,16 @@ get_members(Name) -> ensure_started(), group_members(Name). +in_group(Name, Pid) -> + ensure_started(), + %% The join message is a cast and thus can race, but we want to + %% keep it that way to be fast in the common case. + case member_present(Name, Pid) of + true -> true; + false -> sync(), + member_present(Name, Pid) + end. + sync() -> ensure_started(), gen_server:call(?MODULE, sync, infinity). @@ -199,6 +210,12 @@ member_in_group(Pid, Name) -> [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), lists:duplicate(N, Pid). +member_present(Name, Pid) -> + case ets:lookup(pg_local_table, {member, Name, Pid}) of + [_] -> true; + [] -> false + end. + member_groups(Pid) -> [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d2f6719c75..418653dfb0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,8 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). +-export([send_command/2, deliver/4, deliver_reply/2, + send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/1]). @@ -30,7 +31,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal --export([list_local/0]). +-export([list_local/0, deliver_reply_local/3]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, conn_name, limiter, tx, next_tag, unacked_message_q, user, @@ -39,7 +40,7 @@ queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, mandatory, capabilities, trace_state, - consumer_prefetch}). + consumer_prefetch, reply_consumer}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -96,6 +97,9 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). +-spec(deliver_reply/2 :: (binary(), rabbit_types:delivery()) -> 'ok'). +-spec(deliver_reply_local/3 :: + (pid(), binary(), rabbit_types:delivery()) -> 'ok'). -spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'). @@ -142,6 +146,41 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> + {ok, Pid, Key} = decode_fast_reply_to(Rest), + delegate:invoke_no_result( + Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}). + +%% We want to ensure people can't use this mechanism to send a message +%% to an arbitrary process and kill it! +deliver_reply_local(Pid, Key, Delivery) -> + case pg_local:in_group(rabbit_channels, Pid) of + true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery}); + false -> ok + end. + +declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> + exists; +declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) -> + case decode_fast_reply_to(Rest) of + {ok, Pid, Key} -> + Msg = {declare_fast_reply_to, Key}, + rabbit_misc:with_exit_handler( + rabbit_misc:const(not_found), + fun() -> gen_server2:call(Pid, Msg, infinity) end); + error -> + not_found + end; +declare_fast_reply_to(_) -> + not_found. + +decode_fast_reply_to(Suffix) -> + case binary:split(Suffix, <<".">>) of + [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)), + {ok, Pid, Key}; + _ -> error + end. + send_credit_reply(Pid, Len) -> gen_server2:cast(Pid, {send_credit_reply, Len}). @@ -219,7 +258,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), - consumer_prefetch = 0}, + consumer_prefetch = 0, + reply_consumer = none}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -262,6 +302,13 @@ handle_call({info, Items}, _From, State) -> handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); +handle_call({declare_fast_reply_to, Key}, _From, + State = #ch{reply_consumer = Consumer}) -> + reply(case Consumer of + {_, _, Key} -> exists; + _ -> not_found + end, State); + handle_call(_Request, _From, State) -> noreply(State). @@ -326,6 +373,29 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) -> + noreply(State); +handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) -> + noreply(State); +handle_cast({deliver_reply, Key, #delivery{message = + #basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag, + reply_consumer = {ConsumerTag, _Suffix, Key}}) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = false, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(State); +handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) -> + noreply(State); + handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.credit_ok'{available = Len}), @@ -609,6 +679,21 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +maybe_set_fast_reply_to( + C = #content{properties = P = #'P_basic'{reply_to = + <<"amq.rabbitmq.reply-to">>}}, + #ch{reply_consumer = ReplyConsumer}) -> + case ReplyConsumer of + none -> rabbit_misc:protocol_error( + precondition_failed, + "fast reply consumer does not exist", []); + {_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>, + rabbit_binary_generator:clear_encoded_content( + C#content{properties = P#'P_basic'{reply_to = Rep}}) + end; +maybe_set_fast_reply_to(C, _State) -> + C. + record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -686,7 +771,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = - rabbit_binary_parser:ensure_content_decoded(Content), + maybe_set_fast_reply_to( + rabbit_binary_parser:ensure_content_decoded(Content), State), check_user_id_header(Props, State), check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, @@ -761,6 +847,55 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, {reply, #'basic.get_empty'{}, State} end; +handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, + consumer_tag = CTag0, + no_ack = NoAck, + nowait = NoWait}, + _, State = #ch{reply_consumer = ReplyConsumer, + consumer_mapping = ConsumerMapping}) -> + case dict:find(CTag0, ConsumerMapping) of + error -> + case {ReplyConsumer, NoAck} of + {none, true} -> + CTag = case CTag0 of + <<>> -> rabbit_guid:binary( + rabbit_guid:gen_secure(), "amq.ctag"); + Other -> Other + end, + %% Precalculate both suffix and key; base64 encoding is + %% expensive + Key = base64:encode(rabbit_guid:gen_secure()), + PidEnc = base64:encode(term_to_binary(self())), + Suffix = <<PidEnc/binary, ".", Key/binary>>, + State1 = State#ch{reply_consumer = {CTag, Suffix, Key}}, + case NoWait of + true -> {noreply, State1}; + false -> Rep = #'basic.consume_ok'{consumer_tag = CTag}, + {reply, Rep, State1} + end; + {_, false} -> + rabbit_misc:protocol_error( + precondition_failed, + "reply consumer cannot acknowledge", []); + _ -> + rabbit_misc:protocol_error( + precondition_failed, "reply consumer already set", []) + end; + {ok, _} -> + %% Attempted reuse of consumer tag. + rabbit_misc:protocol_error( + not_allowed, "attempt to reuse consumer tag '~s'", [CTag0]) + end; + +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, + _, State = #ch{reply_consumer = {ConsumerTag, _, _}}) -> + State1 = State#ch{reply_consumer = none}, + case NoWait of + true -> {noreply, State1}; + false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, + {reply, Rep, State1} + end; + handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, no_local = _, % FIXME: implement @@ -975,6 +1110,18 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.unbind_ok'{}, NoWait, State); +%% Note that all declares to these are effectively passive. If it +%% exists it by definition has one consumer. +handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", + _/binary>> = QueueNameBin, + nowait = NoWait}, _, + State = #ch{virtual_host = VHost}) -> + QueueName = rabbit_misc:r(VHost, queue, QueueNameBin), + case declare_fast_reply_to(QueueNameBin) of + exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State); + not_found -> rabbit_misc:not_found(QueueName) + end; + handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = DurableDeclare, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 8b944763a8..f184174c8a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -344,14 +344,21 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> - case {RName, rabbit_exchange_decorator:select(route, Decorators)} of - {<<"">>, []} -> - %% Optimisation - [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; - {_, SelectedDecorators} -> - lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []})) + case RName of + <<>> -> + RKsSorted = lists:usort(RKs), + [rabbit_channel:deliver_reply(RK, Delivery) || + RK <- RKsSorted, virtual_reply_queue(RK)], + [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted, + not virtual_reply_queue(RK)]; + _ -> + Decs = rabbit_exchange_decorator:select(route, Decorators), + lists:usort(route1(Delivery, Decs, {[X], XName, []})) end. +virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true; +virtual_reply_queue(_) -> false. + route1(_, _, {[], _, QNames}) -> QNames; route1(Delivery, Decorators, diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d5dd9712f9..17fca7bbaf 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -801,8 +801,7 @@ find_auto_cluster_node([Node | Nodes]) -> find_auto_cluster_node(Nodes) end, case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _} = Reason -> Diag = rabbit_nodes:diagnostics([Node]), - Fail("~p~n~s~n", [Reason, Diag]); + {badrpc, _} = Reason -> Fail("~p~n", [Reason]); %% old delegate hash check {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]); {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b2930f88d7..ca73006aed 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -412,7 +412,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), rabbit_event:init_stats_timer(State, #v1.stats_timer); -handle_other({'$gen_cast', force_event_refresh}, State) -> +handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> %% Ignore, we will emit a created event once we start running. State; handle_other(ensure_stats, State) -> |
