diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2013-07-01 16:32:17 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2013-07-01 16:32:17 +0100 |
| commit | 8ea0fe60ad33aa8f1ee0823a82640d894d54fb48 (patch) | |
| tree | aabf464948316708f8a1307e9b6f7dcd824c0641 /src | |
| parent | 367e0da95c0d7004651e3ed49944a47561d0edbf (diff) | |
| download | rabbitmq-server-git-8ea0fe60ad33aa8f1ee0823a82640d894d54fb48.tar.gz | |
Refactor
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 |
2 files changed, 8 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c17f8460e4..ae88385297 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -533,9 +533,7 @@ run_message_queue(State) -> is_empty(State), State), State1. -attempt_delivery(Delivery = #delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo, - message = Message}, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of @@ -551,14 +549,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, true, discard(Delivery, State1)} end, false, State#q{backing_queue_state = BQS1}); {published, BQS1} -> - {true, State#q{backing_queue_state = BQS1}}; + {true, State#q{backing_queue_state = BQS1}}; {discarded, BQS1} -> - State1 = State#q{backing_queue_state = BQS1}, - {true, case MsgSeqNo of - undefined -> State1; - _ -> #basic_message{id = MsgId} = Message, - confirm_messages([MsgId], State1) - end} + {true, State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index f1798f5d40..3b49a6b843 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -228,7 +228,7 @@ discard(MsgId, ChPid, State = #state { gm = GM, false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {discard, ChPid, MsgId}), ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, BQS) }. + BQ:discard(MsgId, ChPid, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> @@ -395,9 +395,10 @@ is_duplicate(Message = #basic_message { id = MsgId }, {published, State #state { seen_status = dict:erase(MsgId, SS), confirmed = [MsgId | Confirmed] }}; {ok, discarded} -> - %% Message was discarded while we were a slave. Erase - %% and let amqqueue_process confirm if necessary. - {discarded, State #state { seen_status = dict:erase(MsgId, SS) }} + %% Message was discarded while we were a slave. + %% Erase and confirm. + {discarded, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. %% --------------------------------------------------------------------------- |
