diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-02-07 11:12:43 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-07 11:12:43 +0000 |
| commit | 2578d9303980cd66e973f7981f1d8ae076ac316c (patch) | |
| tree | cbd61bc748bb6797941d66ba22380c22341edd78 /src | |
| parent | 77ba4b3dd4a52307c73002aa201c0e444f14416d (diff) | |
| parent | ebcd774c1bc96a908262a67635863dd1af8c2849 (diff) | |
| download | rabbitmq-server-git-2578d9303980cd66e973f7981f1d8ae076ac316c.tar.gz | |
Merged default into bug23483
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_basic.erl | 69 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 11 |
4 files changed, 61 insertions, 42 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index c5bd9575e3..a144124f45 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,10 +18,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/5]). +-export([publish/1, message/3, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). --export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -41,8 +40,10 @@ rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> - (rabbit_types:message() | rabbit_types:error(any()))). + properties_input(), binary()) -> rabbit_types:message()). +-spec(message/3 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> rabbit_types:message()). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -56,9 +57,6 @@ rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). --spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) -> - (boolean() | - {'invalid', non_neg_integer()})). -endif. @@ -98,19 +96,33 @@ from_content(Content) -> rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} = DecodedContent, + Key) when Headers =/= undefined -> + case lists:keyfind(Key, 1, Headers) of + false -> DecodedContent; + Tuple -> Headers0 = lists:delete(Tuple, Headers), + DecodedContent#content{ + properties_bin = none, + properties = Props#'P_basic'{headers = Headers0}} + end; +strip_header(DecodedContent, _Key) -> + DecodedContent. + +message(ExchangeName, RoutingKey, + #content{properties = Props} = DecodedContent) -> + #basic_message{ + exchange_name = ExchangeName, + routing_key = RoutingKey, + content = strip_header(DecodedContent, ?DELETED_HEADER), + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + route_list = [RoutingKey | header_routes(Props#'P_basic'.headers)]}. + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), Content = build_content(Properties, BodyBin), - case is_message_persistent(Content) of - {invalid, Other} -> - {error, {invalid_delivery_mode, Other}}; - IsPersistent when is_boolean(IsPersistent) -> - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent} - end. + message(ExchangeName, RoutingKeyBin, Content). properties(P = #'P_basic'{}) -> P; @@ -152,5 +164,26 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> {invalid, Other} + Other -> rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false end. + +% Extract CC routes from headers +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:flatten([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {longstr, Route} -> Route; + {array, Routes} -> rkeys(Routes, []); + _ -> [] + end || HeaderKey <- ?ROUTING_HEADERS]). + +rkeys([{longstr, Route} | Rest], RKeys) -> + rkeys(Rest, [Route | RKeys]); +rkeys([_ | Rest], RKeys) -> + rkeys(Rest, RKeys); +rkeys(_, RKeys) -> + RKeys. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a82e5eff3e..be232bd235 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -549,18 +549,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), - IsPersistent = is_message_persistent(DecodedContent), {MsgSeqNo, State1} = case ConfirmEnabled of false -> {undefined, State}; true -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, + Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -1228,17 +1223,6 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(Content) -> - case rabbit_basic:is_message_persistent(Content) of - {invalid, Other} -> - rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false; - IsPersistent when is_boolean(IsPersistent) -> - IsPersistent - end. - process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State#ch.writer_pid, no_route), record_confirm(MsgSeqNo, XName, State); diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index c51b0913a0..0baac1f87f 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -36,8 +36,9 @@ description() -> {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_routing_key(Name, RoutingKey). + #delivery{message = #basic_message{route_list = Routes}}) -> + lists:flatten([rabbit_router:match_routing_key(Name, RKey) || + RKey <- Routes]). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 9cbf8100e2..beee497429 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -44,11 +44,12 @@ description() -> {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end). + #delivery{message = #basic_message{route_list = Routes}}) -> + lists:flatten([rabbit_router:match_bindings( + Name, + fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RKey) + end) || RKey <- Routes]). split_topic_key(Key) -> string:tokens(binary_to_list(Key), "."). |
