summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-19 00:48:48 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-19 00:48:48 +0100
commit1c3e228929a4828e8d731b63db05d31b4d66107d (patch)
treee30a646dd37fdc1492bec582670dd51f1c957cc7
parent1cf7c30120d48a2d6e4820e7f858119463e3ae51 (diff)
downloadrabbitmq-server-git-1c3e228929a4828e8d731b63db05d31b4d66107d.tar.gz
Spotted and corrected some mistakes where messages published to the mixed queue in disk-only mode would not be marked delivered even if they were persistent, thus resulting in redelivery on broker startup without the message being marked predelivered.
Also, spotted (not fixed yet) bug in commit coalescing in which the mnesia transaction is always commiting before the messages are flushed to disk. What should happen is that if coalescing is going to happen, the mnesia transaction should be delayed too, and happen only _after_ the disk sync. I.e. it doesn't matter if we disk sync and then the mnesia txn fails, but it does matter if the mnesia txn succeeds and then the disk sync fails. Also, I think I've worked out how to do prefetching properly. It's not actually that complex.
-rw-r--r--src/rabbit_disk_queue.erl10
-rw-r--r--src/rabbit_mixed_queue.erl4
2 files changed, 7 insertions, 7 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 178771b8c7..6bd3e04657 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -458,10 +458,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
handle_call({deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, true, false, State),
+ {ok, Result, State1} = internal_deliver(Q, true, State),
reply(Result, State1);
handle_call({phantom_deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, false, false, State),
+ {ok, Result, State1} = internal_deliver(Q, false, State),
reply(Result, State1);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
{Reply, State1} =
@@ -887,7 +887,7 @@ cache_is_full(#dqstate { message_cache = Cache }) ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, ReadMsg, FakeDeliver,
+internal_deliver(Q, ReadMsg,
State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, empty, State};
@@ -895,7 +895,7 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
Remaining = WriteSeqId - ReadSeqId - 1,
{ok, Result, State1} =
internal_read_message(
- Q, ReadSeqId, ReadMsg, FakeDeliver, false, State),
+ Q, ReadSeqId, ReadMsg, false, false, State),
true = ets:insert(Sequences,
{Q, ReadSeqId+1, WriteSeqId}),
{ok,
@@ -979,7 +979,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
end.
internal_auto_ack(Q, State) ->
- case internal_deliver(Q, false, true, State) of
+ case internal_deliver(Q, false, State) of
{ok, empty, State1} -> {ok, State1};
{ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining},
State1} ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 3b86596b6a..50b75789bd 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -282,7 +282,7 @@ publish_delivered(Msg =
memory_size = QSize, memory_gain = Gain })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
Persist = IsDurable andalso IsPersistent,
- rabbit_disk_queue:publish(Q, Msg, false),
+ ok = rabbit_disk_queue:publish(Q, Msg, true),
MsgSize = size_of_message(Msg),
State1 = State #mqstate { memory_size = QSize + MsgSize,
memory_gain = Gain + MsgSize },
@@ -291,7 +291,7 @@ publish_delivered(Msg =
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {MsgId, IsPersistent, false, AckTag, 0} =
+ {MsgId, IsPersistent, true, AckTag, 0} =
rabbit_disk_queue:phantom_deliver(Q),
{ok, AckTag, State1};
false ->