diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 1 |
2 files changed, 25 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0dbd7f1778..f3fce61a03 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -346,12 +346,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - % PubAck after message delivered to consumer (disregard consumer acks) - State2 = confirm_message(Message#basic_message.guid, State1), - ChAckTags1 = case AckRequired of - true -> sets:add_element(AckTag, ChAckTags); - false -> ChAckTags - end, + {State2, ChAckTags1} = + case AckRequired of + true -> {State1, sets:add_element(AckTag, ChAckTags)}; + false -> + {confirm_message_internal(Message#basic_message.guid, + State1), ChAckTags} + end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, store_ch_record(NewC), @@ -401,7 +402,7 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -confirm_message(Guid, State = #q{guid_to_channel = GTC}) -> +confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) -> case dict:find(Guid, GTC) of {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo), @@ -698,14 +699,16 @@ handle_call({basic_get, ChPid, NoAck}, _From, case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - % PubAck after message got - State2 = confirm_message(Message#basic_message.guid, State1), - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); - false -> ok - end, + State2 = case AckRequired of + true -> + C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( + C#cr{acktags = sets:add_element(AckTag, ChAckTags)}), + State1; + false -> + confirm_message_internal(Message#basic_message.guid, + State1) + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, State2#q{backing_queue_state = BQS1}) end; @@ -816,6 +819,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). + handle_cast({deliver, Txn, Message, MsgSeqNo, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. State1 = maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State), @@ -838,6 +842,11 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State#q{backing_queue_state = BQS1}) end; +handle_cast({confirm_messages, Guids}, State) -> + noreply(lists:foldl(fun (Guid, State0) -> + confirm_message_internal(Guid, State0) + end, State, Guids)); + handle_cast({reject, AckTags, Requeue, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ca459665d4..e4020b6069 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1137,6 +1137,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold(fun (MsgStore, Guids, ok) -> + gen_server2:cast(self(), {confirm_messages, Guids}), MsgStoreFun(MsgStore, Guids) end, ok, GuidsByStore), PCount1 = PCount - case orddict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of |
