summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-07-31 20:59:15 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-07-31 20:59:15 +0100
commitd6416a72d485c74c05dd0c1bb0d2cd9235ea9c60 (patch)
treed457130b542e038626bc73c42e87d57f71bbe618 /src
parente43f0bddc96ea676ded522b235056f9e3c41fe21 (diff)
downloadrabbitmq-server-git-d6416a72d485c74c05dd0c1bb0d2cd9235ea9c60.tar.gz
Add IndexOnDisk to AckEntry
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl12
1 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ecfe3cdac1..f49b10aa87 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -570,7 +570,7 @@ requeue(AckTags, MsgPropsFun, State) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
true, false, State1),
State2;
- ({IsPersistent, MsgId, MsgProps}, State1) ->
+ ({IsPersistent, MsgId, MsgProps, _IndexOnDisk}, State1) ->
#vqstate { msg_store_clients = MSCState } = State1,
{{ok, Msg = #basic_message{}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
@@ -1186,13 +1186,14 @@ record_pending_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk,
msg_props = MsgProps } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true -> {{IsPersistent, MsgId, MsgProps}, RAI};
+ true -> {{IsPersistent, MsgId, MsgProps, IndexOnDisk}, RAI};
false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
@@ -1258,7 +1259,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_id = MsgId },
{PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
-accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
+accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk},
{PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
@@ -1414,9 +1415,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
MsgStatus = #msg_status {
msg_id = MsgId, %% ASSERTION
is_persistent = false, %% ASSERTION
- msg_props = MsgProps } = dict:fetch(SeqId, PA),
+ msg_props = MsgProps,
+ index_on_disk = IndexOnDisk } = dict:fetch(SeqId, PA),
{_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA),
+ PA1 = dict:store(SeqId, {false, MsgId, MsgProps, IndexOnDisk}, PA),
limit_ram_acks(Quota - 1,
State1 #vqstate { pending_ack = PA1,
ram_ack_index = RAI1 })