diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 28 |
1 files changed, 8 insertions, 20 deletions
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index e5811c343f..86d80e66fa 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -127,25 +127,29 @@ ack(AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> State #iv_state { pending_ack = PA1 }. tx_publish(Txn, Msg, State = #iv_state { qname = QName }) -> - publish_in_tx(Txn, Msg), + Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), ok = persist_message(Txn, QName, Msg), State. tx_ack(Txn, AckTags, State = #iv_state { qname = QName, pending_ack = PA }) -> - ack_in_tx(Txn, AckTags), + Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), ok = persist_acks(Txn, QName, AckTags, PA), State. tx_rollback(Txn, State = #iv_state { qname = QName }) -> #tx { pending_acks = AckTags } = lookup_tx(Txn), - ok = rollback_work(Txn, QName), + ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1, + Txn, QName), erase_tx(Txn), {lists:flatten(AckTags), State}. tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, queue = Q, len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), - ok = commit_work(Txn, QName), + ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, + Txn, QName), erase_tx(Txn), Fun(), AckTags1 = lists:flatten(AckTags), @@ -228,14 +232,6 @@ do_if_persistent(F, Txn, QName) -> true -> F({Txn, QName}) end. -publish_in_tx(Txn, Msg) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }). - -ack_in_tx(Txn, AckTags) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }). - %%---------------------------------------------------------------------------- persist_message(_Txn, _QName, #basic_message { is_persistent = false }) -> @@ -273,11 +269,3 @@ persist_work(none, _QName, WorkList) -> persist_work(Txn, QName, WorkList) -> mark_tx_persistent(Txn), rabbit_persister:extend_transaction({Txn, QName}, WorkList). - -commit_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName). - -rollback_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName). |
