diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-20 13:34:59 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-20 13:34:59 +0000 |
| commit | b8a7a8dd6af079d250d187af7341a0085a89f112 (patch) | |
| tree | 7f8f5db022e47f9b2240312d1b7e41f41fba21df /src | |
| parent | de0d9b6a23d1ac6bcdea17d5d201cf4d3e588faa (diff) | |
| download | rabbitmq-server-git-b8a7a8dd6af079d250d187af7341a0085a89f112.tar.gz | |
introduce bq:drop/2 and use it in slaves to prevent msg fetching
- 'drop' is the same as 'fetch' except it doesn't read messages from
the msg store
- slaves never fetch messages, they only drop them
- technically, mq_master:drop doesn't need to exist, since 'drop' is
only invoked by the slaves, but we provide an implementation for
completeness.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
4 files changed, 49 insertions, 28 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index af660c60a0..00de3e175c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -29,6 +29,10 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). +-type(drop_result(Ack) :: + ('empty' | + %% MessageId, AckTag, Remaining_Len + {rabbit_types:msg_id(), Ack, non_neg_integer()})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: @@ -139,6 +143,10 @@ -callback fetch(true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}. +%% Remove the next message. +-callback drop(true, state()) -> {drop_result(ack()), state()}; + (false, state()) -> {drop_result(undefined), state()}. + %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. -callback ack([ack()], state()) -> {msg_ids(), state()}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index df733546b4..961636b118 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2, + purge/1, publish/4, publish_delivered/4, + discard/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, @@ -270,22 +271,30 @@ drain_confirmed(State = #state { backing_queue = BQ, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered, - ack_msg_id = AM }) -> + set_delivered = SetDelivered }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, case Result of empty -> {Result, State1}; - {#basic_message { id = MsgId } = Message, IsDelivered, AckTag, - Remaining} -> - ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), + {Message, IsDelivered, AckTag, Remaining} -> + ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), IsDelivered1 = IsDelivered orelse SetDelivered > 0, - SetDelivered1 = lists:max([0, SetDelivered - 1]), - AM1 = maybe_store_acktag(AckTag, MsgId, AM), {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1, - ack_msg_id = AM1 }} + drop(Message#basic_message.id, AckTag, State1)} + end. + +drop(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:drop(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + case Result of + empty -> + {Result, State1}; + {MsgId, AckTag, Remaining} -> + ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), + {Result, drop(MsgId, AckTag, State1)} end. ack(AckTags, State = #state { gm = GM, @@ -440,6 +449,15 @@ depth_fun() -> end) end. +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered, + ack_msg_id = AM }) -> + State #state { set_delivered = lists:max([0, SetDelivered - 1]), + ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. + maybe_store_acktag(undefined, _MsgId, AM) -> AM; maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index bea4758c4d..3ad8eb7785 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -727,8 +727,8 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State1 = lists:foldl( fun (const, StateN = #state{backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = - BQ:fetch(AckRequired, BQSN), + {{MsgId, AckTag, _Remaining}, BQSN1} = + BQ:drop(AckRequired, BQSN), maybe_store_ack( AckRequired, MsgId, AckTag, StateN #state { backing_queue_state = BQSN1 }) @@ -737,21 +737,6 @@ process_instruction({drop, Length, Dropped, AckRequired}, true -> State1; false -> update_delta(ToDrop - Dropped, State1) end}; -process_instruction({fetch, AckRequired, MsgId, Remaining}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - _ when QLen =< Remaining andalso AckRequired -> - State; - _ when QLen =< Remaining -> - update_delta(-1, State) - end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8a3fd9d917..367b4802c2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0, fold/3]). @@ -615,6 +615,16 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. +drop(AckRequired, State) -> + case queue_out(State) of + {empty, State1} -> + {empty, a(State1)}; + {{value, MsgStatus}, State1} -> + {{_Msg, _IsDelivered, AckTag, Remaining}, State2} = + internal_fetch(AckRequired, MsgStatus, State1), + {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)} + end. + ack([], State) -> {[], State}; ack(AckTags, State) -> |
