summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-01 18:48:35 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-01 18:48:35 +0100
commit99f80aea7d1afe8c0137dbbd3228f370cf82a91c (patch)
treee3e4acf3b36a297dce5f508de4b654e2f5cbccdc /src
parentf58589539ce169a0e23ee05dfd43fec170250e13 (diff)
downloadrabbitmq-server-git-99f80aea7d1afe8c0137dbbd3228f370cf82a91c.tar.gz
When converting to disk mode, use tx_publish and tx_commit instead of publish. This massively reduces the number of sync calls to disk_queue, potentially to one, if every message in the queue is non persistent (or the queue is non durable).
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mixed_queue.erl25
1 files changed, 16 insertions, 9 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 3c60d25fbc..23696f27fc 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -125,30 +125,37 @@ to_disk_only_mode(TxnMessages, State =
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
Msgs = queue:to_list(MsgBuf),
- Requeue =
+ {Requeue, TxPublish} =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk},
- RQueueAcc) ->
+ {RQueueAcc, TxPublishAcc}) ->
case OnDisk of
true ->
+ ok = rabbit_disk_queue:tx_commit(Q, TxPublishAcc, []),
{MsgId, IsDelivered, AckTag, _PersistRemaining} =
rabbit_disk_queue:phantom_deliver(Q),
- [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
+ {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []};
false ->
ok = if [] == RQueueAcc -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(
Q, lists:reverse(RQueueAcc))
end,
- ok = rabbit_disk_queue:publish(
- Q, Msg, false),
- []
+ ok = rabbit_disk_queue:tx_publish(Msg),
+ {[], [ MsgId | TxPublishAcc ]}
end;
- ({MsgId, IsDelivered}, RQueueAcc) ->
+ ({MsgId, IsDelivered}, {RQueueAcc, TxPublishAcc}) ->
+ ok = if [] == TxPublishAcc -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, TxPublishAcc,
+ [])
+ end,
{MsgId, IsDelivered, AckTag, _PersistRemaining} =
rabbit_disk_queue:phantom_deliver(Q),
- [ {AckTag, {next, IsDelivered}} | RQueueAcc ]
- end, [], Msgs),
+ {[ {AckTag, {next, IsDelivered}} | RQueueAcc ], []}
+ end, {[], []}, Msgs),
+ ok = if [] == TxPublish -> ok;
+ true -> rabbit_disk_queue:tx_commit(Q, TxPublish, [])
+ end,
ok = if [] == Requeue -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))