diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-12 12:51:45 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-12 12:51:45 +0100 |
| commit | b954691aabfddd323c2431dffe17c99991029e14 (patch) | |
| tree | 6c48ae1a2dd1cc3f57063a2b41617e407d405e8b | |
| parent | be5837815fad0f5073031c4c5c377836267cd702 (diff) | |
| download | rabbitmq-server-git-b954691aabfddd323c2431dffe17c99991029e14.tar.gz | |
Yep, as I'd thought, the next_seq_id field was totally unused for anything useful. The code is thus now a good bit simpler.
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 135 |
1 files changed, 54 insertions, 81 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 74e47a00db..31c0fb10c7 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -43,7 +43,6 @@ -record(mqstate, { mode, msg_buf, - next_write_seq, queue, is_durable, length @@ -53,15 +52,15 @@ start_link(Queue, IsDurable, disk) -> purge_non_persistent_messages( #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, - next_write_seq = 0, is_durable = IsDurable, length = 0 }); + is_durable = IsDurable, length = 0 }); start_link(Queue, IsDurable, mixed) -> {ok, State} = start_link(Queue, IsDurable, disk), to_mixed_mode(State). to_disk_only_mode(State = #mqstate { mode = disk }) -> {ok, State}; -to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - next_write_seq = NextSeq }) -> +to_disk_only_mode(State = + #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) -> rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), %% We enqueue _everything_ here. This means that should a message %% already be in the disk queue we must remove it and add it back @@ -70,32 +69,30 @@ to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. Msgs = queue:to_list(MsgBuf), - {NextSeq1, Requeue} = + Requeue = lists:foldl( - fun ({_Seq, Msg = #basic_message { guid = MsgId }, - IsDelivered, OnDisk}, {NSeq, RQueueAcc}) -> + fun ({Msg = #basic_message { guid = MsgId }, IsDelivered, OnDisk}, + RQueueAcc) -> if OnDisk -> {MsgId, IsDelivered, AckTag, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), - {NSeq + 1, - [ {AckTag, {NSeq, IsDelivered}} | RQueueAcc ]}; + [ {AckTag, {next, IsDelivered}} | RQueueAcc ]; true -> ok = if [] == RQueueAcc -> ok; true -> rabbit_disk_queue:requeue_with_seqs( Q, lists:reverse(RQueueAcc)) end, - ok = rabbit_disk_queue:publish_with_seq( - Q, MsgId, NSeq, msg_to_bin(Msg), false), - {NSeq + 1, []} + ok = rabbit_disk_queue:publish( + Q, MsgId, msg_to_bin(Msg), false), + [] end - end, {NextSeq, []}, Msgs), + end, [], Msgs), ok = if [] == Requeue -> ok; true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) end, - {ok, State #mqstate { mode = disk, msg_buf = queue:new(), - next_write_seq = NextSeq1 }}. + {ok, State #mqstate { mode = disk, msg_buf = queue:new() }}. to_mixed_mode(State = #mqstate { mode = mixed }) -> {ok, State}; @@ -104,50 +101,45 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) -> %% load up a new queue with everything that's on disk. %% don't remove non-persistent messages that happen to be on disk QList = rabbit_disk_queue:dump_queue(Q), - {MsgBuf1, NextSeq1, Length} = + {MsgBuf1, Length} = lists:foldl( - fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, - {Buf, NSeq, L}) - when SeqId >= NSeq -> + fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, _SeqId}, + {Buf, L}) -> Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), - {queue:in({SeqId, Msg, IsDelivered, true}, Buf), SeqId+1, L+1} - end, {queue:new(), 0, 0}, QList), - State1 = State #mqstate { mode = mixed, msg_buf = MsgBuf1, - next_write_seq = NextSeq1 }, - {ok, State1}. + {queue:in({Msg, IsDelivered, true}, Buf), L+1} + end, {queue:new(), 0}, QList), + {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1 }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable }) -> %% iterate through the content on disk, ack anything which isn't %% persistent, accumulate everything else that is persistent and %% requeue it - NextSeq = rabbit_disk_queue:next_write_seq(Q), - {Acks, Requeue, NextSeq2} = - deliver_all_messages(Q, IsDurable, [], [], NextSeq), + {Acks, Requeue, Length} = + deliver_all_messages(Q, IsDurable, [], [], 0), ok = if Requeue == [] -> ok; true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) end, ok = if Acks == [] -> ok; true -> rabbit_disk_queue:ack(Q, lists:reverse(Acks)) end, - Length = NextSeq2 - NextSeq, - {ok, State #mqstate { next_write_seq = NextSeq2, length = Length }}. + {ok, State #mqstate { length = Length }}. -deliver_all_messages(Q, IsDurable, Acks, Requeue, NextSeq) -> +deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) -> case rabbit_disk_queue:deliver(Q) of - empty -> {Acks, Requeue, NextSeq}; + empty -> {Acks, Requeue, Length}; {MsgId, MsgBin, _Size, IsDelivered, AckTag, _Remaining} -> #basic_message { guid = MsgId, is_persistent = IsPersistent } = bin_to_msg(MsgBin), OnDisk = IsPersistent andalso IsDurable, - {Acks2, Requeue2, NextSeq2} = + {Acks2, Requeue2, Length2} = if OnDisk -> {Acks, - [{AckTag, {NextSeq, IsDelivered}} | Requeue], - NextSeq + 1 + [{AckTag, {next, IsDelivered}} | Requeue], + Length + 1 }; - true -> {[AckTag | Acks], Requeue, NextSeq} + true -> {[AckTag | Acks], Requeue, Length} end, - deliver_all_messages(Q, IsDurable, Acks2, Requeue2, NextSeq2) + deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2) end. msg_to_bin(Msg = #basic_message { content = Content }) -> @@ -163,47 +155,34 @@ publish(Msg = #basic_message { guid = MsgId }, {ok, State #mqstate { length = Length + 1 }}; publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, is_durable = IsDurable, - next_write_seq = NextSeq, msg_buf = MsgBuf, - length = Length }) -> + msg_buf = MsgBuf, length = Length }) -> OnDisk = IsDurable andalso IsPersistent, ok = if OnDisk -> - rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, - msg_to_bin(Msg), false); + rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false); true -> ok end, - {ok, State #mqstate { next_write_seq = NextSeq + 1, - msg_buf = queue:in({NextSeq, Msg, false, OnDisk}, - MsgBuf), - length = Length + 1 - }}. + {ok, State #mqstate { msg_buf = queue:in({Msg, false, OnDisk}, MsgBuf), + length = Length + 1 }}. %% Assumption here is that the queue is empty already (only called via -%% attempt_immediate_delivery). Also note that the seq id assigned by -%% the disk queue could well not be the same as the NextSeq (true = -%% NextSeq >= disk_queue_write_seq_for_queue(Q)) , but this doesn't -%% matter because the AckTag will still be correct (AckTags for -%% non-persistent messages don't exist). +%% attempt_immediate_delivery). publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, State = #mqstate { mode = Mode, is_durable = IsDurable, - next_write_seq = NextSeq, queue = Q, - length = 0 }) + queue = Q, length = 0 }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), - State1 = if Mode =:= disk -> State; - true -> State #mqstate { next_write_seq = NextSeq + 1 } - end, if IsDurable andalso IsPersistent -> %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), - {ok, AckTag, State1}; + {ok, AckTag, State}; true -> %% in this case, we don't actually care about the ack, so %% auto ack it (asynchronously). ok = rabbit_disk_queue:auto_ack_next_message(Q), - {ok, noack, State1} + {ok, noack, State} end; publish_delivered(_Msg, State = #mqstate { mode = mixed, length = 0 }) -> {ok, noack, State}. @@ -224,9 +203,8 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, State #mqstate { length = Length - 1}}; deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, - next_write_seq = NextWrite, msg_buf = MsgBuf, - length = Length }) -> - {{value, {Seq, Msg = #basic_message { guid = MsgId, + msg_buf = MsgBuf, length = Length }) -> + {{value, {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered, OnDisk}}, MsgBuf2} = queue:out(MsgBuf), @@ -282,33 +260,30 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q, {ok, State #mqstate { length = Length + erlang:length(Publishes) }}; tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - next_write_seq = NextSeq, is_durable = IsDurable, length = Length }) -> - {PersistentPubs, MsgBuf2, NextSeq2} = + {PersistentPubs, MsgBuf2} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, - {Acc, MsgBuf3, NextSeq3}) -> + {Acc, MsgBuf3}) -> OnDisk = IsPersistent andalso IsDurable, Acc2 = if OnDisk -> - [{Msg #basic_message.guid, NextSeq3} - | Acc]; + [Msg #basic_message.guid | Acc]; true -> Acc end, - MsgBuf4 = queue:in({NextSeq3, Msg, false, OnDisk}, - MsgBuf3), - {Acc2, MsgBuf4, NextSeq3 + 1} - end, {[], MsgBuf, NextSeq}, Publishes), + MsgBuf4 = queue:in({Msg, false, OnDisk}, MsgBuf3), + {Acc2, MsgBuf4} + end, {[], MsgBuf}, Publishes), %% foldl reverses, so re-reverse PersistentPubs to match %% requirements of rabbit_disk_queue (ascending SeqIds) RealAcks = remove_noacks(Acks), ok = if ([] == PersistentPubs) andalso ([] == RealAcks) -> ok; true -> - rabbit_disk_queue:tx_commit_with_seqs( + rabbit_disk_queue:tx_commit( Q, lists:reverse(PersistentPubs), RealAcks) end, - {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2, + {ok, State #mqstate { msg_buf = MsgBuf2, length = Length + erlang:length(Publishes) }}. only_persistent_msg_ids(Pubs) -> @@ -357,27 +332,25 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, {ok, State #mqstate {length = Length + erlang:length(MessagesWithAckTags)}}; requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - next_write_seq = NextSeq, is_durable = IsDurable, length = Length }) -> - {PersistentPubs, MsgBuf2, NextSeq2} = + {PersistentPubs, MsgBuf2} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, - {Acc, MsgBuf3, NextSeq3}) -> + {Acc, MsgBuf3}) -> OnDisk = IsDurable andalso IsPersistent, Acc2 = - if OnDisk -> [{AckTag, {NextSeq3, true}} | Acc]; + if OnDisk -> [AckTag | Acc]; true -> Acc end, - MsgBuf4 = queue:in({NextSeq3, Msg, true, OnDisk}, MsgBuf3), - {Acc2, MsgBuf4, NextSeq3 + 1} - end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), + MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3), + {Acc2, MsgBuf4} + end, {[], MsgBuf}, MessagesWithAckTags), ok = if [] == PersistentPubs -> ok; - true -> rabbit_disk_queue:requeue_with_seqs( - Q, lists:reverse(PersistentPubs)) + true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) end, - {ok, State #mqstate {msg_buf = MsgBuf2, next_write_seq = NextSeq2, + {ok, State #mqstate {msg_buf = MsgBuf2, length = Length + erlang:length(MessagesWithAckTags)}}. purge(State = #mqstate { queue = Q, mode = disk, length = Count }) -> |
