summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2011-06-08 15:13:14 +0100
committerRob Harrop <rob@rabbitmq.com>2011-06-08 15:13:14 +0100
commitcc4bf8586cfa9d0771ad5c51556f8de972d76782 (patch)
treed3d62ee68654e653badac4214fcccee83e5af7f9 /src
parentcc97fb50de0ec36c8fd40ccbfb75c122e2b52acf (diff)
downloadrabbitmq-server-git-cc4bf8586cfa9d0771ad5c51556f8de972d76782.tar.gz
Support for DL'ing messages that are rejected with requeue=false
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl25
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_variable_queue.erl25
3 files changed, 32 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cef37758c2..91802018f3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -729,7 +729,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- dead_letter_drop_fun(expired, State),
+ dead_letter_callback_fun(expired, State),
BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
@@ -747,9 +747,9 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-dead_letter_drop_fun(_Reason, #q{dead_letter_exchange = undefined}) ->
+dead_letter_callback_fun(_Reason, #q{dead_letter_exchange = undefined}) ->
fun(_MsgFun, LookupState) -> LookupState end;
-dead_letter_drop_fun(Reason, State) ->
+dead_letter_callback_fun(Reason, State) ->
fun(MsgFun, LookupState) ->
{Msg, LookupState1} = MsgFun(LookupState),
dead_letter_msg(Msg, Reason, State),
@@ -761,8 +761,7 @@ maybe_dead_letter_queue(_Reason, State = #q{
State;
maybe_dead_letter_queue(Reason, State = #q{
backing_queue_state = BQS,
- backing_queue = BQ,
- dead_letter_exchange = DLE}) ->
+ backing_queue = BQ}) ->
case BQ:fetch(false, BQS) of
{empty, BQS1} ->
State#q{backing_queue_state = BQS1};
@@ -771,7 +770,6 @@ maybe_dead_letter_queue(Reason, State = #q{
maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1})
end.
-
dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) ->
%% Should this be lookup_or_die? Do we really want to stop the
%% message from being discarded if the exchange is not there?
@@ -1103,10 +1101,11 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
{stop, normal, {ok, BQ:len(BQS)}, State}
end;
-handle_call(purge, _From, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+handle_call(purge, _From, State = #q{backing_queue = BQ}) ->
+ State1 = #q{backing_queue_state = BQS} =
+ maybe_dead_letter_queue(queue_purged, State),
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ reply({ok, Count}, State1#q{backing_queue_state = BQS1});
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1143,7 +1142,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags,
+ fun(_, BQS0) -> BQS0 end,
+ BQS),
{NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
@@ -1164,7 +1165,9 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ false -> Fun = dead_letter_callback_fun(
+ rejected, State),
+ {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS),
State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 96aeb4cad8..3d7fb4895d 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -105,7 +105,7 @@ behaviour_info(callbacks) ->
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
- {ack, 2},
+ {ack, 3},
%% A publish, but in the context of a transaction.
{tx_publish, 5},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a4c51fde1e..f75095346f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+ fetch/2, ack/3, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/3,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
@@ -565,8 +565,7 @@ dropwhile(Pred, DropFun, State) ->
dropwhile1(Pred, DropFun, State) ->
internal_queue_out(
- fun(MsgStatus = #msg_status { msg_props = MsgProps,
- msg = Msg }, State1) ->
+ fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) ->
case Pred(MsgProps) of
true ->
{MsgStatus1, State2} =
@@ -610,10 +609,16 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) ->
end.
read_msg_callback() ->
- fun({MsgStatus, State}) ->
+ fun({MsgStatus = #msg_status {}, State}) ->
{MsgStatus1 = #msg_status { msg = Msg }, State1} =
read_msg(MsgStatus, State),
- {Msg, {MsgStatus1, State1}}
+ {Msg, {MsgStatus1, State1}};
+ ({{IsPersistent, MsgId, _MsgProps}, State}) ->
+ #vqstate { msg_store_clients = MSCState } = State,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, {undefined, State #vqstate {
+ msg_store_clients = MSCState1 }}}
end.
read_msg(MsgStatus = #msg_status { msg = undefined,
@@ -681,9 +686,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
len = Len1,
persistent_count = PCount1 })}.
-ack(AckTags, State) ->
+ack(AckTags, Fun, State) ->
{MsgIds, State1} = ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
+ fun (MsgStatus = #msg_status {}, State0) ->
+ {_, State2} = Fun(read_msg_callback(),
+ {MsgStatus, State0}),
+ State2
+ end,
AckTags, State),
{MsgIds, a(State1)}.
@@ -1220,7 +1229,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Acks = lists:append(SAcks),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
- {_MsgIds, State1} = ack(Acks, State),
+ {_MsgIds, State1} = ack(Acks, fun(_, State0) -> State0 end, State),
{SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },