diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 94 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 |
3 files changed, 75 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6aed2f8741..207f6babfb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). --export([init_with_backing_queue_state/6]). +-export([init_with_backing_queue_state/7]). % Queue's state -record(q, {q, @@ -118,7 +118,7 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries) -> + RateTRef, AckTags, Deliveries, GTC) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), case Owner of none -> ok; @@ -140,7 +140,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, expiry_timer_ref = undefined, ttl = undefined, stats_timer = rabbit_event:init_stats_timer(), - guid_to_channel = dict:new()})), + guid_to_channel = GTC})), lists:foldl( fun (Delivery, StateN) -> {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e2f9b0208d..b05d69732c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -37,7 +37,7 @@ backing_queue, backing_queue_state, set_delivered, - seen + seen_status }). %% --------------------------------------------------------------------------- @@ -70,15 +70,15 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> backing_queue = BQ, backing_queue_state = BQS, set_delivered = 0, - seen = sets:new() }. + seen_status = dict:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, set_delivered = BQ:len(BQS), - seen = Seen }. + seen_status = SeenStatus }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -102,29 +102,61 @@ purge(State = #state { gm = GM, set_delivered = 0 }}. publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen = Seen }) -> - case sets:is_element(Guid, Seen) of - true -> State #state { seen = sets:del_element(Guid, Seen) }; - false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State #state { backing_queue_state = BQS1 } - end. + State = #state { gm = GM, + backing_queue = BQ }) -> + {ok, State1} = + maybe_publish( + fun (BQS) -> + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + {ok, BQ:publish(Msg, MsgProps, ChPid, BQS)} + end, State), + State1. publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps, - ChPid, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen = Seen }) -> - case sets:is_element(Guid, Seen) of - true -> State #state { seen = sets:del_element(Guid, Seen) }; - false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid, - MsgProps, Msg}), - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, - MsgProps, ChPid, BQS), - {AckTag, State #state { backing_queue_state = BQS1 }} + ChPid, State = #state { gm = GM, + backing_queue = BQ }) -> + case maybe_publish( + fun (BQS) -> + ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid, + MsgProps, Msg}), + BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS) + end, State) of + {ok, State1} -> + %% publish_delivered but we've already published this + %% message. This means that we received the msg when we + %% were a slave but only via GM, not from the + %% channel. + %% + %% If AckRequired then we would have requeued the message + %% upon our promotion to master. Astonishingly, we think + %% we're empty, which means that someone else has already + %% consumed the message post requeue, and now we're about + %% to send it to another consumer. This could not be more + %% wrong. + +maybe_publish(Fun, State = #state { seen_status = SS, + backing_queue_state = BQS }) -> + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(Guid, SS) of + error -> + {Result, BQS1} = Fun(BQS), + {Result, State #state { backing_queue_state = BQS1 }}; + {ok, {published, ChPid}} -> + %% It already got published when we were a slave and no + %% confirmation is waiting. amqqueue_process will have + %% recorded if there's a confirm due to arrive, so can + %% delete entry. + {ok, State #state { seen_status = dict:erase(Guid, SS) }}; + {ok, {confirmed, ChPid}} -> + %% It got confirmed before we became master, but we've + %% only just received the publish from the channel, so + %% couldn't previously know what the msg_seq_no was. Thus + %% confirm now. amqqueue_process will have recorded a + %% confirm is due immediately prior to here (and thus _it_ + %% knows the guid -> msg_seq_no mapping). + ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + self(), ?MODULE, fun (State1) -> {[Guid], State1} end), + {ok, State #state { seen_status = dict:erase(Guid, SS) }} end. dropwhile(Fun, State = #state { gm = GM, @@ -143,7 +175,7 @@ fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, set_delivered = SetDelivered, - seen = Seen }) -> + seen_status = SS }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, case Result of @@ -154,13 +186,13 @@ fetch(AckRequired, State = #state { gm = GM, ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}), IsDelivered1 = IsDelivered orelse SetDelivered > 0, SetDelivered1 = lists:max([0, SetDelivered - 1]), - Seen1 = case SetDelivered + SetDelivered1 of - 1 -> sets:new(); %% transition to empty - _ -> Seen - end, + SS1 = case SetDelivered + SetDelivered1 of + 1 -> dict:new(); %% transition to empty + _ -> SS + end, {{Message, IsDelivered1, AckTag, Remaining}, State1 #state { set_delivered = SetDelivered1, - seen = Seen1 }} + seen_status = SS1 }} end. ack(AckTags, State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 93f885bae3..5cdae16c2b 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -133,6 +133,8 @@ handle_call({gm_deaths, Deaths}, From, master_node = MNode }) -> rabbit_log:info("Slave ~p saw deaths ~p for ~s~n", [self(), Deaths, rabbit_misc:rs(QueueName)]), + %% The GM has told us about deaths, which means we're not going to + %% receive any more messages from GM case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of {ok, Pid} when node(Pid) =:= MNode -> reply(ok, State); @@ -332,28 +334,31 @@ promote_me(From, #state { q = Q, backing_queue_state = BQS, rate_timer_ref = RateTRef, sender_queues = SQ, - guid_ack = GA }) -> + guid_ack = GA, + guid_status = GS }) -> rabbit_log:info("Promoting slave ~p for ~s~n", [self(), rabbit_misc:rs(Q #amqqueue.name)]), {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), - %% TODO fix up seen MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, sets:new()), + CPid, BQ, BQS, GM, GS), %% We have to do the requeue via this init because otherwise we %% don't have access to the relevent MsgPropsFun. Also, we are %% already in mnesia as the master queue pid. Thus we cannot just %% publish stuff by sending it to ourself - we must pass it %% through to this init, otherwise we can violate ordering %% constraints. + GTC = dict:from_list( + [{Guid, {ChPid, MsgSeqNo}} || + {Guid, {published, ChPid, MsgSeqNo}} <- dict:to_list(GS)]), AckTags = [AckTag || {_Guid, AckTag} <- dict:to_list(GA)], Deliveries = lists:append([queue:to_list(PubQ) || {_ChPid, PubQ} <- dict:to_list(SQ)]), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries), + AckTags, Deliveries, GTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> |
