diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 |
1 files changed, 28 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 720d390abf..a0bb3b0bd6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -69,7 +69,7 @@ ch_pid, limiter_pid, monitor_ref, - unacked_messages, + acktags, is_limit_active, txn, unsent_message_count}). @@ -230,7 +230,7 @@ ch_record(ChPid) -> C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, - unacked_messages = [], + acktags = sets:new(), is_limit_active = false, txn = none, unsent_message_count = 0}, @@ -271,7 +271,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), + acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of @@ -281,12 +281,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - NewUAM = case AckRequired of - true -> [AckTag|UAM]; - false -> UAM - end, + ChAckTags1 = case AckRequired of + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags + end, NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, + acktags = ChAckTags1}, store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of @@ -414,7 +414,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> not_found -> {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - unacked_messages = UAM} -> + acktags = ChAckTags} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), State1 = State#q{ @@ -433,7 +433,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> _ -> rollback_transaction(Txn, ChPid, State1) end, - {ok, requeue_and_run(UAM, State2)} + {ok, requeue_and_run(ChAckTags, State2)} end end. @@ -472,25 +472,21 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, {AckTags, BQS1} = BQ:tx_commit(Txn, From, BQS), %% ChPid must be known here because of the participant management %% by the channel. - C = #cr{unacked_messages = UAM} = lookup_ch(ChPid), - Remaining = ordsets:to_list(ordsets:subtract(ordsets:from_list(UAM), - ordsets:from_list(AckTags))), - store_ch_record(C#cr{unacked_messages = Remaining, txn = none}), + C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS}) -> {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), %% Iff we removed acktags from the channel record on ack+txn then %% we would add them back in here (would also require ChPid) record_current_channel_tx(ChPid, none), State#q{backing_queue_state = BQS1}. -collect_messages(AckTags, UAM) -> - AckTagsSet = ordsets:from_list(AckTags), - UAMSet = ordsets:from_list(UAM), - {ordsets:to_list(ordsets:intersection(AckTagsSet, UAMSet)), - ordsets:to_list(ordsets:subtract(UAMSet, AckTagsSet))}. +subtract_acks(A, B) when is_list(B) -> + lists:foldl(fun sets:del_element/2, A, B). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -515,8 +511,7 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([ordsets:size(UAM) || - #cr{unacked_messages = UAM} <- all_ch_record()]); + lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); @@ -600,8 +595,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of - true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), - store_ch_record(C#cr{unacked_messages = [AckTag|UAM]}); + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( + C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); false -> ok end, reply({ok, Remaining, {QName, self(), AckTag, IsDelivered, Message}}, @@ -753,13 +749,12 @@ handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue_state = BQS, case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM} -> - {AckTags1, Remaining} = collect_messages(AckTags, UAM), + C = #cr{acktags = ChAckTags} -> {C1, BQS1} = case Txn of - none -> {C#cr{unacked_messages = Remaining}, - BQ:ack(AckTags1, BQS)}; - _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags1, BQS)} + none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), + {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; + _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, store_ch_record(C1), noreply(State #q { backing_queue_state = BQS1 }) @@ -774,10 +769,10 @@ handle_cast({requeue, AckTags, ChPid}, State) -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), noreply(State); - C = #cr{unacked_messages = UAM} -> - {AckTags1, Remaining} = collect_messages(AckTags, UAM), - store_ch_record(C#cr{unacked_messages = Remaining}), - noreply(requeue_and_run(AckTags1, State)) + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) end; handle_cast({unblock, ChPid}, State) -> |
