summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-13 16:05:22 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-13 16:05:22 +0200
commit1f38a7e780f8730bc5d8ea421c85e8ebd2c355ef (patch)
tree49611d4a9abc7694a6e334a4cd0f3a7efdd9636a /src
parent9257e753216065c5183be92d1cc086d24aabefa6 (diff)
downloadrabbitmq-server-git-1f38a7e780f8730bc5d8ea421c85e8ebd2c355ef.tar.gz
implements requeue for lazy queues
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl20
1 files changed, 19 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 45ba72edba..72741ab6f0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -672,7 +672,8 @@ ack(AckTags, State) ->
a(State1 #vqstate { index_state = IndexState1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-requeue(AckTags, #vqstate { delta = Delta,
+requeue(AckTags, #vqstate { mode = default,
+ delta = Delta,
q3 = Q3,
q4 = Q4,
in_counter = InCounter,
@@ -692,6 +693,23 @@ requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3a,
q4 = Q4a,
in_counter = InCounter + MsgCount,
+ len = Len + MsgCount })))};
+requeue(AckTags, #vqstate { mode = lazy,
+ delta = Delta,
+ q3 = Q3,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [],
+ delta_limit(Delta),
+ fun publish_beta/2, State),
+ {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
+ State1),
+ MsgCount = length(MsgIds1),
+ {MsgIds1, a(reduce_memory_use(
+ maybe_update_rates(
+ State2 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ in_counter = InCounter + MsgCount,
len = Len + MsgCount })))}.
ackfold(MsgFun, Acc, State, AckTags) ->