summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-12 23:42:06 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-12 23:42:06 +0100
commitbd55c33423e579a1dac6899429f283bd1d4cddac (patch)
tree03a72920d881473ed12bdc6c77b3b851365c7656
parentdc36279e3207c731e28ea0f489046ba57c3c7d24 (diff)
parent9db422ac20c8b5c995d05b0846a3379049799cd4 (diff)
downloadrabbitmq-server-git-bd55c33423e579a1dac6899429f283bd1d4cddac.tar.gz
merge default into bug25225
-rw-r--r--src/rabbit_amqqueue_process.erl59
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl30
-rw-r--r--src/rabbit_mirror_queue_slave.erl51
3 files changed, 61 insertions, 79 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d483483581..78aefdc461 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -496,32 +496,21 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
- never;
-should_confirm_message(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+ {never, State};
+send_or_record_confirm(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- #q{q = #amqqueue{durable = true}}) ->
- {eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo},
- _State) ->
- {immediately, SenderPid, MsgSeqNo}.
-
-needs_confirming({eventually, _, _, _}) -> true;
-needs_confirming(_) -> false.
-
-maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
- State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel =
- gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
-maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
+ {eventually, State#q{msg_id_to_channel = MTC1}};
+send_or_record_confirm(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- State;
-maybe_record_confirm_message(_Confirm, State) ->
- State.
+ {immediately, State}.
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
@@ -549,26 +538,24 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
{false, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, Delivered,
- State) ->
- Confirm = should_confirm_message(Delivery, State),
+deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+ Delivered, State) ->
+ {Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props,
- maybe_record_confirm_message(Confirm, State)) of
- {true, State1} ->
- State1;
+ case attempt_delivery(Delivery, Props, State1) of
+ {true, State2} ->
+ State2;
%% the next one is an optimisations
- {false, State1 = #q{ttl = 0, dlx = undefined}} ->
+ {false, State2 = #q{ttl = 0, dlx = undefined}} ->
%% fake an 'eventual' confirm from BQ; noop if not needed
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- confirm_messages([Message#basic_message.id], State1),
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ confirm_messages([Message#basic_message.id], State2),
BQS1 = BQ:discard(Message, SenderPid, BQS),
- State2#q{backing_queue_state = BQS1};
- {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ State3#q{backing_queue_state = BQS1};
+ {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
- State1#q{backing_queue_state = BQS1})
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
@@ -687,7 +674,7 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm),
+ needs_confirming = Confirm == eventually,
delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 72dcfc95fc..6cd71fc314 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -101,19 +101,25 @@
%% channel during a publish, only some of the mirrors may receive that
%% publish. As a result of this problem, the messages broadcast over
%% the gm contain published content, and thus slaves can operate
-%% successfully on messages that they only receive via the gm. The key
-%% purpose of also sending messages directly from the channels to the
-%% slaves is that without this, in the event of the death of the
-%% master, messages could be lost until a suitable slave is promoted.
+%% successfully on messages that they only receive via the gm.
%%
-%% However, that is not the only reason. For example, if confirms are
-%% in use, then there is no guarantee that every slave will see the
-%% delivery with the same msg_seq_no. As a result, the slaves have to
-%% wait until they've seen both the publish via gm, and the publish
-%% via the channel before they have enough information to be able to
-%% perform the publish to their own bq, and subsequently issue the
-%% confirm, if necessary. Either form of publish can arrive first, and
-%% a slave can be upgraded to the master at any point during this
+%% The key purpose of also sending messages directly from the channels
+%% to the slaves is that without this, in the event of the death of
+%% the master, messages could be lost until a suitable slave is
+%% promoted. However, that is not the only reason. A slave cannot send
+%% confirms for a message until it has seen it from the
+%% channel. Otherwise, it might send a confirm to a channel for a
+%% message that it might *never* receive from that channel. This can
+%% happen because new slaves join the gm ring (and thus receive
+%% messages from the master) before inserting themselves in the
+%% queue's mnesia record (which is what channels look at for routing).
+%% As it turns out, channels will simply ignore such bogus confirms,
+%% but relying on that would introduce a dangerously tight coupling.
+%%
+%% Hence the slaves have to wait until they've seen both the publish
+%% via gm, and the publish via the channel before they issue the
+%% confirm. Either form of publish can arrive first, and a slave can
+%% be upgraded to the master at any point during this
%% process. Confirms continue to be issued correctly, however.
%%
%% Because the slave is a full process, it impersonates parts of the
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 931a7f9028..f4679184a8 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -406,16 +406,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% If it needed confirming, it'll have
%% already been done.
Acc;
- {ok, {published, ChPid}} ->
+ {ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
+ {CMsN, dict:store(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
dict:erase(MsgId, MSN)};
- {ok, {confirmed, _ChPid}} ->
+ {ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
%% and then delivered and ack'd before we've
@@ -482,18 +482,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%%
%% MS contains the following three entry types:
%%
- %% a) {published, ChPid}:
+ %% a) published:
%% published via gm only; pending arrival of publication from
%% channel, maybe pending confirm.
%%
%% b) {published, ChPid, MsgSeqNo}:
%% published via gm and channel; pending confirm.
%%
- %% c) {confirmed, ChPid}:
+ %% c) confirmed:
%% published via gm only, and confirmed; pending publication
%% from channel.
%%
- %% d) discarded
+ %% d) discarded:
%% seen via gm only as discarded. Pending publication from
%% channel
%%
@@ -511,22 +511,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- MSList = dict:to_list(MS),
- SS = dict:from_list(
- [E || E = {_MsgId, discarded} <- MSList] ++
- [{MsgId, Status}
- || {MsgId, {Status, _ChPid}} <- MSList,
- Status =:= published orelse Status =:= confirmed]),
+ St = [published, confirmed, discarded],
+ SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, AckTags, SS, MPids),
- MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
- gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
- (_, MTC0) ->
- MTC0
- end, gb_trees:empty(), MSList),
+ MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_Msgid, _Status, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MS),
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
rabbit_amqqueue_process:init_with_backing_queue_state(
@@ -637,16 +633,12 @@ maybe_enqueue_message(
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, {confirmed, ChPid}} ->
- %% BQ has confirmed it but we didn't know what the
- %% msg_seq_no was at the time. We do now!
+ {ok, confirmed} ->
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 };
- {ok, {published, ChPid}} ->
- %% It was published to the BQ and we didn't know the
- %% msg_seq_no so couldn't confirm it at the time.
+ {ok, published} ->
{MS1, SQ1} =
case needs_confirming(Delivery, State1) of
never -> {dict:erase(MsgId, MS),
@@ -690,20 +682,17 @@ process_instruction(
msg_id_status = MS }) ->
%% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. As a result, we
- %% may know that it needs confirming without knowing its
- %% msg_seq_no, which means that we can see the confirmation come
- %% back from the backing queue without knowing the msg_seq_no,
- %% which means that we're going to have to hang on to the fact
- %% that we've seen the msg_id confirmed until we can associate it
- %% with a msg_seq_no.
+ %% may not have seen it directly from the channel. But we cannot
+ %% issues confirms until the latter has happened. So we need to
+ %% keep track of the MsgId and its confirmation status in the
+ %% meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, {published, ChPid}, MS)};
+ dict:store(MsgId, published, MS)};
{{value, Delivery = #delivery {
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } }}, MQ2} ->