diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 48 |
1 files changed, 23 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4ba1330388..db812b80e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,7 +49,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, - dead_letter_exchange + dlx }). -record(consumer, {tag, ack_required}). @@ -99,21 +99,21 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = backing_queue_module(Q), - backing_queue_state = undefined, - active_consumers = queue:new(), - blocked_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = undefined, - expiry_timer_ref = undefined, - ttl = undefined, - dead_letter_exchange = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = dict:new()}, hibernate, + {ok, #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = backing_queue_module(Q), + backing_queue_state = undefined, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = undefined, + expiry_timer_ref = undefined, + ttl = undefined, + dlx = undefined, + stats_timer = rabbit_event:init_stats_timer(), + msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -184,16 +184,15 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> end, State, [{<<"x-expires">>, fun init_expires/2}, {<<"x-message-ttl">>, fun init_ttl/2}, {<<"x-dead-letter-exchange">>, - fun init_dead_letter_exchange/2}]). + fun init_dlx/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). -init_dead_letter_exchange(DLE, State = #q{q = #amqqueue{ - name = #resource{ - virtual_host = VHostPath}}}) -> - State#q{dead_letter_exchange = rabbit_misc:r(VHostPath, exchange, DLE)}. +init_dlx(DLE, State = #q{q = #amqqueue{name = #resource{ + virtual_host = VHostPath}}}) -> + State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLE)}. terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = @@ -745,7 +744,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. -dead_letter_callback_fun(_Reason, #q{dead_letter_exchange = undefined}) -> +dead_letter_callback_fun(_Reason, #q{dlx = undefined}) -> fun(_MsgFun, LookupState) -> LookupState end; dead_letter_callback_fun(Reason, State) -> fun(MsgFun, LookupState) -> @@ -754,8 +753,7 @@ dead_letter_callback_fun(Reason, State) -> LookupState1 end. -maybe_dead_letter_queue(_Reason, State = #q{ - dead_letter_exchange = undefined}) -> +maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) -> State; maybe_dead_letter_queue(Reason, State = #q{ backing_queue_state = BQS, @@ -768,7 +766,7 @@ maybe_dead_letter_queue(Reason, State = #q{ maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1}) end. -dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) -> +dead_letter_msg(Msg, Reason, State = #q{dlx = DLE}) -> Exchange = rabbit_exchange:lookup_or_die(DLE), rabbit_exchange:publish( |
