diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 4 |
2 files changed, 7 insertions, 7 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 178771b8c7..6bd3e04657 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -458,10 +458,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}. handle_call({deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, true, false, State), + {ok, Result, State1} = internal_deliver(Q, true, State), reply(Result, State1); handle_call({phantom_deliver, Q}, _From, State) -> - {ok, Result, State1} = internal_deliver(Q, false, false, State), + {ok, Result, State1} = internal_deliver(Q, false, State), reply(Result, State1); handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> {Reply, State1} = @@ -887,7 +887,7 @@ cache_is_full(#dqstate { message_cache = Cache }) -> %% ---- INTERNAL RAW FUNCTIONS ---- -internal_deliver(Q, ReadMsg, FakeDeliver, +internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, empty, State}; @@ -895,7 +895,7 @@ internal_deliver(Q, ReadMsg, FakeDeliver, Remaining = WriteSeqId - ReadSeqId - 1, {ok, Result, State1} = internal_read_message( - Q, ReadSeqId, ReadMsg, FakeDeliver, false, State), + Q, ReadSeqId, ReadMsg, false, false, State), true = ets:insert(Sequences, {Q, ReadSeqId+1, WriteSeqId}), {ok, @@ -979,7 +979,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) - end. internal_auto_ack(Q, State) -> - case internal_deliver(Q, false, true, State) of + case internal_deliver(Q, false, State) of {ok, empty, State1} -> {ok, State1}; {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining}, State1} -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 3b86596b6a..50b75789bd 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -282,7 +282,7 @@ publish_delivered(Msg = memory_size = QSize, memory_gain = Gain }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> Persist = IsDurable andalso IsPersistent, - rabbit_disk_queue:publish(Q, Msg, false), + ok = rabbit_disk_queue:publish(Q, Msg, true), MsgSize = size_of_message(Msg), State1 = State #mqstate { memory_size = QSize + MsgSize, memory_gain = Gain + MsgSize }, @@ -291,7 +291,7 @@ publish_delivered(Msg = %% must call phantom_deliver otherwise the msg remains at %% the head of the queue. This is synchronous, but %% unavoidable as we need the AckTag - {MsgId, IsPersistent, false, AckTag, 0} = + {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), {ok, AckTag, State1}; false -> |
