summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEssien Ita Essien <essiene@gmail.com>2009-06-21 22:02:38 +0100
committerEssien Ita Essien <essiene@gmail.com>2009-06-21 22:02:38 +0100
commitc412342142a3e3b1c92f85b5ee829f8f8e1a0d0a (patch)
tree122caf6c1b557176de3c9fa30e4655f4bea60dbe /src
parent27e15fbe128f946fb8b8d5f6fabe593452820952 (diff)
parentda5a1fd5b839d2ccbf83f17d618aae2184731ece (diff)
downloadrabbitmq-server-git-c412342142a3e3b1c92f85b5ee829f8f8e1a0d0a.tar.gz
Merge with upstream
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl16
-rw-r--r--src/rabbit_access_control.erl14
-rw-r--r--src/rabbit_amqqueue.erl35
-rw-r--r--src/rabbit_amqqueue_process.erl318
-rw-r--r--src/rabbit_basic.erl75
-rw-r--r--src/rabbit_channel.erl80
-rw-r--r--src/rabbit_error_logger.erl10
-rw-r--r--src/rabbit_exchange.erl143
-rw-r--r--src/rabbit_limiter.erl18
-rw-r--r--src/rabbit_log.erl12
-rw-r--r--src/rabbit_misc.erl20
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_persister.erl2
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_router.erl76
-rw-r--r--src/rabbit_tests.erl4
16 files changed, 486 insertions, 352 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0d1ce689ea..e910e24efc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -136,7 +136,7 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
- {ok, MemoryAlarms} = application:get_env(memory_alarms),
+ {ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
ok = rabbit_binary_generator:
@@ -226,10 +226,14 @@ print_banner() ->
[Product, Version,
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- io:format("Logging to ~p~nSASL logging to ~p~n~n",
- [log_location(kernel), log_location(sasl)]).
-
-
+ Settings = [{"node", node()},
+ {"log", log_location(kernel)},
+ {"sasl log", log_location(sasl)},
+ {"database dir", rabbit_mnesia:dir()}],
+ DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
+ Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
+ lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
+ io:nl().
start_child(Mod) ->
{ok,_} = supervisor:start_child(rabbit_sup,
@@ -315,7 +319,7 @@ rotate_logs(File, Suffix, OldHandler, NewHandler) ->
log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
{error, {{cannot_rotate_main_logs, MainLogError},
- {cannot_rotate_sasl_logs, SaslLogError}}};
+ {cannot_rotate_sasl_logs, SaslLogError}}};
log_rotation_result({error, MainLogError}, ok) ->
{error, {cannot_rotate_main_logs, MainLogError}};
log_rotation_result(ok, {error, SaslLogError}) ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 54348d9a1c..6ff7a1046c 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -45,11 +45,13 @@
-ifdef(use_specs).
+-type(permission_atom() :: 'configure' | 'read' | 'write').
+
-spec(check_login/2 :: (binary(), binary()) -> user()).
-spec(user_pass_login/2 :: (username(), password()) -> user()).
-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
-spec(check_resource_access/3 ::
- (username(), r(atom()), non_neg_integer()) -> 'ok').
+ (username(), r(atom()), permission_atom()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -137,6 +139,10 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+permission_index(configure) -> #permission.configure;
+permission_index(write) -> #permission.write;
+permission_index(read) -> #permission.read.
+
check_resource_access(Username,
R = #resource{kind = exchange, name = <<"">>},
Permission) ->
@@ -158,7 +164,7 @@ check_resource_access(Username,
[#user_permission{permission = P}] ->
case regexp:match(
binary_to_list(Name),
- binary_to_list(element(Permission, P))) of
+ binary_to_list(element(permission_index(Permission), P))) of
{match, _, _} -> true;
nomatch -> false
end
@@ -239,8 +245,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eb076e94d6..198e2782b4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,10 +31,11 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
+-export([start/0, recover/0, declare/4, delete/3, purge/1]).
+-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
@@ -84,7 +85,7 @@
{'error', 'in_use'} |
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()).
+-spec(deliver/2 :: (pid(), delivery()) -> bool()).
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
@@ -102,6 +103,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -157,11 +159,17 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
+ internal_declare(Q, true).
+
+internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> ok = store_queue(Q),
- ok = add_default_binding(Q),
+ case WantDefaultBinding of
+ true -> add_default_binding(Q);
+ false -> ok
+ end,
Q;
[ExistingQ] -> ExistingQ
end
@@ -201,9 +209,7 @@ with(Name, F, E) ->
with(Name, F) ->
with(Name, F, fun () -> {error, not_found} end).
with_or_die(Name, F) ->
- with(Name, F, fun () -> rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(Name)])
- end).
+ with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -235,13 +241,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
-deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
-deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
+deliver(QPid, #delivery{immediate = true,
+ txn = Txn, sender = ChPid, message = Message}) ->
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
+ infinity);
+deliver(QPid, #delivery{mandatory = true,
+ txn = Txn, sender = ChPid, message = Message}) ->
+ gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
true;
-deliver(false, _IsImmediate, Txn, Message, QPid) ->
- gen_server2:cast(QPid, {deliver, Txn, Message}),
+deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
+ gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
true.
redeliver(QPid, Messages) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c390b2b7e4..cf0ef44f5c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,19 +53,21 @@
has_had_consumers,
next_msg_id,
message_buffer,
- round_robin}).
+ active_consumers,
+ blocked_consumers}).
-record(consumer, {tag, ack_required}).
-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
--record(cr, {consumers,
+-record(cr, {consumer_count,
ch_pid,
limiter_pid,
monitor_ref,
unacked_messages,
is_limit_active,
+ txn,
unsent_message_count}).
-define(INFO_KEYS,
@@ -98,7 +100,8 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}, ?HIBERNATE_AFTER}.
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -128,11 +131,12 @@ ch_record(ChPid) ->
case get(Key) of
undefined ->
MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumers = [],
+ C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
is_limit_active = false,
+ txn = none,
unsent_message_count = 0},
put(Key, C),
C;
@@ -146,7 +150,7 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+ Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
BlockedOld = is_ch_blocked(OldCR),
@@ -156,20 +160,25 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
+record_current_channel_tx(ChPid, Txn) ->
+ %% as a side effect this also starts monitoring the channel (if
+ %% that wasn't happening already)
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
+
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
- round_robin = RoundRobin,
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- case queue:out(RoundRobin) of
+ case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
- RoundRobinTail} ->
+ ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case not(AckRequired) orelse rabbit_limiter:can_send(
- LimiterPid, self()) of
+ case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
@@ -181,24 +190,38 @@ deliver_immediately(Message, Delivered,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
- NewConsumers =
+ {NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block -> block_consumers(ChPid, RoundRobinTail)
+ ok -> {queue:in(QEntry, ActiveConsumersTail),
+ BlockedConsumers};
+ block ->
+ {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId + 1}};
+ {offered, AckRequired,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers,
+ next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
- NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_immediately(Message, Delivered,
- State#q{round_robin = NewConsumers})
+ {NewActiveConsumers, NewBlockedConsumers} =
+ move_consumers(ChPid,
+ ActiveConsumers,
+ BlockedConsumers),
+ deliver_immediately(
+ Message, Delivered,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers})
end;
{empty, _} ->
{not_offered, State}
end.
-attempt_delivery(none, Message, State) ->
+attempt_delivery(none, _ChPid, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
{true, State1};
@@ -209,13 +232,13 @@ attempt_delivery(none, Message, State) ->
{not_offered, State1} ->
{false, State1}
end;
-attempt_delivery(Txn, Message, State) ->
+attempt_delivery(Txn, ChPid, Message, State) ->
persist_message(Txn, qname(State), Message),
- record_pending_message(Txn, Message),
+ record_pending_message(Txn, ChPid, Message),
{true, State}.
-deliver_or_enqueue(Txn, Message, State) ->
- case attempt_delivery(Txn, Message, State) of
+deliver_or_enqueue(Txn, ChPid, Message, State) ->
+ case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
@@ -228,22 +251,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
State).
-block_consumers(ChPid, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(RoundRobin))).
-
-unblock_consumers(ChPid, Consumers, RoundRobin) ->
- %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]),
- queue:join(RoundRobin,
- queue:from_list([{ChPid, Con} || Con <- Consumers])).
+add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-block_consumer(ChPid, ConsumerTag, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]),
+remove_consumer(ChPid, ConsumerTag, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
queue:from_list(lists:filter(
fun ({CP, #consumer{tag = CT}}) ->
(CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(RoundRobin))).
+ end, queue:to_list(Queue))).
+
+remove_consumers(ChPid, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
+ queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(Queue))).
+
+move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(From)),
+ {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -254,65 +279,48 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> NewRR = unblock_consumers(ChPid,
- NewC#cr.consumers,
- State#q.round_robin),
- run_poke_burst(State#q{round_robin = NewRR})
+ unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
+ move_consumers(ChPid,
+ State#q.blocked_consumers,
+ State#q.active_consumers),
+ run_poke_burst(
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedeConsumers})
end
end.
-check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
- {continue, State};
-check_auto_delete(State = #q{has_had_consumers = false}) ->
- {continue, State};
-check_auto_delete(State = #q{round_robin = RoundRobin}) ->
- % The clauses above rule out cases where no-one has consumed from
- % this queue yet, and cases where we are not an auto_delete queue
- % in any case. Thus it remains to check whether we have any active
- % listeners at this point.
- case queue:is_empty(RoundRobin) of
- true ->
- % There are no waiting listeners. It's possible that we're
- % completely unused. Check.
- case is_unused() of
- true ->
- % There are no active consumers at this
- % point. This is the signal to autodelete.
- {stop, State};
- false ->
- % There is at least one active consumer, so we
- % shouldn't delete ourselves.
- {continue, State}
- end;
- false ->
- % There are some waiting listeners, thus we are not
- % unused, so can continue life as normal without needing
- % to check the process dictionary.
- {continue, State}
- end.
+should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
+should_auto_delete(#q{has_had_consumers = false}) -> false;
+should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
- round_robin = ActiveConsumers}) ->
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found -> noreply(State);
- #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
- NewActive = block_consumers(ChPid, ActiveConsumers),
+ #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
+ unacked_messages = UAM} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
- case check_auto_delete(
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- round_robin = NewActive})) of
- {continue, NewState} ->
- noreply(NewState);
- {stop, NewState} ->
- {stop, normal, NewState}
+ case Txn of
+ none -> ok;
+ _ -> ok = rollback_work(Txn, qname(State)),
+ erase_tx(Txn)
+ end,
+ NewState =
+ deliver_or_enqueue_n(
+ [{Message, true} ||
+ {_Messsage_id, Message} <- dict:to_list(UAM)],
+ State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, _} -> none;
+ Other -> Other
+ end,
+ active_consumers = remove_consumers(
+ ChPid, State#q.active_consumers),
+ blocked_consumers = remove_consumers(
+ ChPid, State#q.blocked_consumers)}),
+ case should_auto_delete(NewState) of
+ false -> noreply(NewState);
+ true -> {stop, normal, NewState}
end
end.
@@ -325,12 +333,12 @@ check_queue_owner(none, _) -> ok;
check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
check_queue_owner({_, _}, _) -> mismatch.
-check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) ->
+check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
-check_exclusive_access(none, false) ->
+check_exclusive_access(none, false, _State) ->
ok;
-check_exclusive_access(none, true) ->
- case is_unused() of
+check_exclusive_access(none, true, State) ->
+ case is_unused(State) of
true -> ok;
false -> in_use
end.
@@ -355,16 +363,8 @@ run_poke_burst(MessageBuffer, State) ->
State#q{message_buffer = MessageBuffer}
end.
-is_unused() ->
- is_unused1(get()).
-
-is_unused1([]) ->
- true;
-is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest])
- when Consumers /= [] ->
- false;
-is_unused1([_ | Rest]) ->
- is_unused1(Rest).
+is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
+ queue:is_empty(State#q.blocked_consumers).
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -456,13 +456,17 @@ is_tx_persistent(Txn) ->
#tx{is_persistent = Res} = lookup_tx(Txn),
Res.
-record_pending_message(Txn, Message) ->
+record_pending_message(Txn, ChPid, Message) ->
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
+ record_current_channel_tx(ChPid, Txn),
+ store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
+ ch_pid = ChPid}).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}).
+ record_current_channel_tx(ChPid, Txn),
+ store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
+ ch_pid = ChPid}).
process_pending(Txn, State) ->
#tx{ch_pid = ChPid,
@@ -519,9 +523,8 @@ i(messages, State) ->
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
-i(consumers, _) ->
- lists:sum([length(Consumers) ||
- #cr{consumers = Consumers} <- all_ch_record()]);
+i(consumers, State) ->
+ queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
i(transactions, _) ->
length(all_tx_record());
i(memory, _) ->
@@ -541,7 +544,7 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call({deliver_immediately, Txn, Message}, _From, State) ->
+handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -555,12 +558,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, Message, State),
+ {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
-handle_call({deliver, Txn, Message}, _From, State) ->
+handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
+ {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
@@ -603,78 +606,91 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder,
- round_robin = RoundRobin}) ->
+ exclusive_consumer = ExistingHolder}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
reply({error, queue_owned_by_another_connection}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumers = Consumers} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not(NoAck)},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if Consumers == [] ->
+ if ConsumerCount == 0 ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
end,
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
- exclusive_consumer =
- if
- ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
- round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
+ exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- reply(ok, run_poke_burst(State1))
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_poke_burst(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder,
- round_robin = RoundRobin}) ->
+ State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
- NewConsumers = lists:filter
- (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
- Consumers),
- store_ch_record(C#cr{consumers = NewConsumers}),
- if NewConsumers == [] ->
+ C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
+ store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
+ if ConsumerCount == 1 ->
ok = rabbit_limiter:unregister(LimiterPid, self());
true ->
ok
end,
ok = maybe_send_reply(ChPid, OkMsg),
- case check_auto_delete(
- State#q{exclusive_consumer = cancel_holder(ChPid,
- ConsumerTag,
- Holder),
- round_robin = block_consumer(ChPid,
- ConsumerTag,
- RoundRobin)}) of
- {continue, State1} ->
- reply(ok, State1);
- {stop, State1} ->
- {stop, normal, ok, State1}
+ NewState =
+ State#q{exclusive_consumer = cancel_holder(ChPid,
+ ConsumerTag,
+ Holder),
+ active_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.active_consumers),
+ blocked_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.blocked_consumers)},
+ case should_auto_delete(NewState) of
+ false -> reply(ok, NewState);
+ true -> {stop, normal, ok, NewState}
end
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
- round_robin = RoundRobin}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
+ active_consumers = ActiveConsumers}) ->
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
+ State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
IsEmpty = queue:is_empty(MessageBuffer),
- IsUnused = is_unused(),
+ IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
reply({error, not_empty}, State);
@@ -693,7 +709,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
case Owner of
none ->
- case check_exclusive_access(Holder, true) of
+ case check_exclusive_access(Holder, true, State) of
in_use ->
%% FIXME: Is this really the right answer? What if
%% an active consumer's reader is actually the
@@ -711,9 +727,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
reply(locked, State)
end.
-handle_cast({deliver, Txn, Message}, State) ->
+handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
+ {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
@@ -769,10 +785,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumers = Consumers,
+ fun (C = #cr{consumer_count = ConsumerCount,
limiter_pid = OldLimiterPid,
is_limit_active = Limited}) ->
- if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
new file mode 100644
index 0000000000..761b3863b4
--- /dev/null
+++ b/src/rabbit_basic.erl
@@ -0,0 +1,75 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_basic).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-export([publish/1, message/4, delivery/4]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(publish/1 :: (delivery()) ->
+ {ok, routing_result(), [pid()]} | not_found()).
+-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
+-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
+ message()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+publish(Delivery = #delivery{
+ message = #basic_message{exchange_name = ExchangeName}}) ->
+ case rabbit_exchange:lookup(ExchangeName) of
+ {ok, X} ->
+ {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {ok, RoutingRes, DeliveredQPids};
+ Other ->
+ Other
+ end.
+
+delivery(Mandatory, Immediate, Txn, Message) ->
+ #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
+ sender = self(), message = Message}.
+
+message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ Content = #content{class_id = ClassId,
+ properties = #'P_basic'{content_type = ContentTypeBin},
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]},
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ persistent_key = none}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2716ec478..3089bb6293 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -231,13 +231,13 @@ clear_permission_cache() ->
ok.
check_configure_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, #permission.configure).
+ check_resource_access(Username, Resource, configure).
check_write_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, #permission.write).
+ check_resource_access(Username, Resource, write).
check_read_permitted(Resource, #ch{ username = Username}) ->
- check_resource_access(Username, Resource, #permission.read).
+ check_resource_access(Username, Resource, read).
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
@@ -306,7 +306,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath}) ->
+ Content, State = #ch{ virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -317,12 +319,30 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> rabbit_guid:guid();
false -> none
end,
- {noreply, publish(Mandatory, Immediate,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ persistent_key = PersistentKey},
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
+ case RoutingRes of
+ routed ->
+ ok;
+ unroutable ->
+ %% FIXME: 312 should be replaced by the ?NO_ROUTE
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
+ not_delivered ->
+ %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
+ end,
+ {noreply, case TxnKey of
+ none -> State;
+ _ -> add_tx_participants(DeliveredQPids, State)
+ end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -551,6 +571,13 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
{ok, FoundX} -> FoundX;
{error, not_found} ->
check_name('exchange', ExchangeNameBin),
+ case rabbit_misc:r_arg(VHostPath, exchange, Args,
+ <<"alternate-exchange">>) of
+ undefined -> ok;
+ AName -> check_read_permitted(ExchangeName, State),
+ check_write_permitted(AName, State),
+ ok
+ end,
rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
@@ -579,8 +606,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
die_precondition_failed(
"~s in use", [rabbit_misc:rs(ExchangeName)]);
@@ -746,11 +772,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, exchange_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
+ rabbit_misc:not_found(QueueName);
{error, exchange_and_queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
@@ -767,30 +791,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ok -> return_ok(State, NoWait, ReturnMethod)
end.
-publish(Mandatory, Immediate, Message, QPids,
- State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
- Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
- Message, WriterPid),
- case TxnKey of
- none -> State;
- _ -> add_tx_participants(Handled, State)
- end.
-
-deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
- case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of
- {ok, DeliveredQPids} -> DeliveredQPids;
- {error, unroutable} ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
- [];
- {error, not_delivered} ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
- []
- end.
-
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index dc5824f1c9..76016a8cb2 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -74,7 +74,11 @@ publish(_Other, _Format, _Data, _State) ->
ok.
publish1(RoutingKey, Format, Data, LogExch) ->
- {ok, _QueueNames} = rabbit_exchange:simple_publish(
- false, false, LogExch, RoutingKey, <<"text/plain">>,
- list_to_binary(io_lib:format(Format, Data))),
+ {ok, _RoutingRes, _DeliveredQPids} =
+ rabbit_basic:publish(
+ rabbit_basic:delivery(
+ false, false, none,
+ rabbit_basic:message(
+ LogExch, RoutingKey, <<"text/plain">>,
+ list_to_binary(io_lib:format(Format, Data))))),
ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index fc89cfca51..8fb9eae304 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,8 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
- simple_publish/6, simple_publish/3,
- route/3]).
+ publish/2]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -57,8 +56,6 @@
-ifdef(use_specs).
--type(publish_res() :: {'ok', [pid()]} |
- not_found() | {'error', 'unroutable' | 'not_delivered'}).
-type(bind_res() :: 'ok' | {'error',
'queue_not_found' |
'exchange_not_found' |
@@ -75,11 +72,7 @@
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(simple_publish/6 ::
- (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
- publish_res()).
--spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
+-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -164,9 +157,7 @@ lookup(Name) ->
lookup_or_die(Name) ->
case lookup(Name) of
{ok, X} -> X;
- {error, not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(Name)])
+ {error, not_found} -> rabbit_misc:not_found(Name)
end.
list(VHostPath) ->
@@ -196,36 +187,41 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-%% Usable by Erlang code that wants to publish messages.
-simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
- ContentTypeBin, BodyBin) ->
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
- Content = #content{class_id = ClassId,
- properties = #'P_basic'{content_type = ContentTypeBin},
- properties_bin = none,
- payload_fragments_rev = [BodyBin]},
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = Content,
- persistent_key = none},
- simple_publish(Mandatory, Immediate, Message).
-
-%% Usable by Erlang code that wants to publish messages.
-simple_publish(Mandatory, Immediate,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}) ->
- case lookup(ExchangeName) of
- {ok, Exchange} ->
- QPids = route(Exchange, RoutingKey, Content),
- rabbit_router:deliver(QPids, Mandatory, Immediate,
- none, Message);
- {error, Error} -> {error, Error}
+publish(X, Delivery) ->
+ publish(X, [], Delivery).
+
+publish(X, Seen, Delivery = #delivery{
+ message = #basic_message{routing_key = RK, content = C}}) ->
+ case rabbit_router:deliver(route(X, RK, C), Delivery) of
+ {_, []} = R ->
+ #exchange{name = XName, arguments = Args} = X,
+ case rabbit_misc:r_arg(XName, exchange, Args,
+ <<"alternate-exchange">>) of
+ undefined ->
+ R;
+ AName ->
+ NewSeen = [XName | Seen],
+ case lists:member(AName, NewSeen) of
+ true ->
+ R;
+ false ->
+ case lookup(AName) of
+ {ok, AX} ->
+ publish(AX, NewSeen, Delivery);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "alternate exchange for ~s "
+ "does not exist: ~s",
+ [rabbit_misc:rs(XName),
+ rabbit_misc:rs(AName)]),
+ R
+ end
+ end
+ end;
+ R ->
+ R
end.
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
@@ -239,9 +235,9 @@ route(X = #exchange{type = topic}, RoutingKey, _Content) ->
route(X = #exchange{type = headers}, _RoutingKey, Content) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- H -> sort_arguments(H)
- end,
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
match_bindings(X, fun (#binding{args = Spec}) ->
headers_match(Spec, Headers)
end);
@@ -252,6 +248,9 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
route(X = #exchange{type = direct}, RoutingKey, _Content) ->
match_routing_key(X, RoutingKey).
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
match_bindings(#exchange{name = Name}, Match) ->
@@ -383,32 +382,40 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
+ binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
- true -> ok = sync_binding(
- ExchangeName, QueueName, RoutingKey, Arguments,
- Q#amqqueue.durable, fun mnesia:write/3)
+ true -> ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:write/3)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+ binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ maybe_auto_delete(X)
+ end
+ end).
+
+binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
- ok = sync_binding(
- ExchangeName, QueueName, RoutingKey, Arguments,
- Q#amqqueue.durable, fun mnesia:delete_object/3),
- maybe_auto_delete(X)
+ Fun(X, Q, #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = sort_arguments(Arguments)})
end).
-sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
- Binding = #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = sort_arguments(Arguments)},
+sync_binding(Binding, Durable, Fun) ->
ok = case Durable of
true -> Fun(rabbit_durable_route,
#route{binding = Binding}, write);
@@ -474,7 +481,7 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
-%% route/3 and sync_binding/6 do.
+%% route/3 and {add,delete}_binding/4 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
@@ -482,14 +489,14 @@ parse_x_match(Other) ->
%%
headers_match(Pattern, Data) ->
MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
"(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
headers_match(Pattern, Data, true, false, MatchKind).
headers_match([], _Data, AllMatch, _AnyMatch, all) ->
@@ -516,8 +523,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
%% the corresponding data field. I've interpreted that to
%% mean a type of "void" for the pattern field.
PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
PT =/= DT -> {false, AnyMatch};
PV == DV -> {AllMatch, true};
true -> {false, AnyMatch}
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 3f9b6ebb9b..9f3dcbd071 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
--export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid) ->
+can_send(undefined, _QPid, _AckRequired) ->
true;
-can_send(LimiterPid, QPid) ->
+can_send(LimiterPid, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
+ infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+handle_call({can_send, QPid, AckRequired}, _From,
+ State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = Volume + 1}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end.
handle_cast(shutdown, State) ->
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index f408336e94..dd5b498b07 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -75,7 +75,7 @@ debug(Fmt, Args) when is_list(Args) ->
message(Direction, Channel, MethodRecord, Content) ->
gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
+ {message, Direction, Channel, MethodRecord, Content}).
info(Fmt) ->
gen_server:cast(?SERVER, {info, Fmt}).
@@ -112,11 +112,11 @@ handle_cast({debug, Fmt, Args}, State) ->
{noreply, State};
handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
+ [case Direction of
+ in -> "-->";
+ out -> "<--" end,
+ Channel,
+ {MethodRecord, Content}]),
{noreply, State};
handle_cast({info, Fmt}, State) ->
error_logger:info_msg(Fmt),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index eced0b3cbe..72e16f0fc0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -36,9 +36,10 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, protocol_error/3, protocol_error/4]).
+-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
--export([r/3, r/2, rs/1]).
+-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
@@ -72,16 +73,19 @@
(atom() | amqp_error(), string(), [any()]) -> no_return()).
-spec(protocol_error/4 ::
(atom() | amqp_error(), string(), [any()], atom()) -> no_return()).
+-spec(not_found/1 :: (r(atom())) -> no_return()).
-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K)
- when is_subtype(K, atom())).
+-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) ->
+ r(K) when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
when is_subtype(K, atom())).
+-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
+ undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
@@ -139,6 +143,8 @@ protocol_error(Error, Explanation, Params, Method) ->
CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)),
exit({amqp, Error, CompleteExplanation, Method}).
+not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+
get_config(Key) ->
case dirty_read({rabbit_config, Key}) of
{ok, {rabbit_config, Key, V}} -> {ok, V};
@@ -169,6 +175,14 @@ r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
r(VHostPath, Kind) when is_binary(VHostPath) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
+r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
+ r_arg(VHostPath, Kind, Table, Key);
+r_arg(VHostPath, Kind, Table, Key) ->
+ case lists:keysearch(Key, 1, Table) of
+ {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin);
+ false -> undefined
+ end.
+
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
[Kind, Name, VHostPath])).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 15213861bd..575ecb0adc 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -31,7 +31,7 @@
-module(rabbit_mnesia).
--export([ensure_mnesia_dir/0, status/0, init/0, is_db_empty/0,
+-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, reset/0, force_reset/0]).
-export([table_names/0]).
@@ -47,6 +47,7 @@
-ifdef(use_specs).
-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
+-spec(dir/0 :: () -> string()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> bool()).
@@ -148,8 +149,10 @@ table_definitions() ->
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
+dir() -> mnesia:system_info(directory).
+
ensure_mnesia_dir() ->
- MnesiaDir = mnesia:system_info(directory) ++ "/",
+ MnesiaDir = dir() ++ "/",
case filelib:ensure_dir(MnesiaDir) of
{error, Reason} ->
throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
@@ -185,7 +188,7 @@ check_schema_integrity() ->
%% it doesn't.
cluster_nodes_config_filename() ->
- mnesia:system_info(directory) ++ "/cluster_nodes.config".
+ dir() ++ "/cluster_nodes.config".
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
@@ -301,7 +304,7 @@ create_schema() ->
move_db() ->
mnesia:stop(),
- MnesiaDir = filename:dirname(mnesia:system_info(directory) ++ "/"),
+ MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
BackupDir = lists:flatten(
io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
@@ -418,7 +421,7 @@ reset(Force) ->
ok = delete_cluster_nodes_config(),
%% remove persistet messages and any other garbage we find
lists:foreach(fun file:delete/1,
- filelib:wildcard(mnesia:system_info(directory) ++ "/*")),
+ filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index f4fa45993a..d0d60ddf3d 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -259,7 +259,7 @@ log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
pending_logs = [Message | Logs]}.
base_filename() ->
- mnesia:system_info(directory) ++ "/rabbit_persister.LOG".
+ rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
take_snapshot(LogHandle, OldFileName, Snapshot) ->
ok = disk_log:sync(LogHandle),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ba6d6e6a42..a09783bec4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -231,7 +231,7 @@ start_connection(Parent, Deb, ClientSock) ->
connection_state = pre_init},
handshake, 8))
catch
- Ex -> (if Ex == connection_closed_abruptly ->
+ Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
true ->
fun rabbit_log:error/2
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 0b06a063a7..10f80cc301 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
-export([start_link/0,
- deliver/5]).
+ deliver/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -50,8 +50,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
- {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}).
+-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
-endif.
@@ -62,13 +61,13 @@ start_link() ->
-ifdef(BUG19758).
-deliver(QPids, Mandatory, Immediate, Txn, Message) ->
- check_delivery(Mandatory, Immediate,
- run_bindings(QPids, Mandatory, Immediate, Txn, Message)).
+deliver(QPids, Delivery) ->
+ check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
+ run_bindings(QPids, Delivery)).
-else.
-deliver(QPids, Mandatory, Immediate, Txn, Message) ->
+deliver(QPids, Delivery) ->
%% we reduce inter-node traffic by grouping the qpids by node and
%% only delivering one copy of the message to each node involved,
%% which then in turn delivers it to its queues.
@@ -81,16 +80,14 @@ deliver(QPids, Mandatory, Immediate, Txn, Message) ->
[QPid], D)
end,
dict:new(), QPids)),
- Mandatory, Immediate, Txn, Message).
+ Delivery).
-deliver_per_node([{Node, QPids}], Mandatory, Immediate,
- Txn, Message)
- when Node == node() ->
+deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
%% optimisation
- check_delivery(Mandatory, Immediate,
- run_bindings(QPids, Mandatory, Immediate, Txn, Message));
-deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
- Txn, Message) ->
+ check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
+ run_bindings(QPids, Delivery));
+deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver in run_bindings below will deliver the
%% message to the queue process asynchronously, and return true,
@@ -98,22 +95,19 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
%% therefore safe to use a fire-and-forget cast here and return
%% the QPids - the semantics is preserved. This scales much better
%% than the non-immediate case below.
- {ok, lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast(
- {?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message}),
- QPids
- end,
- NodeQPids)};
-deliver_per_node(NodeQPids, Mandatory, Immediate,
- Txn, Message) ->
+ {routed,
+ lists:flatmap(
+ fun ({Node, QPids}) ->
+ gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
+ QPids
+ end,
+ NodeQPids)};
+deliver_per_node(NodeQPids, Delivery) ->
R = rabbit_misc:upmap(
fun ({Node, QPids}) ->
- try gen_server2:call(
- {?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message},
- infinity)
+ try gen_server2:call({?SERVER, Node},
+ {deliver, QPids, Delivery},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here
@@ -130,7 +124,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
end,
{false, []},
R),
- check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}).
+ check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
+ {Routed, lists:append(Handled)}).
-endif.
@@ -139,19 +134,17 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
init([]) ->
{ok, no_state}.
-handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message},
- From, State) ->
+handle_call({deliver, QPids, Delivery}, From, State) ->
spawn(
fun () ->
- R = run_bindings(QPids, Mandatory, Immediate, Txn, Message),
+ R = run_bindings(QPids, Delivery),
gen_server2:reply(From, R)
end),
{noreply, State}.
-handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message},
- State) ->
+handle_cast({deliver, QPids, Delivery}, State) ->
%% in order to preserve message ordering we must not spawn here
- run_bindings(QPids, Mandatory, Immediate, Txn, Message),
+ run_bindings(QPids, Delivery),
{noreply, State}.
handle_info(_Info, State) ->
@@ -165,11 +158,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
+run_bindings(QPids, Delivery) ->
lists:foldl(
fun (QPid, {Routed, Handled}) ->
- case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate,
- Txn, Message, QPid) of
+ case catch rabbit_amqqueue:deliver(QPid, Delivery) of
true -> {true, [QPid | Handled]};
false -> {true, Handled};
{'EXIT', _Reason} -> {Routed, Handled}
@@ -179,6 +171,6 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
QPids).
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {error, unroutable};
-check_delivery(_ , true, {_ , []}) -> {error, not_delivered};
-check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}.
+check_delivery(true, _ , {false, []}) -> {unroutable, []};
+check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
+check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8f0a3a8973..01757509ec 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -261,7 +261,7 @@ test_log_management() ->
%% original log files are not writable
ok = make_files_non_writable([MainLog, SaslLog]),
{error, {{cannot_rotate_main_logs, _},
- {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+ {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
%% logging directed to tty (handlers were removed in last test)
ok = clean_logs([MainLog, SaslLog], Suffix),
@@ -280,7 +280,7 @@ test_log_management() ->
ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
ok = application:set_env(kernel, error_logger, {file, MainLog}),
ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
- {rabbit_sasl_report_file_h, SaslLog}]),
+ {rabbit_sasl_report_file_h, SaslLog}]),
passed.
test_log_management_during_startup() ->