diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-09-10 01:37:03 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-09-10 01:37:03 +0300 |
| commit | 2eec79191f25368fe60435d5829c028e57426841 (patch) | |
| tree | 00cb8834ca081f79e7aa45bdb8c78a1d03202918 | |
| parent | ae63a44aa8fca0dc388f680052b6d934f6eb2189 (diff) | |
| parent | 1393a458941e9b1193c51366b9a9bce9cf088d36 (diff) | |
| download | rabbitmq-server-git-2eec79191f25368fe60435d5829c028e57426841.tar.gz | |
Merge branch 'stable'
| -rw-r--r-- | src/rabbit_variable_queue.erl | 29 |
1 files changed, 13 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 15eac57e68..02bf80321a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -695,8 +695,7 @@ ack(AckTags, State) -> {accumulate_ack(MsgStatus, Acc), State3} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], + remove_msgs_by_id(MsgIdsByStore, MSCState), {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, ack_out_counter = AckOutCount + length(AckTags) })}. @@ -1122,7 +1121,7 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) -> +betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, @@ -1134,9 +1133,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun false -> MsgStatus = m(beta_msg_status(M)), HaveMsg = msg_in_ram(MsgStatus), Size = msg_size(MsgStatus), - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)) of + case is_msg_in_pending_acks(SeqId, State) of false -> {?QUEUE:in_r(MsgStatus, Filtered1), Delivers1, Acks1, RRC + one_if(HaveMsg), @@ -1151,6 +1148,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun %% been stored in the QI, thus the message must have been in %% qi_pending_ack, thus it must already have been in RAM. +is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)). + expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); expand_delta(SeqId, #delta { start_seq_id = StartSeqId, @@ -1436,9 +1440,7 @@ remove_queue_entries(Q, DelsAndAcksFun, {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], [], State}, Q), - ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> - msg_store_remove(MSCState, IsPersistent, MsgIds) - end, ok, MsgIdsByStore), + remove_msgs_by_id(MsgIdsByStore, MSCState), DelsAndAcksFun(Delivers, Acks, State1). remove_queue_entries1( @@ -1886,9 +1888,7 @@ next({delta, #delta{start_seq_id = SeqId, next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> - case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of + case is_msg_in_pending_acks(SeqId, State) of false -> Next = {delta, Delta, Rest, State}, {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) @@ -2060,9 +2060,6 @@ maybe_deltas_to_betas(DelsAndAcksFun, index_state = IndexState, ram_msg_count = RamMsgCount, ram_bytes = RamBytes, - ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, disk_read_count = DiskReadCount, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, @@ -2075,7 +2072,7 @@ maybe_deltas_to_betas(DelsAndAcksFun, IndexState), {Q3a, RamCountsInc, RamBytesInc, State1} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, QPA, DelsAndAcksFun, + DelsAndAcksFun, State #vqstate { index_state = IndexState1 }), State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, ram_bytes = RamBytes + RamBytesInc, |
