diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-07 13:29:28 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-07 13:29:28 +0100 |
| commit | 3376cc7697174d530058fd4522417bc6037d992a (patch) | |
| tree | 8081f6988eba451cb9cc7dfaf9ae5fda891c88d2 | |
| parent | 158f5e7a918d1ef5b16a8a9c0cd9fa097d908a15 (diff) | |
| download | rabbitmq-server-git-3376cc7697174d530058fd4522417bc6037d992a.tar.gz | |
Add BQ:discard, correct BQ:is_duplicate, finally fix the last bits of immediate delivery, though hopefully in a way which has not leaked through to the lower layers...
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 5 |
6 files changed, 112 insertions, 34 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f5e441dc72..b0c5f13b03 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -72,4 +72,5 @@ -spec(status/1 :: (state()) -> [{atom(), any()}]). -spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). -spec(is_duplicate/2 :: (rabbit_types:basic_message(), state()) -> - {boolean(), state()}). + {'false'|'published'|'discarded', state()}). +-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 575d69f463..79f6472db7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -524,15 +524,6 @@ attempt_delivery(Delivery = #delivery{txn = none, _ -> ok end, case BQ:is_duplicate(Message, BQS) of - {true, BQS1} -> - %% if the message has previously been seen by the BQ then - %% it must have been seen under the same circumstances as - %% now: i.e. if it is now a deliver_immediately then it - %% must have been before. Consequently, if the BQ has seen - %% it before then it's safe to assume it's been delivered - %% (i.e. the only thing that cares about that is - %% deliver_immediately). - {true, Confirm, State#q{backing_queue_state = BQS1}}; {false, BQS1} -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -553,7 +544,17 @@ attempt_delivery(Delivery = #delivery{txn = none, {Delivered, State2} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State#q{backing_queue_state = BQS1}), - {Delivered, Confirm, State2} + {Delivered, Confirm, State2}; + {Duplicate, BQS1} -> + %% if the message has previously been seen by the BQ then + %% it must have been seen under the same circumstances as + %% now: i.e. if it is now a deliver_immediately then it + %% must have been before. + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} end; attempt_delivery(Delivery = #delivery{txn = Txn, sender = ChPid, @@ -561,13 +562,17 @@ attempt_delivery(Delivery = #delivery{txn = Txn, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case BQ:is_duplicate(Message, BQS) of - {true, BQS1} -> - {true, Confirm, State#q{backing_queue_state = BQS1}}; {false, BQS1} -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, BQS1), - {true, Confirm, State#q{backing_queue_state = BQS2}} + {true, Confirm, State#q{backing_queue_state = BQS2}}; + {Duplicate, BQS1} -> + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> @@ -721,6 +726,12 @@ rollback_transaction(Txn, C, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +discard_delivery(#delivery{sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. + reset_msg_expiry_fun(TTL) -> fun(MsgProps) -> MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} @@ -910,7 +921,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); - false -> State1 + false -> discard_delivery(Delivery, State1) end); handle_call({deliver, Delivery}, From, State) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index dfa5500e97..0bbbd559d3 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -172,9 +172,16 @@ behaviour_info(callbacks) -> {invoke, 3}, %% Called prior to a publish or publish_delivered call. Allows - %% the BQ to signal that it's already seen this message and thus - %% the message should be dropped. - {is_duplicate, 2} + %% the BQ to signal that it's already seen this message (and in + %% what capacity - i.e. was it published previously or discarded + %% previously) and thus the message should be dropped. + {is_duplicate, 2}, + + %% Called to inform the BQ about messages which have reached the + %% queue, but are not going to be further passed to BQ for some + %% reason. Note that this is not invoked for messages for which + %% BQ:is_duplicate/2 has already returned {true, BQS}. + {discard, 3} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 42af4e51ec..b0a22edd21 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2]). + status/1, invoke/3, is_duplicate/2, discard/3]). -export([start/1, stop/0]). @@ -150,6 +150,7 @@ drain_confirmed(State = #state { backing_queue = BQ, {MsgIds1, SS1} = lists:foldl( fun (MsgId, {MsgIdsN, SSN}) -> + %% We will never see 'discarded' here case dict:find(MsgId, SSN) of error -> {[MsgId | MsgIdsN], SSN}; @@ -300,7 +301,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% immediately after calling is_duplicate). The msg is %% invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {true, State #state { seen_status = dict:erase(MsgId, SS) }}; + {published, State #state { seen_status = dict:erase(MsgId, SS) }}; {ok, confirmed} -> %% It got published when we were a slave via gm, and %% confirmed some time after that (maybe even after @@ -310,6 +311,15 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% need to confirm now. As above, amqqueue_process will %% have the entry for the msg_id_to_channel mapping added %% immediately after calling is_duplicate/2. - {true, State #state { seen_status = dict:erase(MsgId, SS), - confirmed = [MsgId | Confirmed] }} + {published, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }}; + {ok, discarded} -> + {discarded, State #state { seen_status = dict:erase(MsgId, SS) }} end. + +discard(Msg = #basic_message {}, ChPid, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {discard, ChPid, Msg}), + State#state{backing_queue_state = BQ:discard(Msg, ChPid, BQS)}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4a9dc1fe7b..628135b141 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -313,6 +313,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {MS1, CMs} = lists:foldl( fun (MsgId, {MSN, CMsN} = Acc) -> + %% We will never see 'discarded' here case dict:find(MsgId, MSN) of error -> %% If it needed confirming, it'll have @@ -395,21 +396,25 @@ promote_me(From, #state { q = Q, %% %% MS contains the following three entry types: %% - %% {published, ChPid}: + %% a) {published, ChPid}: %% published via gm only; pending arrival of publication from %% channel, maybe pending confirm. %% - %% {published, ChPid, MsgSeqNo}: + %% b) {published, ChPid, MsgSeqNo}: %% published via gm and channel; pending confirm. %% - %% {confirmed, ChPid}: + %% c) {confirmed, ChPid}: %% published via gm only, and confirmed; pending publication %% from channel. %% - %% The two outer forms only, need to go to the master state + %% d) discarded + %% seen via gm only as discarded. Pending publication from + %% channel + %% + %% The forms a, c and d only, need to go to the master state %% seen_status (SS). %% - %% The middle form only, needs to go through to the queue_process + %% The form b only, needs to go through to the queue_process %% state to form the msg_id_to_channel mapping (MTC). %% %% No messages that are enqueued from SQ at this point will have @@ -420,9 +425,12 @@ promote_me(From, #state { q = Q, %% this does not affect MS, nor which bits go through to SS in %% Master, or MTC in queue_process. - SS = dict:from_list([{MsgId, Status} - || {MsgId, {Status, _ChPid}} <- dict:to_list(MS), - Status =:= published orelse Status =:= confirmed]), + MSList = dict:to_list(MS), + SS = dict:from_list( + [E || E = {_MsgId, discarded} <- MSList] ++ + [{MsgId, Status} + || {MsgId, {Status, _ChPid}} <- MSList, + Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( CPid, BQ, BQS, GM, SS), @@ -528,7 +536,11 @@ maybe_enqueue_message( immediately -> ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), State #state { msg_id_status = dict:erase(MsgId, MS) } - end + end; + {ok, discarded} -> + %% We've already heard from GM that the msg is to be + %% discarded. We won't see this again. + State #state { msg_id_status = dict:erase(MsgId, MS) } end. process_instruction( @@ -559,8 +571,7 @@ process_instruction( {{value, {Delivery = #delivery { msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, - MQ1} -> + _EnqueueOnPromotion}}, MQ1} -> %% We received the msg from the channel %% first. Thus we need to deal with confirms %% here. @@ -604,6 +615,41 @@ process_instruction( State1 #state { backing_queue_state = BQS1, msg_id_ack = MA1 } end}; +process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + %% Many of the comments around the publish head above apply here + %% too. + MS1 = dict:store(MsgId, discarded, MS), + {SQ1, MS2} = + case dict:find(ChPid, SQ) of + error -> + {SQ, MS1}; + {ok, MQ} -> + case queue:out(MQ) of + {empty, _MQ} -> + {SQ, MS1}; + {{value, {#delivery { + message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ1} -> + %% We've already seen it from the channel, + %% we're not going to see this again, so don't + %% add it to MS + {dict:store(ChPid, MQ1, SQ), MS}; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} -> + %% The instruction was sent to us before we + %% were within the mirror_pids within the + %% #amqqueue{} record. We'll never receive the + %% message directly from the channel. + {SQ, MS} + end + end, + BQS1 = BQ:discard(Msg, ChPid, BQS), + {ok, State #state { sender_queues = SQ1, + msg_id_status = MS2, + backing_queue_state = BQS1 }}; process_instruction({set_length, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a8f9974adc..84987c8849 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,8 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + status/1, invoke/3, is_duplicate/2, discard/3, + multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -888,6 +889,8 @@ invoke(?MODULE, Fun, State) -> is_duplicate(_Msg, State) -> {false, State}. +discard(_Msg, _ChPid, State) -> State. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- |
