diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 16 | ||||
| -rw-r--r-- | src/rabbit.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism_amqplain.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism_cr_demo.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism_plain.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 62 | ||||
| -rw-r--r-- | src/rabbit_event.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 258 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 82 | ||||
| -rw-r--r-- | src/rabbit_msg_file.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 145 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 38 |
23 files changed, 683 insertions, 258 deletions
diff --git a/src/gm.erl b/src/gm.erl index b3fb7eca53..70633a085f 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -55,14 +55,14 @@ %% start_link/3 %% Provide the group name, the callback module name, and any arguments %% you wish to be passed into the callback module's functions. The -%% joined/1 will be called when we have joined the group, and the list -%% of arguments will have appended to it a list of the current members -%% of the group. See the comments in behaviour_info/1 below for -%% further details of the callback functions. +%% joined/2 function will be called when we have joined the group, +%% with the arguments passed to start_link and a list of the current +%% members of the group. See the comments in behaviour_info/1 below +%% for further details of the callback functions. %% %% leave/1 %% Provide the Pid. Removes the Pid from the group. The callback -%% terminate/1 function will be called. +%% terminate/2 function will be called. %% %% broadcast/2 %% Provide the Pid and a Message. The message will be sent to all @@ -455,16 +455,16 @@ behaviour_info(callbacks) -> %% quickly, it's possible that we will never see that member %% appear in either births or deaths. However we are guaranteed %% that (1) we will see a member joining either in the births - %% here, or in the members passed to joined/1 before receiving + %% here, or in the members passed to joined/2 before receiving %% any messages from it; and (2) we will not see members die that %% we have not seen born (or supplied in the members to - %% joined/1). + %% joined/2). {members_changed, 3}, %% Supplied with Args provided in start_link, the sender, and the %% message. This does get called for messages injected by this %% member, however, in such cases, there is no special - %% significance of this call: it does not indicate that the + %% significance of this invocation: it does not indicate that the %% message has made it to any other members, let alone all other %% members. {handle_msg, 3}, diff --git a/src/rabbit.erl b/src/rabbit.erl index fb5207b960..4b706b413c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -44,6 +44,7 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, + {requires, file_handle_cache}, {enables, external_infrastructure}]}). -rabbit_boot_step({file_handle_cache, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 01f7fa6fbf..6aed2f8741 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -448,27 +448,22 @@ confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> fun(Guid, {CMs, GTC0}) -> case dict:find(Guid, GTC0) of {ok, {ChPid, MsgSeqNo}} -> - {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)}; + {gb_trees_cons(ChPid, MsgSeqNo, CMs), + dict:erase(Guid, GTC0)}; _ -> {CMs, GTC0} end - end, {[], GTC}, Guids), - case lists:usort(CMs) of - [{Ch, MsgSeqNo} | CMs1] -> - [rabbit_channel:confirm(ChPid, MsgSeqNos) || - {ChPid, MsgSeqNos} <- group_confirms_by_channel( - CMs1, [{Ch, [MsgSeqNo]}])]; - [] -> - ok - end, + end, {gb_trees:empty(), GTC}, Guids), + gb_trees:map(fun(ChPid, MsgSeqNos) -> + rabbit_channel:confirm(ChPid, MsgSeqNos) + end, CMs), State#q{guid_to_channel = GTC1}. -group_confirms_by_channel([], Acc) -> - Acc; -group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> - group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); -group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> - group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> {no_confirm, State}; diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index 1d14f9f0b8..897199ee78 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -23,6 +23,10 @@ behaviour_info(callbacks) -> %% A description. {description, 0}, + %% If this mechanism is enabled, should it be offered for a given socket? + %% (primarily so EXTERNAL can be SSL-only) + {should_offer, 1}, + %% Called before authentication starts. Should create a state %% object to be passed through all the stages of authentication. {init, 1}, diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index 5e422eee89..2168495db8 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_auth_mechanism). --export([description/0, init/1, handle_response/2]). +-export([description/0, should_offer/1, init/1, handle_response/2]). -include("rabbit_auth_mechanism_spec.hrl"). @@ -38,6 +38,9 @@ description() -> [{name, <<"AMQPLAIN">>}, {description, <<"QPid AMQPLAIN mechanism">>}]. +should_offer(_Sock) -> + true. + init(_Sock) -> []. diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index 7fd20f8b32..77aa34ea0a 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_auth_mechanism). --export([description/0, init/1, handle_response/2]). +-export([description/0, should_offer/1, init/1, handle_response/2]). -include("rabbit_auth_mechanism_spec.hrl"). @@ -43,6 +43,9 @@ description() -> {description, <<"RabbitMQ Demo challenge-response authentication " "mechanism">>}]. +should_offer(_Sock) -> + true. + init(_Sock) -> #state{}. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 1ca07018e4..e2f9bff9c5 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_auth_mechanism). --export([description/0, init/1, handle_response/2]). +-export([description/0, should_offer/1, init/1, handle_response/2]). -include("rabbit_auth_mechanism_spec.hrl"). @@ -41,6 +41,9 @@ description() -> [{name, <<"PLAIN">>}, {description, <<"SASL PLAIN authentication mechanism">>}]. +should_offer(_Sock) -> + true. + init(_Sock) -> []. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index c5bd9575e3..57aad80888 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,10 +18,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/5]). +-export([publish/1, message/3, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). --export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -41,8 +40,11 @@ rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> - (rabbit_types:message() | rabbit_types:error(any()))). + properties_input(), binary()) -> rabbit_types:message()). +-spec(message/3 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -56,9 +58,6 @@ rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). --spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) -> - (boolean() | - {'invalid', non_neg_integer()})). -endif. @@ -98,19 +97,40 @@ from_content(Content) -> rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. -message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = #'P_basic'{headers = undefined}} + = DecodedContent, _Key) -> + DecodedContent; +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} + = DecodedContent, Key) -> + case lists:keysearch(Key, 1, Headers) of + false -> DecodedContent; + {value, Found} -> Headers0 = lists:delete(Found, Headers), + rabbit_binary_generator:clear_encoded_content( + DecodedContent#content{ + properties = Props#'P_basic'{ + headers = Headers0}}) + end. + +message(ExchangeName, RoutingKey, + #content{properties = Props} = DecodedContent) -> + try + {ok, #basic_message{ + exchange_name = ExchangeName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} + catch + {error, _Reason} = Error -> Error + end. + +message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> Properties = properties(RawProperties), Content = build_content(Properties, BodyBin), - case is_message_persistent(Content) of - {invalid, Other} -> - {error, {invalid_delivery_mode, Other}}; - IsPersistent when is_boolean(IsPersistent) -> - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent} - end. + {ok, Msg} = message(ExchangeName, RoutingKey, Content), + Msg. properties(P = #'P_basic'{}) -> P; @@ -152,5 +172,18 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> {invalid, Other} + Other -> throw({error, {delivery_mode_unknown, Other}}) end. + +% Extract CC routes from headers +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:append( + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + undefined -> []; + {Type, _Val} -> throw({error, {unacceptable_type_in_header, + Type, + binary_to_list(HeaderKey)}}) + end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 34a5e5a440..e92421fc9e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -254,7 +254,7 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> @@ -593,32 +593,33 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), - IsPersistent = is_message_persistent(DecodedContent), {MsgSeqNo, State1} = case ConfirmEnabled of false -> {undefined, State}; true -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, - MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, - MsgSeqNo, Message, State1), - maybe_incr_stats([{ExchangeName, 1} | - [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - {noreply, case TxnKey of - none -> State2; - _ -> add_tx_participants(DeliveredQPids, State2) - end}; + case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of + {ok, Message} -> + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + ExchangeName, MsgSeqNo, Message, + State1), + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State2), + {noreply, case TxnKey of + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) + end}; + {error, Reason} -> + rabbit_misc:protocol_error(precondition_failed, + "invalid message: ~p", [Reason]) + end; handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, @@ -658,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, {ok, MessageCount, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), @@ -1123,7 +1124,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, end. basic_return(#basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}, #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), @@ -1274,22 +1275,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(Content) -> - case rabbit_basic:is_message_persistent(Content) of - {invalid, Other} -> - rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false; - IsPersistent when is_boolean(IsPersistent) -> - IsPersistent - end. - process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_not_delivered, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 40ade4b742..40651d36f7 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -130,15 +130,8 @@ notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> - try - %% TODO: switch to os:timestamp() when we drop support for - %% Erlang/OTP < R13B01 - gen_event:notify(rabbit_event, #event{type = Type, - props = Props, - timestamp = now()}) - catch error:badarg -> - %% badarg means rabbit_event is no longer registered. We never - %% unregister it so the great likelihood is that we're shutting - %% down the broker but some events were backed up. Ignore it. - ok - end. + %% TODO: switch to os:timestamp() when we drop support for + %% Erlang/OTP < R13B01 + gen_event:notify(rabbit_event, #event{type = Type, + props = Props, + timestamp = now()}). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index c51b0913a0..349c2f6ee4 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -36,8 +36,8 @@ description() -> {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_routing_key(Name, RoutingKey). + #delivery{message = #basic_message{routing_keys = Routes}}) -> + rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 382fb6270d..bc5293c81d 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -36,7 +36,7 @@ description() -> {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, _Delivery) -> - rabbit_router:match_routing_key(Name, '_'). + rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 9cbf8100e2..2363d05e01 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -15,6 +15,7 @@ %% -module(rabbit_exchange_type_topic). + -include("rabbit.hrl"). -behaviour(rabbit_exchange_type). @@ -31,58 +32,225 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). --export([topic_matches/2]). - --ifdef(use_specs). - --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). - --endif. +%%---------------------------------------------------------------------------- description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end). - -split_topic_key(Key) -> - string:tokens(binary_to_list(Key), "."). - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) - when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or - last_topic_match(P, [BacktrackNext | R], BacktrackList). +%% NB: This may return duplicate results in some situations (that's ok) +route(#exchange{name = X}, + #delivery{message = #basic_message{routing_keys = Routes}}) -> + lists:append([begin + Words = split_topic_key(RKey), + mnesia:async_dirty(fun trie_match/2, [X, Words]) + end || RKey <- Routes]). validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. -delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. -remove_bindings(_Tx, _X, _Bs) -> ok. + +recover(_Exchange, Bs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) + end). + +delete(true, #exchange{name = X}, _Bs) -> + trie_remove_all_edges(X), + trie_remove_all_bindings(X), + ok; +delete(false, _Exchange, _Bs) -> + ok. + +add_binding(true, _Exchange, Binding) -> + internal_add_binding(Binding); +add_binding(false, _Exchange, _Binding) -> + ok. + +remove_bindings(true, _X, Bs) -> + lists:foreach(fun remove_binding/1, Bs), + ok; +remove_bindings(false, _X, _Bs) -> + ok. + +remove_binding(#binding{source = X, key = K, destination = D}) -> + Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path), + ok. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). + +%%---------------------------------------------------------------------------- + +internal_add_binding(#binding{source = X, key = K, destination = D}) -> + FinalNode = follow_down_create(X, split_topic_key(K)), + trie_add_binding(X, FinalNode, D), + ok. + +trie_match(X, Words) -> + trie_match(X, root, Words, []). + +trie_match(X, Node, [], ResAcc) -> + trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [], + trie_bindings(X, Node) ++ ResAcc); +trie_match(X, Node, [W | RestW] = Words, ResAcc) -> + lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> + trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc) + end, ResAcc, [{W, fun trie_match/4, RestW}, + {"*", fun trie_match/4, RestW}, + {"#", fun trie_match_skip_any/4, Words}]). + +trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) -> + case trie_child(X, Node, Search) of + {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc); + error -> ResAcc + end. + +trie_match_skip_any(X, Node, [], ResAcc) -> + trie_match(X, Node, [], ResAcc); +trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) -> + trie_match_skip_any(X, Node, RestW, + trie_match(X, Node, Words, ResAcc)). + +follow_down_create(X, Words) -> + case follow_down_last_node(X, Words) of + {ok, FinalNode} -> FinalNode; + {error, Node, RestW} -> lists:foldl( + fun (W, CurNode) -> + NewNode = new_node_id(), + trie_add_edge(X, CurNode, NewNode, W), + NewNode + end, Node, RestW) + end. + +follow_down_last_node(X, Words) -> + follow_down(X, fun (_, Node, _) -> Node end, root, Words). + +follow_down_get_path(X, Words) -> + {ok, Path} = + follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end, + [{root, none}], Words), + Path. + +follow_down(X, AccFun, Acc0, Words) -> + follow_down(X, root, AccFun, Acc0, Words). + +follow_down(_X, _CurNode, _AccFun, Acc, []) -> + {ok, Acc}; +follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> + case trie_child(X, CurNode, W) of + {ok, NextNode} -> follow_down(X, NextNode, AccFun, + AccFun(W, NextNode, Acc), RestW); + error -> {error, Acc, Words} + end. + +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of + true -> ok; + false -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath) + end. + +trie_child(X, Node, Word) -> + case mnesia:read(rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}) of + [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; + [] -> error + end. + +trie_bindings(X, Node) -> + MatchHead = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = '$1'}}, + mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). + +trie_add_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). + +trie_remove_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). + +trie_edge_op(X, FromNode, ToNode, W, Op) -> + ok = Op(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = FromNode, + word = W}, + node_id = ToNode}, + write). + +trie_add_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:write/3). + +trie_remove_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + +trie_binding_op(X, Node, D, Op) -> + ok = Op(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = D}}, + write). + +trie_has_any_children(X, Node) -> + has_any(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_has_any_bindings(X, Node) -> + has_any(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_remove_all_edges(X) -> + remove_all(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + _ = '_'}, + _ = '_'}). + +trie_remove_all_bindings(X) -> + remove_all(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, _ = '_'}, + _ = '_'}). + +has_any(Table, MatchHead) -> + Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), + select_while_no_result(Select) /= '$end_of_table'. + +select_while_no_result({[], Cont}) -> + select_while_no_result(mnesia:select(Cont)); +select_while_no_result(Other) -> + Other. + +remove_all(Table, Pattern) -> + lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, + mnesia:match_object(Table, Pattern, write)). + +new_node_id() -> + rabbit_guid:guid(). + +split_topic_key(Key) -> + split_topic_key(Key, [], []). + +split_topic_key(<<>>, [], []) -> + []; +split_topic_key(<<>>, RevWordAcc, RevResAcc) -> + lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [C | RevWordAcc], RevResAcc). + diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b1488365db..d2860b2f29 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -20,7 +20,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). -export([table_names/0]). @@ -54,6 +54,7 @@ -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). +-spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). -endif. @@ -185,6 +186,17 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, @@ -217,6 +229,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_edge_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. +trie_binding_match() -> + #trie_binding{exchange_name = exchange_name_match(), + _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> @@ -265,45 +283,48 @@ ensure_schema_integrity() -> check_schema_integrity() -> Tables = mnesia:system_info(tables), - case [Error || {Tab, TabDef} <- table_definitions(), - case lists:member(Tab, Tables) of - false -> - Error = {table_missing, Tab}, - true; - true -> - {_, ExpAttrs} = proplists:lookup(attributes, TabDef), - Attrs = mnesia:table_info(Tab, attributes), - Error = {table_attributes_mismatch, Tab, - ExpAttrs, Attrs}, - Attrs /= ExpAttrs - end] of - [] -> check_table_integrity(); - Errors -> {error, Errors} + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other end. -check_table_integrity() -> - ok = wait_for_tables(), - case lists:all(fun ({Tab, TabDef}) -> - {_, Match} = proplists:lookup(match, TabDef), - read_test_table(Tab, Match) - end, table_definitions()) of - true -> ok; - false -> {error, invalid_table_content} +check_table_attributes(Tab, TabDef) -> + {_, ExpAttrs} = proplists:lookup(attributes, TabDef), + case mnesia:table_info(Tab, attributes) of + ExpAttrs -> ok; + Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} end. -read_test_table(Tab, Match) -> +check_table_content(Tab, TabDef) -> + {_, Match} = proplists:lookup(match, TabDef), case mnesia:dirty_first(Tab) of '$end_of_table' -> - true; + ok; Key -> ObjList = mnesia:dirty_read(Tab, Key), MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), case ets:match_spec_run(ObjList, MatchComp) of - ObjList -> true; - _ -> false + ObjList -> ok; + _ -> {error, {table_content_invalid, Tab, Match, ObjList}} end end. +check_tables(Fun) -> + case [Error || {Tab, TabDef} <- table_definitions(), + case Fun(Tab, TabDef) of + ok -> Error = none, false; + {error, Error} -> true + end] of + [] -> ok; + Errors -> {error, Errors} + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -370,7 +391,6 @@ init_db(ClusterNodes, Force) -> case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of {[], true, [_]} -> %% True single disc node, attempt upgrade - ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade() of ok -> ensure_schema_integrity(); version_not_available -> schema_ok_or_move() @@ -465,8 +485,7 @@ copy_db(Destination) -> mnesia:stop(), case rabbit_misc:recursive_copy(dir(), Destination) of ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = wait_for_tables(); + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); {error, E} -> {error, E} end. @@ -534,7 +553,8 @@ wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; + ok -> + ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index cfea4982b4..55e6ac47c1 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -16,7 +16,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/2]). +-export([append/3, read/2, scan/4]). %%---------------------------------------------------------------------------- @@ -45,9 +45,9 @@ -spec(read/2 :: (io_device(), msg_size()) -> rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()}, any())). --spec(scan/2 :: (io_device(), file_size()) -> - {'ok', [{rabbit_guid:guid(), msg_size(), position()}], - position()}). +-spec(scan/4 :: (io_device(), file_size(), + fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A), + A) -> {'ok', A, position()}). -endif. @@ -79,43 +79,44 @@ read(FileHdl, TotalSize) -> KO -> KO end. -scan(FileHdl, FileSize) when FileSize >= 0 -> - scan(FileHdl, FileSize, <<>>, 0, [], 0). +scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc). -scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> +scan(_FileHdl, FileSize, _Data, FileSize, ScanOffset, _Fun, Acc) -> {ok, Acc, ScanOffset}; -scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> +scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) -> Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]), case file_handle_cache:read(FileHdl, Read) of {ok, Data1} -> {Data2, Acc1, ScanOffset1} = - scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + scanner(<<Data/binary, Data1/binary>>, ScanOffset, Fun, Acc), ReadOffset1 = ReadOffset + size(Data1), - scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1); + scan(FileHdl, FileSize, Data2, ReadOffset1, ScanOffset1, Fun, Acc1); _KO -> {ok, Acc, ScanOffset} end. -scan(<<>>, Acc, Offset) -> - {<<>>, Acc, Offset}; -scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> - {<<>>, Acc, Offset}; %% Nothing to do other than stop. -scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, - WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> - TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, - case WriteMarker of - ?WRITE_OK_MARKER -> - %% Here we take option 5 from - %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in - %% which we read the Guid as a number, and then convert it - %% back to a binary in order to work around bugs in - %% Erlang's GC. - <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = - <<GuidAndMsg:Size/binary>>, - <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, - scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); - _ -> - scan(Rest, Acc, Offset + TotalSize) - end; -scan(Data, Acc, Offset) -> - {Data, Acc, Offset}. +scanner(<<>>, Offset, _Fun, Acc) -> + {<<>>, Acc, Offset}; +scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) -> + {<<>>, Acc, Offset}; %% Nothing to do other than stop. +scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, + WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) -> + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + case WriteMarker of + ?WRITE_OK_MARKER -> + %% Here we take option 5 from + %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in + %% which we read the Guid as a number, and then convert it + %% back to a binary in order to work around bugs in + %% Erlang's GC. + <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> = + <<GuidAndMsg:Size/binary>>, + <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, + scanner(Rest, Offset + TotalSize, Fun, + Fun({Guid, TotalSize, Offset, Msg}, Acc)); + _ -> + scanner(Rest, Offset + TotalSize, Fun, Acc) + end; +scanner(Data, Offset, _Fun, Acc) -> + {Data, Acc, Offset}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 7f3cf35faa..9e65e44237 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -26,6 +26,8 @@ -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal +-export([transform_dir/3, force_recovery/2]). %% upgrade + -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -33,9 +35,10 @@ -include("rabbit_msg_store.hrl"). --define(SYNC_INTERVAL, 25). %% milliseconds +-define(SYNC_INTERVAL, 5). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). +-define(TRANSFORM_TMP, "transform_tmp"). -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read]). @@ -160,6 +163,9 @@ -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> deletion_thunk()). -spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()). +-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok'). +-spec(transform_dir/3 :: (file:filename(), server(), + fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'). -endif. @@ -1523,7 +1529,8 @@ scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( Hdl, filelib:file_size( - form_filename(Dir, FileName))), + form_filename(Dir, FileName)), + fun scan_fun/2, []), %% if something really bad has happened, %% the close could fail, but ignore file_handle_cache:close(Hdl), @@ -1532,6 +1539,9 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. +scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) -> + [{Guid, TotalSize, Offset} | Acc]. + %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. @@ -1956,3 +1966,47 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {got, FinalOffsetZ}, {destination, Destination}]} end. + +force_recovery(BaseDir, Store) -> + Dir = filename:join(BaseDir, atom_to_list(Store)), + file:delete(filename:join(Dir, ?CLEAN_FILENAME)), + recover_crashed_compactions(BaseDir), + ok. + +foreach_file(D, Fun, Files) -> + [Fun(filename:join(D, File)) || File <- Files]. + +foreach_file(D1, D2, Fun, Files) -> + [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. + +transform_dir(BaseDir, Store, TransformFun) -> + Dir = filename:join(BaseDir, atom_to_list(Store)), + TmpDir = filename:join(Dir, ?TRANSFORM_TMP), + TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end, + case filelib:is_dir(TmpDir) of + true -> throw({error, transform_failed_previously}); + false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), + foreach_file(Dir, TmpDir, TransformFile, FileList), + foreach_file(Dir, fun file:delete/1, FileList), + foreach_file(TmpDir, Dir, fun file:copy/2, FileList), + foreach_file(TmpDir, fun file:delete/1, FileList), + ok = file:del_dir(TmpDir) + end. + +transform_msg_file(FileOld, FileNew, TransformFun) -> + rabbit_misc:ensure_parent_dirs_exist(FileNew), + {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), + {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], + [{write_buffer, + ?HANDLE_CACHE_BUFFER_SIZE}]), + {ok, _Acc, _IgnoreSize} = + rabbit_msg_file:scan( + RefOld, filelib:file_size(FileOld), + fun({Guid, _Size, _Offset, BinMsg}, ok) -> + {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)), + {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew), + ok + end, ok), + file_handle_cache:close(RefOld), + file_handle_cache:close(RefNew), + ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3908b64692..b172db56a5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -158,7 +158,7 @@ server_properties(Protocol) -> {copyright, ?COPYRIGHT_MESSAGE}, {information, ?INFORMATION_MESSAGE}]]], - %% Filter duplicated properties in favor of config file provided values + %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). @@ -564,7 +564,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, version_major = ProtocolMajor, version_minor = ProtocolMinor, server_properties = server_properties(Protocol), - mechanisms = auth_mechanisms_binary(), + mechanisms = auth_mechanisms_binary(Sock), locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), switch_callback(State#v1{connection = Connection#connection{ @@ -616,7 +616,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, State0 = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> - AuthMechanism = auth_mechanism_to_module(Mechanism), + AuthMechanism = auth_mechanism_to_module(Mechanism, Sock), Capabilities = case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of {table, Capabilities1} -> Capabilities1; @@ -709,14 +709,14 @@ handle_method0(_Method, #v1{connection_state = S}) -> send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). -auth_mechanism_to_module(TypeBin) -> +auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown authentication mechanism '~s'", [TypeBin]); T -> - case {lists:member(T, auth_mechanisms()), + case {lists:member(T, auth_mechanisms(Sock)), rabbit_registry:lookup_module(auth_mechanism, T)} of {true, {ok, Module}} -> Module; @@ -727,15 +727,15 @@ auth_mechanism_to_module(TypeBin) -> end end. -auth_mechanisms() -> +auth_mechanisms(Sock) -> {ok, Configured} = application:get_env(auth_mechanisms), - [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism), - lists:member(Name, Configured)]. + [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism), + Module:should_offer(Sock), lists:member(Name, Configured)]. -auth_mechanisms_binary() -> +auth_mechanisms_binary(Sock) -> list_to_binary( string:join( - [atom_to_list(A) || A <- auth_mechanisms()], " ")). + [atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). auth_phase(Response, State = #v1{auth_mechanism = AuthMechanism, diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 309e0e6ed9..422e53e82a 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -37,7 +37,8 @@ fun ((rabbit_types:binding()) -> boolean())) -> match_result()). -spec(match_routing_key/2 :: (rabbit_types:binding_source(), - routing_key() | '_') -> match_result()). + [routing_key()] | ['_']) -> + match_result()). -endif. @@ -82,12 +83,22 @@ match_bindings(SrcName, Match) -> Match(Binding)]), mnesia:async_dirty(fun qlc:e/1, [Query]). -match_routing_key(SrcName, RoutingKey) -> +match_routing_key(SrcName, [RoutingKey]) -> MatchHead = #route{binding = #binding{source = SrcName, destination = '$1', key = RoutingKey, _ = '_'}}, - mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]); +match_routing_key(SrcName, [_|_] = RoutingKeys) -> + Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} || + RKey <- RoutingKeys]]), + MatchHead = #route{binding = #binding{source = SrcName, + destination = '$1', + key = '$2', + _ = '_'}}, + mnesia:dirty_select(rabbit_route, [{MatchHead, [Condition], ['$1']}]). + + %%-------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 992e5efd34..2c6086d030 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -586,32 +586,131 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). -test_topic_match(P, R) -> - test_topic_match(P, R, true). - -test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), - list_to_binary(R)) of - Expected -> - passed; - _ -> - {topic_match_failure, P, R} - end. - test_topic_matching() -> - passed = test_topic_match("#", "test.test"), - passed = test_topic_match("#", ""), - passed = test_topic_match("#.T.R", "T.T.R"), - passed = test_topic_match("#.T.R", "T.R.T.R"), - passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"), - passed = test_topic_match("#.test", "test"), - passed = test_topic_match("#.test", "test.test"), - passed = test_topic_match("#.test", "ignored.test"), - passed = test_topic_match("#.test", "more.ignored.test"), - passed = test_topic_match("#.test", "notmatched", false), - passed = test_topic_match("#.z", "one.two.three.four", false), + XName = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"test_exchange">>}, + X = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + %% create + rabbit_exchange_type_topic:validate(X), + exchange_op_callback(X, create, []), + + %% add some bindings + Bindings = lists:map( + fun ({Key, Q}) -> + #binding{source = XName, + key = list_to_binary(Key), + destination = #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)}} + end, [{"a.b.c", "t1"}, + {"a.*.c", "t2"}, + {"a.#.b", "t3"}, + {"a.b.b.c", "t4"}, + {"#", "t5"}, + {"#.#", "t6"}, + {"#.b", "t7"}, + {"*.*", "t8"}, + {"a.*", "t9"}, + {"*.b.c", "t10"}, + {"a.#", "t11"}, + {"a.#.#", "t12"}, + {"b.b.c", "t13"}, + {"a.b.b", "t14"}, + {"a.b", "t15"}, + {"b.c", "t16"}, + {"", "t17"}, + {"*.*.*", "t18"}, + {"vodka.martini", "t19"}, + {"a.b.c", "t20"}, + {"*.#", "t21"}, + {"#.*.#", "t22"}, + {"*.#.#", "t23"}, + {"#.#.#", "t24"}, + {"*", "t25"}, + {"#.b.#", "t26"}]), + lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, + Bindings), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", + "t18", "t20", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", + "t12", "t15", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", + "t18", "t21", "t22", "t23", "t24", "t26"]}, + {"", ["t5", "t6", "t17", "t24"]}, + {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", + "t24"]}, + {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", + "t24"]}, + {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23", + "t24"]}, + {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", + "t23", "t24", "t26"]}, + {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, + {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25"]}]), + + %% remove some bindings + RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), + lists:nth(11, Bindings), lists:nth(19, Bindings), + lists:nth(21, Bindings)], + exchange_op_callback(X, remove_bindings, [RemovedBindings]), + RemainingBindings = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Bindings), + ordsets:from_list(RemovedBindings))), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", + "t23", "t24", "t26"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", + "t22", "t23", "t24", "t26"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", + "t23", "t24", "t26"]}, + {"", ["t6", "t17", "t24"]}, + {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, + {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, + {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, + {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, + {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", + "t24", "t26"]}, + {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + + %% remove the entire exchange + exchange_op_callback(X, delete, [RemainingBindings]), + %% none should match now + test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. +exchange_op_callback(X, Fun, ExtraArgs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + +test_topic_expect_match(X, List) -> + lists:foreach( + fun ({Key, Expected}) -> + BinKey = list_to_binary(Key), + Res = rabbit_exchange_type_topic:route( + X, #delivery{message = #basic_message{routing_keys = + [BinKey]}}), + ExpectedRes = lists:map( + fun (Q) -> #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)} + end, Expected), + true = (lists:usort(ExpectedRes) =:= lists:usort(Res)) + end, List). + test_app_management() -> %% starting, stopping, status ok = control_action(stop_app, []), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index bde336c000..0bc5af1569 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -64,7 +64,7 @@ -type(content() :: undecoded_content() | decoded_content()). -type(basic_message() :: #basic_message{exchange_name :: rabbit_exchange:name(), - routing_key :: rabbit_router:routing_key(), + routing_keys :: [rabbit_router:routing_key()], content :: content(), guid :: rabbit_guid:guid(), is_persistent :: boolean()}). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b0a715233a..89acc10c91 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -98,7 +98,6 @@ vertices(Module, Steps) -> edges(_Module, Steps) -> [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. - unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 68b88b3e45..b9dbe418d9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -25,6 +25,7 @@ -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, [hash_passwords]}). +-rabbit_upgrade({topic_trie, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). +-spec(topic_trie/0 :: () -> 'ok'). -endif. @@ -47,7 +49,7 @@ %% point. remove_user_scope() -> - mnesia( + transform( rabbit_user_permission, fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) -> {user_permission, UV, {permission, Conf, Write, Read}} @@ -55,7 +57,7 @@ remove_user_scope() -> [user_vhost, permission]). hash_passwords() -> - mnesia( + transform( rabbit_user, fun ({user, Username, Password, IsAdmin}) -> Hash = rabbit_auth_backend_internal:hash_password(Password), @@ -64,7 +66,7 @@ hash_passwords() -> [username, password_hash, is_admin]). add_ip_to_listener() -> - mnesia( + transform( rabbit_listener, fun ({listener, Node, Protocol, Host, Port}) -> {listener, Node, Protocol, Host, {0,0,0,0}, Port} @@ -77,27 +79,41 @@ internal_exchanges() -> fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> {exchange, Name, Type, Durable, AutoDelete, false, Args} end, - [ ok = mnesia(T, - AddInternalFun, - [name, type, durable, auto_delete, internal, arguments]) + [ ok = transform(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) || T <- Tables ], ok. user_to_internal_user() -> - mnesia( + transform( rabbit_user, fun({user, Username, PasswordHash, IsAdmin}) -> {internal_user, Username, PasswordHash, IsAdmin} end, [username, password_hash, is_admin], internal_user). +topic_trie() -> + create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, + {attributes, [trie_edge, node_id]}, + {type, ordered_set}]), + create(rabbit_topic_trie_binding, [{record_name, topic_trie_binding}, + {attributes, [trie_binding, value]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- -mnesia(TableName, Fun, FieldList) -> +transform(TableName, Fun, FieldList) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. -mnesia(TableName, Fun, FieldList, NewRecordName) -> +transform(TableName, Fun, FieldList, NewRecordName) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. + +create(Tab, TabDef) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 74487ade05..119fc94948 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3]). + status/1, invoke/3, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -294,6 +294,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({multiple_routing_keys, []}). + -ifdef(use_specs). -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). @@ -351,6 +353,8 @@ -include("rabbit_backing_queue_spec.hrl"). +-spec(multiple_routing_keys/0 :: () -> 'ok'). + -endif. -define(BLANK_DELTA, #delta { start_seq_id = undefined, @@ -1461,8 +1465,8 @@ msgs_written_to_disk(QPid, GuidSet, written) -> msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), State #vqstate { msgs_on_disk = - gb_sets:intersection( - gb_sets:union(MOD, GuidSet), UC) }) + gb_sets:union( + MOD, gb_sets:intersection(UC, GuidSet)) }) end). msg_indices_written_to_disk(QPid, GuidSet) -> @@ -1474,8 +1478,8 @@ msg_indices_written_to_disk(QPid, GuidSet) -> msgs_confirmed(gb_sets:intersection(GuidSet, MOD), State #vqstate { msg_indices_on_disk = - gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), UC) }) + gb_sets:union( + MIOD, gb_sets:intersection(UC, GuidSet)) }) end). %%---------------------------------------------------------------------------- @@ -1816,3 +1820,27 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> push_betas_to_deltas( Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. + +%%---------------------------------------------------------------------------- +%% Upgrading +%%---------------------------------------------------------------------------- + +multiple_routing_keys() -> + transform_storage( + fun ({basic_message, ExchangeName, Routing_Key, Content, + Guid, Persistent}) -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + Guid, Persistent}}; + (_) -> {error, corrupt_message} + end), + ok. + + +%% Assumes message store is not running +transform_storage(TransformFun) -> + transform_store(?PERSISTENT_MSG_STORE, TransformFun), + transform_store(?TRANSIENT_MSG_STORE, TransformFun). + +transform_store(Store, TransformFun) -> + rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), + rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). |
