summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/macports/net/rabbitmq-server/Portfile18
-rw-r--r--src/rabbit_access_control.erl10
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_basic.erl72
-rw-r--r--src/rabbit_channel.erl72
-rw-r--r--src/rabbit_error_logger.erl8
-rw-r--r--src/rabbit_exchange.erl121
-rw-r--r--src/rabbit_misc.erl16
-rw-r--r--src/rabbit_mixed_queue.erl9
-rw-r--r--src/rabbit_router.erl25
15 files changed, 266 insertions, 138 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0096ada1b8..50ddafbaef 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -146,6 +146,7 @@
host :: string() | atom(),
port :: non_neg_integer()}).
-type(not_found() :: {'error', 'not_found'}).
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-endif.
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index bed1611ab8..3c3be609ce 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -117,6 +117,9 @@ fi
rm -rf %{buildroot}
%changelog
+* Tue May 19 2009 Matthias Radestock <matthias@lshift.net> 1.5.5-1
+- Maintenance release for the 1.5.x series
+
* Mon Apr 6 2009 Matthias Radestock <matthias@lshift.net> 1.5.4-1
- Maintenance release for the 1.5.x series
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index d1ccd3a0c2..7c5673f77b 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.5.5-1) hardy; urgency=low
+
+ * New Upstream Release
+
+ -- Matthias Radestock <matthias@lshift.net> Tue, 19 May 2009 09:57:54 +0100
+
rabbitmq-server (1.5.4-1) hardy; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index b2b3ab0236..216360725d 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -7,7 +7,7 @@ Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-nox, adduser, logrotate, ${misc:Depends}
+Depends: erlang-nox, erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile
index d9d16dbbb2..659132568f 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/net/rabbitmq-server/Portfile
@@ -1,15 +1,15 @@
# -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4
# $Id$
-PortSystem 1.0
-
-name rabbitmq-server
-version 1.5.3
-categories net
-maintainers tonyg@rabbitmq.com
-platforms darwin
-description The RabbitMQ AMQP Server
-long_description \
+PortSystem 1.0
+name rabbitmq-server
+version 1.5.5
+revision 0
+categories net
+maintainers tonyg@rabbitmq.com
+platforms darwin
+description The RabbitMQ AMQP Server
+long_description \
RabbitMQ is an implementation of AMQP, the emerging standard for \
high performance enterprise messaging. The RabbitMQ server is a \
robust and scalable implementation of an AMQP broker.
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 54348d9a1c..99b912ec09 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -45,11 +45,13 @@
-ifdef(use_specs).
+-type(permission_atom() :: 'configure' | 'read' | 'write').
+
-spec(check_login/2 :: (binary(), binary()) -> user()).
-spec(user_pass_login/2 :: (username(), password()) -> user()).
-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
-spec(check_resource_access/3 ::
- (username(), r(atom()), non_neg_integer()) -> 'ok').
+ (username(), r(atom()), permission_atom()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -137,6 +139,10 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+permission_index(configure) -> #permission.configure;
+permission_index(write) -> #permission.write;
+permission_index(read) -> #permission.read.
+
check_resource_access(Username,
R = #resource{kind = exchange, name = <<"">>},
Permission) ->
@@ -158,7 +164,7 @@ check_resource_access(Username,
[#user_permission{permission = P}] ->
case regexp:match(
binary_to_list(Name),
- binary_to_list(element(Permission, P))) of
+ binary_to_list(element(permission_index(Permission), P))) of
{match, _, _} -> true;
nomatch -> false
end
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c56e51888b..542ea242dc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,8 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
+-export([start/0, recover/0, declare/4, delete/3, purge/1]).
+-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
@@ -102,6 +103,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -159,11 +161,17 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
+ internal_declare(Q, true).
+
+internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> ok = store_queue(Q),
- ok = add_default_binding(Q),
+ case WantDefaultBinding of
+ true -> add_default_binding(Q);
+ false -> ok
+ end,
Q;
[ExistingQ] -> ExistingQ
end
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 73fae89277..5f96b84be1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -417,7 +417,7 @@ all_tx() ->
record_pending_message(Txn, Message = #basic_message { is_persistent = IsPersistent }) ->
Tx = #tx{pending_messages = Pending, is_persistent = IsPersistentTxn } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [{Message, false} | Pending],
+ store_tx(Txn, Tx #tx { pending_messages = [Message | Pending],
is_persistent = IsPersistentTxn orelse IsPersistent
}).
@@ -432,18 +432,21 @@ commit_transaction(Txn, State) ->
} = lookup_tx(Txn),
PendingMessagesOrdered = lists:reverse(PendingMessages),
PendingAcksOrdered = lists:append(lists:reverse(PendingAcks)),
- case lookup_ch(ChPid) of
- not_found -> State;
- C = #cr { unacked_messages = UAM } ->
- {MsgWithAcks, Remaining} =
- collect_messages(PendingAcksOrdered, UAM),
- store_ch_record(C#cr{unacked_messages = Remaining}),
- {ok, MS} = rabbit_mixed_queue:tx_commit(
- PendingMessagesOrdered,
- lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
- State #q.mixed_state),
- State #q { mixed_state = MS }
- end.
+ {ok, MS} =
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered, [], State #q.mixed_state);
+ C = #cr { unacked_messages = UAM } ->
+ {MsgWithAcks, Remaining} =
+ collect_messages(PendingAcksOrdered, UAM),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
+ rabbit_mixed_queue:tx_commit(
+ PendingMessagesOrdered,
+ lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks),
+ State #q.mixed_state)
+ end,
+ State #q { mixed_state = MS }.
rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
new file mode 100644
index 0000000000..1d44543aac
--- /dev/null
+++ b/src/rabbit_basic.erl
@@ -0,0 +1,72 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_basic).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-export([publish/4, message/4]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(publish/4 :: (bool(), bool(), maybe(txn()), message()) ->
+ {ok, routing_result(), [pid()]} | not_found()).
+-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
+ message()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+publish(Mandatory, Immediate, Txn,
+ Message = #basic_message{exchange_name = ExchangeName}) ->
+ case rabbit_exchange:lookup(ExchangeName) of
+ {ok, X} ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(X, Mandatory, Immediate, Txn, Message),
+ {ok, RoutingRes, DeliveredQPids};
+ Other ->
+ Other
+ end.
+
+message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ Content = #content{class_id = ClassId,
+ properties = #'P_basic'{content_type = ContentTypeBin},
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]},
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ guid = rabbit_guid:guid(),
+ is_persistent = false}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d14a01bee9..5142f9b7a3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -231,13 +231,13 @@ clear_permission_cache() ->
ok.
check_configure_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, #permission.configure).
+ check_resource_access(Username, Resource, configure).
check_write_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, #permission.write).
+ check_resource_access(Username, Resource, write).
check_read_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, #permission.read).
+ check_resource_access(Username, Resource, read).
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
@@ -306,20 +306,39 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath}) ->
+ Content, State = #ch{ virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- {noreply, publish(Mandatory, Immediate,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = is_message_persistent(DecodedContent)},
- rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent)},
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
+ Message),
+ case RoutingRes of
+ routed ->
+ ok;
+ unroutable ->
+ %% FIXME: 312 should be replaced by the ?NO_ROUTE
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
+ not_delivered ->
+ %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
+ end,
+ {noreply, case TxnKey of
+ none -> State;
+ _ -> add_tx_participants(DeliveredQPids, State)
+ end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -548,6 +567,13 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
{ok, FoundX} -> FoundX;
{error, not_found} ->
check_name('exchange', ExchangeNameBin),
+ case rabbit_misc:r_arg(VHostPath, exchange, Args,
+ <<"alternate-exchange">>) of
+ undefined -> ok;
+ AName -> check_read_permitted(ExchangeName, State),
+ check_write_permitted(AName, State),
+ ok
+ end,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
@@ -761,30 +787,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ok -> return_ok(State, NoWait, ReturnMethod)
end.
-publish(Mandatory, Immediate, Message, QPids,
- State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
- Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
- Message, WriterPid),
- case TxnKey of
- none -> State;
- _ -> add_tx_participants(Handled, State)
- end.
-
-deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
- case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of
- {ok, DeliveredQPids} -> DeliveredQPids;
- {error, unroutable} ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
- [];
- {error, not_delivered} ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
- []
- end.
-
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index dc5824f1c9..d73edb73b8 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -74,7 +74,9 @@ publish(_Other, _Format, _Data, _State) ->
ok.
publish1(RoutingKey, Format, Data, LogExch) ->
- {ok, _QueueNames} = rabbit_exchange:simple_publish(
- false, false, LogExch, RoutingKey, <<"text/plain">>,
- list_to_binary(io_lib:format(Format, Data))),
+ {ok, _RoutingRes, _DeliveredQPids} =
+ rabbit_basic:publish(false, false, none,
+ rabbit_basic:message(
+ LogExch, RoutingKey, <<"text/plain">>,
+ list_to_binary(io_lib:format(Format, Data)))),
ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e0f76d895c..ca0e337b84 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,8 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
- simple_publish/6, simple_publish/3,
- route/3]).
+ publish/5]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -57,8 +56,6 @@
-ifdef(use_specs).
--type(publish_res() :: {'ok', [pid()]} |
- not_found() | {'error', 'unroutable' | 'not_delivered'}).
-type(bind_res() :: 'ok' | {'error',
'queue_not_found' |
'exchange_not_found' |
@@ -75,11 +72,8 @@
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(simple_publish/6 ::
- (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
- publish_res()).
--spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
+-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) ->
+ {routing_result(), [pid()]}).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -194,38 +188,44 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-%% Usable by Erlang code that wants to publish messages.
-simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
- ContentTypeBin, BodyBin) ->
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
- Content = #content{class_id = ClassId,
- properties = #'P_basic'{content_type = ContentTypeBin},
- properties_bin = none,
- payload_fragments_rev = [BodyBin]},
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = Content,
- is_persistent = false,
- guid = rabbit_guid:guid()
- },
- simple_publish(Mandatory, Immediate, Message).
-
-%% Usable by Erlang code that wants to publish messages.
-simple_publish(Mandatory, Immediate,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}) ->
- case lookup(ExchangeName) of
- {ok, Exchange} ->
- QPids = route(Exchange, RoutingKey, Content),
- rabbit_router:deliver(QPids, Mandatory, Immediate,
- none, Message);
- {error, Error} -> {error, Error}
+publish(X, Mandatory, Immediate, Txn, Message) ->
+ publish(X, [], Mandatory, Immediate, Txn, Message).
+
+publish(X, Seen, Mandatory, Immediate, Txn,
+ Message = #basic_message{routing_key = RK, content = C}) ->
+ case rabbit_router:deliver(route(X, RK, C),
+ Mandatory, Immediate, Txn, Message) of
+ {_, []} = R ->
+ #exchange{name = XName, arguments = Args} = X,
+ case rabbit_misc:r_arg(XName, exchange, Args,
+ <<"alternate-exchange">>) of
+ undefined ->
+ R;
+ AName ->
+ NewSeen = [XName | Seen],
+ case lists:member(AName, NewSeen) of
+ true ->
+ R;
+ false ->
+ case lookup(AName) of
+ {ok, AX} ->
+ publish(AX, NewSeen,
+ Mandatory, Immediate, Txn,
+ Message);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "alternate exchange for ~s "
+ "does not exist: ~s",
+ [rabbit_misc:rs(XName),
+ rabbit_misc:rs(AName)]),
+ R
+ end
+ end
+ end;
+ R ->
+ R
end.
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
@@ -252,6 +252,9 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
route(X = #exchange{type = direct}, RoutingKey, _Content) ->
match_routing_key(X, RoutingKey).
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
match_bindings(#exchange{name = Name}, Match) ->
@@ -383,32 +386,40 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
+ binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
- true -> ok = sync_binding(
- ExchangeName, QueueName, RoutingKey, Arguments,
- Q#amqqueue.durable, fun mnesia:write/3)
+ true -> ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:write/3)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+ binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ maybe_auto_delete(X)
+ end
+ end).
+
+binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
- ok = sync_binding(
- ExchangeName, QueueName, RoutingKey, Arguments,
- Q#amqqueue.durable, fun mnesia:delete_object/3),
- maybe_auto_delete(X)
+ Fun(X, Q, #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = sort_arguments(Arguments)})
end).
-sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
- Binding = #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = sort_arguments(Arguments)},
+sync_binding(Binding, Durable, Fun) ->
ok = case Durable of
true -> Fun(rabbit_durable_route,
#route{binding = Binding}, write);
@@ -474,7 +485,7 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
-%% route/3 and sync_binding/6 do.
+%% route/3 and {add,delete}_binding/4 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index acadf2a0cc..c965c69314 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -39,7 +39,7 @@
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
--export([r/3, r/2, rs/1]).
+-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
@@ -79,12 +79,14 @@
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K)
- when is_subtype(K, atom())).
+-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) ->
+ r(K) when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
when is_subtype(K, atom())).
+-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
+ undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
@@ -175,6 +177,14 @@ r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
r(VHostPath, Kind) when is_binary(VHostPath) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
+r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
+ r_arg(VHostPath, Kind, Table, Key);
+r_arg(VHostPath, Kind, Table, Key) ->
+ case lists:keysearch(Key, 1, Table) of
+ {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin);
+ false -> undefined
+ end.
+
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
[Kind, Name, VHostPath])).
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 8455bf1c9d..1b0386e696 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -51,7 +51,7 @@ start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed ->
QList = rabbit_disk_queue:dump_queue(Queue),
{MsgBuf, NextSeq} =
lists:foldl(
- fun ({MsgId, Msg, Size, Delivered, SeqId}, {Buf, NSeq})
+ fun ({_MsgId, Msg, _Size, Delivered, SeqId}, {Buf, NSeq})
when SeqId >= NSeq ->
{queue:in({SeqId, Msg, Delivered}, Buf), SeqId + 1}
end, {queue:new(), 0}, QList),
@@ -178,8 +178,11 @@ only_persistent_msg_ids(Pubs) ->
tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
{ok, State};
-tx_cancel(Publishes, State = #mqstate { mode = mixed }) ->
- ok = rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)),
+tx_cancel(Publishes, State = #mqstate { mode = mixed, is_durable = IsDurable }) ->
+ MsgIds = if IsDurable -> only_persistent_msg_ids(Publishes);
+ true -> []
+ end,
+ ok = rabbit_disk_queue:tx_cancel(MsgIds),
{ok, State}.
only_ack_tags(MsgWithAcks) ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 0b06a063a7..57166428bf 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -51,7 +51,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
- {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}).
+ {routing_result(), [pid()]}).
-endif.
@@ -98,14 +98,15 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
%% therefore safe to use a fire-and-forget cast here and return
%% the QPids - the semantics is preserved. This scales much better
%% than the non-immediate case below.
- {ok, lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast(
- {?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message}),
- QPids
- end,
- NodeQPids)};
+ {routed,
+ lists:flatmap(
+ fun ({Node, QPids}) ->
+ gen_server2:cast(
+ {?SERVER, Node},
+ {deliver, QPids, Mandatory, Immediate, Txn, Message}),
+ QPids
+ end,
+ NodeQPids)};
deliver_per_node(NodeQPids, Mandatory, Immediate,
Txn, Message) ->
R = rabbit_misc:upmap(
@@ -179,6 +180,6 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
QPids).
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {error, unroutable};
-check_delivery(_ , true, {_ , []}) -> {error, not_delivered};
-check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}.
+check_delivery(true, _ , {false, []}) -> {unroutable, []};
+check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
+check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.