summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-21 13:12:20 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-21 13:12:20 +0100
commita3003a0a60bef9a03d8cf2479a8b247c6fc8bdab (patch)
tree2505c8373b4bc80795a1b8830facc4691c1afbc4 /src
parent7a35bdd921e3b24d2e1919b9e6f2023e1c7ea37c (diff)
downloadrabbitmq-server-git-a3003a0a60bef9a03d8cf2479a8b247c6fc8bdab.tar.gz
altered api so that deliver returns the seq_id (actually, it's a tuple of {msgid, seqid}, but that's irrelevant), and that ack requires this seq_id (tuple) back in.
This avoids extra mnesia work and makes ack much faster. Given that the amqqueue already tracks unacked messages, this seems reasonable. However, if not, back off to the parent of this revision.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl33
-rw-r--r--src/rabbit_tests.erl8
2 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 9b0849c35f..0623d77d24 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -82,8 +82,8 @@ publish(Q, MsgId, Msg) when is_binary(Msg) ->
deliver(Q) ->
gen_server:call(?SERVER, {deliver, Q}, infinity).
-ack(Q, MsgIds) when is_list(MsgIds) ->
- gen_server:cast(?SERVER, {ack, Q, MsgIds}).
+ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
tx_publish(MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
@@ -154,8 +154,8 @@ handle_call(clean_stop, _From, State) ->
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
{noreply, State1};
-handle_cast({ack, Q, MsgIds}, State) ->
- {ok, State1} = internal_ack(Q, MsgIds, State),
+handle_cast({ack, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_ack(Q, MsgSeqIds, State),
{noreply, State1};
handle_cast({tx_publish, MsgId, MsgBody}, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
@@ -238,7 +238,7 @@ internal_deliver(Q, State = #dqstate { msg_location = MsgLocation,
true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
end,
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
- {ok, {MsgId, MsgBody, BodySize, Delivered},
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}
end
end.
@@ -249,13 +249,13 @@ internal_ack(Q, MsgIds, State) ->
%% Q is only needed if MnesiaDelete = true
%% called from tx_cancel with MnesiaDelete = false
%% called from ack with MnesiaDelete = true
-remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
- sequences = Sequences,
- file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
+ file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
{Files, MaxSeqId}
- = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) ->
+ = lists:foldl(fun ({MsgId, SeqId}, {Files2, MaxSeqId2}) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= dets:lookup(MsgLocation, MsgId),
Files3 =
@@ -275,19 +275,13 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
Files2
end,
{if MnesiaDelete ->
- [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }]
- = mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = {Q, '_'},
- is_delivered = '_'
- }),
ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
lists:max([SeqId, MaxSeqId2]);
true ->
MaxSeqId2
end,
Files3}
- end, {sets:new(), 0}, MsgIds),
+ end, {sets:new(), 0}, MsgSeqIds),
true = if MnesiaDelete ->
[{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
if MaxSeqId > ReadSeqId ->
@@ -371,6 +365,9 @@ internal_publish(Q, MsgId, MsgBody, State) ->
{ok, State1}.
internal_tx_cancel(MsgIds, State) ->
+ % we don't need seq ids because we're not touching mnesia, because seqids were
+ % never assigned
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
remove_messages(undefined, MsgIds, false, State).
%% ---- ROLLING OVER THE APPEND FILE ----
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1e66fe9a81..3eab352d22 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -705,8 +705,8 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
]]),
{Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List],
- rabbit_disk_queue:ack(Q, List),
+ [[fun() -> [begin SeqIds = [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List],
+ rabbit_disk_queue:ack(Q, SeqIds),
ok = rabbit_disk_queue:tx_commit(Q, [])
end || Q <- Qs]
end]]),
@@ -738,8 +738,8 @@ rdq_stress_gc(MsgCount) ->
end
end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
++ lists:seq(1, (StartChunk - 1)),
- [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q),
- rabbit_disk_queue:ack(q, [N]),
+ [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:ack(q, [SeqId]),
rabbit_disk_queue:tx_commit(q, [])
end || N <- AckList],
rdq_stop().