diff options
| author | Marek Majkowski <marek@rabbitmq.com> | 2011-02-25 12:10:57 +0000 |
|---|---|---|
| committer | Marek Majkowski <marek@rabbitmq.com> | 2011-02-25 12:10:57 +0000 |
| commit | be4b20cfb43c4f687af10e71c518f792c6606291 (patch) | |
| tree | c6dac4f33a445580bd65f808ea2b5e274cbddad9 /src | |
| parent | d2cf22aff35196b9c778f47a0c47324293370687 (diff) | |
| parent | 79740987af7ce6a60d661513a9dddbab889dcdfc (diff) | |
| download | rabbitmq-server-git-be4b20cfb43c4f687af10e71c518f792c6606291.tar.gz | |
bug23278 merged into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 3 | ||||
| -rw-r--r-- | src/rabbit.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 106 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 256 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 96 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 349 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 168 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 159 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 34 |
14 files changed, 576 insertions, 693 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 7c548bdc74..f41815d06b 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -889,13 +889,16 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, false -> {noreply, run_pending_item(Item, State)} end; + handle_call({set_limit, Limit}, _From, State) -> {reply, ok, maybe_reduce( process_pending(State #fhc_state { limit = Limit, obtain_limit = obtain_limit(Limit) }))}; + handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> {reply, Limit, State}; + handle_call({info, Items}, _From, State) -> {reply, infos(Items, State), State}. diff --git a/src/rabbit.erl b/src/rabbit.erl index 1beed5c1a7..faf484af9c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -214,7 +214,8 @@ stop_and_halt() -> ok. status() -> - [{running_applications, application:which_applications()}] ++ + [{pid, list_to_integer(os:getpid())}, + {running_applications, application:which_applications()}] ++ rabbit_mnesia:status(). rotate_logs(BinarySuffix) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index d67c7f58e1..dc81ace6bf 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -61,8 +61,7 @@ -spec(map_exception/3 :: (rabbit_channel:channel_number(), rabbit_types:amqp_error() | any(), rabbit_types:protocol()) -> - {boolean(), - rabbit_channel:channel_number(), + {rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()}). -endif. @@ -301,24 +300,21 @@ clear_encoded_content(Content = #content{}) -> map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = lookup_amqp_exception(Reason, Protocol), - ShouldClose = SuggestedClose orelse (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; _ -> Protocol:method_id(FailedMethod) end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. + case SuggestedClose orelse (Channel == 0) of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end. lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 73c031de83..34a5e5a440 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,16 +20,16 @@ -behaviour(gen_server2). --export([start_link/8, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/9, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1]). +-export([emit_stats/1, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, @@ -67,10 +67,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/8 :: - (channel_number(), pid(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), - fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> +-spec(start_link/9 :: + (channel_number(), pid(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -90,16 +90,17 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, User, VHost, Capabilities, +start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun) -> - gen_server2:start_link(?MODULE, - [Channel, ReaderPid, WriterPid, User, VHost, - Capabilities, CollectorPid, StartLimiterFun], []). + gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -148,14 +149,18 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). +ready_for_close(Pid) -> + gen_server2:cast(Pid, ready_for_close). + %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid, - StartLimiterFun]) -> +init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, + CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, + protocol = Protocol, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, @@ -222,14 +227,11 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, normal, State#ch{state = terminating}} + {stop, normal, State} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, normal, terminating(Reason#amqp_error{method = MethodName}, - State)}; - exit:normal -> - {stop, normal, State}; + send_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -237,6 +239,11 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State), hibernate}; +handle_cast(ready_for_close, State = #ch{state = closing, + writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), + {stop, normal, State}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -309,18 +316,16 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. -terminate(_Reason, State = #ch{state = terminating}) -> - terminate(State); - terminate(Reason, State) -> - Res = rollback_and_notify(State), + {Res, _State1} = rollback_and_notify(State), case Reason of normal -> ok = Res; shutdown -> ok = Res; {shutdown, _Term} -> ok = Res; _ -> ok end, - terminate(State). + pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -358,10 +363,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> - ok = rollback_and_notify(State), - Reader ! {channel_exit, Channel, Reason}, - State#ch{state = terminating}. +send_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid}) -> + {CloseChannel, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [ReaderPid, Channel, Reason]), + %% something bad's happened: rollback_and_notify may not be 'ok' + {_Result, State1} = rollback_and_notify(State), + case CloseChannel of + Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + _ -> ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} + end. return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> @@ -544,11 +561,20 @@ handle_method(#'channel.open'{}, _, _State) -> handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = rollback_and_notify(State), - ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), +handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> stop; +handle_method(#'channel.close'{}, _, State = #ch{state = closing}) -> + {reply, #'channel.close_ok'{}, State}; + +handle_method(_Method, _, State = #ch{state = closing}) -> + {noreply, State}; + +handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> + {ok, State1} = rollback_and_notify(State), + ReaderPid ! {channel_closing, self()}, + {noreply, State1}; + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -1099,9 +1125,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, - WriterPid, Reason) -> - {_Close, ReplyCode, ReplyText} = - rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), + #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -1188,10 +1213,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey, NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}). +rollback_and_notify(State = #ch{state = closing}) -> + {ok, State}; rollback_and_notify(State = #ch{transaction_id = none}) -> - notify_queues(State); + {notify_queues(State), State#ch{state = closing}}; rollback_and_notify(State) -> - notify_queues(internal_rollback(State)). + State1 = internal_rollback(State), + {notify_queues(State1), State1#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( @@ -1258,10 +1286,10 @@ is_message_persistent(Content) -> end. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_route), + ok = basic_return(Msg, State, no_route), record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + ok = basic_return(Msg, State, no_consumers), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1334,10 +1362,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -terminate(_State) -> - pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9005819405..9cc407bc34 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,12 +31,13 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {'tcp', rabbit_types:protocol(), rabbit_net:socket(), - rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), pid(), rabbit_types:protocol(), rabbit_types:user(), + rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()} | - {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}). + {'direct', rabbit_channel:channel_number(), pid(), + rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -44,7 +45,7 @@ %%---------------------------------------------------------------------------- -start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, +start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -57,20 +58,20 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, User, VHost, Capabilities, - Collector, start_limiter_fun(SupPid)]}, + [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, User, VHost, Capabilities, - Collector}) -> +start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, User, - VHost, Capabilities, Collector, + [Channel, ClientChannelPid, ClientChannelPid, Protocol, + User, VHost, Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 3a18950f84..746bb66eb5 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,6 +20,7 @@ -export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). +-define(WAIT_FOR_VM_ATTEMPTS, 5). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -293,7 +294,30 @@ action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), display_list(call(Node, {rabbit_auth_backend_internal, - list_vhost_permissions, [VHost]})). + list_vhost_permissions, [VHost]})); + +action(wait, Node, [], _Opts, Inform) -> + Inform("Waiting for ~p", [Node]), + wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS). + +wait_for_application(Node, Attempts) -> + case rpc_call(Node, application, which_applications, [infinity]) of + {badrpc, _} = E -> NewAttempts = Attempts - 1, + case NewAttempts of + 0 -> E; + _ -> wait_for_application0(Node, NewAttempts) + end; + Apps -> case proplists:is_defined(rabbit, Apps) of + %% We've seen the node up; if it goes down + %% die immediately. + true -> ok; + false -> wait_for_application0(Node, 0) + end + end. + +wait_for_application0(Node, Attempts) -> + timer:sleep(1000), + wait_for_application(Node, Attempts). default_if_empty(List, Default) when is_list(List) -> if List == [] -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 5c89bf49c9..586563f61e 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/6]). +-export([boot/0, connect/4, start_channel/7]). -include("rabbit.hrl"). @@ -28,10 +28,10 @@ -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/6 :: - (rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> - {'ok', pid()}). +-spec(start_channel/7 :: + (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid()) -> {'ok', pid()}). -endif. @@ -69,10 +69,11 @@ connect(Username, Password, VHost, Protocol) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, User, VHost, Capabilities, Collector) -> +start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities, + Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, User, VHost, Capabilities, - Collector}]), + [{direct, Number, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 9cbf8100e2..c1741b30a7 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -15,6 +15,7 @@ %% -module(rabbit_exchange_type_topic). + -include("rabbit.hrl"). -behaviour(rabbit_exchange_type). @@ -31,58 +32,223 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). --export([topic_matches/2]). - --ifdef(use_specs). - --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). - --endif. +%%---------------------------------------------------------------------------- description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end). - -split_topic_key(Key) -> - string:tokens(binary_to_list(Key), "."). - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) - when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or - last_topic_match(P, [BacktrackNext | R], BacktrackList). +%% NB: This may return duplicate results in some situations (that's ok) +route(#exchange{name = X}, + #delivery{message = #basic_message{routing_key = Key}}) -> + Words = split_topic_key(Key), + mnesia:async_dirty(fun trie_match/2, [X, Words]). validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. -delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. -remove_bindings(_Tx, _X, _Bs) -> ok. + +recover(_Exchange, Bs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) + end). + +delete(true, #exchange{name = X}, _Bs) -> + trie_remove_all_edges(X), + trie_remove_all_bindings(X), + ok; +delete(false, _Exchange, _Bs) -> + ok. + +add_binding(true, _Exchange, Binding) -> + internal_add_binding(Binding); +add_binding(false, _Exchange, _Binding) -> + ok. + +remove_bindings(true, _X, Bs) -> + lists:foreach(fun remove_binding/1, Bs), + ok; +remove_bindings(false, _X, _Bs) -> + ok. + +remove_binding(#binding{source = X, key = K, destination = D}) -> + Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path), + ok. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). + +%%---------------------------------------------------------------------------- + +internal_add_binding(#binding{source = X, key = K, destination = D}) -> + FinalNode = follow_down_create(X, split_topic_key(K)), + trie_add_binding(X, FinalNode, D), + ok. + +trie_match(X, Words) -> + trie_match(X, root, Words, []). + +trie_match(X, Node, [], ResAcc) -> + trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [], + trie_bindings(X, Node) ++ ResAcc); +trie_match(X, Node, [W | RestW] = Words, ResAcc) -> + lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> + trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc) + end, ResAcc, [{W, fun trie_match/4, RestW}, + {"*", fun trie_match/4, RestW}, + {"#", fun trie_match_skip_any/4, Words}]). + +trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) -> + case trie_child(X, Node, Search) of + {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc); + error -> ResAcc + end. + +trie_match_skip_any(X, Node, [], ResAcc) -> + trie_match(X, Node, [], ResAcc); +trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) -> + trie_match_skip_any(X, Node, RestW, + trie_match(X, Node, Words, ResAcc)). + +follow_down_create(X, Words) -> + case follow_down_last_node(X, Words) of + {ok, FinalNode} -> FinalNode; + {error, Node, RestW} -> lists:foldl( + fun (W, CurNode) -> + NewNode = new_node_id(), + trie_add_edge(X, CurNode, NewNode, W), + NewNode + end, Node, RestW) + end. + +follow_down_last_node(X, Words) -> + follow_down(X, fun (_, Node, _) -> Node end, root, Words). + +follow_down_get_path(X, Words) -> + {ok, Path} = + follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end, + [{root, none}], Words), + Path. + +follow_down(X, AccFun, Acc0, Words) -> + follow_down(X, root, AccFun, Acc0, Words). + +follow_down(_X, _CurNode, _AccFun, Acc, []) -> + {ok, Acc}; +follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> + case trie_child(X, CurNode, W) of + {ok, NextNode} -> follow_down(X, NextNode, AccFun, + AccFun(W, NextNode, Acc), RestW); + error -> {error, Acc, Words} + end. + +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of + true -> ok; + false -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath) + end. + +trie_child(X, Node, Word) -> + case mnesia:read(rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}) of + [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; + [] -> error + end. + +trie_bindings(X, Node) -> + MatchHead = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = '$1'}}, + mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). + +trie_add_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). + +trie_remove_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). + +trie_edge_op(X, FromNode, ToNode, W, Op) -> + ok = Op(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = FromNode, + word = W}, + node_id = ToNode}, + write). + +trie_add_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:write/3). + +trie_remove_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + +trie_binding_op(X, Node, D, Op) -> + ok = Op(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = D}}, + write). + +trie_has_any_children(X, Node) -> + has_any(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_has_any_bindings(X, Node) -> + has_any(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_remove_all_edges(X) -> + remove_all(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + _ = '_'}, + _ = '_'}). + +trie_remove_all_bindings(X) -> + remove_all(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, _ = '_'}, + _ = '_'}). + +has_any(Table, MatchHead) -> + Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), + select_while_no_result(Select) /= '$end_of_table'. + +select_while_no_result({[], Cont}) -> + select_while_no_result(mnesia:select(Cont)); +select_while_no_result(Other) -> + Other. + +remove_all(Table, Pattern) -> + lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, + mnesia:match_object(Table, Pattern, write)). + +new_node_id() -> + rabbit_guid:guid(). + +split_topic_key(Key) -> + split_topic_key(Key, [], []). + +split_topic_key(<<>>, [], []) -> + []; +split_topic_key(<<>>, RevWordAcc, RevResAcc) -> + lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [C | RevWordAcc], RevResAcc). + diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a9b4e17745..fc95b77b36 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -20,7 +20,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). -export([table_names/0]). @@ -54,6 +54,7 @@ -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). +-spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). -endif. @@ -185,6 +186,17 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, @@ -216,6 +228,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_edge_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. +trie_binding_match() -> + #trie_binding{exchange_name = exchange_name_match(), + _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> @@ -264,45 +282,48 @@ ensure_schema_integrity() -> check_schema_integrity() -> Tables = mnesia:system_info(tables), - case [Error || {Tab, TabDef} <- table_definitions(), - case lists:member(Tab, Tables) of - false -> - Error = {table_missing, Tab}, - true; - true -> - {_, ExpAttrs} = proplists:lookup(attributes, TabDef), - Attrs = mnesia:table_info(Tab, attributes), - Error = {table_attributes_mismatch, Tab, - ExpAttrs, Attrs}, - Attrs /= ExpAttrs - end] of - [] -> check_table_integrity(); - Errors -> {error, Errors} + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other end. -check_table_integrity() -> - ok = wait_for_tables(), - case lists:all(fun ({Tab, TabDef}) -> - {_, Match} = proplists:lookup(match, TabDef), - read_test_table(Tab, Match) - end, table_definitions()) of - true -> ok; - false -> {error, invalid_table_content} +check_table_attributes(Tab, TabDef) -> + {_, ExpAttrs} = proplists:lookup(attributes, TabDef), + case mnesia:table_info(Tab, attributes) of + ExpAttrs -> ok; + Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} end. -read_test_table(Tab, Match) -> +check_table_content(Tab, TabDef) -> + {_, Match} = proplists:lookup(match, TabDef), case mnesia:dirty_first(Tab) of '$end_of_table' -> - true; + ok; Key -> ObjList = mnesia:dirty_read(Tab, Key), MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), case ets:match_spec_run(ObjList, MatchComp) of - ObjList -> true; - _ -> false + ObjList -> ok; + _ -> {error, {table_content_invalid, Tab, Match, ObjList}} end end. +check_tables(Fun) -> + case [Error || {Tab, TabDef} <- table_definitions(), + case Fun(Tab, TabDef) of + ok -> Error = none, false; + {error, Error} -> true + end] of + [] -> ok; + Errors -> {error, Errors} + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -369,17 +390,15 @@ init_db(ClusterNodes, Force) -> case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of {[], true, [_]} -> %% True single disc node, attempt upgrade - ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade() of - ok -> ensure_schema_ok(); + ok -> ensure_schema_integrity(); version_not_available -> schema_ok_or_move() end; {[], true, _} -> %% "Master" (i.e. without config) disc node in cluster, %% verify schema - ok = wait_for_tables(), ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_ok(); + ensure_schema_integrity(); {[], false, _} -> %% Nothing there at all, start from scratch ok = create_schema(); @@ -396,7 +415,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ensure_schema_ok() + ensure_schema_integrity() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -429,12 +448,6 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_upgrade:write_version(). -ensure_schema_ok() -> - case check_schema_integrity() of - ok -> ok; - {error, Reason} -> throw({error, {schema_invalid, Reason}}) - end. - create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -443,7 +456,6 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = wait_for_tables(), ok = rabbit_upgrade:write_version(). move_db() -> @@ -472,8 +484,7 @@ copy_db(Destination) -> mnesia:stop(), case rabbit_misc:recursive_copy(dir(), Destination) of ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = wait_for_tables(); + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); {error, E} -> {error, E} end. @@ -541,7 +552,8 @@ wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; + ok -> + ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl deleted file mode 100644 index ebd7fe8a06..0000000000 --- a/src/rabbit_multi.erl +++ /dev/null @@ -1,349 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. -%% - --module(rabbit_multi). --include("rabbit.hrl"). - --export([start/0, stop/0]). - --define(RPC_SLEEP, 500). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start/0 :: () -> no_return()). --spec(stop/0 :: () -> 'ok'). --spec(usage/0 :: () -> no_return()). - --endif. - -%%---------------------------------------------------------------------------- - -start() -> - RpcTimeout = - case init:get_argument(maxwait) of - {ok,[[N1]]} -> 1000 * list_to_integer(N1); - _ -> ?MAX_WAIT - end, - case init:get_plain_arguments() of - [] -> - usage(); - FullCommand -> - {Command, Args} = parse_args(FullCommand), - case catch action(Command, Args, RpcTimeout) of - ok -> - io:format("done.~n"), - halt(); - {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join(FullCommand, " ")]), - usage(); - timeout -> - print_error("timeout starting some nodes.", []), - halt(1); - Other -> - print_error("~p", [Other]), - halt(2) - end - end. - -print_error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). - -parse_args([Command | Args]) -> - {list_to_atom(Command), Args}. - -stop() -> - ok. - -usage() -> - io:format("~s", [rabbit_multi_usage:usage()]), - halt(1). - -action(start_all, [NodeCount], RpcTimeout) -> - io:format("Starting all nodes...~n", []), - application:load(rabbit), - {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts( - getenv("RABBITMQ_NODENAME")), - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - throw({cannot_connect_to_epmd, NodeHost, EpmdReason}); - {ok, _} -> - ok - end, - {NodePids, Running} = - case list_to_integer(NodeCount) of - 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), - RpcTimeout), - {[NodePid], Started}; - N -> start_nodes(N, N, [], true, NodeName, - get_node_tcp_listener(), RpcTimeout) - end, - write_pids_file(NodePids), - case Running of - true -> ok; - false -> timeout - end; - -action(status, [], RpcTimeout) -> - io:format("Status of all running nodes...~n", []), - call_all_nodes( - fun ({Node, Pid}) -> - RabbitRunning = - case is_rabbit_running(Node, RpcTimeout) of - false -> not_running; - true -> running - end, - io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, RabbitRunning]) - end); - -action(stop_all, [], RpcTimeout) -> - io:format("Stopping all nodes...~n", []), - call_all_nodes(fun ({Node, Pid}) -> - io:format("Stopping node ~p~n", [Node]), - rpc:call(Node, rabbit, stop_and_halt, []), - case kill_wait(Pid, RpcTimeout, false) of - false -> kill_wait(Pid, RpcTimeout, true); - true -> ok - end, - io:format("OK~n", []) - end), - delete_pids_file(); - -action(rotate_logs, [], RpcTimeout) -> - action(rotate_logs, [""], RpcTimeout); - -action(rotate_logs, [Suffix], RpcTimeout) -> - io:format("Rotating logs for all nodes...~n", []), - BinarySuffix = list_to_binary(Suffix), - call_all_nodes( - fun ({Node, _}) -> - io:format("Rotating logs for node ~p", [Node]), - case rpc:call(Node, rabbit, rotate_logs, - [BinarySuffix], RpcTimeout) of - {badrpc, Error} -> io:format(": ~p.~n", [Error]); - ok -> io:format(": ok.~n", []) - end - end). - -%% PNodePid is the list of PIDs -%% Running is a boolean exhibiting success at some moment -start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; - -start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) -> - {NodePre, NodeSuff} = NodeNameBase, - NodeNumber = Total - N, - NodePre1 = case NodeNumber of - %% For compatibility with running a single node - 0 -> NodePre; - _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber) - end, - Node = rabbit_misc:makenode({NodePre1, NodeSuff}), - os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), - case Listener of - {NodeIpAddress, NodePortBase} -> - NodePort = NodePortBase + NodeNumber, - os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), - os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress); - undefined -> - ok - end, - {NodePid, Started} = start_node(Node, RpcTimeout), - start_nodes(N - 1, Total, [NodePid | PNodePid], - Started and Running, NodeNameBase, Listener, RpcTimeout). - -start_node(Node, RpcTimeout) -> - io:format("Starting node ~s...~n", [Node]), - case rpc:call(Node, os, getpid, []) of - {badrpc, _} -> - Port = run_rabbitmq_server(), - Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port), - Pid = case rpc:call(Node, os, getpid, []) of - {badrpc, _} -> throw(cannot_get_pid); - PidS -> list_to_integer(PidS) - end, - io:format("~s~n", [case Started of - true -> "OK"; - false -> "timeout" - end]), - {{Node, Pid}, Started}; - PidS -> - Pid = list_to_integer(PidS), - throw({node_already_running, Node, Pid}) - end. - -wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> - false; -wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case is_rabbit_running(Node, RpcTimeout) of - true -> true; - false -> receive - {'EXIT', Port, PosixCode} -> - throw({node_start_failed, PosixCode}) - after ?RPC_SLEEP -> - wait_for_rabbit_to_start( - Node, RpcTimeout - ?RPC_SLEEP, Port) - end - end. - -run_rabbitmq_server() -> - with_os([{unix, fun run_rabbitmq_server_unix/0}, - {win32, fun run_rabbitmq_server_win32/0}]). - -run_rabbitmq_server_unix() -> - CmdLine = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server -noinput", - erlang:open_port({spawn, CmdLine}, [nouse_stdio]). - -run_rabbitmq_server_win32() -> - Cmd = filename:nativename(os:find_executable("cmd")), - CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") ++ - "\\rabbitmq-server.bat\" -noinput -detached", - erlang:open_port({spawn_executable, Cmd}, - [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]}, - nouse_stdio]). - -is_rabbit_running(Node, RpcTimeout) -> - case rpc:call(Node, rabbit, status, [], RpcTimeout) of - {badrpc, _} -> false; - Status -> case proplists:get_value(running_applications, Status) of - undefined -> false; - Apps -> lists:keymember(rabbit, 1, Apps) - end - end. - -with_os(Handlers) -> - {OsFamily, _} = os:type(), - case proplists:get_value(OsFamily, Handlers) of - undefined -> throw({unsupported_os, OsFamily}); - Handler -> Handler() - end. - -pids_file() -> getenv("RABBITMQ_PIDS_FILE"). - -write_pids_file(Pids) -> - FileName = pids_file(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> - Device; - {error, Reason} -> - throw({cannot_create_pids_file, FileName, Reason}) - end, - try - ok = io:write(Handle, Pids), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({cannot_create_pids_file, FileName, Reason1}) - end - end, - ok. - -delete_pids_file() -> - FileName = pids_file(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({cannot_delete_pids_file, FileName, Reason}) - end. - -read_pids_file() -> - FileName = pids_file(), - case file:consult(FileName) of - {ok, [Pids]} -> Pids; - {error, enoent} -> []; - {error, Reason} -> throw({cannot_read_pids_file, FileName, Reason}) - end. - -kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 -> - Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9"; - true -> "kill" - end - end}, - %% Kill forcefully always on Windows, since erl.exe - %% seems to completely ignore non-forceful killing - %% even when everything is working - {win32, fun () -> "taskkill /f /pid" end}]), - os:cmd(Cmd ++ " " ++ integer_to_list(Pid)), - false; % Don't assume what we did just worked! - -% Returns true if the process is dead, false otherwise. -kill_wait(Pid, TimeLeft, Forceful) -> - timer:sleep(?RPC_SLEEP), - io:format(".", []), - is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful). - -% Test using some OS clunkiness since we shouldn't trust -% rpc:call(os, getpid, []) at this point -is_dead(Pid) -> - PidS = integer_to_list(Pid), - with_os([{unix, fun () -> - system("kill -0 " ++ PidS - ++ " >/dev/null 2>&1") /= 0 - end}, - {win32, fun () -> - Res = os:cmd("tasklist /nh /fi \"pid eq " ++ - PidS ++ "\" 2>&1"), - case re:run(Res, "erl\\.exe", [{capture, none}]) of - match -> false; - _ -> true - end - end}]). - -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. - -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). - -call_all_nodes(Func) -> - case read_pids_file() of - [] -> throw(no_nodes_running); - NodePids -> lists:foreach(Func, NodePids) - end. - -getenv(Var) -> - case os:getenv(Var) of - false -> throw({missing_env_var, Var}); - Value -> Value - end. - -get_node_tcp_listener() -> - try - {getenv("RABBITMQ_NODE_IP_ADDRESS"), - list_to_integer(getenv("RABBITMQ_NODE_PORT"))} - catch _ -> - case application:get_env(rabbit, tcp_listeners) of - {ok, [{_IpAddy, _Port} = Listener]} -> - Listener; - {ok, [Port]} when is_number(Port) -> - {"0.0.0.0", Port}; - {ok, []} -> - undefined; - {ok, Other} -> - throw({cannot_start_multiple_nodes, multiple_tcp_listeners, - Other}); - undefined -> - throw({missing_configuration, tcp_listeners}) - end - end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e9ff97f952..3908b64692 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,92 +57,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). -%% connection lifecycle -%% -%% all state transitions and terminations are marked with *...* -%% -%% The lifecycle begins with: start handshake_timeout timer, *pre-init* -%% -%% all states, unless specified otherwise: -%% socket error -> *exit* -%% socket close -> *throw* -%% writer send failure -> *throw* -%% forced termination -> *exit* -%% handshake_timeout -> *throw* -%% pre-init: -%% receive protocol header -> send connection.start, *starting* -%% starting: -%% receive connection.start_ok -> *securing* -%% securing: -%% check authentication credentials -%% if authentication success -> send connection.tune, *tuning* -%% if more challenge needed -> send connection.secure, -%% receive connection.secure_ok *securing* -%% otherwise send close, *exit* -%% tuning: -%% receive connection.tune_ok -> start heartbeats, *opening* -%% opening: -%% receive connection.open -> send connection.open_ok, *running* -%% running: -%% receive connection.close -> -%% tell channels to terminate gracefully -%% if no channels then send connection.close_ok, start -%% terminate_connection timer, *closed* -%% else *closing* -%% forced termination -%% -> wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *exit* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing, *running* -%% handshake_timeout -> ignore, *running* -%% heartbeat timeout -> *throw* -%% conserve_memory=true -> *blocking* -%% blocking: -%% conserve_memory=true -> *blocking* -%% conserve_memory=false -> *running* -%% receive a method frame for a content-bearing method -%% -> process, stop receiving, *blocked* -%% ...rest same as 'running' -%% blocked: -%% conserve_memory=true -> *blocked* -%% conserve_memory=false -> resume receiving, *running* -%% ...rest same as 'running' -%% closing: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closing* -%% receive frame -> ignore, *closing* -%% handshake_timeout -> ignore, *closing* -%% heartbeat timeout -> *throw* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing -%% if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% else *closing* -%% channel exits normally -%% -> if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% closed: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closed* -%% receive connection.close_ok -> self() ! terminate_connection, -%% *closed* -%% receive frame -> ignore, *closed* -%% terminate_connection timeout -> *terminate* -%% handshake_timeout -> ignore, *closed* -%% heartbeat timeout -> *throw* -%% channel exit -> log error, *closed* -%% -%% -%% TODO: refactor the code so that the above is obvious - -define(IS_RUNNING(State), (State#v1.connection_state =:= running orelse State#v1.connection_state =:= blocking orelse @@ -337,6 +251,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {conserve_memory, Conserve} -> mainloop(Deb, internal_conserve_memory(Conserve, State)); + {channel_closing, ChPid} -> + ok = rabbit_channel:ready_for_close(ChPid), + channel_cleanup(ChPid), + mainloop(Deb, State); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -444,32 +362,32 @@ close_connection(State = #v1{queue_collector = Collector, erlang:send_after(TimeoutMillisec, self(), terminate_connection), State#v1{connection_state = closed}. -close_channel(Channel, State) -> - put({channel, Channel}, closing), - State. - handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - erase({ch_pid, ChPid}), + channel_cleanup(ChPid), maybe_close(State); uncontrolled -> case channel_cleanup(ChPid) of undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> maybe_close( + Channel -> rabbit_log:error( + "connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close( handle_exception(State, Channel, Reason)) end end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of - undefined -> undefined; - Channel -> erase({channel, Channel}), - erase({ch_pid, ChPid}), - Channel + undefined -> undefined; + {Channel, MRef} -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel end. -all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. terminate_channels() -> NChannels = @@ -524,8 +442,8 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. -termination_kind(normal) -> controlled; -termination_kind(_) -> uncontrolled. +termination_kind(normal) -> controlled; +termination_kind(_) -> uncontrolled. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, @@ -561,8 +479,8 @@ handle_frame(Type, Channel, Payload, Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of - {method, 'channel.close', _} -> - erase({channel, Channel}), + {method, 'channel.close_ok', _} -> + channel_cleanup(ChPid), State; {method, MethodName, _} -> case (State#v1.connection_state =:= blocking @@ -574,25 +492,6 @@ handle_frame(Type, Channel, Payload, _ -> State end; - closing -> - %% According to the spec, after sending a - %% channel.close we must ignore all frames except - %% channel.close and channel.close_ok. In the - %% event of a channel.close, we should send back a - %% channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erase({channel, Channel}); - {method, 'channel.close', _} -> - %% We're already closing this channel, so - %% there's no cleanup to do (notify - %% queues, etc.) - ok = rabbit_writer:internal_send_command( - State#v1.sock, Channel, - #'channel.close_ok'{}, Protocol); - _ -> ok - end, - State; undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -969,13 +868,13 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), - erlang:monitor(process, ChPid), + MRef = erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), - put({ch_pid, ChPid}, Channel), + put({ch_pid, ChPid}, {Channel, MRef}), State. process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> @@ -991,29 +890,20 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> AState end. -log_channel_error(ConnectionState, Channel, Reason) -> - rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", - [self(), ConnectionState, Channel, Reason]). - -handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log_channel_error(closed, Channel, Reason), +handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> State; -handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> - log_channel_error(CS, Channel, Reason), +handle_exception(State, Channel, Reason) -> send_exception(State, Channel, Reason). send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = + {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - NewState = case ShouldClose of - true -> terminate_channels(), - close_connection(State); - false -> close_channel(Channel, State) - end, + terminate_channels(), + State1 = close_connection(State), ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod, Protocol), - NewState. + State1#v1.sock, 0, CloseMethod, Protocol), + State1. internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2015170aed..09695d95e3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -585,32 +585,131 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). -test_topic_match(P, R) -> - test_topic_match(P, R, true). - -test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), - list_to_binary(R)) of - Expected -> - passed; - _ -> - {topic_match_failure, P, R} - end. - test_topic_matching() -> - passed = test_topic_match("#", "test.test"), - passed = test_topic_match("#", ""), - passed = test_topic_match("#.T.R", "T.T.R"), - passed = test_topic_match("#.T.R", "T.R.T.R"), - passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"), - passed = test_topic_match("#.test", "test"), - passed = test_topic_match("#.test", "test.test"), - passed = test_topic_match("#.test", "ignored.test"), - passed = test_topic_match("#.test", "more.ignored.test"), - passed = test_topic_match("#.test", "notmatched", false), - passed = test_topic_match("#.z", "one.two.three.four", false), + XName = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"test_exchange">>}, + X = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + %% create + rabbit_exchange_type_topic:validate(X), + exchange_op_callback(X, create, []), + + %% add some bindings + Bindings = lists:map( + fun ({Key, Q}) -> + #binding{source = XName, + key = list_to_binary(Key), + destination = #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)}} + end, [{"a.b.c", "t1"}, + {"a.*.c", "t2"}, + {"a.#.b", "t3"}, + {"a.b.b.c", "t4"}, + {"#", "t5"}, + {"#.#", "t6"}, + {"#.b", "t7"}, + {"*.*", "t8"}, + {"a.*", "t9"}, + {"*.b.c", "t10"}, + {"a.#", "t11"}, + {"a.#.#", "t12"}, + {"b.b.c", "t13"}, + {"a.b.b", "t14"}, + {"a.b", "t15"}, + {"b.c", "t16"}, + {"", "t17"}, + {"*.*.*", "t18"}, + {"vodka.martini", "t19"}, + {"a.b.c", "t20"}, + {"*.#", "t21"}, + {"#.*.#", "t22"}, + {"*.#.#", "t23"}, + {"#.#.#", "t24"}, + {"*", "t25"}, + {"#.b.#", "t26"}]), + lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, + Bindings), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", + "t18", "t20", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", + "t12", "t15", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", + "t18", "t21", "t22", "t23", "t24", "t26"]}, + {"", ["t5", "t6", "t17", "t24"]}, + {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", + "t24"]}, + {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", + "t24"]}, + {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23", + "t24"]}, + {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", + "t23", "t24", "t26"]}, + {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, + {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25"]}]), + + %% remove some bindings + RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), + lists:nth(11, Bindings), lists:nth(19, Bindings), + lists:nth(21, Bindings)], + exchange_op_callback(X, remove_bindings, [RemovedBindings]), + RemainingBindings = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Bindings), + ordsets:from_list(RemovedBindings))), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", + "t23", "t24", "t26"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", + "t22", "t23", "t24", "t26"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", + "t23", "t24", "t26"]}, + {"", ["t6", "t17", "t24"]}, + {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, + {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, + {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, + {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, + {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", + "t24", "t26"]}, + {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + + %% remove the entire exchange + exchange_op_callback(X, delete, [RemainingBindings]), + %% none should match now + test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. +exchange_op_callback(X, Fun, ExtraArgs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + +test_topic_expect_match(X, List) -> + lists:foreach( + fun ({Key, Expected}) -> + BinKey = list_to_binary(Key), + Res = rabbit_exchange_type_topic:route( + X, #delivery{message = #basic_message{routing_key = + BinKey}}), + ExpectedRes = lists:map( + fun (Q) -> #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)} + end, Expected), + true = (lists:usort(ExpectedRes) =:= lists:usort(Res)) + end, List). + test_app_management() -> %% starting, stopping, status ok = control_action(stop_app, []), @@ -1019,9 +1118,9 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - user(<<"user">>), <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1079,9 +1178,9 @@ test_server_status() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, user(<<"guest">>), - <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) @@ -1305,7 +1404,7 @@ test_queue_cleanup(_SecondaryNode) -> rabbit_channel:do(Ch, #'queue.declare'{ passive = true, queue = ?CLEANUP_QUEUE_NAME }), receive - {channel_exit, 1, {amqp_error, not_found, _, _}} -> + #'channel.close'{reply_code = 404} -> ok after 2000 -> throw(failed_to_receive_channel_exit) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b0a715233a..89acc10c91 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -98,7 +98,6 @@ vertices(Module, Steps) -> edges(_Module, Steps) -> [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. - unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 68b88b3e45..b9dbe418d9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -25,6 +25,7 @@ -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, [hash_passwords]}). +-rabbit_upgrade({topic_trie, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). +-spec(topic_trie/0 :: () -> 'ok'). -endif. @@ -47,7 +49,7 @@ %% point. remove_user_scope() -> - mnesia( + transform( rabbit_user_permission, fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) -> {user_permission, UV, {permission, Conf, Write, Read}} @@ -55,7 +57,7 @@ remove_user_scope() -> [user_vhost, permission]). hash_passwords() -> - mnesia( + transform( rabbit_user, fun ({user, Username, Password, IsAdmin}) -> Hash = rabbit_auth_backend_internal:hash_password(Password), @@ -64,7 +66,7 @@ hash_passwords() -> [username, password_hash, is_admin]). add_ip_to_listener() -> - mnesia( + transform( rabbit_listener, fun ({listener, Node, Protocol, Host, Port}) -> {listener, Node, Protocol, Host, {0,0,0,0}, Port} @@ -77,27 +79,41 @@ internal_exchanges() -> fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> {exchange, Name, Type, Durable, AutoDelete, false, Args} end, - [ ok = mnesia(T, - AddInternalFun, - [name, type, durable, auto_delete, internal, arguments]) + [ ok = transform(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) || T <- Tables ], ok. user_to_internal_user() -> - mnesia( + transform( rabbit_user, fun({user, Username, PasswordHash, IsAdmin}) -> {internal_user, Username, PasswordHash, IsAdmin} end, [username, password_hash, is_admin], internal_user). +topic_trie() -> + create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, + {attributes, [trie_edge, node_id]}, + {type, ordered_set}]), + create(rabbit_topic_trie_binding, [{record_name, topic_trie_binding}, + {attributes, [trie_binding, value]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- -mnesia(TableName, Fun, FieldList) -> +transform(TableName, Fun, FieldList) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. -mnesia(TableName, Fun, FieldList, NewRecordName) -> +transform(TableName, Fun, FieldList, NewRecordName) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. + +create(Tab, TabDef) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + ok. |
