diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-22 14:03:47 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-22 14:03:47 +0100 |
| commit | 7b0f295c2e7acc3b6a40874d3feb5d000ea34c78 (patch) | |
| tree | ca36d9cc87a69ed3a9e4e2a8cfa7feaac509784c /src | |
| parent | f4a4d7520cc2ddfe39d21b3fc6c70a39e41d423d (diff) | |
| download | rabbitmq-server-git-7b0f295c2e7acc3b6a40874d3feb5d000ea34c78.tar.gz | |
Tiny changes to amqqueue_process, but mainly getting the mixed_queue api into proper shape.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 94 |
2 files changed, 61 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index bcf4dae474..15b3a03655 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -228,7 +228,7 @@ attempt_delivery(none, Message, State) -> {AckTag, State3} = if AckRequired -> %% TODO API CHANGE - {ok, MS, AckTag2} = rabbit_mixed_queue:publish_delivered(Message, + {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Message, State2 #q.mixed_state), {AckTag2, State2 #q { mixed_state = MS }}; true -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 790f4b756e..e56e667d4f 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -31,10 +31,12 @@ -module(rabbit_mixed_queue). +-include("rabbit.hrl"). + -export([start_link/2]). --export([publish/4, deliver/1, ack/2, - tx_publish/4, tx_commit/3, tx_cancel/2, +-export([publish/2, publish_delivered/2, deliver/1, ack/2, + tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, length/1, is_empty/1]). -record(mqstate, { mode, @@ -51,36 +53,56 @@ start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed -> rabbit_disk_queue:to_ram_disk_mode(), %% TODO, CHANGE ME {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, queue = Queue }}. -publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk, queue = Q }) -> - ok = rabbit_disk_queue:publish(Q, MsgId, Msg), +msg_to_bin(Msg = #basic_message { content = Content }) -> + ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), + term_to_binary(Msg #basic_message { content = ClearedContent }). + +bin_to_msg(MsgBin) -> + binary_to_term(MsgBin). + +publish(Msg = #basic_message { guid = MsgId }, + State = #mqstate { mode = disk, queue = Q }) -> + ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), {ok, State}; -publish(MsgId, Msg, IsPersistent, +publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = mixed, next_write_seq = NextSeq, msg_buf = MsgBuf }) -> - if IsPersistent -> - ok = rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, Msg); - true -> ok - end, + ok = if IsPersistent -> + rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, msg_to_bin(Msg)); + true -> ok + end, {ok, State #mqstate { next_write_seq = NextSeq + 1, - msg_buf = queue:in({NextSeq, {MsgId, Msg, IsPersistent}}, - MsgBuf) + msg_buf = queue:in({NextSeq, Msg, false}, MsgBuf) }}. +%% assumption here is that the queue is empty already (only called via publish immediate) +publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, + State = #mqstate { mode = Mode, queue = Q }) + when Mode =:= disk orelse IsPersistent -> + ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), + {MsgId, false, Ack, 0} = rabbit_disk_queue:phantom_deliver(Q), + {ok, Ack, State}; +publish_delivered(#basic_message { is_persistent = false }, + State = #mqstate { mode = mixed }) -> + {ok, noack, State}. + deliver(State = #mqstate { mode = disk, queue = Q }) -> - {rabbit_disk_queue:deliver(Q), State}; + {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q), + Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), + {{Msg, IsDelivered, AckTag, Remaining}, State}; deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextWrite }) -> {Result, MsgBuf2} = queue:out(MsgBuf), case Result of empty -> {empty, State}; - {value, {Seq, {MsgId, Msg, IsPersistent}}} -> - {IsDelivered, Ack} = + {value, {Seq, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered}} -> + AckTag = if IsPersistent -> - {MsgId, IsDelivered2, Ack2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), - {IsDelivered2, Ack2}; - true -> {false, noack} + {MsgId, IsDelivered, AckTag2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), + AckTag2; + true -> noack end, - {{MsgId, Msg, size(Msg), IsDelivered, Ack, (NextWrite - 1 - Seq)}, + {{Msg, IsDelivered, AckTag, (NextWrite - 1 - Seq)}, State #mqstate { msg_buf = MsgBuf2 }} end. @@ -94,17 +116,19 @@ ack(Acks, State = #mqstate { queue = Q }) -> {ok, State} end. -tx_publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk }) -> - ok = rabbit_disk_queue:tx_publish(MsgId, Msg), +tx_publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk }) -> + ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), {ok, State}; -tx_publish(MsgId, Msg, true, State = #mqstate { mode = mixed }) -> - ok = rabbit_disk_queue:tx_publish(MsgId, Msg), +tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = true }, + State = #mqstate { mode = mixed }) -> + ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)), {ok, State}; -tx_publish(_MsgId, _Msg, false, State = #mqstate { mode = mixed }) -> +tx_publish(#basic_message { is_persistent = false }, + State = #mqstate { mode = mixed }) -> {ok, State}. only_msg_ids(Pubs) -> - lists:map(fun (P) -> element(1, P) end, Pubs). + lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs). tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) -> ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks), @@ -112,16 +136,16 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) -> tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextSeq - }) -> + }) -> {PersistentPubs, MsgBuf2, NextSeq2} = - lists:foldl(fun ({MsgId, Msg, IsPersistent}, {Acc, MsgBuf3, NextSeq3}) -> + lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, + {Acc, MsgBuf3, NextSeq3}) -> Acc2 = if IsPersistent -> - [{MsgId, NextSeq3} | Acc]; + [{Msg #basic_message.guid, NextSeq3} | Acc]; true -> Acc end, - MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, - MsgBuf3), + MsgBuf4 = queue:in({NextSeq3, Msg, false}, MsgBuf3), {Acc2, MsgBuf4, NextSeq3 + 1} end, {[], MsgBuf, NextSeq}, Publishes), %% foldl reverses, so re-reverse PersistentPubs to match @@ -131,8 +155,9 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, {ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}. only_persistent_msg_ids(Pubs) -> - lists:reverse(lists:foldl(fun ({MsgId, _, IsPersistent}, Acc) -> - if IsPersistent -> [MsgId | Acc]; + lists:reverse(lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, + Acc) -> + if IsPersistent -> [Msg #basic_message.guid | Acc]; true -> Acc end end, [], Pubs)). @@ -147,6 +172,7 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed }) -> only_ack_tags(MsgWithAcks) -> lists:map(fun (P) -> element(2, P) end, MsgWithAcks). +%% [{Msg, AckTag}] requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) -> rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)), {ok, State}; @@ -155,15 +181,15 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, next_write_seq = NextSeq }) -> {PersistentPubs, MsgBuf2, NextSeq2} = - lists:foldl(fun ({{MsgId, Msg, IsPersistent}, AckTag}, {Acc, MsgBuf3, NextSeq3}) -> + lists:foldl(fun ({Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, AckTag}, + {Acc, MsgBuf3, NextSeq3}) -> Acc2 = if IsPersistent -> {MsgId, _OldSeqId} = AckTag, [{AckTag, NextSeq3} | Acc]; true -> Acc end, - MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}}, - MsgBuf3), + MsgBuf4 = queue:in({NextSeq3, Msg, true}, MsgBuf3), {Acc2, MsgBuf4, NextSeq3 + 1} end, {[], MsgBuf, NextSeq}, MessagesWithAckTags), ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)), |
