diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-22 11:24:39 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-22 11:24:39 +0000 |
| commit | a3d76edcebaf3abf0f1a24b0c2c5acce122467fe (patch) | |
| tree | 45a7edeff99cdfd15b6775c2c52f64688fe5152f | |
| parent | 477e71bbddbd6d146d285519b0d3fe54b0ceb4a3 (diff) | |
| parent | 567c69fb37cc569107b8e779d43d01d929772e5d (diff) | |
| download | rabbitmq-server-git-a3d76edcebaf3abf0f1a24b0c2c5acce122467fe.tar.gz | |
merge stable into default
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 70 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 13 |
8 files changed, 129 insertions, 72 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 68bd1c6c26..dc258fa606 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -730,8 +730,10 @@ drop_expired_messages(State = #q{dlx = DLX, {Next, BQS2}; _ -> {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - DLXFun = dead_letter_fun(expired), - DLXFun(Msgs), + case Msgs of + [] -> ok; + _ -> (dead_letter_fun(expired))(Msgs) + end, {Next, BQS2} end, ensure_ttl_timer(case Props of @@ -1135,11 +1137,11 @@ handle_call(stat, _From, State) -> handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - IsEmpty = BQ:is_empty(BQS), + IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if - IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); - IfUnused and not(IsUnused) -> reply({error, in_use}, State); + IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); + IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> stop(From, {ok, BQ:len(BQS)}, State) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index af660c60a0..00de3e175c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -29,6 +29,10 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). +-type(drop_result(Ack) :: + ('empty' | + %% MessageId, AckTag, Remaining_Len + {rabbit_types:msg_id(), Ack, non_neg_integer()})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: @@ -139,6 +143,10 @@ -callback fetch(true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}. +%% Remove the next message. +-callback drop(true, state()) -> {drop_result(ack()), state()}; + (false, state()) -> {drop_result(undefined), state()}. + %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. -callback ack([ack()], state()) -> {msg_ids(), state()}. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index b37fbb29e2..3168ca5c77 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -85,17 +85,19 @@ backing_queue_test(Cmds) -> %% Commands -%% Command frequencies are tuned so that queues are normally reasonably -%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple -%% and purging cause extreme queue lengths, so these have lower probabilities. -%% Fetches are sufficiently frequent so that commands that need acktags -%% get decent coverage. +%% Command frequencies are tuned so that queues are normally +%% reasonably short, but they may sometimes exceed +%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue +%% lengths, so these have lower probabilities. Fetches/drops are +%% sufficiently frequent so that commands that need acktags get decent +%% coverage. command(S) -> frequency([{10, qc_publish(S)}, {1, qc_publish_delivered(S)}, {1, qc_publish_multiple(S)}, %% very slow - {15, qc_fetch(S)}, %% needed for ack and requeue + {9, qc_fetch(S)}, %% needed for ack and requeue + {6, qc_drop(S)}, %% {15, qc_ack(S)}, {15, qc_requeue(S)}, {3, qc_set_ram_duration_target(S)}, @@ -124,6 +126,9 @@ qc_publish_delivered(#state{bqstate = BQ}) -> qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. +qc_drop(#state{bqstate = BQ}) -> + {call, ?BQMOD, drop, [boolean(), BQ]}. + qc_ack(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. @@ -217,22 +222,10 @@ next_state(S, Res, }; next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> - #state{len = Len, messages = Messages, acks = Acks} = S, - ResultInfo = {call, erlang, element, [1, Res]}, - BQ1 = {call, erlang, element, [2, Res]}, - AckTag = {call, erlang, element, [3, ResultInfo]}, - S1 = S#state{bqstate = BQ1}, - case gb_trees:is_empty(Messages) of - true -> S1; - false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages), - S2 = S1#state{len = Len - 1, messages = M2}, - case AckReq of - true -> - S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]}; - false -> - S2 - end - end; + next_state_fetch_and_drop(S, Res, AckReq, 3); + +next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) -> + next_state_fetch_and_drop(S, Res, AckReq, 2); next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> #state{acks = AcksState} = S, @@ -295,6 +288,21 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> Len =:= 0 end; +postcondition(S, {call, ?BQMOD, drop, _Args}, Res) -> + #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, + case Res of + {{MsgIdFetched, AckTag, RemainingLen}, _BQ} -> + {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), + MsgId = eval({call, erlang, element, + [?RECORD_INDEX(id, basic_message), Msg]}), + MsgIdFetched =:= MsgId andalso + not proplists:is_defined(AckTag, Acks) andalso + not gb_sets:is_element(AckTag, Confrms) andalso + RemainingLen =:= Len - 1; + {empty, _BQ} -> + Len =:= 0 + end; + postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) -> #state{acks = Acks, confirms = Confrms} = S, not proplists:is_defined(AckTag, Acks) andalso @@ -388,6 +396,24 @@ drop_messages(Messages) -> end end. +next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) -> + #state{len = Len, messages = Messages, acks = Acks} = S, + ResultInfo = {call, erlang, element, [1, Res]}, + BQ1 = {call, erlang, element, [2, Res]}, + AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]}, + S1 = S#state{bqstate = BQ1}, + case gb_trees:is_empty(Messages) of + true -> S1; + false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages), + S2 = S1#state{len = Len - 1, messages = M2}, + case AckReq of + true -> + S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]}; + false -> + S2 + end + end. + -else. -export([prop_disabled/0]). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a205b23d0b..e72cbafef7 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -39,8 +39,7 @@ -spec(recover/0 :: () -> [name()]). -spec(callback/4:: (rabbit_types:exchange(), fun_name(), - fun((boolean()) -> non_neg_integer()) | atom(), - [any()]) -> 'ok'). + fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'). -spec(declare/6 :: @@ -114,26 +113,19 @@ recover() -> [XName || #exchange{name = XName} <- Xs]. callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> - Serial = fun (Bool) -> - case Serial0 of - _ when is_atom(Serial0) -> Serial0; - _ -> Serial0(Bool) - end + Serial = if is_function(Serial0) -> Serial0; + is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, - [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) - || M <- decorators()], + [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || + M <- decorators()], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). serialise_events(X = #exchange{type = Type}) -> - case [Serialise || M <- decorators(), - Serialise <- [M:serialise_events(X)], - Serialise == true] of - [] -> (type_to_module(Type)):serialise_events(); - _ -> true - end. + lists:any(fun (M) -> M:serialise_events(X) end, decorators()) + orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> Serial = case serialise_events(X) of diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index df733546b4..961636b118 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2, + purge/1, publish/4, publish_delivered/4, + discard/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, @@ -270,22 +271,30 @@ drain_confirmed(State = #state { backing_queue = BQ, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered, - ack_msg_id = AM }) -> + set_delivered = SetDelivered }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, case Result of empty -> {Result, State1}; - {#basic_message { id = MsgId } = Message, IsDelivered, AckTag, - Remaining} -> - ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), + {Message, IsDelivered, AckTag, Remaining} -> + ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), IsDelivered1 = IsDelivered orelse SetDelivered > 0, - SetDelivered1 = lists:max([0, SetDelivered - 1]), - AM1 = maybe_store_acktag(AckTag, MsgId, AM), {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1, - ack_msg_id = AM1 }} + drop(Message#basic_message.id, AckTag, State1)} + end. + +drop(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:drop(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + case Result of + empty -> + {Result, State1}; + {MsgId, AckTag, Remaining} -> + ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), + {Result, drop(MsgId, AckTag, State1)} end. ack(AckTags, State = #state { gm = GM, @@ -440,6 +449,15 @@ depth_fun() -> end) end. +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered, + ack_msg_id = AM }) -> + State #state { set_delivered = lists:max([0, SetDelivered - 1]), + ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. + maybe_store_acktag(undefined, _MsgId, AM) -> AM; maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1ba1420f42..3ad8eb7785 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -28,7 +28,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2]). + prioritise_cast/2, prioritise_info/2, format_message_queue/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -329,6 +329,8 @@ prioritise_info(Msg, _State) -> _ -> 0 end. +format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + %% --------------------------------------------------------------------------- %% GM %% --------------------------------------------------------------------------- @@ -725,8 +727,8 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State1 = lists:foldl( fun (const, StateN = #state{backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = - BQ:fetch(AckRequired, BQSN), + {{MsgId, AckTag, _Remaining}, BQSN1} = + BQ:drop(AckRequired, BQSN), maybe_store_ack( AckRequired, MsgId, AckTag, StateN #state { backing_queue_state = BQSN1 }) @@ -735,21 +737,6 @@ process_instruction({drop, Length, Dropped, AckRequired}, true -> State1; false -> update_delta(ToDrop - Dropped, State1) end}; -process_instruction({fetch, AckRequired, MsgId, Remaining}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - _ when QLen =< Remaining andalso AckRequired -> - State; - _ when QLen =< Remaining -> - update_delta(-1, State) - end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 096f949061..444c737521 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2300,6 +2300,7 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_drop/1, fun test_dropwhile/1, fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, @@ -2361,6 +2362,20 @@ test_variable_queue_ack_limiting(VQ0) -> VQ6. +test_drop(VQ0) -> + %% start by sending a messages + VQ1 = variable_queue_publish(false, 1, VQ0), + %% drop message with AckRequired = true + {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + true = AckTag =/= undefinded, + %% drop again -> empty + {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2), + %% requeue + {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3), + %% drop message with AckRequired = false + {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + VQ5. + test_dropwhile(VQ0) -> Count = 10, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8a3fd9d917..208eb70f89 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0, fold/3]). @@ -255,7 +255,6 @@ q4, next_seq_id, pending_ack, - pending_ack_index, ram_ack_index, index_state, msg_store_clients, @@ -615,6 +614,16 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. +drop(AckRequired, State) -> + case queue_out(State) of + {empty, State1} -> + {empty, a(State1)}; + {{value, MsgStatus}, State1} -> + {{_Msg, _IsDelivered, AckTag, Remaining}, State2} = + internal_fetch(AckRequired, MsgStatus, State1), + {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)} + end. + ack([], State) -> {[], State}; ack(AckTags, State) -> |
