summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-21 13:12:20 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-21 13:12:20 +0100
commita3003a0a60bef9a03d8cf2479a8b247c6fc8bdab (patch)
tree2505c8373b4bc80795a1b8830facc4691c1afbc4
parent7a35bdd921e3b24d2e1919b9e6f2023e1c7ea37c (diff)
downloadrabbitmq-server-git-a3003a0a60bef9a03d8cf2479a8b247c6fc8bdab.tar.gz
altered api so that deliver returns the seq_id (actually, it's a tuple of {msgid, seqid}, but that's irrelevant), and that ack requires this seq_id (tuple) back in.
This avoids extra mnesia work and makes ack much faster. Given that the amqqueue already tracks unacked messages, this seems reasonable. However, if not, back off to the parent of this revision.
-rw-r--r--src/rabbit_disk_queue.erl33
-rw-r--r--src/rabbit_tests.erl8
2 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 9b0849c35f..0623d77d24 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -82,8 +82,8 @@ publish(Q, MsgId, Msg) when is_binary(Msg) ->
deliver(Q) ->
gen_server:call(?SERVER, {deliver, Q}, infinity).
-ack(Q, MsgIds) when is_list(MsgIds) ->
- gen_server:cast(?SERVER, {ack, Q, MsgIds}).
+ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
tx_publish(MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
@@ -154,8 +154,8 @@ handle_call(clean_stop, _From, State) ->
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
{noreply, State1};
-handle_cast({ack, Q, MsgIds}, State) ->
- {ok, State1} = internal_ack(Q, MsgIds, State),
+handle_cast({ack, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_ack(Q, MsgSeqIds, State),
{noreply, State1};
handle_cast({tx_publish, MsgId, MsgBody}, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
@@ -238,7 +238,7 @@ internal_deliver(Q, State = #dqstate { msg_location = MsgLocation,
true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
end,
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
- {ok, {MsgId, MsgBody, BodySize, Delivered},
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}
end
end.
@@ -249,13 +249,13 @@ internal_ack(Q, MsgIds, State) ->
%% Q is only needed if MnesiaDelete = true
%% called from tx_cancel with MnesiaDelete = false
%% called from ack with MnesiaDelete = true
-remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
- sequences = Sequences,
- file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
+ file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
{Files, MaxSeqId}
- = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) ->
+ = lists:foldl(fun ({MsgId, SeqId}, {Files2, MaxSeqId2}) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= dets:lookup(MsgLocation, MsgId),
Files3 =
@@ -275,19 +275,13 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
Files2
end,
{if MnesiaDelete ->
- [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }]
- = mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = {Q, '_'},
- is_delivered = '_'
- }),
ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
lists:max([SeqId, MaxSeqId2]);
true ->
MaxSeqId2
end,
Files3}
- end, {sets:new(), 0}, MsgIds),
+ end, {sets:new(), 0}, MsgSeqIds),
true = if MnesiaDelete ->
[{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
if MaxSeqId > ReadSeqId ->
@@ -371,6 +365,9 @@ internal_publish(Q, MsgId, MsgBody, State) ->
{ok, State1}.
internal_tx_cancel(MsgIds, State) ->
+ % we don't need seq ids because we're not touching mnesia, because seqids were
+ % never assigned
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
remove_messages(undefined, MsgIds, false, State).
%% ---- ROLLING OVER THE APPEND FILE ----
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1e66fe9a81..3eab352d22 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -705,8 +705,8 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
]]),
{Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List],
- rabbit_disk_queue:ack(Q, List),
+ [[fun() -> [begin SeqIds = [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List],
+ rabbit_disk_queue:ack(Q, SeqIds),
ok = rabbit_disk_queue:tx_commit(Q, [])
end || Q <- Qs]
end]]),
@@ -738,8 +738,8 @@ rdq_stress_gc(MsgCount) ->
end
end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
++ lists:seq(1, (StartChunk - 1)),
- [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q),
- rabbit_disk_queue:ack(q, [N]),
+ [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:ack(q, [SeqId]),
rabbit_disk_queue:tx_commit(q, [])
end || N <- AckList],
rdq_stop().