diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 14 |
2 files changed, 8 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9abe9069d9..cab77c1bd6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -423,7 +423,7 @@ record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, ack_by_acktags(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {BQS1, AckdGuids} = BQ:ack(AckTags, BQS), + {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b07ee279df..8c498a7917 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -608,11 +608,11 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, end. ack(AckTags, State) -> - {State1, Guids} = + {Guids, State1} = ack(fun rabbit_msg_store:remove/2, fun (_AckEntry, State1) -> State1 end, AckTags, State), - {a(State1), Guids}. + {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, State = #vqstate { durable = IsDurable, @@ -661,7 +661,7 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> end)}. requeue(AckTags, State) -> - {State1, _Guids} = + {_Guids, State1} = ack(fun rabbit_msg_store:release/2, fun (#msg_status { msg = Msg }, State1) -> {_SeqId, State2} = publish(Msg, true, false, false, State1), @@ -984,7 +984,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), Pubs = lists:append(lists:reverse(SPubs)), - {NewState, _Guids} = ack(Acks, State), + {_Guids, NewState} = ack(Acks, State), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, @@ -1157,7 +1157,7 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - {State, []}; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -1177,8 +1177,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - {State2 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }, AckdGuids}. + {AckdGuids, State2 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, |
