summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl11
-rw-r--r--src/rabbit_mixed_queue.erl8
2 files changed, 10 insertions, 9 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 2bc40123df..71d812f663 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -231,9 +231,9 @@
-spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
- bool(), {msg_id(), seq_id()}}}).
+ bool(), {msg_id(), seq_id()}, non_neg_integer()}}).
-spec(phantom_deliver/1 :: (queue_name()) ->
- { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
+ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok').
@@ -546,7 +546,8 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
[{MsgId, _RefCount, File, Offset, TotalSize}] =
dets_ets_lookup(State, MsgId),
- true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length - 1}),
+ Remaining = Length - 1,
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Remaining}),
ok =
if Delivered -> ok;
true ->
@@ -557,9 +558,9 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
{FileHdl, State1} = get_read_handle(File, State),
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
- {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining},
State1};
- true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
+ true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}, State}
end
end.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index c909e2a5a9..811d140a4e 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -68,19 +68,19 @@ publish(MsgId, Msg, IsPersistent,
deliver(State = #mqstate { mode = disk, queue = Q }) ->
{rabbit_disk_queue:deliver(Q), State};
-deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) ->
+deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextWrite }) ->
{Result, MsgBuf2} = queue:out(MsgBuf),
case Result of
empty ->
{empty, State};
- {value, {_Seq, {MsgId, Msg, IsPersistent}}} ->
+ {value, {Seq, {MsgId, Msg, IsPersistent}}} ->
{IsDelivered, Ack} =
if IsPersistent ->
- {MsgId, IsDelivered2, Ack2} = rabbit_disk_queue:phantom_deliver(Q),
+ {MsgId, IsDelivered2, Ack2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q),
{IsDelivered2, Ack2};
true -> {false, noack}
end,
- {{MsgId, Msg, size(Msg), IsDelivered, Ack},
+ {{MsgId, Msg, size(Msg), IsDelivered, Ack, (NextWrite - 1 - Seq)},
State #mqstate { msg_buf = MsgBuf2 }}
end.