diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-22 18:08:45 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-22 18:08:45 +0100 |
| commit | 76403c059d1ec0c418fe35a0d1e8a10ce09420a9 (patch) | |
| tree | 25dce9eb8db970335d53b53fade2aa3f9d65f00b | |
| parent | 086f13e9a4b4192b769a0f0ff9706665213be1c8 (diff) | |
| download | rabbitmq-server-git-76403c059d1ec0c418fe35a0d1e8a10ce09420a9.tar.gz | |
Well, it /tends/ to work, but sometimes falls over, apparently trying to read a message which has been erased from mnesia. Mysterious!
| -rw-r--r-- | src/rabbit_disk_queue.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 11 |
3 files changed, 47 insertions, 21 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 71d812f663..2e3ff89a3d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -538,8 +538,8 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, empty, State}; - [{Q, _ReadSeqId, _WriteSeqId, 0}] -> {ok, empty, State}; - [{Q, ReadSeqId, WriteSeqId, Length}] -> + [{Q, SeqId, SeqId, 0}] -> {ok, empty, State}; + [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 -> [Obj = #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, next_seq_id = ReadSeqId2}] = @@ -671,16 +671,22 @@ internal_tx_publish(MsgId, MsgBody, {ok, State} end. -adjust_last_msg_seq_id(_Q, ExpectedSeqId, next) -> +adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) -> ExpectedSeqId; -adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId) -> +adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) -> SuppliedSeqId; -adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId) -> +adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) -> ExpectedSeqId; -adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId) when SuppliedSeqId > ExpectedSeqId -> +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId -> [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), + SuppliedSeqId; +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId -> + [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock), + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }, + Lock), SuppliedSeqId. %% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) @@ -716,7 +722,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, {Acc, ExpectedSeqId}) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), - SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId), + SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId, write), NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1; true -> NextSeqId end, @@ -754,7 +760,7 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) -> [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> {ReadSeqId2, WriteSeqId2, Length2} end, - WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId), + WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty), WriteSeqId3Next = WriteSeqId3 + 1, true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next, Length + 1}), ok = mnesia:dirty_write(rabbit_disk_queue, @@ -772,7 +778,7 @@ internal_tx_cancel(MsgIds, State) -> internal_requeue(_Q, [], State) -> {ok, State}; -internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], +internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail], State = #dqstate { sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've @@ -798,6 +804,10 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], %% the Q _must_ already exist [{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q), + ReadSeqId2 = + if ReadSeqId == WriteSeqId andalso FirstSeqIdTo > WriteSeqId -> FirstSeqIdTo; + true -> ReadSeqId + end, MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]), {atomic, WriteSeqId2} = mnesia:transaction( @@ -807,7 +817,7 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], fun ({{{MsgId, SeqIdOrig}, SeqIdTo}, {_NextMsgSeqId, NextSeqIdTo}}, ExpectedSeqIdTo) -> - SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo), + SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1; true -> NextSeqIdTo end, @@ -823,7 +833,7 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], NextSeqIdTo2 end, WriteSeqId, MsgSeqIdsZipped) end), - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2, Length + erlang:length(MsgSeqIds)}), + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2, Length + erlang:length(MsgSeqIds)}), {ok, State}. internal_purge(Q, State = #dqstate { sequences = Sequences }) -> @@ -834,12 +844,15 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - MsgSeqIds = lists:foldl( - fun (SeqId, Acc) -> - [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), - [{MsgId, SeqId} | Acc] - end, [], lists:seq(ReadSeqId, WriteSeqId - 1)), + MsgSeqIds = + rabbit_misc:unfold( + fun (SeqId) when SeqId == WriteSeqId -> false; + (SeqId) -> + [#dq_msg_loc { msg_id = MsgId, + next_seq_id = NextSeqId } + ] = mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + {true, {MsgId, SeqId}, NextSeqId} + end, ReadSeqId), remove_messages(Q, MsgSeqIds, txn, State) end), true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index f207038eff..153a8a7cb8 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -51,6 +51,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2]). -import(mnesia). -import(lists). @@ -406,3 +407,12 @@ stop_applications(Apps) -> not_started, cannot_stop_application, Apps). + +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> Acc + end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 037aeebf18..5cda8eca90 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -77,11 +77,14 @@ publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, %% assumption here is that the queue is empty already (only called via publish immediate) publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent}, - State = #mqstate { mode = Mode, queue = Q }) + State = #mqstate { mode = Mode, queue = Q, next_write_seq = NextSeq }) when Mode =:= disk orelse IsPersistent -> ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), - {ok, AckTag, State}; + State2 = if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 }; + true -> State + end, + {ok, AckTag, State2}; publish_delivered(#basic_message { is_persistent = false }, State = #mqstate { mode = mixed }) -> {ok, noack, State}. @@ -203,9 +206,9 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) -> Count = queue:len(MsgBuf), {Count, State #mqstate { msg_buf = queue:new() }}. -length(State = #mqstate { queue = Q, mode = disk }) -> +length(#mqstate { queue = Q, mode = disk }) -> rabbit_disk_queue:length(Q); -length(State = #mqstate { mode = mixed, msg_buf = MsgBuf }) -> +length(#mqstate { mode = mixed, msg_buf = MsgBuf }) -> queue:len(MsgBuf). is_empty(State) -> |
