summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl10
-rw-r--r--src/rabbit_mixed_queue.erl4
2 files changed, 7 insertions, 7 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 178771b8c7..6bd3e04657 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -458,10 +458,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State2, {binary, ?HIBERNATE_AFTER_MIN}, 0}.
handle_call({deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, true, false, State),
+ {ok, Result, State1} = internal_deliver(Q, true, State),
reply(Result, State1);
handle_call({phantom_deliver, Q}, _From, State) ->
- {ok, Result, State1} = internal_deliver(Q, false, false, State),
+ {ok, Result, State1} = internal_deliver(Q, false, State),
reply(Result, State1);
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
{Reply, State1} =
@@ -887,7 +887,7 @@ cache_is_full(#dqstate { message_cache = Cache }) ->
%% ---- INTERNAL RAW FUNCTIONS ----
-internal_deliver(Q, ReadMsg, FakeDeliver,
+internal_deliver(Q, ReadMsg,
State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, empty, State};
@@ -895,7 +895,7 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
Remaining = WriteSeqId - ReadSeqId - 1,
{ok, Result, State1} =
internal_read_message(
- Q, ReadSeqId, ReadMsg, FakeDeliver, false, State),
+ Q, ReadSeqId, ReadMsg, false, false, State),
true = ets:insert(Sequences,
{Q, ReadSeqId+1, WriteSeqId}),
{ok,
@@ -979,7 +979,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
end.
internal_auto_ack(Q, State) ->
- case internal_deliver(Q, false, true, State) of
+ case internal_deliver(Q, false, State) of
{ok, empty, State1} -> {ok, State1};
{ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining},
State1} ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 3b86596b6a..50b75789bd 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -282,7 +282,7 @@ publish_delivered(Msg =
memory_size = QSize, memory_gain = Gain })
when Mode =:= disk orelse (IsDurable andalso IsPersistent) ->
Persist = IsDurable andalso IsPersistent,
- rabbit_disk_queue:publish(Q, Msg, false),
+ ok = rabbit_disk_queue:publish(Q, Msg, true),
MsgSize = size_of_message(Msg),
State1 = State #mqstate { memory_size = QSize + MsgSize,
memory_gain = Gain + MsgSize },
@@ -291,7 +291,7 @@ publish_delivered(Msg =
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {MsgId, IsPersistent, false, AckTag, 0} =
+ {MsgId, IsPersistent, true, AckTag, 0} =
rabbit_disk_queue:phantom_deliver(Q),
{ok, AckTag, State1};
false ->