summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-04-06 12:43:52 +0100
committerRob Harrop <rob@rabbitmq.com>2011-04-06 12:43:52 +0100
commit9b966d0af5da4cc4c52ec583f5de981b1d81ae16 (patch)
treebe169af5b4696d6829a3d716504f6ec9fb5ba08c /src
parent038aa54e50a6197f4b7546da0e18fa3692a96821 (diff)
downloadrabbitmq-server-git-9b966d0af5da4cc4c52ec583f5de981b1d81ae16.tar.gz
Store DLE arg in the queue state
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl41
1 files changed, 25 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2b0fe17e54..1d5d1f87e7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,7 +48,8 @@
stats_timer,
msg_id_to_channel,
ttl,
- ttl_timer_ref
+ ttl_timer_ref,
+ dead_letter_exchange
}).
-record(consumer, {tag, ack_required}).
@@ -99,20 +100,21 @@ init(Q) ->
process_flag(trap_exit, true),
{ok, BQ} = application:get_env(backing_queue_module),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = dict:new()}, hibernate,
+ {ok, #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = undefined,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ dead_letter_exchange = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -180,12 +182,19 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
undefined -> State1
end
end, State, [{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-dead-letter-exchange">>,
+ fun init_dead_letter_exchange/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_dead_letter_exchange(DLE, State = #q{q = #amqqueue{
+ name = #resource{
+ virtual_host = VHostPath}}}) ->
+ State#q{dead_letter_exchange = rabbit_misc:r(VHostPath, exchange, DLE)}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),