diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-14 10:05:52 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-14 10:05:52 +0100 |
| commit | 416d9968ba498d0b50751b4a0995f383cae34d70 (patch) | |
| tree | 8b1bb2bd6f2d97d38a2287914d9b9c637b6e7fa2 | |
| parent | ccca12049ade4f11602ec71a30459e315f4b15ab (diff) | |
| download | rabbitmq-server-git-416d9968ba498d0b50751b4a0995f383cae34d70.tar.gz | |
refactor: inline {publish,ack}_in_tx
| -rw-r--r-- | src/rabbit_variable_queue.erl | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 32c5176485..b18ea2c59f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -517,19 +517,22 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, ack(AckTags, State) -> ack(fun (_AckEntry, State1) -> State1 end, AckTags, State). -tx_publish(Txn, Msg = #basic_message { is_persistent = true }, - State = #vqstate { msg_store_clients = MSCState, durable = true }) -> - MsgStatus = msg_status(true, undefined, Msg), - {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), - publish_in_tx(Txn, Msg), - State #vqstate { msg_store_clients = MSCState1 }; -tx_publish(Txn, Msg, State) -> - publish_in_tx(Txn, Msg), - State. +tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, + State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> + Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + case IsPersistent andalso IsDurable of + true -> MsgStatus = msg_status(true, undefined, Msg), + {#msg_status { msg_on_disk = true }, MSCState1} = + maybe_write_msg_to_disk(false, MsgStatus, MSCState), + State #vqstate { msg_store_clients = MSCState1 }; + false -> State + end. tx_ack(Txn, AckTags, State) -> - ack_in_tx(Txn, AckTags), + Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), State. tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> @@ -738,14 +741,6 @@ store_tx(Txn, Tx) -> erase_tx(Txn) -> erase({txn, Txn}). -publish_in_tx(Txn, Msg) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }). - -ack_in_tx(Txn, AckTags) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }). - update_rate(Now, Then, Count, {OThen, OCount}) -> %% form the avg over the current period and the previous Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)), |
