summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-07 11:12:43 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-07 11:12:43 +0000
commit2578d9303980cd66e973f7981f1d8ae076ac316c (patch)
treecbd61bc748bb6797941d66ba22380c22341edd78 /src
parent77ba4b3dd4a52307c73002aa201c0e444f14416d (diff)
parentebcd774c1bc96a908262a67635863dd1af8c2849 (diff)
downloadrabbitmq-server-git-2578d9303980cd66e973f7981f1d8ae076ac316c.tar.gz
Merged default into bug23483
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl69
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_exchange_type_direct.erl5
-rw-r--r--src/rabbit_exchange_type_topic.erl11
4 files changed, 61 insertions, 42 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index c5bd9575e3..a144124f45 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,10 +18,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/5]).
+-export([publish/1, message/3, message/4, properties/1, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
--export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -41,8 +40,10 @@
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) ->
- (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) -> rabbit_types:message()).
+-spec(message/3 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ rabbit_types:decoded_content()) -> rabbit_types:message()).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -56,9 +57,6 @@
rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
- (boolean() |
- {'invalid', non_neg_integer()})).
-endif.
@@ -98,19 +96,33 @@ from_content(Content) ->
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
+%% This breaks the spec rule forbidding message modification
+strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} = DecodedContent,
+ Key) when Headers =/= undefined ->
+ case lists:keyfind(Key, 1, Headers) of
+ false -> DecodedContent;
+ Tuple -> Headers0 = lists:delete(Tuple, Headers),
+ DecodedContent#content{
+ properties_bin = none,
+ properties = Props#'P_basic'{headers = Headers0}}
+ end;
+strip_header(DecodedContent, _Key) ->
+ DecodedContent.
+
+message(ExchangeName, RoutingKey,
+ #content{properties = Props} = DecodedContent) ->
+ #basic_message{
+ exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ route_list = [RoutingKey | header_routes(Props#'P_basic'.headers)]}.
+
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
Content = build_content(Properties, BodyBin),
- case is_message_persistent(Content) of
- {invalid, Other} ->
- {error, {invalid_delivery_mode, Other}};
- IsPersistent when is_boolean(IsPersistent) ->
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = Content,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent}
- end.
+ message(ExchangeName, RoutingKeyBin, Content).
properties(P = #'P_basic'{}) ->
P;
@@ -152,5 +164,26 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> {invalid, Other}
+ Other -> rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
+ [Other]),
+ false
end.
+
+% Extract CC routes from headers
+header_routes(undefined) ->
+ [];
+header_routes(HeadersTable) ->
+ lists:flatten([case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {longstr, Route} -> Route;
+ {array, Routes} -> rkeys(Routes, []);
+ _ -> []
+ end || HeaderKey <- ?ROUTING_HEADERS]).
+
+rkeys([{longstr, Route} | Rest], RKeys) ->
+ rkeys(Rest, [Route | RKeys]);
+rkeys([_ | Rest], RKeys) ->
+ rkeys(Rest, RKeys);
+rkeys(_, RKeys) ->
+ RKeys.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff3e..be232bd235 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -549,18 +549,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
- IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1} =
case ConfirmEnabled of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
+ Message = rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -1228,17 +1223,6 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(Content) ->
- case rabbit_basic:is_message_persistent(Content) of
- {invalid, Other} ->
- rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false;
- IsPersistent when is_boolean(IsPersistent) ->
- IsPersistent
- end.
-
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State#ch.writer_pid, no_route),
record_confirm(MsgSeqNo, XName, State);
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index c51b0913a0..0baac1f87f 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -36,8 +36,9 @@ description() ->
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_routing_key(Name, RoutingKey).
+ #delivery{message = #basic_message{route_list = Routes}}) ->
+ lists:flatten([rabbit_router:match_routing_key(Name, RKey) ||
+ RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 9cbf8100e2..beee497429 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -44,11 +44,12 @@ description() ->
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end).
+ #delivery{message = #basic_message{route_list = Routes}}) ->
+ lists:flatten([rabbit_router:match_bindings(
+ Name,
+ fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RKey)
+ end) || RKey <- Routes]).
split_topic_key(Key) ->
string:tokens(binary_to_list(Key), ".").