diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 8 |
2 files changed, 10 insertions, 9 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 2bc40123df..71d812f663 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -231,9 +231,9 @@ -spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), - bool(), {msg_id(), seq_id()}}}). + bool(), {msg_id(), seq_id()}, non_neg_integer()}}). -spec(phantom_deliver/1 :: (queue_name()) -> - { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}). + { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}). -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). @@ -546,7 +546,8 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), [{MsgId, _RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length - 1}), + Remaining = Length - 1, + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Remaining}), ok = if Delivered -> ok; true -> @@ -557,9 +558,9 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> {FileHdl, State1} = get_read_handle(File, State), {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), - {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining}, State1}; - true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State} + true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining}, State} end end. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index c909e2a5a9..811d140a4e 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -68,19 +68,19 @@ publish(MsgId, Msg, IsPersistent, deliver(State = #mqstate { mode = disk, queue = Q }) -> {rabbit_disk_queue:deliver(Q), State}; -deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) -> +deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, next_write_seq = NextWrite }) -> {Result, MsgBuf2} = queue:out(MsgBuf), case Result of empty -> {empty, State}; - {value, {_Seq, {MsgId, Msg, IsPersistent}}} -> + {value, {Seq, {MsgId, Msg, IsPersistent}}} -> {IsDelivered, Ack} = if IsPersistent -> - {MsgId, IsDelivered2, Ack2} = rabbit_disk_queue:phantom_deliver(Q), + {MsgId, IsDelivered2, Ack2, _PersistRemaining} = rabbit_disk_queue:phantom_deliver(Q), {IsDelivered2, Ack2}; true -> {false, noack} end, - {{MsgId, Msg, size(Msg), IsDelivered, Ack}, + {{MsgId, Msg, size(Msg), IsDelivered, Ack, (NextWrite - 1 - Seq)}, State #mqstate { msg_buf = MsgBuf2 }} end. |
