diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_basic.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 54 |
3 files changed, 88 insertions, 45 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl new file mode 100644 index 0000000000..b2e858208e --- /dev/null +++ b/src/rabbit_basic.erl @@ -0,0 +1,71 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_basic). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([publish/4, message/4]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) -> + {ok, routing_result(), [pid()]} | not_found()). +-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> + message()). + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Mandatory, Immediate, Txn, + Message = #basic_message{exchange_name = ExchangeName}) -> + case rabbit_exchange:lookup(ExchangeName) of + {ok, X} -> + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message), + {ok, RoutingRes, DeliveredQPids}; + Other -> + Other + end. + +message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + Content = #content{class_id = ClassId, + properties = #'P_basic'{content_type = ContentTypeBin}, + properties_bin = none, + payload_fragments_rev = [BodyBin]}, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + persistent_key = none}. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index bc14fdb837..d73edb73b8 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -74,7 +74,9 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> - {ok, _RoutingRes} = rabbit_exchange:simple_publish( - false, false, LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data))), + {ok, _RoutingRes, _DeliveredQPids} = + rabbit_basic:publish(false, false, none, + rabbit_basic:message( + LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data)))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c760329ebb..eb45e4f3b4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - publish/5, simple_publish/6, simple_publish/3, route/3]). + publish/5]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -74,12 +74,6 @@ -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) -> {routing_result(), [pid()]}). --spec(simple_publish/6 :: - (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> - {ok, routing_result()} | not_found()). --spec(simple_publish/3 :: (bool(), bool(), message()) -> - {ok, routing_result()} | not_found()). --spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -196,60 +190,36 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + publish(X, Mandatory, Immediate, Txn, Message = #basic_message{routing_key = RK, content = C}) -> case rabbit_router:deliver(route(X, RK, C), Mandatory, Immediate, Txn, Message) of - {RoutingRes, []} -> - {routed, DeliveredQPids} = handle_unrouted(X, Txn, Message), - {RoutingRes, DeliveredQPids}; - Other -> - Other + {RoutingRes, []} -> DeliveredQPids = handle_unrouted(X, Txn, Message), + {RoutingRes, DeliveredQPids}; + Other -> Other end. handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"ume">>) of undefined -> - {routed, []}; + []; UmeName -> case lookup(UmeName) of {ok, Ume} -> - publish(Ume, false, false, Txn, Message); + {routed, DeliveredQPids} = + publish(Ume, false, false, Txn, Message), + DeliveredQPids; {error, not_found} -> rabbit_log:warning( "unroutable message exchange for ~s does not exist: ~s", [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]), - {routed, []} + [] end end. -%% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, - ContentTypeBin, BodyBin) -> - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), - Content = #content{class_id = ClassId, - properties = #'P_basic'{content_type = ContentTypeBin}, - properties_bin = none, - payload_fragments_rev = [BodyBin]}, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - persistent_key = none}, - simple_publish(Mandatory, Immediate, Message). - -%% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, - Message = #basic_message{exchange_name = ExchangeName}) -> - case lookup(ExchangeName) of - {ok, X} -> {RoutingRes, _} = publish(X, Mandatory, Immediate, none, - Message), - {ok, RoutingRes}; - Other -> Other - end. - -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% |
