summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-08 18:06:51 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-08 18:06:51 +0100
commit9bf1d86e4b4b919f7799c0f1dec99b1043ef9707 (patch)
tree087bb6b48556b6dcace57410fe9b322f8eb11957
parent835ee0039f6e1adb1ed8e9505d97e532b78b21fd (diff)
downloadrabbitmq-server-git-9bf1d86e4b4b919f7799c0f1dec99b1043ef9707.tar.gz
can now switch the mixed queue between modes
-rw-r--r--src/rabbit_mixed_queue.erl49
1 files changed, 48 insertions, 1 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 610a236641..dc180f00c4 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -39,6 +39,8 @@
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
length/1, is_empty/1, delete_queue/1]).
+-export([to_disk_only_mode/1, to_mixed_mode/1]).
+
-record(mqstate, { mode,
msg_buf,
next_write_seq,
@@ -53,11 +55,56 @@ start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed ->
lists:foldl(
fun ({_MsgId, Msg, _Size, Delivered, SeqId}, {Buf, NSeq})
when SeqId >= NSeq ->
- {queue:in({SeqId, binary_to_term(Msg), Delivered}, Buf), SeqId + 1}
+ {queue:in({SeqId, bin_to_msg(Msg), Delivered}, Buf), SeqId + 1}
end, {queue:new(), 0}, QList),
{ok, #mqstate { mode = Mode, msg_buf = MsgBuf, next_write_seq = NextSeq,
queue = Queue, is_durable = IsDurable }}.
+to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ is_durable = IsDurable }) ->
+ Msgs = queue:to_list(MsgBuf),
+ AckTags =
+ lists:foldl(
+ fun ({_Seq, Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent },
+ IsDelivered}, AcksAcc) ->
+ ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
+ if IsDurable andalso IsPersistent ->
+ {MsgId, IsDelivered, AckTag, _PersistRemaining}
+ = rabbit_disk_queue:phantom_deliver(Q),
+ [AckTag | AcksAcc];
+ true -> AcksAcc
+ end
+ end, [], Msgs),
+ ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)),
+ State #mqstate { mode = disk, msg_buf = queue:new() }.
+
+to_mixed_mode(State = #mqstate { mode = disk, msg_buf = MsgBuf, queue = Q,
+ is_durable = IsDurable,
+ next_write_seq = NextSeq }) ->
+ QList = rabbit_disk_queue:dump_queue(Q),
+ {MsgBuf1, NextSeq1, AckTags} =
+ lists:foldl(
+ fun ({MsgId, MsgBin, _Size, IsDelivered, SeqId}, {Buf, NSeq, AcksAcc})
+ when SeqId >= NSeq ->
+ Msg = #basic_message { guid = MsgId,
+ is_persistent = IsPersistent }
+ = bin_to_msg(MsgBin),
+ Buf1 = queue:in({SeqId, Msg, IsDelivered}, Buf),
+ NSeq1 = SeqId + 1,
+ AcksAcc1 =
+ if IsDurable andalso IsPersistent ->
+ [AcksAcc];
+ true ->
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ [AckTag | AcksAcc]
+ end,
+ {Buf1, NSeq1, AcksAcc1}
+ end, {MsgBuf, NextSeq, []}, QList),
+ ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)),
+ State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }.
+
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
term_to_binary(Msg #basic_message { content = ClearedContent }).