summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl74
-rw-r--r--src/rabbit_variable_queue.erl20
2 files changed, 68 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 074768f409..cef37758c2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -121,16 +121,18 @@ terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(Reason, State = #q{backing_queue = BQ}) ->
+ State1 = maybe_dead_letter_queue(queue_deleted, State),
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(qname(State)),
+ rabbit_amqqueue:internal_delete(qname(State1)),
BQS1
- end, State).
+ end, State1).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -727,7 +729,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- fun (Msg) -> maybe_dead_letter(Msg, expired_queue_ttl, State) end,
+ dead_letter_drop_fun(expired, State),
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -745,10 +747,32 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-maybe_dead_letter(Msg, _Reason, #q{dead_letter_exchange = undefined}) ->
- ok;
-maybe_dead_letter(Msg = #basic_message{content = Content},
- Reason, #q{dead_letter_exchange = DLE}) ->
+dead_letter_drop_fun(_Reason, #q{dead_letter_exchange = undefined}) ->
+ fun(_MsgFun, LookupState) -> LookupState end;
+dead_letter_drop_fun(Reason, State) ->
+ fun(MsgFun, LookupState) ->
+ {Msg, LookupState1} = MsgFun(LookupState),
+ dead_letter_msg(Msg, Reason, State),
+ LookupState1
+ end.
+
+maybe_dead_letter_queue(_Reason, State = #q{
+ dead_letter_exchange = undefined}) ->
+ State;
+maybe_dead_letter_queue(Reason, State = #q{
+ backing_queue_state = BQS,
+ backing_queue = BQ,
+ dead_letter_exchange = DLE}) ->
+ case BQ:fetch(false, BQS) of
+ {empty, BQS1} ->
+ State#q{backing_queue_state = BQS1};
+ {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} ->
+ dead_letter_msg(Msg, Reason, State),
+ maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1})
+ end.
+
+
+dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) ->
%% Should this be lookup_or_die? Do we really want to stop the
%% message from being discarded if the exchange is not there?
Exchange = rabbit_exchange:lookup_or_die(DLE),
@@ -760,24 +784,32 @@ maybe_dead_letter(Msg = #basic_message{content = Content},
rabbit_exchange:publish(
Exchange,
rabbit_basic:delivery(false, false, none,
- record_death_reason(Reason, Msg), undefined)),
+ make_dead_letter_msg(Reason, Msg, State),
+ undefined)),
ok.
-record_death_reason(Reason,
- Msg = #basic_message{
- content = Content = #content{
- properties = Props = #'P_basic'{
- headers = Headers}}}) ->
- ReasonTuple = {<<"x-death-reason">>, longstr,
- list_to_binary(atom_to_list(Reason))},
+make_dead_letter_msg(Reason,
+ Msg = #basic_message{
+ content = Content = #content{
+ properties = Props = #'P_basic'{
+ headers = Headers}}},
+ State) ->
+
+ #resource{name = QName} = qname(State),
+
+ DeathHeaders = [{<<"x-death-reason">>, longstr,
+ list_to_binary(atom_to_list(Reason))},
+ {<<"x-death-queue">>, longstr, QName}],
+
Headers1 = case Headers of
- undefined -> [ReasonTuple];
- _ -> [ReasonTuple | Headers]
+ undefined -> DeathHeaders;
+ _ -> Headers ++ DeathHeaders
end,
- Msg#basic_message{
- content = Content#content{
- properties = Props#'P_basic'{
- headers = Headers1}}}.
+ Content1 =
+ rabbit_binary_generator:clear_encoded_content(
+ Content#content{properties = Props#'P_basic'{headers = Headers1}}),
+
+ Msg#basic_message{id = rabbit_guid:guid(), content = Content1}.
now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c777ad4d02..a4c51fde1e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -568,11 +568,14 @@ dropwhile1(Pred, DropFun, State) ->
fun(MsgStatus = #msg_status { msg_props = MsgProps,
msg = Msg }, State1) ->
case Pred(MsgProps) of
- true -> DropFun(Msg),
- {_, State2} = internal_fetch(false, MsgStatus,
- State1),
- dropwhile1(Pred, DropFun, State2);
- false -> {ok, in_r(MsgStatus, State1)}
+ true ->
+ {MsgStatus1, State2} =
+ DropFun(read_msg_callback(), {MsgStatus, State1}),
+
+ {_, State3} = internal_fetch(false, MsgStatus1, State2),
+ dropwhile1(Pred, DropFun, State3);
+ false ->
+ {ok, in_r(MsgStatus, State1)}
end
end, State).
@@ -606,6 +609,13 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
Fun(MsgStatus, State #vqstate { q4 = Q4a })
end.
+read_msg_callback() ->
+ fun({MsgStatus, State}) ->
+ {MsgStatus1 = #msg_status { msg = Msg }, State1} =
+ read_msg(MsgStatus, State),
+ {Msg, {MsgStatus1, State1}}
+ end.
+
read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent },