diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 166 |
2 files changed, 169 insertions, 2 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b2d086b27d..37c91a855b 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -234,8 +234,9 @@ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). --spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). --spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}], [seq_id()]) -> 'ok'). +-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). +-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}], + [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok'). diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl new file mode 100644 index 0000000000..c7c76eb230 --- /dev/null +++ b/src/rabbit_mixed_queue.erl @@ -0,0 +1,166 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_mixed_queue). + +-export([publish/4, deliver/1, ack/2, + tx_publish/4, tx_commit/3, tx_cancel/2, + requeue/2, purge/1]). + +-record(mqstate, { mode, + msg_buf, + next_write_seq, + queue + } + ). + +publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk, queue = Q }) -> + ok = rabbit_disk_queue:publish(Q, MsgId, Msg), + {ok, State}; +publish(MsgId, Msg, IsPersistent, + State = #mqstate { queue = Q, mode = mixed, + next_write_seq = NextSeq, msg_buf = MsgBuf }) -> + if IsPersistent -> + ok = rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, Msg); + true -> ok + end, + {ok, State #mqstate { next_write_seq = NextSeq + 1, + msg_buf = queue:in({NextSeq, {MsgId, Msg, IsPersistent}}, + MsgBuf) + }}. + +deliver(State = #mqstate { mode = disk, queue = Q }) -> + {rabbit_disk_queue:deliver(Q), State}; +deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) -> + {Result, MsgBuf2} = queue:out(MsgBuf), + case Result of + empty -> + {empty, State}; + {value, {_Seq, {MsgId, Msg, IsPersistent}}} -> + {IsDelivered, Ack} = + if IsPersistent -> + {MsgId, IsDelivered2, Ack2} = rabbit_disk_queue:phantom_deliver(Q), + {IsDelivered2, Ack2}; + true -> {false, noack} + end, + {{MsgId, Msg, size(Msg), IsDelivered, Ack}, + State #mqstate { msg_buf = MsgBuf2 }} + end. + +remove_noacks(Acks) -> + lists:filter(fun (A) -> A /= noack end, Acks). + +ack(Acks, State = #mqstate { queue = Q }) -> + ok = rabbit_disk_queue:ack(Q, remove_noacks(Acks)), + {ok, State}. + +tx_publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk }) -> + ok = rabbit_disk_queue:tx_publish(MsgId, Msg), + {ok, State}; +tx_publish(MsgId, Msg, true, State = #mqstate { mode = mixed }) -> + ok = rabbit_disk_queue:tx_publish(MsgId, Msg), + {ok, State}; +tx_publish(_MsgId, _Msg, false, State = #mqstate { mode = mixed }) -> + {ok, State}. + +only_msg_ids(Pubs) -> + lists:map(fun (P) -> element(1, P) end, Pubs). + +tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) -> + ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks), + {ok, State}; +tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, + msg_buf = MsgBuf, + next_write_seq = NextSeq + }) -> + {PersistentPubs, MsgBuf2, NextSeq2} = + lists:foldl(fun ({MsgId, Msg, IsPersistent}, {Acc, MsgBuf3, NextSeq3}) -> + Acc2 = + if IsPersistent -> + [{MsgId, NextSeq3} | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, + MsgBuf3), + {Acc2, MsgBuf4, NextSeq3 + 1} + end, {[], MsgBuf, NextSeq}, Publishes), + %% foldl reverses, so re-reverse PersistentPubs to match + %% requirements of rabbit_disk_queue (ascending SeqIds) + ok = rabbit_disk_queue:tx_commit_with_seqs(Q, lists:reverse(PersistentPubs), + remove_noacks(Acks)), + {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. + +only_persistent_msg_ids(Pubs) -> + lists:reverse(lists:foldl(fun ({MsgId, _, IsPersistent}, Acc) -> + if IsPersistent -> [MsgId | Acc]; + true -> Acc + end + end, [], Pubs)). + +tx_cancel(Publishes, State = #mqstate { mode = disk }) -> + ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)), + {ok, State}; +tx_cancel(Publishes, State = #mqstate { mode = mixed }) -> + ok = rabbit_disk_queue:tx_cancel(only_persistent_msg_ids(Publishes)), + {ok, State}. + +only_ack_tags(MsgWithAcks) -> + lists:map(fun (P) -> element(2, P) end, MsgWithAcks). + +requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) -> + rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)), + {ok, State}; +requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, + msg_buf = MsgBuf, + next_write_seq = NextSeq + }) -> + {PersistentPubs, MsgBuf2, NextSeq2} = + lists:foldl(fun ({{MsgId, Msg, IsPersistent}, AckTag}, {Acc, MsgBuf3, NextSeq3}) -> + Acc2 = + if IsPersistent -> + {MsgId, _OldSeqId} = AckTag, + [{AckTag, NextSeq3} | Acc]; + true -> Acc + end, + MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, + MsgBuf3), + {Acc2, MsgBuf4, NextSeq3 + 1} + end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), + ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)), + {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. + +purge(State = #mqstate { queue = Q, mode = disk }) -> + Count = rabbit_disk_queue:purge(Q), + {Count, State}; +purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) -> + rabbit_disk_queue:purge(Q), + Count = queue:len(MsgBuf), + {Count, State #mqstate { msg_buf = queue:new() }}. |
