diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2011-06-08 15:13:14 +0100 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2011-06-08 15:13:14 +0100 |
| commit | cc4bf8586cfa9d0771ad5c51556f8de972d76782 (patch) | |
| tree | d3d62ee68654e653badac4214fcccee83e5af7f9 /src | |
| parent | cc97fb50de0ec36c8fd40ccbfb75c122e2b52acf (diff) | |
| download | rabbitmq-server-git-cc4bf8586cfa9d0771ad5c51556f8de972d76782.tar.gz | |
Support for DL'ing messages that are rejected with requeue=false
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 25 |
3 files changed, 32 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cef37758c2..91802018f3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -729,7 +729,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - dead_letter_drop_fun(expired, State), + dead_letter_callback_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -747,9 +747,9 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. -dead_letter_drop_fun(_Reason, #q{dead_letter_exchange = undefined}) -> +dead_letter_callback_fun(_Reason, #q{dead_letter_exchange = undefined}) -> fun(_MsgFun, LookupState) -> LookupState end; -dead_letter_drop_fun(Reason, State) -> +dead_letter_callback_fun(Reason, State) -> fun(MsgFun, LookupState) -> {Msg, LookupState1} = MsgFun(LookupState), dead_letter_msg(Msg, Reason, State), @@ -761,8 +761,7 @@ maybe_dead_letter_queue(_Reason, State = #q{ State; maybe_dead_letter_queue(Reason, State = #q{ backing_queue_state = BQS, - backing_queue = BQ, - dead_letter_exchange = DLE}) -> + backing_queue = BQ}) -> case BQ:fetch(false, BQS) of {empty, BQS1} -> State#q{backing_queue_state = BQS1}; @@ -771,7 +770,6 @@ maybe_dead_letter_queue(Reason, State = #q{ maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1}) end. - dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) -> %% Should this be lookup_or_die? Do we really want to stop the %% message from being discarded if the exchange is not there? @@ -1103,10 +1101,11 @@ handle_call({delete, IfUnused, IfEmpty}, _From, {stop, normal, {ok, BQ:len(BQS)}, State} end; -handle_call(purge, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +handle_call(purge, _From, State = #q{backing_queue = BQ}) -> + State1 = #q{backing_queue_state = BQS} = + maybe_dead_letter_queue(queue_purged, State), {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + reply({ok, Count}, State1#q{backing_queue_state = BQS1}); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1143,7 +1142,9 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - {_Guids, BQS1} = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack(AckTags, + fun(_, BQS0) -> BQS0 end, + BQS), {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, @@ -1164,7 +1165,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + false -> Fun = dead_letter_callback_fun( + rejected, State), + {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS), State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 96aeb4cad8..3d7fb4895d 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -105,7 +105,7 @@ behaviour_info(callbacks) -> %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. - {ack, 2}, + {ack, 3}, %% A publish, but in the context of a transaction. {tx_publish, 5}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a4c51fde1e..f75095346f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, + fetch/2, ack/3, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, @@ -565,8 +565,7 @@ dropwhile(Pred, DropFun, State) -> dropwhile1(Pred, DropFun, State) -> internal_queue_out( - fun(MsgStatus = #msg_status { msg_props = MsgProps, - msg = Msg }, State1) -> + fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> case Pred(MsgProps) of true -> {MsgStatus1, State2} = @@ -610,10 +609,16 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> end. read_msg_callback() -> - fun({MsgStatus, State}) -> + fun({MsgStatus = #msg_status {}, State}) -> {MsgStatus1 = #msg_status { msg = Msg }, State1} = read_msg(MsgStatus, State), - {Msg, {MsgStatus1, State1}} + {Msg, {MsgStatus1, State1}}; + ({{IsPersistent, MsgId, _MsgProps}, State}) -> + #vqstate { msg_store_clients = MSCState } = State, + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, {undefined, State #vqstate { + msg_store_clients = MSCState1 }}} end. read_msg(MsgStatus = #msg_status { msg = undefined, @@ -681,9 +686,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { len = Len1, persistent_count = PCount1 })}. -ack(AckTags, State) -> +ack(AckTags, Fun, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, + fun (MsgStatus = #msg_status {}, State0) -> + {_, State2} = Fun(read_msg_callback(), + {MsgStatus, State0}), + State2 + end, AckTags, State), {MsgIds, a(State1)}. @@ -1220,7 +1229,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Acks = lists:append(SAcks), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], - {_MsgIds, State1} = ack(Acks, State), + {_MsgIds, State1} = ack(Acks, fun(_, State0) -> State0 end, State), {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, |
