diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-19 00:48:48 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-19 00:48:48 +0100 |
| commit | 1c3e228929a4828e8d731b63db05d31b4d66107d (patch) | |
| tree | e30a646dd37fdc1492bec582670dd51f1c957cc7 | |
| parent | 1cf7c30120d48a2d6e4820e7f858119463e3ae51 (diff) | |
| download | rabbitmq-server-git-1c3e228929a4828e8d731b63db05d31b4d66107d.tar.gz | |
Spotted and corrected some mistakes where messages published to the mixed queue in disk-only mode would not be marked delivered even if they were persistent, thus resulting in redelivery on broker startup without the message being marked predelivered.
Also, spotted (not fixed yet) bug in commit coalescing in which the mnesia transaction is always commiting before the messages are flushed to disk. What should happen is that if coalescing is going to happen, the mnesia transaction should be delayed too, and happen only _after_ the disk sync. I.e. it doesn't matter if we disk sync and then the mnesia txn fails, but it does matter if the mnesia txn succeeds and then the disk sync fails.
Also, I think I've worked out how to do prefetching properly. It's not actually that complex.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 4 |
2 files changed, 7 insertions, 7 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 178771b8c7..6bd3e04657 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -458,10 +458,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}. handle_call({deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, true, false, State), + {ok, Result, State1} = internal_deliver(Q, true, State), reply(Result, State1); handle_call({phantom_deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, false, false, State), + {ok, Result, State1} = internal_deliver(Q, false, State), reply(Result, State1); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> {Reply, State1} = @@ -887,7 +887,7 @@ cache_is_full(#dqstate { message_cache = Cache }) -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, ReadMsg, FakeDeliver, +internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, empty, State}; @@ -895,7 +895,7 @@ internal_deliver(Q, ReadMsg, FakeDeliver, Remaining = WriteSeqId - ReadSeqId - 1, {ok, Result, State1} = internal_read_message( - Q, ReadSeqId, ReadMsg, FakeDeliver, false, State), + Q, ReadSeqId, ReadMsg, false, false, State), true = ets:insert(Sequences, {Q, ReadSeqId+1, WriteSeqId}), {ok, @@ -979,7 +979,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) - end. internal_auto_ack(Q, State) -> - case internal_deliver(Q, false, true, State) of + case internal_deliver(Q, false, State) of {ok, empty, State1} -> {ok, State1}; {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining}, State1} -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 3b86596b6a..50b75789bd 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -282,7 +282,7 @@ publish_delivered(Msg = memory_size = QSize, memory_gain = Gain }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> Persist = IsDurable andalso IsPersistent, - rabbit_disk_queue:publish(Q, Msg, false), + ok = rabbit_disk_queue:publish(Q, Msg, true), MsgSize = size_of_message(Msg), State1 = State #mqstate { memory_size = QSize + MsgSize, memory_gain = Gain + MsgSize }, @@ -291,7 +291,7 @@ publish_delivered(Msg = %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag - {MsgId, IsPersistent, false, AckTag, 0} = + {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), {ok, AckTag, State1}; false -> |
