summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl48
1 files changed, 23 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4ba1330388..db812b80e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,7 +49,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
- dead_letter_exchange
+ dlx
}).
-record(consumer, {tag, ack_required}).
@@ -99,21 +99,21 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- dead_letter_exchange = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = dict:new()}, hibernate,
+ {ok, #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = backing_queue_module(Q),
+ backing_queue_state = undefined,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ dlx = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -184,16 +184,15 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
end, State, [{<<"x-expires">>, fun init_expires/2},
{<<"x-message-ttl">>, fun init_ttl/2},
{<<"x-dead-letter-exchange">>,
- fun init_dead_letter_exchange/2}]).
+ fun init_dlx/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
-init_dead_letter_exchange(DLE, State = #q{q = #amqqueue{
- name = #resource{
- virtual_host = VHostPath}}}) ->
- State#q{dead_letter_exchange = rabbit_misc:r(VHostPath, exchange, DLE)}.
+init_dlx(DLE, State = #q{q = #amqqueue{name = #resource{
+ virtual_host = VHostPath}}}) ->
+ State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLE)}.
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
@@ -745,7 +744,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-dead_letter_callback_fun(_Reason, #q{dead_letter_exchange = undefined}) ->
+dead_letter_callback_fun(_Reason, #q{dlx = undefined}) ->
fun(_MsgFun, LookupState) -> LookupState end;
dead_letter_callback_fun(Reason, State) ->
fun(MsgFun, LookupState) ->
@@ -754,8 +753,7 @@ dead_letter_callback_fun(Reason, State) ->
LookupState1
end.
-maybe_dead_letter_queue(_Reason, State = #q{
- dead_letter_exchange = undefined}) ->
+maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) ->
State;
maybe_dead_letter_queue(Reason, State = #q{
backing_queue_state = BQS,
@@ -768,7 +766,7 @@ maybe_dead_letter_queue(Reason, State = #q{
maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1})
end.
-dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) ->
+dead_letter_msg(Msg, Reason, State = #q{dlx = DLE}) ->
Exchange = rabbit_exchange:lookup_or_die(DLE),
rabbit_exchange:publish(