summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-30 16:24:50 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-30 16:24:50 +0100
commitd66a200813e0483d4a171f3815a16436371fac92 (patch)
treeab92e8830728713ee9dee1b6bb78e89ef29cb571 /src
parent675e13c1ade6c1a13d6215ab52f5c9095e09547f (diff)
downloadrabbitmq-server-git-d66a200813e0483d4a171f3815a16436371fac92.tar.gz
changed disk -> mixed mode so that messages stay on disk and don't get read. This means the conversion is much faster than it was which is a good thing, at the cost of slower initial delivery.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl39
-rw-r--r--src/rabbit_mixed_queue.erl69
-rw-r--r--src/rabbit_tests.erl13
3 files changed, 71 insertions, 50 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index cf8ddba095..990e5917d7 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -259,9 +259,7 @@
(queue_name(),
[{{msg_id(), seq_id()}, {seq_id_or_next(), bool()}}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
--spec(dump_queue/1 :: (queue_name()) ->
- [{msg_id(), binary(), non_neg_integer(), bool(),
- {msg_id(), seq_id()}, seq_id()}]).
+-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), bool()}]).
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
@@ -1163,20 +1161,27 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {[], State};
[{Q, ReadSeq, WriteSeq, _Length}] ->
- {QList, {WriteSeq, State3}} =
- rabbit_misc:unfold(
- fun ({SeqId, _State1}) when SeqId == WriteSeq ->
- false;
- ({SeqId, State1}) ->
- {ok, {Message, Size, Delivered, {MsgId, SeqId}},
- NextReadSeqId, State2} =
- internal_read_message(Q, SeqId, true, true,
- State1),
- {true,
- {Message, Size, Delivered, {MsgId, SeqId}, SeqId},
- {NextReadSeqId, State2}}
- end, {ReadSeq, State}),
- {lists:reverse(QList), State3}
+ Objs =
+ mnesia:dirty_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ }),
+ {Msgs, WriteSeq} =
+ lists:foldl(
+ fun (#dq_msg_loc { queue_and_seq_id = {_, Seq},
+ msg_id = MsgId,
+ is_delivered = Delivered,
+ next_seq_id = NextSeq },
+ {Acc, Seq}) ->
+ {[{MsgId, Delivered} | Acc], NextSeq};
+ (#dq_msg_loc { queue_and_seq_id = {_, Seq} },
+ {[], RSeq}) when Seq < RSeq ->
+ {[], RSeq}
+ end, {[], ReadSeq}, lists:keysort(2, Objs)),
+ {lists:reverse(Msgs), State}
end.
internal_delete_non_durable_queues(
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index d171cf186f..3c60d25fbc 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -129,11 +129,12 @@ to_disk_only_mode(TxnMessages, State =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk},
RQueueAcc) ->
- if OnDisk ->
+ case OnDisk of
+ true ->
{MsgId, IsDelivered, AckTag, _PersistRemaining} =
rabbit_disk_queue:phantom_deliver(Q),
[ {AckTag, {next, IsDelivered}} | RQueueAcc ];
- true ->
+ false ->
ok = if [] == RQueueAcc -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(
@@ -142,7 +143,11 @@ to_disk_only_mode(TxnMessages, State =
ok = rabbit_disk_queue:publish(
Q, Msg, false),
[]
- end
+ end;
+ ({MsgId, IsDelivered}, RQueueAcc) ->
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ [ {AckTag, {next, IsDelivered}} | RQueueAcc ]
end, [], Msgs),
ok = if [] == Requeue -> ok;
true ->
@@ -170,12 +175,8 @@ to_mixed_mode(TxnMessages, State =
%% load up a new queue with everything that's on disk.
%% don't remove non-persistent messages that happen to be on disk
QList = rabbit_disk_queue:dump_queue(Q),
- {MsgBuf1, Length} =
- lists:foldl(
- fun ({Msg, _Size, IsDelivered, _AckTag, _SeqId},
- {Buf, L}) ->
- {queue:in({Msg, IsDelivered, true}, Buf), L+1}
- end, {queue:new(), 0}, QList),
+ Length = erlang:length(QList),
+ MsgBuf = queue:from_list(QList),
%% remove txn messages from disk which are neither persistent and
%% durable. This is necessary to avoid leaks. This is also pretty
%% much the inverse behaviour of our own tx_cancel/2 which is why
@@ -189,7 +190,7 @@ to_mixed_mode(TxnMessages, State =
end
end, [], TxnMessages),
ok = rabbit_disk_queue:tx_cancel(Cancel),
- {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable,
@@ -290,21 +291,41 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
State #mqstate { length = Length - 1 }};
deliver(State = #mqstate { mode = mixed, msg_buf = MsgBuf, queue = Q,
is_durable = IsDurable, length = Length }) ->
- {{value, {Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- IsDelivered, OnDisk}}, MsgBuf1}
+ {{value, Value}, MsgBuf1}
= queue:out(MsgBuf),
- AckTag =
- if OnDisk ->
- if IsPersistent andalso IsDurable ->
- {MsgId, IsDelivered, AckTag1, _PersistRem} =
- rabbit_disk_queue:phantom_deliver(Q),
- AckTag1;
- true ->
- ok = rabbit_disk_queue:auto_ack_next_message(Q),
- noack
- end;
- true -> noack
+ {Msg, IsDelivered, AckTag} =
+ case Value of
+ {Msg1 = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ IsDelivered1, OnDisk} ->
+ AckTag1 =
+ case OnDisk of
+ true ->
+ case IsPersistent andalso IsDurable of
+ true ->
+ {MsgId, IsDelivered1, AckTag2, _PersistRem}
+ = rabbit_disk_queue:phantom_deliver(Q),
+ AckTag2;
+ false ->
+ ok = rabbit_disk_queue:auto_ack_next_message
+ (Q),
+ noack
+ end;
+ false -> noack
+ end,
+ {Msg1, IsDelivered1, AckTag1};
+ {MsgId, IsDelivered1} ->
+ {Msg1 = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ _Size, IsDelivered1, AckTag1, _PersistRem}
+ = rabbit_disk_queue:deliver(Q),
+ AckTag2 =
+ case IsPersistent andalso IsDurable of
+ true -> AckTag1;
+ false -> rabbit_disk_queue:ack(Q, [AckTag1]),
+ noack
+ end,
+ {Msg1, IsDelivered1, AckTag2}
end,
Rem = Length - 1,
{{Msg, IsDelivered, AckTag, Rem},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f5447fe39e..b80bdab201 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -948,14 +948,12 @@ rdq_test_dump_queue(Total) ->
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg)) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
- QList = [begin Message = rdq_message(N, Msg),
- Size = size(term_to_binary(Message)),
- {Message, Size, false, {N, (N-1)}, (N-1)}
- end || N <- All],
+ QList = [{N, false} || N <- All],
{Micros, QList} = timer:tc(rabbit_disk_queue, dump_queue, [q]),
rdq_stop(),
io:format("dump ok undelivered (~w micros)~n", [Micros]),
- rdq_start(),
+ {Micros1, _} = timer:tc(rabbit_tests, rdq_start, []),
+ io:format("restarted (~w micros)~n", [Micros1]),
lists:foreach(
fun (N) ->
Remaining = Total - N,
@@ -967,10 +965,7 @@ rdq_test_dump_queue(Total) ->
rdq_stop(),
io:format("dump ok post delivery~n", []),
rdq_start(),
- QList2 = [begin Message = rdq_message(N, Msg),
- Size = size(term_to_binary(Message)),
- {Message, Size, true, {N, (N-1)}, (N-1)}
- end || N <- All],
+ QList2 = [{N, true} || N <- All],
{Micros2, QList2} = timer:tc(rabbit_disk_queue, dump_queue, [q]),
io:format("dump ok post delivery + restart (~w micros)~n", [Micros2]),
rdq_stop(),