summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-22 14:03:47 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-22 14:03:47 +0100
commit7b0f295c2e7acc3b6a40874d3feb5d000ea34c78 (patch)
treeca36d9cc87a69ed3a9e4e2a8cfa7feaac509784c /src
parentf4a4d7520cc2ddfe39d21b3fc6c70a39e41d423d (diff)
downloadrabbitmq-server-git-7b0f295c2e7acc3b6a40874d3feb5d000ea34c78.tar.gz
Tiny changes to amqqueue_process, but mainly getting the mixed_queue api into proper shape.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_mixed_queue.erl94
2 files changed, 61 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bcf4dae474..15b3a03655 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -228,7 +228,7 @@ attempt_delivery(none, Message, State) ->
{AckTag, State3} =
if AckRequired ->
%% TODO API CHANGE
- {ok, MS, AckTag2} = rabbit_mixed_queue:publish_delivered(Message,
+ {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Message,
State2 #q.mixed_state),
{AckTag2, State2 #q { mixed_state = MS }};
true ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 790f4b756e..e56e667d4f 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -31,10 +31,12 @@
-module(rabbit_mixed_queue).
+-include("rabbit.hrl").
+
-export([start_link/2]).
--export([publish/4, deliver/1, ack/2,
- tx_publish/4, tx_commit/3, tx_cancel/2,
+-export([publish/2, publish_delivered/2, deliver/1, ack/2,
+ tx_publish/2, tx_commit/3, tx_cancel/2,
requeue/2, purge/1, length/1, is_empty/1]).
-record(mqstate, { mode,
@@ -51,36 +53,56 @@ start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed ->
rabbit_disk_queue:to_ram_disk_mode(), %% TODO, CHANGE ME
{ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, queue = Queue }}.
-publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk, queue = Q }) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, Msg),
+msg_to_bin(Msg = #basic_message { content = Content }) ->
+ ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
+ term_to_binary(Msg #basic_message { content = ClearedContent }).
+
+bin_to_msg(MsgBin) ->
+ binary_to_term(MsgBin).
+
+publish(Msg = #basic_message { guid = MsgId },
+ State = #mqstate { mode = disk, queue = Q }) ->
+ ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
{ok, State};
-publish(MsgId, Msg, IsPersistent,
+publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
State = #mqstate { queue = Q, mode = mixed,
next_write_seq = NextSeq, msg_buf = MsgBuf }) ->
- if IsPersistent ->
- ok = rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, Msg);
- true -> ok
- end,
+ ok = if IsPersistent ->
+ rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, msg_to_bin(Msg));
+ true -> ok
+ end,
{ok, State #mqstate { next_write_seq = NextSeq + 1,
- msg_buf = queue:in({NextSeq, {MsgId, Msg, IsPersistent}},
- MsgBuf)
+ msg_buf = queue:in({NextSeq, Msg, false}, MsgBuf)
}}.
+%% assumption here is that the queue is empty already (only called via publish immediate)
+publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent},
+ State = #mqstate { mode = Mode, queue = Q })
+ when Mode =:= disk orelse IsPersistent ->
+ ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
+ {MsgId, false, Ack, 0} = rabbit_disk_queue:phantom_deliver(Q),
+ {ok, Ack, State};
+publish_delivered(#basic_message { is_persistent = false },
+ State = #mqstate { mode = mixed }) ->
+ {ok, noack, State}.
+
deliver(State = #mqstate { mode = disk, queue = Q }) ->
- {rabbit_disk_queue:deliver(Q), State};
+ {MsgId, MsgBin, _Size, IsDelivered, AckTag, Remaining} = rabbit_disk_queue:deliver(Q),
+ Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin),
+ {{Msg, IsDelivered, AckTag, Remaining}, State};
deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextWrite }) ->
{Result, MsgBuf2} = queue:out(MsgBuf),
case Result of
empty ->
{empty, State};
- {value, {Seq, {MsgId, Msg, IsPersistent}}} ->
- {IsDelivered, Ack} =
+ {value, {Seq, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered}} ->
+ AckTag =
if IsPersistent ->
- {MsgId, IsDelivered2, Ack2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q),
- {IsDelivered2, Ack2};
- true -> {false, noack}
+ {MsgId, IsDelivered, AckTag2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q),
+ AckTag2;
+ true -> noack
end,
- {{MsgId, Msg, size(Msg), IsDelivered, Ack, (NextWrite - 1 - Seq)},
+ {{Msg, IsDelivered, AckTag, (NextWrite - 1 - Seq)},
State #mqstate { msg_buf = MsgBuf2 }}
end.
@@ -94,17 +116,19 @@ ack(Acks, State = #mqstate { queue = Q }) ->
{ok, State}
end.
-tx_publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk }) ->
- ok = rabbit_disk_queue:tx_publish(MsgId, Msg),
+tx_publish(Msg = #basic_message { guid = MsgId }, State = #mqstate { mode = disk }) ->
+ ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
-tx_publish(MsgId, Msg, true, State = #mqstate { mode = mixed }) ->
- ok = rabbit_disk_queue:tx_publish(MsgId, Msg),
+tx_publish(Msg = #basic_message { guid = MsgId, is_persistent = true },
+ State = #mqstate { mode = mixed }) ->
+ ok = rabbit_disk_queue:tx_publish(MsgId, msg_to_bin(Msg)),
{ok, State};
-tx_publish(_MsgId, _Msg, false, State = #mqstate { mode = mixed }) ->
+tx_publish(#basic_message { is_persistent = false },
+ State = #mqstate { mode = mixed }) ->
{ok, State}.
only_msg_ids(Pubs) ->
- lists:map(fun (P) -> element(1, P) end, Pubs).
+ lists:map(fun (Msg) -> Msg #basic_message.guid end, Pubs).
tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) ->
ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks),
@@ -112,16 +136,16 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) ->
tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
msg_buf = MsgBuf,
next_write_seq = NextSeq
- }) ->
+ }) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
- lists:foldl(fun ({MsgId, Msg, IsPersistent}, {Acc, MsgBuf3, NextSeq3}) ->
+ lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
+ {Acc, MsgBuf3, NextSeq3}) ->
Acc2 =
if IsPersistent ->
- [{MsgId, NextSeq3} | Acc];
+ [{Msg #basic_message.guid, NextSeq3} | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}},
- MsgBuf3),
+ MsgBuf4 = queue:in({NextSeq3, Msg, false}, MsgBuf3),
{Acc2, MsgBuf4, NextSeq3 + 1}
end, {[], MsgBuf, NextSeq}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
@@ -131,8 +155,9 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
only_persistent_msg_ids(Pubs) ->
- lists:reverse(lists:foldl(fun ({MsgId, _, IsPersistent}, Acc) ->
- if IsPersistent -> [MsgId | Acc];
+ lists:reverse(lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
+ Acc) ->
+ if IsPersistent -> [Msg #basic_message.guid | Acc];
true -> Acc
end
end, [], Pubs)).
@@ -147,6 +172,7 @@ tx_cancel(Publishes, State = #mqstate { mode = mixed }) ->
only_ack_tags(MsgWithAcks) ->
lists:map(fun (P) -> element(2, P) end, MsgWithAcks).
+%% [{Msg, AckTag}]
requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) ->
rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)),
{ok, State};
@@ -155,15 +181,15 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
next_write_seq = NextSeq
}) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
- lists:foldl(fun ({{MsgId, Msg, IsPersistent}, AckTag}, {Acc, MsgBuf3, NextSeq3}) ->
+ lists:foldl(fun ({Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, AckTag},
+ {Acc, MsgBuf3, NextSeq3}) ->
Acc2 =
if IsPersistent ->
{MsgId, _OldSeqId} = AckTag,
[{AckTag, NextSeq3} | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}},
- MsgBuf3),
+ MsgBuf4 = queue:in({NextSeq3, Msg, true}, MsgBuf3),
{Acc2, MsgBuf4, NextSeq3 + 1}
end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)),