summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-12 12:51:45 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-12 12:51:45 +0100
commitb954691aabfddd323c2431dffe17c99991029e14 (patch)
tree6c48ae1a2dd1cc3f57063a2b41617e407d405e8b
parentbe5837815fad0f5073031c4c5c377836267cd702 (diff)
downloadrabbitmq-server-git-b954691aabfddd323c2431dffe17c99991029e14.tar.gz
Yep, as I'd thought, the next_seq_id field was totally unused for anything useful. The code is thus now a good bit simpler.
-rw-r--r--src/rabbit_mixed_queue.erl135
1 files changed, 54 insertions, 81 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 74e47a00db..31c0fb10c7 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -43,7 +43,6 @@
-record(mqstate, { mode,
msg_buf,
- next_write_seq,
queue,
is_durable,
length
@@ -53,15 +52,15 @@
start_link(Queue, IsDurable, disk) ->
purge_non_persistent_messages(
#mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
- next_write_seq = 0, is_durable = IsDurable, length = 0 });
+ is_durable = IsDurable, length = 0 });
start_link(Queue, IsDurable, mixed) ->
{ok, State} = start_link(Queue, IsDurable, disk),
to_mixed_mode(State).
to_disk_only_mode(State = #mqstate { mode = disk }) ->
{ok, State};
-to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- next_write_seq = NextSeq }) ->
+to_disk_only_mode(State =
+ #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) ->
rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]),
%% We enqueue _everything_ here. This means that should a message
%% already be in the disk queue we must remove it and add it back
@@ -70,32 +69,30 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
Msgs = queue:to_list(MsgBuf),
- {NextSeq1, Requeue} =
+ Requeue =
lists:foldl(
- fun ({_Seq, Msg = #basic_message { guid = MsgId },
- IsDelivered, OnDisk}, {NSeq, RQueueAcc}) ->
+ fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk},
+ RQueueAcc) ->
if OnDisk ->
{MsgId, IsDelivered, AckTag, _PersistRemaining} =
rabbit_disk_queue:phantom_deliver(Q),
- {NSeq + 1,
- [ {AckTag, {NSeq, IsDelivered}} | RQueueAcc ]};
+ [ {AckTag, {next, IsDelivered}} | RQueueAcc ];
true ->
ok = if [] == RQueueAcc -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(
Q, lists:reverse(RQueueAcc))
end,
- ok = rabbit_disk_queue:publish_with_seq(
- Q, MsgId, NSeq, msg_to_bin(Msg), false),
- {NSeq + 1, []}
+ ok = rabbit_disk_queue:publish(
+ Q, MsgId, msg_to_bin(Msg), false),
+ []
end
- end, {NextSeq, []}, Msgs),
+ end, [], Msgs),
ok = if [] == Requeue -> ok;
true ->
rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
end,
- {ok, State #mqstate { mode = disk, msg_buf = queue:new(),
- next_write_seq = NextSeq1 }}.
+ {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}.
to_mixed_mode(State = #mqstate { mode = mixed }) ->
{ok, State};
@@ -104,50 +101,45 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) ->
%% load up a new queue with everything that's on disk.
%% don't remove non-persistent messages that happen to be on disk
QList = rabbit_disk_queue:dump_queue(Q),
- {MsgBuf1, NextSeq1, Length} =
+ {MsgBuf1, Length} =
lists:foldl(
- fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId},
- {Buf, NSeq, L})
- when SeqId >= NSeq ->
+ fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, _SeqId},
+ {Buf, L}) ->
Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
- {queue:in({SeqId, Msg, IsDelivered, true}, Buf), SeqId+1, L+1}
- end, {queue:new(), 0, 0}, QList),
- State1 = State #mqstate { mode = mixed, msg_buf = MsgBuf1,
- next_write_seq = NextSeq1 },
- {ok, State1}.
+ {queue:in({Msg, IsDelivered, true}, Buf), L+1}
+ end, {queue:new(), 0}, QList),
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
is_durable = IsDurable }) ->
%% iterate through the content on disk, ack anything which isn't
%% persistent, accumulate everything else that is persistent and
%% requeue it
- NextSeq = rabbit_disk_queue:next_write_seq(Q),
- {Acks, Requeue, NextSeq2} =
- deliver_all_messages(Q, IsDurable, [], [], NextSeq),
+ {Acks, Requeue, Length} =
+ deliver_all_messages(Q, IsDurable, [], [], 0),
ok = if Requeue == [] -> ok;
true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
end,
ok = if Acks == [] -> ok;
true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks))
end,
- Length = NextSeq2 - NextSeq,
- {ok, State #mqstate { next_write_seq = NextSeq2, length = Length }}.
+ {ok, State #mqstate { length = Length }}.
-deliver_all_messages(Q, IsDurable, Acks, Requeue, NextSeq) ->
+deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
case rabbit_disk_queue:deliver(Q) of
- empty -> {Acks, Requeue, NextSeq};
+ empty -> {Acks, Requeue, Length};
{MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} ->
#basic_message { guid = MsgId, is_persistent = IsPersistent } =
bin_to_msg(MsgBin),
OnDisk = IsPersistent andalso IsDurable,
- {Acks2, Requeue2, NextSeq2} =
+ {Acks2, Requeue2, Length2} =
if OnDisk -> {Acks,
- [{AckTag, {NextSeq, IsDelivered}} | Requeue],
- NextSeq + 1
+ [{AckTag, {next, IsDelivered}} | Requeue],
+ Length + 1
};
- true -> {[AckTag | Acks], Requeue, NextSeq}
+ true -> {[AckTag | Acks], Requeue, Length}
end,
- deliver_all_messages(Q, IsDurable, Acks2, Requeue2, NextSeq2)
+ deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2)
end.
msg_to_bin(Msg = #basic_message { content = Content }) ->
@@ -163,47 +155,34 @@ publish(Msg = #basic_message { guid = MsgId },
{ok, State #mqstate { length = Length + 1 }};
publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable,
- next_write_seq = NextSeq, msg_buf = MsgBuf,
- length = Length }) ->
+ msg_buf = MsgBuf, length = Length }) ->
OnDisk = IsDurable andalso IsPersistent,
ok = if OnDisk ->
- rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq,
- msg_to_bin(Msg), false);
+ rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false);
true -> ok
end,
- {ok, State #mqstate { next_write_seq = NextSeq + 1,
- msg_buf = queue:in({NextSeq, Msg, false, OnDisk},
- MsgBuf),
- length = Length + 1
- }}.
+ {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf),
+ length = Length + 1 }}.
%% Assumption here is that the queue is empty already (only called via
-%% attempt_immediate_delivery). Also note that the seq id assigned by
-%% the disk queue could well not be the same as the NextSeq (true =
-%% NextSeq >= disk_queue_write_seq_for_queue(Q)) , but this doesn't
-%% matter because the AckTag will still be correct (AckTags for
-%% non-persistent messages don't exist).
+%% attempt_immediate_delivery).
publish_delivered(Msg =
#basic_message { guid = MsgId, is_persistent = IsPersistent},
State = #mqstate { mode = Mode, is_durable = IsDurable,
- next_write_seq = NextSeq, queue = Q,
- length = 0 })
+ queue = Q, length = 0 })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false),
- State1 = if Mode =:= disk -> State;
- true -> State #mqstate { next_write_seq = NextSeq + 1 }
- end,
if IsDurable andalso IsPersistent ->
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
{MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
- {ok, AckTag, State1};
+ {ok, AckTag, State};
true ->
%% in this case, we don't actually care about the ack, so
%% auto ack it (asynchronously).
ok = rabbit_disk_queue:auto_ack_next_message(Q),
- {ok, noack, State1}
+ {ok, noack, State}
end;
publish_delivered(_Msg, State = #mqstate { mode = mixed, length = 0 }) ->
{ok, noack, State}.
@@ -224,9 +203,8 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
State #mqstate { length = Length - 1}};
deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
- next_write_seq = NextWrite, msg_buf = MsgBuf,
- length = Length }) ->
- {{value, {Seq, Msg = #basic_message { guid = MsgId,
+ msg_buf = MsgBuf, length = Length }) ->
+ {{value, {Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
IsDelivered, OnDisk}}, MsgBuf2}
= queue:out(MsgBuf),
@@ -282,33 +260,30 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q,
{ok, State #mqstate { length = Length + erlang:length(Publishes) }};
tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
- next_write_seq = NextSeq,
is_durable = IsDurable,
length = Length
}) ->
- {PersistentPubs, MsgBuf2, NextSeq2} =
+ {PersistentPubs, MsgBuf2} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
- {Acc, MsgBuf3, NextSeq3}) ->
+ {Acc, MsgBuf3}) ->
OnDisk = IsPersistent andalso IsDurable,
Acc2 =
if OnDisk ->
- [{Msg #basic_message.guid, NextSeq3}
- | Acc];
+ [Msg #basic_message.guid | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({NextSeq3, Msg, false, OnDisk},
- MsgBuf3),
- {Acc2, MsgBuf4, NextSeq3 + 1}
- end, {[], MsgBuf, NextSeq}, Publishes),
+ MsgBuf4 = queue:in({Msg, false, OnDisk}, MsgBuf3),
+ {Acc2, MsgBuf4}
+ end, {[], MsgBuf}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
%% requirements of rabbit_disk_queue (ascending SeqIds)
RealAcks = remove_noacks(Acks),
ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok;
true ->
- rabbit_disk_queue:tx_commit_with_seqs(
+ rabbit_disk_queue:tx_commit(
Q, lists:reverse(PersistentPubs), RealAcks)
end,
- {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2,
+ {ok, State #mqstate { msg_buf = MsgBuf2,
length = Length + erlang:length(Publishes) }}.
only_persistent_msg_ids(Pubs) ->
@@ -357,27 +332,25 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
{ok, State #mqstate {length = Length + erlang:length(MessagesWithAckTags)}};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
- next_write_seq = NextSeq,
is_durable = IsDurable,
length = Length
}) ->
- {PersistentPubs, MsgBuf2, NextSeq2} =
+ {PersistentPubs, MsgBuf2} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
- {Acc, MsgBuf3, NextSeq3}) ->
+ {Acc, MsgBuf3}) ->
OnDisk = IsDurable andalso IsPersistent,
Acc2 =
- if OnDisk -> [{AckTag, {NextSeq3, true}} | Acc];
+ if OnDisk -> [AckTag | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({NextSeq3, Msg, true, OnDisk}, MsgBuf3),
- {Acc2, MsgBuf4, NextSeq3 + 1}
- end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
+ MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3),
+ {Acc2, MsgBuf4}
+ end, {[], MsgBuf}, MessagesWithAckTags),
ok = if [] == PersistentPubs -> ok;
- true -> rabbit_disk_queue:requeue_with_seqs(
- Q, lists:reverse(PersistentPubs))
+ true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
end,
- {ok, State #mqstate {msg_buf = MsgBuf2, next_write_seq = NextSeq2,
+ {ok, State #mqstate {msg_buf = MsgBuf2,
length = Length + erlang:length(MessagesWithAckTags)}}.
purge(State = #mqstate { queue = Q, mode = disk, length = Count }) ->