summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl94
-rw-r--r--src/rabbit_mirror_queue_slave.erl13
3 files changed, 75 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6aed2f8741..207f6babfb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
--export([init_with_backing_queue_state/6]).
+-export([init_with_backing_queue_state/7]).
% Queue's state
-record(q, {q,
@@ -118,7 +118,7 @@ init(Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries) ->
+ RateTRef, AckTags, Deliveries, GTC) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
case Owner of
none -> ok;
@@ -140,7 +140,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
expiry_timer_ref = undefined,
ttl = undefined,
stats_timer = rabbit_event:init_stats_timer(),
- guid_to_channel = dict:new()})),
+ guid_to_channel = GTC})),
lists:foldl(
fun (Delivery, StateN) ->
{_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e2f9b0208d..b05d69732c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -37,7 +37,7 @@
backing_queue,
backing_queue_state,
set_delivered,
- seen
+ seen_status
}).
%% ---------------------------------------------------------------------------
@@ -70,15 +70,15 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = 0,
- seen = sets:new() }.
+ seen_status = dict:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = BQ:len(BQS),
- seen = Seen }.
+ seen_status = SeenStatus }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -102,29 +102,61 @@ purge(State = #state { gm = GM,
set_delivered = 0 }}.
publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen = Seen }) ->
- case sets:is_element(Guid, Seen) of
- true -> State #state { seen = sets:del_element(Guid, Seen) };
- false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State #state { backing_queue_state = BQS1 }
- end.
+ State = #state { gm = GM,
+ backing_queue = BQ }) ->
+ {ok, State1} =
+ maybe_publish(
+ fun (BQS) ->
+ ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ {ok, BQ:publish(Msg, MsgProps, ChPid, BQS)}
+ end, State),
+ State1.
publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps,
- ChPid, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen = Seen }) ->
- case sets:is_element(Guid, Seen) of
- true -> State #state { seen = sets:del_element(Guid, Seen) };
- false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
- MsgProps, Msg}),
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg,
- MsgProps, ChPid, BQS),
- {AckTag, State #state { backing_queue_state = BQS1 }}
+ ChPid, State = #state { gm = GM,
+ backing_queue = BQ }) ->
+ case maybe_publish(
+ fun (BQS) ->
+ ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
+ MsgProps, Msg}),
+ BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS)
+ end, State) of
+ {ok, State1} ->
+ %% publish_delivered but we've already published this
+ %% message. This means that we received the msg when we
+ %% were a slave but only via GM, not from the
+ %% channel.
+ %%
+ %% If AckRequired then we would have requeued the message
+ %% upon our promotion to master. Astonishingly, we think
+ %% we're empty, which means that someone else has already
+ %% consumed the message post requeue, and now we're about
+ %% to send it to another consumer. This could not be more
+ %% wrong.
+
+maybe_publish(Fun, State = #state { seen_status = SS,
+ backing_queue_state = BQS }) ->
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(Guid, SS) of
+ error ->
+ {Result, BQS1} = Fun(BQS),
+ {Result, State #state { backing_queue_state = BQS1 }};
+ {ok, {published, ChPid}} ->
+ %% It already got published when we were a slave and no
+ %% confirmation is waiting. amqqueue_process will have
+ %% recorded if there's a confirm due to arrive, so can
+ %% delete entry.
+ {ok, State #state { seen_status = dict:erase(Guid, SS) }};
+ {ok, {confirmed, ChPid}} ->
+ %% It got confirmed before we became master, but we've
+ %% only just received the publish from the channel, so
+ %% couldn't previously know what the msg_seq_no was. Thus
+ %% confirm now. amqqueue_process will have recorded a
+ %% confirm is due immediately prior to here (and thus _it_
+ %% knows the guid -> msg_seq_no mapping).
+ ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ self(), ?MODULE, fun (State1) -> {[Guid], State1} end),
+ {ok, State #state { seen_status = dict:erase(Guid, SS) }}
end.
dropwhile(Fun, State = #state { gm = GM,
@@ -143,7 +175,7 @@ fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = SetDelivered,
- seen = Seen }) ->
+ seen_status = SS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
@@ -154,13 +186,13 @@ fetch(AckRequired, State = #state { gm = GM,
ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
IsDelivered1 = IsDelivered orelse SetDelivered > 0,
SetDelivered1 = lists:max([0, SetDelivered - 1]),
- Seen1 = case SetDelivered + SetDelivered1 of
- 1 -> sets:new(); %% transition to empty
- _ -> Seen
- end,
+ SS1 = case SetDelivered + SetDelivered1 of
+ 1 -> dict:new(); %% transition to empty
+ _ -> SS
+ end,
{{Message, IsDelivered1, AckTag, Remaining},
State1 #state { set_delivered = SetDelivered1,
- seen = Seen1 }}
+ seen_status = SS1 }}
end.
ack(AckTags, State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 93f885bae3..5cdae16c2b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -133,6 +133,8 @@ handle_call({gm_deaths, Deaths}, From,
master_node = MNode }) ->
rabbit_log:info("Slave ~p saw deaths ~p for ~s~n",
[self(), Deaths, rabbit_misc:rs(QueueName)]),
+ %% The GM has told us about deaths, which means we're not going to
+ %% receive any more messages from GM
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
{ok, Pid} when node(Pid) =:= MNode ->
reply(ok, State);
@@ -332,28 +334,31 @@ promote_me(From, #state { q = Q,
backing_queue_state = BQS,
rate_timer_ref = RateTRef,
sender_queues = SQ,
- guid_ack = GA }) ->
+ guid_ack = GA,
+ guid_status = GS }) ->
rabbit_log:info("Promoting slave ~p for ~s~n",
[self(), rabbit_misc:rs(Q #amqqueue.name)]),
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
- %% TODO fix up seen
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, sets:new()),
+ CPid, BQ, BQS, GM, GS),
%% We have to do the requeue via this init because otherwise we
%% don't have access to the relevent MsgPropsFun. Also, we are
%% already in mnesia as the master queue pid. Thus we cannot just
%% publish stuff by sending it to ourself - we must pass it
%% through to this init, otherwise we can violate ordering
%% constraints.
+ GTC = dict:from_list(
+ [{Guid, {ChPid, MsgSeqNo}} ||
+ {Guid, {published, ChPid, MsgSeqNo}} <- dict:to_list(GS)]),
AckTags = [AckTag || {_Guid, AckTag} <- dict:to_list(GA)],
Deliveries = lists:append([queue:to_list(PubQ)
|| {_ChPid, PubQ} <- dict:to_list(SQ)]),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries),
+ AckTags, Deliveries, GTC),
{become, rabbit_amqqueue_process, QueueState, hibernate}.
noreply(State) ->