summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-22 18:08:45 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-22 18:08:45 +0100
commit76403c059d1ec0c418fe35a0d1e8a10ce09420a9 (patch)
tree25dce9eb8db970335d53b53fade2aa3f9d65f00b
parent086f13e9a4b4192b769a0f0ff9706665213be1c8 (diff)
downloadrabbitmq-server-git-76403c059d1ec0c418fe35a0d1e8a10ce09420a9.tar.gz
Well, it /tends/ to work, but sometimes falls over, apparently trying to read a message which has been erased from mnesia. Mysterious!
-rw-r--r--src/rabbit_disk_queue.erl47
-rw-r--r--src/rabbit_misc.erl10
-rw-r--r--src/rabbit_mixed_queue.erl11
3 files changed, 47 insertions, 21 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 71d812f663..2e3ff89a3d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -538,8 +538,8 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo
internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, empty, State};
- [{Q, _ReadSeqId, _WriteSeqId, 0}] -> {ok, empty, State};
- [{Q, ReadSeqId, WriteSeqId, Length}] ->
+ [{Q, SeqId, SeqId, 0}] -> {ok, empty, State};
+ [{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 ->
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
next_seq_id = ReadSeqId2}] =
@@ -671,16 +671,22 @@ internal_tx_publish(MsgId, MsgBody,
{ok, State}
end.
-adjust_last_msg_seq_id(_Q, ExpectedSeqId, next) ->
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) ->
ExpectedSeqId;
-adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId) ->
+adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) ->
SuppliedSeqId;
-adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId) ->
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) ->
ExpectedSeqId;
-adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId) when SuppliedSeqId > ExpectedSeqId ->
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId ->
[Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}),
ok = mnesia:dirty_write(rabbit_disk_queue,
Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
+ SuppliedSeqId;
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId ->
+ [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock),
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { next_seq_id = SuppliedSeqId },
+ Lock),
SuppliedSeqId.
%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next))
@@ -716,7 +722,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
{Acc, ExpectedSeqId}) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
dets_ets_lookup(State, MsgId),
- SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId),
+ SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId, write),
NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1;
true -> NextSeqId
end,
@@ -754,7 +760,7 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
[{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
{ReadSeqId2, WriteSeqId2, Length2}
end,
- WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId),
+ WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty),
WriteSeqId3Next = WriteSeqId3 + 1,
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next, Length + 1}),
ok = mnesia:dirty_write(rabbit_disk_queue,
@@ -772,7 +778,7 @@ internal_tx_cancel(MsgIds, State) ->
internal_requeue(_Q, [], State) ->
{ok, State};
-internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
+internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail],
State = #dqstate { sequences = Sequences }) ->
%% We know that every seq_id in here is less than the ReadSeqId
%% you'll get if you look up this queue in Sequences (i.e. they've
@@ -798,6 +804,10 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
%% the Q _must_ already exist
[{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q),
+ ReadSeqId2 =
+ if ReadSeqId == WriteSeqId andalso FirstSeqIdTo > WriteSeqId -> FirstSeqIdTo;
+ true -> ReadSeqId
+ end,
MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]),
{atomic, WriteSeqId2} =
mnesia:transaction(
@@ -807,7 +817,7 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
fun ({{{MsgId, SeqIdOrig}, SeqIdTo},
{_NextMsgSeqId, NextSeqIdTo}},
ExpectedSeqIdTo) ->
- SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo),
+ SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1;
true -> NextSeqIdTo
end,
@@ -823,7 +833,7 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
NextSeqIdTo2
end, WriteSeqId, MsgSeqIdsZipped)
end),
- true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2, Length + erlang:length(MsgSeqIds)}),
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2, Length + erlang:length(MsgSeqIds)}),
{ok, State}.
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
@@ -834,12 +844,15 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- MsgSeqIds = lists:foldl(
- fun (SeqId, Acc) ->
- [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
- [{MsgId, SeqId} | Acc]
- end, [], lists:seq(ReadSeqId, WriteSeqId - 1)),
+ MsgSeqIds =
+ rabbit_misc:unfold(
+ fun (SeqId) when SeqId == WriteSeqId -> false;
+ (SeqId) ->
+ [#dq_msg_loc { msg_id = MsgId,
+ next_seq_id = NextSeqId }
+ ] = mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
+ {true, {MsgId, SeqId}, NextSeqId}
+ end, ReadSeqId),
remove_messages(Q, MsgSeqIds, txn, State)
end),
true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index f207038eff..153a8a7cb8 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -51,6 +51,7 @@
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
+-export([unfold/2]).
-import(mnesia).
-import(lists).
@@ -406,3 +407,12 @@ stop_applications(Apps) ->
not_started,
cannot_stop_application,
Apps).
+
+unfold(Fun, Init) ->
+ unfold(Fun, [], Init).
+
+unfold(Fun, Acc, Init) ->
+ case Fun(Init) of
+ {true, E, I} -> unfold(Fun, [E|Acc], I);
+ false -> Acc
+ end.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 037aeebf18..5cda8eca90 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -77,11 +77,14 @@ publish(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
%% assumption here is that the queue is empty already (only called via publish immediate)
publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent},
- State = #mqstate { mode = Mode, queue = Q })
+ State = #mqstate { mode = Mode, queue = Q, next_write_seq = NextSeq })
when Mode =:= disk orelse IsPersistent ->
ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
{MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q),
- {ok, AckTag, State};
+ State2 = if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 };
+ true -> State
+ end,
+ {ok, AckTag, State2};
publish_delivered(#basic_message { is_persistent = false },
State = #mqstate { mode = mixed }) ->
{ok, noack, State}.
@@ -203,9 +206,9 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) ->
Count = queue:len(MsgBuf),
{Count, State #mqstate { msg_buf = queue:new() }}.
-length(State = #mqstate { queue = Q, mode = disk }) ->
+length(#mqstate { queue = Q, mode = disk }) ->
rabbit_disk_queue:length(Q);
-length(State = #mqstate { mode = mixed, msg_buf = MsgBuf }) ->
+length(#mqstate { mode = mixed, msg_buf = MsgBuf }) ->
queue:len(MsgBuf).
is_empty(State) ->