diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-03 15:23:01 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-03 15:23:01 +0000 |
| commit | ed7b399c105b8f10f2c44eb23fb5f212b7662321 (patch) | |
| tree | 595115140049838a762a36f7442b28ca3fb5c49e /src | |
| parent | 76658beb9c6f7015b75024753f3fd7808c07ffb8 (diff) | |
| download | rabbitmq-server-git-ed7b399c105b8f10f2c44eb23fb5f212b7662321.tar.gz | |
Manage AckTag -> Ctag mapping in rabbit_queue_consumers, leave the channel out of it.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 48 |
4 files changed, 71 insertions, 59 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8a32f46441..b71410fed8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,7 +22,7 @@ -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, deliver_flow/2, requeue/4, ack/4, reject/5]). + stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -140,10 +140,9 @@ qpids()). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> qpids()). --spec(requeue/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok'). --spec(ack/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok'). --spec(reject/5 :: (pid(), boolean(), [msg_id()], rabbit_types:ctag(), pid()) -> - 'ok'). +-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). +-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). +-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> @@ -551,14 +550,12 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). -requeue(QPid, MsgIds, CTag, ChPid) -> - delegate:call(QPid, {requeue, MsgIds, CTag, ChPid}). +requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). -ack(QPid, MsgIds, CTag, ChPid) -> - delegate:cast(QPid, {ack, MsgIds, CTag, ChPid}). +ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). -reject(QPid, Requeue, MsgIds, CTag, ChPid) -> - delegate:cast(QPid, {reject, Requeue, MsgIds, CTag, ChPid}). +reject(QPid, Requeue, MsgIds, ChPid) -> + delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}). notify_down_all(QPids, ChPid) -> {_, Bads} = delegate:call(QPids, {notify_down, ChPid}), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5be3bac820..c53ff4cf09 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -577,16 +577,16 @@ fetch(AckRequired, State = #q{backing_queue = BQ, State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), {Result, maybe_send_drained(Result =:= empty, State1)}. -ack(AckTags, CTag, ChPid, State) -> - subtract_acks(AckTags, CTag, ChPid, State, +ack(AckTags, ChPid, State) -> + subtract_acks(ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State1#q{backing_queue_state = BQS1} end). -requeue(AckTags, CTag, ChPid, State) -> - subtract_acks(AckTags, CTag, ChPid, State, +requeue(AckTags, ChPid, State) -> + subtract_acks(ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end). possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> @@ -649,9 +649,8 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(AckTags, CTag, ChPid, State = #q{consumers = Consumers}, Fun) -> - case rabbit_queue_consumers:subtract_acks( - AckTags, CTag, ChPid, Consumers) of +subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> + case rabbit_queue_consumers:subtract_acks(ChPid, AckTags, Consumers) of not_found -> State; unchanged -> Fun(State); {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, @@ -1011,9 +1010,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, State1 = State#q{backing_queue_state = BQS1}, reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); -handle_call({requeue, AckTags, CTag, ChPid}, From, State) -> +handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - noreply(requeue(AckTags, CTag, ChPid, State)); + noreply(requeue(AckTags, ChPid, State)); handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master, @@ -1074,21 +1073,21 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State1 = State#q{senders = Senders1}, noreply(deliver_or_enqueue(Delivery, Delivered, State1)); -handle_cast({ack, AckTags, CTag, ChPid}, State) -> - noreply(ack(AckTags, CTag, ChPid, State)); +handle_cast({ack, AckTags, ChPid}, State) -> + noreply(ack(AckTags, ChPid, State)); -handle_cast({reject, true, AckTags, CTag, ChPid}, State) -> - noreply(requeue(AckTags, CTag, ChPid, State)); +handle_cast({reject, true, AckTags, ChPid}, State) -> + noreply(requeue(AckTags, ChPid, State)); -handle_cast({reject, false, AckTags, CTag, ChPid}, State) -> +handle_cast({reject, false, AckTags, ChPid}, State) -> noreply(with_dlx( State#q.dlx, - fun (X) -> subtract_acks(AckTags, CTag, ChPid, State, + fun (X) -> subtract_acks(ChPid, AckTags, State, fun (State1) -> dead_letter_rejected_msgs( AckTags, X, State1) end) end, - fun () -> ack(AckTags, CTag, ChPid, State) end)); + fun () -> ack(AckTags, ChPid, State) end)); handle_cast(delete_immediately, State) -> stop(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 44545e1935..4d866908ed 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -872,13 +872,11 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), - foreach_per_consumer( - fun ({QPid, CTag}, MsgIds) -> + foreach_per_queue( + fun (QPid, MsgIds) -> rabbit_misc:with_exit_handler( OkFun, - fun () -> - rabbit_amqqueue:requeue(QPid, MsgIds, CTag, self()) - end) + fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end) end, lists:reverse(UAMQL)), ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous @@ -1315,9 +1313,9 @@ reject(DeliveryTag, Requeue, Multiple, %% NB: Acked is in youngest-first order reject(Requeue, Acked, Limiter) -> - foreach_per_consumer( - fun ({QPid, CTag}, MsgIds) -> - rabbit_amqqueue:reject(QPid, Requeue, MsgIds, CTag, self()) + foreach_per_queue( + fun (QPid, MsgIds) -> + rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self()) end, Acked), ok = notify_limiter(Limiter, Acked). @@ -1375,9 +1373,9 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> %% NB: Acked is in youngest-first order ack(Acked, State = #ch{queue_names = QNames}) -> - foreach_per_consumer( - fun ({QPid, CTag}, MsgIds) -> - ok = rabbit_amqqueue:ack(QPid, MsgIds, CTag, self()), + foreach_per_queue( + fun (QPid, MsgIds) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), ?INCR_STATS(case dict:find(QPid, QNames) of {ok, QName} -> Count = length(MsgIds), [{queue_stats, QName, Count}]; @@ -1408,15 +1406,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers, sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. -foreach_per_consumer(_F, []) -> +foreach_per_queue(_F, []) -> ok; -foreach_per_consumer(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case - F({QPid, CTag}, [MsgId]); +foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId]); %% NB: UAL should be in youngest-first order; the tree values will %% then be in oldest-first order -foreach_per_consumer(F, UAL) -> - T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> - rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) +foreach_per_queue(F, UAL) -> + T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons(QPid, MsgId, T) end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 017a4f9c1a..46a37899e8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/4, + send_drained/0, deliver/3, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit/6, utilisation/1]). @@ -82,7 +82,7 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) -> +-spec subtract_acks(ch(), [ack()], state()) -> 'not_found' | 'unchanged' | {'unblocked', state()}. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. @@ -173,7 +173,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> blocked_consumers = BlockedQ} -> AllConsumers = priority_queue:join(Consumers, BlockedQ), ok = erase_ch_record(C), - {queue:to_list(ChAckTags), + {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)], tags(priority_queue:to_list(AllConsumers)), State#state{consumers = remove_consumers(ChPid, Consumers)}} end. @@ -230,7 +230,7 @@ deliver_to_consumer(FetchFun, rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> queue:in(AckTag, ChAckTags); + true -> queue:in({AckTag, CTag}, ChAckTags); false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, @@ -239,17 +239,22 @@ deliver_to_consumer(FetchFun, record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), - update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), + update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}), ok. -subtract_acks(AckTags, CTag, ChPid, State) -> +subtract_acks(ChPid, AckTags, State) -> case lookup_ch(ChPid) of not_found -> not_found; C = #cr{acktags = ChAckTags, limiter = Lim} -> + {CTagCounts, AckTags2} = subtract_acks(AckTags, [], [], ChAckTags), {Unblocked, Lim2} = - rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), - AckTags2 = subtract_acks(AckTags, [], ChAckTags), + lists:foldl( + fun ({CTag, Count}, {UnblockedN, LimN}) -> + {Unblocked1, LimN1} = + rabbit_limiter:ack_from_queue(LimN, CTag, Count), + {UnblockedN orelse Unblocked1, LimN1} + end, {false, Lim}, CTagCounts), C2 = C#cr{acktags = AckTags2, limiter = Lim2}, case Unblocked of true -> unblock(C2, State); @@ -258,16 +263,29 @@ subtract_acks(AckTags, CTag, ChPid, State) -> end end. -subtract_acks([], [], AckQ) -> - AckQ; -subtract_acks([], Prefix, AckQ) -> - queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> +subtract_acks([], [], CTagCounts, AckQ) -> + {CTagCounts, AckQ}; +subtract_acks([], Prefix, CTagCounts, AckQ) -> + {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)}; +subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + {{value, {T, CTag}}, QTail} -> + subtract_acks(TL, Prefix, + incr_ctag_count(CTag, CTagCounts), QTail); + {{value, {AT, CTag}}, QTail} -> + subtract_acks(AckTags, [AT | Prefix], + incr_ctag_count(CTag, CTagCounts), QTail) end. +incr_ctag_count(CTag, []) -> [{CTag, 1}]; +incr_ctag_count(CTag, [{CTag, N}]) -> [{CTag, N + 1}]; +incr_ctag_count(CTag, CTagCounts) -> case lists:keyfind(CTag, 1, CTagCounts) of + false -> [{CTag, 1} | CTagCounts]; + {CTag, N} -> [{CTag, N + 1} | + lists:keydelete( + CTag, 1, CTagCounts)] + end. + possibly_unblock(Update, ChPid, State) -> case lookup_ch(ChPid) of not_found -> unchanged; |
