summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-04-07 13:29:28 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-04-07 13:29:28 +0100
commit3376cc7697174d530058fd4522417bc6037d992a (patch)
tree8081f6988eba451cb9cc7dfaf9ae5fda891c88d2
parent158f5e7a918d1ef5b16a8a9c0cd9fa097d908a15 (diff)
downloadrabbitmq-server-git-3376cc7697174d530058fd4522417bc6037d992a.tar.gz
Add BQ:discard, correct BQ:is_duplicate, finally fix the last bits of immediate delivery, though hopefully in a way which has not leaked through to the lower layers...
-rw-r--r--include/rabbit_backing_queue_spec.hrl3
-rw-r--r--src/rabbit_amqqueue_process.erl39
-rw-r--r--src/rabbit_backing_queue.erl13
-rw-r--r--src/rabbit_mirror_queue_master.erl18
-rw-r--r--src/rabbit_mirror_queue_slave.erl68
-rw-r--r--src/rabbit_variable_queue.erl5
6 files changed, 112 insertions, 34 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index f5e441dc72..b0c5f13b03 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -72,4 +72,5 @@
-spec(status/1 :: (state()) -> [{atom(), any()}]).
-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
-spec(is_duplicate/2 :: (rabbit_types:basic_message(), state()) ->
- {boolean(), state()}).
+ {'false'|'published'|'discarded', state()}).
+-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 575d69f463..79f6472db7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -524,15 +524,6 @@ attempt_delivery(Delivery = #delivery{txn = none,
_ -> ok
end,
case BQ:is_duplicate(Message, BQS) of
- {true, BQS1} ->
- %% if the message has previously been seen by the BQ then
- %% it must have been seen under the same circumstances as
- %% now: i.e. if it is now a deliver_immediately then it
- %% must have been before. Consequently, if the BQ has seen
- %% it before then it's safe to assume it's been delivered
- %% (i.e. the only thing that cares about that is
- %% deliver_immediately).
- {true, Confirm, State#q{backing_queue_state = BQS1}};
{false, BQS1} ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -553,7 +544,17 @@ attempt_delivery(Delivery = #delivery{txn = none,
{Delivered, State2} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
State#q{backing_queue_state = BQS1}),
- {Delivered, Confirm, State2}
+ {Delivered, Confirm, State2};
+ {Duplicate, BQS1} ->
+ %% if the message has previously been seen by the BQ then
+ %% it must have been seen under the same circumstances as
+ %% now: i.e. if it is now a deliver_immediately then it
+ %% must have been before.
+ Delivered = case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
end;
attempt_delivery(Delivery = #delivery{txn = Txn,
sender = ChPid,
@@ -561,13 +562,17 @@ attempt_delivery(Delivery = #delivery{txn = Txn,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case BQ:is_duplicate(Message, BQS) of
- {true, BQS1} ->
- {true, Confirm, State#q{backing_queue_state = BQS1}};
{false, BQS1} ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
BQS1),
- {true, Confirm, State#q{backing_queue_state = BQS2}}
+ {true, Confirm, State#q{backing_queue_state = BQS2}};
+ {Duplicate, BQS1} ->
+ Delivered = case Duplicate of
+ published -> true;
+ discarded -> false
+ end,
+ {Delivered, Confirm, State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
@@ -721,6 +726,12 @@ rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+discard_delivery(#delivery{sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}.
+
reset_msg_expiry_fun(TTL) ->
fun(MsgProps) ->
MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
@@ -910,7 +921,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
{Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
reply(Delivered, case Delivered of
true -> maybe_record_confirm_message(Confirm, State1);
- false -> State1
+ false -> discard_delivery(Delivery, State1)
end);
handle_call({deliver, Delivery}, From, State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index dfa5500e97..0bbbd559d3 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -172,9 +172,16 @@ behaviour_info(callbacks) ->
{invoke, 3},
%% Called prior to a publish or publish_delivered call. Allows
- %% the BQ to signal that it's already seen this message and thus
- %% the message should be dropped.
- {is_duplicate, 2}
+ %% the BQ to signal that it's already seen this message (and in
+ %% what capacity - i.e. was it published previously or discarded
+ %% previously) and thus the message should be dropped.
+ {is_duplicate, 2},
+
+ %% Called to inform the BQ about messages which have reached the
+ %% queue, but are not going to be further passed to BQ for some
+ %% reason. Note that this is not invoked for messages for which
+ %% BQ:is_duplicate/2 has already returned {true, BQS}.
+ {discard, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 42af4e51ec..b0a22edd21 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2]).
+ status/1, invoke/3, is_duplicate/2, discard/3]).
-export([start/1, stop/0]).
@@ -150,6 +150,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
{MsgIds1, SS1} =
lists:foldl(
fun (MsgId, {MsgIdsN, SSN}) ->
+ %% We will never see 'discarded' here
case dict:find(MsgId, SSN) of
error ->
{[MsgId | MsgIdsN], SSN};
@@ -300,7 +301,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {published, State #state { seen_status = dict:erase(MsgId, SS) }};
{ok, confirmed} ->
%% It got published when we were a slave via gm, and
%% confirmed some time after that (maybe even after
@@ -310,6 +311,15 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% need to confirm now. As above, amqqueue_process will
%% have the entry for the msg_id_to_channel mapping added
%% immediately after calling is_duplicate/2.
- {true, State #state { seen_status = dict:erase(MsgId, SS),
- confirmed = [MsgId | Confirmed] }}
+ {published, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }};
+ {ok, discarded} ->
+ {discarded, State #state { seen_status = dict:erase(MsgId, SS) }}
end.
+
+discard(Msg = #basic_message {}, ChPid,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {discard, ChPid, Msg}),
+ State#state{backing_queue_state = BQ:discard(Msg, ChPid, BQS)}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4a9dc1fe7b..628135b141 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -313,6 +313,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{MS1, CMs} =
lists:foldl(
fun (MsgId, {MSN, CMsN} = Acc) ->
+ %% We will never see 'discarded' here
case dict:find(MsgId, MSN) of
error ->
%% If it needed confirming, it'll have
@@ -395,21 +396,25 @@ promote_me(From, #state { q = Q,
%%
%% MS contains the following three entry types:
%%
- %% {published, ChPid}:
+ %% a) {published, ChPid}:
%% published via gm only; pending arrival of publication from
%% channel, maybe pending confirm.
%%
- %% {published, ChPid, MsgSeqNo}:
+ %% b) {published, ChPid, MsgSeqNo}:
%% published via gm and channel; pending confirm.
%%
- %% {confirmed, ChPid}:
+ %% c) {confirmed, ChPid}:
%% published via gm only, and confirmed; pending publication
%% from channel.
%%
- %% The two outer forms only, need to go to the master state
+ %% d) discarded
+ %% seen via gm only as discarded. Pending publication from
+ %% channel
+ %%
+ %% The forms a, c and d only, need to go to the master state
%% seen_status (SS).
%%
- %% The middle form only, needs to go through to the queue_process
+ %% The form b only, needs to go through to the queue_process
%% state to form the msg_id_to_channel mapping (MTC).
%%
%% No messages that are enqueued from SQ at this point will have
@@ -420,9 +425,12 @@ promote_me(From, #state { q = Q,
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- SS = dict:from_list([{MsgId, Status}
- || {MsgId, {Status, _ChPid}} <- dict:to_list(MS),
- Status =:= published orelse Status =:= confirmed]),
+ MSList = dict:to_list(MS),
+ SS = dict:from_list(
+ [E || E = {_MsgId, discarded} <- MSList] ++
+ [{MsgId, Status}
+ || {MsgId, {Status, _ChPid}} <- MSList,
+ Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, SS),
@@ -528,7 +536,11 @@ maybe_enqueue_message(
immediately ->
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
State #state { msg_id_status = dict:erase(MsgId, MS) }
- end
+ end;
+ {ok, discarded} ->
+ %% We've already heard from GM that the msg is to be
+ %% discarded. We won't see this again.
+ State #state { msg_id_status = dict:erase(MsgId, MS) }
end.
process_instruction(
@@ -559,8 +571,7 @@ process_instruction(
{{value, {Delivery = #delivery {
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}},
- MQ1} ->
+ _EnqueueOnPromotion}}, MQ1} ->
%% We received the msg from the channel
%% first. Thus we need to deal with confirms
%% here.
@@ -604,6 +615,41 @@ process_instruction(
State1 #state { backing_queue_state = BQS1,
msg_id_ack = MA1 }
end};
+process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
+ State = #state { sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_status = MS }) ->
+ %% Many of the comments around the publish head above apply here
+ %% too.
+ MS1 = dict:store(MsgId, discarded, MS),
+ {SQ1, MS2} =
+ case dict:find(ChPid, SQ) of
+ error ->
+ {SQ, MS1};
+ {ok, MQ} ->
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {SQ, MS1};
+ {{value, {#delivery {
+ message = #basic_message { id = MsgId } },
+ _EnqueueOnPromotion}}, MQ1} ->
+ %% We've already seen it from the channel,
+ %% we're not going to see this again, so don't
+ %% add it to MS
+ {dict:store(ChPid, MQ1, SQ), MS};
+ {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} ->
+ %% The instruction was sent to us before we
+ %% were within the mirror_pids within the
+ %% #amqqueue{} record. We'll never receive the
+ %% message directly from the channel.
+ {SQ, MS}
+ end
+ end,
+ BQS1 = BQ:discard(Msg, ChPid, BQS),
+ {ok, State #state { sender_queues = SQ1,
+ msg_id_status = MS2,
+ backing_queue_state = BQS1 }};
process_instruction({set_length, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a8f9974adc..84987c8849 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,8 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
+ status/1, invoke/3, is_duplicate/2, discard/3,
+ multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -888,6 +889,8 @@ invoke(?MODULE, Fun, State) ->
is_duplicate(_Msg, State) -> {false, State}.
+discard(_Msg, _ChPid, State) -> State.
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------