diff options
| -rw-r--r-- | include/rabbit.hrl | 1 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
| -rw-r--r-- | packaging/macports/net/rabbitmq-server/Portfile | 18 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 25 |
15 files changed, 266 insertions, 138 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0096ada1b8..50ddafbaef 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -146,6 +146,7 @@ host :: string() | atom(), port :: non_neg_integer()}). -type(not_found() :: {'error', 'not_found'}). +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -endif. diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index bed1611ab8..3c3be609ce 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -117,6 +117,9 @@ fi rm -rf %{buildroot} %changelog +* Tue May 19 2009 Matthias Radestock <matthias@lshift.net> 1.5.5-1 +- Maintenance release for the 1.5.x series + * Mon Apr 6 2009 Matthias Radestock <matthias@lshift.net> 1.5.4-1 - Maintenance release for the 1.5.x series diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index d1ccd3a0c2..7c5673f77b 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.5.5-1) hardy; urgency=low + + * New Upstream Release + + -- Matthias Radestock <matthias@lshift.net> Tue, 19 May 2009 09:57:54 +0100 + rabbitmq-server (1.5.4-1) hardy; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index b2b3ab0236..216360725d 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,7 +7,7 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -Depends: erlang-nox, adduser, logrotate, ${misc:Depends} +Depends: erlang-nox, erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index d9d16dbbb2..659132568f 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -1,15 +1,15 @@ # -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4 # $Id$ -PortSystem 1.0 - -name rabbitmq-server -version 1.5.3 -categories net -maintainers tonyg@rabbitmq.com -platforms darwin -description The RabbitMQ AMQP Server -long_description \ +PortSystem 1.0 +name rabbitmq-server +version 1.5.5 +revision 0 +categories net +maintainers tonyg@rabbitmq.com +platforms darwin +description The RabbitMQ AMQP Server +long_description \ RabbitMQ is an implementation of AMQP, the emerging standard for \ high performance enterprise messaging. The RabbitMQ server is a \ robust and scalable implementation of an AMQP broker. diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 54348d9a1c..99b912ec09 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -45,11 +45,13 @@ -ifdef(use_specs). +-type(permission_atom() :: 'configure' | 'read' | 'write'). + -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). -spec(check_resource_access/3 :: - (username(), r(atom()), non_neg_integer()) -> 'ok'). + (username(), r(atom()), permission_atom()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -137,6 +139,10 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. +permission_index(configure) -> #permission.configure; +permission_index(write) -> #permission.write; +permission_index(read) -> #permission.read. + check_resource_access(Username, R = #resource{kind = exchange, name = <<"">>}, Permission) -> @@ -158,7 +164,7 @@ check_resource_access(Username, [#user_permission{permission = P}] -> case regexp:match( binary_to_list(Name), - binary_to_list(element(Permission, P))) of + binary_to_list(element(permission_index(Permission), P))) of {match, _, _} -> true; nomatch -> false end diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c56e51888b..542ea242dc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,8 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). +-export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). @@ -102,6 +103,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -159,11 +161,17 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), + internal_declare(Q, true). + +internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), - ok = add_default_binding(Q), + case WantDefaultBinding of + true -> add_default_binding(Q); + false -> ok + end, Q; [ExistingQ] -> ExistingQ end diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 73fae89277..5f96b84be1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -417,7 +417,7 @@ all_tx() -> record_pending_message(Txn, Message = #basic_message { is_persistent = IsPersistent }) -> Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Message, false} | Pending], + store_tx(Txn, Tx #tx { pending_messages = [Message | Pending], is_persistent = IsPersistentTxn orelse IsPersistent }). @@ -432,18 +432,21 @@ commit_transaction(Txn, State) -> } = lookup_tx(Txn), PendingMessagesOrdered = lists:reverse(PendingMessages), PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)), - case lookup_ch(ChPid) of - not_found -> State; - C = #cr { unacked_messages = UAM } -> - {MsgWithAcks, Remaining} = - collect_messages(PendingAcksOrdered, UAM), - store_ch_record(C#cr{unacked_messages = Remaining}), - {ok, MS} = rabbit_mixed_queue:tx_commit( - PendingMessagesOrdered, - lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), - State #q.mixed_state), - State #q { mixed_state = MS } - end. + {ok, MS} = + case lookup_ch(ChPid) of + not_found -> + rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, [], State #q.mixed_state); + C = #cr { unacked_messages = UAM } -> + {MsgWithAcks, Remaining} = + collect_messages(PendingAcksOrdered, UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), + rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, + lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), + State #q.mixed_state) + end, + State #q { mixed_state = MS }. rollback_transaction(Txn, State) -> #tx { pending_messages = PendingMessages diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl new file mode 100644 index 0000000000..1d44543aac --- /dev/null +++ b/src/rabbit_basic.erl @@ -0,0 +1,72 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_basic). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([publish/4, message/4]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) -> + {ok, routing_result(), [pid()]} | not_found()). +-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> + message()). + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Mandatory, Immediate, Txn, + Message = #basic_message{exchange_name = ExchangeName}) -> + case rabbit_exchange:lookup(ExchangeName) of + {ok, X} -> + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message), + {ok, RoutingRes, DeliveredQPids}; + Other -> + Other + end. + +message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + Content = #content{class_id = ClassId, + properties = #'P_basic'{content_type = ContentTypeBin}, + properties_bin = none, + payload_fragments_rev = [BodyBin]}, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = false}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d14a01bee9..5142f9b7a3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -231,13 +231,13 @@ clear_permission_cache() -> ok. check_configure_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.configure). + check_resource_access(Username, Resource, configure). check_write_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.write). + check_resource_access(Username, Resource, write). check_read_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.read). + check_resource_access(Username, Resource, read). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( @@ -306,20 +306,39 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath}) -> + Content, State = #ch{ virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - {noreply, publish(Mandatory, Immediate, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = is_message_persistent(DecodedContent)}, - rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent)}, + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey, + Message), + case RoutingRes of + routed -> + ok; + unroutable -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); + not_delivered -> + %% FIXME: 313 should be replaced by the ?NO_CONSUMERS + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) + end, + {noreply, case TxnKey of + none -> State; + _ -> add_tx_participants(DeliveredQPids, State) + end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -548,6 +567,13 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, {ok, FoundX} -> FoundX; {error, not_found} -> check_name('exchange', ExchangeNameBin), + case rabbit_misc:r_arg(VHostPath, exchange, Args, + <<"alternate-exchange">>) of + undefined -> ok; + AName -> check_read_permitted(ExchangeName, State), + check_write_permitted(AName, State), + ok + end, rabbit_exchange:declare(ExchangeName, CheckedType, Durable, @@ -761,30 +787,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ok -> return_ok(State, NoWait, ReturnMethod) end. -publish(Mandatory, Immediate, Message, QPids, - State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> - Handled = deliver(QPids, Mandatory, Immediate, TxnKey, - Message, WriterPid), - case TxnKey of - none -> State; - _ -> add_tx_participants(Handled, State) - end. - -deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> - case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of - {ok, DeliveredQPids} -> DeliveredQPids; - {error, unroutable} -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), - []; - {error, not_delivered} -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), - [] - end. - basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index dc5824f1c9..d73edb73b8 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -74,7 +74,9 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> - {ok, _QueueNames} = rabbit_exchange:simple_publish( - false, false, LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data))), + {ok, _RoutingRes, _DeliveredQPids} = + rabbit_basic:publish(false, false, none, + rabbit_basic:message( + LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data)))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e0f76d895c..ca0e337b84 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,8 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - simple_publish/6, simple_publish/3, - route/3]). + publish/5]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -57,8 +56,6 @@ -ifdef(use_specs). --type(publish_res() :: {'ok', [pid()]} | - not_found() | {'error', 'unroutable' | 'not_delivered'}). -type(bind_res() :: 'ok' | {'error', 'queue_not_found' | 'exchange_not_found' | @@ -75,11 +72,8 @@ -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(simple_publish/6 :: - (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> - publish_res()). --spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). +-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) -> + {routing_result(), [pid()]}). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -194,38 +188,44 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -%% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, - ContentTypeBin, BodyBin) -> - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), - Content = #content{class_id = ClassId, - properties = #'P_basic'{content_type = ContentTypeBin}, - properties_bin = none, - payload_fragments_rev = [BodyBin]}, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - is_persistent = false, - guid = rabbit_guid:guid() - }, - simple_publish(Mandatory, Immediate, Message). - -%% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}) -> - case lookup(ExchangeName) of - {ok, Exchange} -> - QPids = route(Exchange, RoutingKey, Content), - rabbit_router:deliver(QPids, Mandatory, Immediate, - none, Message); - {error, Error} -> {error, Error} +publish(X, Mandatory, Immediate, Txn, Message) -> + publish(X, [], Mandatory, Immediate, Txn, Message). + +publish(X, Seen, Mandatory, Immediate, Txn, + Message = #basic_message{routing_key = RK, content = C}) -> + case rabbit_router:deliver(route(X, RK, C), + Mandatory, Immediate, Txn, Message) of + {_, []} = R -> + #exchange{name = XName, arguments = Args} = X, + case rabbit_misc:r_arg(XName, exchange, Args, + <<"alternate-exchange">>) of + undefined -> + R; + AName -> + NewSeen = [XName | Seen], + case lists:member(AName, NewSeen) of + true -> + R; + false -> + case lookup(AName) of + {ok, AX} -> + publish(AX, NewSeen, + Mandatory, Immediate, Txn, + Message); + {error, not_found} -> + rabbit_log:warning( + "alternate exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), + rabbit_misc:rs(AName)]), + R + end + end + end; + R -> + R end. -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% @@ -252,6 +252,9 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> route(X = #exchange{type = direct}, RoutingKey, _Content) -> match_routing_key(X, RoutingKey). +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange match_bindings(#exchange{name = Name}, Match) -> @@ -383,32 +386,40 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - call_with_exchange_and_queue( - ExchangeName, QueueName, - fun (X, Q) -> + binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; - true -> ok = sync_binding( - ExchangeName, QueueName, RoutingKey, Arguments, - Q#amqqueue.durable, fun mnesia:write/3) + true -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:write/3) end end). delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + _ -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + maybe_auto_delete(X) + end + end). + +binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - ok = sync_binding( - ExchangeName, QueueName, RoutingKey, Arguments, - Q#amqqueue.durable, fun mnesia:delete_object/3), - maybe_auto_delete(X) + Fun(X, Q, #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = sort_arguments(Arguments)}) end). -sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> - Binding = #binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = sort_arguments(Arguments)}, +sync_binding(Binding, Durable, Fun) -> ok = case Durable of true -> Fun(rabbit_durable_route, #route{binding = Binding}, write); @@ -474,7 +485,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and sync_binding/6 do. +%% route/3 and {add,delete}_binding/4 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index acadf2a0cc..c965c69314 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -39,7 +39,7 @@ -export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). --export([r/3, r/2, rs/1]). +-export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). @@ -79,12 +79,14 @@ -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) - when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> + r(K) when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} when is_subtype(K, atom())). +-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> + undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). @@ -175,6 +177,14 @@ r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) -> r(VHostPath, Kind) when is_binary(VHostPath) -> #resource{virtual_host = VHostPath, kind = Kind, name = '_'}. +r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) -> + r_arg(VHostPath, Kind, Table, Key); +r_arg(VHostPath, Kind, Table, Key) -> + case lists:keysearch(Key, 1, Table) of + {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin); + false -> undefined + end. + rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 8455bf1c9d..1b0386e696 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -51,7 +51,7 @@ start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed -> QList = rabbit_disk_queue:dump_queue(Queue), {MsgBuf, NextSeq} = lists:foldl( - fun ({MsgId, Msg, Size, Delivered, SeqId}, {Buf, NSeq}) + fun ({_MsgId, Msg, _Size, Delivered, SeqId}, {Buf, NSeq}) when SeqId >= NSeq -> {queue:in({SeqId, Msg, Delivered}, Buf), SeqId + 1} end, {queue:new(), 0}, QList), @@ -178,8 +178,11 @@ only_persistent_msg_ids(Pubs) -> tx_cancel(Publishes, State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)), {ok, State}; -tx_cancel(Publishes, State = #mqstate { mode = mixed }) -> - ok = rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)), +tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable }) -> + MsgIds = if IsDurable -> only_persistent_msg_ids(Publishes); + true -> [] + end, + ok = rabbit_disk_queue:tx_cancel(MsgIds), {ok, State}. only_ack_tags(MsgWithAcks) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 0b06a063a7..57166428bf 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -51,7 +51,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> - {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}). + {routing_result(), [pid()]}). -endif. @@ -98,14 +98,15 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - {ok, lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}), - QPids - end, - NodeQPids)}; + {routed, + lists:flatmap( + fun ({Node, QPids}) -> + gen_server2:cast( + {?SERVER, Node}, + {deliver, QPids, Mandatory, Immediate, Txn, Message}), + QPids + end, + NodeQPids)}; deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( @@ -179,6 +180,6 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> QPids). %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) -check_delivery(true, _ , {false, []}) -> {error, unroutable}; -check_delivery(_ , true, {_ , []}) -> {error, not_delivered}; -check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}. +check_delivery(true, _ , {false, []}) -> {unroutable, []}; +check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; +check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. |
