summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-09-10 01:37:03 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-09-10 01:37:03 +0300
commit2eec79191f25368fe60435d5829c028e57426841 (patch)
tree00cb8834ca081f79e7aa45bdb8c78a1d03202918
parentae63a44aa8fca0dc388f680052b6d934f6eb2189 (diff)
parent1393a458941e9b1193c51366b9a9bce9cf088d36 (diff)
downloadrabbitmq-server-git-2eec79191f25368fe60435d5829c028e57426841.tar.gz
Merge branch 'stable'
-rw-r--r--src/rabbit_variable_queue.erl29
1 files changed, 13 insertions, 16 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 15eac57e68..02bf80321a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -695,8 +695,7 @@ ack(AckTags, State) ->
{accumulate_ack(MsgStatus, Acc), State3}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
- [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
{lists:reverse(AllMsgIds),
a(State1 #vqstate { index_state = IndexState1,
ack_out_counter = AckOutCount + length(AckTags) })}.
@@ -1122,7 +1121,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) ->
+betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
{Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
@@ -1134,9 +1133,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
false -> MsgStatus = m(beta_msg_status(M)),
HaveMsg = msg_in_ram(MsgStatus),
Size = msg_size(MsgStatus),
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA) orelse
- gb_trees:is_defined(SeqId, QPA)) of
+ case is_msg_in_pending_acks(SeqId, State) of
false -> {?QUEUE:in_r(MsgStatus, Filtered1),
Delivers1, Acks1,
RRC + one_if(HaveMsg),
@@ -1151,6 +1148,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
%% been stored in the QI, thus the message must have been in
%% qi_pending_ack, thus it must already have been in RAM.
+is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)).
+
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
@@ -1436,9 +1440,7 @@ remove_queue_entries(Q, DelsAndAcksFun,
{MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
{orddict:new(), [], [], State}, Q),
- ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
- msg_store_remove(MSCState, IsPersistent, MsgIds)
- end, ok, MsgIdsByStore),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
DelsAndAcksFun(Delivers, Acks, State1).
remove_queue_entries1(
@@ -1886,9 +1888,7 @@ next({delta, #delta{start_seq_id = SeqId,
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
- case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of
+ case is_msg_in_pending_acks(SeqId, State) of
false -> Next = {delta, Delta, Rest, State},
{value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
@@ -2060,9 +2060,6 @@ maybe_deltas_to_betas(DelsAndAcksFun,
index_state = IndexState,
ram_msg_count = RamMsgCount,
ram_bytes = RamBytes,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA,
disk_read_count = DiskReadCount,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
@@ -2075,7 +2072,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, State1} =
betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, QPA, DelsAndAcksFun,
+ DelsAndAcksFun,
State #vqstate { index_state = IndexState1 }),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
ram_bytes = RamBytes + RamBytesInc,