diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 |
1 files changed, 5 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 89f357a42d..e6e3989fc4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -374,13 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - {State2, ChAckTags1} = + ChAckTags1 = case AckRequired of - true -> {State1, - sets:add_element(AckTag, ChAckTags)}; - false -> {State1, ChAckTags} - %%{confirm_message(Message, State1), - %% ChAckTags} + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -397,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State3 = State2#q{ + State2 = State1#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State3); + deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> true = maybe_store_ch_record(C#cr{is_limit_active = true}), @@ -438,9 +435,6 @@ confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> end, State#q{guid_to_channel = dict:erase(Guid, GTC)}. -confirm_message(#basic_message{guid = Guid}, State) -> - confirm_message_by_guid(Guid, State). - record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> State; record_confirm_message(#delivery{sender = ChPid, |
