summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-22 11:24:39 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-22 11:24:39 +0000
commita3d76edcebaf3abf0f1a24b0c2c5acce122467fe (patch)
tree45a7edeff99cdfd15b6775c2c52f64688fe5152f
parent477e71bbddbd6d146d285519b0d3fe54b0ceb4a3 (diff)
parent567c69fb37cc569107b8e779d43d01d929772e5d (diff)
downloadrabbitmq-server-git-a3d76edcebaf3abf0f1a24b0c2c5acce122467fe.tar.gz
merge stable into default
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_backing_queue_qc.erl70
-rw-r--r--src/rabbit_exchange.erl22
-rw-r--r--src/rabbit_mirror_queue_master.erl38
-rw-r--r--src/rabbit_mirror_queue_slave.erl23
-rw-r--r--src/rabbit_tests.erl15
-rw-r--r--src/rabbit_variable_queue.erl13
8 files changed, 129 insertions, 72 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 68bd1c6c26..dc258fa606 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -730,8 +730,10 @@ drop_expired_messages(State = #q{dlx = DLX,
{Next, BQS2};
_ -> {Next, Msgs, BQS2} =
BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun = dead_letter_fun(expired),
- DLXFun(Msgs),
+ case Msgs of
+ [] -> ok;
+ _ -> (dead_letter_fun(expired))(Msgs)
+ end,
{Next, BQS2}
end,
ensure_ttl_timer(case Props of
@@ -1135,11 +1137,11 @@ handle_call(stat, _From, State) ->
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- IsEmpty = BQ:is_empty(BQS),
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
- IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
true -> stop(From, {ok, BQ:len(BQS)}, State)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index af660c60a0..00de3e175c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -29,6 +29,10 @@
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+-type(drop_result(Ack) ::
+ ('empty' |
+ %% MessageId, AckTag, Remaining_Len
+ {rabbit_types:msg_id(), Ack, non_neg_integer()})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
@@ -139,6 +143,10 @@
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}.
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index b37fbb29e2..3168ca5c77 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -85,17 +85,19 @@ backing_queue_test(Cmds) ->
%% Commands
-%% Command frequencies are tuned so that queues are normally reasonably
-%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
-%% and purging cause extreme queue lengths, so these have lower probabilities.
-%% Fetches are sufficiently frequent so that commands that need acktags
-%% get decent coverage.
+%% Command frequencies are tuned so that queues are normally
+%% reasonably short, but they may sometimes exceed
+%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue
+%% lengths, so these have lower probabilities. Fetches/drops are
+%% sufficiently frequent so that commands that need acktags get decent
+%% coverage.
command(S) ->
frequency([{10, qc_publish(S)},
{1, qc_publish_delivered(S)},
{1, qc_publish_multiple(S)}, %% very slow
- {15, qc_fetch(S)}, %% needed for ack and requeue
+ {9, qc_fetch(S)}, %% needed for ack and requeue
+ {6, qc_drop(S)}, %%
{15, qc_ack(S)},
{15, qc_requeue(S)},
{3, qc_set_ram_duration_target(S)},
@@ -124,6 +126,9 @@ qc_publish_delivered(#state{bqstate = BQ}) ->
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
+qc_drop(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drop, [boolean(), BQ]}.
+
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
@@ -217,22 +222,10 @@ next_state(S, Res,
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
- #state{len = Len, messages = Messages, acks = Acks} = S,
- ResultInfo = {call, erlang, element, [1, Res]},
- BQ1 = {call, erlang, element, [2, Res]},
- AckTag = {call, erlang, element, [3, ResultInfo]},
- S1 = S#state{bqstate = BQ1},
- case gb_trees:is_empty(Messages) of
- true -> S1;
- false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
- false ->
- S2
- end
- end;
+ next_state_fetch_and_drop(S, Res, AckReq, 3);
+
+next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) ->
+ next_state_fetch_and_drop(S, Res, AckReq, 2);
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
@@ -295,6 +288,21 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
Len =:= 0
end;
+postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
+ #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
+ case Res of
+ {{MsgIdFetched, AckTag, RemainingLen}, _BQ} ->
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
+ MsgId = eval({call, erlang, element,
+ [?RECORD_INDEX(id, basic_message), Msg]}),
+ MsgIdFetched =:= MsgId andalso
+ not proplists:is_defined(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ RemainingLen =:= Len - 1;
+ {empty, _BQ} ->
+ Len =:= 0
+ end;
+
postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) ->
#state{acks = Acks, confirms = Confrms} = S,
not proplists:is_defined(AckTag, Acks) andalso
@@ -388,6 +396,24 @@ drop_messages(Messages) ->
end
end.
+next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) ->
+ #state{len = Len, messages = Messages, acks = Acks} = S,
+ ResultInfo = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]},
+ S1 = S#state{bqstate = BQ1},
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
+ end.
+
-else.
-export([prop_disabled/0]).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a205b23d0b..e72cbafef7 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -39,8 +39,7 @@
-spec(recover/0 :: () -> [name()]).
-spec(callback/4::
(rabbit_types:exchange(), fun_name(),
- fun((boolean()) -> non_neg_integer()) | atom(),
- [any()]) -> 'ok').
+ fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
@@ -114,26 +113,19 @@ recover() ->
[XName || #exchange{name = XName} <- Xs].
callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
- Serial = fun (Bool) ->
- case Serial0 of
- _ when is_atom(Serial0) -> Serial0;
- _ -> Serial0(Bool)
- end
+ Serial = if is_function(Serial0) -> Serial0;
+ is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
- [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
- || M <- decorators()],
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
+ M <- decorators()],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
serialise_events(X = #exchange{type = Type}) ->
- case [Serialise || M <- decorators(),
- Serialise <- [M:serialise_events(X)],
- Serialise == true] of
- [] -> (type_to_module(Type)):serialise_events();
- _ -> true
- end.
+ lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
Serial = case serialise_events(X) of
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index df733546b4..961636b118 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/4,
+ discard/3, fetch/2, drop/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
@@ -270,22 +271,30 @@ drain_confirmed(State = #state { backing_queue = BQ,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+ set_delivered = SetDelivered }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
{Result, State1};
- {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
- Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
+ {Message, IsDelivered, AckTag, Remaining} ->
+ ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- SetDelivered1 = lists:max([0, SetDelivered - 1]),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
{{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1,
- ack_msg_id = AM1 }}
+ drop(Message#basic_message.id, AckTag, State1)}
+ end.
+
+drop(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:drop(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ case Result of
+ empty ->
+ {Result, State1};
+ {MsgId, AckTag, Remaining} ->
+ ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
+ {Result, drop(MsgId, AckTag, State1)}
end.
ack(AckTags, State = #state { gm = GM,
@@ -440,6 +449,15 @@ depth_fun() ->
end)
end.
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
+ ack_msg_id = AM }) ->
+ State #state { set_delivered = lists:max([0, SetDelivered - 1]),
+ ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1ba1420f42..3ad8eb7785 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -28,7 +28,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -329,6 +329,8 @@ prioritise_info(Msg, _State) ->
_ -> 0
end.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -725,8 +727,8 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
+ {{MsgId, AckTag, _Remaining}, BQSN1} =
+ BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
@@ -735,21 +737,6 @@ process_instruction({drop, Length, Dropped, AckRequired},
true -> State1;
false -> update_delta(ToDrop - Dropped, State1)
end};
-process_instruction({fetch, AckRequired, MsgId, Remaining},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- _ when QLen =< Remaining andalso AckRequired ->
- State;
- _ when QLen =< Remaining ->
- update_delta(-1, State)
- end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 096f949061..444c737521 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2300,6 +2300,7 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_drop/1,
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
@@ -2361,6 +2362,20 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6.
+test_drop(VQ0) ->
+ %% start by sending a messages
+ VQ1 = variable_queue_publish(false, 1, VQ0),
+ %% drop message with AckRequired = true
+ {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = AckTag =/= undefinded,
+ %% drop again -> empty
+ {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
+ %% requeue
+ {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
+ %% drop message with AckRequired = false
+ {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ VQ5.
+
test_dropwhile(VQ0) ->
Count = 10,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a3fd9d917..208eb70f89 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, multiple_routing_keys/0, fold/3]).
@@ -255,7 +255,6 @@
q4,
next_seq_id,
pending_ack,
- pending_ack_index,
ram_ack_index,
index_state,
msg_store_clients,
@@ -615,6 +614,16 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {{_Msg, _IsDelivered, AckTag, Remaining}, State2} =
+ internal_fetch(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)}
+ end.
+
ack([], State) ->
{[], State};
ack(AckTags, State) ->