diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
3 files changed, 15 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8a9a293bad..eb34aeff47 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -417,8 +417,8 @@ maybe_record_confirm_message(undefined, _, _, State) -> maybe_record_confirm_message(MsgSeqNo, #basic_message { guid = Guid }, ChPid, State) -> - State #q { guid_to_channel = - dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }. + State #q { guid_to_channel = + dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }. run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -549,8 +549,9 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> case Fun(BQS) of {BQS1, {confirm, Guids}} -> - confirm_messages_internal(Guids, - State #q { backing_queue_state = BQS1 }); + run_message_queue( + confirm_messages_internal(Guids, + State #q { backing_queue_state = BQS1 })); BQS1 -> run_message_queue(State#q{backing_queue_state = BQS1}) end. @@ -868,10 +869,11 @@ handle_cast({ack, Txn, AckTags, ChPid}, none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - {NewBQS, AckdGuids} = BQ:ack(AckTags, BQS), + {NewBQS, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS), NewState = confirm_messages_internal(AckdGuids, - State #q { backing_queue_state = NewBQS }), + State #q { backing_queue_state = + NewBQS }), {NewC, NewState}; _ -> {C#cr{txn = Txn}, @@ -891,9 +893,10 @@ handle_cast({reject, AckTags, Requeue, ChPid}, store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> {BQS1, AckdGuids} = BQ:ack(AckTags, BQS), - confirm_messages_internal(AckdGuids, - State #q { backing_queue_state = BQS1 }) + false -> {BQS1, {confirm, AckdGuids}} = BQ:ack(AckTags, BQS), + confirm_messages_internal( + AckdGuids, + State #q { backing_queue_state = BQS1 }) end) end; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 208f71f030..9e38a9766d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -376,6 +376,7 @@ client_init(Server, Ref) -> client_ref = Ref}. client_terminate(CState, Server) -> + close_all_handles(CState), ok = gen_server2:call(Server, {client_terminate, CState}, infinity). client_delete_and_terminate(CState, Server, Ref) -> @@ -637,7 +638,6 @@ handle_call({client_terminate, CState = #client_msstate { client_ref = CRef }}, _From, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> - ok = close_all_handles(CState), reply(ok, State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), cref_to_guids = dict:erase(CRef, CTG) }). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6521c54496..8b4f55c5e1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1129,7 +1129,7 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - {State, []}; + {State, {confirm, []}}; ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -1153,7 +1153,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> end, {State2 #vqstate { index_state = IndexState1, persistent_count = PCount1 }, - AckdGuids}. + {confirm, AckdGuids}}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, |
