summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-02 13:01:30 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-02 13:01:30 +0000
commitdacc27d78b8ff020a41a6919cd3f5a54fc9d586c (patch)
tree44d2306da308e9ed764ed218a8a1dcdd349d5710
parent7768d11214271ced5f957b4fcf3b3b6a966d8cd3 (diff)
downloadrabbitmq-server-git-dacc27d78b8ff020a41a6919cd3f5a54fc9d586c.tar.gz
Fixed confirms in HA queues. Broke slave promotion. Will fix
-rw-r--r--src/rabbit_mirror_queue_slave.erl168
1 files changed, 104 insertions, 64 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index df9a28f4a3..5c101ee2d5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -88,9 +88,8 @@
sender_queues, %% :: Pid -> MsgQ
guid_ack, %% :: Guid -> AckTag
- seen, %% Set Guid
- guid_to_channel %% for confirms
+ guid_status
}).
-define(SYNC_INTERVAL, 25). %% milliseconds
@@ -144,9 +143,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
guid_ack = dict:new(),
- seen = sets:new(),
-
- guid_to_channel = dict:new()
+ guid_status = dict:new()
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
@@ -318,35 +315,41 @@ maybe_run_queue_via_backing_queue(
{Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
confirm_messages(Guids, State #state { backing_queue_state = BQS1 }).
-record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) ->
- GTC;
-record_confirm_or_confirm(
- #delivery { sender = ChPid,
- message = #basic_message { is_persistent = true,
- guid = Guid },
- msg_seq_no = MsgSeqNo }, #amqqueue { durable = true }, GTC) ->
- dict:store(Guid, {ChPid, MsgSeqNo}, GTC);
-record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo },
- _Q, GTC) ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
- GTC.
-
-confirm_messages(Guids, State = #state { guid_to_channel = GTC }) ->
- {CMs, GTC1} =
+
+needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
+ never;
+needs_confirming(#delivery { message = #basic_message {
+ is_persistent = true } },
+ #state { q = #amqqueue { durable = true } }) ->
+ eventually;
+needs_confirming(_Delivery, _State) ->
+ immediately.
+
+confirm_messages(Guids, State = #state { guid_status = GS }) ->
+ {GS1, CMs} =
lists:foldl(
- fun(Guid, {CMs, GTC0}) ->
- case dict:find(Guid, GTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(Guid, GTC0)};
- _ ->
- {CMs, GTC0}
+ fun (Guid, {GSN, CMsN} = Acc) ->
+ %% We will never see {confirmed, ChPid} here.
+ case dict:find(Guid, GSN) of
+ error ->
+ %% If it needed confirming, it'll have
+ %% already been done.
+ Acc;
+ {ok, {published, ChPid}} ->
+ %% Still not seen it from the channel, just
+ %% record that it's been confirmed.
+ {dict:store(Guid, {confirmed, ChPid}, GSN), CMsN};
+ {ok, {published, ChPid, MsgSeqNo}} ->
+ %% Seen from both GM and Channel. Can now
+ %% confirm.
+ {dict:erase(Guid, GSN),
+ gb_trees_cons(ChPid, MsgSeqNo, CMsN)}
end
- end, {gb_trees:empty(), GTC}, Guids),
- gb_trees:map(fun(ChPid, MsgSeqNos) ->
- rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, {GS, gb_trees:empty()}, Guids),
+ gb_trees:map(fun (ChPid, MsgSeqNos) ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNos)
end, CMs),
- State #state { guid_to_channel = GTC1 }.
+ State #state { guid_status = GS1 }.
gb_trees_cons(Key, Value, Tree) ->
case gb_trees:lookup(Key, Tree) of
@@ -363,7 +366,6 @@ promote_me(From, #state { q = Q,
backing_queue_state = BQS,
rate_timer_ref = RateTRef,
sender_queues = SQ,
- seen = Seen,
guid_ack = GA }) ->
rabbit_log:info("Promoting slave ~p for ~s~n",
[self(), rabbit_misc:rs(Q #amqqueue.name)]),
@@ -371,8 +373,9 @@ promote_me(From, #state { q = Q,
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
+ %% TODO fix up seen
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, Seen),
+ CPid, BQ, BQS, GM, sets:new()),
%% We have to do the requeue via this init because otherwise we
%% don't have access to the relevent MsgPropsFun. Also, we are
%% already in mnesia as the master queue pid. Thus we cannot just
@@ -441,59 +444,97 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
State #state { rate_timer_ref = undefined }.
maybe_enqueue_message(
- Delivery = #delivery { message = #basic_message { guid = Guid },
- sender = ChPid },
- State = #state { q = Q,
- sender_queues = SQ,
- seen = Seen,
- guid_to_channel = GTC }) ->
- case sets:is_element(Guid, Seen) of
- true ->
- GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
- State #state { guid_to_channel = GTC1,
- seen = sets:del_element(Guid, Seen) };
- false ->
+ Delivery = #delivery { message = #basic_message { guid = Guid },
+ msg_seq_no = MsgSeqNo,
+ sender = ChPid },
+ State = #state { sender_queues = SQ,
+ guid_status = GS }) ->
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(Guid, GS) of
+ error ->
MQ = case dict:find(ChPid, SQ) of
{ok, MQ1} -> MQ1;
error -> queue:new()
end,
SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
- State #state { sender_queues = SQ1 }
+ State #state { sender_queues = SQ1 };
+ {ok, {confirmed, ChPid}} ->
+ %% BQ has confirmed it but we didn't know what the
+ %% msg_seq_no was at the time. We do now!
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ State #state { guid_status = dict:erase(Guid, GS) };
+ {ok, {published, ChPid}} ->
+ %% It was published to the BQ and we didn't know the
+ %% msg_seq_no so couldn't confirm it at the time.
+ case needs_confirming(Delivery, State) of
+ never ->
+ State #state { guid_status = dict:erase(Guid, GS) };
+ eventually ->
+ State #state {
+ guid_status = dict:store(
+ Guid, {published, ChPid, MsgSeqNo}, GS) };
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ State #state { guid_status = dict:erase(Guid, GS) }
+ end
end.
process_instruction(
{publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }},
- State = #state { q = Q,
- sender_queues = SQ,
+ State = #state { sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA,
- seen = Seen,
- guid_to_channel = GTC }) ->
- {SQ1, Seen1, GTC1} =
+ guid_status = GS }) ->
+
+ %% We really are going to do the publish right now, even though we
+ %% may not have seen it directly from the channel. As a result, we
+ %% may know that it needs confirming without knowing its
+ %% msg_seq_no, which means that we can see the confirmation come
+ %% back from the backing queue without knowing the msg_seq_no,
+ %% which means that we're going to have to hang on to the fact
+ %% that we've seen the guid confirmed until we can associate it
+ %% with a msg_seq_no.
+ GS1 = dict:store(Guid, {published, ChPid}, GS),
+ {SQ1, GS2} =
case dict:find(ChPid, SQ) of
error ->
- {SQ, sets:add_element(Guid, Seen), GTC};
+ {SQ, GS1};
{ok, MQ} ->
case queue:out(MQ) of
{empty, _MQ} ->
- {SQ, sets:add_element(Guid, Seen), GTC};
+ {SQ, GS1};
{{value, Delivery = #delivery {
- message = #basic_message { guid = Guid } }},
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { guid = Guid } }},
MQ1} ->
- GTC2 = record_confirm_or_confirm(Delivery, Q, GTC),
- {dict:store(ChPid, MQ1, SQ), Seen, GTC2};
+ %% We received the msg from the channel
+ %% first. Thus we need to deal with confirms
+ %% here.
+ {dict:store(ChPid, MQ1, SQ),
+ case needs_confirming(Delivery, State) of
+ never ->
+ GS;
+ eventually ->
+ dict:store(
+ Guid, {published, ChPid, MsgSeqNo}, GS);
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ GS
+ end};
{{value, #delivery {}}, _MQ1} ->
%% The instruction was sent to us before we
%% were within the mirror_pids within the
- %% amqqueue record. We'll never receive the
- %% message directly.
- {SQ, Seen, GTC}
+ %% #amqqueue{} record. We'll never receive the
+ %% message directly from the channel. And the
+ %% channel will not be expecting any confirms
+ %% from us.
+ {SQ, GS}
end
end,
- State1 = State #state { sender_queues = SQ1,
- seen = Seen1,
- guid_to_channel = GTC1 },
+
+ State1 = State #state { sender_queues = SQ1,
+ guid_status = GS2 },
{ok,
case Deliver of
false ->
@@ -507,8 +548,7 @@ process_instruction(
false -> GA
end,
State1 #state { backing_queue_state = BQS1,
- guid_ack = GA1,
- guid_to_channel = GTC1 }
+ guid_ack = GA1 }
end};
process_instruction({set_length, Length},
State = #state { backing_queue = BQ,