diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-07-31 20:59:15 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-07-31 20:59:15 +0100 |
| commit | d6416a72d485c74c05dd0c1bb0d2cd9235ea9c60 (patch) | |
| tree | d457130b542e038626bc73c42e87d57f71bbe618 /src | |
| parent | e43f0bddc96ea676ded522b235056f9e3c41fe21 (diff) | |
| download | rabbitmq-server-git-d6416a72d485c74c05dd0c1bb0d2cd9235ea9c60.tar.gz | |
Add IndexOnDisk to AckEntry
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ecfe3cdac1..f49b10aa87 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -570,7 +570,7 @@ requeue(AckTags, MsgPropsFun, State) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), true, false, State1), State2; - ({IsPersistent, MsgId, MsgProps}, State1) -> + ({IsPersistent, MsgId, MsgProps, _IndexOnDisk}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), @@ -1186,13 +1186,14 @@ record_pending_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk, msg_props = MsgProps } = MsgStatus, State = #vqstate { pending_ack = PA, ram_ack_index = RAI, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> {{IsPersistent, MsgId, MsgProps}, RAI}; + true -> {{IsPersistent, MsgId, MsgProps, IndexOnDisk}, RAI}; false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), @@ -1258,7 +1259,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_id = MsgId }, {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; -accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, +accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), @@ -1414,9 +1415,10 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, MsgStatus = #msg_status { msg_id = MsgId, %% ASSERTION is_persistent = false, %% ASSERTION - msg_props = MsgProps } = dict:fetch(SeqId, PA), + msg_props = MsgProps, + index_on_disk = IndexOnDisk } = dict:fetch(SeqId, PA), {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA), + PA1 = dict:store(SeqId, {false, MsgId, MsgProps, IndexOnDisk}, PA), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }) |
