summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-03 14:47:11 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-03 14:47:11 +0000
commit7d59f2418c58b9b92f4f077c59f475eb34e4745e (patch)
tree12d44111f085023d97d7ef07a7ed18bf2848d791
parentec140ce9c61eb52c013976d42f492f4797065fd2 (diff)
parentbca38ca706223d3469be965240ab3facaa6dc641 (diff)
downloadrabbitmq-server-git-7d59f2418c58b9b92f4f077c59f475eb34e4745e.tar.gz
Merge bug25369
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/credit_flow.erl75
-rw-r--r--src/rabbit_amqqueue_process.erl36
-rw-r--r--src/rabbit_backing_queue.erl26
-rw-r--r--src/rabbit_channel.erl171
-rw-r--r--src/rabbit_mirror_queue_master.erl15
-rw-r--r--src/rabbit_reader.erl203
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl131
9 files changed, 324 insertions, 344 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0ccb80bf02..7385b4b3c0 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -27,9 +27,6 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties, capabilities}).
-
-record(content,
{class_id,
properties, %% either 'none', or a decoded record/tuple
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index ba99811f70..102c353f9b 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -52,6 +52,22 @@
%%----------------------------------------------------------------------------
+%% process dict update macro - eliminates the performance-hurting
+%% closure creation a HOF would introduce
+-define(UPDATE(Key, Default, Var, Expr),
+ begin
+ %% We deliberately allow Var to escape from the case here
+ %% to be used in Expr. Any temporary var we introduced
+ %% would also escape, and might conflict.
+ case get(Key) of
+ undefined -> Var = Default;
+ Var -> ok
+ end,
+ put(Key, Expr)
+ end).
+
+%%----------------------------------------------------------------------------
+
%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
%% the flow of credit, but the function names refer to the flow of
@@ -66,29 +82,33 @@
send(From) -> send(From, ?DEFAULT_CREDIT).
send(From, {InitialCredit, _MoreCreditAfter}) ->
- update({credit_from, From}, InitialCredit,
- fun (1) -> block(From),
- 0;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_from, From}, InitialCredit, C,
+ if C == 1 -> block(From),
+ 0;
+ true -> C - 1
+ end).
ack(To) -> ack(To, ?DEFAULT_CREDIT).
ack(To, {_InitialCredit, MoreCreditAfter}) ->
- update({credit_to, To}, MoreCreditAfter,
- fun (1) -> grant(To, MoreCreditAfter),
- MoreCreditAfter;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_to, To}, MoreCreditAfter, C,
+ if C == 1 -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ true -> C - 1
+ end).
handle_bump_msg({From, MoreCredit}) ->
- update({credit_from, From}, 0,
- fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
- C + MoreCredit;
- (C) -> C + MoreCredit
- end).
-
-blocked() -> get(credit_blocked, []) =/= [].
+ ?UPDATE({credit_from, From}, 0, C,
+ if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ true -> C + MoreCredit
+ end).
+
+blocked() -> case get(credit_blocked) of
+ undefined -> false;
+ [] -> false;
+ _ -> true
+ end.
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
@@ -105,24 +125,17 @@ grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
- true -> update(credit_deferred, [],
- fun (Deferred) -> [{To, Msg} | Deferred] end)
+ true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
+block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
- update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
case blocked() of
- false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred);
+ false -> case erase(credit_deferred) of
+ undefined -> ok;
+ Credits -> [To ! Msg || {To, Msg} <- Credits]
+ end;
true -> ok
end.
-
-get(Key, Default) ->
- case get(Key) of
- undefined -> Default;
- Value -> Value
- end.
-
-update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 03bcdf43f9..f9614517fe 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -262,7 +262,7 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
@@ -479,7 +479,7 @@ deliver_msg_to_consumer(DeliverFun,
deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State1),
+ drop_expired_msgs(State1),
{Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
@@ -526,7 +526,7 @@ discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State),
+ drop_expired_msgs(State),
{_IsEmpty1, State2} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
BQ:is_empty(BQS), State1),
@@ -711,16 +711,16 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{dlx = DLX,
- backing_queue_state = BQS,
- backing_queue = BQ }) ->
+drop_expired_msgs(State = #q{dlx = DLX,
+ backing_queue_state = BQS,
+ backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, BQS1} = case DLX of
undefined -> BQ:dropwhile(ExpirePred, BQS);
_ -> {Next, Msgs, BQS2} =
BQ:fetchwhile(ExpirePred,
- fun accumulate_msgs/4,
+ fun accumulate_msgs/3,
[], BQS),
case Msgs of
[] -> ok;
@@ -734,7 +734,7 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
-accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
ensure_ttl_timer(undefined, State) ->
State;
@@ -791,12 +791,9 @@ stop(State) -> stop(undefined, noreply, State).
stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
- {true, noreply} ->
- {stop, normal, State};
- {true, _} ->
- {stop, normal, Reply, State};
- {false, _} ->
- noreply(State#q{delayed_stop = {From, Reply}})
+ {true, noreply} -> {stop, normal, State};
+ {true, _} -> {stop, normal, Reply, State};
+ {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -1053,7 +1050,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_messages(State1)) of
+ case fetch(AckRequired, drop_expired_msgs(State1)) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1126,7 +1123,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(ensure_expiry_timer(State)),
+ drop_expired_msgs(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
@@ -1205,8 +1202,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ {ok, BQS1} = BQ:ackfold(
+ fun (M, A, ok) -> DLXFun([{M, A}]) end,
+ ok, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1315,7 +1313,7 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+ noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
handle_info(emit_stats, State) ->
emit_stats(State),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 272df5c1b7..99b5946e59 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,8 +35,7 @@
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -133,14 +132,11 @@
-> {rabbit_types:message_properties() | undefined, state()}.
%% Like dropwhile, except messages are fetched in "require
-%% acknowledgement" mode and are passed, together with their Delivered
-%% flag and ack tag, to the supplied function. The function is also
-%% fed an accumulator. The result of fetchwhile is as for dropwhile
-%% plus the accumulator.
--callback fetchwhile(msg_pred(),
- fun ((rabbit_types:basic_message(), boolean(), ack(), A)
- -> A),
- A, state())
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
-> {rabbit_types:message_properties() | undefined,
A, state()}.
@@ -156,14 +152,14 @@
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
%% Fold over all the messages in a queue and return the accumulated
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
@@ -233,7 +229,7 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
- {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c19f9c3aa1..1af60de8b4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,14 +33,15 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- conn_name, limiter, tx_status, next_tag, unacked_message_q,
- uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ conn_name, limiter, tx, next_tag, unacked_message_q, user,
virtual_host, most_recently_declared_queue,
queue_names, queue_monitors, consumer_mapping,
blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
+-record(tx, {msgs, acks, nacks}).
+
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
@@ -192,12 +193,9 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
conn_pid = ConnPid,
conn_name = ConnName,
limiter = Limiter,
- tx_status = none,
+ tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
- uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -320,9 +318,12 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ {noreply, ensure_stats_timer(State1), Timeout}.
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -333,8 +334,10 @@ handle_info(timeout, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- noreply([ensure_stats_timer],
- rabbit_event:reset_stats_timer(State, #ch.stats_timer));
+ State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
+ %% NB: don't call noreply/1 since we don't want to kick off the
+ %% stats timer.
+ {noreply, send_confirms(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -378,30 +381,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> reply(Reply, [], NewState).
-
-reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
-
-reply(Reply, Mask, NewState, Timeout) ->
- {reply, Reply, next_state(Mask, NewState), Timeout}.
-
-noreply(NewState) -> noreply([], NewState).
+reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
-noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
+noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-noreply(Mask, NewState, Timeout) ->
- {noreply, next_state(Mask, NewState), Timeout}.
-
--define(MASKED_CALL(Fun, Mask, State),
- case lists:member(Fun, Mask) of
- true -> State;
- false -> Fun(State)
- end).
-
-next_state(Mask, State) ->
- State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
- State2 = ?MASKED_CALL(send_confirms, Mask, State1),
- State2.
+next_state(State) -> ensure_stats_timer(send_confirms(State)).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -600,8 +584,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
%% while waiting for the reply to a synchronous command, we generally
%% do allow this...except in the case of a pending tx.commit, where
%% it could wreak havoc.
-handle_method(_Method, _, #ch{tx_status = TxStatus})
- when TxStatus =/= none andalso TxStatus =/= in_progress ->
+handle_method(_Method, _, #ch{tx = Tx})
+ when Tx =:= committing orelse Tx =:= failed ->
rabbit_misc:protocol_error(
channel_error, "unexpected command while processing 'tx.commit'", []);
@@ -615,7 +599,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
- tx_status = TxStatus,
+ tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -629,7 +613,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Props, State),
check_expiration_header(Props),
{MsgSeqNo, State1} =
- case {TxStatus, ConfirmEnabled} of
+ case {Tx, ConfirmEnabled} of
{none, false} -> {undefined, State};
{_, _} -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
@@ -639,12 +623,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_trace:tap_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
+ DQ = {Delivery, QNames},
{noreply,
- case TxStatus of
- none -> deliver_to_queues({Delivery, QNames}, State1);
- in_progress -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ}
+ case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = Tx#tx{msgs = Msgs1}}
end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
@@ -658,15 +642,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
- case TxStatus of
- none -> ack(Acked, State1),
- State1;
- in_progress -> State1#ch{uncommitted_acks =
- Acked ++ State1#ch.uncommitted_acks}
+ case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -1044,34 +1027,37 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
precondition_failed("cannot switch from confirm to tx mode");
+handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
+
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
+ {reply, #'tx.select_ok'{}, State};
-handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _,
- State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL,
- limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- ack(TAL, State1),
+handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
+ acks = Acks,
+ nacks = Nacks},
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ ack(Acks, State1),
lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
- {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks),
+ {noreply, maybe_complete_tx(State1#ch{tx = committing})};
-handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL}) ->
- TNL1 = lists:append([L || {_, L} <- TNL]),
- UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
-
-handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
+ tx = #tx{acks = Acks,
+ nacks = Nacks}}) ->
+ NacksL = lists:append([L || {_, L} <- Nacks]),
+ UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
+ tx = new_tx()}};
+
+handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1219,17 +1205,15 @@ basic_return(#basic_message{exchange_name = ExchangeName,
Content).
reject(DeliveryTag, Requeue, Multiple,
- State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
- case TxStatus of
- none ->
- reject(Requeue, Acked, State1#ch.limiter),
- State1;
- in_progress ->
- State1#ch{uncommitted_nacks =
- [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks],
+ State1#ch{tx = Tx#tx{nacks = Nacks1}}
end}.
reject(Requeue, Acked, Limiter) ->
@@ -1297,9 +1281,7 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
ok = notify_limiter(State#ch.limiter, Acked),
?INCR_STATS(Incs, ack, State).
-new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = []}.
+new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1400,18 +1382,18 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
send_nacks([], State) ->
State;
-send_nacks(MXs, State = #ch{tx_status = none}) ->
+send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx_status = failed}).
+ maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
-send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
+send_confirms(State = #ch{tx = none, confirmed = C}) ->
MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
@@ -1451,7 +1433,7 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+maybe_complete_tx(State = #ch{tx = #tx{}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1459,16 +1441,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
true -> complete_tx(State#ch{confirmed = []})
end.
-complete_tx(State = #ch{tx_status = committing}) ->
+complete_tx(State = #ch{tx = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
- State#ch{tx_status = in_progress};
-complete_tx(State = #ch{tx_status = failed}) ->
+ State#ch{tx = new_tx()};
+complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
State),
- State1#ch{tx_status = in_progress}.
+ State1#ch{tx = new_tx()}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1477,19 +1459,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_status = TE}) -> TE =/= none;
+i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
-i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
- dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
- queue:len(UAMQ);
-i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
- queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
- length(TAL);
+i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
+i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{}) -> 0;
+i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks);
+i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e3d967bc53..e857f39526 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,11 +18,11 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/5, publish_delivered/4,
- discard/3, fetch/2, drop/2, ack/2,
- requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -281,10 +281,6 @@ ack(AckTags, State = #state { gm = GM,
end,
{MsgIds, State #state { backing_queue_state = BQS1 }}.
-foreach_ack(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
-
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -292,6 +288,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
fold(Fun, Acc, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fold(Fun, Acc, BQS),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 928786e983..13e8feff08 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,12 +35,16 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
+-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at, host, peer_host,
- port, peer_port}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+
+-record(connection, {name, host, peer_host, port, peer_port,
+ protocol, user, timeout_sec, frame_max, vhost,
+ client_properties, capabilities,
+ auth_mechanism, auth_state}).
+
+-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -205,15 +209,21 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
{PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
- name = list_to_binary(Name),
connection = #connection{
+ name = list_to_binary(Name),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
protocol = none,
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
- capabilities = []},
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -224,15 +234,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
- auth_mechanism = none,
- auth_state = none,
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never,
- host = Host,
- peer_host = PeerHost,
- port = Port,
- peer_port = PeerPort},
+ throttle = #throttle{
+ conserve_resources = false,
+ last_blocked_by = none,
+ last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -288,8 +293,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb,
+ State = #v1{throttle = Throttle}) ->
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -372,29 +379,31 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_resources = Mem}) ->
- case {CS, Mem orelse credit_flow:blocked()} of
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ case {CS, (Throttle#throttle.conserve_resources orelse
+ credit_flow:blocked())} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
State#v1{connection_state = running};
- {blocked, true} -> update_last_blocked_by(State);
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking}) ->
+maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- update_last_blocked_by(State#v1{connection_state = blocked,
- last_blocked_at = erlang:now()});
+ State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{last_blocked_at = erlang:now()})};
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_resources = true}) ->
- State#v1{last_blocked_by = resource};
-update_last_blocked_by(State = #v1{conserve_resources = false}) ->
- State#v1{last_blocked_by = flow}.
+update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
+ Throttle#throttle{last_blocked_by = resource};
+update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
+ Throttle#throttle{last_blocked_by = flow}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -531,9 +540,10 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(Channel, State) ->
- #v1{sock = Sock, name = Name, queue_collector = Collector,
+ #v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
+ connection = #connection{name = Name,
+ protocol = Protocol,
frame_max = FrameMax,
user = User,
vhost = VHost,
@@ -594,40 +604,36 @@ handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- {ChPid, AState} = case get({channel, Channel}) of
+ ChKey = {channel, Channel},
+ {ChPid, AState} = case get(ChKey) of
undefined -> create_channel(Channel, State);
Other -> Other
end,
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end.
-
-process_channel_frame(Frame, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- control_throttle(State);
-post_process_frame({method, MethodName, _}, _ChPid,
- State = #v1{connection = #connection{
- protocol = Protocol}}) ->
- case Protocol:method_has_content(MethodName) of
- true -> erlang:bump_reductions(2000),
- maybe_block(control_throttle(State));
- false -> control_throttle(State)
- end;
+ State;
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ maybe_block(State);
post_process_frame(_Frame, _ChPid, State) ->
- control_throttle(State).
+ State.
%%--------------------------------------------------------------------------
@@ -746,13 +752,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
- State = State0#v1{auth_mechanism = AuthMechanism,
- auth_state = AuthMechanism:init(Sock),
- connection_state = securing,
+ State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
- capabilities = Capabilities}},
+ capabilities = Capabilities,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -790,10 +796,11 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- sock = Sock}) ->
+ connection = Connection = #connection{
+ user = User,
+ protocol = Protocol},
+ sock = Sock,
+ throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -801,7 +808,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
- conserve_resources = Conserve}),
+ throttle = Throttle#throttle{
+ conserve_resources = Conserve}}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -870,10 +878,10 @@ auth_mechanisms_binary(Sock) ->
string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
- State = #v1{auth_mechanism = AuthMechanism,
- auth_state = AuthState,
- connection = Connection =
- #connection{protocol = Protocol},
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
@@ -886,14 +894,16 @@ auth_phase(Response,
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
ok = send_on_channel0(Sock, Secure, Protocol),
- State#v1{auth_state = AuthState1};
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}}
+ connection = Connection#connection{user = User,
+ auth_state = none}}
end.
%%--------------------------------------------------------------------------
@@ -901,11 +911,6 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) -> self();
-i(name, #v1{name = Name}) -> Name;
-i(host, #v1{host = Host}) -> Host;
-i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
-i(port, #v1{port = Port}) -> Port;
-i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
@@ -922,36 +927,32 @@ i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) ->
none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(user, #v1{connection = #connection{user = none}}) ->
- '';
-i(user, #v1{connection = #connection{user = #user{
- username = Username}}}) ->
- Username;
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
- VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
- Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
- FrameMax;
-i(client_properties, #v1{connection = #connection{client_properties =
- ClientProperties}}) ->
- ClientProperties;
-i(Item, #v1{}) ->
- throw({bad_argument, Item}).
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b499c59b30..09ed3d0890 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2434,7 +2434,7 @@ test_dropfetchwhile(VQ0) ->
{#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
rabbit_variable_queue:fetchwhile(
fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
- fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) ->
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
{[Msg | MsgAcc], [AckTag | AckAcc]}
end, {[], []}, VQ1),
true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
@@ -2473,7 +2473,7 @@ test_fetchwhile_varying_ram_duration(VQ0) ->
fun (VQ1) ->
{_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
fun (_) -> false end,
- fun (_, _, _, A) -> A end,
+ fun (_, _, A) -> A end,
ok, VQ1),
VQ2
end, VQ0).
@@ -2608,8 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end,
- VQ2, AckTags),
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
+ ok, VQ2, AckTags),
VQ3.
test_queue_recover() ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3e4c7c864f..37ca6de075 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,10 +19,10 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
- fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -584,7 +584,7 @@ dropwhile(Pred, State) ->
{undefined, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
+ true -> {_, State2} = remove(false, MsgStatus, State1),
dropwhile(Pred, State2);
false -> {MsgProps, a(in_r(MsgStatus, State1))}
end
@@ -596,11 +596,9 @@ fetchwhile(Pred, Fun, Acc, State) ->
{undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, IsDelivered, AckTag}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- Acc1 = Fun(Msg, IsDelivered, AckTag, Acc),
- fetchwhile(Pred, Fun, Acc1, State3);
+ true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(true, MsgStatus, State2),
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -612,9 +610,9 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
- {Res, a(State3)}
+ {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
end.
drop(AckRequired, State) ->
@@ -622,8 +620,7 @@ drop(AckRequired, State) ->
{empty, State1} ->
{empty, a(State1)};
{{value, MsgStatus}, State1} ->
- {{_Msg, _IsDelivered, AckTag}, State2} =
- internal_fetch(AckRequired, MsgStatus, State1),
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
{{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
@@ -650,16 +647,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-foreach_ack(undefined, State, _AckTags) ->
- State;
-foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
-
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -681,6 +668,16 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(
+ fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
+ MsgStatus = gb_trees:get(SeqId, PA),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
fold(Fun, Acc, #vqstate { q1 = Q1,
q2 = Q2,
delta = #delta { start_seq_id = DeltaSeqId,
@@ -688,9 +685,9 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
q3 = Q3,
q4 = Q4 } = State) ->
QFun = fun(MsgStatus, {Acc0, State0}) ->
- {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } =
- read_msg(MsgStatus, false, State0),
- {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {StopGo, AccNext} =
+ Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
{StopGo, {AccNext, State1}}
end,
{Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
@@ -1075,9 +1072,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, true, State),
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1093,35 +1091,33 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
-
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
+read_msg(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent },
CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, _CountDiskToRam, State) ->
- {MsgStatus, State}.
-
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
+ RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
+ {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
+ msg_store_clients = MSCState1 }};
+read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
+ {Msg, State}.
+
+remove(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1132,12 +1128,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
- end,
+ IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
+ end,
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
@@ -1148,15 +1143,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag},
- State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1377,7 +1371,8 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
+ {Msg, State1} = read_msg(MsgStatus, true, State),
+ {MsgStatus#msg_status { msg = Msg }, State1};
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.