diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-16 17:39:53 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-16 17:39:53 +0100 |
| commit | a00e05261cc178cf967b7a4917e3c094a966e489 (patch) | |
| tree | 6b7d050da7cc168780b75b18c851464a330fe507 | |
| parent | 78690e72efb16c5d777af3fab4f16048d2d1a34d (diff) | |
| download | rabbitmq-server-git-a00e05261cc178cf967b7a4917e3c094a966e489.tar.gz | |
publish_delivered(false, ...) -> discard
The two really are the same.
Since 'discard' doesn't need the message or props, there is less stuff
to pass around over GM.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 59 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
5 files changed, 69 insertions, 80 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7ce6958223..9def64ff25 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -513,6 +513,14 @@ send_or_record_confirm(#delivery{sender = SenderPid, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. +discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}}, + State) -> + %% fake an 'eventual' confirm from BQ; noop if not needed + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + confirm_messages([MsgId], State), + BQS1 = BQ:discard(MsgId, SenderPid, BQS), + State1#q{backing_queue_state = BQS1}. + run_message_queue(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), @@ -521,17 +529,20 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, + Props = #message_properties{delivered = Delivered}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + fun (true, State1 = #q{backing_queue_state = BQS2}) -> {AckTag, BQS3} = BQ:publish_delivered( - AckRequired, Message, Props, - SenderPid, BQS2), - {{Message, Props#message_properties.delivered, AckTag}, - true, State1#q{backing_queue_state = BQS3}} + Message, Props, SenderPid, BQS2), + {{Message, Delivered, AckTag}, + true, State1#q{backing_queue_state = BQS3}}; + (false, State1) -> + {{Message, Delivered, undefined}, + true, discard(Delivery, State1)} end, false, State#q{backing_queue_state = BQS1}); {published, BQS1} -> {true, State#q{backing_queue_state = BQS1}}; @@ -548,11 +559,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, State2; %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> - %% fake an 'eventual' confirm from BQ; noop if not needed - State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = - confirm_messages([Message#basic_message.id], State2), - BQS1 = BQ:discard(Message, SenderPid, BQS), - State3#q{backing_queue_state = BQS1}; + discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c6d1778532..f257c9776f 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -84,12 +84,16 @@ %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). --callback publish_delivered(true, rabbit_types:basic_message(), +-callback publish_delivered(rabbit_types:basic_message(), rabbit_types:message_properties(), pid(), state()) - -> {ack(), state()}; - (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {undefined, state()}. + -> {ack(), state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ for some +%% reason. Note that this is may be invoked for messages for which +%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', +%% BQS}. +-callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). @@ -200,13 +204,6 @@ -callback is_duplicate(rabbit_types:basic_message(), state()) -> {'false'|'published'|'discarded', state()}. -%% Called to inform the BQ about messages which have reached the -%% queue, but are not going to be further passed to BQ for some -%% reason. Note that this is may be invoked for messages for which -%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', -%% BQS}. --callback discard(rabbit_types:basic_message(), pid(), state()) -> state(). - -else. -export([behaviour_info/1]). @@ -214,12 +211,11 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, - {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, + {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}, - {discard, 3}]; + {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 377d51868d..cce19c907a 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,11 +17,11 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). + status/1, invoke/3, is_duplicate/2, fold/3]). -export([start/1, stop/0]). @@ -183,25 +183,42 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). -publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, +publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS, ack_msg_id = AM }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast( - GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), - {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), AM1 = maybe_store_acktag(AckTag, MsgId, AM), - {AckTag, - ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, - ack_msg_id = AM1 })}. + State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }, + {AckTag, ensure_monitoring(ChPid, State1)}. + +discard(MsgId, ChPid, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> + %% It's a massive error if we get told to discard something that's + %% already been published or published-and-confirmed. To do that + %% would require non FIFO access. Hence we should not find + %% 'published' or 'confirmed' in this dict:find. + case dict:find(MsgId, SS) of + error -> + ok = gm:broadcast(GM, {discard, ChPid, MsgId}), + BQS1 = BQ:discard(MsgId, ChPid, BQS), + ensure_monitoring( + ChPid, State #state { + backing_queue_state = BQS1, + seen_status = dict:erase(MsgId, SS) }); + {ok, discarded} -> + State + end. dropwhile(Pred, AckRequired, State = #state{gm = GM, @@ -375,26 +392,6 @@ is_duplicate(Message = #basic_message { id = MsgId }, {discarded, State} end. -discard(Msg = #basic_message { id = MsgId }, ChPid, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> - %% It's a massive error if we get told to discard something that's - %% already been published or published-and-confirmed. To do that - %% would require non FIFO access. Hence we should not find - %% 'published' or 'confirmed' in this dict:find. - case dict:find(MsgId, SS) of - error -> - ok = gm:broadcast(GM, {discard, ChPid, Msg}), - ensure_monitoring( - ChPid, State #state { - backing_queue_state = BQ:discard(Msg, ChPid, BQS), - seen_status = dict:erase(MsgId, SS) }); - {ok, discarded} -> - State - end. - %% --------------------------------------------------------------------------- %% Other exported functions %% --------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3d8bd8b408..e763bd9b0a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -696,25 +696,23 @@ publish_or_discard(Status, ChPid, MsgId, State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. -process_instruction({publish, false, ChPid, MsgProps, +process_instruction({publish, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; -process_instruction({publish, {true, AckRequired}, ChPid, MsgProps, +process_instruction({publish_delivered, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, - ChPid, BQS), - {ok, maybe_store_ack(AckRequired, MsgId, AckTag, + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {ok, maybe_store_ack(true, MsgId, AckTag, State1 #state { backing_queue_state = BQS1 })}; -process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, - State) -> +process_instruction({discard, ChPid, MsgId}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(Msg, ChPid, BQS), + BQS1 = BQ:discard(MsgId, ChPid, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ddb136a73d..8ff44a296d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,11 +17,11 @@ -module(rabbit_variable_queue). -export([init/3, terminate/2, delete_and_terminate/2, purge/1, - publish/4, publish_delivered/5, drain_confirmed/1, + publish/4, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). + is_duplicate/2, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -545,17 +545,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, ram_msg_count = RamMsgCount + 1, unconfirmed = UC1 })). -publish_delivered(false, #basic_message { id = MsgId }, - #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { async_callback = Callback, - len = 0 }) -> - case NeedsConfirming of - true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); - false -> ok - end, - {undefined, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, +publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, State = #vqstate { len = 0, @@ -579,6 +570,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +discard(_MsgId, _ChPid, State) -> State. + drain_confirmed(State = #vqstate { confirmed = C }) -> case gb_sets:is_empty(C) of true -> {[], State}; %% common case @@ -821,8 +814,6 @@ invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). is_duplicate(_Msg, State) -> {false, State}. -discard(_Msg, _ChPid, State) -> State. - %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- |
