diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-04 12:06:22 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-04 12:06:22 +0100 |
| commit | 1a96bfd21b5b8adbc9d856ba349e1ef6e063fe8a (patch) | |
| tree | 3b32626a9255fd301a43321ff7cbf5b61a893080 | |
| parent | 834e68f2f3f567f137b6cdab80f637f71da6f0d2 (diff) | |
| download | rabbitmq-server-git-1a96bfd21b5b8adbc9d856ba349e1ef6e063fe8a.tar.gz | |
only write out persistent messages sent to a durable queue
| -rw-r--r-- | src/rabbit.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 50 |
3 files changed, 33 insertions, 25 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 5062e7e986..c0d09547c5 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -157,8 +157,8 @@ start(normal, []) -> end}, {"disk queue", fun () -> - ok = start_child(rabbit_disk_queue) %%, - %% ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME + ok = start_child(rabbit_disk_queue), + ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME end}, {"guid generator", fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5a8bd4a43d..73fae89277 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -90,9 +90,9 @@ start_link(Q) -> %%---------------------------------------------------------------------------- -init(Q = #amqqueue { name = QName }) -> +init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - {ok, MS} = rabbit_mixed_queue:start_link(QName, disk), %% TODO, CHANGE ME + {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, mixed), %% TODO, CHANGE ME {ok, #q{q = Q, owner = none, exclusive_consumer = none, diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 6a8f30979a..d1000c887e 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). --export([start_link/2]). +-export([start_link/3]). -export([publish/2, publish_delivered/2, deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, @@ -42,12 +42,14 @@ -record(mqstate, { mode, msg_buf, next_write_seq, - queue + queue, + is_durable } ). -start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed -> - {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, queue = Queue }}. +start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed -> + {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, + queue = Queue, is_durable = IsDurable }}. msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), @@ -61,9 +63,9 @@ publish(Msg = #basic_message { guid = MsgId }, ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), {ok, State}; publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - State = #mqstate { queue = Q, mode = mixed, + State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, next_write_seq = NextSeq, msg_buf = MsgBuf }) -> - ok = if IsPersistent -> + ok = if IsDurable andalso IsPersistent -> rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, msg_to_bin(Msg)); true -> ok end, @@ -71,32 +73,33 @@ publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, msg_buf = queue:in({NextSeq, Msg, false}, MsgBuf) }}. -%% assumption here is that the queue is empty already (only called via publish immediate) +%% assumption here is that the queue is empty already (only called via attempt_immediate_delivery) publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, - State = #mqstate { mode = Mode, queue = Q, next_write_seq = NextSeq }) - when Mode =:= disk orelse IsPersistent -> + State = #mqstate { mode = Mode, queue = Q, is_durable = IsDurable, + next_write_seq = NextSeq }) + when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), State2 = if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 }; true -> State end, {ok, AckTag, State2}; -publish_delivered(#basic_message { is_persistent = false }, - State = #mqstate { mode = mixed }) -> +publish_delivered(_Msg, State = #mqstate { mode = mixed }) -> {ok, noack, State}. deliver(State = #mqstate { mode = disk, queue = Q }) -> {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q), Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), {{Msg, IsDelivered, AckTag, Remaining}, State}; -deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextWrite }) -> +deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + next_write_seq = NextWrite, is_durable = IsDurable }) -> {Result, MsgBuf2} = queue:out(MsgBuf), case Result of empty -> {empty, State}; {value, {Seq, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered}} -> AckTag = - if IsPersistent -> + if IsDurable andalso IsPersistent -> {MsgId, IsDelivered, AckTag2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), AckTag2; true -> noack @@ -118,12 +121,12 @@ ack(Acks, State = #mqstate { queue = Q }) -> tx_publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk }) -> ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), {ok, State}; -tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = true }, - State = #mqstate { mode = mixed }) -> +tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, + State = #mqstate { mode = mixed, is_durable = IsDurable }) + when IsDurable andalso IsPersistent -> ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), {ok, State}; -tx_publish(#basic_message { is_persistent = false }, - State = #mqstate { mode = mixed }) -> +tx_publish(_Msg, State = #mqstate { mode = mixed }) -> {ok, State}. only_msg_ids(Pubs) -> @@ -134,7 +137,8 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) -> {ok, State}; tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - next_write_seq = NextSeq + next_write_seq = NextSeq, + is_durable = IsDurable }) -> {PersistentPubs, MsgBuf2, NextSeq2} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, @@ -149,7 +153,10 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, end, {[], MsgBuf, NextSeq}, Publishes), %% foldl reverses, so re-reverse PersistentPubs to match %% requirements of rabbit_disk_queue (ascending SeqIds) - ok = rabbit_disk_queue:tx_commit_with_seqs(Q, lists:reverse(PersistentPubs), + PersistentPubs2 = if IsDurable -> lists:reverse(PersistentPubs); + true -> [] + end, + ok = rabbit_disk_queue:tx_commit_with_seqs(Q, PersistentPubs2, remove_noacks(Acks)), {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. @@ -177,13 +184,14 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) -> {ok, State}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - next_write_seq = NextSeq + next_write_seq = NextSeq, + is_durable = IsDurable }) -> {PersistentPubs, MsgBuf2, NextSeq2} = lists:foldl(fun ({Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, AckTag}, {Acc, MsgBuf3, NextSeq3}) -> Acc2 = - if IsPersistent -> + if IsDurable andalso IsPersistent -> {MsgId, _OldSeqId} = AckTag, [{AckTag, NextSeq3} | Acc]; true -> Acc |
