diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 112 |
2 files changed, 56 insertions, 58 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0c9eba9dfb..b3e04337f5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -539,7 +539,7 @@ attempt_delivery(#delivery{txn = Txn, deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {{invalid, seen}, _, State1} -> + {{invalid, _Bool}, _, State1} -> {true, State1}; {{valid, true}, _, State1} -> {true, State1}; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 513a8bb5b3..a1e2a49a92 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -102,62 +102,30 @@ purge(State = #state { gm = GM, set_delivered = 0 }}. publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, - State = #state { gm = GM, - backing_queue = BQ }) -> - {ok, State1} = - maybe_publish( - fun (BQS) -> - ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), - {ok, BQ:publish(Msg, MsgProps, ChPid, BQS)} - end, MsgId, State), - State1. + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State #state { backing_queue_state = BQS1 }. publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, - ChPid, State = #state { gm = GM, - backing_queue = BQ }) -> - case maybe_publish( - fun (BQS) -> - ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid, - MsgProps, Msg}), - BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS) - end, MsgId, State) of - {ok, State1} -> - %% publish_delivered but we've already published this - %% message. This means that we received the msg when we - %% were a slave but only via GM, not from the - %% channel. - %% - %% If AckRequired then we would have requeued the message - %% upon our promotion to master. Astonishingly, we think - %% we're empty, which means that someone else has already - %% consumed the message post requeue, and now we're about - %% to send it to another consumer. This could not be more - %% wrong. - -maybe_publish(Fun, MsgId, State = #state { seen_status = SS, - backing_queue_state = BQS }) -> - %% We will never see {published, ChPid, MsgSeqNo} here. - case dict:find(MsgId, SS) of - error -> - {Result, BQS1} = Fun(BQS), - {Result, State #state { backing_queue_state = BQS1 }}; - {ok, {published, ChPid}} -> - %% It already got published when we were a slave and no - %% confirmation is waiting. amqqueue_process will have - %% recorded if there's a confirm due to arrive, so can - %% delete entry. - {ok, State #state { seen_status = dict:erase(MsgId, SS) }}; - {ok, {confirmed, ChPid}} -> - %% It got confirmed before we became master, but we've - %% only just received the publish from the channel, so - %% couldn't previously know what the msg_seq_no was. Thus - %% confirm now. amqqueue_process will have recorded a - %% confirm is due immediately prior to here (and thus _it_ - %% knows the msg_id -> msg_seq_no mapping). - ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - self(), ?MODULE, fun (State1) -> {[MsgId], State1} end), - {ok, State #state { seen_status = dict:erase(MsgId, SS) }} - end. + ChPid, State = #state { gm = GM, + backing_queue = BQ, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + %% Must use confirmed_broadcast here in order to guarantee that + %% all slaves are forced to interpret this publish_delivered at + %% the same point, especially if we die and a slave is promoted. + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + ok = gm:confirmed_broadcast( + GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + State #state { backing_queue_state = BQS1 }. dropwhile(Fun, State = #state { gm = GM, backing_queue = BQ, @@ -281,7 +249,37 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ, {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), {MsgIds, State #state { backing_queue_state = BQS1 }}. -validate_message(Message, #state { backing_queue = BQ, +validate_message(Message, #state { seen_status = SS, + backing_queue = BQ, backing_queue_state = BSQ }) -> - %% this will definitely change. - BQ:validate_message(Message, BQS). + %% Here, we need to deal with the possibility that we're about to + %% receive a message that we've already seen when we were a slave + %% (we received it via gm). Thus if we do receive such message now + %% via the channel, there may be a confirm waiting to issue for + %% it. + + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, SS) of + error -> + %% We permit the underlying BQ to have a peek at it, but + %% only if we ourselves are not filtering out the msg. + BQ:validate_message(Message, BQS); + {ok, {published, ChPid}} -> + %% It already got published when we were a slave and no + %% confirmation is waiting. amqqueue_process will have, in + %% its msg_id_to_channel mapping, the entry for dealing + %% with the confirm when that comes back in, so the msg is + %% invalid, and we don't need to do anything further here. + {invalid, false}; + {ok, {confirmed, ChPid}} -> + %% It got confirmed before we became master, but we've + %% only just received the publish from the channel, so + %% couldn't previously know what the msg_seq_no was. Thus + %% confirm now. amqqueue_process will have recorded a + %% confirm is due immediately prior to here (and thus _it_ + %% knows the msg_id -> msg_seq_no mapping). + ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + self(), ?MODULE, fun (State1) -> {[MsgId], State1} end), + {ok, State #state { seen_status = dict:erase(MsgId, SS) }} + end. + |
