summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-06-13 10:32:45 +0100
committerRob Harrop <rob@rabbitmq.com>2011-06-13 10:32:45 +0100
commit44d75673aa6c6c8c429072124bf4e391b9c93bd7 (patch)
tree0e111b2ac8960710a320a84ab9b66abb9f5d5ef0
parent6e508e646db617d2a98fa650cb0fd1c4877f4f68 (diff)
parentcc4bf8586cfa9d0771ad5c51556f8de972d76782 (diff)
downloadrabbitmq-server-git-44d75673aa6c6c8c429072124bf4e391b9c93bd7.tar.gz
Merge with default
-rw-r--r--src/rabbit_amqqueue.erl31
-rw-r--r--src/rabbit_amqqueue_process.erl126
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl46
5 files changed, 165 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c870374084..619ee64170 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -298,28 +298,43 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[<<"x-expires">>, <<"x-message-ttl">>]).
-check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key)) of
+check_declare_arguments(QueueName = #resource{virtual_host = VHostPath},
+ Args) ->
+ [case Fun(rabbit_misc:table_lookup(Args, Key), VHostPath) of
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/1},
- {<<"x-message-ttl">>, fun check_integer_argument/1}]],
+ end ||
+ {Key, Fun} <-
+ [{<<"x-expires">>, fun check_integer_argument/2},
+ {<<"x-message-ttl">>, fun check_integer_argument/2},
+ {<<"x-dead-letter-exchange">>, fun check_exchange_argument/2}]],
ok.
-check_integer_argument(undefined) ->
+check_integer_argument(undefined, _VHostPath) ->
ok;
-check_integer_argument({Type, Val}) when Val > 0 ->
+check_integer_argument({Type, Val}, _VHostPath) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, Val}) ->
+check_integer_argument({_Type, Val}, _VHostPath) ->
{error, {value_zero_or_less, Val}}.
+check_exchange_argument(undefined, _VHostPath) ->
+ ok;
+check_exchange_argument({longstr, Val}, VHostPath) ->
+ case rabbit_exchange:lookup(rabbit_misc:r(VHostPath, exchange, Val)) of
+ {ok, _Exchange} -> ok;
+ {error, not_found} -> {error, {non_existent_exchange, Val}}
+ end;
+check_exchange_argument({Type, _Val}, _VHostPath) ->
+ {error, {unacceptable_type, Type}}.
+
+
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1e5ad3490c..1c3277d6e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,7 +48,8 @@
stats_timer,
msg_id_to_channel,
ttl,
- ttl_timer_ref
+ ttl_timer_ref,
+ dead_letter_exchange
}).
-record(consumer, {tag, ack_required}).
@@ -98,20 +99,21 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = dict:new()}, hibernate,
+ {ok, #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = backing_queue_module(Q),
+ backing_queue_state = undefined,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ dead_letter_exchange = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -119,16 +121,18 @@ terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(Reason, State = #q{backing_queue = BQ}) ->
+ State1 = maybe_dead_letter_queue(queue_deleted, State),
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(qname(State)),
+ rabbit_amqqueue:internal_delete(qname(State1)),
BQS1
- end, State).
+ end, State1).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -178,12 +182,19 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
undefined -> State1
end
end, State, [{<<"x-expires">>, fun init_expires/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-dead-letter-exchange">>,
+ fun init_dead_letter_exchange/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_dead_letter_exchange(DLE, State = #q{q = #amqqueue{
+ name = #resource{
+ virtual_host = VHostPath}}}) ->
+ State#q{dead_letter_exchange = rabbit_misc:r(VHostPath, exchange, DLE)}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
stop_sync_timer(stop_rate_timer(State)),
@@ -716,6 +727,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ dead_letter_callback_fun(expired, State),
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -733,6 +745,69 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+dead_letter_callback_fun(_Reason, #q{dead_letter_exchange = undefined}) ->
+ fun(_MsgFun, LookupState) -> LookupState end;
+dead_letter_callback_fun(Reason, State) ->
+ fun(MsgFun, LookupState) ->
+ {Msg, LookupState1} = MsgFun(LookupState),
+ dead_letter_msg(Msg, Reason, State),
+ LookupState1
+ end.
+
+maybe_dead_letter_queue(_Reason, State = #q{
+ dead_letter_exchange = undefined}) ->
+ State;
+maybe_dead_letter_queue(Reason, State = #q{
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:fetch(false, BQS) of
+ {empty, BQS1} ->
+ State#q{backing_queue_state = BQS1};
+ {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} ->
+ dead_letter_msg(Msg, Reason, State),
+ maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1})
+ end.
+
+dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) ->
+ %% Should this be lookup_or_die? Do we really want to stop the
+ %% message from being discarded if the exchange is not there?
+ Exchange = rabbit_exchange:lookup_or_die(DLE),
+
+ %% Should do something with the routing result here, but what?
+ %% Are we going to stop the message from being discarded if
+ %% unroutable? At the least we should write to the error log if
+ %% the routing fails.
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(false, false, none,
+ make_dead_letter_msg(Reason, Msg, State),
+ undefined)),
+ ok.
+
+make_dead_letter_msg(Reason,
+ Msg = #basic_message{
+ content = Content = #content{
+ properties = Props = #'P_basic'{
+ headers = Headers}}},
+ State) ->
+
+ #resource{name = QName} = qname(State),
+
+ DeathHeaders = [{<<"x-death-reason">>, longstr,
+ list_to_binary(atom_to_list(Reason))},
+ {<<"x-death-queue">>, longstr, QName}],
+
+ Headers1 = case Headers of
+ undefined -> DeathHeaders;
+ _ -> Headers ++ DeathHeaders
+ end,
+ Content1 =
+ rabbit_binary_generator:clear_encoded_content(
+ Content#content{properties = Props#'P_basic'{headers = Headers1}}),
+
+ Msg#basic_message{id = rabbit_guid:guid(), content = Content1}.
+
+
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1024,10 +1099,11 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
{stop, normal, {ok, BQ:len(BQS)}, State}
end;
-handle_call(purge, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+handle_call(purge, _From, State = #q{backing_queue = BQ}) ->
+ State1 = #q{backing_queue_state = BQS} =
+ maybe_dead_letter_queue(queue_purged, State),
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ reply({ok, Count}, State1#q{backing_queue_state = BQS1});
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1064,7 +1140,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags,
+ fun(_, BQS0) -> BQS0 end,
+ BQS),
{NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
@@ -1085,7 +1163,9 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ false -> Fun = dead_letter_callback_fun(
+ rejected, State),
+ {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS),
State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 217ad3eb5b..3d7fb4895d 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -98,14 +98,14 @@ behaviour_info(callbacks) ->
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
- {dropwhile, 2},
+ {dropwhile, 3},
%% Produce the next message.
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
- {ack, 2},
+ {ack, 3},
%% A publish, but in the context of a transaction.
{tx_publish, 5},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3f4aa54e7f..41053aeee9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2175,7 +2175,9 @@ test_dropwhile(VQ0) ->
VQ2 = rabbit_variable_queue:dropwhile(
fun(#message_properties { expiry = Expiry }) ->
Expiry =< 5
- end, VQ1),
+ end,
+ fun(_Msg) -> ok end,
+ VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a167cca0c5..f75095346f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,8 +18,8 @@
-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1, dropwhile/2,
+ fetch/2, ack/3, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/3, discard/3,
@@ -559,18 +559,22 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
drain_confirmed(State = #vqstate { confirmed = C }) ->
{gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
-dropwhile(Pred, State) ->
- {_OkOrEmpty, State1} = dropwhile1(Pred, State),
+dropwhile(Pred, DropFun, State) ->
+ {_OkOrEmpty, State1} = dropwhile1(Pred, DropFun, State),
a(State1).
-dropwhile1(Pred, State) ->
+dropwhile1(Pred, DropFun, State) ->
internal_queue_out(
fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
case Pred(MsgProps) of
- true -> {_, State2} = internal_fetch(false, MsgStatus,
- State1),
- dropwhile1(Pred, State2);
- false -> {ok, in_r(MsgStatus, State1)}
+ true ->
+ {MsgStatus1, State2} =
+ DropFun(read_msg_callback(), {MsgStatus, State1}),
+
+ {_, State3} = internal_fetch(false, MsgStatus1, State2),
+ dropwhile1(Pred, DropFun, State3);
+ false ->
+ {ok, in_r(MsgStatus, State1)}
end
end, State).
@@ -592,6 +596,7 @@ fetch(AckRequired, State) ->
internal_fetch(AckRequired, MsgStatus1, State2)
end, State).
+
internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
case queue:out(Q4) of
{empty, _Q4} ->
@@ -603,6 +608,19 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
+read_msg_callback() ->
+ fun({MsgStatus = #msg_status {}, State}) ->
+ {MsgStatus1 = #msg_status { msg = Msg }, State1} =
+ read_msg(MsgStatus, State),
+ {Msg, {MsgStatus1, State1}};
+ ({{IsPersistent, MsgId, _MsgProps}, State}) ->
+ #vqstate { msg_store_clients = MSCState } = State,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, {undefined, State #vqstate {
+ msg_store_clients = MSCState1 }}}
+ end.
+
read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent },
@@ -668,9 +686,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
len = Len1,
persistent_count = PCount1 })}.
-ack(AckTags, State) ->
+ack(AckTags, Fun, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
+ fun (MsgStatus = #msg_status {}, State0) ->
+ {_, State2} = Fun(read_msg_callback(),
+ {MsgStatus, State0}),
+ State2
+ end,
AckTags, State),
{MsgIds, a(State1)}.
@@ -1207,7 +1229,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Acks = lists:append(SAcks),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
- {_MsgIds, State1} = ack(Acks, State),
+ {_MsgIds, State1} = ack(Acks, fun(_, State0) -> State0 end, State),
{SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },