summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_variable_queue.erl14
2 files changed, 8 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9abe9069d9..cab77c1bd6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -423,7 +423,7 @@ record_confirm_message(#delivery{msg_seq_no = MsgSeqNo,
ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {BQS1, AckdGuids} = BQ:ack(AckTags, BQS),
+ {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b07ee279df..8c498a7917 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -608,11 +608,11 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
end.
ack(AckTags, State) ->
- {State1, Guids} =
+ {Guids, State1} =
ack(fun rabbit_msg_store:remove/2,
fun (_AckEntry, State1) -> State1 end,
AckTags, State),
- {a(State1), Guids}.
+ {Guids, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
State = #vqstate { durable = IsDurable,
@@ -661,7 +661,7 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
end)}.
requeue(AckTags, State) ->
- {State1, _Guids} =
+ {_Guids, State1} =
ack(fun rabbit_msg_store:release/2,
fun (#msg_status { msg = Msg }, State1) ->
{_SeqId, State2} = publish(Msg, true, false, false, State1),
@@ -984,7 +984,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
Pubs = lists:append(lists:reverse(SPubs)),
- {NewState, _Guids} = ack(Acks, State),
+ {_Guids, NewState} = ack(Acks, State),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
@@ -1157,7 +1157,7 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {State, []};
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
@@ -1177,8 +1177,8 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
State2 = remove_confirms(gb_sets:from_list(AckdGuids), State1),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {State2 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }, AckdGuids}.
+ {AckdGuids, State2 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,