diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-21 13:12:20 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-21 13:12:20 +0100 |
| commit | a3003a0a60bef9a03d8cf2479a8b247c6fc8bdab (patch) | |
| tree | 2505c8373b4bc80795a1b8830facc4691c1afbc4 | |
| parent | 7a35bdd921e3b24d2e1919b9e6f2023e1c7ea37c (diff) | |
| download | rabbitmq-server-git-a3003a0a60bef9a03d8cf2479a8b247c6fc8bdab.tar.gz | |
altered api so that deliver returns the seq_id (actually, it's a tuple of {msgid, seqid}, but that's irrelevant), and that ack requires this seq_id (tuple) back in.
This avoids extra mnesia work and makes ack much faster. Given that the amqqueue already tracks unacked messages, this seems reasonable. However, if not, back off to the parent of this revision.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 |
2 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 9b0849c35f..0623d77d24 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -82,8 +82,8 @@ publish(Q, MsgId, Msg) when is_binary(Msg) -> deliver(Q) -> gen_server:call(?SERVER, {deliver, Q}, infinity). -ack(Q, MsgIds) when is_list(MsgIds) -> - gen_server:cast(?SERVER, {ack, Q, MsgIds}). +ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}). tx_publish(MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}). @@ -154,8 +154,8 @@ handle_call(clean_stop, _From, State) -> handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, MsgBody, State), {noreply, State1}; -handle_cast({ack, Q, MsgIds}, State) -> - {ok, State1} = internal_ack(Q, MsgIds, State), +handle_cast({ack, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_ack(Q, MsgSeqIds, State), {noreply, State1}; handle_cast({tx_publish, MsgId, MsgBody}, State) -> {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), @@ -238,7 +238,7 @@ internal_deliver(Q, State = #dqstate { msg_location = MsgLocation, true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) end, true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), - {ok, {MsgId, MsgBody, BodySize, Delivered}, + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }} end end. @@ -249,13 +249,13 @@ internal_ack(Q, MsgIds, State) -> %% Q is only needed if MnesiaDelete = true %% called from tx_cancel with MnesiaDelete = false %% called from ack with MnesiaDelete = true -remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, - sequences = Sequences, - file_summary = FileSummary, - current_file_name = CurName - }) -> +remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, + sequences = Sequences, + file_summary = FileSummary, + current_file_name = CurName + }) -> {Files, MaxSeqId} - = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) -> + = lists:foldl(fun ({MsgId, SeqId}, {Files2, MaxSeqId2}) -> [{MsgId, RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), Files3 = @@ -275,19 +275,13 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL Files2 end, {if MnesiaDelete -> - [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }] - = mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = {Q, '_'}, - is_delivered = '_' - }), ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}), lists:max([SeqId, MaxSeqId2]); true -> MaxSeqId2 end, Files3} - end, {sets:new(), 0}, MsgIds), + end, {sets:new(), 0}, MsgSeqIds), true = if MnesiaDelete -> [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), if MaxSeqId > ReadSeqId -> @@ -371,6 +365,9 @@ internal_publish(Q, MsgId, MsgBody, State) -> {ok, State1}. internal_tx_cancel(MsgIds, State) -> + % we don't need seq ids because we're not touching mnesia, because seqids were + % never assigned + MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), remove_messages(undefined, MsgIds, false, State). %% ---- ROLLING OVER THE APPEND FILE ---- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1e66fe9a81..3eab352d22 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -705,8 +705,8 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end ]]), {Deliver, ok} = timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List], - rabbit_disk_queue:ack(Q, List), + [[fun() -> [begin SeqIds = [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List], + rabbit_disk_queue:ack(Q, SeqIds), ok = rabbit_disk_queue:tx_commit(Q, []) end || Q <- Qs] end]]), @@ -738,8 +738,8 @@ rdq_stress_gc(MsgCount) -> end end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)]))) ++ lists:seq(1, (StartChunk - 1)), - [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q), - rabbit_disk_queue:ack(q, [N]), + [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q), + rabbit_disk_queue:ack(q, [SeqId]), rabbit_disk_queue:tx_commit(q, []) end || N <- AckList], rdq_stop(). |
