diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-07 10:52:55 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-07 10:52:55 +0000 |
| commit | fbe14837b9dd29b9d17f542a815bac00fcdf5fe1 (patch) | |
| tree | b0e6a1c5ef9133bbe017d2cd098c53ca00866c38 /src | |
| parent | d6376fbdbed5b36dc435cddc99a7beaa18dbe4f2 (diff) | |
| download | rabbitmq-server-git-fbe14837b9dd29b9d17f542a815bac00fcdf5fe1.tar.gz | |
record pending acks in a queue rather than set in queue process
Just as we do in the channel; this is more efficient for the typical
ack-in-order access pattern.
We depend on tags being passed to the queue process in order when
ack'ing/rejecting. This requires some slightly fiddly code in the
channel.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 51 |
2 files changed, 53 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d7cd9fb1c7..4341c3d6d8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -352,7 +352,7 @@ ch_record(ChPid) -> undefined -> MonitorRef = erlang:monitor(process, ChPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, - acktags = sets:new(), + acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), is_limit_active = false, @@ -366,9 +366,9 @@ ch_record(ChPid) -> update_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, unsent_message_count = UnsentMessageCount}) -> - case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of - {0, 0, 0} -> ok = erase_ch_record(C); - _ -> ok = store_ch_record(C) + case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of + {true, 0, 0} -> ok = erase_ch_record(C); + _ -> ok = store_ch_record(C) end, C. @@ -451,7 +451,7 @@ deliver_msg_to_consumer(DeliverFun, rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> sets:add_element(AckTag, ChAckTags); + true -> queue:in(AckTag, ChAckTags); false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, @@ -638,7 +638,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; - false -> {ok, requeue_and_run(sets:to_list(ChAckTags), + false -> {ok, requeue_and_run(queue:to_list(ChAckTags), ensure_expiry_timer(State1))} end end. @@ -677,11 +677,21 @@ subtract_acks(ChPid, AckTags, State, Fun) -> not_found -> State; C = #cr{acktags = ChAckTags} -> - update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2, - ChAckTags, AckTags)}), + update_ch_record( + C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), Fun(State) end. +subtract_acks([], [], AckQ) -> + AckQ; +subtract_acks([], Prefix, AckQ) -> + queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); +subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> + case queue:out(AckQ) of + {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); + {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + end. + message_properties(Message, Confirm, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(Message, TTL), needs_confirming = Confirm == eventually}. @@ -886,7 +896,7 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]); + lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); @@ -1042,7 +1052,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - ChAckTags1 = sets:add_element(AckTag, ChAckTags), + ChAckTags1 = queue:in(AckTag, ChAckTags), update_ch_record(C#cr{acktags = ChAckTags1}), State2; false -> State2 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2686d76d9c..831058db31 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -40,7 +40,14 @@ queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). --record(tx, {msgs, acks, nacks}). + +-record(tx, {msgs, acks}). %% (1) +%% (1) acks looks s.t. like this: +%% [{true,[[6,7,8],[5]]},{ack,[[4],[1,2,3]]}, ...] +%% +%% Each element is a pair consisting of a tag and a list of lists of +%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true' +%% (reject w requeue), 'false' (reject w/o requeue). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -647,7 +654,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, case Tx of none -> ack(Acked, State1), State1; - #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}} + #tx{acks = Acks} -> Acks1 = ack_cons(ack, Acked, Acks), + State1#ch{tx = Tx#tx{acks = Acks1}} end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -1032,24 +1040,23 @@ handle_method(#'tx.select'{}, _, State) -> handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); -handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs, - acks = Acks, - nacks = Nacks}, +handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs, + acks = Acks}, limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), - ack(Acks, State1), lists:foreach( - fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks), + fun ({ack, A}) -> ack(append_reverse(A), State1); + ({Requeue, A}) -> reject(Requeue, append_reverse(A), Limiter) + end, lists:reverse(Acks)), {noreply, maybe_complete_tx(State1#ch{tx = committing})}; handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> precondition_failed("channel is not transactional"); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - tx = #tx{acks = Acks, - nacks = Nacks}}) -> - NacksL = lists:append([L || {_, L} <- Nacks]), - UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))), + tx = #tx{acks = Acks}}) -> + AcksL = append_reverse([append_reverse(L) || {_, L} <- Acks]), + UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, tx = new_tx()}}; @@ -1206,10 +1213,10 @@ reject(DeliveryTag, Requeue, Multiple, State1 = State#ch{unacked_message_q = Remaining}, {noreply, case Tx of - none -> reject(Requeue, Acked, State1#ch.limiter), - State1; - #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks], - State1#ch{tx = Tx#tx{nacks = Nacks1}} + none -> reject(Requeue, Acked, State1#ch.limiter), + State1; + #tx{acks = Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks), + State1#ch{tx = Tx#tx{acks = Acks1}} end}. reject(Requeue, Acked, Limiter) -> @@ -1252,7 +1259,10 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> - {[UnackedMsg | ToAcc], + {case ToAcc of + [] -> [UnackedMsg]; + _ -> lists:reverse([UnackedMsg | ToAcc]) + end, case PrefixAcc of [] -> QTail; _ -> queue:join( @@ -1282,7 +1292,7 @@ ack(Acked, State = #ch{queue_names = QNames}) -> end, Acked), ok = notify_limiter(State#ch.limiter, Acked). -new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}. +new_tx() -> #tx{msgs = queue:new(), acks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1299,7 +1309,7 @@ foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case foreach_per_queue(F, UAL) -> T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons(QPid, MsgId, T) - end, gb_trees:empty(), UAL), + end, gb_trees:empty(), lists:reverse(UAL)), rabbit_misc:gb_trees_foreach(F, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, @@ -1440,6 +1450,11 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. +append_reverse(L) -> lists:append(lists:reverse(L)). + +ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, [Acked | Acks]} | L]; +ack_cons(Tag, Acked, Acks) -> [{Tag, [Acked]} | Acks]. + maybe_complete_tx(State = #ch{tx = #tx{}}) -> State; maybe_complete_tx(State = #ch{unconfirmed = UC}) -> |
