summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-09 13:19:10 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-09 13:19:10 +0000
commitd16bd4bae07f5c6fa9c942ce62310bc721171877 (patch)
treee7874831dd7b48efc0296a6980cd192f656244b9
parent962ea81092a1ebbec6b1cbe26c5c326bab66f3ad (diff)
downloadrabbitmq-server-git-d16bd4bae07f5c6fa9c942ce62310bc721171877.tar.gz
just stashing this whilst I fix something else
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl112
2 files changed, 56 insertions, 58 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0c9eba9dfb..b3e04337f5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -539,7 +539,7 @@ attempt_delivery(#delivery{txn = Txn,
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {{invalid, seen}, _, State1} ->
+ {{invalid, _Bool}, _, State1} ->
{true, State1};
{{valid, true}, _, State1} ->
{true, State1};
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 513a8bb5b3..a1e2a49a92 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -102,62 +102,30 @@ purge(State = #state { gm = GM,
set_delivered = 0 }}.
publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ }) ->
- {ok, State1} =
- maybe_publish(
- fun (BQS) ->
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
- {ok, BQ:publish(Msg, MsgProps, ChPid, BQS)}
- end, MsgId, State),
- State1.
+ State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }.
publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
- ChPid, State = #state { gm = GM,
- backing_queue = BQ }) ->
- case maybe_publish(
- fun (BQS) ->
- ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
- MsgProps, Msg}),
- BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS)
- end, MsgId, State) of
- {ok, State1} ->
- %% publish_delivered but we've already published this
- %% message. This means that we received the msg when we
- %% were a slave but only via GM, not from the
- %% channel.
- %%
- %% If AckRequired then we would have requeued the message
- %% upon our promotion to master. Astonishingly, we think
- %% we're empty, which means that someone else has already
- %% consumed the message post requeue, and now we're about
- %% to send it to another consumer. This could not be more
- %% wrong.
-
-maybe_publish(Fun, MsgId, State = #state { seen_status = SS,
- backing_queue_state = BQS }) ->
- %% We will never see {published, ChPid, MsgSeqNo} here.
- case dict:find(MsgId, SS) of
- error ->
- {Result, BQS1} = Fun(BQS),
- {Result, State #state { backing_queue_state = BQS1 }};
- {ok, {published, ChPid}} ->
- %% It already got published when we were a slave and no
- %% confirmation is waiting. amqqueue_process will have
- %% recorded if there's a confirm due to arrive, so can
- %% delete entry.
- {ok, State #state { seen_status = dict:erase(MsgId, SS) }};
- {ok, {confirmed, ChPid}} ->
- %% It got confirmed before we became master, but we've
- %% only just received the publish from the channel, so
- %% couldn't previously know what the msg_seq_no was. Thus
- %% confirm now. amqqueue_process will have recorded a
- %% confirm is due immediately prior to here (and thus _it_
- %% knows the msg_id -> msg_seq_no mapping).
- ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- self(), ?MODULE, fun (State1) -> {[MsgId], State1} end),
- {ok, State #state { seen_status = dict:erase(MsgId, SS) }}
- end.
+ ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ %% Must use confirmed_broadcast here in order to guarantee that
+ %% all slaves are forced to interpret this publish_delivered at
+ %% the same point, especially if we die and a slave is promoted.
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ ok = gm:confirmed_broadcast(
+ GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }.
dropwhile(Fun, State = #state { gm = GM,
backing_queue = BQ,
@@ -281,7 +249,37 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ,
{MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
-validate_message(Message, #state { backing_queue = BQ,
+validate_message(Message, #state { seen_status = SS,
+ backing_queue = BQ,
backing_queue_state = BSQ }) ->
- %% this will definitely change.
- BQ:validate_message(Message, BQS).
+ %% Here, we need to deal with the possibility that we're about to
+ %% receive a message that we've already seen when we were a slave
+ %% (we received it via gm). Thus if we do receive such message now
+ %% via the channel, there may be a confirm waiting to issue for
+ %% it.
+
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(MsgId, SS) of
+ error ->
+ %% We permit the underlying BQ to have a peek at it, but
+ %% only if we ourselves are not filtering out the msg.
+ BQ:validate_message(Message, BQS);
+ {ok, {published, ChPid}} ->
+ %% It already got published when we were a slave and no
+ %% confirmation is waiting. amqqueue_process will have, in
+ %% its msg_id_to_channel mapping, the entry for dealing
+ %% with the confirm when that comes back in, so the msg is
+ %% invalid, and we don't need to do anything further here.
+ {invalid, false};
+ {ok, {confirmed, ChPid}} ->
+ %% It got confirmed before we became master, but we've
+ %% only just received the publish from the channel, so
+ %% couldn't previously know what the msg_seq_no was. Thus
+ %% confirm now. amqqueue_process will have recorded a
+ %% confirm is due immediately prior to here (and thus _it_
+ %% knows the msg_id -> msg_seq_no mapping).
+ ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ self(), ?MODULE, fun (State1) -> {[MsgId], State1} end),
+ {ok, State #state { seen_status = dict:erase(MsgId, SS) }}
+ end.
+