summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-04 02:35:53 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-04 02:35:53 +0000
commit6a54dc4518b5cc22dcbe2468ae8d99ecd06fd984 (patch)
tree547b8b87555729c0b0276655fc64117b4e68f6e1 /src
parentf884136a0ccb5462f53f83559d544297f667198d (diff)
parent3fa23a6ea47e18a406281db75e5728aa8d503e25 (diff)
downloadrabbitmq-server-git-6a54dc4518b5cc22dcbe2468ae8d99ecd06fd984.tar.gz
merge stable into default
Diffstat (limited to 'src')
-rw-r--r--src/credit_flow.erl75
-rw-r--r--src/pmon.erl6
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue.erl28
-rw-r--r--src/rabbit_amqqueue_process.erl133
-rw-r--r--src/rabbit_auth_backend_internal.erl10
-rw-r--r--src/rabbit_backing_queue.erl65
-rw-r--r--src/rabbit_backing_queue_qc.erl112
-rw-r--r--src/rabbit_channel.erl363
-rw-r--r--src/rabbit_exchange.erl65
-rw-r--r--src/rabbit_guid.erl19
-rw-r--r--src/rabbit_mirror_queue_master.erl137
-rw-r--r--src/rabbit_mirror_queue_slave.erl25
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_reader.erl203
-rw-r--r--src/rabbit_tests.erl168
-rw-r--r--src/rabbit_trace.erl37
-rw-r--r--src/rabbit_variable_queue.erl284
-rw-r--r--src/rabbit_vhost.erl2
21 files changed, 978 insertions, 774 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index ba99811f70..102c353f9b 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -52,6 +52,22 @@
%%----------------------------------------------------------------------------
+%% process dict update macro - eliminates the performance-hurting
+%% closure creation a HOF would introduce
+-define(UPDATE(Key, Default, Var, Expr),
+ begin
+ %% We deliberately allow Var to escape from the case here
+ %% to be used in Expr. Any temporary var we introduced
+ %% would also escape, and might conflict.
+ case get(Key) of
+ undefined -> Var = Default;
+ Var -> ok
+ end,
+ put(Key, Expr)
+ end).
+
+%%----------------------------------------------------------------------------
+
%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
%% the flow of credit, but the function names refer to the flow of
@@ -66,29 +82,33 @@
send(From) -> send(From, ?DEFAULT_CREDIT).
send(From, {InitialCredit, _MoreCreditAfter}) ->
- update({credit_from, From}, InitialCredit,
- fun (1) -> block(From),
- 0;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_from, From}, InitialCredit, C,
+ if C == 1 -> block(From),
+ 0;
+ true -> C - 1
+ end).
ack(To) -> ack(To, ?DEFAULT_CREDIT).
ack(To, {_InitialCredit, MoreCreditAfter}) ->
- update({credit_to, To}, MoreCreditAfter,
- fun (1) -> grant(To, MoreCreditAfter),
- MoreCreditAfter;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_to, To}, MoreCreditAfter, C,
+ if C == 1 -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ true -> C - 1
+ end).
handle_bump_msg({From, MoreCredit}) ->
- update({credit_from, From}, 0,
- fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
- C + MoreCredit;
- (C) -> C + MoreCredit
- end).
-
-blocked() -> get(credit_blocked, []) =/= [].
+ ?UPDATE({credit_from, From}, 0, C,
+ if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ true -> C + MoreCredit
+ end).
+
+blocked() -> case get(credit_blocked) of
+ undefined -> false;
+ [] -> false;
+ _ -> true
+ end.
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
@@ -105,24 +125,17 @@ grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
- true -> update(credit_deferred, [],
- fun (Deferred) -> [{To, Msg} | Deferred] end)
+ true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
+block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
- update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
case blocked() of
- false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred);
+ false -> case erase(credit_deferred) of
+ undefined -> ok;
+ Credits -> [To ! Msg || {To, Msg} <- Credits]
+ end;
true -> ok
end.
-
-get(Key, Default) ->
- case get(Key) of
- undefined -> Default;
- Value -> Value
- end.
-
-update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/pmon.erl b/src/pmon.erl
index 1aeebb72bc..3789811923 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -19,6 +19,8 @@
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
monitored/1, is_empty/1]).
+-compile({no_auto_import, [monitor/2]}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
@@ -48,7 +50,9 @@ monitor(Item, M) ->
false -> dict:store(Item, erlang:monitor(process, Item), M)
end.
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], M) -> M; %% optimisation
+monitor_all([Item], M) -> monitor(Item, M); %% optimisation
+monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
demonitor(Item, M) ->
case dict:find(Item, M) of
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c3a6d283b2..7b8348fc31 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -400,13 +400,11 @@ status() ->
is_running() -> is_running(node()).
-is_running(Node) ->
- rabbit_nodes:is_running(Node, rabbit).
+is_running(Node) -> rabbit_nodes:is_running(Node, rabbit).
environment() ->
- lists:keysort(
- 1, [P || P = {K, _} <- application:get_all_env(rabbit),
- K =/= default_pass]).
+ lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit),
+ K =/= default_pass]).
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 173f76481b..1a27036473 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,7 +34,7 @@
-export([start_mirroring/1, stop_mirroring/1]).
%% internal
--export([internal_declare/2, internal_delete/2, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -156,11 +156,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/2 ::
- (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -257,7 +257,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName, QPid),
+ false -> TailFun = internal_delete(QueueName),
fun () -> TailFun(), ExistingQ end
end
end
@@ -302,6 +302,8 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup([]) -> []; %% optimisation
+lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive for reasons explained in rabbit_misc:dirty_read/1.
@@ -569,7 +571,7 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName, QPid) ->
+internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
@@ -579,8 +581,7 @@ internal_delete(QueueName, QPid) ->
fun() ->
ok = T(),
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QueueName}])
+ [{name, QueueName}])
end
end
end).
@@ -600,7 +601,7 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
- qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
@@ -613,10 +614,9 @@ on_node_down(Node) ->
fun () ->
T(),
lists:foreach(
- fun({QName, QPid}) ->
+ fun(QName) ->
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QName}])
+ [{name, QName}])
end, Qs)
end
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ab735be6dd..f9614517fe 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,7 +85,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -101,16 +101,14 @@
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
@@ -182,7 +180,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -264,7 +262,7 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
@@ -278,7 +276,8 @@ terminate_shutdown(Fun, State) ->
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -478,11 +477,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining}, State1} =
- fetch(AckRequired, State),
+ {Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State1),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
+ drop_expired_msgs(State1),
+ {Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -528,15 +526,15 @@ discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State),
+ drop_expired_msgs(State),
{_IsEmpty1, State2} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props = #message_properties{delivered = Delivered},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -558,15 +556,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State1) of
+ Props = message_properties(Message, Confirm, State),
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
{true, State2} ->
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -598,9 +596,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -641,7 +639,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -649,7 +648,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
@@ -697,10 +697,9 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually,
- delivered = Delivered}.
+ needs_confirming = Confirm == eventually}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
@@ -712,19 +711,22 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{dlx = DLX,
- backing_queue_state = BQS,
- backing_queue = BQ }) ->
+drop_expired_msgs(State = #q{dlx = DLX,
+ backing_queue_state = BQS,
+ backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, BQS1} = case DLX of
- undefined -> {Next, undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ -> {Next, Msgs, BQS2} =
- BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun = dead_letter_fun(expired),
- DLXFun(Msgs),
+ undefined -> BQ:dropwhile(ExpirePred, BQS);
+ _ -> {Next, Msgs, BQS2} =
+ BQ:fetchwhile(ExpirePred,
+ fun accumulate_msgs/3,
+ [], BQS),
+ case Msgs of
+ [] -> ok;
+ _ -> (dead_letter_fun(expired))(
+ lists:reverse(Msgs))
+ end,
{Next, BQS2}
end,
ensure_ttl_timer(case Props of
@@ -732,6 +734,8 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
+accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
@@ -787,12 +791,9 @@ stop(State) -> stop(undefined, noreply, State).
stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
- {true, noreply} ->
- {stop, normal, State};
- {true, _} ->
- {stop, normal, Reply, State};
- {false, _} ->
- noreply(State#q{delayed_stop = {From, Reply}})
+ {true, noreply} -> {stop, normal, State};
+ {true, _} -> {stop, normal, Reply, State};
+ {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -946,19 +947,19 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
@@ -1049,11 +1050,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_messages(State1)) of
+ case fetch(AckRequired, drop_expired_msgs(State1)) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
ChAckTags1 = sets:add_element(AckTag, ChAckTags),
@@ -1062,14 +1063,13 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
@@ -1078,7 +1078,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -1093,7 +1093,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ not NoAck, qname(State2)),
reply(ok, State2)
end;
@@ -1104,7 +1104,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
not_found ->
reply(ok, State);
C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
@@ -1114,7 +1114,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(From, ok, State1)
@@ -1123,16 +1123,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(ensure_expiry_timer(State)),
+ drop_expired_msgs(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- IsEmpty = BQ:is_empty(BQS),
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
- IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
true -> stop(From, {ok, BQ:len(BQS)}, State)
end;
@@ -1148,11 +1148,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
@@ -1200,8 +1202,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ {ok, BQS1} = BQ:ackfold(
+ fun (M, A, ok) -> DLXFun([{M, A}]) end,
+ ok, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1310,7 +1313,7 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+ noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
handle_info(emit_stats, State) ->
emit_stats(State),
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 7b9df81e78..919be3f3ee 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -49,7 +49,7 @@
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> rabbit_types:infos()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(lookup_user/1 :: (rabbit_types:username())
-> rabbit_types:ok(rabbit_types:internal_user())
@@ -58,14 +58,14 @@
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
--spec(list_permissions/0 :: () -> rabbit_types:infos()).
+-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> rabbit_types:infos()).
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
- (rabbit_types:username()) -> rabbit_types:infos()).
+ (rabbit_types:username()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
- -> rabbit_types:infos()).
+ -> [rabbit_types:infos()]).
-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index af660c60a0..99b5946e59 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,17 +26,16 @@
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
+-type(drop_result(Ack) ::
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -78,8 +77,8 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state().
+ rabbit_types:message_properties(), boolean(), pid(),
+ state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -124,33 +123,50 @@
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
-%% Drop messages from the head of the queue while the supplied predicate returns
-%% true. Also accepts a boolean parameter that determines whether the messages
-%% necessitate an ack or not. If they do, the function returns a list of
-%% messages with the respective acktags.
--callback dropwhile(msg_pred(), true, state())
- -> {rabbit_types:message_properties() | undefined,
- [{rabbit_types:basic_message(), ack()}], state()};
- (msg_pred(), false, state())
- -> {rabbit_types:message_properties() | undefined,
- undefined, state()}.
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}.
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(),
+ rabbit_types:message_properties(),
+ A) -> {('stop' | 'cont'), A}),
+ A, state()) -> {A, state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -210,9 +226,10 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {dropwhile, 2}, {fetchwhile, 4},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index b37fbb29e2..5b3b8aa806 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -85,17 +85,19 @@ backing_queue_test(Cmds) ->
%% Commands
-%% Command frequencies are tuned so that queues are normally reasonably
-%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
-%% and purging cause extreme queue lengths, so these have lower probabilities.
-%% Fetches are sufficiently frequent so that commands that need acktags
-%% get decent coverage.
+%% Command frequencies are tuned so that queues are normally
+%% reasonably short, but they may sometimes exceed
+%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue
+%% lengths, so these have lower probabilities. Fetches/drops are
+%% sufficiently frequent so that commands that need acktags get decent
+%% coverage.
command(S) ->
frequency([{10, qc_publish(S)},
{1, qc_publish_delivered(S)},
{1, qc_publish_multiple(S)}, %% very slow
- {15, qc_fetch(S)}, %% needed for ack and requeue
+ {9, qc_fetch(S)}, %% needed for ack and requeue
+ {6, qc_drop(S)}, %%
{15, qc_ack(S)},
{15, qc_requeue(S)},
{3, qc_set_ram_duration_target(S)},
@@ -104,7 +106,8 @@ command(S) ->
{1, qc_dropwhile(S)},
{1, qc_is_empty(S)},
{1, qc_timeout(S)},
- {1, qc_purge(S)}]).
+ {1, qc_purge(S)},
+ {1, qc_fold(S)}]).
qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
@@ -112,7 +115,7 @@ qc_publish(#state{bqstate = BQ}) ->
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
expiry = oneof([undefined | lists:seq(1, 10)])},
- self(), BQ]}.
+ false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
{call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
@@ -124,6 +127,9 @@ qc_publish_delivered(#state{bqstate = BQ}) ->
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
+qc_drop(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drop, [boolean(), BQ]}.
+
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
@@ -141,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -152,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) ->
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
+qc_fold(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}.
+
%% Preconditions
%% Create long queues by only allowing publishing
@@ -173,7 +182,7 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
-next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
@@ -217,22 +226,10 @@ next_state(S, Res,
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
- #state{len = Len, messages = Messages, acks = Acks} = S,
- ResultInfo = {call, erlang, element, [1, Res]},
- BQ1 = {call, erlang, element, [2, Res]},
- AckTag = {call, erlang, element, [3, ResultInfo]},
- S1 = S#state{bqstate = BQ1},
- case gb_trees:is_empty(Messages) of
- true -> S1;
- false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
- false ->
- S2
- end
- end;
+ next_state_fetch_and_drop(S, Res, AckReq, 3);
+
+next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) ->
+ next_state_fetch_and_drop(S, Res, AckReq, 2);
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
@@ -265,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [3, Res]},
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
@@ -278,19 +275,38 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()};
+
+next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1}.
%% Postconditions
postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {{MsgFetched, _IsDelivered, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
+ {empty, _BQ} ->
+ Len =:= 0
+ end;
+
+postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
+ #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
+ case Res of
+ {{MsgIdFetched, AckTag}, _BQ} ->
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
+ MsgId = eval({call, erlang, element,
+ [?RECORD_INDEX(id, basic_message), Msg]}),
+ MsgIdFetched =:= MsgId andalso
+ not proplists:is_defined(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
@@ -313,6 +329,15 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
ReportedConfirmed);
+postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
+ #state{messages = Messages} = S,
+ {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
+ {stop, Acc};
+ ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
+ FoldFun(Msg, MsgProps, Acc)
+ end, {cont, Acc0}, gb_trees:to_list(Messages)),
+ true = Model =:= Res;
+
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -371,6 +396,15 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
+makefoldfun(Size) ->
+ fun (Msg, _MsgProps, Acc) ->
+ case length(Acc) > Size of
+ false -> {cont, [Msg | Acc]};
+ true -> {stop, Acc}
+ end
+ end.
+foldacc() -> [].
+
dropfun(Props) ->
Expiry = eval({call, erlang, element,
[?RECORD_INDEX(expiry, message_properties), Props]}),
@@ -388,6 +422,24 @@ drop_messages(Messages) ->
end
end.
+next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) ->
+ #state{len = Len, messages = Messages, acks = Acks} = S,
+ ResultInfo = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]},
+ S1 = S#state{bqstate = BQ1},
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
+ end.
+
-else.
-export([prop_disabled/0]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b97af6d8ca..1af60de8b4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,13 +33,15 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- conn_name, limiter, tx_status, next_tag, unacked_message_q,
- uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ conn_name, limiter, tx, next_tag, unacked_message_q, user,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
+-record(tx, {msgs, acks, nacks}).
+
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
@@ -64,6 +66,12 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INCR_STATS(Incs, Measure, State),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine -> incr_stats(Incs, Measure);
+ _ -> ok
+ end).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -185,15 +193,13 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
conn_pid = ConnPid,
conn_name = ConnName,
limiter = Limiter,
- tx_status = none,
+ tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
- uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -312,9 +318,12 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ {noreply, ensure_stats_timer(State1), Timeout}.
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -325,8 +334,10 @@ handle_info(timeout, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- noreply([ensure_stats_timer],
- rabbit_event:reset_stats_timer(State, #ch.stats_timer));
+ State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
+ %% NB: don't call noreply/1 since we don't want to kick off the
+ %% stats timer.
+ {noreply, send_confirms(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -334,9 +345,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State4#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -366,30 +381,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> reply(Reply, [], NewState).
-
-reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
+reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
-reply(Reply, Mask, NewState, Timeout) ->
- {reply, Reply, next_state(Mask, NewState), Timeout}.
+noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-noreply(NewState) -> noreply([], NewState).
-
-noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
-
-noreply(Mask, NewState, Timeout) ->
- {noreply, next_state(Mask, NewState), Timeout}.
-
--define(MASKED_CALL(Fun, Mask, State),
- case lists:member(Fun, Mask) of
- true -> State;
- false -> Fun(State)
- end).
-
-next_state(Mask, State) ->
- State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
- State2 = ?MASKED_CALL(send_confirms, Mask, State1),
- State2.
+next_state(State) -> ensure_stats_timer(send_confirms(State)).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -523,16 +519,12 @@ check_not_default_exchange(_) ->
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
-%% One, quite reasonable, interpretation of the spec, taken by the
-%% QPid M1 Java client, is that the exclusion of "amq." prefixed names
+%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
%% only applies on actual creation, and not in the cases where the
-%% entity already exists. This is how we use this function in the code
-%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly
-%% 0-9SP1, making it illegal to attempt to declare an exchange/queue
-%% with an amq.* name when passive=false. So this will need
-%% revisiting.
+%% entity already exists or passive=true.
%%
-%% TODO: enforce other constraints on name. See AMQP JIRA 69.
+%% NB: We deliberately do not enforce the other constraints on names
+%% required by the spec.
check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
rabbit_misc:protocol_error(
access_refused,
@@ -553,11 +545,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-record_confirm(undefined, _, State) ->
- State;
-record_confirm(MsgSeqNo, XName, State) ->
- record_confirms([{MsgSeqNo, XName}], State).
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -597,8 +584,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
%% while waiting for the reply to a synchronous command, we generally
%% do allow this...except in the case of a pending tx.commit, where
%% it could wreak havoc.
-handle_method(_Method, _, #ch{tx_status = TxStatus})
- when TxStatus =/= none andalso TxStatus =/= in_progress ->
+handle_method(_Method, _, #ch{tx = Tx})
+ when Tx =:= committing orelse Tx =:= failed ->
rabbit_misc:protocol_error(
channel_error, "unexpected command while processing 'tx.commit'", []);
@@ -612,7 +599,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
- tx_status = TxStatus,
+ tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -626,22 +613,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Props, State),
check_expiration_header(Props),
{MsgSeqNo, State1} =
- case {TxStatus, ConfirmEnabled} of
+ case {Tx, ConfirmEnabled} of
{none, false} -> {undefined, State};
{_, _} -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
+ DQ = {Delivery, QNames},
{noreply,
- case TxStatus of
- none -> deliver_to_queues({Delivery, QNames}, State1);
- in_progress -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ}
+ case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = Tx#tx{msgs = Msgs1}}
end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
@@ -655,15 +642,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
- case TxStatus of
- none -> ack(Acked, State1),
- State1;
- in_progress -> State1#ch{uncommitted_acks =
- Acked ++ State1#ch.uncommitted_acks}
+ case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -677,7 +663,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -689,7 +675,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -728,10 +714,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1040,34 +1027,37 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
precondition_failed("cannot switch from confirm to tx mode");
+handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
+
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
+ {reply, #'tx.select_ok'{}, State};
-handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _,
- State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL,
- limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- ack(TAL, State1),
+handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
+ acks = Acks,
+ nacks = Nacks},
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ ack(Acks, State1),
lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
- {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks),
+ {noreply, maybe_complete_tx(State1#ch{tx = committing})};
-handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL}) ->
- TNL1 = lists:append([L || {_, L} <- TNL]),
- UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
-
-handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
+ tx = #tx{acks = Acks,
+ nacks = Nacks}}) ->
+ NacksL = lists:append([L || {_, L} <- Nacks]),
+ UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
+ tx = new_tx()}};
+
+handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1126,9 +1116,12 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
@@ -1212,17 +1205,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
Content).
reject(DeliveryTag, Requeue, Multiple,
- State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
- case TxStatus of
- none ->
- reject(Requeue, Acked, State1#ch.limiter),
- State1;
- in_progress ->
- State1#ch{uncommitted_nacks =
- [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks],
+ State1#ch{tx = Tx#tx{nacks = Nacks1}}
end}.
reject(Requeue, Acked, Limiter) ->
@@ -1233,21 +1224,21 @@ reject(Requeue, Acked, Limiter) ->
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
+ true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
@@ -1277,18 +1268,20 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
+ack(Acked, State = #ch{queue_names = QNames}) ->
Incs = fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{queue_stats, QPid, length(MsgIds)} | L]
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count} | L];
+ error -> L
+ end
end, [], Acked),
ok = notify_limiter(State#ch.limiter, Acked),
- incr_stats(Incs, ack, State).
+ ?INCR_STATS(Incs, ack, State).
-new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = []}.
+new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1339,50 +1332,72 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
- incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
+ ?INCR_STATS([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirms([{MsgSeqNo, XName}], State);
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)}.
+ State#ch.unconfirmed)};
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> record_confirms([{MsgSeqNo, XName}], State)
+ end.
send_nacks([], State) ->
State;
-send_nacks(MXs, State = #ch{tx_status = none}) ->
+send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx_status = failed}).
+ maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
-send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
+send_confirms(State = #ch{tx = none, confirmed = C}) ->
MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
- incr_stats([{exchange_stats, XName, 1}], confirm, State),
+ ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
@@ -1418,7 +1433,7 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+maybe_complete_tx(State = #ch{tx = #tx{}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1426,16 +1441,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
true -> complete_tx(State#ch{confirmed = []})
end.
-complete_tx(State = #ch{tx_status = committing}) ->
+complete_tx(State = #ch{tx = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
- State#ch{tx_status = in_progress};
-complete_tx(State = #ch{tx_status = failed}) ->
+ State#ch{tx = new_tx()};
+complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
State),
- State1#ch{tx_status = in_progress}.
+ State1#ch{tx = new_tx()}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1444,19 +1459,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_status = TE}) -> TE =/= none;
+i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
-i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
- dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
- queue:len(UAMQ);
-i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
- queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
- length(TAL);
+i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
+i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{}) -> 0;
+i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks);
+i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
@@ -1467,12 +1479,8 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-incr_stats(Incs, Measure, State) ->
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> [update_measures(Type, Key, Inc, Measure) ||
- {Type, Key, Inc} <- Incs];
- _ -> ok
- end.
+incr_stats(Incs, Measure) ->
+ [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs].
update_measures(Type, Key, Inc, Measure) ->
Measures = case get({Type, Key}) of
@@ -1489,24 +1497,23 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a205b23d0b..9339161f29 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -39,8 +39,7 @@
-spec(recover/0 :: () -> [name()]).
-spec(callback/4::
(rabbit_types:exchange(), fun_name(),
- fun((boolean()) -> non_neg_integer()) | atom(),
- [any()]) -> 'ok').
+ fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
@@ -114,26 +113,19 @@ recover() ->
[XName || #exchange{name = XName} <- Xs].
callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
- Serial = fun (Bool) ->
- case Serial0 of
- _ when is_atom(Serial0) -> Serial0;
- _ -> Serial0(Bool)
- end
+ Serial = if is_function(Serial0) -> Serial0;
+ is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
- [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
- || M <- decorators()],
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
+ M <- decorators()],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
serialise_events(X = #exchange{type = Type}) ->
- case [Serialise || M <- decorators(),
- Serialise <- [M:serialise_events(X)],
- Serialise == true] of
- [] -> (type_to_module(Type)):serialise_events();
- _ -> true
- end.
+ lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
Serial = case serialise_events(X) of
@@ -318,22 +310,19 @@ route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
route(X = #exchange{name = XName}, Delivery) ->
- route1(Delivery, {queue:from_list([X]), XName, []}).
-
-route1(Delivery, {WorkList, SeenXs, QNames}) ->
- case queue:out(WorkList) of
- {empty, _WorkList} ->
- lists:usort(QNames);
- {{value, X = #exchange{type = Type}}, WorkList1} ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
- lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
- DstNames))
- end.
+ route1(Delivery, {[X], XName, []}).
+
+route1(_, {[], _, QNames}) ->
+ lists:usort(QNames);
+route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ DstNames = process_alternate(
+ X, ((type_to_module(Type)):route(X, Delivery))),
+ route1(Delivery,
+ lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
+ DstNames)).
process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+ Results;
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
@@ -347,23 +336,25 @@ process_route(#resource{kind = exchange} = XName,
Acc;
process_route(#resource{kind = exchange} = XName,
{WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
- {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:from_list([SeenX, XName]), QNames};
+ {cons_if_present(XName, WorkList),
+ gb_sets:from_list([SeenX, XName]), QNames};
process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
- false -> {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:add_element(XName, SeenXs), QNames}
+ false -> {cons_if_present(XName, WorkList),
+ gb_sets:add_element(XName, SeenXs), QNames}
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
+cons_if_present(XName, L) ->
+ case lookup(XName) of
+ {ok, X} -> [X | L];
+ {error, not_found} -> L
+ end.
+
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index cedbbdb380..8ee9ad5bc0 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -104,8 +104,6 @@ advance_blocks({B1, B2, B3, B4}, I) ->
B5 = erlang:phash2({B1, I}, 4294967296),
{{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
-blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
-
%% generate a GUID. This function should be used when performance is a
%% priority and predictability is not an issue. Otherwise use
%% gen_secure/0.
@@ -114,14 +112,15 @@ gen() ->
%% time we need a new guid we rotate them producing a new hash
%% with the aid of the counter. Look at the comments in
%% advance_blocks/2 for details.
- {BS, I} = case get(guid) of
- undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
- erlang:md5(term_to_binary(fresh())),
- {{B1,B2,B3,B4}, 0};
- {BS0, I0} -> advance_blocks(BS0, I0)
- end,
- put(guid, {BS, I}),
- blocks_to_binary(BS).
+ case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> = Res =
+ erlang:md5(term_to_binary(fresh())),
+ put(guid, {{B1, B2, B3, B4}, 0}),
+ Res;
+ {BS, I} -> {{B1, B2, B3, B4}, _} = S = advance_blocks(BS, I),
+ put(guid, S),
+ <<B1:32, B2:32, B3:32, B4:32>>
+ end.
%% generate a non-predictable GUID.
%%
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index df733546b4..e857f39526 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,11 +17,12 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ purge/1, publish/5, publish_delivered/4,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -37,10 +38,8 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
seen_status,
confirmed,
- ack_msg_id,
known_senders
}).
@@ -54,10 +53,8 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- set_delivered :: non_neg_integer(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
- ack_msg_id :: dict(),
known_senders :: set()
}).
@@ -113,10 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:new() }.
stop_mirroring(State = #state { coordinator = CPid,
@@ -135,8 +130,8 @@ terminate({shutdown, dropped} = Reason,
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 };
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
+
terminate(Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -147,8 +142,7 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
stop_all_slaves(Reason, State),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
stop_all_slaves(Reason, #state{gm = GM}) ->
Info = gm:info(GM),
@@ -174,30 +168,27 @@ purge(State = #state { gm = GM,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
- {Count, State #state { backing_queue_state = BQS1,
- set_delivered = 0 }}.
+ {Count, State #state { backing_queue_state = BQS1 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
discard(MsgId, ChPid, State = #state { gm = GM,
@@ -220,22 +211,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- set_delivered = SetDelivered,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Next, Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -267,43 +253,33 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+fetch(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
- Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- SetDelivered1 = lists:max([0, SetDelivered - 1]),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1,
- ack_msg_id = AM1 }}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ end}.
+
+drop(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:drop(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, AckTag} -> drop_one(AckTag, State1)
+ end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
- {MsgIds, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
-
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -312,6 +288,16 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
@@ -409,10 +395,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
sender_death_fun() ->
@@ -440,8 +424,25 @@ depth_fun() ->
end)
end.
-maybe_store_acktag(undefined, _MsgId, AM) -> AM;
-maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+drop_one(AckTag, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+ State.
+
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
known_senders = KS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1ba1420f42..9354f48514 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -28,7 +28,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -318,7 +318,6 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{gm, _Msg} -> 5;
- {post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
@@ -329,6 +328,8 @@ prioritise_info(Msg, _State) ->
_ -> 0
end.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -701,7 +702,7 @@ process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
@@ -725,8 +726,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
@@ -735,21 +735,6 @@ process_instruction({drop, Length, Dropped, AckRequired},
true -> State1;
false -> update_delta(ToDrop - Dropped, State1)
end};
-process_instruction({fetch, AckRequired, MsgId, Remaining},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- _ when QLen =< Remaining andalso AckRequired ->
- State;
- _ when QLen =< Remaining ->
- update_delta(-1, State)
- end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 4efde50ec5..edaa719899 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -352,13 +352,12 @@ set_table_value(Table, Key, Type, Value) ->
sort_field_table(
lists:keystore(Key, 1, Table, {Key, Type, Value})).
-r(#resource{virtual_host = VHostPath}, Kind, Name)
- when is_binary(Name) ->
+r(#resource{virtual_host = VHostPath}, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
-r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
+r(VHostPath, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name}.
-r(VHostPath, Kind) when is_binary(VHostPath) ->
+r(VHostPath, Kind) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6576ba5210..6a442fecf2 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -68,7 +68,8 @@
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
+ {'running_nodes', [node()]} |
+ {'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8d0e4456df..258ac0ce4a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -53,7 +53,7 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+-spec(partitions/0 :: () -> {node(), [node()]}).
-endif.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 928786e983..13e8feff08 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,12 +35,16 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
+-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at, host, peer_host,
- port, peer_port}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+
+-record(connection, {name, host, peer_host, port, peer_port,
+ protocol, user, timeout_sec, frame_max, vhost,
+ client_properties, capabilities,
+ auth_mechanism, auth_state}).
+
+-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -205,15 +209,21 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
{PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
- name = list_to_binary(Name),
connection = #connection{
+ name = list_to_binary(Name),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
protocol = none,
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
- capabilities = []},
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -224,15 +234,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
- auth_mechanism = none,
- auth_state = none,
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never,
- host = Host,
- peer_host = PeerHost,
- port = Port,
- peer_port = PeerPort},
+ throttle = #throttle{
+ conserve_resources = false,
+ last_blocked_by = none,
+ last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -288,8 +293,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb,
+ State = #v1{throttle = Throttle}) ->
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -372,29 +379,31 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_resources = Mem}) ->
- case {CS, Mem orelse credit_flow:blocked()} of
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ case {CS, (Throttle#throttle.conserve_resources orelse
+ credit_flow:blocked())} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
State#v1{connection_state = running};
- {blocked, true} -> update_last_blocked_by(State);
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking}) ->
+maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- update_last_blocked_by(State#v1{connection_state = blocked,
- last_blocked_at = erlang:now()});
+ State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{last_blocked_at = erlang:now()})};
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_resources = true}) ->
- State#v1{last_blocked_by = resource};
-update_last_blocked_by(State = #v1{conserve_resources = false}) ->
- State#v1{last_blocked_by = flow}.
+update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
+ Throttle#throttle{last_blocked_by = resource};
+update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
+ Throttle#throttle{last_blocked_by = flow}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -531,9 +540,10 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(Channel, State) ->
- #v1{sock = Sock, name = Name, queue_collector = Collector,
+ #v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
+ connection = #connection{name = Name,
+ protocol = Protocol,
frame_max = FrameMax,
user = User,
vhost = VHost,
@@ -594,40 +604,36 @@ handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- {ChPid, AState} = case get({channel, Channel}) of
+ ChKey = {channel, Channel},
+ {ChPid, AState} = case get(ChKey) of
undefined -> create_channel(Channel, State);
Other -> Other
end,
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end.
-
-process_channel_frame(Frame, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- control_throttle(State);
-post_process_frame({method, MethodName, _}, _ChPid,
- State = #v1{connection = #connection{
- protocol = Protocol}}) ->
- case Protocol:method_has_content(MethodName) of
- true -> erlang:bump_reductions(2000),
- maybe_block(control_throttle(State));
- false -> control_throttle(State)
- end;
+ State;
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ maybe_block(State);
post_process_frame(_Frame, _ChPid, State) ->
- control_throttle(State).
+ State.
%%--------------------------------------------------------------------------
@@ -746,13 +752,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
- State = State0#v1{auth_mechanism = AuthMechanism,
- auth_state = AuthMechanism:init(Sock),
- connection_state = securing,
+ State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
- capabilities = Capabilities}},
+ capabilities = Capabilities,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -790,10 +796,11 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- sock = Sock}) ->
+ connection = Connection = #connection{
+ user = User,
+ protocol = Protocol},
+ sock = Sock,
+ throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -801,7 +808,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
- conserve_resources = Conserve}),
+ throttle = Throttle#throttle{
+ conserve_resources = Conserve}}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -870,10 +878,10 @@ auth_mechanisms_binary(Sock) ->
string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
- State = #v1{auth_mechanism = AuthMechanism,
- auth_state = AuthState,
- connection = Connection =
- #connection{protocol = Protocol},
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
@@ -886,14 +894,16 @@ auth_phase(Response,
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
ok = send_on_channel0(Sock, Secure, Protocol),
- State#v1{auth_state = AuthState1};
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}}
+ connection = Connection#connection{user = User,
+ auth_state = none}}
end.
%%--------------------------------------------------------------------------
@@ -901,11 +911,6 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) -> self();
-i(name, #v1{name = Name}) -> Name;
-i(host, #v1{host = Host}) -> Host;
-i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
-i(port, #v1{port = Port}) -> Port;
-i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
@@ -922,36 +927,32 @@ i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) ->
none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(user, #v1{connection = #connection{user = none}}) ->
- '';
-i(user, #v1{connection = #connection{user = #user{
- username = Username}}}) ->
- Username;
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
- VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
- Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
- FrameMax;
-i(client_properties, #v1{connection = #connection{client_properties =
- ClientProperties}}) ->
- ClientProperties;
-i(Item, #v1{}) ->
- throw({bad_argument, Item}).
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a68caadbdc..09ed3d0890 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1297,8 +1297,7 @@ test_statistics() ->
QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
- {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
- QPid = Q#amqqueue.pid,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
@@ -1322,9 +1321,9 @@ test_statistics() ->
length(proplists:get_value(
channel_queue_exchange_stats, E)) > 0
end),
- [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QPid,X},[{publish,1}]}] =
+ [{{QRes,X},[{publish,1}]}] =
proplists:get_value(channel_queue_exchange_stats, Event2),
%% Check the stats remove stuff on queue deletion
@@ -1349,31 +1348,31 @@ test_refresh_events(SecondaryNode) ->
[channel_created, queue_created]),
{_Writer, Ch} = test_spawn(),
- expect_events(Ch, channel_created),
+ expect_events(pid, Ch, channel_created),
rabbit_channel:shutdown(Ch),
{_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(Ch2, channel_created),
+ expect_events(pid, Ch2, channel_created),
rabbit_channel:shutdown(Ch2),
- {new, #amqqueue { pid = QPid } = Q} =
+ {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(test_queue(), false, false, [], none),
- expect_events(QPid, queue_created),
+ expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false),
rabbit_tests_event_receiver:stop(),
passed.
-expect_events(Pid, Type) ->
- expect_event(Pid, Type),
+expect_events(Tag, Key, Type) ->
+ expect_event(Tag, Key, Type),
rabbit:force_event_refresh(),
- expect_event(Pid, Type).
+ expect_event(Tag, Key, Type).
-expect_event(Pid, Type) ->
+expect_event(Tag, Key, Type) ->
receive #event{type = Type, props = Props} ->
- case pget(pid, Props) of
- Pid -> ok;
- _ -> expect_event(Pid, Type)
+ case pget(Tag, Props) of
+ Key -> ok;
+ _ -> expect_event(Tag, Key, Type)
end
after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
@@ -2228,6 +2227,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
+ variable_queue_publish(IsPersistent, Count, PropFun,
+ fun (_N) -> <<>> end, VQ).
+
+variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2236,16 +2239,18 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
- PropFun(N, #message_properties{}), self(), VQN)
+ end},
+ PayloadFun(N)),
+ PropFun(N, #message_properties{}), false, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
+ Rem = rabbit_variable_queue:len(VQM),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -2311,13 +2316,40 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_drop/1,
fun test_variable_queue_fold_msg_on_disk/1,
- fun test_dropwhile/1,
+ fun test_dropfetchwhile/1,
fun test_dropwhile_varying_ram_duration/1,
+ fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_requeue/1]],
+ fun test_variable_queue_requeue/1,
+ fun test_variable_queue_fold/1]],
passed.
+test_variable_queue_fold(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(
+ true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ lists:foldl(
+ fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end,
+ VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Count, VQ0) ->
+ {Acc, VQ1} = rabbit_variable_queue:fold(
+ fun (M, _, A) ->
+ case msg2int(M) =< Cut of
+ true -> {cont, [M | A]};
+ false -> {stop, A}
+ end
+ end, [], VQ0),
+ true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] ==
+ [msg2int(M) || M <- Acc],
+ VQ1.
+
+msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
+ binary_to_term(list_to_binary(lists:reverse(P))).
+
test_variable_queue_requeue(VQ0) ->
Interval = 50,
Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
@@ -2337,7 +2369,7 @@ test_variable_queue_requeue(VQ0) ->
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag, _}, VQb} =
+ {{#basic_message{}, true, AckTag}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
VQb
end, VQ5, lists:reverse(Acks)),
@@ -2373,41 +2405,86 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6.
-test_dropwhile(VQ0) ->
+test_drop(VQ0) ->
+ %% start by sending a messages
+ VQ1 = variable_queue_publish(false, 1, VQ0),
+ %% drop message with AckRequired = true
+ {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = rabbit_variable_queue:is_empty(VQ2),
+ true = AckTag =/= undefinded,
+ %% drop again -> empty
+ {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
+ %% requeue
+ {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
+ %% drop message with AckRequired = false
+ {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ true = rabbit_variable_queue:is_empty(VQ5),
+ VQ5.
+
+test_dropfetchwhile(VQ0) ->
Count = 10,
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
false, Count,
- fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
+ fun (N, Props) -> Props#message_properties{expiry = N} end,
+ fun erlang:term_to_binary/1, VQ0),
+
+ %% fetch the first 5 messages
+ {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
+ rabbit_variable_queue:fetchwhile(
+ fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
+ {[Msg | MsgAcc], [AckTag | AckAcc]}
+ end, {[], []}, VQ1),
+ true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
+
+ %% requeue them
+ {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2),
%% drop the first 5 messages
- {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
-
- %% fetch five now
- VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _, _}, VQM} =
+ {#message_properties{expiry = 6}, VQ4} =
+ rabbit_variable_queue:dropwhile(
+ fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3),
+
+ %% fetch 5
+ VQ5 = lists:foldl(fun (N, VQN) ->
+ {{Msg, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
+ true = msg2int(Msg) == N,
VQM
- end, VQ2, lists:seq(6, Count)),
+ end, VQ4, lists:seq(6, Count)),
%% should be empty now
- {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
+ true = rabbit_variable_queue:is_empty(VQ5),
- VQ4.
+ VQ5.
test_dropwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, VQ2} = rabbit_variable_queue:dropwhile(
+ fun (_) -> false end, VQ1),
+ VQ2
+ end, VQ0).
+
+test_fetchwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
+ fun (_) -> false end,
+ fun (_, _, A) -> A end,
+ ok, VQ1),
+ VQ2
+ end, VQ0).
+
+test_dropfetchwhile_varying_ram_duration(Fun, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ VQ3 = Fun(VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {_, undefined, VQ6} =
- rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6 = Fun(VQ5),
VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
@@ -2442,7 +2519,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ Len = rabbit_variable_queue:len(VQ2),
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
@@ -2507,8 +2585,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
- rabbit_variable_queue:fetch(true, VQ7),
+ {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
+ Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
@@ -2530,7 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags),
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
+ ok, VQ2, AckTags),
VQ3.
test_queue_recover() ->
@@ -2554,10 +2633,11 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
+ CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName, QPid1)
+ rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 3a5b96de4f..601656da1c 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -31,9 +31,9 @@
-type(state() :: rabbit_types:exchange() | 'none').
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
--spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
+-spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -43,26 +43,26 @@
%%----------------------------------------------------------------------------
init(VHost) ->
- case tracing(VHost) of
+ case enabled(VHost) of
false -> none;
true -> {ok, X} = rabbit_exchange:lookup(
rabbit_misc:r(VHost, exchange, ?XNAME)),
X
end.
-tracing(VHost) ->
+enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}},
- TraceX) ->
- maybe_trace(TraceX, Msg, <<"publish">>, XName, []).
+tap_in(_Msg, none) -> ok;
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
+ trace(TraceX, Msg, <<"publish">>, XName, []).
-tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
- TraceX) ->
+tap_out(_Msg, none) -> ok;
+tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- maybe_trace(TraceX, Msg, <<"deliver">>, QName,
- [{<<"redelivered">>, signedint, RedeliveredNum}]).
+ trace(TraceX, Msg, <<"deliver">>, QName,
+ [{<<"redelivered">>, signedint, RedeliveredNum}]).
%%----------------------------------------------------------------------------
@@ -83,14 +83,11 @@ update_config(Fun) ->
%%----------------------------------------------------------------------------
-maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) ->
+trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
+ _RKPrefix, _RKSuffix, _Extra) ->
ok;
-maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
- _RKPrefix, _RKSuffix, _Extra) ->
- ok;
-maybe_trace(X, Msg = #basic_message{content = #content{
- payload_fragments_rev = PFR}},
- RKPrefix, RKSuffix, Extra) ->
+trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}},
+ RKPrefix, RKSuffix, Extra) ->
{ok, _, _} = rabbit_basic:publish(
X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
#'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6dc65bab31..37ca6de075 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,11 +17,12 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- depth/1, set_ram_duration_target/2, ram_duration/1,
+ publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
+ is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -255,7 +256,6 @@
q4,
next_seq_id,
pending_ack,
- pending_ack_index,
ram_ack_index,
index_state,
msg_store_clients,
@@ -349,7 +349,7 @@
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_ack_index :: gb_set(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4,
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -557,8 +557,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = true },
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
@@ -579,27 +578,28 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+dropwhile(Pred, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, a(State1)};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = remove(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> {MsgProps, a(in_r(MsgStatus, State1))}
+ end
+ end.
-dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
- (Next, S) -> {Next, undefined, S}
- end,
+fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
- End(undefined, a(State1));
+ {undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckRequired} of
- {true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
- {true, false} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckRequired, State2, undefined);
- {false, _} ->
- End(MsgProps, a(in_r(MsgStatus, State1)))
+ case Pred(MsgProps) of
+ true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(true, MsgStatus, State2),
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
+ false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -610,9 +610,18 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
- {Res, a(State3)}
+ {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ end.
+
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
ack([], State) ->
@@ -638,16 +647,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
- State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
-
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -669,6 +668,36 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(
+ fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
+ MsgStatus = gb_trees:get(SeqId, PA),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
+fold(Fun, Acc, #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd },
+ q3 = Q3,
+ q4 = Q4 } = State) ->
+ QFun = fun(MsgStatus, {Acc0, State0}) ->
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {StopGo, AccNext} =
+ Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
+ {StopGo, {AccNext, State1}}
+ end,
+ {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
+ {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
+ {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
+ DeltaSeqId, DeltaSeqIdEnd, State2),
+ {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
+ {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
+ {Acc5, State5}.
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -722,7 +751,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_sets:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -801,7 +830,7 @@ status(#vqstate {
{pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_sets:size(RAI)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -864,11 +893,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps = #message_properties { delivered = Delivered }) ->
- %% TODO would it make sense to remove #msg_status.is_delivered?
+msg_status(IsPersistent, IsDelivered, SeqId,
+ Msg = #basic_message { id = MsgId }, MsgProps) ->
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = Delivered,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
@@ -1006,7 +1034,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_ack_index = gb_sets:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1044,9 +1072,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, true, State),
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1062,35 +1091,33 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
-
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
+read_msg(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent },
CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, _CountDiskToRam, State) ->
- {MsgStatus, State}.
-
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
+ RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
+ {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
+ msg_store_clients = MSCState1 }};
+read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
+ {Msg, State}.
+
+remove(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1101,12 +1128,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
- end,
+ IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
+ end,
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
@@ -1117,16 +1143,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
- State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1223,18 +1247,15 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg_on_disk = MsgOnDisk } = MsgStatus,
+record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
ack_in_counter = AckInCount}) ->
- {AckEntry, RAI1} =
- case MsgOnDisk of
- true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
- end,
- State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
+ RAI1 = case Msg of
+ undefined -> RAI;
+ _ -> gb_sets:insert(SeqId, RAI)
+ end,
+ State #vqstate { pending_ack = gb_trees:insert(SeqId, MsgStatus, PA),
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
@@ -1242,7 +1263,7 @@ remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
{gb_trees:get(SeqId, PA),
State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+ ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}.
purge_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
@@ -1253,7 +1274,7 @@ purge_pending_ack(KeepPersistent,
accumulate_ack(MsgStatus, Acc)
end, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ ram_ack_index = gb_sets:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1346,11 +1367,12 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
+%% Internal plumbing for requeue and fold
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
+ {Msg, State1} = read_msg(MsgStatus, true, State),
+ {MsgStatus#msg_status { msg = Msg }, State1};
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
@@ -1415,6 +1437,41 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
+qfoldl( Fun, {cont, Acc} = A, Q) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> A;
+ {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
+ end.
+
+lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
+lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
+lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
+
+delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
+ {stop, {Acc, State}};
+delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
+ {cont, {Acc, State}};
+delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState } = State) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {StopCont, {Acc1, MSCState1}} =
+ lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered},
+ {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {StopCont, {AccNext, MSCState1}}
+ end, {cont, {Acc, MSCState}}, List),
+ delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1453,7 +1510,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
+ case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
@@ -1481,13 +1538,12 @@ limit_ram_acks(0, State) ->
{0, State};
limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+ case gb_sets:is_empty(RAI) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- gb_trees:get(SeqId, PA),
+ {SeqId, RAI1} = gb_sets:take_largest(RAI),
+ MsgStatus = gb_trees:get(SeqId, PA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 297fa56fe3..0bb18f4c7e 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -123,7 +123,7 @@ with(VHostPath, Thunk) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
-i(tracing, VHost) -> rabbit_trace:tracing(VHost);
+i(tracing, VHost) -> rabbit_trace:enabled(VHost);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).