diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-02 14:36:16 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-02 14:36:16 +0100 |
| commit | 5f349bb0659c65e3b301e01ee6b59b4fe2d4ddad (patch) | |
| tree | 4de1819c24bf8567b3b08ef06e63289d6ff408fd | |
| parent | 3f412c00b18b3a7f2e864273d3b62a4027d046fc (diff) | |
| parent | 671607f23fb4fad384610e02f2e5236350c9a637 (diff) | |
| download | rabbitmq-server-git-5f349bb0659c65e3b301e01ee6b59b4fe2d4ddad.tar.gz | |
stable to default
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 36 |
3 files changed, 19 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f08f82922d..5409a806b4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -549,10 +549,8 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, {{Message, Delivered, undefined}, true, discard(Delivery, State1)} end, false, State#q{backing_queue_state = BQS1}); - {published, BQS1} -> - {true, State#q{backing_queue_state = BQS1}}; - {discarded, BQS1} -> - {true, discard(Delivery, State#q{backing_queue_state = BQS1})} + {true, BQS1} -> + {true, State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2f247448c7..bf26cb5a63 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -90,10 +90,7 @@ -> {ack(), state()}. %% Called to inform the BQ about messages which have reached the -%% queue, but are not going to be further passed to BQ for some -%% reason. Note that this may be invoked for messages for which -%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', -%% BQS}. +%% queue, but are not going to be further passed to BQ. -callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). %% Return ids of messages which have been confirmed since the last @@ -216,11 +213,10 @@ -callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state(). %% Called prior to a publish or publish_delivered call. Allows the BQ -%% to signal that it's already seen this message (and in what capacity -%% - i.e. was it published previously or discarded previously) and -%% thus the message should be dropped. +%% to signal that it's already seen this message, (e.g. it was published +%% or discarded previously) and thus the message should be dropped. -callback is_duplicate(rabbit_types:basic_message(), state()) - -> {'false'|'published'|'discarded', state()}. + -> {boolean(), state()}. -else. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index bcd4861acf..572cd0caa2 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -225,21 +225,10 @@ discard(MsgId, ChPid, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, seen_status = SS }) -> - %% It's a massive error if we get told to discard something that's - %% already been published or published-and-confirmed. To do that - %% would require non FIFO access. Hence we should not find - %% 'published' or 'confirmed' in this dict:find. - case dict:find(MsgId, SS) of - error -> - ok = gm:broadcast(GM, {discard, ChPid, MsgId}), - BQS1 = BQ:discard(MsgId, ChPid, BQS), - ensure_monitoring( - ChPid, State #state { - backing_queue_state = BQS1, - seen_status = dict:erase(MsgId, SS) }); - {ok, discarded} -> - State - end. + false = dict:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {discard, ChPid, MsgId}), + ensure_monitoring(ChPid, State #state { backing_queue_state = + BQ:discard(MsgId, ChPid, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> @@ -393,8 +382,9 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% immediately after calling is_duplicate). The msg is %% invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {published, State #state { seen_status = dict:erase(MsgId, SS) }}; - {ok, confirmed} -> + {true, State #state { seen_status = dict:erase(MsgId, SS) }}; + {ok, Disposition} + when Disposition =:= confirmed %% It got published when we were a slave via gm, and %% confirmed some time after that (maybe even after %% promotion), but before we received the publish from the @@ -403,12 +393,12 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% need to confirm now. As above, amqqueue_process will %% have the entry for the msg_id_to_channel mapping added %% immediately after calling is_duplicate/2. - {published, State #state { seen_status = dict:erase(MsgId, SS), - confirmed = [MsgId | Confirmed] }}; - {ok, discarded} -> - %% Don't erase from SS here because discard/2 is about to - %% be called and we need to be able to detect this case - {discarded, State} + orelse Disposition =:= discarded -> + %% Message was discarded while we were a slave. Confirm now. + %% As above, amqqueue_process will have the entry for the + %% msg_id_to_channel mapping. + {true, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. %% --------------------------------------------------------------------------- |
