summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-20 13:34:59 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-20 13:34:59 +0000
commitb8a7a8dd6af079d250d187af7341a0085a89f112 (patch)
tree7f8f5db022e47f9b2240312d1b7e41f41fba21df /src
parentde0d9b6a23d1ac6bcdea17d5d201cf4d3e588faa (diff)
downloadrabbitmq-server-git-b8a7a8dd6af079d250d187af7341a0085a89f112.tar.gz
introduce bq:drop/2 and use it in slaves to prevent msg fetching
- 'drop' is the same as 'fetch' except it doesn't read messages from the msg store - slaves never fetch messages, they only drop them - technically, mq_master:drop doesn't need to exist, since 'drop' is only invoked by the slaves, but we provide an implementation for completeness.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_mirror_queue_master.erl38
-rw-r--r--src/rabbit_mirror_queue_slave.erl19
-rw-r--r--src/rabbit_variable_queue.erl12
4 files changed, 49 insertions, 28 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index af660c60a0..00de3e175c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -29,6 +29,10 @@
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+-type(drop_result(Ack) ::
+ ('empty' |
+ %% MessageId, AckTag, Remaining_Len
+ {rabbit_types:msg_id(), Ack, non_neg_integer()})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
@@ -139,6 +143,10 @@
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}.
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index df733546b4..961636b118 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/4,
+ discard/3, fetch/2, drop/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
@@ -270,22 +271,30 @@ drain_confirmed(State = #state { backing_queue = BQ,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+ set_delivered = SetDelivered }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
{Result, State1};
- {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
- Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
+ {Message, IsDelivered, AckTag, Remaining} ->
+ ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- SetDelivered1 = lists:max([0, SetDelivered - 1]),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
{{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1,
- ack_msg_id = AM1 }}
+ drop(Message#basic_message.id, AckTag, State1)}
+ end.
+
+drop(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:drop(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ case Result of
+ empty ->
+ {Result, State1};
+ {MsgId, AckTag, Remaining} ->
+ ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
+ {Result, drop(MsgId, AckTag, State1)}
end.
ack(AckTags, State = #state { gm = GM,
@@ -440,6 +449,15 @@ depth_fun() ->
end)
end.
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
+ ack_msg_id = AM }) ->
+ State #state { set_delivered = lists:max([0, SetDelivered - 1]),
+ ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index bea4758c4d..3ad8eb7785 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -727,8 +727,8 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
+ {{MsgId, AckTag, _Remaining}, BQSN1} =
+ BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
@@ -737,21 +737,6 @@ process_instruction({drop, Length, Dropped, AckRequired},
true -> State1;
false -> update_delta(ToDrop - Dropped, State1)
end};
-process_instruction({fetch, AckRequired, MsgId, Remaining},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- _ when QLen =< Remaining andalso AckRequired ->
- State;
- _ when QLen =< Remaining ->
- update_delta(-1, State)
- end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a3fd9d917..367b4802c2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, multiple_routing_keys/0, fold/3]).
@@ -615,6 +615,16 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {{_Msg, _IsDelivered, AckTag, Remaining}, State2} =
+ internal_fetch(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)}
+ end.
+
ack([], State) ->
{[], State};
ack(AckTags, State) ->