summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-06-07 13:55:05 +0100
committerRob Harrop <rob@rabbitmq.com>2011-06-07 13:55:05 +0100
commit9d36f516d5eeaa303619b45159473fabb3015e2e (patch)
tree757cb911bc6fcfa5780552d7dce5573f2667fd95 /src
parentfebeb8df17ac5a0b024bce2c4f73c1a5b269d0da (diff)
parent39a0d03500351179d771ad80eec8a697ba3ae54a (diff)
downloadrabbitmq-server-git-9d36f516d5eeaa303619b45159473fabb3015e2e.tar.gz
Merge with default and rough outline of DL for ttl expiry
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl31
-rw-r--r--src/rabbit_amqqueue_process.erl77
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl17
5 files changed, 98 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c870374084..619ee64170 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -298,28 +298,43 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[<<"x-expires">>, <<"x-message-ttl">>]).
-check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key)) of
+check_declare_arguments(QueueName = #resource{virtual_host = VHostPath},
+ Args) ->
+ [case Fun(rabbit_misc:table_lookup(Args, Key), VHostPath) of
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/1},
- {<<"x-message-ttl">>, fun check_integer_argument/1}]],
+ end ||
+ {Key, Fun} <-
+ [{<<"x-expires">>, fun check_integer_argument/2},
+ {<<"x-message-ttl">>, fun check_integer_argument/2},
+ {<<"x-dead-letter-exchange">>, fun check_exchange_argument/2}]],
ok.
-check_integer_argument(undefined) ->
+check_integer_argument(undefined, _VHostPath) ->
ok;
-check_integer_argument({Type, Val}) when Val > 0 ->
+check_integer_argument({Type, Val}, _VHostPath) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, Val}) ->
+check_integer_argument({_Type, Val}, _VHostPath) ->
{error, {value_zero_or_less, Val}}.
+check_exchange_argument(undefined, _VHostPath) ->
+ ok;
+check_exchange_argument({longstr, Val}, VHostPath) ->
+ case rabbit_exchange:lookup(rabbit_misc:r(VHostPath, exchange, Val)) of
+ {ok, _Exchange} -> ok;
+ {error, not_found} -> {error, {non_existent_exchange, Val}}
+ end;
+check_exchange_argument({Type, _Val}, _VHostPath) ->
+ {error, {unacceptable_type, Type}}.
+
+
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 07a24af828..074768f409 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,7 +48,8 @@
stats_timer,
msg_id_to_channel,
ttl,
- ttl_timer_ref
+ ttl_timer_ref,
+ dead_letter_exchange
}).
-record(consumer, {tag, ack_required}).
@@ -98,20 +99,21 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = dict:new()}, hibernate,
+ {ok, #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = backing_queue_module(Q),
+ backing_queue_state = undefined,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ dead_letter_exchange = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -178,12 +180,19 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
undefined -> State1
end
end, State, [{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-dead-letter-exchange">>,
+ fun init_dead_letter_exchange/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_dead_letter_exchange(DLE, State = #q{q = #amqqueue{
+ name = #resource{
+ virtual_host = VHostPath}}}) ->
+ State#q{dead_letter_exchange = rabbit_misc:r(VHostPath, exchange, DLE)}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -718,6 +727,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ fun (Msg) -> maybe_dead_letter(Msg, expired_queue_ttl, State) end,
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -735,6 +745,41 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+maybe_dead_letter(Msg, _Reason, #q{dead_letter_exchange = undefined}) ->
+ ok;
+maybe_dead_letter(Msg = #basic_message{content = Content},
+ Reason, #q{dead_letter_exchange = DLE}) ->
+ %% Should this be lookup_or_die? Do we really want to stop the
+ %% message from being discarded if the exchange is not there?
+ Exchange = rabbit_exchange:lookup_or_die(DLE),
+
+ %% Should do something with the routing result here, but what?
+ %% Are we going to stop the message from being discarded if
+ %% unroutable? At the least we should write to the error log if
+ %% the routing fails.
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(false, false, none,
+ record_death_reason(Reason, Msg), undefined)),
+ ok.
+
+record_death_reason(Reason,
+ Msg = #basic_message{
+ content = Content = #content{
+ properties = Props = #'P_basic'{
+ headers = Headers}}}) ->
+ ReasonTuple = {<<"x-death-reason">>, longstr,
+ list_to_binary(atom_to_list(Reason))},
+ Headers1 = case Headers of
+ undefined -> [ReasonTuple];
+ _ -> [ReasonTuple | Headers]
+ end,
+ Msg#basic_message{
+ content = Content#content{
+ properties = Props#'P_basic'{
+ headers = Headers1}}}.
+
+
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 217ad3eb5b..96aeb4cad8 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -98,7 +98,7 @@ behaviour_info(callbacks) ->
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
- {dropwhile, 2},
+ {dropwhile, 3},
%% Produce the next message.
{fetch, 2},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3f4aa54e7f..41053aeee9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2175,7 +2175,9 @@ test_dropwhile(VQ0) ->
VQ2 = rabbit_variable_queue:dropwhile(
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
- end, VQ1),
+ end,
+ fun(_Msg) -> ok end,
+ VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a167cca0c5..c777ad4d02 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,7 +19,7 @@
-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1, dropwhile/2,
+ requeue/3, len/1, is_empty/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/3, discard/3,
@@ -559,17 +559,19 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
drain_confirmed(State = #vqstate { confirmed = C }) ->
{gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
-dropwhile(Pred, State) ->
- {_OkOrEmpty, State1} = dropwhile1(Pred, State),
+dropwhile(Pred, DropFun, State) ->
+ {_OkOrEmpty, State1} = dropwhile1(Pred, DropFun, State),
a(State1).
-dropwhile1(Pred, State) ->
+dropwhile1(Pred, DropFun, State) ->
internal_queue_out(
- fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
+ fun(MsgStatus = #msg_status { msg_props = MsgProps,
+ msg = Msg }, State1) ->
case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus,
+ true -> DropFun(Msg),
+ {_, State2} = internal_fetch(false, MsgStatus,
State1),
- dropwhile1(Pred, State2);
+ dropwhile1(Pred, DropFun, State2);
false -> {ok, in_r(MsgStatus, State1)}
end
end, State).
@@ -592,6 +594,7 @@ fetch(AckRequired, State) ->
internal_fetch(AckRequired, MsgStatus1, State2)
end, State).
+
internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->