summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-03 15:23:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-03 15:23:01 +0000
commited7b399c105b8f10f2c44eb23fb5f212b7662321 (patch)
tree595115140049838a762a36f7442b28ca3fb5c49e /src
parent76658beb9c6f7015b75024753f3fd7808c07ffb8 (diff)
downloadrabbitmq-server-git-ed7b399c105b8f10f2c44eb23fb5f212b7662321.tar.gz
Manage AckTag -> Ctag mapping in rabbit_queue_consumers, leave the channel out of it.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl19
-rw-r--r--src/rabbit_amqqueue_process.erl31
-rw-r--r--src/rabbit_channel.erl32
-rw-r--r--src/rabbit_queue_consumers.erl48
4 files changed, 71 insertions, 59 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8a32f46441..b71410fed8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,7 +22,7 @@
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, deliver_flow/2, requeue/4, ack/4, reject/5]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -140,10 +140,9 @@
qpids()).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
qpids()).
--spec(requeue/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok').
--spec(ack/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok').
--spec(reject/5 :: (pid(), boolean(), [msg_id()], rabbit_types:ctag(), pid()) ->
- 'ok').
+-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
+-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
+-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
@@ -551,14 +550,12 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, CTag, ChPid) ->
- delegate:call(QPid, {requeue, MsgIds, CTag, ChPid}).
+requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, MsgIds, CTag, ChPid) ->
- delegate:cast(QPid, {ack, MsgIds, CTag, ChPid}).
+ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
-reject(QPid, Requeue, MsgIds, CTag, ChPid) ->
- delegate:cast(QPid, {reject, Requeue, MsgIds, CTag, ChPid}).
+reject(QPid, Requeue, MsgIds, ChPid) ->
+ delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
notify_down_all(QPids, ChPid) ->
{_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5be3bac820..c53ff4cf09 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -577,16 +577,16 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
{Result, maybe_send_drained(Result =:= empty, State1)}.
-ack(AckTags, CTag, ChPid, State) ->
- subtract_acks(AckTags, CTag, ChPid, State,
+ack(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
State1#q{backing_queue_state = BQS1}
end).
-requeue(AckTags, CTag, ChPid, State) ->
- subtract_acks(AckTags, CTag, ChPid, State,
+requeue(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
@@ -649,9 +649,8 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(AckTags, CTag, ChPid, State = #q{consumers = Consumers}, Fun) ->
- case rabbit_queue_consumers:subtract_acks(
- AckTags, CTag, ChPid, Consumers) of
+subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) ->
+ case rabbit_queue_consumers:subtract_acks(ChPid, AckTags, Consumers) of
not_found -> State;
unchanged -> Fun(State);
{unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
@@ -1011,9 +1010,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
State1 = State#q{backing_queue_state = BQS1},
reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
-handle_call({requeue, AckTags, CTag, ChPid}, From, State) ->
+handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- noreply(requeue(AckTags, CTag, ChPid, State));
+ noreply(requeue(AckTags, ChPid, State));
handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master,
@@ -1074,21 +1073,21 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State1 = State#q{senders = Senders1},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
-handle_cast({ack, AckTags, CTag, ChPid}, State) ->
- noreply(ack(AckTags, CTag, ChPid, State));
+handle_cast({ack, AckTags, ChPid}, State) ->
+ noreply(ack(AckTags, ChPid, State));
-handle_cast({reject, true, AckTags, CTag, ChPid}, State) ->
- noreply(requeue(AckTags, CTag, ChPid, State));
+handle_cast({reject, true, AckTags, ChPid}, State) ->
+ noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, false, AckTags, CTag, ChPid}, State) ->
+handle_cast({reject, false, AckTags, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
- fun (X) -> subtract_acks(AckTags, CTag, ChPid, State,
+ fun (X) -> subtract_acks(ChPid, AckTags, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
- fun () -> ack(AckTags, CTag, ChPid, State) end));
+ fun () -> ack(AckTags, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 44545e1935..4d866908ed 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -872,13 +872,11 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- foreach_per_consumer(
- fun ({QPid, CTag}, MsgIds) ->
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
rabbit_misc:with_exit_handler(
OkFun,
- fun () ->
- rabbit_amqqueue:requeue(QPid, MsgIds, CTag, self())
- end)
+ fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
@@ -1315,9 +1313,9 @@ reject(DeliveryTag, Requeue, Multiple,
%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
- foreach_per_consumer(
- fun ({QPid, CTag}, MsgIds) ->
- rabbit_amqqueue:reject(QPid, Requeue, MsgIds, CTag, self())
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self())
end, Acked),
ok = notify_limiter(Limiter, Acked).
@@ -1375,9 +1373,9 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
%% NB: Acked is in youngest-first order
ack(Acked, State = #ch{queue_names = QNames}) ->
- foreach_per_consumer(
- fun ({QPid, CTag}, MsgIds) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, CTag, self()),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
?INCR_STATS(case dict:find(QPid, QNames) of
{ok, QName} -> Count = length(MsgIds),
[{queue_stats, QName, Count}];
@@ -1408,15 +1406,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-foreach_per_consumer(_F, []) ->
+foreach_per_queue(_F, []) ->
ok;
-foreach_per_consumer(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case
- F({QPid, CTag}, [MsgId]);
+foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId]);
%% NB: UAL should be in youngest-first order; the tree values will
%% then be in oldest-first order
-foreach_per_consumer(F, UAL) ->
- T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
+foreach_per_queue(F, UAL) ->
+ T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 017a4f9c1a..46a37899e8 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
credit/6, utilisation/1]).
@@ -82,7 +82,7 @@
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
--spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) ->
+-spec subtract_acks(ch(), [ack()], state()) ->
'not_found' | 'unchanged' | {'unblocked', state()}.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
@@ -173,7 +173,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
blocked_consumers = BlockedQ} ->
AllConsumers = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
- {queue:to_list(ChAckTags),
+ {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
tags(priority_queue:to_list(AllConsumers)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -230,7 +230,7 @@ deliver_to_consumer(FetchFun,
rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> queue:in(AckTag, ChAckTags);
+ true -> queue:in({AckTag, CTag}, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -239,17 +239,22 @@ deliver_to_consumer(FetchFun,
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
- update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
+ update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}),
ok.
-subtract_acks(AckTags, CTag, ChPid, State) ->
+subtract_acks(ChPid, AckTags, State) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{acktags = ChAckTags, limiter = Lim} ->
+ {CTagCounts, AckTags2} = subtract_acks(AckTags, [], [], ChAckTags),
{Unblocked, Lim2} =
- rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
- AckTags2 = subtract_acks(AckTags, [], ChAckTags),
+ lists:foldl(
+ fun ({CTag, Count}, {UnblockedN, LimN}) ->
+ {Unblocked1, LimN1} =
+ rabbit_limiter:ack_from_queue(LimN, CTag, Count),
+ {UnblockedN orelse Unblocked1, LimN1}
+ end, {false, Lim}, CTagCounts),
C2 = C#cr{acktags = AckTags2, limiter = Lim2},
case Unblocked of
true -> unblock(C2, State);
@@ -258,16 +263,29 @@ subtract_acks(AckTags, CTag, ChPid, State) ->
end
end.
-subtract_acks([], [], AckQ) ->
- AckQ;
-subtract_acks([], Prefix, AckQ) ->
- queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
-subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+subtract_acks([], [], CTagCounts, AckQ) ->
+ {CTagCounts, AckQ};
+subtract_acks([], Prefix, CTagCounts, AckQ) ->
+ {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)};
+subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
case queue:out(AckQ) of
- {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
- {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ {{value, {T, CTag}}, QTail} ->
+ subtract_acks(TL, Prefix,
+ incr_ctag_count(CTag, CTagCounts), QTail);
+ {{value, {AT, CTag}}, QTail} ->
+ subtract_acks(AckTags, [AT | Prefix],
+ incr_ctag_count(CTag, CTagCounts), QTail)
end.
+incr_ctag_count(CTag, []) -> [{CTag, 1}];
+incr_ctag_count(CTag, [{CTag, N}]) -> [{CTag, N + 1}];
+incr_ctag_count(CTag, CTagCounts) -> case lists:keyfind(CTag, 1, CTagCounts) of
+ false -> [{CTag, 1} | CTagCounts];
+ {CTag, N} -> [{CTag, N + 1} |
+ lists:keydelete(
+ CTag, 1, CTagCounts)]
+ end.
+
possibly_unblock(Update, ChPid, State) ->
case lookup_ch(ChPid) of
not_found -> unchanged;