diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-23 16:37:28 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-23 16:37:28 +0100 |
| commit | 9b3d19faf258c10cc4127ceeab684139516b1e8c (patch) | |
| tree | e2a338fe48224788a6f694f20ef8d8c4a812c43c | |
| parent | ec82e9b796ad3cbbf25289ebac8bd4b8c2f7d348 (diff) | |
| parent | 72c55d7baa733915e01bd3e9bd1528c2447d7f89 (diff) | |
| download | rabbitmq-server-git-9b3d19faf258c10cc4127ceeab684139516b1e8c.tar.gz | |
merge emile qc changes
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
4 files changed, 21 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a370a25e23..0cf7de4001 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -696,14 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ }) -> Now = now_micros(), - {Msgs, BQS1} = - BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - true, BQS), DLXFun = dead_letter_fun(expired, State), - lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), + ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + case DLXFun of + undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS), + BQS1; + _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach( + fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), + BQS1 + end, ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -720,9 +724,7 @@ ensure_ttl_timer(State) -> State. dead_letter_fun(_Reason, #q{dlx = undefined}) -> - fun(_Msg, _AckTag) -> - ok - end; + undefined; dead_letter_fun(Reason, _State) -> fun(Msg, AckTag) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 150c25513f..28c57bb0f3 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -120,8 +120,8 @@ %% Drop messages from the head of the queue while the supplied predicate returns %% true. Also accepts a boolean parameter that determines whether the messages -%% are to be acked or not. If they are, the messages and the acktags are -%% returned. +%% necessitate an ack or not. If they do, the function returns a list of +%% messages with the respective acktags. -callback dropwhile(msg_pred(), true, state()) -> {[{rabbit_types:basic_message(), ack()}], state()}; (msg_pred(), false, state()) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 3afa5b6022..551fdf182f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -168,13 +168,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, AckMsgs, +dropwhile(Pred, AckRequired, State = #state{gm = GM, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS), + {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1}), Dropped = Len - Len1, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c418cc4d5f..ddbe6bcc6b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -578,24 +578,24 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, AckMsgs, State) -> - End = fun(S) when AckMsgs -> {[], S}; - (S) -> {undefined, S} +dropwhile(Pred, AckRequired, State) -> + End = fun(S) when AckRequired -> {[], S}; + (S) -> {undefined, S} end, case queue_out(State) of {empty, State1} -> End(a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), AckMsgs} of + case {Pred(MsgProps), AckRequired} of {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), {{Msg, _, AckTag, _}, State3} = internal_fetch(true, MsgStatus1, State2), - {L, State4} = dropwhile(Pred, AckMsgs, State3), + {L, State4} = dropwhile(Pred, AckRequired, State3), {[{Msg, AckTag} | L], State4}; {true, false} -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, AckMsgs, State2); + dropwhile(Pred, AckRequired, State2); {false, _} -> End(a(in_r(MsgStatus, State1))) end |
