diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-08 18:06:51 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-08 18:06:51 +0100 |
| commit | 9bf1d86e4b4b919f7799c0f1dec99b1043ef9707 (patch) | |
| tree | 087bb6b48556b6dcace57410fe9b322f8eb11957 | |
| parent | 835ee0039f6e1adb1ed8e9505d97e532b78b21fd (diff) | |
| download | rabbitmq-server-git-9bf1d86e4b4b919f7799c0f1dec99b1043ef9707.tar.gz | |
can now switch the mixed queue between modes
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 49 |
1 files changed, 48 insertions, 1 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 610a236641..dc180f00c4 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -39,6 +39,8 @@ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, length/1, is_empty/1, delete_queue/1]). +-export([to_disk_only_mode/1, to_mixed_mode/1]). + -record(mqstate, { mode, msg_buf, next_write_seq, @@ -53,11 +55,56 @@ start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed -> lists:foldl( fun ({_MsgId, Msg, _Size, Delivered, SeqId}, {Buf, NSeq}) when SeqId >= NSeq -> - {queue:in({SeqId, binary_to_term(Msg), Delivered}, Buf), SeqId + 1} + {queue:in({SeqId, bin_to_msg(Msg), Delivered}, Buf), SeqId + 1} end, {queue:new(), 0}, QList), {ok, #mqstate { mode = Mode, msg_buf = MsgBuf, next_write_seq = NextSeq, queue = Queue, is_durable = IsDurable }}. +to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable }) -> + Msgs = queue:to_list(MsgBuf), + AckTags = + lists:foldl( + fun ({_Seq, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + IsDelivered}, AcksAcc) -> + ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), + if IsDurable andalso IsPersistent -> + {MsgId, IsDelivered, AckTag, _PersistRemaining} + = rabbit_disk_queue:phantom_deliver(Q), + [AckTag | AcksAcc]; + true -> AcksAcc + end + end, [], Msgs), + ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)), + State #mqstate { mode = disk, msg_buf = queue:new() }. + +to_mixed_mode(State = #mqstate { mode = disk, msg_buf = MsgBuf, queue = Q, + is_durable = IsDurable, + next_write_seq = NextSeq }) -> + QList = rabbit_disk_queue:dump_queue(Q), + {MsgBuf1, NextSeq1, AckTags} = + lists:foldl( + fun ({MsgId, MsgBin, _Size, IsDelivered, SeqId}, {Buf, NSeq, AcksAcc}) + when SeqId >= NSeq -> + Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent } + = bin_to_msg(MsgBin), + Buf1 = queue:in({SeqId, Msg, IsDelivered}, Buf), + NSeq1 = SeqId + 1, + AcksAcc1 = + if IsDurable andalso IsPersistent -> + [AcksAcc]; + true -> + {MsgId, IsDelivered, AckTag, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + [AckTag | AcksAcc] + end, + {Buf1, NSeq1, AcksAcc1} + end, {MsgBuf, NextSeq, []}, QList), + ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)), + State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }. + msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), term_to_binary(Msg #basic_message { content = ClearedContent }). |
