summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-14 10:05:52 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-14 10:05:52 +0100
commit416d9968ba498d0b50751b4a0995f383cae34d70 (patch)
tree8b1bb2bd6f2d97d38a2287914d9b9c637b6e7fa2
parentccca12049ade4f11602ec71a30459e315f4b15ab (diff)
downloadrabbitmq-server-git-416d9968ba498d0b50751b4a0995f383cae34d70.tar.gz
refactor: inline {publish,ack}_in_tx
-rw-r--r--src/rabbit_variable_queue.erl33
1 files changed, 14 insertions, 19 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 32c5176485..b18ea2c59f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -517,19 +517,22 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
ack(AckTags, State) ->
ack(fun (_AckEntry, State1) -> State1 end, AckTags, State).
-tx_publish(Txn, Msg = #basic_message { is_persistent = true },
- State = #vqstate { msg_store_clients = MSCState, durable = true }) ->
- MsgStatus = msg_status(true, undefined, Msg),
- {#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
- publish_in_tx(Txn, Msg),
- State #vqstate { msg_store_clients = MSCState1 };
-tx_publish(Txn, Msg, State) ->
- publish_in_tx(Txn, Msg),
- State.
+tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent },
+ State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
+ Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ case IsPersistent andalso IsDurable of
+ true -> MsgStatus = msg_status(true, undefined, Msg),
+ {#msg_status { msg_on_disk = true }, MSCState1} =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ State #vqstate { msg_store_clients = MSCState1 };
+ false -> State
+ end.
tx_ack(Txn, AckTags, State) ->
- ack_in_tx(Txn, AckTags),
+ Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
State.
tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
@@ -738,14 +741,6 @@ store_tx(Txn, Tx) ->
erase_tx(Txn) ->
erase({txn, Txn}).
-publish_in_tx(Txn, Msg) ->
- Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }).
-
-ack_in_tx(Txn, AckTags) ->
- Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }).
-
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% form the avg over the current period and the previous
Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)),