diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-13 16:05:22 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-13 16:05:22 +0200 |
| commit | 1f38a7e780f8730bc5d8ea421c85e8ebd2c355ef (patch) | |
| tree | 49611d4a9abc7694a6e334a4cd0f3a7efdd9636a /src | |
| parent | 9257e753216065c5183be92d1cc086d24aabefa6 (diff) | |
| download | rabbitmq-server-git-1f38a7e780f8730bc5d8ea421c85e8ebd2c355ef.tar.gz | |
implements requeue for lazy queues
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
1 files changed, 19 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 45ba72edba..72741ab6f0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -672,7 +672,8 @@ ack(AckTags, State) -> a(State1 #vqstate { index_state = IndexState1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, #vqstate { delta = Delta, +requeue(AckTags, #vqstate { mode = default, + delta = Delta, q3 = Q3, q4 = Q4, in_counter = InCounter, @@ -692,6 +693,23 @@ requeue(AckTags, #vqstate { delta = Delta, q3 = Q3a, q4 = Q4a, in_counter = InCounter + MsgCount, + len = Len + MsgCount })))}; +requeue(AckTags, #vqstate { mode = lazy, + delta = Delta, + q3 = Q3, + in_counter = InCounter, + len = Len } = State) -> + {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [], + delta_limit(Delta), + fun publish_beta/2, State), + {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds, + State1), + MsgCount = length(MsgIds1), + {MsgIds1, a(reduce_memory_use( + maybe_update_rates( + State2 #vqstate { delta = Delta1, + q3 = Q3a, + in_counter = InCounter + MsgCount, len = Len + MsgCount })))}. ackfold(MsgFun, Acc, State, AckTags) -> |
