diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2011-06-08 13:09:43 +0100 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2011-06-08 13:09:43 +0100 |
| commit | 415eaa2552be75a578ae3dacecf1c9ff4ca5fa8b (patch) | |
| tree | fb0d762631a509226302a55e827c7f2b07c7826d | |
| parent | 9d36f516d5eeaa303619b45159473fabb3015e2e (diff) | |
| download | rabbitmq-server-git-415eaa2552be75a578ae3dacecf1c9ff4ca5fa8b.tar.gz | |
Publishing under new GUID now. DL'd messages have x-death-reason and x-death-queue headers for admin purposes. Support DL for queue.delete. Fixed issue when messages not in memory were not loaded back from disk
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
2 files changed, 68 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 074768f409..cef37758c2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -121,16 +121,18 @@ terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(Reason, State = #q{backing_queue = BQ}) -> + State1 = maybe_dead_letter_queue(queue_deleted, State), %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> + rabbit_event:notify( queue_deleted, [{pid, self()}]), BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. - rabbit_amqqueue:internal_delete(qname(State)), + rabbit_amqqueue:internal_delete(qname(State1)), BQS1 - end, State). + end, State1). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -727,7 +729,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - fun (Msg) -> maybe_dead_letter(Msg, expired_queue_ttl, State) end, + dead_letter_drop_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -745,10 +747,32 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. -maybe_dead_letter(Msg, _Reason, #q{dead_letter_exchange = undefined}) -> - ok; -maybe_dead_letter(Msg = #basic_message{content = Content}, - Reason, #q{dead_letter_exchange = DLE}) -> +dead_letter_drop_fun(_Reason, #q{dead_letter_exchange = undefined}) -> + fun(_MsgFun, LookupState) -> LookupState end; +dead_letter_drop_fun(Reason, State) -> + fun(MsgFun, LookupState) -> + {Msg, LookupState1} = MsgFun(LookupState), + dead_letter_msg(Msg, Reason, State), + LookupState1 + end. + +maybe_dead_letter_queue(_Reason, State = #q{ + dead_letter_exchange = undefined}) -> + State; +maybe_dead_letter_queue(Reason, State = #q{ + backing_queue_state = BQS, + backing_queue = BQ, + dead_letter_exchange = DLE}) -> + case BQ:fetch(false, BQS) of + {empty, BQS1} -> + State#q{backing_queue_state = BQS1}; + {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} -> + dead_letter_msg(Msg, Reason, State), + maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1}) + end. + + +dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) -> %% Should this be lookup_or_die? Do we really want to stop the %% message from being discarded if the exchange is not there? Exchange = rabbit_exchange:lookup_or_die(DLE), @@ -760,24 +784,32 @@ maybe_dead_letter(Msg = #basic_message{content = Content}, rabbit_exchange:publish( Exchange, rabbit_basic:delivery(false, false, none, - record_death_reason(Reason, Msg), undefined)), + make_dead_letter_msg(Reason, Msg, State), + undefined)), ok. -record_death_reason(Reason, - Msg = #basic_message{ - content = Content = #content{ - properties = Props = #'P_basic'{ - headers = Headers}}}) -> - ReasonTuple = {<<"x-death-reason">>, longstr, - list_to_binary(atom_to_list(Reason))}, +make_dead_letter_msg(Reason, + Msg = #basic_message{ + content = Content = #content{ + properties = Props = #'P_basic'{ + headers = Headers}}}, + State) -> + + #resource{name = QName} = qname(State), + + DeathHeaders = [{<<"x-death-reason">>, longstr, + list_to_binary(atom_to_list(Reason))}, + {<<"x-death-queue">>, longstr, QName}], + Headers1 = case Headers of - undefined -> [ReasonTuple]; - _ -> [ReasonTuple | Headers] + undefined -> DeathHeaders; + _ -> Headers ++ DeathHeaders end, - Msg#basic_message{ - content = Content#content{ - properties = Props#'P_basic'{ - headers = Headers1}}}. + Content1 = + rabbit_binary_generator:clear_encoded_content( + Content#content{properties = Props#'P_basic'{headers = Headers1}}), + + Msg#basic_message{id = rabbit_guid:guid(), content = Content1}. now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c777ad4d02..a4c51fde1e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -568,11 +568,14 @@ dropwhile1(Pred, DropFun, State) -> fun(MsgStatus = #msg_status { msg_props = MsgProps, msg = Msg }, State1) -> case Pred(MsgProps) of - true -> DropFun(Msg), - {_, State2} = internal_fetch(false, MsgStatus, - State1), - dropwhile1(Pred, DropFun, State2); - false -> {ok, in_r(MsgStatus, State1)} + true -> + {MsgStatus1, State2} = + DropFun(read_msg_callback(), {MsgStatus, State1}), + + {_, State3} = internal_fetch(false, MsgStatus1, State2), + dropwhile1(Pred, DropFun, State3); + false -> + {ok, in_r(MsgStatus, State1)} end end, State). @@ -606,6 +609,13 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. +read_msg_callback() -> + fun({MsgStatus, State}) -> + {MsgStatus1 = #msg_status { msg = Msg }, State1} = + read_msg(MsgStatus, State), + {Msg, {MsgStatus1, State1}} + end. + read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }, |
