diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-13 11:46:27 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-13 11:46:27 +0100 |
| commit | d1200a6f962bae90fe437e6e3624559f86d96dc2 (patch) | |
| tree | 092ed930f227852221233857fff113df49093c03 | |
| parent | 86bb4c83c1a868023cec79d13185b6fd8ad1f8b3 (diff) | |
| download | rabbitmq-server-git-d1200a6f962bae90fe437e6e3624559f86d96dc2.tar.gz | |
prompt expiry of requeued messages
which may require moving the timer expiry forward
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0250902fb3..283eba7c7a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + ttl_timer_expiry, senders, publish_seqno, unconfirmed, @@ -734,19 +735,23 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). -ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) - when TTL =/= undefined -> - case BQ:is_empty(BQS) of - true -> State; - false -> After = (case Expiry - now_micros() of - V when V > 0 -> V + 999; %% always fire later - _ -> 0 - end) div 1000, - TRef = erlang:send_after(After, self(), drop_expired), - State#q{ttl_timer_ref = TRef} +ensure_ttl_timer(undefined, State) -> + State; +ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) -> + State; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> + After = (case Expiry - now_micros() of + V when V > 0 -> V + 999; %% always fire later + _ -> 0 + end) div 1000, + TRef = erlang:send_after(After, self(), drop_expired), + State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, + ttl_timer_expiry = TExpiry}) + when Expiry < TExpiry -> + case erlang:cancel_timer(TRef) of + false -> State; + _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) end; ensure_ttl_timer(_Expiry, State) -> State. |
