diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-08-23 14:58:21 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-08-23 14:58:21 +0100 |
| commit | 85a00f4fbee557d628e62174174da367cddb6cc4 (patch) | |
| tree | 7f1f6f573160d53f6e4d85b788e40d7add1cd46c | |
| parent | 7c151ba0a8f3482d3651e0dff3244f5fb3a02cac (diff) | |
| download | rabbitmq-server-git-85a00f4fbee557d628e62174174da367cddb6cc4.tar.gz | |
More conservative acks
| -rw-r--r-- | src/rabbit_variable_queue.erl | 30 |
1 files changed, 14 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 203e3f8c92..d399e82010 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1014,11 +1014,10 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent, IsDelivered} of - {false, true, false, _, _} -> Rem(), IndexState1; - {false, true, true, _, false} -> Rem(), Ack(); - { true, true, true, false, false} -> Ack(); - _ -> IndexState1 + case {AckRequired, MsgOnDisk, IndexOnDisk} of + {false, true, false} -> Rem(), IndexState1; + {false, true, true} -> Rem(), Ack(); + _ -> IndexState1 end, %% 3. If an ack is required, add something sensible to PA @@ -1188,7 +1187,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = + {MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1200,7 +1199,7 @@ remove_pending_ack(KeepPersistent, State1 end; false -> IndexState1 = - rabbit_queue_index:ack(PersistentSeqIds, IndexState), + rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } @@ -1209,7 +1208,7 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, + {{MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1224,7 +1223,7 @@ ack(MsgStoreFun, Fun, AckTags, State) -> ram_ack_index = gb_trees:delete_any(SeqId, RAI)})} end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), + IndexState1 = rabbit_queue_index:ack(AckTags, IndexState), [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( @@ -1234,18 +1233,17 @@ ack(MsgStoreFun, Fun, AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false, msg_id = MsgId }, - {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; -accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, - {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + {MsgIdsByStore, AllMsgIds}) -> + {MsgIdsByStore, [MsgId | AllMsgIds]}; +accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk}, + {MsgIdsByStore, AllMsgIds}) -> + {rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> |
