diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-01 18:48:35 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-01 18:48:35 +0100 |
| commit | 99f80aea7d1afe8c0137dbbd3228f370cf82a91c (patch) | |
| tree | e3e4acf3b36a297dce5f508de4b654e2f5cbccdc /src | |
| parent | f58589539ce169a0e23ee05dfd43fec170250e13 (diff) | |
| download | rabbitmq-server-git-99f80aea7d1afe8c0137dbbd3228f370cf82a91c.tar.gz | |
When converting to disk mode, use tx_publish and tx_commit instead of publish. This massively reduces the number of sync calls to disk_queue, potentially to one, if every message in the queue is non persistent (or the queue is non durable).
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 3c60d25fbc..23696f27fc 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -125,30 +125,37 @@ to_disk_only_mode(TxnMessages, State = %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. Msgs = queue:to_list(MsgBuf), - Requeue = + {Requeue, TxPublish} = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}, - RQueueAcc) -> + {RQueueAcc, TxPublishAcc}) -> case OnDisk of true -> + ok = rabbit_disk_queue:tx_commit(Q, TxPublishAcc, []), {MsgId, IsDelivered, AckTag, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), - [ {AckTag, {next, IsDelivered}} | RQueueAcc ]; + {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []}; false -> ok = if [] == RQueueAcc -> ok; true -> rabbit_disk_queue:requeue_with_seqs( Q, lists:reverse(RQueueAcc)) end, - ok = rabbit_disk_queue:publish( - Q, Msg, false), - [] + ok = rabbit_disk_queue:tx_publish(Msg), + {[], [ MsgId | TxPublishAcc ]} end; - ({MsgId, IsDelivered}, RQueueAcc) -> + ({MsgId, IsDelivered}, {RQueueAcc, TxPublishAcc}) -> + ok = if [] == TxPublishAcc -> ok; + true -> rabbit_disk_queue:tx_commit(Q, TxPublishAcc, + []) + end, {MsgId, IsDelivered, AckTag, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), - [ {AckTag, {next, IsDelivered}} | RQueueAcc ] - end, [], Msgs), + {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []} + end, {[], []}, Msgs), + ok = if [] == TxPublish -> ok; + true -> rabbit_disk_queue:tx_commit(Q, TxPublish, []) + end, ok = if [] == Requeue -> ok; true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) |
