summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-04 12:06:22 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-04 12:06:22 +0100
commit1a96bfd21b5b8adbc9d856ba349e1ef6e063fe8a (patch)
tree3b32626a9255fd301a43321ff7cbf5b61a893080
parent834e68f2f3f567f137b6cdab80f637f71da6f0d2 (diff)
downloadrabbitmq-server-git-1a96bfd21b5b8adbc9d856ba349e1ef6e063fe8a.tar.gz
only write out persistent messages sent to a durable queue
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_mixed_queue.erl50
3 files changed, 33 insertions, 25 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 5062e7e986..c0d09547c5 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -157,8 +157,8 @@ start(normal, []) ->
end},
{"disk queue",
fun () ->
- ok = start_child(rabbit_disk_queue) %%,
- %% ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME
+ ok = start_child(rabbit_disk_queue),
+ ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME
end},
{"guid generator",
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5a8bd4a43d..73fae89277 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -90,9 +90,9 @@ start_link(Q) ->
%%----------------------------------------------------------------------------
-init(Q = #amqqueue { name = QName }) ->
+init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, MS} = rabbit_mixed_queue:start_link(QName, disk), %% TODO, CHANGE ME
+ {ok, MS} = rabbit_mixed_queue:start_link(QName, Durable, mixed), %% TODO, CHANGE ME
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 6a8f30979a..d1000c887e 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
--export([start_link/2]).
+-export([start_link/3]).
-export([publish/2, publish_delivered/2, deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
@@ -42,12 +42,14 @@
-record(mqstate, { mode,
msg_buf,
next_write_seq,
- queue
+ queue,
+ is_durable
}
).
-start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed ->
- {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, queue = Queue }}.
+start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed ->
+ {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1,
+ queue = Queue, is_durable = IsDurable }}.
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
@@ -61,9 +63,9 @@ publish(Msg = #basic_message { guid = MsgId },
ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
{ok, State};
publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
- State = #mqstate { queue = Q, mode = mixed,
+ State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
next_write_seq = NextSeq, msg_buf = MsgBuf }) ->
- ok = if IsPersistent ->
+ ok = if IsDurable andalso IsPersistent ->
rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, msg_to_bin(Msg));
true -> ok
end,
@@ -71,32 +73,33 @@ publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
msg_buf = queue:in({NextSeq, Msg, false}, MsgBuf)
}}.
-%% assumption here is that the queue is empty already (only called via publish immediate)
+%% assumption here is that the queue is empty already (only called via attempt_immediate_delivery)
publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent},
- State = #mqstate { mode = Mode, queue = Q, next_write_seq = NextSeq })
- when Mode =:= disk orelse IsPersistent ->
+ State = #mqstate { mode = Mode, queue = Q, is_durable = IsDurable,
+ next_write_seq = NextSeq })
+ when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
{MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
State2 = if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 };
true -> State
end,
{ok, AckTag, State2};
-publish_delivered(#basic_message { is_persistent = false },
- State = #mqstate { mode = mixed }) ->
+publish_delivered(_Msg, State = #mqstate { mode = mixed }) ->
{ok, noack, State}.
deliver(State = #mqstate { mode = disk, queue = Q }) ->
{MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q),
Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
{{Msg, IsDelivered, AckTag, Remaining}, State};
-deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextWrite }) ->
+deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ next_write_seq = NextWrite, is_durable = IsDurable }) ->
{Result, MsgBuf2} = queue:out(MsgBuf),
case Result of
empty ->
{empty, State};
{value, {Seq, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered}} ->
AckTag =
- if IsPersistent ->
+ if IsDurable andalso IsPersistent ->
{MsgId, IsDelivered, AckTag2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q),
AckTag2;
true -> noack
@@ -118,12 +121,12 @@ ack(Acks, State = #mqstate { queue = Q }) ->
tx_publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
-tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = true },
- State = #mqstate { mode = mixed }) ->
+tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
+ State = #mqstate { mode = mixed, is_durable = IsDurable })
+ when IsDurable andalso IsPersistent ->
ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
-tx_publish(#basic_message { is_persistent = false },
- State = #mqstate { mode = mixed }) ->
+tx_publish(_Msg, State = #mqstate { mode = mixed }) ->
{ok, State}.
only_msg_ids(Pubs) ->
@@ -134,7 +137,8 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) ->
{ok, State};
tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
- next_write_seq = NextSeq
+ next_write_seq = NextSeq,
+ is_durable = IsDurable
}) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
@@ -149,7 +153,10 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
end, {[], MsgBuf, NextSeq}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
%% requirements of rabbit_disk_queue (ascending SeqIds)
- ok = rabbit_disk_queue:tx_commit_with_seqs(Q, lists:reverse(PersistentPubs),
+ PersistentPubs2 = if IsDurable -> lists:reverse(PersistentPubs);
+ true -> []
+ end,
+ ok = rabbit_disk_queue:tx_commit_with_seqs(Q, PersistentPubs2,
remove_noacks(Acks)),
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
@@ -177,13 +184,14 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) ->
{ok, State};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
- next_write_seq = NextSeq
+ next_write_seq = NextSeq,
+ is_durable = IsDurable
}) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
lists:foldl(fun ({Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, AckTag},
{Acc, MsgBuf3, NextSeq3}) ->
Acc2 =
- if IsPersistent ->
+ if IsDurable andalso IsPersistent ->
{MsgId, _OldSeqId} = AckTag,
[{AckTag, NextSeq3} | Acc];
true -> Acc